X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/34700f5b98e6b7a2009878720c6d0194ed09a0a8..99ccf8b915722aaed2029d189b3d522d5a6c8760:/lib/cmdlib.py diff --git a/lib/cmdlib.py b/lib/cmdlib.py index e68974b..b672fdd 100644 --- a/lib/cmdlib.py +++ b/lib/cmdlib.py @@ -32,7 +32,6 @@ import os import os.path import time import re -import platform import logging import copy import OpenSSL @@ -60,6 +59,7 @@ from ganeti import qlang from ganeti import opcodes from ganeti import ht from ganeti import rpc +from ganeti import runtime import ganeti.masterd.instance # pylint: disable=W0611 @@ -82,7 +82,7 @@ class ResultWithJobs: """Data container for LU results with jobs. Instances of this class returned from L{LogicalUnit.Exec} will be recognized - by L{mcpu.Processor._ProcessResult}. The latter will then submit the jobs + by L{mcpu._ProcessResult}. The latter will then submit the jobs contained in the C{jobs} attribute and include the job IDs in the opcode result. @@ -493,6 +493,9 @@ class _QueryBase: #: Attribute holding field definitions FIELDS = None + #: Field to sort by + SORT_FIELD = "name" + def __init__(self, qfilter, fields, use_locking): """Initializes this class. @@ -500,7 +503,7 @@ class _QueryBase: self.use_locking = use_locking self.query = query.Query(self.FIELDS, fields, qfilter=qfilter, - namefield="name") + namefield=self.SORT_FIELD) self.requested_data = self.query.RequestedData() self.names = self.query.RequestedNames() @@ -596,6 +599,32 @@ def _MakeLegacyNodeInfo(data): }) +def _CheckInstancesNodeGroups(cfg, instances, owned_groups, owned_nodes, + cur_group_uuid): + """Checks if node groups for locked instances are still correct. + + @type cfg: L{config.ConfigWriter} + @param cfg: Cluster configuration + @type instances: dict; string as key, L{objects.Instance} as value + @param instances: Dictionary, instance name as key, instance object as value + @type owned_groups: iterable of string + @param owned_groups: List of owned groups + @type owned_nodes: iterable of string + @param owned_nodes: List of owned nodes + @type cur_group_uuid: string or None + @param cur_group_uuid: Optional group UUID to check against instance's groups + + """ + for (name, inst) in instances.items(): + assert owned_nodes.issuperset(inst.all_nodes), \ + "Instance %s's nodes changed while we kept the lock" % name + + inst_groups = _CheckInstanceNodeGroups(cfg, name, owned_groups) + + assert cur_group_uuid is None or cur_group_uuid in inst_groups, \ + "Instance %s has no node in group %s" % (name, cur_group_uuid) + + def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups): """Checks if the owned node groups are still correct for an instance. @@ -1885,7 +1914,7 @@ class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors): """Verifies the cluster config. """ - REQ_BGL = True + REQ_BGL = False def _VerifyHVP(self, hvp_data): """Verifies locally the syntax of the hypervisor parameters. @@ -1902,13 +1931,17 @@ class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors): self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err)) def ExpandNames(self): - # Information can be safely retrieved as the BGL is acquired in exclusive - # mode - assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER) + self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET) + self.share_locks = _ShareAll() + + def CheckPrereq(self): + """Check prerequisites. 
+ + """ + # Retrieve all information self.all_group_info = self.cfg.GetAllNodeGroupsInfo() self.all_node_info = self.cfg.GetAllNodesInfo() self.all_inst_info = self.cfg.GetAllInstancesInfo() - self.needed_locks = {} def Exec(self, feedback_fn): """Verify integrity of cluster, performing various test on nodes. @@ -3499,15 +3532,8 @@ class LUGroupVerifyDisks(NoHooksLU): self.instances = dict(self.cfg.GetMultiInstanceInfo(owned_instances)) # Check if node groups for locked instances are still correct - for (instance_name, inst) in self.instances.items(): - assert owned_nodes.issuperset(inst.all_nodes), \ - "Instance %s's nodes changed while we kept the lock" % instance_name - - inst_groups = _CheckInstanceNodeGroups(self.cfg, instance_name, - owned_groups) - - assert self.group_uuid in inst_groups, \ - "Instance %s has no node in group %s" % (instance_name, self.group_uuid) + _CheckInstancesNodeGroups(self.cfg, self.instances, + owned_groups, owned_nodes, self.group_uuid) def Exec(self, feedback_fn): """Verify integrity of cluster disks. @@ -4474,7 +4500,7 @@ def _WaitForSync(lu, instance, disks=None, oneshot=False): return not cumul_degraded -def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False): +def _CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False): """Check that mirrors are not degraded. The ldisk parameter, if True, will change the test from the @@ -4503,7 +4529,8 @@ def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False): if dev.children: for child in dev.children: - result = result and _CheckDiskConsistency(lu, child, node, on_primary) + result = result and _CheckDiskConsistency(lu, instance, child, node, + on_primary) return result @@ -4512,7 +4539,7 @@ class LUOobCommand(NoHooksLU): """Logical unit for OOB handling. """ - REG_BGL = False + REQ_BGL = False _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE) def ExpandNames(self): @@ -5589,6 +5616,19 @@ class LUNodeAdd(LogicalUnit): if self.op.disk_state: self.new_disk_state = _MergeAndVerifyDiskState(self.op.disk_state, None) + # TODO: If we need to have multiple DnsOnlyRunner we probably should make + # it a property on the base class. + result = rpc.DnsOnlyRunner().call_version([node])[node] + result.Raise("Can't get version information from node %s" % node) + if constants.PROTOCOL_VERSION == result.payload: + logging.info("Communication to node %s fine, sw version %s match", + node, result.payload) + else: + raise errors.OpPrereqError("Version mismatch master version %s," + " node version %s" % + (constants.PROTOCOL_VERSION, result.payload), + errors.ECODE_ENVIRON) + def Exec(self, feedback_fn): """Adds the new node to the cluster. 
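The hunk above moves LUNodeAdd's protocol-version handshake from Exec into CheckPrereq (the next hunk removes the old copy from Exec), so an incompatible node is rejected before anything is committed to the cluster configuration. A minimal sketch of the new check follows, wrapped in a hypothetical helper name _VerifyNodeVersion; every API used (rpc.DnsOnlyRunner, call_version, result.Raise/payload, errors.OpPrereqError, errors.ECODE_ENVIRON) appears in the patch itself. The DNS-only runner is used because the node being added is not yet part of the configuration, so the LU's usual config-backed self.rpc runner cannot resolve it.

# Hedged sketch, not part of the patch.
from ganeti import constants
from ganeti import errors
from ganeti import rpc

def _VerifyNodeVersion(node):
  # Query the (not yet configured) node via DNS-based name resolution.
  result = rpc.DnsOnlyRunner().call_version([node])[node]
  result.Raise("Can't get version information from node %s" % node)
  if result.payload != constants.PROTOCOL_VERSION:
    # Failing here, in CheckPrereq, aborts the job before Exec runs and
    # before the node is added to the configuration; the old code raised
    # OpExecError from Exec instead.
    raise errors.OpPrereqError("Version mismatch master version %s,"
                               " node version %s" %
                               (constants.PROTOCOL_VERSION, result.payload),
                               errors.ECODE_ENVIRON)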
@@ -5633,17 +5673,6 @@ class LUNodeAdd(LogicalUnit): if self.op.disk_state: new_node.disk_state_static = self.new_disk_state - # check connectivity - result = self.rpc.call_version([node])[node] - result.Raise("Can't get version information from node %s" % node) - if constants.PROTOCOL_VERSION == result.payload: - logging.info("Communication to node %s fine, sw version %s match", - node, result.payload) - else: - raise errors.OpExecError("Version mismatch master version %s," - " node version %s" % - (constants.PROTOCOL_VERSION, result.payload)) - # Add node to our /etc/hosts, and add key to known_hosts if self.cfg.GetClusterInfo().modify_etc_hosts: master_node = self.cfg.GetMasterNode() @@ -5909,9 +5938,7 @@ class LUNodeSetParams(LogicalUnit): if old_role == self._ROLE_OFFLINE and new_role != old_role: # Trying to transition out of offline status - # TODO: Use standard RPC runner, but make sure it works when the node is - # still marked offline - result = rpc.BootstrapRunner().call_version([node.name])[node.name] + result = self.rpc.call_version([node.name])[node.name] if result.fail_msg: raise errors.OpPrereqError("Node %s is being de-offlined but fails" " to report its version: %s" % @@ -6096,7 +6123,7 @@ class LUClusterQuery(NoHooksLU): "config_version": constants.CONFIG_VERSION, "os_api_version": max(constants.OS_API_VERSIONS), "export_version": constants.EXPORT_VERSION, - "architecture": (platform.architecture()[0], platform.machine()), + "architecture": runtime.GetArchInfo(), "name": cluster.cluster_name, "master": cluster.master_node, "default_hypervisor": cluster.primary_hypervisor, @@ -6139,38 +6166,70 @@ class LUClusterConfigQuery(NoHooksLU): """ REQ_BGL = False - _FIELDS_DYNAMIC = utils.FieldSet() - _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag", - "watcher_pause", "volume_group_name") def CheckArguments(self): - _CheckOutputFields(static=self._FIELDS_STATIC, - dynamic=self._FIELDS_DYNAMIC, - selected=self.op.output_fields) + self.cq = _ClusterQuery(None, self.op.output_fields, False) def ExpandNames(self): - self.needed_locks = {} + self.cq.ExpandNames(self) + + def DeclareLocks(self, level): + self.cq.DeclareLocks(self, level) def Exec(self, feedback_fn): - """Dump a representation of the cluster config to the standard output. - - """ - values = [] - for field in self.op.output_fields: - if field == "cluster_name": - entry = self.cfg.GetClusterName() - elif field == "master_node": - entry = self.cfg.GetMasterNode() - elif field == "drain_flag": - entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE) - elif field == "watcher_pause": - entry = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE) - elif field == "volume_group_name": - entry = self.cfg.GetVGName() - else: - raise errors.ParameterError(field) - values.append(entry) - return values + result = self.cq.OldStyleQuery(self) + + assert len(result) == 1 + + return result[0] + + +class _ClusterQuery(_QueryBase): + FIELDS = query.CLUSTER_FIELDS + + #: Do not sort (there is only one item) + SORT_FIELD = None + + def ExpandNames(self, lu): + lu.needed_locks = {} + + # The following variables interact with _QueryBase._GetNames + self.wanted = locking.ALL_SET + self.do_locking = self.use_locking + + if self.do_locking: + raise errors.OpPrereqError("Can not use locking for cluster queries", + errors.ECODE_INVAL) + + def DeclareLocks(self, lu, level): + pass + + def _GetQueryData(self, lu): + """Computes the list of nodes and their attributes. 
+ + """ + # Locking is not used + assert not (compat.any(lu.glm.is_owned(level) + for level in locking.LEVELS + if level != locking.LEVEL_CLUSTER) or + self.do_locking or self.use_locking) + + if query.CQ_CONFIG in self.requested_data: + cluster = lu.cfg.GetClusterInfo() + else: + cluster = NotImplemented + + if query.CQ_QUEUE_DRAINED in self.requested_data: + drain_flag = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE) + else: + drain_flag = NotImplemented + + if query.CQ_WATCHER_PAUSE in self.requested_data: + watcher_pause = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE) + else: + watcher_pause = NotImplemented + + return query.ClusterQueryData(cluster, drain_flag, watcher_pause) class LUInstanceActivateDisks(NoHooksLU): @@ -6257,7 +6316,8 @@ def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False, node_disk = node_disk.Copy() node_disk.UnsetSize() lu.cfg.SetDiskID(node_disk, node) - result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False, idx) + result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname, + False, idx) msg = result.fail_msg if msg: lu.proc.LogWarning("Could not prepare block device %s on node %s" @@ -6279,7 +6339,8 @@ def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False, node_disk = node_disk.Copy() node_disk.UnsetSize() lu.cfg.SetDiskID(node_disk, node) - result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True, idx) + result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname, + True, idx) msg = result.fail_msg if msg: lu.proc.LogWarning("Could not prepare block device %s on node %s" @@ -7319,7 +7380,7 @@ def _RemoveInstance(lu, feedback_fn, instance, ignore_failures): """ logging.info("Removing block devices for instance %s", instance.name) - if not _RemoveDisks(lu, instance): + if not _RemoveDisks(lu, instance, ignore_failures=ignore_failures): if not ignore_failures: raise errors.OpExecError("Can't remove instance's disks") feedback_fn("Warning: can't remove instance's disks") @@ -7674,7 +7735,7 @@ class LUInstanceMove(LogicalUnit): # activate, get path, copy the data over for idx, disk in enumerate(instance.disks): self.LogInfo("Copying data for disk %d", idx) - result = self.rpc.call_blockdev_assemble(target_node, disk, + result = self.rpc.call_blockdev_assemble(target_node, (disk, instance), instance.name, True, idx) if result.fail_msg: self.LogWarning("Can't assemble newly created disk %d: %s", @@ -7682,7 +7743,7 @@ class LUInstanceMove(LogicalUnit): errs.append(result.fail_msg) break dev_path = result.payload - result = self.rpc.call_blockdev_export(source_node, disk, + result = self.rpc.call_blockdev_export(source_node, (disk, instance), target_node, dev_path, cluster_name) if result.fail_msg: @@ -8045,7 +8106,8 @@ class TLMigrateInstance(Tasklet): all_done = True result = self.rpc.call_drbd_wait_sync(self.all_nodes, self.nodes_ip, - self.instance.disks) + (self.instance.disks, + self.instance)) min_percent = 100 for node, nres in result.items(): nres.Raise("Cannot resync disks on node %s" % node) @@ -8091,7 +8153,7 @@ class TLMigrateInstance(Tasklet): msg = "single-master" self.feedback_fn("* changing disks into %s mode" % msg) result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip, - self.instance.disks, + (self.instance.disks, self.instance), self.instance.name, multimaster) for node, nres in result.items(): nres.Raise("Cannot change disks config on node %s" % node) @@ -8243,7 +8305,7 @@ class TLMigrateInstance(Tasklet): 
self.feedback_fn("* checking disk consistency between source and target") for (idx, dev) in enumerate(instance.disks): - if not _CheckDiskConsistency(self.lu, dev, target_node, False): + if not _CheckDiskConsistency(self.lu, instance, dev, target_node, False): raise errors.OpExecError("Disk %s is degraded or not fully" " synchronized on target node," " aborting migration" % idx) @@ -8406,7 +8468,8 @@ class TLMigrateInstance(Tasklet): self.feedback_fn("* checking disk consistency between source and target") for (idx, dev) in enumerate(instance.disks): # for drbd, these are drbd over lvm - if not _CheckDiskConsistency(self.lu, dev, target_node, False): + if not _CheckDiskConsistency(self.lu, instance, dev, target_node, + False): if primary_node.offline: self.feedback_fn("Node %s is offline, ignoring degraded disk %s on" " target node %s" % @@ -8581,94 +8644,8 @@ def _GenerateUniqueNames(lu, exts): return results -def _ComputeLDParams(disk_template, disk_params): - """Computes Logical Disk parameters from Disk Template parameters. - - @type disk_template: string - @param disk_template: disk template, one of L{constants.DISK_TEMPLATES} - @type disk_params: dict - @param disk_params: disk template parameters; dict(template_name -> parameters - @rtype: list(dict) - @return: a list of dicts, one for each node of the disk hierarchy. Each dict - contains the LD parameters of the node. The tree is flattened in-order. - - """ - if disk_template not in constants.DISK_TEMPLATES: - raise errors.ProgrammerError("Unknown disk template %s" % disk_template) - - result = list() - dt_params = disk_params[disk_template] - if disk_template == constants.DT_DRBD8: - drbd_params = { - constants.LDP_RESYNC_RATE: dt_params[constants.DRBD_RESYNC_RATE], - constants.LDP_BARRIERS: dt_params[constants.DRBD_DISK_BARRIERS], - constants.LDP_NO_META_FLUSH: dt_params[constants.DRBD_META_BARRIERS], - constants.LDP_DEFAULT_METAVG: dt_params[constants.DRBD_DEFAULT_METAVG], - constants.LDP_DISK_CUSTOM: dt_params[constants.DRBD_DISK_CUSTOM], - constants.LDP_NET_CUSTOM: dt_params[constants.DRBD_NET_CUSTOM], - constants.LDP_DYNAMIC_RESYNC: dt_params[constants.DRBD_DYNAMIC_RESYNC], - constants.LDP_PLAN_AHEAD: dt_params[constants.DRBD_PLAN_AHEAD], - constants.LDP_FILL_TARGET: dt_params[constants.DRBD_FILL_TARGET], - constants.LDP_DELAY_TARGET: dt_params[constants.DRBD_DELAY_TARGET], - constants.LDP_MAX_RATE: dt_params[constants.DRBD_MAX_RATE], - constants.LDP_MIN_RATE: dt_params[constants.DRBD_MIN_RATE], - } - - drbd_params = \ - objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_DRBD8], - drbd_params) - - result.append(drbd_params) - - # data LV - data_params = { - constants.LDP_STRIPES: dt_params[constants.DRBD_DATA_STRIPES], - } - data_params = \ - objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_LV], - data_params) - result.append(data_params) - - # metadata LV - meta_params = { - constants.LDP_STRIPES: dt_params[constants.DRBD_META_STRIPES], - } - meta_params = \ - objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_LV], - meta_params) - result.append(meta_params) - - elif (disk_template == constants.DT_FILE or - disk_template == constants.DT_SHARED_FILE): - result.append(constants.DISK_LD_DEFAULTS[constants.LD_FILE]) - - elif disk_template == constants.DT_PLAIN: - params = { - constants.LDP_STRIPES: dt_params[constants.LV_STRIPES], - } - params = \ - objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_LV], - params) - result.append(params) - - elif disk_template == constants.DT_BLOCK: - 
result.append(constants.DISK_LD_DEFAULTS[constants.LD_BLOCKDEV]) - - elif disk_template == constants.DT_RBD: - params = { - constants.LDP_POOL: dt_params[constants.RBD_POOL] - } - params = \ - objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_RBD], - params) - result.append(params) - - return result - - def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names, - iv_name, p_minor, s_minor, drbd_params, data_params, - meta_params): + iv_name, p_minor, s_minor): """Generate a drbd8 device complete with its children. """ @@ -8678,16 +8655,16 @@ def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names, dev_data = objects.Disk(dev_type=constants.LD_LV, size=size, logical_id=(vgnames[0], names[0]), - params=data_params) + params={}) dev_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE, logical_id=(vgnames[1], names[1]), - params=meta_params) + params={}) drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size, logical_id=(primary, secondary, port, p_minor, s_minor, shared_secret), children=[dev_data, dev_meta], - iv_name=iv_name, params=drbd_params) + iv_name=iv_name, params={}) return drbd_dev @@ -8708,8 +8685,7 @@ _DISK_TEMPLATE_DEVICE_TYPE = { def _GenerateDiskTemplate(lu, template_name, instance_name, primary_node, secondary_nodes, disk_info, file_storage_dir, file_driver, base_index, - feedback_fn, disk_params, - _req_file_storage=opcodes.RequireFileStorage, + feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage, _req_shr_file_storage=opcodes.RequireSharedFileStorage): """Generate the entire disk layout for a given template type. @@ -8719,18 +8695,20 @@ def _GenerateDiskTemplate(lu, template_name, instance_name, primary_node, vgname = lu.cfg.GetVGName() disk_count = len(disk_info) disks = [] - ld_params = _ComputeLDParams(template_name, disk_params) if template_name == constants.DT_DISKLESS: pass elif template_name == constants.DT_DRBD8: - drbd_params, data_params, meta_params = ld_params if len(secondary_nodes) != 1: raise errors.ProgrammerError("Wrong template configuration") remote_node = secondary_nodes[0] minors = lu.cfg.AllocateDRBDMinor( [primary_node, remote_node] * len(disk_info), instance_name) + (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name, + full_disk_params) + drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG] + names = [] for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i) for i in range(disk_count)]): @@ -8738,7 +8716,6 @@ def _GenerateDiskTemplate(lu, template_name, instance_name, primary_node, names.append(lv_prefix + "_meta") for idx, disk in enumerate(disk_info): disk_index = idx + base_index - drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG] data_vg = disk.get(constants.IDISK_VG, vgname) meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg) disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node, @@ -8746,8 +8723,7 @@ def _GenerateDiskTemplate(lu, template_name, instance_name, primary_node, [data_vg, meta_vg], names[idx * 2:idx * 2 + 2], "disk/%d" % disk_index, - minors[idx * 2], minors[idx * 2 + 1], - drbd_params, data_params, meta_params) + minors[idx * 2], minors[idx * 2 + 1]) disk_dev.mode = disk[constants.IDISK_MODE] disks.append(disk_dev) else: @@ -8767,8 +8743,6 @@ def _GenerateDiskTemplate(lu, template_name, instance_name, primary_node, (name_prefix, base_index + i) for i in range(disk_count)]) - dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name] - if template_name == constants.DT_PLAIN: def 
logical_id_fn(idx, _, disk): vg = disk.get(constants.IDISK_VG, vgname) @@ -8787,6 +8761,8 @@ def _GenerateDiskTemplate(lu, template_name, instance_name, primary_node, else: raise errors.ProgrammerError("Unknown disk template '%s'" % template_name) + dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name] + for idx, disk in enumerate(disk_info): disk_index = idx + base_index size = disk[constants.IDISK_SIZE] @@ -8796,7 +8772,7 @@ def _GenerateDiskTemplate(lu, template_name, instance_name, primary_node, logical_id=logical_id_fn(idx, disk_index, disk), iv_name="disk/%d" % disk_index, mode=disk[constants.IDISK_MODE], - params=ld_params[0])) + params={})) return disks @@ -8837,7 +8813,9 @@ def _WipeDisks(lu, instance): lu.cfg.SetDiskID(device, node) logging.info("Pause sync of instance %s disks", instance.name) - result = lu.rpc.call_blockdev_pause_resume_sync(node, instance.disks, True) + result = lu.rpc.call_blockdev_pause_resume_sync(node, + (instance.disks, instance), + True) for idx, success in enumerate(result.payload): if not success: @@ -8867,7 +8845,8 @@ def _WipeDisks(lu, instance): wipe_size = min(wipe_chunk_size, size - offset) logging.debug("Wiping disk %d, offset %s, chunk %s", idx, offset, wipe_size) - result = lu.rpc.call_blockdev_wipe(node, device, offset, wipe_size) + result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset, + wipe_size) result.Raise("Could not wipe disk %d at offset %d for size %d" % (idx, offset, wipe_size)) now = time.time() @@ -8880,7 +8859,9 @@ def _WipeDisks(lu, instance): finally: logging.info("Resume sync of instance %s disks", instance.name) - result = lu.rpc.call_blockdev_pause_resume_sync(node, instance.disks, False) + result = lu.rpc.call_blockdev_pause_resume_sync(node, + (instance.disks, instance), + False) for idx, success in enumerate(result.payload): if not success: @@ -8934,7 +8915,7 @@ def _CreateDisks(lu, instance, to_skip=None, target_node=None): _CreateBlockDev(lu, node, instance, device, f_create, info, f_create) -def _RemoveDisks(lu, instance, target_node=None): +def _RemoveDisks(lu, instance, target_node=None, ignore_failures=False): """Remove all disks for an instance. This abstracts away some work from `AddInstance()` and @@ -8955,6 +8936,7 @@ def _RemoveDisks(lu, instance, target_node=None): logging.info("Removing block devices for instance %s", instance.name) all_result = True + ports_to_release = set() for (idx, device) in enumerate(instance.disks): if target_node: edata = [(target_node, device)] @@ -8970,8 +8952,11 @@ def _RemoveDisks(lu, instance, target_node=None): # if this is a DRBD disk, return its port to the pool if device.dev_type in constants.LDS_DRBD: - tcp_port = device.logical_id[2] - lu.cfg.AddTcpUdpPort(tcp_port) + ports_to_release.add(device.logical_id[2]) + + if all_result or ignore_failures: + for port in ports_to_release: + lu.cfg.AddTcpUdpPort(port) if instance.disk_template == constants.DT_FILE: file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1]) @@ -9867,10 +9852,6 @@ class LUInstanceCreate(LogicalUnit): utils.CommaJoin(res)), errors.ECODE_INVAL) - # disk parameters (not customizable at instance or node level) - # just use the primary node parameters, ignoring the secondary. - self.diskparams = group_info.diskparams - if not self.adopt_disks: if self.op.disk_template == constants.DT_RBD: # _CheckRADOSFreeSpace() is just a placeholder. 
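The hunk above drops the diskparams cached on LUInstanceCreate during CheckPrereq; the next hunk instead resolves the disk parameters at the point where the disks are actually generated, walking from the primary node to its node group. A hedged sketch of that lookup, wrapped in a hypothetical helper _GetPrimaryGroupDiskParams; GetGroupDiskParams is assumed to return the group's disk parameters with cluster-level defaults filled in.

# Hedged sketch, not part of the patch.
def _GetPrimaryGroupDiskParams(cfg, pnode_name):
  node = cfg.GetNodeInfo(pnode_name)        # primary node object
  nodegroup = cfg.GetNodeGroup(node.group)  # its node group
  # Assumed to merge the group's diskparams with cluster defaults.
  return cfg.GetGroupDiskParams(nodegroup)

The result is what the patch passes to _GenerateDiskTemplate as full_disk_params; the LD-level parameters are then derived on demand via objects.Disk.ComputeLDParams (see the _GenerateDiskTemplate hunks earlier in this diff) rather than being stored on each Disk object.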
@@ -9987,6 +9968,11 @@ class LUInstanceCreate(LogicalUnit): else: network_port = None + # This is ugly but we got a chicken-egg problem here + # We can only take the group disk parameters, as the instance + # has no disks yet (we are generating them right here). + node = self.cfg.GetNodeInfo(pnode_name) + nodegroup = self.cfg.GetNodeGroup(node.group) disks = _GenerateDiskTemplate(self, self.op.disk_template, instance, pnode_name, @@ -9996,7 +9982,7 @@ class LUInstanceCreate(LogicalUnit): self.op.file_driver, 0, feedback_fn, - self.diskparams) + self.cfg.GetGroupDiskParams(nodegroup)) iobj = objects.Instance(name=instance, os=self.op.os_type, primary_node=pnode_name, @@ -10095,7 +10081,8 @@ class LUInstanceCreate(LogicalUnit): if pause_sync: feedback_fn("* pausing disk sync to install instance OS") result = self.rpc.call_blockdev_pause_resume_sync(pnode_name, - iobj.disks, True) + (iobj.disks, + iobj), True) for idx, success in enumerate(result.payload): if not success: logging.warn("pause-sync of instance %s for disk %d failed", @@ -10109,7 +10096,8 @@ class LUInstanceCreate(LogicalUnit): if pause_sync: feedback_fn("* resuming disk sync") result = self.rpc.call_blockdev_pause_resume_sync(pnode_name, - iobj.disks, False) + (iobj.disks, + iobj), False) for idx, success in enumerate(result.payload): if not success: logging.warn("resume-sync of instance %s for disk %d failed", @@ -10646,16 +10634,6 @@ class TLReplaceDisks(Tasklet): _CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info, ignore=self.ignore_ipolicy) - # TODO: compute disk parameters - primary_node_info = self.cfg.GetNodeInfo(instance.primary_node) - secondary_node_info = self.cfg.GetNodeInfo(secondary_node) - if primary_node_info.group != secondary_node_info.group: - self.lu.LogInfo("The instance primary and secondary nodes are in two" - " different node groups; the disk parameters of the" - " primary node's group will be applied.") - - self.diskparams = self.cfg.GetNodeGroup(primary_node_info.group).diskparams - for node in check_nodes: _CheckNodeOnline(self.lu, node) @@ -10789,8 +10767,8 @@ class TLReplaceDisks(Tasklet): self.lu.LogInfo("Checking disk/%d consistency on node %s" % (idx, node_name)) - if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary, - ldisk=ldisk): + if not _CheckDiskConsistency(self.lu, self.instance, dev, node_name, + on_primary, ldisk=ldisk): raise errors.OpExecError("Node %s has degraded storage, unsafe to" " replace disks for instance %s" % (node_name, self.instance.name)) @@ -10815,14 +10793,12 @@ class TLReplaceDisks(Tasklet): lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]] names = _GenerateUniqueNames(self.lu, lv_names) - _, data_p, meta_p = _ComputeLDParams(constants.DT_DRBD8, self.diskparams) - vg_data = dev.children[0].logical_id[0] lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size, - logical_id=(vg_data, names[0]), params=data_p) + logical_id=(vg_data, names[0]), params={}) vg_meta = dev.children[1].logical_id[0] lv_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE, - logical_id=(vg_meta, names[1]), params=meta_p) + logical_id=(vg_meta, names[1]), params={}) new_lvs = [lv_data, lv_meta] old_lvs = [child.Copy() for child in dev.children] @@ -10960,8 +10936,9 @@ class TLReplaceDisks(Tasklet): # Now that the new lvs have the old name, we can add them to the device self.lu.LogInfo("Adding new mirror component on %s" % self.target_node) - result = self.rpc.call_blockdev_addchildren(self.target_node, dev, - new_lvs) + 
result = self.rpc.call_blockdev_addchildren(self.target_node, + (dev, self.instance), + (new_lvs, self.instance)) msg = result.fail_msg if msg: for new_lv in new_lvs: @@ -11079,12 +11056,11 @@ class TLReplaceDisks(Tasklet): iv_names[idx] = (dev, dev.children, new_net_id) logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor, new_net_id) - drbd_params, _, _ = _ComputeLDParams(constants.DT_DRBD8, self.diskparams) new_drbd = objects.Disk(dev_type=constants.LD_DRBD8, logical_id=new_alone_id, children=dev.children, size=dev.size, - params=drbd_params) + params={}) try: _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd, _GetInstanceInfoText(self.instance), False) @@ -11132,7 +11108,7 @@ class TLReplaceDisks(Tasklet): result = self.rpc.call_drbd_attach_net([self.instance.primary_node, self.new_node], self.node_secondary_ip, - self.instance.disks, + (self.instance.disks, self.instance), self.instance.name, False) for to_node, to_result in result.items(): @@ -11523,6 +11499,7 @@ class LUInstanceGrowDisk(LogicalUnit): env = { "DISK": self.op.disk, "AMOUNT": self.op.amount, + "ABSOLUTE": self.op.absolute, } env.update(_BuildInstanceHookEnvByObject(self, self.instance)) return env @@ -11555,13 +11532,30 @@ class LUInstanceGrowDisk(LogicalUnit): self.disk = instance.FindDisk(self.op.disk) + if self.op.absolute: + self.target = self.op.amount + self.delta = self.target - self.disk.size + if self.delta < 0: + raise errors.OpPrereqError("Requested size (%s) is smaller than " + "current disk size (%s)" % + (utils.FormatUnit(self.target, "h"), + utils.FormatUnit(self.disk.size, "h")), + errors.ECODE_STATE) + else: + self.delta = self.op.amount + self.target = self.disk.size + self.delta + if self.delta < 0: + raise errors.OpPrereqError("Requested increment (%s) is negative" % + utils.FormatUnit(self.delta, "h"), + errors.ECODE_INVAL) + if instance.disk_template not in (constants.DT_FILE, constants.DT_SHARED_FILE, constants.DT_RBD): # TODO: check the free disk space for file, when that feature will be # supported _CheckNodesFreeDiskPerVG(self, nodenames, - self.disk.ComputeGrowth(self.op.amount)) + self.disk.ComputeGrowth(self.delta)) def Exec(self, feedback_fn): """Execute disk grow. @@ -11578,21 +11572,24 @@ class LUInstanceGrowDisk(LogicalUnit): if not disks_ok: raise errors.OpExecError("Cannot activate block device to grow") - feedback_fn("Growing disk %s of instance '%s' by %s" % + feedback_fn("Growing disk %s of instance '%s' by %s to %s" % (self.op.disk, instance.name, - utils.FormatUnit(self.op.amount, "h"))) + utils.FormatUnit(self.delta, "h"), + utils.FormatUnit(self.target, "h"))) # First run all grow ops in dry-run mode for node in instance.all_nodes: self.cfg.SetDiskID(disk, node) - result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, True) + result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta, + True) result.Raise("Grow request failed to node %s" % node) # We know that (as far as we can test) operations across different # nodes will succeed, time to run it for real for node in instance.all_nodes: self.cfg.SetDiskID(disk, node) - result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, False) + result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta, + False) result.Raise("Grow request failed to node %s" % node) # TODO: Rewrite code to work properly @@ -11602,7 +11599,7 @@ class LUInstanceGrowDisk(LogicalUnit): # time is a work-around. 
time.sleep(5) - disk.RecordGrow(self.op.amount) + disk.RecordGrow(self.delta) self.cfg.Update(instance, feedback_fn) # Changes have been recorded, release node lock @@ -11656,12 +11653,25 @@ class LUInstanceQueryData(NoHooksLU): else: self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names + self.needed_locks[locking.LEVEL_NODEGROUP] = [] self.needed_locks[locking.LEVEL_NODE] = [] self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE def DeclareLocks(self, level): - if self.op.use_locking and level == locking.LEVEL_NODE: - self._LockInstancesNodes() + if self.op.use_locking: + if level == locking.LEVEL_NODEGROUP: + owned_instances = self.owned_locks(locking.LEVEL_INSTANCE) + + # Lock all groups used by instances optimistically; this requires going + # via the node before it's locked, requiring verification later on + self.needed_locks[locking.LEVEL_NODEGROUP] = \ + frozenset(group_uuid + for instance_name in owned_instances + for group_uuid in + self.cfg.GetInstanceNodeGroups(instance_name)) + + elif level == locking.LEVEL_NODE: + self._LockInstancesNodes() def CheckPrereq(self): """Check prerequisites. @@ -11669,14 +11679,25 @@ class LUInstanceQueryData(NoHooksLU): This only checks the optional instance list against the existing names. """ + owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE)) + owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP)) + owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE)) + if self.wanted_names is None: assert self.op.use_locking, "Locking was not used" - self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE) + self.wanted_names = owned_instances - self.wanted_instances = \ - map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names)) + instances = dict(self.cfg.GetMultiInstanceInfo(self.wanted_names)) - def _ComputeBlockdevStatus(self, node, instance_name, dev): + if self.op.use_locking: + _CheckInstancesNodeGroups(self.cfg, instances, owned_groups, owned_nodes, + None) + else: + assert not (owned_instances or owned_groups or owned_nodes) + + self.wanted_instances = instances.values() + + def _ComputeBlockdevStatus(self, node, instance, dev): """Returns the status of a block device """ @@ -11689,7 +11710,7 @@ class LUInstanceQueryData(NoHooksLU): if result.offline: return None - result.Raise("Can't compute disk status for %s" % instance_name) + result.Raise("Can't compute disk status for %s" % instance.name) status = result.payload if status is None: @@ -11711,8 +11732,8 @@ class LUInstanceQueryData(NoHooksLU): snode = dev.logical_id[0] dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node, - instance.name, dev) - dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev) + instance, dev) + dev_sstatus = self._ComputeBlockdevStatus(snode, instance, dev) if dev.children: dev_children = map(compat.partial(self._ComputeDiskStatus, @@ -11739,9 +11760,17 @@ class LUInstanceQueryData(NoHooksLU): cluster = self.cfg.GetClusterInfo() - pri_nodes = self.cfg.GetMultiNodeInfo(i.primary_node - for i in self.wanted_instances) - for instance, (_, pnode) in zip(self.wanted_instances, pri_nodes): + node_names = itertools.chain(*(i.all_nodes for i in self.wanted_instances)) + nodes = dict(self.cfg.GetMultiNodeInfo(node_names)) + + groups = dict(self.cfg.GetMultiNodeGroupInfo(node.group + for node in nodes.values())) + + group2name_fn = lambda uuid: groups[uuid].name + + for instance in self.wanted_instances: + pnode = nodes[instance.primary_node] + if self.op.static or pnode.offline: 
remote_state = None if pnode.offline: @@ -11765,12 +11794,19 @@ class LUInstanceQueryData(NoHooksLU): disks = map(compat.partial(self._ComputeDiskStatus, instance, None), instance.disks) + snodes_group_uuids = [nodes[snode_name].group + for snode_name in instance.secondary_nodes] + result[instance.name] = { "name": instance.name, "config_state": instance.admin_state, "run_state": remote_state, "pnode": instance.primary_node, + "pnode_group_uuid": pnode.group, + "pnode_group_name": group2name_fn(pnode.group), "snodes": instance.secondary_nodes, + "snodes_group_uuids": snodes_group_uuids, + "snodes_group_names": map(group2name_fn, snodes_group_uuids), "os": instance.os, # this happens to be the same format used for hooks "nics": _NICListToTuple(self, instance.nics), @@ -12222,7 +12258,7 @@ class LUInstanceSetParams(LogicalUnit): pnode = instance.primary_node nodelist = list(instance.all_nodes) pnode_info = self.cfg.GetNodeInfo(pnode) - self.diskparams = self.cfg.GetNodeGroup(pnode_info.group).diskparams + self.diskparams = self.cfg.GetInstanceDiskParams(instance) # Prepare disk/NIC modifications self.diskmod = PrepareContainerMods(self.op.disks, None) @@ -12503,7 +12539,7 @@ class LUInstanceSetParams(LogicalUnit): disk_info, None, None, 0, feedback_fn, self.diskparams) info = _GetInstanceInfoText(instance) - feedback_fn("Creating aditional volumes...") + feedback_fn("Creating additional volumes...") # first, create the missing data and meta devices for disk in new_disks: # unfortunately this is... not too nice @@ -12564,6 +12600,12 @@ class LUInstanceSetParams(LogicalUnit): child.size = parent.size child.mode = parent.mode + # this is a DRBD disk, return its port to the pool + # NOTE: this must be done right before the call to cfg.Update! + for disk in old_disks: + tcp_port = disk.logical_id[2] + self.cfg.AddTcpUdpPort(tcp_port) + # update instance structure instance.disks = new_disks instance.disk_template = constants.DT_PLAIN @@ -12589,13 +12631,6 @@ class LUInstanceSetParams(LogicalUnit): self.LogWarning("Could not remove metadata for disk %d on node %s," " continuing anyway: %s", idx, pnode, msg) - # this is a DRBD disk, return its port to the pool - for disk in old_disks: - tcp_port = disk.logical_id[2] - self.cfg.AddTcpUdpPort(tcp_port) - - # Node resource locks will be released by caller - def _CreateNewDisk(self, idx, params, _): """Creates a new disk. @@ -12890,7 +12925,7 @@ class LUInstanceChangeGroup(LogicalUnit): if self.req_target_uuids: # User requested specific target groups - self.target_uuids = self.req_target_uuids + self.target_uuids = frozenset(self.req_target_uuids) else: # All groups except those used by the instance are potential targets self.target_uuids = owned_groups - inst_groups @@ -12959,32 +12994,74 @@ class LUBackupQuery(NoHooksLU): """ REQ_BGL = False + def CheckArguments(self): + self.expq = _ExportQuery(qlang.MakeSimpleFilter("node", self.op.nodes), + ["node", "export"], self.op.use_locking) + def ExpandNames(self): - self.needed_locks = {} - self.share_locks[locking.LEVEL_NODE] = 1 - if not self.op.nodes: - self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET - else: - self.needed_locks[locking.LEVEL_NODE] = \ - _GetWantedNodes(self, self.op.nodes) + self.expq.ExpandNames(self) + + def DeclareLocks(self, level): + self.expq.DeclareLocks(self, level) def Exec(self, feedback_fn): - """Compute the list of all the exported system images. 
+ result = {} - @rtype: dict - @return: a dictionary with the structure node->(export-list) - where export-list is a list of the instances exported on - that node. + for (node, expname) in self.expq.OldStyleQuery(self): + if expname is None: + result[node] = False + else: + result.setdefault(node, []).append(expname) + + return result + + +class _ExportQuery(_QueryBase): + FIELDS = query.EXPORT_FIELDS + + #: The node name is not a unique key for this query + SORT_FIELD = "node" + + def ExpandNames(self, lu): + lu.needed_locks = {} + + # The following variables interact with _QueryBase._GetNames + if self.names: + self.wanted = _GetWantedNodes(lu, self.names) + else: + self.wanted = locking.ALL_SET + + self.do_locking = self.use_locking + + if self.do_locking: + lu.share_locks = _ShareAll() + lu.needed_locks = { + locking.LEVEL_NODE: self.wanted, + } + + def DeclareLocks(self, lu, level): + pass + + def _GetQueryData(self, lu): + """Computes the list of nodes and their attributes. """ - self.nodes = self.owned_locks(locking.LEVEL_NODE) - rpcresult = self.rpc.call_export_list(self.nodes) - result = {} - for node in rpcresult: - if rpcresult[node].fail_msg: - result[node] = False + # Locking is not used + # TODO + assert not (compat.any(lu.glm.is_owned(level) + for level in locking.LEVELS + if level != locking.LEVEL_CLUSTER) or + self.do_locking or self.use_locking) + + nodes = self._GetNames(lu, lu.cfg.GetNodeList(), locking.LEVEL_NODE) + + result = [] + + for (node, nres) in lu.rpc.call_export_list(nodes).items(): + if nres.fail_msg: + result.append((node, None)) else: - result[node] = rpcresult[node].payload + result.extend((node, expname) for expname in nres.payload) return result @@ -14081,16 +14158,8 @@ class LUGroupEvacuate(LogicalUnit): self.instances = dict(self.cfg.GetMultiInstanceInfo(owned_instances)) # Check if node groups for locked instances are still correct - for instance_name in owned_instances: - inst = self.instances[instance_name] - assert owned_nodes.issuperset(inst.all_nodes), \ - "Instance %s's nodes changed while we kept the lock" % instance_name - - inst_groups = _CheckInstanceNodeGroups(self.cfg, instance_name, - owned_groups) - - assert self.group_uuid in inst_groups, \ - "Instance %s has no node in group %s" % (instance_name, self.group_uuid) + _CheckInstancesNodeGroups(self.cfg, self.instances, + owned_groups, owned_nodes, self.group_uuid) if self.req_target_uuids: # User requested specific target groups @@ -14158,14 +14227,25 @@ class TagsLU(NoHooksLU): # pylint: disable=W0223 def ExpandNames(self): self.group_uuid = None self.needed_locks = {} + if self.op.kind == constants.TAG_NODE: self.op.name = _ExpandNodeName(self.cfg, self.op.name) - self.needed_locks[locking.LEVEL_NODE] = self.op.name + lock_level = locking.LEVEL_NODE + lock_name = self.op.name elif self.op.kind == constants.TAG_INSTANCE: self.op.name = _ExpandInstanceName(self.cfg, self.op.name) - self.needed_locks[locking.LEVEL_INSTANCE] = self.op.name + lock_level = locking.LEVEL_INSTANCE + lock_name = self.op.name elif self.op.kind == constants.TAG_NODEGROUP: self.group_uuid = self.cfg.LookupNodeGroup(self.op.name) + lock_level = locking.LEVEL_NODEGROUP + lock_name = self.group_uuid + else: + lock_level = None + lock_name = None + + if lock_level and getattr(self.op, "use_locking", True): + self.needed_locks[lock_level] = lock_name # FIXME: Acquire BGL for cluster tag operations (as of this writing it's # not possible to acquire the BGL based on opcode parameters) @@ -15113,10 +15193,12 @@ class 
LUTestAllocator(NoHooksLU):
 
 #: Query type implementations
 _QUERY_IMPL = {
+  constants.QR_CLUSTER: _ClusterQuery,
   constants.QR_INSTANCE: _InstanceQuery,
   constants.QR_NODE: _NodeQuery,
   constants.QR_GROUP: _GroupQuery,
   constants.QR_OS: _OsQuery,
+  constants.QR_EXPORT: _ExportQuery,
   }
 
 assert set(_QUERY_IMPL.keys()) == constants.QR_VIA_OP
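The two added entries (constants.QR_CLUSTER and constants.QR_EXPORT) register the _ClusterQuery and _ExportQuery classes introduced earlier in this patch with the generic query machinery, replacing the hand-rolled field handling in LUClusterConfigQuery and LUBackupQuery. A hedged sketch of the shape such a _QueryBase implementation takes, using a hypothetical _ExampleQuery; the structure is inferred from the two implementations added by this patch.

# Hedged sketch, not part of the patch.
class _ExampleQuery(_QueryBase):
  FIELDS = None        # field definitions, e.g. query.EXPORT_FIELDS
  SORT_FIELD = "name"  # override when "name" is not the right sort key

  def ExpandNames(self, lu):
    # Declare locks on the owning LU and set the variables that
    # _QueryBase._GetNames relies on.
    lu.needed_locks = {}
    self.wanted = locking.ALL_SET
    self.do_locking = self.use_locking

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    # Return the raw data the requested fields are computed from.
    raise NotImplementedError()

The owning LU then simply forwards ExpandNames and DeclareLocks to the query object and returns the result of OldStyleQuery(self) from Exec, as LUClusterConfigQuery and LUBackupQuery now do.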