import os.path
import time
import re
-import platform
import logging
import copy
import OpenSSL
from ganeti import opcodes
from ganeti import ht
from ganeti import rpc
+from ganeti import runtime
import ganeti.masterd.instance # pylint: disable=W0611
"""Data container for LU results with jobs.
Instances of this class returned from L{LogicalUnit.Exec} will be recognized
- by L{mcpu.Processor._ProcessResult}. The latter will then submit the jobs
+ by L{mcpu._ProcessResult}. The latter will then submit the jobs
contained in the C{jobs} attribute and include the job IDs in the opcode
result.
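(As a hedged sketch of this contract: the LU class and instance names below
are hypothetical, while C{opcodes.OpInstanceMigrate}, C{NoHooksLU} and
C{ResultWithJobs} come from the surrounding code base.)

class LUExampleRebalance(NoHooksLU):  # hypothetical, for illustration only
  def Exec(self, feedback_fn):
    # Each inner list is one job; the processor submits the jobs and adds
    # the resulting job IDs to this opcode's result.
    jobs = [[opcodes.OpInstanceMigrate(instance_name="inst1.example.com")],
            [opcodes.OpInstanceMigrate(instance_name="inst2.example.com")]]
    return ResultWithJobs(jobs)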
#: Attribute holding field definitions
FIELDS = None
+ #: Field to sort by
+ SORT_FIELD = "name"
+
def __init__(self, qfilter, fields, use_locking):
"""Initializes this class.
self.use_locking = use_locking
self.query = query.Query(self.FIELDS, fields, qfilter=qfilter,
- namefield="name")
+ namefield=self.SORT_FIELD)
self.requested_data = self.query.RequestedData()
self.names = self.query.RequestedNames()
})
+def _CheckInstancesNodeGroups(cfg, instances, owned_groups, owned_nodes,
+ cur_group_uuid):
+ """Checks if node groups for locked instances are still correct.
+
+ @type cfg: L{config.ConfigWriter}
+ @param cfg: Cluster configuration
+ @type instances: dict; string as key, L{objects.Instance} as value
+ @param instances: Dictionary, instance name as key, instance object as value
+ @type owned_groups: iterable of string
+ @param owned_groups: List of owned groups
+ @type owned_nodes: iterable of string
+ @param owned_nodes: List of owned nodes
+ @type cur_group_uuid: string or None
+ @param cur_group_uuid: Optional group UUID to check against instance's groups
+
+ """
+ for (name, inst) in instances.items():
+ assert owned_nodes.issuperset(inst.all_nodes), \
+ "Instance %s's nodes changed while we kept the lock" % name
+
+ inst_groups = _CheckInstanceNodeGroups(cfg, name, owned_groups)
+
+ assert cur_group_uuid is None or cur_group_uuid in inst_groups, \
+ "Instance %s has no node in group %s" % (name, cur_group_uuid)
+
+
def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups):
"""Checks if the owned node groups are still correct for an instance.
"""Verifies the cluster config.
"""
- REQ_BGL = True
+ REQ_BGL = False
def _VerifyHVP(self, hvp_data):
"""Verifies locally the syntax of the hypervisor parameters.
self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
def ExpandNames(self):
- # Information can be safely retrieved as the BGL is acquired in exclusive
- # mode
- assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER)
+ self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
+ self.share_locks = _ShareAll()
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ """
+ # Retrieve all information
self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
self.all_node_info = self.cfg.GetAllNodesInfo()
self.all_inst_info = self.cfg.GetAllInstancesInfo()
- self.needed_locks = {}
def Exec(self, feedback_fn):
"""Verify integrity of cluster, performing various test on nodes.
self.instances = dict(self.cfg.GetMultiInstanceInfo(owned_instances))
# Check if node groups for locked instances are still correct
- for (instance_name, inst) in self.instances.items():
- assert owned_nodes.issuperset(inst.all_nodes), \
- "Instance %s's nodes changed while we kept the lock" % instance_name
-
- inst_groups = _CheckInstanceNodeGroups(self.cfg, instance_name,
- owned_groups)
-
- assert self.group_uuid in inst_groups, \
- "Instance %s has no node in group %s" % (instance_name, self.group_uuid)
+ _CheckInstancesNodeGroups(self.cfg, self.instances,
+ owned_groups, owned_nodes, self.group_uuid)
def Exec(self, feedback_fn):
"""Verify integrity of cluster disks.
return not cumul_degraded
-def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
+def _CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
"""Check that mirrors are not degraded.
The ldisk parameter, if True, will change the test from the
if dev.children:
for child in dev.children:
- result = result and _CheckDiskConsistency(lu, child, node, on_primary)
+ result = result and _CheckDiskConsistency(lu, instance, child, node,
+ on_primary)
return result
"""Logical unit for OOB handling.
"""
- REG_BGL = False
+ REQ_BGL = False
_SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)
def ExpandNames(self):
if self.op.disk_state:
self.new_disk_state = _MergeAndVerifyDiskState(self.op.disk_state, None)
+ # TODO: If we need to have multiple DnsOnlyRunner we probably should make
+ # it a property on the base class.
+ result = rpc.DnsOnlyRunner().call_version([node])[node]
+ result.Raise("Can't get version information from node %s" % node)
+ if constants.PROTOCOL_VERSION == result.payload:
+ logging.info("Communication to node %s fine, sw version %s match",
+ node, result.payload)
+ else:
+    raise errors.OpPrereqError("Version mismatch: master version %s,"
+ " node version %s" %
+ (constants.PROTOCOL_VERSION, result.payload),
+ errors.ECODE_ENVIRON)
+
def Exec(self, feedback_fn):
"""Adds the new node to the cluster.
if self.op.disk_state:
new_node.disk_state_static = self.new_disk_state
- # check connectivity
- result = self.rpc.call_version([node])[node]
- result.Raise("Can't get version information from node %s" % node)
- if constants.PROTOCOL_VERSION == result.payload:
- logging.info("Communication to node %s fine, sw version %s match",
- node, result.payload)
- else:
- raise errors.OpExecError("Version mismatch master version %s,"
- " node version %s" %
- (constants.PROTOCOL_VERSION, result.payload))
-
# Add node to our /etc/hosts, and add key to known_hosts
if self.cfg.GetClusterInfo().modify_etc_hosts:
master_node = self.cfg.GetMasterNode()
if old_role == self._ROLE_OFFLINE and new_role != old_role:
# Trying to transition out of offline status
- # TODO: Use standard RPC runner, but make sure it works when the node is
- # still marked offline
- result = rpc.BootstrapRunner().call_version([node.name])[node.name]
+ result = self.rpc.call_version([node.name])[node.name]
if result.fail_msg:
raise errors.OpPrereqError("Node %s is being de-offlined but fails"
" to report its version: %s" %
"config_version": constants.CONFIG_VERSION,
"os_api_version": max(constants.OS_API_VERSIONS),
"export_version": constants.EXPORT_VERSION,
- "architecture": (platform.architecture()[0], platform.machine()),
+ "architecture": runtime.GetArchInfo(),
"name": cluster.cluster_name,
"master": cluster.master_node,
"default_hypervisor": cluster.primary_hypervisor,
"""
REQ_BGL = False
- _FIELDS_DYNAMIC = utils.FieldSet()
- _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
- "watcher_pause", "volume_group_name")
def CheckArguments(self):
- _CheckOutputFields(static=self._FIELDS_STATIC,
- dynamic=self._FIELDS_DYNAMIC,
- selected=self.op.output_fields)
+ self.cq = _ClusterQuery(None, self.op.output_fields, False)
def ExpandNames(self):
- self.needed_locks = {}
+ self.cq.ExpandNames(self)
+
+ def DeclareLocks(self, level):
+ self.cq.DeclareLocks(self, level)
def Exec(self, feedback_fn):
- """Dump a representation of the cluster config to the standard output.
-
- """
- values = []
- for field in self.op.output_fields:
- if field == "cluster_name":
- entry = self.cfg.GetClusterName()
- elif field == "master_node":
- entry = self.cfg.GetMasterNode()
- elif field == "drain_flag":
- entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
- elif field == "watcher_pause":
- entry = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
- elif field == "volume_group_name":
- entry = self.cfg.GetVGName()
- else:
- raise errors.ParameterError(field)
- values.append(entry)
- return values
+ result = self.cq.OldStyleQuery(self)
+
+ assert len(result) == 1
+
+ return result[0]
+
+
+class _ClusterQuery(_QueryBase):
+ FIELDS = query.CLUSTER_FIELDS
+
+ #: Do not sort (there is only one item)
+ SORT_FIELD = None
+
+ def ExpandNames(self, lu):
+ lu.needed_locks = {}
+
+ # The following variables interact with _QueryBase._GetNames
+ self.wanted = locking.ALL_SET
+ self.do_locking = self.use_locking
+
+ if self.do_locking:
+ raise errors.OpPrereqError("Can not use locking for cluster queries",
+ errors.ECODE_INVAL)
+
+ def DeclareLocks(self, lu, level):
+ pass
+
+ def _GetQueryData(self, lu):
+    """Computes the cluster data.
+
+ """
+ # Locking is not used
+ assert not (compat.any(lu.glm.is_owned(level)
+ for level in locking.LEVELS
+ if level != locking.LEVEL_CLUSTER) or
+ self.do_locking or self.use_locking)
+
+ if query.CQ_CONFIG in self.requested_data:
+ cluster = lu.cfg.GetClusterInfo()
+ else:
+ cluster = NotImplemented
+
+ if query.CQ_QUEUE_DRAINED in self.requested_data:
+ drain_flag = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
+ else:
+ drain_flag = NotImplemented
+
+ if query.CQ_WATCHER_PAUSE in self.requested_data:
+ watcher_pause = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
+ else:
+ watcher_pause = NotImplemented
+
+ return query.ClusterQueryData(cluster, drain_flag, watcher_pause)
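Usage sketch (hedged): C{lu} stands for the owning LogicalUnit, and the field
names are assumed to be present in C{query.CLUSTER_FIELDS}. LUClusterQuery
above drives this class roughly as follows:

cq = _ClusterQuery(None, ["name", "master_node", "drain_flag"], False)
cq.ExpandNames(lu)             # acquires no locks; locking is rejected here
(row,) = cq.OldStyleQuery(lu)  # exactly one row, hence SORT_FIELD = None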
class LUInstanceActivateDisks(NoHooksLU):
node_disk = node_disk.Copy()
node_disk.UnsetSize()
lu.cfg.SetDiskID(node_disk, node)
- result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False, idx)
+ result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
+ False, idx)
msg = result.fail_msg
if msg:
lu.proc.LogWarning("Could not prepare block device %s on node %s"
node_disk = node_disk.Copy()
node_disk.UnsetSize()
lu.cfg.SetDiskID(node_disk, node)
- result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True, idx)
+ result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
+ True, idx)
msg = result.fail_msg
if msg:
lu.proc.LogWarning("Could not prepare block device %s on node %s"
"""
logging.info("Removing block devices for instance %s", instance.name)
- if not _RemoveDisks(lu, instance):
+ if not _RemoveDisks(lu, instance, ignore_failures=ignore_failures):
if not ignore_failures:
raise errors.OpExecError("Can't remove instance's disks")
feedback_fn("Warning: can't remove instance's disks")
# activate, get path, copy the data over
for idx, disk in enumerate(instance.disks):
self.LogInfo("Copying data for disk %d", idx)
- result = self.rpc.call_blockdev_assemble(target_node, disk,
+ result = self.rpc.call_blockdev_assemble(target_node, (disk, instance),
instance.name, True, idx)
if result.fail_msg:
self.LogWarning("Can't assemble newly created disk %d: %s",
errs.append(result.fail_msg)
break
dev_path = result.payload
- result = self.rpc.call_blockdev_export(source_node, disk,
+ result = self.rpc.call_blockdev_export(source_node, (disk, instance),
target_node, dev_path,
cluster_name)
if result.fail_msg:
all_done = True
result = self.rpc.call_drbd_wait_sync(self.all_nodes,
self.nodes_ip,
- self.instance.disks)
+ (self.instance.disks,
+ self.instance))
min_percent = 100
for node, nres in result.items():
nres.Raise("Cannot resync disks on node %s" % node)
msg = "single-master"
self.feedback_fn("* changing disks into %s mode" % msg)
result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
- self.instance.disks,
+ (self.instance.disks, self.instance),
self.instance.name, multimaster)
for node, nres in result.items():
nres.Raise("Cannot change disks config on node %s" % node)
self.feedback_fn("* checking disk consistency between source and target")
for (idx, dev) in enumerate(instance.disks):
- if not _CheckDiskConsistency(self.lu, dev, target_node, False):
+ if not _CheckDiskConsistency(self.lu, instance, dev, target_node, False):
raise errors.OpExecError("Disk %s is degraded or not fully"
" synchronized on target node,"
" aborting migration" % idx)
self.feedback_fn("* checking disk consistency between source and target")
for (idx, dev) in enumerate(instance.disks):
# for drbd, these are drbd over lvm
- if not _CheckDiskConsistency(self.lu, dev, target_node, False):
+ if not _CheckDiskConsistency(self.lu, instance, dev, target_node,
+ False):
if primary_node.offline:
self.feedback_fn("Node %s is offline, ignoring degraded disk %s on"
" target node %s" %
return results
-def _ComputeLDParams(disk_template, disk_params):
- """Computes Logical Disk parameters from Disk Template parameters.
-
- @type disk_template: string
- @param disk_template: disk template, one of L{constants.DISK_TEMPLATES}
- @type disk_params: dict
- @param disk_params: disk template parameters; dict(template_name -> parameters
- @rtype: list(dict)
- @return: a list of dicts, one for each node of the disk hierarchy. Each dict
- contains the LD parameters of the node. The tree is flattened in-order.
-
- """
- if disk_template not in constants.DISK_TEMPLATES:
- raise errors.ProgrammerError("Unknown disk template %s" % disk_template)
-
- result = list()
- dt_params = disk_params[disk_template]
- if disk_template == constants.DT_DRBD8:
- drbd_params = {
- constants.LDP_RESYNC_RATE: dt_params[constants.DRBD_RESYNC_RATE],
- constants.LDP_BARRIERS: dt_params[constants.DRBD_DISK_BARRIERS],
- constants.LDP_NO_META_FLUSH: dt_params[constants.DRBD_META_BARRIERS],
- constants.LDP_DEFAULT_METAVG: dt_params[constants.DRBD_DEFAULT_METAVG],
- constants.LDP_DISK_CUSTOM: dt_params[constants.DRBD_DISK_CUSTOM],
- constants.LDP_NET_CUSTOM: dt_params[constants.DRBD_NET_CUSTOM],
- constants.LDP_DYNAMIC_RESYNC: dt_params[constants.DRBD_DYNAMIC_RESYNC],
- constants.LDP_PLAN_AHEAD: dt_params[constants.DRBD_PLAN_AHEAD],
- constants.LDP_FILL_TARGET: dt_params[constants.DRBD_FILL_TARGET],
- constants.LDP_DELAY_TARGET: dt_params[constants.DRBD_DELAY_TARGET],
- constants.LDP_MAX_RATE: dt_params[constants.DRBD_MAX_RATE],
- constants.LDP_MIN_RATE: dt_params[constants.DRBD_MIN_RATE],
- }
-
- drbd_params = \
- objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_DRBD8],
- drbd_params)
-
- result.append(drbd_params)
-
- # data LV
- data_params = {
- constants.LDP_STRIPES: dt_params[constants.DRBD_DATA_STRIPES],
- }
- data_params = \
- objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_LV],
- data_params)
- result.append(data_params)
-
- # metadata LV
- meta_params = {
- constants.LDP_STRIPES: dt_params[constants.DRBD_META_STRIPES],
- }
- meta_params = \
- objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_LV],
- meta_params)
- result.append(meta_params)
-
- elif (disk_template == constants.DT_FILE or
- disk_template == constants.DT_SHARED_FILE):
- result.append(constants.DISK_LD_DEFAULTS[constants.LD_FILE])
-
- elif disk_template == constants.DT_PLAIN:
- params = {
- constants.LDP_STRIPES: dt_params[constants.LV_STRIPES],
- }
- params = \
- objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_LV],
- params)
- result.append(params)
-
- elif disk_template == constants.DT_BLOCK:
- result.append(constants.DISK_LD_DEFAULTS[constants.LD_BLOCKDEV])
-
- elif disk_template == constants.DT_RBD:
- params = {
- constants.LDP_POOL: dt_params[constants.RBD_POOL]
- }
- params = \
- objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_RBD],
- params)
- result.append(params)
-
- return result
-
-
def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
- iv_name, p_minor, s_minor, drbd_params, data_params,
- meta_params):
+ iv_name, p_minor, s_minor):
"""Generate a drbd8 device complete with its children.
"""
dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
logical_id=(vgnames[0], names[0]),
- params=data_params)
+ params={})
dev_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE,
logical_id=(vgnames[1], names[1]),
- params=meta_params)
+ params={})
drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
logical_id=(primary, secondary, port,
p_minor, s_minor,
shared_secret),
children=[dev_data, dev_meta],
- iv_name=iv_name, params=drbd_params)
+ iv_name=iv_name, params={})
return drbd_dev
def _GenerateDiskTemplate(lu, template_name, instance_name, primary_node,
secondary_nodes, disk_info, file_storage_dir, file_driver, base_index,
- feedback_fn, disk_params,
- _req_file_storage=opcodes.RequireFileStorage,
+  feedback_fn, full_disk_params,
+  _req_file_storage=opcodes.RequireFileStorage,
_req_shr_file_storage=opcodes.RequireSharedFileStorage):
"""Generate the entire disk layout for a given template type.
vgname = lu.cfg.GetVGName()
disk_count = len(disk_info)
disks = []
- ld_params = _ComputeLDParams(template_name, disk_params)
if template_name == constants.DT_DISKLESS:
pass
elif template_name == constants.DT_DRBD8:
- drbd_params, data_params, meta_params = ld_params
if len(secondary_nodes) != 1:
raise errors.ProgrammerError("Wrong template configuration")
remote_node = secondary_nodes[0]
minors = lu.cfg.AllocateDRBDMinor(
[primary_node, remote_node] * len(disk_info), instance_name)
+ (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
+ full_disk_params)
+ drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
+
names = []
for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
for i in range(disk_count)]):
names.append(lv_prefix + "_meta")
for idx, disk in enumerate(disk_info):
disk_index = idx + base_index
- drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
data_vg = disk.get(constants.IDISK_VG, vgname)
meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
[data_vg, meta_vg],
names[idx * 2:idx * 2 + 2],
"disk/%d" % disk_index,
- minors[idx * 2], minors[idx * 2 + 1],
- drbd_params, data_params, meta_params)
+ minors[idx * 2], minors[idx * 2 + 1])
disk_dev.mode = disk[constants.IDISK_MODE]
disks.append(disk_dev)
else:
(name_prefix, base_index + i)
for i in range(disk_count)])
- dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]
-
if template_name == constants.DT_PLAIN:
def logical_id_fn(idx, _, disk):
vg = disk.get(constants.IDISK_VG, vgname)
else:
raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
+ dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]
+
for idx, disk in enumerate(disk_info):
disk_index = idx + base_index
size = disk[constants.IDISK_SIZE]
logical_id=logical_id_fn(idx, disk_index, disk),
iv_name="disk/%d" % disk_index,
mode=disk[constants.IDISK_MODE],
- params=ld_params[0]))
+ params={}))
return disks
lu.cfg.SetDiskID(device, node)
logging.info("Pause sync of instance %s disks", instance.name)
- result = lu.rpc.call_blockdev_pause_resume_sync(node, instance.disks, True)
+ result = lu.rpc.call_blockdev_pause_resume_sync(node,
+ (instance.disks, instance),
+ True)
for idx, success in enumerate(result.payload):
if not success:
wipe_size = min(wipe_chunk_size, size - offset)
logging.debug("Wiping disk %d, offset %s, chunk %s",
idx, offset, wipe_size)
- result = lu.rpc.call_blockdev_wipe(node, device, offset, wipe_size)
+ result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
+ wipe_size)
result.Raise("Could not wipe disk %d at offset %d for size %d" %
(idx, offset, wipe_size))
now = time.time()
finally:
logging.info("Resume sync of instance %s disks", instance.name)
- result = lu.rpc.call_blockdev_pause_resume_sync(node, instance.disks, False)
+ result = lu.rpc.call_blockdev_pause_resume_sync(node,
+ (instance.disks, instance),
+ False)
for idx, success in enumerate(result.payload):
if not success:
_CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
-def _RemoveDisks(lu, instance, target_node=None):
+def _RemoveDisks(lu, instance, target_node=None, ignore_failures=False):
"""Remove all disks for an instance.
This abstracts away some work from `AddInstance()` and
logging.info("Removing block devices for instance %s", instance.name)
all_result = True
+ ports_to_release = set()
for (idx, device) in enumerate(instance.disks):
if target_node:
edata = [(target_node, device)]
# if this is a DRBD disk, return its port to the pool
if device.dev_type in constants.LDS_DRBD:
- tcp_port = device.logical_id[2]
- lu.cfg.AddTcpUdpPort(tcp_port)
+ ports_to_release.add(device.logical_id[2])
+
+ if all_result or ignore_failures:
+ for port in ports_to_release:
+ lu.cfg.AddTcpUdpPort(port)
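The ordering rule in isolation, as a self-contained sketch (all names below
are hypothetical): recycle shared resources only once the whole removal is
known to have succeeded, or failures are explicitly ignored, so that a
partial failure cannot hand a TCP port back to the pool while a surviving
DRBD device still uses it.

def _remove_all(devices, release_port, ignore_failures=False):
  all_ok = True
  ports = set()
  for dev in devices:
    all_ok = dev.remove() and all_ok  # remove() assumed to return a bool
    if dev.is_drbd:                   # hypothetical attribute
      ports.add(dev.port)
  if all_ok or ignore_failures:
    for port in ports:
      release_port(port)
  return all_ok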
if instance.disk_template == constants.DT_FILE:
file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
utils.CommaJoin(res)),
errors.ECODE_INVAL)
- # disk parameters (not customizable at instance or node level)
- # just use the primary node parameters, ignoring the secondary.
- self.diskparams = group_info.diskparams
-
if not self.adopt_disks:
if self.op.disk_template == constants.DT_RBD:
# _CheckRADOSFreeSpace() is just a placeholder.
else:
network_port = None
+    # This is ugly, but we have a chicken-and-egg problem here:
+ # We can only take the group disk parameters, as the instance
+ # has no disks yet (we are generating them right here).
+ node = self.cfg.GetNodeInfo(pnode_name)
+ nodegroup = self.cfg.GetNodeGroup(node.group)
disks = _GenerateDiskTemplate(self,
self.op.disk_template,
instance, pnode_name,
self.op.file_driver,
0,
feedback_fn,
- self.diskparams)
+ self.cfg.GetGroupDiskParams(nodegroup))
iobj = objects.Instance(name=instance, os=self.op.os_type,
primary_node=pnode_name,
if pause_sync:
feedback_fn("* pausing disk sync to install instance OS")
result = self.rpc.call_blockdev_pause_resume_sync(pnode_name,
- iobj.disks, True)
+ (iobj.disks,
+ iobj), True)
for idx, success in enumerate(result.payload):
if not success:
logging.warn("pause-sync of instance %s for disk %d failed",
if pause_sync:
feedback_fn("* resuming disk sync")
result = self.rpc.call_blockdev_pause_resume_sync(pnode_name,
- iobj.disks, False)
+ (iobj.disks,
+ iobj), False)
for idx, success in enumerate(result.payload):
if not success:
logging.warn("resume-sync of instance %s for disk %d failed",
_CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
ignore=self.ignore_ipolicy)
- # TODO: compute disk parameters
- primary_node_info = self.cfg.GetNodeInfo(instance.primary_node)
- secondary_node_info = self.cfg.GetNodeInfo(secondary_node)
- if primary_node_info.group != secondary_node_info.group:
- self.lu.LogInfo("The instance primary and secondary nodes are in two"
- " different node groups; the disk parameters of the"
- " primary node's group will be applied.")
-
- self.diskparams = self.cfg.GetNodeGroup(primary_node_info.group).diskparams
-
for node in check_nodes:
_CheckNodeOnline(self.lu, node)
self.lu.LogInfo("Checking disk/%d consistency on node %s" %
(idx, node_name))
- if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
- ldisk=ldisk):
+ if not _CheckDiskConsistency(self.lu, self.instance, dev, node_name,
+ on_primary, ldisk=ldisk):
raise errors.OpExecError("Node %s has degraded storage, unsafe to"
" replace disks for instance %s" %
(node_name, self.instance.name))
lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
names = _GenerateUniqueNames(self.lu, lv_names)
- _, data_p, meta_p = _ComputeLDParams(constants.DT_DRBD8, self.diskparams)
-
vg_data = dev.children[0].logical_id[0]
lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
- logical_id=(vg_data, names[0]), params=data_p)
+ logical_id=(vg_data, names[0]), params={})
vg_meta = dev.children[1].logical_id[0]
lv_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE,
- logical_id=(vg_meta, names[1]), params=meta_p)
+ logical_id=(vg_meta, names[1]), params={})
new_lvs = [lv_data, lv_meta]
old_lvs = [child.Copy() for child in dev.children]
# Now that the new lvs have the old name, we can add them to the device
self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
- result = self.rpc.call_blockdev_addchildren(self.target_node, dev,
- new_lvs)
+ result = self.rpc.call_blockdev_addchildren(self.target_node,
+ (dev, self.instance),
+ (new_lvs, self.instance))
msg = result.fail_msg
if msg:
for new_lv in new_lvs:
iv_names[idx] = (dev, dev.children, new_net_id)
logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
new_net_id)
- drbd_params, _, _ = _ComputeLDParams(constants.DT_DRBD8, self.diskparams)
new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
logical_id=new_alone_id,
children=dev.children,
size=dev.size,
- params=drbd_params)
+ params={})
try:
_CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
_GetInstanceInfoText(self.instance), False)
result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
self.new_node],
self.node_secondary_ip,
- self.instance.disks,
+ (self.instance.disks, self.instance),
self.instance.name,
False)
for to_node, to_result in result.items():
env = {
"DISK": self.op.disk,
"AMOUNT": self.op.amount,
+ "ABSOLUTE": self.op.absolute,
}
env.update(_BuildInstanceHookEnvByObject(self, self.instance))
return env
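For example, a relative grow of disk 2 by 2048 MiB would contribute the
following keys (sketch; the keys added by _BuildInstanceHookEnvByObject are
omitted):

env = {"DISK": 2, "AMOUNT": 2048, "ABSOLUTE": False}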
self.disk = instance.FindDisk(self.op.disk)
+ if self.op.absolute:
+ self.target = self.op.amount
+ self.delta = self.target - self.disk.size
+ if self.delta < 0:
+ raise errors.OpPrereqError("Requested size (%s) is smaller than "
+ "current disk size (%s)" %
+ (utils.FormatUnit(self.target, "h"),
+ utils.FormatUnit(self.disk.size, "h")),
+ errors.ECODE_STATE)
+ else:
+ self.delta = self.op.amount
+ self.target = self.disk.size + self.delta
+ if self.delta < 0:
+ raise errors.OpPrereqError("Requested increment (%s) is negative" %
+ utils.FormatUnit(self.delta, "h"),
+ errors.ECODE_INVAL)
+
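A worked example of the two modes (pure arithmetic, mirroring the checks
above; sizes in MiB):

disk_size = 10240            # existing 10 GiB disk
# absolute mode: op.amount is the desired end size
target = 12288
delta = target - disk_size   # 2048; a negative delta raises ECODE_STATE
# relative mode: op.amount is the increment
delta = 2048
target = disk_size + delta   # 12288; a negative amount raises ECODE_INVAL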
if instance.disk_template not in (constants.DT_FILE,
constants.DT_SHARED_FILE,
constants.DT_RBD):
# TODO: check the free disk space for file, once that feature is
# supported
_CheckNodesFreeDiskPerVG(self, nodenames,
- self.disk.ComputeGrowth(self.op.amount))
+ self.disk.ComputeGrowth(self.delta))
def Exec(self, feedback_fn):
"""Execute disk grow.
if not disks_ok:
raise errors.OpExecError("Cannot activate block device to grow")
- feedback_fn("Growing disk %s of instance '%s' by %s" %
+ feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
(self.op.disk, instance.name,
- utils.FormatUnit(self.op.amount, "h")))
+ utils.FormatUnit(self.delta, "h"),
+ utils.FormatUnit(self.target, "h")))
# First run all grow ops in dry-run mode
for node in instance.all_nodes:
self.cfg.SetDiskID(disk, node)
- result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, True)
+ result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
+ True)
result.Raise("Grow request failed to node %s" % node)
# We know that (as far as we can test) operations across different
# nodes will succeed; time to run it for real
for node in instance.all_nodes:
self.cfg.SetDiskID(disk, node)
- result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, False)
+ result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
+ False)
result.Raise("Grow request failed to node %s" % node)
# TODO: Rewrite code to work properly
# time is a work-around.
time.sleep(5)
- disk.RecordGrow(self.op.amount)
+ disk.RecordGrow(self.delta)
self.cfg.Update(instance, feedback_fn)
# Changes have been recorded, release node lock
else:
self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
+ self.needed_locks[locking.LEVEL_NODEGROUP] = []
self.needed_locks[locking.LEVEL_NODE] = []
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
def DeclareLocks(self, level):
- if self.op.use_locking and level == locking.LEVEL_NODE:
- self._LockInstancesNodes()
+ if self.op.use_locking:
+ if level == locking.LEVEL_NODEGROUP:
+ owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
+
+ # Lock all groups used by instances optimistically; this requires going
+ # via the node before it's locked, requiring verification later on
+ self.needed_locks[locking.LEVEL_NODEGROUP] = \
+ frozenset(group_uuid
+ for instance_name in owned_instances
+ for group_uuid in
+ self.cfg.GetInstanceNodeGroups(instance_name))
+
+ elif level == locking.LEVEL_NODE:
+ self._LockInstancesNodes()
def CheckPrereq(self):
"""Check prerequisites.
This only checks the optional instance list against the existing names.
"""
+ owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
+ owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
+ owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))
+
if self.wanted_names is None:
assert self.op.use_locking, "Locking was not used"
- self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
+ self.wanted_names = owned_instances
- self.wanted_instances = \
- map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))
+ instances = dict(self.cfg.GetMultiInstanceInfo(self.wanted_names))
- def _ComputeBlockdevStatus(self, node, instance_name, dev):
+ if self.op.use_locking:
+ _CheckInstancesNodeGroups(self.cfg, instances, owned_groups, owned_nodes,
+ None)
+ else:
+ assert not (owned_instances or owned_groups or owned_nodes)
+
+ self.wanted_instances = instances.values()
+
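Condensed, the optimistic-locking idiom used here looks as follows (hedged
sketch; C{cfg}, C{owned_instances}, C{instances} and C{owned_nodes} are as in
the surrounding code):

# DeclareLocks: guess the groups from the config before node locks are held
guessed_groups = frozenset(uuid
                           for name in owned_instances
                           for uuid in cfg.GetInstanceNodeGroups(name))
# CheckPrereq: with all locks held, assert that the guess did not go stale
_CheckInstancesNodeGroups(cfg, instances, guessed_groups, owned_nodes, None)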
+ def _ComputeBlockdevStatus(self, node, instance, dev):
"""Returns the status of a block device
"""
if result.offline:
return None
- result.Raise("Can't compute disk status for %s" % instance_name)
+ result.Raise("Can't compute disk status for %s" % instance.name)
status = result.payload
if status is None:
snode = dev.logical_id[0]
dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node,
- instance.name, dev)
- dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev)
+ instance, dev)
+ dev_sstatus = self._ComputeBlockdevStatus(snode, instance, dev)
if dev.children:
dev_children = map(compat.partial(self._ComputeDiskStatus,
cluster = self.cfg.GetClusterInfo()
- pri_nodes = self.cfg.GetMultiNodeInfo(i.primary_node
- for i in self.wanted_instances)
- for instance, (_, pnode) in zip(self.wanted_instances, pri_nodes):
+ node_names = itertools.chain(*(i.all_nodes for i in self.wanted_instances))
+ nodes = dict(self.cfg.GetMultiNodeInfo(node_names))
+
+ groups = dict(self.cfg.GetMultiNodeGroupInfo(node.group
+ for node in nodes.values()))
+
+ group2name_fn = lambda uuid: groups[uuid].name
+
+ for instance in self.wanted_instances:
+ pnode = nodes[instance.primary_node]
+
if self.op.static or pnode.offline:
remote_state = None
if pnode.offline:
disks = map(compat.partial(self._ComputeDiskStatus, instance, None),
instance.disks)
+ snodes_group_uuids = [nodes[snode_name].group
+ for snode_name in instance.secondary_nodes]
+
result[instance.name] = {
"name": instance.name,
"config_state": instance.admin_state,
"run_state": remote_state,
"pnode": instance.primary_node,
+ "pnode_group_uuid": pnode.group,
+ "pnode_group_name": group2name_fn(pnode.group),
"snodes": instance.secondary_nodes,
+ "snodes_group_uuids": snodes_group_uuids,
+ "snodes_group_names": map(group2name_fn, snodes_group_uuids),
"os": instance.os,
# this happens to be the same format used for hooks
"nics": _NICListToTuple(self, instance.nics),
pnode = instance.primary_node
nodelist = list(instance.all_nodes)
pnode_info = self.cfg.GetNodeInfo(pnode)
- self.diskparams = self.cfg.GetNodeGroup(pnode_info.group).diskparams
+ self.diskparams = self.cfg.GetInstanceDiskParams(instance)
# Prepare disk/NIC modifications
self.diskmod = PrepareContainerMods(self.op.disks, None)
disk_info, None, None, 0, feedback_fn,
self.diskparams)
info = _GetInstanceInfoText(instance)
- feedback_fn("Creating aditional volumes...")
+ feedback_fn("Creating additional volumes...")
# first, create the missing data and meta devices
for disk in new_disks:
# unfortunately this is... not too nice
child.size = parent.size
child.mode = parent.mode
+    # these are DRBD disks; return their ports to the pool
+ # NOTE: this must be done right before the call to cfg.Update!
+ for disk in old_disks:
+ tcp_port = disk.logical_id[2]
+ self.cfg.AddTcpUdpPort(tcp_port)
+
# update instance structure
instance.disks = new_disks
instance.disk_template = constants.DT_PLAIN
self.LogWarning("Could not remove metadata for disk %d on node %s,"
" continuing anyway: %s", idx, pnode, msg)
- # this is a DRBD disk, return its port to the pool
- for disk in old_disks:
- tcp_port = disk.logical_id[2]
- self.cfg.AddTcpUdpPort(tcp_port)
-
- # Node resource locks will be released by caller
-
def _CreateNewDisk(self, idx, params, _):
"""Creates a new disk.
if self.req_target_uuids:
# User requested specific target groups
- self.target_uuids = self.req_target_uuids
+ self.target_uuids = frozenset(self.req_target_uuids)
else:
# All groups except those used by the instance are potential targets
self.target_uuids = owned_groups - inst_groups
"""
REQ_BGL = False
+ def CheckArguments(self):
+ self.expq = _ExportQuery(qlang.MakeSimpleFilter("node", self.op.nodes),
+ ["node", "export"], self.op.use_locking)
+
def ExpandNames(self):
- self.needed_locks = {}
- self.share_locks[locking.LEVEL_NODE] = 1
- if not self.op.nodes:
- self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
- else:
- self.needed_locks[locking.LEVEL_NODE] = \
- _GetWantedNodes(self, self.op.nodes)
+ self.expq.ExpandNames(self)
+
+ def DeclareLocks(self, level):
+ self.expq.DeclareLocks(self, level)
def Exec(self, feedback_fn):
- """Compute the list of all the exported system images.
+ result = {}
- @rtype: dict
- @return: a dictionary with the structure node->(export-list)
- where export-list is a list of the instances exported on
- that node.
+ for (node, expname) in self.expq.OldStyleQuery(self):
+ if expname is None:
+ result[node] = False
+ else:
+ result.setdefault(node, []).append(expname)
+
+ return result
+
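A self-contained illustration of the fold above (hypothetical data): a node
whose export list could not be retrieved yields a C{None} export name and
maps to C{False} in the old-style result.

result = {}
for (node, expname) in [("n1", "exp1"), ("n1", "exp2"), ("n2", None)]:
  if expname is None:
    result[node] = False
  else:
    result.setdefault(node, []).append(expname)
assert result == {"n1": ["exp1", "exp2"], "n2": False}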
+
+class _ExportQuery(_QueryBase):
+ FIELDS = query.EXPORT_FIELDS
+
+ #: The node name is not a unique key for this query
+ SORT_FIELD = "node"
+
+ def ExpandNames(self, lu):
+ lu.needed_locks = {}
+
+ # The following variables interact with _QueryBase._GetNames
+ if self.names:
+ self.wanted = _GetWantedNodes(lu, self.names)
+ else:
+ self.wanted = locking.ALL_SET
+
+ self.do_locking = self.use_locking
+
+ if self.do_locking:
+ lu.share_locks = _ShareAll()
+ lu.needed_locks = {
+ locking.LEVEL_NODE: self.wanted,
+ }
+
+ def DeclareLocks(self, lu, level):
+ pass
+
+ def _GetQueryData(self, lu):
+    """Computes the list of exports on the selected nodes.
"""
- self.nodes = self.owned_locks(locking.LEVEL_NODE)
- rpcresult = self.rpc.call_export_list(self.nodes)
- result = {}
- for node in rpcresult:
- if rpcresult[node].fail_msg:
- result[node] = False
+ # Locking is not used
+ # TODO
+ assert not (compat.any(lu.glm.is_owned(level)
+ for level in locking.LEVELS
+ if level != locking.LEVEL_CLUSTER) or
+ self.do_locking or self.use_locking)
+
+ nodes = self._GetNames(lu, lu.cfg.GetNodeList(), locking.LEVEL_NODE)
+
+ result = []
+
+ for (node, nres) in lu.rpc.call_export_list(nodes).items():
+ if nres.fail_msg:
+ result.append((node, None))
else:
- result[node] = rpcresult[node].payload
+ result.extend((node, expname) for expname in nres.payload)
return result
self.instances = dict(self.cfg.GetMultiInstanceInfo(owned_instances))
# Check if node groups for locked instances are still correct
- for instance_name in owned_instances:
- inst = self.instances[instance_name]
- assert owned_nodes.issuperset(inst.all_nodes), \
- "Instance %s's nodes changed while we kept the lock" % instance_name
-
- inst_groups = _CheckInstanceNodeGroups(self.cfg, instance_name,
- owned_groups)
-
- assert self.group_uuid in inst_groups, \
- "Instance %s has no node in group %s" % (instance_name, self.group_uuid)
+ _CheckInstancesNodeGroups(self.cfg, self.instances,
+ owned_groups, owned_nodes, self.group_uuid)
if self.req_target_uuids:
# User requested specific target groups
def ExpandNames(self):
self.group_uuid = None
self.needed_locks = {}
+
if self.op.kind == constants.TAG_NODE:
self.op.name = _ExpandNodeName(self.cfg, self.op.name)
- self.needed_locks[locking.LEVEL_NODE] = self.op.name
+ lock_level = locking.LEVEL_NODE
+ lock_name = self.op.name
elif self.op.kind == constants.TAG_INSTANCE:
self.op.name = _ExpandInstanceName(self.cfg, self.op.name)
- self.needed_locks[locking.LEVEL_INSTANCE] = self.op.name
+ lock_level = locking.LEVEL_INSTANCE
+ lock_name = self.op.name
elif self.op.kind == constants.TAG_NODEGROUP:
self.group_uuid = self.cfg.LookupNodeGroup(self.op.name)
+ lock_level = locking.LEVEL_NODEGROUP
+ lock_name = self.group_uuid
+ else:
+ lock_level = None
+ lock_name = None
+
+ if lock_level and getattr(self.op, "use_locking", True):
+ self.needed_locks[lock_level] = lock_name
# FIXME: Acquire BGL for cluster tag operations (as of this writing it's
# not possible to acquire the BGL based on opcode parameters)
#: Query type implementations
_QUERY_IMPL = {
+ constants.QR_CLUSTER: _ClusterQuery,
constants.QR_INSTANCE: _InstanceQuery,
constants.QR_NODE: _NodeQuery,
constants.QR_GROUP: _GroupQuery,
constants.QR_OS: _OsQuery,
+ constants.QR_EXPORT: _ExportQuery,
}
assert set(_QUERY_IMPL.keys()) == constants.QR_VIA_OP
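With these entries in place, an OpQuery for the new resource types resolves
through the existing dispatcher; e.g. (sketch, assuming the
_GetQueryImplementation helper defined alongside this table):

impl = _GetQueryImplementation(constants.QR_EXPORT)   # -> _ExportQuery
expq = impl(qlang.MakeSimpleFilter("node", None), ["node", "export"], False)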