Factorize logging setup in tools
[ganeti-local] / test / ganeti.rpc_unittest.py
index 21f536d..ace49af 100755 (executable)
@@ -1,7 +1,7 @@
 #!/usr/bin/python
 #
 
 #!/usr/bin/python
 #
 
-# Copyright (C) 2010 Google Inc.
+# Copyright (C) 2010, 2011, 2012 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 import os
 import sys
 import unittest
 import os
 import sys
 import unittest
+import random
+import tempfile
 
 from ganeti import constants
 from ganeti import compat
 from ganeti import rpc
 
 from ganeti import constants
 from ganeti import compat
 from ganeti import rpc
+from ganeti import rpc_defs
 from ganeti import http
 from ganeti import errors
 from ganeti import serializer
 from ganeti import objects
 from ganeti import http
 from ganeti import errors
 from ganeti import serializer
 from ganeti import objects
+from ganeti import backend
 
 import testutils
 
 import testutils
+import mocks
 
 
 class _FakeRequestProcessor:
 
 
 class _FakeRequestProcessor:
@@ -56,6 +61,13 @@ def GetFakeSimpleStoreClass(fn):
   return FakeSimpleStore
 
 
   return FakeSimpleStore
 
 
+def _RaiseNotImplemented():
+  """Simple wrapper to raise NotImplementedError.
+
+  """
+  # NOTE(review): this takes no arguments, whereas the other callables handed
+  # to GetFakeSimpleStoreClass in this file are written as "lambda _: ...". If
+  # the fake store ever called it with an argument the outcome would be a
+  # TypeError rather than NotImplementedError -- confirm that is acceptable.
+  raise NotImplementedError
+
+
 class TestRpcProcessor(unittest.TestCase):
   def _FakeAddressLookup(self, map):
     return lambda node_list: [map.get(node) for node in node_list]
 class TestRpcProcessor(unittest.TestCase):
   def _FakeAddressLookup(self, map):
     return lambda node_list: [map.get(node) for node in node_list]
@@ -64,7 +76,7 @@ class TestRpcProcessor(unittest.TestCase):
     self.assertEqual(req.host, "127.0.0.1")
     self.assertEqual(req.port, 24094)
     self.assertEqual(req.path, "/version")
     self.assertEqual(req.host, "127.0.0.1")
     self.assertEqual(req.port, 24094)
     self.assertEqual(req.path, "/version")
-    self.assertEqual(req.read_timeout, rpc._TMO_URGENT)
+    self.assertEqual(req.read_timeout, constants.RPC_TMO_URGENT)
     req.success = True
     req.resp_status_code = http.HTTP_OK
     req.resp_body = serializer.DumpJson((True, 123))
     req.success = True
     req.resp_status_code = http.HTTP_OK
     req.resp_body = serializer.DumpJson((True, 123))
@@ -73,7 +85,8 @@ class TestRpcProcessor(unittest.TestCase):
     resolver = rpc._StaticResolver(["127.0.0.1"])
     http_proc = _FakeRequestProcessor(self._GetVersionResponse)
     proc = rpc._RpcProcessor(resolver, 24094)
     resolver = rpc._StaticResolver(["127.0.0.1"])
     http_proc = _FakeRequestProcessor(self._GetVersionResponse)
     proc = rpc._RpcProcessor(resolver, 24094)
-    result = proc(["localhost"], "version", None, _req_process_fn=http_proc)
+    result = proc(["localhost"], "version", {"localhost": ""}, 60,
+                  NotImplemented, _req_process_fn=http_proc)
     self.assertEqual(result.keys(), ["localhost"])
     lhresp = result["localhost"]
     self.assertFalse(lhresp.offline)
     self.assertEqual(result.keys(), ["localhost"])
     lhresp = result["localhost"]
     self.assertFalse(lhresp.offline)
@@ -97,12 +110,14 @@ class TestRpcProcessor(unittest.TestCase):
     resolver = rpc._StaticResolver(["192.0.2.13"])
     http_proc = _FakeRequestProcessor(self._ReadTimeoutResponse)
     proc = rpc._RpcProcessor(resolver, 19176)
     resolver = rpc._StaticResolver(["192.0.2.13"])
     http_proc = _FakeRequestProcessor(self._ReadTimeoutResponse)
     proc = rpc._RpcProcessor(resolver, 19176)
-    result = proc(["node31856"], "version", None, _req_process_fn=http_proc,
-                  read_timeout=12356)
-    self.assertEqual(result.keys(), ["node31856"])
-    lhresp = result["node31856"]
+    host = "node31856"
+    body = {host: ""}
+    result = proc([host], "version", body, 12356, NotImplemented,
+                  _req_process_fn=http_proc)
+    self.assertEqual(result.keys(), [host])
+    lhresp = result[host]
     self.assertFalse(lhresp.offline)
     self.assertFalse(lhresp.offline)
-    self.assertEqual(lhresp.node, "node31856")
+    self.assertEqual(lhresp.node, host)
     self.assertFalse(lhresp.fail_msg)
     self.assertEqual(lhresp.payload, -1)
     self.assertEqual(lhresp.call, "version")
     self.assertFalse(lhresp.fail_msg)
     self.assertEqual(lhresp.payload, -1)
     self.assertEqual(lhresp.call, "version")
@@ -113,11 +128,14 @@ class TestRpcProcessor(unittest.TestCase):
     resolver = rpc._StaticResolver([rpc._OFFLINE])
     http_proc = _FakeRequestProcessor(NotImplemented)
     proc = rpc._RpcProcessor(resolver, 30668)
     resolver = rpc._StaticResolver([rpc._OFFLINE])
     http_proc = _FakeRequestProcessor(NotImplemented)
     proc = rpc._RpcProcessor(resolver, 30668)
