else:
return encoder_fn(argkind)(value)
- def _Call(self, node_list, procedure, timeout, argdefs, args):
+ def _Call(self, cdef, node_list, args):
"""Entry point for automatically generated RPC wrappers.
"""
- assert len(args) == len(argdefs), "Wrong number of arguments"
+ (procedure, _, timeout, argdefs, postproc_fn, _) = cdef
- body = serializer.DumpJson(map(self._encoder, zip(argdefs, args)),
+ if callable(timeout):
+ read_timeout = timeout(args)
+ else:
+ read_timeout = timeout
+
+ body = serializer.DumpJson(map(self._encoder,
+ zip(map(compat.snd, argdefs), args)),
indent=False)
- return self._proc(node_list, procedure, body, read_timeout=timeout)
+ result = self._proc(node_list, procedure, body, read_timeout=read_timeout)
+
+ if postproc_fn:
+ return dict(map(lambda (key, value): (key, postproc_fn(value)),
+ result.items()))
+ else:
+ return result
def _ObjectToDict(value):
"""
return self._InstDict(instance, osp=osparams)
- @staticmethod
- def _MigrationStatusPostProc(result):
- if not result.fail_msg and result.payload is not None:
- result.payload = objects.MigrationStatus.FromDict(result.payload)
- return result
-
- @staticmethod
- def _BlockdevFindPostProc(result):
- if not result.fail_msg and result.payload is not None:
- result.payload = objects.BlockDevStatus.FromDict(result.payload)
- return result
-
- @staticmethod
- def _BlockdevGetMirrorStatusPostProc(result):
- if not result.fail_msg:
- result.payload = [objects.BlockDevStatus.FromDict(i)
- for i in result.payload]
- return result
-
- @staticmethod
- def _BlockdevGetMirrorStatusMultiPostProc(result):
- for nres in result.values():
- if nres.fail_msg:
- continue
-
- for idx, (success, status) in enumerate(nres.payload):
- if success:
- nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
-
- return result
-
- @staticmethod
- def _OsGetPostProc(result):
- if not result.fail_msg and isinstance(result.payload, dict):
- result.payload = objects.OS.FromDict(result.payload)
- return result
-
- @staticmethod
- def _ImpExpStatusPostProc(result):
- """Post-processor for import/export status.
-
- @rtype: Payload containing list of L{objects.ImportExportStatus} instances
- @return: Returns a list of the state of each named import/export or None if
- a status couldn't be retrieved
-
- """
- if not result.fail_msg:
- decoded = []
-
- for i in result.payload:
- if i is None:
- decoded.append(None)
- continue
- decoded.append(objects.ImportExportStatus.FromDict(i))
-
- result.payload = decoded
-
- return result
-
- #
- # Begin RPC calls
- #
-
- def call_test_delay(self, node_list, duration, read_timeout=None):
- """Sleep for a fixed time on given node(s).
-
- This is a multi-node call.
-
- """
- assert read_timeout is None
- return self.call_test_delay(node_list, duration,
- read_timeout=int(duration + 5))
-
class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
"""RPC wrappers for job queue.