4 # Copyright (C) 2010, 2011, 2012, 2013 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for testing ganeti.rpc"""
30 from ganeti import constants
31 from ganeti import compat
32 from ganeti import rpc
33 from ganeti import rpc_defs
34 from ganeti import http
35 from ganeti import errors
36 from ganeti import serializer
37 from ganeti import objects
38 from ganeti import backend
44 class _FakeRequestProcessor:
45 def __init__(self, response_fn):
46 self._response_fn = response_fn
49 def __call__(self, reqs, lock_monitor_cb=None):
50 assert lock_monitor_cb is None or callable(lock_monitor_cb)
53 self._response_fn(req)
56 def GetFakeSimpleStoreClass(fn):
57 class FakeSimpleStore:
58 GetNodePrimaryIPList = fn
59 GetPrimaryIPFamily = lambda _: None
61 return FakeSimpleStore
64 def _RaiseNotImplemented():
65 """Simple wrapper to raise NotImplementedError.
68 raise NotImplementedError
71 class TestRpcProcessor(unittest.TestCase):
72 def _FakeAddressLookup(self, map):
73 return lambda node_list: [map.get(node) for node in node_list]
75 def _GetVersionResponse(self, req):
76 self.assertEqual(req.host, "127.0.0.1")
77 self.assertEqual(req.port, 24094)
78 self.assertEqual(req.path, "/version")
79 self.assertEqual(req.read_timeout, constants.RPC_TMO_URGENT)
81 req.resp_status_code = http.HTTP_OK
82 req.resp_body = serializer.DumpJson((True, 123))
84 def testVersionSuccess(self):
85 resolver = rpc._StaticResolver(["127.0.0.1"])
86 http_proc = _FakeRequestProcessor(self._GetVersionResponse)
87 proc = rpc._RpcProcessor(resolver, 24094)
88 result = proc(["localhost"], "version", {"localhost": ""}, 60,
89 NotImplemented, _req_process_fn=http_proc)
90 self.assertEqual(result.keys(), ["localhost"])
91 lhresp = result["localhost"]
92 self.assertFalse(lhresp.offline)
93 self.assertEqual(lhresp.node, "localhost")
94 self.assertFalse(lhresp.fail_msg)
95 self.assertEqual(lhresp.payload, 123)
96 self.assertEqual(lhresp.call, "version")
97 lhresp.Raise("should not raise")
98 self.assertEqual(http_proc.reqcount, 1)
100 def _ReadTimeoutResponse(self, req):
101 self.assertEqual(req.host, "192.0.2.13")
102 self.assertEqual(req.port, 19176)
103 self.assertEqual(req.path, "/version")
104 self.assertEqual(req.read_timeout, 12356)
106 req.resp_status_code = http.HTTP_OK
107 req.resp_body = serializer.DumpJson((True, -1))
109 def testReadTimeout(self):
110 resolver = rpc._StaticResolver(["192.0.2.13"])
111 http_proc = _FakeRequestProcessor(self._ReadTimeoutResponse)
112 proc = rpc._RpcProcessor(resolver, 19176)
115 result = proc([host], "version", body, 12356, NotImplemented,
116 _req_process_fn=http_proc)
117 self.assertEqual(result.keys(), [host])
118 lhresp = result[host]
119 self.assertFalse(lhresp.offline)
120 self.assertEqual(lhresp.node, host)
121 self.assertFalse(lhresp.fail_msg)
122 self.assertEqual(lhresp.payload, -1)
123 self.assertEqual(lhresp.call, "version")
124 lhresp.Raise("should not raise")
125 self.assertEqual(http_proc.reqcount, 1)
127 def testOfflineNode(self):
128 resolver = rpc._StaticResolver([rpc._OFFLINE])
129 http_proc = _FakeRequestProcessor(NotImplemented)
130 proc = rpc._RpcProcessor(resolver, 30668)
133 result = proc([host], "version", body, 60, NotImplemented,
134 _req_process_fn=http_proc)
135 self.assertEqual(result.keys(), [host])
136 lhresp = result[host]
137 self.assertTrue(lhresp.offline)
138 self.assertEqual(lhresp.node, host)
139 self.assertTrue(lhresp.fail_msg)
140 self.assertFalse(lhresp.payload)
141 self.assertEqual(lhresp.call, "version")
144 self.assertRaises(errors.OpExecError, lhresp.Raise, "should raise")
147 self.assertRaises(errors.OpExecError, lhresp.Raise, None)
149 self.assertEqual(http_proc.reqcount, 0)
151 def _GetMultiVersionResponse(self, req):
152 self.assert_(req.host.startswith("node"))
153 self.assertEqual(req.port, 23245)
154 self.assertEqual(req.path, "/version")
156 req.resp_status_code = http.HTTP_OK
157 req.resp_body = serializer.DumpJson((True, 987))
159 def testMultiVersionSuccess(self):
160 nodes = ["node%s" % i for i in range(50)]
161 body = dict((n, "") for n in nodes)
162 resolver = rpc._StaticResolver(nodes)
163 http_proc = _FakeRequestProcessor(self._GetMultiVersionResponse)
164 proc = rpc._RpcProcessor(resolver, 23245)
165 result = proc(nodes, "version", body, 60, NotImplemented,
166 _req_process_fn=http_proc)
167 self.assertEqual(sorted(result.keys()), sorted(nodes))
170 lhresp = result[name]
171 self.assertFalse(lhresp.offline)
172 self.assertEqual(lhresp.node, name)
173 self.assertFalse(lhresp.fail_msg)
174 self.assertEqual(lhresp.payload, 987)
175 self.assertEqual(lhresp.call, "version")
176 lhresp.Raise("should not raise")
178 self.assertEqual(http_proc.reqcount, len(nodes))
180 def _GetVersionResponseFail(self, errinfo, req):
181 self.assertEqual(req.path, "/version")
183 req.resp_status_code = http.HTTP_OK
184 req.resp_body = serializer.DumpJson((False, errinfo))
186 def testVersionFailure(self):
187 resolver = rpc._StaticResolver(["aef9ur4i.example.com"])
188 proc = rpc._RpcProcessor(resolver, 5903)
189 for errinfo in [None, "Unknown error"]:
191 _FakeRequestProcessor(compat.partial(self._GetVersionResponseFail,
193 host = "aef9ur4i.example.com"
195 result = proc(body.keys(), "version", body, 60, NotImplemented,
196 _req_process_fn=http_proc)
197 self.assertEqual(result.keys(), [host])
198 lhresp = result[host]
199 self.assertFalse(lhresp.offline)
200 self.assertEqual(lhresp.node, host)
201 self.assert_(lhresp.fail_msg)
202 self.assertFalse(lhresp.payload)
203 self.assertEqual(lhresp.call, "version")
204 self.assertRaises(errors.OpExecError, lhresp.Raise, "failed")
205 self.assertEqual(http_proc.reqcount, 1)
207 def _GetHttpErrorResponse(self, httperrnodes, failnodes, req):
208 self.assertEqual(req.path, "/vg_list")
209 self.assertEqual(req.port, 15165)
211 if req.host in httperrnodes:
213 req.error = "Node set up for HTTP errors"
215 elif req.host in failnodes:
217 req.resp_status_code = 404
218 req.resp_body = serializer.DumpJson({
220 "message": "Method not found",
221 "explain": "Explanation goes here",
225 req.resp_status_code = http.HTTP_OK
226 req.resp_body = serializer.DumpJson((True, hash(req.host)))
228 def testHttpError(self):
229 nodes = ["uaf6pbbv%s" % i for i in range(50)]
230 body = dict((n, "") for n in nodes)
231 resolver = rpc._StaticResolver(nodes)
233 httperrnodes = set(nodes[1::7])
234 self.assertEqual(len(httperrnodes), 7)
236 failnodes = set(nodes[2::3]) - httperrnodes
237 self.assertEqual(len(failnodes), 14)
239 self.assertEqual(len(set(nodes) - failnodes - httperrnodes), 29)
241 proc = rpc._RpcProcessor(resolver, 15165)
243 _FakeRequestProcessor(compat.partial(self._GetHttpErrorResponse,
244 httperrnodes, failnodes))
245 result = proc(nodes, "vg_list", body,
246 constants.RPC_TMO_URGENT, NotImplemented,
247 _req_process_fn=http_proc)
248 self.assertEqual(sorted(result.keys()), sorted(nodes))
251 lhresp = result[name]
252 self.assertFalse(lhresp.offline)
253 self.assertEqual(lhresp.node, name)
254 self.assertEqual(lhresp.call, "vg_list")
256 if name in httperrnodes:
257 self.assert_(lhresp.fail_msg)
258 self.assertRaises(errors.OpExecError, lhresp.Raise, "failed")
259 elif name in failnodes:
260 self.assert_(lhresp.fail_msg)
261 self.assertRaises(errors.OpPrereqError, lhresp.Raise, "failed",
262 prereq=True, ecode=errors.ECODE_INVAL)
264 self.assertFalse(lhresp.fail_msg)
265 self.assertEqual(lhresp.payload, hash(name))
266 lhresp.Raise("should not raise")
268 self.assertEqual(http_proc.reqcount, len(nodes))
270 def _GetInvalidResponseA(self, req):
271 self.assertEqual(req.path, "/version")
273 req.resp_status_code = http.HTTP_OK
274 req.resp_body = serializer.DumpJson(("This", "is", "an", "invalid",
275 "response", "!", 1, 2, 3))
277 def _GetInvalidResponseB(self, req):
278 self.assertEqual(req.path, "/version")
280 req.resp_status_code = http.HTTP_OK
281 req.resp_body = serializer.DumpJson("invalid response")
283 def testInvalidResponse(self):
284 resolver = rpc._StaticResolver(["oqo7lanhly.example.com"])
285 proc = rpc._RpcProcessor(resolver, 19978)
287 for fn in [self._GetInvalidResponseA, self._GetInvalidResponseB]:
288 http_proc = _FakeRequestProcessor(fn)
289 host = "oqo7lanhly.example.com"
291 result = proc([host], "version", body, 60, NotImplemented,
292 _req_process_fn=http_proc)
293 self.assertEqual(result.keys(), [host])
294 lhresp = result[host]
295 self.assertFalse(lhresp.offline)
296 self.assertEqual(lhresp.node, host)
297 self.assert_(lhresp.fail_msg)
298 self.assertFalse(lhresp.payload)
299 self.assertEqual(lhresp.call, "version")
300 self.assertRaises(errors.OpExecError, lhresp.Raise, "failed")
301 self.assertEqual(http_proc.reqcount, 1)
303 def _GetBodyTestResponse(self, test_data, req):
304 self.assertEqual(req.host, "192.0.2.84")
305 self.assertEqual(req.port, 18700)
306 self.assertEqual(req.path, "/upload_file")
307 self.assertEqual(serializer.LoadJson(req.post_data), test_data)
309 req.resp_status_code = http.HTTP_OK
310 req.resp_body = serializer.DumpJson((True, None))
312 def testResponseBody(self):
317 resolver = rpc._StaticResolver(["192.0.2.84"])
318 http_proc = _FakeRequestProcessor(compat.partial(self._GetBodyTestResponse,
320 proc = rpc._RpcProcessor(resolver, 18700)
322 body = {host: serializer.DumpJson(test_data)}
323 result = proc([host], "upload_file", body, 30, NotImplemented,
324 _req_process_fn=http_proc)
325 self.assertEqual(result.keys(), [host])
326 lhresp = result[host]
327 self.assertFalse(lhresp.offline)
328 self.assertEqual(lhresp.node, host)
329 self.assertFalse(lhresp.fail_msg)
330 self.assertEqual(lhresp.payload, None)
331 self.assertEqual(lhresp.call, "upload_file")
332 lhresp.Raise("should not raise")
333 self.assertEqual(http_proc.reqcount, 1)
336 class TestSsconfResolver(unittest.TestCase):
337 def testSsconfLookup(self):
338 addr_list = ["192.0.2.%d" % n for n in range(0, 255, 13)]
339 node_list = ["node%d.example.com" % n for n in range(0, 255, 13)]
340 node_addr_list = [" ".join(t) for t in zip(node_list, addr_list)]
341 ssc = GetFakeSimpleStoreClass(lambda _: node_addr_list)
342 result = rpc._SsconfResolver(True, node_list, NotImplemented,
343 ssc=ssc, nslookup_fn=NotImplemented)
344 self.assertEqual(result, zip(node_list, addr_list, node_list))
346 def testNsLookup(self):
347 addr_list = ["192.0.2.%d" % n for n in range(0, 255, 13)]
348 node_list = ["node%d.example.com" % n for n in range(0, 255, 13)]
349 ssc = GetFakeSimpleStoreClass(lambda _: [])
350 node_addr_map = dict(zip(node_list, addr_list))
351 nslookup_fn = lambda name, family=None: node_addr_map.get(name)
352 result = rpc._SsconfResolver(True, node_list, NotImplemented,
353 ssc=ssc, nslookup_fn=nslookup_fn)
354 self.assertEqual(result, zip(node_list, addr_list, node_list))
356 def testDisabledSsconfIp(self):
357 addr_list = ["192.0.2.%d" % n for n in range(0, 255, 13)]
358 node_list = ["node%d.example.com" % n for n in range(0, 255, 13)]
359 ssc = GetFakeSimpleStoreClass(_RaiseNotImplemented)
360 node_addr_map = dict(zip(node_list, addr_list))
361 nslookup_fn = lambda name, family=None: node_addr_map.get(name)
362 result = rpc._SsconfResolver(False, node_list, NotImplemented,
363 ssc=ssc, nslookup_fn=nslookup_fn)
364 self.assertEqual(result, zip(node_list, addr_list, node_list))
366 def testBothLookups(self):
367 addr_list = ["192.0.2.%d" % n for n in range(0, 255, 13)]
368 node_list = ["node%d.example.com" % n for n in range(0, 255, 13)]
369 n = len(addr_list) / 2
370 node_addr_list = [" ".join(t) for t in zip(node_list[n:], addr_list[n:])]
371 ssc = GetFakeSimpleStoreClass(lambda _: node_addr_list)
372 node_addr_map = dict(zip(node_list[:n], addr_list[:n]))
373 nslookup_fn = lambda name, family=None: node_addr_map.get(name)
374 result = rpc._SsconfResolver(True, node_list, NotImplemented,
375 ssc=ssc, nslookup_fn=nslookup_fn)
376 self.assertEqual(result, zip(node_list, addr_list, node_list))
378 def testAddressLookupIPv6(self):
379 addr_list = ["2001:db8::%d" % n for n in range(0, 255, 11)]
380 node_list = ["node%d.example.com" % n for n in range(0, 255, 11)]
381 node_addr_list = [" ".join(t) for t in zip(node_list, addr_list)]
382 ssc = GetFakeSimpleStoreClass(lambda _: node_addr_list)
383 result = rpc._SsconfResolver(True, node_list, NotImplemented,
384 ssc=ssc, nslookup_fn=NotImplemented)
385 self.assertEqual(result, zip(node_list, addr_list, node_list))
388 class TestStaticResolver(unittest.TestCase):
390 addresses = ["192.0.2.%d" % n for n in range(0, 123, 7)]
391 nodes = ["node%s.example.com" % n for n in range(0, 123, 7)]
392 res = rpc._StaticResolver(addresses)
393 self.assertEqual(res(nodes, NotImplemented), zip(nodes, addresses, nodes))
395 def testWrongLength(self):
396 res = rpc._StaticResolver([])
397 self.assertRaises(AssertionError, res, ["abc"], NotImplemented)
400 class TestNodeConfigResolver(unittest.TestCase):
402 def _GetSingleOnlineNode(uuid):
403 assert uuid == "node90-uuid"
404 return objects.Node(name="node90.example.com",
407 primary_ip="192.0.2.90")
410 def _GetSingleOfflineNode(uuid):
411 assert uuid == "node100-uuid"
412 return objects.Node(name="node100.example.com",
415 primary_ip="192.0.2.100")
417 def testSingleOnline(self):
418 self.assertEqual(rpc._NodeConfigResolver(self._GetSingleOnlineNode,
420 ["node90-uuid"], None),
421 [("node90.example.com", "192.0.2.90", "node90-uuid")])
423 def testSingleOffline(self):
424 self.assertEqual(rpc._NodeConfigResolver(self._GetSingleOfflineNode,
426 ["node100-uuid"], None),
427 [("node100.example.com", rpc._OFFLINE, "node100-uuid")])
429 def testSingleOfflineWithAcceptOffline(self):
430 fn = self._GetSingleOfflineNode
431 assert fn("node100-uuid").offline
432 self.assertEqual(rpc._NodeConfigResolver(fn, NotImplemented,
434 rpc_defs.ACCEPT_OFFLINE_NODE),
435 [("node100.example.com", "192.0.2.100", "node100-uuid")])
436 for i in [False, True, "", "Hello", 0, 1]:
437 self.assertRaises(AssertionError, rpc._NodeConfigResolver,
438 fn, NotImplemented, ["node100.example.com"], i)
440 def testUnknownSingleNode(self):
441 self.assertEqual(rpc._NodeConfigResolver(lambda _: None, NotImplemented,
442 ["node110.example.com"], None),
443 [("node110.example.com", "node110.example.com",
444 "node110.example.com")])
446 def testMultiEmpty(self):
447 self.assertEqual(rpc._NodeConfigResolver(NotImplemented,
452 def testMultiSomeOffline(self):
453 nodes = dict(("node%s-uuid" % i,
454 objects.Node(name="node%s.example.com" % i,
455 offline=((i % 3) == 0),
456 primary_ip="192.0.2.%s" % i,
457 uuid="node%s-uuid" % i))
458 for i in range(1, 255))
461 self.assertEqual(rpc._NodeConfigResolver(NotImplemented,
466 # Offline, online and unknown hosts
467 self.assertEqual(rpc._NodeConfigResolver(NotImplemented,
472 "unknown.example.com",],
474 ("node3.example.com", rpc._OFFLINE, "node3-uuid"),
475 ("node92.example.com", "192.0.2.92", "node92-uuid"),
476 ("node54.example.com", rpc._OFFLINE, "node54-uuid"),
477 ("unknown.example.com", "unknown.example.com", "unknown.example.com"),
481 class TestCompress(unittest.TestCase):
483 for data in ["", "Hello", "Hello World!\nnew\nlines"]:
484 self.assertEqual(rpc._Compress(data),
485 (constants.RPC_ENCODING_NONE, data))
487 for data in [512 * " ", 5242 * "Hello World!\n"]:
488 compressed = rpc._Compress(data)
489 self.assertEqual(len(compressed), 2)
490 self.assertEqual(backend._Decompress(compressed), data)
492 def testDecompression(self):
493 self.assertRaises(AssertionError, backend._Decompress, "")
494 self.assertRaises(AssertionError, backend._Decompress, [""])
495 self.assertRaises(AssertionError, backend._Decompress,
496 ("unknown compression", "data"))
497 self.assertRaises(Exception, backend._Decompress,
498 (constants.RPC_ENCODING_ZLIB_BASE64, "invalid zlib data"))
501 class TestRpcClientBase(unittest.TestCase):
502 def testNoHosts(self):
503 cdef = ("test_call", NotImplemented, None, constants.RPC_TMO_SLOW, [],
504 None, None, NotImplemented)
505 http_proc = _FakeRequestProcessor(NotImplemented)
506 client = rpc._RpcClientBase(rpc._StaticResolver([]), NotImplemented,
507 _req_process_fn=http_proc)
508 self.assertEqual(client._Call(cdef, [], []), {})
510 # Test wrong number of arguments
511 self.assertRaises(errors.ProgrammerError, client._Call,
514 def testTimeout(self):
515 def _CalcTimeout((arg1, arg2)):
518 def _VerifyRequest(exp_timeout, req):
519 self.assertEqual(req.read_timeout, exp_timeout)
522 req.resp_status_code = http.HTTP_OK
523 req.resp_body = serializer.DumpJson((True, hex(req.read_timeout)))
525 resolver = rpc._StaticResolver([
535 tests = [(100, None, 100), (30, None, 30)]
536 tests.extend((_CalcTimeout, i, i + 300)
537 for i in [0, 5, 16485, 30516])
539 for timeout, arg1, exp_timeout in tests:
540 cdef = ("test_call", NotImplemented, None, timeout, [
541 ("arg1", None, NotImplemented),
542 ("arg2", None, NotImplemented),
543 ], None, None, NotImplemented)
545 http_proc = _FakeRequestProcessor(compat.partial(_VerifyRequest,
547 client = rpc._RpcClientBase(resolver, NotImplemented,
548 _req_process_fn=http_proc)
549 result = client._Call(cdef, nodes, [arg1, 300])
550 self.assertEqual(len(result), len(nodes))
551 self.assertTrue(compat.all(not res.fail_msg and
552 res.payload == hex(exp_timeout)
553 for res in result.values()))
555 def testArgumentEncoder(self):
556 (AT1, AT2) = range(1, 3)
558 resolver = rpc._StaticResolver([
573 cdef = ("test_call", NotImplemented, None, constants.RPC_TMO_NORMAL, [
574 ("arg0", None, NotImplemented),
575 ("arg1", AT1, NotImplemented),
576 ("arg1", AT2, NotImplemented),
577 ], None, None, NotImplemented)
579 def _VerifyRequest(req):
581 req.resp_status_code = http.HTTP_OK
582 req.resp_body = serializer.DumpJson((True, req.post_data))
584 http_proc = _FakeRequestProcessor(_VerifyRequest)
586 for num in [0, 3796, 9032119]:
587 client = rpc._RpcClientBase(resolver, encoders.get,
588 _req_process_fn=http_proc)
589 result = client._Call(cdef, nodes, ["foo", num, "Hello%s" % num])
590 self.assertEqual(len(result), len(nodes))
591 for res in result.values():
592 self.assertFalse(res.fail_msg)
593 self.assertEqual(serializer.LoadJson(res.payload),
594 ["foo", hex(num), hash("Hello%s" % num)])
596 def testPostProc(self):
597 def _VerifyRequest(nums, req):
599 req.resp_status_code = http.HTTP_OK
600 req.resp_body = serializer.DumpJson((True, nums))
602 resolver = rpc._StaticResolver([
608 "node90.example.com",
609 "node95.example.com",
613 self.assertFalse(res.fail_msg)
614 res.payload = sum(res.payload)
617 cdef = ("test_call", NotImplemented, None, constants.RPC_TMO_NORMAL, [],
618 None, _PostProc, NotImplemented)
620 # Seeded random generator
621 rnd = random.Random(20299)
623 for i in [0, 4, 74, 1391]:
624 nums = [rnd.randint(0, 1000) for _ in range(i)]
625 http_proc = _FakeRequestProcessor(compat.partial(_VerifyRequest, nums))
626 client = rpc._RpcClientBase(resolver, NotImplemented,
627 _req_process_fn=http_proc)
628 result = client._Call(cdef, nodes, [])
629 self.assertEqual(len(result), len(nodes))
630 for res in result.values():
631 self.assertFalse(res.fail_msg)
632 self.assertEqual(res.payload, sum(nums))
634 def testPreProc(self):
635 def _VerifyRequest(req):
637 req.resp_status_code = http.HTTP_OK
638 req.resp_body = serializer.DumpJson((True, req.post_data))
640 resolver = rpc._StaticResolver([
646 "node30.example.com",
647 "node35.example.com",
650 def _PreProc(node, data):
651 self.assertEqual(len(data), 1)
652 return data[0] + node
654 cdef = ("test_call", NotImplemented, None, constants.RPC_TMO_NORMAL, [
655 ("arg0", None, NotImplemented),
656 ], _PreProc, None, NotImplemented)
658 http_proc = _FakeRequestProcessor(_VerifyRequest)
659 client = rpc._RpcClientBase(resolver, NotImplemented,
660 _req_process_fn=http_proc)
662 for prefix in ["foo", "bar", "baz"]:
663 result = client._Call(cdef, nodes, [prefix])
664 self.assertEqual(len(result), len(nodes))
665 for (idx, (node, res)) in enumerate(result.items()):
666 self.assertFalse(res.fail_msg)
667 self.assertEqual(serializer.LoadJson(res.payload), prefix + node)
669 def testResolverOptions(self):
670 def _VerifyRequest(req):
672 req.resp_status_code = http.HTTP_OK
673 req.resp_body = serializer.DumpJson((True, req.post_data))
676 "node30.example.com",
677 "node35.example.com",
680 def _Resolver(expected, hosts, options):
681 self.assertEqual(hosts, nodes)
682 self.assertEqual(options, expected)
683 return zip(hosts, nodes, hosts)
685 def _DynamicResolverOptions((arg0, )):
690 (rpc_defs.ACCEPT_OFFLINE_NODE, None, rpc_defs.ACCEPT_OFFLINE_NODE),
691 (False, None, False),
694 (_DynamicResolverOptions, [1, 2, 3], 6),
695 (_DynamicResolverOptions, range(4, 19), 165),
698 for (resolver_opts, arg0, expected) in tests:
699 cdef = ("test_call", NotImplemented, resolver_opts,
700 constants.RPC_TMO_NORMAL, [
701 ("arg0", None, NotImplemented),
702 ], None, None, NotImplemented)
704 http_proc = _FakeRequestProcessor(_VerifyRequest)
706 client = rpc._RpcClientBase(compat.partial(_Resolver, expected),
707 NotImplemented, _req_process_fn=http_proc)
708 result = client._Call(cdef, nodes, [arg0])
709 self.assertEqual(len(result), len(nodes))
710 for (idx, (node, res)) in enumerate(result.items()):
711 self.assertFalse(res.fail_msg)
714 class _FakeConfigForRpcRunner:
715 GetAllNodesInfo = NotImplemented
717 def __init__(self, cluster=NotImplemented):
718 self._cluster = cluster
720 def GetNodeInfo(self, name):
721 return objects.Node(name=name)
723 def GetClusterInfo(self):
726 def GetInstanceDiskParams(self, _):
727 return constants.DISK_DT_DEFAULTS
730 class TestRpcRunner(unittest.TestCase):
731 def testUploadFile(self):
732 data = 1779 * "Hello World\n"
734 tmpfile = tempfile.NamedTemporaryFile()
737 st = os.stat(tmpfile.name)
739 def _VerifyRequest(req):
740 (uldata, ) = serializer.LoadJson(req.post_data)
741 self.assertEqual(len(uldata), 7)
742 self.assertEqual(uldata[0], tmpfile.name)
743 self.assertEqual(list(uldata[1]), list(rpc._Compress(data)))
744 self.assertEqual(uldata[2], st.st_mode)
745 self.assertEqual(uldata[3], "user%s" % os.getuid())
746 self.assertEqual(uldata[4], "group%s" % os.getgid())
747 self.assertTrue(uldata[5] is not None)
748 self.assertEqual(uldata[6], st.st_mtime)
751 req.resp_status_code = http.HTTP_OK
752 req.resp_body = serializer.DumpJson((True, None))
754 http_proc = _FakeRequestProcessor(_VerifyRequest)
756 std_runner = rpc.RpcRunner(_FakeConfigForRpcRunner(), None,
757 _req_process_fn=http_proc,
758 _getents=mocks.FakeGetentResolver)
760 cfg_runner = rpc.ConfigRunner(None, ["192.0.2.13"],
761 _req_process_fn=http_proc,
762 _getents=mocks.FakeGetentResolver)
768 for runner in [std_runner, cfg_runner]:
769 result = runner.call_upload_file(nodes, tmpfile.name)
770 self.assertEqual(len(result), len(nodes))
771 for (idx, (node, res)) in enumerate(result.items()):
772 self.assertFalse(res.fail_msg)
774 def testEncodeInstance(self):
775 cluster = objects.Cluster(hvparams={
777 constants.HV_BLOCKDEV_PREFIX: "foo",
781 constants.PP_DEFAULT: {
782 constants.BE_MAXMEM: 8192,
791 cluster.UpgradeConfig()
793 inst = objects.Instance(name="inst1.example.com",
794 hypervisor=constants.HT_FAKE,
798 constants.HV_BLOCKDEV_PREFIX: "bar",
799 constants.HV_ROOT_PATH: "/tmp",
803 constants.BE_MINMEM: 128,
804 constants.BE_MAXMEM: 256,
807 objects.NIC(nicparams={
808 constants.NIC_MODE: "mymode",
811 disk_template=constants.DT_PLAIN,
813 objects.Disk(dev_type=constants.LD_LV, size=4096,
814 logical_id=("vg", "disk6120")),
815 objects.Disk(dev_type=constants.LD_LV, size=1024,
816 logical_id=("vg", "disk8508")),
820 cfg = _FakeConfigForRpcRunner(cluster=cluster)
821 runner = rpc.RpcRunner(cfg, None,
822 _req_process_fn=NotImplemented,
823 _getents=mocks.FakeGetentResolver)
825 def _CheckBasics(result):
826 self.assertEqual(result["name"], "inst1.example.com")
827 self.assertEqual(result["os"], "linux")
828 self.assertEqual(result["beparams"][constants.BE_MINMEM], 128)
829 self.assertEqual(len(result["hvparams"]), 1)
830 self.assertEqual(len(result["nics"]), 1)
831 self.assertEqual(result["nics"][0]["nicparams"][constants.NIC_MODE],
834 # Generic object serialization
835 result = runner._encoder((rpc_defs.ED_OBJECT_DICT, inst))
838 result = runner._encoder((rpc_defs.ED_OBJECT_DICT_LIST, 5 * [inst]))
839 map(_CheckBasics, result)
842 result = runner._encoder((rpc_defs.ED_INST_DICT, inst))
844 self.assertEqual(result["beparams"][constants.BE_MAXMEM], 256)
845 self.assertEqual(result["hvparams"][constants.HT_KVM], {
846 constants.HV_BLOCKDEV_PREFIX: "bar",
847 constants.HV_ROOT_PATH: "/tmp",
849 self.assertEqual(result["osparams"], {
853 # Instance with OS parameters
854 result = runner._encoder((rpc_defs.ED_INST_DICT_OSP_DP, (inst, {
859 self.assertEqual(result["beparams"][constants.BE_MAXMEM], 256)
860 self.assertEqual(result["hvparams"][constants.HT_KVM], {
861 constants.HV_BLOCKDEV_PREFIX: "bar",
862 constants.HV_ROOT_PATH: "/tmp",
864 self.assertEqual(result["osparams"], {
869 # Instance with hypervisor and backend parameters
870 result = runner._encoder((rpc_defs.ED_INST_DICT_HVP_BEP_DP, (inst, {
872 constants.HV_BOOT_ORDER: "xyz",
875 constants.BE_VCPUS: 100,
876 constants.BE_MAXMEM: 4096,
879 self.assertEqual(result["beparams"][constants.BE_MAXMEM], 4096)
880 self.assertEqual(result["beparams"][constants.BE_VCPUS], 100)
881 self.assertEqual(result["hvparams"][constants.HT_KVM], {
882 constants.HV_BOOT_ORDER: "xyz",
884 self.assertEqual(result["disks"], [{
885 "dev_type": constants.LD_LV,
887 "logical_id": ("vg", "disk6120"),
888 "params": constants.DISK_DT_DEFAULTS[inst.disk_template],
890 "dev_type": constants.LD_LV,
892 "logical_id": ("vg", "disk8508"),
893 "params": constants.DISK_DT_DEFAULTS[inst.disk_template],
896 self.assertTrue(compat.all(disk.params == {} for disk in inst.disks),
897 msg="Configuration objects were modified")
900 class TestLegacyNodeInfo(unittest.TestCase):
906 KEY_SP1 = "spindles_free"
907 KEY_SP2 = "spindles_total"
921 DICT_HV = {KEY_HV: VAL_HV}
927 STD_LST = [VAL_BOOT, [DICT_VG, DICT_SP], [DICT_HV]]
938 def testStandard(self):
939 result = rpc.MakeLegacyNodeInfo(self.STD_LST)
940 self.assertEqual(result, self.STD_DICT)
943 my_lst = [self.VAL_BOOT, [], [self.DICT_HV]]
944 self.assertRaises(ValueError, rpc.MakeLegacyNodeInfo, my_lst)
946 def testNoReqVg(self):
947 my_lst = [self.VAL_BOOT, [], [self.DICT_HV]]
948 result = rpc.MakeLegacyNodeInfo(my_lst, require_vg_info = False)
949 self.assertEqual(result, {self.KEY_BOOT: self.VAL_BOOT,
950 self.KEY_HV: self.VAL_HV})
951 result = rpc.MakeLegacyNodeInfo(self.STD_LST, require_vg_info = False)
952 self.assertEqual(result, self.STD_DICT)
955 if __name__ == "__main__":
956 testutils.GanetiTestProgram()