gnt-cluster: Add hv/disk state to init
[ganeti-local] / lib / rpc.py
index 9626501..0bedb3c 100644 (file)
@@ -430,16 +430,28 @@ class _RpcClientBase:
     else:
       return encoder_fn(argkind)(value)
 
-  def _Call(self, node_list, procedure, timeout, argdefs, args):
+  def _Call(self, cdef, node_list, args):
     """Entry point for automatically generated RPC wrappers.
 
     """
-    assert len(args) == len(argdefs), "Wrong number of arguments"
+    (procedure, _, timeout, argdefs, postproc_fn, _) = cdef
 
-    body = serializer.DumpJson(map(self._encoder, zip(argdefs, args)),
+    if callable(timeout):
+      read_timeout = timeout(args)
+    else:
+      read_timeout = timeout
+
+    body = serializer.DumpJson(map(self._encoder,
+                                   zip(map(compat.snd, argdefs), args)),
                                indent=False)
 
-    return self._proc(node_list, procedure, body, read_timeout=timeout)
+    result = self._proc(node_list, procedure, body, read_timeout=read_timeout)
+
+    if postproc_fn:
+      return dict(map(lambda (key, value): (key, postproc_fn(value)),
+                      result.items()))
+    else:
+      return result
 
 
 def _ObjectToDict(value):
@@ -614,79 +626,6 @@ class RpcRunner(_RpcClientBase,
     """
     return self._InstDict(instance, osp=osparams)
 
-  @staticmethod
-  def _MigrationStatusPostProc(result):
-    if not result.fail_msg and result.payload is not None:
-      result.payload = objects.MigrationStatus.FromDict(result.payload)
-    return result
-
-  @staticmethod
-  def _BlockdevFindPostProc(result):
-    if not result.fail_msg and result.payload is not None:
-      result.payload = objects.BlockDevStatus.FromDict(result.payload)
-    return result
-
-  @staticmethod
-  def _BlockdevGetMirrorStatusPostProc(result):
-    if not result.fail_msg:
-      result.payload = [objects.BlockDevStatus.FromDict(i)
-                        for i in result.payload]
-    return result
-
-  @staticmethod
-  def _BlockdevGetMirrorStatusMultiPostProc(result):
-    for nres in result.values():
-      if nres.fail_msg:
-        continue
-
-      for idx, (success, status) in enumerate(nres.payload):
-        if success:
-          nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
-
-    return result
-
-  @staticmethod
-  def _OsGetPostProc(result):
-    if not result.fail_msg and isinstance(result.payload, dict):
-      result.payload = objects.OS.FromDict(result.payload)
-    return result
-
-  @staticmethod
-  def _ImpExpStatusPostProc(result):
-    """Post-processor for import/export status.
-
-    @rtype: Payload containing list of L{objects.ImportExportStatus} instances
-    @return: Returns a list of the state of each named import/export or None if
-             a status couldn't be retrieved
-
-    """
-    if not result.fail_msg:
-      decoded = []
-
-      for i in result.payload:
-        if i is None:
-          decoded.append(None)
-          continue
-        decoded.append(objects.ImportExportStatus.FromDict(i))
-
-      result.payload = decoded
-
-    return result
-
-  #
-  # Begin RPC calls
-  #
-
-  def call_test_delay(self, node_list, duration, read_timeout=None):
-    """Sleep for a fixed time on given node(s).
-
-    This is a multi-node call.
-
-    """
-    assert read_timeout is None
-    return self.call_test_delay(node_list, duration,
-                                read_timeout=int(duration + 5))
-
 
 class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
   """RPC wrappers for job queue.