#
#
-# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# W0201 since most LU attributes are defined in CheckPrereq or similar
# functions
-# C0302: since we have waaaay to many lines in this module
+# C0302: since we have waaaay too many lines in this module
import os
import os.path
import time
import re
-import platform
import logging
import copy
import OpenSSL
import shutil
import itertools
import operator
+import ipaddr
from ganeti import ssh
from ganeti import utils
from ganeti import qlang
from ganeti import opcodes
from ganeti import ht
+from ganeti import rpc
+from ganeti import runtime
+from ganeti import network
import ganeti.masterd.instance # pylint: disable=W0611
+#: Size of DRBD meta block device
+DRBD_META_SIZE = 128
+
+# States of instance
+INSTANCE_DOWN = [constants.ADMINST_DOWN]
+INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP]
+INSTANCE_NOT_RUNNING = [constants.ADMINST_DOWN, constants.ADMINST_OFFLINE]
+
+#: Instance status in which an instance can be marked as offline/online
+CAN_CHANGE_INSTANCE_OFFLINE = (frozenset(INSTANCE_DOWN) | frozenset([
+ constants.ADMINST_OFFLINE,
+ ]))
+
+
class ResultWithJobs:
"""Data container for LU results with jobs.
Instances of this class returned from L{LogicalUnit.Exec} will be recognized
- by L{mcpu.Processor._ProcessResult}. The latter will then submit the jobs
+ by L{mcpu._ProcessResult}. The latter will then submit the jobs
contained in the C{jobs} attribute and include the job IDs in the opcode
result.
HTYPE = None
REQ_BGL = True
- def __init__(self, processor, op, context, rpc):
+ def __init__(self, processor, op, context, rpc_runner):
"""Constructor for LogicalUnit.
This needs to be overridden in derived classes in order to check op
# readability alias
self.owned_locks = context.glm.list_owned
self.context = context
- self.rpc = rpc
+ self.rpc = rpc_runner
# Dicts used to declare locking needs to mcpu
self.needed_locks = None
self.share_locks = dict.fromkeys(locking.LEVELS, 0)
as values. Rules:
- use an empty dict if you don't need any lock
- - if you don't need any lock at a particular level omit that level
+ - if you don't need any lock at a particular level omit that
+ level (note that in this case C{DeclareLocks} won't be called
+ at all for that level)
+ - if you need locks at a level, but you can't calculate it in
+ this function, initialise that level with an empty list and do
+ further processing in L{LogicalUnit.DeclareLocks} (see that
+ function's docstring)
- don't put anything for the BGL level
- - if you want all locks at a level use locking.ALL_SET as a value
+ - if you want all locks at a level use L{locking.ALL_SET} as a value
If you need to share locks (rather than acquire them exclusively) at one
level you can modify self.share_locks, setting a true value (usually 1) for
self.needed_locks for the level.
@param level: Locking level which is going to be locked
- @type level: member of ganeti.locking.LEVELS
+ @type level: member of L{ganeti.locking.LEVELS}
"""
self.op.instance_name)
self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name
- def _LockInstancesNodes(self, primary_only=False):
+ def _LockInstancesNodes(self, primary_only=False,
+ level=locking.LEVEL_NODE):
"""Helper function to declare instances' nodes for locking.
This function should be called after locking one or more instances to lock
@type primary_only: boolean
@param primary_only: only lock primary nodes of locked instances
+ @param level: Which lock level to use for locking nodes
"""
- assert locking.LEVEL_NODE in self.recalculate_locks, \
+ assert level in self.recalculate_locks, \
"_LockInstancesNodes helper function called with no nodes to recalculate"
# TODO: check if we're really been called with the instance locks held
if not primary_only:
wanted_nodes.extend(instance.secondary_nodes)
- if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
- self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
- elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
- self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
+ if self.recalculate_locks[level] == constants.LOCKS_REPLACE:
+ self.needed_locks[level] = wanted_nodes
+ elif self.recalculate_locks[level] == constants.LOCKS_APPEND:
+ self.needed_locks[level].extend(wanted_nodes)
+ else:
+ raise errors.ProgrammerError("Unknown recalculation mode")
- del self.recalculate_locks[locking.LEVEL_NODE]
+ del self.recalculate_locks[level]
class NoHooksLU(LogicalUnit): # pylint: disable=W0223
#: Attribute holding field definitions
FIELDS = None
- def __init__(self, filter_, fields, use_locking):
+ #: Field to sort by
+ SORT_FIELD = "name"
+
+ def __init__(self, qfilter, fields, use_locking):
"""Initializes this class.
"""
self.use_locking = use_locking
- self.query = query.Query(self.FIELDS, fields, filter_=filter_,
- namefield="name")
+ self.query = query.Query(self.FIELDS, fields, qfilter=qfilter,
+ namefield=self.SORT_FIELD)
self.requested_data = self.query.RequestedData()
self.names = self.query.RequestedNames()
return dict.fromkeys(locking.LEVELS, 1)
+def _MakeLegacyNodeInfo(data):
+ """Formats the data returned by L{rpc.RpcRunner.call_node_info}.
+
+ Converts the data into a single dictionary. This is fine for most use cases,
+ but some require information from more than one volume group or hypervisor.
+
+ """
+ (bootid, (vg_info, ), (hv_info, )) = data
+
+ return utils.JoinDisjointDicts(utils.JoinDisjointDicts(vg_info, hv_info), {
+ "bootid": bootid,
+ })
+
+
+def _AnnotateDiskParams(instance, devs, cfg):
+ """Little helper wrapper to the rpc annotation method.
+
+ @param instance: The instance object
+ @type devs: List of L{objects.Disk}
+ @param devs: The root devices (not any of its children!)
+ @param cfg: The config object
+  @return: The annotated disk copies
+  @see: L{rpc.AnnotateDiskParams}
+
+ """
+ return rpc.AnnotateDiskParams(instance.disk_template, devs,
+ cfg.GetInstanceDiskParams(instance))
+
+
+def _CheckInstancesNodeGroups(cfg, instances, owned_groups, owned_nodes,
+ cur_group_uuid):
+ """Checks if node groups for locked instances are still correct.
+
+ @type cfg: L{config.ConfigWriter}
+ @param cfg: Cluster configuration
+ @type instances: dict; string as key, L{objects.Instance} as value
+ @param instances: Dictionary, instance name as key, instance object as value
+ @type owned_groups: iterable of string
+ @param owned_groups: List of owned groups
+ @type owned_nodes: iterable of string
+ @param owned_nodes: List of owned nodes
+ @type cur_group_uuid: string or None
+ @param cur_group_uuid: Optional group UUID to check against instance's groups
+
+ """
+ for (name, inst) in instances.items():
+ assert owned_nodes.issuperset(inst.all_nodes), \
+ "Instance %s's nodes changed while we kept the lock" % name
+
+ inst_groups = _CheckInstanceNodeGroups(cfg, name, owned_groups)
+
+ assert cur_group_uuid is None or cur_group_uuid in inst_groups, \
+ "Instance %s has no node in group %s" % (name, cur_group_uuid)
+
+
def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups):
"""Checks if the owned node groups are still correct for an instance.
return params_copy
+def _GetUpdatedIPolicy(old_ipolicy, new_ipolicy, group_policy=False):
+ """Return the new version of a instance policy.
+
+ @param group_policy: whether this policy applies to a group and thus
+ we should support removal of policy entries
+
+ """
+ use_none = use_default = group_policy
+ ipolicy = copy.deepcopy(old_ipolicy)
+ for key, value in new_ipolicy.items():
+ if key not in constants.IPOLICY_ALL_KEYS:
+ raise errors.OpPrereqError("Invalid key in new ipolicy: %s" % key,
+ errors.ECODE_INVAL)
+ if key in constants.IPOLICY_ISPECS:
+ utils.ForceDictType(value, constants.ISPECS_PARAMETER_TYPES)
+ ipolicy[key] = _GetUpdatedParams(old_ipolicy.get(key, {}), value,
+ use_none=use_none,
+ use_default=use_default)
+ else:
+ if (not value or value == [constants.VALUE_DEFAULT] or
+ value == constants.VALUE_DEFAULT):
+ if group_policy:
+ del ipolicy[key]
+ else:
+ raise errors.OpPrereqError("Can't unset ipolicy attribute '%s'"
+                                     " on the cluster" % key,
+ errors.ECODE_INVAL)
+ else:
+ if key in constants.IPOLICY_PARAMETERS:
+ # FIXME: we assume all such values are float
+ try:
+ ipolicy[key] = float(value)
+ except (TypeError, ValueError), err:
+ raise errors.OpPrereqError("Invalid value for attribute"
+ " '%s': '%s', error: %s" %
+ (key, value, err), errors.ECODE_INVAL)
+ else:
+ # FIXME: we assume all others are lists; this should be redone
+ # in a nicer way
+ ipolicy[key] = list(value)
+ try:
+ objects.InstancePolicy.CheckParameterSyntax(ipolicy, not group_policy)
+ except errors.ConfigurationError, err:
+ raise errors.OpPrereqError("Invalid instance policy: %s" % err,
+ errors.ECODE_INVAL)
+ return ipolicy
+
+
+def _UpdateAndVerifySubDict(base, updates, type_check):
+ """Updates and verifies a dict with sub dicts of the same type.
+
+ @param base: The dict with the old data
+ @param updates: The dict with the new data
+ @param type_check: Dict suitable to ForceDictType to verify correct types
+ @returns: A new dict with updated and verified values
+
+ """
+ def fn(old, value):
+ new = _GetUpdatedParams(old, value)
+ utils.ForceDictType(new, type_check)
+ return new
+
+ ret = copy.deepcopy(base)
+ ret.update(dict((key, fn(base.get(key, {}), value))
+ for key, value in updates.items()))
+ return ret
+
+
+def _MergeAndVerifyHvState(op_input, obj_input):
+ """Combines the hv state from an opcode with the one of the object
+
+ @param op_input: The input dict from the opcode
+ @param obj_input: The input dict from the objects
+ @return: The verified and updated dict
+
+ """
+ if op_input:
+ invalid_hvs = set(op_input) - constants.HYPER_TYPES
+ if invalid_hvs:
+ raise errors.OpPrereqError("Invalid hypervisor(s) in hypervisor state:"
+ " %s" % utils.CommaJoin(invalid_hvs),
+ errors.ECODE_INVAL)
+ if obj_input is None:
+ obj_input = {}
+ type_check = constants.HVSTS_PARAMETER_TYPES
+ return _UpdateAndVerifySubDict(obj_input, op_input, type_check)
+
+ return None
+
+
+def _MergeAndVerifyDiskState(op_input, obj_input):
+ """Combines the disk state from an opcode with the one of the object
+
+ @param op_input: The input dict from the opcode
+ @param obj_input: The input dict from the objects
+ @return: The verified and updated dict
+ """
+ if op_input:
+ invalid_dst = set(op_input) - constants.DS_VALID_TYPES
+ if invalid_dst:
+ raise errors.OpPrereqError("Invalid storage type(s) in disk state: %s" %
+ utils.CommaJoin(invalid_dst),
+ errors.ECODE_INVAL)
+ type_check = constants.DSS_PARAMETER_TYPES
+ if obj_input is None:
+ obj_input = {}
+ return dict((key, _UpdateAndVerifySubDict(obj_input.get(key, {}), value,
+ type_check))
+ for key, value in op_input.items())
+
+ return None
+
+
def _ReleaseLocks(lu, level, names=None, keep=None):
"""Releases locks owned by an LU.
else:
should_release = None
- if should_release:
+ owned = lu.owned_locks(level)
+ if not owned:
+ # Not owning any lock at this level, do nothing
+ pass
+
+ elif should_release:
retain = []
release = []
# Determine which locks to release
- for name in lu.owned_locks(level):
+ for name in owned:
if should_release(name):
release.append(name)
else:
"""Runs the post-hook for an opcode on a single node.
"""
- hm = lu.proc.hmclass(lu.rpc.call_hooks_runner, lu)
+ hm = lu.proc.BuildHooksManager(lu)
try:
hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
except:
strict=True)
-def _CheckInstanceDown(lu, instance, reason):
- """Ensure that an instance is not running."""
- if instance.admin_up:
- raise errors.OpPrereqError("Instance %s is marked to be up, %s" %
- (instance.name, reason), errors.ECODE_STATE)
+def _CheckInstanceState(lu, instance, req_states, msg=None):
+ """Ensure that an instance is in one of the required states.
+
+ @param lu: the LU on behalf of which we make the check
+ @param instance: the instance to check
+ @param msg: if passed, should be a message to replace the default one
+ @raise errors.OpPrereqError: if the instance is not in the required state
+
+ """
+ if msg is None:
+ msg = "can't use instance from outside %s states" % ", ".join(req_states)
+ if instance.admin_state not in req_states:
+ raise errors.OpPrereqError("Instance '%s' is marked to be %s, %s" %
+ (instance.name, instance.admin_state, msg),
+ errors.ECODE_STATE)
+
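+  # If being up is not one of the accepted states, additionally check that
+  # the instance is not actually running on its primary node.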
+ if constants.ADMINST_UP not in req_states:
+ pnode = instance.primary_node
+ ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
+ ins_l.Raise("Can't contact node %s for instance information" % pnode,
+ prereq=True, ecode=errors.ECODE_ENVIRON)
+
+ if instance.name in ins_l.payload:
+ raise errors.OpPrereqError("Instance %s is running, %s" %
+ (instance.name, msg), errors.ECODE_STATE)
+
+
+def _ComputeMinMaxSpec(name, qualifier, ipolicy, value):
+ """Computes if value is in the desired range.
+
+ @param name: name of the parameter for which we perform the check
+ @param qualifier: a qualifier used in the error message (e.g. 'disk/1',
+ not just 'disk')
+ @param ipolicy: dictionary containing min, max and std values
+ @param value: actual value that we want to use
+ @return: None or element not meeting the criteria
+
+ """
+ if value in [None, constants.VALUE_AUTO]:
+ return None
+ max_v = ipolicy[constants.ISPECS_MAX].get(name, value)
+ min_v = ipolicy[constants.ISPECS_MIN].get(name, value)
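+  # A bound that is not defined in the policy defaults to the value itself,
+  # so it can never be violated.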
+ if value > max_v or min_v > value:
+ if qualifier:
+ fqn = "%s/%s" % (name, qualifier)
+ else:
+ fqn = name
+ return ("%s value %s is not in range [%s, %s]" %
+ (fqn, value, min_v, max_v))
+ return None
+
+
+def _ComputeIPolicySpecViolation(ipolicy, mem_size, cpu_count, disk_count,
+ nic_count, disk_sizes, spindle_use,
+ _compute_fn=_ComputeMinMaxSpec):
+ """Verifies ipolicy against provided specs.
+
+ @type ipolicy: dict
+ @param ipolicy: The ipolicy
+ @type mem_size: int
+ @param mem_size: The memory size
+ @type cpu_count: int
+ @param cpu_count: Used cpu cores
+ @type disk_count: int
+ @param disk_count: Number of disks used
+ @type nic_count: int
+ @param nic_count: Number of nics used
+ @type disk_sizes: list of ints
+ @param disk_sizes: Disk sizes of used disk (len must match C{disk_count})
+ @type spindle_use: int
+ @param spindle_use: The number of spindles this instance uses
+ @param _compute_fn: The compute function (unittest only)
+  @return: A list of violations, or an empty list if no violations are found
+
+ """
+ assert disk_count == len(disk_sizes)
+
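+  # Disk sizes are checked per disk, using the disk index as qualifier so
+  # that error messages point at the offending disk.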
+ test_settings = [
+ (constants.ISPEC_MEM_SIZE, "", mem_size),
+ (constants.ISPEC_CPU_COUNT, "", cpu_count),
+ (constants.ISPEC_DISK_COUNT, "", disk_count),
+ (constants.ISPEC_NIC_COUNT, "", nic_count),
+ (constants.ISPEC_SPINDLE_USE, "", spindle_use),
+ ] + [(constants.ISPEC_DISK_SIZE, str(idx), d)
+ for idx, d in enumerate(disk_sizes)]
+
+ return filter(None,
+ (_compute_fn(name, qualifier, ipolicy, value)
+ for (name, qualifier, value) in test_settings))
+
+
+def _ComputeIPolicyInstanceViolation(ipolicy, instance,
+ _compute_fn=_ComputeIPolicySpecViolation):
+ """Compute if instance meets the specs of ipolicy.
+
+ @type ipolicy: dict
+ @param ipolicy: The ipolicy to verify against
+ @type instance: L{objects.Instance}
+ @param instance: The instance to verify
+ @param _compute_fn: The function to verify ipolicy (unittest only)
+ @see: L{_ComputeIPolicySpecViolation}
+
+ """
+ mem_size = instance.beparams.get(constants.BE_MAXMEM, None)
+ cpu_count = instance.beparams.get(constants.BE_VCPUS, None)
+ spindle_use = instance.beparams.get(constants.BE_SPINDLE_USE, None)
+ disk_count = len(instance.disks)
+ disk_sizes = [disk.size for disk in instance.disks]
+ nic_count = len(instance.nics)
+
+ return _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
+ disk_sizes, spindle_use)
+
+
+def _ComputeIPolicyInstanceSpecViolation(ipolicy, instance_spec,
+ _compute_fn=_ComputeIPolicySpecViolation):
+ """Compute if instance specs meets the specs of ipolicy.
+
+ @type ipolicy: dict
+ @param ipolicy: The ipolicy to verify against
+  @type instance_spec: dict
+ @param instance_spec: The instance spec to verify
+ @param _compute_fn: The function to verify ipolicy (unittest only)
+ @see: L{_ComputeIPolicySpecViolation}
+
+ """
+ mem_size = instance_spec.get(constants.ISPEC_MEM_SIZE, None)
+ cpu_count = instance_spec.get(constants.ISPEC_CPU_COUNT, None)
+ disk_count = instance_spec.get(constants.ISPEC_DISK_COUNT, 0)
+ disk_sizes = instance_spec.get(constants.ISPEC_DISK_SIZE, [])
+ nic_count = instance_spec.get(constants.ISPEC_NIC_COUNT, 0)
+ spindle_use = instance_spec.get(constants.ISPEC_SPINDLE_USE, None)
+
+ return _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
+ disk_sizes, spindle_use)
+
- pnode = instance.primary_node
- ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
- ins_l.Raise("Can't contact node %s for instance information" % pnode,
- prereq=True, ecode=errors.ECODE_ENVIRON)
+def _ComputeIPolicyNodeViolation(ipolicy, instance, current_group,
+ target_group,
+ _compute_fn=_ComputeIPolicyInstanceViolation):
+ """Compute if instance meets the specs of the new target group.
- if instance.name in ins_l.payload:
- raise errors.OpPrereqError("Instance %s is running, %s" %
- (instance.name, reason), errors.ECODE_STATE)
+ @param ipolicy: The ipolicy to verify
+ @param instance: The instance object to verify
+ @param current_group: The current group of the instance
+ @param target_group: The new group of the instance
+ @param _compute_fn: The function to verify ipolicy (unittest only)
+ @see: L{_ComputeIPolicySpecViolation}
+
+ """
+ if current_group == target_group:
+ return []
+ else:
+ return _compute_fn(ipolicy, instance)
+
+
+def _CheckTargetNodeIPolicy(lu, ipolicy, instance, node, ignore=False,
+ _compute_fn=_ComputeIPolicyNodeViolation):
+ """Checks that the target node is correct in terms of instance policy.
+
+ @param ipolicy: The ipolicy to verify
+ @param instance: The instance object to verify
+ @param node: The new node to relocate
+ @param ignore: Ignore violations of the ipolicy
+ @param _compute_fn: The function to verify ipolicy (unittest only)
+ @see: L{_ComputeIPolicySpecViolation}
+
+ """
+ primary_node = lu.cfg.GetNodeInfo(instance.primary_node)
+ res = _compute_fn(ipolicy, instance, primary_node.group, node.group)
+
+ if res:
+ msg = ("Instance does not meet target node group's (%s) instance"
+ " policy: %s") % (node.group, utils.CommaJoin(res))
+ if ignore:
+ lu.LogWarning(msg)
+ else:
+ raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
+
+
+def _ComputeNewInstanceViolations(old_ipolicy, new_ipolicy, instances):
+ """Computes a set of any instances that would violate the new ipolicy.
+
+ @param old_ipolicy: The current (still in-place) ipolicy
+ @param new_ipolicy: The new (to become) ipolicy
+ @param instances: List of instances to verify
+  @return: A set of instances which violate the new ipolicy but
+ did not before
+
+ """
+ return (_ComputeViolatingInstances(new_ipolicy, instances) -
+ _ComputeViolatingInstances(old_ipolicy, instances))
def _ExpandItemName(fn, name, kind):
"""Wrapper over L{_ExpandItemName} for instance."""
return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
+def _BuildNetworkHookEnv(name, network, gateway, network6, gateway6,
+ network_type, mac_prefix, tags):
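+  # Only values that are actually set are exported, so hook scripts only
+  # see NETWORK_* variables carrying real information.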
+ env = dict()
+ if name:
+ env["NETWORK_NAME"] = name
+ if network:
+ env["NETWORK_SUBNET"] = network
+ if gateway:
+ env["NETWORK_GATEWAY"] = gateway
+ if network6:
+ env["NETWORK_SUBNET6"] = network6
+ if gateway6:
+ env["NETWORK_GATEWAY6"] = gateway6
+ if mac_prefix:
+ env["NETWORK_MAC_PREFIX"] = mac_prefix
+ if network_type:
+ env["NETWORK_TYPE"] = network_type
+ if tags:
+ env["NETWORK_TAGS"] = " ".join(tags)
+
+ return env
+
+
+def _BuildNetworkHookEnvByObject(lu, network):
+ args = {
+ "name": network.name,
+ "network": network.network,
+ "gateway": network.gateway,
+ "network6": network.network6,
+ "gateway6": network.gateway6,
+ "network_type": network.network_type,
+ "mac_prefix": network.mac_prefix,
+ "tags" : network.tags,
+ }
+ return _BuildNetworkHookEnv(**args)
+
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
- memory, vcpus, nics, disk_template, disks,
+ minmem, maxmem, vcpus, nics, disk_template, disks,
bep, hvp, hypervisor_name, tags):
"""Builds instance related env variables for hooks
@param secondary_nodes: list of secondary nodes as strings
@type os_type: string
@param os_type: the name of the instance's OS
- @type status: boolean
- @param status: the should_run status of the instance
- @type memory: string
- @param memory: the memory size of the instance
+ @type status: string
+ @param status: the desired status of the instance
+ @type minmem: string
+ @param minmem: the minimum memory size of the instance
+ @type maxmem: string
+ @param maxmem: the maximum memory size of the instance
@type vcpus: string
@param vcpus: the count of VCPUs the instance has
@type nics: list
- @param nics: list of tuples (ip, mac, mode, link) representing
+ @param nics: list of tuples (ip, mac, mode, link, network) representing
the NICs the instance has
@type disk_template: string
@param disk_template: the disk template of the instance
@return: the hook environment for this instance
"""
- if status:
- str_status = "up"
- else:
- str_status = "down"
env = {
"OP_TARGET": name,
"INSTANCE_NAME": name,
"INSTANCE_PRIMARY": primary_node,
"INSTANCE_SECONDARIES": " ".join(secondary_nodes),
"INSTANCE_OS_TYPE": os_type,
- "INSTANCE_STATUS": str_status,
- "INSTANCE_MEMORY": memory,
+ "INSTANCE_STATUS": status,
+ "INSTANCE_MINMEM": minmem,
+ "INSTANCE_MAXMEM": maxmem,
+ # TODO(2.7) remove deprecated "memory" value
+ "INSTANCE_MEMORY": maxmem,
"INSTANCE_VCPUS": vcpus,
"INSTANCE_DISK_TEMPLATE": disk_template,
"INSTANCE_HYPERVISOR": hypervisor_name,
}
-
if nics:
nic_count = len(nics)
- for idx, (ip, mac, mode, link) in enumerate(nics):
+ for idx, (ip, mac, mode, link, network, netinfo) in enumerate(nics):
if ip is None:
ip = ""
env["INSTANCE_NIC%d_IP" % idx] = ip
env["INSTANCE_NIC%d_MAC" % idx] = mac
env["INSTANCE_NIC%d_MODE" % idx] = mode
env["INSTANCE_NIC%d_LINK" % idx] = link
+ if network:
+ env["INSTANCE_NIC%d_NETWORK" % idx] = network
+ if netinfo:
+ nobj = objects.Network.FromDict(netinfo)
+ if nobj.network:
+ env["INSTANCE_NIC%d_NETWORK_SUBNET" % idx] = nobj.network
+ if nobj.gateway:
+ env["INSTANCE_NIC%d_NETWORK_GATEWAY" % idx] = nobj.gateway
+ if nobj.network6:
+ env["INSTANCE_NIC%d_NETWORK_SUBNET6" % idx] = nobj.network6
+ if nobj.gateway6:
+ env["INSTANCE_NIC%d_NETWORK_GATEWAY6" % idx] = nobj.gateway6
+ if nobj.mac_prefix:
+ env["INSTANCE_NIC%d_NETWORK_MAC_PREFIX" % idx] = nobj.mac_prefix
+ if nobj.network_type:
+ env["INSTANCE_NIC%d_NETWORK_TYPE" % idx] = nobj.network_type
+ if nobj.tags:
+ env["INSTANCE_NIC%d_NETWORK_TAGS" % idx] = " ".join(nobj.tags)
if mode == constants.NIC_MODE_BRIDGED:
env["INSTANCE_NIC%d_BRIDGE" % idx] = link
else:
return env
+def _NICToTuple(lu, nic):
+ """Build a tupple of nic information.
+
+ @type lu: L{LogicalUnit}
+ @param lu: the logical unit on whose behalf we execute
+ @type nic: L{objects.NIC}
+ @param nic: nic to convert to hooks tuple
+
+ """
+ cluster = lu.cfg.GetClusterInfo()
+ ip = nic.ip
+ mac = nic.mac
+ filled_params = cluster.SimpleFillNIC(nic.nicparams)
+ mode = filled_params[constants.NIC_MODE]
+ link = filled_params[constants.NIC_LINK]
+ network = nic.network
+ netinfo = None
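+  # netinfo is only filled in when the NIC is connected to a known network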
+ if network:
+ net_uuid = lu.cfg.LookupNetwork(network)
+ if net_uuid:
+ nobj = lu.cfg.GetNetwork(net_uuid)
+ netinfo = objects.Network.ToDict(nobj)
+ return (ip, mac, mode, link, network, netinfo)
def _NICListToTuple(lu, nics):
"""Build a list of nic information tuples.
hooks_nics = []
cluster = lu.cfg.GetClusterInfo()
for nic in nics:
- ip = nic.ip
- mac = nic.mac
- filled_params = cluster.SimpleFillNIC(nic.nicparams)
- mode = filled_params[constants.NIC_MODE]
- link = filled_params[constants.NIC_LINK]
- hooks_nics.append((ip, mac, mode, link))
+ hooks_nics.append(_NICToTuple(lu, nic))
return hooks_nics
-
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
"""Builds instance related env variables for hooks from an object.
"primary_node": instance.primary_node,
"secondary_nodes": instance.secondary_nodes,
"os_type": instance.os,
- "status": instance.admin_up,
- "memory": bep[constants.BE_MEMORY],
+ "status": instance.admin_state,
+ "maxmem": bep[constants.BE_MAXMEM],
+ "minmem": bep[constants.BE_MINMEM],
"vcpus": bep[constants.BE_VCPUS],
"nics": _NICListToTuple(lu, instance.nics),
"disk_template": instance.disk_template,
return mc_now < mc_should
+def _CalculateGroupIPolicy(cluster, group):
+ """Calculate instance policy for group.
+
+ """
+ return cluster.SimpleFillIPolicy(group.ipolicy)
+
+
+def _ComputeViolatingInstances(ipolicy, instances):
+ """Computes a set of instances who violates given ipolicy.
+
+ @param ipolicy: The ipolicy to verify
+ @type instances: object.Instance
+ @param instances: List of instances to verify
+ @return: A frozenset of instance names violating the ipolicy
+
+ """
+ return frozenset([inst.name for inst in instances
+ if _ComputeIPolicyInstanceViolation(ipolicy, inst)])
+
+
def _CheckNicsBridgesExist(lu, target_nics, target_node):
"""Check that the brigdes needed by a list of nics exist.
return []
-def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
+def _FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_name, prereq):
faulty = []
for dev in instance.disks:
cfg.SetDiskID(dev, node_name)
- result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
+ result = rpc_runner.call_blockdev_getmirrorstatus(node_name, (instance.disks,
+ instance))
result.Raise("Failed to get disk status from node %s" % node_name,
prereq=prereq, ecode=errors.ECODE_ENVIRON)
"""Destroys the cluster.
"""
- master = self.cfg.GetMasterNode()
+ master_params = self.cfg.GetMasterNetworkParameters()
# Run post hooks on master node before it's removed
- _RunPostHook(self, master)
+ _RunPostHook(self, master_params.name)
- result = self.rpc.call_node_stop_master(master, False)
- result.Raise("Could not disable the master role")
+ ems = self.cfg.GetUseExternalMipScript()
+ result = self.rpc.call_node_deactivate_master_ip(master_params.name,
+ master_params, ems)
+ if result.fail_msg:
+ self.LogWarning("Error disabling the master IP address: %s",
+ result.fail_msg)
- return master
+ return master_params.name
def _VerifyCertificate(filename):
self.op and self._feedback_fn to be available.)
"""
- TCLUSTER = "cluster"
- TNODE = "node"
- TINSTANCE = "instance"
-
- ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
- ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")
- ECLUSTERFILECHECK = (TCLUSTER, "ECLUSTERFILECHECK")
- ECLUSTERDANGLINGNODES = (TNODE, "ECLUSTERDANGLINGNODES")
- ECLUSTERDANGLINGINST = (TNODE, "ECLUSTERDANGLINGINST")
- EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
- EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
- EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
- EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
- EINSTANCEFAULTYDISK = (TINSTANCE, "EINSTANCEFAULTYDISK")
- EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
- EINSTANCESPLITGROUPS = (TINSTANCE, "EINSTANCESPLITGROUPS")
- ENODEDRBD = (TNODE, "ENODEDRBD")
- ENODEDRBDHELPER = (TNODE, "ENODEDRBDHELPER")
- ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
- ENODEHOOKS = (TNODE, "ENODEHOOKS")
- ENODEHV = (TNODE, "ENODEHV")
- ENODELVM = (TNODE, "ENODELVM")
- ENODEN1 = (TNODE, "ENODEN1")
- ENODENET = (TNODE, "ENODENET")
- ENODEOS = (TNODE, "ENODEOS")
- ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
- ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
- ENODERPC = (TNODE, "ENODERPC")
- ENODESSH = (TNODE, "ENODESSH")
- ENODEVERSION = (TNODE, "ENODEVERSION")
- ENODESETUP = (TNODE, "ENODESETUP")
- ENODETIME = (TNODE, "ENODETIME")
- ENODEOOBPATH = (TNODE, "ENODEOOBPATH")
ETYPE_FIELD = "code"
ETYPE_ERROR = "ERROR"
"""
ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
- itype, etxt = ecode
+ itype, etxt, _ = ecode
# first complete the msg
if args:
msg = msg % args
# and finally report it via the feedback_fn
self._feedback_fn(" - %s" % msg) # Mix-in. pylint: disable=E1101
- def _ErrorIf(self, cond, *args, **kwargs):
+ def _ErrorIf(self, cond, ecode, *args, **kwargs):
"""Log an error message if the passed condition is True.
"""
cond = (bool(cond)
or self.op.debug_simulate_errors) # pylint: disable=E1101
+
+ # If the error code is in the list of ignored errors, demote the error to a
+ # warning
+ (_, etxt, _) = ecode
+ if etxt in self.op.ignore_errors: # pylint: disable=E1101
+ kwargs[self.ETYPE_FIELD] = self.ETYPE_WARNING
+
if cond:
- self._Error(*args, **kwargs)
+ self._Error(ecode, *args, **kwargs)
+
# do not mark the operation as failed for WARN cases only
if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
self.bad = self.bad or cond
groups = self.cfg.GetNodeGroupList()
# Verify global configuration
- jobs.append([opcodes.OpClusterVerifyConfig()])
+ jobs.append([
+ opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors)
+ ])
# Always depend on global verification
depends_fn = lambda: [(-len(jobs), [])]
jobs.extend([opcodes.OpClusterVerifyGroup(group_name=group,
- depends=depends_fn())]
+ ignore_errors=self.op.ignore_errors,
+ depends=depends_fn())]
for group in groups)
# Fix up all parameters
utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
hv_class.CheckParameterSyntax(hv_params)
except errors.GenericError, err:
- self._ErrorIf(True, self.ECLUSTERCFG, None, msg % str(err))
+ self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
def ExpandNames(self):
self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
feedback_fn("* Verifying cluster config")
for msg in self.cfg.VerifyConfig():
- self._ErrorIf(True, self.ECLUSTERCFG, None, msg)
+ self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
feedback_fn("* Verifying cluster certificate files")
for cert_filename in constants.ALL_CERT_FILES:
(errcode, msg) = _VerifyCertificate(cert_filename)
- self._ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)
+ self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
feedback_fn("* Verifying hypervisor parameters")
["no instances"])))
for node in dangling_nodes]
- self._ErrorIf(bool(dangling_nodes), self.ECLUSTERDANGLINGNODES, None,
+ self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
+ None,
"the following nodes (and their instances) belong to a non"
" existing group: %s", utils.CommaJoin(pretty_dangling))
- self._ErrorIf(bool(no_node_instances), self.ECLUSTERDANGLINGINST, None,
+ self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
+ None,
"the following instances have a non-existing primary-node:"
" %s", utils.CommaJoin(no_node_instances))
# main result, nresult should be a non-empty dict
test = not nresult or not isinstance(nresult, dict)
- _ErrorIf(test, self.ENODERPC, node,
+ _ErrorIf(test, constants.CV_ENODERPC, node,
"unable to verify node: no data returned")
if test:
return False
test = not (remote_version and
isinstance(remote_version, (list, tuple)) and
len(remote_version) == 2)
- _ErrorIf(test, self.ENODERPC, node,
+ _ErrorIf(test, constants.CV_ENODERPC, node,
"connection to node returned invalid data")
if test:
return False
test = local_version != remote_version[0]
- _ErrorIf(test, self.ENODEVERSION, node,
+ _ErrorIf(test, constants.CV_ENODEVERSION, node,
"incompatible protocol versions: master %s,"
" node %s", local_version, remote_version[0])
if test:
# full package version
self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
- self.ENODEVERSION, node,
+ constants.CV_ENODEVERSION, node,
"software version mismatch: master %s, node %s",
constants.RELEASE_VERSION, remote_version[1],
code=self.ETYPE_WARNING)
if ninfo.vm_capable and isinstance(hyp_result, dict):
for hv_name, hv_result in hyp_result.iteritems():
test = hv_result is not None
- _ErrorIf(test, self.ENODEHV, node,
+ _ErrorIf(test, constants.CV_ENODEHV, node,
"hypervisor %s verify failure: '%s'", hv_name, hv_result)
hvp_result = nresult.get(constants.NV_HVPARAMS, None)
if ninfo.vm_capable and isinstance(hvp_result, list):
for item, hv_name, hv_result in hvp_result:
- _ErrorIf(True, self.ENODEHV, node,
+ _ErrorIf(True, constants.CV_ENODEHV, node,
"hypervisor %s parameter verify failure (source %s): %s",
hv_name, item, hv_result)
test = nresult.get(constants.NV_NODESETUP,
["Missing NODESETUP results"])
- _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
+ _ErrorIf(test, constants.CV_ENODESETUP, node, "node setup error: %s",
"; ".join(test))
return True
try:
ntime_merged = utils.MergeTime(ntime)
except (ValueError, TypeError):
- _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
+ _ErrorIf(True, constants.CV_ENODETIME, node, "Node returned invalid time")
return
if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
else:
ntime_diff = None
- _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
+ _ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, node,
"Node time diverges by at least %s from master node time",
ntime_diff)
# checks vg existence and size > 20G
vglist = nresult.get(constants.NV_VGLIST, None)
test = not vglist
- _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
+ _ErrorIf(test, constants.CV_ENODELVM, node, "unable to check volume groups")
if not test:
vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
constants.MIN_VG_SIZE)
- _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)
+ _ErrorIf(vgstatus, constants.CV_ENODELVM, node, vgstatus)
# check pv names
pvlist = nresult.get(constants.NV_PVLIST, None)
test = pvlist is None
- _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
+ _ErrorIf(test, constants.CV_ENODELVM, node, "Can't get PV list from node")
if not test:
# check that ':' is not present in PV names, since it's a
# special character for lvcreate (denotes the range of PEs to
# use on the PV)
for _, pvname, owner_vg in pvlist:
test = ":" in pvname
- _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
- " '%s' of VG '%s'", pvname, owner_vg)
+ _ErrorIf(test, constants.CV_ENODELVM, node,
+ "Invalid character ':' in PV '%s' of VG '%s'",
+ pvname, owner_vg)
def _VerifyNodeBridges(self, ninfo, nresult, bridges):
"""Check the node bridges.
missing = nresult.get(constants.NV_BRIDGES, None)
test = not isinstance(missing, list)
- _ErrorIf(test, self.ENODENET, node,
+ _ErrorIf(test, constants.CV_ENODENET, node,
"did not return valid bridge information")
if not test:
- _ErrorIf(bool(missing), self.ENODENET, node, "missing bridges: %s" %
- utils.CommaJoin(sorted(missing)))
+ _ErrorIf(bool(missing), constants.CV_ENODENET, node,
+ "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
+
+ def _VerifyNodeUserScripts(self, ninfo, nresult):
+ """Check the results of user scripts presence and executability on the node
+
+ @type ninfo: L{objects.Node}
+ @param ninfo: the node to check
+ @param nresult: the remote results for the node
+
+ """
+ node = ninfo.name
+
+    test = constants.NV_USERSCRIPTS not in nresult
+ self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, node,
+ "did not return user scripts information")
+
+ broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
+ if not test:
+ self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, node,
+ "user scripts not present or not executable: %s" %
+ utils.CommaJoin(sorted(broken_scripts)))
def _VerifyNodeNetwork(self, ninfo, nresult):
"""Check the node network connectivity results.
_ErrorIf = self._ErrorIf # pylint: disable=C0103
test = constants.NV_NODELIST not in nresult
- _ErrorIf(test, self.ENODESSH, node,
+ _ErrorIf(test, constants.CV_ENODESSH, node,
"node hasn't returned node ssh connectivity data")
if not test:
if nresult[constants.NV_NODELIST]:
for a_node, a_msg in nresult[constants.NV_NODELIST].items():
- _ErrorIf(True, self.ENODESSH, node,
+ _ErrorIf(True, constants.CV_ENODESSH, node,
"ssh communication with node '%s': %s", a_node, a_msg)
test = constants.NV_NODENETTEST not in nresult
- _ErrorIf(test, self.ENODENET, node,
+ _ErrorIf(test, constants.CV_ENODENET, node,
"node hasn't returned node tcp connectivity data")
if not test:
if nresult[constants.NV_NODENETTEST]:
nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
for anode in nlist:
- _ErrorIf(True, self.ENODENET, node,
+ _ErrorIf(True, constants.CV_ENODENET, node,
"tcp communication with node '%s': %s",
anode, nresult[constants.NV_NODENETTEST][anode])
test = constants.NV_MASTERIP not in nresult
- _ErrorIf(test, self.ENODENET, node,
+ _ErrorIf(test, constants.CV_ENODENET, node,
"node hasn't returned node master IP reachability data")
if not test:
if not nresult[constants.NV_MASTERIP]:
msg = "the master node cannot reach the master IP (not configured?)"
else:
msg = "cannot reach the master IP"
- _ErrorIf(True, self.ENODENET, node, msg)
+ _ErrorIf(True, constants.CV_ENODENET, node, msg)
def _VerifyInstance(self, instance, instanceconfig, node_image,
diskstatus):
node_vol_should = {}
instanceconfig.MapLVsByNode(node_vol_should)
+ ipolicy = _CalculateGroupIPolicy(self.cfg.GetClusterInfo(), self.group_info)
+ err = _ComputeIPolicyInstanceViolation(ipolicy, instanceconfig)
+ _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, utils.CommaJoin(err))
+
for node in node_vol_should:
n_img = node_image[node]
if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
continue
for volume in node_vol_should[node]:
test = volume not in n_img.volumes
- _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
+ _ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
"volume %s missing on node %s", volume, node)
- if instanceconfig.admin_up:
+ if instanceconfig.admin_state == constants.ADMINST_UP:
pri_img = node_image[node_current]
test = instance not in pri_img.instances and not pri_img.offline
- _ErrorIf(test, self.EINSTANCEDOWN, instance,
+ _ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
"instance not running on its primary node %s",
node_current)
# node here
snode = node_image[nname]
bad_snode = snode.ghost or snode.offline
- _ErrorIf(instanceconfig.admin_up and not success and not bad_snode,
- self.EINSTANCEFAULTYDISK, instance,
+ _ErrorIf(instanceconfig.admin_state == constants.ADMINST_UP and
+ not success and not bad_snode,
+ constants.CV_EINSTANCEFAULTYDISK, instance,
"couldn't retrieve status for disk/%s on %s: %s",
idx, nname, bdev_status)
- _ErrorIf((instanceconfig.admin_up and success and
- bdev_status.ldisk_status == constants.LDS_FAULTY),
- self.EINSTANCEFAULTYDISK, instance,
+ _ErrorIf((instanceconfig.admin_state == constants.ADMINST_UP and
+ success and bdev_status.ldisk_status == constants.LDS_FAULTY),
+ constants.CV_EINSTANCEFAULTYDISK, instance,
"disk/%s on %s is faulty", idx, nname)
def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
test = ((node not in node_vol_should or
volume not in node_vol_should[node]) and
not reserved.Matches(volume))
- self._ErrorIf(test, self.ENODEORPHANLV, node,
+ self._ErrorIf(test, constants.CV_ENODEORPHANLV, node,
"volume %s is unknown", volume)
def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
# infromation from them; we already list instances living on such
# nodes, and that's enough warning
continue
+    # TODO(dynmem): also consider ballooning out other instances
for prinode, instances in n_img.sbp.items():
needed_mem = 0
for instance in instances:
bep = cluster_info.FillBE(instance_cfg[instance])
if bep[constants.BE_AUTO_BALANCE]:
- needed_mem += bep[constants.BE_MEMORY]
+ needed_mem += bep[constants.BE_MINMEM]
test = n_img.mfree < needed_mem
- self._ErrorIf(test, self.ENODEN1, node,
+ self._ErrorIf(test, constants.CV_ENODEN1, node,
"not enough memory to accomodate instance failovers"
" should node %s fail (%dMiB needed, %dMiB available)",
prinode, needed_mem, n_img.mfree)
@classmethod
def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
- (files_all, files_all_opt, files_mc, files_vm)):
+ (files_all, files_opt, files_mc, files_vm)):
"""Verifies file checksums collected from all nodes.
@param errorif: Callback for reporting errors
@param all_nvinfo: RPC results
"""
- assert (len(files_all | files_all_opt | files_mc | files_vm) ==
- sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \
- "Found file listed in more than one file list"
-
# Define functions determining which nodes to consider for a file
files2nodefn = [
(files_all, None),
- (files_all_opt, None),
(files_mc, lambda node: (node.master_candidate or
node.name == master_node)),
(files_vm, lambda node: node.vm_capable),
frozenset(map(operator.attrgetter("name"), filenodes)))
for filename in files)
- assert set(nodefiles) == (files_all | files_all_opt | files_mc | files_vm)
+ assert set(nodefiles) == (files_all | files_mc | files_vm)
fileinfo = dict((filename, {}) for filename in nodefiles)
ignore_nodes = set()
node_files = nresult.payload.get(constants.NV_FILELIST, None)
test = not (node_files and isinstance(node_files, dict))
- errorif(test, cls.ENODEFILECHECK, node.name,
+ errorif(test, constants.CV_ENODEFILECHECK, node.name,
"Node did not return file checksum data")
if test:
ignore_nodes.add(node.name)
# Nodes missing file
missing_file = expected_nodes - with_file
- if filename in files_all_opt:
+ if filename in files_opt:
# All or no nodes
errorif(missing_file and missing_file != expected_nodes,
- cls.ECLUSTERFILECHECK, None,
+ constants.CV_ECLUSTERFILECHECK, None,
"File %s is optional, but it must exist on all or no"
" nodes (not found on %s)",
filename, utils.CommaJoin(utils.NiceSort(missing_file)))
else:
- # Non-optional files
- errorif(missing_file, cls.ECLUSTERFILECHECK, None,
+ errorif(missing_file, constants.CV_ECLUSTERFILECHECK, None,
"File %s is missing from node(s) %s", filename,
utils.CommaJoin(utils.NiceSort(missing_file)))
# Warn if a node has a file it shouldn't
unexpected = with_file - expected_nodes
errorif(unexpected,
- cls.ECLUSTERFILECHECK, None,
+ constants.CV_ECLUSTERFILECHECK, None,
"File %s should not exist on node(s) %s",
filename, utils.CommaJoin(utils.NiceSort(unexpected)))
else:
variants = []
- errorif(test, cls.ECLUSTERFILECHECK, None,
+ errorif(test, constants.CV_ECLUSTERFILECHECK, None,
"File %s found with %s different checksums (%s)",
filename, len(checksums), "; ".join(variants))
if drbd_helper:
helper_result = nresult.get(constants.NV_DRBDHELPER, None)
test = (helper_result == None)
- _ErrorIf(test, self.ENODEDRBDHELPER, node,
+ _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
"no drbd usermode helper returned")
if helper_result:
status, payload = helper_result
test = not status
- _ErrorIf(test, self.ENODEDRBDHELPER, node,
+ _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
"drbd usermode helper check unsuccessful: %s", payload)
test = status and (payload != drbd_helper)
- _ErrorIf(test, self.ENODEDRBDHELPER, node,
+ _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
"wrong drbd usermode helper: %s", payload)
# compute the DRBD minors
node_drbd = {}
for minor, instance in drbd_map[node].items():
test = instance not in instanceinfo
- _ErrorIf(test, self.ECLUSTERCFG, None,
+ _ErrorIf(test, constants.CV_ECLUSTERCFG, None,
"ghost instance '%s' in temporary DRBD map", instance)
# ghost instance should not be running, but otherwise we
# don't give double warnings (both ghost instance and
node_drbd[minor] = (instance, False)
else:
instance = instanceinfo[instance]
- node_drbd[minor] = (instance.name, instance.admin_up)
+ node_drbd[minor] = (instance.name,
+ instance.admin_state == constants.ADMINST_UP)
# and now check them
used_minors = nresult.get(constants.NV_DRBDLIST, [])
test = not isinstance(used_minors, (tuple, list))
- _ErrorIf(test, self.ENODEDRBD, node,
+ _ErrorIf(test, constants.CV_ENODEDRBD, node,
"cannot parse drbd status file: %s", str(used_minors))
if test:
# we cannot check drbd status
for minor, (iname, must_exist) in node_drbd.items():
test = minor not in used_minors and must_exist
- _ErrorIf(test, self.ENODEDRBD, node,
+ _ErrorIf(test, constants.CV_ENODEDRBD, node,
"drbd minor %d of instance %s is not active", minor, iname)
for minor in used_minors:
test = minor not in node_drbd
- _ErrorIf(test, self.ENODEDRBD, node,
+ _ErrorIf(test, constants.CV_ENODEDRBD, node,
"unallocated drbd minor %d is in use", minor)
def _UpdateNodeOS(self, ninfo, nresult, nimg):
not compat.all(isinstance(v, list) and len(v) == 7
for v in remote_os))
- _ErrorIf(test, self.ENODEOS, node,
+ _ErrorIf(test, constants.CV_ENODEOS, node,
"node hasn't returned valid OS data")
nimg.os_fail = test
for os_name, os_data in nimg.oslist.items():
assert os_data, "Empty OS status for OS %s?!" % os_name
f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
- _ErrorIf(not f_status, self.ENODEOS, node,
+ _ErrorIf(not f_status, constants.CV_ENODEOS, node,
"Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
- _ErrorIf(len(os_data) > 1, self.ENODEOS, node,
+ _ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, node,
"OS '%s' has multiple entries (first one shadows the rest): %s",
os_name, utils.CommaJoin([v[0] for v in os_data]))
# comparisons with the 'base' image
test = os_name not in base.oslist
- _ErrorIf(test, self.ENODEOS, node,
+ _ErrorIf(test, constants.CV_ENODEOS, node,
"Extra OS %s not present on reference node (%s)",
os_name, base.name)
if test:
("variants list", f_var, b_var),
("parameters", beautify_params(f_param),
beautify_params(b_param))]:
- _ErrorIf(a != b, self.ENODEOS, node,
+ _ErrorIf(a != b, constants.CV_ENODEOS, node,
"OS %s for %s differs from reference node %s: [%s] vs. [%s]",
kind, os_name, base.name,
utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
# check any missing OSes
missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
- _ErrorIf(missing, self.ENODEOS, node,
+ _ErrorIf(missing, constants.CV_ENODEOS, node,
"OSes present on reference node %s but missing on this node: %s",
base.name, utils.CommaJoin(missing))
if ((ninfo.master_candidate or ninfo.master_capable) and
constants.NV_OOB_PATHS in nresult):
for path_result in nresult[constants.NV_OOB_PATHS]:
- self._ErrorIf(path_result, self.ENODEOOBPATH, node, path_result)
+ self._ErrorIf(path_result, constants.CV_ENODEOOBPATH, node, path_result)
def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
"""Verifies and updates the node volume data.
if vg_name is None:
pass
elif isinstance(lvdata, basestring):
- _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
+ _ErrorIf(True, constants.CV_ENODELVM, node, "LVM problem on node: %s",
utils.SafeEncode(lvdata))
elif not isinstance(lvdata, dict):
- _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
+ _ErrorIf(True, constants.CV_ENODELVM, node,
+ "rpc call to node failed (lvlist)")
else:
nimg.volumes = lvdata
nimg.lvm_fail = False
"""
idata = nresult.get(constants.NV_INSTANCELIST, None)
test = not isinstance(idata, list)
- self._ErrorIf(test, self.ENODEHV, ninfo.name, "rpc call to node failed"
- " (instancelist): %s", utils.SafeEncode(str(idata)))
+ self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
+ "rpc call to node failed (instancelist): %s",
+ utils.SafeEncode(str(idata)))
if test:
nimg.hyp_fail = True
else:
# try to read free memory (from the hypervisor)
hv_info = nresult.get(constants.NV_HVINFO, None)
test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
- _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
+ _ErrorIf(test, constants.CV_ENODEHV, node,
+ "rpc call to node failed (hvinfo)")
if not test:
try:
nimg.mfree = int(hv_info["memory_free"])
except (ValueError, TypeError):
- _ErrorIf(True, self.ENODERPC, node,
+ _ErrorIf(True, constants.CV_ENODERPC, node,
"node returned invalid nodeinfo, check hypervisor")
# FIXME: devise a free space model for file based instances as well
if vg_name is not None:
test = (constants.NV_VGLIST not in nresult or
vg_name not in nresult[constants.NV_VGLIST])
- _ErrorIf(test, self.ENODELVM, node,
+ _ErrorIf(test, constants.CV_ENODELVM, node,
"node didn't return data for the volume group '%s'"
" - it is either missing or broken", vg_name)
if not test:
try:
nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
except (ValueError, TypeError):
- _ErrorIf(True, self.ENODERPC, node,
+ _ErrorIf(True, constants.CV_ENODERPC, node,
"node returned invalid LVM info, check LVM status")
def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
node_disks[nname] = disks
- # Creating copies as SetDiskID below will modify the objects and that can
- # lead to incorrect data returned from nodes
- devonly = [dev.Copy() for (_, dev) in disks]
-
- for dev in devonly:
- self.cfg.SetDiskID(dev, nname)
+ # _AnnotateDiskParams makes already copies of the disks
+ devonly = []
+ for (inst, dev) in disks:
+ (anno_disk,) = _AnnotateDiskParams(instanceinfo[inst], [dev], self.cfg)
+ self.cfg.SetDiskID(anno_disk, nname)
+ devonly.append(anno_disk)
node_disks_devonly[nname] = devonly
data = len(disks) * [(False, "node offline")]
else:
msg = nres.fail_msg
- _ErrorIf(msg, self.ENODERPC, nname,
+ _ErrorIf(msg, constants.CV_ENODERPC, nname,
"while getting disk information: %s", msg)
if msg:
# No data from this node
i_non_redundant = [] # Non redundant instances
i_non_a_balanced = [] # Non auto-balanced instances
+ i_offline = 0 # Count of offline instances
n_offline = 0 # Count of offline nodes
n_drained = 0 # Count of nodes being drained
node_vol_should = {}
feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))
+ user_scripts = []
+ if self.cfg.GetUseExternalMipScript():
+ user_scripts.append(constants.EXTERNAL_MASTER_SETUP_SCRIPT)
+
node_verify_param = {
constants.NV_FILELIST:
utils.UniqueSequence(filename
constants.NV_MASTERIP: (master_node, master_ip),
constants.NV_OSLIST: None,
constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
+ constants.NV_USERSCRIPTS: user_scripts,
}
if vg_name is not None:
for instance in self.my_inst_names:
inst_config = self.my_inst_info[instance]
+ if inst_config.admin_state == constants.ADMINST_OFFLINE:
+ i_offline += 1
for nname in inst_config.all_nodes:
if nname not in node_image:
if master_node not in self.my_node_info:
additional_nodes.append(master_node)
vf_node_info.append(self.all_node_info[master_node])
- # Add the first vm_capable node we find which is not included
+ # Add the first vm_capable node we find which is not included,
+ # excluding the master node (which we already have)
for node in absent_nodes:
nodeinfo = self.all_node_info[node]
- if nodeinfo.vm_capable and not nodeinfo.offline:
+ if (nodeinfo.vm_capable and not nodeinfo.offline and
+ node != master_node):
additional_nodes.append(node)
vf_node_info.append(self.all_node_info[node])
break
feedback_fn("* Verifying node %s (%s)" % (node, ntype))
msg = all_nvinfo[node].fail_msg
- _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
+ _ErrorIf(msg, constants.CV_ENODERPC, node, "while contacting node: %s",
+ msg)
if msg:
nimg.rpc_fail = True
continue
nimg.call_ok = self._VerifyNode(node_i, nresult)
self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
self._VerifyNodeNetwork(node_i, nresult)
+ self._VerifyNodeUserScripts(node_i, nresult)
self._VerifyOob(node_i, nresult)
if nimg.vm_capable:
for inst in non_primary_inst:
test = inst in self.all_inst_info
- _ErrorIf(test, self.EINSTANCEWRONGNODE, inst,
+ _ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
"instance should not run on node %s", node_i.name)
- _ErrorIf(not test, self.ENODEORPHANINSTANCE, node_i.name,
+ _ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
"node is running unknown instance %s", inst)
for node, result in extra_lv_nvinfo.items():
pnode = inst_config.primary_node
pnode_img = node_image[pnode]
_ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
- self.ENODERPC, pnode, "instance %s, connection to"
+ constants.CV_ENODERPC, pnode, "instance %s, connection to"
" primary node failed", instance)
- _ErrorIf(inst_config.admin_up and pnode_img.offline,
- self.EINSTANCEBADNODE, instance,
+ _ErrorIf(inst_config.admin_state == constants.ADMINST_UP and
+ pnode_img.offline,
+ constants.CV_EINSTANCEBADNODE, instance,
"instance is marked as running and lives on offline node %s",
inst_config.primary_node)
if not inst_config.secondary_nodes:
i_non_redundant.append(instance)
- _ErrorIf(len(inst_config.secondary_nodes) > 1, self.EINSTANCELAYOUT,
+ _ErrorIf(len(inst_config.secondary_nodes) > 1,
+ constants.CV_EINSTANCELAYOUT,
instance, "instance has multiple secondary nodes: %s",
utils.CommaJoin(inst_config.secondary_nodes),
code=self.ETYPE_WARNING)
key=lambda (_, nodes): pnode in nodes,
reverse=True)]
- self._ErrorIf(len(instance_groups) > 1, self.EINSTANCESPLITGROUPS,
+ self._ErrorIf(len(instance_groups) > 1,
+ constants.CV_EINSTANCESPLITGROUPS,
instance, "instance has primary and secondary nodes in"
" different groups: %s", utils.CommaJoin(pretty_list),
code=self.ETYPE_WARNING)
for snode in inst_config.secondary_nodes:
s_img = node_image[snode]
- _ErrorIf(s_img.rpc_fail and not s_img.offline, self.ENODERPC, snode,
- "instance %s, connection to secondary node failed", instance)
+ _ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
+ snode, "instance %s, connection to secondary node failed",
+ instance)
if s_img.offline:
inst_nodes_offline.append(snode)
# warn that the instance lives on offline nodes
- _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
+ _ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance,
"instance has offline secondary node(s) %s",
utils.CommaJoin(inst_nodes_offline))
# ... or ghost/non-vm_capable nodes
for node in inst_config.all_nodes:
- _ErrorIf(node_image[node].ghost, self.EINSTANCEBADNODE, instance,
- "instance lives on ghost node %s", node)
- _ErrorIf(not node_image[node].vm_capable, self.EINSTANCEBADNODE,
+ _ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE,
+ instance, "instance lives on ghost node %s", node)
+ _ErrorIf(not node_image[node].vm_capable, constants.CV_EINSTANCEBADNODE,
instance, "instance lives on non-vm_capable node %s", node)
feedback_fn("* Verifying orphan volumes")
feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
% len(i_non_a_balanced))
+ if i_offline:
+ feedback_fn(" - NOTICE: %d offline instance(s) found." % i_offline)
+
if n_offline:
feedback_fn(" - NOTICE: %d offline node(s) found." % n_offline)
res = hooks_results[node_name]
msg = res.fail_msg
test = msg and not res.offline
- self._ErrorIf(test, self.ENODEHOOKS, node_name,
+ self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
"Communication failure in hooks execution: %s", msg)
if res.offline or msg:
# No need to investigate payload if node is offline or gave
continue
for script, hkr, output in res.payload:
test = hkr == constants.HKR_FAIL
- self._ErrorIf(test, self.ENODEHOOKS, node_name,
+ self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
"Script %s failed, output:", script)
if test:
output = self._HOOKS_INDENT_RE.sub(" ", output)
self.instances = dict(self.cfg.GetMultiInstanceInfo(owned_instances))
# Check if node groups for locked instances are still correct
- for (instance_name, inst) in self.instances.items():
- assert owned_nodes.issuperset(inst.all_nodes), \
- "Instance %s's nodes changed while we kept the lock" % instance_name
-
- inst_groups = _CheckInstanceNodeGroups(self.cfg, instance_name,
- owned_groups)
-
- assert self.group_uuid in inst_groups, \
- "Instance %s has no node in group %s" % (instance_name, self.group_uuid)
+ _CheckInstancesNodeGroups(self.cfg, self.instances,
+ owned_groups, owned_nodes, self.group_uuid)
def Exec(self, feedback_fn):
"""Verify integrity of cluster disks.
res_missing = {}
nv_dict = _MapInstanceDisksToNodes([inst
- for inst in self.instances.values()
- if inst.admin_up])
+ for inst in self.instances.values()
+ if inst.admin_state == constants.ADMINST_UP])
if nv_dict:
nodes = utils.NiceSort(set(self.owned_locks(locking.LEVEL_NODE)) &
if self.op.instances:
self.wanted_names = _GetWantedInstances(self, self.op.instances)
self.needed_locks = {
- locking.LEVEL_NODE: [],
+ locking.LEVEL_NODE_RES: [],
locking.LEVEL_INSTANCE: self.wanted_names,
}
- self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+ self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
else:
self.wanted_names = None
self.needed_locks = {
- locking.LEVEL_NODE: locking.ALL_SET,
+ locking.LEVEL_NODE_RES: locking.ALL_SET,
locking.LEVEL_INSTANCE: locking.ALL_SET,
}
self.share_locks = {
- locking.LEVEL_NODE: 1,
+ locking.LEVEL_NODE_RES: 1,
locking.LEVEL_INSTANCE: 0,
}
def DeclareLocks(self, level):
- if level == locking.LEVEL_NODE and self.wanted_names is not None:
- self._LockInstancesNodes(primary_only=True)
+ if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
+ self._LockInstancesNodes(primary_only=True, level=level)
def CheckPrereq(self):
"""Check prerequisites.
for idx, disk in enumerate(instance.disks):
per_node_disks[pnode].append((instance, idx, disk))
+ assert not (frozenset(per_node_disks.keys()) -
+ self.owned_locks(locking.LEVEL_NODE_RES)), \
+ "Not owning correct locks"
+ assert not self.owned_locks(locking.LEVEL_NODE)
+
changed = []
for node, dskl in per_node_disks.items():
newl = [v[2].Copy() for v in dskl]
"""
clustername = self.op.name
- ip = self.ip
+ new_ip = self.ip
# shutdown the master IP
- master = self.cfg.GetMasterNode()
- result = self.rpc.call_node_stop_master(master, False)
+ master_params = self.cfg.GetMasterNetworkParameters()
+ ems = self.cfg.GetUseExternalMipScript()
+ result = self.rpc.call_node_deactivate_master_ip(master_params.name,
+ master_params, ems)
result.Raise("Could not disable the master role")
try:
cluster = self.cfg.GetClusterInfo()
cluster.cluster_name = clustername
- cluster.master_ip = ip
+ cluster.master_ip = new_ip
self.cfg.Update(cluster, feedback_fn)
# update the known hosts file
ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
node_list = self.cfg.GetOnlineNodeList()
try:
- node_list.remove(master)
+ node_list.remove(master_params.name)
except ValueError:
pass
_UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE)
finally:
- result = self.rpc.call_node_start_master(master, False, False)
+ master_params.ip = new_ip
+ result = self.rpc.call_node_activate_master_ip(master_params.name,
+ master_params, ems)
msg = result.fail_msg
if msg:
self.LogWarning("Could not re-enable the master role on"
return clustername
+def _ValidateNetmask(cfg, netmask):
+ """Checks if a netmask is valid.
+
+ @type cfg: L{config.ConfigWriter}
+ @param cfg: The cluster configuration
+ @type netmask: int
+ @param netmask: the netmask to be verified
+ @raise errors.OpPrereqError: if the validation fails
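+
+ The check is done against the cluster's primary IP family, so e.g. an
+ IPv4 cluster only accepts prefix lengths that are valid for IPv4.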
+
+ """
+ ip_family = cfg.GetPrimaryIPFamily()
+ try:
+ ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
+ except errors.ProgrammerError:
+ raise errors.OpPrereqError("Invalid primary ip family: %s." %
+ ip_family)
+ if not ipcls.ValidateNetmask(netmask):
+ raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
+ (netmask))
+
+
class LUClusterSetParams(LogicalUnit):
"""Change the parameters of the cluster.
if self.op.remove_uids:
uidpool.CheckUidPool(self.op.remove_uids)
+ if self.op.master_netmask is not None:
+ _ValidateNetmask(self.cfg, self.op.master_netmask)
+
+ if self.op.diskparams:
+ for dt_params in self.op.diskparams.values():
+ utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
+ try:
+ utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
+ except errors.OpPrereqError, err:
+ raise errors.OpPrereqError("While verify diskparams options: %s" % err,
+ errors.ECODE_INVAL)
+
def ExpandNames(self):
# FIXME: in the future maybe other cluster params won't require checking on
# all nodes to be modified.
self.needed_locks = {
locking.LEVEL_NODE: locking.ALL_SET,
+ locking.LEVEL_INSTANCE: locking.ALL_SET,
+ locking.LEVEL_NODEGROUP: locking.ALL_SET,
+ }
+ self.share_locks = {
+ locking.LEVEL_NODE: 1,
+ locking.LEVEL_INSTANCE: 1,
+ locking.LEVEL_NODEGROUP: 1,
}
- self.share_locks[locking.LEVEL_NODE] = 1
def BuildHooksEnv(self):
"""Build hooks env.
self.cluster = cluster = self.cfg.GetClusterInfo()
# validate params changes
if self.op.beparams:
+ objects.UpgradeBeParams(self.op.beparams)
utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
self.new_ndparams["oob_program"] = \
constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
+ if self.op.hv_state:
+ new_hv_state = _MergeAndVerifyHvState(self.op.hv_state,
+ self.cluster.hv_state_static)
+ self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
+ for hv, values in new_hv_state.items())
+
+ if self.op.disk_state:
+ new_disk_state = _MergeAndVerifyDiskState(self.op.disk_state,
+ self.cluster.disk_state_static)
+ self.new_disk_state = \
+ dict((storage, dict((name, cluster.SimpleFillDiskState(values))
+ for name, values in svalues.items()))
+ for storage, svalues in new_disk_state.items())
+
+ if self.op.ipolicy:
+ self.new_ipolicy = _GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
+ group_policy=False)
+
+ all_instances = self.cfg.GetAllInstancesInfo().values()
+ violations = set()
+ for group in self.cfg.GetAllNodeGroupsInfo().values():
+ instances = frozenset([inst for inst in all_instances
+ if compat.any(node in group.members
+ for node in inst.all_nodes)])
+ new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
+ new = _ComputeNewInstanceViolations(_CalculateGroupIPolicy(cluster,
+ group),
+ new_ipolicy, instances)
+ if new:
+ violations.update(new)
+
+ if violations:
+ self.LogWarning("After the ipolicy change the following instances"
+ " violate them: %s",
+ utils.CommaJoin(utils.NiceSort(violations)))
+
if self.op.nicparams:
utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
else:
self.new_hvparams[hv_name].update(hv_dict)
+ # disk template parameters
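+ # merge the per-disk-template overrides: templates without an existing
+ # entry are added wholesale, known ones are updated key by key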
+ self.new_diskparams = objects.FillDict(cluster.diskparams, {})
+ if self.op.diskparams:
+ for dt_name, dt_params in self.op.diskparams.items():
+ if dt_name not in self.new_diskparams:
+ self.new_diskparams[dt_name] = dt_params
+ else:
+ self.new_diskparams[dt_name].update(dt_params)
+
# os hypervisor parameters
self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
if self.op.os_hvp:
self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
if self.op.nicparams:
self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
+ if self.op.ipolicy:
+ self.cluster.ipolicy = self.new_ipolicy
if self.op.osparams:
self.cluster.osparams = self.new_osp
if self.op.ndparams:
self.cluster.ndparams = self.new_ndparams
+ if self.op.diskparams:
+ self.cluster.diskparams = self.new_diskparams
+ if self.op.hv_state:
+ self.cluster.hv_state_static = self.new_hv_state
+ if self.op.disk_state:
+ self.cluster.disk_state_static = self.new_disk_state
if self.op.candidate_pool_size is not None:
self.cluster.candidate_pool_size = self.op.candidate_pool_size
_AdjustCandidatePool(self, [])
if self.op.maintain_node_health is not None:
+ if self.op.maintain_node_health and not constants.ENABLE_CONFD:
+ feedback_fn("Note: CONFD was disabled at build time, node health"
+ " maintenance is not useful (still enabling it)")
self.cluster.maintain_node_health = self.op.maintain_node_health
if self.op.prealloc_wipe_disks is not None:
if self.op.reserved_lvs is not None:
self.cluster.reserved_lvs = self.op.reserved_lvs
+ if self.op.use_external_mip_script is not None:
+ self.cluster.use_external_mip_script = self.op.use_external_mip_script
+
def helper_os(aname, mods, desc):
desc += " OS list"
lst = getattr(self.cluster, aname)
helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
if self.op.master_netdev:
- master = self.cfg.GetMasterNode()
+ master_params = self.cfg.GetMasterNetworkParameters()
+ ems = self.cfg.GetUseExternalMipScript()
feedback_fn("Shutting down master ip on the current netdev (%s)" %
self.cluster.master_netdev)
- result = self.rpc.call_node_stop_master(master, False)
+ result = self.rpc.call_node_deactivate_master_ip(master_params.name,
+ master_params, ems)
result.Raise("Could not disable the master ip")
feedback_fn("Changing master_netdev from %s to %s" %
- (self.cluster.master_netdev, self.op.master_netdev))
+ (master_params.netdev, self.op.master_netdev))
self.cluster.master_netdev = self.op.master_netdev
+ if self.op.master_netmask:
+ master_params = self.cfg.GetMasterNetworkParameters()
+ feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
+ result = self.rpc.call_node_change_master_netmask(master_params.name,
+ master_params.netmask,
+ self.op.master_netmask,
+ master_params.ip,
+ master_params.netdev)
+ if result.fail_msg:
+ msg = "Could not change the master IP netmask: %s" % result.fail_msg
+ feedback_fn(msg)
+
+ self.cluster.master_netmask = self.op.master_netmask
+
self.cfg.Update(self.cluster, feedback_fn)
if self.op.master_netdev:
+ master_params = self.cfg.GetMasterNetworkParameters()
feedback_fn("Starting the master ip on the new master netdev (%s)" %
self.op.master_netdev)
- result = self.rpc.call_node_start_master(master, False, False)
+ ems = self.cfg.GetUseExternalMipScript()
+ result = self.rpc.call_node_activate_master_ip(master_params.name,
+ master_params, ems)
if result.fail_msg:
self.LogWarning("Could not re-enable the master ip on"
" the master, please restart manually: %s",
constants.SSH_KNOWN_HOSTS_FILE,
constants.CONFD_HMAC_KEY,
constants.CLUSTER_DOMAIN_SECRET_FILE,
+ constants.SPICE_CERT_FILE,
+ constants.SPICE_CACERT_FILE,
+ constants.RAPI_USERS_FILE,
])
if not redist:
if cluster.modify_etc_hosts:
files_all.add(constants.ETC_HOSTS)
- # Files which must either exist on all nodes or on none
- files_all_opt = set([
+ if cluster.use_external_mip_script:
+ files_all.add(constants.EXTERNAL_MASTER_SETUP_SCRIPT)
+
+ # Files which are optional; these must:
+ # - be present in one other category as well
+ # - either exist or not exist on all nodes of that category (mc, vm all)
+ files_opt = set([
constants.RAPI_USERS_FILE,
])
# Files which should only be on master candidates
files_mc = set()
+
if not redist:
files_mc.add(constants.CLUSTER_CONF_FILE)
# Files which should only be on VM-capable nodes
files_vm = set(filename
for hv_name in cluster.enabled_hypervisors
- for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles())
+ for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[0])
+
+ files_opt |= set(filename
+ for hv_name in cluster.enabled_hypervisors
+ for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[1])
- # Filenames must be unique
- assert (len(files_all | files_all_opt | files_mc | files_vm) ==
- sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \
+ # Filenames in each category must be unique
+ all_files_set = files_all | files_mc | files_vm
+ assert (len(all_files_set) ==
+ sum(map(len, [files_all, files_mc, files_vm]))), \
"Found file listed in more than one file list"
- return (files_all, files_all_opt, files_mc, files_vm)
+ # Optional files must be present in one other category
+ assert all_files_set.issuperset(files_opt), \
+ "Optional file not in a different required list"
+
+ return (files_all, files_opt, files_mc, files_vm)
def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
online_nodes = lu.cfg.GetOnlineNodeList()
- vm_nodes = lu.cfg.GetVmCapableNodeList()
+ online_set = frozenset(online_nodes)
+ vm_nodes = list(online_set.intersection(lu.cfg.GetVmCapableNodeList()))
if additional_nodes is not None:
online_nodes.extend(additional_nodes)
nodelist.remove(master_info.name)
# Gather file lists
- (files_all, files_all_opt, files_mc, files_vm) = \
+ (files_all, _, files_mc, files_vm) = \
_ComputeAncillaryFiles(cluster, True)
# Never re-distribute configuration file from here
filemap = [
(online_nodes, files_all),
- (online_nodes, files_all_opt),
(vm_nodes, files_vm),
]
_RedistributeAncillaryFiles(self)
+class LUClusterActivateMasterIp(NoHooksLU):
+ """Activate the master IP on the master node.
+
+ """
+ def Exec(self, feedback_fn):
+ """Activate the master IP.
+
+ """
+ master_params = self.cfg.GetMasterNetworkParameters()
+ ems = self.cfg.GetUseExternalMipScript()
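+ # "ems" tells the node whether to run the external master IP setup
+ # script instead of the built-in handling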
+ result = self.rpc.call_node_activate_master_ip(master_params.name,
+ master_params, ems)
+ result.Raise("Could not activate the master IP")
+
+
+class LUClusterDeactivateMasterIp(NoHooksLU):
+ """Deactivate the master IP on the master node.
+
+ """
+ def Exec(self, feedback_fn):
+ """Deactivate the master IP.
+
+ """
+ master_params = self.cfg.GetMasterNetworkParameters()
+ ems = self.cfg.GetUseExternalMipScript()
+ result = self.rpc.call_node_deactivate_master_ip(master_params.name,
+ master_params, ems)
+ result.Raise("Could not deactivate the master IP")
+
+
def _WaitForSync(lu, instance, disks=None, oneshot=False):
"""Sleep and poll for an instance's disk to sync.
max_time = 0
done = True
cumul_degraded = False
- rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks)
+ rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
msg = rstats.fail_msg
if msg:
lu.LogWarning("Can't get any data from node %s: %s", node, msg)
return not cumul_degraded
-def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
+def _BlockdevFind(lu, node, dev, instance):
+ """Wrapper around call_blockdev_find to annotate diskparams.
+
+ @param lu: A reference to the lu object
+ @param node: The node to call out
+ @param dev: The device to find
+ @param instance: The instance object the device belongs to
+ @return: the result of the rpc call
+
+ """
+ (disk,) = _AnnotateDiskParams(instance, [dev], lu.cfg)
+ return lu.rpc.call_blockdev_find(node, disk)
+
+
+def _CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
+ """Wrapper around L{_CheckDiskConsistencyInner}.
+
+ """
+ (disk,) = _AnnotateDiskParams(instance, [dev], lu.cfg)
+ return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
+ ldisk=ldisk)
+
+
+def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
+ ldisk=False):
"""Check that mirrors are not degraded.
+ @attention: The device has to be annotated already.
+
The ldisk parameter, if True, will change the test from the
is_degraded attribute (which represents overall non-ok status for
the device(s)) to the ldisk (representing the local storage status).
if dev.children:
for child in dev.children:
- result = result and _CheckDiskConsistency(lu, child, node, on_primary)
+ result = result and _CheckDiskConsistencyInner(lu, instance, child, node,
+ on_primary)
return result
"""Logical unit for OOB handling.
"""
- REG_BGL = False
+ REQ_BGL = False
_SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)
def ExpandNames(self):
return self.oq.OldStyleQuery(self)
-class LUNodeRemove(LogicalUnit):
- """Logical unit for removing a node.
+class _ExtStorageQuery(_QueryBase):
+ FIELDS = query.EXTSTORAGE_FIELDS
- """
- HPATH = "node-remove"
- HTYPE = constants.HTYPE_NODE
+ def ExpandNames(self, lu):
+ # Lock all nodes in shared mode
+ # Temporary removal of locks, should be reverted later
+ # TODO: reintroduce locks when they are lighter-weight
+ lu.needed_locks = {}
+ #self.share_locks[locking.LEVEL_NODE] = 1
+ #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
- def BuildHooksEnv(self):
- """Build hooks env.
+ # The following variables interact with _QueryBase._GetNames
+ if self.names:
+ self.wanted = self.names
+ else:
+ self.wanted = locking.ALL_SET
- This doesn't run on the target node in the pre phase as a failed
- node would then be impossible to remove.
+ self.do_locking = self.use_locking
- """
- return {
- "OP_TARGET": self.op.node_name,
- "NODE_NAME": self.op.node_name,
- }
+ def DeclareLocks(self, lu, level):
+ pass
- def BuildHooksNodes(self):
- """Build hooks nodes.
+ @staticmethod
+ def _DiagnoseByProvider(rlist):
+ """Remaps a per-node return list into an a per-provider per-node dictionary
- """
- all_nodes = self.cfg.GetNodeList()
- try:
- all_nodes.remove(self.op.node_name)
- except ValueError:
- logging.warning("Node '%s', which is about to be removed, was not found"
- " in the list of all nodes", self.op.node_name)
- return (all_nodes, all_nodes)
+ @param rlist: a map with node names as keys and ExtStorage objects as values
+
+ @rtype: dict
+ @return: a dictionary with extstorage providers as keys and as
+ value another map, with nodes as keys and tuples of
+ (path, status, diagnose, parameters) as values, eg::
+
+ {"provider1": {"node1": [(/usr/lib/..., True, "", [])]
+ "node2": [(/srv/..., False, "missing file")]
+ "node3": [(/srv/..., True, "", [])]
+ }
+
+ """
+ all_es = {}
+ # we build here the list of nodes that didn't fail the RPC (at RPC
+ # level), so that nodes with a non-responding node daemon don't
+ # make all providers invalid
+ good_nodes = [node_name for node_name in rlist
+ if not rlist[node_name].fail_msg]
+ for node_name, nr in rlist.items():
+ if nr.fail_msg or not nr.payload:
+ continue
+ for (name, path, status, diagnose, params) in nr.payload:
+ if name not in all_es:
+ # build a dict for this provider, containing an empty result list
+ # for each node in good_nodes
+ all_es[name] = {}
+ for nname in good_nodes:
+ all_es[name][nname] = []
+ # convert params from [name, help] to (name, help)
+ params = [tuple(v) for v in params]
+ all_es[name][node_name].append((path, status, diagnose, params))
+ return all_es
+
+ def _GetQueryData(self, lu):
+ """Computes the list of nodes and their attributes.
+
+ """
+ # Locking is not used
+ assert not (compat.any(lu.glm.is_owned(level)
+ for level in locking.LEVELS
+ if level != locking.LEVEL_CLUSTER) or
+ self.do_locking or self.use_locking)
+
+ valid_nodes = [node.name
+ for node in lu.cfg.GetAllNodesInfo().values()
+ if not node.offline and node.vm_capable]
+ pol = self._DiagnoseByProvider(lu.rpc.call_extstorage_diagnose(valid_nodes))
+
+ data = {}
+
+ nodegroup_list = lu.cfg.GetNodeGroupList()
+
+ for (es_name, es_data) in pol.items():
+ # For every provider compute the nodegroup validity.
+ # To do this we need to check the validity of each node in es_data
+ # and then construct the corresponding nodegroup dict:
+ # { nodegroup1: status
+ # nodegroup2: status
+ # }
+ ndgrp_data = {}
+ for nodegroup in nodegroup_list:
+ ndgrp = lu.cfg.GetNodeGroup(nodegroup)
+
+ nodegroup_nodes = ndgrp.members
+ nodegroup_name = ndgrp.name
+ node_statuses = []
+
+ for node in nodegroup_nodes:
+ if node in valid_nodes:
+ if es_data[node] != []:
+ node_status = es_data[node][0][1]
+ node_statuses.append(node_status)
+ else:
+ node_statuses.append(False)
+
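+ # the provider is valid for this nodegroup only if it is valid on
+ # every member node that we could query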
+ if False in node_statuses:
+ ndgrp_data[nodegroup_name] = False
+ else:
+ ndgrp_data[nodegroup_name] = True
+
+ # Compute the provider's parameters
+ parameters = set()
+ for idx, esl in enumerate(es_data.values()):
+ valid = bool(esl and esl[0][1])
+ if not valid:
+ break
+
+ node_params = esl[0][3]
+ if idx == 0:
+ # First entry
+ parameters.update(node_params)
+ else:
+ # Filter out inconsistent values
+ parameters.intersection_update(node_params)
+
+ params = list(parameters)
+
+ # Now fill all the info for this provider
+ info = query.ExtStorageInfo(name=es_name, node_status=es_data,
+ nodegroup_status=ndgrp_data,
+ parameters=params)
+
+ data[es_name] = info
+
+ # Prepare data in requested order
+ return [data[name] for name in self._GetNames(lu, pol.keys(), None)
+ if name in data]
+
+
+class LUExtStorageDiagnose(NoHooksLU):
+ """Logical unit for ExtStorage diagnose/query.
+
+ """
+ REQ_BGL = False
+
+ def CheckArguments(self):
+ self.eq = _ExtStorageQuery(qlang.MakeSimpleFilter("name", self.op.names),
+ self.op.output_fields, False)
+
+ def ExpandNames(self):
+ self.eq.ExpandNames(self)
+
+ def Exec(self, feedback_fn):
+ return self.eq.OldStyleQuery(self)
+
+
+class LUNodeRemove(LogicalUnit):
+ """Logical unit for removing a node.
+
+ """
+ HPATH = "node-remove"
+ HTYPE = constants.HTYPE_NODE
+
+ def BuildHooksEnv(self):
+ """Build hooks env.
+
+ """
+ return {
+ "OP_TARGET": self.op.node_name,
+ "NODE_NAME": self.op.node_name,
+ }
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ This doesn't run on the target node in the pre phase as a failed
+ node would then be impossible to remove.
+
+ """
+ all_nodes = self.cfg.GetNodeList()
+ try:
+ all_nodes.remove(self.op.node_name)
+ except ValueError:
+ pass
+ return (all_nodes, all_nodes)
def CheckPrereq(self):
"""Check prerequisites.
modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
+ assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
+ "Not owning BGL"
+
# Promote nodes to master candidate as needed
_AdjustCandidatePool(self, exceptions=[node.name])
self.context.RemoveNode(node.name)
# filter out non-vm_capable nodes
toquery_nodes = [name for name in nodenames if all_info[name].vm_capable]
- node_data = lu.rpc.call_node_info(toquery_nodes, lu.cfg.GetVGName(),
- lu.cfg.GetHypervisorType())
- live_data = dict((name, nresult.payload)
+ node_data = lu.rpc.call_node_info(toquery_nodes, [lu.cfg.GetVGName()],
+ [lu.cfg.GetHypervisorType()])
+ live_data = dict((name, _MakeLegacyNodeInfo(nresult.payload))
for (name, nresult) in node_data.items()
if not nresult.fail_msg and nresult.payload)
else:
def ExpandNames(self):
self.nq.ExpandNames(self)
+ def DeclareLocks(self, level):
+ self.nq.DeclareLocks(self, level)
+
def Exec(self, feedback_fn):
return self.nq.OldStyleQuery(self)
selected=self.op.output_fields)
def ExpandNames(self):
+ self.share_locks = _ShareAll()
self.needed_locks = {}
- self.share_locks[locking.LEVEL_NODE] = 1
+
if not self.op.nodes:
self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
else:
selected=self.op.output_fields)
def ExpandNames(self):
+ self.share_locks = _ShareAll()
self.needed_locks = {}
- self.share_locks[locking.LEVEL_NODE] = 1
if self.op.nodes:
self.needed_locks[locking.LEVEL_NODE] = \
def CheckArguments(self):
qcls = _GetQueryImplementation(self.op.what)
- self.impl = qcls(self.op.filter, self.op.fields, self.op.use_locking)
+ self.impl = qcls(self.op.qfilter, self.op.fields, self.op.use_locking)
def ExpandNames(self):
self.impl.ExpandNames(self)
if self.op.ndparams:
utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
- # check connectivity
- result = self.rpc.call_version([self.new_node.name])[self.new_node.name]
+ if self.op.hv_state:
+ self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state, None)
+
+ if self.op.disk_state:
+ self.new_disk_state = _MergeAndVerifyDiskState(self.op.disk_state, None)
+
+ # TODO: If we need multiple DnsOnlyRunner instances, we probably should
+ # make it a property on the base class.
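+ # the node being added is not yet in the configuration, so it has to be
+ # resolved via DNS for this version check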
+ result = rpc.DnsOnlyRunner().call_version([node])[node]
result.Raise("Can't get version information from node %s" % node)
if constants.PROTOCOL_VERSION == result.payload:
logging.info("Communication to node %s fine, sw version %s match",
new_node = self.new_node
node = new_node.name
+ assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
+ "Not owning BGL"
+
+ # We are adding a new node, so we assume it's powered
new_node.powered = True
else:
new_node.ndparams = {}
+ if self.op.hv_state:
+ new_node.hv_state_static = self.new_hv_state
+
+ if self.op.disk_state:
+ new_node.disk_state_static = self.new_disk_state
+
# Add node to our /etc/hosts, and add key to known_hosts
if self.cfg.GetClusterInfo().modify_etc_hosts:
master_node = self.cfg.GetMasterNode()
self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
self.op.master_capable, self.op.vm_capable,
- self.op.secondary_ip, self.op.ndparams]
+ self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
+ self.op.disk_state]
if all_mods.count(None) == len(all_mods):
raise errors.OpPrereqError("Please pass at least one modification",
errors.ECODE_INVAL)
self.lock_all = self.op.auto_promote and self.might_demote
self.lock_instances = self.op.secondary_ip is not None
+ def _InstanceFilter(self, instance):
+ """Filter for getting affected instances.
+
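+ Matches instances that use an internally mirrored disk template (such
+ as DRBD) and have this node among their nodes; only those are affected
+ by a secondary IP address change.
+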
+ """
+ return (instance.disk_template in constants.DTS_INT_MIRROR and
+ self.op.node_name in instance.all_nodes)
+
def ExpandNames(self):
if self.lock_all:
self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
else:
self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
- if self.lock_instances:
- self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
+ # Since modifying a node can have severe effects on currently running
+ # operations the resource lock is at least acquired in shared mode
+ self.needed_locks[locking.LEVEL_NODE_RES] = \
+ self.needed_locks[locking.LEVEL_NODE]
- def DeclareLocks(self, level):
- # If we have locked all instances, before waiting to lock nodes, release
- # all the ones living on nodes unrelated to the current operation.
- if level == locking.LEVEL_NODE and self.lock_instances:
- self.affected_instances = []
- if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
- instances_keep = []
-
- # Build list of instances to release
- locked_i = self.owned_locks(locking.LEVEL_INSTANCE)
- for instance_name, instance in self.cfg.GetMultiInstanceInfo(locked_i):
- if (instance.disk_template in constants.DTS_INT_MIRROR and
- self.op.node_name in instance.all_nodes):
- instances_keep.append(instance_name)
- self.affected_instances.append(instance)
-
- _ReleaseLocks(self, locking.LEVEL_INSTANCE, keep=instances_keep)
-
- assert (set(self.owned_locks(locking.LEVEL_INSTANCE)) ==
- set(instances_keep))
+ # Get node resource and instance locks in shared mode; they are not used
+ # for anything but read-only access
+ self.share_locks[locking.LEVEL_NODE_RES] = 1
+ self.share_locks[locking.LEVEL_INSTANCE] = 1
+
+ if self.lock_instances:
+ self.needed_locks[locking.LEVEL_INSTANCE] = \
+ frozenset(self.cfg.GetInstancesInfoByFilter(self._InstanceFilter))
def BuildHooksEnv(self):
"""Build hooks env.
"""
node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
+ if self.lock_instances:
+ affected_instances = \
+ self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
+
+ # Verify instance locks
+ owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
+ wanted_instances = frozenset(affected_instances.keys())
+ if wanted_instances - owned_instances:
+ raise errors.OpPrereqError("Instances affected by changing node %s's"
+ " secondary IP address have changed since"
+ " locks were acquired, wanted '%s', have"
+ " '%s'; retry the operation" %
+ (self.op.node_name,
+ utils.CommaJoin(wanted_instances),
+ utils.CommaJoin(owned_instances)),
+ errors.ECODE_STATE)
+ else:
+ affected_instances = None
+
if (self.op.master_candidate is not None or
self.op.drained is not None or
self.op.offline is not None):
if mc_remaining < mc_should:
raise errors.OpPrereqError("Not enough master candidates, please"
" pass auto promote option to allow"
- " promotion", errors.ECODE_STATE)
+ " promotion (--auto-promote or RAPI"
+ " auto_promote=True)", errors.ECODE_STATE)
self.old_flags = old_flags = (node.master_candidate,
node.drained, node.offline)
raise errors.OpPrereqError("Cannot change the secondary ip on a single"
" homed cluster", errors.ECODE_INVAL)
+ assert not (frozenset(affected_instances) -
+ self.owned_locks(locking.LEVEL_INSTANCE))
+
if node.offline:
- if self.affected_instances:
- raise errors.OpPrereqError("Cannot change secondary ip: offline"
- " node has instances (%s) configured"
- " to use it" % self.affected_instances)
+ if affected_instances:
+ raise errors.OpPrereqError("Cannot change secondary IP address:"
+ " offline node has instances (%s)"
+ " configured to use it" %
+ utils.CommaJoin(affected_instances.keys()))
else:
# On online nodes, check that no instances are running, and that
# the node has the new ip and we can reach it.
- for instance in self.affected_instances:
- _CheckInstanceDown(self, instance, "cannot change secondary ip")
+ for instance in affected_instances.values():
+ _CheckInstanceState(self, instance, INSTANCE_DOWN,
+ msg="cannot change secondary ip")
_CheckNodeHasSecondaryIP(self, node.name, self.op.secondary_ip, True)
if master.name != node.name:
utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
self.new_ndparams = new_ndparams
+ if self.op.hv_state:
+ self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state,
+ self.node.hv_state_static)
+
+ if self.op.disk_state:
+ self.new_disk_state = \
+ _MergeAndVerifyDiskState(self.op.disk_state,
+ self.node.disk_state_static)
+
def Exec(self, feedback_fn):
"""Modifies a node.
if self.op.powered is not None:
node.powered = self.op.powered
+ if self.op.hv_state:
+ node.hv_state_static = self.new_hv_state
+
+ if self.op.disk_state:
+ node.disk_state_static = self.new_disk_state
+
for attr in ["master_capable", "vm_capable"]:
val = getattr(self.op, attr)
if val is not None:
"config_version": constants.CONFIG_VERSION,
"os_api_version": max(constants.OS_API_VERSIONS),
"export_version": constants.EXPORT_VERSION,
- "architecture": (platform.architecture()[0], platform.machine()),
+ "architecture": runtime.GetArchInfo(),
"name": cluster.cluster_name,
"master": cluster.master_node,
- "default_hypervisor": cluster.enabled_hypervisors[0],
+ "default_hypervisor": cluster.primary_hypervisor,
"enabled_hypervisors": cluster.enabled_hypervisors,
"hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
for hypervisor_name in cluster.enabled_hypervisors]),
"os_hvp": os_hvp,
"beparams": cluster.beparams,
"osparams": cluster.osparams,
+ "ipolicy": cluster.ipolicy,
"nicparams": cluster.nicparams,
"ndparams": cluster.ndparams,
+ "diskparams": cluster.diskparams,
"candidate_pool_size": cluster.candidate_pool_size,
"master_netdev": cluster.master_netdev,
+ "master_netmask": cluster.master_netmask,
+ "use_external_mip_script": cluster.use_external_mip_script,
"volume_group_name": cluster.volume_group_name,
"drbd_usermode_helper": cluster.drbd_usermode_helper,
"file_storage_dir": cluster.file_storage_dir,
"""
REQ_BGL = False
- _FIELDS_DYNAMIC = utils.FieldSet()
- _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
- "watcher_pause", "volume_group_name")
def CheckArguments(self):
- _CheckOutputFields(static=self._FIELDS_STATIC,
- dynamic=self._FIELDS_DYNAMIC,
- selected=self.op.output_fields)
+ self.cq = _ClusterQuery(None, self.op.output_fields, False)
def ExpandNames(self):
- self.needed_locks = {}
+ self.cq.ExpandNames(self)
+
+ def DeclareLocks(self, level):
+ self.cq.DeclareLocks(self, level)
def Exec(self, feedback_fn):
- """Dump a representation of the cluster config to the standard output.
-
- """
- values = []
- for field in self.op.output_fields:
- if field == "cluster_name":
- entry = self.cfg.GetClusterName()
- elif field == "master_node":
- entry = self.cfg.GetMasterNode()
- elif field == "drain_flag":
- entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
- elif field == "watcher_pause":
- entry = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
- elif field == "volume_group_name":
- entry = self.cfg.GetVGName()
- else:
- raise errors.ParameterError(field)
- values.append(entry)
- return values
+ result = self.cq.OldStyleQuery(self)
+
+ assert len(result) == 1
+
+ return result[0]
+
+
+class _ClusterQuery(_QueryBase):
+ FIELDS = query.CLUSTER_FIELDS
+
+ #: Do not sort (there is only one item)
+ SORT_FIELD = None
+
+ def ExpandNames(self, lu):
+ lu.needed_locks = {}
+
+ # The following variables interact with _QueryBase._GetNames
+ self.wanted = locking.ALL_SET
+ self.do_locking = self.use_locking
+
+ if self.do_locking:
+ raise errors.OpPrereqError("Can not use locking for cluster queries",
+ errors.ECODE_INVAL)
+
+ def DeclareLocks(self, lu, level):
+ pass
+
+ def _GetQueryData(self, lu):
+ """Computes the list of nodes and their attributes.
+
+ """
+ # Locking is not used
+ assert not (compat.any(lu.glm.is_owned(level)
+ for level in locking.LEVELS
+ if level != locking.LEVEL_CLUSTER) or
+ self.do_locking or self.use_locking)
+
+ if query.CQ_CONFIG in self.requested_data:
+ cluster = lu.cfg.GetClusterInfo()
+ else:
+ cluster = NotImplemented
+
+ if query.CQ_QUEUE_DRAINED in self.requested_data:
+ drain_flag = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
+ else:
+ drain_flag = NotImplemented
+
+ if query.CQ_WATCHER_PAUSE in self.requested_data:
+ watcher_pause = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
+ else:
+ watcher_pause = NotImplemented
+
+ return query.ClusterQueryData(cluster, drain_flag, watcher_pause)
class LUInstanceActivateDisks(NoHooksLU):
def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
- ignore_size=False):
+ ignore_size=False, check=True):
"""Prepare the block devices for an instance.
This sets up the block devices on all nodes.
device_info = []
disks_ok = True
iname = instance.name
- disks = _ExpandCheckDisks(instance, disks)
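+ # callers that have already expanded and verified the disk list can
+ # pass check=False to skip _ExpandCheckDisks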
+ if check:
+ disks = _ExpandCheckDisks(instance, disks)
# With the two passes mechanism we try to reduce the window of
# opportunity for the race condition of switching DRBD to primary
node_disk = node_disk.Copy()
node_disk.UnsetSize()
lu.cfg.SetDiskID(node_disk, node)
- result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False, idx)
+ result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
+ False, idx)
msg = result.fail_msg
if msg:
+ is_offline_secondary = (node in instance.secondary_nodes and
+ result.offline)
lu.proc.LogWarning("Could not prepare block device %s on node %s"
" (is_primary=False, pass=1): %s",
inst_disk.iv_name, node, msg)
- if not ignore_secondaries:
+ if not (ignore_secondaries or is_offline_secondary):
disks_ok = False
# FIXME: race condition on drbd migration to primary
node_disk = node_disk.Copy()
node_disk.UnsetSize()
lu.cfg.SetDiskID(node_disk, node)
- result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True, idx)
+ result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
+ True, idx)
msg = result.fail_msg
if msg:
lu.proc.LogWarning("Could not prepare block device %s on node %s"
_ShutdownInstanceDisks.
"""
- _CheckInstanceDown(lu, instance, "cannot shutdown disks")
+ _CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
_ShutdownInstanceDisks(lu, instance, disks=disks)
for disk in disks:
for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
lu.cfg.SetDiskID(top_disk, node)
- result = lu.rpc.call_blockdev_shutdown(node, top_disk)
+ result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
msg = result.fail_msg
if msg:
lu.LogWarning("Could not shutdown block device %s on node %s: %s",
@param requested: the amount of memory in MiB to check for
@type hypervisor_name: C{str}
@param hypervisor_name: the hypervisor to ask for memory stats
+ @rtype: integer
+ @return: node current free memory
@raise errors.OpPrereqError: if the node doesn't have enough memory, or
we cannot check the node
"""
- nodeinfo = lu.rpc.call_node_info([node], None, hypervisor_name)
+ nodeinfo = lu.rpc.call_node_info([node], None, [hypervisor_name])
nodeinfo[node].Raise("Can't get data from node %s" % node,
prereq=True, ecode=errors.ECODE_ENVIRON)
- free_mem = nodeinfo[node].payload.get("memory_free", None)
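+ # the payload is now a tuple; its last element holds one info dict per
+ # requested hypervisor, and we asked for exactly one above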
+ (_, _, (hv_info, )) = nodeinfo[node].payload
+
+ free_mem = hv_info.get("memory_free", None)
if not isinstance(free_mem, int):
raise errors.OpPrereqError("Can't compute free memory on node %s, result"
" was '%s'" % (node, free_mem),
" needed %s MiB, available %s MiB" %
(node, reason, requested, free_mem),
errors.ECODE_NORES)
+ return free_mem
def _CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
or we cannot check the node
"""
- nodeinfo = lu.rpc.call_node_info(nodenames, vg, None)
+ nodeinfo = lu.rpc.call_node_info(nodenames, [vg], None)
for node in nodenames:
info = nodeinfo[node]
info.Raise("Cannot get current information from node %s" % node,
prereq=True, ecode=errors.ECODE_ENVIRON)
- vg_free = info.payload.get("vg_free", None)
+ (_, (vg_info, ), _) = info.payload
+ vg_free = vg_info.get("vg_free", None)
if not isinstance(vg_free, int):
raise errors.OpPrereqError("Can't compute free disk space on node"
" %s for vg %s, result was '%s'" %
errors.ECODE_NORES)
+def _CheckNodesPhysicalCPUs(lu, nodenames, requested, hypervisor_name):
+ """Checks if nodes have enough physical CPUs
+
+ This function checks if all given nodes have the needed number of
+ physical CPUs. In case any node has fewer CPUs or we cannot get the
+ information from the node, this function raises an OpPrereqError
+ exception.
+
+ @type lu: C{LogicalUnit}
+ @param lu: a logical unit from which we get configuration data
+ @type nodenames: C{list}
+ @param nodenames: the list of node names to check
+ @type requested: C{int}
+ @param requested: the minimum acceptable number of physical CPUs
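+ @type hypervisor_name: C{str}
+ @param hypervisor_name: the hypervisor to ask for the number of CPUs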
+ @raise errors.OpPrereqError: if the node doesn't have enough CPUs,
+ or we cannot check the node
+
+ """
+ nodeinfo = lu.rpc.call_node_info(nodenames, None, [hypervisor_name])
+ for node in nodenames:
+ info = nodeinfo[node]
+ info.Raise("Cannot get current information from node %s" % node,
+ prereq=True, ecode=errors.ECODE_ENVIRON)
+ (_, _, (hv_info, )) = info.payload
+ num_cpus = hv_info.get("cpu_total", None)
+ if not isinstance(num_cpus, int):
+ raise errors.OpPrereqError("Can't compute the number of physical CPUs"
+ " on node %s, result was '%s'" %
+ (node, num_cpus), errors.ECODE_ENVIRON)
+ if requested > num_cpus:
+ raise errors.OpPrereqError("Node %s has %s physical CPUs, but %s are "
+ "required" % (node, num_cpus, requested),
+ errors.ECODE_NORES)
+
+
class LUInstanceStartup(LogicalUnit):
"""Starts an instance.
# extra beparams
if self.op.beparams:
# fill the beparams dict
+ objects.UpgradeBeParams(self.op.beparams)
utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
def ExpandNames(self):
self._ExpandAndLockInstance()
+ self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
+
+ def DeclareLocks(self, level):
+ if level == locking.LEVEL_NODE_RES:
+ self._LockInstancesNodes(primary_only=True, level=locking.LEVEL_NODE_RES)
def BuildHooksEnv(self):
"""Build hooks env.
hv_type.CheckParameterSyntax(filled_hvp)
_CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
+ _CheckInstanceState(self, instance, INSTANCE_ONLINE)
+
self.primary_offline = self.cfg.GetNodeInfo(instance.primary_node).offline
if self.primary_offline and self.op.ignore_offline_nodes:
_CheckNodeOnline(self, instance.primary_node)
bep = self.cfg.GetClusterInfo().FillBE(instance)
+ bep.update(self.op.beparams)
# check bridges existence
_CheckInstanceBridgesExist(self, instance)
if not remote_info.payload: # not running already
_CheckNodeFreeMemory(self, instance.primary_node,
"starting instance %s" % instance.name,
- bep[constants.BE_MEMORY], instance.hypervisor)
+ bep[constants.BE_MINMEM], instance.hypervisor)
def Exec(self, feedback_fn):
"""Start the instance.
_StartInstanceDisks(self, instance, force)
- result = self.rpc.call_instance_start(node_current, instance,
- self.op.hvparams, self.op.beparams,
- self.op.startup_paused)
+ result = \
+ self.rpc.call_instance_start(node_current,
+ (instance, self.op.hvparams,
+ self.op.beparams),
+ self.op.startup_paused)
msg = result.fail_msg
if msg:
_ShutdownInstanceDisks(self, instance)
self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
assert self.instance is not None, \
"Cannot retrieve locked instance %s" % self.op.instance_name
-
+ _CheckInstanceState(self, instance, INSTANCE_ONLINE)
_CheckNodeOnline(self, instance.primary_node)
# check bridges existence
self.LogInfo("Instance %s was already stopped, starting now",
instance.name)
_StartInstanceDisks(self, instance, ignore_secondaries)
- result = self.rpc.call_instance_start(node_current, instance,
- None, None, False)
+ result = self.rpc.call_instance_start(node_current,
+ (instance, None, None), False)
msg = result.fail_msg
if msg:
_ShutdownInstanceDisks(self, instance)
assert self.instance is not None, \
"Cannot retrieve locked instance %s" % self.op.instance_name
+ _CheckInstanceState(self, self.instance, INSTANCE_ONLINE)
+
self.primary_offline = \
self.cfg.GetNodeInfo(self.instance.primary_node).offline
"Cannot retrieve locked instance %s" % self.op.instance_name
_CheckNodeOnline(self, instance.primary_node, "Instance primary node"
" offline, cannot reinstall")
- for node in instance.secondary_nodes:
- _CheckNodeOnline(self, node, "Instance secondary node offline,"
- " cannot reinstall")
if instance.disk_template == constants.DT_DISKLESS:
raise errors.OpPrereqError("Instance '%s' has no disks" %
self.op.instance_name,
errors.ECODE_INVAL)
- _CheckInstanceDown(self, instance, "cannot reinstall")
+ _CheckInstanceState(self, instance, INSTANCE_DOWN, msg="cannot reinstall")
if self.op.os_type is not None:
# OS verification
try:
feedback_fn("Running the instance OS create scripts...")
# FIXME: pass debug option from opcode to backend
- result = self.rpc.call_instance_os_add(inst.primary_node, inst, True,
- self.op.debug_level,
- osparams=self.os_inst)
+ result = self.rpc.call_instance_os_add(inst.primary_node,
+ (inst, self.os_inst), True,
+ self.op.debug_level)
result.Raise("Could not install OS for instance %s on node %s" %
(inst.name, inst.primary_node))
finally:
HTYPE = constants.HTYPE_INSTANCE
REQ_BGL = False
+ _MODIFYABLE = frozenset([
+ constants.IDISK_SIZE,
+ constants.IDISK_MODE,
+ ])
+
+ # New or changed disk parameters may have different semantics
+ assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
+ constants.IDISK_ADOPT,
+
+ # TODO: Implement support changing VG while recreating
+ constants.IDISK_VG,
+ constants.IDISK_METAVG,
+ constants.IDISK_PROVIDER,
+ ]))
+
def CheckArguments(self):
- # normalise the disk list
- self.op.disks = sorted(frozenset(self.op.disks))
+ if self.op.disks and ht.TPositiveInt(self.op.disks[0]):
+ # Normalize and convert deprecated list of disk indices
+ self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
+
+ duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
+ if duplicates:
+ raise errors.OpPrereqError("Some disks have been specified more than"
+ " once: %s" % utils.CommaJoin(duplicates),
+ errors.ECODE_INVAL)
+
+ for (idx, params) in self.op.disks:
+ utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
+ unsupported = frozenset(params.keys()) - self._MODIFYABLE
+ if unsupported:
+ raise errors.OpPrereqError("Parameters for disk %s try to change"
+ " unmodifyable parameter(s): %s" %
+ (idx, utils.CommaJoin(unsupported)),
+ errors.ECODE_INVAL)
def ExpandNames(self):
self._ExpandAndLockInstance()
self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
else:
self.needed_locks[locking.LEVEL_NODE] = []
+ self.needed_locks[locking.LEVEL_NODE_RES] = []
def DeclareLocks(self, level):
if level == locking.LEVEL_NODE:
# otherwise we need to lock all nodes for disk re-creation
primary_only = bool(self.op.nodes)
self._LockInstancesNodes(primary_only=primary_only)
+ elif level == locking.LEVEL_NODE_RES:
+ # Copy node locks
+ self.needed_locks[locking.LEVEL_NODE_RES] = \
+ self.needed_locks[locking.LEVEL_NODE][:]
def BuildHooksEnv(self):
"""Build hooks env.
if instance.disk_template == constants.DT_DISKLESS:
raise errors.OpPrereqError("Instance '%s' has no disks" %
self.op.instance_name, errors.ECODE_INVAL)
+
# if we replace nodes *and* the old primary is offline, we don't
# check
- assert instance.primary_node in self.needed_locks[locking.LEVEL_NODE]
+ assert instance.primary_node in self.owned_locks(locking.LEVEL_NODE)
+ assert instance.primary_node in self.owned_locks(locking.LEVEL_NODE_RES)
old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
if not (self.op.nodes and old_pnode.offline):
- _CheckInstanceDown(self, instance, "cannot recreate disks")
+ _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
+ msg="cannot recreate disks")
- if not self.op.disks:
- self.op.disks = range(len(instance.disks))
+ if self.op.disks:
+ self.disks = dict(self.op.disks)
else:
- for idx in self.op.disks:
- if idx >= len(instance.disks):
- raise errors.OpPrereqError("Invalid disk index '%s'" % idx,
- errors.ECODE_INVAL)
- if self.op.disks != range(len(instance.disks)) and self.op.nodes:
+ self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
+
+ maxidx = max(self.disks.keys())
+ if maxidx >= len(instance.disks):
+ raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
+ errors.ECODE_INVAL)
+
+ if (self.op.nodes and
+ sorted(self.disks.keys()) != range(len(instance.disks))):
raise errors.OpPrereqError("Can't recreate disks partially and"
" change the nodes at the same time",
errors.ECODE_INVAL)
+
self.instance = instance
def Exec(self, feedback_fn):
"""
instance = self.instance
+ assert (self.owned_locks(locking.LEVEL_NODE) ==
+ self.owned_locks(locking.LEVEL_NODE_RES))
+
to_skip = []
- mods = [] # keeps track of needed logical_id changes
+ mods = [] # keeps track of needed changes
for idx, disk in enumerate(instance.disks):
- if idx not in self.op.disks: # disk idx has not been passed in
+ try:
+ changes = self.disks[idx]
+ except KeyError:
+ # Disk should not be recreated
to_skip.append(idx)
continue
+
# update secondaries for disks, if needed
- if self.op.nodes:
- if disk.dev_type == constants.LD_DRBD8:
- # need to update the nodes and minors
- assert len(self.op.nodes) == 2
- assert len(disk.logical_id) == 6 # otherwise disk internals
- # have changed
- (_, _, old_port, _, _, old_secret) = disk.logical_id
- new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
- new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
- new_minors[0], new_minors[1], old_secret)
- assert len(disk.logical_id) == len(new_id)
- mods.append((idx, new_id))
+ if self.op.nodes and disk.dev_type == constants.LD_DRBD8:
+ # need to update the nodes and minors
+ assert len(self.op.nodes) == 2
+ assert len(disk.logical_id) == 6 # otherwise disk internals
+ # have changed
+ (_, _, old_port, _, _, old_secret) = disk.logical_id
+ new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
+ new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
+ new_minors[0], new_minors[1], old_secret)
+ assert len(disk.logical_id) == len(new_id)
+ else:
+ new_id = None
+
+ mods.append((idx, new_id, changes))
# now that we have passed all asserts above, we can apply the mods
# in a single run (to avoid partial changes)
- for idx, new_id in mods:
- instance.disks[idx].logical_id = new_id
+ for idx, new_id, changes in mods:
+ disk = instance.disks[idx]
+ if new_id is not None:
+ assert disk.dev_type == constants.LD_DRBD8
+ disk.logical_id = new_id
+ if changes:
+ disk.Update(size=changes.get(constants.IDISK_SIZE, None),
+ mode=changes.get(constants.IDISK_MODE, None))
# change primary node, if needed
if self.op.nodes:
instance = self.cfg.GetInstanceInfo(self.op.instance_name)
assert instance is not None
_CheckNodeOnline(self, instance.primary_node)
- _CheckInstanceDown(self, instance, "cannot rename")
+ _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
+ msg="cannot rename")
self.instance = instance
new_name = self.op.new_name
def ExpandNames(self):
self._ExpandAndLockInstance()
self.needed_locks[locking.LEVEL_NODE] = []
+ self.needed_locks[locking.LEVEL_NODE_RES] = []
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
def DeclareLocks(self, level):
if level == locking.LEVEL_NODE:
self._LockInstancesNodes()
+ elif level == locking.LEVEL_NODE_RES:
+ # Copy node locks
+ self.needed_locks[locking.LEVEL_NODE_RES] = \
+ self.needed_locks[locking.LEVEL_NODE][:]
def BuildHooksEnv(self):
"""Build hooks env.
" node %s: %s" %
(instance.name, instance.primary_node, msg))
+ assert (self.owned_locks(locking.LEVEL_NODE) ==
+ self.owned_locks(locking.LEVEL_NODE_RES))
+ assert not (set(instance.all_nodes) -
+ self.owned_locks(locking.LEVEL_NODE)), \
+ "Not owning correct locks"
+
_RemoveInstance(self, feedback_fn, instance, self.op.ignore_failures)
"""
logging.info("Removing block devices for instance %s", instance.name)
- if not _RemoveDisks(lu, instance):
+ if not _RemoveDisks(lu, instance, ignore_failures=ignore_failures):
if not ignore_failures:
raise errors.OpExecError("Can't remove instance's disks")
feedback_fn("Warning: can't remove instance's disks")
self.needed_locks[locking.LEVEL_NODE] = []
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+ self.needed_locks[locking.LEVEL_NODE_RES] = []
+ self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
+
ignore_consistency = self.op.ignore_consistency
shutdown_timeout = self.op.shutdown_timeout
self._migrater = TLMigrateInstance(self, self.op.instance_name,
cleanup=False,
failover=True,
ignore_consistency=ignore_consistency,
- shutdown_timeout=shutdown_timeout)
+ shutdown_timeout=shutdown_timeout,
+ ignore_ipolicy=self.op.ignore_ipolicy)
self.tasklets = [self._migrater]
def DeclareLocks(self, level):
del self.recalculate_locks[locking.LEVEL_NODE]
else:
self._LockInstancesNodes()
+ elif level == locking.LEVEL_NODE_RES:
+ # Copy node locks
+ self.needed_locks[locking.LEVEL_NODE_RES] = \
+ self.needed_locks[locking.LEVEL_NODE][:]
def BuildHooksEnv(self):
"""Build hooks env.
self.needed_locks[locking.LEVEL_NODE] = []
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
- self._migrater = TLMigrateInstance(self, self.op.instance_name,
- cleanup=self.op.cleanup,
- failover=False,
- fallback=self.op.allow_failover)
+ self.needed_locks[locking.LEVEL_NODE_RES] = []
+ self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
+
+ self._migrater = \
+ TLMigrateInstance(self, self.op.instance_name,
+ cleanup=self.op.cleanup,
+ failover=False,
+ fallback=self.op.allow_failover,
+ allow_runtime_changes=self.op.allow_runtime_changes,
+ ignore_ipolicy=self.op.ignore_ipolicy)
self.tasklets = [self._migrater]
def DeclareLocks(self, level):
del self.recalculate_locks[locking.LEVEL_NODE]
else:
self._LockInstancesNodes()
+ elif level == locking.LEVEL_NODE_RES:
+ # Copy node locks
+ self.needed_locks[locking.LEVEL_NODE_RES] = \
+ self.needed_locks[locking.LEVEL_NODE][:]
def BuildHooksEnv(self):
"""Build hooks env.
"MIGRATE_CLEANUP": self.op.cleanup,
"OLD_PRIMARY": source_node,
"NEW_PRIMARY": target_node,
+ "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
})
if instance.disk_template in constants.DTS_INT_MIRROR:
target_node = _ExpandNodeName(self.cfg, self.op.target_node)
self.op.target_node = target_node
self.needed_locks[locking.LEVEL_NODE] = [target_node]
+ self.needed_locks[locking.LEVEL_NODE_RES] = []
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
def DeclareLocks(self, level):
if level == locking.LEVEL_NODE:
self._LockInstancesNodes(primary_only=True)
+ elif level == locking.LEVEL_NODE_RES:
+ # Copy node locks
+ self.needed_locks[locking.LEVEL_NODE_RES] = \
+ self.needed_locks[locking.LEVEL_NODE][:]
def BuildHooksEnv(self):
"""Build hooks env.
_CheckNodeOnline(self, target_node)
_CheckNodeNotDrained(self, target_node)
_CheckNodeVmCapable(self, target_node)
+ ipolicy = _CalculateGroupIPolicy(self.cfg.GetClusterInfo(),
+ self.cfg.GetNodeGroup(node.group))
+ _CheckTargetNodeIPolicy(self, ipolicy, instance, node,
+ ignore=self.op.ignore_ipolicy)
- if instance.admin_up:
+ if instance.admin_state == constants.ADMINST_UP:
# check memory requirements on the secondary node
_CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
- instance.name, bep[constants.BE_MEMORY],
+ instance.name, bep[constants.BE_MAXMEM],
instance.hypervisor)
else:
self.LogInfo("Not checking memory on the secondary node as"
self.LogInfo("Shutting down instance %s on source node %s",
instance.name, source_node)
+ assert (self.owned_locks(locking.LEVEL_NODE) ==
+ self.owned_locks(locking.LEVEL_NODE_RES))
+
result = self.rpc.call_instance_shutdown(source_node, instance,
self.op.shutdown_timeout)
msg = result.fail_msg
# activate, get path, copy the data over
for idx, disk in enumerate(instance.disks):
self.LogInfo("Copying data for disk %d", idx)
- result = self.rpc.call_blockdev_assemble(target_node, disk,
+ result = self.rpc.call_blockdev_assemble(target_node, (disk, instance),
instance.name, True, idx)
if result.fail_msg:
self.LogWarning("Can't assemble newly created disk %d: %s",
errs.append(result.fail_msg)
break
dev_path = result.payload
- result = self.rpc.call_blockdev_export(source_node, disk,
+ result = self.rpc.call_blockdev_export(source_node, (disk, instance),
target_node, dev_path,
cluster_name)
if result.fail_msg:
_RemoveDisks(self, instance, target_node=source_node)
# Only start the instance if it's marked as up
- if instance.admin_up:
+ if instance.admin_state == constants.ADMINST_UP:
self.LogInfo("Starting instance %s on node %s",
instance.name, target_node)
_ShutdownInstanceDisks(self, instance)
raise errors.OpExecError("Can't activate the instance's disks")
- result = self.rpc.call_instance_start(target_node, instance,
- None, None, False)
+ result = self.rpc.call_instance_start(target_node,
+ (instance, None, None), False)
msg = result.fail_msg
if msg:
_ShutdownInstanceDisks(self, instance)
"""
return {
"NODE_NAME": self.op.node_name,
+ "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
}
def BuildHooksNodes(self):
def Exec(self, feedback_fn):
# Prepare jobs for migration instances
+ allow_runtime_changes = self.op.allow_runtime_changes
jobs = [
[opcodes.OpInstanceMigrate(instance_name=inst.name,
mode=self.op.mode,
live=self.op.live,
iallocator=self.op.iallocator,
- target_node=self.op.target_node)]
+ target_node=self.op.target_node,
+ allow_runtime_changes=allow_runtime_changes,
+ ignore_ipolicy=self.op.ignore_ipolicy)]
for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name)
]
and target node
@type shutdown_timeout: int
@ivar shutdown_timeout: In case of failover timeout of the shutdown
+ @type ignore_ipolicy: bool
+ @ivar ignore_ipolicy: If true, we can ignore instance policy when migrating
"""
+
+ # Constants
+ _MIGRATION_POLL_INTERVAL = 1 # seconds
+ _MIGRATION_FEEDBACK_INTERVAL = 10 # seconds
+
def __init__(self, lu, instance_name, cleanup=False,
failover=False, fallback=False,
ignore_consistency=False,
- shutdown_timeout=constants.DEFAULT_SHUTDOWN_TIMEOUT):
+ allow_runtime_changes=True,
+ shutdown_timeout=constants.DEFAULT_SHUTDOWN_TIMEOUT,
+ ignore_ipolicy=False):
"""Initializes this class.
"""
self.fallback = fallback
self.ignore_consistency = ignore_consistency
self.shutdown_timeout = shutdown_timeout
+ self.ignore_ipolicy = ignore_ipolicy
+ self.allow_runtime_changes = allow_runtime_changes
def CheckPrereq(self):
"""Check prerequisites.
instance = self.cfg.GetInstanceInfo(instance_name)
assert instance is not None
self.instance = instance
+ cluster = self.cfg.GetClusterInfo()
- if (not self.cleanup and not instance.admin_up and not self.failover and
- self.fallback):
- self.lu.LogInfo("Instance is marked down, fallback allowed, switching"
- " to failover")
+ if (not self.cleanup and
+ instance.admin_state != constants.ADMINST_UP and
+ not self.failover and self.fallback):
+ self.lu.LogInfo("Instance is marked down or offline, fallback allowed,"
+ " switching to failover")
self.failover = True
if instance.disk_template not in constants.DTS_MIRRORED:
# BuildHooksEnv
self.target_node = self.lu.op.target_node
+ # Check that the target node is correct in terms of instance policy
+ nodeinfo = self.cfg.GetNodeInfo(self.target_node)
+ group_info = self.cfg.GetNodeGroup(nodeinfo.group)
+ ipolicy = _CalculateGroupIPolicy(cluster, group_info)
+ _CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo,
+ ignore=self.ignore_ipolicy)
+
# self.target_node is already populated, either directly or by the
# iallocator run
target_node = self.target_node
" node can be passed)" %
(instance.disk_template, text),
errors.ECODE_INVAL)
+ nodeinfo = self.cfg.GetNodeInfo(target_node)
+ group_info = self.cfg.GetNodeGroup(nodeinfo.group)
+ ipolicy = _CalculateGroupIPolicy(cluster, group_info)
+ _CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo,
+ ignore=self.ignore_ipolicy)
- i_be = self.cfg.GetClusterInfo().FillBE(instance)
+ i_be = cluster.FillBE(instance)
# check memory requirements on the secondary node
- if not self.cleanup and (not self.failover or instance.admin_up):
- _CheckNodeFreeMemory(self.lu, target_node, "migrating instance %s" %
- instance.name, i_be[constants.BE_MEMORY],
- instance.hypervisor)
+ if (not self.cleanup and
+ (not self.failover or instance.admin_state == constants.ADMINST_UP)):
+ self.tgt_free_mem = _CheckNodeFreeMemory(self.lu, target_node,
+ "migrating instance %s" %
+ instance.name,
+ i_be[constants.BE_MINMEM],
+ instance.hypervisor)
else:
self.lu.LogInfo("Not checking memory on the secondary node as"
" instance will not be started")
+ # check if failover must be forced instead of migration
+ if (not self.cleanup and not self.failover and
+ i_be[constants.BE_ALWAYS_FAILOVER]):
+ if self.fallback:
+ self.lu.LogInfo("Instance configured to always failover; fallback"
+ " to failover")
+ self.failover = True
+ else:
+ raise errors.OpPrereqError("This instance has been configured to"
+ " always failover, please allow failover",
+ errors.ECODE_STATE)
+
# check bridge existence
_CheckInstanceBridgesExist(self.lu, instance, node=target_node)
self.lu.op.live = None
elif self.lu.op.mode is None:
# read the default value from the hypervisor
- i_hv = self.cfg.GetClusterInfo().FillHV(self.instance,
- skip_globals=False)
+ i_hv = cluster.FillHV(self.instance, skip_globals=False)
self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]
self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
# Failover is never live
self.live = False
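+    # If the instance is running, remember its current runtime memory; it
+    # is compared against the free memory on the target node and, if
+    # needed, the instance is ballooned down before the migration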
+ if not (self.failover or self.cleanup):
+ remote_info = self.rpc.call_instance_info(instance.primary_node,
+ instance.name,
+ instance.hypervisor)
+ remote_info.Raise("Error checking instance on node %s" %
+ instance.primary_node)
+ instance_running = bool(remote_info.payload)
+ if instance_running:
+ self.current_mem = int(remote_info.payload["memory"])
+
def _RunAllocator(self):
"""Run the allocator based on input opcode.
"""
+ # FIXME: add a self.ignore_ipolicy option
ial = IAllocator(self.cfg, self.rpc,
mode=constants.IALLOCATOR_MODE_RELOC,
name=self.instance_name,
- # TODO See why hail breaks with a single node below
- relocate_from=[self.instance.primary_node,
- self.instance.primary_node],
+ relocate_from=[self.instance.primary_node],
)
ial.Run(self.lu.op.iallocator)
all_done = True
result = self.rpc.call_drbd_wait_sync(self.all_nodes,
self.nodes_ip,
- self.instance.disks)
+ (self.instance.disks,
+ self.instance))
min_percent = 100
for node, nres in result.items():
nres.Raise("Cannot resync disks on node %s" % node)
msg = "single-master"
self.feedback_fn("* changing disks into %s mode" % msg)
result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
- self.instance.disks,
+ (self.instance.disks, self.instance),
self.instance.name, multimaster)
for node, nres in result.items():
nres.Raise("Cannot change disks config on node %s" % node)
"""
instance = self.instance
target_node = self.target_node
+ source_node = self.source_node
migration_info = self.migration_info
- abort_result = self.rpc.call_finalize_migration(target_node,
- instance,
- migration_info,
- False)
+ abort_result = self.rpc.call_instance_finalize_migration_dst(target_node,
+ instance,
+ migration_info,
+ False)
abort_msg = abort_result.fail_msg
if abort_msg:
logging.error("Aborting migration failed on target node %s: %s",
    # Don't raise an exception here, as we still have to try to revert the
# disk status, even if this step failed.
+ abort_result = self.rpc.call_instance_finalize_migration_src(source_node,
+ instance, False, self.live)
+ abort_msg = abort_result.fail_msg
+ if abort_msg:
+ logging.error("Aborting migration failed on source node %s: %s",
+ source_node, abort_msg)
+
def _ExecMigration(self):
"""Migrate an instance.
target_node = self.target_node
source_node = self.source_node
+ # Check for hypervisor version mismatch and warn the user.
+ nodeinfo = self.rpc.call_node_info([source_node, target_node],
+ None, [self.instance.hypervisor])
+ for ninfo in nodeinfo.values():
+ ninfo.Raise("Unable to retrieve node information from node '%s'" %
+ ninfo.node)
+ (_, _, (src_info, )) = nodeinfo[source_node].payload
+ (_, _, (dst_info, )) = nodeinfo[target_node].payload
+
+ if ((constants.HV_NODEINFO_KEY_VERSION in src_info) and
+ (constants.HV_NODEINFO_KEY_VERSION in dst_info)):
+ src_version = src_info[constants.HV_NODEINFO_KEY_VERSION]
+ dst_version = dst_info[constants.HV_NODEINFO_KEY_VERSION]
+ if src_version != dst_version:
+ self.feedback_fn("* warning: hypervisor version mismatch between"
+ " source (%s) and target (%s) node" %
+ (src_version, dst_version))
+
self.feedback_fn("* checking disk consistency between source and target")
- for dev in instance.disks:
- if not _CheckDiskConsistency(self.lu, dev, target_node, False):
+ for (idx, dev) in enumerate(instance.disks):
+ if not _CheckDiskConsistency(self.lu, instance, dev, target_node, False):
raise errors.OpExecError("Disk %s is degraded or not fully"
" synchronized on target node,"
- " aborting migration" % dev.iv_name)
+ " aborting migration" % idx)
+
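+    # If the instance currently uses more memory than is free on the target
+    # node, balloon it down to fit; this is only possible when runtime
+    # changes are allowed, otherwise the migration is aborted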
+ if self.current_mem > self.tgt_free_mem:
+ if not self.allow_runtime_changes:
+ raise errors.OpExecError("Memory ballooning not allowed and not enough"
+ " free memory to fit instance %s on target"
+ " node %s (have %dMB, need %dMB)" %
+ (instance.name, target_node,
+ self.tgt_free_mem, self.current_mem))
+ self.feedback_fn("* setting instance memory to %s" % self.tgt_free_mem)
+ rpcres = self.rpc.call_instance_balloon_memory(instance.primary_node,
+ instance,
+ self.tgt_free_mem)
+ rpcres.Raise("Cannot modify instance runtime memory")
# First get the migration information from the remote node
result = self.rpc.call_migration_info(source_node, instance)
raise errors.OpExecError("Could not migrate instance %s: %s" %
(instance.name, msg))
+ self.feedback_fn("* starting memory transfer")
+ last_feedback = time.time()
+ while True:
+ result = self.rpc.call_instance_get_migration_status(source_node,
+ instance)
+ msg = result.fail_msg
+ ms = result.payload # MigrationStatus instance
+ if msg or (ms.status in constants.HV_MIGRATION_FAILED_STATUSES):
+ logging.error("Instance migration failed, trying to revert"
+ " disk status: %s", msg)
+ self.feedback_fn("Migration failed, aborting")
+ self._AbortMigration()
+ self._RevertDiskStatus()
+ raise errors.OpExecError("Could not migrate instance %s: %s" %
+ (instance.name, msg))
+
+ if result.payload.status != constants.HV_MIGRATION_ACTIVE:
+ self.feedback_fn("* memory transfer complete")
+ break
+
+ if (utils.TimeoutExpired(last_feedback,
+ self._MIGRATION_FEEDBACK_INTERVAL) and
+ ms.transferred_ram is not None):
+ mem_progress = 100 * float(ms.transferred_ram) / float(ms.total_ram)
+ self.feedback_fn("* memory transfer progress: %.2f %%" % mem_progress)
+ last_feedback = time.time()
+
+ time.sleep(self._MIGRATION_POLL_INTERVAL)
+
+ result = self.rpc.call_instance_finalize_migration_src(source_node,
+ instance,
+ True,
+ self.live)
+ msg = result.fail_msg
+ if msg:
+ logging.error("Instance migration succeeded, but finalization failed"
+ " on the source node: %s", msg)
+ raise errors.OpExecError("Could not finalize instance migration: %s" %
+ msg)
+
instance.primary_node = target_node
+
# distribute new instance config to the other nodes
self.cfg.Update(instance, self.feedback_fn)
- result = self.rpc.call_finalize_migration(target_node,
- instance,
- migration_info,
- True)
+ result = self.rpc.call_instance_finalize_migration_dst(target_node,
+ instance,
+ migration_info,
+ True)
msg = result.fail_msg
if msg:
- logging.error("Instance migration succeeded, but finalization failed:"
- " %s", msg)
+ logging.error("Instance migration succeeded, but finalization failed"
+ " on the target node: %s", msg)
raise errors.OpExecError("Could not finalize instance migration: %s" %
msg)
self._GoReconnect(False)
self._WaitUntilSync()
+ # If the instance's disk template is `rbd' or `ext' and there was a
+ # successful migration, unmap the device from the source node.
+ if self.instance.disk_template in (constants.DT_RBD, constants.DT_EXT):
+ disks = _ExpandCheckDisks(instance, instance.disks)
+ self.feedback_fn("* unmapping instance's disks from %s" % source_node)
+ for disk in disks:
+ result = self.rpc.call_blockdev_shutdown(source_node, (disk, instance))
+ msg = result.fail_msg
+ if msg:
+ logging.error("Migration was successful, but couldn't unmap the"
+ " block device %s on source node %s: %s",
+ disk.iv_name, source_node, msg)
+ logging.error("You need to unmap the device %s manually on %s",
+ disk.iv_name, source_node)
+
self.feedback_fn("* done")
def _ExecFailover(self):
source_node = instance.primary_node
target_node = self.target_node
- if instance.admin_up:
+ if instance.admin_state == constants.ADMINST_UP:
self.feedback_fn("* checking disk consistency between source and target")
- for dev in instance.disks:
+ for (idx, dev) in enumerate(instance.disks):
# for drbd, these are drbd over lvm
- if not _CheckDiskConsistency(self.lu, dev, target_node, False):
+ if not _CheckDiskConsistency(self.lu, instance, dev, target_node,
+ False):
if primary_node.offline:
self.feedback_fn("Node %s is offline, ignoring degraded disk %s on"
" target node %s" %
- (primary_node.name, dev.iv_name, target_node))
+ (primary_node.name, idx, target_node))
elif not self.ignore_consistency:
raise errors.OpExecError("Disk %s is degraded on target node,"
- " aborting failover" % dev.iv_name)
+ " aborting failover" % idx)
else:
self.feedback_fn("* not checking disk consistency as instance is not"
" running")
self.cfg.Update(instance, self.feedback_fn)
# Only start the instance if it's marked as up
- if instance.admin_up:
+ if instance.admin_state == constants.ADMINST_UP:
self.feedback_fn("* activating the instance's disks on target node %s" %
target_node)
logging.info("Starting instance %s on node %s",
self.feedback_fn("* starting the instance on the target node %s" %
target_node)
- result = self.rpc.call_instance_start(target_node, instance, None, None,
+ result = self.rpc.call_instance_start(target_node, (instance, None, None),
False)
msg = result.fail_msg
if msg:
return self._ExecMigration()
-def _CreateBlockDev(lu, node, instance, device, force_create,
- info, force_open):
- """Create a tree of block devices on a given node.
+def _CreateBlockDev(lu, node, instance, device, force_create, info,
+ force_open):
+ """Wrapper around L{_CreateBlockDevInner}.
- If this device type has to be created on secondaries, create it and
+ This method annotates the root device first.
+
+ """
+ (disk,) = _AnnotateDiskParams(instance, [device], lu.cfg)
+ return _CreateBlockDevInner(lu, node, instance, disk, force_create, info,
+ force_open)
+
+
+def _CreateBlockDevInner(lu, node, instance, device, force_create,
+ info, force_open):
+ """Create a tree of block devices on a given node.
+
+ If this device type has to be created on secondaries, create it and
all its children.
If not, just recurse to children keeping the same 'force' value.
+ @attention: The device has to be annotated already.
+
@param lu: the lu on whose behalf we execute
@param node: the node on which to create the device
@type instance: L{objects.Instance}
if device.children:
for child in device.children:
- _CreateBlockDev(lu, node, instance, child, force_create,
- info, force_open)
+ _CreateBlockDevInner(lu, node, instance, child, force_create,
+ info, force_open)
if not force_create:
return
results.append("%s%s" % (new_id, val))
return results
+def _GetPCIInfo(lu, dev_type):
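+  """Helper returning the (device index, PCI slot) to use for hotplug.
+
+  The values are taken either from the hotplug information built during
+  instance creation or from the existing instance's configuration; if
+  hotplug is disabled or not supported, (None, None) is returned.
+
+  """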
+
+ if lu.op.hotplug:
+ # case of InstanceCreate()
+ if hasattr(lu, 'hotplug_info'):
+ if lu.hotplug_info is not None:
+ idx = getattr(lu.hotplug_info, dev_type)
+        setattr(lu.hotplug_info, dev_type, idx + 1)
+ pci = lu.hotplug_info.pci_pool.pop()
+ lu.LogInfo("Choosing pci slot %d" % pci)
+ return idx, pci
+ # case of InstanceSetParams()
+ elif lu.instance.hotplug_info is not None:
+ idx, pci = lu.cfg.GetPCIInfo(lu.instance.name, dev_type)
+ lu.LogInfo("Choosing pci slot %d" % pci)
+ return idx, pci
+
+ lu.LogWarning("Hotplug not supported for this instance.")
+ return None, None
+
def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
iv_name, p_minor, s_minor):
assert len(vgnames) == len(names) == 2
port = lu.cfg.AllocatePort()
shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
+
dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
- logical_id=(vgnames[0], names[0]))
- dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
- logical_id=(vgnames[1], names[1]))
- drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
+ logical_id=(vgnames[0], names[0]),
+ params={})
+ dev_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE,
+ logical_id=(vgnames[1], names[1]),
+ params={})
+
+ disk_idx, pci = _GetPCIInfo(lu, 'disks')
+ drbd_dev = objects.Disk(idx=disk_idx, pci=pci,
+ dev_type=constants.LD_DRBD8, size=size,
logical_id=(primary, secondary, port,
p_minor, s_minor,
shared_secret),
children=[dev_data, dev_meta],
- iv_name=iv_name)
+ iv_name=iv_name, params={})
return drbd_dev
-def _GenerateDiskTemplate(lu, template_name,
- instance_name, primary_node,
- secondary_nodes, disk_info,
- file_storage_dir, file_driver,
- base_index, feedback_fn):
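+#: Mapping of disk templates to the prefix used when generating disk names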
+_DISK_TEMPLATE_NAME_PREFIX = {
+ constants.DT_PLAIN: "",
+ constants.DT_RBD: ".rbd",
+ constants.DT_EXT: ".ext",
+ }
+
+
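+#: Mapping of disk templates to their corresponding logical disk types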
+_DISK_TEMPLATE_DEVICE_TYPE = {
+ constants.DT_PLAIN: constants.LD_LV,
+ constants.DT_FILE: constants.LD_FILE,
+ constants.DT_SHARED_FILE: constants.LD_FILE,
+ constants.DT_BLOCK: constants.LD_BLOCKDEV,
+ constants.DT_RBD: constants.LD_RBD,
+ constants.DT_EXT: constants.LD_EXT,
+ }
+
+
+def _GenerateDiskTemplate(
+  lu, template_name, instance_name, primary_node, secondary_nodes,
+  disk_info, file_storage_dir, file_driver, base_index, feedback_fn,
+  full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
+  _req_shr_file_storage=opcodes.RequireSharedFileStorage):
"""Generate the entire disk layout for a given template type.
"""
vgname = lu.cfg.GetVGName()
disk_count = len(disk_info)
disks = []
+
if template_name == constants.DT_DISKLESS:
pass
- elif template_name == constants.DT_PLAIN:
- if len(secondary_nodes) != 0:
- raise errors.ProgrammerError("Wrong template configuration")
-
- names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
- for i in range(disk_count)])
- for idx, disk in enumerate(disk_info):
- disk_index = idx + base_index
- vg = disk.get(constants.IDISK_VG, vgname)
- feedback_fn("* disk %i, vg %s, name %s" % (idx, vg, names[idx]))
- disk_dev = objects.Disk(dev_type=constants.LD_LV,
- size=disk[constants.IDISK_SIZE],
- logical_id=(vg, names[idx]),
- iv_name="disk/%d" % disk_index,
- mode=disk[constants.IDISK_MODE])
- disks.append(disk_dev)
elif template_name == constants.DT_DRBD8:
if len(secondary_nodes) != 1:
raise errors.ProgrammerError("Wrong template configuration")
minors = lu.cfg.AllocateDRBDMinor(
[primary_node, remote_node] * len(disk_info), instance_name)
+ (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
+ full_disk_params)
+ drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
+
names = []
for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
for i in range(disk_count)]):
for idx, disk in enumerate(disk_info):
disk_index = idx + base_index
data_vg = disk.get(constants.IDISK_VG, vgname)
- meta_vg = disk.get(constants.IDISK_METAVG, data_vg)
+ meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
disk[constants.IDISK_SIZE],
[data_vg, meta_vg],
minors[idx * 2], minors[idx * 2 + 1])
disk_dev.mode = disk[constants.IDISK_MODE]
disks.append(disk_dev)
- elif template_name == constants.DT_FILE:
- if len(secondary_nodes) != 0:
+ else:
+ if secondary_nodes:
raise errors.ProgrammerError("Wrong template configuration")
- opcodes.RequireFileStorage()
+ if template_name == constants.DT_FILE:
+ _req_file_storage()
+ elif template_name == constants.DT_SHARED_FILE:
+ _req_shr_file_storage()
- for idx, disk in enumerate(disk_info):
- disk_index = idx + base_index
- disk_dev = objects.Disk(dev_type=constants.LD_FILE,
- size=disk[constants.IDISK_SIZE],
- iv_name="disk/%d" % disk_index,
- logical_id=(file_driver,
- "%s/disk%d" % (file_storage_dir,
- disk_index)),
- mode=disk[constants.IDISK_MODE])
- disks.append(disk_dev)
- elif template_name == constants.DT_SHARED_FILE:
- if len(secondary_nodes) != 0:
- raise errors.ProgrammerError("Wrong template configuration")
+ name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
+ if name_prefix is None:
+ names = None
+ else:
+ names = _GenerateUniqueNames(lu, ["%s.disk%s" %
+ (name_prefix, base_index + i)
+ for i in range(disk_count)])
+
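+    # logical_id_fn computes each disk's logical_id from the loop index,
+    # the instance-wide disk index and the disk definition, depending on
+    # the disk template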
+ if template_name == constants.DT_PLAIN:
+ def logical_id_fn(idx, _, disk):
+ vg = disk.get(constants.IDISK_VG, vgname)
+ return (vg, names[idx])
+ elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
+ logical_id_fn = \
+ lambda _, disk_index, disk: (file_driver,
+ "%s/disk%d" % (file_storage_dir,
+ disk_index))
+ elif template_name == constants.DT_BLOCK:
+ logical_id_fn = \
+ lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
+ disk[constants.IDISK_ADOPT])
+ elif template_name == constants.DT_RBD:
+ logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
+ elif template_name == constants.DT_EXT:
+ def logical_id_fn(idx, _, disk):
+ provider = disk.get(constants.IDISK_PROVIDER, None)
+ if provider is None:
+          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
+                                       " not found" %
+                                       (constants.DT_EXT,
+                                        constants.IDISK_PROVIDER))
+ return (provider, names[idx])
+ else:
+ raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
- opcodes.RequireSharedFileStorage()
+ dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]
for idx, disk in enumerate(disk_info):
+      params = {}
+ # Only for the Ext template add disk_info to params
+ if template_name == constants.DT_EXT:
+ params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
+ for key in disk:
+ if key not in constants.IDISK_PARAMS:
+ params[key] = disk[key]
disk_index = idx + base_index
- disk_dev = objects.Disk(dev_type=constants.LD_FILE,
- size=disk[constants.IDISK_SIZE],
- iv_name="disk/%d" % disk_index,
- logical_id=(file_driver,
- "%s/disk%d" % (file_storage_dir,
- disk_index)),
- mode=disk[constants.IDISK_MODE])
- disks.append(disk_dev)
- elif template_name == constants.DT_BLOCK:
- if len(secondary_nodes) != 0:
- raise errors.ProgrammerError("Wrong template configuration")
+ size = disk[constants.IDISK_SIZE]
+ feedback_fn("* disk %s, size %s" %
+ (disk_index, utils.FormatUnit(size, "h")))
- for idx, disk in enumerate(disk_info):
- disk_index = idx + base_index
- disk_dev = objects.Disk(dev_type=constants.LD_BLOCKDEV,
- size=disk[constants.IDISK_SIZE],
- logical_id=(constants.BLOCKDEV_DRIVER_MANUAL,
- disk[constants.IDISK_ADOPT]),
- iv_name="disk/%d" % disk_index,
- mode=disk[constants.IDISK_MODE])
- disks.append(disk_dev)
+ disk_idx, pci = _GetPCIInfo(lu, 'disks')
+
+ disks.append(objects.Disk(dev_type=dev_type, size=size,
+ logical_id=logical_id_fn(idx, disk_index, disk),
+ iv_name="disk/%d" % disk_index,
+ mode=disk[constants.IDISK_MODE],
+ params=params, idx=disk_idx, pci=pci))
- else:
- raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
return disks
lu.cfg.SetDiskID(device, node)
logging.info("Pause sync of instance %s disks", instance.name)
- result = lu.rpc.call_blockdev_pause_resume_sync(node, instance.disks, True)
+ result = lu.rpc.call_blockdev_pause_resume_sync(node,
+ (instance.disks, instance),
+ True)
+ result.Raise("Failed RPC to node %s for pausing the disk syncing" % node)
for idx, success in enumerate(result.payload):
if not success:
wipe_size = min(wipe_chunk_size, size - offset)
logging.debug("Wiping disk %d, offset %s, chunk %s",
idx, offset, wipe_size)
- result = lu.rpc.call_blockdev_wipe(node, device, offset, wipe_size)
+ result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
+ wipe_size)
result.Raise("Could not wipe disk %d at offset %d for size %d" %
(idx, offset, wipe_size))
now = time.time()
finally:
logging.info("Resume sync of instance %s disks", instance.name)
- result = lu.rpc.call_blockdev_pause_resume_sync(node, instance.disks, False)
+ result = lu.rpc.call_blockdev_pause_resume_sync(node,
+ (instance.disks, instance),
+ False)
- for idx, success in enumerate(result.payload):
- if not success:
- lu.LogWarning("Resume sync of disk %d failed, please have a"
- " look at the status and troubleshoot the issue", idx)
- logging.warn("resume-sync of instance %s for disks %d failed",
- instance.name, idx)
+ if result.fail_msg:
+ lu.LogWarning("RPC call to %s for resuming disk syncing failed,"
+ " please have a look at the status and troubleshoot"
+ " the issue: %s", node, result.fail_msg)
+ else:
+ for idx, success in enumerate(result.payload):
+ if not success:
+ lu.LogWarning("Resume sync of disk %d failed, please have a"
+ " look at the status and troubleshoot the issue", idx)
+ logging.warn("resume-sync of instance %s for disks %d failed",
+ instance.name, idx)
def _CreateDisks(lu, instance, to_skip=None, target_node=None):
for idx, device in enumerate(instance.disks):
if to_skip and idx in to_skip:
continue
- logging.info("Creating volume %s for instance %s",
- device.iv_name, instance.name)
+ logging.info("Creating disk %s for instance '%s'", idx, instance.name)
#HARDCODE
for node in all_nodes:
f_create = node == pnode
_CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
-def _RemoveDisks(lu, instance, target_node=None):
+def _RemoveDisks(lu, instance, target_node=None, ignore_failures=False):
"""Remove all disks for an instance.
This abstracts away some work from `AddInstance()` and
logging.info("Removing block devices for instance %s", instance.name)
all_result = True
- for device in instance.disks:
+ ports_to_release = set()
+ anno_disks = _AnnotateDiskParams(instance, instance.disks, lu.cfg)
+ for (idx, device) in enumerate(anno_disks):
if target_node:
edata = [(target_node, device)]
else:
edata = device.ComputeNodeTree(instance.primary_node)
for node, disk in edata:
lu.cfg.SetDiskID(disk, node)
- msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
- if msg:
- lu.LogWarning("Could not remove block device %s on node %s,"
- " continuing anyway: %s", device.iv_name, node, msg)
- all_result = False
+ result = lu.rpc.call_blockdev_remove(node, disk)
+ if result.fail_msg:
+ lu.LogWarning("Could not remove disk %s on node %s,"
+ " continuing anyway: %s", idx, node, result.fail_msg)
+ if not (result.offline and node != instance.primary_node):
+ all_result = False
# if this is a DRBD disk, return its port to the pool
if device.dev_type in constants.LDS_DRBD:
- tcp_port = device.logical_id[2]
- lu.cfg.AddTcpUdpPort(tcp_port)
+ ports_to_release.add(device.logical_id[2])
+
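+  # Only release the DRBD TCP ports back to the pool once all removals
+  # have succeeded (or failures are explicitly ignored)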
+ if all_result or ignore_failures:
+ for port in ports_to_release:
+ lu.cfg.AddTcpUdpPort(port)
if instance.disk_template == constants.DT_FILE:
file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
constants.DT_DISKLESS: {},
constants.DT_PLAIN: _compute(disks, 0),
# 128 MB are added for drbd metadata for each disk
- constants.DT_DRBD8: _compute(disks, 128),
+ constants.DT_DRBD8: _compute(disks, DRBD_META_SIZE),
constants.DT_FILE: {},
constants.DT_SHARED_FILE: {},
}
def _ComputeDiskSize(disk_template, disks):
- """Compute disk size requirements in the volume group
+ """Compute disk size requirements according to disk template
"""
# Required free disk space as a function of disk and swap space
constants.DT_DISKLESS: None,
constants.DT_PLAIN: sum(d[constants.IDISK_SIZE] for d in disks),
# 128 MB are added for drbd metadata for each disk
- constants.DT_DRBD8: sum(d[constants.IDISK_SIZE] + 128 for d in disks),
- constants.DT_FILE: None,
- constants.DT_SHARED_FILE: 0,
+ constants.DT_DRBD8:
+ sum(d[constants.IDISK_SIZE] + DRBD_META_SIZE for d in disks),
+ constants.DT_FILE: sum(d[constants.IDISK_SIZE] for d in disks),
+ constants.DT_SHARED_FILE: sum(d[constants.IDISK_SIZE] for d in disks),
constants.DT_BLOCK: 0,
+ constants.DT_RBD: sum(d[constants.IDISK_SIZE] for d in disks),
+ constants.DT_EXT: sum(d[constants.IDISK_SIZE] for d in disks),
}
if disk_template not in req_size_dict:
"""
nodenames = _FilterVmNodes(lu, nodenames)
- hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
- hvname,
- hvparams)
+
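+  # Validate against the full parameter set (cluster defaults filled with
+  # the requested values), not only against the changed parameters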
+ cluster = lu.cfg.GetClusterInfo()
+ hvfull = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
+
+ hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames, hvname, hvfull)
for node in nodenames:
info = hvinfo[node]
if info.offline:
"""
nodenames = _FilterVmNodes(lu, nodenames)
- result = lu.rpc.call_os_validate(required, nodenames, osname,
+ result = lu.rpc.call_os_validate(nodenames, required, osname,
[constants.OS_VALIDATE_PARAMETERS],
osparams)
for node, nres in result.items():
# check disks. parameter names and consistent adopt/no-adopt strategy
has_adopt = has_no_adopt = False
for disk in self.op.disks:
- utils.ForceDictType(disk, constants.IDISK_PARAMS_TYPES)
+ if self.op.disk_template != constants.DT_EXT:
+ utils.ForceDictType(disk, constants.IDISK_PARAMS_TYPES)
if constants.IDISK_ADOPT in disk:
has_adopt = True
else:
self.add_locks[locking.LEVEL_INSTANCE] = instance_name
if self.op.iallocator:
+ # TODO: Find a solution to not lock all nodes in the cluster, e.g. by
+ # specifying a group on instance creation and then selecting nodes from
+ # that group
self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+ self.needed_locks[locking.LEVEL_NODE_RES] = locking.ALL_SET
else:
self.op.pnode = _ExpandNodeName(self.cfg, self.op.pnode)
nodelist = [self.op.pnode]
self.op.snode = _ExpandNodeName(self.cfg, self.op.snode)
nodelist.append(self.op.snode)
self.needed_locks[locking.LEVEL_NODE] = nodelist
+ # Lock resources of instance's primary and secondary nodes (copy to
+    # prevent accidental modification)
+ self.needed_locks[locking.LEVEL_NODE_RES] = list(nodelist)
# in case of import lock the source node too
if self.op.mode == constants.INSTANCE_IMPORT:
"""Run the allocator based on input opcode.
"""
+    # TODO: Export network to iallocator so that it chooses a pnode
+    #       in a nodegroup that has the desired network connected to it
nics = [n.ToDict() for n in self.nics]
ial = IAllocator(self.cfg, self.rpc,
mode=constants.IALLOCATOR_MODE_ALLOC,
tags=self.op.tags,
os=self.op.os_type,
vcpus=self.be_full[constants.BE_VCPUS],
- memory=self.be_full[constants.BE_MEMORY],
+ memory=self.be_full[constants.BE_MAXMEM],
+ spindle_use=self.be_full[constants.BE_SPINDLE_USE],
disks=self.disks,
nics=nics,
hypervisor=self.op.hypervisor,
secondary_nodes=self.secondaries,
status=self.op.start,
os_type=self.op.os_type,
- memory=self.be_full[constants.BE_MEMORY],
+ minmem=self.be_full[constants.BE_MINMEM],
+ maxmem=self.be_full[constants.BE_MAXMEM],
vcpus=self.be_full[constants.BE_VCPUS],
nics=_NICListToTuple(self, self.nics),
disk_template=self.op.disk_template,
if einfo.has_option(constants.INISECT_INS, "disk_template"):
self.op.disk_template = einfo.get(constants.INISECT_INS,
"disk_template")
+ if self.op.disk_template not in constants.DISK_TEMPLATES:
+ raise errors.OpPrereqError("Disk template specified in configuration"
+ " file is not one of the allowed values:"
+                                   " %s" % " ".join(constants.DISK_TEMPLATES),
+                                   errors.ECODE_INVAL)
else:
raise errors.OpPrereqError("No disk template specified and the export"
" is missing the disk_template information",
errors.ECODE_INVAL)
if not self.op.disks:
- if einfo.has_option(constants.INISECT_INS, "disk_count"):
- disks = []
- # TODO: import the disk iv_name too
- for idx in range(einfo.getint(constants.INISECT_INS, "disk_count")):
+ disks = []
+ # TODO: import the disk iv_name too
+ for idx in range(constants.MAX_DISKS):
+ if einfo.has_option(constants.INISECT_INS, "disk%d_size" % idx):
disk_sz = einfo.getint(constants.INISECT_INS, "disk%d_size" % idx)
disks.append({constants.IDISK_SIZE: disk_sz})
- self.op.disks = disks
- else:
+ self.op.disks = disks
+ if not disks and self.op.disk_template != constants.DT_DISKLESS:
raise errors.OpPrereqError("No disk info specified and the export"
" is missing the disk information",
errors.ECODE_INVAL)
- if (not self.op.nics and
- einfo.has_option(constants.INISECT_INS, "nic_count")):
+ if not self.op.nics:
nics = []
- for idx in range(einfo.getint(constants.INISECT_INS, "nic_count")):
- ndict = {}
- for name in list(constants.NICS_PARAMETERS) + ["ip", "mac"]:
- v = einfo.get(constants.INISECT_INS, "nic%d_%s" % (idx, name))
- ndict[name] = v
- nics.append(ndict)
+ for idx in range(constants.MAX_NICS):
+ if einfo.has_option(constants.INISECT_INS, "nic%d_mac" % idx):
+ ndict = {}
+ for name in list(constants.NICS_PARAMETERS) + ["ip", "mac"]:
+ v = einfo.get(constants.INISECT_INS, "nic%d_%s" % (idx, name))
+ ndict[name] = v
+ nics.append(ndict)
+ else:
+ break
self.op.nics = nics
if not self.op.tags and einfo.has_option(constants.INISECT_INS, "tags"):
for name, value in einfo.items(constants.INISECT_BEP):
if name not in self.op.beparams:
self.op.beparams[name] = value
+ # Compatibility for the old "memory" be param
+ if name == constants.BE_MEMORY:
+ if constants.BE_MAXMEM not in self.op.beparams:
+ self.op.beparams[constants.BE_MAXMEM] = value
+ if constants.BE_MINMEM not in self.op.beparams:
+ self.op.beparams[constants.BE_MINMEM] = value
else:
# try to read the parameters old style, from the main section
for name in constants.BES_PARAMETERS:
# pylint: disable=W0142
self.instance_file_storage_dir = utils.PathJoin(*joinargs)
- def CheckPrereq(self):
+ def CheckPrereq(self): # pylint: disable=R0914
"""Check prerequisites.
"""
if self.op.mode == constants.INSTANCE_IMPORT:
export_info = self._ReadExportInfo()
self._ReadExportParams(export_info)
+ self._old_instance_name = export_info.get(constants.INISECT_INS, "name")
+ else:
+ self._old_instance_name = None
if (not self.cfg.GetVGName() and
self.op.disk_template not in constants.DTS_NOT_LVM):
raise errors.OpPrereqError("Cluster does not support lvm-based"
" instances", errors.ECODE_STATE)
- if self.op.hypervisor is None:
+ if (self.op.hypervisor is None or
+ self.op.hypervisor == constants.VALUE_AUTO):
self.op.hypervisor = self.cfg.GetHypervisorType()
cluster = self.cfg.GetClusterInfo()
_CheckGlobalHvParams(self.op.hvparams)
# fill and remember the beparams dict
+ default_beparams = cluster.beparams[constants.PP_DEFAULT]
+ for param, value in self.op.beparams.iteritems():
+ if value == constants.VALUE_AUTO:
+ self.op.beparams[param] = default_beparams[param]
+ objects.UpgradeBeParams(self.op.beparams)
utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
self.be_full = cluster.SimpleFillBE(self.op.beparams)
if self.op.identify_defaults:
self._RevertToDefaults(cluster)
+ self.hotplug_info = None
+ if self.op.hotplug:
+ self.LogInfo("Enabling hotplug.")
+ self.hotplug_info = objects.HotplugInfo(disks=0, nics=0,
+                                              pci_pool=list(range(16, 32)))
# NIC buildup
self.nics = []
for idx, nic in enumerate(self.op.nics):
nic_mode_req = nic.get(constants.INIC_MODE, None)
nic_mode = nic_mode_req
- if nic_mode is None:
+ if nic_mode is None or nic_mode == constants.VALUE_AUTO:
nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
- # in routed mode, for the first nic, the default ip is 'auto'
- if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
- default_ip_mode = constants.VALUE_AUTO
+ net = nic.get(constants.INIC_NETWORK, None)
+      link = nic.get(constants.INIC_LINK, None)
+ ip = nic.get(constants.INIC_IP, None)
+
+ if net is None or net.lower() == constants.VALUE_NONE:
+ net = None
else:
- default_ip_mode = constants.VALUE_NONE
+ if nic_mode_req is not None or link is not None:
+ raise errors.OpPrereqError("If network is given, no mode or link"
+ " is allowed to be passed",
+ errors.ECODE_INVAL)
# ip validity checks
- ip = nic.get(constants.INIC_IP, default_ip_mode)
if ip is None or ip.lower() == constants.VALUE_NONE:
nic_ip = None
elif ip.lower() == constants.VALUE_AUTO:
errors.ECODE_INVAL)
nic_ip = self.hostname1.ip
else:
- if not netutils.IPAddress.IsValid(ip):
+ # We defer pool operations until later, so that the iallocator has
+        # filled in the instance's node(s)
+ if ip.lower() == constants.NIC_IP_POOL:
+ if net is None:
+ raise errors.OpPrereqError("if ip=pool, parameter network"
+ " must be passed too",
+ errors.ECODE_INVAL)
+
+ elif not netutils.IPAddress.IsValid(ip):
raise errors.OpPrereqError("Invalid IP address '%s'" % ip,
errors.ECODE_INVAL)
+
nic_ip = ip
# TODO: check the ip address for uniqueness
errors.ECODE_NOTUNIQUE)
# Build nic parameters
- link = nic.get(constants.INIC_LINK, None)
nicparams = {}
if nic_mode_req:
- nicparams[constants.NIC_MODE] = nic_mode_req
+ nicparams[constants.NIC_MODE] = nic_mode
if link:
nicparams[constants.NIC_LINK] = link
check_params = cluster.SimpleFillNIC(nicparams)
objects.NIC.CheckParameterSyntax(check_params)
- self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
+ nic_idx, pci = _GetPCIInfo(self, 'nics')
+ self.nics.append(objects.NIC(idx=nic_idx, pci=pci,
+ mac=mac, ip=nic_ip, network=net,
+ nicparams=check_params))
# disk checks/pre-build
default_vg = self.cfg.GetVGName()
raise errors.OpPrereqError("Invalid disk size '%s'" % size,
errors.ECODE_INVAL)
+ ext_provider = disk.get(constants.IDISK_PROVIDER, None)
+ if ext_provider and self.op.disk_template != constants.DT_EXT:
+ raise errors.OpPrereqError("The '%s' option is only valid for the %s"
+ " disk template, not %s" %
+ (constants.IDISK_PROVIDER, constants.DT_EXT,
+ self.op.disk_template), errors.ECODE_INVAL)
+
data_vg = disk.get(constants.IDISK_VG, default_vg)
new_disk = {
constants.IDISK_SIZE: size,
constants.IDISK_MODE: mode,
constants.IDISK_VG: data_vg,
- constants.IDISK_METAVG: disk.get(constants.IDISK_METAVG, data_vg),
}
+
+ if constants.IDISK_METAVG in disk:
+ new_disk[constants.IDISK_METAVG] = disk[constants.IDISK_METAVG]
if constants.IDISK_ADOPT in disk:
new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT]
- self.disks.append(new_disk)
- if self.op.mode == constants.INSTANCE_IMPORT:
+ # For extstorage, demand the `provider' option and add any
+ # additional parameters (ext-params) to the dict
+ if self.op.disk_template == constants.DT_EXT:
+ if ext_provider:
+ new_disk[constants.IDISK_PROVIDER] = ext_provider
+ for key in disk:
+ if key not in constants.IDISK_PARAMS:
+ new_disk[key] = disk[key]
+ else:
+ raise errors.OpPrereqError("Missing provider for template '%s'" %
+ constants.DT_EXT, errors.ECODE_INVAL)
- # Check that the new instance doesn't have less disks than the export
- instance_disks = len(self.disks)
- export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
- if instance_disks < export_disks:
- raise errors.OpPrereqError("Not enough disks to import."
- " (instance: %d, export: %d)" %
- (instance_disks, export_disks),
- errors.ECODE_INVAL)
+ self.disks.append(new_disk)
+ if self.op.mode == constants.INSTANCE_IMPORT:
disk_images = []
- for idx in range(export_disks):
+ for idx in range(len(self.disks)):
option = "disk%d_dump" % idx
if export_info.has_option(constants.INISECT_INS, option):
# FIXME: are the old os-es, disk sizes, etc. useful?
self.src_images = disk_images
- old_name = export_info.get(constants.INISECT_INS, "name")
- try:
- exp_nic_count = export_info.getint(constants.INISECT_INS, "nic_count")
- except (TypeError, ValueError), err:
- raise errors.OpPrereqError("Invalid export file, nic_count is not"
- " an integer: %s" % str(err),
- errors.ECODE_STATE)
- if self.op.instance_name == old_name:
+ if self.op.instance_name == self._old_instance_name:
for idx, nic in enumerate(self.nics):
- if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
+ if nic.mac == constants.VALUE_AUTO:
nic_mac_ini = "nic%d_mac" % idx
nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
# creation job will fail.
for nic in self.nics:
if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
- nic.mac = self.cfg.GenerateMAC(self.proc.GetECId())
+ nic.mac = self.cfg.GenerateMAC(nic.network, self.proc.GetECId())
#### allocator run
if self.op.iallocator is not None:
self._RunAllocator()
+ # Release all unneeded node locks
+ _ReleaseLocks(self, locking.LEVEL_NODE,
+ keep=filter(None, [self.op.pnode, self.op.snode,
+ self.op.src_node]))
+ _ReleaseLocks(self, locking.LEVEL_NODE_RES,
+ keep=filter(None, [self.op.pnode, self.op.snode,
+ self.op.src_node]))
+
#### node related checks
# check primary node
self.secondaries = []
+ # Fill in any IPs from IP pools. This must happen here, because we need to
+ # know the nic's primary node, as specified by the iallocator
+ for idx, nic in enumerate(self.nics):
+ net = nic.network
+ if net is not None:
+ netparams = self.cfg.GetGroupNetParams(net, self.pnode.name)
+ if netparams is None:
+ raise errors.OpPrereqError("No netparams found for network"
+                                     " %s. Probably it is not connected"
+                                     " to the nodegroup of node %s" %
+ (net, self.pnode.name),
+ errors.ECODE_INVAL)
+ self.LogInfo("NIC/%d inherits netparams %s" %
+ (idx, netparams.values()))
+ nic.nicparams = dict(netparams)
+ if nic.ip is not None:
+ filled_params = cluster.SimpleFillNIC(nic.nicparams)
+ if nic.ip.lower() == constants.NIC_IP_POOL:
+ try:
+ nic.ip = self.cfg.GenerateIp(net, self.proc.GetECId())
+ except errors.ReservationError:
+ raise errors.OpPrereqError("Unable to get a free IP for NIC %d"
+ " from the address pool" % idx,
+ errors.ECODE_STATE)
+ self.LogInfo("Chose IP %s from network %s", nic.ip, net)
+ else:
+ try:
+ self.cfg.ReserveIp(net, nic.ip, self.proc.GetECId())
+ except errors.ReservationError:
+ raise errors.OpPrereqError("IP address %s already in use"
+ " or does not belong to network %s" %
+ (nic.ip, net),
+ errors.ECODE_NOTUNIQUE)
+ else:
+        # net is None; ip is either None or explicitly given
+ if self.op.conflicts_check:
+ _CheckForConflictingIp(self, nic.ip, self.pnode.name)
+
# mirror node verification
if self.op.disk_template in constants.DTS_INT_MIRROR:
if self.op.snode == pnode.name:
_CheckNodeVmCapable(self, self.op.snode)
self.secondaries.append(self.op.snode)
+ snode = self.cfg.GetNodeInfo(self.op.snode)
+ if pnode.group != snode.group:
+ self.LogWarning("The primary and secondary nodes are in two"
+ " different node groups; the disk parameters"
+ " from the first disk's node group will be"
+ " used")
+
nodenames = [pnode.name] + self.secondaries
+ # Verify instance specs
+ spindle_use = self.be_full.get(constants.BE_SPINDLE_USE, None)
+ ispec = {
+ constants.ISPEC_MEM_SIZE: self.be_full.get(constants.BE_MAXMEM, None),
+ constants.ISPEC_CPU_COUNT: self.be_full.get(constants.BE_VCPUS, None),
+ constants.ISPEC_DISK_COUNT: len(self.disks),
+ constants.ISPEC_DISK_SIZE: [disk["size"] for disk in self.disks],
+ constants.ISPEC_NIC_COUNT: len(self.nics),
+ constants.ISPEC_SPINDLE_USE: spindle_use,
+ }
+
+ group_info = self.cfg.GetNodeGroup(pnode.group)
+ ipolicy = _CalculateGroupIPolicy(cluster, group_info)
+ res = _ComputeIPolicyInstanceSpecViolation(ipolicy, ispec)
+ if not self.op.ignore_ipolicy and res:
+ raise errors.OpPrereqError(("Instance allocation to group %s violates"
+ " policy: %s") % (pnode.group,
+ utils.CommaJoin(res)),
+ errors.ECODE_INVAL)
+
if not self.adopt_disks:
- # Check lv size requirements, if not adopting
- req_sizes = _ComputeDiskSizePerVG(self.op.disk_template, self.disks)
- _CheckNodesFreeDiskPerVG(self, nodenames, req_sizes)
+ if self.op.disk_template == constants.DT_RBD:
+ # _CheckRADOSFreeSpace() is just a placeholder.
+ # Any function that checks prerequisites can be placed here.
+ # Check if there is enough space on the RADOS cluster.
+ _CheckRADOSFreeSpace()
+ elif self.op.disk_template == constants.DT_EXT:
+ # FIXME: Function that checks prereqs if needed
+ pass
+ else:
+ # Check lv size requirements, if not adopting
+ req_sizes = _ComputeDiskSizePerVG(self.op.disk_template, self.disks)
+ _CheckNodesFreeDiskPerVG(self, nodenames, req_sizes)
elif self.op.disk_template == constants.DT_PLAIN: # Check the adoption data
all_lvs = set(["%s/%s" % (disk[constants.IDISK_VG],
_CheckNicsBridgesExist(self, self.nics, self.pnode.name)
+    # TODO: _CheckExtParams (remotely)
+ # Check parameters for extstorage
+
# memory check on primary node
+    # TODO(dynmem): use MINMEM for checking
if self.op.start:
_CheckNodeFreeMemory(self, self.pnode.name,
"creating instance %s" % self.op.instance_name,
- self.be_full[constants.BE_MEMORY],
+ self.be_full[constants.BE_MAXMEM],
self.op.hypervisor)
self.dry_run_result = list(nodenames)
instance = self.op.instance_name
pnode_name = self.pnode.name
+ assert not (self.owned_locks(locking.LEVEL_NODE_RES) -
+ self.owned_locks(locking.LEVEL_NODE)), \
+ "Node locks differ from node resource locks"
+
ht_kind = self.op.hypervisor
if ht_kind in constants.HTS_REQ_PORT:
network_port = self.cfg.AllocatePort()
else:
network_port = None
+    # This is ugly, but we have a chicken-and-egg problem here
+ # We can only take the group disk parameters, as the instance
+ # has no disks yet (we are generating them right here).
+ node = self.cfg.GetNodeInfo(pnode_name)
+ nodegroup = self.cfg.GetNodeGroup(node.group)
disks = _GenerateDiskTemplate(self,
self.op.disk_template,
instance, pnode_name,
self.instance_file_storage_dir,
self.op.file_driver,
0,
- feedback_fn)
+ feedback_fn,
+ self.cfg.GetGroupDiskParams(nodegroup))
iobj = objects.Instance(name=instance, os=self.op.os_type,
primary_node=pnode_name,
nics=self.nics, disks=disks,
disk_template=self.op.disk_template,
- admin_up=False,
+ admin_state=constants.ADMINST_DOWN,
network_port=network_port,
beparams=self.op.beparams,
hvparams=self.op.hvparams,
hypervisor=self.op.hypervisor,
osparams=self.op.osparams,
+ hotplug_info=self.hotplug_info,
)
if self.op.tags:
raise errors.OpExecError("There are some degraded disks for"
" this instance")
+ # Release all node resource locks
+ _ReleaseLocks(self, locking.LEVEL_NODE_RES)
+
if iobj.disk_template != constants.DT_DISKLESS and not self.adopt_disks:
+      # we need to set the disks' IDs to the primary node, since the
+      # preceding code might or might not have done it, depending on
+ # disk template and other options
+ for disk in iobj.disks:
+ self.cfg.SetDiskID(disk, pnode_name)
if self.op.mode == constants.INSTANCE_CREATE:
if not self.op.no_install:
pause_sync = (iobj.disk_template in constants.DTS_INT_MIRROR and
if pause_sync:
feedback_fn("* pausing disk sync to install instance OS")
result = self.rpc.call_blockdev_pause_resume_sync(pnode_name,
- iobj.disks, True)
+ (iobj.disks,
+ iobj), True)
for idx, success in enumerate(result.payload):
if not success:
logging.warn("pause-sync of instance %s for disk %d failed",
feedback_fn("* running the instance OS create scripts...")
# FIXME: pass debug option from opcode to backend
- result = self.rpc.call_instance_os_add(pnode_name, iobj, False,
- self.op.debug_level)
+ os_add_result = \
+ self.rpc.call_instance_os_add(pnode_name, (iobj, None), False,
+ self.op.debug_level)
if pause_sync:
feedback_fn("* resuming disk sync")
result = self.rpc.call_blockdev_pause_resume_sync(pnode_name,
- iobj.disks, False)
+ (iobj.disks,
+ iobj), False)
for idx, success in enumerate(result.payload):
if not success:
logging.warn("resume-sync of instance %s for disk %d failed",
instance, idx)
- result.Raise("Could not add os for instance %s"
- " on node %s" % (instance, pnode_name))
+ os_add_result.Raise("Could not add os for instance %s"
+ " on node %s" % (instance, pnode_name))
- elif self.op.mode == constants.INSTANCE_IMPORT:
- feedback_fn("* running the instance OS import scripts...")
+ else:
+ if self.op.mode == constants.INSTANCE_IMPORT:
+ feedback_fn("* running the instance OS import scripts...")
+
+ transfers = []
+
+ for idx, image in enumerate(self.src_images):
+ if not image:
+ continue
+
+ # FIXME: pass debug option from opcode to backend
+ dt = masterd.instance.DiskTransfer("disk/%s" % idx,
+ constants.IEIO_FILE, (image, ),
+ constants.IEIO_SCRIPT,
+ (iobj.disks[idx], idx),
+ None)
+ transfers.append(dt)
+
+ import_result = \
+ masterd.instance.TransferInstanceData(self, feedback_fn,
+ self.op.src_node, pnode_name,
+ self.pnode.secondary_ip,
+ iobj, transfers)
+ if not compat.all(import_result):
+ self.LogWarning("Some disks for instance %s on node %s were not"
+ " imported successfully" % (instance, pnode_name))
+
+ rename_from = self._old_instance_name
+
+ elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
+ feedback_fn("* preparing remote import...")
+ # The source cluster will stop the instance before attempting to make
+ # a connection. In some cases stopping an instance can take a long
+ # time, hence the shutdown timeout is added to the connection
+ # timeout.
+ connect_timeout = (constants.RIE_CONNECT_TIMEOUT +
+ self.op.source_shutdown_timeout)
+ timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
- transfers = []
+ assert iobj.primary_node == self.pnode.name
+ disk_results = \
+ masterd.instance.RemoteImport(self, feedback_fn, iobj, self.pnode,
+ self.source_x509_ca,
+ self._cds, timeouts)
+ if not compat.all(disk_results):
+ # TODO: Should the instance still be started, even if some disks
+ # failed to import (valid for local imports, too)?
+ self.LogWarning("Some disks for instance %s on node %s were not"
+ " imported successfully" % (instance, pnode_name))
- for idx, image in enumerate(self.src_images):
- if not image:
- continue
+ rename_from = self.source_instance_name
- # FIXME: pass debug option from opcode to backend
- dt = masterd.instance.DiskTransfer("disk/%s" % idx,
- constants.IEIO_FILE, (image, ),
- constants.IEIO_SCRIPT,
- (iobj.disks[idx], idx),
- None)
- transfers.append(dt)
-
- import_result = \
- masterd.instance.TransferInstanceData(self, feedback_fn,
- self.op.src_node, pnode_name,
- self.pnode.secondary_ip,
- iobj, transfers)
- if not compat.all(import_result):
- self.LogWarning("Some disks for instance %s on node %s were not"
- " imported successfully" % (instance, pnode_name))
-
- elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
- feedback_fn("* preparing remote import...")
- # The source cluster will stop the instance before attempting to make a
- # connection. In some cases stopping an instance can take a long time,
- # hence the shutdown timeout is added to the connection timeout.
- connect_timeout = (constants.RIE_CONNECT_TIMEOUT +
- self.op.source_shutdown_timeout)
- timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
-
- assert iobj.primary_node == self.pnode.name
- disk_results = \
- masterd.instance.RemoteImport(self, feedback_fn, iobj, self.pnode,
- self.source_x509_ca,
- self._cds, timeouts)
- if not compat.all(disk_results):
- # TODO: Should the instance still be started, even if some disks
- # failed to import (valid for local imports, too)?
- self.LogWarning("Some disks for instance %s on node %s were not"
- " imported successfully" % (instance, pnode_name))
+ else:
+ # also checked in the prereq part
+ raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
+ % self.op.mode)
# Run rename script on newly imported instance
assert iobj.name == instance
feedback_fn("Running rename script for %s" % instance)
result = self.rpc.call_instance_run_rename(pnode_name, iobj,
- self.source_instance_name,
+ rename_from,
self.op.debug_level)
if result.fail_msg:
self.LogWarning("Failed to run rename script for %s on node"
" %s: %s" % (instance, pnode_name, result.fail_msg))
- else:
- # also checked in the prereq part
- raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
- % self.op.mode)
+ assert not self.owned_locks(locking.LEVEL_NODE_RES)
if self.op.start:
- iobj.admin_up = True
+ iobj.admin_state = constants.ADMINST_UP
self.cfg.Update(iobj, feedback_fn)
logging.info("Starting instance %s on node %s", instance, pnode_name)
feedback_fn("* starting instance...")
- result = self.rpc.call_instance_start(pnode_name, iobj,
- None, None, False)
+ result = self.rpc.call_instance_start(pnode_name, (iobj, None, None),
+ False)
result.Raise("Could not start instance")
return list(iobj.all_nodes)
+def _CheckRADOSFreeSpace():
+ """Compute disk size requirements inside the RADOS cluster.
+
+ """
+ # For the RADOS cluster we assume there is always enough space.
+ pass
+
+
class LUInstanceConsole(NoHooksLU):
"""Connect to an instance's console.
REQ_BGL = False
def ExpandNames(self):
+ self.share_locks = _ShareAll()
self._ExpandAndLockInstance()
def CheckPrereq(self):
node_insts.Raise("Can't get node information from %s" % node)
if instance.name not in node_insts.payload:
- if instance.admin_up:
+ if instance.admin_state == constants.ADMINST_UP:
state = constants.INSTST_ERRORDOWN
- else:
+ elif instance.admin_state == constants.ADMINST_DOWN:
state = constants.INSTST_ADMINDOWN
+ else:
+ state = constants.INSTST_ADMINOFFLINE
raise errors.OpExecError("Instance %s is not running (state %s)" %
(instance.name, state))
self._ExpandAndLockInstance()
assert locking.LEVEL_NODE not in self.needed_locks
+ assert locking.LEVEL_NODE_RES not in self.needed_locks
assert locking.LEVEL_NODEGROUP not in self.needed_locks
assert self.op.iallocator is None or self.op.remote_node is None, \
# iallocator will select a new node in the same group
self.needed_locks[locking.LEVEL_NODEGROUP] = []
+ self.needed_locks[locking.LEVEL_NODE_RES] = []
+
self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
self.op.iallocator, self.op.remote_node,
- self.op.disks, False, self.op.early_release)
+ self.op.disks, False, self.op.early_release,
+ self.op.ignore_ipolicy)
self.tasklets = [self.replacer]
assert not self.needed_locks[locking.LEVEL_NODEGROUP]
self.share_locks[locking.LEVEL_NODEGROUP] = 1
+ # Lock all groups used by instance optimistically; this requires going
+ # via the node before it's locked, requiring verification later on
self.needed_locks[locking.LEVEL_NODEGROUP] = \
self.cfg.GetInstanceNodeGroups(self.op.instance_name)
for node_name in self.cfg.GetNodeGroup(group_uuid).members]
else:
self._LockInstancesNodes()
+ elif level == locking.LEVEL_NODE_RES:
+ # Reuse node locks
+ self.needed_locks[locking.LEVEL_NODE_RES] = \
+ self.needed_locks[locking.LEVEL_NODE]
def BuildHooksEnv(self):
"""Build hooks env.
assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
self.op.iallocator is None)
+ # Verify if node group locks are still correct
owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
if owned_groups:
_CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)
"""
def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
- disks, delay_iallocator, early_release):
+ disks, delay_iallocator, early_release, ignore_ipolicy):
"""Initializes this class.
"""
self.disks = disks
self.delay_iallocator = delay_iallocator
self.early_release = early_release
+ self.ignore_ipolicy = ignore_ipolicy
# Runtime data
self.instance = None
self.lu.LogInfo("Checking disk/%d on %s", idx, node)
self.cfg.SetDiskID(dev, node)
- result = self.rpc.call_blockdev_find(node, dev)
+ result = _BlockdevFind(self, node, dev, instance)
if result.offline:
continue
if not self.disks:
self.disks = range(len(self.instance.disks))
+    # TODO: This is ugly, but right now we can't distinguish between
+    # internally submitted opcodes and external ones. We should fix that.
+ if self.remote_node_info:
+ # We change the node, lets verify it still meets instance policy
+ new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
+ ipolicy = _CalculateGroupIPolicy(self.cfg.GetClusterInfo(),
+ new_group_info)
+ _CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
+ ignore=self.ignore_ipolicy)
+
for node in check_nodes:
_CheckNodeOnline(self.lu, node)
self.target_node]
if node_name is not None)
- # Release unneeded node locks
+ # Release unneeded node and node resource locks
_ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
+ _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
# Release any owned node group
if self.lu.glm.is_owned(locking.LEVEL_NODEGROUP):
assert set(owned_nodes) == set(self.node_secondary_ip), \
("Incorrect node locks, owning %s, expected %s" %
(owned_nodes, self.node_secondary_ip.keys()))
+ assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
+ self.lu.owned_locks(locking.LEVEL_NODE_RES))
owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
assert list(owned_instances) == [self.instance_name], \
feedback_fn("Replacing disk(s) %s for %s" %
(utils.CommaJoin(self.disks), self.instance.name))
- activate_disks = (not self.instance.admin_up)
+ activate_disks = (self.instance.admin_state != constants.ADMINST_UP)
# Activate the instance disks if we're replacing them on a down instance
if activate_disks:
if activate_disks:
_SafeShutdownInstanceDisks(self.lu, self.instance)
+ assert not self.lu.owned_locks(locking.LEVEL_NODE)
+
if __debug__:
# Verify owned locks
- owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
+ owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
nodes = frozenset(self.node_secondary_ip)
assert ((self.early_release and not owned_nodes) or
(not self.early_release and not (set(owned_nodes) - nodes))), \
self.lu.LogInfo("Checking disk/%d on %s" % (idx, node))
self.cfg.SetDiskID(dev, node)
- result = self.rpc.call_blockdev_find(node, dev)
+ result = _BlockdevFind(self, node, dev, self.instance)
msg = result.fail_msg
if msg or not result.payload:
self.lu.LogInfo("Checking disk/%d consistency on node %s" %
(idx, node_name))
- if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
- ldisk=ldisk):
+ if not _CheckDiskConsistency(self.lu, self.instance, dev, node_name,
+ on_primary, ldisk=ldisk):
raise errors.OpExecError("Node %s has degraded storage, unsafe to"
" replace disks for instance %s" %
(node_name, self.instance.name))
"""
iv_names = {}
- for idx, dev in enumerate(self.instance.disks):
+ disks = _AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
+ for idx, dev in enumerate(disks):
if idx not in self.disks:
continue
lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
names = _GenerateUniqueNames(self.lu, lv_names)
- vg_data = dev.children[0].logical_id[0]
+ (data_disk, meta_disk) = dev.children
+ vg_data = data_disk.logical_id[0]
lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
- logical_id=(vg_data, names[0]))
- vg_meta = dev.children[1].logical_id[0]
- lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
- logical_id=(vg_meta, names[1]))
+ logical_id=(vg_data, names[0]),
+ params=data_disk.params)
+ vg_meta = meta_disk.logical_id[0]
+ lv_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE,
+ logical_id=(vg_meta, names[1]),
+ params=meta_disk.params)
new_lvs = [lv_data, lv_meta]
old_lvs = [child.Copy() for child in dev.children]
# we pass force_create=True to force the LVM creation
for new_lv in new_lvs:
- _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True,
- _GetInstanceInfoText(self.instance), False)
+ _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
+ _GetInstanceInfoText(self.instance), False)
return iv_names
for name, (dev, _, _) in iv_names.iteritems():
self.cfg.SetDiskID(dev, node_name)
- result = self.rpc.call_blockdev_find(node_name, dev)
+ result = _BlockdevFind(self, node_name, dev, self.instance)
msg = result.fail_msg
if msg or not result.payload:
# Now that the new lvs have the old name, we can add them to the device
self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
- result = self.rpc.call_blockdev_addchildren(self.target_node, dev,
- new_lvs)
+ result = self.rpc.call_blockdev_addchildren(self.target_node,
+ (dev, self.instance), new_lvs)
msg = result.fail_msg
if msg:
for new_lv in new_lvs:
"volumes"))
raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
- cstep = 5
+ cstep = itertools.count(5)
+
if self.early_release:
- self.lu.LogStep(cstep, steps_total, "Removing old storage")
- cstep += 1
+ self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
self._RemoveOldStorage(self.target_node, iv_names)
- # WARNING: we release both node locks here, do not do other RPCs
- # than WaitForSync to the primary node
- _ReleaseLocks(self.lu, locking.LEVEL_NODE,
- names=[self.target_node, self.other_node])
+ # TODO: Check if releasing locks early still makes sense
+ _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
+ else:
+ # Release all resource locks except those used by the instance
+ _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
+ keep=self.node_secondary_ip.keys())
+
+ # Release all node locks while waiting for sync
+ _ReleaseLocks(self.lu, locking.LEVEL_NODE)
+
+ # TODO: Can the instance lock be downgraded here? Take the optional disk
+ # shutdown in the caller into consideration.
# Wait for sync
# This can fail as the old devices are degraded and _WaitForSync
# does a combined result over all disks, so we don't check its return value
- self.lu.LogStep(cstep, steps_total, "Sync devices")
- cstep += 1
+ self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
_WaitForSync(self.lu, self.instance)
# Check all devices manually
# Step: remove old storage
if not self.early_release:
- self.lu.LogStep(cstep, steps_total, "Removing old storage")
- cstep += 1
+ self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
self._RemoveOldStorage(self.target_node, iv_names)
def _ExecDrbd8Secondary(self, feedback_fn):
# Step: create new storage
self.lu.LogStep(3, steps_total, "Allocate new storage")
- for idx, dev in enumerate(self.instance.disks):
+ disks = _AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
+ for idx, dev in enumerate(disks):
self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
(self.new_node, idx))
# we pass force_create=True to force LVM creation
for new_lv in dev.children:
- _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True,
- _GetInstanceInfoText(self.instance), False)
+ _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
+ True, _GetInstanceInfoText(self.instance), False)
    # Step 4: drbd minors and drbd setup changes
# after this, we must manually remove the drbd minors on both the
new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
logical_id=new_alone_id,
children=dev.children,
- size=dev.size)
+ size=dev.size,
+ params={})
+ (anno_new_drbd,) = _AnnotateDiskParams(self.instance, [new_drbd],
+ self.cfg)
try:
- _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
+ _CreateSingleBlockDev(self.lu, self.new_node, self.instance,
+ anno_new_drbd,
_GetInstanceInfoText(self.instance), False)
except errors.GenericError:
self.cfg.ReleaseDRBDMinors(self.instance.name)
for idx, dev in enumerate(self.instance.disks):
self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
self.cfg.SetDiskID(dev, self.target_node)
- msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg
+ msg = self.rpc.call_blockdev_shutdown(self.target_node,
+ (dev, self.instance)).fail_msg
if msg:
self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
"node: %s" % (idx, msg),
self.cfg.Update(self.instance, feedback_fn)
+ # Release all node locks (the configuration has been updated)
+ _ReleaseLocks(self.lu, locking.LEVEL_NODE)
+
# and now perform the drbd attach
self.lu.LogInfo("Attaching primary drbds to new secondary"
" (standalone => connected)")
result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
self.new_node],
self.node_secondary_ip,
- self.instance.disks,
+ (self.instance.disks, self.instance),
self.instance.name,
False)
for to_node, to_result in result.items():
to_node, msg,
hint=("please do a gnt-instance info to see the"
" status of disks"))
- cstep = 5
+
+ cstep = itertools.count(5)
+
if self.early_release:
- self.lu.LogStep(cstep, steps_total, "Removing old storage")
- cstep += 1
+ self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
self._RemoveOldStorage(self.target_node, iv_names)
- # WARNING: we release all node locks here, do not do other RPCs
- # than WaitForSync to the primary node
- _ReleaseLocks(self.lu, locking.LEVEL_NODE,
- names=[self.instance.primary_node,
- self.target_node,
- self.new_node])
+ # TODO: Check if releasing locks early still makes sense
+ _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
+ else:
+ # Release all resource locks except those used by the instance
+ _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
+ keep=self.node_secondary_ip.keys())
+
+ # TODO: Can the instance lock be downgraded here? Take the optional disk
+ # shutdown in the caller into consideration.
# Wait for sync
# This can fail as the old devices are degraded and _WaitForSync
# does a combined result over all disks, so we don't check its return value
- self.lu.LogStep(cstep, steps_total, "Sync devices")
- cstep += 1
+ self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
_WaitForSync(self.lu, self.instance)
# Check all devices manually
# Step: remove old storage
if not self.early_release:
- self.lu.LogStep(cstep, steps_total, "Removing old storage")
+ self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
self._RemoveOldStorage(self.target_node, iv_names)
"""
# Check whether any instance on this node has faulty disks
for inst in _GetNodeInstances(self.cfg, self.op.node_name):
- if not inst.admin_up:
+ if inst.admin_state != constants.ADMINST_UP:
continue
check_nodes = set(inst.all_nodes)
check_nodes.discard(self.op.node_name)
"""
REQ_BGL = False
+ _MODE2IALLOCATOR = {
+ constants.NODE_EVAC_PRI: constants.IALLOCATOR_NEVAC_PRI,
+ constants.NODE_EVAC_SEC: constants.IALLOCATOR_NEVAC_SEC,
+ constants.NODE_EVAC_ALL: constants.IALLOCATOR_NEVAC_ALL,
+ }
+ assert frozenset(_MODE2IALLOCATOR.keys()) == constants.NODE_EVAC_MODES
+ assert (frozenset(_MODE2IALLOCATOR.values()) ==
+ constants.IALLOCATOR_NEVAC_MODES)
+
def CheckArguments(self):
_CheckIAllocatorOrNode(self, "iallocator", "remote_node")
raise errors.OpPrereqError("Can not use evacuated node as a new"
" secondary node", errors.ECODE_INVAL)
- if self.op.mode != constants.IALLOCATOR_NEVAC_SEC:
+ if self.op.mode != constants.NODE_EVAC_SEC:
raise errors.OpPrereqError("Without the use of an iallocator only"
" secondary instances can be evacuated",
errors.ECODE_INVAL)
"""Builds list of instances to operate on.
"""
- assert self.op.mode in constants.IALLOCATOR_NEVAC_MODES
+ assert self.op.mode in constants.NODE_EVAC_MODES
- if self.op.mode == constants.IALLOCATOR_NEVAC_PRI:
+ if self.op.mode == constants.NODE_EVAC_PRI:
# Primary instances only
inst_fn = _GetNodePrimaryInstances
assert self.op.remote_node is None, \
"Evacuating primary instances requires iallocator"
- elif self.op.mode == constants.IALLOCATOR_NEVAC_SEC:
+ elif self.op.mode == constants.NODE_EVAC_SEC:
# Secondary instances only
inst_fn = _GetNodeSecondaryInstances
else:
# All instances
- assert self.op.mode == constants.IALLOCATOR_NEVAC_ALL
+ assert self.op.mode == constants.NODE_EVAC_ALL
inst_fn = _GetNodeInstances
# TODO: In 2.6, change the iallocator interface to take an evacuation mode
# per instance
elif self.op.iallocator is not None:
# TODO: Implement relocation to other group
ial = IAllocator(self.cfg, self.rpc, constants.IALLOCATOR_MODE_NODE_EVAC,
- evac_mode=self.op.mode,
+ evac_mode=self._MODE2IALLOCATOR[self.op.mode],
instances=list(self.instance_names))
ial.Run(self.op.iallocator)
jobs = _LoadNodeEvacResult(self, ial.result, self.op.early_release, True)
elif self.op.remote_node is not None:
- assert self.op.mode == constants.IALLOCATOR_NEVAC_SEC
+ assert self.op.mode == constants.NODE_EVAC_SEC
jobs = [
[opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
remote_node=self.op.remote_node,
def ExpandNames(self):
self._ExpandAndLockInstance()
self.needed_locks[locking.LEVEL_NODE] = []
+ self.needed_locks[locking.LEVEL_NODE_RES] = []
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+ self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
def DeclareLocks(self, level):
if level == locking.LEVEL_NODE:
self._LockInstancesNodes()
+ elif level == locking.LEVEL_NODE_RES:
+ # Copy node locks
+ self.needed_locks[locking.LEVEL_NODE_RES] = \
+ self.needed_locks[locking.LEVEL_NODE][:]
def BuildHooksEnv(self):
"""Build hooks env.
env = {
"DISK": self.op.disk,
"AMOUNT": self.op.amount,
+ "ABSOLUTE": self.op.absolute,
}
env.update(_BuildInstanceHookEnvByObject(self, self.instance))
return env
self.disk = instance.FindDisk(self.op.disk)
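+    # The amount can be given either as an absolute target size or as a
+    # relative increment (illustrative numbers: for a 10240 MB disk, an
+    # absolute request of 15360 and a relative request of 5120 both result
+    # in delta=5120 and target=15360).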
+ if self.op.absolute:
+ self.target = self.op.amount
+ self.delta = self.target - self.disk.size
+ if self.delta < 0:
+ raise errors.OpPrereqError("Requested size (%s) is smaller than "
+ "current disk size (%s)" %
+ (utils.FormatUnit(self.target, "h"),
+ utils.FormatUnit(self.disk.size, "h")),
+ errors.ECODE_STATE)
+ else:
+ self.delta = self.op.amount
+ self.target = self.disk.size + self.delta
+ if self.delta < 0:
+ raise errors.OpPrereqError("Requested increment (%s) is negative" %
+ utils.FormatUnit(self.delta, "h"),
+ errors.ECODE_INVAL)
+
if instance.disk_template not in (constants.DT_FILE,
- constants.DT_SHARED_FILE):
+ constants.DT_SHARED_FILE,
+ constants.DT_RBD,
+ constants.DT_EXT):
# TODO: check the free disk space for file, when that feature will be
# supported
_CheckNodesFreeDiskPerVG(self, nodenames,
- self.disk.ComputeGrowth(self.op.amount))
+ self.disk.ComputeGrowth(self.delta))
def Exec(self, feedback_fn):
"""Execute disk grow.
instance = self.instance
disk = self.disk
+ assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
+ assert (self.owned_locks(locking.LEVEL_NODE) ==
+ self.owned_locks(locking.LEVEL_NODE_RES))
+
disks_ok, _ = _AssembleInstanceDisks(self, self.instance, disks=[disk])
if not disks_ok:
raise errors.OpExecError("Cannot activate block device to grow")
+ feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
+ (self.op.disk, instance.name,
+ utils.FormatUnit(self.delta, "h"),
+ utils.FormatUnit(self.target, "h")))
+
# First run all grow ops in dry-run mode
for node in instance.all_nodes:
self.cfg.SetDiskID(disk, node)
- result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, True)
+ result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
+ True)
result.Raise("Grow request failed to node %s" % node)
# We know that (as far as we can test) operations across different
# nodes will succeed, time to run it for real
for node in instance.all_nodes:
self.cfg.SetDiskID(disk, node)
- result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, False)
+ result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
+ False)
result.Raise("Grow request failed to node %s" % node)
# TODO: Rewrite code to work properly
# time is a work-around.
time.sleep(5)
- disk.RecordGrow(self.op.amount)
+ disk.RecordGrow(self.delta)
self.cfg.Update(instance, feedback_fn)
+
+ # Changes have been recorded, release node lock
+ _ReleaseLocks(self, locking.LEVEL_NODE)
+
+ # Downgrade lock while waiting for sync
+ self.glm.downgrade(locking.LEVEL_INSTANCE)
+
if self.op.wait_for_sync:
disk_abort = not _WaitForSync(self, instance, disks=[disk])
if disk_abort:
self.proc.LogWarning("Disk sync-ing has not returned a good"
" status; please check the instance")
- if not instance.admin_up:
+ if instance.admin_state != constants.ADMINST_UP:
_SafeShutdownInstanceDisks(self, instance, disks=[disk])
- elif not instance.admin_up:
+ elif instance.admin_state != constants.ADMINST_UP:
self.proc.LogWarning("Not shutting down the disk even if the instance is"
" not supposed to be running because no wait for"
" sync mode was requested")
+ assert self.owned_locks(locking.LEVEL_NODE_RES)
+ assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
+
class LUInstanceQueryData(NoHooksLU):
"""Query runtime instance data.
else:
self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
+ self.needed_locks[locking.LEVEL_NODEGROUP] = []
self.needed_locks[locking.LEVEL_NODE] = []
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
def DeclareLocks(self, level):
- if self.op.use_locking and level == locking.LEVEL_NODE:
- self._LockInstancesNodes()
+ if self.op.use_locking:
+ if level == locking.LEVEL_NODEGROUP:
+ owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
+
+ # Lock all groups used by instances optimistically; this requires going
+ # via the node before it's locked, requiring verification later on
+ self.needed_locks[locking.LEVEL_NODEGROUP] = \
+ frozenset(group_uuid
+ for instance_name in owned_instances
+ for group_uuid in
+ self.cfg.GetInstanceNodeGroups(instance_name))
+
+ elif level == locking.LEVEL_NODE:
+ self._LockInstancesNodes()
def CheckPrereq(self):
"""Check prerequisites.
This only checks the optional instance list against the existing names.
"""
+ owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
+ owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
+ owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))
+
if self.wanted_names is None:
assert self.op.use_locking, "Locking was not used"
- self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
+ self.wanted_names = owned_instances
- self.wanted_instances = \
- map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))
+ instances = dict(self.cfg.GetMultiInstanceInfo(self.wanted_names))
+
+ if self.op.use_locking:
+ _CheckInstancesNodeGroups(self.cfg, instances, owned_groups, owned_nodes,
+ None)
+ else:
+ assert not (owned_instances or owned_groups or owned_nodes)
- def _ComputeBlockdevStatus(self, node, instance_name, dev):
+ self.wanted_instances = instances.values()
+
+ def _ComputeBlockdevStatus(self, node, instance, dev):
"""Returns the status of a block device
"""
if result.offline:
return None
- result.Raise("Can't compute disk status for %s" % instance_name)
+ result.Raise("Can't compute disk status for %s" % instance.name)
status = result.payload
if status is None:
"""Compute block device status.
"""
+ (anno_dev,) = _AnnotateDiskParams(instance, [dev], self.cfg)
+
+ return self._ComputeDiskStatusInner(instance, snode, anno_dev)
+
+ def _ComputeDiskStatusInner(self, instance, snode, dev):
+ """Compute block device status.
+
+ @attention: The device has to be annotated already.
+
+ """
if dev.dev_type in constants.LDS_DRBD:
# we change the snode then (otherwise we use the one passed in)
if dev.logical_id[0] == instance.primary_node:
snode = dev.logical_id[0]
dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node,
- instance.name, dev)
- dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev)
+ instance, dev)
+ dev_sstatus = self._ComputeBlockdevStatus(snode, instance, dev)
if dev.children:
- dev_children = map(compat.partial(self._ComputeDiskStatus,
+ dev_children = map(compat.partial(self._ComputeDiskStatusInner,
instance, snode),
dev.children)
else:
cluster = self.cfg.GetClusterInfo()
- pri_nodes = self.cfg.GetMultiNodeInfo(i.primary_node
- for i in self.wanted_instances)
- for instance, (_, pnode) in zip(self.wanted_instances, pri_nodes):
+ node_names = itertools.chain(*(i.all_nodes for i in self.wanted_instances))
+ nodes = dict(self.cfg.GetMultiNodeInfo(node_names))
+
+ groups = dict(self.cfg.GetMultiNodeGroupInfo(node.group
+ for node in nodes.values()))
+
+ group2name_fn = lambda uuid: groups[uuid].name
+
+ for instance in self.wanted_instances:
+ pnode = nodes[instance.primary_node]
+
if self.op.static or pnode.offline:
remote_state = None
if pnode.offline:
if remote_info and "state" in remote_info:
remote_state = "up"
else:
- remote_state = "down"
-
- if instance.admin_up:
- config_state = "up"
- else:
- config_state = "down"
+ if instance.admin_state == constants.ADMINST_UP:
+ remote_state = "down"
+ else:
+ remote_state = instance.admin_state
disks = map(compat.partial(self._ComputeDiskStatus, instance, None),
instance.disks)
+ snodes_group_uuids = [nodes[snode_name].group
+ for snode_name in instance.secondary_nodes]
+
result[instance.name] = {
"name": instance.name,
- "config_state": config_state,
+ "config_state": instance.admin_state,
"run_state": remote_state,
"pnode": instance.primary_node,
+ "pnode_group_uuid": pnode.group,
+ "pnode_group_name": group2name_fn(pnode.group),
"snodes": instance.secondary_nodes,
+ "snodes_group_uuids": snodes_group_uuids,
+ "snodes_group_names": map(group2name_fn, snodes_group_uuids),
"os": instance.os,
# this happens to be the same format used for hooks
"nics": _NICListToTuple(self, instance.nics),
return result
+def PrepareContainerMods(mods, private_fn):
+ """Prepares a list of container modifications by adding a private data field.
+
+ @type mods: list of tuples; (operation, index, parameters)
+ @param mods: List of modifications
+ @type private_fn: callable or None
+ @param private_fn: Callable for constructing a private data field for a
+ modification
+ @rtype: list
+
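+  Illustrative sketch (C{nic_mods} is a made-up name for an opcode's NIC
+  modification list)::
+
+    # turns [(DDM_ADD, -1, {...})] into
+    # [(DDM_ADD, -1, {...}, _InstNicModPrivate())]
+    mods = PrepareContainerMods(nic_mods, _InstNicModPrivate)
+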
+ """
+ if private_fn is None:
+ fn = lambda: None
+ else:
+ fn = private_fn
+
+ return [(op, idx, params, fn()) for (op, idx, params) in mods]
+
+
+#: Type description for changes as returned by L{ApplyContainerMods}'s
+#: callbacks
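+#: (e.g. C{[("disk/0", "remove")]}; value purely illustrative)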
+_TApplyContModsCbChanges = \
+ ht.TMaybeListOf(ht.TAnd(ht.TIsLength(2), ht.TItems([
+ ht.TNonEmptyString,
+ ht.TAny,
+ ])))
+
+
+def ApplyContainerMods(kind, container, chgdesc, mods,
+ create_fn, modify_fn, remove_fn):
+ """Applies descriptions in C{mods} to C{container}.
+
+ @type kind: string
+ @param kind: One-word item description
+ @type container: list
+ @param container: Container to modify
+ @type chgdesc: None or list
+ @param chgdesc: List of applied changes
+ @type mods: list
+ @param mods: Modifications as returned by L{PrepareContainerMods}
+ @type create_fn: callable
+ @param create_fn: Callback for creating a new item (L{constants.DDM_ADD});
+ receives absolute item index, parameters and private data object as added
+ by L{PrepareContainerMods}, returns tuple containing new item and changes
+ as list
+ @type modify_fn: callable
+ @param modify_fn: Callback for modifying an existing item
+ (L{constants.DDM_MODIFY}); receives absolute item index, item, parameters
+ and private data object as added by L{PrepareContainerMods}, returns
+ changes as list
+ @type remove_fn: callable
+ @param remove_fn: Callback on removing item; receives absolute item index,
+ item and private data object as added by L{PrepareContainerMods}
+
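+  A minimal usage sketch (C{disks} is assumed to be a non-empty list of
+  disk objects)::
+
+    chgdesc = []
+    mods = PrepareContainerMods([(constants.DDM_REMOVE, 0, {})], None)
+    ApplyContainerMods("disk", disks, chgdesc, mods, None, None, None)
+    # disks[0] is now gone and chgdesc contains [("disk/0", "remove")]
+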
+ """
+ for (op, idx, params, private) in mods:
+ if idx == -1:
+ # Append
+ absidx = len(container) - 1
+ elif idx < 0:
+ raise IndexError("Not accepting negative indices other than -1")
+ elif idx > len(container):
+ raise IndexError("Got %s index %s, but there are only %s" %
+ (kind, idx, len(container)))
+ else:
+ absidx = idx
+
+ changes = None
+
+ if op == constants.DDM_ADD:
+ # Calculate where item will be added
+ if idx == -1:
+ addidx = len(container)
+ else:
+ addidx = idx
+
+ if create_fn is None:
+ item = params
+ else:
+ (item, changes) = create_fn(addidx, params, private)
+
+ if idx == -1:
+ container.append(item)
+ else:
+ assert idx >= 0
+ assert idx <= len(container)
+ # list.insert does so before the specified index
+ container.insert(idx, item)
+ else:
+ # Retrieve existing item
+ try:
+ item = container[absidx]
+ except IndexError:
+ raise IndexError("Invalid %s index %s" % (kind, idx))
+
+ if op == constants.DDM_REMOVE:
+ assert not params
+
+ if remove_fn is not None:
+ remove_fn(absidx, item, private)
+
+ #TODO: include a hotplugged msg in changes
+ changes = [("%s/%s" % (kind, absidx), "remove")]
+
+ assert container[absidx] == item
+ del container[absidx]
+ elif op == constants.DDM_MODIFY:
+ if modify_fn is not None:
+ #TODO: include a hotplugged msg in changes
+ changes = modify_fn(absidx, item, params, private)
+
+ else:
+ raise errors.ProgrammerError("Unhandled operation '%s'" % op)
+
+ assert _TApplyContModsCbChanges(changes)
+
+ if not (chgdesc is None or changes is None):
+ chgdesc.extend(changes)
+
+
+def _UpdateIvNames(base_index, disks):
+ """Updates the C{iv_name} attribute of disks.
+
+ @type disks: list of L{objects.Disk}
+
+ """
+ for (idx, disk) in enumerate(disks):
+ disk.iv_name = "disk/%s" % (base_index + idx, )
+
+
+class _InstNicModPrivate:
+ """Data structure for network interface modifications.
+
+ Used by L{LUInstanceSetParams}.
+
+ """
+ def __init__(self):
+ self.params = None
+ self.filled = None
+
+
class LUInstanceSetParams(LogicalUnit):
"""Modifies an instances's parameters.
HTYPE = constants.HTYPE_INSTANCE
REQ_BGL = False
+ @staticmethod
+ def _UpgradeDiskNicMods(kind, mods, verify_fn):
+ assert ht.TList(mods)
+ assert not mods or len(mods[0]) in (2, 3)
+
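+    # Illustrative conversion of the legacy two-element format:
+    # ("add", {...}) becomes ("add", -1, {...}) and (2, {...}) becomes
+    # ("modify", 2, {...}); three-element entries are passed through as-is.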
+ if mods and len(mods[0]) == 2:
+ result = []
+
+ addremove = 0
+ for op, params in mods:
+ if op in (constants.DDM_ADD, constants.DDM_REMOVE):
+ result.append((op, -1, params))
+ addremove += 1
+
+ if addremove > 1:
+ raise errors.OpPrereqError("Only one %s add or remove operation is"
+ " supported at a time" % kind,
+ errors.ECODE_INVAL)
+ else:
+ result.append((constants.DDM_MODIFY, op, params))
+
+ assert verify_fn(result)
+ else:
+ result = mods
+
+ return result
+
+ @staticmethod
+ def _CheckMods(kind, mods, key_types, item_fn):
+ """Ensures requested disk/NIC modifications are valid.
+
+ """
+ for (op, _, params) in mods:
+ assert ht.TDict(params)
+
+ # If key_types is an empty dict, we assume we have an 'ext' template
+ # and thus do not ForceDictType
+ if key_types:
+ utils.ForceDictType(params, key_types)
+
+ if op == constants.DDM_REMOVE:
+ if params:
+ raise errors.OpPrereqError("No settings should be passed when"
+ " removing a %s" % kind,
+ errors.ECODE_INVAL)
+ elif op in (constants.DDM_ADD, constants.DDM_MODIFY):
+ item_fn(op, params)
+ else:
+ raise errors.ProgrammerError("Unhandled operation '%s'" % op)
+
+ @staticmethod
+ def _VerifyDiskModification(op, params):
+ """Verifies a disk modification.
+
+ """
+ if op == constants.DDM_ADD:
+ mode = params.setdefault(constants.IDISK_MODE, constants.DISK_RDWR)
+ if mode not in constants.DISK_ACCESS_SET:
+ raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode,
+ errors.ECODE_INVAL)
+
+ size = params.get(constants.IDISK_SIZE, None)
+ if size is None:
+ raise errors.OpPrereqError("Required disk parameter '%s' missing" %
+ constants.IDISK_SIZE, errors.ECODE_INVAL)
+
+ try:
+ size = int(size)
+ except (TypeError, ValueError), err:
+ raise errors.OpPrereqError("Invalid disk size parameter: %s" % err,
+ errors.ECODE_INVAL)
+
+ params[constants.IDISK_SIZE] = size
+
+ elif op == constants.DDM_MODIFY:
+ if constants.IDISK_SIZE in params:
+ raise errors.OpPrereqError("Disk size change not possible, use"
+ " grow-disk", errors.ECODE_INVAL)
+ if constants.IDISK_MODE not in params:
+ raise errors.OpPrereqError("Disk 'mode' is the only kind of"
+ " modification supported, but missing",
+ errors.ECODE_NOENT)
+ if len(params) > 1:
+ raise errors.OpPrereqError("Disk modification doesn't support"
+ " additional arbitrary parameters",
+ errors.ECODE_INVAL)
+
+ @staticmethod
+ def _VerifyNicModification(op, params):
+ """Verifies a network interface modification.
+
+ """
+ if op in (constants.DDM_ADD, constants.DDM_MODIFY):
+ ip = params.get(constants.INIC_IP, None)
+ req_net = params.get(constants.INIC_NETWORK, None)
+ link = params.get(constants.NIC_LINK, None)
+ mode = params.get(constants.NIC_MODE, None)
+ if req_net is not None:
+ if req_net.lower() == constants.VALUE_NONE:
+ params[constants.INIC_NETWORK] = None
+ req_net = None
+ elif link is not None or mode is not None:
+        raise errors.OpPrereqError("If a network is given, mode or link"
+                                   " should not be set",
+ errors.ECODE_INVAL)
+
+ if op == constants.DDM_ADD:
+ macaddr = params.get(constants.INIC_MAC, None)
+ if macaddr is None:
+ params[constants.INIC_MAC] = constants.VALUE_AUTO
+
+ if ip is not None:
+ if ip.lower() == constants.VALUE_NONE:
+ params[constants.INIC_IP] = None
+ else:
+ if ip.lower() == constants.NIC_IP_POOL:
+ if op == constants.DDM_ADD and req_net is None:
+ raise errors.OpPrereqError("If ip=pool, parameter network"
+ " cannot be none",
+ errors.ECODE_INVAL)
+ else:
+ if not netutils.IPAddress.IsValid(ip):
+ raise errors.OpPrereqError("Invalid IP address '%s'" % ip,
+ errors.ECODE_INVAL)
+
+ if constants.INIC_MAC in params:
+ macaddr = params[constants.INIC_MAC]
+ if macaddr not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
+ macaddr = utils.NormalizeAndValidateMac(macaddr)
+
+ if op == constants.DDM_MODIFY and macaddr == constants.VALUE_AUTO:
+ raise errors.OpPrereqError("'auto' is not a valid MAC address when"
+ " modifying an existing NIC",
+ errors.ECODE_INVAL)
+
def CheckArguments(self):
if not (self.op.nics or self.op.disks or self.op.disk_template or
- self.op.hvparams or self.op.beparams or self.op.os_name):
+ self.op.hvparams or self.op.beparams or self.op.os_name or
+ self.op.offline is not None or self.op.runtime_mem):
raise errors.OpPrereqError("No changes submitted", errors.ECODE_INVAL)
if self.op.hvparams:
_CheckGlobalHvParams(self.op.hvparams)
- # Disk validation
- disk_addremove = 0
- for disk_op, disk_dict in self.op.disks:
- utils.ForceDictType(disk_dict, constants.IDISK_PARAMS_TYPES)
- if disk_op == constants.DDM_REMOVE:
- disk_addremove += 1
- continue
- elif disk_op == constants.DDM_ADD:
- disk_addremove += 1
- else:
- if not isinstance(disk_op, int):
- raise errors.OpPrereqError("Invalid disk index", errors.ECODE_INVAL)
- if not isinstance(disk_dict, dict):
- msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
- raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
-
- if disk_op == constants.DDM_ADD:
- mode = disk_dict.setdefault(constants.IDISK_MODE, constants.DISK_RDWR)
- if mode not in constants.DISK_ACCESS_SET:
- raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode,
- errors.ECODE_INVAL)
- size = disk_dict.get(constants.IDISK_SIZE, None)
- if size is None:
- raise errors.OpPrereqError("Required disk parameter size missing",
- errors.ECODE_INVAL)
- try:
- size = int(size)
- except (TypeError, ValueError), err:
- raise errors.OpPrereqError("Invalid disk size parameter: %s" %
- str(err), errors.ECODE_INVAL)
- disk_dict[constants.IDISK_SIZE] = size
- else:
- # modification of disk
- if constants.IDISK_SIZE in disk_dict:
- raise errors.OpPrereqError("Disk size change not possible, use"
- " grow-disk", errors.ECODE_INVAL)
-
- if disk_addremove > 1:
- raise errors.OpPrereqError("Only one disk add or remove operation"
- " supported at a time", errors.ECODE_INVAL)
+ if self.op.allow_arbit_params:
+ self.op.disks = \
+ self._UpgradeDiskNicMods("disk", self.op.disks,
+ opcodes.OpInstanceSetParams.TestExtDiskModifications)
+ else:
+ self.op.disks = \
+ self._UpgradeDiskNicMods("disk", self.op.disks,
+ opcodes.OpInstanceSetParams.TestDiskModifications)
+
+ self.op.nics = \
+ self._UpgradeDiskNicMods("NIC", self.op.nics,
+ opcodes.OpInstanceSetParams.TestNicModifications)
+
+ # Check disk modifications
+ if self.op.allow_arbit_params:
+ self._CheckMods("disk", self.op.disks, {},
+ self._VerifyDiskModification)
+ else:
+ self._CheckMods("disk", self.op.disks, constants.IDISK_PARAMS_TYPES,
+ self._VerifyDiskModification)
if self.op.disks and self.op.disk_template is not None:
raise errors.OpPrereqError("Disk template conversion and other disk"
" one requires specifying a secondary node",
errors.ECODE_INVAL)
- # NIC validation
- nic_addremove = 0
- for nic_op, nic_dict in self.op.nics:
- utils.ForceDictType(nic_dict, constants.INIC_PARAMS_TYPES)
- if nic_op == constants.DDM_REMOVE:
- nic_addremove += 1
- continue
- elif nic_op == constants.DDM_ADD:
- nic_addremove += 1
- else:
- if not isinstance(nic_op, int):
- raise errors.OpPrereqError("Invalid nic index", errors.ECODE_INVAL)
- if not isinstance(nic_dict, dict):
- msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
- raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
-
- # nic_dict should be a dict
- nic_ip = nic_dict.get(constants.INIC_IP, None)
- if nic_ip is not None:
- if nic_ip.lower() == constants.VALUE_NONE:
- nic_dict[constants.INIC_IP] = None
- else:
- if not netutils.IPAddress.IsValid(nic_ip):
- raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip,
- errors.ECODE_INVAL)
-
- nic_bridge = nic_dict.get("bridge", None)
- nic_link = nic_dict.get(constants.INIC_LINK, None)
- if nic_bridge and nic_link:
- raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
- " at the same time", errors.ECODE_INVAL)
- elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
- nic_dict["bridge"] = None
- elif nic_link and nic_link.lower() == constants.VALUE_NONE:
- nic_dict[constants.INIC_LINK] = None
-
- if nic_op == constants.DDM_ADD:
- nic_mac = nic_dict.get(constants.INIC_MAC, None)
- if nic_mac is None:
- nic_dict[constants.INIC_MAC] = constants.VALUE_AUTO
-
- if constants.INIC_MAC in nic_dict:
- nic_mac = nic_dict[constants.INIC_MAC]
- if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
- nic_mac = utils.NormalizeAndValidateMac(nic_mac)
-
- if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
- raise errors.OpPrereqError("'auto' is not a valid MAC address when"
- " modifying an existing nic",
- errors.ECODE_INVAL)
-
- if nic_addremove > 1:
- raise errors.OpPrereqError("Only one NIC add or remove operation"
- " supported at a time", errors.ECODE_INVAL)
+ # Check NIC modifications
+ self._CheckMods("NIC", self.op.nics, constants.INIC_PARAMS_TYPES,
+ self._VerifyNicModification)
def ExpandNames(self):
self._ExpandAndLockInstance()
+ # Can't even acquire node locks in shared mode as upcoming changes in
+ # Ganeti 2.6 will start to modify the node object on disk conversion
self.needed_locks[locking.LEVEL_NODE] = []
+ self.needed_locks[locking.LEVEL_NODE_RES] = []
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
def DeclareLocks(self, level):
+ # TODO: Acquire group lock in shared mode (disk parameters)
if level == locking.LEVEL_NODE:
self._LockInstancesNodes()
if self.op.disk_template and self.op.remote_node:
self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
self.needed_locks[locking.LEVEL_NODE].append(self.op.remote_node)
+ elif level == locking.LEVEL_NODE_RES and self.op.disk_template:
+ # Copy node locks
+ self.needed_locks[locking.LEVEL_NODE_RES] = \
+ self.needed_locks[locking.LEVEL_NODE][:]
def BuildHooksEnv(self):
"""Build hooks env.
"""
args = dict()
- if constants.BE_MEMORY in self.be_new:
- args["memory"] = self.be_new[constants.BE_MEMORY]
+ if constants.BE_MINMEM in self.be_new:
+ args["minmem"] = self.be_new[constants.BE_MINMEM]
+ if constants.BE_MAXMEM in self.be_new:
+ args["maxmem"] = self.be_new[constants.BE_MAXMEM]
if constants.BE_VCPUS in self.be_new:
args["vcpus"] = self.be_new[constants.BE_VCPUS]
# TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
# information at all.
- if self.op.nics:
- args["nics"] = []
- nic_override = dict(self.op.nics)
- for idx, nic in enumerate(self.instance.nics):
- if idx in nic_override:
- this_nic_override = nic_override[idx]
- else:
- this_nic_override = {}
- if constants.INIC_IP in this_nic_override:
- ip = this_nic_override[constants.INIC_IP]
- else:
- ip = nic.ip
- if constants.INIC_MAC in this_nic_override:
- mac = this_nic_override[constants.INIC_MAC]
- else:
- mac = nic.mac
- if idx in self.nic_pnew:
- nicparams = self.nic_pnew[idx]
- else:
- nicparams = self.cluster.SimpleFillNIC(nic.nicparams)
- mode = nicparams[constants.NIC_MODE]
- link = nicparams[constants.NIC_LINK]
- args["nics"].append((ip, mac, mode, link))
- if constants.DDM_ADD in nic_override:
- ip = nic_override[constants.DDM_ADD].get(constants.INIC_IP, None)
- mac = nic_override[constants.DDM_ADD][constants.INIC_MAC]
- nicparams = self.nic_pnew[constants.DDM_ADD]
- mode = nicparams[constants.NIC_MODE]
- link = nicparams[constants.NIC_LINK]
- args["nics"].append((ip, mac, mode, link))
- elif constants.DDM_REMOVE in nic_override:
- del args["nics"][-1]
+
+ if self._new_nics is not None:
+ nics = []
+
+ for nic in self._new_nics:
+ n = copy.deepcopy(nic)
+ nicparams = self.cluster.SimpleFillNIC(n.nicparams)
+ n.nicparams = nicparams
+ nics.append(_NICToTuple(self, n))
+
+ args["nics"] = nics
env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
if self.op.disk_template:
env["NEW_DISK_TEMPLATE"] = self.op.disk_template
+ if self.op.runtime_mem:
+ env["RUNTIME_MEMORY"] = self.op.runtime_mem
return env
nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
return (nl, nl)
+ def _PrepareNicModification(self, params, private, old_ip, old_net,
+ old_params, cluster, pnode):
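+    """Computes and verifies the new parameters of a NIC modification.
+
+    This only prepares the change: the resulting parameter dicts are stored
+    in C{private} for later use when the modification is actually applied.
+
+    """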
+
+ update_params_dict = dict([(key, params[key])
+ for key in constants.NICS_PARAMETERS
+ if key in params])
+
+ req_link = update_params_dict.get(constants.NIC_LINK, None)
+ req_mode = update_params_dict.get(constants.NIC_MODE, None)
+
+ new_net = params.get(constants.INIC_NETWORK, old_net)
+ if new_net is not None:
+ netparams = self.cfg.GetGroupNetParams(new_net, pnode)
+ if netparams is None:
+ raise errors.OpPrereqError("No netparams found for the network"
+                                   " %s, probably not connected." % new_net,
+ errors.ECODE_INVAL)
+ new_params = dict(netparams)
+ else:
+ new_params = _GetUpdatedParams(old_params, update_params_dict)
+
+ utils.ForceDictType(new_params, constants.NICS_PARAMETER_TYPES)
+
+ new_filled_params = cluster.SimpleFillNIC(new_params)
+ objects.NIC.CheckParameterSyntax(new_filled_params)
+
+ new_mode = new_filled_params[constants.NIC_MODE]
+ if new_mode == constants.NIC_MODE_BRIDGED:
+ bridge = new_filled_params[constants.NIC_LINK]
+ msg = self.rpc.call_bridges_exist(pnode, [bridge]).fail_msg
+ if msg:
+ msg = "Error checking bridges on node '%s': %s" % (pnode, msg)
+ if self.op.force:
+ self.warn.append(msg)
+ else:
+ raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
+
+ elif new_mode == constants.NIC_MODE_ROUTED:
+ ip = params.get(constants.INIC_IP, old_ip)
+ if ip is None:
+ raise errors.OpPrereqError("Cannot set the NIC IP address to None"
+ " on a routed NIC", errors.ECODE_INVAL)
+
+ if constants.INIC_MAC in params:
+ mac = params[constants.INIC_MAC]
+ if mac is None:
+ raise errors.OpPrereqError("Cannot unset the NIC MAC address",
+ errors.ECODE_INVAL)
+ elif mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
+ # otherwise generate the MAC address
+ params[constants.INIC_MAC] = \
+ self.cfg.GenerateMAC(new_net, self.proc.GetECId())
+ else:
+ # or validate/reserve the current one
+ try:
+ self.cfg.ReserveMAC(mac, self.proc.GetECId())
+ except errors.ReservationError:
+ raise errors.OpPrereqError("MAC address '%s' already in use"
+ " in cluster" % mac,
+ errors.ECODE_NOTUNIQUE)
+ elif new_net != old_net:
+ def get_net_prefix(net):
+ if net:
+ uuid = self.cfg.LookupNetwork(net)
+ if uuid:
+ nobj = self.cfg.GetNetwork(uuid)
+ return nobj.mac_prefix
+ return None
+ new_prefix = get_net_prefix(new_net)
+ old_prefix = get_net_prefix(old_net)
+ if old_prefix != new_prefix:
+ params[constants.INIC_MAC] = \
+ self.cfg.GenerateMAC(new_net, self.proc.GetECId())
+
+    # if there is a change in the NIC's IP/network configuration
+ new_ip = params.get(constants.INIC_IP, old_ip)
+ if (new_ip, new_net) != (old_ip, old_net):
+ if new_ip:
+ if new_net:
+ if new_ip.lower() == constants.NIC_IP_POOL:
+ try:
+ new_ip = self.cfg.GenerateIp(new_net, self.proc.GetECId())
+ except errors.ReservationError:
+ raise errors.OpPrereqError("Unable to get a free IP"
+ " from the address pool",
+ errors.ECODE_STATE)
+ self.LogInfo("Chose IP %s from pool %s", new_ip, new_net)
+ params[constants.INIC_IP] = new_ip
+ elif new_ip != old_ip or new_net != old_net:
+ try:
+ self.LogInfo("Reserving IP %s in pool %s", new_ip, new_net)
+ self.cfg.ReserveIp(new_net, new_ip, self.proc.GetECId())
+ except errors.ReservationError:
+ raise errors.OpPrereqError("IP %s not available in network %s" %
+ (new_ip, new_net),
+ errors.ECODE_NOTUNIQUE)
+ elif new_ip.lower() == constants.NIC_IP_POOL:
+ raise errors.OpPrereqError("ip=pool, but no network found",
+                                       errors.ECODE_INVAL)
+ else:
+ # new net is None
+ if self.op.conflicts_check:
+ _CheckForConflictingIp(self, new_ip, pnode)
+
+ if old_ip:
+ if old_net:
+ try:
+ self.cfg.ReleaseIp(old_net, old_ip, self.proc.GetECId())
+ except errors.AddressPoolError:
+ logging.warning("Release IP %s not contained in network %s",
+ old_ip, old_net)
+
+ # there are no changes in (net, ip) tuple
+ elif (old_net is not None and
+ (req_link is not None or req_mode is not None)):
+ raise errors.OpPrereqError("Not allowed to change link or mode of"
+ " a NIC that is connected to a network.",
+ errors.ECODE_INVAL)
+
+ logging.info("new_params %s", new_params)
+ logging.info("new_filled_params %s", new_filled_params)
+ private.params = new_params
+ private.filled = new_filled_params
+
def CheckPrereq(self):
"""Check prerequisites.
"Cannot retrieve locked instance %s" % self.op.instance_name
pnode = instance.primary_node
nodelist = list(instance.all_nodes)
+ pnode_info = self.cfg.GetNodeInfo(pnode)
+ self.diskparams = self.cfg.GetInstanceDiskParams(instance)
+
+ # Prepare disk/NIC modifications
+ self.diskmod = PrepareContainerMods(self.op.disks, None)
+ self.nicmod = PrepareContainerMods(self.op.nics, _InstNicModPrivate)
+ logging.info("nicmod %s", self.nicmod)
+
+ # Check the validity of the `provider' parameter
+ if instance.disk_template in constants.DT_EXT:
+ for mod in self.diskmod:
+ ext_provider = mod[2].get(constants.IDISK_PROVIDER, None)
+ if mod[0] == constants.DDM_ADD:
+ if ext_provider is None:
+ raise errors.OpPrereqError("Instance template is '%s' and parameter"
+                                     " '%s' is missing during disk add" %
+ (constants.DT_EXT,
+ constants.IDISK_PROVIDER),
+ errors.ECODE_NOENT)
+ elif mod[0] == constants.DDM_MODIFY:
+ if ext_provider:
+ raise errors.OpPrereqError("Parameter '%s' is invalid during disk"
+ " modification" % constants.IDISK_PROVIDER,
+ errors.ECODE_INVAL)
+ else:
+ for mod in self.diskmod:
+ ext_provider = mod[2].get(constants.IDISK_PROVIDER, None)
+ if ext_provider is not None:
+ raise errors.OpPrereqError("Parameter '%s' is only valid for instances"
+ " of type '%s'" % (constants.IDISK_PROVIDER,
+ constants.DT_EXT), errors.ECODE_INVAL)
# OS change
if self.op.os_name and not self.op.force:
else:
instance_os = instance.os
+ assert not (self.op.disk_template and self.op.disks), \
+ "Can't modify disk template and apply disk changes at the same time"
+
if self.op.disk_template:
if instance.disk_template == self.op.disk_template:
raise errors.OpPrereqError("Instance already has disk template %s" %
" %s to %s" % (instance.disk_template,
self.op.disk_template),
errors.ECODE_INVAL)
- _CheckInstanceDown(self, instance, "cannot change disk template")
+ _CheckInstanceState(self, instance, INSTANCE_DOWN,
+ msg="cannot change disk template")
if self.op.disk_template in constants.DTS_INT_MIRROR:
if self.op.remote_node == pnode:
raise errors.OpPrereqError("Given new secondary node %s is the same"
required = _ComputeDiskSizePerVG(self.op.disk_template, disks)
_CheckNodesFreeDiskPerVG(self, [self.op.remote_node], required)
+ snode_info = self.cfg.GetNodeInfo(self.op.remote_node)
+ snode_group = self.cfg.GetNodeGroup(snode_info.group)
+ ipolicy = _CalculateGroupIPolicy(cluster, snode_group)
+ _CheckTargetNodeIPolicy(self, ipolicy, instance, snode_info,
+ ignore=self.op.ignore_ipolicy)
+ if pnode_info.group != snode_info.group:
+ self.LogWarning("The primary and secondary nodes are in two"
+ " different node groups; the disk parameters"
+ " from the first disk's node group will be"
+ " used")
+
# hvparams processing
if self.op.hvparams:
hv_type = instance.hypervisor
# local check
hypervisor.GetHypervisor(hv_type).CheckParameterSyntax(hv_new)
_CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
- self.hv_new = hv_new # the new actual values
+ self.hv_proposed = self.hv_new = hv_new # the new actual values
self.hv_inst = i_hvdict # the new dict (without defaults)
else:
+ self.hv_proposed = cluster.SimpleFillHV(instance.hypervisor, instance.os,
+ instance.hvparams)
self.hv_new = self.hv_inst = {}
# beparams processing
if self.op.beparams:
i_bedict = _GetUpdatedParams(instance.beparams, self.op.beparams,
use_none=True)
+ objects.UpgradeBeParams(i_bedict)
utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
be_new = cluster.SimpleFillBE(i_bedict)
- self.be_new = be_new # the new actual values
+ self.be_proposed = self.be_new = be_new # the new actual values
self.be_inst = i_bedict # the new dict (without defaults)
else:
self.be_new = self.be_inst = {}
+ self.be_proposed = cluster.SimpleFillBE(instance.beparams)
be_old = cluster.FillBE(instance)
+ # CPU param validation -- checking every time a parameter is
+ # changed to cover all cases where either CPU mask or vcpus have
+ # changed
+ if (constants.BE_VCPUS in self.be_proposed and
+ constants.HV_CPU_MASK in self.hv_proposed):
+ cpu_list = \
+ utils.ParseMultiCpuMask(self.hv_proposed[constants.HV_CPU_MASK])
+ # Verify mask is consistent with number of vCPUs. Can skip this
+ # test if only 1 entry in the CPU mask, which means same mask
+ # is applied to all vCPUs.
+ if (len(cpu_list) > 1 and
+ len(cpu_list) != self.be_proposed[constants.BE_VCPUS]):
+ raise errors.OpPrereqError("Number of vCPUs [%d] does not match the"
+ " CPU mask [%s]" %
+ (self.be_proposed[constants.BE_VCPUS],
+ self.hv_proposed[constants.HV_CPU_MASK]),
+ errors.ECODE_INVAL)
+
+ # Only perform this test if a new CPU mask is given
+ if constants.HV_CPU_MASK in self.hv_new:
+ # Calculate the largest CPU number requested
+ max_requested_cpu = max(map(max, cpu_list))
+ # Check that all of the instance's nodes have enough physical CPUs to
+ # satisfy the requested CPU mask
+ _CheckNodesPhysicalCPUs(self, instance.all_nodes,
+ max_requested_cpu + 1, instance.hypervisor)
+
# osparams processing
if self.op.osparams:
i_osdict = _GetUpdatedParams(instance.osparams, self.op.osparams)
self.warn = []
- if (constants.BE_MEMORY in self.op.beparams and not self.op.force and
- be_new[constants.BE_MEMORY] > be_old[constants.BE_MEMORY]):
+ #TODO(dynmem): do the appropriate check involving MINMEM
+ if (constants.BE_MAXMEM in self.op.beparams and not self.op.force and
+ be_new[constants.BE_MAXMEM] > be_old[constants.BE_MAXMEM]):
mem_check_list = [pnode]
if be_new[constants.BE_AUTO_BALANCE]:
# either we changed auto_balance to yes or it was from before
instance_info = self.rpc.call_instance_info(pnode, instance.name,
instance.hypervisor)
nodeinfo = self.rpc.call_node_info(mem_check_list, None,
- instance.hypervisor)
+ [instance.hypervisor])
pninfo = nodeinfo[pnode]
msg = pninfo.fail_msg
if msg:
# Assume the primary node is unreachable and go ahead
self.warn.append("Can't get info from primary node %s: %s" %
(pnode, msg))
- elif not isinstance(pninfo.payload.get("memory_free", None), int):
- self.warn.append("Node data from primary node %s doesn't contain"
- " free memory information" % pnode)
- elif instance_info.fail_msg:
- self.warn.append("Can't get instance runtime information: %s" %
- instance_info.fail_msg)
else:
- if instance_info.payload:
- current_mem = int(instance_info.payload["memory"])
+ (_, _, (pnhvinfo, )) = pninfo.payload
+ if not isinstance(pnhvinfo.get("memory_free", None), int):
+ self.warn.append("Node data from primary node %s doesn't contain"
+ " free memory information" % pnode)
+ elif instance_info.fail_msg:
+ self.warn.append("Can't get instance runtime information: %s" %
+ instance_info.fail_msg)
else:
- # Assume instance not running
- # (there is a slight race condition here, but it's not very probable,
- # and we have no other way to check)
- current_mem = 0
- miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
- pninfo.payload["memory_free"])
- if miss_mem > 0:
- raise errors.OpPrereqError("This change will prevent the instance"
- " from starting, due to %d MB of memory"
- " missing on its primary node" % miss_mem,
- errors.ECODE_NORES)
+ if instance_info.payload:
+ current_mem = int(instance_info.payload["memory"])
+ else:
+ # Assume instance not running
+ # (there is a slight race condition here, but it's not very
+ # probable, and we have no other way to check)
+ # TODO: Describe race condition
+ current_mem = 0
+ #TODO(dynmem): do the appropriate check involving MINMEM
+ miss_mem = (be_new[constants.BE_MAXMEM] - current_mem -
+ pnhvinfo["memory_free"])
+ if miss_mem > 0:
+ raise errors.OpPrereqError("This change will prevent the instance"
+ " from starting, due to %d MB of memory"
+ " missing on its primary node" %
+ miss_mem,
+ errors.ECODE_NORES)
if be_new[constants.BE_AUTO_BALANCE]:
for node, nres in nodeinfo.items():
continue
nres.Raise("Can't get info from secondary node %s" % node,
prereq=True, ecode=errors.ECODE_STATE)
- if not isinstance(nres.payload.get("memory_free", None), int):
+ (_, _, (nhvinfo, )) = nres.payload
+ if not isinstance(nhvinfo.get("memory_free", None), int):
raise errors.OpPrereqError("Secondary node %s didn't return free"
" memory information" % node,
errors.ECODE_STATE)
- elif be_new[constants.BE_MEMORY] > nres.payload["memory_free"]:
+ #TODO(dynmem): do the appropriate check involving MINMEM
+ elif be_new[constants.BE_MAXMEM] > nhvinfo["memory_free"]:
raise errors.OpPrereqError("This change will prevent the instance"
" from failover to its secondary node"
" %s, due to not enough memory" % node,
errors.ECODE_STATE)
- # NIC processing
- self.nic_pnew = {}
- self.nic_pinst = {}
- for nic_op, nic_dict in self.op.nics:
- if nic_op == constants.DDM_REMOVE:
- if not instance.nics:
- raise errors.OpPrereqError("Instance has no NICs, cannot remove",
- errors.ECODE_INVAL)
- continue
- if nic_op != constants.DDM_ADD:
- # an existing nic
- if not instance.nics:
- raise errors.OpPrereqError("Invalid NIC index %s, instance has"
- " no NICs" % nic_op,
- errors.ECODE_INVAL)
- if nic_op < 0 or nic_op >= len(instance.nics):
- raise errors.OpPrereqError("Invalid NIC index %s, valid values"
- " are 0 to %d" %
- (nic_op, len(instance.nics) - 1),
- errors.ECODE_INVAL)
- old_nic_params = instance.nics[nic_op].nicparams
- old_nic_ip = instance.nics[nic_op].ip
- else:
- old_nic_params = {}
- old_nic_ip = None
-
- update_params_dict = dict([(key, nic_dict[key])
- for key in constants.NICS_PARAMETERS
- if key in nic_dict])
-
- if "bridge" in nic_dict:
- update_params_dict[constants.NIC_LINK] = nic_dict["bridge"]
-
- new_nic_params = _GetUpdatedParams(old_nic_params,
- update_params_dict)
- utils.ForceDictType(new_nic_params, constants.NICS_PARAMETER_TYPES)
- new_filled_nic_params = cluster.SimpleFillNIC(new_nic_params)
- objects.NIC.CheckParameterSyntax(new_filled_nic_params)
- self.nic_pinst[nic_op] = new_nic_params
- self.nic_pnew[nic_op] = new_filled_nic_params
- new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
-
- if new_nic_mode == constants.NIC_MODE_BRIDGED:
- nic_bridge = new_filled_nic_params[constants.NIC_LINK]
- msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
- if msg:
- msg = "Error checking bridges on node %s: %s" % (pnode, msg)
- if self.op.force:
- self.warn.append(msg)
- else:
- raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
- if new_nic_mode == constants.NIC_MODE_ROUTED:
- if constants.INIC_IP in nic_dict:
- nic_ip = nic_dict[constants.INIC_IP]
- else:
- nic_ip = old_nic_ip
- if nic_ip is None:
- raise errors.OpPrereqError("Cannot set the nic ip to None"
- " on a routed nic", errors.ECODE_INVAL)
- if constants.INIC_MAC in nic_dict:
- nic_mac = nic_dict[constants.INIC_MAC]
- if nic_mac is None:
- raise errors.OpPrereqError("Cannot set the nic mac to None",
- errors.ECODE_INVAL)
- elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
- # otherwise generate the mac
- nic_dict[constants.INIC_MAC] = \
- self.cfg.GenerateMAC(self.proc.GetECId())
- else:
- # or validate/reserve the current one
- try:
- self.cfg.ReserveMAC(nic_mac, self.proc.GetECId())
- except errors.ReservationError:
- raise errors.OpPrereqError("MAC address %s already in use"
- " in cluster" % nic_mac,
- errors.ECODE_NOTUNIQUE)
+ if self.op.runtime_mem:
+ remote_info = self.rpc.call_instance_info(instance.primary_node,
+ instance.name,
+ instance.hypervisor)
+ remote_info.Raise("Error checking node %s" % instance.primary_node)
+ if not remote_info.payload: # not running already
+ raise errors.OpPrereqError("Instance %s is not running" % instance.name,
+ errors.ECODE_STATE)
+
+ current_memory = remote_info.payload["memory"]
+ if (not self.op.force and
+ (self.op.runtime_mem > self.be_proposed[constants.BE_MAXMEM] or
+ self.op.runtime_mem < self.be_proposed[constants.BE_MINMEM])):
+        raise errors.OpPrereqError("Instance %s must have between %d and"
+                                   " %d MB of memory unless --force is"
+ " given" % (instance.name,
+ self.be_proposed[constants.BE_MINMEM],
+ self.be_proposed[constants.BE_MAXMEM]),
+ errors.ECODE_INVAL)
+
+ if self.op.runtime_mem > current_memory:
+ _CheckNodeFreeMemory(self, instance.primary_node,
+ "ballooning memory for instance %s" %
+ instance.name,
+                           self.op.runtime_mem - current_memory,
+ instance.hypervisor)
- # DISK processing
if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
raise errors.OpPrereqError("Disk operations not supported for"
" diskless instances",
errors.ECODE_INVAL)
- for disk_op, _ in self.op.disks:
- if disk_op == constants.DDM_REMOVE:
- if len(instance.disks) == 1:
- raise errors.OpPrereqError("Cannot remove the last disk of"
- " an instance", errors.ECODE_INVAL)
- _CheckInstanceDown(self, instance, "cannot remove disks")
-
- if (disk_op == constants.DDM_ADD and
- len(instance.disks) >= constants.MAX_DISKS):
- raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
- " add more" % constants.MAX_DISKS,
- errors.ECODE_STATE)
- if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
- # an existing disk
- if disk_op < 0 or disk_op >= len(instance.disks):
- raise errors.OpPrereqError("Invalid disk index %s, valid values"
- " are 0 to %d" %
- (disk_op, len(instance.disks)),
- errors.ECODE_INVAL)
- return
+ def _PrepareNicCreate(_, params, private):
+ self._PrepareNicModification(params, private, None, None,
+ {}, cluster, pnode)
+ return (None, None)
+
+ def _PrepareNicMod(_, nic, params, private):
+ self._PrepareNicModification(params, private, nic.ip, nic.network,
+ nic.nicparams, cluster, pnode)
+ return None
+
+ def _PrepareNicRemove(_, params, private):
+ ip = params.ip
+ net = params.network
+ if net is not None and ip is not None:
+ self.cfg.ReleaseIp(net, ip, self.proc.GetECId())
+
+ # Verify NIC changes (operating on copy)
+ nics = instance.nics[:]
+ ApplyContainerMods("NIC", nics, None, self.nicmod,
+ _PrepareNicCreate, _PrepareNicMod, _PrepareNicRemove)
+ if len(nics) > constants.MAX_NICS:
+ raise errors.OpPrereqError("Instance has too many network interfaces"
+ " (%d), cannot add more" % constants.MAX_NICS,
+ errors.ECODE_STATE)
+
+    # Verify disk changes (operating on a copy)
+ disks = instance.disks[:]
+ ApplyContainerMods("disk", disks, None, self.diskmod,
+ None, None, None)
+ if len(disks) > constants.MAX_DISKS:
+ raise errors.OpPrereqError("Instance has too many disks (%d), cannot add"
+ " more" % constants.MAX_DISKS,
+ errors.ECODE_STATE)
+
+ if self.op.offline is not None:
+ if self.op.offline:
+ msg = "can't change to offline"
+ else:
+ msg = "can't change to online"
+ _CheckInstanceState(self, instance, CAN_CHANGE_INSTANCE_OFFLINE, msg=msg)
+
+ # Pre-compute NIC changes (necessary to use result in hooks)
+ self._nic_chgdesc = []
+ if self.nicmod:
+ # Operate on copies as this is still in prereq
+ nics = [nic.Copy() for nic in instance.nics]
+ ApplyContainerMods("NIC", nics, self._nic_chgdesc, self.nicmod,
+ self._CreateNewNic, self._ApplyNicMods,
+ self._RemoveNic)
+ self._new_nics = nics
+ else:
+ self._new_nics = None
+
def _ConvertPlainToDrbd(self, feedback_fn):
"""Converts an instance from plain to drbd.
pnode = instance.primary_node
snode = self.op.remote_node
+ assert instance.disk_template == constants.DT_PLAIN
+
# create a fake disk info for _GenerateDiskTemplate
disk_info = [{constants.IDISK_SIZE: d.size, constants.IDISK_MODE: d.mode,
constants.IDISK_VG: d.logical_id[0]}
for d in instance.disks]
new_disks = _GenerateDiskTemplate(self, self.op.disk_template,
instance.name, pnode, [snode],
- disk_info, None, None, 0, feedback_fn)
+ disk_info, None, None, 0, feedback_fn,
+ self.diskparams)
+ anno_disks = rpc.AnnotateDiskParams(constants.DT_DRBD8, new_disks,
+ self.diskparams)
info = _GetInstanceInfoText(instance)
- feedback_fn("Creating aditional volumes...")
+ feedback_fn("Creating additional volumes...")
# first, create the missing data and meta devices
- for disk in new_disks:
+ for disk in anno_disks:
# unfortunately this is... not too nice
_CreateSingleBlockDev(self, pnode, instance, disk.children[1],
info, True)
feedback_fn("Initializing DRBD devices...")
# all child devices are in place, we can now create the DRBD devices
- for disk in new_disks:
+ for disk in anno_disks:
for node in [pnode, snode]:
f_create = node == pnode
_CreateSingleBlockDev(self, node, instance, disk, info, f_create)
instance.disks = new_disks
self.cfg.Update(instance, feedback_fn)
+ # Release node locks while waiting for sync
+ _ReleaseLocks(self, locking.LEVEL_NODE)
+
# disks are created, waiting for sync
disk_abort = not _WaitForSync(self, instance,
oneshot=not self.op.wait_for_sync)
raise errors.OpExecError("There are some degraded disks for"
" this instance, please cleanup manually")
+ # Node resource locks will be released by caller
+
def _ConvertDrbdToPlain(self, feedback_fn):
"""Converts an instance from drbd to plain.
"""
instance = self.instance
+
assert len(instance.secondary_nodes) == 1
+ assert instance.disk_template == constants.DT_DRBD8
+
pnode = instance.primary_node
snode = instance.secondary_nodes[0]
feedback_fn("Converting template to plain")
- old_disks = instance.disks
- new_disks = [d.children[0] for d in old_disks]
+ old_disks = _AnnotateDiskParams(instance, instance.disks, self.cfg)
+ new_disks = [d.children[0] for d in instance.disks]
# copy over size and mode
for parent, child in zip(old_disks, new_disks):
child.size = parent.size
child.mode = parent.mode
+ # this is a DRBD disk, return its port to the pool
+ # NOTE: this must be done right before the call to cfg.Update!
+ for disk in old_disks:
+ tcp_port = disk.logical_id[2]
+ self.cfg.AddTcpUdpPort(tcp_port)
+
# update instance structure
instance.disks = new_disks
instance.disk_template = constants.DT_PLAIN
self.cfg.Update(instance, feedback_fn)
+ # Release locks in case removing disks takes a while
+ _ReleaseLocks(self, locking.LEVEL_NODE)
+
feedback_fn("Removing volumes on the secondary node...")
for disk in old_disks:
self.cfg.SetDiskID(disk, snode)
self.LogWarning("Could not remove metadata for disk %d on node %s,"
" continuing anyway: %s", idx, pnode, msg)
- # this is a DRBD disk, return its port to the pool
- for disk in old_disks:
- tcp_port = disk.logical_id[2]
- self.cfg.AddTcpUdpPort(tcp_port)
+ def _CreateNewDisk(self, idx, params, _):
+ """Creates a new disk.
+
+ """
+ instance = self.instance
+
+ # add a new disk
+ if instance.disk_template in constants.DTS_FILEBASED:
+ (file_driver, file_path) = instance.disks[0].logical_id
+ file_path = os.path.dirname(file_path)
+ else:
+ file_driver = file_path = None
+
+ disk = \
+ _GenerateDiskTemplate(self, instance.disk_template, instance.name,
+ instance.primary_node, instance.secondary_nodes,
+ [params], file_path, file_driver, idx,
+ self.Log, self.diskparams)[0]
+
+ info = _GetInstanceInfoText(instance)
+
+ logging.info("Creating volume %s for instance %s",
+ disk.iv_name, instance.name)
+ # Note: this needs to be kept in sync with _CreateDisks
+ #HARDCODE
+ for node in instance.all_nodes:
+ f_create = (node == instance.primary_node)
+ try:
+ _CreateBlockDev(self, node, instance, disk, f_create, info, f_create)
+ except errors.OpExecError, err:
+ self.LogWarning("Failed to create volume %s (%s) on node '%s': %s",
+ disk.iv_name, disk, node, err)
+
+ if self.op.hotplug and disk.pci:
+ self.LogInfo("Trying to hotplug device.")
+ disk_ok, device_info = _AssembleInstanceDisks(self, self.instance,
+ [disk], check=False)
+ _, _, dev_path = device_info[0]
+ result = self.rpc.call_hot_add_disk(self.instance.primary_node,
+ self.instance, disk, dev_path, idx)
+ return (disk, [
+ ("disk/%d" % idx, "add:size=%s,mode=%s" % (disk.size, disk.mode)),
+ ])
+
+ @staticmethod
+ def _ModifyDisk(idx, disk, params, _):
+ """Modifies a disk.
+
+ """
+ disk.mode = params[constants.IDISK_MODE]
+
+ return [
+ ("disk.mode/%d" % idx, disk.mode),
+ ]
+
+ def _RemoveDisk(self, idx, root, _):
+ """Removes a disk.
+
+ """
+ #TODO: log warning in case hotplug is not possible
+ # handle errors
+ if root.pci and not self.op.hotplug:
+      raise errors.OpPrereqError("Cannot remove a hotplugged disk without"
+                                 " also using hotplug for the removal",
+ errors.ECODE_INVAL)
+ if self.op.hotplug and root.pci:
+      self.LogInfo("Trying to hot-remove the device.")
+ self.rpc.call_hot_del_disk(self.instance.primary_node,
+ self.instance, root, idx)
+ _ShutdownInstanceDisks(self, self.instance, [root])
+ self.cfg.UpdatePCIInfo(self.instance.name, root.pci)
+
+ (anno_disk,) = _AnnotateDiskParams(self.instance, [root], self.cfg)
+ for node, disk in anno_disk.ComputeNodeTree(self.instance.primary_node):
+ self.cfg.SetDiskID(disk, node)
+ msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
+ if msg:
+ self.LogWarning("Could not remove disk/%d on node '%s': %s,"
+ " continuing anyway", idx, node, msg)
+
+ # if this is a DRBD disk, return its port to the pool
+ if root.dev_type in constants.LDS_DRBD:
+ self.cfg.AddTcpUdpPort(root.logical_id[2])
+
+ def _CreateNewNic(self, idx, params, private):
+ """Creates data structure for a new network interface.
+
+ """
+ mac = params[constants.INIC_MAC]
+ ip = params.get(constants.INIC_IP, None)
+ network = params.get(constants.INIC_NETWORK, None)
+    # TODO: can a NIC have no nicparams (i.e. an unset private.filled)?
+ nicparams = private.filled
+
+ nic = objects.NIC(mac=mac, ip=ip, network=network, nicparams=nicparams)
+
+ #TODO: log warning in case hotplug is not possible
+ # handle errors
+ # return changes
+ if self.op.hotplug:
+ nic_idx, pci = _GetPCIInfo(self, 'nics')
+ if pci is not None:
+ nic.idx = nic_idx
+ nic.pci = pci
+ result = self.rpc.call_hot_add_nic(self.instance.primary_node,
+ self.instance, nic, idx)
+ desc = [
+ ("nic.%d" % idx,
+ "add:mac=%s,ip=%s,mode=%s,link=%s,network=%s" %
+ (mac, ip, private.filled[constants.NIC_MODE],
+ private.filled[constants.NIC_LINK],
+ network)),
+ ]
+ return (nic, desc)
+
+ def _ApplyNicMods(self, idx, nic, params, private):
+ """Modifies a network interface.
+
+ """
+ changes = []
+
+ for key in [constants.INIC_MAC, constants.INIC_IP, constants.INIC_NETWORK]:
+ if key in params:
+ changes.append(("nic.%s/%d" % (key, idx), params[key]))
+ setattr(nic, key, params[key])
+
+ if private.filled:
+ nic.nicparams = private.filled
+
+ for (key, val) in nic.nicparams.items():
+ changes.append(("nic.%s/%d" % (key, idx), val))
+
+ #TODO: log warning in case hotplug is not possible
+ # handle errors
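+    # A hotplugged NIC is re-plugged (hot-removed and hot-added on the
+    # primary node) so that the modified parameters take effect immediately.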
+ if self.op.hotplug and nic.pci:
+ self.LogInfo("Trying to hotplug device.")
+ self.rpc.call_hot_del_nic(self.instance.primary_node,
+ self.instance, nic, idx)
+ result = self.rpc.call_hot_add_nic(self.instance.primary_node,
+ self.instance, nic, idx)
+ return changes
+
+ def _RemoveNic(self, idx, nic, private):
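+    """Removes a network interface.
+
+    """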
+ if nic.pci and not self.op.hotplug:
+ raise errors.OpPrereqError("Cannot remove a nic that has been hotplugged"
+ " without removing it with hotplug",
+ errors.ECODE_INVAL)
+ #TODO: log warning in case hotplug is not possible
+ # handle errors
+ if self.op.hotplug and nic.pci:
+ self.LogInfo("Trying to hotplug device.")
+ self.rpc.call_hot_del_nic(self.instance.primary_node,
+ self.instance, nic, idx)
+ self.cfg.UpdatePCIInfo(self.instance.name, nic.pci)
+
def Exec(self, feedback_fn):
"""Modifies an instance.
"""
# Process here the warnings from CheckPrereq, as we don't have a
# feedback_fn there.
+ # TODO: Replace with self.LogWarning
for warn in self.warn:
feedback_fn("WARNING: %s" % warn)
+ assert ((self.op.disk_template is None) ^
+ bool(self.owned_locks(locking.LEVEL_NODE_RES))), \
+ "Not owning any node resource locks"
+
result = []
instance = self.instance
- # disk changes
- for disk_op, disk_dict in self.op.disks:
- if disk_op == constants.DDM_REMOVE:
- # remove the last disk
- device = instance.disks.pop()
- device_idx = len(instance.disks)
- for node, disk in device.ComputeNodeTree(instance.primary_node):
- self.cfg.SetDiskID(disk, node)
- msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
- if msg:
- self.LogWarning("Could not remove disk/%d on node %s: %s,"
- " continuing anyway", device_idx, node, msg)
- result.append(("disk/%d" % device_idx, "remove"))
-
- # if this is a DRBD disk, return its port to the pool
- if device.dev_type in constants.LDS_DRBD:
- tcp_port = device.logical_id[2]
- self.cfg.AddTcpUdpPort(tcp_port)
- elif disk_op == constants.DDM_ADD:
- # add a new disk
- if instance.disk_template in (constants.DT_FILE,
- constants.DT_SHARED_FILE):
- file_driver, file_path = instance.disks[0].logical_id
- file_path = os.path.dirname(file_path)
- else:
- file_driver = file_path = None
- disk_idx_base = len(instance.disks)
- new_disk = _GenerateDiskTemplate(self,
- instance.disk_template,
- instance.name, instance.primary_node,
- instance.secondary_nodes,
- [disk_dict],
- file_path,
- file_driver,
- disk_idx_base, feedback_fn)[0]
- instance.disks.append(new_disk)
- info = _GetInstanceInfoText(instance)
-
- logging.info("Creating volume %s for instance %s",
- new_disk.iv_name, instance.name)
- # Note: this needs to be kept in sync with _CreateDisks
- #HARDCODE
- for node in instance.all_nodes:
- f_create = node == instance.primary_node
- try:
- _CreateBlockDev(self, node, instance, new_disk,
- f_create, info, f_create)
- except errors.OpExecError, err:
- self.LogWarning("Failed to create volume %s (%s) on"
- " node %s: %s",
- new_disk.iv_name, new_disk, node, err)
- result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
- (new_disk.size, new_disk.mode)))
- else:
- # change a given disk
- instance.disks[disk_op].mode = disk_dict[constants.IDISK_MODE]
- result.append(("disk.mode/%d" % disk_op,
- disk_dict[constants.IDISK_MODE]))
+
+ # runtime memory
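+    # Ballooning resizes the memory of the running instance on its primary
+    # node via RPC; the configured beparams are not modified here.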
+ if self.op.runtime_mem:
+ rpcres = self.rpc.call_instance_balloon_memory(instance.primary_node,
+ instance,
+ self.op.runtime_mem)
+ rpcres.Raise("Cannot modify instance runtime memory")
+ result.append(("runtime_memory", self.op.runtime_mem))
+
+ # Apply disk changes
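+    # Every add/modify/remove item collected in self.diskmod is dispatched
+    # to the corresponding helper method above; resulting changes are
+    # appended to "result".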
+ ApplyContainerMods("disk", instance.disks, result, self.diskmod,
+ self._CreateNewDisk, self._ModifyDisk, self._RemoveDisk)
+ _UpdateIvNames(0, instance.disks)
if self.op.disk_template:
+ if __debug__:
+ check_nodes = set(instance.all_nodes)
+ if self.op.remote_node:
+ check_nodes.add(self.op.remote_node)
+ for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]:
+ owned = self.owned_locks(level)
+ assert not (check_nodes - owned), \
+ ("Not owning the correct locks, owning %r, expected at least %r" %
+ (owned, check_nodes))
+
r_shut = _ShutdownInstanceDisks(self, instance)
if not r_shut:
raise errors.OpExecError("Cannot shutdown instance disks, unable to"
raise
result.append(("disk_template", self.op.disk_template))
- # NIC changes
- for nic_op, nic_dict in self.op.nics:
- if nic_op == constants.DDM_REMOVE:
- # remove the last nic
- del instance.nics[-1]
- result.append(("nic.%d" % len(instance.nics), "remove"))
- elif nic_op == constants.DDM_ADD:
- # mac and bridge should be set, by now
- mac = nic_dict[constants.INIC_MAC]
- ip = nic_dict.get(constants.INIC_IP, None)
- nicparams = self.nic_pinst[constants.DDM_ADD]
- new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
- instance.nics.append(new_nic)
- result.append(("nic.%d" % (len(instance.nics) - 1),
- "add:mac=%s,ip=%s,mode=%s,link=%s" %
- (new_nic.mac, new_nic.ip,
- self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
- self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
- )))
- else:
- for key in (constants.INIC_MAC, constants.INIC_IP):
- if key in nic_dict:
- setattr(instance.nics[nic_op], key, nic_dict[key])
- if nic_op in self.nic_pinst:
- instance.nics[nic_op].nicparams = self.nic_pinst[nic_op]
- for key, val in nic_dict.iteritems():
- result.append(("nic.%s/%d" % (key, nic_op), val))
+ assert instance.disk_template == self.op.disk_template, \
+ ("Expected disk template '%s', found '%s'" %
+ (self.op.disk_template, instance.disk_template))
+
+ # Release node and resource locks if there are any (they might already have
+ # been released during disk conversion)
+ _ReleaseLocks(self, locking.LEVEL_NODE)
+ _ReleaseLocks(self, locking.LEVEL_NODE_RES)
+
+ # Apply NIC changes
+ if self._new_nics is not None:
+ instance.nics = self._new_nics
+ result.extend(self._nic_chgdesc)
# hvparams changes
if self.op.hvparams:
for key, val in self.op.osparams.iteritems():
result.append(("os/%s" % key, val))
- self.cfg.Update(instance, feedback_fn)
+ if self.op.offline is None:
+ # Ignore
+ pass
+ elif self.op.offline:
+ # Mark instance as offline
+ self.cfg.MarkInstanceOffline(instance.name)
+ result.append(("admin_state", constants.ADMINST_OFFLINE))
+ else:
+ # Mark instance as online, but stopped
+ self.cfg.MarkInstanceDown(instance.name)
+ result.append(("admin_state", constants.ADMINST_DOWN))
+
+ self.cfg.Update(instance, feedback_fn, self.proc.GetECId())
+
+ assert not (self.owned_locks(locking.LEVEL_NODE_RES) or
+ self.owned_locks(locking.LEVEL_NODE)), \
+ "All node locks should have been released by now"
return result
"""
REQ_BGL = False
+ def CheckArguments(self):
+ self.expq = _ExportQuery(qlang.MakeSimpleFilter("node", self.op.nodes),
+ ["node", "export"], self.op.use_locking)
+
def ExpandNames(self):
- self.needed_locks = {}
- self.share_locks[locking.LEVEL_NODE] = 1
- if not self.op.nodes:
- self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
- else:
- self.needed_locks[locking.LEVEL_NODE] = \
- _GetWantedNodes(self, self.op.nodes)
+ self.expq.ExpandNames(self)
+
+ def DeclareLocks(self, level):
+ self.expq.DeclareLocks(self, level)
def Exec(self, feedback_fn):
- """Compute the list of all the exported system images.
+ result = {}
- @rtype: dict
- @return: a dictionary with the structure node->(export-list)
- where export-list is a list of the instances exported on
- that node.
+ for (node, expname) in self.expq.OldStyleQuery(self):
+ if expname is None:
+ result[node] = False
+ else:
+ result.setdefault(node, []).append(expname)
+
+ return result
+
+
+class _ExportQuery(_QueryBase):
+ FIELDS = query.EXPORT_FIELDS
+
+ #: The node name is not a unique key for this query
+ SORT_FIELD = "node"
+
+ def ExpandNames(self, lu):
+ lu.needed_locks = {}
+
+ # The following variables interact with _QueryBase._GetNames
+ if self.names:
+ self.wanted = _GetWantedNodes(lu, self.names)
+ else:
+ self.wanted = locking.ALL_SET
+
+ self.do_locking = self.use_locking
+
+ if self.do_locking:
+ lu.share_locks = _ShareAll()
+ lu.needed_locks = {
+ locking.LEVEL_NODE: self.wanted,
+ }
+
+ def DeclareLocks(self, lu, level):
+ pass
+
+ def _GetQueryData(self, lu):
+ """Computes the list of nodes and their attributes.
"""
- self.nodes = self.owned_locks(locking.LEVEL_NODE)
- rpcresult = self.rpc.call_export_list(self.nodes)
- result = {}
- for node in rpcresult:
- if rpcresult[node].fail_msg:
- result[node] = False
+ # Locking is not used
+ # TODO
+ assert not (compat.any(lu.glm.is_owned(level)
+ for level in locking.LEVELS
+ if level != locking.LEVEL_CLUSTER) or
+ self.do_locking or self.use_locking)
+
+ nodes = self._GetNames(lu, lu.cfg.GetNodeList(), locking.LEVEL_NODE)
+
+ result = []
+
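+    # One (node, export) tuple per export; a node that failed to answer is
+    # recorded as (node, None), which the old-style query reports as False.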
+ for (node, nres) in lu.rpc.call_export_list(nodes).items():
+ if nres.fail_msg:
+ result.append((node, None))
else:
- result[node] = rpcresult[node].payload
+ result.extend((node, expname) for expname in nres.payload)
return result
"Cannot retrieve locked instance %s" % self.op.instance_name
_CheckNodeOnline(self, self.instance.primary_node)
- if (self.op.remove_instance and self.instance.admin_up and
+ if (self.op.remove_instance and
+ self.instance.admin_state == constants.ADMINST_UP and
not self.op.shutdown):
raise errors.OpPrereqError("Can not remove instance without shutting it"
" down before")
for disk in instance.disks:
self.cfg.SetDiskID(disk, src_node)
- activate_disks = (not instance.admin_up)
+ activate_disks = (instance.admin_state != constants.ADMINST_UP)
if activate_disks:
       # Activate the instance disks if we're exporting a stopped instance
helper.CreateSnapshots()
try:
- if (self.op.shutdown and instance.admin_up and
+ if (self.op.shutdown and
+ instance.admin_state == constants.ADMINST_UP and
not self.op.remove_instance):
assert not activate_disks
feedback_fn("Starting instance %s" % instance.name)
- result = self.rpc.call_instance_start(src_node, instance,
- None, None, False)
+ result = self.rpc.call_instance_start(src_node,
+ (instance, None, None), False)
msg = result.fail_msg
if msg:
feedback_fn("Failed to start instance: %s" % msg)
if self.op.ndparams:
utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
+ if self.op.hv_state:
+ self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state, None)
+ else:
+ self.new_hv_state = None
+
+ if self.op.disk_state:
+ self.new_disk_state = _MergeAndVerifyDiskState(self.op.disk_state, None)
+ else:
+ self.new_disk_state = None
+
+ if self.op.diskparams:
+ for templ in constants.DISK_TEMPLATES:
+ if templ in self.op.diskparams:
+ utils.ForceDictType(self.op.diskparams[templ],
+ constants.DISK_DT_TYPES)
+ self.new_diskparams = self.op.diskparams
+ try:
+ utils.VerifyDictOptions(self.new_diskparams, constants.DISK_DT_DEFAULTS)
+ except errors.OpPrereqError, err:
+ raise errors.OpPrereqError("While verify diskparams options: %s" % err,
+ errors.ECODE_INVAL)
+ else:
+ self.new_diskparams = {}
+
+ if self.op.ipolicy:
+ cluster = self.cfg.GetClusterInfo()
+ full_ipolicy = cluster.SimpleFillIPolicy(self.op.ipolicy)
+ try:
+ objects.InstancePolicy.CheckParameterSyntax(full_ipolicy, False)
+ except errors.ConfigurationError, err:
+ raise errors.OpPrereqError("Invalid instance policy: %s" % err,
+ errors.ECODE_INVAL)
+
def BuildHooksEnv(self):
"""Build hooks env.
group_obj = objects.NodeGroup(name=self.op.group_name, members=[],
uuid=self.group_uuid,
alloc_policy=self.op.alloc_policy,
- ndparams=self.op.ndparams)
+ ndparams=self.op.ndparams,
+ diskparams=self.new_diskparams,
+ ipolicy=self.op.ipolicy,
+ hv_state_static=self.new_hv_state,
+ disk_state_static=self.new_disk_state)
self.cfg.AddNodeGroup(group_obj, self.proc.GetECId(), check_uuid=False)
del self.remove_locks[locking.LEVEL_NODEGROUP]
lu.needed_locks = {}
self._all_groups = lu.cfg.GetAllNodeGroupsInfo()
+ self._cluster = lu.cfg.GetClusterInfo()
name_to_uuid = dict((g.name, g.uuid) for g in self._all_groups.values())
if not self.names:
# Do not pass on node information if it was not requested.
group_to_nodes = None
- return query.GroupQueryData([self._all_groups[uuid]
+ return query.GroupQueryData(self._cluster,
+ [self._all_groups[uuid]
for uuid in self.wanted],
- group_to_nodes, group_to_instances)
+ group_to_nodes, group_to_instances,
+ query.GQ_DISKPARAMS in self.requested_data)
class LUGroupQuery(NoHooksLU):
def CheckArguments(self):
all_changes = [
self.op.ndparams,
+ self.op.diskparams,
self.op.alloc_policy,
+ self.op.hv_state,
+ self.op.disk_state,
+ self.op.ipolicy,
]
if all_changes.count(None) == len(all_changes):
self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
self.needed_locks = {
+ locking.LEVEL_INSTANCE: [],
locking.LEVEL_NODEGROUP: [self.group_uuid],
}
+ self.share_locks[locking.LEVEL_INSTANCE] = 1
+
+ def DeclareLocks(self, level):
+ if level == locking.LEVEL_INSTANCE:
+ assert not self.needed_locks[locking.LEVEL_INSTANCE]
+
+ # Lock instances optimistically, needs verification once group lock has
+ # been acquired
+ self.needed_locks[locking.LEVEL_INSTANCE] = \
+ self.cfg.GetNodeGroupInstances(self.group_uuid)
+
+ @staticmethod
+ def _UpdateAndVerifyDiskParams(old, new):
+ """Updates and verifies disk parameters.
+
+ """
+ new_params = _GetUpdatedParams(old, new)
+ utils.ForceDictType(new_params, constants.DISK_DT_TYPES)
+ return new_params
+
def CheckPrereq(self):
"""Check prerequisites.
"""
+ owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
+
+ # Check if locked instances are still correct
+ _CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instances)
+
self.group = self.cfg.GetNodeGroup(self.group_uuid)
+ cluster = self.cfg.GetClusterInfo()
if self.group is None:
raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
if self.op.ndparams:
new_ndparams = _GetUpdatedParams(self.group.ndparams, self.op.ndparams)
- utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
+ utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
self.new_ndparams = new_ndparams
+ if self.op.diskparams:
+ diskparams = self.group.diskparams
+ uavdp = self._UpdateAndVerifyDiskParams
+ # For each disktemplate subdict update and verify the values
+ new_diskparams = dict((dt,
+ uavdp(diskparams.get(dt, {}),
+ self.op.diskparams[dt]))
+ for dt in constants.DISK_TEMPLATES
+ if dt in self.op.diskparams)
+      # Now that all diskparams subdicts have been updated, merge them back
+      # into the base dict
+ self.new_diskparams = objects.FillDict(diskparams, new_diskparams)
+ try:
+ utils.VerifyDictOptions(self.new_diskparams, constants.DISK_DT_DEFAULTS)
+ except errors.OpPrereqError, err:
+ raise errors.OpPrereqError("While verify diskparams options: %s" % err,
+ errors.ECODE_INVAL)
+
+ if self.op.hv_state:
+ self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state,
+ self.group.hv_state_static)
+
+ if self.op.disk_state:
+ self.new_disk_state = \
+ _MergeAndVerifyDiskState(self.op.disk_state,
+ self.group.disk_state_static)
+
+ if self.op.ipolicy:
+ self.new_ipolicy = _GetUpdatedIPolicy(self.group.ipolicy,
+ self.op.ipolicy,
+ group_policy=True)
+
+ new_ipolicy = cluster.SimpleFillIPolicy(self.new_ipolicy)
+ inst_filter = lambda inst: inst.name in owned_instances
+ instances = self.cfg.GetInstancesInfoByFilter(inst_filter).values()
+ violations = \
+ _ComputeNewInstanceViolations(_CalculateGroupIPolicy(cluster,
+ self.group),
+ new_ipolicy, instances)
+
+ if violations:
+ self.LogWarning("After the ipolicy change the following instances"
+ " violate them: %s",
+ utils.CommaJoin(violations))
+
def BuildHooksEnv(self):
"""Build hooks env.
self.group.ndparams = self.new_ndparams
result.append(("ndparams", str(self.group.ndparams)))
+ if self.op.diskparams:
+ self.group.diskparams = self.new_diskparams
+ result.append(("diskparams", str(self.group.diskparams)))
+
if self.op.alloc_policy:
self.group.alloc_policy = self.op.alloc_policy
+ if self.op.hv_state:
+ self.group.hv_state_static = self.new_hv_state
+
+ if self.op.disk_state:
+ self.group.disk_state_static = self.new_disk_state
+
+ if self.op.ipolicy:
+ self.group.ipolicy = self.new_ipolicy
+
self.cfg.Update(self.group, feedback_fn)
return result
self.instances = dict(self.cfg.GetMultiInstanceInfo(owned_instances))
# Check if node groups for locked instances are still correct
- for instance_name in owned_instances:
- inst = self.instances[instance_name]
- assert owned_nodes.issuperset(inst.all_nodes), \
- "Instance %s's nodes changed while we kept the lock" % instance_name
-
- inst_groups = _CheckInstanceNodeGroups(self.cfg, instance_name,
- owned_groups)
-
- assert self.group_uuid in inst_groups, \
- "Instance %s has no node in group %s" % (instance_name, self.group_uuid)
+ _CheckInstancesNodeGroups(self.cfg, self.instances,
+ owned_groups, owned_nodes, self.group_uuid)
if self.req_target_uuids:
# User requested specific target groups
def ExpandNames(self):
self.group_uuid = None
self.needed_locks = {}
+
if self.op.kind == constants.TAG_NODE:
self.op.name = _ExpandNodeName(self.cfg, self.op.name)
- self.needed_locks[locking.LEVEL_NODE] = self.op.name
+ lock_level = locking.LEVEL_NODE
+ lock_name = self.op.name
elif self.op.kind == constants.TAG_INSTANCE:
self.op.name = _ExpandInstanceName(self.cfg, self.op.name)
- self.needed_locks[locking.LEVEL_INSTANCE] = self.op.name
+ lock_level = locking.LEVEL_INSTANCE
+ lock_name = self.op.name
elif self.op.kind == constants.TAG_NODEGROUP:
self.group_uuid = self.cfg.LookupNodeGroup(self.op.name)
+ lock_level = locking.LEVEL_NODEGROUP
+ lock_name = self.group_uuid
+ elif self.op.kind == constants.TAG_NETWORK:
+ self.network_uuid = self.cfg.LookupNetwork(self.op.name)
+ lock_level = locking.LEVEL_NETWORK
+ lock_name = self.network_uuid
+ else:
+ lock_level = None
+ lock_name = None
+
+ if lock_level and getattr(self.op, "use_locking", True):
+ self.needed_locks[lock_level] = lock_name
# FIXME: Acquire BGL for cluster tag operations (as of this writing it's
# not possible to acquire the BGL based on opcode parameters)
self.target = self.cfg.GetInstanceInfo(self.op.name)
elif self.op.kind == constants.TAG_NODEGROUP:
self.target = self.cfg.GetNodeGroup(self.group_uuid)
+ elif self.op.kind == constants.TAG_NETWORK:
+ self.target = self.cfg.GetNetwork(self.network_uuid)
else:
raise errors.OpPrereqError("Wrong tag type requested (%s)" %
str(self.op.kind), errors.ECODE_INVAL)
# pylint: disable=R0902
# lots of instance attributes
- def __init__(self, cfg, rpc, mode, **kwargs):
+ def __init__(self, cfg, rpc_runner, mode, **kwargs):
self.cfg = cfg
- self.rpc = rpc
+ self.rpc = rpc_runner
# init buffer variables
self.in_text = self.out_text = self.in_data = self.out_data = None
# init all input fields so that pylint is happy
self.mode = mode
- self.memory = self.disks = self.disk_template = None
+ self.memory = self.disks = self.disk_template = self.spindle_use = None
self.os = self.tags = self.nics = self.vcpus = None
self.hypervisor = None
self.relocate_from = None
"cluster_name": cfg.GetClusterName(),
"cluster_tags": list(cluster_info.GetTags()),
"enabled_hypervisors": list(cluster_info.enabled_hypervisors),
- # we don't have job IDs
+ "ipolicy": cluster_info.ipolicy,
}
ninfo = cfg.GetAllNodesInfo()
iinfo = cfg.GetAllInstancesInfo().values()
elif self.mode == constants.IALLOCATOR_MODE_RELOC:
hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
else:
- hypervisor_name = cluster_info.enabled_hypervisors[0]
+ hypervisor_name = cluster_info.primary_hypervisor
- node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
- hypervisor_name)
+ node_data = self.rpc.call_node_info(node_list, [cfg.GetVGName()],
+ [hypervisor_name])
node_iinfo = \
self.rpc.call_all_instances_info(node_list,
cluster_info.enabled_hypervisors)
data["nodegroups"] = self._ComputeNodeGroupData(cfg)
- config_ndata = self._ComputeBasicNodeData(ninfo)
+ config_ndata = self._ComputeBasicNodeData(cfg, ninfo)
data["nodes"] = self._ComputeDynamicNodeData(ninfo, node_data, node_iinfo,
i_list, config_ndata)
assert len(data["nodes"]) == len(ninfo), \
"""Compute node groups data.
"""
+ cluster = cfg.GetClusterInfo()
ng = dict((guuid, {
"name": gdata.name,
"alloc_policy": gdata.alloc_policy,
+ "ipolicy": _CalculateGroupIPolicy(cluster, gdata),
})
for guuid, gdata in cfg.GetAllNodeGroupsInfo().items())
return ng
@staticmethod
- def _ComputeBasicNodeData(node_cfg):
+ def _ComputeBasicNodeData(cfg, node_cfg):
"""Compute global node data.
@rtype: dict
"group": ninfo.group,
"master_capable": ninfo.master_capable,
"vm_capable": ninfo.vm_capable,
+ "ndparams": cfg.GetNdParams(ninfo),
})
for ninfo in node_cfg.values())
@param node_results: the basic node structures as filled from the config
"""
+ #TODO(dynmem): compute the right data on MAX and MIN memory
# make a copy of the current dict
node_results = dict(node_results)
for nname, nresult in node_data.items():
nresult.Raise("Can't get data for node %s" % nname)
node_iinfo[nname].Raise("Can't get node instance info from node %s" %
nname)
- remote_info = nresult.payload
+ remote_info = _MakeLegacyNodeInfo(nresult.payload)
for attr in ["memory_total", "memory_free", "memory_dom0",
"vg_size", "vg_free", "cpu_total"]:
i_p_mem = i_p_up_mem = 0
for iinfo, beinfo in i_list:
if iinfo.primary_node == nname:
- i_p_mem += beinfo[constants.BE_MEMORY]
+ i_p_mem += beinfo[constants.BE_MAXMEM]
if iinfo.name not in node_iinfo[nname].payload:
i_used_mem = 0
else:
i_used_mem = int(node_iinfo[nname].payload[iinfo.name]["memory"])
- i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
+ i_mem_diff = beinfo[constants.BE_MAXMEM] - i_used_mem
remote_info["memory_free"] -= max(0, i_mem_diff)
- if iinfo.admin_up:
- i_p_up_mem += beinfo[constants.BE_MEMORY]
+ if iinfo.admin_state == constants.ADMINST_UP:
+ i_p_up_mem += beinfo[constants.BE_MAXMEM]
# compute memory used by instances
pnr_dyn = {
"ip": nic.ip,
"mode": filled_params[constants.NIC_MODE],
"link": filled_params[constants.NIC_LINK],
+ "network": nic.network,
}
if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
nic_dict["bridge"] = filled_params[constants.NIC_LINK]
nic_data.append(nic_dict)
pir = {
"tags": list(iinfo.GetTags()),
- "admin_up": iinfo.admin_up,
+ "admin_state": iinfo.admin_state,
"vcpus": beinfo[constants.BE_VCPUS],
- "memory": beinfo[constants.BE_MEMORY],
+ "memory": beinfo[constants.BE_MAXMEM],
+ "spindle_use": beinfo[constants.BE_SPINDLE_USE],
"os": iinfo.os,
"nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
"nics": nic_data,
"os": self.os,
"vcpus": self.vcpus,
"memory": self.memory,
+ "spindle_use": self.spindle_use,
"disks": self.disks,
"disk_space_total": disk_space,
"nics": self.nics,
[
("name", ht.TString),
("memory", ht.TInt),
+ ("spindle_use", ht.TInt),
("disks", ht.TListOf(ht.TDict)),
("disk_template", ht.TString),
("os", ht.TString),
result = ial.out_text
return result
+# Network LUs
+class LUNetworkAdd(LogicalUnit):
+ """Logical unit for creating networks.
-#: Query type implementations
-_QUERY_IMPL = {
- constants.QR_INSTANCE: _InstanceQuery,
+ """
+ HPATH = "network-add"
+ HTYPE = constants.HTYPE_NETWORK
+ REQ_BGL = False
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
+ mn = self.cfg.GetMasterNode()
+ return ([mn], [mn])
+
+ def ExpandNames(self):
+ self.network_uuid = self.cfg.GenerateUniqueID(self.proc.GetECId())
+ self.needed_locks = {}
+ self.add_locks[locking.LEVEL_NETWORK] = self.network_uuid
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+    This checks that the given network name is not already defined as a
+    network.
+
+ """
+ if self.op.network is None:
+ raise errors.OpPrereqError("Network must be given",
+ errors.ECODE_INVAL)
+
+ uuid = self.cfg.LookupNetwork(self.op.network_name)
+
+ if uuid:
+ raise errors.OpPrereqError("Network '%s' already defined" %
+ self.op.network, errors.ECODE_EXISTS)
+
+ if self.op.mac_prefix:
+      utils.NormalizeAndValidateMac(self.op.mac_prefix + ":00:00:00")
+
+ # Check tag validity
+ for tag in self.op.tags:
+ objects.TaggableObject.ValidateTag(tag)
+
+ def BuildHooksEnv(self):
+ """Build hooks env.
+
+ """
+ args = {
+ "name": self.op.network_name,
+ "network": self.op.network,
+ "gateway": self.op.gateway,
+ "network6": self.op.network6,
+ "gateway6": self.op.gateway6,
+ "mac_prefix": self.op.mac_prefix,
+ "network_type": self.op.network_type,
+ "tags": self.op.tags,
+ }
+ return _BuildNetworkHookEnv(**args)
+
+ def Exec(self, feedback_fn):
+ """Add the ip pool to the cluster.
+
+ """
+ nobj = objects.Network(name=self.op.network_name,
+ network=self.op.network,
+ gateway=self.op.gateway,
+ network6=self.op.network6,
+ gateway6=self.op.gateway6,
+ mac_prefix=self.op.mac_prefix,
+ network_type=self.op.network_type,
+ uuid=self.network_uuid,
+ family=4)
+ # Initialize the associated address pool
+ try:
+ pool = network.AddressPool.InitializeNetwork(nobj)
+ except errors.AddressPoolError, e:
+ raise errors.OpExecError("Cannot create IP pool for this network. %s" % e)
+
+ # Check if we need to reserve the nodes and the cluster master IP
+ # These may not be allocated to any instances in routed mode, as
+ # they wouldn't function anyway.
+ for node in self.cfg.GetAllNodesInfo().values():
+ for ip in [node.primary_ip, node.secondary_ip]:
+ try:
+ pool.Reserve(ip)
+ self.LogInfo("Reserved node %s's IP (%s)", node.name, ip)
+
+ except errors.AddressPoolError:
+ pass
+
+ master_ip = self.cfg.GetClusterInfo().master_ip
+ try:
+ pool.Reserve(master_ip)
+ self.LogInfo("Reserved cluster master IP (%s)", master_ip)
+ except errors.AddressPoolError:
+ pass
+
+ if self.op.add_reserved_ips:
+ for ip in self.op.add_reserved_ips:
+ try:
+ pool.Reserve(ip, external=True)
+ except errors.AddressPoolError, e:
+ raise errors.OpExecError("Cannot reserve IP %s. %s " % (ip, e))
+
+ if self.op.tags:
+ for tag in self.op.tags:
+ nobj.AddTag(tag)
+
+ self.cfg.AddNetwork(nobj, self.proc.GetECId(), check_uuid=False)
+ del self.remove_locks[locking.LEVEL_NETWORK]
+
+
+class LUNetworkRemove(LogicalUnit):
+ HPATH = "network-remove"
+ HTYPE = constants.HTYPE_NETWORK
+ REQ_BGL = False
+
+ def ExpandNames(self):
+ self.network_uuid = self.cfg.LookupNetwork(self.op.network_name)
+
+ self.needed_locks = {
+ locking.LEVEL_NETWORK: [self.network_uuid],
+ }
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+    This checks that the given network exists and is not connected to any
+    node group.
+
+ """
+ if not self.network_uuid:
+ raise errors.OpPrereqError("Network %s not found" % self.op.network_name,
+ errors.ECODE_INVAL)
+
+    # Verify that the network is not connected to any node group.
+    node_groups = [group.name
+                   for group in self.cfg.GetAllNodeGroupsInfo().values()
+                   for net_uuid in group.networks.keys()
+                   if net_uuid == self.network_uuid]
+
+ if node_groups:
+ self.LogWarning("Nework '%s' is connected to the following"
+ " node groups: %s" % (self.op.network_name,
+ utils.CommaJoin(utils.NiceSort(node_groups))))
+ raise errors.OpPrereqError("Network still connected",
+ errors.ECODE_STATE)
+
+ def BuildHooksEnv(self):
+ """Build hooks env.
+
+ """
+ return {
+ "NETWORK_NAME": self.op.network_name,
+ }
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
+ mn = self.cfg.GetMasterNode()
+ return ([mn], [mn])
+
+ def Exec(self, feedback_fn):
+ """Remove the network.
+
+ """
+ try:
+ self.cfg.RemoveNetwork(self.network_uuid)
+ except errors.ConfigurationError:
+ raise errors.OpExecError("Network '%s' with UUID %s disappeared" %
+ (self.op.network_name, self.network_uuid))
+
+
+class LUNetworkSetParams(LogicalUnit):
+ """Modifies the parameters of a network.
+
+ """
+ HPATH = "network-modify"
+ HTYPE = constants.HTYPE_NETWORK
+ REQ_BGL = False
+
+ def CheckArguments(self):
+ if (self.op.gateway and
+ (self.op.add_reserved_ips or self.op.remove_reserved_ips)):
+ raise errors.OpPrereqError("Cannot modify gateway and reserved ips"
+ " at once", errors.ECODE_INVAL)
+
+ def ExpandNames(self):
+ self.network_uuid = self.cfg.LookupNetwork(self.op.network_name)
+ self.network = self.cfg.GetNetwork(self.network_uuid)
+ self.needed_locks = {
+ locking.LEVEL_NETWORK: [self.network_uuid],
+ }
+
+ if self.network is None:
+ raise errors.OpPrereqError("Could not retrieve network '%s' (UUID: %s)" %
+ (self.op.network_name, self.network_uuid),
+ errors.ECODE_INVAL)
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ """
+ self.gateway = self.network.gateway
+ self.network_type = self.network.network_type
+ self.mac_prefix = self.network.mac_prefix
+ self.network6 = self.network.network6
+ self.gateway6 = self.network.gateway6
+ self.tags = self.network.tags
+
+ self.pool = network.AddressPool(self.network)
+
+ if self.op.gateway:
+ if self.op.gateway == constants.VALUE_NONE:
+ self.gateway = None
+ else:
+ self.gateway = self.op.gateway
+ if self.pool.IsReserved(self.gateway):
+ raise errors.OpPrereqError("%s is already reserved" %
+ self.gateway, errors.ECODE_INVAL)
+
+ if self.op.network_type:
+ if self.op.network_type == constants.VALUE_NONE:
+ self.network_type = None
+ else:
+ self.network_type = self.op.network_type
+
+ if self.op.mac_prefix:
+ if self.op.mac_prefix == constants.VALUE_NONE:
+ self.mac_prefix = None
+ else:
+        utils.NormalizeAndValidateMac(self.op.mac_prefix + ":00:00:00")
+ self.mac_prefix = self.op.mac_prefix
+
+ if self.op.gateway6:
+ if self.op.gateway6 == constants.VALUE_NONE:
+ self.gateway6 = None
+ else:
+ self.gateway6 = self.op.gateway6
+
+ if self.op.network6:
+ if self.op.network6 == constants.VALUE_NONE:
+ self.network6 = None
+ else:
+ self.network6 = self.op.network6
+
+ def BuildHooksEnv(self):
+ """Build hooks env.
+
+ """
+ args = {
+ "name": self.op.network_name,
+ "network": self.network.network,
+ "gateway": self.gateway,
+ "network6": self.network6,
+ "gateway6": self.gateway6,
+ "mac_prefix": self.mac_prefix,
+ "network_type": self.network_type,
+ "tags": self.tags,
+ }
+ return _BuildNetworkHookEnv(**args)
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
+ mn = self.cfg.GetMasterNode()
+ return ([mn], [mn])
+
+ def Exec(self, feedback_fn):
+ """Modifies the network.
+
+ """
+ #TODO: reserve/release via temporary reservation manager
+ # extend cfg.ReserveIp/ReleaseIp with the external flag
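+    # Changing the gateway reserves the new address as an external
+    # reservation and releases the old one, keeping the pool consistent.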
+ if self.op.gateway:
+ if self.gateway == self.network.gateway:
+ self.LogWarning("Gateway is already %s" % self.gateway)
+ else:
+ if self.gateway:
+ self.pool.Reserve(self.gateway, external=True)
+ if self.network.gateway:
+ self.pool.Release(self.network.gateway, external=True)
+ self.network.gateway = self.gateway
+
+ if self.op.add_reserved_ips:
+ for ip in self.op.add_reserved_ips:
+ try:
+ if self.pool.IsReserved(ip):
+ self.LogWarning("IP %s is already reserved" % ip)
+ else:
+ self.pool.Reserve(ip, external=True)
+ except errors.AddressPoolError, e:
+ self.LogWarning("Cannot reserve ip %s. %s" % (ip, e))
+
+ if self.op.remove_reserved_ips:
+ for ip in self.op.remove_reserved_ips:
+ if ip == self.network.gateway:
+ self.LogWarning("Cannot unreserve Gateway's IP")
+ continue
+ try:
+ if not self.pool.IsReserved(ip):
+ self.LogWarning("IP %s is already unreserved" % ip)
+ else:
+ self.pool.Release(ip, external=True)
+ except errors.AddressPoolError, e:
+ self.LogWarning("Cannot release ip %s. %s" % (ip, e))
+
+ if self.op.mac_prefix:
+ self.network.mac_prefix = self.mac_prefix
+
+ if self.op.network6:
+ self.network.network6 = self.network6
+
+ if self.op.gateway6:
+ self.network.gateway6 = self.gateway6
+
+ if self.op.network_type:
+ self.network.network_type = self.network_type
+
+ self.pool.Validate()
+
+ self.cfg.Update(self.network, feedback_fn)
+
+
+class _NetworkQuery(_QueryBase):
+ FIELDS = query.NETWORK_FIELDS
+
+ def ExpandNames(self, lu):
+ lu.needed_locks = {}
+
+ self._all_networks = lu.cfg.GetAllNetworksInfo()
+ name_to_uuid = dict((n.name, n.uuid) for n in self._all_networks.values())
+
+ if not self.names:
+ self.wanted = [name_to_uuid[name]
+ for name in utils.NiceSort(name_to_uuid.keys())]
+ else:
+ # Accept names to be either names or UUIDs.
+ missing = []
+ self.wanted = []
+ all_uuid = frozenset(self._all_networks.keys())
+
+ for name in self.names:
+ if name in all_uuid:
+ self.wanted.append(name)
+ elif name in name_to_uuid:
+ self.wanted.append(name_to_uuid[name])
+ else:
+ missing.append(name)
+
+ if missing:
+ raise errors.OpPrereqError("Some networks do not exist: %s" % missing,
+ errors.ECODE_NOENT)
+
+ def DeclareLocks(self, lu, level):
+ pass
+
+ def _GetQueryData(self, lu):
+ """Computes the list of networks and their attributes.
+
+ """
+ do_instances = query.NETQ_INST in self.requested_data
+ do_groups = do_instances or (query.NETQ_GROUP in self.requested_data)
+ do_stats = query.NETQ_STATS in self.requested_data
+ cluster = lu.cfg.GetClusterInfo()
+
+ network_to_groups = None
+ network_to_instances = None
+ stats = None
+
+ # For NETQ_GROUP, we need to map network->[groups]
+ if do_groups:
+ all_groups = lu.cfg.GetAllNodeGroupsInfo()
+ network_to_groups = dict((uuid, []) for uuid in self.wanted)
+ default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
+
+ if do_instances:
+ all_instances = lu.cfg.GetAllInstancesInfo()
+ all_nodes = lu.cfg.GetAllNodesInfo()
+ network_to_instances = dict((uuid, []) for uuid in self.wanted)
+
+ for group in all_groups.values():
+ if do_instances:
+ group_nodes = [node.name for node in all_nodes.values() if
+ node.group == group.uuid]
+ group_instances = [instance for instance in all_instances.values()
+ if instance.primary_node in group_nodes]
+
+ for net_uuid in group.networks.keys():
+ if net_uuid in network_to_groups:
+ netparams = group.networks[net_uuid]
+ mode = netparams[constants.NIC_MODE]
+ link = netparams[constants.NIC_LINK]
+          info = "%s(%s, %s)" % (group.name, mode, link)
+ network_to_groups[net_uuid].append(info)
+
+ if do_instances:
+ for instance in group_instances:
+ for nic in instance.nics:
+ if nic.network == self._all_networks[net_uuid].name:
+ network_to_instances[net_uuid].append(instance.name)
+ break
+
+ if do_stats:
+ stats = {}
+ for uuid, net in self._all_networks.items():
+ if uuid in self.wanted:
+ pool = network.AddressPool(net)
+ stats[uuid] = {
+ "free_count": pool.GetFreeCount(),
+ "reserved_count": pool.GetReservedCount(),
+ "map": pool.GetMap(),
+ "external_reservations": ", ".join(pool.GetExternalReservations()),
+ }
+
+ return query.NetworkQueryData([self._all_networks[uuid]
+ for uuid in self.wanted],
+ network_to_groups,
+ network_to_instances,
+ stats)
+
+
+class LUNetworkQuery(NoHooksLU):
+ """Logical unit for querying networks.
+
+ """
+ REQ_BGL = False
+
+ def CheckArguments(self):
+ self.nq = _NetworkQuery(qlang.MakeSimpleFilter("name", self.op.names),
+ self.op.output_fields, False)
+
+ def ExpandNames(self):
+ self.nq.ExpandNames(self)
+
+ def Exec(self, feedback_fn):
+ return self.nq.OldStyleQuery(self)
+
+
+class LUNetworkConnect(LogicalUnit):
+ """Connect a network to a nodegroup
+
+ """
+ HPATH = "network-connect"
+ HTYPE = constants.HTYPE_NETWORK
+ REQ_BGL = False
+
+ def ExpandNames(self):
+ self.network_name = self.op.network_name
+ self.group_name = self.op.group_name
+ self.network_mode = self.op.network_mode
+ self.network_link = self.op.network_link
+
+ self.network_uuid = self.cfg.LookupNetwork(self.network_name)
+ self.network = self.cfg.GetNetwork(self.network_uuid)
+ self.group_uuid = self.cfg.LookupNodeGroup(self.group_name)
+ self.group = self.cfg.GetNodeGroup(self.group_uuid)
+
+ self.needed_locks = {
+ locking.LEVEL_INSTANCE: [],
+ locking.LEVEL_NODEGROUP: [self.group_uuid],
+ }
+ self.share_locks[locking.LEVEL_INSTANCE] = 1
+
+ def DeclareLocks(self, level):
+ if level == locking.LEVEL_INSTANCE:
+ assert not self.needed_locks[locking.LEVEL_INSTANCE]
+
+ # Lock instances optimistically, needs verification once group lock has
+ # been acquired
+ self.needed_locks[locking.LEVEL_INSTANCE] = \
+ self.cfg.GetNodeGroupInstances(self.group_uuid)
+
+ def BuildHooksEnv(self):
+ ret = dict()
+ ret["GROUP_NAME"] = self.group_name
+ ret["GROUP_NETWORK_MODE"] = self.network_mode
+ ret["GROUP_NETWORK_LINK"] = self.network_link
+ ret.update(_BuildNetworkHookEnvByObject(self, self.network))
+ return ret
+
+ def BuildHooksNodes(self):
+ nodes = self.cfg.GetNodeGroup(self.group_uuid).members
+ return (nodes, nodes)
+
+ def CheckPrereq(self):
+ l = lambda value: ", ".join("%s: %s/%s" % (i[0], i[1], i[2])
+ for i in value)
+
+ if self.network is None:
+ raise errors.OpPrereqError("Network %s does not exist" %
+ self.network_name, errors.ECODE_INVAL)
+
+ self.netparams = dict()
+ self.netparams[constants.NIC_MODE] = self.network_mode
+ self.netparams[constants.NIC_LINK] = self.network_link
+ objects.NIC.CheckParameterSyntax(self.netparams)
+
+ #if self.network_mode == constants.NIC_MODE_BRIDGED:
+ # _CheckNodeGroupBridgesExist(self, self.network_link, self.group_uuid)
+ self.connected = False
+ if self.network_uuid in self.group.networks:
+ self.LogWarning("Network '%s' is already mapped to group '%s'" %
+ (self.network_name, self.group.name))
+ self.connected = True
+ return
+
+ pool = network.AddressPool(self.network)
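+    # With conflicts_check enabled, look for instance NICs in this group
+    # whose static IPs already fall inside the network being connected;
+    # any such conflict aborts the operation.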
+ if self.op.conflicts_check:
+ groupinstances = []
+ for n in self.cfg.GetNodeGroupInstances(self.group_uuid):
+ groupinstances.append(self.cfg.GetInstanceInfo(n))
+ instances = [(instance.name, idx, nic.ip)
+ for instance in groupinstances
+ for idx, nic in enumerate(instance.nics)
+ if (not nic.network and pool._Contains(nic.ip))]
+ if instances:
+ self.LogWarning("Following occurences use IPs from network %s"
+ " that is about to connect to nodegroup %s: %s" %
+ (self.network_name, self.group.name,
+ l(instances)))
+ raise errors.OpPrereqError("Conflicting IPs found."
+ " Please remove/modify"
+ " corresponding NICs",
+ errors.ECODE_INVAL)
+
+ def Exec(self, feedback_fn):
+ if self.connected:
+ return
+
+ self.group.networks[self.network_uuid] = self.netparams
+ self.cfg.Update(self.group, feedback_fn)
+
+
+class LUNetworkDisconnect(LogicalUnit):
+ """Disconnect a network to a nodegroup
+
+ """
+ HPATH = "network-disconnect"
+ HTYPE = constants.HTYPE_NETWORK
+ REQ_BGL = False
+
+ def ExpandNames(self):
+ self.network_name = self.op.network_name
+ self.group_name = self.op.group_name
+
+ self.network_uuid = self.cfg.LookupNetwork(self.network_name)
+ self.network = self.cfg.GetNetwork(self.network_uuid)
+ self.group_uuid = self.cfg.LookupNodeGroup(self.group_name)
+ self.group = self.cfg.GetNodeGroup(self.group_uuid)
+
+ self.needed_locks = {
+ locking.LEVEL_INSTANCE: [],
+ locking.LEVEL_NODEGROUP: [self.group_uuid],
+ }
+ self.share_locks[locking.LEVEL_INSTANCE] = 1
+
+ def DeclareLocks(self, level):
+ if level == locking.LEVEL_INSTANCE:
+ assert not self.needed_locks[locking.LEVEL_INSTANCE]
+
+ # Lock instances optimistically, needs verification once group lock has
+ # been acquired
+ self.needed_locks[locking.LEVEL_INSTANCE] = \
+ self.cfg.GetNodeGroupInstances(self.group_uuid)
+
+ def BuildHooksEnv(self):
+ ret = dict()
+ ret["GROUP_NAME"] = self.group_name
+ ret.update(_BuildNetworkHookEnvByObject(self, self.network))
+ return ret
+
+ def BuildHooksNodes(self):
+ nodes = self.cfg.GetNodeGroup(self.group_uuid).members
+ return (nodes, nodes)
+
+ def CheckPrereq(self):
+ l = lambda value: ", ".join("%s: %s/%s" % (i[0], i[1], i[2])
+ for i in value)
+
+ self.connected = True
+ if self.network_uuid not in self.group.networks:
+ self.LogWarning("Network '%s' is"
+ " not mapped to group '%s'" %
+ (self.network_name, self.group.name))
+ self.connected = False
+ return
+
+ if self.op.conflicts_check:
+ groupinstances = []
+ for n in self.cfg.GetNodeGroupInstances(self.group_uuid):
+ groupinstances.append(self.cfg.GetInstanceInfo(n))
+ instances = [(instance.name, idx, nic.ip)
+ for instance in groupinstances
+ for idx, nic in enumerate(instance.nics)
+ if nic.network == self.network_name]
+ if instances:
+ self.LogWarning("Following occurences use IPs from network %s"
+ " that is about to disconnected from the nodegroup"
+ " %s: %s" %
+ (self.network_name, self.group.name,
+ l(instances)))
+ raise errors.OpPrereqError("Conflicting IPs."
+ " Please remove/modify"
+ " corresponding NICS",
+ errors.ECODE_INVAL)
+
+ def Exec(self, feedback_fn):
+ if not self.connected:
+ return
+
+ del self.group.networks[self.network_uuid]
+ self.cfg.Update(self.group, feedback_fn)
+
+
+#: Query type implementations
+_QUERY_IMPL = {
+ constants.QR_CLUSTER: _ClusterQuery,
+ constants.QR_INSTANCE: _InstanceQuery,
constants.QR_NODE: _NodeQuery,
constants.QR_GROUP: _GroupQuery,
+ constants.QR_NETWORK: _NetworkQuery,
constants.QR_OS: _OsQuery,
+ constants.QR_EXTSTORAGE: _ExtStorageQuery,
+ constants.QR_EXPORT: _ExportQuery,
}
assert set(_QUERY_IMPL.keys()) == constants.QR_VIA_OP
except KeyError:
raise errors.OpPrereqError("Unknown query resource '%s'" % name,
errors.ECODE_INVAL)
+
+
+def _CheckForConflictingIp(lu, ip, node):
+ """In case of conflicting ip raise error.
+
+ @type ip: string
+ @param ip: ip address
+ @type node: string
+ @param node: node name
+
+ """
+ (conf_net, conf_netparams) = lu.cfg.CheckIPInNodeGroup(ip, node)
+ if conf_net is not None:
+ raise errors.OpPrereqError("Conflicting IP found:"
+ " %s <> %s." % (ip, conf_net),
+ errors.ECODE_INVAL)
+
+ return (None, None)