return wrapper
-def _Compress(data):
+def _Compress(_, data):
"""Compresses a string for transport over RPC.
Small amounts of data are not compressed.
self._encoder = compat.partial(self._EncodeArg, encoder_fn)
@staticmethod
- def _EncodeArg(encoder_fn, (argkind, value)):
+ def _EncodeArg(encoder_fn, node, (argkind, value)):
"""Encode argument.
"""
if argkind is None:
return value
else:
- return encoder_fn(argkind)(value)
+ return encoder_fn(argkind)(node, value)
def _Call(self, cdef, node_list, args):
"""Entry point for automatically generated RPC wrappers.
if len(args) != len(argdefs):
raise errors.ProgrammerError("Number of passed arguments doesn't match")
- enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
if prep_fn is None:
- # for a no-op prep_fn, we serialise the body once, and then we
- # reuse it in the dictionary values
- body = serializer.DumpJson(enc_args)
- pnbody = dict((n, body) for n in node_list)
- else:
- # for a custom prep_fn, we pass the encoded arguments and the
- # node name to the prep_fn, and we serialise its return value
- assert callable(prep_fn)
- pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
- for n in node_list)
+ prep_fn = lambda _, args: args
+ assert callable(prep_fn)
+
+ # encode the arguments for each node individually, pass them and the node
+ # name to the prep_fn, and serialise its return value
+ encode_args_fn = lambda node: map(compat.partial(self._encoder, node),
+ zip(map(compat.snd, argdefs), args))
+ pnbody = dict((n, serializer.DumpJson(prep_fn(n, encode_args_fn(n))))
+ for n in node_list)
result = self._proc(node_list, procedure, pnbody, read_timeout,
req_resolver_opts)
return result
-def _ObjectToDict(value):
+def _ObjectToDict(_, value):
"""Converts an object to a dictionary.
@note: See L{objects}.
return value.ToDict()
-def _ObjectListToDict(value):
+def _ObjectListToDict(node, value):
"""Converts a list of L{objects} to dictionaries.
"""
- return map(_ObjectToDict, value)
+ return map(compat.partial(_ObjectToDict, node), value)
-def _EncodeNodeToDiskDict(value):
- """Encodes a dictionary with node name as key and disk objects as values.
-
- """
- return dict((name, _ObjectListToDict(disks))
- for name, disks in value.items())
-
-
-def _PrepareFileUpload(getents_fn, filename):
+def _PrepareFileUpload(getents_fn, node, filename):
"""Loads a file and prepares it for an upload to nodes.
"""
statcb = utils.FileStatHelper()
- data = _Compress(utils.ReadFile(filename, preread=statcb))
+ data = _Compress(node, utils.ReadFile(filename, preread=statcb))
st = statcb.st
if getents_fn is None:
getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
-def _PrepareFinalizeExportDisks(snap_disks):
+def _PrepareFinalizeExportDisks(_, snap_disks):
"""Encodes disks for finalizing export.
"""
return flat_disks
-def _EncodeImportExportIO((ieio, ieioargs)):
- """Encodes import/export I/O information.
-
- """
- if ieio == constants.IEIO_RAW_DISK:
- assert len(ieioargs) == 1
- return (ieio, (ieioargs[0].ToDict(), ))
-
- if ieio == constants.IEIO_SCRIPT:
- assert len(ieioargs) == 2
- return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))
-
- return (ieio, ieioargs)
-
-
-def _EncodeBlockdevRename(value):
+def _EncodeBlockdevRename(_, value):
"""Encodes information for renaming block devices.
"""
if lvm_pv_info:
result["spindles_free"] = lvm_pv_info["storage_free"]
result["spindles_total"] = lvm_pv_info["storage_size"]
+ else:
+ raise errors.OpExecError("No spindle storage information available.")
-def _AddDefaultStorageInfoToLegacyNodeInfo(result, space_info,
- require_vg_info=True):
+def _AddDefaultStorageInfoToLegacyNodeInfo(result, space_info):
"""Extracts the storage space information of the default storage type from
the space info and adds it to the result dictionary.
@see: C{_AddSpindlesToLegacyNodeInfo} for parameter information.
- @type require_vg_info: boolean
- @param require_vg_info: indicates whether volume group information is
- required or not
"""
# Check if there is at least one row for non-spindle storage info.
else:
default_space_info = space_info[0]
- if require_vg_info:
- # if lvm storage is required, ignore the actual default and look for LVM
- lvm_info_found = False
- for space_entry in space_info:
- if space_entry["type"] == constants.ST_LVM_VG:
- default_space_info = space_entry
- lvm_info_found = True
- continue
- if not lvm_info_found:
- raise errors.OpExecError("LVM volume group info required, but not"
- " provided.")
-
if default_space_info:
result["name"] = default_space_info["name"]
result["storage_free"] = default_space_info["storage_free"]
result["storage_size"] = default_space_info["storage_size"]
-def MakeLegacyNodeInfo(data, require_vg_info=True):
+def MakeLegacyNodeInfo(data, require_spindles=False):
"""Formats the data returned by L{rpc.RpcRunner.call_node_info}.
Converts the data into a single dictionary. This is fine for most use cases,
but some require information from more than one volume group or hypervisor.
- @param require_vg_info: raise an error if the returnd vg_info
- doesn't have any values
+ @param require_spindles: add spindle storage information to the legacy node
+ info
"""
(bootid, space_info, (hv_info, )) = data
ret = utils.JoinDisjointDicts(hv_info, {"bootid": bootid})
- _AddSpindlesToLegacyNodeInfo(ret, space_info)
- _AddDefaultStorageInfoToLegacyNodeInfo(ret, space_info,
- require_vg_info=require_vg_info)
+ if require_spindles:
+ _AddSpindlesToLegacyNodeInfo(ret, space_info)
+ _AddDefaultStorageInfoToLegacyNodeInfo(ret, space_info)
return ret
"""Annotates just DRBD disks layouts.
"""
- assert disk.dev_type == constants.LD_DRBD8
+ assert disk.dev_type == constants.DT_DRBD8
disk.params = objects.FillDict(drbd_params, disk.params)
(dev_data, dev_meta) = disk.children
"""Generic disk parameter annotation routine.
"""
- assert disk.dev_type != constants.LD_DRBD8
+ assert disk.dev_type != constants.DT_DRBD8
disk.params = objects.FillDict(params, disk.params)
return disk
-def AnnotateDiskParams(template, disks, disk_params):
+def AnnotateDiskParams(disks, disk_params):
"""Annotates the disk objects with the disk parameters.
- @param template: The disk template used
@param disks: The list of disks objects to annotate
- @param disk_params: The disk paramaters for annotation
+ @param disk_params: The disk parameters for annotation
@returns: A list of disk objects annotated
"""
- ld_params = objects.Disk.ComputeLDParams(template, disk_params)
+ def AnnotateDisk(disk):
+ if disk.dev_type == constants.DT_DISKLESS:
+ return disk
- if template == constants.DT_DRBD8:
- annotation_fn = _AnnotateDParamsDRBD
- elif template == constants.DT_DISKLESS:
- annotation_fn = lambda disk, _: disk
- else:
- annotation_fn = _AnnotateDParamsGeneric
+ ld_params = objects.Disk.ComputeLDParams(disk.dev_type, disk_params)
+
+ if disk.dev_type == constants.DT_DRBD8:
+ return _AnnotateDParamsDRBD(disk, ld_params)
+ else:
+ return _AnnotateDParamsGeneric(disk, ld_params)
- return [annotation_fn(disk.Copy(), ld_params) for disk in disks]
+ return [AnnotateDisk(disk.Copy()) for disk in disks]
-def _GetESFlag(cfg, node_uuid):
+def _GetExclusiveStorageFlag(cfg, node_uuid):
ni = cfg.GetNodeInfo(node_uuid)
if ni is None:
raise errors.OpPrereqError("Invalid node name %s" % node_uuid,
return cfg.GetNdParams(ni)[constants.ND_EXCLUSIVE_STORAGE]
+def _AddExclusiveStorageFlagToLvmStorageUnits(storage_units, es_flag):
+  """Adds the exclusive storage flag to lvm units.
+
+  This function creates a copy of the storage_units list, with the
+  es_flag being added to all lvm storage units.
+
+  @type storage_units: list of pairs (string, string)
+  @param storage_units: list of 'raw' storage units, consisting only of
+    (storage_type, storage_key)
+  @type es_flag: boolean
+  @param es_flag: exclusive storage flag
+  @rtype: list of tuples (string, string, list)
+  @return: list of storage units (storage_type, storage_key, params) with
+    the params containing the es_flag for lvm-vg and lvm-pv storage units,
+    and an empty params list for all other storage types
+
+  """
+  result = []
+  for (storage_type, storage_key) in storage_units:
+    if storage_type in [constants.ST_LVM_VG, constants.ST_LVM_PV]:
+      result.append((storage_type, storage_key, [es_flag]))
+    else:
+      # non-lvm storage is not affected by the exclusive storage setting
+      result.append((storage_type, storage_key, []))
+  return result
+
+
def GetExclusiveStorageForNodes(cfg, node_uuids):
"""Return the exclusive storage flag for all the given nodes.
@type node_uuids: list or tuple
@param node_uuids: node UUIDs for which to read the flag
@rtype: dict
- @return: mapping from node names to exclusive storage flags
+ @return: mapping from node uuids to exclusive storage flags
@raise errors.OpPrereqError: if any given node name has no corresponding
node
"""
- getflag = lambda n: _GetESFlag(cfg, n)
+ getflag = lambda n: _GetExclusiveStorageFlag(cfg, n)
flags = map(getflag, node_uuids)
return dict(zip(node_uuids, flags))
+def PrepareStorageUnitsForNodes(cfg, storage_units, node_uuids):
+  """Return the storage units for all the given nodes.
+
+  Main purpose of this function is to map the exclusive storage flag, which
+  can be different for each node, to the default LVM storage unit.
+
+  @type cfg: L{config.ConfigWriter}
+  @param cfg: cluster configuration
+  @type storage_units: list of pairs (string, string)
+  @param storage_units: list of 'raw' storage units, e.g. pairs of
+    (storage_type, storage_key)
+  @type node_uuids: list or tuple
+  @param node_uuids: node UUIDs for which to read the flag
+  @rtype: dict
+  @return: mapping from node uuids to a list of storage units which include
+    the exclusive storage flag for lvm storage
+  @raise errors.OpPrereqError: if any given node name has no corresponding
+    node
+
+  """
+  # the exclusive storage flag is read per node, so the same 'raw' storage
+  # units can yield differently parameterised units for each node
+  getunit = lambda n: _AddExclusiveStorageFlagToLvmStorageUnits(
+      storage_units, _GetExclusiveStorageFlag(cfg, n))
+  flags = map(getunit, node_uuids)
+  return dict(zip(node_uuids, flags))
+
+
#: Generic encoders
_ENCODERS = {
rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
- rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
rpc_defs.ED_COMPRESS: _Compress,
rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
- rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
}
# Encoders annotating disk parameters
rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
+ rpc_defs.ED_MULTI_DISKS_DICT_DP: self._MultiDiskDictDP,
rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,
+ rpc_defs.ED_NODE_TO_DISK_DICT_DP: self._EncodeNodeToDiskDictDP,
# Encoders with special requirements
rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
+
+ rpc_defs.ED_IMPEXP_IO: self._EncodeImportExportIO,
})
# Resolver using configuration
_generated_rpc.RpcClientDnsOnly.__init__(self)
_generated_rpc.RpcClientDefault.__init__(self)
- def _NicDict(self, nic):
+ def _NicDict(self, _, nic):
"""Convert the given nic to a dict and encapsulate netinfo
"""
n.netinfo = objects.Network.ToDict(nobj)
return n.ToDict()
- def _InstDict(self, instance, hvp=None, bep=None, osp=None):
+ def _InstDict(self, node, instance, hvp=None, bep=None, osp=None):
"""Convert the given instance to a dict.
This is done via the instance's ToDict() method and additionally
idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
if osp is not None:
idict["osparams"].update(osp)
- idict["disks"] = self._DisksDictDP((instance.disks, instance))
+ idict["disks"] = self._DisksDictDP(node, (instance.disks, instance))
for nic in idict["nics"]:
nic["nicparams"] = objects.FillDict(
cluster.nicparams[constants.PP_DEFAULT],
nic["netinfo"] = objects.Network.ToDict(nobj)
return idict
- def _InstDictHvpBepDp(self, (instance, hvp, bep)):
+ def _InstDictHvpBepDp(self, node, (instance, hvp, bep)):
"""Wrapper for L{_InstDict}.
"""
- return self._InstDict(instance, hvp=hvp, bep=bep)
+ return self._InstDict(node, instance, hvp=hvp, bep=bep)
- def _InstDictOspDp(self, (instance, osparams)):
+ def _InstDictOspDp(self, node, (instance, osparams)):
"""Wrapper for L{_InstDict}.
"""
- return self._InstDict(instance, osp=osparams)
+ return self._InstDict(node, instance, osp=osparams)
- def _DisksDictDP(self, (disks, instance)):
+ def _DisksDictDP(self, node, (disks, instance)):
"""Wrapper for L{AnnotateDiskParams}.
"""
diskparams = self._cfg.GetInstanceDiskParams(instance)
- return [disk.ToDict()
- for disk in AnnotateDiskParams(instance.disk_template,
- disks, diskparams)]
+ ret = []
+ for disk in AnnotateDiskParams(disks, diskparams):
+ disk_node_uuids = disk.GetNodes(instance.primary_node)
+ node_ips = dict((uuid, node.secondary_ip) for (uuid, node)
+ in self._cfg.GetMultiNodeInfo(disk_node_uuids))
+
+ disk.UpdateDynamicDiskParams(node, node_ips)
- def _SingleDiskDictDP(self, (disk, instance)):
+ ret.append(disk.ToDict(include_dynamic_params=True))
+
+ return ret
+
+ def _MultiDiskDictDP(self, node, disks_insts):
"""Wrapper for L{AnnotateDiskParams}.
+ Supports a list of (disk, instance) tuples.
"""
- (anno_disk,) = self._DisksDictDP(([disk], instance))
+ return [disk for disk_inst in disks_insts
+ for disk in self._DisksDictDP(node, disk_inst)]
+
+ def _SingleDiskDictDP(self, node, (disk, instance)):
+ """Wrapper for L{AnnotateDiskParams}.
+
+ """
+ (anno_disk,) = self._DisksDictDP(node, ([disk], instance))
return anno_disk
+  def _EncodeNodeToDiskDictDP(self, node, value):
+    """Encode dict of node name -> list of (disk, instance) tuples as values.
+
+    @param node: the node this RPC is being encoded for; forwarded to
+      L{_SingleDiskDictDP} for per-node disk parameter annotation
+    @param value: dict mapping node names to lists of (disk, instance) tuples
+    @return: dict mapping the same node names to lists of encoded disk dicts
+
+    """
+    return dict((name, [self._SingleDiskDictDP(node, disk) for disk in disks])
+                for name, disks in value.items())
+
+  def _EncodeImportExportIO(self, node, (ieio, ieioargs)):
+    """Encodes import/export I/O information.
+
+    Disk arguments are encoded with their annotated disk parameters via
+    L{_SingleDiskDictDP}; arguments of any other I/O kind are passed
+    through unchanged.
+
+    """
+    if ieio == constants.IEIO_RAW_DISK:
+      # a single (disk, instance) tuple to encode
+      assert len(ieioargs) == 1
+      return (ieio, (self._SingleDiskDictDP(node, ieioargs[0]), ))
+
+    if ieio == constants.IEIO_SCRIPT:
+      # a (disk, instance) tuple plus one extra argument, which is
+      # passed through as-is
+      assert len(ieioargs) == 2
+      return (ieio, (self._SingleDiskDictDP(node, ieioargs[0]), ieioargs[1]))
+
+    return (ieio, ieioargs)
+
class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
"""RPC wrappers for job queue.