-    result = proc(["n17296"], "version", None, _req_process_fn=http_proc)
-    self.assertEqual(result.keys(), ["n17296"])
-    lhresp = result["n17296"]
+    host = "n17296"
+    body = {host: ""}
+    result = proc([host], "version", body, 60, NotImplemented,
+                  _req_process_fn=http_proc)
+    self.assertEqual(result.keys(), [host])
+    lhresp = result[host]
     self.assertTrue(lhresp.offline)
     self.assertTrue(lhresp.offline)
-    self.assertEqual(lhresp.node, "n17296")
+    self.assertEqual(lhresp.node, host)
     self.assertTrue(lhresp.fail_msg)
     self.assertFalse(lhresp.payload)
     self.assertEqual(lhresp.call, "version")
     self.assertTrue(lhresp.fail_msg)
     self.assertFalse(lhresp.payload)
     self.assertEqual(lhresp.call, "version")
@@ -140,10 +158,12 @@ class TestRpcProcessor(unittest.TestCase):
 
   def testMultiVersionSuccess(self):
     nodes = ["node%s" % i for i in range(50)]
 
   def testMultiVersionSuccess(self):
     nodes = ["node%s" % i for i in range(50)]
+    body = dict((n, "") for n in nodes)
     resolver = rpc._StaticResolver(nodes)
     http_proc = _FakeRequestProcessor(self._GetMultiVersionResponse)
     proc = rpc._RpcProcessor(resolver, 23245)
     resolver = rpc._StaticResolver(nodes)
     http_proc = _FakeRequestProcessor(self._GetMultiVersionResponse)
     proc = rpc._RpcProcessor(resolver, 23245)
-    result = proc(nodes, "version", None, _req_process_fn=http_proc)
+    result = proc(nodes, "version", body, 60, NotImplemented,
+                  _req_process_fn=http_proc)
     self.assertEqual(sorted(result.keys()), sorted(nodes))
 
     for name in nodes:
     self.assertEqual(sorted(result.keys()), sorted(nodes))
 
     for name in nodes:
@@ -170,12 +190,14 @@ class TestRpcProcessor(unittest.TestCase):
       http_proc = \
         _FakeRequestProcessor(compat.partial(self._GetVersionResponseFail,
                                              errinfo))
       http_proc = \
         _FakeRequestProcessor(compat.partial(self._GetVersionResponseFail,
                                              errinfo))
-      result = proc(["aef9ur4i.example.com"], "version", None,
+      host = "aef9ur4i.example.com"
+      body = {host: ""}
+      result = proc(body.keys(), "version", body, 60, NotImplemented,
                     _req_process_fn=http_proc)
                     _req_process_fn=http_proc)
-      self.assertEqual(result.keys(), ["aef9ur4i.example.com"])
-      lhresp = result["aef9ur4i.example.com"]
+      self.assertEqual(result.keys(), [host])
+      lhresp = result[host]
       self.assertFalse(lhresp.offline)
       self.assertFalse(lhresp.offline)
-      self.assertEqual(lhresp.node, "aef9ur4i.example.com")
+      self.assertEqual(lhresp.node, host)
       self.assert_(lhresp.fail_msg)
       self.assertFalse(lhresp.payload)
       self.assertEqual(lhresp.call, "version")
       self.assert_(lhresp.fail_msg)
       self.assertFalse(lhresp.payload)
       self.assertEqual(lhresp.call, "version")
@@ -205,6 +227,7 @@ class TestRpcProcessor(unittest.TestCase):
 
   def testHttpError(self):
     nodes = ["uaf6pbbv%s" % i for i in range(50)]
 
   def testHttpError(self):
     nodes = ["uaf6pbbv%s" % i for i in range(50)]
+    body = dict((n, "") for n in nodes)
     resolver = rpc._StaticResolver(nodes)
 
     httperrnodes = set(nodes[1::7])
     resolver = rpc._StaticResolver(nodes)
 
     httperrnodes = set(nodes[1::7])
@@ -219,8 +242,9 @@ class TestRpcProcessor(unittest.TestCase):
     http_proc = \
       _FakeRequestProcessor(compat.partial(self._GetHttpErrorResponse,
                                            httperrnodes, failnodes))
     http_proc = \
       _FakeRequestProcessor(compat.partial(self._GetHttpErrorResponse,
                                            httperrnodes, failnodes))
-    result = proc(nodes, "vg_list", None, _req_process_fn=http_proc,
-                  read_timeout=rpc._TMO_URGENT)
+    result = proc(nodes, "vg_list", body,
+                  constants.RPC_TMO_URGENT, NotImplemented,
+                  _req_process_fn=http_proc)
     self.assertEqual(sorted(result.keys()), sorted(nodes))
 
     for name in nodes:
     self.assertEqual(sorted(result.keys()), sorted(nodes))
 
     for name in nodes:
@@ -262,12 +286,14 @@ class TestRpcProcessor(unittest.TestCase):
 
     for fn in [self._GetInvalidResponseA, self._GetInvalidResponseB]:
       http_proc = _FakeRequestProcessor(fn)
 
     for fn in [self._GetInvalidResponseA, self._GetInvalidResponseB]:
       http_proc = _FakeRequestProcessor(fn)
-      result = proc(["oqo7lanhly.example.com"], "version", None,
+      host = "oqo7lanhly.example.com"
+      body = {host: ""}
+      result = proc([host], "version", body, 60, NotImplemented,
                     _req_process_fn=http_proc)
                     _req_process_fn=http_proc)
