Revision abbf2cd9 test/ganeti.rpc_unittest.py

b/test/ganeti.rpc_unittest.py
46 46
                              rpc._TIMEOUTS[name] > 0)])
47 47

  
48 48

  
49
class FakeHttpPool:
49
class _FakeRequestProcessor:
50 50
  def __init__(self, response_fn):
51 51
    self._response_fn = response_fn
52 52
    self.reqcount = 0
53 53

  
54
  def ProcessRequests(self, reqs, lock_monitor_cb=None):
54
  def __call__(self, reqs, lock_monitor_cb=None):
55
    assert lock_monitor_cb is None or callable(lock_monitor_cb)
55 56
    for req in reqs:
56 57
      self.reqcount += 1
57 58
      self._response_fn(req)
......
80 81

  
81 82
  def testVersionSuccess(self):
82 83
    resolver = rpc._StaticResolver(["127.0.0.1"])
83
    pool = FakeHttpPool(self._GetVersionResponse)
84
    http_proc = _FakeRequestProcessor(self._GetVersionResponse)
84 85
    proc = rpc._RpcProcessor(resolver, 24094)
85
    result = proc(["localhost"], "version", None, http_pool=pool)
86
    result = proc(["localhost"], "version", None, _req_process_fn=http_proc)
86 87
    self.assertEqual(result.keys(), ["localhost"])
87 88
    lhresp = result["localhost"]
88 89
    self.assertFalse(lhresp.offline)
......
91 92
    self.assertEqual(lhresp.payload, 123)
92 93
    self.assertEqual(lhresp.call, "version")
93 94
    lhresp.Raise("should not raise")
94
    self.assertEqual(pool.reqcount, 1)
95
    self.assertEqual(http_proc.reqcount, 1)
95 96

  
96 97
  def _ReadTimeoutResponse(self, req):
97 98
    self.assertEqual(req.host, "192.0.2.13")
......
104 105

  
105 106
  def testReadTimeout(self):
106 107
    resolver = rpc._StaticResolver(["192.0.2.13"])
107
    pool = FakeHttpPool(self._ReadTimeoutResponse)
108
    http_proc = _FakeRequestProcessor(self._ReadTimeoutResponse)
108 109
    proc = rpc._RpcProcessor(resolver, 19176)
