"Instance %s has no node in group %s" % (name, cur_group_uuid)
-def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups):
+def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups,
+ primary_only=False):
"""Checks if the owned node groups are still correct for an instance.
@type cfg: L{config.ConfigWriter}
@param instance_name: Instance name
@type owned_groups: set or frozenset
@param owned_groups: List of currently owned node groups
+ @type primary_only: boolean
+ @param primary_only: Whether to check node groups for only the primary node
"""
- inst_groups = cfg.GetInstanceNodeGroups(instance_name)
+ inst_groups = cfg.GetInstanceNodeGroups(instance_name, primary_only)
if not owned_groups.issuperset(inst_groups):
raise errors.OpPrereqError("Instance %s's node groups changed since"
use_none=use_none,
use_default=use_default)
else:
- if not value or value == [constants.VALUE_DEFAULT]:
+ if (not value or value == [constants.VALUE_DEFAULT] or
+ value == constants.VALUE_DEFAULT):
if group_policy:
del ipolicy[key]
else:
# in a nicer way
ipolicy[key] = list(value)
try:
- objects.InstancePolicy.CheckParameterSyntax(ipolicy)
+ objects.InstancePolicy.CheckParameterSyntax(ipolicy, not group_policy)
except errors.ConfigurationError, err:
raise errors.OpPrereqError("Invalid instance policy: %s" % err,
errors.ECODE_INVAL)
(instance.name, msg), errors.ECODE_STATE)
-def _ComputeMinMaxSpec(name, ipolicy, value):
+def _ComputeMinMaxSpec(name, qualifier, ipolicy, value):
"""Computes if value is in the desired range.
@param name: name of the parameter for which we perform the check
+ @param qualifier: a qualifier appended to the name in the error message
+ (e.g. the disk index, giving "name/index"); may be empty
@param ipolicy: dictionary containing min, max and std values
@param value: actual value that we want to use
@return: None or element not meeting the criteria
max_v = ipolicy[constants.ISPECS_MAX].get(name, value)
min_v = ipolicy[constants.ISPECS_MIN].get(name, value)
if value > max_v or min_v > value:
+ if qualifier:
+ fqn = "%s/%s" % (name, qualifier)
+ else:
+ fqn = name
return ("%s value %s is not in range [%s, %s]" %
- (name, value, min_v, max_v))
+ (fqn, value, min_v, max_v))
return None
assert disk_count == len(disk_sizes)
test_settings = [
- (constants.ISPEC_MEM_SIZE, mem_size),
- (constants.ISPEC_CPU_COUNT, cpu_count),
- (constants.ISPEC_DISK_COUNT, disk_count),
- (constants.ISPEC_NIC_COUNT, nic_count),
- (constants.ISPEC_SPINDLE_USE, spindle_use),
- ] + map((lambda d: (constants.ISPEC_DISK_SIZE, d)), disk_sizes)
+ (constants.ISPEC_MEM_SIZE, "", mem_size),
+ (constants.ISPEC_CPU_COUNT, "", cpu_count),
+ (constants.ISPEC_DISK_COUNT, "", disk_count),
+ (constants.ISPEC_NIC_COUNT, "", nic_count),
+ (constants.ISPEC_SPINDLE_USE, "", spindle_use),
+ ] + [(constants.ISPEC_DISK_SIZE, str(idx), d)
+ for idx, d in enumerate(disk_sizes)]
return filter(None,
- (_compute_fn(name, ipolicy, value)
- for (name, value) in test_settings))
+ (_compute_fn(name, qualifier, ipolicy, value)
+ for (name, qualifier, value) in test_settings))
def _ComputeIPolicyInstanceViolation(ipolicy, instance,
@param old_ipolicy: The current (still in-place) ipolicy
@param new_ipolicy: The new (to become) ipolicy
@param instances: List of instances to verify
- @return: A list of instances which violates the new ipolicy but did not before
+ @return: A list of instances which violates the new ipolicy but
+ did not before
"""
- return (_ComputeViolatingInstances(old_ipolicy, instances) -
- _ComputeViolatingInstances(new_ipolicy, instances))
+ return (_ComputeViolatingInstances(new_ipolicy, instances) -
+ _ComputeViolatingInstances(old_ipolicy, instances))
def _ExpandItemName(fn, name, kind):
for dev in instance.disks:
cfg.SetDiskID(dev, node_name)
- result = rpc_runner.call_blockdev_getmirrorstatus(node_name, instance.disks)
+ result = rpc_runner.call_blockdev_getmirrorstatus(node_name, (instance.disks,
+ instance))
result.Raise("Failed to get disk status from node %s" % node_name,
prereq=prereq, ecode=errors.ECODE_ENVIRON)
" cluster-wide default iallocator found;"
" please specify either an iallocator or a"
" node, or set a cluster-wide default"
- " iallocator")
+ " iallocator", errors.ECODE_INVAL)
def _GetDefaultIAllocator(cfg, iallocator):
ipolicy = _CalculateGroupIPolicy(self.cfg.GetClusterInfo(), self.group_info)
err = _ComputeIPolicyInstanceViolation(ipolicy, instanceconfig)
- _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, err)
+ _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, utils.CommaJoin(err))
for node in node_vol_should:
n_img = node_image[node]
node_disks[nname] = disks
- # Creating copies as SetDiskID below will modify the objects and that can
- # lead to incorrect data returned from nodes
- devonly = [dev.Copy() for (_, dev) in disks]
-
- for dev in devonly:
- self.cfg.SetDiskID(dev, nname)
+ # _AnnotateDiskParams already makes copies of the disks
+ devonly = []
+ for (inst, dev) in disks:
+ (anno_disk,) = _AnnotateDiskParams(instanceinfo[inst], [dev], self.cfg)
+ self.cfg.SetDiskID(anno_disk, nname)
+ devonly.append(anno_disk)
node_disks_devonly[nname] = devonly
for instance in self.my_inst_names:
inst_config = self.my_inst_info[instance]
+ if inst_config.admin_state == constants.ADMINST_OFFLINE:
+ i_offline += 1
for nname in inst_config.all_nodes:
if nname not in node_image:
non_primary_inst = set(nimg.instances).difference(nimg.pinst)
for inst in non_primary_inst:
- # FIXME: investigate best way to handle offline insts
- if inst.admin_state == constants.ADMINST_OFFLINE:
- if verbose:
- feedback_fn("* Skipping offline instance %s" % inst.name)
- i_offline += 1
- continue
test = inst in self.all_inst_info
_ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
"instance should not run on node %s", node_i.name)
ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
except errors.ProgrammerError:
raise errors.OpPrereqError("Invalid primary ip family: %s." %
- ip_family)
+ ip_family, errors.ECODE_INVAL)
if not ipcls.ValidateNetmask(netmask):
raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
- (netmask))
+ (netmask), errors.ECODE_INVAL)
class LUClusterSetParams(LogicalUnit):
if self.op.diskparams:
for dt_params in self.op.diskparams.values():
utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
+ try:
+ utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
+ except errors.OpPrereqError, err:
+ raise errors.OpPrereqError("While verify diskparams options: %s" % err,
+ errors.ECODE_INVAL)
def ExpandNames(self):
# FIXME: in the future maybe other cluster params won't require checking on
if violations:
self.LogWarning("After the ipolicy change the following instances"
" violate them: %s",
- utils.CommaJoin(violations))
+ utils.CommaJoin(utils.NiceSort(violations)))
if self.op.nicparams:
utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
" address" % (instance.name, nic_idx))
if nic_errors:
raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
- "\n".join(nic_errors))
+ "\n".join(nic_errors), errors.ECODE_INVAL)
# hypervisor list/parameters
self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
if cluster.modify_etc_hosts:
files_all.add(constants.ETC_HOSTS)
+ if cluster.use_external_mip_script:
+ files_all.add(constants.EXTERNAL_MASTER_SETUP_SCRIPT)
+
# Files which are optional, these must:
# - be present in one other category as well
# - either exist or not exist on all nodes of that category (mc, vm all)
if not redist:
files_mc.add(constants.CLUSTER_CONF_FILE)
- # FIXME: this should also be replicated but Ganeti doesn't support files_mc
- # replication
- files_mc.add(constants.DEFAULT_MASTER_SETUP_SCRIPT)
-
# Files which should only be on VM-capable nodes
files_vm = set(filename
for hv_name in cluster.enabled_hypervisors
master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
online_nodes = lu.cfg.GetOnlineNodeList()
- vm_nodes = lu.cfg.GetVmCapableNodeList()
+ online_set = frozenset(online_nodes)
+ vm_nodes = list(online_set.intersection(lu.cfg.GetVmCapableNodeList()))
if additional_nodes is not None:
online_nodes.extend(additional_nodes)
max_time = 0
done = True
cumul_degraded = False
- rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks)
+ rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
msg = rstats.fail_msg
if msg:
lu.LogWarning("Can't get any data from node %s: %s", node, msg)
return not cumul_degraded
+def _BlockdevFind(lu, node, dev, instance):
+ """Wrapper around call_blockdev_find to annotate diskparams.
+
+ @param lu: A reference to the lu object
+ @param node: The node to call out
+ @param dev: The device to find
+ @param instance: The instance object the device belongs to
+ @return: The result of the rpc call
+
+ """
+ (disk,) = _AnnotateDiskParams(instance, [dev], lu.cfg)
+ return lu.rpc.call_blockdev_find(node, disk)
+
+
def _CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
+ """Wrapper around L{_CheckDiskConsistencyInner}.
+
+ """
+ (disk,) = _AnnotateDiskParams(instance, [dev], lu.cfg)
+ return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
+ ldisk=ldisk)
+
+
+def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
+ ldisk=False):
"""Check that mirrors are not degraded.
+ @attention: The device has to be annotated already.
+
The ldisk parameter, if True, will change the test from the
is_degraded attribute (which represents overall non-ok status for
the device(s)) to the ldisk (representing the local storage status).
if dev.children:
for child in dev.children:
- result = result and _CheckDiskConsistency(lu, instance, child, node,
- on_primary)
+ result = result and _CheckDiskConsistencyInner(lu, instance, child, node,
+ on_primary)
return result
if mc_remaining < mc_should:
raise errors.OpPrereqError("Not enough master candidates, please"
" pass auto promote option to allow"
- " promotion", errors.ECODE_STATE)
+ " promotion (--auto-promote or RAPI"
+ " auto_promote=True)", errors.ECODE_STATE)
self.old_flags = old_flags = (node.master_candidate,
node.drained, node.offline)
self.op.powered == True):
raise errors.OpPrereqError(("Node %s needs to be turned on before its"
" offline status can be reset") %
- self.op.node_name)
+ self.op.node_name, errors.ECODE_STATE)
elif self.op.powered is not None:
raise errors.OpPrereqError(("Unable to change powered state for node %s"
" as it does not support out-of-band"
- " handling") % self.op.node_name)
+ " handling") % self.op.node_name,
+ errors.ECODE_STATE)
# If we're being deofflined/drained, we'll MC ourself if needed
if (self.op.drained == False or self.op.offline == False or
" without using re-add. Please make sure the node"
" is healthy!")
+ # When changing the secondary ip, verify if this is a single-homed to
+ # multi-homed transition or vice versa, and apply the relevant
+ # restrictions.
if self.op.secondary_ip:
# Ok even without locking, because this can't be changed by any LU
master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
master_singlehomed = master.secondary_ip == master.primary_ip
- if master_singlehomed and self.op.secondary_ip:
- raise errors.OpPrereqError("Cannot change the secondary ip on a single"
- " homed cluster", errors.ECODE_INVAL)
+ if master_singlehomed and self.op.secondary_ip != node.primary_ip:
+ if self.op.force and node.name == master.name:
+ self.LogWarning("Transitioning from single-homed to multi-homed"
+ " cluster. All nodes will require a secondary ip.")
+ else:
+ raise errors.OpPrereqError("Changing the secondary ip on a"
+ " single-homed cluster requires the"
+ " --force option to be passed, and the"
+ " target node to be the master",
+ errors.ECODE_INVAL)
+ elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
+ if self.op.force and node.name == master.name:
+ self.LogWarning("Transitioning from multi-homed to single-homed"
+ " cluster. Secondary IPs will have to be removed.")
+ else:
+ raise errors.OpPrereqError("Cannot set the secondary IP to be the"
+ " same as the primary IP on a multi-homed"
+ " cluster, unless the --force option is"
+ " passed, and the target node is the"
+ " master", errors.ECODE_INVAL)
assert not (frozenset(affected_instances) -
self.owned_locks(locking.LEVEL_INSTANCE))
if node.offline:
if affected_instances:
- raise errors.OpPrereqError("Cannot change secondary IP address:"
- " offline node has instances (%s)"
- " configured to use it" %
- utils.CommaJoin(affected_instances.keys()))
+ msg = ("Cannot change secondary IP address: offline node has"
+ " instances (%s) configured to use it" %
+ utils.CommaJoin(affected_instances.keys()))
+ raise errors.OpPrereqError(msg, errors.ECODE_STATE)
else:
# On online nodes, check that no instances are running, and that
# the node has the new ip and we can reach it.
"ipolicy": cluster.ipolicy,
"nicparams": cluster.nicparams,
"ndparams": cluster.ndparams,
+ "diskparams": cluster.diskparams,
"candidate_pool_size": cluster.candidate_pool_size,
"master_netdev": cluster.master_netdev,
"master_netmask": cluster.master_netmask,
if not disks_ok:
raise errors.OpExecError("Cannot activate block devices")
+ if self.op.wait_for_sync:
+ if not _WaitForSync(self, self.instance):
+ raise errors.OpExecError("Some disks of the instance are degraded!")
+
return disks_info
False, idx)
msg = result.fail_msg
if msg:
+ is_offline_secondary = (node in instance.secondary_nodes and
+ result.offline)
lu.proc.LogWarning("Could not prepare block device %s on node %s"
" (is_primary=False, pass=1): %s",
inst_disk.iv_name, node, msg)
- if not ignore_secondaries:
+ if not (ignore_secondaries or is_offline_secondary):
disks_ok = False
# FIXME: race condition on drbd migration to primary
for disk in disks:
for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
lu.cfg.SetDiskID(top_disk, node)
- result = lu.rpc.call_blockdev_shutdown(node, top_disk)
+ result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
msg = result.fail_msg
if msg:
lu.LogWarning("Could not shutdown block device %s on node %s: %s",
"Cannot retrieve locked instance %s" % self.op.instance_name
_CheckNodeOnline(self, instance.primary_node, "Instance primary node"
" offline, cannot reinstall")
- for node in instance.secondary_nodes:
- _CheckNodeOnline(self, node, "Instance secondary node offline,"
- " cannot reinstall")
if instance.disk_template == constants.DT_DISKLESS:
raise errors.OpPrereqError("Instance '%s' has no disks" %
constants.IDISK_METAVG,
]))
+ def _RunAllocator(self):
+ """Run the allocator based on input opcode.
+
+ """
+ be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
+
+ # FIXME
+ # The allocator should actually run in "relocate" mode, but current
+ # allocators don't support relocating all the nodes of an instance at
+ # the same time. As a workaround we use "allocate" mode, but this is
+ # suboptimal for two reasons:
+ # - The instance name passed to the allocator is present in the list of
+ # existing instances, so there could be a conflict within the
+ # internal structures of the allocator. This doesn't happen with the
+ # current allocators, but it's a liability.
+ # - The allocator counts the resources used by the instance twice: once
+ # because the instance exists already, and once because it tries to
+ # allocate a new instance.
+ # The allocator could choose some of the nodes on which the instance is
+ # running, but that's not a problem. If the instance nodes are broken,
+ # they should already be marked as drained or offline, and hence
+ # skipped by the allocator. If instance disks have been lost for other
+ # reasons, then recreating the disks on the same nodes should be fine.
+ ial = IAllocator(self.cfg, self.rpc,
+ mode=constants.IALLOCATOR_MODE_ALLOC,
+ name=self.op.instance_name,
+ disk_template=self.instance.disk_template,
+ tags=list(self.instance.GetTags()),
+ os=self.instance.os,
+ nics=[{}],
+ vcpus=be_full[constants.BE_VCPUS],
+ memory=be_full[constants.BE_MAXMEM],
+ spindle_use=be_full[constants.BE_SPINDLE_USE],
+ disks=[{constants.IDISK_SIZE: d.size,
+ constants.IDISK_MODE: d.mode}
+ for d in self.instance.disks],
+ hypervisor=self.instance.hypervisor)
+
+ assert ial.required_nodes == len(self.instance.all_nodes)
+
+ ial.Run(self.op.iallocator)
+
+ if not ial.success:
+ raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
+ " %s" % (self.op.iallocator, ial.info),
+ errors.ECODE_NORES)
+
+ if len(ial.result) != ial.required_nodes:
+ raise errors.OpPrereqError("iallocator '%s' returned invalid number"
+ " of nodes (%s), required %s" %
+ (self.op.iallocator, len(ial.result),
+ ial.required_nodes), errors.ECODE_FAULT)
+
+ self.op.nodes = ial.result
+ self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
+ self.op.instance_name, self.op.iallocator,
+ utils.CommaJoin(ial.result))
+
def CheckArguments(self):
if self.op.disks and ht.TPositiveInt(self.op.disks[0]):
# Normalize and convert deprecated list of disk indices
" once: %s" % utils.CommaJoin(duplicates),
errors.ECODE_INVAL)
+ if self.op.iallocator and self.op.nodes:
+ raise errors.OpPrereqError("Give either the iallocator or the new"
+ " nodes, not both", errors.ECODE_INVAL)
+
for (idx, params) in self.op.disks:
utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
unsupported = frozenset(params.keys()) - self._MODIFYABLE
self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
else:
self.needed_locks[locking.LEVEL_NODE] = []
+ if self.op.iallocator:
+ # iallocator will select a new node in the same group
+ self.needed_locks[locking.LEVEL_NODEGROUP] = []
self.needed_locks[locking.LEVEL_NODE_RES] = []
def DeclareLocks(self, level):
- if level == locking.LEVEL_NODE:
- # if we replace the nodes, we only need to lock the old primary,
- # otherwise we need to lock all nodes for disk re-creation
- primary_only = bool(self.op.nodes)
- self._LockInstancesNodes(primary_only=primary_only)
+ if level == locking.LEVEL_NODEGROUP:
+ assert self.op.iallocator is not None
+ assert not self.op.nodes
+ assert not self.needed_locks[locking.LEVEL_NODEGROUP]
+ self.share_locks[locking.LEVEL_NODEGROUP] = 1
+ # Lock the primary group used by the instance optimistically; this
+ # requires going via the node before it's locked, requiring
+ # verification later on
+ self.needed_locks[locking.LEVEL_NODEGROUP] = \
+ self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)
+
+ elif level == locking.LEVEL_NODE:
+ # If an allocator is used, then we lock all the nodes in the current
+ # instance group, as we don't know yet which ones will be selected;
+ # if we replace the nodes without using an allocator, we only need to
+ # lock the old primary for doing RPCs (FIXME: we don't lock nodes for
+ # RPC anymore), otherwise we need to lock all the instance nodes for
+ # disk re-creation
+ if self.op.iallocator:
+ assert not self.op.nodes
+ assert not self.needed_locks[locking.LEVEL_NODE]
+ assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
+
+ # Lock member nodes of the group of the primary node
+ for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
+ self.needed_locks[locking.LEVEL_NODE].extend(
+ self.cfg.GetNodeGroup(group_uuid).members)
+ else:
+ primary_only = bool(self.op.nodes)
+ self._LockInstancesNodes(primary_only=primary_only)
elif level == locking.LEVEL_NODE_RES:
# Copy node locks
self.needed_locks[locking.LEVEL_NODE_RES] = \
primary_node = self.op.nodes[0]
else:
primary_node = instance.primary_node
- _CheckNodeOnline(self, primary_node)
+ if not self.op.iallocator:
+ _CheckNodeOnline(self, primary_node)
if instance.disk_template == constants.DT_DISKLESS:
raise errors.OpPrereqError("Instance '%s' has no disks" %
self.op.instance_name, errors.ECODE_INVAL)
+ # Verify if node group locks are still correct
+ owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
+ if owned_groups:
+ # Node group locks are acquired only for the primary node (and only
+ # when the allocator is used)
+ _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
+ primary_only=True)
+
# if we replace nodes *and* the old primary is offline, we don't
# check
assert instance.primary_node in self.owned_locks(locking.LEVEL_NODE)
assert instance.primary_node in self.owned_locks(locking.LEVEL_NODE_RES)
old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
- if not (self.op.nodes and old_pnode.offline):
+ if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
_CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
msg="cannot recreate disks")
raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
errors.ECODE_INVAL)
- if (self.op.nodes and
+ if ((self.op.nodes or self.op.iallocator) and
sorted(self.disks.keys()) != range(len(instance.disks))):
raise errors.OpPrereqError("Can't recreate disks partially and"
" change the nodes at the same time",
self.instance = instance
+ if self.op.iallocator:
+ self._RunAllocator()
+
+ # Release unneeded node and node resource locks
+ _ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
+ _ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
+
def Exec(self, feedback_fn):
"""Recreate the disks.
if self.target_node == instance.primary_node:
raise errors.OpPrereqError("Cannot migrate instance %s"
" to its primary (%s)" %
- (instance.name, instance.primary_node))
+ (instance.name, instance.primary_node),
+ errors.ECODE_STATE)
if len(self.lu.tasklets) == 1:
# It is safe to release locks only when we're the only tasklet
disks = _ExpandCheckDisks(instance, instance.disks)
self.feedback_fn("* unmapping instance's disks from %s" % source_node)
for disk in disks:
- result = self.rpc.call_blockdev_shutdown(source_node, disk)
+ result = self.rpc.call_blockdev_shutdown(source_node, (disk, instance))
msg = result.fail_msg
if msg:
logging.error("Migration was successful, but couldn't unmap the"
return self._ExecMigration()
-def _CreateBlockDev(lu, node, instance, device, force_create,
- info, force_open):
+def _CreateBlockDev(lu, node, instance, device, force_create, info,
+ force_open):
+ """Wrapper around L{_CreateBlockDevInner}.
+
+ This method annotates the root device first.
+
+ """
+ (disk,) = _AnnotateDiskParams(instance, [device], lu.cfg)
+ return _CreateBlockDevInner(lu, node, instance, disk, force_create, info,
+ force_open)
+
+
+def _CreateBlockDevInner(lu, node, instance, device, force_create,
+ info, force_open):
"""Create a tree of block devices on a given node.
If this device type has to be created on secondaries, create it and
If not, just recurse to children keeping the same 'force' value.
+ @attention: The device has to be annotated already.
+
@param lu: the lu on whose behalf we execute
@param node: the node on which to create the device
@type instance: L{objects.Instance}
if device.children:
for child in device.children:
- _CreateBlockDev(lu, node, instance, child, force_create,
- info, force_open)
+ _CreateBlockDevInner(lu, node, instance, child, force_create,
+ info, force_open)
if not force_create:
return
all_result = True
ports_to_release = set()
- for (idx, device) in enumerate(instance.disks):
+ anno_disks = _AnnotateDiskParams(instance, instance.disks, lu.cfg)
+ for (idx, device) in enumerate(anno_disks):
if target_node:
edata = [(target_node, device)]
else:
edata = device.ComputeNodeTree(instance.primary_node)
for node, disk in edata:
lu.cfg.SetDiskID(disk, node)
- msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
- if msg:
+ result = lu.rpc.call_blockdev_remove(node, disk)
+ if result.fail_msg:
lu.LogWarning("Could not remove disk %s on node %s,"
- " continuing anyway: %s", idx, node, msg)
- all_result = False
+ " continuing anyway: %s", idx, node, result.fail_msg)
+ if not (result.offline and node != instance.primary_node):
+ all_result = False
# if this is a DRBD disk, return its port to the pool
if device.dev_type in constants.LDS_DRBD:
if self.op.disk_template not in constants.DISK_TEMPLATES:
raise errors.OpPrereqError("Disk template specified in configuration"
" file is not one of the allowed values:"
- " %s" % " ".join(constants.DISK_TEMPLATES))
+ " %s" %
+ " ".join(constants.DISK_TEMPLATES),
+ errors.ECODE_INVAL)
else:
raise errors.OpPrereqError("No disk template specified and the export"
" is missing the disk_template information",
cfg_storagedir = get_fsd_fn()
if not cfg_storagedir:
- raise errors.OpPrereqError("Cluster file storage dir not defined")
+ raise errors.OpPrereqError("Cluster file storage dir not defined",
+ errors.ECODE_STATE)
joinargs.append(cfg_storagedir)
if self.op.file_storage_dir is not None:
if self.op.mode == constants.INSTANCE_IMPORT:
export_info = self._ReadExportInfo()
self._ReadExportParams(export_info)
+ self._old_instance_name = export_info.get(constants.INISECT_INS, "name")
+ else:
+ self._old_instance_name = None
if (not self.cfg.GetVGName() and
self.op.disk_template not in constants.DTS_NOT_LVM):
self.src_images = disk_images
- old_name = export_info.get(constants.INISECT_INS, "name")
- if self.op.instance_name == old_name:
+ if self.op.instance_name == self._old_instance_name:
for idx, nic in enumerate(self.nics):
if nic.mac == constants.VALUE_AUTO:
nic_mac_ini = "nic%d_mac" % idx
_ReleaseLocks(self, locking.LEVEL_NODE_RES)
if iobj.disk_template != constants.DT_DISKLESS and not self.adopt_disks:
+ # we need to set the disks ID to the primary node, since the
+ # preceding code might or might not have done it, depending on
+ # disk template and other options
+ for disk in iobj.disks:
+ self.cfg.SetDiskID(disk, pnode_name)
if self.op.mode == constants.INSTANCE_CREATE:
if not self.op.no_install:
pause_sync = (iobj.disk_template in constants.DTS_INT_MIRROR and
os_add_result.Raise("Could not add os for instance %s"
" on node %s" % (instance, pnode_name))
- elif self.op.mode == constants.INSTANCE_IMPORT:
- feedback_fn("* running the instance OS import scripts...")
+ else:
+ if self.op.mode == constants.INSTANCE_IMPORT:
+ feedback_fn("* running the instance OS import scripts...")
+
+ transfers = []
+
+ for idx, image in enumerate(self.src_images):
+ if not image:
+ continue
+
+ # FIXME: pass debug option from opcode to backend
+ dt = masterd.instance.DiskTransfer("disk/%s" % idx,
+ constants.IEIO_FILE, (image, ),
+ constants.IEIO_SCRIPT,
+ (iobj.disks[idx], idx),
+ None)
+ transfers.append(dt)
+
+ import_result = \
+ masterd.instance.TransferInstanceData(self, feedback_fn,
+ self.op.src_node, pnode_name,
+ self.pnode.secondary_ip,
+ iobj, transfers)
+ if not compat.all(import_result):
+ self.LogWarning("Some disks for instance %s on node %s were not"
+ " imported successfully" % (instance, pnode_name))
+
+ rename_from = self._old_instance_name
+
+ elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
+ feedback_fn("* preparing remote import...")
+ # The source cluster will stop the instance before attempting to make
+ # a connection. In some cases stopping an instance can take a long
+ # time, hence the shutdown timeout is added to the connection
+ # timeout.
+ connect_timeout = (constants.RIE_CONNECT_TIMEOUT +
+ self.op.source_shutdown_timeout)
+ timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
- transfers = []
+ assert iobj.primary_node == self.pnode.name
+ disk_results = \
+ masterd.instance.RemoteImport(self, feedback_fn, iobj, self.pnode,
+ self.source_x509_ca,
+ self._cds, timeouts)
+ if not compat.all(disk_results):
+ # TODO: Should the instance still be started, even if some disks
+ # failed to import (valid for local imports, too)?
+ self.LogWarning("Some disks for instance %s on node %s were not"
+ " imported successfully" % (instance, pnode_name))
- for idx, image in enumerate(self.src_images):
- if not image:
- continue
+ rename_from = self.source_instance_name
- # FIXME: pass debug option from opcode to backend
- dt = masterd.instance.DiskTransfer("disk/%s" % idx,
- constants.IEIO_FILE, (image, ),
- constants.IEIO_SCRIPT,
- (iobj.disks[idx], idx),
- None)
- transfers.append(dt)
-
- import_result = \
- masterd.instance.TransferInstanceData(self, feedback_fn,
- self.op.src_node, pnode_name,
- self.pnode.secondary_ip,
- iobj, transfers)
- if not compat.all(import_result):
- self.LogWarning("Some disks for instance %s on node %s were not"
- " imported successfully" % (instance, pnode_name))
-
- elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
- feedback_fn("* preparing remote import...")
- # The source cluster will stop the instance before attempting to make a
- # connection. In some cases stopping an instance can take a long time,
- # hence the shutdown timeout is added to the connection timeout.
- connect_timeout = (constants.RIE_CONNECT_TIMEOUT +
- self.op.source_shutdown_timeout)
- timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
-
- assert iobj.primary_node == self.pnode.name
- disk_results = \
- masterd.instance.RemoteImport(self, feedback_fn, iobj, self.pnode,
- self.source_x509_ca,
- self._cds, timeouts)
- if not compat.all(disk_results):
- # TODO: Should the instance still be started, even if some disks
- # failed to import (valid for local imports, too)?
- self.LogWarning("Some disks for instance %s on node %s were not"
- " imported successfully" % (instance, pnode_name))
+ else:
+ # also checked in the prereq part
+ raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
+ % self.op.mode)
# Run rename script on newly imported instance
assert iobj.name == instance
feedback_fn("Running rename script for %s" % instance)
result = self.rpc.call_instance_run_rename(pnode_name, iobj,
- self.source_instance_name,
+ rename_from,
self.op.debug_level)
if result.fail_msg:
self.LogWarning("Failed to run rename script for %s on node"
" %s: %s" % (instance, pnode_name, result.fail_msg))
- else:
- # also checked in the prereq part
- raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
- % self.op.mode)
-
assert not self.owned_locks(locking.LEVEL_NODE_RES)
if self.op.start:
self.lu.LogInfo("Checking disk/%d on %s", idx, node)
self.cfg.SetDiskID(dev, node)
- result = self.rpc.call_blockdev_find(node, dev)
+ result = _BlockdevFind(self, node, dev, instance)
if result.offline:
continue
self.lu.LogInfo("Checking disk/%d on %s" % (idx, node))
self.cfg.SetDiskID(dev, node)
- result = self.rpc.call_blockdev_find(node, dev)
+ result = _BlockdevFind(self, node, dev, self.instance)
msg = result.fail_msg
if msg or not result.payload:
"""
iv_names = {}
- for idx, dev in enumerate(self.instance.disks):
+ disks = _AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
+ for idx, dev in enumerate(disks):
if idx not in self.disks:
continue
lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
names = _GenerateUniqueNames(self.lu, lv_names)
- vg_data = dev.children[0].logical_id[0]
+ (data_disk, meta_disk) = dev.children
+ vg_data = data_disk.logical_id[0]
lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
- logical_id=(vg_data, names[0]), params={})
- vg_meta = dev.children[1].logical_id[0]
+ logical_id=(vg_data, names[0]),
+ params=data_disk.params)
+ vg_meta = meta_disk.logical_id[0]
lv_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE,
- logical_id=(vg_meta, names[1]), params={})
+ logical_id=(vg_meta, names[1]),
+ params=meta_disk.params)
new_lvs = [lv_data, lv_meta]
old_lvs = [child.Copy() for child in dev.children]
# we pass force_create=True to force the LVM creation
for new_lv in new_lvs:
- _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True,
- _GetInstanceInfoText(self.instance), False)
+ _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
+ _GetInstanceInfoText(self.instance), False)
return iv_names
for name, (dev, _, _) in iv_names.iteritems():
self.cfg.SetDiskID(dev, node_name)
- result = self.rpc.call_blockdev_find(node_name, dev)
+ result = _BlockdevFind(self, node_name, dev, self.instance)
msg = result.fail_msg
if msg or not result.payload:
# Now that the new lvs have the old name, we can add them to the device
self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
result = self.rpc.call_blockdev_addchildren(self.target_node,
- (dev, self.instance),
- (new_lvs, self.instance))
+ (dev, self.instance), new_lvs)
msg = result.fail_msg
if msg:
for new_lv in new_lvs:
# Step: create new storage
self.lu.LogStep(3, steps_total, "Allocate new storage")
- for idx, dev in enumerate(self.instance.disks):
+ disks = _AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
+ for idx, dev in enumerate(disks):
self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
(self.new_node, idx))
# we pass force_create=True to force LVM creation
for new_lv in dev.children:
- _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True,
- _GetInstanceInfoText(self.instance), False)
+ _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
+ True, _GetInstanceInfoText(self.instance), False)
# Step 4: dbrd minors and drbd setups changes
# after this, we must manually remove the drbd minors on both the
children=dev.children,
size=dev.size,
params={})
+ (anno_new_drbd,) = _AnnotateDiskParams(self.instance, [new_drbd],
+ self.cfg)
try:
- _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
+ _CreateSingleBlockDev(self.lu, self.new_node, self.instance,
+ anno_new_drbd,
_GetInstanceInfoText(self.instance), False)
except errors.GenericError:
self.cfg.ReleaseDRBDMinors(self.instance.name)
for idx, dev in enumerate(self.instance.disks):
self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
self.cfg.SetDiskID(dev, self.target_node)
- msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg
+ msg = self.rpc.call_blockdev_shutdown(self.target_node,
+ (dev, self.instance)).fail_msg
if msg:
self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
"node: %s" % (idx, msg),
for node in instance.all_nodes:
self.cfg.SetDiskID(disk, node)
result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
- True)
+ True, True)
result.Raise("Grow request failed to node %s" % node)
# We know that (as far as we can test) operations across different
- # nodes will succeed, time to run it for real
+ # nodes will succeed, time to run it for real on the backing storage
for node in instance.all_nodes:
self.cfg.SetDiskID(disk, node)
result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
- False)
+ False, True)
result.Raise("Grow request failed to node %s" % node)
- # TODO: Rewrite code to work properly
- # DRBD goes into sync mode for a short amount of time after executing the
- # "resize" command. DRBD 8.x below version 8.0.13 contains a bug whereby
- # calling "resize" in sync mode fails. Sleeping for a short amount of
- # time is a work-around.
- time.sleep(5)
+ # And now execute it for logical storage, on the primary node
+ node = instance.primary_node
+ self.cfg.SetDiskID(disk, node)
+ result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
+ False, False)
+ result.Raise("Grow request failed to node %s" % node)
disk.RecordGrow(self.delta)
self.cfg.Update(instance, feedback_fn)
"""Compute block device status.
"""
+ (anno_dev,) = _AnnotateDiskParams(instance, [dev], self.cfg)
+
+ return self._ComputeDiskStatusInner(instance, snode, anno_dev)
+
+ def _ComputeDiskStatusInner(self, instance, snode, dev):
+ """Compute block device status.
+
+ @attention: The device has to be annotated already.
+
+ """
if dev.dev_type in constants.LDS_DRBD:
# we change the snode then (otherwise we use the one passed in)
if dev.logical_id[0] == instance.primary_node:
dev_sstatus = self._ComputeBlockdevStatus(snode, instance, dev)
if dev.children:
- dev_children = map(compat.partial(self._ComputeDiskStatus,
+ dev_children = map(compat.partial(self._ComputeDiskStatusInner,
instance, snode),
dev.children)
else:
private.params = new_params
private.filled = new_filled_params
- return (None, None)
-
def CheckPrereq(self):
"""Check prerequisites.
self.be_proposed = cluster.SimpleFillBE(instance.beparams)
be_old = cluster.FillBE(instance)
- # CPU param validation -- checking every time a paramtere is
+ # CPU param validation -- checking every time a parameter is
# changed to cover all cases where either CPU mask or vcpus have
# changed
if (constants.BE_VCPUS in self.be_proposed and
raise errors.OpPrereqError("This change will prevent the instance"
" from starting, due to %d MB of memory"
" missing on its primary node" %
- miss_mem,
- errors.ECODE_NORES)
+ miss_mem, errors.ECODE_NORES)
if be_new[constants.BE_AUTO_BALANCE]:
for node, nres in nodeinfo.items():
instance.hypervisor)
remote_info.Raise("Error checking node %s" % instance.primary_node)
if not remote_info.payload: # not running already
- raise errors.OpPrereqError("Instance %s is not running" % instance.name,
- errors.ECODE_STATE)
+ raise errors.OpPrereqError("Instance %s is not running" %
+ instance.name, errors.ECODE_STATE)
current_memory = remote_info.payload["memory"]
if (not self.op.force and
if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
raise errors.OpPrereqError("Disk operations not supported for"
- " diskless instances",
- errors.ECODE_INVAL)
+ " diskless instances", errors.ECODE_INVAL)
def _PrepareNicCreate(_, params, private):
- return self._PrepareNicModification(params, private, None, {},
- cluster, pnode)
+ self._PrepareNicModification(params, private, None, {}, cluster, pnode)
+ return (None, None)
def _PrepareNicMod(_, nic, params, private):
- return self._PrepareNicModification(params, private, nic.ip,
- nic.nicparams, cluster, pnode)
+ self._PrepareNicModification(params, private, nic.ip,
+ nic.nicparams, cluster, pnode)
+ return None
# Verify NIC changes (operating on copy)
nics = instance.nics[:]
instance.name, pnode, [snode],
disk_info, None, None, 0, feedback_fn,
self.diskparams)
+ anno_disks = rpc.AnnotateDiskParams(constants.DT_DRBD8, new_disks,
+ self.diskparams)
info = _GetInstanceInfoText(instance)
feedback_fn("Creating additional volumes...")
# first, create the missing data and meta devices
- for disk in new_disks:
+ for disk in anno_disks:
# unfortunately this is... not too nice
_CreateSingleBlockDev(self, pnode, instance, disk.children[1],
info, True)
feedback_fn("Initializing DRBD devices...")
# all child devices are in place, we can now create the DRBD devices
- for disk in new_disks:
+ for disk in anno_disks:
for node in [pnode, snode]:
f_create = node == pnode
_CreateSingleBlockDev(self, node, instance, disk, info, f_create)
snode = instance.secondary_nodes[0]
feedback_fn("Converting template to plain")
- old_disks = instance.disks
- new_disks = [d.children[0] for d in old_disks]
+ old_disks = _AnnotateDiskParams(instance, instance.disks, self.cfg)
+ new_disks = [d.children[0] for d in instance.disks]
# copy over size and mode
for parent, child in zip(old_disks, new_disks):
"""Removes a disk.
"""
- for node, disk in root.ComputeNodeTree(self.instance.primary_node):
+ (anno_disk,) = _AnnotateDiskParams(self.instance, [root], self.cfg)
+ for node, disk in anno_disk.ComputeNodeTree(self.instance.primary_node):
self.cfg.SetDiskID(disk, node)
msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
if msg:
raise errors.OpPrereqError("Can't compute solution for changing group of"
" instance '%s' using iallocator '%s': %s" %
(self.op.instance_name, self.op.iallocator,
- ial.info),
- errors.ECODE_NORES)
+ ial.info), errors.ECODE_NORES)
jobs = _LoadNodeEvacResult(self, ial.result, self.op.early_release, False)
self.instance.admin_state == constants.ADMINST_UP and
not self.op.shutdown):
raise errors.OpPrereqError("Can not remove instance without shutting it"
- " down before")
+ " down before", errors.ECODE_STATE)
if self.op.mode == constants.EXPORT_MODE_LOCAL:
self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
try:
(key_name, hmac_digest, hmac_salt) = self.x509_key_name
except (TypeError, ValueError), err:
- raise errors.OpPrereqError("Invalid data for X509 key name: %s" % err)
+ raise errors.OpPrereqError("Invalid data for X509 key name: %s" % err,
+ errors.ECODE_INVAL)
if not utils.VerifySha1Hmac(cds, key_name, hmac_digest, salt=hmac_salt):
raise errors.OpPrereqError("HMAC for X509 key name is wrong",
if self.op.diskparams:
for templ in constants.DISK_TEMPLATES:
- if templ not in self.op.diskparams:
- self.op.diskparams[templ] = {}
- utils.ForceDictType(self.op.diskparams[templ], constants.DISK_DT_TYPES)
+ if templ in self.op.diskparams:
+ utils.ForceDictType(self.op.diskparams[templ],
+ constants.DISK_DT_TYPES)
+ self.new_diskparams = self.op.diskparams
+ try:
+ utils.VerifyDictOptions(self.new_diskparams, constants.DISK_DT_DEFAULTS)
+ except errors.OpPrereqError, err:
+ raise errors.OpPrereqError("While verify diskparams options: %s" % err,
+ errors.ECODE_INVAL)
else:
- self.op.diskparams = self.cfg.GetClusterInfo().diskparams
+ self.new_diskparams = {}
if self.op.ipolicy:
cluster = self.cfg.GetClusterInfo()
full_ipolicy = cluster.SimpleFillIPolicy(self.op.ipolicy)
try:
- objects.InstancePolicy.CheckParameterSyntax(full_ipolicy)
+ objects.InstancePolicy.CheckParameterSyntax(full_ipolicy, False)
except errors.ConfigurationError, err:
raise errors.OpPrereqError("Invalid instance policy: %s" % err,
errors.ECODE_INVAL)
uuid=self.group_uuid,
alloc_policy=self.op.alloc_policy,
ndparams=self.op.ndparams,
- diskparams=self.op.diskparams,
+ diskparams=self.new_diskparams,
ipolicy=self.op.ipolicy,
hv_state_static=self.new_hv_state,
disk_state_static=self.new_disk_state)
return query.GroupQueryData(self._cluster,
[self._all_groups[uuid]
for uuid in self.wanted],
- group_to_nodes, group_to_instances)
+ group_to_nodes, group_to_instances,
+ query.GQ_DISKPARAMS in self.requested_data)
class LUGroupQuery(NoHooksLU):
self.needed_locks[locking.LEVEL_INSTANCE] = \
self.cfg.GetNodeGroupInstances(self.group_uuid)
+ @staticmethod
+ def _UpdateAndVerifyDiskParams(old, new):
+ """Updates and verifies disk parameters.
+
+ Builds the updated parameters by calling L{_GetUpdatedParams} on C{old}
+ and C{new}, then passes the result through L{utils.ForceDictType}
+ together with L{constants.DISK_DT_TYPES}.
+
+ @param old: existing parameters; the caller in C{CheckPrereq} passes the
+ group's subdict for a single disk template (or an empty dict)
+ @param new: requested parameters; the caller passes the matching
+ per-template subdict from the opcode
+ @return: the updated parameters as produced by L{_GetUpdatedParams}
+
+ """
+ new_params = _GetUpdatedParams(old, new)
+ utils.ForceDictType(new_params, constants.DISK_DT_TYPES)
+ return new_params
+
def CheckPrereq(self):
"""Check prerequisites.
self.new_ndparams = new_ndparams
if self.op.diskparams:
- self.new_diskparams = dict()
- for templ in constants.DISK_TEMPLATES:
- if templ not in self.op.diskparams:
- self.op.diskparams[templ] = {}
- new_templ_params = _GetUpdatedParams(self.group.diskparams[templ],
- self.op.diskparams[templ])
- utils.ForceDictType(new_templ_params, constants.DISK_DT_TYPES)
- self.new_diskparams[templ] = new_templ_params
+ diskparams = self.group.diskparams
+ uavdp = self._UpdateAndVerifyDiskParams
+ # For each disk template subdict, update and verify the values
+ new_diskparams = dict((dt,
+ uavdp(diskparams.get(dt, {}),
+ self.op.diskparams[dt]))
+ for dt in constants.DISK_TEMPLATES
+ if dt in self.op.diskparams)
+ # Now that all updated subdicts of diskparams are ready, merge them
+ # into the group's current diskparams dict
+ self.new_diskparams = objects.FillDict(diskparams, new_diskparams)
+ try:
+ utils.VerifyDictOptions(self.new_diskparams, constants.DISK_DT_DEFAULTS)
+ except errors.OpPrereqError, err:
+ raise errors.OpPrereqError("While verify diskparams options: %s" % err,
+ errors.ECODE_INVAL)
if self.op.hv_state:
self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state,
# Verify the cluster would not be left group-less.
if len(self.cfg.GetNodeGroupList()) == 1:
- raise errors.OpPrereqError("Group '%s' is the only group,"
- " cannot be removed" %
- self.op.group_name,
+ raise errors.OpPrereqError("Group '%s' is the only group, cannot be"
+ " removed" % self.op.group_name,
errors.ECODE_STATE)
def BuildHooksEnv(self):