-      self.assertEqual(result.keys(), ["oqo7lanhly.example.com"])
-      lhresp = result["oqo7lanhly.example.com"]
+      self.assertEqual(result.keys(), [host])
+      lhresp = result[host]
       self.assertFalse(lhresp.offline)
       self.assertFalse(lhresp.offline)
-      self.assertEqual(lhresp.node, "oqo7lanhly.example.com")
+      self.assertEqual(lhresp.node, host)
       self.assert_(lhresp.fail_msg)
       self.assertFalse(lhresp.payload)
       self.assertEqual(lhresp.call, "version")
       self.assert_(lhresp.fail_msg)
       self.assertFalse(lhresp.payload)
       self.assertEqual(lhresp.call, "version")
@@ -292,12 +318,14 @@ class TestRpcProcessor(unittest.TestCase):
     http_proc = _FakeRequestProcessor(compat.partial(self._GetBodyTestResponse,
                                                      test_data))
     proc = rpc._RpcProcessor(resolver, 18700)
     http_proc = _FakeRequestProcessor(compat.partial(self._GetBodyTestResponse,
                                                      test_data))
     proc = rpc._RpcProcessor(resolver, 18700)
-    body = serializer.DumpJson(test_data)
-    result = proc(["node19759"], "upload_file", body, _req_process_fn=http_proc)
-    self.assertEqual(result.keys(), ["node19759"])
-    lhresp = result["node19759"]
+    host = "node19759"
+    body = {host: serializer.DumpJson(test_data)}
+    result = proc([host], "upload_file", body, 30, NotImplemented,
+                  _req_process_fn=http_proc)
+    self.assertEqual(result.keys(), [host])
+    lhresp = result[host]
     self.assertFalse(lhresp.offline)
     self.assertFalse(lhresp.offline)
-    self.assertEqual(lhresp.node, "node19759")
+    self.assertEqual(lhresp.node, host)
     self.assertFalse(lhresp.fail_msg)
     self.assertEqual(lhresp.payload, None)
     self.assertEqual(lhresp.call, "upload_file")
     self.assertFalse(lhresp.fail_msg)
     self.assertEqual(lhresp.payload, None)
     self.assertEqual(lhresp.call, "upload_file")
@@ -311,7 +339,8 @@ class TestSsconfResolver(unittest.TestCase):
     node_list = ["node%d.example.com" % n for n in range(0, 255, 13)]
     node_addr_list = [" ".join(t) for t in zip(node_list, addr_list)]
     ssc = GetFakeSimpleStoreClass(lambda _: node_addr_list)
     node_list = ["node%d.example.com" % n for n in range(0, 255, 13)]
     node_addr_list = [" ".join(t) for t in zip(node_list, addr_list)]
     ssc = GetFakeSimpleStoreClass(lambda _: node_addr_list)
-    result = rpc._SsconfResolver(node_list, ssc=ssc, nslookup_fn=NotImplemented)
+    result = rpc._SsconfResolver(True, node_list, NotImplemented,
+                                 ssc=ssc, nslookup_fn=NotImplemented)
     self.assertEqual(result, zip(node_list, addr_list))
 
   def testNsLookup(self):
     self.assertEqual(result, zip(node_list, addr_list))
 
   def testNsLookup(self):
@@ -320,7 +349,18 @@ class TestSsconfResolver(unittest.TestCase):
     ssc = GetFakeSimpleStoreClass(lambda _: [])
     node_addr_map = dict(zip(node_list, addr_list))
     nslookup_fn = lambda name, family=None: node_addr_map.get(name)
     ssc = GetFakeSimpleStoreClass(lambda _: [])
     node_addr_map = dict(zip(node_list, addr_list))
     nslookup_fn = lambda name, family=None: node_addr_map.get(name)
-    result = rpc._SsconfResolver(node_list, ssc=ssc, nslookup_fn=nslookup_fn)
+    result = rpc._SsconfResolver(True, node_list, NotImplemented,
+                                 ssc=ssc, nslookup_fn=nslookup_fn)
+    self.assertEqual(result, zip(node_list, addr_list))
+
+  def testDisabledSsconfIp(self):
+    # Same node/address fixture as the neighbouring resolver tests, but here
+    # the fake ssconf store is built around _RaiseNotImplemented and the first
+    # resolver argument is False instead of True. The expected result can then
+    # only be produced via nslookup_fn (presumably the flag disables the
+    # ssconf lookup -- TODO confirm against rpc._SsconfResolver).
+    addr_list = ["192.0.2.%d" % n for n in range(0, 255, 13)]
+    node_list = ["node%d.example.com" % n for n in range(0, 255, 13)]
+    ssc = GetFakeSimpleStoreClass(_RaiseNotImplemented)
+    node_addr_map = dict(zip(node_list, addr_list))
+    # Name lookup backed by the static map; unknown names yield None
+    nslookup_fn = lambda name, family=None: node_addr_map.get(name)
+    result = rpc._SsconfResolver(False, node_list, NotImplemented,
+                                 ssc=ssc, nslookup_fn=nslookup_fn)
     self.assertEqual(result, zip(node_list, addr_list))
 
   def testBothLookups(self):
     self.assertEqual(result, zip(node_list, addr_list))
 
   def testBothLookups(self):
