+class _RpcClientBase:
+ def __init__(self, resolver, encoder_fn, lock_monitor_cb=None):
+ """Initializes this class.
+
+ """
+ self._proc = _RpcProcessor(resolver,
+ netutils.GetDaemonPort(constants.NODED),
+ lock_monitor_cb=lock_monitor_cb)
+ self._encoder = compat.partial(self._EncodeArg, encoder_fn)
+
+ @staticmethod
+ def _EncodeArg(encoder_fn, (argkind, value)):
+ """Encode argument.
+
+ """
+ if argkind is None:
+ return value
+ else:
+ return encoder_fn(argkind)(value)
+
+ def _Call(self, cdef, node_list, args):
+ """Entry point for automatically generated RPC wrappers.
+
+ """
+ (procedure, _, timeout, argdefs, postproc_fn, _) = cdef
+
+ if callable(timeout):
+ read_timeout = timeout(args)
+ else:
+ read_timeout = timeout
+
+ body = serializer.DumpJson(map(self._encoder,
+ zip(map(compat.snd, argdefs), args)),
+ indent=False)
+
+ result = self._proc(node_list, procedure, body, read_timeout=read_timeout)
+
+ if postproc_fn:
+ return dict(map(lambda (key, value): (key, postproc_fn(value)),
+ result.items()))
+ else:
+ return result
+
+
+def _ObjectToDict(value):
+ """Converts an object to a dictionary.
+
+ @note: See L{objects}.
+
+ """
+ return value.ToDict()
+
+
+def _ObjectListToDict(value):
+ """Converts a list of L{objects} to dictionaries.
+
+ """
+ return map(_ObjectToDict, value)
+
+
+def _EncodeNodeToDiskDict(value):
+ """Encodes a dictionary with node name as key and disk objects as values.
+
+ """
+ return dict((name, _ObjectListToDict(disks))
+ for name, disks in value.items())
+
+
+def _PrepareFileUpload(filename):
+ """Loads a file and prepares it for an upload to nodes.
+
+ """
+ data = _Compress(utils.ReadFile(filename))
+ st = os.stat(filename)
+ getents = runtime.GetEnts()
+ return [filename, data, st.st_mode, getents.LookupUid(st.st_uid),
+ getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
+
+
+def _PrepareFinalizeExportDisks(snap_disks):
+ """Encodes disks for finalizing export.
+
+ """
+ flat_disks = []
+
+ for disk in snap_disks:
+ if isinstance(disk, bool):
+ flat_disks.append(disk)
+ else:
+ flat_disks.append(disk.ToDict())
+
+ return flat_disks
+
+
+def _EncodeImportExportIO((ieio, ieioargs)):
+ """Encodes import/export I/O information.
+
+ """
+ if ieio == constants.IEIO_RAW_DISK:
+ assert len(ieioargs) == 1
+ return (ieio, (ieioargs[0].ToDict(), ))
+
+ if ieio == constants.IEIO_SCRIPT:
+ assert len(ieioargs) == 2
+ return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))
+
+ return (ieio, ieioargs)
+
+
+def _EncodeBlockdevRename(value):
+ """Encodes information for renaming block devices.
+
+ """
+ return [(d.ToDict(), uid) for d, uid in value]
+
+
+#: Generic encoders
+_ENCODERS = {
+ rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
+ rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
+ rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
+ rpc_defs.ED_FILE_DETAILS: _PrepareFileUpload,
+ rpc_defs.ED_COMPRESS: _Compress,
+ rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
+ rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
+ rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
+ }
+
+
+class RpcRunner(_RpcClientBase,
+ _generated_rpc.RpcClientDefault,