import testutils
-class TestTimeouts(unittest.TestCase):
- def test(self):
- names = [name[len("call_"):] for name in dir(rpc.RpcRunner)
- if name.startswith("call_")]
- self.assertEqual(len(names), len(rpc._TIMEOUTS))
- self.assertFalse([name for name in names
- if not (rpc._TIMEOUTS[name] is None or
- rpc._TIMEOUTS[name] > 0)])
-
-
-class FakeHttpPool:
+class _FakeRequestProcessor:
def __init__(self, response_fn):
self._response_fn = response_fn
self.reqcount = 0
- def ProcessRequests(self, reqs):
+ def __call__(self, reqs, lock_monitor_cb=None):
+ assert lock_monitor_cb is None or callable(lock_monitor_cb)
for req in reqs:
self.reqcount += 1
self._response_fn(req)
def testVersionSuccess(self):
resolver = rpc._StaticResolver(["127.0.0.1"])
- pool = FakeHttpPool(self._GetVersionResponse)
+ http_proc = _FakeRequestProcessor(self._GetVersionResponse)
proc = rpc._RpcProcessor(resolver, 24094)
- result = proc(["localhost"], "version", None, http_pool=pool)
+ result = proc(["localhost"], "version", None, _req_process_fn=http_proc,
+ read_timeout=60)
self.assertEqual(result.keys(), ["localhost"])
lhresp = result["localhost"]
self.assertFalse(lhresp.offline)
self.assertEqual(lhresp.payload, 123)
self.assertEqual(lhresp.call, "version")
lhresp.Raise("should not raise")
- self.assertEqual(pool.reqcount, 1)
+ self.assertEqual(http_proc.reqcount, 1)
def _ReadTimeoutResponse(self, req):
self.assertEqual(req.host, "192.0.2.13")
def testReadTimeout(self):
resolver = rpc._StaticResolver(["192.0.2.13"])
- pool = FakeHttpPool(self._ReadTimeoutResponse)
+ http_proc = _FakeRequestProcessor(self._ReadTimeoutResponse)
proc = rpc._RpcProcessor(resolver, 19176)
- result = proc(["node31856"], "version", None, http_pool=pool,
+ result = proc(["node31856"], "version", None, _req_process_fn=http_proc,
read_timeout=12356)
self.assertEqual(result.keys(), ["node31856"])
lhresp = result["node31856"]
self.assertEqual(lhresp.payload, -1)
self.assertEqual(lhresp.call, "version")
lhresp.Raise("should not raise")
- self.assertEqual(pool.reqcount, 1)
+ self.assertEqual(http_proc.reqcount, 1)
def testOfflineNode(self):
resolver = rpc._StaticResolver([rpc._OFFLINE])
- pool = FakeHttpPool(NotImplemented)
+ http_proc = _FakeRequestProcessor(NotImplemented)
proc = rpc._RpcProcessor(resolver, 30668)
- result = proc(["n17296"], "version", None, http_pool=pool)
+ result = proc(["n17296"], "version", None, _req_process_fn=http_proc,
+ read_timeout=60)
self.assertEqual(result.keys(), ["n17296"])
lhresp = result["n17296"]
self.assertTrue(lhresp.offline)
# No message
self.assertRaises(errors.OpExecError, lhresp.Raise, None)
- self.assertEqual(pool.reqcount, 0)
+ self.assertEqual(http_proc.reqcount, 0)
def _GetMultiVersionResponse(self, req):
self.assert_(req.host.startswith("node"))
def testMultiVersionSuccess(self):
nodes = ["node%s" % i for i in range(50)]
resolver = rpc._StaticResolver(nodes)
- pool = FakeHttpPool(self._GetMultiVersionResponse)
+ http_proc = _FakeRequestProcessor(self._GetMultiVersionResponse)
proc = rpc._RpcProcessor(resolver, 23245)
- result = proc(nodes, "version", None, http_pool=pool)
+ result = proc(nodes, "version", None, _req_process_fn=http_proc,
+ read_timeout=60)
self.assertEqual(sorted(result.keys()), sorted(nodes))
for name in nodes:
self.assertEqual(lhresp.call, "version")
lhresp.Raise("should not raise")
- self.assertEqual(pool.reqcount, len(nodes))
+ self.assertEqual(http_proc.reqcount, len(nodes))
def _GetVersionResponseFail(self, errinfo, req):
self.assertEqual(req.path, "/version")
resolver = rpc._StaticResolver(["aef9ur4i.example.com"])
proc = rpc._RpcProcessor(resolver, 5903)
for errinfo in [None, "Unknown error"]:
- pool = FakeHttpPool(compat.partial(self._GetVersionResponseFail, errinfo))
- result = proc(["aef9ur4i.example.com"], "version", None, http_pool=pool)
+ http_proc = \
+ _FakeRequestProcessor(compat.partial(self._GetVersionResponseFail,
+ errinfo))
+ result = proc(["aef9ur4i.example.com"], "version", None,
+ _req_process_fn=http_proc, read_timeout=60)
self.assertEqual(result.keys(), ["aef9ur4i.example.com"])
lhresp = result["aef9ur4i.example.com"]
self.assertFalse(lhresp.offline)
self.assertFalse(lhresp.payload)
self.assertEqual(lhresp.call, "version")
self.assertRaises(errors.OpExecError, lhresp.Raise, "failed")
- self.assertEqual(pool.reqcount, 1)
+ self.assertEqual(http_proc.reqcount, 1)
def _GetHttpErrorResponse(self, httperrnodes, failnodes, req):
self.assertEqual(req.path, "/vg_list")
self.assertEqual(len(set(nodes) - failnodes - httperrnodes), 29)
proc = rpc._RpcProcessor(resolver, 15165)
- pool = FakeHttpPool(compat.partial(self._GetHttpErrorResponse,
- httperrnodes, failnodes))
- result = proc(nodes, "vg_list", None, http_pool=pool)
+ http_proc = \
+ _FakeRequestProcessor(compat.partial(self._GetHttpErrorResponse,
+ httperrnodes, failnodes))
+ result = proc(nodes, "vg_list", None, _req_process_fn=http_proc,
+ read_timeout=rpc._TMO_URGENT)
self.assertEqual(sorted(result.keys()), sorted(nodes))
for name in nodes:
self.assertEqual(lhresp.payload, hash(name))
lhresp.Raise("should not raise")
- self.assertEqual(pool.reqcount, len(nodes))
+ self.assertEqual(http_proc.reqcount, len(nodes))
def _GetInvalidResponseA(self, req):
self.assertEqual(req.path, "/version")
proc = rpc._RpcProcessor(resolver, 19978)
for fn in [self._GetInvalidResponseA, self._GetInvalidResponseB]:
- pool = FakeHttpPool(fn)
- result = proc(["oqo7lanhly.example.com"], "version", None, http_pool=pool)
+ http_proc = _FakeRequestProcessor(fn)
+ result = proc(["oqo7lanhly.example.com"], "version", None,
+ _req_process_fn=http_proc, read_timeout=60)
self.assertEqual(result.keys(), ["oqo7lanhly.example.com"])
lhresp = result["oqo7lanhly.example.com"]
self.assertFalse(lhresp.offline)
self.assertFalse(lhresp.payload)
self.assertEqual(lhresp.call, "version")
self.assertRaises(errors.OpExecError, lhresp.Raise, "failed")
- self.assertEqual(pool.reqcount, 1)
+ self.assertEqual(http_proc.reqcount, 1)
def _GetBodyTestResponse(self, test_data, req):
self.assertEqual(req.host, "192.0.2.84")
"xyz": range(10),
}
resolver = rpc._StaticResolver(["192.0.2.84"])
- pool = FakeHttpPool(compat.partial(self._GetBodyTestResponse, test_data))
+ http_proc = _FakeRequestProcessor(compat.partial(self._GetBodyTestResponse,
+ test_data))
proc = rpc._RpcProcessor(resolver, 18700)
body = serializer.DumpJson(test_data)
- result = proc(["node19759"], "upload_file", body, http_pool=pool)
+ result = proc(["node19759"], "upload_file", body, _req_process_fn=http_proc,
+ read_timeout=30)
self.assertEqual(result.keys(), ["node19759"])
lhresp = result["node19759"]
self.assertFalse(lhresp.offline)
self.assertEqual(lhresp.payload, None)
self.assertEqual(lhresp.call, "upload_file")
lhresp.Raise("should not raise")
- self.assertEqual(pool.reqcount, 1)
+ self.assertEqual(http_proc.reqcount, 1)
class TestSsconfResolver(unittest.TestCase):