@@ -331,7 +371,8 @@ class TestSsconfResolver(unittest.TestCase):
     ssc = GetFakeSimpleStoreClass(lambda _: node_addr_list)
     node_addr_map = dict(zip(node_list[:n], addr_list[:n]))
     nslookup_fn = lambda name, family=None: node_addr_map.get(name)
     ssc = GetFakeSimpleStoreClass(lambda _: node_addr_list)
     node_addr_map = dict(zip(node_list[:n], addr_list[:n]))
     nslookup_fn = lambda name, family=None: node_addr_map.get(name)
-    result = rpc._SsconfResolver(node_list, ssc=ssc, nslookup_fn=nslookup_fn)
+    result = rpc._SsconfResolver(True, node_list, NotImplemented,
+                                 ssc=ssc, nslookup_fn=nslookup_fn)
     self.assertEqual(result, zip(node_list, addr_list))
 
   def testAddressLookupIPv6(self):
     self.assertEqual(result, zip(node_list, addr_list))
 
   def testAddressLookupIPv6(self):
@@ -339,7 +380,8 @@ class TestSsconfResolver(unittest.TestCase):
     node_list = ["node%d.example.com" % n for n in range(0, 255, 11)]
     node_addr_list = [" ".join(t) for t in zip(node_list, addr_list)]
     ssc = GetFakeSimpleStoreClass(lambda _: node_addr_list)
     node_list = ["node%d.example.com" % n for n in range(0, 255, 11)]
     node_addr_list = [" ".join(t) for t in zip(node_list, addr_list)]
     ssc = GetFakeSimpleStoreClass(lambda _: node_addr_list)
-    result = rpc._SsconfResolver(node_list, ssc=ssc, nslookup_fn=NotImplemented)
+    result = rpc._SsconfResolver(True, node_list, NotImplemented,
+                                 ssc=ssc, nslookup_fn=NotImplemented)
     self.assertEqual(result, zip(node_list, addr_list))
 
 
     self.assertEqual(result, zip(node_list, addr_list))
 
 
@@ -348,11 +390,11 @@ class TestStaticResolver(unittest.TestCase):
     addresses = ["192.0.2.%d" % n for n in range(0, 123, 7)]
     nodes = ["node%s.example.com" % n for n in range(0, 123, 7)]
     res = rpc._StaticResolver(addresses)
     addresses = ["192.0.2.%d" % n for n in range(0, 123, 7)]
     nodes = ["node%s.example.com" % n for n in range(0, 123, 7)]
     res = rpc._StaticResolver(addresses)
-    self.assertEqual(res(nodes), zip(nodes, addresses))
+    self.assertEqual(res(nodes, NotImplemented), zip(nodes, addresses))
 
   def testWrongLength(self):
     res = rpc._StaticResolver([])
 
   def testWrongLength(self):
     res = rpc._StaticResolver([])
-    self.assertRaises(AssertionError, res, ["abc"])
+    self.assertRaises(AssertionError, res, ["abc"], NotImplemented)
 
 
 class TestNodeConfigResolver(unittest.TestCase):
 
 
 class TestNodeConfigResolver(unittest.TestCase):
@@ -369,24 +411,35 @@ class TestNodeConfigResolver(unittest.TestCase):
   def testSingleOnline(self):
     self.assertEqual(rpc._NodeConfigResolver(self._GetSingleOnlineNode,
                                              NotImplemented,
   def testSingleOnline(self):
     self.assertEqual(rpc._NodeConfigResolver(self._GetSingleOnlineNode,
                                              NotImplemented,
-                                             ["node90.example.com"]),
+                                             ["node90.example.com"], None),
                      [("node90.example.com", "192.0.2.90")])
 
   def testSingleOffline(self):
     self.assertEqual(rpc._NodeConfigResolver(self._GetSingleOfflineNode,
                                              NotImplemented,
                      [("node90.example.com", "192.0.2.90")])
 
   def testSingleOffline(self):
     self.assertEqual(rpc._NodeConfigResolver(self._GetSingleOfflineNode,
                                              NotImplemented,
-                                             ["node100.example.com"]),
+                                             ["node100.example.com"], None),
                      [("node100.example.com", rpc._OFFLINE)])
 
                      [("node100.example.com", rpc._OFFLINE)])
 
+  def testSingleOfflineWithAcceptOffline(self):
+    fn = self._GetSingleOfflineNode
+    # Sanity check on the fixture itself: the node it returns is offline
+    assert fn("node100.example.com").offline
+    # With ACCEPT_OFFLINE_NODE the resolver must hand back the node's address;
+    # testSingleOffline passes None instead and expects rpc._OFFLINE
+    self.assertEqual(rpc._NodeConfigResolver(fn, NotImplemented,
+                                             ["node100.example.com"],
+                                             rpc_defs.ACCEPT_OFFLINE_NODE),
+                     [("node100.example.com", "192.0.2.100")])
+    # Each of these option values must be rejected with an AssertionError
+    for i in [False, True, "", "Hello", 0, 1]:
+      self.assertRaises(AssertionError, rpc._NodeConfigResolver,
+                        fn, NotImplemented, ["node100.example.com"], i)
+
   def testUnknownSingleNode(self):
     self.assertEqual(rpc._NodeConfigResolver(lambda _: None, NotImplemented,
   def testUnknownSingleNode(self):
     self.assertEqual(rpc._NodeConfigResolver(lambda _: None, NotImplemented,
-                                             ["node110.example.com"]),
+                                             ["node110.example.com"], None),
                      [("node110.example.com", "node110.example.com")])
 
   def testMultiEmpty(self):
     self.assertEqual(rpc._NodeConfigResolver(NotImplemented,
                                              lambda: {},
                      [("node110.example.com", "node110.example.com")])
 
   def testMultiEmpty(self):
     self.assertEqual(rpc._NodeConfigResolver(NotImplemented,
                                              lambda: {},
-                                             []),
+                                             [], None),
                      [])
 
   def testMultiSomeOffline(self):
                      [])
 
   def testMultiSomeOffline(self):
