return wrapper
-def _Compress(data):
+def _Compress(_, data):
"""Compresses a string for transport over RPC.
Small amounts of data are not compressed.
self._encoder = compat.partial(self._EncodeArg, encoder_fn)
@staticmethod
- def _EncodeArg(encoder_fn, (argkind, value)):
+ def _EncodeArg(encoder_fn, node, (argkind, value)):
"""Encode argument.
+
+ @param encoder_fn: callable returning the encoder for a given argument kind
+ @param node: forwarded as first argument to the selected encoder
+ @param argkind: argument kind; C{None} means the value needs no encoding
+ @param value: the argument value
+ @return: C{value} unchanged if C{argkind} is C{None}, otherwise the result
+ of calling the encoder selected by C{argkind} with C{(node, value)}
"""
if argkind is None:
return value
else:
- return encoder_fn(argkind)(value)
+ return encoder_fn(argkind)(node, value)
def _Call(self, cdef, node_list, args):
"""Entry point for automatically generated RPC wrappers.
if len(args) != len(argdefs):
raise errors.ProgrammerError("Number of passed arguments doesn't match")
- enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
if prep_fn is None:
- # for a no-op prep_fn, we serialise the body once, and then we
- # reuse it in the dictionary values
- body = serializer.DumpJson(enc_args)
- pnbody = dict((n, body) for n in node_list)
- else:
- # for a custom prep_fn, we pass the encoded arguments and the
- # node name to the prep_fn, and we serialise its return value
- assert callable(prep_fn)
- pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
- for n in node_list)
+ prep_fn = lambda _, args: args
+ assert callable(prep_fn)
+
+ # encode the arguments for each node individually, pass them and the node
+ # name to the prep_fn, and serialise its return value
+ encode_args_fn = lambda node: map(compat.partial(self._encoder, node),
+ zip(map(compat.snd, argdefs), args))
+ pnbody = dict((n, serializer.DumpJson(prep_fn(n, encode_args_fn(n))))
+ for n in node_list)
result = self._proc(node_list, procedure, pnbody, read_timeout,
req_resolver_opts)
return result
-def _ObjectToDict(value):
+def _ObjectToDict(_, value):
"""Converts an object to a dictionary.
@note: See L{objects}.
return value.ToDict()
-def _ObjectListToDict(value):
+def _ObjectListToDict(node, value):
"""Converts a list of L{objects} to dictionaries.
+
+ @param node: forwarded unchanged to L{_ObjectToDict} for every element
+ @param value: the objects to convert
+ @return: the result of applying L{_ObjectToDict} to each element
"""
- return map(_ObjectToDict, value)
+ return map(compat.partial(_ObjectToDict, node), value)
-def _EncodeNodeToDiskDict(value):
- """Encodes a dictionary with node name as key and disk objects as values.
-
- """
- return dict((name, _ObjectListToDict(disks))
- for name, disks in value.items())
-
-
-def _PrepareFileUpload(getents_fn, filename):
+def _PrepareFileUpload(getents_fn, node, filename):
"""Loads a file and prepares it for an upload to nodes.
"""
statcb = utils.FileStatHelper()
- data = _Compress(utils.ReadFile(filename, preread=statcb))
+ data = _Compress(node, utils.ReadFile(filename, preread=statcb))
st = statcb.st
if getents_fn is None:
getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
-def _PrepareFinalizeExportDisks(snap_disks):
+def _PrepareFinalizeExportDisks(_, snap_disks):
"""Encodes disks for finalizing export.
"""
return flat_disks
-def _EncodeImportExportIO((ieio, ieioargs)):
- """Encodes import/export I/O information.
-
- """
- if ieio == constants.IEIO_RAW_DISK:
- assert len(ieioargs) == 1
- return (ieio, (ieioargs[0].ToDict(), ))
-
- if ieio == constants.IEIO_SCRIPT:
- assert len(ieioargs) == 2
- return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))
-
- return (ieio, ieioargs)
-
-
-def _EncodeBlockdevRename(value):
+def _EncodeBlockdevRename(_, value):
"""Encodes information for renaming block devices.
"""
"""Annotates just DRBD disks layouts.
"""
- assert disk.dev_type == constants.LD_DRBD8
+ assert disk.dev_type == constants.DT_DRBD8
disk.params = objects.FillDict(drbd_params, disk.params)
(dev_data, dev_meta) = disk.children
"""Generic disk parameter annotation routine.
"""
- assert disk.dev_type != constants.LD_DRBD8
+ assert disk.dev_type != constants.DT_DRBD8
disk.params = objects.FillDict(params, disk.params)
return disk
-def AnnotateDiskParams(template, disks, disk_params):
+def AnnotateDiskParams(disks, disk_params):
"""Annotates the disk objects with the disk parameters.
+
+ The annotation routine is selected per disk from the disk's own
+ C{dev_type}; disks of type C{constants.DT_DISKLESS} are not annotated.
+
- @param template: The disk template used
@param disks: The list of disks objects to annotate
- @param disk_params: The disk paramaters for annotation
+ @param disk_params: The disk parameters for annotation
@returns: A list of disk objects annotated
+ @note: every disk is passed through C{disk.Copy()} before annotation, and
+ the copies are what is returned
"""
- ld_params = objects.Disk.ComputeLDParams(template, disk_params)
+ def AnnotateDisk(disk):
+ # Nothing to annotate for diskless disks; hand the object back as-is
+ if disk.dev_type == constants.DT_DISKLESS:
+ return disk
- if template == constants.DT_DRBD8:
- annotation_fn = _AnnotateDParamsDRBD
- elif template == constants.DT_DISKLESS:
- annotation_fn = lambda disk, _: disk
- else:
- annotation_fn = _AnnotateDParamsGeneric
+ # The logical-disk parameters are computed from this disk's type
+ ld_params = objects.Disk.ComputeLDParams(disk.dev_type, disk_params)
- return [annotation_fn(disk.Copy(), ld_params) for disk in disks]
+ if disk.dev_type == constants.DT_DRBD8:
+ return _AnnotateDParamsDRBD(disk, ld_params)
+ else:
+ return _AnnotateDParamsGeneric(disk, ld_params)
+
+ # AnnotateDisk only ever receives copies of the disks passed in
+ return [AnnotateDisk(disk.Copy()) for disk in disks]
def _GetExclusiveStorageFlag(cfg, node_uuid):
+# Maps rpc_defs.ED_* constants to the module-level encoder functions
_ENCODERS = {
rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
- rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
rpc_defs.ED_COMPRESS: _Compress,
rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
- rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
}
rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
rpc_defs.ED_MULTI_DISKS_DICT_DP: self._MultiDiskDictDP,
rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,
+ rpc_defs.ED_NODE_TO_DISK_DICT_DP: self._EncodeNodeToDiskDictDP,
# Encoders with special requirements
rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
+
+ rpc_defs.ED_IMPEXP_IO: self._EncodeImportExportIO,
})
# Resolver using configuration
_generated_rpc.RpcClientDnsOnly.__init__(self)
_generated_rpc.RpcClientDefault.__init__(self)
- def _NicDict(self, nic):
+ def _NicDict(self, _, nic):
"""Convert the given nic to a dict and encapsulate netinfo
"""
n.netinfo = objects.Network.ToDict(nobj)
return n.ToDict()
- def _InstDict(self, instance, hvp=None, bep=None, osp=None):
+ def _InstDict(self, node, instance, hvp=None, bep=None, osp=None):
"""Convert the given instance to a dict.
This is done via the instance's ToDict() method and additionally
idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
if osp is not None:
idict["osparams"].update(osp)
- idict["disks"] = self._DisksDictDP((instance.disks, instance))
+ idict["disks"] = self._DisksDictDP(node, (instance.disks, instance))
for nic in idict["nics"]:
nic["nicparams"] = objects.FillDict(
cluster.nicparams[constants.PP_DEFAULT],
nic["netinfo"] = objects.Network.ToDict(nobj)
return idict
- def _InstDictHvpBepDp(self, (instance, hvp, bep)):
+ def _InstDictHvpBepDp(self, node, (instance, hvp, bep)):
"""Wrapper for L{_InstDict}.
+
+ Unpacks an C{(instance, hvp, bep)} tuple and forwards C{hvp} and C{bep}
+ as keyword arguments; C{node} is passed through unchanged.
"""
- return self._InstDict(instance, hvp=hvp, bep=bep)
+ return self._InstDict(node, instance, hvp=hvp, bep=bep)
- def _InstDictOspDp(self, (instance, osparams)):
+ def _InstDictOspDp(self, node, (instance, osparams)):
"""Wrapper for L{_InstDict}.
+
+ Unpacks an C{(instance, osparams)} tuple and forwards C{osparams} as the
+ C{osp} keyword argument; C{node} is passed through unchanged.
"""
- return self._InstDict(instance, osp=osparams)
+ return self._InstDict(node, instance, osp=osparams)
-  def _DisksDictDP(self, (disks, instance)):
+  def _DisksDictDP(self, node, (disks, instance)):
     """Wrapper for L{AnnotateDiskParams}.
+
+    Annotates the given disks with the instance's disk parameters, updates
+    the dynamic parameters of every annotated disk for C{node} and converts
+    the result to dictionaries.
+
+    @param node: passed to C{UpdateDynamicDiskParams} of every annotated disk
+    @param disks: the disks to encode
+    @param instance: the instance whose disk parameters and primary node
+      are used
+    @return: list with one dictionary per disk, dynamic parameters included
     """
     diskparams = self._cfg.GetInstanceDiskParams(instance)
-    return [disk.ToDict()
-            for disk in AnnotateDiskParams(instance.disk_template,
-                                           disks, diskparams)]
+    ret = []
+    for disk in AnnotateDiskParams(disks, diskparams):
+      disk_node_uuids = disk.GetNodes(instance.primary_node)
+      # The loop variable is deliberately not called "node": that name is the
+      # method parameter used right below, and shadowing it only works as long
+      # as this stays a generator expression with its own scope
+      node_ips = dict((uuid, node_info.secondary_ip) for (uuid, node_info)
+                      in self._cfg.GetMultiNodeInfo(disk_node_uuids))
+
+      disk.UpdateDynamicDiskParams(node, node_ips)
+
+      ret.append(disk.ToDict(include_dynamic_params=True))
+    return ret
+
-  def _MultiDiskDictDP(self, disks_insts):
+ def _MultiDiskDictDP(self, node, disks_insts):
"""Wrapper for L{AnnotateDiskParams}.
Supports a list of (disk, instance) tuples.
+
+ Each tuple is handed to L{_DisksDictDP}, which unpacks it as
+ C{(disks, instance)}; the per-tuple results are concatenated into one
+ flat list.
+
+ @param node: forwarded unchanged to L{_DisksDictDP} for every tuple
"""
return [disk for disk_inst in disks_insts
- for disk in self._DisksDictDP(disk_inst)]
+ for disk in self._DisksDictDP(node, disk_inst)]
- def _SingleDiskDictDP(self, (disk, instance)):
+ def _SingleDiskDictDP(self, node, (disk, instance)):
"""Wrapper for L{AnnotateDiskParams}.
+
+ Encodes a single disk by wrapping it in a one-element list for
+ L{_DisksDictDP} and unpacking the one-element result.
+
+ @param node: forwarded unchanged to L{_DisksDictDP}
"""
- (anno_disk,) = self._DisksDictDP(([disk], instance))
+ (anno_disk,) = self._DisksDictDP(node, ([disk], instance))
return anno_disk
+  def _EncodeNodeToDiskDictDP(self, node, value):
+    """Encodes a dict mapping node names to lists of (disk, instance) tuples.
+
+    Every tuple is encoded with L{_SingleDiskDictDP}; the keys are kept as
+    they are.
+
+    @param node: forwarded unchanged to L{_SingleDiskDictDP}
+    @param value: dictionary of node name to list of (disk, instance) tuples
+    @return: dictionary with the same keys, each holding the list of
+      encoded disks
+
+    """
+    encoded = {}
+    for (name, disk_insts) in value.items():
+      encoded[name] = [self._SingleDiskDictDP(node, disk_inst)
+                       for disk_inst in disk_insts]
+    return encoded
+
+  def _EncodeImportExportIO(self, node, (ieio, ieioargs)):
+    """Encodes import/export I/O information.
+
+    For C{constants.IEIO_RAW_DISK} and C{constants.IEIO_SCRIPT} the first
+    I/O argument is encoded with L{_SingleDiskDictDP}; for any other I/O
+    type the arguments are returned untouched.
+
+    @param node: forwarded unchanged to L{_SingleDiskDictDP}
+    @param ieio: the import/export I/O type
+    @param ieioargs: the arguments belonging to C{ieio}
+    @return: tuple of C{ieio} and the (possibly encoded) arguments
+
+    """
+    if ieio == constants.IEIO_RAW_DISK:
+      assert len(ieioargs) == 1
+      encoded_args = (self._SingleDiskDictDP(node, ieioargs[0]), )
+    elif ieio == constants.IEIO_SCRIPT:
+      assert len(ieioargs) == 2
+      # Only the first argument is encoded, the second is passed along as-is
+      encoded_disk = self._SingleDiskDictDP(node, ieioargs[0])
+      encoded_args = (encoded_disk, ieioargs[1])
+    else:
+      encoded_args = ieioargs
+
+    return (ieio, encoded_args)
+
class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
"""RPC wrappers for job queue.