109
    result = proc(["node31856"], "version", None, http_pool=pool,
110
    result = proc(["node31856"], "version", None, _req_process_fn=http_proc,
110 111
                  read_timeout=12356)
111 112
    self.assertEqual(result.keys(), ["node31856"])
112 113
    lhresp = result["node31856"]
......
116 117
    self.assertEqual(lhresp.payload, -1)
117 118
    self.assertEqual(lhresp.call, "version")
118 119
    lhresp.Raise("should not raise")
119
    self.assertEqual(pool.reqcount, 1)
120
    self.assertEqual(http_proc.reqcount, 1)
120 121

  
121 122
  def testOfflineNode(self):
122 123
    resolver = rpc._StaticResolver([rpc._OFFLINE])
123
    pool = FakeHttpPool(NotImplemented)
124
    http_proc = _FakeRequestProcessor(NotImplemented)
124 125
    proc = rpc._RpcProcessor(resolver, 30668)
125
    result = proc(["n17296"], "version", None, http_pool=pool)
126
    result = proc(["n17296"], "version", None, _req_process_fn=http_proc)
126 127
    self.assertEqual(result.keys(), ["n17296"])
127 128
    lhresp = result["n17296"]
128 129
    self.assertTrue(lhresp.offline)
......
137 138
    # No message
138 139
    self.assertRaises(errors.OpExecError, lhresp.Raise, None)
139 140

  
140
    self.assertEqual(pool.reqcount, 0)
141
    self.assertEqual(http_proc.reqcount, 0)
141 142

  
142 143
  def _GetMultiVersionResponse(self, req):
143 144
    self.assert_(req.host.startswith("node"))
......
150 151
  def testMultiVersionSuccess(self):
151 152
    nodes = ["node%s" % i for i in range(50)]
152 153
    resolver = rpc._StaticResolver(nodes)
153
    pool = FakeHttpPool(self._GetMultiVersionResponse)
154
    http_proc = _FakeRequestProcessor(self._GetMultiVersionResponse)
154 155
    proc = rpc._RpcProcessor(resolver, 23245)
155
    result = proc(nodes, "version", None, http_pool=pool)
156
    result = proc(nodes, "version", None, _req_process_fn=http_proc)
156 157
    self.assertEqual(sorted(result.keys()), sorted(nodes))
157 158

  
158 159
    for name in nodes:
......
164 165
      self.assertEqual(lhresp.call, "version")
165 166
      lhresp.Raise("should not raise")
166 167

  
167
    self.assertEqual(pool.reqcount, len(nodes))
168
    self.assertEqual(http_proc.reqcount, len(nodes))
168 169

  
169 170
  def _GetVersionResponseFail(self, errinfo, req):
170 171
    self.assertEqual(req.path, "/version")
......
176 177
    resolver = rpc._StaticResolver(["aef9ur4i.example.com"])
177 178
    proc = rpc._RpcProcessor(resolver, 5903)
178 179
    for errinfo in [None, "Unknown error"]:
179
      pool = FakeHttpPool(compat.partial(self._GetVersionResponseFail, errinfo))
180
      result = proc(["aef9ur4i.example.com"], "version", None, http_pool=pool)
180
      http_proc = \
181
        _FakeRequestProcessor(compat.partial(self._GetVersionResponseFail,
182
                                             errinfo))
183
      result = proc(["aef9ur4i.example.com"], "version", None,
184
                    _req_process_fn=http_proc)
181 185
      self.assertEqual(result.keys(), ["aef9ur4i.example.com"])
182 186
      lhresp = result["aef9ur4i.example.com"]
183 187
      self.assertFalse(lhresp.offline)
......
186 190
      self.assertFalse(lhresp.payload)
187 191
      self.assertEqual(lhresp.call, "version")
188 192
      self.assertRaises(errors.OpExecError, lhresp.Raise, "failed")
189
      self.assertEqual(pool.reqcount, 1)
193
      self.assertEqual(http_proc.reqcount, 1)
190 194

  
191 195
  def _GetHttpErrorResponse(self, httperrnodes, failnodes, req):
192 196
    self.assertEqual(req.path, "/vg_list")
......
222 226
    self.assertEqual(len(set(nodes) - failnodes - httperrnodes), 29)
223 227

  
224 228
    proc = rpc._RpcProcessor(resolver, 15165)
225
    pool = FakeHttpPool(compat.partial(self._GetHttpErrorResponse,
226
                                       httperrnodes, failnodes))
227
    result = proc(nodes, "vg_list", None, http_pool=pool)
229
    http_proc = \
230
      _FakeRequestProcessor(compat.partial(self._GetHttpErrorResponse,
231
                                           httperrnodes, failnodes))
232
    result = proc(nodes, "vg_list", None, _req_process_fn=http_proc)
228 233
    self.assertEqual(sorted(result.keys()), sorted(nodes))
229 234

  
230 235
    for name in nodes:
......
245 250
        self.assertEqual(lhresp.payload, hash(name))
246 251
        lhresp.Raise("should not raise")
247 252

  
248
    self.assertEqual(pool.reqcount, len(nodes))
253
    self.assertEqual(http_proc.reqcount, len(nodes))
249 254

  
250 255
  def _GetInvalidResponseA(self, req):
251 256
    self.assertEqual(req.path, "/version")
......
265 270
    proc = rpc._RpcProcessor(resolver, 19978)
266 271

  
267 272
    for fn in [self._GetInvalidResponseA, self._GetInvalidResponseB]:
268
      pool = FakeHttpPool(fn)
269
      result = proc(["oqo7lanhly.example.com"], "version", None, http_pool=pool)
273
      http_proc = _FakeRequestProcessor(fn)
274
      result = proc(["oqo7lanhly.example.com"], "version", None,
275
                    _req_process_fn=http_proc)
270 276
      self.assertEqual(result.keys(), ["oqo7lanhly.example.com"])
271 277
      lhresp = result["oqo7lanhly.example.com"]
272 278
      self.assertFalse(lhresp.offline)
......
275 281
      self.assertFalse(lhresp.payload)
276 282
      self.assertEqual(lhresp.call, "version")
277 283
      self.assertRaises(errors.OpExecError, lhresp.Raise, "failed")
278
      self.assertEqual(pool.reqcount, 1)
284
      self.assertEqual(http_proc.reqcount, 1)
279 285

  
280 286
  def _GetBodyTestResponse(self, test_data, req):
281 287
    self.assertEqual(req.host, "192.0.2.84")
......
292 298
      "xyz": range(10),
293 299
      }
294 300
    resolver = rpc._StaticResolver(["192.0.2.84"])
295
    pool = FakeHttpPool(compat.partial(self._GetBodyTestResponse, test_data))
301
    http_proc = _FakeRequestProcessor(compat.partial(self._GetBodyTestResponse,
302
                                                     test_data))
296 303
    proc = rpc._RpcProcessor(resolver, 18700)
297 304
    body = serializer.DumpJson(test_data)
298
    result = proc(["node19759"], "upload_file", body, http_pool=pool)
305
    result = proc(["node19759"], "upload_file", body, _req_process_fn=http_proc)
299 306
    self.assertEqual(result.keys(), ["node19759"])
300 307
    lhresp = result["node19759"]
301 308
    self.assertFalse(lhresp.offline)
......
304 311
    self.assertEqual(lhresp.payload, None)
305 312
    self.assertEqual(lhresp.call, "upload_file")
306 313
    lhresp.Raise("should not raise")
307
    self.assertEqual(pool.reqcount, 1)
314
    self.assertEqual(http_proc.reqcount, 1)
308 315

  
309 316

  
310 317
class TestSsconfResolver(unittest.TestCase):

Also available in: Unified diff