@@ -399,7 +452,7 @@ class TestNodeConfigResolver(unittest.TestCase):
     # Resolve no names
     self.assertEqual(rpc._NodeConfigResolver(NotImplemented,
                                              lambda: nodes,
     # Resolve no names
     self.assertEqual(rpc._NodeConfigResolver(NotImplemented,
                                              lambda: nodes,
-                                             []),
+                                             [], None),
                      [])
 
     # Offline, online and unknown hosts
                      [])
 
     # Offline, online and unknown hosts
@@ -408,7 +461,8 @@ class TestNodeConfigResolver(unittest.TestCase):
                                              ["node3.example.com",
                                               "node92.example.com",
                                               "node54.example.com",
                                              ["node3.example.com",
                                               "node92.example.com",
                                               "node54.example.com",
-                                              "unknown.example.com",]), [
+                                              "unknown.example.com",],
+                                             None), [
       ("node3.example.com", rpc._OFFLINE),
       ("node92.example.com", "192.0.2.92"),
       ("node54.example.com", rpc._OFFLINE),
       ("node3.example.com", rpc._OFFLINE),
       ("node92.example.com", "192.0.2.92"),
       ("node54.example.com", rpc._OFFLINE),
@@ -416,5 +470,424 @@ class TestNodeConfigResolver(unittest.TestCase):
       ])
 
 
       ])
 
 
