import os.path
import time
import re
-import platform
import logging
import copy
import OpenSSL
import shutil
import itertools
import operator
+import ipaddr
from ganeti import ssh
from ganeti import utils
from ganeti import opcodes
from ganeti import ht
from ganeti import rpc
+from ganeti import runtime
+from ganeti import network
import ganeti.masterd.instance # pylint: disable=W0611
"""Data container for LU results with jobs.
Instances of this class returned from L{LogicalUnit.Exec} will be recognized
- by L{mcpu.Processor._ProcessResult}. The latter will then submit the jobs
+ by L{mcpu._ProcessResult}. The latter will then submit the jobs
contained in the C{jobs} attribute and include the job IDs in the opcode
result.
#: Attribute holding field definitions
FIELDS = None
+ #: Field to sort by
+ SORT_FIELD = "name"
+
def __init__(self, qfilter, fields, use_locking):
"""Initializes this class.
self.use_locking = use_locking
self.query = query.Query(self.FIELDS, fields, qfilter=qfilter,
- namefield="name")
+ namefield=self.SORT_FIELD)
self.requested_data = self.query.RequestedData()
self.names = self.query.RequestedNames()
})
+def _AnnotateDiskParams(instance, devs, cfg):
+ """Little helper wrapper to the rpc annotation method.
+
+ @param instance: The instance object
+ @type devs: List of L{objects.Disk}
+ @param devs: The root devices (not any of its children!)
+ @param cfg: The config object
+  @returns: The annotated disk copies
+  @see: L{rpc.AnnotateDiskParams}
+
+ """
+ return rpc.AnnotateDiskParams(instance.disk_template, devs,
+ cfg.GetInstanceDiskParams(instance))
+
+
+def _CheckInstancesNodeGroups(cfg, instances, owned_groups, owned_nodes,
+ cur_group_uuid):
+ """Checks if node groups for locked instances are still correct.
+
+ @type cfg: L{config.ConfigWriter}
+ @param cfg: Cluster configuration
+ @type instances: dict; string as key, L{objects.Instance} as value
+ @param instances: Dictionary, instance name as key, instance object as value
+ @type owned_groups: iterable of string
+ @param owned_groups: List of owned groups
+ @type owned_nodes: iterable of string
+ @param owned_nodes: List of owned nodes
+ @type cur_group_uuid: string or None
+ @param cur_group_uuid: Optional group UUID to check against instance's groups
+
+ """
+ for (name, inst) in instances.items():
+ assert owned_nodes.issuperset(inst.all_nodes), \
+ "Instance %s's nodes changed while we kept the lock" % name
+
+ inst_groups = _CheckInstanceNodeGroups(cfg, name, owned_groups)
+
+ assert cur_group_uuid is None or cur_group_uuid in inst_groups, \
+ "Instance %s has no node in group %s" % (name, cur_group_uuid)
+
+
def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups):
"""Checks if the owned node groups are still correct for an instance.
use_none=use_none,
use_default=use_default)
else:
- if not value or value == [constants.VALUE_DEFAULT]:
+ if (not value or value == [constants.VALUE_DEFAULT] or
+ value == constants.VALUE_DEFAULT):
if group_policy:
del ipolicy[key]
else:
# in a nicer way
ipolicy[key] = list(value)
try:
- objects.InstancePolicy.CheckParameterSyntax(ipolicy)
+ objects.InstancePolicy.CheckParameterSyntax(ipolicy, not group_policy)
except errors.ConfigurationError, err:
raise errors.OpPrereqError("Invalid instance policy: %s" % err,
errors.ECODE_INVAL)
(instance.name, msg), errors.ECODE_STATE)
-def _ComputeMinMaxSpec(name, ipolicy, value):
+def _ComputeMinMaxSpec(name, qualifier, ipolicy, value):
"""Computes if value is in the desired range.
@param name: name of the parameter for which we perform the check
+ @param qualifier: a qualifier used in the error message (e.g. 'disk/1',
+ not just 'disk')
@param ipolicy: dictionary containing min, max and std values
@param value: actual value that we want to use
@return: None or element not meeting the criteria
max_v = ipolicy[constants.ISPECS_MAX].get(name, value)
min_v = ipolicy[constants.ISPECS_MIN].get(name, value)
if value > max_v or min_v > value:
+ if qualifier:
+ fqn = "%s/%s" % (name, qualifier)
+ else:
+ fqn = name
return ("%s value %s is not in range [%s, %s]" %
- (name, value, min_v, max_v))
+ (fqn, value, min_v, max_v))
return None
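+# Illustrative example (not part of the change): with an ipolicy of
+#   {constants.ISPECS_MIN: {constants.ISPEC_DISK_SIZE: 128},
+#    constants.ISPECS_MAX: {constants.ISPEC_DISK_SIZE: 1024}},
+# _ComputeMinMaxSpec(constants.ISPEC_DISK_SIZE, "1", ipolicy, 2048) returns
+# "disk-size/1 value 2048 is not in range [128, 1024]", while a value of 512
+# would yield None.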
def _ComputeIPolicySpecViolation(ipolicy, mem_size, cpu_count, disk_count,
- nic_count, disk_sizes,
+ nic_count, disk_sizes, spindle_use,
_compute_fn=_ComputeMinMaxSpec):
"""Verifies ipolicy against provided specs.
@param nic_count: Number of nics used
@type disk_sizes: list of ints
@param disk_sizes: Disk sizes of used disk (len must match C{disk_count})
+ @type spindle_use: int
+ @param spindle_use: The number of spindles this instance uses
@param _compute_fn: The compute function (unittest only)
@return: A list of violations, or an empty list if no violations are found
assert disk_count == len(disk_sizes)
test_settings = [
- (constants.ISPEC_MEM_SIZE, mem_size),
- (constants.ISPEC_CPU_COUNT, cpu_count),
- (constants.ISPEC_DISK_COUNT, disk_count),
- (constants.ISPEC_NIC_COUNT, nic_count),
- ] + map((lambda d: (constants.ISPEC_DISK_SIZE, d)), disk_sizes)
+ (constants.ISPEC_MEM_SIZE, "", mem_size),
+ (constants.ISPEC_CPU_COUNT, "", cpu_count),
+ (constants.ISPEC_DISK_COUNT, "", disk_count),
+ (constants.ISPEC_NIC_COUNT, "", nic_count),
+ (constants.ISPEC_SPINDLE_USE, "", spindle_use),
+ ] + [(constants.ISPEC_DISK_SIZE, str(idx), d)
+ for idx, d in enumerate(disk_sizes)]
return filter(None,
- (_compute_fn(name, ipolicy, value)
- for (name, value) in test_settings))
+ (_compute_fn(name, qualifier, ipolicy, value)
+ for (name, qualifier, value) in test_settings))
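+# For example (illustrative): an instance with disks of 512 and 2048 MB adds
+# ("disk-size", "0", 512) and ("disk-size", "1", 2048) to test_settings, so
+# every disk is checked and reported individually, while memory, CPU, disk
+# count, NIC count and spindle use are checked with an empty qualifier.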
def _ComputeIPolicyInstanceViolation(ipolicy, instance,
"""
mem_size = instance.beparams.get(constants.BE_MAXMEM, None)
cpu_count = instance.beparams.get(constants.BE_VCPUS, None)
+ spindle_use = instance.beparams.get(constants.BE_SPINDLE_USE, None)
disk_count = len(instance.disks)
disk_sizes = [disk.size for disk in instance.disks]
nic_count = len(instance.nics)
return _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
- disk_sizes)
+ disk_sizes, spindle_use)
def _ComputeIPolicyInstanceSpecViolation(ipolicy, instance_spec,
disk_count = instance_spec.get(constants.ISPEC_DISK_COUNT, 0)
disk_sizes = instance_spec.get(constants.ISPEC_DISK_SIZE, [])
nic_count = instance_spec.get(constants.ISPEC_NIC_COUNT, 0)
+ spindle_use = instance_spec.get(constants.ISPEC_SPINDLE_USE, None)
return _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
- disk_sizes)
+ disk_sizes, spindle_use)
def _ComputeIPolicyNodeViolation(ipolicy, instance, current_group,
@param old_ipolicy: The current (still in-place) ipolicy
@param new_ipolicy: The new (to become) ipolicy
@param instances: List of instances to verify
- @return: A list of instances which violates the new ipolicy but did not before
+  @return: A list of instances which violate the new ipolicy but
+ did not before
"""
- return (_ComputeViolatingInstances(old_ipolicy, instances) -
- _ComputeViolatingInstances(new_ipolicy, instances))
+ return (_ComputeViolatingInstances(new_ipolicy, instances) -
+ _ComputeViolatingInstances(old_ipolicy, instances))
def _ExpandItemName(fn, name, kind):
"""Wrapper over L{_ExpandItemName} for instance."""
return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
+def _BuildNetworkHookEnv(name, network, gateway, network6, gateway6,
+ network_type, mac_prefix, tags):
+  """Builds network related env variables for hooks.
+
+  """
+  env = {}
+ if name:
+ env["NETWORK_NAME"] = name
+ if network:
+ env["NETWORK_SUBNET"] = network
+ if gateway:
+ env["NETWORK_GATEWAY"] = gateway
+ if network6:
+ env["NETWORK_SUBNET6"] = network6
+ if gateway6:
+ env["NETWORK_GATEWAY6"] = gateway6
+ if mac_prefix:
+ env["NETWORK_MAC_PREFIX"] = mac_prefix
+ if network_type:
+ env["NETWORK_TYPE"] = network_type
+ if tags:
+ env["NETWORK_TAGS"] = " ".join(tags)
+
+ return env
+
+
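+# Example (illustrative): for a network named "net1" with subnet
+# "192.0.2.0/24", gateway "192.0.2.1" and no other attributes set, the
+# function returns
+#   {"NETWORK_NAME": "net1", "NETWORK_SUBNET": "192.0.2.0/24",
+#    "NETWORK_GATEWAY": "192.0.2.1"};
+# unset attributes simply produce no NETWORK_* key.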
+def _BuildNetworkHookEnvByObject(lu, network):
+  """Builds network related env variables for hooks from a network object.
+
+  """
+  args = {
+ "name": network.name,
+ "network": network.network,
+ "gateway": network.gateway,
+ "network6": network.network6,
+ "gateway6": network.gateway6,
+ "network_type": network.network_type,
+ "mac_prefix": network.mac_prefix,
+ "tags" : network.tags,
+ }
+ return _BuildNetworkHookEnv(**args)
+
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
minmem, maxmem, vcpus, nics, disk_template, disks,
@type vcpus: string
@param vcpus: the count of VCPUs the instance has
@type nics: list
- @param nics: list of tuples (ip, mac, mode, link) representing
+ @param nics: list of tuples (ip, mac, mode, link, network) representing
the NICs the instance has
@type disk_template: string
@param disk_template: the disk template of the instance
}
if nics:
nic_count = len(nics)
- for idx, (ip, mac, mode, link) in enumerate(nics):
+ for idx, (ip, mac, mode, link, network, netinfo) in enumerate(nics):
if ip is None:
ip = ""
env["INSTANCE_NIC%d_IP" % idx] = ip
env["INSTANCE_NIC%d_MAC" % idx] = mac
env["INSTANCE_NIC%d_MODE" % idx] = mode
env["INSTANCE_NIC%d_LINK" % idx] = link
+ if network:
+ env["INSTANCE_NIC%d_NETWORK" % idx] = network
+ if netinfo:
+ nobj = objects.Network.FromDict(netinfo)
+ if nobj.network:
+ env["INSTANCE_NIC%d_NETWORK_SUBNET" % idx] = nobj.network
+ if nobj.gateway:
+ env["INSTANCE_NIC%d_NETWORK_GATEWAY" % idx] = nobj.gateway
+ if nobj.network6:
+ env["INSTANCE_NIC%d_NETWORK_SUBNET6" % idx] = nobj.network6
+ if nobj.gateway6:
+ env["INSTANCE_NIC%d_NETWORK_GATEWAY6" % idx] = nobj.gateway6
+ if nobj.mac_prefix:
+ env["INSTANCE_NIC%d_NETWORK_MAC_PREFIX" % idx] = nobj.mac_prefix
+ if nobj.network_type:
+ env["INSTANCE_NIC%d_NETWORK_TYPE" % idx] = nobj.network_type
+ if nobj.tags:
+ env["INSTANCE_NIC%d_NETWORK_TAGS" % idx] = " ".join(nobj.tags)
if mode == constants.NIC_MODE_BRIDGED:
env["INSTANCE_NIC%d_BRIDGE" % idx] = link
else:
return env
+def _NICToTuple(lu, nic):
+ """Build a tupple of nic information.
+
+ @type lu: L{LogicalUnit}
+ @param lu: the logical unit on whose behalf we execute
+ @type nic: L{objects.NIC}
+ @param nic: nic to convert to hooks tuple
+
+ """
+ cluster = lu.cfg.GetClusterInfo()
+ ip = nic.ip
+ mac = nic.mac
+ filled_params = cluster.SimpleFillNIC(nic.nicparams)
+ mode = filled_params[constants.NIC_MODE]
+ link = filled_params[constants.NIC_LINK]
+ network = nic.network
+ netinfo = None
+ if network:
+ net_uuid = lu.cfg.LookupNetwork(network)
+ if net_uuid:
+ nobj = lu.cfg.GetNetwork(net_uuid)
+ netinfo = objects.Network.ToDict(nobj)
+ return (ip, mac, mode, link, network, netinfo)
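+# A resulting tuple looks like (values illustrative):
+#   ("192.0.2.10", "aa:00:00:35:4e:01", "bridged", "br0", "net1", {...})
+# where the last element is the network's ToDict() representation, or None
+# if the NIC is not attached to a network.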
def _NICListToTuple(lu, nics):
"""Build a list of nic information tuples.
hooks_nics = []
cluster = lu.cfg.GetClusterInfo()
for nic in nics:
- ip = nic.ip
- mac = nic.mac
- filled_params = cluster.SimpleFillNIC(nic.nicparams)
- mode = filled_params[constants.NIC_MODE]
- link = filled_params[constants.NIC_LINK]
- hooks_nics.append((ip, mac, mode, link))
+ hooks_nics.append(_NICToTuple(lu, nic))
return hooks_nics
-
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
"""Builds instance related env variables for hooks from an object.
for dev in instance.disks:
cfg.SetDiskID(dev, node_name)
- result = rpc_runner.call_blockdev_getmirrorstatus(node_name, instance.disks)
+ result = rpc_runner.call_blockdev_getmirrorstatus(node_name, (instance.disks,
+ instance))
result.Raise("Failed to get disk status from node %s" % node_name,
prereq=prereq, ecode=errors.ECODE_ENVIRON)
"""Verifies the cluster config.
"""
- REQ_BGL = True
+ REQ_BGL = False
def _VerifyHVP(self, hvp_data):
"""Verifies locally the syntax of the hypervisor parameters.
self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
def ExpandNames(self):
- # Information can be safely retrieved as the BGL is acquired in exclusive
- # mode
- assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER)
+ self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
+ self.share_locks = _ShareAll()
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ """
+ # Retrieve all information
self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
self.all_node_info = self.cfg.GetAllNodesInfo()
self.all_inst_info = self.cfg.GetAllInstancesInfo()
- self.needed_locks = {}
def Exec(self, feedback_fn):
"""Verify integrity of cluster, performing various test on nodes.
ipolicy = _CalculateGroupIPolicy(self.cfg.GetClusterInfo(), self.group_info)
err = _ComputeIPolicyInstanceViolation(ipolicy, instanceconfig)
- _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, err)
+ _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, utils.CommaJoin(err))
for node in node_vol_should:
n_img = node_image[node]
node_disks[nname] = disks
- # Creating copies as SetDiskID below will modify the objects and that can
- # lead to incorrect data returned from nodes
- devonly = [dev.Copy() for (_, dev) in disks]
-
- for dev in devonly:
- self.cfg.SetDiskID(dev, nname)
+ # _AnnotateDiskParams makes already copies of the disks
+ devonly = []
+ for (inst, dev) in disks:
+ (anno_disk,) = _AnnotateDiskParams(instanceinfo[inst], [dev], self.cfg)
+ self.cfg.SetDiskID(anno_disk, nname)
+ devonly.append(anno_disk)
node_disks_devonly[nname] = devonly
for instance in self.my_inst_names:
inst_config = self.my_inst_info[instance]
+ if inst_config.admin_state == constants.ADMINST_OFFLINE:
+ i_offline += 1
for nname in inst_config.all_nodes:
if nname not in node_image:
if master_node not in self.my_node_info:
additional_nodes.append(master_node)
vf_node_info.append(self.all_node_info[master_node])
- # Add the first vm_capable node we find which is not included
+ # Add the first vm_capable node we find which is not included,
+ # excluding the master node (which we already have)
for node in absent_nodes:
nodeinfo = self.all_node_info[node]
- if nodeinfo.vm_capable and not nodeinfo.offline:
+ if (nodeinfo.vm_capable and not nodeinfo.offline and
+ node != master_node):
additional_nodes.append(node)
vf_node_info.append(self.all_node_info[node])
break
non_primary_inst = set(nimg.instances).difference(nimg.pinst)
for inst in non_primary_inst:
- # FIXME: investigate best way to handle offline insts
- if inst.admin_state == constants.ADMINST_OFFLINE:
- if verbose:
- feedback_fn("* Skipping offline instance %s" % inst.name)
- i_offline += 1
- continue
test = inst in self.all_inst_info
_ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
"instance should not run on node %s", node_i.name)
self.instances = dict(self.cfg.GetMultiInstanceInfo(owned_instances))
# Check if node groups for locked instances are still correct
- for (instance_name, inst) in self.instances.items():
- assert owned_nodes.issuperset(inst.all_nodes), \
- "Instance %s's nodes changed while we kept the lock" % instance_name
-
- inst_groups = _CheckInstanceNodeGroups(self.cfg, instance_name,
- owned_groups)
-
- assert self.group_uuid in inst_groups, \
- "Instance %s has no node in group %s" % (instance_name, self.group_uuid)
+ _CheckInstancesNodeGroups(self.cfg, self.instances,
+ owned_groups, owned_nodes, self.group_uuid)
def Exec(self, feedback_fn):
"""Verify integrity of cluster disks.
if self.op.diskparams:
for dt_params in self.op.diskparams.values():
utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
+ try:
+ utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
+ except errors.OpPrereqError, err:
+ raise errors.OpPrereqError("While verify diskparams options: %s" % err,
+ errors.ECODE_INVAL)
def ExpandNames(self):
# FIXME: in the future maybe other cluster params won't require checking on
if violations:
self.LogWarning("After the ipolicy change the following instances"
" violate them: %s",
- utils.CommaJoin(violations))
+ utils.CommaJoin(utils.NiceSort(violations)))
if self.op.nicparams:
utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
if cluster.modify_etc_hosts:
files_all.add(constants.ETC_HOSTS)
+ if cluster.use_external_mip_script:
+ files_all.add(constants.EXTERNAL_MASTER_SETUP_SCRIPT)
+
# Files which are optional, these must:
# - be present in one other category as well
# - either exist or not exist on all nodes of that category (mc, vm all)
if not redist:
files_mc.add(constants.CLUSTER_CONF_FILE)
- # FIXME: this should also be replicated but Ganeti doesn't support files_mc
- # replication
- files_mc.add(constants.DEFAULT_MASTER_SETUP_SCRIPT)
-
# Files which should only be on VM-capable nodes
files_vm = set(filename
for hv_name in cluster.enabled_hypervisors
master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
online_nodes = lu.cfg.GetOnlineNodeList()
- vm_nodes = lu.cfg.GetVmCapableNodeList()
+ online_set = frozenset(online_nodes)
+ vm_nodes = list(online_set.intersection(lu.cfg.GetVmCapableNodeList()))
if additional_nodes is not None:
online_nodes.extend(additional_nodes)
max_time = 0
done = True
cumul_degraded = False
- rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks)
+ rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
msg = rstats.fail_msg
if msg:
lu.LogWarning("Can't get any data from node %s: %s", node, msg)
return not cumul_degraded
-def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
+def _BlockdevFind(lu, node, dev, instance):
+ """Wrapper around call_blockdev_find to annotate diskparams.
+
+ @param lu: A reference to the lu object
+  @param node: The node to call out to
+  @param dev: The device to find
+  @param instance: The instance object the device belongs to
+  @returns: The result of the RPC call
+
+ """
+ (disk,) = _AnnotateDiskParams(instance, [dev], lu.cfg)
+ return lu.rpc.call_blockdev_find(node, disk)
+
+
+def _CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
+ """Wrapper around L{_CheckDiskConsistencyInner}.
+
+ """
+ (disk,) = _AnnotateDiskParams(instance, [dev], lu.cfg)
+ return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
+ ldisk=ldisk)
+
+
+def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
+ ldisk=False):
"""Check that mirrors are not degraded.
+ @attention: The device has to be annotated already.
+
The ldisk parameter, if True, will change the test from the
is_degraded attribute (which represents overall non-ok status for
the device(s)) to the ldisk (representing the local storage status).
if dev.children:
for child in dev.children:
- result = result and _CheckDiskConsistency(lu, child, node, on_primary)
+ result = result and _CheckDiskConsistencyInner(lu, instance, child, node,
+ on_primary)
return result
"""Logical unit for OOB handling.
"""
- REG_BGL = False
+ REQ_BGL = False
_SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)
def ExpandNames(self):
return self.oq.OldStyleQuery(self)
+class _ExtStorageQuery(_QueryBase):
+ FIELDS = query.EXTSTORAGE_FIELDS
+
+ def ExpandNames(self, lu):
+ # Lock all nodes in shared mode
+ # Temporary removal of locks, should be reverted later
+ # TODO: reintroduce locks when they are lighter-weight
+ lu.needed_locks = {}
+ #self.share_locks[locking.LEVEL_NODE] = 1
+ #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+
+ # The following variables interact with _QueryBase._GetNames
+ if self.names:
+ self.wanted = self.names
+ else:
+ self.wanted = locking.ALL_SET
+
+ self.do_locking = self.use_locking
+
+ def DeclareLocks(self, lu, level):
+ pass
+
+ @staticmethod
+ def _DiagnoseByProvider(rlist):
+ """Remaps a per-node return list into an a per-provider per-node dictionary
+
+ @param rlist: a map with node names as keys and ExtStorage objects as values
+
+ @rtype: dict
+ @return: a dictionary with extstorage providers as keys and as
+ value another map, with nodes as keys and tuples of
+        (path, status, diagnose, parameters) as values, e.g.::
+
+      {"provider1": {"node1": [("/usr/lib/...", True, "", [])],
+                     "node2": [("/srv/...", False, "missing file", [])],
+                     "node3": [("/srv/...", True, "", [])],
+                    }
+      }
+
+ """
+ all_es = {}
+ # we build here the list of nodes that didn't fail the RPC (at RPC
+ # level), so that nodes with a non-responding node daemon don't
+    # make all providers invalid
+ good_nodes = [node_name for node_name in rlist
+ if not rlist[node_name].fail_msg]
+ for node_name, nr in rlist.items():
+ if nr.fail_msg or not nr.payload:
+ continue
+ for (name, path, status, diagnose, params) in nr.payload:
+ if name not in all_es:
+          # build a list of nodes for this provider, containing empty
+          # lists for each node in good_nodes
+ all_es[name] = {}
+ for nname in good_nodes:
+ all_es[name][nname] = []
+ # convert params from [name, help] to (name, help)
+ params = [tuple(v) for v in params]
+ all_es[name][node_name].append((path, status, diagnose, params))
+ return all_es
+
+ def _GetQueryData(self, lu):
+ """Computes the list of nodes and their attributes.
+
+ """
+ # Locking is not used
+ assert not (compat.any(lu.glm.is_owned(level)
+ for level in locking.LEVELS
+ if level != locking.LEVEL_CLUSTER) or
+ self.do_locking or self.use_locking)
+
+ valid_nodes = [node.name
+ for node in lu.cfg.GetAllNodesInfo().values()
+ if not node.offline and node.vm_capable]
+ pol = self._DiagnoseByProvider(lu.rpc.call_extstorage_diagnose(valid_nodes))
+
+ data = {}
+
+ nodegroup_list = lu.cfg.GetNodeGroupList()
+
+ for (es_name, es_data) in pol.items():
+ # For every provider compute the nodegroup validity.
+ # To do this we need to check the validity of each node in es_data
+ # and then construct the corresponding nodegroup dict:
+ # { nodegroup1: status
+ # nodegroup2: status
+ # }
+ ndgrp_data = {}
+ for nodegroup in nodegroup_list:
+ ndgrp = lu.cfg.GetNodeGroup(nodegroup)
+
+ nodegroup_nodes = ndgrp.members
+ nodegroup_name = ndgrp.name
+ node_statuses = []
+
+ for node in nodegroup_nodes:
+ if node in valid_nodes:
+          if es_data[node]:
+ node_status = es_data[node][0][1]
+ node_statuses.append(node_status)
+ else:
+ node_statuses.append(False)
+
+ if False in node_statuses:
+ ndgrp_data[nodegroup_name] = False
+ else:
+ ndgrp_data[nodegroup_name] = True
+
+ # Compute the provider's parameters
+ parameters = set()
+ for idx, esl in enumerate(es_data.values()):
+ valid = bool(esl and esl[0][1])
+ if not valid:
+ break
+
+ node_params = esl[0][3]
+ if idx == 0:
+ # First entry
+ parameters.update(node_params)
+ else:
+ # Filter out inconsistent values
+ parameters.intersection_update(node_params)
+
+ params = list(parameters)
+
+ # Now fill all the info for this provider
+ info = query.ExtStorageInfo(name=es_name, node_status=es_data,
+ nodegroup_status=ndgrp_data,
+ parameters=params)
+
+ data[es_name] = info
+
+ # Prepare data in requested order
+ return [data[name] for name in self._GetNames(lu, pol.keys(), None)
+ if name in data]
+
+
+class LUExtStorageDiagnose(NoHooksLU):
+ """Logical unit for ExtStorage diagnose/query.
+
+ """
+ REQ_BGL = False
+
+ def CheckArguments(self):
+ self.eq = _ExtStorageQuery(qlang.MakeSimpleFilter("name", self.op.names),
+ self.op.output_fields, False)
+
+ def ExpandNames(self):
+ self.eq.ExpandNames(self)
+
+ def Exec(self, feedback_fn):
+ return self.eq.OldStyleQuery(self)
+
+
class LUNodeRemove(LogicalUnit):
"""Logical unit for removing a node.
if self.op.disk_state:
self.new_disk_state = _MergeAndVerifyDiskState(self.op.disk_state, None)
+ # TODO: If we need to have multiple DnsOnlyRunner we probably should make
+ # it a property on the base class.
+ result = rpc.DnsOnlyRunner().call_version([node])[node]
+ result.Raise("Can't get version information from node %s" % node)
+ if constants.PROTOCOL_VERSION == result.payload:
+ logging.info("Communication to node %s fine, sw version %s match",
+ node, result.payload)
+ else:
+ raise errors.OpPrereqError("Version mismatch master version %s,"
+ " node version %s" %
+ (constants.PROTOCOL_VERSION, result.payload),
+ errors.ECODE_ENVIRON)
+
def Exec(self, feedback_fn):
"""Adds the new node to the cluster.
if self.op.disk_state:
new_node.disk_state_static = self.new_disk_state
- # check connectivity
- result = self.rpc.call_version([node])[node]
- result.Raise("Can't get version information from node %s" % node)
- if constants.PROTOCOL_VERSION == result.payload:
- logging.info("Communication to node %s fine, sw version %s match",
- node, result.payload)
- else:
- raise errors.OpExecError("Version mismatch master version %s,"
- " node version %s" %
- (constants.PROTOCOL_VERSION, result.payload))
-
# Add node to our /etc/hosts, and add key to known_hosts
if self.cfg.GetClusterInfo().modify_etc_hosts:
master_node = self.cfg.GetMasterNode()
if mc_remaining < mc_should:
raise errors.OpPrereqError("Not enough master candidates, please"
" pass auto promote option to allow"
- " promotion", errors.ECODE_STATE)
+ " promotion (--auto-promote or RAPI"
+ " auto_promote=True)", errors.ECODE_STATE)
self.old_flags = old_flags = (node.master_candidate,
node.drained, node.offline)
if old_role == self._ROLE_OFFLINE and new_role != old_role:
# Trying to transition out of offline status
- # TODO: Use standard RPC runner, but make sure it works when the node is
- # still marked offline
- result = rpc.BootstrapRunner().call_version([node.name])[node.name]
+ result = self.rpc.call_version([node.name])[node.name]
if result.fail_msg:
raise errors.OpPrereqError("Node %s is being de-offlined but fails"
" to report its version: %s" %
"config_version": constants.CONFIG_VERSION,
"os_api_version": max(constants.OS_API_VERSIONS),
"export_version": constants.EXPORT_VERSION,
- "architecture": (platform.architecture()[0], platform.machine()),
+ "architecture": runtime.GetArchInfo(),
"name": cluster.cluster_name,
"master": cluster.master_node,
"default_hypervisor": cluster.primary_hypervisor,
"ipolicy": cluster.ipolicy,
"nicparams": cluster.nicparams,
"ndparams": cluster.ndparams,
+ "diskparams": cluster.diskparams,
"candidate_pool_size": cluster.candidate_pool_size,
"master_netdev": cluster.master_netdev,
"master_netmask": cluster.master_netmask,
"""
REQ_BGL = False
- _FIELDS_DYNAMIC = utils.FieldSet()
- _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
- "watcher_pause", "volume_group_name")
def CheckArguments(self):
- _CheckOutputFields(static=self._FIELDS_STATIC,
- dynamic=self._FIELDS_DYNAMIC,
- selected=self.op.output_fields)
+ self.cq = _ClusterQuery(None, self.op.output_fields, False)
def ExpandNames(self):
- self.needed_locks = {}
+ self.cq.ExpandNames(self)
+
+ def DeclareLocks(self, level):
+ self.cq.DeclareLocks(self, level)
def Exec(self, feedback_fn):
- """Dump a representation of the cluster config to the standard output.
-
- """
- values = []
- for field in self.op.output_fields:
- if field == "cluster_name":
- entry = self.cfg.GetClusterName()
- elif field == "master_node":
- entry = self.cfg.GetMasterNode()
- elif field == "drain_flag":
- entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
- elif field == "watcher_pause":
- entry = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
- elif field == "volume_group_name":
- entry = self.cfg.GetVGName()
- else:
- raise errors.ParameterError(field)
- values.append(entry)
- return values
+ result = self.cq.OldStyleQuery(self)
+
+ assert len(result) == 1
+
+ return result[0]
+
+
+class _ClusterQuery(_QueryBase):
+ FIELDS = query.CLUSTER_FIELDS
+
+ #: Do not sort (there is only one item)
+ SORT_FIELD = None
+
+ def ExpandNames(self, lu):
+ lu.needed_locks = {}
+
+ # The following variables interact with _QueryBase._GetNames
+ self.wanted = locking.ALL_SET
+ self.do_locking = self.use_locking
+
+ if self.do_locking:
+ raise errors.OpPrereqError("Can not use locking for cluster queries",
+ errors.ECODE_INVAL)
+
+ def DeclareLocks(self, lu, level):
+ pass
+
+ def _GetQueryData(self, lu):
+ """Computes the list of nodes and their attributes.
+
+ """
+ # Locking is not used
+ assert not (compat.any(lu.glm.is_owned(level)
+ for level in locking.LEVELS
+ if level != locking.LEVEL_CLUSTER) or
+ self.do_locking or self.use_locking)
+
+ if query.CQ_CONFIG in self.requested_data:
+ cluster = lu.cfg.GetClusterInfo()
+ else:
+ cluster = NotImplemented
+
+ if query.CQ_QUEUE_DRAINED in self.requested_data:
+ drain_flag = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
+ else:
+ drain_flag = NotImplemented
+
+ if query.CQ_WATCHER_PAUSE in self.requested_data:
+ watcher_pause = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
+ else:
+ watcher_pause = NotImplemented
+
+ return query.ClusterQueryData(cluster, drain_flag, watcher_pause)
class LUInstanceActivateDisks(NoHooksLU):
def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
- ignore_size=False):
+ ignore_size=False, check=True):
"""Prepare the block devices for an instance.
This sets up the block devices on all nodes.
device_info = []
disks_ok = True
iname = instance.name
- disks = _ExpandCheckDisks(instance, disks)
+ if check:
+ disks = _ExpandCheckDisks(instance, disks)
# With the two passes mechanism we try to reduce the window of
# opportunity for the race condition of switching DRBD to primary
node_disk = node_disk.Copy()
node_disk.UnsetSize()
lu.cfg.SetDiskID(node_disk, node)
- result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False, idx)
+ result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
+ False, idx)
msg = result.fail_msg
if msg:
+ is_offline_secondary = (node in instance.secondary_nodes and
+ result.offline)
lu.proc.LogWarning("Could not prepare block device %s on node %s"
" (is_primary=False, pass=1): %s",
inst_disk.iv_name, node, msg)
- if not ignore_secondaries:
+ if not (ignore_secondaries or is_offline_secondary):
disks_ok = False
# FIXME: race condition on drbd migration to primary
node_disk = node_disk.Copy()
node_disk.UnsetSize()
lu.cfg.SetDiskID(node_disk, node)
- result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True, idx)
+ result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
+ True, idx)
msg = result.fail_msg
if msg:
lu.proc.LogWarning("Could not prepare block device %s on node %s"
for disk in disks:
for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
lu.cfg.SetDiskID(top_disk, node)
- result = lu.rpc.call_blockdev_shutdown(node, top_disk)
+ result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
msg = result.fail_msg
if msg:
lu.LogWarning("Could not shutdown block device %s on node %s: %s",
"Cannot retrieve locked instance %s" % self.op.instance_name
_CheckNodeOnline(self, instance.primary_node, "Instance primary node"
" offline, cannot reinstall")
- for node in instance.secondary_nodes:
- _CheckNodeOnline(self, node, "Instance secondary node offline,"
- " cannot reinstall")
if instance.disk_template == constants.DT_DISKLESS:
raise errors.OpPrereqError("Instance '%s' has no disks" %
# TODO: Implement support changing VG while recreating
constants.IDISK_VG,
constants.IDISK_METAVG,
+ constants.IDISK_PROVIDER,
]))
def CheckArguments(self):
"""
logging.info("Removing block devices for instance %s", instance.name)
- if not _RemoveDisks(lu, instance):
+ if not _RemoveDisks(lu, instance, ignore_failures=ignore_failures):
if not ignore_failures:
raise errors.OpExecError("Can't remove instance's disks")
feedback_fn("Warning: can't remove instance's disks")
# activate, get path, copy the data over
for idx, disk in enumerate(instance.disks):
self.LogInfo("Copying data for disk %d", idx)
- result = self.rpc.call_blockdev_assemble(target_node, disk,
+ result = self.rpc.call_blockdev_assemble(target_node, (disk, instance),
instance.name, True, idx)
if result.fail_msg:
self.LogWarning("Can't assemble newly created disk %d: %s",
errs.append(result.fail_msg)
break
dev_path = result.payload
- result = self.rpc.call_blockdev_export(source_node, disk,
+ result = self.rpc.call_blockdev_export(source_node, (disk, instance),
target_node, dev_path,
cluster_name)
if result.fail_msg:
ial = IAllocator(self.cfg, self.rpc,
mode=constants.IALLOCATOR_MODE_RELOC,
name=self.instance_name,
- # TODO See why hail breaks with a single node below
- relocate_from=[self.instance.primary_node,
- self.instance.primary_node],
+ relocate_from=[self.instance.primary_node],
)
ial.Run(self.lu.op.iallocator)
all_done = True
result = self.rpc.call_drbd_wait_sync(self.all_nodes,
self.nodes_ip,
- self.instance.disks)
+ (self.instance.disks,
+ self.instance))
min_percent = 100
for node, nres in result.items():
nres.Raise("Cannot resync disks on node %s" % node)
msg = "single-master"
self.feedback_fn("* changing disks into %s mode" % msg)
result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
- self.instance.disks,
+ (self.instance.disks, self.instance),
self.instance.name, multimaster)
for node, nres in result.items():
nres.Raise("Cannot change disks config on node %s" % node)
self.feedback_fn("* checking disk consistency between source and target")
for (idx, dev) in enumerate(instance.disks):
- if not _CheckDiskConsistency(self.lu, dev, target_node, False):
+ if not _CheckDiskConsistency(self.lu, instance, dev, target_node, False):
raise errors.OpExecError("Disk %s is degraded or not fully"
" synchronized on target node,"
" aborting migration" % idx)
self._GoReconnect(False)
self._WaitUntilSync()
- # If the instance's disk template is `rbd' and there was a successful
- # migration, unmap the device from the source node.
- if self.instance.disk_template == constants.DT_RBD:
+ # If the instance's disk template is `rbd' or `ext' and there was a
+ # successful migration, unmap the device from the source node.
+ if self.instance.disk_template in (constants.DT_RBD, constants.DT_EXT):
disks = _ExpandCheckDisks(instance, instance.disks)
self.feedback_fn("* unmapping instance's disks from %s" % source_node)
for disk in disks:
- result = self.rpc.call_blockdev_shutdown(source_node, disk)
+ result = self.rpc.call_blockdev_shutdown(source_node, (disk, instance))
msg = result.fail_msg
if msg:
logging.error("Migration was successful, but couldn't unmap the"
self.feedback_fn("* checking disk consistency between source and target")
for (idx, dev) in enumerate(instance.disks):
# for drbd, these are drbd over lvm
- if not _CheckDiskConsistency(self.lu, dev, target_node, False):
+ if not _CheckDiskConsistency(self.lu, instance, dev, target_node,
+ False):
if primary_node.offline:
self.feedback_fn("Node %s is offline, ignoring degraded disk %s on"
" target node %s" %
return self._ExecMigration()
-def _CreateBlockDev(lu, node, instance, device, force_create,
- info, force_open):
+def _CreateBlockDev(lu, node, instance, device, force_create, info,
+ force_open):
+ """Wrapper around L{_CreateBlockDevInner}.
+
+ This method annotates the root device first.
+
+ """
+ (disk,) = _AnnotateDiskParams(instance, [device], lu.cfg)
+ return _CreateBlockDevInner(lu, node, instance, disk, force_create, info,
+ force_open)
+
+
+def _CreateBlockDevInner(lu, node, instance, device, force_create,
+ info, force_open):
"""Create a tree of block devices on a given node.
If this device type has to be created on secondaries, create it and
If not, just recurse to children keeping the same 'force' value.
+ @attention: The device has to be annotated already.
+
@param lu: the lu on whose behalf we execute
@param node: the node on which to create the device
@type instance: L{objects.Instance}
if device.children:
for child in device.children:
- _CreateBlockDev(lu, node, instance, child, force_create,
- info, force_open)
+ _CreateBlockDevInner(lu, node, instance, child, force_create,
+ info, force_open)
if not force_create:
return
results.append("%s%s" % (new_id, val))
return results
+def _GetPCIInfo(lu, dev_type):
-def _ComputeLDParams(disk_template, disk_params):
- """Computes Logical Disk parameters from Disk Template parameters.
-
- @type disk_template: string
- @param disk_template: disk template, one of L{constants.DISK_TEMPLATES}
- @type disk_params: dict
- @param disk_params: disk template parameters; dict(template_name -> parameters
- @rtype: list(dict)
- @return: a list of dicts, one for each node of the disk hierarchy. Each dict
- contains the LD parameters of the node. The tree is flattened in-order.
-
- """
- if disk_template not in constants.DISK_TEMPLATES:
- raise errors.ProgrammerError("Unknown disk template %s" % disk_template)
-
- result = list()
- dt_params = disk_params[disk_template]
- if disk_template == constants.DT_DRBD8:
- drbd_params = {
- constants.LDP_RESYNC_RATE: dt_params[constants.DRBD_RESYNC_RATE],
- constants.LDP_BARRIERS: dt_params[constants.DRBD_DISK_BARRIERS],
- constants.LDP_NO_META_FLUSH: dt_params[constants.DRBD_META_BARRIERS],
- constants.LDP_DEFAULT_METAVG: dt_params[constants.DRBD_DEFAULT_METAVG],
- constants.LDP_DISK_CUSTOM: dt_params[constants.DRBD_DISK_CUSTOM],
- constants.LDP_NET_CUSTOM: dt_params[constants.DRBD_NET_CUSTOM],
- constants.LDP_DYNAMIC_RESYNC: dt_params[constants.DRBD_DYNAMIC_RESYNC],
- constants.LDP_PLAN_AHEAD: dt_params[constants.DRBD_PLAN_AHEAD],
- constants.LDP_FILL_TARGET: dt_params[constants.DRBD_FILL_TARGET],
- constants.LDP_DELAY_TARGET: dt_params[constants.DRBD_DELAY_TARGET],
- constants.LDP_MAX_RATE: dt_params[constants.DRBD_MAX_RATE],
- constants.LDP_MIN_RATE: dt_params[constants.DRBD_MIN_RATE],
- }
-
- drbd_params = \
- objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_DRBD8],
- drbd_params)
-
- result.append(drbd_params)
-
- # data LV
- data_params = {
- constants.LDP_STRIPES: dt_params[constants.DRBD_DATA_STRIPES],
- }
- data_params = \
- objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_LV],
- data_params)
- result.append(data_params)
-
- # metadata LV
- meta_params = {
- constants.LDP_STRIPES: dt_params[constants.DRBD_META_STRIPES],
- }
- meta_params = \
- objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_LV],
- meta_params)
- result.append(meta_params)
-
- elif (disk_template == constants.DT_FILE or
- disk_template == constants.DT_SHARED_FILE):
- result.append(constants.DISK_LD_DEFAULTS[constants.LD_FILE])
-
- elif disk_template == constants.DT_PLAIN:
- params = {
- constants.LDP_STRIPES: dt_params[constants.LV_STRIPES],
- }
- params = \
- objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_LV],
- params)
- result.append(params)
-
- elif disk_template == constants.DT_BLOCK:
- result.append(constants.DISK_LD_DEFAULTS[constants.LD_BLOCKDEV])
+  """Return the (index, pci slot) pair for a new hotpluggable device.
+
+  """
+  if lu.op.hotplug:
+    # case of InstanceCreate()
+    if hasattr(lu, "hotplug_info"):
+      if lu.hotplug_info is not None:
+        idx = getattr(lu.hotplug_info, dev_type)
+        setattr(lu.hotplug_info, dev_type, idx + 1)
+ pci = lu.hotplug_info.pci_pool.pop()
+ lu.LogInfo("Choosing pci slot %d" % pci)
+ return idx, pci
+ # case of InstanceSetParams()
+ elif lu.instance.hotplug_info is not None:
+ idx, pci = lu.cfg.GetPCIInfo(lu.instance.name, dev_type)
+ lu.LogInfo("Choosing pci slot %d" % pci)
+ return idx, pci
- elif disk_template == constants.DT_RBD:
- params = {
- constants.LDP_POOL: dt_params[constants.RBD_POOL]
- }
- params = \
- objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_RBD],
- params)
- result.append(params)
-
- return result
+ lu.LogWarning("Hotplug not supported for this instance.")
+ return None, None
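+# Illustrative behaviour: at instance creation hotplug_info starts with
+# disks=0, nics=0 and pci_pool=range(16, 32), so the first device of a kind
+# gets (idx, pci) == (0, 31) (pci_pool.pop() hands out the highest slot
+# first) and the per-type counter is incremented for the next call.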
def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
- iv_name, p_minor, s_minor, drbd_params, data_params,
- meta_params):
+ iv_name, p_minor, s_minor):
"""Generate a drbd8 device complete with its children.
"""
dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
logical_id=(vgnames[0], names[0]),
- params=data_params)
+ params={})
dev_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE,
logical_id=(vgnames[1], names[1]),
- params=meta_params)
- drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
+ params={})
+
+  disk_idx, pci = _GetPCIInfo(lu, "disks")
+ drbd_dev = objects.Disk(idx=disk_idx, pci=pci,
+ dev_type=constants.LD_DRBD8, size=size,
logical_id=(primary, secondary, port,
p_minor, s_minor,
shared_secret),
children=[dev_data, dev_meta],
- iv_name=iv_name, params=drbd_params)
+ iv_name=iv_name, params={})
return drbd_dev
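+# The resulting device tree (sketch): one LD_DRBD8 disk whose logical_id
+# holds (primary, secondary, port, p_minor, s_minor, shared_secret) and
+# whose children are two LD_LV volumes, the data LV of the requested size
+# and a DRBD_META_SIZE metadata LV.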
_DISK_TEMPLATE_NAME_PREFIX = {
constants.DT_PLAIN: "",
constants.DT_RBD: ".rbd",
+ constants.DT_EXT: ".ext",
}
constants.DT_SHARED_FILE: constants.LD_FILE,
constants.DT_BLOCK: constants.LD_BLOCKDEV,
constants.DT_RBD: constants.LD_RBD,
+ constants.DT_EXT: constants.LD_EXT,
}
def _GenerateDiskTemplate(lu, template_name, instance_name, primary_node,
secondary_nodes, disk_info, file_storage_dir, file_driver, base_index,
- feedback_fn, disk_params,
- _req_file_storage=opcodes.RequireFileStorage,
+    feedback_fn, full_disk_params,
+    _req_file_storage=opcodes.RequireFileStorage,
_req_shr_file_storage=opcodes.RequireSharedFileStorage):
"""Generate the entire disk layout for a given template type.
vgname = lu.cfg.GetVGName()
disk_count = len(disk_info)
disks = []
- ld_params = _ComputeLDParams(template_name, disk_params)
if template_name == constants.DT_DISKLESS:
pass
elif template_name == constants.DT_DRBD8:
- drbd_params, data_params, meta_params = ld_params
if len(secondary_nodes) != 1:
raise errors.ProgrammerError("Wrong template configuration")
remote_node = secondary_nodes[0]
minors = lu.cfg.AllocateDRBDMinor(
[primary_node, remote_node] * len(disk_info), instance_name)
+ (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
+ full_disk_params)
+ drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
+
names = []
for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
for i in range(disk_count)]):
names.append(lv_prefix + "_meta")
for idx, disk in enumerate(disk_info):
disk_index = idx + base_index
- drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
data_vg = disk.get(constants.IDISK_VG, vgname)
meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
[data_vg, meta_vg],
names[idx * 2:idx * 2 + 2],
"disk/%d" % disk_index,
- minors[idx * 2], minors[idx * 2 + 1],
- drbd_params, data_params, meta_params)
+ minors[idx * 2], minors[idx * 2 + 1])
disk_dev.mode = disk[constants.IDISK_MODE]
disks.append(disk_dev)
else:
(name_prefix, base_index + i)
for i in range(disk_count)])
- dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]
-
if template_name == constants.DT_PLAIN:
def logical_id_fn(idx, _, disk):
vg = disk.get(constants.IDISK_VG, vgname)
disk[constants.IDISK_ADOPT])
elif template_name == constants.DT_RBD:
logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
+ elif template_name == constants.DT_EXT:
+ def logical_id_fn(idx, _, disk):
+ provider = disk.get(constants.IDISK_PROVIDER, None)
+ if provider is None:
+ raise errors.ProgrammerError("Disk template is %s, but '%s' is"
+ " not found", constants.DT_EXT,
+ constants.IDISK_PROVIDER)
+ return (provider, names[idx])
else:
raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
+ dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]
+
for idx, disk in enumerate(disk_info):
+      params = {}
+      # Only for the ext template, add disk_info to params
+ if template_name == constants.DT_EXT:
+ params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
+ for key in disk:
+ if key not in constants.IDISK_PARAMS:
+ params[key] = disk[key]
disk_index = idx + base_index
size = disk[constants.IDISK_SIZE]
feedback_fn("* disk %s, size %s" %
(disk_index, utils.FormatUnit(size, "h")))
+
+    disk_idx, pci = _GetPCIInfo(lu, "disks")
+
disks.append(objects.Disk(dev_type=dev_type, size=size,
logical_id=logical_id_fn(idx, disk_index, disk),
iv_name="disk/%d" % disk_index,
mode=disk[constants.IDISK_MODE],
- params=ld_params[0]))
+ params=params, idx=disk_idx, pci=pci))
return disks
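+# Note (illustrative): for DT_EXT disks the logical_id is (provider, name),
+# with the name carrying the ".ext" prefix from _DISK_TEMPLATE_NAME_PREFIX,
+# and any non-standard keys from disk_info (the ext-params) end up in the
+# disk's params dict alongside IDISK_PROVIDER.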
lu.cfg.SetDiskID(device, node)
logging.info("Pause sync of instance %s disks", instance.name)
- result = lu.rpc.call_blockdev_pause_resume_sync(node, instance.disks, True)
+ result = lu.rpc.call_blockdev_pause_resume_sync(node,
+ (instance.disks, instance),
+ True)
+ result.Raise("Failed RPC to node %s for pausing the disk syncing" % node)
for idx, success in enumerate(result.payload):
if not success:
wipe_size = min(wipe_chunk_size, size - offset)
logging.debug("Wiping disk %d, offset %s, chunk %s",
idx, offset, wipe_size)
- result = lu.rpc.call_blockdev_wipe(node, device, offset, wipe_size)
+ result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
+ wipe_size)
result.Raise("Could not wipe disk %d at offset %d for size %d" %
(idx, offset, wipe_size))
now = time.time()
finally:
logging.info("Resume sync of instance %s disks", instance.name)
- result = lu.rpc.call_blockdev_pause_resume_sync(node, instance.disks, False)
+ result = lu.rpc.call_blockdev_pause_resume_sync(node,
+ (instance.disks, instance),
+ False)
- for idx, success in enumerate(result.payload):
- if not success:
- lu.LogWarning("Resume sync of disk %d failed, please have a"
- " look at the status and troubleshoot the issue", idx)
- logging.warn("resume-sync of instance %s for disks %d failed",
- instance.name, idx)
+ if result.fail_msg:
+ lu.LogWarning("RPC call to %s for resuming disk syncing failed,"
+ " please have a look at the status and troubleshoot"
+ " the issue: %s", node, result.fail_msg)
+ else:
+ for idx, success in enumerate(result.payload):
+ if not success:
+ lu.LogWarning("Resume sync of disk %d failed, please have a"
+ " look at the status and troubleshoot the issue", idx)
+ logging.warn("resume-sync of instance %s for disks %d failed",
+ instance.name, idx)
def _CreateDisks(lu, instance, to_skip=None, target_node=None):
_CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
-def _RemoveDisks(lu, instance, target_node=None):
+def _RemoveDisks(lu, instance, target_node=None, ignore_failures=False):
"""Remove all disks for an instance.
This abstracts away some work from `AddInstance()` and
logging.info("Removing block devices for instance %s", instance.name)
all_result = True
- for (idx, device) in enumerate(instance.disks):
+ ports_to_release = set()
+ anno_disks = _AnnotateDiskParams(instance, instance.disks, lu.cfg)
+ for (idx, device) in enumerate(anno_disks):
if target_node:
edata = [(target_node, device)]
else:
edata = device.ComputeNodeTree(instance.primary_node)
for node, disk in edata:
lu.cfg.SetDiskID(disk, node)
- msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
- if msg:
+ result = lu.rpc.call_blockdev_remove(node, disk)
+ if result.fail_msg:
lu.LogWarning("Could not remove disk %s on node %s,"
- " continuing anyway: %s", idx, node, msg)
- all_result = False
+ " continuing anyway: %s", idx, node, result.fail_msg)
+ if not (result.offline and node != instance.primary_node):
+ all_result = False
# if this is a DRBD disk, return its port to the pool
if device.dev_type in constants.LDS_DRBD:
- tcp_port = device.logical_id[2]
- lu.cfg.AddTcpUdpPort(tcp_port)
+ ports_to_release.add(device.logical_id[2])
+
+ if all_result or ignore_failures:
+ for port in ports_to_release:
+ lu.cfg.AddTcpUdpPort(port)
if instance.disk_template == constants.DT_FILE:
file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
def _ComputeDiskSize(disk_template, disks):
- """Compute disk size requirements in the volume group
+ """Compute disk size requirements according to disk template
"""
# Required free disk space as a function of disk and swap space
# 128 MB are added for drbd metadata for each disk
constants.DT_DRBD8:
sum(d[constants.IDISK_SIZE] + DRBD_META_SIZE for d in disks),
- constants.DT_FILE: None,
- constants.DT_SHARED_FILE: 0,
+ constants.DT_FILE: sum(d[constants.IDISK_SIZE] for d in disks),
+ constants.DT_SHARED_FILE: sum(d[constants.IDISK_SIZE] for d in disks),
constants.DT_BLOCK: 0,
- constants.DT_RBD: 0,
+ constants.DT_RBD: sum(d[constants.IDISK_SIZE] for d in disks),
+ constants.DT_EXT: sum(d[constants.IDISK_SIZE] for d in disks),
}
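+  # Illustrative example: two 1024 MB disks require 2 * (1024 + 128) =
+  # 2304 MB under DT_DRBD8 (data plus DRBD metadata), 2048 MB under DT_FILE
+  # or DT_RBD, and 0 under DT_BLOCK, which only adopts existing devices.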
if disk_template not in req_size_dict:
# check disks. parameter names and consistent adopt/no-adopt strategy
has_adopt = has_no_adopt = False
for disk in self.op.disks:
- utils.ForceDictType(disk, constants.IDISK_PARAMS_TYPES)
+ if self.op.disk_template != constants.DT_EXT:
+ utils.ForceDictType(disk, constants.IDISK_PARAMS_TYPES)
if constants.IDISK_ADOPT in disk:
has_adopt = True
else:
"""Run the allocator based on input opcode.
"""
+    # TODO: Export network to iallocator so that it chooses a pnode
+    # in a nodegroup that has the desired network connected to it
nics = [n.ToDict() for n in self.nics]
ial = IAllocator(self.cfg, self.rpc,
mode=constants.IALLOCATOR_MODE_ALLOC,
os=self.op.os_type,
vcpus=self.be_full[constants.BE_VCPUS],
memory=self.be_full[constants.BE_MAXMEM],
+ spindle_use=self.be_full[constants.BE_SPINDLE_USE],
disks=self.disks,
nics=nics,
hypervisor=self.op.hypervisor,
if self.op.mode == constants.INSTANCE_IMPORT:
export_info = self._ReadExportInfo()
self._ReadExportParams(export_info)
+ self._old_instance_name = export_info.get(constants.INISECT_INS, "name")
+ else:
+ self._old_instance_name = None
if (not self.cfg.GetVGName() and
self.op.disk_template not in constants.DTS_NOT_LVM):
if self.op.identify_defaults:
self._RevertToDefaults(cluster)
+ self.hotplug_info = None
+ if self.op.hotplug:
+ self.LogInfo("Enabling hotplug.")
+ self.hotplug_info = objects.HotplugInfo(disks=0, nics=0,
+                                              pci_pool=list(range(16, 32)))
# NIC buildup
self.nics = []
for idx, nic in enumerate(self.op.nics):
if nic_mode is None or nic_mode == constants.VALUE_AUTO:
nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
- # in routed mode, for the first nic, the default ip is 'auto'
- if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
- default_ip_mode = constants.VALUE_AUTO
+ net = nic.get(constants.INIC_NETWORK, None)
+      link = nic.get(constants.INIC_LINK, None)
+ ip = nic.get(constants.INIC_IP, None)
+
+ if net is None or net.lower() == constants.VALUE_NONE:
+ net = None
else:
- default_ip_mode = constants.VALUE_NONE
+ if nic_mode_req is not None or link is not None:
+ raise errors.OpPrereqError("If network is given, no mode or link"
+ " is allowed to be passed",
+ errors.ECODE_INVAL)
# ip validity checks
- ip = nic.get(constants.INIC_IP, default_ip_mode)
if ip is None or ip.lower() == constants.VALUE_NONE:
nic_ip = None
elif ip.lower() == constants.VALUE_AUTO:
errors.ECODE_INVAL)
nic_ip = self.hostname1.ip
else:
- if not netutils.IPAddress.IsValid(ip):
+ # We defer pool operations until later, so that the iallocator has
+      # filled in the instance's node(s)
+ if ip.lower() == constants.NIC_IP_POOL:
+ if net is None:
+ raise errors.OpPrereqError("if ip=pool, parameter network"
+ " must be passed too",
+ errors.ECODE_INVAL)
+
+ elif not netutils.IPAddress.IsValid(ip):
raise errors.OpPrereqError("Invalid IP address '%s'" % ip,
errors.ECODE_INVAL)
+
nic_ip = ip
# TODO: check the ip address for uniqueness
errors.ECODE_NOTUNIQUE)
# Build nic parameters
- link = nic.get(constants.INIC_LINK, None)
- if link == constants.VALUE_AUTO:
- link = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_LINK]
nicparams = {}
if nic_mode_req:
nicparams[constants.NIC_MODE] = nic_mode
check_params = cluster.SimpleFillNIC(nicparams)
objects.NIC.CheckParameterSyntax(check_params)
- self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
+    nic_idx, pci = _GetPCIInfo(self, "nics")
+ self.nics.append(objects.NIC(idx=nic_idx, pci=pci,
+ mac=mac, ip=nic_ip, network=net,
+ nicparams=check_params))
# disk checks/pre-build
default_vg = self.cfg.GetVGName()
raise errors.OpPrereqError("Invalid disk size '%s'" % size,
errors.ECODE_INVAL)
+ ext_provider = disk.get(constants.IDISK_PROVIDER, None)
+ if ext_provider and self.op.disk_template != constants.DT_EXT:
+ raise errors.OpPrereqError("The '%s' option is only valid for the %s"
+ " disk template, not %s" %
+ (constants.IDISK_PROVIDER, constants.DT_EXT,
+ self.op.disk_template), errors.ECODE_INVAL)
+
data_vg = disk.get(constants.IDISK_VG, default_vg)
new_disk = {
constants.IDISK_SIZE: size,
constants.IDISK_MODE: mode,
constants.IDISK_VG: data_vg,
}
+
if constants.IDISK_METAVG in disk:
new_disk[constants.IDISK_METAVG] = disk[constants.IDISK_METAVG]
if constants.IDISK_ADOPT in disk:
new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT]
+
+ # For extstorage, demand the `provider' option and add any
+ # additional parameters (ext-params) to the dict
+ if self.op.disk_template == constants.DT_EXT:
+ if ext_provider:
+ new_disk[constants.IDISK_PROVIDER] = ext_provider
+ for key in disk:
+ if key not in constants.IDISK_PARAMS:
+ new_disk[key] = disk[key]
+ else:
+ raise errors.OpPrereqError("Missing provider for template '%s'" %
+ constants.DT_EXT, errors.ECODE_INVAL)
+
self.disks.append(new_disk)
if self.op.mode == constants.INSTANCE_IMPORT:
self.src_images = disk_images
- old_name = export_info.get(constants.INISECT_INS, "name")
- if self.op.instance_name == old_name:
+ if self.op.instance_name == self._old_instance_name:
for idx, nic in enumerate(self.nics):
if nic.mac == constants.VALUE_AUTO:
nic_mac_ini = "nic%d_mac" % idx
# creation job will fail.
for nic in self.nics:
if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
- nic.mac = self.cfg.GenerateMAC(self.proc.GetECId())
+ nic.mac = self.cfg.GenerateMAC(nic.network, self.proc.GetECId())
#### allocator run
self.secondaries = []
+ # Fill in any IPs from IP pools. This must happen here, because we need to
+    # know the instance's primary node, as specified by the iallocator
+ for idx, nic in enumerate(self.nics):
+ net = nic.network
+ if net is not None:
+ netparams = self.cfg.GetGroupNetParams(net, self.pnode.name)
+ if netparams is None:
+ raise errors.OpPrereqError("No netparams found for network"
+ " %s. Propably not connected to"
+                                     " %s. Probably not connected to"
+                                     " node %s's nodegroup" %
+ errors.ECODE_INVAL)
+ self.LogInfo("NIC/%d inherits netparams %s" %
+ (idx, netparams.values()))
+ nic.nicparams = dict(netparams)
+ if nic.ip is not None:
+ filled_params = cluster.SimpleFillNIC(nic.nicparams)
+ if nic.ip.lower() == constants.NIC_IP_POOL:
+ try:
+ nic.ip = self.cfg.GenerateIp(net, self.proc.GetECId())
+ except errors.ReservationError:
+ raise errors.OpPrereqError("Unable to get a free IP for NIC %d"
+ " from the address pool" % idx,
+ errors.ECODE_STATE)
+ self.LogInfo("Chose IP %s from network %s", nic.ip, net)
+ else:
+ try:
+ self.cfg.ReserveIp(net, nic.ip, self.proc.GetECId())
+ except errors.ReservationError:
+ raise errors.OpPrereqError("IP address %s already in use"
+ " or does not belong to network %s" %
+ (nic.ip, net),
+ errors.ECODE_NOTUNIQUE)
+ else:
+        # net is None; the IP is either None or explicitly given
+        if self.op.conflicts_check:
+          _CheckForConflictingIp(self, nic.ip, self.pnode.name)
+
# mirror node verification
if self.op.disk_template in constants.DTS_INT_MIRROR:
if self.op.snode == pnode.name:
nodenames = [pnode.name] + self.secondaries
# Verify instance specs
+ spindle_use = self.be_full.get(constants.BE_SPINDLE_USE, None)
ispec = {
constants.ISPEC_MEM_SIZE: self.be_full.get(constants.BE_MAXMEM, None),
constants.ISPEC_CPU_COUNT: self.be_full.get(constants.BE_VCPUS, None),
constants.ISPEC_DISK_COUNT: len(self.disks),
constants.ISPEC_DISK_SIZE: [disk["size"] for disk in self.disks],
constants.ISPEC_NIC_COUNT: len(self.nics),
+ constants.ISPEC_SPINDLE_USE: spindle_use,
}
group_info = self.cfg.GetNodeGroup(pnode.group)
utils.CommaJoin(res)),
errors.ECODE_INVAL)
- # disk parameters (not customizable at instance or node level)
- # just use the primary node parameters, ignoring the secondary.
- self.diskparams = group_info.diskparams
-
if not self.adopt_disks:
if self.op.disk_template == constants.DT_RBD:
# _CheckRADOSFreeSpace() is just a placeholder.
# Any function that checks prerequisites can be placed here.
# Check if there is enough space on the RADOS cluster.
_CheckRADOSFreeSpace()
+ elif self.op.disk_template == constants.DT_EXT:
+ # FIXME: Function that checks prereqs if needed
+ pass
else:
# Check lv size requirements, if not adopting
req_sizes = _ComputeDiskSizePerVG(self.op.disk_template, self.disks)
_CheckNicsBridgesExist(self, self.nics, self.pnode.name)
+    # TODO: _CheckExtParams (remotely)
+ # Check parameters for extstorage
+
# memory check on primary node
#TODO(dynmem): use MINMEM for checking
if self.op.start:
else:
network_port = None
+    # This is ugly, but we have a chicken-and-egg problem here:
+ # We can only take the group disk parameters, as the instance
+ # has no disks yet (we are generating them right here).
+ node = self.cfg.GetNodeInfo(pnode_name)
+ nodegroup = self.cfg.GetNodeGroup(node.group)
disks = _GenerateDiskTemplate(self,
self.op.disk_template,
instance, pnode_name,
self.op.file_driver,
0,
feedback_fn,
- self.diskparams)
+ self.cfg.GetGroupDiskParams(nodegroup))
iobj = objects.Instance(name=instance, os=self.op.os_type,
primary_node=pnode_name,
hvparams=self.op.hvparams,
hypervisor=self.op.hypervisor,
osparams=self.op.osparams,
+ hotplug_info=self.hotplug_info,
)
if self.op.tags:
_ReleaseLocks(self, locking.LEVEL_NODE_RES)
if iobj.disk_template != constants.DT_DISKLESS and not self.adopt_disks:
+ # we need to set the disks ID to the primary node, since the
+      # preceding code might or might not have done it, depending on
+ # disk template and other options
+ for disk in iobj.disks:
+ self.cfg.SetDiskID(disk, pnode_name)
if self.op.mode == constants.INSTANCE_CREATE:
if not self.op.no_install:
pause_sync = (iobj.disk_template in constants.DTS_INT_MIRROR and
if pause_sync:
feedback_fn("* pausing disk sync to install instance OS")
result = self.rpc.call_blockdev_pause_resume_sync(pnode_name,
- iobj.disks, True)
+ (iobj.disks,
+ iobj), True)
for idx, success in enumerate(result.payload):
if not success:
logging.warn("pause-sync of instance %s for disk %d failed",
if pause_sync:
feedback_fn("* resuming disk sync")
result = self.rpc.call_blockdev_pause_resume_sync(pnode_name,
- iobj.disks, False)
+ (iobj.disks,
+ iobj), False)
for idx, success in enumerate(result.payload):
if not success:
logging.warn("resume-sync of instance %s for disk %d failed",
os_add_result.Raise("Could not add os for instance %s"
" on node %s" % (instance, pnode_name))
- elif self.op.mode == constants.INSTANCE_IMPORT:
- feedback_fn("* running the instance OS import scripts...")
+ else:
+ if self.op.mode == constants.INSTANCE_IMPORT:
+ feedback_fn("* running the instance OS import scripts...")
+
+ transfers = []
+
+ for idx, image in enumerate(self.src_images):
+ if not image:
+ continue
+
+ # FIXME: pass debug option from opcode to backend
+ dt = masterd.instance.DiskTransfer("disk/%s" % idx,
+ constants.IEIO_FILE, (image, ),
+ constants.IEIO_SCRIPT,
+ (iobj.disks[idx], idx),
+ None)
+ transfers.append(dt)
+
+ import_result = \
+ masterd.instance.TransferInstanceData(self, feedback_fn,
+ self.op.src_node, pnode_name,
+ self.pnode.secondary_ip,
+ iobj, transfers)
+ if not compat.all(import_result):
+ self.LogWarning("Some disks for instance %s on node %s were not"
+ " imported successfully" % (instance, pnode_name))
+
+ rename_from = self._old_instance_name
+
+ elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
+ feedback_fn("* preparing remote import...")
+ # The source cluster will stop the instance before attempting to make
+ # a connection. In some cases stopping an instance can take a long
+ # time, hence the shutdown timeout is added to the connection
+ # timeout.
+ connect_timeout = (constants.RIE_CONNECT_TIMEOUT +
+ self.op.source_shutdown_timeout)
+ timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
- transfers = []
+ assert iobj.primary_node == self.pnode.name
+ disk_results = \
+ masterd.instance.RemoteImport(self, feedback_fn, iobj, self.pnode,
+ self.source_x509_ca,
+ self._cds, timeouts)
+ if not compat.all(disk_results):
+ # TODO: Should the instance still be started, even if some disks
+ # failed to import (valid for local imports, too)?
+ self.LogWarning("Some disks for instance %s on node %s were not"
+ " imported successfully" % (instance, pnode_name))
- for idx, image in enumerate(self.src_images):
- if not image:
- continue
+ rename_from = self.source_instance_name
- # FIXME: pass debug option from opcode to backend
- dt = masterd.instance.DiskTransfer("disk/%s" % idx,
- constants.IEIO_FILE, (image, ),
- constants.IEIO_SCRIPT,
- (iobj.disks[idx], idx),
- None)
- transfers.append(dt)
-
- import_result = \
- masterd.instance.TransferInstanceData(self, feedback_fn,
- self.op.src_node, pnode_name,
- self.pnode.secondary_ip,
- iobj, transfers)
- if not compat.all(import_result):
- self.LogWarning("Some disks for instance %s on node %s were not"
- " imported successfully" % (instance, pnode_name))
-
- elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
- feedback_fn("* preparing remote import...")
- # The source cluster will stop the instance before attempting to make a
- # connection. In some cases stopping an instance can take a long time,
- # hence the shutdown timeout is added to the connection timeout.
- connect_timeout = (constants.RIE_CONNECT_TIMEOUT +
- self.op.source_shutdown_timeout)
- timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
-
- assert iobj.primary_node == self.pnode.name
- disk_results = \
- masterd.instance.RemoteImport(self, feedback_fn, iobj, self.pnode,
- self.source_x509_ca,
- self._cds, timeouts)
- if not compat.all(disk_results):
- # TODO: Should the instance still be started, even if some disks
- # failed to import (valid for local imports, too)?
- self.LogWarning("Some disks for instance %s on node %s were not"
- " imported successfully" % (instance, pnode_name))
+ else:
+ # also checked in the prereq part
+ raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
+ % self.op.mode)
# Run rename script on newly imported instance
assert iobj.name == instance
feedback_fn("Running rename script for %s" % instance)
result = self.rpc.call_instance_run_rename(pnode_name, iobj,
- self.source_instance_name,
+ rename_from,
self.op.debug_level)
if result.fail_msg:
self.LogWarning("Failed to run rename script for %s on node"
" %s: %s" % (instance, pnode_name, result.fail_msg))
- else:
- # also checked in the prereq part
- raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
- % self.op.mode)
-
assert not self.owned_locks(locking.LEVEL_NODE_RES)
if self.op.start:
self.lu.LogInfo("Checking disk/%d on %s", idx, node)
self.cfg.SetDiskID(dev, node)
- result = self.rpc.call_blockdev_find(node, dev)
+ result = _BlockdevFind(self, node, dev, instance)
if result.offline:
continue
_CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
ignore=self.ignore_ipolicy)
- # TODO: compute disk parameters
- primary_node_info = self.cfg.GetNodeInfo(instance.primary_node)
- secondary_node_info = self.cfg.GetNodeInfo(secondary_node)
- if primary_node_info.group != secondary_node_info.group:
- self.lu.LogInfo("The instance primary and secondary nodes are in two"
- " different node groups; the disk parameters of the"
- " primary node's group will be applied.")
-
- self.diskparams = self.cfg.GetNodeGroup(primary_node_info.group).diskparams
-
for node in check_nodes:
_CheckNodeOnline(self.lu, node)
self.lu.LogInfo("Checking disk/%d on %s" % (idx, node))
self.cfg.SetDiskID(dev, node)
- result = self.rpc.call_blockdev_find(node, dev)
+ result = _BlockdevFind(self, node, dev, self.instance)
msg = result.fail_msg
if msg or not result.payload:
self.lu.LogInfo("Checking disk/%d consistency on node %s" %
(idx, node_name))
- if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
- ldisk=ldisk):
+ if not _CheckDiskConsistency(self.lu, self.instance, dev, node_name,
+ on_primary, ldisk=ldisk):
raise errors.OpExecError("Node %s has degraded storage, unsafe to"
" replace disks for instance %s" %
(node_name, self.instance.name))
"""
iv_names = {}
- for idx, dev in enumerate(self.instance.disks):
+ disks = _AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
+ for idx, dev in enumerate(disks):
if idx not in self.disks:
continue
lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
names = _GenerateUniqueNames(self.lu, lv_names)
- _, data_p, meta_p = _ComputeLDParams(constants.DT_DRBD8, self.diskparams)
-
- vg_data = dev.children[0].logical_id[0]
+ (data_disk, meta_disk) = dev.children
+ vg_data = data_disk.logical_id[0]
lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
- logical_id=(vg_data, names[0]), params=data_p)
- vg_meta = dev.children[1].logical_id[0]
+ logical_id=(vg_data, names[0]),
+ params=data_disk.params)
+ vg_meta = meta_disk.logical_id[0]
lv_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE,
- logical_id=(vg_meta, names[1]), params=meta_p)
+ logical_id=(vg_meta, names[1]),
+ params=meta_disk.params)
new_lvs = [lv_data, lv_meta]
old_lvs = [child.Copy() for child in dev.children]
# we pass force_create=True to force the LVM creation
for new_lv in new_lvs:
- _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True,
- _GetInstanceInfoText(self.instance), False)
+ _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
+ _GetInstanceInfoText(self.instance), False)
return iv_names
for name, (dev, _, _) in iv_names.iteritems():
self.cfg.SetDiskID(dev, node_name)
- result = self.rpc.call_blockdev_find(node_name, dev)
+ result = _BlockdevFind(self, node_name, dev, self.instance)
msg = result.fail_msg
if msg or not result.payload:
# Now that the new lvs have the old name, we can add them to the device
self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
- result = self.rpc.call_blockdev_addchildren(self.target_node, dev,
- new_lvs)
+ result = self.rpc.call_blockdev_addchildren(self.target_node,
+ (dev, self.instance), new_lvs)
msg = result.fail_msg
if msg:
for new_lv in new_lvs:
# Step: create new storage
self.lu.LogStep(3, steps_total, "Allocate new storage")
- for idx, dev in enumerate(self.instance.disks):
+ disks = _AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
+ for idx, dev in enumerate(disks):
self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
(self.new_node, idx))
# we pass force_create=True to force LVM creation
for new_lv in dev.children:
- _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True,
- _GetInstanceInfoText(self.instance), False)
+ _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
+ True, _GetInstanceInfoText(self.instance), False)
# Step 4: dbrd minors and drbd setups changes
# after this, we must manually remove the drbd minors on both the
iv_names[idx] = (dev, dev.children, new_net_id)
logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
new_net_id)
- drbd_params, _, _ = _ComputeLDParams(constants.DT_DRBD8, self.diskparams)
new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
logical_id=new_alone_id,
children=dev.children,
size=dev.size,
- params=drbd_params)
+ params={})
+ (anno_new_drbd,) = _AnnotateDiskParams(self.instance, [new_drbd],
+ self.cfg)
try:
- _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
+ _CreateSingleBlockDev(self.lu, self.new_node, self.instance,
+ anno_new_drbd,
_GetInstanceInfoText(self.instance), False)
except errors.GenericError:
self.cfg.ReleaseDRBDMinors(self.instance.name)
for idx, dev in enumerate(self.instance.disks):
self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
self.cfg.SetDiskID(dev, self.target_node)
- msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg
+ msg = self.rpc.call_blockdev_shutdown(self.target_node,
+ (dev, self.instance)).fail_msg
if msg:
self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
"node: %s" % (idx, msg),
result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
self.new_node],
self.node_secondary_ip,
- self.instance.disks,
+ (self.instance.disks, self.instance),
self.instance.name,
False)
for to_node, to_result in result.items():
env = {
"DISK": self.op.disk,
"AMOUNT": self.op.amount,
+ "ABSOLUTE": self.op.absolute,
}
env.update(_BuildInstanceHookEnvByObject(self, self.instance))
return env
self.disk = instance.FindDisk(self.op.disk)
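+    # op.amount is either the desired final size (absolute mode) or the
+    # increment to apply; normalize both cases to (delta, target) here.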
+ if self.op.absolute:
+ self.target = self.op.amount
+ self.delta = self.target - self.disk.size
+ if self.delta < 0:
+ raise errors.OpPrereqError("Requested size (%s) is smaller than "
+ "current disk size (%s)" %
+ (utils.FormatUnit(self.target, "h"),
+ utils.FormatUnit(self.disk.size, "h")),
+ errors.ECODE_STATE)
+ else:
+ self.delta = self.op.amount
+ self.target = self.disk.size + self.delta
+ if self.delta < 0:
+ raise errors.OpPrereqError("Requested increment (%s) is negative" %
+ utils.FormatUnit(self.delta, "h"),
+ errors.ECODE_INVAL)
+
if instance.disk_template not in (constants.DT_FILE,
constants.DT_SHARED_FILE,
- constants.DT_RBD):
+ constants.DT_RBD,
+ constants.DT_EXT):
# TODO: check the free disk space for file, when that feature will be
# supported
_CheckNodesFreeDiskPerVG(self, nodenames,
- self.disk.ComputeGrowth(self.op.amount))
+ self.disk.ComputeGrowth(self.delta))
def Exec(self, feedback_fn):
"""Execute disk grow.
if not disks_ok:
raise errors.OpExecError("Cannot activate block device to grow")
- feedback_fn("Growing disk %s of instance '%s' by %s" %
+ feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
(self.op.disk, instance.name,
- utils.FormatUnit(self.op.amount, "h")))
+ utils.FormatUnit(self.delta, "h"),
+ utils.FormatUnit(self.target, "h")))
# First run all grow ops in dry-run mode
for node in instance.all_nodes:
self.cfg.SetDiskID(disk, node)
- result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, True)
+ result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
+ True)
result.Raise("Grow request failed to node %s" % node)
# We know that (as far as we can test) operations across different
# nodes will succeed, time to run it for real
for node in instance.all_nodes:
self.cfg.SetDiskID(disk, node)
- result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, False)
+ result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
+ False)
result.Raise("Grow request failed to node %s" % node)
# TODO: Rewrite code to work properly
# time is a work-around.
time.sleep(5)
- disk.RecordGrow(self.op.amount)
+ disk.RecordGrow(self.delta)
self.cfg.Update(instance, feedback_fn)
# Changes have been recorded, release node lock
else:
self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
+ self.needed_locks[locking.LEVEL_NODEGROUP] = []
self.needed_locks[locking.LEVEL_NODE] = []
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
def DeclareLocks(self, level):
- if self.op.use_locking and level == locking.LEVEL_NODE:
- self._LockInstancesNodes()
+ if self.op.use_locking:
+ if level == locking.LEVEL_NODEGROUP:
+ owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
+
+ # Lock all groups used by instances optimistically; this requires going
+ # via the node before it's locked, requiring verification later on
+ self.needed_locks[locking.LEVEL_NODEGROUP] = \
+ frozenset(group_uuid
+ for instance_name in owned_instances
+ for group_uuid in
+ self.cfg.GetInstanceNodeGroups(instance_name))
+
+ elif level == locking.LEVEL_NODE:
+ self._LockInstancesNodes()
def CheckPrereq(self):
"""Check prerequisites.
This only checks the optional instance list against the existing names.
"""
+ owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
+ owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
+ owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))
+
if self.wanted_names is None:
assert self.op.use_locking, "Locking was not used"
- self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
+ self.wanted_names = owned_instances
- self.wanted_instances = \
- map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))
+ instances = dict(self.cfg.GetMultiInstanceInfo(self.wanted_names))
+
+ if self.op.use_locking:
+ _CheckInstancesNodeGroups(self.cfg, instances, owned_groups, owned_nodes,
+ None)
+ else:
+ assert not (owned_instances or owned_groups or owned_nodes)
- def _ComputeBlockdevStatus(self, node, instance_name, dev):
+ self.wanted_instances = instances.values()
+
+ def _ComputeBlockdevStatus(self, node, instance, dev):
"""Returns the status of a block device
"""
if result.offline:
return None
- result.Raise("Can't compute disk status for %s" % instance_name)
+ result.Raise("Can't compute disk status for %s" % instance.name)
status = result.payload
if status is None:
"""Compute block device status.
"""
+ (anno_dev,) = _AnnotateDiskParams(instance, [dev], self.cfg)
+
+ return self._ComputeDiskStatusInner(instance, snode, anno_dev)
+
+ def _ComputeDiskStatusInner(self, instance, snode, dev):
+ """Compute block device status.
+
+ @attention: The device has to be annotated already.
+
+ """
if dev.dev_type in constants.LDS_DRBD:
# we change the snode then (otherwise we use the one passed in)
if dev.logical_id[0] == instance.primary_node:
snode = dev.logical_id[0]
dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node,
- instance.name, dev)
- dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev)
+ instance, dev)
+ dev_sstatus = self._ComputeBlockdevStatus(snode, instance, dev)
if dev.children:
- dev_children = map(compat.partial(self._ComputeDiskStatus,
+ dev_children = map(compat.partial(self._ComputeDiskStatusInner,
instance, snode),
dev.children)
else:
cluster = self.cfg.GetClusterInfo()
- pri_nodes = self.cfg.GetMultiNodeInfo(i.primary_node
- for i in self.wanted_instances)
- for instance, (_, pnode) in zip(self.wanted_instances, pri_nodes):
+ node_names = itertools.chain(*(i.all_nodes for i in self.wanted_instances))
+ nodes = dict(self.cfg.GetMultiNodeInfo(node_names))
+
+ groups = dict(self.cfg.GetMultiNodeGroupInfo(node.group
+ for node in nodes.values()))
+
+ group2name_fn = lambda uuid: groups[uuid].name
+
+ for instance in self.wanted_instances:
+ pnode = nodes[instance.primary_node]
+
if self.op.static or pnode.offline:
remote_state = None
if pnode.offline:
disks = map(compat.partial(self._ComputeDiskStatus, instance, None),
instance.disks)
+ snodes_group_uuids = [nodes[snode_name].group
+ for snode_name in instance.secondary_nodes]
+
result[instance.name] = {
"name": instance.name,
"config_state": instance.admin_state,
"run_state": remote_state,
"pnode": instance.primary_node,
+ "pnode_group_uuid": pnode.group,
+ "pnode_group_name": group2name_fn(pnode.group),
"snodes": instance.secondary_nodes,
+ "snodes_group_uuids": snodes_group_uuids,
+ "snodes_group_names": map(group2name_fn, snodes_group_uuids),
"os": instance.os,
# this happens to be the same format used for hooks
"nics": _NICListToTuple(self, instance.nics),
if remove_fn is not None:
remove_fn(absidx, item, private)
+ #TODO: include a hotplugged msg in changes
changes = [("%s/%s" % (kind, absidx), "remove")]
assert container[absidx] == item
del container[absidx]
elif op == constants.DDM_MODIFY:
if modify_fn is not None:
+ #TODO: include a hotplugged msg in changes
changes = modify_fn(absidx, item, params, private)
+
else:
raise errors.ProgrammerError("Unhandled operation '%s'" % op)
for (op, _, params) in mods:
assert ht.TDict(params)
- utils.ForceDictType(params, key_types)
+ # If key_types is an empty dict, we assume we have an 'ext' template
+ # and thus do not ForceDictType
+ if key_types:
+ utils.ForceDictType(params, key_types)
if op == constants.DDM_REMOVE:
if params:
params[constants.IDISK_SIZE] = size
- elif op == constants.DDM_MODIFY and constants.IDISK_SIZE in params:
- raise errors.OpPrereqError("Disk size change not possible, use"
- " grow-disk", errors.ECODE_INVAL)
+ elif op == constants.DDM_MODIFY:
+ if constants.IDISK_SIZE in params:
+ raise errors.OpPrereqError("Disk size change not possible, use"
+ " grow-disk", errors.ECODE_INVAL)
+ if constants.IDISK_MODE not in params:
+ raise errors.OpPrereqError("Disk 'mode' is the only kind of"
+ " modification supported, but missing",
+ errors.ECODE_NOENT)
+ if len(params) > 1:
+ raise errors.OpPrereqError("Disk modification doesn't support"
+ " additional arbitrary parameters",
+ errors.ECODE_INVAL)
@staticmethod
def _VerifyNicModification(op, params):
"""
if op in (constants.DDM_ADD, constants.DDM_MODIFY):
ip = params.get(constants.INIC_IP, None)
- if ip is None:
- pass
- elif ip.lower() == constants.VALUE_NONE:
- params[constants.INIC_IP] = None
- elif not netutils.IPAddress.IsValid(ip):
- raise errors.OpPrereqError("Invalid IP address '%s'" % ip,
- errors.ECODE_INVAL)
-
- bridge = params.get("bridge", None)
- link = params.get(constants.INIC_LINK, None)
- if bridge and link:
- raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
- " at the same time", errors.ECODE_INVAL)
- elif bridge and bridge.lower() == constants.VALUE_NONE:
- params["bridge"] = None
- elif link and link.lower() == constants.VALUE_NONE:
- params[constants.INIC_LINK] = None
+ req_net = params.get(constants.INIC_NETWORK, None)
+ link = params.get(constants.NIC_LINK, None)
+ mode = params.get(constants.NIC_MODE, None)
+ if req_net is not None:
+ if req_net.lower() == constants.VALUE_NONE:
+ params[constants.INIC_NETWORK] = None
+ req_net = None
+ elif link is not None or mode is not None:
+ raise errors.OpPrereqError("If network is given"
+ " mode or link should not",
+ errors.ECODE_INVAL)
if op == constants.DDM_ADD:
macaddr = params.get(constants.INIC_MAC, None)
if macaddr is None:
params[constants.INIC_MAC] = constants.VALUE_AUTO
+ if ip is not None:
+ if ip.lower() == constants.VALUE_NONE:
+ params[constants.INIC_IP] = None
+ else:
+ if ip.lower() == constants.NIC_IP_POOL:
+ if op == constants.DDM_ADD and req_net is None:
+ raise errors.OpPrereqError("If ip=pool, parameter network"
+ " cannot be none",
+ errors.ECODE_INVAL)
+ else:
+ if not netutils.IPAddress.IsValid(ip):
+ raise errors.OpPrereqError("Invalid IP address '%s'" % ip,
+ errors.ECODE_INVAL)
+
if constants.INIC_MAC in params:
macaddr = params[constants.INIC_MAC]
if macaddr not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
if self.op.hvparams:
_CheckGlobalHvParams(self.op.hvparams)
- self.op.disks = \
- self._UpgradeDiskNicMods("disk", self.op.disks,
- opcodes.OpInstanceSetParams.TestDiskModifications)
+ if self.op.allow_arbit_params:
+ self.op.disks = \
+ self._UpgradeDiskNicMods("disk", self.op.disks,
+ opcodes.OpInstanceSetParams.TestExtDiskModifications)
+ else:
+ self.op.disks = \
+ self._UpgradeDiskNicMods("disk", self.op.disks,
+ opcodes.OpInstanceSetParams.TestDiskModifications)
+
self.op.nics = \
self._UpgradeDiskNicMods("NIC", self.op.nics,
opcodes.OpInstanceSetParams.TestNicModifications)
# Check disk modifications
- self._CheckMods("disk", self.op.disks, constants.IDISK_PARAMS_TYPES,
- self._VerifyDiskModification)
+ if self.op.allow_arbit_params:
+ self._CheckMods("disk", self.op.disks, {},
+ self._VerifyDiskModification)
+ else:
+ self._CheckMods("disk", self.op.disks, constants.IDISK_PARAMS_TYPES,
+ self._VerifyDiskModification)
if self.op.disks and self.op.disk_template is not None:
raise errors.OpPrereqError("Disk template conversion and other disk"
nics = []
for nic in self._new_nics:
- nicparams = self.cluster.SimpleFillNIC(nic.nicparams)
- mode = nicparams[constants.NIC_MODE]
- link = nicparams[constants.NIC_LINK]
- nics.append((nic.ip, nic.mac, mode, link))
+ n = copy.deepcopy(nic)
+ nicparams = self.cluster.SimpleFillNIC(n.nicparams)
+ n.nicparams = nicparams
+ nics.append(_NICToTuple(self, n))
args["nics"] = nics
nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
return (nl, nl)
- def _PrepareNicModification(self, params, private, old_ip, old_params,
- cluster, pnode):
+ def _PrepareNicModification(self, params, private, old_ip, old_net,
+ old_params, cluster, pnode):
update_params_dict = dict([(key, params[key])
for key in constants.NICS_PARAMETERS
if key in params])
- if "bridge" in params:
- update_params_dict[constants.NIC_LINK] = params["bridge"]
+ req_link = update_params_dict.get(constants.NIC_LINK, None)
+ req_mode = update_params_dict.get(constants.NIC_MODE, None)
+
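+    # If the NIC is (or becomes) attached to a network, its parameters are
+    # dictated by that network's settings on the primary node's group.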
+ new_net = params.get(constants.INIC_NETWORK, old_net)
+ if new_net is not None:
+ netparams = self.cfg.GetGroupNetParams(new_net, pnode)
+ if netparams is None:
+ raise errors.OpPrereqError("No netparams found for the network"
+ " %s, propably not connected." % new_net,
+ errors.ECODE_INVAL)
+ new_params = dict(netparams)
+ else:
+ new_params = _GetUpdatedParams(old_params, update_params_dict)
- new_params = _GetUpdatedParams(old_params, update_params_dict)
utils.ForceDictType(new_params, constants.NICS_PARAMETER_TYPES)
new_filled_params = cluster.SimpleFillNIC(new_params)
elif mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
# otherwise generate the MAC address
params[constants.INIC_MAC] = \
- self.cfg.GenerateMAC(self.proc.GetECId())
+ self.cfg.GenerateMAC(new_net, self.proc.GetECId())
else:
# or validate/reserve the current one
try:
raise errors.OpPrereqError("MAC address '%s' already in use"
" in cluster" % mac,
errors.ECODE_NOTUNIQUE)
+ elif new_net != old_net:
+ def get_net_prefix(net):
+ if net:
+ uuid = self.cfg.LookupNetwork(net)
+ if uuid:
+ nobj = self.cfg.GetNetwork(uuid)
+ return nobj.mac_prefix
+ return None
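+      # Regenerate the MAC only when the old and new networks actually use
+      # different MAC prefixes.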
+ new_prefix = get_net_prefix(new_net)
+ old_prefix = get_net_prefix(old_net)
+ if old_prefix != new_prefix:
+ params[constants.INIC_MAC] = \
+ self.cfg.GenerateMAC(new_net, self.proc.GetECId())
+
+      # if there is a change in NIC/network configuration
+ new_ip = params.get(constants.INIC_IP, old_ip)
+ if (new_ip, new_net) != (old_ip, old_net):
+ if new_ip:
+ if new_net:
+ if new_ip.lower() == constants.NIC_IP_POOL:
+ try:
+ new_ip = self.cfg.GenerateIp(new_net, self.proc.GetECId())
+ except errors.ReservationError:
+ raise errors.OpPrereqError("Unable to get a free IP"
+ " from the address pool",
+ errors.ECODE_STATE)
+ self.LogInfo("Chose IP %s from pool %s", new_ip, new_net)
+ params[constants.INIC_IP] = new_ip
+ elif new_ip != old_ip or new_net != old_net:
+ try:
+ self.LogInfo("Reserving IP %s in pool %s", new_ip, new_net)
+ self.cfg.ReserveIp(new_net, new_ip, self.proc.GetECId())
+ except errors.ReservationError:
+ raise errors.OpPrereqError("IP %s not available in network %s" %
+ (new_ip, new_net),
+ errors.ECODE_NOTUNIQUE)
+ elif new_ip.lower() == constants.NIC_IP_POOL:
+ raise errors.OpPrereqError("ip=pool, but no network found",
+ ECODEE_INVAL)
+ else:
+ # new net is None
+ if self.op.conflicts_check:
+ _CheckForConflictingIp(self, new_ip, pnode)
+
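+      # Release the previously held address back to its old pool, if any.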
+ if old_ip:
+ if old_net:
+ try:
+ self.cfg.ReleaseIp(old_net, old_ip, self.proc.GetECId())
+ except errors.AddressPoolError:
+ logging.warning("Release IP %s not contained in network %s",
+ old_ip, old_net)
+
+ # there are no changes in (net, ip) tuple
+ elif (old_net is not None and
+ (req_link is not None or req_mode is not None)):
+ raise errors.OpPrereqError("Not allowed to change link or mode of"
+ " a NIC that is connected to a network.",
+ errors.ECODE_INVAL)
+ logging.info("new_params %s", new_params)
+ logging.info("new_filled_params %s", new_filled_params)
private.params = new_params
private.filled = new_filled_params
- return (None, None)
-
def CheckPrereq(self):
"""Check prerequisites.
pnode = instance.primary_node
nodelist = list(instance.all_nodes)
pnode_info = self.cfg.GetNodeInfo(pnode)
- self.diskparams = self.cfg.GetNodeGroup(pnode_info.group).diskparams
+ self.diskparams = self.cfg.GetInstanceDiskParams(instance)
# Prepare disk/NIC modifications
self.diskmod = PrepareContainerMods(self.op.disks, None)
self.nicmod = PrepareContainerMods(self.op.nics, _InstNicModPrivate)
+ logging.info("nicmod %s", self.nicmod)
+
+ # Check the validity of the `provider' parameter
+ if instance.disk_template in constants.DT_EXT:
+ for mod in self.diskmod:
+ ext_provider = mod[2].get(constants.IDISK_PROVIDER, None)
+ if mod[0] == constants.DDM_ADD:
+ if ext_provider is None:
+ raise errors.OpPrereqError("Instance template is '%s' and parameter"
+ " '%s' missing, during disk add" %
+ (constants.DT_EXT,
+ constants.IDISK_PROVIDER),
+ errors.ECODE_NOENT)
+ elif mod[0] == constants.DDM_MODIFY:
+ if ext_provider:
+ raise errors.OpPrereqError("Parameter '%s' is invalid during disk"
+ " modification" % constants.IDISK_PROVIDER,
+ errors.ECODE_INVAL)
+ else:
+ for mod in self.diskmod:
+ ext_provider = mod[2].get(constants.IDISK_PROVIDER, None)
+ if ext_provider is not None:
+ raise errors.OpPrereqError("Parameter '%s' is only valid for instances"
+ " of type '%s'" % (constants.IDISK_PROVIDER,
+ constants.DT_EXT), errors.ECODE_INVAL)
# OS change
if self.op.os_name and not self.op.force:
self.be_proposed = cluster.SimpleFillBE(instance.beparams)
be_old = cluster.FillBE(instance)
- # CPU param validation -- checking every time a paramtere is
+ # CPU param validation -- checking every time a parameter is
# changed to cover all cases where either CPU mask or vcpus have
# changed
if (constants.BE_VCPUS in self.be_proposed and
errors.ECODE_INVAL)
def _PrepareNicCreate(_, params, private):
- return self._PrepareNicModification(params, private, None, {},
- cluster, pnode)
+ self._PrepareNicModification(params, private, None, None,
+ {}, cluster, pnode)
+ return (None, None)
def _PrepareNicMod(_, nic, params, private):
- return self._PrepareNicModification(params, private, nic.ip,
- nic.nicparams, cluster, pnode)
+ self._PrepareNicModification(params, private, nic.ip, nic.network,
+ nic.nicparams, cluster, pnode)
+ return None
+
+ def _PrepareNicRemove(_, params, private):
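+      # On removal, give a pool-managed IP back to its network.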
+ ip = params.ip
+ net = params.network
+ if net is not None and ip is not None:
+ self.cfg.ReleaseIp(net, ip, self.proc.GetECId())
# Verify NIC changes (operating on copy)
nics = instance.nics[:]
ApplyContainerMods("NIC", nics, None, self.nicmod,
- _PrepareNicCreate, _PrepareNicMod, None)
+ _PrepareNicCreate, _PrepareNicMod, _PrepareNicRemove)
if len(nics) > constants.MAX_NICS:
raise errors.OpPrereqError("Instance has too many network interfaces"
" (%d), cannot add more" % constants.MAX_NICS,
errors.ECODE_STATE)
+
# Verify disk changes (operating on a copy)
disks = instance.disks[:]
- ApplyContainerMods("disk", disks, None, self.diskmod, None, None, None)
+ ApplyContainerMods("disk", disks, None, self.diskmod,
+ None, None, None)
if len(disks) > constants.MAX_DISKS:
raise errors.OpPrereqError("Instance has too many disks (%d), cannot add"
" more" % constants.MAX_DISKS,
# Operate on copies as this is still in prereq
nics = [nic.Copy() for nic in instance.nics]
ApplyContainerMods("NIC", nics, self._nic_chgdesc, self.nicmod,
- self._CreateNewNic, self._ApplyNicMods, None)
+ self._CreateNewNic, self._ApplyNicMods,
+ self._RemoveNic)
self._new_nics = nics
else:
self._new_nics = None
+
def _ConvertPlainToDrbd(self, feedback_fn):
"""Converts an instance from plain to drbd.
instance.name, pnode, [snode],
disk_info, None, None, 0, feedback_fn,
self.diskparams)
+ anno_disks = rpc.AnnotateDiskParams(constants.DT_DRBD8, new_disks,
+ self.diskparams)
info = _GetInstanceInfoText(instance)
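+    # Create the devices from the annotated copies; the disks stored in the
+    # configuration do not carry the filled-in parameters.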
- feedback_fn("Creating aditional volumes...")
+ feedback_fn("Creating additional volumes...")
# first, create the missing data and meta devices
- for disk in new_disks:
+ for disk in anno_disks:
# unfortunately this is... not too nice
_CreateSingleBlockDev(self, pnode, instance, disk.children[1],
info, True)
feedback_fn("Initializing DRBD devices...")
# all child devices are in place, we can now create the DRBD devices
- for disk in new_disks:
+ for disk in anno_disks:
for node in [pnode, snode]:
f_create = node == pnode
_CreateSingleBlockDev(self, node, instance, disk, info, f_create)
snode = instance.secondary_nodes[0]
feedback_fn("Converting template to plain")
- old_disks = instance.disks
- new_disks = [d.children[0] for d in old_disks]
+ old_disks = _AnnotateDiskParams(instance, instance.disks, self.cfg)
+ new_disks = [d.children[0] for d in instance.disks]
# copy over size and mode
for parent, child in zip(old_disks, new_disks):
child.size = parent.size
child.mode = parent.mode
+ # this is a DRBD disk, return its port to the pool
+ # NOTE: this must be done right before the call to cfg.Update!
+ for disk in old_disks:
+ tcp_port = disk.logical_id[2]
+ self.cfg.AddTcpUdpPort(tcp_port)
+
# update instance structure
instance.disks = new_disks
instance.disk_template = constants.DT_PLAIN
self.LogWarning("Could not remove metadata for disk %d on node %s,"
" continuing anyway: %s", idx, pnode, msg)
- # this is a DRBD disk, return its port to the pool
- for disk in old_disks:
- tcp_port = disk.logical_id[2]
- self.cfg.AddTcpUdpPort(tcp_port)
-
- # Node resource locks will be released by caller
-
def _CreateNewDisk(self, idx, params, _):
"""Creates a new disk.
self.LogWarning("Failed to create volume %s (%s) on node '%s': %s",
disk.iv_name, disk, node, err)
+ if self.op.hotplug and disk.pci:
+ self.LogInfo("Trying to hotplug device.")
+ disk_ok, device_info = _AssembleInstanceDisks(self, self.instance,
+ [disk], check=False)
+ _, _, dev_path = device_info[0]
+ result = self.rpc.call_hot_add_disk(self.instance.primary_node,
+ self.instance, disk, dev_path, idx)
return (disk, [
("disk/%d" % idx, "add:size=%s,mode=%s" % (disk.size, disk.mode)),
])
"""Removes a disk.
"""
- for node, disk in root.ComputeNodeTree(self.instance.primary_node):
+ #TODO: log warning in case hotplug is not possible
+ # handle errors
+ if root.pci and not self.op.hotplug:
+ raise errors.OpPrereqError("Cannot remove a disk that has"
+ " been hotplugged"
+ " without removing it with hotplug",
+ errors.ECODE_INVAL)
+ if self.op.hotplug and root.pci:
+ self.LogInfo("Trying to hotplug device.")
+ self.rpc.call_hot_del_disk(self.instance.primary_node,
+ self.instance, root, idx)
+ _ShutdownInstanceDisks(self, self.instance, [root])
+ self.cfg.UpdatePCIInfo(self.instance.name, root.pci)
+
+ (anno_disk,) = _AnnotateDiskParams(self.instance, [root], self.cfg)
+ for node, disk in anno_disk.ComputeNodeTree(self.instance.primary_node):
self.cfg.SetDiskID(disk, node)
msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
if msg:
if root.dev_type in constants.LDS_DRBD:
self.cfg.AddTcpUdpPort(root.logical_id[2])
- @staticmethod
- def _CreateNewNic(idx, params, private):
+ def _CreateNewNic(self, idx, params, private):
"""Creates data structure for a new network interface.
"""
mac = params[constants.INIC_MAC]
ip = params.get(constants.INIC_IP, None)
- nicparams = private.params
-
- return (objects.NIC(mac=mac, ip=ip, nicparams=nicparams), [
+ network = params.get(constants.INIC_NETWORK, None)
+ #TODO: not private.filled?? can a nic have no nicparams??
+ nicparams = private.filled
+
+ nic = objects.NIC(mac=mac, ip=ip, network=network, nicparams=nicparams)
+
+ #TODO: log warning in case hotplug is not possible
+ # handle errors
+ # return changes
+ if self.op.hotplug:
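+      # Look up a PCI slot for the new NIC; if one is available it is
+      # recorded on the NIC object before the hot-add RPC is issued.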
+ nic_idx, pci = _GetPCIInfo(self, 'nics')
+ if pci is not None:
+ nic.idx = nic_idx
+ nic.pci = pci
+ result = self.rpc.call_hot_add_nic(self.instance.primary_node,
+ self.instance, nic, idx)
+ desc = [
("nic.%d" % idx,
- "add:mac=%s,ip=%s,mode=%s,link=%s" %
+ "add:mac=%s,ip=%s,mode=%s,link=%s,network=%s" %
(mac, ip, private.filled[constants.NIC_MODE],
- private.filled[constants.NIC_LINK])),
- ])
+ private.filled[constants.NIC_LINK],
+ network)),
+ ]
+ return (nic, desc)
- @staticmethod
- def _ApplyNicMods(idx, nic, params, private):
+ def _ApplyNicMods(self, idx, nic, params, private):
"""Modifies a network interface.
"""
changes = []
- for key in [constants.INIC_MAC, constants.INIC_IP]:
+ for key in [constants.INIC_MAC, constants.INIC_IP, constants.INIC_NETWORK]:
if key in params:
changes.append(("nic.%s/%d" % (key, idx), params[key]))
setattr(nic, key, params[key])
- if private.params:
- nic.nicparams = private.params
+ if private.filled:
+ nic.nicparams = private.filled
- for (key, val) in params.items():
+ for (key, val) in nic.nicparams.items():
changes.append(("nic.%s/%d" % (key, idx), val))
+ #TODO: log warning in case hotplug is not possible
+ # handle errors
+ if self.op.hotplug and nic.pci:
+ self.LogInfo("Trying to hotplug device.")
+ self.rpc.call_hot_del_nic(self.instance.primary_node,
+ self.instance, nic, idx)
+ result = self.rpc.call_hot_add_nic(self.instance.primary_node,
+ self.instance, nic, idx)
return changes
+ def _RemoveNic(self, idx, nic, private):
+ if nic.pci and not self.op.hotplug:
+ raise errors.OpPrereqError("Cannot remove a nic that has been hotplugged"
+ " without removing it with hotplug",
+ errors.ECODE_INVAL)
+ #TODO: log warning in case hotplug is not possible
+ # handle errors
+ if self.op.hotplug and nic.pci:
+ self.LogInfo("Trying to hotplug device.")
+ self.rpc.call_hot_del_nic(self.instance.primary_node,
+ self.instance, nic, idx)
+ self.cfg.UpdatePCIInfo(self.instance.name, nic.pci)
+
def Exec(self, feedback_fn):
"""Modifies an instance.
self.cfg.MarkInstanceDown(instance.name)
result.append(("admin_state", constants.ADMINST_DOWN))
- self.cfg.Update(instance, feedback_fn)
+ self.cfg.Update(instance, feedback_fn, self.proc.GetECId())
assert not (self.owned_locks(locking.LEVEL_NODE_RES) or
self.owned_locks(locking.LEVEL_NODE)), \
if self.req_target_uuids:
# User requested specific target groups
- self.target_uuids = self.req_target_uuids
+ self.target_uuids = frozenset(self.req_target_uuids)
else:
# All groups except those used by the instance are potential targets
self.target_uuids = owned_groups - inst_groups
"""
REQ_BGL = False
+ def CheckArguments(self):
+ self.expq = _ExportQuery(qlang.MakeSimpleFilter("node", self.op.nodes),
+ ["node", "export"], self.op.use_locking)
+
def ExpandNames(self):
- self.needed_locks = {}
- self.share_locks[locking.LEVEL_NODE] = 1
- if not self.op.nodes:
- self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
- else:
- self.needed_locks[locking.LEVEL_NODE] = \
- _GetWantedNodes(self, self.op.nodes)
+ self.expq.ExpandNames(self)
+
+ def DeclareLocks(self, level):
+ self.expq.DeclareLocks(self, level)
def Exec(self, feedback_fn):
- """Compute the list of all the exported system images.
+ result = {}
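+    # Convert to the old node->exports dict format; an export name of None
+    # marks a node that failed to return its export list.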
- @rtype: dict
- @return: a dictionary with the structure node->(export-list)
- where export-list is a list of the instances exported on
- that node.
+ for (node, expname) in self.expq.OldStyleQuery(self):
+ if expname is None:
+ result[node] = False
+ else:
+ result.setdefault(node, []).append(expname)
+
+ return result
+
+
+class _ExportQuery(_QueryBase):
+ FIELDS = query.EXPORT_FIELDS
+
+ #: The node name is not a unique key for this query
+ SORT_FIELD = "node"
+
+ def ExpandNames(self, lu):
+ lu.needed_locks = {}
+
+ # The following variables interact with _QueryBase._GetNames
+ if self.names:
+ self.wanted = _GetWantedNodes(lu, self.names)
+ else:
+ self.wanted = locking.ALL_SET
+
+ self.do_locking = self.use_locking
+
+ if self.do_locking:
+ lu.share_locks = _ShareAll()
+ lu.needed_locks = {
+ locking.LEVEL_NODE: self.wanted,
+ }
+
+ def DeclareLocks(self, lu, level):
+ pass
+
+ def _GetQueryData(self, lu):
+ """Computes the list of nodes and their attributes.
"""
- self.nodes = self.owned_locks(locking.LEVEL_NODE)
- rpcresult = self.rpc.call_export_list(self.nodes)
- result = {}
- for node in rpcresult:
- if rpcresult[node].fail_msg:
- result[node] = False
+ # Locking is not used
+ # TODO
+ assert not (compat.any(lu.glm.is_owned(level)
+ for level in locking.LEVELS
+ if level != locking.LEVEL_CLUSTER) or
+ self.do_locking or self.use_locking)
+
+ nodes = self._GetNames(lu, lu.cfg.GetNodeList(), locking.LEVEL_NODE)
+
+ result = []
+
+ for (node, nres) in lu.rpc.call_export_list(nodes).items():
+ if nres.fail_msg:
+ result.append((node, None))
else:
- result[node] = rpcresult[node].payload
+ result.extend((node, expname) for expname in nres.payload)
return result
if self.op.diskparams:
for templ in constants.DISK_TEMPLATES:
- if templ not in self.op.diskparams:
- self.op.diskparams[templ] = {}
- utils.ForceDictType(self.op.diskparams[templ], constants.DISK_DT_TYPES)
+ if templ in self.op.diskparams:
+ utils.ForceDictType(self.op.diskparams[templ],
+ constants.DISK_DT_TYPES)
+ self.new_diskparams = self.op.diskparams
+ try:
+ utils.VerifyDictOptions(self.new_diskparams, constants.DISK_DT_DEFAULTS)
+ except errors.OpPrereqError, err:
+ raise errors.OpPrereqError("While verify diskparams options: %s" % err,
+ errors.ECODE_INVAL)
else:
- self.op.diskparams = self.cfg.GetClusterInfo().diskparams
+ self.new_diskparams = {}
if self.op.ipolicy:
cluster = self.cfg.GetClusterInfo()
full_ipolicy = cluster.SimpleFillIPolicy(self.op.ipolicy)
try:
- objects.InstancePolicy.CheckParameterSyntax(full_ipolicy)
+ objects.InstancePolicy.CheckParameterSyntax(full_ipolicy, False)
except errors.ConfigurationError, err:
raise errors.OpPrereqError("Invalid instance policy: %s" % err,
errors.ECODE_INVAL)
uuid=self.group_uuid,
alloc_policy=self.op.alloc_policy,
ndparams=self.op.ndparams,
- diskparams=self.op.diskparams,
+ diskparams=self.new_diskparams,
ipolicy=self.op.ipolicy,
hv_state_static=self.new_hv_state,
disk_state_static=self.new_disk_state)
return query.GroupQueryData(self._cluster,
[self._all_groups[uuid]
for uuid in self.wanted],
- group_to_nodes, group_to_instances)
+ group_to_nodes, group_to_instances,
+ query.GQ_DISKPARAMS in self.requested_data)
class LUGroupQuery(NoHooksLU):
self.needed_locks[locking.LEVEL_INSTANCE] = \
self.cfg.GetNodeGroupInstances(self.group_uuid)
+ @staticmethod
+ def _UpdateAndVerifyDiskParams(old, new):
+ """Updates and verifies disk parameters.
+
+ """
+ new_params = _GetUpdatedParams(old, new)
+ utils.ForceDictType(new_params, constants.DISK_DT_TYPES)
+ return new_params
+
def CheckPrereq(self):
"""Check prerequisites.
if self.op.ndparams:
new_ndparams = _GetUpdatedParams(self.group.ndparams, self.op.ndparams)
- utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
+ utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
self.new_ndparams = new_ndparams
if self.op.diskparams:
- self.new_diskparams = dict()
- for templ in constants.DISK_TEMPLATES:
- if templ not in self.op.diskparams:
- self.op.diskparams[templ] = {}
- new_templ_params = _GetUpdatedParams(self.group.diskparams[templ],
- self.op.diskparams[templ])
- utils.ForceDictType(new_templ_params, constants.DISK_DT_TYPES)
- self.new_diskparams[templ] = new_templ_params
+ diskparams = self.group.diskparams
+ uavdp = self._UpdateAndVerifyDiskParams
+ # For each disktemplate subdict update and verify the values
+ new_diskparams = dict((dt,
+ uavdp(diskparams.get(dt, {}),
+ self.op.diskparams[dt]))
+ for dt in constants.DISK_TEMPLATES
+ if dt in self.op.diskparams)
+      # Now that all per-template subdicts are updated and verified, merge
+      # them back into the group's diskparams
+ self.new_diskparams = objects.FillDict(diskparams, new_diskparams)
+ try:
+ utils.VerifyDictOptions(self.new_diskparams, constants.DISK_DT_DEFAULTS)
+ except errors.OpPrereqError, err:
+ raise errors.OpPrereqError("While verify diskparams options: %s" % err,
+ errors.ECODE_INVAL)
if self.op.hv_state:
self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state,
self.instances = dict(self.cfg.GetMultiInstanceInfo(owned_instances))
# Check if node groups for locked instances are still correct
- for instance_name in owned_instances:
- inst = self.instances[instance_name]
- assert owned_nodes.issuperset(inst.all_nodes), \
- "Instance %s's nodes changed while we kept the lock" % instance_name
-
- inst_groups = _CheckInstanceNodeGroups(self.cfg, instance_name,
- owned_groups)
-
- assert self.group_uuid in inst_groups, \
- "Instance %s has no node in group %s" % (instance_name, self.group_uuid)
+ _CheckInstancesNodeGroups(self.cfg, self.instances,
+ owned_groups, owned_nodes, self.group_uuid)
if self.req_target_uuids:
# User requested specific target groups
def ExpandNames(self):
self.group_uuid = None
self.needed_locks = {}
+
if self.op.kind == constants.TAG_NODE:
self.op.name = _ExpandNodeName(self.cfg, self.op.name)
- self.needed_locks[locking.LEVEL_NODE] = self.op.name
+ lock_level = locking.LEVEL_NODE
+ lock_name = self.op.name
elif self.op.kind == constants.TAG_INSTANCE:
self.op.name = _ExpandInstanceName(self.cfg, self.op.name)
- self.needed_locks[locking.LEVEL_INSTANCE] = self.op.name
+ lock_level = locking.LEVEL_INSTANCE
+ lock_name = self.op.name
elif self.op.kind == constants.TAG_NODEGROUP:
self.group_uuid = self.cfg.LookupNodeGroup(self.op.name)
+ lock_level = locking.LEVEL_NODEGROUP
+ lock_name = self.group_uuid
+ elif self.op.kind == constants.TAG_NETWORK:
+ self.network_uuid = self.cfg.LookupNetwork(self.op.name)
+ lock_level = locking.LEVEL_NETWORK
+ lock_name = self.network_uuid
+ else:
+ lock_level = None
+ lock_name = None
+
+ if lock_level and getattr(self.op, "use_locking", True):
+ self.needed_locks[lock_level] = lock_name
# FIXME: Acquire BGL for cluster tag operations (as of this writing it's
# not possible to acquire the BGL based on opcode parameters)
self.target = self.cfg.GetInstanceInfo(self.op.name)
elif self.op.kind == constants.TAG_NODEGROUP:
self.target = self.cfg.GetNodeGroup(self.group_uuid)
+ elif self.op.kind == constants.TAG_NETWORK:
+ self.target = self.cfg.GetNetwork(self.network_uuid)
else:
raise errors.OpPrereqError("Wrong tag type requested (%s)" %
str(self.op.kind), errors.ECODE_INVAL)
self.in_text = self.out_text = self.in_data = self.out_data = None
# init all input fields so that pylint is happy
self.mode = mode
- self.memory = self.disks = self.disk_template = None
+ self.memory = self.disks = self.disk_template = self.spindle_use = None
self.os = self.tags = self.nics = self.vcpus = None
self.hypervisor = None
self.relocate_from = None
"ip": nic.ip,
"mode": filled_params[constants.NIC_MODE],
"link": filled_params[constants.NIC_LINK],
+ "network": nic.network,
}
if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
nic_dict["bridge"] = filled_params[constants.NIC_LINK]
"admin_state": iinfo.admin_state,
"vcpus": beinfo[constants.BE_VCPUS],
"memory": beinfo[constants.BE_MAXMEM],
+ "spindle_use": beinfo[constants.BE_SPINDLE_USE],
"os": iinfo.os,
"nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
"nics": nic_data,
"os": self.os,
"vcpus": self.vcpus,
"memory": self.memory,
+ "spindle_use": self.spindle_use,
"disks": self.disks,
"disk_space_total": disk_space,
"nics": self.nics,
[
("name", ht.TString),
("memory", ht.TInt),
+ ("spindle_use", ht.TInt),
("disks", ht.TListOf(ht.TDict)),
("disk_template", ht.TString),
("os", ht.TString),
result = ial.out_text
return result
+# Network LUs
+class LUNetworkAdd(LogicalUnit):
+ """Logical unit for creating networks.
+
+ """
+ HPATH = "network-add"
+ HTYPE = constants.HTYPE_NETWORK
+ REQ_BGL = False
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
+ mn = self.cfg.GetMasterNode()
+ return ([mn], [mn])
+
+ def ExpandNames(self):
+ self.network_uuid = self.cfg.GenerateUniqueID(self.proc.GetECId())
+ self.needed_locks = {}
+ self.add_locks[locking.LEVEL_NETWORK] = self.network_uuid
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+    This checks that the requested network name is not already defined.
+
+ """
+ if self.op.network is None:
+ raise errors.OpPrereqError("Network must be given",
+ errors.ECODE_INVAL)
+
+ uuid = self.cfg.LookupNetwork(self.op.network_name)
+
+ if uuid:
+ raise errors.OpPrereqError("Network '%s' already defined" %
+ self.op.network, errors.ECODE_EXISTS)
+
+ if self.op.mac_prefix:
+      utils.NormalizeAndValidateMac(self.op.mac_prefix + ":00:00:00")
+
+ # Check tag validity
+ for tag in self.op.tags:
+ objects.TaggableObject.ValidateTag(tag)
+
+ def BuildHooksEnv(self):
+ """Build hooks env.
+
+ """
+ args = {
+ "name": self.op.network_name,
+ "network": self.op.network,
+ "gateway": self.op.gateway,
+ "network6": self.op.network6,
+ "gateway6": self.op.gateway6,
+ "mac_prefix": self.op.mac_prefix,
+ "network_type": self.op.network_type,
+ "tags": self.op.tags,
+ }
+ return _BuildNetworkHookEnv(**args)
+
+ def Exec(self, feedback_fn):
+ """Add the ip pool to the cluster.
+
+ """
+ nobj = objects.Network(name=self.op.network_name,
+ network=self.op.network,
+ gateway=self.op.gateway,
+ network6=self.op.network6,
+ gateway6=self.op.gateway6,
+ mac_prefix=self.op.mac_prefix,
+ network_type=self.op.network_type,
+ uuid=self.network_uuid,
+ family=4)
+ # Initialize the associated address pool
+ try:
+ pool = network.AddressPool.InitializeNetwork(nobj)
+ except errors.AddressPoolError, e:
+ raise errors.OpExecError("Cannot create IP pool for this network. %s" % e)
+
+ # Check if we need to reserve the nodes and the cluster master IP
+ # These may not be allocated to any instances in routed mode, as
+ # they wouldn't function anyway.
+ for node in self.cfg.GetAllNodesInfo().values():
+ for ip in [node.primary_ip, node.secondary_ip]:
+ try:
+ pool.Reserve(ip)
+ self.LogInfo("Reserved node %s's IP (%s)", node.name, ip)
+ except errors.AddressPoolError:
+ pass
+
+ master_ip = self.cfg.GetClusterInfo().master_ip
+ try:
+ pool.Reserve(master_ip)
+ self.LogInfo("Reserved cluster master IP (%s)", master_ip)
+ except errors.AddressPoolError:
+ pass
+
+ if self.op.add_reserved_ips:
+ for ip in self.op.add_reserved_ips:
+ try:
+ pool.Reserve(ip, external=True)
+ except errors.AddressPoolError, e:
+ raise errors.OpExecError("Cannot reserve IP %s. %s " % (ip, e))
+
+ if self.op.tags:
+ for tag in self.op.tags:
+ nobj.AddTag(tag)
+
+ self.cfg.AddNetwork(nobj, self.proc.GetECId(), check_uuid=False)
+ del self.remove_locks[locking.LEVEL_NETWORK]
+
+
+class LUNetworkRemove(LogicalUnit):
+ HPATH = "network-remove"
+ HTYPE = constants.HTYPE_NETWORK
+ REQ_BGL = False
+
+ def ExpandNames(self):
+ self.network_uuid = self.cfg.LookupNetwork(self.op.network_name)
+
+ self.needed_locks = {
+ locking.LEVEL_NETWORK: [self.network_uuid],
+ }
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+    This checks that the given network exists and is not connected to any
+    node group.
+
+ """
+ if not self.network_uuid:
+ raise errors.OpPrereqError("Network %s not found" % self.op.network_name,
+ errors.ECODE_INVAL)
+
+    # Verify that the network is not connected to any node group.
+    node_groups = [group.name
+                   for group in self.cfg.GetAllNodeGroupsInfo().values()
+                   for net_uuid in group.networks.keys()
+                   if net_uuid == self.network_uuid]
+
+ if node_groups:
+ self.LogWarning("Nework '%s' is connected to the following"
+ " node groups: %s" % (self.op.network_name,
+ utils.CommaJoin(utils.NiceSort(node_groups))))
+ raise errors.OpPrereqError("Network still connected",
+ errors.ECODE_STATE)
+
+ def BuildHooksEnv(self):
+ """Build hooks env.
+
+ """
+ return {
+ "NETWORK_NAME": self.op.network_name,
+ }
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
+ mn = self.cfg.GetMasterNode()
+ return ([mn], [mn])
+
+ def Exec(self, feedback_fn):
+ """Remove the network.
+
+ """
+ try:
+ self.cfg.RemoveNetwork(self.network_uuid)
+ except errors.ConfigurationError:
+ raise errors.OpExecError("Network '%s' with UUID %s disappeared" %
+ (self.op.network_name, self.network_uuid))
+
+
+class LUNetworkSetParams(LogicalUnit):
+ """Modifies the parameters of a network.
+
+ """
+ HPATH = "network-modify"
+ HTYPE = constants.HTYPE_NETWORK
+ REQ_BGL = False
+
+ def CheckArguments(self):
+ if (self.op.gateway and
+ (self.op.add_reserved_ips or self.op.remove_reserved_ips)):
+ raise errors.OpPrereqError("Cannot modify gateway and reserved ips"
+ " at once", errors.ECODE_INVAL)
+
+ def ExpandNames(self):
+ self.network_uuid = self.cfg.LookupNetwork(self.op.network_name)
+ self.network = self.cfg.GetNetwork(self.network_uuid)
+ self.needed_locks = {
+ locking.LEVEL_NETWORK: [self.network_uuid],
+ }
+
+ if self.network is None:
+ raise errors.OpPrereqError("Could not retrieve network '%s' (UUID: %s)" %
+ (self.op.network_name, self.network_uuid),
+ errors.ECODE_INVAL)
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ """
+ self.gateway = self.network.gateway
+ self.network_type = self.network.network_type
+ self.mac_prefix = self.network.mac_prefix
+ self.network6 = self.network.network6
+ self.gateway6 = self.network.gateway6
+ self.tags = self.network.tags
+
+ self.pool = network.AddressPool(self.network)
+
+ if self.op.gateway:
+ if self.op.gateway == constants.VALUE_NONE:
+ self.gateway = None
+ else:
+ self.gateway = self.op.gateway
+ if self.pool.IsReserved(self.gateway):
+ raise errors.OpPrereqError("%s is already reserved" %
+ self.gateway, errors.ECODE_INVAL)
+
+ if self.op.network_type:
+ if self.op.network_type == constants.VALUE_NONE:
+ self.network_type = None
+ else:
+ self.network_type = self.op.network_type
+
+ if self.op.mac_prefix:
+ if self.op.mac_prefix == constants.VALUE_NONE:
+ self.mac_prefix = None
+ else:
+        utils.NormalizeAndValidateMac(self.op.mac_prefix + ":00:00:00")
+ self.mac_prefix = self.op.mac_prefix
+
+ if self.op.gateway6:
+ if self.op.gateway6 == constants.VALUE_NONE:
+ self.gateway6 = None
+ else:
+ self.gateway6 = self.op.gateway6
+
+ if self.op.network6:
+ if self.op.network6 == constants.VALUE_NONE:
+ self.network6 = None
+ else:
+ self.network6 = self.op.network6
+
+ def BuildHooksEnv(self):
+ """Build hooks env.
+
+ """
+ args = {
+ "name": self.op.network_name,
+ "network": self.network.network,
+ "gateway": self.gateway,
+ "network6": self.network6,
+ "gateway6": self.gateway6,
+ "mac_prefix": self.mac_prefix,
+ "network_type": self.network_type,
+ "tags": self.tags,
+ }
+ return _BuildNetworkHookEnv(**args)
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
+ mn = self.cfg.GetMasterNode()
+ return ([mn], [mn])
+
+ def Exec(self, feedback_fn):
+ """Modifies the network.
+
+ """
+ #TODO: reserve/release via temporary reservation manager
+ # extend cfg.ReserveIp/ReleaseIp with the external flag
+ if self.op.gateway:
+ if self.gateway == self.network.gateway:
+ self.LogWarning("Gateway is already %s" % self.gateway)
+ else:
+ if self.gateway:
+ self.pool.Reserve(self.gateway, external=True)
+ if self.network.gateway:
+ self.pool.Release(self.network.gateway, external=True)
+ self.network.gateway = self.gateway
+
+ if self.op.add_reserved_ips:
+ for ip in self.op.add_reserved_ips:
+ try:
+ if self.pool.IsReserved(ip):
+ self.LogWarning("IP %s is already reserved" % ip)
+ else:
+ self.pool.Reserve(ip, external=True)
+ except errors.AddressPoolError, e:
+ self.LogWarning("Cannot reserve ip %s. %s" % (ip, e))
+
+ if self.op.remove_reserved_ips:
+ for ip in self.op.remove_reserved_ips:
+ if ip == self.network.gateway:
+ self.LogWarning("Cannot unreserve Gateway's IP")
+ continue
+ try:
+ if not self.pool.IsReserved(ip):
+ self.LogWarning("IP %s is already unreserved" % ip)
+ else:
+ self.pool.Release(ip, external=True)
+ except errors.AddressPoolError, e:
+ self.LogWarning("Cannot release ip %s. %s" % (ip, e))
+
+ if self.op.mac_prefix:
+ self.network.mac_prefix = self.mac_prefix
+
+ if self.op.network6:
+ self.network.network6 = self.network6
+
+ if self.op.gateway6:
+ self.network.gateway6 = self.gateway6
+
+ if self.op.network_type:
+ self.network.network_type = self.network_type
+
+ self.pool.Validate()
+
+ self.cfg.Update(self.network, feedback_fn)
+
+
+class _NetworkQuery(_QueryBase):
+ FIELDS = query.NETWORK_FIELDS
+
+ def ExpandNames(self, lu):
+ lu.needed_locks = {}
+
+ self._all_networks = lu.cfg.GetAllNetworksInfo()
+ name_to_uuid = dict((n.name, n.uuid) for n in self._all_networks.values())
+
+ if not self.names:
+ self.wanted = [name_to_uuid[name]
+ for name in utils.NiceSort(name_to_uuid.keys())]
+ else:
+ # Accept names to be either names or UUIDs.
+ missing = []
+ self.wanted = []
+ all_uuid = frozenset(self._all_networks.keys())
+
+ for name in self.names:
+ if name in all_uuid:
+ self.wanted.append(name)
+ elif name in name_to_uuid:
+ self.wanted.append(name_to_uuid[name])
+ else:
+ missing.append(name)
+
+ if missing:
+ raise errors.OpPrereqError("Some networks do not exist: %s" % missing,
+ errors.ECODE_NOENT)
+
+ def DeclareLocks(self, lu, level):
+ pass
+
+ def _GetQueryData(self, lu):
+ """Computes the list of networks and their attributes.
+
+ """
+ do_instances = query.NETQ_INST in self.requested_data
+ do_groups = do_instances or (query.NETQ_GROUP in self.requested_data)
+ do_stats = query.NETQ_STATS in self.requested_data
+ cluster = lu.cfg.GetClusterInfo()
+
+ network_to_groups = None
+ network_to_instances = None
+ stats = None
+
+ # For NETQ_GROUP, we need to map network->[groups]
+ if do_groups:
+ all_groups = lu.cfg.GetAllNodeGroupsInfo()
+ network_to_groups = dict((uuid, []) for uuid in self.wanted)
+ default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
+
+ if do_instances:
+ all_instances = lu.cfg.GetAllInstancesInfo()
+ all_nodes = lu.cfg.GetAllNodesInfo()
+ network_to_instances = dict((uuid, []) for uuid in self.wanted)
+
+ for group in all_groups.values():
+ if do_instances:
+ group_nodes = [node.name for node in all_nodes.values() if
+ node.group == group.uuid]
+ group_instances = [instance for instance in all_instances.values()
+ if instance.primary_node in group_nodes]
+
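+      # Record the connection parameters (mode, link) of every wanted
+      # network this group is connected to.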
+ for net_uuid in group.networks.keys():
+ if net_uuid in network_to_groups:
+ netparams = group.networks[net_uuid]
+ mode = netparams[constants.NIC_MODE]
+ link = netparams[constants.NIC_LINK]
+          info = "%s(%s, %s)" % (group.name, mode, link)
+ network_to_groups[net_uuid].append(info)
+
+ if do_instances:
+ for instance in group_instances:
+ for nic in instance.nics:
+ if nic.network == self._all_networks[net_uuid].name:
+ network_to_instances[net_uuid].append(instance.name)
+ break
+
+ if do_stats:
+ stats = {}
+ for uuid, net in self._all_networks.items():
+ if uuid in self.wanted:
+ pool = network.AddressPool(net)
+ stats[uuid] = {
+ "free_count": pool.GetFreeCount(),
+ "reserved_count": pool.GetReservedCount(),
+ "map": pool.GetMap(),
+ "external_reservations": ", ".join(pool.GetExternalReservations()),
+ }
+
+ return query.NetworkQueryData([self._all_networks[uuid]
+ for uuid in self.wanted],
+ network_to_groups,
+ network_to_instances,
+ stats)
+
+
+class LUNetworkQuery(NoHooksLU):
+ """Logical unit for querying networks.
+
+ """
+ REQ_BGL = False
+
+ def CheckArguments(self):
+ self.nq = _NetworkQuery(qlang.MakeSimpleFilter("name", self.op.names),
+ self.op.output_fields, False)
+
+ def ExpandNames(self):
+ self.nq.ExpandNames(self)
+
+ def Exec(self, feedback_fn):
+ return self.nq.OldStyleQuery(self)
+
+
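+# A usage sketch (assumption, following the LU/opcode naming convention in
+# this file): clients reach _NetworkQuery through the corresponding query
+# opcode, e.g.
+#
+#   op = opcodes.OpNetworkQuery(names=[], output_fields=["name"])
+#
+# which LUNetworkQuery turns into an old-style query result.
+
+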
+class LUNetworkConnect(LogicalUnit):
+ """Connect a network to a nodegroup
+
+ """
+ HPATH = "network-connect"
+ HTYPE = constants.HTYPE_NETWORK
+ REQ_BGL = False
+
+ def ExpandNames(self):
+ self.network_name = self.op.network_name
+ self.group_name = self.op.group_name
+ self.network_mode = self.op.network_mode
+ self.network_link = self.op.network_link
+
+ self.network_uuid = self.cfg.LookupNetwork(self.network_name)
+ self.network = self.cfg.GetNetwork(self.network_uuid)
+ self.group_uuid = self.cfg.LookupNodeGroup(self.group_name)
+ self.group = self.cfg.GetNodeGroup(self.group_uuid)
+
+ self.needed_locks = {
+ locking.LEVEL_INSTANCE: [],
+ locking.LEVEL_NODEGROUP: [self.group_uuid],
+ }
+ self.share_locks[locking.LEVEL_INSTANCE] = 1
+
+ def DeclareLocks(self, level):
+ if level == locking.LEVEL_INSTANCE:
+ assert not self.needed_locks[locking.LEVEL_INSTANCE]
+
+ # Lock instances optimistically, needs verification once group lock has
+ # been acquired
+ self.needed_locks[locking.LEVEL_INSTANCE] = \
+ self.cfg.GetNodeGroupInstances(self.group_uuid)
+
+ def BuildHooksEnv(self):
+ ret = {
+ "GROUP_NAME": self.group_name,
+ "GROUP_NETWORK_MODE": self.network_mode,
+ "GROUP_NETWORK_LINK": self.network_link,
+ }
+ ret.update(_BuildNetworkHookEnvByObject(self, self.network))
+ return ret
+
+ def BuildHooksNodes(self):
+ nodes = self.cfg.GetNodeGroup(self.group_uuid).members
+ return (nodes, nodes)
+
+ def CheckPrereq(self):
+ fmt = lambda value: ", ".join("%s: %s/%s" % (i[0], i[1], i[2])
+ for i in value)
+
+ if self.network is None:
+ raise errors.OpPrereqError("Network %s does not exist" %
+ self.network_name, errors.ECODE_NOENT)
+
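+ # Build and syntactically validate the NIC parameters this group will
+ # use for the network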
+ self.netparams = {
+ constants.NIC_MODE: self.network_mode,
+ constants.NIC_LINK: self.network_link,
+ }
+ objects.NIC.CheckParameterSyntax(self.netparams)
+
+ #if self.network_mode == constants.NIC_MODE_BRIDGED:
+ # _CheckNodeGroupBridgesExist(self, self.network_link, self.group_uuid)
+ self.connected = False
+ if self.network_uuid in self.group.networks:
+ self.LogWarning("Network '%s' is already mapped to group '%s'" %
+ (self.network_name, self.group.name))
+ self.connected = True
+ return
+
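+ # Look for instance NICs that are not yet part of any network but use
+ # an IP address from the range of the network being connected; these
+ # would conflict with the pool's reservations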
+ pool = network.AddressPool(self.network)
+ if self.op.conflicts_check:
+ groupinstances = []
+ for n in self.cfg.GetNodeGroupInstances(self.group_uuid):
+ groupinstances.append(self.cfg.GetInstanceInfo(n))
+ instances = [(instance.name, idx, nic.ip)
+ for instance in groupinstances
+ for idx, nic in enumerate(instance.nics)
+ if (not nic.network and pool._Contains(nic.ip))]
+ if instances:
+ self.LogWarning("The following instance NICs use IP addresses from"
+ " network %s, which is about to be connected to"
+ " node group %s: %s" %
+ (self.network_name, self.group.name,
+ fmt(instances)))
+ raise errors.OpPrereqError("Conflicting IPs found."
+ " Please remove/modify"
+ " corresponding NICs",
+ errors.ECODE_INVAL)
+
+ def Exec(self, feedback_fn):
+ if self.connected:
+ return
+
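+ # Map the network to the node group with the given NIC parameters and
+ # persist the updated group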
+ self.group.networks[self.network_uuid] = self.netparams
+ self.cfg.Update(self.group, feedback_fn)
+
+
+class LUNetworkDisconnect(LogicalUnit):
+ """Disconnect a network from a nodegroup
+
+ """
+ HPATH = "network-disconnect"
+ HTYPE = constants.HTYPE_NETWORK
+ REQ_BGL = False
+
+ def ExpandNames(self):
+ self.network_name = self.op.network_name
+ self.group_name = self.op.group_name
+
+ self.network_uuid = self.cfg.LookupNetwork(self.network_name)
+ self.network = self.cfg.GetNetwork(self.network_uuid)
+ self.group_uuid = self.cfg.LookupNodeGroup(self.group_name)
+ self.group = self.cfg.GetNodeGroup(self.group_uuid)
+
+ self.needed_locks = {
+ locking.LEVEL_INSTANCE: [],
+ locking.LEVEL_NODEGROUP: [self.group_uuid],
+ }
+ self.share_locks[locking.LEVEL_INSTANCE] = 1
+
+ def DeclareLocks(self, level):
+ if level == locking.LEVEL_INSTANCE:
+ assert not self.needed_locks[locking.LEVEL_INSTANCE]
+
+ # Lock instances optimistically, needs verification once group lock has
+ # been acquired
+ self.needed_locks[locking.LEVEL_INSTANCE] = \
+ self.cfg.GetNodeGroupInstances(self.group_uuid)
+
+ def BuildHooksEnv(self):
+ ret = {
+ "GROUP_NAME": self.group_name,
+ }
+ ret.update(_BuildNetworkHookEnvByObject(self, self.network))
+ return ret
+
+ def BuildHooksNodes(self):
+ nodes = self.cfg.GetNodeGroup(self.group_uuid).members
+ return (nodes, nodes)
+
+ def CheckPrereq(self):
+ fmt = lambda value: ", ".join("%s: %s/%s" % (i[0], i[1], i[2])
+ for i in value)
+
+ self.connected = True
+ if self.network_uuid not in self.group.networks:
+ self.LogWarning("Network '%s' is"
+ " not mapped to group '%s'" %
+ (self.network_name, self.group.name))
+ self.connected = False
+ return
+
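+ # NICs still attached to this network would be left dangling by the
+ # disconnect, so refuse to proceed while any exist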
+ if self.op.conflicts_check:
+ groupinstances = []
+ for n in self.cfg.GetNodeGroupInstances(self.group_uuid):
+ groupinstances.append(self.cfg.GetInstanceInfo(n))
+ instances = [(instance.name, idx, nic.ip)
+ for instance in groupinstances
+ for idx, nic in enumerate(instance.nics)
+ if nic.network == self.network_name]
+ if instances:
+ self.LogWarning("The following instance NICs use IP addresses from"
+ " network %s, which is about to be disconnected from"
+ " node group %s: %s" %
+ (self.network_name, self.group.name,
+ fmt(instances)))
+ raise errors.OpPrereqError("Conflicting IPs found."
+ " Please remove/modify"
+ " corresponding NICs",
+ errors.ECODE_INVAL)
+
+ def Exec(self, feedback_fn):
+ if not self.connected:
+ return
+
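+ # Remove the mapping and persist the updated group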
+ del self.group.networks[self.network_uuid]
+ self.cfg.Update(self.group, feedback_fn)
+
#: Query type implementations
_QUERY_IMPL = {
+ constants.QR_CLUSTER: _ClusterQuery,
constants.QR_INSTANCE: _InstanceQuery,
constants.QR_NODE: _NodeQuery,
constants.QR_GROUP: _GroupQuery,
+ constants.QR_NETWORK: _NetworkQuery,
constants.QR_OS: _OsQuery,
+ constants.QR_EXTSTORAGE: _ExtStorageQuery,
+ constants.QR_EXPORT: _ExportQuery,
}
assert set(_QUERY_IMPL.keys()) == constants.QR_VIA_OP
except KeyError:
raise errors.OpPrereqError("Unknown query resource '%s'" % name,
errors.ECODE_INVAL)
+
+
+def _CheckForConflictingIp(lu, ip, node):
+ """Raise an error in case of a conflicting IP address.
+
+ @type lu: L{LogicalUnit}
+ @param lu: the LU performing the check
+ @type ip: string
+ @param ip: the IP address to check
+ @type node: string
+ @param node: name of the node on which to check
+
+ """
+ (conf_net, _) = lu.cfg.CheckIPInNodeGroup(ip, node)
+ if conf_net is not None:
+ raise errors.OpPrereqError("Conflicting IP address found: %s is already"
+ " in use by network %s" % (ip, conf_net),
+ errors.ECODE_INVAL)
+
+ return (None, None)
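+
+
+# A usage sketch (assumption): LUs that assign or change a NIC's IP address
+# are expected to call this helper with the instance's primary node, e.g.
+#
+#   _CheckForConflictingIp(self, nic.ip, instance.primary_node)
+#
+# and let the raised OpPrereqError abort the operation early.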