4 # Copyright (C) 2010, 2011, 2012, 2013 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for testing ganeti.rpc"""
30 from ganeti import constants
31 from ganeti import compat
32 from ganeti import rpc
33 from ganeti import rpc_defs
34 from ganeti import http
35 from ganeti import errors
36 from ganeti import serializer
37 from ganeti import objects
38 from ganeti import backend
44 class _FakeRequestProcessor:
45 def __init__(self, response_fn):
46 self._response_fn = response_fn
49 def __call__(self, reqs, lock_monitor_cb=None):
50 assert lock_monitor_cb is None or callable(lock_monitor_cb)
53 self._response_fn(req)
56 def GetFakeSimpleStoreClass(fn):
57 class FakeSimpleStore:
58 GetNodePrimaryIPList = fn
59 GetPrimaryIPFamily = lambda _: None
61 return FakeSimpleStore
64 def _RaiseNotImplemented():
65 """Simple wrapper to raise NotImplementedError.
68 raise NotImplementedError
71 class TestRpcProcessor(unittest.TestCase):
72 def _FakeAddressLookup(self, map):
73 return lambda node_list: [map.get(node) for node in node_list]
75 def _GetVersionResponse(self, req):
76 self.assertEqual(req.host, "127.0.0.1")
77 self.assertEqual(req.port, 24094)
78 self.assertEqual(req.path, "/version")
79 self.assertEqual(req.read_timeout, constants.RPC_TMO_URGENT)
81 req.resp_status_code = http.HTTP_OK
82 req.resp_body = serializer.DumpJson((True, 123))
84 def testVersionSuccess(self):
85 resolver = rpc._StaticResolver(["127.0.0.1"])
86 http_proc = _FakeRequestProcessor(self._GetVersionResponse)
87 proc = rpc._RpcProcessor(resolver, 24094)
88 result = proc(["localhost"], "version", {"localhost": ""}, 60,
89 NotImplemented, _req_process_fn=http_proc)
90 self.assertEqual(result.keys(), ["localhost"])
91 lhresp = result["localhost"]
92 self.assertFalse(lhresp.offline)
93 self.assertEqual(lhresp.node, "localhost")
94 self.assertFalse(lhresp.fail_msg)
95 self.assertEqual(lhresp.payload, 123)
96 self.assertEqual(lhresp.call, "version")
97 lhresp.Raise("should not raise")
98 self.assertEqual(http_proc.reqcount, 1)
100 def _ReadTimeoutResponse(self, req):
101 self.assertEqual(req.host, "192.0.2.13")
102 self.assertEqual(req.port, 19176)
103 self.assertEqual(req.path, "/version")
104 self.assertEqual(req.read_timeout, 12356)
106 req.resp_status_code = http.HTTP_OK
107 req.resp_body = serializer.DumpJson((True, -1))
109 def testReadTimeout(self):
110 resolver = rpc._StaticResolver(["192.0.2.13"])
111 http_proc = _FakeRequestProcessor(self._ReadTimeoutResponse)
112 proc = rpc._RpcProcessor(resolver, 19176)
115 result = proc([host], "version", body, 12356, NotImplemented,
116 _req_process_fn=http_proc)
117 self.assertEqual(result.keys(), [host])
118 lhresp = result[host]
119 self.assertFalse(lhresp.offline)
120 self.assertEqual(lhresp.node, host)
121 self.assertFalse(lhresp.fail_msg)
122 self.assertEqual(lhresp.payload, -1)
123 self.assertEqual(lhresp.call, "version")
124 lhresp.Raise("should not raise")
125 self.assertEqual(http_proc.reqcount, 1)
127 def testOfflineNode(self):
128 resolver = rpc._StaticResolver([rpc._OFFLINE])
129 http_proc = _FakeRequestProcessor(NotImplemented)
130 proc = rpc._RpcProcessor(resolver, 30668)
133 result = proc([host], "version", body, 60, NotImplemented,
134 _req_process_fn=http_proc)
135 self.assertEqual(result.keys(), [host])
136 lhresp = result[host]
137 self.assertTrue(lhresp.offline)
138 self.assertEqual(lhresp.node, host)
139 self.assertTrue(lhresp.fail_msg)
140 self.assertFalse(lhresp.payload)
141 self.assertEqual(lhresp.call, "version")
144 self.assertRaises(errors.OpExecError, lhresp.Raise, "should raise")
147 self.assertRaises(errors.OpExecError, lhresp.Raise, None)
149 self.assertEqual(http_proc.reqcount, 0)
151 def _GetMultiVersionResponse(self, req):
152 self.assert_(req.host.startswith("node"))
153 self.assertEqual(req.port, 23245)
154 self.assertEqual(req.path, "/version")
156 req.resp_status_code = http.HTTP_OK
157 req.resp_body = serializer.DumpJson((True, 987))
159 def testMultiVersionSuccess(self):
160 nodes = ["node%s" % i for i in range(50)]
161 body = dict((n, "") for n in nodes)
162 resolver = rpc._StaticResolver(nodes)
163 http_proc = _FakeRequestProcessor(self._GetMultiVersionResponse)
164 proc = rpc._RpcProcessor(resolver, 23245)
165 result = proc(nodes, "version", body, 60, NotImplemented,
166 _req_process_fn=http_proc)
167 self.assertEqual(sorted(result.keys()), sorted(nodes))
170 lhresp = result[name]
171 self.assertFalse(lhresp.offline)
172 self.assertEqual(lhresp.node, name)
173 self.assertFalse(lhresp.fail_msg)
174 self.assertEqual(lhresp.payload, 987)
175 self.assertEqual(lhresp.call, "version")
176 lhresp.Raise("should not raise")
178 self.assertEqual(http_proc.reqcount, len(nodes))
180 def _GetVersionResponseFail(self, errinfo, req):
181 self.assertEqual(req.path, "/version")
183 req.resp_status_code = http.HTTP_OK
184 req.resp_body = serializer.DumpJson((False, errinfo))
186 def testVersionFailure(self):
187 resolver = rpc._StaticResolver(["aef9ur4i.example.com"])
188 proc = rpc._RpcProcessor(resolver, 5903)
189 for errinfo in [None, "Unknown error"]:
191 _FakeRequestProcessor(compat.partial(self._GetVersionResponseFail,
193 host = "aef9ur4i.example.com"
195 result = proc(body.keys(), "version", body, 60, NotImplemented,
196 _req_process_fn=http_proc)
197 self.assertEqual(result.keys(), [host])
198 lhresp = result[host]
199 self.assertFalse(lhresp.offline)
200 self.assertEqual(lhresp.node, host)
201 self.assert_(lhresp.fail_msg)
202 self.assertFalse(lhresp.payload)
203 self.assertEqual(lhresp.call, "version")
204 self.assertRaises(errors.OpExecError, lhresp.Raise, "failed")
205 self.assertEqual(http_proc.reqcount, 1)
207 def _GetHttpErrorResponse(self, httperrnodes, failnodes, req):
208 self.assertEqual(req.path, "/vg_list")
209 self.assertEqual(req.port, 15165)
211 if req.host in httperrnodes:
213 req.error = "Node set up for HTTP errors"
215 elif req.host in failnodes:
217 req.resp_status_code = 404
218 req.resp_body = serializer.DumpJson({
220 "message": "Method not found",
221 "explain": "Explanation goes here",
225 req.resp_status_code = http.HTTP_OK
226 req.resp_body = serializer.DumpJson((True, hash(req.host)))
228 def testHttpError(self):
229 nodes = ["uaf6pbbv%s" % i for i in range(50)]
230 body = dict((n, "") for n in nodes)
231 resolver = rpc._StaticResolver(nodes)
233 httperrnodes = set(nodes[1::7])
234 self.assertEqual(len(httperrnodes), 7)
236 failnodes = set(nodes[2::3]) - httperrnodes
237 self.assertEqual(len(failnodes), 14)
239 self.assertEqual(len(set(nodes) - failnodes - httperrnodes), 29)
241 proc = rpc._RpcProcessor(resolver, 15165)
243 _FakeRequestProcessor(compat.partial(self._GetHttpErrorResponse,
244 httperrnodes, failnodes))
245 result = proc(nodes, "vg_list", body,
246 constants.RPC_TMO_URGENT, NotImplemented,
247 _req_process_fn=http_proc)
248 self.assertEqual(sorted(result.keys()), sorted(nodes))
251 lhresp = result[name]
252 self.assertFalse(lhresp.offline)
253 self.assertEqual(lhresp.node, name)
254 self.assertEqual(lhresp.call, "vg_list")
256 if name in httperrnodes:
257 self.assert_(lhresp.fail_msg)
258 self.assertRaises(errors.OpExecError, lhresp.Raise, "failed")
259 elif name in failnodes:
260 self.assert_(lhresp.fail_msg)
261 self.assertRaises(errors.OpPrereqError, lhresp.Raise, "failed",
262 prereq=True, ecode=errors.ECODE_INVAL)
264 self.assertFalse(lhresp.fail_msg)
265 self.assertEqual(lhresp.payload, hash(name))
266 lhresp.Raise("should not raise")
268 self.assertEqual(http_proc.reqcount, len(nodes))
270 def _GetInvalidResponseA(self, req):
271 self.assertEqual(req.path, "/version")
273 req.resp_status_code = http.HTTP_OK
274 req.resp_body = serializer.DumpJson(("This", "is", "an", "invalid",
275 "response", "!", 1, 2, 3))
277 def _GetInvalidResponseB(self, req):
278 self.assertEqual(req.path, "/version")
280 req.resp_status_code = http.HTTP_OK
281 req.resp_body = serializer.DumpJson("invalid response")
283 def testInvalidResponse(self):
284 resolver = rpc._StaticResolver(["oqo7lanhly.example.com"])
285 proc = rpc._RpcProcessor(resolver, 19978)
287 for fn in [self._GetInvalidResponseA, self._GetInvalidResponseB]:
288 http_proc = _FakeRequestProcessor(fn)
289 host = "oqo7lanhly.example.com"
291 result = proc([host], "version", body, 60, NotImplemented,
292 _req_process_fn=http_proc)
293 self.assertEqual(result.keys(), [host])
294 lhresp = result[host]
295 self.assertFalse(lhresp.offline)
296 self.assertEqual(lhresp.node, host)
297 self.assert_(lhresp.fail_msg)
298 self.assertFalse(lhresp.payload)
299 self.assertEqual(lhresp.call, "version")
300 self.assertRaises(errors.OpExecError, lhresp.Raise, "failed")
301 self.assertEqual(http_proc.reqcount, 1)
303 def _GetBodyTestResponse(self, test_data, req):
304 self.assertEqual(req.host, "192.0.2.84")
305 self.assertEqual(req.port, 18700)
306 self.assertEqual(req.path, "/upload_file")
307 self.assertEqual(serializer.LoadJson(req.post_data), test_data)
309 req.resp_status_code = http.HTTP_OK
310 req.resp_body = serializer.DumpJson((True, None))
312 def testResponseBody(self):
317 resolver = rpc._StaticResolver(["192.0.2.84"])
318 http_proc = _FakeRequestProcessor(compat.partial(self._GetBodyTestResponse,
320 proc = rpc._RpcProcessor(resolver, 18700)
322 body = {host: serializer.DumpJson(test_data)}
323 result = proc([host], "upload_file", body, 30, NotImplemented,
324 _req_process_fn=http_proc)
325 self.assertEqual(result.keys(), [host])
326 lhresp = result[host]
327 self.assertFalse(lhresp.offline)
328 self.assertEqual(lhresp.node, host)
329 self.assertFalse(lhresp.fail_msg)
330 self.assertEqual(lhresp.payload, None)
331 self.assertEqual(lhresp.call, "upload_file")
332 lhresp.Raise("should not raise")
333 self.assertEqual(http_proc.reqcount, 1)
336 class TestSsconfResolver(unittest.TestCase):
337 def testSsconfLookup(self):
338 addr_list = ["192.0.2.%d" % n for n in range(0, 255, 13)]
339 node_list = ["node%d.example.com" % n for n in range(0, 255, 13)]
340 node_addr_list = [" ".join(t) for t in zip(node_list, addr_list)]
341 ssc = GetFakeSimpleStoreClass(lambda _: node_addr_list)
342 result = rpc._SsconfResolver(True, node_list, NotImplemented,
343 ssc=ssc, nslookup_fn=NotImplemented)
344 self.assertEqual(result, zip(node_list, addr_list, node_list))
346 def testNsLookup(self):
347 addr_list = ["192.0.2.%d" % n for n in range(0, 255, 13)]
348 node_list = ["node%d.example.com" % n for n in range(0, 255, 13)]
349 ssc = GetFakeSimpleStoreClass(lambda _: [])
350 node_addr_map = dict(zip(node_list, addr_list))
351 nslookup_fn = lambda name, family=None: node_addr_map.get(name)
352 result = rpc._SsconfResolver(True, node_list, NotImplemented,
353 ssc=ssc, nslookup_fn=nslookup_fn)
354 self.assertEqual(result, zip(node_list, addr_list, node_list))
356 def testDisabledSsconfIp(self):
357 addr_list = ["192.0.2.%d" % n for n in range(0, 255, 13)]
358 node_list = ["node%d.example.com" % n for n in range(0, 255, 13)]
359 ssc = GetFakeSimpleStoreClass(_RaiseNotImplemented)
360 node_addr_map = dict(zip(node_list, addr_list))
361 nslookup_fn = lambda name, family=None: node_addr_map.get(name)
362 result = rpc._SsconfResolver(False, node_list, NotImplemented,
363 ssc=ssc, nslookup_fn=nslookup_fn)
364 self.assertEqual(result, zip(node_list, addr_list, node_list))
366 def testBothLookups(self):
367 addr_list = ["192.0.2.%d" % n for n in range(0, 255, 13)]
368 node_list = ["node%d.example.com" % n for n in range(0, 255, 13)]
369 n = len(addr_list) / 2
370 node_addr_list = [" ".join(t) for t in zip(node_list[n:], addr_list[n:])]
371 ssc = GetFakeSimpleStoreClass(lambda _: node_addr_list)
372 node_addr_map = dict(zip(node_list[:n], addr_list[:n]))
373 nslookup_fn = lambda name, family=None: node_addr_map.get(name)
374 result = rpc._SsconfResolver(True, node_list, NotImplemented,
375 ssc=ssc, nslookup_fn=nslookup_fn)
376 self.assertEqual(result, zip(node_list, addr_list, node_list))
378 def testAddressLookupIPv6(self):
379 addr_list = ["2001:db8::%d" % n for n in range(0, 255, 11)]
380 node_list = ["node%d.example.com" % n for n in range(0, 255, 11)]
381 node_addr_list = [" ".join(t) for t in zip(node_list, addr_list)]
382 ssc = GetFakeSimpleStoreClass(lambda _: node_addr_list)
383 result = rpc._SsconfResolver(True, node_list, NotImplemented,
384 ssc=ssc, nslookup_fn=NotImplemented)
385 self.assertEqual(result, zip(node_list, addr_list, node_list))
388 class TestStaticResolver(unittest.TestCase):
390 addresses = ["192.0.2.%d" % n for n in range(0, 123, 7)]
391 nodes = ["node%s.example.com" % n for n in range(0, 123, 7)]
392 res = rpc._StaticResolver(addresses)
393 self.assertEqual(res(nodes, NotImplemented), zip(nodes, addresses, nodes))
395 def testWrongLength(self):
396 res = rpc._StaticResolver([])
397 self.assertRaises(AssertionError, res, ["abc"], NotImplemented)
400 class TestNodeConfigResolver(unittest.TestCase):
402 def _GetSingleOnlineNode(uuid):
403 assert uuid == "node90-uuid"
404 return objects.Node(name="node90.example.com",
407 primary_ip="192.0.2.90")
410 def _GetSingleOfflineNode(uuid):
411 assert uuid == "node100-uuid"
412 return objects.Node(name="node100.example.com",
415 primary_ip="192.0.2.100")
417 def testSingleOnline(self):
418 self.assertEqual(rpc._NodeConfigResolver(self._GetSingleOnlineNode,
420 ["node90-uuid"], None),
421 [("node90.example.com", "192.0.2.90", "node90-uuid")])
423 def testSingleOffline(self):
424 self.assertEqual(rpc._NodeConfigResolver(self._GetSingleOfflineNode,
426 ["node100-uuid"], None),
427 [("node100.example.com", rpc._OFFLINE, "node100-uuid")])
429 def testSingleOfflineWithAcceptOffline(self):
430 fn = self._GetSingleOfflineNode
431 assert fn("node100-uuid").offline
432 self.assertEqual(rpc._NodeConfigResolver(fn, NotImplemented,
434 rpc_defs.ACCEPT_OFFLINE_NODE),
435 [("node100.example.com", "192.0.2.100", "node100-uuid")])
436 for i in [False, True, "", "Hello", 0, 1]:
437 self.assertRaises(AssertionError, rpc._NodeConfigResolver,
438 fn, NotImplemented, ["node100.example.com"], i)
440 def testUnknownSingleNode(self):
441 self.assertEqual(rpc._NodeConfigResolver(lambda _: None, NotImplemented,
442 ["node110.example.com"], None),
443 [("node110.example.com", "node110.example.com",
444 "node110.example.com")])
446 def testMultiEmpty(self):
447 self.assertEqual(rpc._NodeConfigResolver(NotImplemented,
452 def testMultiSomeOffline(self):
453 nodes = dict(("node%s-uuid" % i,
454 objects.Node(name="node%s.example.com" % i,
455 offline=((i % 3) == 0),
456 primary_ip="192.0.2.%s" % i,
457 uuid="node%s-uuid" % i))
458 for i in range(1, 255))
461 self.assertEqual(rpc._NodeConfigResolver(NotImplemented,
466 # Offline, online and unknown hosts
467 self.assertEqual(rpc._NodeConfigResolver(NotImplemented,
472 "unknown.example.com",],
474 ("node3.example.com", rpc._OFFLINE, "node3-uuid"),
475 ("node92.example.com", "192.0.2.92", "node92-uuid"),
476 ("node54.example.com", rpc._OFFLINE, "node54-uuid"),
477 ("unknown.example.com", "unknown.example.com", "unknown.example.com"),
481 class TestCompress(unittest.TestCase):
483 for data in ["", "Hello", "Hello World!\nnew\nlines"]:
484 self.assertEqual(rpc._Compress(NotImplemented, data),
485 (constants.RPC_ENCODING_NONE, data))
487 for data in [512 * " ", 5242 * "Hello World!\n"]:
488 compressed = rpc._Compress(NotImplemented, data)
489 self.assertEqual(len(compressed), 2)
490 self.assertEqual(backend._Decompress(compressed), data)
492 def testDecompression(self):
493 self.assertRaises(AssertionError, backend._Decompress, "")
494 self.assertRaises(AssertionError, backend._Decompress, [""])
495 self.assertRaises(AssertionError, backend._Decompress,
496 ("unknown compression", "data"))
497 self.assertRaises(Exception, backend._Decompress,
498 (constants.RPC_ENCODING_ZLIB_BASE64, "invalid zlib data"))
501 class TestRpcClientBase(unittest.TestCase):
502 def testNoHosts(self):
503 cdef = ("test_call", NotImplemented, None, constants.RPC_TMO_SLOW, [],
504 None, None, NotImplemented)
505 http_proc = _FakeRequestProcessor(NotImplemented)
506 client = rpc._RpcClientBase(rpc._StaticResolver([]), NotImplemented,
507 _req_process_fn=http_proc)
508 self.assertEqual(client._Call(cdef, [], []), {})
510 # Test wrong number of arguments
511 self.assertRaises(errors.ProgrammerError, client._Call,
514 def testTimeout(self):
515 def _CalcTimeout((arg1, arg2)):
518 def _VerifyRequest(exp_timeout, req):
519 self.assertEqual(req.read_timeout, exp_timeout)
522 req.resp_status_code = http.HTTP_OK
523 req.resp_body = serializer.DumpJson((True, hex(req.read_timeout)))
525 resolver = rpc._StaticResolver([
535 tests = [(100, None, 100), (30, None, 30)]
536 tests.extend((_CalcTimeout, i, i + 300)
537 for i in [0, 5, 16485, 30516])
539 for timeout, arg1, exp_timeout in tests:
540 cdef = ("test_call", NotImplemented, None, timeout, [
541 ("arg1", None, NotImplemented),
542 ("arg2", None, NotImplemented),
543 ], None, None, NotImplemented)
545 http_proc = _FakeRequestProcessor(compat.partial(_VerifyRequest,
547 client = rpc._RpcClientBase(resolver, NotImplemented,
548 _req_process_fn=http_proc)
549 result = client._Call(cdef, nodes, [arg1, 300])
550 self.assertEqual(len(result), len(nodes))
551 self.assertTrue(compat.all(not res.fail_msg and
552 res.payload == hex(exp_timeout)
553 for res in result.values()))
555 def testArgumentEncoder(self):
556 (AT1, AT2) = range(1, 3)
558 resolver = rpc._StaticResolver([
569 AT1: lambda _, value: hex(value),
570 AT2: lambda _, value: hash(value),
573 cdef = ("test_call", NotImplemented, None, constants.RPC_TMO_NORMAL, [
574 ("arg0", None, NotImplemented),
575 ("arg1", AT1, NotImplemented),
576 ("arg1", AT2, NotImplemented),
577 ], None, None, NotImplemented)
579 def _VerifyRequest(req):
581 req.resp_status_code = http.HTTP_OK
582 req.resp_body = serializer.DumpJson((True, req.post_data))
584 http_proc = _FakeRequestProcessor(_VerifyRequest)
586 for num in [0, 3796, 9032119]:
587 client = rpc._RpcClientBase(resolver, encoders.get,
588 _req_process_fn=http_proc)
589 result = client._Call(cdef, nodes, ["foo", num, "Hello%s" % num])
590 self.assertEqual(len(result), len(nodes))
591 for res in result.values():
592 self.assertFalse(res.fail_msg)
593 self.assertEqual(serializer.LoadJson(res.payload),
594 ["foo", hex(num), hash("Hello%s" % num)])
596 def testPostProc(self):
597 def _VerifyRequest(nums, req):
599 req.resp_status_code = http.HTTP_OK
600 req.resp_body = serializer.DumpJson((True, nums))
602 resolver = rpc._StaticResolver([
608 "node90.example.com",
609 "node95.example.com",
613 self.assertFalse(res.fail_msg)
614 res.payload = sum(res.payload)
617 cdef = ("test_call", NotImplemented, None, constants.RPC_TMO_NORMAL, [],
618 None, _PostProc, NotImplemented)
620 # Seeded random generator
621 rnd = random.Random(20299)
623 for i in [0, 4, 74, 1391]:
624 nums = [rnd.randint(0, 1000) for _ in range(i)]
625 http_proc = _FakeRequestProcessor(compat.partial(_VerifyRequest, nums))
626 client = rpc._RpcClientBase(resolver, NotImplemented,
627 _req_process_fn=http_proc)
628 result = client._Call(cdef, nodes, [])
629 self.assertEqual(len(result), len(nodes))
630 for res in result.values():
631 self.assertFalse(res.fail_msg)
632 self.assertEqual(res.payload, sum(nums))
634 def testPreProc(self):
635 def _VerifyRequest(req):
637 req.resp_status_code = http.HTTP_OK
638 req.resp_body = serializer.DumpJson((True, req.post_data))
640 resolver = rpc._StaticResolver([
646 "node30.example.com",
647 "node35.example.com",
650 def _PreProc(node, data):
651 self.assertEqual(len(data), 1)
652 return data[0] + node
654 cdef = ("test_call", NotImplemented, None, constants.RPC_TMO_NORMAL, [
655 ("arg0", None, NotImplemented),
656 ], _PreProc, None, NotImplemented)
658 http_proc = _FakeRequestProcessor(_VerifyRequest)
659 client = rpc._RpcClientBase(resolver, NotImplemented,
660 _req_process_fn=http_proc)
662 for prefix in ["foo", "bar", "baz"]:
663 result = client._Call(cdef, nodes, [prefix])
664 self.assertEqual(len(result), len(nodes))
665 for (idx, (node, res)) in enumerate(result.items()):
666 self.assertFalse(res.fail_msg)
667 self.assertEqual(serializer.LoadJson(res.payload), prefix + node)
669 def testResolverOptions(self):
670 def _VerifyRequest(req):
672 req.resp_status_code = http.HTTP_OK
673 req.resp_body = serializer.DumpJson((True, req.post_data))
676 "node30.example.com",
677 "node35.example.com",
680 def _Resolver(expected, hosts, options):
681 self.assertEqual(hosts, nodes)
682 self.assertEqual(options, expected)
683 return zip(hosts, nodes, hosts)
685 def _DynamicResolverOptions((arg0, )):
690 (rpc_defs.ACCEPT_OFFLINE_NODE, None, rpc_defs.ACCEPT_OFFLINE_NODE),
691 (False, None, False),
694 (_DynamicResolverOptions, [1, 2, 3], 6),
695 (_DynamicResolverOptions, range(4, 19), 165),
698 for (resolver_opts, arg0, expected) in tests:
699 cdef = ("test_call", NotImplemented, resolver_opts,
700 constants.RPC_TMO_NORMAL, [
701 ("arg0", None, NotImplemented),
702 ], None, None, NotImplemented)
704 http_proc = _FakeRequestProcessor(_VerifyRequest)
706 client = rpc._RpcClientBase(compat.partial(_Resolver, expected),
707 NotImplemented, _req_process_fn=http_proc)
708 result = client._Call(cdef, nodes, [arg0])
709 self.assertEqual(len(result), len(nodes))
710 for (idx, (node, res)) in enumerate(result.items()):
711 self.assertFalse(res.fail_msg)
714 class _FakeConfigForRpcRunner:
715 GetAllNodesInfo = NotImplemented
717 def __init__(self, cluster=NotImplemented):
718 self._cluster = cluster
720 def GetNodeInfo(self, name):
721 return objects.Node(name=name)
723 def GetMultiNodeInfo(self, names):
724 return [(name, self.GetNodeInfo(name)) for name in names]
726 def GetClusterInfo(self):
729 def GetInstanceDiskParams(self, _):
730 return constants.DISK_DT_DEFAULTS
733 class TestRpcRunner(unittest.TestCase):
734 def testUploadFile(self):
735 data = 1779 * "Hello World\n"
737 tmpfile = tempfile.NamedTemporaryFile()
740 st = os.stat(tmpfile.name)
746 def _VerifyRequest(req):
747 (uldata, ) = serializer.LoadJson(req.post_data)
748 self.assertEqual(len(uldata), 7)
749 self.assertEqual(uldata[0], tmpfile.name)
750 self.assertEqual(list(uldata[1]), list(rpc._Compress(nodes[0], data)))
751 self.assertEqual(uldata[2], st.st_mode)
752 self.assertEqual(uldata[3], "user%s" % os.getuid())
753 self.assertEqual(uldata[4], "group%s" % os.getgid())
754 self.assertTrue(uldata[5] is not None)
755 self.assertEqual(uldata[6], st.st_mtime)
758 req.resp_status_code = http.HTTP_OK
759 req.resp_body = serializer.DumpJson((True, None))
761 http_proc = _FakeRequestProcessor(_VerifyRequest)
763 std_runner = rpc.RpcRunner(_FakeConfigForRpcRunner(), None,
764 _req_process_fn=http_proc,
765 _getents=mocks.FakeGetentResolver)
767 cfg_runner = rpc.ConfigRunner(None, ["192.0.2.13"],
768 _req_process_fn=http_proc,
769 _getents=mocks.FakeGetentResolver)
771 for runner in [std_runner, cfg_runner]:
772 result = runner.call_upload_file(nodes, tmpfile.name)
773 self.assertEqual(len(result), len(nodes))
774 for (idx, (node, res)) in enumerate(result.items()):
775 self.assertFalse(res.fail_msg)
777 def testEncodeInstance(self):
778 cluster = objects.Cluster(hvparams={
780 constants.HV_CDROM_IMAGE_PATH: "foo",
784 constants.PP_DEFAULT: {
785 constants.BE_MAXMEM: 8192,
794 cluster.UpgradeConfig()
796 inst = objects.Instance(name="inst1.example.com",
797 hypervisor=constants.HT_KVM,
800 constants.HV_CDROM_IMAGE_PATH: "bar",
801 constants.HV_ROOT_PATH: "/tmp",
804 constants.BE_MINMEM: 128,
805 constants.BE_MAXMEM: 256,
808 objects.NIC(nicparams={
809 constants.NIC_MODE: "mymode",
812 disk_template=constants.DT_PLAIN,
814 objects.Disk(dev_type=constants.DT_PLAIN, size=4096,
815 logical_id=("vg", "disk6120")),
816 objects.Disk(dev_type=constants.DT_PLAIN, size=1024,
817 logical_id=("vg", "disk8508")),
821 cfg = _FakeConfigForRpcRunner(cluster=cluster)
822 runner = rpc.RpcRunner(cfg, None,
823 _req_process_fn=NotImplemented,
824 _getents=mocks.FakeGetentResolver)
826 def _CheckBasics(result):
827 self.assertEqual(result["name"], "inst1.example.com")
828 self.assertEqual(result["os"], "linux")
829 self.assertEqual(result["beparams"][constants.BE_MINMEM], 128)
830 self.assertEqual(len(result["nics"]), 1)
831 self.assertEqual(result["nics"][0]["nicparams"][constants.NIC_MODE],
834 # Generic object serialization
835 result = runner._encoder(NotImplemented, (rpc_defs.ED_OBJECT_DICT, inst))
837 self.assertEqual(len(result["hvparams"]), 2)
839 result = runner._encoder(NotImplemented,
840 (rpc_defs.ED_OBJECT_DICT_LIST, 5 * [inst]))
841 map(_CheckBasics, result)
842 map(lambda r: self.assertEqual(len(r["hvparams"]), 2), result)
845 result = runner._encoder(NotImplemented, (rpc_defs.ED_INST_DICT, inst))
847 self.assertEqual(result["beparams"][constants.BE_MAXMEM], 256)
848 self.assertEqual(result["hvparams"][constants.HV_CDROM_IMAGE_PATH], "bar")
849 self.assertEqual(result["hvparams"][constants.HV_ROOT_PATH], "/tmp")
850 self.assertEqual(result["osparams"], {
853 self.assertEqual(len(result["hvparams"]),
854 len(constants.HVC_DEFAULTS[constants.HT_KVM]))
856 # Instance with OS parameters
857 result = runner._encoder(NotImplemented,
858 (rpc_defs.ED_INST_DICT_OSP_DP, (inst, {
863 self.assertEqual(result["beparams"][constants.BE_MAXMEM], 256)
864 self.assertEqual(result["hvparams"][constants.HV_CDROM_IMAGE_PATH], "bar")
865 self.assertEqual(result["hvparams"][constants.HV_ROOT_PATH], "/tmp")
866 self.assertEqual(result["osparams"], {
871 # Instance with hypervisor and backend parameters
872 result = runner._encoder(NotImplemented,
873 (rpc_defs.ED_INST_DICT_HVP_BEP_DP, (inst, {
875 constants.HV_BOOT_ORDER: "xyz",
878 constants.BE_VCPUS: 100,
879 constants.BE_MAXMEM: 4096,
882 self.assertEqual(result["beparams"][constants.BE_MAXMEM], 4096)
883 self.assertEqual(result["beparams"][constants.BE_VCPUS], 100)
884 self.assertEqual(result["hvparams"][constants.HT_KVM], {
885 constants.HV_BOOT_ORDER: "xyz",
887 self.assertEqual(result["disks"], [{
888 "dev_type": constants.DT_PLAIN,
889 "dynamic_params": {},
891 "logical_id": ("vg", "disk6120"),
892 "params": constants.DISK_DT_DEFAULTS[inst.disk_template],
894 "dev_type": constants.DT_PLAIN,
895 "dynamic_params": {},
897 "logical_id": ("vg", "disk8508"),
898 "params": constants.DISK_DT_DEFAULTS[inst.disk_template],
901 self.assertTrue(compat.all(disk.params == {} for disk in inst.disks),
902 msg="Configuration objects were modified")
905 class TestLegacyNodeInfo(unittest.TestCase):
908 KEY_STORAGE_FREE = "storage_free"
909 KEY_STORAGE_TOTAL = "storage_size"
910 KEY_CPU_COUNT = "cpu_count"
911 KEY_SPINDLES_FREE = "spindles_free"
912 KEY_SPINDLES_TOTAL = "spindles_total"
913 KEY_STORAGE_TYPE = "type" # key for storage type
918 VAL_VG_TYPE = "lvm-vg"
923 VAL_PV_TYPE = "lvm-pv"
925 KEY_NAME: VAL_VG_NAME,
926 KEY_STORAGE_FREE: VAL_VG_FREE,
927 KEY_STORAGE_TOTAL: VAL_VG_TOTAL,
928 KEY_STORAGE_TYPE: VAL_VG_TYPE,
930 DICT_HV = {KEY_CPU_COUNT: VAL_CPU_COUNT}
932 KEY_STORAGE_TYPE: VAL_PV_TYPE,
933 KEY_NAME: VAL_PV_NAME,
934 KEY_STORAGE_FREE: VAL_PV_FREE,
935 KEY_STORAGE_TOTAL: VAL_PV_TOTAL,
937 STD_LST = [VAL_BOOT, [DICT_VG, DICT_SP], [DICT_HV]]
940 KEY_NAME: VAL_VG_NAME,
941 KEY_STORAGE_FREE: VAL_VG_FREE,
942 KEY_STORAGE_TOTAL: VAL_VG_TOTAL,
943 KEY_SPINDLES_FREE: VAL_PV_FREE,
944 KEY_SPINDLES_TOTAL: VAL_PV_TOTAL,
945 KEY_CPU_COUNT: VAL_CPU_COUNT,
948 def testWithSpindles(self):
949 result = rpc.MakeLegacyNodeInfo(self.STD_LST, constants.DT_PLAIN)
950 self.assertEqual(result, self.STD_DICT)
952 def testNoSpindles(self):
953 my_lst = [self.VAL_BOOT, [self.DICT_VG], [self.DICT_HV]]
954 result = rpc.MakeLegacyNodeInfo(my_lst, constants.DT_PLAIN)
955 expected_dict = dict((k,v) for k, v in self.STD_DICT.iteritems())
956 expected_dict[self.KEY_SPINDLES_FREE] = 0
957 expected_dict[self.KEY_SPINDLES_TOTAL] = 0
958 self.assertEqual(result, expected_dict)
961 if __name__ == "__main__":
962 testutils.GanetiTestProgram()