+class TestCompress(unittest.TestCase):
+  # Exercises rpc._Compress together with its counterpart backend._Decompress
+  def test(self):
+    # Short payloads must come back unchanged, tagged RPC_ENCODING_NONE
+    for data in ["", "Hello", "Hello World!\nnew\nlines"]:
+      self.assertEqual(rpc._Compress(data),
+                       (constants.RPC_ENCODING_NONE, data))
+
+    # For longer payloads only the shape of the result (a pair) and the round
+    # trip through backend._Decompress are checked, not the encoding chosen
+    for data in [512 * " ", 5242 * "Hello World!\n"]:
+      compressed = rpc._Compress(data)
+      self.assertEqual(len(compressed), 2)
+      self.assertEqual(backend._Decompress(compressed), data)
+
+  def testDecompression(self):
+    # Input that is not an (encoding, content) pair, or that names an unknown
+    # encoding, must trip an assertion
+    self.assertRaises(AssertionError, backend._Decompress, "")
+    self.assertRaises(AssertionError, backend._Decompress, [""])
+    self.assertRaises(AssertionError, backend._Decompress,
+                      ("unknown compression", "data"))
+    # Undecodable content: the test only requires that some exception is raised
+    self.assertRaises(Exception, backend._Decompress,
+                      (constants.RPC_ENCODING_ZLIB_BASE64, "invalid zlib data"))
+
+
+class TestRpcClientBase(unittest.TestCase):
+  # Tests for rpc._RpcClientBase._Call, driven through a fake HTTP processor.
+  #
+  # Every "cdef" below is an 8-item call definition tuple. Judging only from
+  # how these tests fill it in, the slots exercised are: [0] call name,
+  # [2] resolver options, [3] timeout (number or callable), [4] list of
+  # 3-item argument definitions (name, kind, <not used here>), [5] a
+  # pre-processing function and [6] a post-processing function
+  # -- TODO confirm the layout against rpc_defs.
+  def testNoHosts(self):
+    cdef = ("test_call", NotImplemented, None, constants.RPC_TMO_SLOW, [],
+            None, None, NotImplemented)
+    http_proc = _FakeRequestProcessor(NotImplemented)
+    client = rpc._RpcClientBase(rpc._StaticResolver([]), NotImplemented,
+                                _req_process_fn=http_proc)
+    # An empty node list must produce an empty result
+    self.assertEqual(client._Call(cdef, [], []), {})
+
+    # Test wrong number of arguments
+    self.assertRaises(errors.ProgrammerError, client._Call,
+                      cdef, [], [0, 1, 2])
+
+  def testTimeout(self):
+    # Callable timeout: receives the call arguments as one sequence (Python 2
+    # tuple-parameter syntax) and returns their sum
+    def _CalcTimeout((arg1, arg2)):
+      return arg1 + arg2
+
+    # Fake node response: checks the request's read timeout and echoes it back
+    # (hex-formatted) as the payload
+    def _VerifyRequest(exp_timeout, req):
+      self.assertEqual(req.read_timeout, exp_timeout)
+
+      req.success = True
+      req.resp_status_code = http.HTTP_OK
+      req.resp_body = serializer.DumpJson((True, hex(req.read_timeout)))
+
+    resolver = rpc._StaticResolver([
+      "192.0.2.1",
+      "192.0.2.2",
+      ])
+
+    nodes = [
+      "node1.example.com",
+      "node2.example.com",
+      ]
+
+    # (timeout definition, first call argument, expected read timeout); the
+    # second call argument is always 300, hence "i + 300" for the callable
+    tests = [(100, None, 100), (30, None, 30)]
+    tests.extend((_CalcTimeout, i, i + 300)
+                 for i in [0, 5, 16485, 30516])
+
+    for timeout, arg1, exp_timeout in tests:
+      cdef = ("test_call", NotImplemented, None, timeout, [
+        ("arg1", None, NotImplemented),
+        ("arg2", None, NotImplemented),
+        ], None, None, NotImplemented)
+
+      http_proc = _FakeRequestProcessor(compat.partial(_VerifyRequest,
+                                                       exp_timeout))
+      client = rpc._RpcClientBase(resolver, NotImplemented,
+                                  _req_process_fn=http_proc)
+      result = client._Call(cdef, nodes, [arg1, 300])
+      self.assertEqual(len(result), len(nodes))
+      self.assertTrue(compat.all(not res.fail_msg and
+                                 res.payload == hex(exp_timeout)
+                                 for res in result.values()))
+
+  def testArgumentEncoder(self):
+    # Two made-up argument kinds (values 1 and 2)
+    (AT1, AT2) = range(1, 3)
+
+    resolver = rpc._StaticResolver([
+      "192.0.2.5",
+      "192.0.2.6",
+      ])
+
+    nodes = [
+      "node5.example.com",
+      "node6.example.com",
+      ]
+
+    # Kind -> encoder function; the client is given "encoders.get" as lookup
+    encoders = {
+      AT1: hex,
+      AT2: hash,
+      }
+
+    # NOTE(review): the third argument is also named "arg1"; "arg2" was
+    # probably intended. The name is not checked anywhere in this test.
+    cdef = ("test_call", NotImplemented, None, constants.RPC_TMO_NORMAL, [
+      ("arg0", None, NotImplemented),
+      ("arg1", AT1, NotImplemented),
+      ("arg1", AT2, NotImplemented),
+      ], None, None, NotImplemented)
+
+    # Fake node response: echoes the posted request body back as the payload
+    def _VerifyRequest(req):
+      req.success = True
+      req.resp_status_code = http.HTTP_OK
+      req.resp_body = serializer.DumpJson((True, req.post_data))
+
+    http_proc = _FakeRequestProcessor(_VerifyRequest)
+
+    for num in [0, 3796, 9032119]:
+      client = rpc._RpcClientBase(resolver, encoders.get,
+                                  _req_process_fn=http_proc)
+      result = client._Call(cdef, nodes, ["foo", num, "Hello%s" % num])
+      self.assertEqual(len(result), len(nodes))
+      for res in result.values():
+        self.assertFalse(res.fail_msg)
+        # The kind-None argument is expected unchanged, the other two run
+        # through hex() and hash() respectively
+        self.assertEqual(serializer.LoadJson(res.payload),
+                         ["foo", hex(num), hash("Hello%s" % num)])
+
+  def testPostProc(self):
+    # Fake node response: always answers with the given list of numbers
+    def _VerifyRequest(nums, req):
+      req.success = True
+      req.resp_status_code = http.HTTP_OK
+      req.resp_body = serializer.DumpJson((True, nums))
+
+    resolver = rpc._StaticResolver([
+      "192.0.2.90",
+      "192.0.2.95",
+      ])
+
+    nodes = [
+      "node90.example.com",
+      "node95.example.com",
+      ]
+
+    # Post-processing hook under test: replaces the payload by its sum
+    def _PostProc(res):
+      self.assertFalse(res.fail_msg)
+      res.payload = sum(res.payload)
+      return res
+
+    cdef = ("test_call", NotImplemented, None, constants.RPC_TMO_NORMAL, [],
+            None, _PostProc, NotImplemented)
+
+    # Seeded random generator
+    rnd = random.Random(20299)
+
+    # "i" is the number of random values in the fake response
+    for i in [0, 4, 74, 1391]:
+      nums = [rnd.randint(0, 1000) for _ in range(i)]
+      http_proc = _FakeRequestProcessor(compat.partial(_VerifyRequest, nums))
+      client = rpc._RpcClientBase(resolver, NotImplemented,
+                                  _req_process_fn=http_proc)
+      result = client._Call(cdef, nodes, [])
+      self.assertEqual(len(result), len(nodes))
+      for res in result.values():
+        self.assertFalse(res.fail_msg)
+        self.assertEqual(res.payload, sum(nums))
+
+  def testPreProc(self):
+    # Fake node response: echoes the posted request body back as the payload
+    def _VerifyRequest(req):
+      req.success = True
+      req.resp_status_code = http.HTTP_OK
+      req.resp_body = serializer.DumpJson((True, req.post_data))
+
+    resolver = rpc._StaticResolver([
+      "192.0.2.30",
+      "192.0.2.35",
+      ])
+
+    nodes = [
+      "node30.example.com",
+      "node35.example.com",
+      ]
+
+    # Pre-processing hook under test: called with a node name and the argument
+    # list, it builds a node-specific body by appending the node name
+    def _PreProc(node, data):
+      self.assertEqual(len(data), 1)
+      return data[0] + node
+
+    cdef = ("test_call", NotImplemented, None, constants.RPC_TMO_NORMAL, [
+      ("arg0", None, NotImplemented),
+      ], _PreProc, None, NotImplemented)
+
+    http_proc = _FakeRequestProcessor(_VerifyRequest)
+    client = rpc._RpcClientBase(resolver, NotImplemented,
+                                _req_process_fn=http_proc)
+
+    for prefix in ["foo", "bar", "baz"]:
+      result = client._Call(cdef, nodes, [prefix])
+      self.assertEqual(len(result), len(nodes))
+      # NOTE(review): "idx" from enumerate() is never used
+      for (idx, (node, res)) in enumerate(result.items()):
+        self.assertFalse(res.fail_msg)
+        self.assertEqual(serializer.LoadJson(res.payload), prefix + node)
+
+  def testResolverOptions(self):
+    # Fake node response: echoes the posted request body back as the payload
+    def _VerifyRequest(req):
+      req.success = True
+      req.resp_status_code = http.HTTP_OK
+      req.resp_body = serializer.DumpJson((True, req.post_data))
+
+    nodes = [
+      "node30.example.com",
+      "node35.example.com",
+      ]
+
+    # Resolver stand-in: verifies the host list and the options it receives,
+    # then "resolves" every host to its own name
+    def _Resolver(expected, hosts, options):
+      self.assertEqual(hosts, nodes)
+      self.assertEqual(options, expected)
+      return zip(hosts, nodes)
+
+    # Callable resolver options: receives the call arguments as one sequence
+    # (Python 2 tuple-parameter syntax) and returns the sum of the first one
+    def _DynamicResolverOptions((arg0, )):
+      return sum(arg0)
+
+    # (resolver options in the call definition, call argument, options the
+    # resolver is expected to see); non-callable values pass through as-is
+    tests = [
+      (None, None, None),
+      (rpc_defs.ACCEPT_OFFLINE_NODE, None, rpc_defs.ACCEPT_OFFLINE_NODE),
+      (False, None, False),
+      (True, None, True),
+      (0, None, 0),
+      (_DynamicResolverOptions, [1, 2, 3], 6),
+      (_DynamicResolverOptions, range(4, 19), 165),
+      ]
+
+    for (resolver_opts, arg0, expected) in tests:
+      cdef = ("test_call", NotImplemented, resolver_opts,
+              constants.RPC_TMO_NORMAL, [
+        ("arg0", None, NotImplemented),
+        ], None, None, NotImplemented)
+
+      http_proc = _FakeRequestProcessor(_VerifyRequest)
+
+      client = rpc._RpcClientBase(compat.partial(_Resolver, expected),
+                                  NotImplemented, _req_process_fn=http_proc)
+      result = client._Call(cdef, nodes, [arg0])
+      self.assertEqual(len(result), len(nodes))
+      # NOTE(review): "idx" and "node" are never used in this loop
+      for (idx, (node, res)) in enumerate(result.items()):
+        self.assertFalse(res.fail_msg)
+
+
+class _FakeConfigForRpcRunner:
+  # Minimal configuration stand-in handed to rpc.RpcRunner in TestRpcRunner
+
+  # Placeholder only: NotImplemented is not callable, so any attempt to call
+  # this would fail
+  GetAllNodesInfo = NotImplemented
+
+  def __init__(self, cluster=NotImplemented):
+    self._cluster = cluster
+
+  def GetNodeInfo(self, name):
+    # Every name yields a fresh node object carrying just that name
+    return objects.Node(name=name)
+
+  def GetClusterInfo(self):
+    # Whatever was passed to the constructor (NotImplemented by default)
+    return self._cluster
+
+  def GetInstanceDiskParams(self, _):
+    # The argument is ignored; the same defaults are returned for everything
+    return constants.DISK_DT_DEFAULTS
+
+
+class TestRpcRunner(unittest.TestCase):
+  # Tests for rpc.RpcRunner (and rpc.ConfigRunner) using fake configuration,
+  # fake getent resolver and fake HTTP processing
+  def testUploadFile(self):
+    data = 1779 * "Hello World\n"
+
+    # Real file on disk whose name, content and stat data are compared with
+    # what ends up in the request
+    tmpfile = tempfile.NamedTemporaryFile()
+    tmpfile.write(data)
+    tmpfile.flush()
+    st = os.stat(tmpfile.name)
+
+    # Fake node response: unpacks the single request argument and checks its
+    # seven fields before reporting success
+    def _VerifyRequest(req):
+      (uldata, ) = serializer.LoadJson(req.post_data)
+      self.assertEqual(len(uldata), 7)
+      self.assertEqual(uldata[0], tmpfile.name)
+      # Compared as lists, presumably because the serializer round trip does
+      # not preserve tuples -- TODO confirm
+      self.assertEqual(list(uldata[1]), list(rpc._Compress(data)))
+      self.assertEqual(uldata[2], st.st_mode)
+      # Owner and group names are expected in the "user<uid>"/"group<gid>"
+      # form; presumably that is what mocks.FakeGetentResolver produces
+      self.assertEqual(uldata[3], "user%s" % os.getuid())
+      self.assertEqual(uldata[4], "group%s" % os.getgid())
+      # Field 5 only has to be present; its value is not checked further
+      self.assertTrue(uldata[5] is not None)
+      self.assertEqual(uldata[6], st.st_mtime)
+
+      req.success = True
+      req.resp_status_code = http.HTTP_OK
+      req.resp_body = serializer.DumpJson((True, None))
+
+    http_proc = _FakeRequestProcessor(_VerifyRequest)
+
+    std_runner = rpc.RpcRunner(_FakeConfigForRpcRunner(), None,
+                               _req_process_fn=http_proc,
+                               _getents=mocks.FakeGetentResolver)
+
+    cfg_runner = rpc.ConfigRunner(None, ["192.0.2.13"],
+                                  _req_process_fn=http_proc,
+                                  _getents=mocks.FakeGetentResolver)
+
+    nodes = [
+      "node1.example.com",
+      ]
+
+    # Both runner flavours must build the same upload request
+    for runner in [std_runner, cfg_runner]:
+      result = runner.call_upload_file(nodes, tmpfile.name)
+      self.assertEqual(len(result), len(nodes))
+      # NOTE(review): "idx" and "node" are never used in this loop
+      for (idx, (node, res)) in enumerate(result.items()):
+        self.assertFalse(res.fail_msg)
+
+  def testEncodeInstance(self):
+    # Cluster-level parameters
+    cluster = objects.Cluster(hvparams={
+      constants.HT_KVM: {
+        constants.HV_BLOCKDEV_PREFIX: "foo",
+        },
+      },
+      beparams={
+        constants.PP_DEFAULT: {
+          constants.BE_MAXMEM: 8192,
+          },
+        },
+      os_hvp={},
+      osparams={
+        "linux": {
+          "role": "unknown",
+          },
+        })
+    cluster.UpgradeConfig()
+
+    # Instance with its own hypervisor/backend parameters, one NIC and two
+    # logical-volume disks
+    inst = objects.Instance(name="inst1.example.com",
+      hypervisor=constants.HT_FAKE,
+      os="linux",
+      hvparams={
+        constants.HT_KVM: {
+          constants.HV_BLOCKDEV_PREFIX: "bar",
+          constants.HV_ROOT_PATH: "/tmp",
+          },
+        },
+      beparams={
+        constants.BE_MINMEM: 128,
+        constants.BE_MAXMEM: 256,
+        },
+      nics=[
+        objects.NIC(nicparams={
+          constants.NIC_MODE: "mymode",
+          }),
+        ],
+      disk_template=constants.DT_PLAIN,
+      disks=[
+        objects.Disk(dev_type=constants.LD_LV, size=4096,
+                     logical_id=("vg", "disk6120")),
+        objects.Disk(dev_type=constants.LD_LV, size=1024,
+                     logical_id=("vg", "disk8508")),
+        ])
+    inst.UpgradeConfig()
+
+    # No request is ever sent in this test, only the runner's encoder is used
+    cfg = _FakeConfigForRpcRunner(cluster=cluster)
+    runner = rpc.RpcRunner(cfg, None,
+                           _req_process_fn=NotImplemented,
+                           _getents=mocks.FakeGetentResolver)
+
+    # Assertions that must hold for every encoding of "inst" checked below
+    def _CheckBasics(result):
+      self.assertEqual(result["name"], "inst1.example.com")
+      self.assertEqual(result["os"], "linux")
+      self.assertEqual(result["beparams"][constants.BE_MINMEM], 128)
+      self.assertEqual(len(result["hvparams"]), 1)
+      self.assertEqual(len(result["nics"]), 1)
+      self.assertEqual(result["nics"][0]["nicparams"][constants.NIC_MODE],
+                       "mymode")
+
+    # Generic object serialization
+    result = runner._encoder((rpc_defs.ED_OBJECT_DICT, inst))
+    _CheckBasics(result)
+
+    result = runner._encoder((rpc_defs.ED_OBJECT_DICT_LIST, 5 * [inst]))
+    map(_CheckBasics, result)
+
+    # Just an instance
+    result = runner._encoder((rpc_defs.ED_INST_DICT, inst))
+    _CheckBasics(result)
+    self.assertEqual(result["beparams"][constants.BE_MAXMEM], 256)
+    self.assertEqual(result["hvparams"][constants.HT_KVM], {
+      constants.HV_BLOCKDEV_PREFIX: "bar",
+      constants.HV_ROOT_PATH: "/tmp",
+      })
+    # The instance defines no osparams itself, so the cluster's values for
+    # "linux" are expected here
+    self.assertEqual(result["osparams"], {
+      "role": "unknown",
+      })
+
+    # Instance with OS parameters
+    result = runner._encoder((rpc_defs.ED_INST_DICT_OSP_DP, (inst, {
+      "role": "webserver",
+      "other": "field",
+      })))
+    _CheckBasics(result)
+    self.assertEqual(result["beparams"][constants.BE_MAXMEM], 256)
+    self.assertEqual(result["hvparams"][constants.HT_KVM], {
+      constants.HV_BLOCKDEV_PREFIX: "bar",
+      constants.HV_ROOT_PATH: "/tmp",
+      })
+    self.assertEqual(result["osparams"], {
+      "role": "webserver",
+      "other": "field",
+      })
+
+    # Instance with hypervisor and backend parameters
+    result = runner._encoder((rpc_defs.ED_INST_DICT_HVP_BEP_DP, (inst, {
+      constants.HT_KVM: {
+        constants.HV_BOOT_ORDER: "xyz",
+        },
+      }, {
+      constants.BE_VCPUS: 100,
+      constants.BE_MAXMEM: 4096,
+      })))
+    _CheckBasics(result)
+    self.assertEqual(result["beparams"][constants.BE_MAXMEM], 4096)
+    self.assertEqual(result["beparams"][constants.BE_VCPUS], 100)
+    self.assertEqual(result["hvparams"][constants.HT_KVM], {
+      constants.HV_BOOT_ORDER: "xyz",
+      })
+    # Encoded disks are expected to carry the default parameters for the
+    # instance's disk template
+    self.assertEqual(result["disks"], [{
+      "dev_type": constants.LD_LV,
+      "size": 4096,
+      "logical_id": ("vg", "disk6120"),
+      "params": constants.DISK_DT_DEFAULTS[inst.disk_template],
+      }, {
+      "dev_type": constants.LD_LV,
+      "size": 1024,
+      "logical_id": ("vg", "disk8508"),
+      "params": constants.DISK_DT_DEFAULTS[inst.disk_template],
+      }])
+
+    # The original disk objects must still have empty params afterwards
+    self.assertTrue(compat.all(disk.params == {} for disk in inst.disks),
+                    msg="Configuration objects were modified")
+
+
 if __name__ == "__main__":
   testutils.GanetiTestProgram()
 if __name__ == "__main__":
   testutils.GanetiTestProgram()