import os.path
import time
import re
-import platform
import logging
import copy
import OpenSSL
from ganeti import locking
from ganeti import constants
from ganeti import objects
-from ganeti import serializer
from ganeti import ssconf
from ganeti import uidpool
from ganeti import compat
from ganeti import opcodes
from ganeti import ht
from ganeti import rpc
+from ganeti import runtime
+from ganeti.masterd import iallocator
import ganeti.masterd.instance # pylint: disable=W0611
-#: Size of DRBD meta block device
-DRBD_META_SIZE = 128
-
# States of instance
INSTANCE_DOWN = [constants.ADMINST_DOWN]
INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP]
"""Data container for LU results with jobs.
Instances of this class returned from L{LogicalUnit.Exec} will be recognized
- by L{mcpu.Processor._ProcessResult}. The latter will then submit the jobs
+ by L{mcpu._ProcessResult}. The latter will then submit the jobs
contained in the C{jobs} attribute and include the job IDs in the opcode
result.
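  Example (illustrative sketch; the container class name and the group list
  are assumed, the opcode is from this module)::

    def Exec(self, feedback_fn):
      # each element of jobs is one job, itself a list of opcodes
      jobs = [[opcodes.OpClusterVerifyGroup(group_name=g)]
              for g in group_names]
      return ResultWithJobs(jobs)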
as values. Rules:
- use an empty dict if you don't need any lock
- - if you don't need any lock at a particular level omit that level
+ - if you don't need any lock at a particular level omit that
+ level (note that in this case C{DeclareLocks} won't be called
+ at all for that level)
+ - if you need locks at a level, but you can't calculate it in
+ this function, initialise that level with an empty list and do
+ further processing in L{LogicalUnit.DeclareLocks} (see that
+ function's docstring)
- don't put anything for the BGL level
- - if you want all locks at a level use locking.ALL_SET as a value
+ - if you want all locks at a level use L{locking.ALL_SET} as a value
If you need to share locks (rather than acquire them exclusively) at one
level you can modify self.share_locks, setting a true value (usually 1) for
that level.
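  Example (sketch; lock levels from L{locking}, instance name invented)::

    def ExpandNames(self):
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ["instance1.example.com"],
      }
      # acquire the node locks shared rather than exclusively
      self.share_locks[locking.LEVEL_NODE] = 1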
@param level: Locking level which is going to be locked
- @type level: member of ganeti.locking.LEVELS
+ @type level: member of L{ganeti.locking.LEVELS}
"""
#: Attribute holding field definitions
FIELDS = None
+ #: Field to sort by
+ SORT_FIELD = "name"
+
def __init__(self, qfilter, fields, use_locking):
"""Initializes this class.
self.use_locking = use_locking
self.query = query.Query(self.FIELDS, fields, qfilter=qfilter,
- namefield="name")
+ namefield=self.SORT_FIELD)
self.requested_data = self.query.RequestedData()
self.names = self.query.RequestedNames()
return dict.fromkeys(locking.LEVELS, 1)
-def _MakeLegacyNodeInfo(data):
- """Formats the data returned by L{rpc.RpcRunner.call_node_info}.
+def _AnnotateDiskParams(instance, devs, cfg):
+ """Little helper wrapper to the rpc annotation method.
+
+ @param instance: The instance object
+ @type devs: List of L{objects.Disk}
+ @param devs: The root devices (not any of its children!)
+ @param cfg: The config object
+ @return: The annotated disk copies
+ @see L{rpc.AnnotateDiskParams}
+
+ """
+ return rpc.AnnotateDiskParams(instance.disk_template, devs,
+ cfg.GetInstanceDiskParams(instance))
+
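# Usage sketch (caller context assumed): annotate disks before node RPCs,
# e.g. anno_disks = _AnnotateDiskParams(instance, instance.disks, lu.cfg)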
- Converts the data into a single dictionary. This is fine for most use cases,
- but some require information from more than one volume group or hypervisor.
+def _CheckInstancesNodeGroups(cfg, instances, owned_groups, owned_nodes,
+ cur_group_uuid):
+ """Checks if node groups for locked instances are still correct.
+
+ @type cfg: L{config.ConfigWriter}
+ @param cfg: Cluster configuration
+ @type instances: dict; string as key, L{objects.Instance} as value
+ @param instances: Dictionary, instance name as key, instance object as value
+ @type owned_groups: iterable of string
+ @param owned_groups: List of owned groups
+ @type owned_nodes: iterable of string
+ @param owned_nodes: List of owned nodes
+ @type cur_group_uuid: string or None
+ @param cur_group_uuid: Optional group UUID to check against instance's groups
"""
- (bootid, (vg_info, ), (hv_info, )) = data
+ for (name, inst) in instances.items():
+ assert owned_nodes.issuperset(inst.all_nodes), \
+ "Instance %s's nodes changed while we kept the lock" % name
- return utils.JoinDisjointDicts(utils.JoinDisjointDicts(vg_info, hv_info), {
- "bootid": bootid,
- })
+ inst_groups = _CheckInstanceNodeGroups(cfg, name, owned_groups)
+ assert cur_group_uuid is None or cur_group_uuid in inst_groups, \
+ "Instance %s has no node in group %s" % (name, cur_group_uuid)
-def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups):
+
+def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups,
+ primary_only=False):
"""Checks if the owned node groups are still correct for an instance.
@type cfg: L{config.ConfigWriter}
@param instance_name: Instance name
@type owned_groups: set or frozenset
@param owned_groups: List of currently owned node groups
+ @type primary_only: boolean
+ @param primary_only: Whether to check node groups for only the primary node
"""
- inst_groups = cfg.GetInstanceNodeGroups(instance_name)
+ inst_groups = cfg.GetInstanceNodeGroups(instance_name, primary_only)
if not owned_groups.issuperset(inst_groups):
raise errors.OpPrereqError("Instance %s's node groups changed since"
use_none=use_none,
use_default=use_default)
else:
- if not value or value == [constants.VALUE_DEFAULT]:
+ if (not value or value == [constants.VALUE_DEFAULT] or
+ value == constants.VALUE_DEFAULT):
if group_policy:
del ipolicy[key]
else:
# in a nicer way
ipolicy[key] = list(value)
try:
- objects.InstancePolicy.CheckParameterSyntax(ipolicy)
+ objects.InstancePolicy.CheckParameterSyntax(ipolicy, not group_policy)
except errors.ConfigurationError, err:
raise errors.OpPrereqError("Invalid instance policy: %s" % err,
errors.ECODE_INVAL)
hm = lu.proc.BuildHooksManager(lu)
try:
hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
- except:
- # pylint: disable=W0702
- lu.LogWarning("Errors occurred running hooks on %s" % node_name)
+ except Exception, err: # pylint: disable=W0703
+ lu.LogWarning("Errors occurred running hooks on %s: %s" % (node_name, err))
def _CheckOutputFields(static, dynamic, selected):
if constants.ADMINST_UP not in req_states:
pnode = instance.primary_node
- ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
- ins_l.Raise("Can't contact node %s for instance information" % pnode,
- prereq=True, ecode=errors.ECODE_ENVIRON)
-
- if instance.name in ins_l.payload:
- raise errors.OpPrereqError("Instance %s is running, %s" %
- (instance.name, msg), errors.ECODE_STATE)
+ if not lu.cfg.GetNodeInfo(pnode).offline:
+ ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
+ ins_l.Raise("Can't contact node %s for instance information" % pnode,
+ prereq=True, ecode=errors.ECODE_ENVIRON)
+ if instance.name in ins_l.payload:
+ raise errors.OpPrereqError("Instance %s is running, %s" %
+ (instance.name, msg), errors.ECODE_STATE)
+ else:
+ lu.LogWarning("Primary node offline, ignoring check that instance"
+ " is down")
-def _ComputeMinMaxSpec(name, ipolicy, value):
+def _ComputeMinMaxSpec(name, qualifier, ipolicy, value):
"""Computes if value is in the desired range.
@param name: name of the parameter for which we perform the check
+ @param qualifier: a qualifier used in the error message (e.g. 'disk/1',
+ not just 'disk')
@param ipolicy: dictionary containing min, max and std values
@param value: actual value that we want to use
@return: None or element not meeting the criteria
max_v = ipolicy[constants.ISPECS_MAX].get(name, value)
min_v = ipolicy[constants.ISPECS_MIN].get(name, value)
if value > max_v or min_v > value:
+ if qualifier:
+ fqn = "%s/%s" % (name, qualifier)
+ else:
+ fqn = name
return ("%s value %s is not in range [%s, %s]" %
- (name, value, min_v, max_v))
+ (fqn, value, min_v, max_v))
return None
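# Worked example (values invented): with ISPECS_MIN/ISPECS_MAX entries of
# 128/1024 for "disk-size", _ComputeMinMaxSpec("disk-size", "1", ipolicy,
# 2048) returns "disk-size/1 value 2048 is not in range [128, 1024]";
# an in-range value returns None.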
def _ComputeIPolicySpecViolation(ipolicy, mem_size, cpu_count, disk_count,
- nic_count, disk_sizes,
+ nic_count, disk_sizes, spindle_use,
_compute_fn=_ComputeMinMaxSpec):
"""Verifies ipolicy against provided specs.
@param nic_count: Number of nics used
@type disk_sizes: list of ints
@param disk_sizes: Disk sizes of used disk (len must match C{disk_count})
+ @type spindle_use: int
+ @param spindle_use: The number of spindles this instance uses
@param _compute_fn: The compute function (unittest only)
@return: A list of violations, or an empty list if no violations are found
assert disk_count == len(disk_sizes)
test_settings = [
- (constants.ISPEC_MEM_SIZE, mem_size),
- (constants.ISPEC_CPU_COUNT, cpu_count),
- (constants.ISPEC_DISK_COUNT, disk_count),
- (constants.ISPEC_NIC_COUNT, nic_count),
- ] + map((lambda d: (constants.ISPEC_DISK_SIZE, d)), disk_sizes)
+ (constants.ISPEC_MEM_SIZE, "", mem_size),
+ (constants.ISPEC_CPU_COUNT, "", cpu_count),
+ (constants.ISPEC_DISK_COUNT, "", disk_count),
+ (constants.ISPEC_NIC_COUNT, "", nic_count),
+ (constants.ISPEC_SPINDLE_USE, "", spindle_use),
+ ] + [(constants.ISPEC_DISK_SIZE, str(idx), d)
+ for idx, d in enumerate(disk_sizes)]
return filter(None,
- (_compute_fn(name, ipolicy, value)
- for (name, value) in test_settings))
+ (_compute_fn(name, qualifier, ipolicy, value)
+ for (name, qualifier, value) in test_settings))
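# Call sketch (numbers invented): an instance with 1024 MiB of memory, two
# vcpus, two disks of 512 and 768 MiB, one NIC and two spindles:
#   _ComputeIPolicySpecViolation(ipolicy, 1024, 2, 2, 1, [512, 768], 2)
# returns a list of human-readable violations, empty if compliant.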
def _ComputeIPolicyInstanceViolation(ipolicy, instance,
"""
mem_size = instance.beparams.get(constants.BE_MAXMEM, None)
cpu_count = instance.beparams.get(constants.BE_VCPUS, None)
+ spindle_use = instance.beparams.get(constants.BE_SPINDLE_USE, None)
disk_count = len(instance.disks)
disk_sizes = [disk.size for disk in instance.disks]
nic_count = len(instance.nics)
return _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
- disk_sizes)
+ disk_sizes, spindle_use)
-def _ComputeIPolicyInstanceSpecViolation(ipolicy, instance_spec,
- _compute_fn=_ComputeIPolicySpecViolation):
+def _ComputeIPolicyInstanceSpecViolation(
+ ipolicy, instance_spec, _compute_fn=_ComputeIPolicySpecViolation):
"""Compute if instance specs meets the specs of ipolicy.
@type ipolicy: dict
disk_count = instance_spec.get(constants.ISPEC_DISK_COUNT, 0)
disk_sizes = instance_spec.get(constants.ISPEC_DISK_SIZE, [])
nic_count = instance_spec.get(constants.ISPEC_NIC_COUNT, 0)
+ spindle_use = instance_spec.get(constants.ISPEC_SPINDLE_USE, None)
return _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
- disk_sizes)
+ disk_sizes, spindle_use)
def _ComputeIPolicyNodeViolation(ipolicy, instance, current_group,
@param old_ipolicy: The current (still in-place) ipolicy
@param new_ipolicy: The new (to become) ipolicy
@param instances: List of instances to verify
- @return: A list of instances which violates the new ipolicy but did not before
+ @return: A list of instances which violate the new ipolicy but
+ did not before
"""
- return (_ComputeViolatingInstances(old_ipolicy, instances) -
- _ComputeViolatingInstances(new_ipolicy, instances))
+ return (_ComputeViolatingInstances(new_ipolicy, instances) -
+ _ComputeViolatingInstances(old_ipolicy, instances))
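# Sketch (names invented): if {i1, i2} violate the new policy and i2 already
# violated the old one, only i1 is reported as a new violation.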
def _ExpandItemName(fn, name, kind):
return mc_now < mc_should
-def _CalculateGroupIPolicy(cluster, group):
- """Calculate instance policy for group.
-
- """
- return cluster.SimpleFillIPolicy(group.ipolicy)
-
-
def _ComputeViolatingInstances(ipolicy, instances):
"""Computes a set of instances who violates given ipolicy.
for dev in instance.disks:
cfg.SetDiskID(dev, node_name)
- result = rpc_runner.call_blockdev_getmirrorstatus(node_name, instance.disks)
+ result = rpc_runner.call_blockdev_getmirrorstatus(node_name, (instance.disks,
+ instance))
result.Raise("Failed to get disk status from node %s" % node_name,
prereq=prereq, ecode=errors.ECODE_ENVIRON)
"""
node = getattr(lu.op, node_slot, None)
- iallocator = getattr(lu.op, iallocator_slot, None)
+ ialloc = getattr(lu.op, iallocator_slot, None)
- if node is not None and iallocator is not None:
+ if node is not None and ialloc is not None:
raise errors.OpPrereqError("Do not specify both, iallocator and node",
errors.ECODE_INVAL)
- elif node is None and iallocator is None:
+ elif node is None and ialloc is None:
default_iallocator = lu.cfg.GetDefaultIAllocator()
if default_iallocator:
setattr(lu.op, iallocator_slot, default_iallocator)
" cluster-wide default iallocator found;"
" please specify either an iallocator or a"
" node, or set a cluster-wide default"
- " iallocator")
+ " iallocator", errors.ECODE_INVAL)
-def _GetDefaultIAllocator(cfg, iallocator):
+def _GetDefaultIAllocator(cfg, ialloc):
"""Decides on which iallocator to use.
@type cfg: L{config.ConfigWriter}
@param cfg: Cluster configuration object
- @type iallocator: string or None
- @param iallocator: Iallocator specified in opcode
+ @type ialloc: string or None
+ @param ialloc: Iallocator specified in opcode
@rtype: string
@return: Iallocator name
"""
- if not iallocator:
+ if not ialloc:
# Use default iallocator
- iallocator = cfg.GetDefaultIAllocator()
+ ialloc = cfg.GetDefaultIAllocator()
- if not iallocator:
+ if not ialloc:
raise errors.OpPrereqError("No iallocator was specified, neither in the"
" opcode nor as a cluster-wide default",
errors.ECODE_INVAL)
- return iallocator
+ return ialloc
class LUClusterPostInit(LogicalUnit):
# Always depend on global verification
depends_fn = lambda: [(-len(jobs), [])]
- jobs.extend([opcodes.OpClusterVerifyGroup(group_name=group,
- ignore_errors=self.op.ignore_errors,
- depends=depends_fn())]
- for group in groups)
+ jobs.extend(
+ [opcodes.OpClusterVerifyGroup(group_name=group,
+ ignore_errors=self.op.ignore_errors,
+ depends=depends_fn())]
+ for group in groups)
# Fix up all parameters
for op in itertools.chain(*jobs): # pylint: disable=W0142
"""Verifies the cluster config.
"""
- REQ_BGL = True
+ REQ_BGL = False
def _VerifyHVP(self, hvp_data):
"""Verifies locally the syntax of the hypervisor parameters.
self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
def ExpandNames(self):
- # Information can be safely retrieved as the BGL is acquired in exclusive
- # mode
- assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER)
+ self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
+ self.share_locks = _ShareAll()
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ """
+ # Retrieve all information
self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
self.all_node_info = self.cfg.GetAllNodesInfo()
self.all_inst_info = self.cfg.GetAllInstancesInfo()
- self.needed_locks = {}
def Exec(self, feedback_fn):
"""Verify integrity of cluster, performing various test on nodes.
self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
# Get instances in node group; this is unsafe and needs verification later
- inst_names = self.cfg.GetNodeGroupInstances(self.group_uuid)
+ inst_names = \
+ self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
self.needed_locks = {
locking.LEVEL_INSTANCE: inst_names,
self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
group_nodes = set(self.group_info.members)
- group_instances = self.cfg.GetNodeGroupInstances(self.group_uuid)
+ group_instances = \
+ self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
unlocked_nodes = \
group_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
if unlocked_nodes:
raise errors.OpPrereqError("Missing lock for nodes: %s" %
- utils.CommaJoin(unlocked_nodes))
+ utils.CommaJoin(unlocked_nodes),
+ errors.ECODE_STATE)
if unlocked_instances:
raise errors.OpPrereqError("Missing lock for instances: %s" %
- utils.CommaJoin(unlocked_instances))
+ utils.CommaJoin(unlocked_instances),
+ errors.ECODE_STATE)
self.all_node_info = self.cfg.GetAllNodesInfo()
self.all_inst_info = self.cfg.GetAllInstancesInfo()
for inst in self.my_inst_info.values():
if inst.disk_template in constants.DTS_INT_MIRROR:
- group = self.my_node_info[inst.primary_node].group
- for nname in inst.secondary_nodes:
- if self.all_node_info[nname].group != group:
+ for nname in inst.all_nodes:
+ if self.all_node_info[nname].group != self.group_uuid:
extra_lv_nodes.add(nname)
unlocked_lv_nodes = \
extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
if unlocked_lv_nodes:
- raise errors.OpPrereqError("these nodes could be locked: %s" %
- utils.CommaJoin(unlocked_lv_nodes))
+ raise errors.OpPrereqError("Missing node locks for LV check: %s" %
+ utils.CommaJoin(unlocked_lv_nodes),
+ errors.ECODE_STATE)
self.extra_lv_nodes = list(extra_lv_nodes)
def _VerifyNode(self, ninfo, nresult):
node_vol_should = {}
instanceconfig.MapLVsByNode(node_vol_should)
- ipolicy = _CalculateGroupIPolicy(self.cfg.GetClusterInfo(), self.group_info)
+ cluster = self.cfg.GetClusterInfo()
+ ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
+ self.group_info)
err = _ComputeIPolicyInstanceViolation(ipolicy, instanceconfig)
- _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, err)
+ _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, utils.CommaJoin(err))
for node in node_vol_should:
n_img = node_image[node]
"""
for node, n_img in node_image.items():
- if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
+ if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
+ self.all_node_info[node].group != self.group_uuid):
# skip non-healthy nodes
continue
for volume in n_img.volumes:
# WARNING: we currently take into account down instances as well
# as up ones, considering that even if they're down someone
# might want to start them even in the event of a node failure.
- if n_img.offline:
- # we're skipping offline nodes from the N+1 warning, since
- # most likely we don't have good memory infromation from them;
- # we already list instances living on such nodes, and that's
- # enough warning
+ if n_img.offline or self.all_node_info[node].group != self.group_uuid:
+ # we're skipping nodes marked offline and nodes in other groups from
+ # the N+1 warning, since most likely we don't have good memory
+ # information from them; we already list instances living on such
+ # nodes, and that's enough warning
continue
#TODO(dynmem): also consider ballooning out other instances
for prinode, instances in n_img.sbp.items():
if drbd_helper:
helper_result = nresult.get(constants.NV_DRBDHELPER, None)
- test = (helper_result == None)
+ test = (helper_result is None)
_ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
"no drbd usermode helper returned")
if helper_result:
node_disks[nname] = disks
- # Creating copies as SetDiskID below will modify the objects and that can
- # lead to incorrect data returned from nodes
- devonly = [dev.Copy() for (_, dev) in disks]
-
- for dev in devonly:
- self.cfg.SetDiskID(dev, nname)
+ # _AnnotateDiskParams already makes copies of the disks
+ devonly = []
+ for (inst, dev) in disks:
+ (anno_disk,) = _AnnotateDiskParams(instanceinfo[inst], [dev], self.cfg)
+ self.cfg.SetDiskID(anno_disk, nname)
+ devonly.append(anno_disk)
node_disks_devonly[nname] = devonly
for instance in self.my_inst_names:
inst_config = self.my_inst_info[instance]
+ if inst_config.admin_state == constants.ADMINST_OFFLINE:
+ i_offline += 1
for nname in inst_config.all_nodes:
if nname not in node_image:
if master_node not in self.my_node_info:
additional_nodes.append(master_node)
vf_node_info.append(self.all_node_info[master_node])
- # Add the first vm_capable node we find which is not included
+ # Add the first vm_capable node we find which is not included,
+ # excluding the master node (which we already have)
for node in absent_nodes:
nodeinfo = self.all_node_info[node]
- if nodeinfo.vm_capable and not nodeinfo.offline:
+ if (nodeinfo.vm_capable and not nodeinfo.offline and
+ node != master_node):
additional_nodes.append(node)
vf_node_info.append(self.all_node_info[node])
break
non_primary_inst = set(nimg.instances).difference(nimg.pinst)
for inst in non_primary_inst:
- # FIXME: investigate best way to handle offline insts
- if inst.admin_state == constants.ADMINST_OFFLINE:
- if verbose:
- feedback_fn("* Skipping offline instance %s" % inst.name)
- i_offline += 1
- continue
test = inst in self.all_inst_info
_ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
"instance should not run on node %s", node_i.name)
self.instances = dict(self.cfg.GetMultiInstanceInfo(owned_instances))
# Check if node groups for locked instances are still correct
- for (instance_name, inst) in self.instances.items():
- assert owned_nodes.issuperset(inst.all_nodes), \
- "Instance %s's nodes changed while we kept the lock" % instance_name
-
- inst_groups = _CheckInstanceNodeGroups(self.cfg, instance_name,
- owned_groups)
-
- assert self.group_uuid in inst_groups, \
- "Instance %s has no node in group %s" % (instance_name, self.group_uuid)
+ _CheckInstancesNodeGroups(self.cfg, self.instances,
+ owned_groups, owned_nodes, self.group_uuid)
def Exec(self, feedback_fn):
"""Verify integrity of cluster disks.
res_instances = set()
res_missing = {}
- nv_dict = _MapInstanceDisksToNodes([inst
- for inst in self.instances.values()
- if inst.admin_state == constants.ADMINST_UP])
+ nv_dict = _MapInstanceDisksToNodes(
+ [inst for inst in self.instances.values()
+ if inst.admin_state == constants.ADMINST_UP])
if nv_dict:
nodes = utils.NiceSort(set(self.owned_locks(locking.LEVEL_NODE)) &
ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
except errors.ProgrammerError:
raise errors.OpPrereqError("Invalid primary ip family: %s." %
- ip_family)
+ ip_family, errors.ECODE_INVAL)
if not ipcls.ValidateNetmask(netmask):
raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
- (netmask))
+ (netmask), errors.ECODE_INVAL)
class LUClusterSetParams(LogicalUnit):
if self.op.diskparams:
for dt_params in self.op.diskparams.values():
utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
+ try:
+ utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
+ except errors.OpPrereqError, err:
+ raise errors.OpPrereqError("While verify diskparams options: %s" % err,
+ errors.ECODE_INVAL)
def ExpandNames(self):
# FIXME: in the future maybe other cluster params won't require checking on
if compat.any(node in group.members
for node in inst.all_nodes)])
new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
- new = _ComputeNewInstanceViolations(_CalculateGroupIPolicy(cluster,
- group),
+ ipol = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, group)
+ new = _ComputeNewInstanceViolations(ipol,
new_ipolicy, instances)
if new:
violations.update(new)
if violations:
self.LogWarning("After the ipolicy change the following instances"
" violate them: %s",
- utils.CommaJoin(violations))
+ utils.CommaJoin(utils.NiceSort(violations)))
if self.op.nicparams:
utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
" address" % (instance.name, nic_idx))
if nic_errors:
raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
- "\n".join(nic_errors))
+ "\n".join(nic_errors), errors.ECODE_INVAL)
# hypervisor list/parameters
self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
if cluster.modify_etc_hosts:
files_all.add(constants.ETC_HOSTS)
+ if cluster.use_external_mip_script:
+ files_all.add(constants.EXTERNAL_MASTER_SETUP_SCRIPT)
+
# Files which are optional, these must:
# - be present in one other category as well
# - either exist or not exist on all nodes of that category (mc, vm all)
if not redist:
files_mc.add(constants.CLUSTER_CONF_FILE)
- # FIXME: this should also be replicated but Ganeti doesn't support files_mc
- # replication
- files_mc.add(constants.DEFAULT_MASTER_SETUP_SCRIPT)
-
# Files which should only be on VM-capable nodes
- files_vm = set(filename
+ files_vm = set(
+ filename
for hv_name in cluster.enabled_hypervisors
for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[0])
- files_opt |= set(filename
+ files_opt |= set(
+ filename
for hv_name in cluster.enabled_hypervisors
for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[1])
master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
online_nodes = lu.cfg.GetOnlineNodeList()
- vm_nodes = lu.cfg.GetVmCapableNodeList()
+ online_set = frozenset(online_nodes)
+ vm_nodes = list(online_set.intersection(lu.cfg.GetVmCapableNodeList()))
if additional_nodes is not None:
online_nodes.extend(additional_nodes)
max_time = 0
done = True
cumul_degraded = False
- rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks)
+ rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
msg = rstats.fail_msg
if msg:
lu.LogWarning("Can't get any data from node %s: %s", node, msg)
return not cumul_degraded
-def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
+def _BlockdevFind(lu, node, dev, instance):
+ """Wrapper around call_blockdev_find to annotate diskparams.
+
+ @param lu: A reference to the lu object
+ @param node: The node to call out
+ @param dev: The device to find
+ @param instance: The instance object the device belongs to
+ @return: The result of the rpc call
+
+ """
+ (disk,) = _AnnotateDiskParams(instance, [dev], lu.cfg)
+ return lu.rpc.call_blockdev_find(node, disk)
+
+
+def _CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
+ """Wrapper around L{_CheckDiskConsistencyInner}.
+
+ """
+ (disk,) = _AnnotateDiskParams(instance, [dev], lu.cfg)
+ return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
+ ldisk=ldisk)
+
+
+def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
+ ldisk=False):
"""Check that mirrors are not degraded.
+ @attention: The device has to be annotated already.
+
The ldisk parameter, if True, will change the test from the
is_degraded attribute (which represents overall non-ok status for
the device(s)) to the ldisk (representing the local storage status).
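  Example (sketch)::

    # test overall mirror health of the device on the node
    _CheckDiskConsistency(lu, instance, dev, node, on_primary)
    # test only the local storage status on that node
    _CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=True)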
if dev.children:
for child in dev.children:
- result = result and _CheckDiskConsistency(lu, child, node, on_primary)
+ result = result and _CheckDiskConsistencyInner(lu, instance, child, node,
+ on_primary)
return result
"""Logical unit for OOB handling.
"""
- REG_BGL = False
+ REQ_BGL = False
_SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)
def ExpandNames(self):
type(result.payload))
if self.op.command in [
- constants.OOB_POWER_ON,
- constants.OOB_POWER_OFF,
- constants.OOB_POWER_CYCLE,
- ]:
+ constants.OOB_POWER_ON,
+ constants.OOB_POWER_OFF,
+ constants.OOB_POWER_CYCLE,
+ ]:
if result.payload is not None:
errs.append("%s is expected to not return payload but got '%s'" %
(self.op.command, result.payload))
def BuildHooksEnv(self):
"""Build hooks env.
- This doesn't run on the target node in the pre phase as a failed
- node would then be impossible to remove.
-
"""
return {
"OP_TARGET": self.op.node_name,
def BuildHooksNodes(self):
"""Build hooks nodes.
+ This doesn't run on the target node in the pre phase as a failed
+ node would then be impossible to remove.
+
"""
all_nodes = self.cfg.GetNodeList()
try:
all_nodes.remove(self.op.node_name)
except ValueError:
- logging.warning("Node '%s', which is about to be removed, was not found"
- " in the list of all nodes", self.op.node_name)
+ pass
return (all_nodes, all_nodes)
def CheckPrereq(self):
node_data = lu.rpc.call_node_info(toquery_nodes, [lu.cfg.GetVGName()],
[lu.cfg.GetHypervisorType()])
- live_data = dict((name, _MakeLegacyNodeInfo(nresult.payload))
+ live_data = dict((name, rpc.MakeLegacyNodeInfo(nresult.payload))
for (name, nresult) in node_data.items()
if not nresult.fail_msg and nresult.payload)
else:
live_data = {}
if query.IQ_DISKUSAGE in self.requested_data:
+ gmi = ganeti.masterd.instance
disk_usage = dict((inst.name,
- _ComputeDiskSize(inst.disk_template,
- [{constants.IDISK_SIZE: disk.size}
- for disk in inst.disks]))
+ gmi.ComputeDiskSize(inst.disk_template,
+ [{constants.IDISK_SIZE: disk.size}
+ for disk in inst.disks]))
for inst in instance_list)
else:
disk_usage = None
if not newbie_singlehomed:
# check reachability from my secondary ip to newbie's secondary ip
if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
- source=myself.secondary_ip):
+ source=myself.secondary_ip):
raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
" based ping to node daemon port",
errors.ECODE_ENVIRON)
if self.op.disk_state:
self.new_disk_state = _MergeAndVerifyDiskState(self.op.disk_state, None)
+ # TODO: If we need to have multiple DnsOnlyRunner we probably should make
+ # it a property on the base class.
+ result = rpc.DnsOnlyRunner().call_version([node])[node]
+ result.Raise("Can't get version information from node %s" % node)
+ if constants.PROTOCOL_VERSION == result.payload:
+ logging.info("Communication to node %s fine, sw version %s match",
+ node, result.payload)
+ else:
+ raise errors.OpPrereqError("Version mismatch master version %s,"
+ " node version %s" %
+ (constants.PROTOCOL_VERSION, result.payload),
+ errors.ECODE_ENVIRON)
+
def Exec(self, feedback_fn):
"""Adds the new node to the cluster.
if self.op.disk_state:
new_node.disk_state_static = self.new_disk_state
- # check connectivity
- result = self.rpc.call_version([node])[node]
- result.Raise("Can't get version information from node %s" % node)
- if constants.PROTOCOL_VERSION == result.payload:
- logging.info("Communication to node %s fine, sw version %s match",
- node, result.payload)
- else:
- raise errors.OpExecError("Version mismatch master version %s,"
- " node version %s" %
- (constants.PROTOCOL_VERSION, result.payload))
-
# Add node to our /etc/hosts, and add key to known_hosts
if self.cfg.GetClusterInfo().modify_etc_hosts:
master_node = self.cfg.GetMasterNode()
errors.ECODE_INVAL)
# Boolean value that tells us whether we might be demoting from MC
- self.might_demote = (self.op.master_candidate == False or
- self.op.offline == True or
- self.op.drained == True or
- self.op.master_capable == False)
+ self.might_demote = (self.op.master_candidate is False or
+ self.op.offline is True or
+ self.op.drained is True or
+ self.op.master_capable is False)
if self.op.secondary_ip:
if not netutils.IP4Address.IsValid(self.op.secondary_ip):
" it a master candidate" % node.name,
errors.ECODE_STATE)
- if self.op.vm_capable == False:
+ if self.op.vm_capable is False:
(ipri, isec) = self.cfg.GetNodeInstances(self.op.node_name)
if ipri or isec:
raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
if mc_remaining < mc_should:
raise errors.OpPrereqError("Not enough master candidates, please"
" pass auto promote option to allow"
- " promotion", errors.ECODE_STATE)
+ " promotion (--auto-promote or RAPI"
+ " auto_promote=True)", errors.ECODE_STATE)
self.old_flags = old_flags = (node.master_candidate,
node.drained, node.offline)
# Check for ineffective changes
for attr in self._FLAGS:
- if (getattr(self.op, attr) == False and getattr(node, attr) == False):
+ if (getattr(self.op, attr) is False and getattr(node, attr) is False):
self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
setattr(self.op, attr, None)
# TODO: We might query the real power state if it supports OOB
if _SupportsOob(self.cfg, node):
if self.op.offline is False and not (node.powered or
- self.op.powered == True):
+ self.op.powered is True):
raise errors.OpPrereqError(("Node %s needs to be turned on before its"
" offline status can be reset") %
- self.op.node_name)
+ self.op.node_name, errors.ECODE_STATE)
elif self.op.powered is not None:
raise errors.OpPrereqError(("Unable to change powered state for node %s"
" as it does not support out-of-band"
- " handling") % self.op.node_name)
+ " handling") % self.op.node_name,
+ errors.ECODE_STATE)
# If we're being deofflined/drained, we'll MC ourself if needed
- if (self.op.drained == False or self.op.offline == False or
+ if (self.op.drained is False or self.op.offline is False or
(self.op.master_capable and not node.master_capable)):
if _DecideSelfPromotion(self):
self.op.master_candidate = True
self.LogInfo("Auto-promoting node to master candidate")
# If we're no longer master capable, we'll demote ourselves from MC
- if self.op.master_capable == False and node.master_candidate:
+ if self.op.master_capable is False and node.master_candidate:
self.LogInfo("Demoting from master candidate")
self.op.master_candidate = False
if old_role == self._ROLE_OFFLINE and new_role != old_role:
# Trying to transition out of offline status
- # TODO: Use standard RPC runner, but make sure it works when the node is
- # still marked offline
- result = rpc.BootstrapRunner().call_version([node.name])[node.name]
+ result = self.rpc.call_version([node.name])[node.name]
if result.fail_msg:
raise errors.OpPrereqError("Node %s is being de-offlined but fails"
" to report its version: %s" %
" without using re-add. Please make sure the node"
" is healthy!")
+ # When changing the secondary ip, verify if this is a single-homed to
+ # multi-homed transition or vice versa, and apply the relevant
+ # restrictions.
if self.op.secondary_ip:
# Ok even without locking, because this can't be changed by any LU
master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
master_singlehomed = master.secondary_ip == master.primary_ip
- if master_singlehomed and self.op.secondary_ip:
- raise errors.OpPrereqError("Cannot change the secondary ip on a single"
- " homed cluster", errors.ECODE_INVAL)
+ if master_singlehomed and self.op.secondary_ip != node.primary_ip:
+ if self.op.force and node.name == master.name:
+ self.LogWarning("Transitioning from single-homed to multi-homed"
+ " cluster. All nodes will require a secondary ip.")
+ else:
+ raise errors.OpPrereqError("Changing the secondary ip on a"
+ " single-homed cluster requires the"
+ " --force option to be passed, and the"
+ " target node to be the master",
+ errors.ECODE_INVAL)
+ elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
+ if self.op.force and node.name == master.name:
+ self.LogWarning("Transitioning from multi-homed to single-homed"
+ " cluster. Secondary IPs will have to be removed.")
+ else:
+ raise errors.OpPrereqError("Cannot set the secondary IP to be the"
+ " same as the primary IP on a multi-homed"
+ " cluster, unless the --force option is"
+ " passed, and the target node is the"
+ " master", errors.ECODE_INVAL)
assert not (frozenset(affected_instances) -
self.owned_locks(locking.LEVEL_INSTANCE))
if node.offline:
if affected_instances:
- raise errors.OpPrereqError("Cannot change secondary IP address:"
- " offline node has instances (%s)"
- " configured to use it" %
- utils.CommaJoin(affected_instances.keys()))
+ msg = ("Cannot change secondary IP address: offline node has"
+ " instances (%s) configured to use it" %
+ utils.CommaJoin(affected_instances.keys()))
+ raise errors.OpPrereqError(msg, errors.ECODE_STATE)
else:
# On online nodes, check that no instances are running, and that
# the node has the new ip and we can reach it.
"config_version": constants.CONFIG_VERSION,
"os_api_version": max(constants.OS_API_VERSIONS),
"export_version": constants.EXPORT_VERSION,
- "architecture": (platform.architecture()[0], platform.machine()),
+ "architecture": runtime.GetArchInfo(),
"name": cluster.cluster_name,
"master": cluster.master_node,
"default_hypervisor": cluster.primary_hypervisor,
"ipolicy": cluster.ipolicy,
"nicparams": cluster.nicparams,
"ndparams": cluster.ndparams,
+ "diskparams": cluster.diskparams,
"candidate_pool_size": cluster.candidate_pool_size,
"master_netdev": cluster.master_netdev,
"master_netmask": cluster.master_netmask,
"""
REQ_BGL = False
- _FIELDS_DYNAMIC = utils.FieldSet()
- _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
- "watcher_pause", "volume_group_name")
def CheckArguments(self):
- _CheckOutputFields(static=self._FIELDS_STATIC,
- dynamic=self._FIELDS_DYNAMIC,
- selected=self.op.output_fields)
+ self.cq = _ClusterQuery(None, self.op.output_fields, False)
def ExpandNames(self):
- self.needed_locks = {}
+ self.cq.ExpandNames(self)
+
+ def DeclareLocks(self, level):
+ self.cq.DeclareLocks(self, level)
def Exec(self, feedback_fn):
- """Dump a representation of the cluster config to the standard output.
-
- """
- values = []
- for field in self.op.output_fields:
- if field == "cluster_name":
- entry = self.cfg.GetClusterName()
- elif field == "master_node":
- entry = self.cfg.GetMasterNode()
- elif field == "drain_flag":
- entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
- elif field == "watcher_pause":
- entry = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
- elif field == "volume_group_name":
- entry = self.cfg.GetVGName()
- else:
- raise errors.ParameterError(field)
- values.append(entry)
- return values
+ result = self.cq.OldStyleQuery(self)
+
+ assert len(result) == 1
+
+ return result[0]
+
+
+class _ClusterQuery(_QueryBase):
+ FIELDS = query.CLUSTER_FIELDS
+
+ #: Do not sort (there is only one item)
+ SORT_FIELD = None
+
+ def ExpandNames(self, lu):
+ lu.needed_locks = {}
+
+ # The following variables interact with _QueryBase._GetNames
+ self.wanted = locking.ALL_SET
+ self.do_locking = self.use_locking
+
+ if self.do_locking:
+ raise errors.OpPrereqError("Can not use locking for cluster queries",
+ errors.ECODE_INVAL)
+
+ def DeclareLocks(self, lu, level):
+ pass
+
+ def _GetQueryData(self, lu):
+ """Computes the list of nodes and their attributes.
+
+ """
+ # Locking is not used
+ assert not (compat.any(lu.glm.is_owned(level)
+ for level in locking.LEVELS
+ if level != locking.LEVEL_CLUSTER) or
+ self.do_locking or self.use_locking)
+
+ if query.CQ_CONFIG in self.requested_data:
+ cluster = lu.cfg.GetClusterInfo()
+ else:
+ cluster = NotImplemented
+
+ if query.CQ_QUEUE_DRAINED in self.requested_data:
+ drain_flag = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
+ else:
+ drain_flag = NotImplemented
+
+ if query.CQ_WATCHER_PAUSE in self.requested_data:
+ watcher_pause = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
+ else:
+ watcher_pause = NotImplemented
+
+ return query.ClusterQueryData(cluster, drain_flag, watcher_pause)
class LUInstanceActivateDisks(NoHooksLU):
if not disks_ok:
raise errors.OpExecError("Cannot activate block devices")
+ if self.op.wait_for_sync:
+ if not _WaitForSync(self, self.instance):
+ raise errors.OpExecError("Some disks of the instance are degraded!")
+
return disks_info
node_disk = node_disk.Copy()
node_disk.UnsetSize()
lu.cfg.SetDiskID(node_disk, node)
- result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False, idx)
+ result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
+ False, idx)
msg = result.fail_msg
if msg:
+ is_offline_secondary = (node in instance.secondary_nodes and
+ result.offline)
lu.proc.LogWarning("Could not prepare block device %s on node %s"
" (is_primary=False, pass=1): %s",
inst_disk.iv_name, node, msg)
- if not ignore_secondaries:
+ if not (ignore_secondaries or is_offline_secondary):
disks_ok = False
# FIXME: race condition on drbd migration to primary
node_disk = node_disk.Copy()
node_disk.UnsetSize()
lu.cfg.SetDiskID(node_disk, node)
- result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True, idx)
+ result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
+ True, idx)
msg = result.fail_msg
if msg:
lu.proc.LogWarning("Could not prepare block device %s on node %s"
for disk in disks:
for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
lu.cfg.SetDiskID(top_disk, node)
- result = lu.rpc.call_blockdev_shutdown(node, top_disk)
+ result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
msg = result.fail_msg
if msg:
lu.LogWarning("Could not shutdown block device %s on node %s: %s",
"Cannot retrieve locked instance %s" % self.op.instance_name
_CheckNodeOnline(self, instance.primary_node, "Instance primary node"
" offline, cannot reinstall")
- for node in instance.secondary_nodes:
- _CheckNodeOnline(self, node, "Instance secondary node offline,"
- " cannot reinstall")
if instance.disk_template == constants.DT_DISKLESS:
raise errors.OpPrereqError("Instance '%s' has no disks" %
constants.IDISK_METAVG,
]))
+ def _RunAllocator(self):
+ """Run the allocator based on input opcode.
+
+ """
+ be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
+
+ # FIXME
+ # The allocator should actually run in "relocate" mode, but current
+ # allocators don't support relocating all the nodes of an instance at
+ # the same time. As a workaround we use "allocate" mode, but this is
+ # suboptimal for two reasons:
+ # - The instance name passed to the allocator is present in the list of
+ # existing instances, so there could be a conflict within the
+ # internal structures of the allocator. This doesn't happen with the
+ # current allocators, but it's a liability.
+ # - The allocator counts the resources used by the instance twice: once
+ # because the instance exists already, and once because it tries to
+ # allocate a new instance.
+ # The allocator could choose some of the nodes on which the instance is
+ # running, but that's not a problem. If the instance nodes are broken,
+ # they should already be marked as drained or offline, and hence
+ # skipped by the allocator. If instance disks have been lost for other
+ # reasons, then recreating the disks on the same nodes should be fine.
+ disk_template = self.instance.disk_template
+ spindle_use = be_full[constants.BE_SPINDLE_USE]
+ req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
+ disk_template=disk_template,
+ tags=list(self.instance.GetTags()),
+ os=self.instance.os,
+ nics=[{}],
+ vcpus=be_full[constants.BE_VCPUS],
+ memory=be_full[constants.BE_MAXMEM],
+ spindle_use=spindle_use,
+ disks=[{constants.IDISK_SIZE: d.size,
+ constants.IDISK_MODE: d.mode}
+ for d in self.instance.disks],
+ hypervisor=self.instance.hypervisor)
+ ial = iallocator.IAllocator(self.cfg, self.rpc, req)
+
+ ial.Run(self.op.iallocator)
+
+ assert ial.required_nodes == len(self.instance.all_nodes)
+
+ if not ial.success:
+ raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
+ " %s" % (self.op.iallocator, ial.info),
+ errors.ECODE_NORES)
+
+ if len(ial.result) != ial.required_nodes:
+ raise errors.OpPrereqError("iallocator '%s' returned invalid number"
+ " of nodes (%s), required %s" %
+ (self.op.iallocator, len(ial.result),
+ ial.required_nodes), errors.ECODE_FAULT)
+
+ self.op.nodes = ial.result
+ self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
+ self.op.instance_name, self.op.iallocator,
+ utils.CommaJoin(ial.result))
+
def CheckArguments(self):
if self.op.disks and ht.TPositiveInt(self.op.disks[0]):
# Normalize and convert deprecated list of disk indices
" once: %s" % utils.CommaJoin(duplicates),
errors.ECODE_INVAL)
+ if self.op.iallocator and self.op.nodes:
+ raise errors.OpPrereqError("Give either the iallocator or the new"
+ " nodes, not both", errors.ECODE_INVAL)
+
for (idx, params) in self.op.disks:
utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
unsupported = frozenset(params.keys()) - self._MODIFYABLE
self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
else:
self.needed_locks[locking.LEVEL_NODE] = []
+ if self.op.iallocator:
+ # iallocator will select a new node in the same group
+ self.needed_locks[locking.LEVEL_NODEGROUP] = []
self.needed_locks[locking.LEVEL_NODE_RES] = []
def DeclareLocks(self, level):
- if level == locking.LEVEL_NODE:
- # if we replace the nodes, we only need to lock the old primary,
- # otherwise we need to lock all nodes for disk re-creation
- primary_only = bool(self.op.nodes)
- self._LockInstancesNodes(primary_only=primary_only)
+ if level == locking.LEVEL_NODEGROUP:
+ assert self.op.iallocator is not None
+ assert not self.op.nodes
+ assert not self.needed_locks[locking.LEVEL_NODEGROUP]
+ self.share_locks[locking.LEVEL_NODEGROUP] = 1
+ # Lock the primary group used by the instance optimistically; this
+ # requires going via the node before it's locked, requiring
+ # verification later on
+ self.needed_locks[locking.LEVEL_NODEGROUP] = \
+ self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)
+
+ elif level == locking.LEVEL_NODE:
+ # If an allocator is used, then we lock all the nodes in the current
+ # instance group, as we don't know yet which ones will be selected;
+ # if we replace the nodes without using an allocator, we only need to
+ # lock the old primary for doing RPCs (FIXME: we don't lock nodes for
+ # RPC anymore), otherwise we need to lock all the instance nodes for
+ # disk re-creation
+ if self.op.iallocator:
+ assert not self.op.nodes
+ assert not self.needed_locks[locking.LEVEL_NODE]
+ assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
+
+ # Lock member nodes of the group of the primary node
+ for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
+ self.needed_locks[locking.LEVEL_NODE].extend(
+ self.cfg.GetNodeGroup(group_uuid).members)
+ else:
+ primary_only = bool(self.op.nodes)
+ self._LockInstancesNodes(primary_only=primary_only)
elif level == locking.LEVEL_NODE_RES:
# Copy node locks
self.needed_locks[locking.LEVEL_NODE_RES] = \
primary_node = self.op.nodes[0]
else:
primary_node = instance.primary_node
- _CheckNodeOnline(self, primary_node)
+ if not self.op.iallocator:
+ _CheckNodeOnline(self, primary_node)
if instance.disk_template == constants.DT_DISKLESS:
raise errors.OpPrereqError("Instance '%s' has no disks" %
self.op.instance_name, errors.ECODE_INVAL)
+ # Verify if node group locks are still correct
+ owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
+ if owned_groups:
+ # Node group locks are acquired only for the primary node (and only
+ # when the allocator is used)
+ _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
+ primary_only=True)
+
# if we replace nodes *and* the old primary is offline, we don't
# check
assert instance.primary_node in self.owned_locks(locking.LEVEL_NODE)
assert instance.primary_node in self.owned_locks(locking.LEVEL_NODE_RES)
old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
- if not (self.op.nodes and old_pnode.offline):
+ if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
_CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
msg="cannot recreate disks")
raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
errors.ECODE_INVAL)
- if (self.op.nodes and
+ if ((self.op.nodes or self.op.iallocator) and
sorted(self.disks.keys()) != range(len(instance.disks))):
raise errors.OpPrereqError("Can't recreate disks partially and"
" change the nodes at the same time",
self.instance = instance
+ if self.op.iallocator:
+ self._RunAllocator()
+
+ # Release unneeded node and node resource locks
+ _ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
+ _ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
+
def Exec(self, feedback_fn):
"""Recreate the disks.
"""
logging.info("Removing block devices for instance %s", instance.name)
- if not _RemoveDisks(lu, instance):
+ if not _RemoveDisks(lu, instance, ignore_failures=ignore_failures):
if not ignore_failures:
raise errors.OpExecError("Can't remove instance's disks")
feedback_fn("Warning: can't remove instance's disks")
_CheckNodeOnline(self, target_node)
_CheckNodeNotDrained(self, target_node)
_CheckNodeVmCapable(self, target_node)
- ipolicy = _CalculateGroupIPolicy(self.cfg.GetClusterInfo(),
- self.cfg.GetNodeGroup(node.group))
+ cluster = self.cfg.GetClusterInfo()
+ group_info = self.cfg.GetNodeGroup(node.group)
+ ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, group_info)
_CheckTargetNodeIPolicy(self, ipolicy, instance, node,
ignore=self.op.ignore_ipolicy)
# activate, get path, copy the data over
for idx, disk in enumerate(instance.disks):
self.LogInfo("Copying data for disk %d", idx)
- result = self.rpc.call_blockdev_assemble(target_node, disk,
+ result = self.rpc.call_blockdev_assemble(target_node, (disk, instance),
instance.name, True, idx)
if result.fail_msg:
self.LogWarning("Can't assemble newly created disk %d: %s",
errs.append(result.fail_msg)
break
dev_path = result.payload
- result = self.rpc.call_blockdev_export(source_node, disk,
+ result = self.rpc.call_blockdev_export(source_node, (disk, instance),
target_node, dev_path,
cluster_name)
if result.fail_msg:
# Check that the target node is correct in terms of instance policy
nodeinfo = self.cfg.GetNodeInfo(self.target_node)
group_info = self.cfg.GetNodeGroup(nodeinfo.group)
- ipolicy = _CalculateGroupIPolicy(cluster, group_info)
+ ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
+ group_info)
_CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo,
ignore=self.ignore_ipolicy)
if self.target_node == instance.primary_node:
raise errors.OpPrereqError("Cannot migrate instance %s"
" to its primary (%s)" %
- (instance.name, instance.primary_node))
+ (instance.name, instance.primary_node),
+ errors.ECODE_STATE)
if len(self.lu.tasklets) == 1:
# It is safe to release locks only when we're the only tasklet
errors.ECODE_INVAL)
nodeinfo = self.cfg.GetNodeInfo(target_node)
group_info = self.cfg.GetNodeGroup(nodeinfo.group)
- ipolicy = _CalculateGroupIPolicy(cluster, group_info)
+ ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
+ group_info)
_CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo,
ignore=self.ignore_ipolicy)
"""
# FIXME: add a self.ignore_ipolicy option
- ial = IAllocator(self.cfg, self.rpc,
- mode=constants.IALLOCATOR_MODE_RELOC,
- name=self.instance_name,
- # TODO See why hail breaks with a single node below
- relocate_from=[self.instance.primary_node,
- self.instance.primary_node],
- )
+ req = iallocator.IAReqRelocate(name=self.instance_name,
+ relocate_from=[self.instance.primary_node])
+ ial = iallocator.IAllocator(self.cfg, self.rpc, req)
ial.Run(self.lu.op.iallocator)
ial.required_nodes), errors.ECODE_FAULT)
self.target_node = ial.result[0]
self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
- self.instance_name, self.lu.op.iallocator,
- utils.CommaJoin(ial.result))
+ self.instance_name, self.lu.op.iallocator,
+ utils.CommaJoin(ial.result))
def _WaitUntilSync(self):
"""Poll with custom rpc for disk sync.
all_done = True
result = self.rpc.call_drbd_wait_sync(self.all_nodes,
self.nodes_ip,
- self.instance.disks)
+ (self.instance.disks,
+ self.instance))
min_percent = 100
for node, nres in result.items():
nres.Raise("Cannot resync disks on node %s" % node)
msg = "single-master"
self.feedback_fn("* changing disks into %s mode" % msg)
result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
- self.instance.disks,
+ (self.instance.disks, self.instance),
self.instance.name, multimaster)
for node, nres in result.items():
nres.Raise("Cannot change disks config on node %s" % node)
# Don't raise an exception here, as we still have to try to revert the
# disk status, even if this step failed.
- abort_result = self.rpc.call_instance_finalize_migration_src(source_node,
- instance, False, self.live)
+ abort_result = self.rpc.call_instance_finalize_migration_src(
+ source_node, instance, False, self.live)
abort_msg = abort_result.fail_msg
if abort_msg:
logging.error("Aborting migration failed on source node %s: %s",
self.feedback_fn("* checking disk consistency between source and target")
for (idx, dev) in enumerate(instance.disks):
- if not _CheckDiskConsistency(self.lu, dev, target_node, False):
+ if not _CheckDiskConsistency(self.lu, instance, dev, target_node, False):
raise errors.OpExecError("Disk %s is degraded or not fully"
" synchronized on target node,"
" aborting migration" % idx)
disks = _ExpandCheckDisks(instance, instance.disks)
self.feedback_fn("* unmapping instance's disks from %s" % source_node)
for disk in disks:
- result = self.rpc.call_blockdev_shutdown(source_node, disk)
+ result = self.rpc.call_blockdev_shutdown(source_node, (disk, instance))
msg = result.fail_msg
if msg:
logging.error("Migration was successful, but couldn't unmap the"
self.feedback_fn("* checking disk consistency between source and target")
for (idx, dev) in enumerate(instance.disks):
# for drbd, these are drbd over lvm
- if not _CheckDiskConsistency(self.lu, dev, target_node, False):
+ if not _CheckDiskConsistency(self.lu, instance, dev, target_node,
+ False):
if primary_node.offline:
self.feedback_fn("Node %s is offline, ignoring degraded disk %s on"
" target node %s" %
return self._ExecMigration()
-def _CreateBlockDev(lu, node, instance, device, force_create,
- info, force_open):
+def _CreateBlockDev(lu, node, instance, device, force_create, info,
+ force_open):
+ """Wrapper around L{_CreateBlockDevInner}.
+
+ This method annotates the root device first.
+
+ """
+ (disk,) = _AnnotateDiskParams(instance, [device], lu.cfg)
+ return _CreateBlockDevInner(lu, node, instance, disk, force_create, info,
+ force_open)
+
+
+def _CreateBlockDevInner(lu, node, instance, device, force_create,
+ info, force_open):
"""Create a tree of block devices on a given node.
If this device type has to be created on secondaries, create it and
If not, just recurse to children keeping the same 'force' value.
+ @attention: The device has to be annotated already.
+
@param lu: the lu on whose behalf we execute
@param node: the node on which to create the device
@type instance: L{objects.Instance}
if device.children:
for child in device.children:
- _CreateBlockDev(lu, node, instance, child, force_create,
- info, force_open)
+ _CreateBlockDevInner(lu, node, instance, child, force_create,
+ info, force_open)
if not force_create:
return
return results
-def _ComputeLDParams(disk_template, disk_params):
- """Computes Logical Disk parameters from Disk Template parameters.
-
- @type disk_template: string
- @param disk_template: disk template, one of L{constants.DISK_TEMPLATES}
- @type disk_params: dict
- @param disk_params: disk template parameters; dict(template_name -> parameters
- @rtype: list(dict)
- @return: a list of dicts, one for each node of the disk hierarchy. Each dict
- contains the LD parameters of the node. The tree is flattened in-order.
-
- """
- if disk_template not in constants.DISK_TEMPLATES:
- raise errors.ProgrammerError("Unknown disk template %s" % disk_template)
-
- result = list()
- dt_params = disk_params[disk_template]
- if disk_template == constants.DT_DRBD8:
- drbd_params = {
- constants.LDP_RESYNC_RATE: dt_params[constants.DRBD_RESYNC_RATE],
- constants.LDP_BARRIERS: dt_params[constants.DRBD_DISK_BARRIERS],
- constants.LDP_NO_META_FLUSH: dt_params[constants.DRBD_META_BARRIERS],
- constants.LDP_DEFAULT_METAVG: dt_params[constants.DRBD_DEFAULT_METAVG],
- constants.LDP_DISK_CUSTOM: dt_params[constants.DRBD_DISK_CUSTOM],
- constants.LDP_NET_CUSTOM: dt_params[constants.DRBD_NET_CUSTOM],
- constants.LDP_DYNAMIC_RESYNC: dt_params[constants.DRBD_DYNAMIC_RESYNC],
- constants.LDP_PLAN_AHEAD: dt_params[constants.DRBD_PLAN_AHEAD],
- constants.LDP_FILL_TARGET: dt_params[constants.DRBD_FILL_TARGET],
- constants.LDP_DELAY_TARGET: dt_params[constants.DRBD_DELAY_TARGET],
- constants.LDP_MAX_RATE: dt_params[constants.DRBD_MAX_RATE],
- constants.LDP_MIN_RATE: dt_params[constants.DRBD_MIN_RATE],
- }
-
- drbd_params = \
- objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_DRBD8],
- drbd_params)
-
- result.append(drbd_params)
-
- # data LV
- data_params = {
- constants.LDP_STRIPES: dt_params[constants.DRBD_DATA_STRIPES],
- }
- data_params = \
- objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_LV],
- data_params)
- result.append(data_params)
-
- # metadata LV
- meta_params = {
- constants.LDP_STRIPES: dt_params[constants.DRBD_META_STRIPES],
- }
- meta_params = \
- objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_LV],
- meta_params)
- result.append(meta_params)
-
- elif (disk_template == constants.DT_FILE or
- disk_template == constants.DT_SHARED_FILE):
- result.append(constants.DISK_LD_DEFAULTS[constants.LD_FILE])
-
- elif disk_template == constants.DT_PLAIN:
- params = {
- constants.LDP_STRIPES: dt_params[constants.LV_STRIPES],
- }
- params = \
- objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_LV],
- params)
- result.append(params)
-
- elif disk_template == constants.DT_BLOCK:
- result.append(constants.DISK_LD_DEFAULTS[constants.LD_BLOCKDEV])
-
- elif disk_template == constants.DT_RBD:
- params = {
- constants.LDP_POOL: dt_params[constants.RBD_POOL]
- }
- params = \
- objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_RBD],
- params)
- result.append(params)
-
- return result
-
-
def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
- iv_name, p_minor, s_minor, drbd_params, data_params,
- meta_params):
+ iv_name, p_minor, s_minor):
"""Generate a drbd8 device complete with its children.
"""
dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
logical_id=(vgnames[0], names[0]),
- params=data_params)
- dev_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE,
+ params={})
+ dev_meta = objects.Disk(dev_type=constants.LD_LV,
+ size=constants.DRBD_META_SIZE,
logical_id=(vgnames[1], names[1]),
- params=meta_params)
+ params={})
drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
logical_id=(primary, secondary, port,
p_minor, s_minor,
shared_secret),
children=[dev_data, dev_meta],
- iv_name=iv_name, params=drbd_params)
+ iv_name=iv_name, params={})
return drbd_dev
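# Resulting tree (sketch; LV names follow the <prefix>_data/<prefix>_meta
# convention used above): an LD_DRBD8 device of the requested size with two
# LD_LV children, the data LV and a constants.DRBD_META_SIZE metadata LV.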
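For orientation, here is a minimal sketch of the device tree that _GenerateDRBD8Branch now produces, with plain dicts standing in for objects.Disk and hypothetical names, sizes and device-type strings. Every node starts with empty params; the values are filled in later by _AnnotateDiskParams:

  # Hypothetical stand-in for the objects.Disk tree (sizes in MiB)
  data_lv = {"dev_type": "lvm", "size": 1024,
             "logical_id": ("xenvg", ".disk0_data"), "params": {}}
  meta_lv = {"dev_type": "lvm", "size": 128,  # constants.DRBD_META_SIZE
             "logical_id": ("xenvg", ".disk0_meta"), "params": {}}
  drbd_dev = {"dev_type": "drbd8", "size": 1024, "iv_name": "disk/0",
              "children": [data_lv, meta_lv], "params": {}}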
-def _GenerateDiskTemplate(lu, template_name, instance_name, primary_node,
- secondary_nodes, disk_info, file_storage_dir, file_driver, base_index,
- feedback_fn, disk_params,
- _req_file_storage=opcodes.RequireFileStorage,
- _req_shr_file_storage=opcodes.RequireSharedFileStorage):
+def _GenerateDiskTemplate(
+ lu, template_name, instance_name, primary_node, secondary_nodes,
+ disk_info, file_storage_dir, file_driver, base_index,
+ feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
+ _req_shr_file_storage=opcodes.RequireSharedFileStorage):
"""Generate the entire disk layout for a given template type.
"""
vgname = lu.cfg.GetVGName()
disk_count = len(disk_info)
disks = []
- ld_params = _ComputeLDParams(template_name, disk_params)
if template_name == constants.DT_DISKLESS:
pass
elif template_name == constants.DT_DRBD8:
- drbd_params, data_params, meta_params = ld_params
if len(secondary_nodes) != 1:
raise errors.ProgrammerError("Wrong template configuration")
remote_node = secondary_nodes[0]
minors = lu.cfg.AllocateDRBDMinor(
[primary_node, remote_node] * len(disk_info), instance_name)
+ (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
+ full_disk_params)
+ drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
+
names = []
for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
for i in range(disk_count)]):
names.append(lv_prefix + "_meta")
for idx, disk in enumerate(disk_info):
disk_index = idx + base_index
- drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
data_vg = disk.get(constants.IDISK_VG, vgname)
meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
[data_vg, meta_vg],
names[idx * 2:idx * 2 + 2],
"disk/%d" % disk_index,
- minors[idx * 2], minors[idx * 2 + 1],
- drbd_params, data_params, meta_params)
+ minors[idx * 2], minors[idx * 2 + 1])
disk_dev.mode = disk[constants.IDISK_MODE]
disks.append(disk_dev)
else:
(name_prefix, base_index + i)
for i in range(disk_count)])
- dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]
-
if template_name == constants.DT_PLAIN:
def logical_id_fn(idx, _, disk):
vg = disk.get(constants.IDISK_VG, vgname)
else:
raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
+ dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]
+
for idx, disk in enumerate(disk_info):
disk_index = idx + base_index
size = disk[constants.IDISK_SIZE]
logical_id=logical_id_fn(idx, disk_index, disk),
iv_name="disk/%d" % disk_index,
mode=disk[constants.IDISK_MODE],
- params=ld_params[0]))
+ params={}))
return disks
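Each DRBD disk consumes two LV names and two DRBD minors, addressed by even/odd position. A stand-alone sketch of the pairing, with hypothetical prefixes and minors (the elided context is assumed to append a matching "_data" name before each "_meta"):

  names = []
  for lv_prefix in [".disk0", ".disk1"]:  # hypothetical unique prefixes
    names.append(lv_prefix + "_data")
    names.append(lv_prefix + "_meta")

  minors = [10, 11, 12, 13]  # hypothetical DRBD minors

  pairs = [(names[idx * 2:idx * 2 + 2],  # (data, meta) LV names
            (minors[idx * 2], minors[idx * 2 + 1]))  # (primary, secondary)
           for idx in range(2)]
  assert pairs[1] == ([".disk1_data", ".disk1_meta"], (12, 13))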
lu.cfg.SetDiskID(device, node)
logging.info("Pause sync of instance %s disks", instance.name)
- result = lu.rpc.call_blockdev_pause_resume_sync(node, instance.disks, True)
+ result = lu.rpc.call_blockdev_pause_resume_sync(node,
+ (instance.disks, instance),
+ True)
+ result.Raise("Failed RPC to node %s for pausing the disk syncing" % node)
for idx, success in enumerate(result.payload):
if not success:
wipe_size = min(wipe_chunk_size, size - offset)
logging.debug("Wiping disk %d, offset %s, chunk %s",
idx, offset, wipe_size)
- result = lu.rpc.call_blockdev_wipe(node, device, offset, wipe_size)
+ result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
+ wipe_size)
result.Raise("Could not wipe disk %d at offset %d for size %d" %
(idx, offset, wipe_size))
now = time.time()
finally:
logging.info("Resume sync of instance %s disks", instance.name)
- result = lu.rpc.call_blockdev_pause_resume_sync(node, instance.disks, False)
+ result = lu.rpc.call_blockdev_pause_resume_sync(node,
+ (instance.disks, instance),
+ False)
- for idx, success in enumerate(result.payload):
- if not success:
- lu.LogWarning("Resume sync of disk %d failed, please have a"
- " look at the status and troubleshoot the issue", idx)
- logging.warn("resume-sync of instance %s for disks %d failed",
- instance.name, idx)
+ if result.fail_msg:
+ lu.LogWarning("RPC call to %s for resuming disk syncing failed,"
+ " please have a look at the status and troubleshoot"
+ " the issue: %s", node, result.fail_msg)
+ else:
+ for idx, success in enumerate(result.payload):
+ if not success:
+ lu.LogWarning("Resume sync of disk %d failed, please have a"
+ " look at the status and troubleshoot the issue", idx)
+ logging.warn("resume-sync of instance %s for disks %d failed",
+ instance.name, idx)
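The rewritten error handling follows the usual RPC result convention: check fail_msg first, and only trust payload (one status per disk) when the call itself went through. A minimal sketch of that pattern, with a hypothetical stand-in for the RPC result object:

  class FakeRpcResult(object):
    # Hypothetical stand-in: fail_msg is None on success, payload
    # holds one boolean per disk
    def __init__(self, fail_msg, payload):
      self.fail_msg = fail_msg
      self.payload = payload

  def ResumeSyncWarnings(result):
    if result.fail_msg:
      return ["RPC failed: %s" % result.fail_msg]
    return ["disk %d failed to resume" % idx
            for (idx, success) in enumerate(result.payload) if not success]

  assert ResumeSyncWarnings(FakeRpcResult("node down", None)) == \
    ["RPC failed: node down"]
  assert ResumeSyncWarnings(FakeRpcResult(None, [True, False])) == \
    ["disk 1 failed to resume"]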
def _CreateDisks(lu, instance, to_skip=None, target_node=None):
_CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
-def _RemoveDisks(lu, instance, target_node=None):
+def _RemoveDisks(lu, instance, target_node=None, ignore_failures=False):
"""Remove all disks for an instance.
This abstracts away some work from `AddInstance()` and
logging.info("Removing block devices for instance %s", instance.name)
all_result = True
- for (idx, device) in instance.disks:
+ ports_to_release = set()
+ anno_disks = _AnnotateDiskParams(instance, instance.disks, lu.cfg)
+ for (idx, device) in enumerate(anno_disks):
if target_node:
edata = [(target_node, device)]
else:
edata = device.ComputeNodeTree(instance.primary_node)
for node, disk in edata:
lu.cfg.SetDiskID(disk, node)
- msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
- if msg:
+ result = lu.rpc.call_blockdev_remove(node, disk)
+ if result.fail_msg:
lu.LogWarning("Could not remove disk %s on node %s,"
- " continuing anyway: %s", idx, node, msg)
- all_result = False
+ " continuing anyway: %s", idx, node, result.fail_msg)
+ if not (result.offline and node != instance.primary_node):
+ all_result = False
# if this is a DRBD disk, return its port to the pool
if device.dev_type in constants.LDS_DRBD:
- tcp_port = device.logical_id[2]
- lu.cfg.AddTcpUdpPort(tcp_port)
+ ports_to_release.add(device.logical_id[2])
+
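+  # Release the DRBD ports back to the pool only once all removals have
+  # succeeded (or failures are explicitly ignored): a port that may still
+  # be in use by a partially-removed device must not be handed out again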
+ if all_result or ignore_failures:
+ for port in ports_to_release:
+ lu.cfg.AddTcpUdpPort(port)
if instance.disk_template == constants.DT_FILE:
file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
constants.DT_DISKLESS: {},
constants.DT_PLAIN: _compute(disks, 0),
# 128 MB are added for drbd metadata for each disk
- constants.DT_DRBD8: _compute(disks, DRBD_META_SIZE),
+ constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
constants.DT_FILE: {},
constants.DT_SHARED_FILE: {},
}
return req_size_dict[disk_template]
-def _ComputeDiskSize(disk_template, disks):
- """Compute disk size requirements in the volume group
-
- """
- # Required free disk space as a function of disk and swap space
- req_size_dict = {
- constants.DT_DISKLESS: None,
- constants.DT_PLAIN: sum(d[constants.IDISK_SIZE] for d in disks),
- # 128 MB are added for drbd metadata for each disk
- constants.DT_DRBD8:
- sum(d[constants.IDISK_SIZE] + DRBD_META_SIZE for d in disks),
- constants.DT_FILE: None,
- constants.DT_SHARED_FILE: 0,
- constants.DT_BLOCK: 0,
- constants.DT_RBD: 0,
- }
-
- if disk_template not in req_size_dict:
- raise errors.ProgrammerError("Disk template '%s' size requirement"
- " is unknown" % disk_template)
-
- return req_size_dict[disk_template]
-
-
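Both the per-VG computation kept above and the scalar helper just removed account for DRBD the same way: plain LVM needs the sum of the disk sizes, while DRBD8 adds the fixed 128 MiB metadata volume per disk. A stand-alone sketch with hypothetical sizes:

  disks = [{"size": 10240}, {"size": 2048}]  # hypothetical sizes in MiB

  plain = sum(d["size"] for d in disks)
  drbd8 = sum(d["size"] + 128 for d in disks)  # constants.DRBD_META_SIZE
  assert (plain, drbd8) == (12288, 12544)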
def _FilterVmNodes(lu, nodenames):
"""Filters out non-vm_capable nodes from a list.
"""
nics = [n.ToDict() for n in self.nics]
- ial = IAllocator(self.cfg, self.rpc,
- mode=constants.IALLOCATOR_MODE_ALLOC,
- name=self.op.instance_name,
- disk_template=self.op.disk_template,
- tags=self.op.tags,
- os=self.op.os_type,
- vcpus=self.be_full[constants.BE_VCPUS],
- memory=self.be_full[constants.BE_MAXMEM],
- disks=self.disks,
- nics=nics,
- hypervisor=self.op.hypervisor,
- )
+ memory = self.be_full[constants.BE_MAXMEM]
+ spindle_use = self.be_full[constants.BE_SPINDLE_USE]
+ req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
+ disk_template=self.op.disk_template,
+ tags=self.op.tags,
+ os=self.op.os_type,
+ vcpus=self.be_full[constants.BE_VCPUS],
+ memory=memory,
+ spindle_use=spindle_use,
+ disks=self.disks,
+ nics=nics,
+ hypervisor=self.op.hypervisor)
+ ial = iallocator.IAllocator(self.cfg, self.rpc, req)
ial.Run(self.op.iallocator)
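For reference, the request that eventually reaches the allocator script is a plain dict; a hypothetical example of its shape for an allocation, with field names as used by the in-tree IAllocator code removed elsewhere in this patch (the actual serialization now lives in ganeti.masterd.iallocator):

  alloc_request = {
    "type": "allocate",  # hypothetical mode string
    "name": "inst1.example.com",
    "disk_template": "drbd",
    "tags": [],
    "os": "debootstrap+default",
    "vcpus": 2,
    "memory": 1024,
    "spindle_use": 1,
    "disks": [{"size": 10240, "mode": "rw"}],
    "nics": [{"mac": "auto", "ip": None}],
    "hypervisor": "xen-pvm",
    "required_nodes": 2,  # 2 for internally mirrored disk templates
  }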
if self.op.disk_template not in constants.DISK_TEMPLATES:
raise errors.OpPrereqError("Disk template specified in configuration"
" file is not one of the allowed values:"
- " %s" % " ".join(constants.DISK_TEMPLATES))
+ " %s" %
+ " ".join(constants.DISK_TEMPLATES),
+ errors.ECODE_INVAL)
else:
raise errors.OpPrereqError("No disk template specified and the export"
" is missing the disk_template information",
cfg_storagedir = get_fsd_fn()
if not cfg_storagedir:
- raise errors.OpPrereqError("Cluster file storage dir not defined")
+ raise errors.OpPrereqError("Cluster file storage dir not defined",
+ errors.ECODE_STATE)
joinargs.append(cfg_storagedir)
if self.op.file_storage_dir is not None:
if self.op.mode == constants.INSTANCE_IMPORT:
export_info = self._ReadExportInfo()
self._ReadExportParams(export_info)
+ self._old_instance_name = export_info.get(constants.INISECT_INS, "name")
+ else:
+ self._old_instance_name = None
if (not self.cfg.GetVGName() and
self.op.disk_template not in constants.DTS_NOT_LVM):
enabled_hvs = cluster.enabled_hypervisors
if self.op.hypervisor not in enabled_hvs:
raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
- " cluster (%s)" % (self.op.hypervisor,
- ",".join(enabled_hvs)),
+ " cluster (%s)" %
+ (self.op.hypervisor, ",".join(enabled_hvs)),
errors.ECODE_STATE)
# Check tag validity
self.src_images = disk_images
- old_name = export_info.get(constants.INISECT_INS, "name")
- if self.op.instance_name == old_name:
+ if self.op.instance_name == self._old_instance_name:
for idx, nic in enumerate(self.nics):
if nic.mac == constants.VALUE_AUTO:
nic_mac_ini = "nic%d_mac" % idx
nodenames = [pnode.name] + self.secondaries
# Verify instance specs
+ spindle_use = self.be_full.get(constants.BE_SPINDLE_USE, None)
ispec = {
constants.ISPEC_MEM_SIZE: self.be_full.get(constants.BE_MAXMEM, None),
constants.ISPEC_CPU_COUNT: self.be_full.get(constants.BE_VCPUS, None),
constants.ISPEC_DISK_COUNT: len(self.disks),
constants.ISPEC_DISK_SIZE: [disk["size"] for disk in self.disks],
constants.ISPEC_NIC_COUNT: len(self.nics),
+ constants.ISPEC_SPINDLE_USE: spindle_use,
}
group_info = self.cfg.GetNodeGroup(pnode.group)
- ipolicy = _CalculateGroupIPolicy(cluster, group_info)
+ ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, group_info)
res = _ComputeIPolicyInstanceSpecViolation(ipolicy, ispec)
if not self.op.ignore_ipolicy and res:
raise errors.OpPrereqError(("Instance allocation to group %s violates"
utils.CommaJoin(res)),
errors.ECODE_INVAL)
- # disk parameters (not customizable at instance or node level)
- # just use the primary node parameters, ignoring the secondary.
- self.diskparams = group_info.diskparams
-
if not self.adopt_disks:
if self.op.disk_template == constants.DT_RBD:
# _CheckRADOSFreeSpace() is just a placeholder.
else:
network_port = None
+      # This is ugly, but we have a chicken-and-egg problem here:
+ # We can only take the group disk parameters, as the instance
+ # has no disks yet (we are generating them right here).
+ node = self.cfg.GetNodeInfo(pnode_name)
+ nodegroup = self.cfg.GetNodeGroup(node.group)
disks = _GenerateDiskTemplate(self,
self.op.disk_template,
instance, pnode_name,
self.op.file_driver,
0,
feedback_fn,
- self.diskparams)
+ self.cfg.GetGroupDiskParams(nodegroup))
iobj = objects.Instance(name=instance, os=self.op.os_type,
primary_node=pnode_name,
_ReleaseLocks(self, locking.LEVEL_NODE_RES)
if iobj.disk_template != constants.DT_DISKLESS and not self.adopt_disks:
+      # we need to set the disk IDs to the primary node, since the
+      # preceding code might or might not have done it, depending on
+      # disk template and other options
+ for disk in iobj.disks:
+ self.cfg.SetDiskID(disk, pnode_name)
if self.op.mode == constants.INSTANCE_CREATE:
if not self.op.no_install:
pause_sync = (iobj.disk_template in constants.DTS_INT_MIRROR and
if pause_sync:
feedback_fn("* pausing disk sync to install instance OS")
result = self.rpc.call_blockdev_pause_resume_sync(pnode_name,
- iobj.disks, True)
+ (iobj.disks,
+ iobj), True)
for idx, success in enumerate(result.payload):
if not success:
logging.warn("pause-sync of instance %s for disk %d failed",
if pause_sync:
feedback_fn("* resuming disk sync")
result = self.rpc.call_blockdev_pause_resume_sync(pnode_name,
- iobj.disks, False)
+ (iobj.disks,
+ iobj), False)
for idx, success in enumerate(result.payload):
if not success:
logging.warn("resume-sync of instance %s for disk %d failed",
os_add_result.Raise("Could not add os for instance %s"
" on node %s" % (instance, pnode_name))
- elif self.op.mode == constants.INSTANCE_IMPORT:
- feedback_fn("* running the instance OS import scripts...")
+ else:
+ if self.op.mode == constants.INSTANCE_IMPORT:
+ feedback_fn("* running the instance OS import scripts...")
+
+ transfers = []
+
+ for idx, image in enumerate(self.src_images):
+ if not image:
+ continue
+
+ # FIXME: pass debug option from opcode to backend
+ dt = masterd.instance.DiskTransfer("disk/%s" % idx,
+ constants.IEIO_FILE, (image, ),
+ constants.IEIO_SCRIPT,
+ (iobj.disks[idx], idx),
+ None)
+ transfers.append(dt)
+
+ import_result = \
+ masterd.instance.TransferInstanceData(self, feedback_fn,
+ self.op.src_node, pnode_name,
+ self.pnode.secondary_ip,
+ iobj, transfers)
+ if not compat.all(import_result):
+ self.LogWarning("Some disks for instance %s on node %s were not"
+ " imported successfully" % (instance, pnode_name))
+
+ rename_from = self._old_instance_name
+
+ elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
+ feedback_fn("* preparing remote import...")
+ # The source cluster will stop the instance before attempting to make
+ # a connection. In some cases stopping an instance can take a long
+ # time, hence the shutdown timeout is added to the connection
+ # timeout.
+ connect_timeout = (constants.RIE_CONNECT_TIMEOUT +
+ self.op.source_shutdown_timeout)
+ timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
- transfers = []
+ assert iobj.primary_node == self.pnode.name
+ disk_results = \
+ masterd.instance.RemoteImport(self, feedback_fn, iobj, self.pnode,
+ self.source_x509_ca,
+ self._cds, timeouts)
+ if not compat.all(disk_results):
+ # TODO: Should the instance still be started, even if some disks
+ # failed to import (valid for local imports, too)?
+ self.LogWarning("Some disks for instance %s on node %s were not"
+ " imported successfully" % (instance, pnode_name))
- for idx, image in enumerate(self.src_images):
- if not image:
- continue
+ rename_from = self.source_instance_name
- # FIXME: pass debug option from opcode to backend
- dt = masterd.instance.DiskTransfer("disk/%s" % idx,
- constants.IEIO_FILE, (image, ),
- constants.IEIO_SCRIPT,
- (iobj.disks[idx], idx),
- None)
- transfers.append(dt)
-
- import_result = \
- masterd.instance.TransferInstanceData(self, feedback_fn,
- self.op.src_node, pnode_name,
- self.pnode.secondary_ip,
- iobj, transfers)
- if not compat.all(import_result):
- self.LogWarning("Some disks for instance %s on node %s were not"
- " imported successfully" % (instance, pnode_name))
-
- elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
- feedback_fn("* preparing remote import...")
- # The source cluster will stop the instance before attempting to make a
- # connection. In some cases stopping an instance can take a long time,
- # hence the shutdown timeout is added to the connection timeout.
- connect_timeout = (constants.RIE_CONNECT_TIMEOUT +
- self.op.source_shutdown_timeout)
- timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
-
- assert iobj.primary_node == self.pnode.name
- disk_results = \
- masterd.instance.RemoteImport(self, feedback_fn, iobj, self.pnode,
- self.source_x509_ca,
- self._cds, timeouts)
- if not compat.all(disk_results):
- # TODO: Should the instance still be started, even if some disks
- # failed to import (valid for local imports, too)?
- self.LogWarning("Some disks for instance %s on node %s were not"
- " imported successfully" % (instance, pnode_name))
+ else:
+ # also checked in the prereq part
+ raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
+ % self.op.mode)
# Run rename script on newly imported instance
assert iobj.name == instance
feedback_fn("Running rename script for %s" % instance)
result = self.rpc.call_instance_run_rename(pnode_name, iobj,
- self.source_instance_name,
+ rename_from,
self.op.debug_level)
if result.fail_msg:
self.LogWarning("Failed to run rename script for %s on node"
" %s: %s" % (instance, pnode_name, result.fail_msg))
- else:
- # also checked in the prereq part
- raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
- % self.op.mode)
-
assert not self.owned_locks(locking.LEVEL_NODE_RES)
if self.op.start:
assert not self.needed_locks[locking.LEVEL_NODE]
# Lock member nodes of all locked groups
- self.needed_locks[locking.LEVEL_NODE] = [node_name
- for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
- for node_name in self.cfg.GetNodeGroup(group_uuid).members]
+ self.needed_locks[locking.LEVEL_NODE] = \
+ [node_name
+ for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
+ for node_name in self.cfg.GetNodeGroup(group_uuid).members]
else:
self._LockInstancesNodes()
elif level == locking.LEVEL_NODE_RES:
self.node_secondary_ip = None
@staticmethod
- def CheckArguments(mode, remote_node, iallocator):
+ def CheckArguments(mode, remote_node, ialloc):
"""Helper function for users of this class.
"""
# check for valid parameter combination
if mode == constants.REPLACE_DISK_CHG:
- if remote_node is None and iallocator is None:
+ if remote_node is None and ialloc is None:
raise errors.OpPrereqError("When changing the secondary either an"
" iallocator script must be used or the"
" new node given", errors.ECODE_INVAL)
- if remote_node is not None and iallocator is not None:
+ if remote_node is not None and ialloc is not None:
raise errors.OpPrereqError("Give either the iallocator or the new"
" secondary, not both", errors.ECODE_INVAL)
- elif remote_node is not None or iallocator is not None:
+ elif remote_node is not None or ialloc is not None:
# Not replacing the secondary
raise errors.OpPrereqError("The iallocator and new node options can"
" only be used when changing the"
"""Compute a new secondary node using an IAllocator.
"""
- ial = IAllocator(lu.cfg, lu.rpc,
- mode=constants.IALLOCATOR_MODE_RELOC,
- name=instance_name,
- relocate_from=list(relocate_from))
+ req = iallocator.IAReqRelocate(name=instance_name,
+ relocate_from=list(relocate_from))
+ ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
ial.Run(iallocator_name)
self.lu.LogInfo("Checking disk/%d on %s", idx, node)
self.cfg.SetDiskID(dev, node)
- result = self.rpc.call_blockdev_find(node, dev)
+ result = _BlockdevFind(self, node, dev, instance)
if result.offline:
continue
if self.remote_node_info:
# We change the node, lets verify it still meets instance policy
new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
- ipolicy = _CalculateGroupIPolicy(self.cfg.GetClusterInfo(),
- new_group_info)
+ cluster = self.cfg.GetClusterInfo()
+ ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
+ new_group_info)
_CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
ignore=self.ignore_ipolicy)
- # TODO: compute disk parameters
- primary_node_info = self.cfg.GetNodeInfo(instance.primary_node)
- secondary_node_info = self.cfg.GetNodeInfo(secondary_node)
- if primary_node_info.group != secondary_node_info.group:
- self.lu.LogInfo("The instance primary and secondary nodes are in two"
- " different node groups; the disk parameters of the"
- " primary node's group will be applied.")
-
- self.diskparams = self.cfg.GetNodeGroup(primary_node_info.group).diskparams
-
for node in check_nodes:
_CheckNodeOnline(self.lu, node)
self.lu.LogInfo("Checking disk/%d on %s" % (idx, node))
self.cfg.SetDiskID(dev, node)
- result = self.rpc.call_blockdev_find(node, dev)
+ result = _BlockdevFind(self, node, dev, self.instance)
msg = result.fail_msg
if msg or not result.payload:
self.lu.LogInfo("Checking disk/%d consistency on node %s" %
(idx, node_name))
- if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
- ldisk=ldisk):
+ if not _CheckDiskConsistency(self.lu, self.instance, dev, node_name,
+ on_primary, ldisk=ldisk):
raise errors.OpExecError("Node %s has degraded storage, unsafe to"
" replace disks for instance %s" %
(node_name, self.instance.name))
"""
iv_names = {}
- for idx, dev in enumerate(self.instance.disks):
+ disks = _AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
+ for idx, dev in enumerate(disks):
if idx not in self.disks:
continue
lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
names = _GenerateUniqueNames(self.lu, lv_names)
- _, data_p, meta_p = _ComputeLDParams(constants.DT_DRBD8, self.diskparams)
-
- vg_data = dev.children[0].logical_id[0]
+ (data_disk, meta_disk) = dev.children
+ vg_data = data_disk.logical_id[0]
lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
- logical_id=(vg_data, names[0]), params=data_p)
- vg_meta = dev.children[1].logical_id[0]
- lv_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE,
- logical_id=(vg_meta, names[1]), params=meta_p)
+ logical_id=(vg_data, names[0]),
+ params=data_disk.params)
+ vg_meta = meta_disk.logical_id[0]
+ lv_meta = objects.Disk(dev_type=constants.LD_LV,
+ size=constants.DRBD_META_SIZE,
+ logical_id=(vg_meta, names[1]),
+ params=meta_disk.params)
new_lvs = [lv_data, lv_meta]
old_lvs = [child.Copy() for child in dev.children]
# we pass force_create=True to force the LVM creation
for new_lv in new_lvs:
- _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True,
- _GetInstanceInfoText(self.instance), False)
+ _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
+ _GetInstanceInfoText(self.instance), False)
return iv_names
for name, (dev, _, _) in iv_names.iteritems():
self.cfg.SetDiskID(dev, node_name)
- result = self.rpc.call_blockdev_find(node_name, dev)
+ result = _BlockdevFind(self, node_name, dev, self.instance)
msg = result.fail_msg
if msg or not result.payload:
# Now that the new lvs have the old name, we can add them to the device
self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
- result = self.rpc.call_blockdev_addchildren(self.target_node, dev,
- new_lvs)
+ result = self.rpc.call_blockdev_addchildren(self.target_node,
+ (dev, self.instance), new_lvs)
msg = result.fail_msg
if msg:
for new_lv in new_lvs:
# Step: create new storage
self.lu.LogStep(3, steps_total, "Allocate new storage")
- for idx, dev in enumerate(self.instance.disks):
+ disks = _AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
+ for idx, dev in enumerate(disks):
self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
(self.new_node, idx))
# we pass force_create=True to force LVM creation
for new_lv in dev.children:
- _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True,
- _GetInstanceInfoText(self.instance), False)
+ _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
+ True, _GetInstanceInfoText(self.instance), False)
    # Step 4: drbd minors and drbd setup changes
# after this, we must manually remove the drbd minors on both the
iv_names[idx] = (dev, dev.children, new_net_id)
logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
new_net_id)
- drbd_params, _, _ = _ComputeLDParams(constants.DT_DRBD8, self.diskparams)
new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
logical_id=new_alone_id,
children=dev.children,
size=dev.size,
- params=drbd_params)
+ params={})
+ (anno_new_drbd,) = _AnnotateDiskParams(self.instance, [new_drbd],
+ self.cfg)
try:
- _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
+ _CreateSingleBlockDev(self.lu, self.new_node, self.instance,
+ anno_new_drbd,
_GetInstanceInfoText(self.instance), False)
except errors.GenericError:
self.cfg.ReleaseDRBDMinors(self.instance.name)
for idx, dev in enumerate(self.instance.disks):
self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
self.cfg.SetDiskID(dev, self.target_node)
- msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg
+ msg = self.rpc.call_blockdev_shutdown(self.target_node,
+ (dev, self.instance)).fail_msg
if msg:
self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
"node: %s" % (idx, msg),
result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
self.new_node],
self.node_secondary_ip,
- self.instance.disks,
+ (self.instance.disks, self.instance),
self.instance.name,
False)
for to_node, to_result in result.items():
elif self.op.iallocator is not None:
# TODO: Implement relocation to other group
- ial = IAllocator(self.cfg, self.rpc, constants.IALLOCATOR_MODE_NODE_EVAC,
- evac_mode=self._MODE2IALLOCATOR[self.op.mode],
- instances=list(self.instance_names))
+ evac_mode = self._MODE2IALLOCATOR[self.op.mode]
+ req = iallocator.IAReqNodeEvac(evac_mode=evac_mode,
+ instances=list(self.instance_names))
+ ial = iallocator.IAllocator(self.cfg, self.rpc, req)
ial.Run(self.op.iallocator)
env = {
"DISK": self.op.disk,
"AMOUNT": self.op.amount,
+ "ABSOLUTE": self.op.absolute,
}
env.update(_BuildInstanceHookEnvByObject(self, self.instance))
return env
self.disk = instance.FindDisk(self.op.disk)
+ if self.op.absolute:
+ self.target = self.op.amount
+ self.delta = self.target - self.disk.size
+ if self.delta < 0:
+ raise errors.OpPrereqError("Requested size (%s) is smaller than "
+ "current disk size (%s)" %
+ (utils.FormatUnit(self.target, "h"),
+ utils.FormatUnit(self.disk.size, "h")),
+ errors.ECODE_STATE)
+ else:
+ self.delta = self.op.amount
+ self.target = self.disk.size + self.delta
+ if self.delta < 0:
+ raise errors.OpPrereqError("Requested increment (%s) is negative" %
+ utils.FormatUnit(self.delta, "h"),
+ errors.ECODE_INVAL)
+
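A small stand-alone sketch of the arithmetic above (hypothetical sizes, in MiB): with absolute sizing the delta is derived from the target, otherwise the target is derived from the delta, and a negative delta is rejected in both cases:

  def GrowParams(current_size, amount, absolute):
    # Mirrors the delta/target computation in CheckPrereq
    if absolute:
      target = amount
      delta = target - current_size  # negative: would shrink the disk
    else:
      delta = amount
      target = current_size + delta  # negative: bogus increment
    return (delta, target)

  assert GrowParams(1024, 2048, True) == (1024, 2048)
  assert GrowParams(1024, 512, False) == (512, 1536)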
if instance.disk_template not in (constants.DT_FILE,
constants.DT_SHARED_FILE,
constants.DT_RBD):
    # TODO: check the free disk space for file, when that feature is
    # supported
_CheckNodesFreeDiskPerVG(self, nodenames,
- self.disk.ComputeGrowth(self.op.amount))
+ self.disk.ComputeGrowth(self.delta))
def Exec(self, feedback_fn):
"""Execute disk grow.
if not disks_ok:
raise errors.OpExecError("Cannot activate block device to grow")
- feedback_fn("Growing disk %s of instance '%s' by %s" %
+ feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
(self.op.disk, instance.name,
- utils.FormatUnit(self.op.amount, "h")))
+ utils.FormatUnit(self.delta, "h"),
+ utils.FormatUnit(self.target, "h")))
# First run all grow ops in dry-run mode
for node in instance.all_nodes:
self.cfg.SetDiskID(disk, node)
- result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, True)
+ result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
+ True, True)
result.Raise("Grow request failed to node %s" % node)
# We know that (as far as we can test) operations across different
- # nodes will succeed, time to run it for real
+ # nodes will succeed, time to run it for real on the backing storage
for node in instance.all_nodes:
self.cfg.SetDiskID(disk, node)
- result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, False)
+ result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
+ False, True)
result.Raise("Grow request failed to node %s" % node)
- # TODO: Rewrite code to work properly
- # DRBD goes into sync mode for a short amount of time after executing the
- # "resize" command. DRBD 8.x below version 8.0.13 contains a bug whereby
- # calling "resize" in sync mode fails. Sleeping for a short amount of
- # time is a work-around.
- time.sleep(5)
+ # And now execute it for logical storage, on the primary node
+ node = instance.primary_node
+ self.cfg.SetDiskID(disk, node)
+ result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
+ False, False)
+ result.Raise("Grow request failed to node %s" % node)
- disk.RecordGrow(self.op.amount)
+ disk.RecordGrow(self.delta)
self.cfg.Update(instance, feedback_fn)
# Changes have been recorded, release node lock
else:
self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
+ self.needed_locks[locking.LEVEL_NODEGROUP] = []
self.needed_locks[locking.LEVEL_NODE] = []
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
def DeclareLocks(self, level):
- if self.op.use_locking and level == locking.LEVEL_NODE:
- self._LockInstancesNodes()
+ if self.op.use_locking:
+ if level == locking.LEVEL_NODEGROUP:
+ owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
+
+ # Lock all groups used by instances optimistically; this requires going
+ # via the node before it's locked, requiring verification later on
+ self.needed_locks[locking.LEVEL_NODEGROUP] = \
+ frozenset(group_uuid
+ for instance_name in owned_instances
+ for group_uuid in
+ self.cfg.GetInstanceNodeGroups(instance_name))
+
+ elif level == locking.LEVEL_NODE:
+ self._LockInstancesNodes()
def CheckPrereq(self):
"""Check prerequisites.
This only checks the optional instance list against the existing names.
"""
+ owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
+ owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
+ owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))
+
if self.wanted_names is None:
assert self.op.use_locking, "Locking was not used"
- self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
+ self.wanted_names = owned_instances
- self.wanted_instances = \
- map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))
+ instances = dict(self.cfg.GetMultiInstanceInfo(self.wanted_names))
+
+ if self.op.use_locking:
+ _CheckInstancesNodeGroups(self.cfg, instances, owned_groups, owned_nodes,
+ None)
+ else:
+ assert not (owned_instances or owned_groups or owned_nodes)
+
+ self.wanted_instances = instances.values()
- def _ComputeBlockdevStatus(self, node, instance_name, dev):
+ def _ComputeBlockdevStatus(self, node, instance, dev):
"""Returns the status of a block device
"""
if result.offline:
return None
- result.Raise("Can't compute disk status for %s" % instance_name)
+ result.Raise("Can't compute disk status for %s" % instance.name)
status = result.payload
if status is None:
"""Compute block device status.
"""
+ (anno_dev,) = _AnnotateDiskParams(instance, [dev], self.cfg)
+
+ return self._ComputeDiskStatusInner(instance, snode, anno_dev)
+
+ def _ComputeDiskStatusInner(self, instance, snode, dev):
+ """Compute block device status.
+
+ @attention: The device has to be annotated already.
+
+ """
if dev.dev_type in constants.LDS_DRBD:
# we change the snode then (otherwise we use the one passed in)
if dev.logical_id[0] == instance.primary_node:
snode = dev.logical_id[0]
dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node,
- instance.name, dev)
- dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev)
+ instance, dev)
+ dev_sstatus = self._ComputeBlockdevStatus(snode, instance, dev)
if dev.children:
- dev_children = map(compat.partial(self._ComputeDiskStatus,
+ dev_children = map(compat.partial(self._ComputeDiskStatusInner,
instance, snode),
dev.children)
else:
cluster = self.cfg.GetClusterInfo()
- pri_nodes = self.cfg.GetMultiNodeInfo(i.primary_node
- for i in self.wanted_instances)
- for instance, (_, pnode) in zip(self.wanted_instances, pri_nodes):
+ node_names = itertools.chain(*(i.all_nodes for i in self.wanted_instances))
+ nodes = dict(self.cfg.GetMultiNodeInfo(node_names))
+
+ groups = dict(self.cfg.GetMultiNodeGroupInfo(node.group
+ for node in nodes.values()))
+
+ group2name_fn = lambda uuid: groups[uuid].name
+
+ for instance in self.wanted_instances:
+ pnode = nodes[instance.primary_node]
+
if self.op.static or pnode.offline:
remote_state = None
if pnode.offline:
disks = map(compat.partial(self._ComputeDiskStatus, instance, None),
instance.disks)
+ snodes_group_uuids = [nodes[snode_name].group
+ for snode_name in instance.secondary_nodes]
+
result[instance.name] = {
"name": instance.name,
"config_state": instance.admin_state,
"run_state": remote_state,
"pnode": instance.primary_node,
+ "pnode_group_uuid": pnode.group,
+ "pnode_group_name": group2name_fn(pnode.group),
"snodes": instance.secondary_nodes,
+ "snodes_group_uuids": snodes_group_uuids,
+ "snodes_group_names": map(group2name_fn, snodes_group_uuids),
"os": instance.os,
# this happens to be the same format used for hooks
"nics": _NICListToTuple(self, instance.nics),
# Append
absidx = len(container) - 1
elif idx < 0:
- raise IndexError("Not accepting negative indices")
+ raise IndexError("Not accepting negative indices other than -1")
+ elif idx > len(container):
+ raise IndexError("Got %s index %s, but there are only %s" %
+ (kind, idx, len(container)))
else:
absidx = idx
changes = None
if op == constants.DDM_ADD:
+ # Calculate where item will be added
+ if idx == -1:
+ addidx = len(container)
+ else:
+ addidx = idx
+
if create_fn is None:
item = params
else:
- (item, changes) = create_fn(absidx + 1, params, private)
+ (item, changes) = create_fn(addidx, params, private)
if idx == -1:
container.append(item)
else:
assert idx >= 0
+ assert idx <= len(container)
# list.insert does so before the specified index
container.insert(idx, item)
else:
chgdesc.extend(changes)
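The index checks above boil down to a little list arithmetic; a rough stand-alone sketch of the accepted cases for an add (hypothetical container values):

  def AddItem(container, idx, item):
    # -1 appends; 0..len(container) inserts before that position;
    # other negative indices and indices past the end are rejected
    if idx == -1:
      container.append(item)
    elif idx < 0:
      raise IndexError("Not accepting negative indices other than -1")
    elif idx > len(container):
      raise IndexError("Got index %s, but there are only %s" %
                       (idx, len(container)))
    else:
      container.insert(idx, item)

  items = ["a", "b"]
  AddItem(items, -1, "c")  # append
  AddItem(items, 1, "d")   # insert before position 1
  assert items == ["a", "d", "b", "c"]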
+def _UpdateIvNames(base_index, disks):
+ """Updates the C{iv_name} attribute of disks.
+
+ @type disks: list of L{objects.Disk}
+
+ """
+ for (idx, disk) in enumerate(disks):
+ disk.iv_name = "disk/%s" % (base_index + idx, )
+
+
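Since disk modifications can remove or reorder entries, the iv_name of every remaining disk is recomputed afterwards; a tiny sketch with a hypothetical stub for objects.Disk:

  class StubDisk(object):  # hypothetical stand-in for objects.Disk
    iv_name = None

  disks = [StubDisk(), StubDisk()]
  for (idx, disk) in enumerate(disks):
    disk.iv_name = "disk/%s" % (0 + idx, )  # base_index of 0
  assert [d.iv_name for d in disks] == ["disk/0", "disk/1"]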
class _InstNicModPrivate:
"""Data structure for network interface modifications.
if self.op.hvparams:
_CheckGlobalHvParams(self.op.hvparams)
- self.op.disks = \
- self._UpgradeDiskNicMods("disk", self.op.disks,
- opcodes.OpInstanceSetParams.TestDiskModifications)
- self.op.nics = \
- self._UpgradeDiskNicMods("NIC", self.op.nics,
- opcodes.OpInstanceSetParams.TestNicModifications)
+ self.op.disks = self._UpgradeDiskNicMods(
+ "disk", self.op.disks, opcodes.OpInstanceSetParams.TestDiskModifications)
+ self.op.nics = self._UpgradeDiskNicMods(
+ "NIC", self.op.nics, opcodes.OpInstanceSetParams.TestNicModifications)
# Check disk modifications
self._CheckMods("disk", self.op.disks, constants.IDISK_PARAMS_TYPES,
private.params = new_params
private.filled = new_filled_params
- return (None, None)
-
def CheckPrereq(self):
"""Check prerequisites.
pnode = instance.primary_node
nodelist = list(instance.all_nodes)
pnode_info = self.cfg.GetNodeInfo(pnode)
- self.diskparams = self.cfg.GetNodeGroup(pnode_info.group).diskparams
+ self.diskparams = self.cfg.GetInstanceDiskParams(instance)
# Prepare disk/NIC modifications
self.diskmod = PrepareContainerMods(self.op.disks, None)
snode_info = self.cfg.GetNodeInfo(self.op.remote_node)
snode_group = self.cfg.GetNodeGroup(snode_info.group)
- ipolicy = _CalculateGroupIPolicy(cluster, snode_group)
+ ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
+ snode_group)
_CheckTargetNodeIPolicy(self, ipolicy, instance, snode_info,
ignore=self.op.ignore_ipolicy)
if pnode_info.group != snode_info.group:
self.be_proposed = cluster.SimpleFillBE(instance.beparams)
be_old = cluster.FillBE(instance)
- # CPU param validation -- checking every time a paramtere is
+ # CPU param validation -- checking every time a parameter is
# changed to cover all cases where either CPU mask or vcpus have
# changed
if (constants.BE_VCPUS in self.be_proposed and
" free memory information" % pnode)
elif instance_info.fail_msg:
self.warn.append("Can't get instance runtime information: %s" %
- instance_info.fail_msg)
+ instance_info.fail_msg)
else:
if instance_info.payload:
current_mem = int(instance_info.payload["memory"])
raise errors.OpPrereqError("This change will prevent the instance"
" from starting, due to %d MB of memory"
" missing on its primary node" %
- miss_mem,
- errors.ECODE_NORES)
+ miss_mem, errors.ECODE_NORES)
if be_new[constants.BE_AUTO_BALANCE]:
for node, nres in nodeinfo.items():
instance.hypervisor)
remote_info.Raise("Error checking node %s" % instance.primary_node)
if not remote_info.payload: # not running already
- raise errors.OpPrereqError("Instance %s is not running" % instance.name,
- errors.ECODE_STATE)
+ raise errors.OpPrereqError("Instance %s is not running" %
+ instance.name, errors.ECODE_STATE)
current_memory = remote_info.payload["memory"]
if (not self.op.force and
self.op.runtime_mem < self.be_proposed[constants.BE_MINMEM])):
raise errors.OpPrereqError("Instance %s must have memory between %d"
" and %d MB of memory unless --force is"
- " given" % (instance.name,
+ " given" %
+ (instance.name,
self.be_proposed[constants.BE_MINMEM],
self.be_proposed[constants.BE_MAXMEM]),
errors.ECODE_INVAL)
if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
raise errors.OpPrereqError("Disk operations not supported for"
- " diskless instances",
- errors.ECODE_INVAL)
+ " diskless instances", errors.ECODE_INVAL)
def _PrepareNicCreate(_, params, private):
- return self._PrepareNicModification(params, private, None, {},
- cluster, pnode)
+ self._PrepareNicModification(params, private, None, {}, cluster, pnode)
+ return (None, None)
def _PrepareNicMod(_, nic, params, private):
- return self._PrepareNicModification(params, private, nic.ip,
- nic.nicparams, cluster, pnode)
+ self._PrepareNicModification(params, private, nic.ip,
+ nic.nicparams, cluster, pnode)
+ return None
# Verify NIC changes (operating on copy)
nics = instance.nics[:]
instance.name, pnode, [snode],
disk_info, None, None, 0, feedback_fn,
self.diskparams)
+ anno_disks = rpc.AnnotateDiskParams(constants.DT_DRBD8, new_disks,
+ self.diskparams)
info = _GetInstanceInfoText(instance)
- feedback_fn("Creating aditional volumes...")
+ feedback_fn("Creating additional volumes...")
# first, create the missing data and meta devices
- for disk in new_disks:
+ for disk in anno_disks:
# unfortunately this is... not too nice
_CreateSingleBlockDev(self, pnode, instance, disk.children[1],
info, True)
feedback_fn("Initializing DRBD devices...")
# all child devices are in place, we can now create the DRBD devices
- for disk in new_disks:
+ for disk in anno_disks:
for node in [pnode, snode]:
f_create = node == pnode
_CreateSingleBlockDev(self, node, instance, disk, info, f_create)
snode = instance.secondary_nodes[0]
feedback_fn("Converting template to plain")
- old_disks = instance.disks
- new_disks = [d.children[0] for d in old_disks]
+ old_disks = _AnnotateDiskParams(instance, instance.disks, self.cfg)
+ new_disks = [d.children[0] for d in instance.disks]
# copy over size and mode
for parent, child in zip(old_disks, new_disks):
child.size = parent.size
child.mode = parent.mode
+ # this is a DRBD disk, return its port to the pool
+ # NOTE: this must be done right before the call to cfg.Update!
+ for disk in old_disks:
+ tcp_port = disk.logical_id[2]
+ self.cfg.AddTcpUdpPort(tcp_port)
+
# update instance structure
instance.disks = new_disks
instance.disk_template = constants.DT_PLAIN
self.LogWarning("Could not remove metadata for disk %d on node %s,"
" continuing anyway: %s", idx, pnode, msg)
- # this is a DRBD disk, return its port to the pool
- for disk in old_disks:
- tcp_port = disk.logical_id[2]
- self.cfg.AddTcpUdpPort(tcp_port)
-
- # Node resource locks will be released by caller
-
def _CreateNewDisk(self, idx, params, _):
"""Creates a new disk.
"""Removes a disk.
"""
- for node, disk in root.ComputeNodeTree(self.instance.primary_node):
+ (anno_disk,) = _AnnotateDiskParams(self.instance, [root], self.cfg)
+ for node, disk in anno_disk.ComputeNodeTree(self.instance.primary_node):
self.cfg.SetDiskID(disk, node)
msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
if msg:
# Apply disk changes
ApplyContainerMods("disk", instance.disks, result, self.diskmod,
self._CreateNewDisk, self._ModifyDisk, self._RemoveDisk)
+ _UpdateIvNames(0, instance.disks)
if self.op.disk_template:
if __debug__:
if self.req_target_uuids:
# User requested specific target groups
- self.target_uuids = self.req_target_uuids
+ self.target_uuids = frozenset(self.req_target_uuids)
else:
# All groups except those used by the instance are potential targets
self.target_uuids = owned_groups - inst_groups
assert instances == [self.op.instance_name], "Instance not locked"
- ial = IAllocator(self.cfg, self.rpc, constants.IALLOCATOR_MODE_CHG_GROUP,
- instances=instances, target_groups=list(self.target_uuids))
+ req = iallocator.IAReqGroupChange(instances=instances,
+ target_groups=list(self.target_uuids))
+ ial = iallocator.IAllocator(self.cfg, self.rpc, req)
ial.Run(self.op.iallocator)
raise errors.OpPrereqError("Can't compute solution for changing group of"
" instance '%s' using iallocator '%s': %s" %
(self.op.instance_name, self.op.iallocator,
- ial.info),
- errors.ECODE_NORES)
+ ial.info), errors.ECODE_NORES)
jobs = _LoadNodeEvacResult(self, ial.result, self.op.early_release, False)
"""
REQ_BGL = False
+ def CheckArguments(self):
+ self.expq = _ExportQuery(qlang.MakeSimpleFilter("node", self.op.nodes),
+ ["node", "export"], self.op.use_locking)
+
def ExpandNames(self):
- self.needed_locks = {}
- self.share_locks[locking.LEVEL_NODE] = 1
- if not self.op.nodes:
- self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
- else:
- self.needed_locks[locking.LEVEL_NODE] = \
- _GetWantedNodes(self, self.op.nodes)
+ self.expq.ExpandNames(self)
+
+ def DeclareLocks(self, level):
+ self.expq.DeclareLocks(self, level)
def Exec(self, feedback_fn):
- """Compute the list of all the exported system images.
+ result = {}
- @rtype: dict
- @return: a dictionary with the structure node->(export-list)
- where export-list is a list of the instances exported on
- that node.
+ for (node, expname) in self.expq.OldStyleQuery(self):
+ if expname is None:
+ result[node] = False
+ else:
+ result.setdefault(node, []).append(expname)
+
+ return result
+
+
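The old-style result keeps its previous shape: a node maps to False when the RPC to it failed, and to the list of export names otherwise. A sketch of the conversion from the (node, expname) pairs returned by the query, where None marks a failed node (hypothetical values):

  rows = [("node1", "export-a"), ("node1", "export-b"), ("node2", None)]

  result = {}
  for (node, expname) in rows:
    if expname is None:
      result[node] = False
    else:
      result.setdefault(node, []).append(expname)

  assert result == {"node1": ["export-a", "export-b"], "node2": False}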
+class _ExportQuery(_QueryBase):
+ FIELDS = query.EXPORT_FIELDS
+
+ #: The node name is not a unique key for this query
+ SORT_FIELD = "node"
+
+ def ExpandNames(self, lu):
+ lu.needed_locks = {}
+
+ # The following variables interact with _QueryBase._GetNames
+ if self.names:
+ self.wanted = _GetWantedNodes(lu, self.names)
+ else:
+ self.wanted = locking.ALL_SET
+
+ self.do_locking = self.use_locking
+
+ if self.do_locking:
+ lu.share_locks = _ShareAll()
+ lu.needed_locks = {
+ locking.LEVEL_NODE: self.wanted,
+ }
+
+ def DeclareLocks(self, lu, level):
+ pass
+
+ def _GetQueryData(self, lu):
+ """Computes the list of nodes and their attributes.
"""
- self.nodes = self.owned_locks(locking.LEVEL_NODE)
- rpcresult = self.rpc.call_export_list(self.nodes)
- result = {}
- for node in rpcresult:
- if rpcresult[node].fail_msg:
- result[node] = False
+ # Locking is not used
+ # TODO
+ assert not (compat.any(lu.glm.is_owned(level)
+ for level in locking.LEVELS
+ if level != locking.LEVEL_CLUSTER) or
+ self.do_locking or self.use_locking)
+
+ nodes = self._GetNames(lu, lu.cfg.GetNodeList(), locking.LEVEL_NODE)
+
+ result = []
+
+ for (node, nres) in lu.rpc.call_export_list(nodes).items():
+ if nres.fail_msg:
+ result.append((node, None))
else:
- result[node] = rpcresult[node].payload
+ result.extend((node, expname) for expname in nres.payload)
return result
self.instance.admin_state == constants.ADMINST_UP and
not self.op.shutdown):
raise errors.OpPrereqError("Can not remove instance without shutting it"
- " down before")
+ " down before", errors.ECODE_STATE)
if self.op.mode == constants.EXPORT_MODE_LOCAL:
self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
try:
(key_name, hmac_digest, hmac_salt) = self.x509_key_name
except (TypeError, ValueError), err:
- raise errors.OpPrereqError("Invalid data for X509 key name: %s" % err)
+ raise errors.OpPrereqError("Invalid data for X509 key name: %s" % err,
+ errors.ECODE_INVAL)
if not utils.VerifySha1Hmac(cds, key_name, hmac_digest, salt=hmac_salt):
raise errors.OpPrereqError("HMAC for X509 key name is wrong",
if self.op.diskparams:
for templ in constants.DISK_TEMPLATES:
- if templ not in self.op.diskparams:
- self.op.diskparams[templ] = {}
- utils.ForceDictType(self.op.diskparams[templ], constants.DISK_DT_TYPES)
+ if templ in self.op.diskparams:
+ utils.ForceDictType(self.op.diskparams[templ],
+ constants.DISK_DT_TYPES)
+ self.new_diskparams = self.op.diskparams
+ try:
+ utils.VerifyDictOptions(self.new_diskparams, constants.DISK_DT_DEFAULTS)
+ except errors.OpPrereqError, err:
+ raise errors.OpPrereqError("While verify diskparams options: %s" % err,
+ errors.ECODE_INVAL)
else:
- self.op.diskparams = self.cfg.GetClusterInfo().diskparams
+ self.new_diskparams = {}
if self.op.ipolicy:
cluster = self.cfg.GetClusterInfo()
full_ipolicy = cluster.SimpleFillIPolicy(self.op.ipolicy)
try:
- objects.InstancePolicy.CheckParameterSyntax(full_ipolicy)
+ objects.InstancePolicy.CheckParameterSyntax(full_ipolicy, False)
except errors.ConfigurationError, err:
raise errors.OpPrereqError("Invalid instance policy: %s" % err,
errors.ECODE_INVAL)
uuid=self.group_uuid,
alloc_policy=self.op.alloc_policy,
ndparams=self.op.ndparams,
- diskparams=self.op.diskparams,
+ diskparams=self.new_diskparams,
ipolicy=self.op.ipolicy,
hv_state_static=self.new_hv_state,
disk_state_static=self.new_disk_state)
return query.GroupQueryData(self._cluster,
[self._all_groups[uuid]
for uuid in self.wanted],
- group_to_nodes, group_to_instances)
+ group_to_nodes, group_to_instances,
+ query.GQ_DISKPARAMS in self.requested_data)
class LUGroupQuery(NoHooksLU):
self.needed_locks[locking.LEVEL_INSTANCE] = \
self.cfg.GetNodeGroupInstances(self.group_uuid)
+ @staticmethod
+ def _UpdateAndVerifyDiskParams(old, new):
+ """Updates and verifies disk parameters.
+
+ """
+ new_params = _GetUpdatedParams(old, new)
+ utils.ForceDictType(new_params, constants.DISK_DT_TYPES)
+ return new_params
+
def CheckPrereq(self):
"""Check prerequisites.
if self.op.ndparams:
new_ndparams = _GetUpdatedParams(self.group.ndparams, self.op.ndparams)
- utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
+ utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
self.new_ndparams = new_ndparams
if self.op.diskparams:
- self.new_diskparams = dict()
- for templ in constants.DISK_TEMPLATES:
- if templ not in self.op.diskparams:
- self.op.diskparams[templ] = {}
- new_templ_params = _GetUpdatedParams(self.group.diskparams[templ],
- self.op.diskparams[templ])
- utils.ForceDictType(new_templ_params, constants.DISK_DT_TYPES)
- self.new_diskparams[templ] = new_templ_params
+ diskparams = self.group.diskparams
+ uavdp = self._UpdateAndVerifyDiskParams
+      # For each disk template subdict, update and verify the values
+ new_diskparams = dict((dt,
+ uavdp(diskparams.get(dt, {}),
+ self.op.diskparams[dt]))
+ for dt in constants.DISK_TEMPLATES
+ if dt in self.op.diskparams)
+      # Now that all the subdicts of diskparams are ready, merge the
+      # actual dict with all updated subdicts
+ self.new_diskparams = objects.FillDict(diskparams, new_diskparams)
+ try:
+ utils.VerifyDictOptions(self.new_diskparams, constants.DISK_DT_DEFAULTS)
+ except errors.OpPrereqError, err:
+ raise errors.OpPrereqError("While verify diskparams options: %s" % err,
+ errors.ECODE_INVAL)
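The merge above is a per-template dict update; a plain-dict sketch of how the group's existing diskparams, the requested changes, and the final result relate (hypothetical parameter names; _GetUpdatedParams and objects.FillDict are approximated by dict.update):

  group_diskparams = {"drbd": {"resync-rate": 1024, "metavg": "xenvg"}}
  requested = {"drbd": {"resync-rate": 2048}}

  new_diskparams = {}
  for (dt, changes) in requested.items():
    merged = dict(group_diskparams.get(dt, {}))
    merged.update(changes)  # roughly _GetUpdatedParams
    new_diskparams[dt] = merged

  result = dict(group_diskparams)
  result.update(new_diskparams)  # roughly objects.FillDict
  assert result == {"drbd": {"resync-rate": 2048, "metavg": "xenvg"}}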
if self.op.hv_state:
self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state,
new_ipolicy = cluster.SimpleFillIPolicy(self.new_ipolicy)
inst_filter = lambda inst: inst.name in owned_instances
instances = self.cfg.GetInstancesInfoByFilter(inst_filter).values()
+ gmi = ganeti.masterd.instance
violations = \
- _ComputeNewInstanceViolations(_CalculateGroupIPolicy(cluster,
- self.group),
+ _ComputeNewInstanceViolations(gmi.CalculateGroupIPolicy(cluster,
+ self.group),
new_ipolicy, instances)
if violations:
# Verify the cluster would not be left group-less.
if len(self.cfg.GetNodeGroupList()) == 1:
- raise errors.OpPrereqError("Group '%s' is the only group,"
- " cannot be removed" %
- self.op.group_name,
+ raise errors.OpPrereqError("Group '%s' is the only group, cannot be"
+ " removed" % self.op.group_name,
errors.ECODE_STATE)
def BuildHooksEnv(self):
self.instances = dict(self.cfg.GetMultiInstanceInfo(owned_instances))
# Check if node groups for locked instances are still correct
- for instance_name in owned_instances:
- inst = self.instances[instance_name]
- assert owned_nodes.issuperset(inst.all_nodes), \
- "Instance %s's nodes changed while we kept the lock" % instance_name
-
- inst_groups = _CheckInstanceNodeGroups(self.cfg, instance_name,
- owned_groups)
-
- assert self.group_uuid in inst_groups, \
- "Instance %s has no node in group %s" % (instance_name, self.group_uuid)
+ _CheckInstancesNodeGroups(self.cfg, self.instances,
+ owned_groups, owned_nodes, self.group_uuid)
if self.req_target_uuids:
# User requested specific target groups
assert self.group_uuid not in self.target_uuids
- ial = IAllocator(self.cfg, self.rpc, constants.IALLOCATOR_MODE_CHG_GROUP,
- instances=instances, target_groups=self.target_uuids)
+ req = iallocator.IAReqGroupChange(instances=instances,
+ target_groups=self.target_uuids)
+ ial = iallocator.IAllocator(self.cfg, self.rpc, req)
ial.Run(self.op.iallocator)
def ExpandNames(self):
self.group_uuid = None
self.needed_locks = {}
+
if self.op.kind == constants.TAG_NODE:
self.op.name = _ExpandNodeName(self.cfg, self.op.name)
- self.needed_locks[locking.LEVEL_NODE] = self.op.name
+ lock_level = locking.LEVEL_NODE
+ lock_name = self.op.name
elif self.op.kind == constants.TAG_INSTANCE:
self.op.name = _ExpandInstanceName(self.cfg, self.op.name)
- self.needed_locks[locking.LEVEL_INSTANCE] = self.op.name
+ lock_level = locking.LEVEL_INSTANCE
+ lock_name = self.op.name
elif self.op.kind == constants.TAG_NODEGROUP:
self.group_uuid = self.cfg.LookupNodeGroup(self.op.name)
+ lock_level = locking.LEVEL_NODEGROUP
+ lock_name = self.group_uuid
+ else:
+ lock_level = None
+ lock_name = None
+
+ if lock_level and getattr(self.op, "use_locking", True):
+ self.needed_locks[lock_level] = lock_name
# FIXME: Acquire BGL for cluster tag operations (as of this writing it's
# not possible to acquire the BGL based on opcode parameters)
return True
-class IAllocator(object):
- """IAllocator framework.
-
- An IAllocator instance has three sets of attributes:
- - cfg that is needed to query the cluster
- - input data (all members of the _KEYS class attribute are required)
- - four buffer attributes (in|out_data|text), that represent the
- input (to the external script) in text and data structure format,
- and the output from it, again in two formats
- - the result variables from the script (success, info, nodes) for
- easy usage
-
- """
- # pylint: disable=R0902
- # lots of instance attributes
-
- def __init__(self, cfg, rpc_runner, mode, **kwargs):
- self.cfg = cfg
- self.rpc = rpc_runner
- # init buffer variables
- self.in_text = self.out_text = self.in_data = self.out_data = None
- # init all input fields so that pylint is happy
- self.mode = mode
- self.memory = self.disks = self.disk_template = None
- self.os = self.tags = self.nics = self.vcpus = None
- self.hypervisor = None
- self.relocate_from = None
- self.name = None
- self.instances = None
- self.evac_mode = None
- self.target_groups = []
- # computed fields
- self.required_nodes = None
- # init result fields
- self.success = self.info = self.result = None
-
- try:
- (fn, keydata, self._result_check) = self._MODE_DATA[self.mode]
- except KeyError:
- raise errors.ProgrammerError("Unknown mode '%s' passed to the"
- " IAllocator" % self.mode)
-
- keyset = [n for (n, _) in keydata]
-
- for key in kwargs:
- if key not in keyset:
- raise errors.ProgrammerError("Invalid input parameter '%s' to"
- " IAllocator" % key)
- setattr(self, key, kwargs[key])
-
- for key in keyset:
- if key not in kwargs:
- raise errors.ProgrammerError("Missing input parameter '%s' to"
- " IAllocator" % key)
- self._BuildInputData(compat.partial(fn, self), keydata)
-
- def _ComputeClusterData(self):
- """Compute the generic allocator input data.
-
- This is the data that is independent of the actual operation.
-
- """
- cfg = self.cfg
- cluster_info = cfg.GetClusterInfo()
- # cluster data
- data = {
- "version": constants.IALLOCATOR_VERSION,
- "cluster_name": cfg.GetClusterName(),
- "cluster_tags": list(cluster_info.GetTags()),
- "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
- "ipolicy": cluster_info.ipolicy,
- }
- ninfo = cfg.GetAllNodesInfo()
- iinfo = cfg.GetAllInstancesInfo().values()
- i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
-
- # node data
- node_list = [n.name for n in ninfo.values() if n.vm_capable]
-
- if self.mode == constants.IALLOCATOR_MODE_ALLOC:
- hypervisor_name = self.hypervisor
- elif self.mode == constants.IALLOCATOR_MODE_RELOC:
- hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
- else:
- hypervisor_name = cluster_info.primary_hypervisor
-
- node_data = self.rpc.call_node_info(node_list, [cfg.GetVGName()],
- [hypervisor_name])
- node_iinfo = \
- self.rpc.call_all_instances_info(node_list,
- cluster_info.enabled_hypervisors)
-
- data["nodegroups"] = self._ComputeNodeGroupData(cfg)
-
- config_ndata = self._ComputeBasicNodeData(ninfo)
- data["nodes"] = self._ComputeDynamicNodeData(ninfo, node_data, node_iinfo,
- i_list, config_ndata)
- assert len(data["nodes"]) == len(ninfo), \
- "Incomplete node data computed"
-
- data["instances"] = self._ComputeInstanceData(cluster_info, i_list)
-
- self.in_data = data
-
- @staticmethod
- def _ComputeNodeGroupData(cfg):
- """Compute node groups data.
-
- """
- cluster = cfg.GetClusterInfo()
- ng = dict((guuid, {
- "name": gdata.name,
- "alloc_policy": gdata.alloc_policy,
- "ipolicy": _CalculateGroupIPolicy(cluster, gdata),
- })
- for guuid, gdata in cfg.GetAllNodeGroupsInfo().items())
-
- return ng
-
- @staticmethod
- def _ComputeBasicNodeData(node_cfg):
- """Compute global node data.
-
- @rtype: dict
- @returns: a dict of name: (node dict, node config)
-
- """
- # fill in static (config-based) values
- node_results = dict((ninfo.name, {
- "tags": list(ninfo.GetTags()),
- "primary_ip": ninfo.primary_ip,
- "secondary_ip": ninfo.secondary_ip,
- "offline": ninfo.offline,
- "drained": ninfo.drained,
- "master_candidate": ninfo.master_candidate,
- "group": ninfo.group,
- "master_capable": ninfo.master_capable,
- "vm_capable": ninfo.vm_capable,
- })
- for ninfo in node_cfg.values())
-
- return node_results
-
- @staticmethod
- def _ComputeDynamicNodeData(node_cfg, node_data, node_iinfo, i_list,
- node_results):
- """Compute global node data.
-
- @param node_results: the basic node structures as filled from the config
-
- """
- #TODO(dynmem): compute the right data on MAX and MIN memory
- # make a copy of the current dict
- node_results = dict(node_results)
- for nname, nresult in node_data.items():
- assert nname in node_results, "Missing basic data for node %s" % nname
- ninfo = node_cfg[nname]
-
- if not (ninfo.offline or ninfo.drained):
- nresult.Raise("Can't get data for node %s" % nname)
- node_iinfo[nname].Raise("Can't get node instance info from node %s" %
- nname)
- remote_info = _MakeLegacyNodeInfo(nresult.payload)
-
- for attr in ["memory_total", "memory_free", "memory_dom0",
- "vg_size", "vg_free", "cpu_total"]:
- if attr not in remote_info:
- raise errors.OpExecError("Node '%s' didn't return attribute"
- " '%s'" % (nname, attr))
- if not isinstance(remote_info[attr], int):
- raise errors.OpExecError("Node '%s' returned invalid value"
- " for '%s': %s" %
- (nname, attr, remote_info[attr]))
- # compute memory used by primary instances
- i_p_mem = i_p_up_mem = 0
- for iinfo, beinfo in i_list:
- if iinfo.primary_node == nname:
- i_p_mem += beinfo[constants.BE_MAXMEM]
- if iinfo.name not in node_iinfo[nname].payload:
- i_used_mem = 0
- else:
- i_used_mem = int(node_iinfo[nname].payload[iinfo.name]["memory"])
- i_mem_diff = beinfo[constants.BE_MAXMEM] - i_used_mem
- remote_info["memory_free"] -= max(0, i_mem_diff)
-
- if iinfo.admin_state == constants.ADMINST_UP:
- i_p_up_mem += beinfo[constants.BE_MAXMEM]
-
- # compute memory used by instances
- pnr_dyn = {
- "total_memory": remote_info["memory_total"],
- "reserved_memory": remote_info["memory_dom0"],
- "free_memory": remote_info["memory_free"],
- "total_disk": remote_info["vg_size"],
- "free_disk": remote_info["vg_free"],
- "total_cpus": remote_info["cpu_total"],
- "i_pri_memory": i_p_mem,
- "i_pri_up_memory": i_p_up_mem,
- }
- pnr_dyn.update(node_results[nname])
- node_results[nname] = pnr_dyn
-
- return node_results
-
- @staticmethod
- def _ComputeInstanceData(cluster_info, i_list):
- """Compute global instance data.
-
- """
- instance_data = {}
- for iinfo, beinfo in i_list:
- nic_data = []
- for nic in iinfo.nics:
- filled_params = cluster_info.SimpleFillNIC(nic.nicparams)
- nic_dict = {
- "mac": nic.mac,
- "ip": nic.ip,
- "mode": filled_params[constants.NIC_MODE],
- "link": filled_params[constants.NIC_LINK],
- }
- if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
- nic_dict["bridge"] = filled_params[constants.NIC_LINK]
- nic_data.append(nic_dict)
- pir = {
- "tags": list(iinfo.GetTags()),
- "admin_state": iinfo.admin_state,
- "vcpus": beinfo[constants.BE_VCPUS],
- "memory": beinfo[constants.BE_MAXMEM],
- "os": iinfo.os,
- "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
- "nics": nic_data,
- "disks": [{constants.IDISK_SIZE: dsk.size,
- constants.IDISK_MODE: dsk.mode}
- for dsk in iinfo.disks],
- "disk_template": iinfo.disk_template,
- "hypervisor": iinfo.hypervisor,
- }
- pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
- pir["disks"])
- instance_data[iinfo.name] = pir
-
- return instance_data
-
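# Editor's note: sketch of a single instance_data entry, with invented
# values; "disk_space_total" comes from _ComputeDiskSize.
example_instance_entry = {
  "tags": [],
  "admin_state": "up",
  "vcpus": 2,
  "memory": 1024,
  "os": "debian-image",
  "nodes": ["node1.example.com", "node2.example.com"],
  "nics": [{"mac": "aa:00:00:35:ac:9d", "ip": None, "mode": "bridged",
            "link": "xen-br0", "bridge": "xen-br0"}],
  "disks": [{"size": 10240, "mode": "rw"}],
  "disk_template": "drbd",
  "hypervisor": "xen-pvm",
  "disk_space_total": 10368,  # illustrative figure only
}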
- def _AddNewInstance(self):
- """Add new instance data to allocator structure.
-
-    This in combination with _ComputeClusterData will create the
- correct structure needed as input for the allocator.
-
- The checks for the completeness of the opcode must have already been
- done.
-
- """
- disk_space = _ComputeDiskSize(self.disk_template, self.disks)
-
- if self.disk_template in constants.DTS_INT_MIRROR:
- self.required_nodes = 2
- else:
- self.required_nodes = 1
-
- request = {
- "name": self.name,
- "disk_template": self.disk_template,
- "tags": self.tags,
- "os": self.os,
- "vcpus": self.vcpus,
- "memory": self.memory,
- "disks": self.disks,
- "disk_space_total": disk_space,
- "nics": self.nics,
- "required_nodes": self.required_nodes,
- "hypervisor": self.hypervisor,
- }
-
- return request
-
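# Editor's note: a hypothetical allocation request as assembled by
# _AddNewInstance; _BuildInputData later adds the "type" key.  All
# values are invented.
example_alloc_request = {
  "name": "inst1.example.com",
  "disk_template": "drbd",
  "tags": [],
  "os": "debian-image",
  "vcpus": 2,
  "memory": 1024,
  "disks": [{"size": 10240, "mode": "rw"}],
  "disk_space_total": 10368,  # via _ComputeDiskSize; illustrative
  "nics": [{"mac": "aa:00:00:35:ac:9d", "ip": None, "bridge": "xen-br0"}],
  "required_nodes": 2,  # two nodes because drbd is in DTS_INT_MIRROR
  "hypervisor": "xen-pvm",
}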
- def _AddRelocateInstance(self):
- """Add relocate instance data to allocator structure.
-
-    This in combination with _ComputeClusterData will create the
- correct structure needed as input for the allocator.
-
- The checks for the completeness of the opcode must have already been
- done.
-
- """
- instance = self.cfg.GetInstanceInfo(self.name)
- if instance is None:
- raise errors.ProgrammerError("Unknown instance '%s' passed to"
- " IAllocator" % self.name)
-
- if instance.disk_template not in constants.DTS_MIRRORED:
- raise errors.OpPrereqError("Can't relocate non-mirrored instances",
- errors.ECODE_INVAL)
-
- if instance.disk_template in constants.DTS_INT_MIRROR and \
- len(instance.secondary_nodes) != 1:
-      raise errors.OpPrereqError("Instance does not have exactly one"
-                                 " secondary node", errors.ECODE_STATE)
-
- self.required_nodes = 1
- disk_sizes = [{constants.IDISK_SIZE: disk.size} for disk in instance.disks]
- disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
-
- request = {
- "name": self.name,
- "disk_space_total": disk_space,
- "required_nodes": self.required_nodes,
- "relocate_from": self.relocate_from,
- }
- return request
-
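# Editor's note: the relocate request is much smaller; a hypothetical
# example (disk_space_total again from _ComputeDiskSize):
example_reloc_request = {
  "name": "inst1.example.com",
  "disk_space_total": 10368,
  "required_nodes": 1,
  "relocate_from": ["node2.example.com"],
}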
- def _AddNodeEvacuate(self):
- """Get data for node-evacuate requests.
-
- """
- return {
- "instances": self.instances,
- "evac_mode": self.evac_mode,
- }
-
- def _AddChangeGroup(self):
- """Get data for node-evacuate requests.
-
- """
- return {
- "instances": self.instances,
- "target_groups": self.target_groups,
- }
-
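# Editor's note: hypothetical payloads for the two group-level modes
# above; both carry instance names plus one mode-specific field.
example_node_evac_request = {
  "instances": ["inst1.example.com", "inst2.example.com"],
  "evac_mode": "all",  # one of constants.IALLOCATOR_NEVAC_MODES
}
example_chg_group_request = {
  "instances": ["inst1.example.com"],
  "target_groups": ["default"],
}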
- def _BuildInputData(self, fn, keydata):
- """Build input data structures.
-
- """
- self._ComputeClusterData()
-
- request = fn()
- request["type"] = self.mode
- for keyname, keytype in keydata:
- if keyname not in request:
- raise errors.ProgrammerError("Request parameter %s is missing" %
- keyname)
- val = request[keyname]
- if not keytype(val):
- raise errors.ProgrammerError("Request parameter %s doesn't pass"
- " validation, value %s, expected"
- " type %s" % (keyname, val, keytype))
- self.in_data["request"] = request
-
- self.in_text = serializer.Dump(self.in_data)
-
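# Editor's note: sketch of the final in_data layout serialized above.
# "nodes" and "nodegroups" are referenced later in _ValidateResult; the
# remaining cluster-wide keys come from _ComputeClusterData, which is
# outside this excerpt, so this shape is partly an assumption.
example_in_data = {
  "nodegroups": {},  # from _ComputeNodeGroupData
  "nodes": {},       # from _Compute{Basic,Dynamic}NodeData
  "instances": {},   # from _ComputeInstanceData
  "request": {"type": "allocate"},  # plus the mode-specific keys
}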
- _STRING_LIST = ht.TListOf(ht.TString)
- _JOB_LIST = ht.TListOf(ht.TListOf(ht.TStrictDict(True, False, {
- # pylint: disable=E1101
- # Class '...' has no 'OP_ID' member
- "OP_ID": ht.TElemOf([opcodes.OpInstanceFailover.OP_ID,
- opcodes.OpInstanceMigrate.OP_ID,
- opcodes.OpInstanceReplaceDisks.OP_ID])
- })))
-
- _NEVAC_MOVED = \
- ht.TListOf(ht.TAnd(ht.TIsLength(3),
- ht.TItems([ht.TNonEmptyString,
- ht.TNonEmptyString,
- ht.TListOf(ht.TNonEmptyString),
- ])))
- _NEVAC_FAILED = \
- ht.TListOf(ht.TAnd(ht.TIsLength(2),
- ht.TItems([ht.TNonEmptyString,
- ht.TMaybeString,
- ])))
- _NEVAC_RESULT = ht.TAnd(ht.TIsLength(3),
- ht.TItems([_NEVAC_MOVED, _NEVAC_FAILED, _JOB_LIST]))
-
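# Editor's note: a minimal value (invented names) that satisfies
# _NEVAC_RESULT: one moved instance, one failure, one job set.
example_nevac_result = [
  [["inst1.example.com", "default", ["node3.example.com"]]],  # moved
  [["inst2.example.com", "instance is not mirrored"]],        # failed
  [[{"OP_ID": "OP_INSTANCE_MIGRATE"}]],                       # jobs
]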
- _MODE_DATA = {
- constants.IALLOCATOR_MODE_ALLOC:
- (_AddNewInstance,
- [
- ("name", ht.TString),
- ("memory", ht.TInt),
- ("disks", ht.TListOf(ht.TDict)),
- ("disk_template", ht.TString),
- ("os", ht.TString),
- ("tags", _STRING_LIST),
- ("nics", ht.TListOf(ht.TDict)),
- ("vcpus", ht.TInt),
- ("hypervisor", ht.TString),
- ], ht.TList),
- constants.IALLOCATOR_MODE_RELOC:
- (_AddRelocateInstance,
- [("name", ht.TString), ("relocate_from", _STRING_LIST)],
- ht.TList),
- constants.IALLOCATOR_MODE_NODE_EVAC:
- (_AddNodeEvacuate, [
- ("instances", _STRING_LIST),
- ("evac_mode", ht.TElemOf(constants.IALLOCATOR_NEVAC_MODES)),
- ], _NEVAC_RESULT),
- constants.IALLOCATOR_MODE_CHG_GROUP:
- (_AddChangeGroup, [
- ("instances", _STRING_LIST),
- ("target_groups", _STRING_LIST),
- ], _NEVAC_RESULT),
- }
-
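# Editor's note: sketch of how _MODE_DATA is consumed.  The wiring
# lives in the constructor, outside this excerpt, so the exact call
# shape below is an assumption rather than a quote:
#
#   (fn, keydata, self._result_check) = self._MODE_DATA[self.mode]
#   self._BuildInputData(compat.partial(fn, self), keydata)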
- def Run(self, name, validate=True, call_fn=None):
- """Run an instance allocator and return the results.
-
- """
- if call_fn is None:
- call_fn = self.rpc.call_iallocator_runner
-
- result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
- result.Raise("Failure while running the iallocator script")
-
- self.out_text = result.payload
- if validate:
- self._ValidateResult()
-
- def _ValidateResult(self):
- """Process the allocator results.
-
-    This will process and, if successful, save the result in
-    self.out_data and the other parameters.
-
- """
- try:
- rdict = serializer.Load(self.out_text)
- except Exception, err:
- raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
-
- if not isinstance(rdict, dict):
- raise errors.OpExecError("Can't parse iallocator results: not a dict")
-
-    # TODO: remove backwards compatibility in later versions
- if "nodes" in rdict and "result" not in rdict:
- rdict["result"] = rdict["nodes"]
- del rdict["nodes"]
-
- for key in "success", "info", "result":
- if key not in rdict:
- raise errors.OpExecError("Can't parse iallocator results:"
- " missing key '%s'" % key)
- setattr(self, key, rdict[key])
-
- if not self._result_check(self.result):
-      raise errors.OpExecError("Iallocator returned invalid result,"
-                               " expected %s, got %s" %
-                               (self._result_check, self.result))
-
- if self.mode == constants.IALLOCATOR_MODE_RELOC:
- assert self.relocate_from is not None
- assert self.required_nodes == 1
-
- node2group = dict((name, ndata["group"])
- for (name, ndata) in self.in_data["nodes"].items())
-
- fn = compat.partial(self._NodesToGroups, node2group,
- self.in_data["nodegroups"])
-
- instance = self.cfg.GetInstanceInfo(self.name)
- request_groups = fn(self.relocate_from + [instance.primary_node])
- result_groups = fn(rdict["result"] + [instance.primary_node])
-
- if self.success and not set(result_groups).issubset(request_groups):
- raise errors.OpExecError("Groups of nodes returned by iallocator (%s)"
- " differ from original groups (%s)" %
- (utils.CommaJoin(result_groups),
- utils.CommaJoin(request_groups)))
-
- elif self.mode == constants.IALLOCATOR_MODE_NODE_EVAC:
- assert self.evac_mode in constants.IALLOCATOR_NEVAC_MODES
-
- self.out_data = rdict
-
- @staticmethod
- def _NodesToGroups(node2group, groups, nodes):
- """Returns a list of unique group names for a list of nodes.
-
- @type node2group: dict
- @param node2group: Map from node name to group UUID
- @type groups: dict
- @param groups: Group information
- @type nodes: list
- @param nodes: Node names
-
- """
- result = set()
-
- for node in nodes:
- try:
- group_uuid = node2group[node]
- except KeyError:
- # Ignore unknown node
- pass
- else:
- try:
- group = groups[group_uuid]
- except KeyError:
- # Can't find group, let's use UUID
- group_name = group_uuid
- else:
- group_name = group["name"]
-
- result.add(group_name)
-
- return sorted(result)
-
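# Editor's note: worked example for _NodesToGroups with invented data.
# Given node2group = {"node1": "uuid-a", "node2": "uuid-b"} and
# groups = {"uuid-a": {"name": "default"}}, then
#   _NodesToGroups(node2group, groups, ["node1", "node2", "node9"])
# returns ["default", "uuid-b"]: "node9" is unknown and skipped, and
# "uuid-b" has no group entry so the UUID stands in for the name.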
-
class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This checks the opcode parameters depending on the direction and test mode.

  """
- if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
+ if self.op.mode in (constants.IALLOCATOR_MODE_ALLOC,
+ constants.IALLOCATOR_MODE_MULTI_ALLOC):
for attr in ["memory", "disks", "disk_template",
"os", "tags", "nics", "vcpus"]:
if not hasattr(self.op, attr):
"""
if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
- ial = IAllocator(self.cfg, self.rpc,
- mode=self.op.mode,
- name=self.op.name,
- memory=self.op.memory,
- disks=self.op.disks,
- disk_template=self.op.disk_template,
- os=self.op.os,
- tags=self.op.tags,
- nics=self.op.nics,
- vcpus=self.op.vcpus,
- hypervisor=self.op.hypervisor,
- )
+ req = iallocator.IAReqInstanceAlloc(name=self.op.name,
+ memory=self.op.memory,
+ disks=self.op.disks,
+ disk_template=self.op.disk_template,
+ os=self.op.os,
+ tags=self.op.tags,
+ nics=self.op.nics,
+ vcpus=self.op.vcpus,
+ spindle_use=self.op.spindle_use,
+ hypervisor=self.op.hypervisor)
elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
- ial = IAllocator(self.cfg, self.rpc,
- mode=self.op.mode,
- name=self.op.name,
- relocate_from=list(self.relocate_from),
- )
+ req = iallocator.IAReqRelocate(name=self.op.name,
+ relocate_from=list(self.relocate_from))
elif self.op.mode == constants.IALLOCATOR_MODE_CHG_GROUP:
- ial = IAllocator(self.cfg, self.rpc,
- mode=self.op.mode,
- instances=self.op.instances,
- target_groups=self.op.target_groups)
+ req = iallocator.IAReqGroupChange(instances=self.op.instances,
+ target_groups=self.op.target_groups)
elif self.op.mode == constants.IALLOCATOR_MODE_NODE_EVAC:
- ial = IAllocator(self.cfg, self.rpc,
- mode=self.op.mode,
- instances=self.op.instances,
- evac_mode=self.op.evac_mode)
+ req = iallocator.IAReqNodeEvac(instances=self.op.instances,
+ evac_mode=self.op.evac_mode)
+ elif self.op.mode == constants.IALLOCATOR_MODE_MULTI_ALLOC:
+ disk_template = self.op.disk_template
+ insts = [iallocator.IAReqInstanceAlloc(name="%s%s" % (self.op.name, idx),
+ memory=self.op.memory,
+ disks=self.op.disks,
+ disk_template=disk_template,
+ os=self.op.os,
+ tags=self.op.tags,
+ nics=self.op.nics,
+ vcpus=self.op.vcpus,
+ spindle_use=self.op.spindle_use,
+ hypervisor=self.op.hypervisor)
+ for idx in range(self.op.count)]
+ req = iallocator.IAReqMultiInstanceAlloc(instances=insts)
else:
raise errors.ProgrammerError("Unhandled mode %s in"
                             " LUTestAllocator.Exec" % self.op.mode)
+ ial = iallocator.IAllocator(self.cfg, self.rpc, req)
if self.op.direction == constants.IALLOCATOR_DIR_IN:
result = ial.in_text
else:
#: Query type implementations
_QUERY_IMPL = {
+ constants.QR_CLUSTER: _ClusterQuery,
constants.QR_INSTANCE: _InstanceQuery,
constants.QR_NODE: _NodeQuery,
constants.QR_GROUP: _GroupQuery,
constants.QR_OS: _OsQuery,
+ constants.QR_EXPORT: _ExportQuery,
}
assert set(_QUERY_IMPL.keys()) == constants.QR_VIA_OP