4 # Copyright (C) 2008, 2011, 2012, 2013 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the cli module"""
28 from cStringIO import StringIO
30 from ganeti import constants
31 from ganeti import cli
32 from ganeti import errors
33 from ganeti import utils
34 from ganeti import objects
35 from ganeti import qlang
36 from ganeti.errors import OpPrereqError, ParameterError
39 class TestParseTimespec(unittest.TestCase):
40 """Testing case for ParseTimespec"""
42 def testValidTimes(self):
43 """Test valid timespecs"""
50 ("1w", 60 * 60 * 24 * 7),
54 for value, expected_result in test_data:
55 self.failUnlessEqual(cli.ParseTimespec(value), expected_result)
57 def testInvalidTime(self):
58 """Test invalid timespecs"""
65 for value in test_data:
66 self.failUnlessRaises(OpPrereqError, cli.ParseTimespec, value)
69 class TestSplitKeyVal(unittest.TestCase):
70 """Testing case for cli._SplitKeyVal"""
71 DATA = "a=b,c,no_d,-e"
72 RESULT = {"a": "b", "c": True, "d": False, "e": None}
73 RESULT_NOPREFIX = {"a": "b", "c": {}, "no_d": {}, "-e": {}}
75 def testSplitKeyVal(self):
77 self.failUnlessEqual(cli._SplitKeyVal("option", self.DATA, True),
80 def testDuplicateParam(self):
81 """Test duplicate parameters"""
82 for data in ("a=1,a=2", "a,no_a"):
83 self.failUnlessRaises(ParameterError, cli._SplitKeyVal,
def testEmptyData(self):
    """Check that splitting an empty string yields an empty dictionary"""
    # assertEqual replaces the deprecated failUnlessEqual alias and matches
    # the assertion style already used by other test cases in this file.
    self.assertEqual(cli._SplitKeyVal("option", "", True), {})
91 class TestIdentKeyVal(unittest.TestCase):
92 """Testing case for cli.check_ident_key_val"""
94 def testIdentKeyVal(self):
95 """Test identkeyval"""
97 return cli.check_ident_key_val("option", "opt", value)
99 self.assertEqual(cikv("foo:bar"), ("foo", {"bar": True}))
100 self.assertEqual(cikv("foo:bar=baz"), ("foo", {"bar": "baz"}))
101 self.assertEqual(cikv("bar:b=c,c=a"), ("bar", {"b": "c", "c": "a"}))
102 self.assertEqual(cikv("no_bar"), ("bar", False))
103 self.assertRaises(ParameterError, cikv, "no_bar:foo")
104 self.assertRaises(ParameterError, cikv, "no_bar:foo=baz")
105 self.assertRaises(ParameterError, cikv, "bar:foo=baz,foo=baz")
106 self.assertEqual(cikv("-foo"), ("foo", None))
107 self.assertRaises(ParameterError, cikv, "-foo:a=c")
109 # Check negative numbers
110 self.assertEqual(cikv("-1:remove"), ("-1", {
113 self.assertEqual(cikv("-29447:add,size=4G"), ("-29447", {
117 for i in ["-:", "-"]:
118 self.assertEqual(cikv(i), ("", None))
122 return cli._SplitIdentKeyVal("opt", value, False)
124 def testIdentKeyValNoPrefix(self):
125 """Test identkeyval without prefixes"""
128 ("foo:no_bar", None),
129 ("foo:bar=baz,bar=baz", None),
133 ("foo", {"bar": "baz"})),
134 ("no_foo:-1=baz,no_op=3",
135 ("no_foo", {"-1": "baz", "no_op": "3"})),
137 for (arg, res) in test_cases:
139 self.assertRaises(ParameterError, self._csikv, arg)
141 self.assertEqual(self._csikv(arg), res)
144 class TestListIdentKeyVal(unittest.TestCase):
145 """Test for cli.check_list_ident_key_val()"""
149 return cli.check_list_ident_key_val("option", "opt", value)
151 def testListIdentKeyVal(self):
158 {"foo": {"bar": "baz"}}),
159 ("foo:bar=baz/foo:bat=bad",
161 ("foo:abc=42/bar:def=11",
162 {"foo": {"abc": "42"},
163 "bar": {"def": "11"}}),
164 ("foo:abc=42/bar:def=11,ghi=07",
165 {"foo": {"abc": "42"},
166 "bar": {"def": "11", "ghi": "07"}}),
168 for (arg, res) in test_cases:
170 self.assertRaises(ParameterError, self._clikv, arg)
172 self.assertEqual(res, self._clikv(arg))
175 class TestToStream(unittest.TestCase):
176 """Test the ToStream functions"""
186 cli._ToStream(buf, data)
187 self.failUnlessEqual(buf.getvalue(), data + "\n")
189 def testParams(self):
191 cli._ToStream(buf, "foo %s", 1)
192 self.failUnlessEqual(buf.getvalue(), "foo 1\n")
194 cli._ToStream(buf, "foo %s", (15,16))
195 self.failUnlessEqual(buf.getvalue(), "foo (15, 16)\n")
197 cli._ToStream(buf, "foo %s %s", "a", "b")
198 self.failUnlessEqual(buf.getvalue(), "foo a b\n")
201 class TestGenerateTable(unittest.TestCase):
202 HEADERS = dict([("f%s" % i, "Field%s" % i) for i in range(5)])
204 FIELDS1 = ["f1", "f2"]
211 def _test(self, headers, fields, separator, data,
212 numfields, unitfields, units, expected):
213 table = cli.GenerateTable(headers, fields, separator, data,
214 numfields=numfields, unitfields=unitfields,
216 self.assertEqual(table, expected)
225 self._test(self.HEADERS, self.FIELDS1, None, self.DATA1,
226 None, None, "m", exp)
228 def testNoFields(self):
229 self._test(self.HEADERS, [], None, [[], []],
230 None, None, "m", ["", "", ""])
231 self._test(None, [], None, [[], []],
232 None, None, "m", ["", ""])
234 def testSeparator(self):
235 for sep in ["#", ":", ",", "^", "!", "%", "|", "###", "%%", "!!!", "||"]:
237 "Field1%sField2" % sep,
242 self._test(self.HEADERS, self.FIELDS1, sep, self.DATA1,
243 None, None, "m", exp)
245 def testNoHeader(self):
251 self._test(None, self.FIELDS1, None, self.DATA1,
252 None, None, "m", exp)
254 def testUnknownField(self):
264 self._test(headers, ["f1", "UNKNOWN"], None, self.DATA1,
265 None, None, "m", exp)
267 def testNumfields(self):
268 fields = ["f1", "f2", "f3"]
275 "Field1 Field2 Field3",
280 self._test(self.HEADERS, fields, None, data,
281 ["f2", "f3"], None, "m", exp)
283 def testUnitfields(self):
285 "Field1 Field2 Field3",
292 "Field1:Field2:Field3",
298 for sep, expected in [(None, expnosep), (":", expsep)]:
299 fields = ["f1", "f2", "f3"]
305 self._test(self.HEADERS, fields, sep, data,
306 ["f2", "f3"], ["f3"], "h", expected)
308 def testUnusual(self):
318 self._test(self.HEADERS, ["f1", "f2"], None, data,
319 None, None, "m", exp)
322 class TestFormatQueryResult(unittest.TestCase):
325 objects.QueryFieldDefinition(name="name", title="Name",
326 kind=constants.QFT_TEXT),
327 objects.QueryFieldDefinition(name="size", title="Size",
328 kind=constants.QFT_NUMBER),
329 objects.QueryFieldDefinition(name="act", title="Active",
330 kind=constants.QFT_BOOL),
331 objects.QueryFieldDefinition(name="mem", title="Memory",
332 kind=constants.QFT_UNIT),
333 objects.QueryFieldDefinition(name="other", title="SomeList",
334 kind=constants.QFT_OTHER),
337 response = objects.QueryResponse(fields=fields, data=[
338 [(constants.RS_NORMAL, "nodeA"), (constants.RS_NORMAL, 128),
339 (constants.RS_NORMAL, False), (constants.RS_NORMAL, 1468006),
340 (constants.RS_NORMAL, [])],
341 [(constants.RS_NORMAL, "other"), (constants.RS_NORMAL, 512),
342 (constants.RS_NORMAL, True), (constants.RS_NORMAL, 16),
343 (constants.RS_NORMAL, [1, 2, 3])],
344 [(constants.RS_NORMAL, "xyz"), (constants.RS_NORMAL, 1024),
345 (constants.RS_NORMAL, True), (constants.RS_NORMAL, 4096),
346 (constants.RS_NORMAL, [{}, {}])],
349 self.assertEqual(cli.FormatQueryResult(response, unit="h", header=True),
351 "Name Size Active Memory SomeList",
352 "nodeA 128 N 1.4T []",
353 "other 512 Y 16M [1, 2, 3]",
354 "xyz 1024 Y 4.0G [{}, {}]",
357 def testTimestampAndUnit(self):
359 objects.QueryFieldDefinition(name="name", title="Name",
360 kind=constants.QFT_TEXT),
361 objects.QueryFieldDefinition(name="size", title="Size",
362 kind=constants.QFT_UNIT),
363 objects.QueryFieldDefinition(name="mtime", title="ModTime",
364 kind=constants.QFT_TIMESTAMP),
367 response = objects.QueryResponse(fields=fields, data=[
368 [(constants.RS_NORMAL, "a"), (constants.RS_NORMAL, 1024),
369 (constants.RS_NORMAL, 0)],
370 [(constants.RS_NORMAL, "b"), (constants.RS_NORMAL, 144996),
371 (constants.RS_NORMAL, 1291746295)],
374 self.assertEqual(cli.FormatQueryResult(response, unit="m", header=True),
377 "a 1024 %s" % utils.FormatTime(0),
378 "b 144996 %s" % utils.FormatTime(1291746295),
381 def testOverride(self):
383 objects.QueryFieldDefinition(name="name", title="Name",
384 kind=constants.QFT_TEXT),
385 objects.QueryFieldDefinition(name="cust", title="Custom",
386 kind=constants.QFT_OTHER),
387 objects.QueryFieldDefinition(name="xt", title="XTime",
388 kind=constants.QFT_TIMESTAMP),
391 response = objects.QueryResponse(fields=fields, data=[
392 [(constants.RS_NORMAL, "x"), (constants.RS_NORMAL, ["a", "b", "c"]),
393 (constants.RS_NORMAL, 1234)],
394 [(constants.RS_NORMAL, "y"), (constants.RS_NORMAL, range(10)),
395 (constants.RS_NORMAL, 1291746295)],
399 "cust": (utils.CommaJoin, False),
403 self.assertEqual(cli.FormatQueryResult(response, unit="h", header=True,
404 format_override=override),
408 "y 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 0x4cfe7bf7",
411 def testSeparator(self):
413 objects.QueryFieldDefinition(name="name", title="Name",
414 kind=constants.QFT_TEXT),
415 objects.QueryFieldDefinition(name="count", title="Count",
416 kind=constants.QFT_NUMBER),
417 objects.QueryFieldDefinition(name="desc", title="Description",
418 kind=constants.QFT_TEXT),
421 response = objects.QueryResponse(fields=fields, data=[
422 [(constants.RS_NORMAL, "instance1.example.com"),
423 (constants.RS_NORMAL, 21125), (constants.RS_NORMAL, "Hello World!")],
424 [(constants.RS_NORMAL, "mail.other.net"),
425 (constants.RS_NORMAL, -9000), (constants.RS_NORMAL, "a,b,c")],
428 for sep in [":", "|", "#", "|||", "###", "@@@", "@#@"]:
429 for header in [None, "Name%sCount%sDescription" % (sep, sep)]:
434 "instance1.example.com%s21125%sHello World!" % (sep, sep),
435 "mail.other.net%s-9000%sa,b,c" % (sep, sep),
438 self.assertEqual(cli.FormatQueryResult(response, separator=sep,
439 header=bool(header)),
440 (cli.QR_NORMAL, exp))
442 def testStatusWithUnknown(self):
444 objects.QueryFieldDefinition(name="id", title="ID",
445 kind=constants.QFT_NUMBER),
446 objects.QueryFieldDefinition(name="unk", title="unk",
447 kind=constants.QFT_UNKNOWN),
448 objects.QueryFieldDefinition(name="unavail", title="Unavail",
449 kind=constants.QFT_BOOL),
450 objects.QueryFieldDefinition(name="nodata", title="NoData",
451 kind=constants.QFT_TEXT),
452 objects.QueryFieldDefinition(name="offline", title="OffLine",
453 kind=constants.QFT_TEXT),
456 response = objects.QueryResponse(fields=fields, data=[
457 [(constants.RS_NORMAL, 1), (constants.RS_UNKNOWN, None),
458 (constants.RS_NORMAL, False), (constants.RS_NORMAL, ""),
459 (constants.RS_OFFLINE, None)],
460 [(constants.RS_NORMAL, 2), (constants.RS_UNKNOWN, None),
461 (constants.RS_NODATA, None), (constants.RS_NORMAL, "x"),
462 (constants.RS_OFFLINE, None)],
463 [(constants.RS_NORMAL, 3), (constants.RS_UNKNOWN, None),
464 (constants.RS_NORMAL, False), (constants.RS_UNAVAIL, None),
465 (constants.RS_OFFLINE, None)],
468 self.assertEqual(cli.FormatQueryResult(response, header=True,
469 separator="|", verbose=True),
471 "ID|unk|Unavail|NoData|OffLine",
472 "1|(unknown)|N||(offline)",
473 "2|(unknown)|(nodata)|x|(offline)",
474 "3|(unknown)|N|(unavail)|(offline)",
476 self.assertEqual(cli.FormatQueryResult(response, header=True,
477 separator="|", verbose=False),
479 "ID|unk|Unavail|NoData|OffLine",
485 def testNoData(self):
487 objects.QueryFieldDefinition(name="id", title="ID",
488 kind=constants.QFT_NUMBER),
489 objects.QueryFieldDefinition(name="name", title="Name",
490 kind=constants.QFT_TEXT),
493 response = objects.QueryResponse(fields=fields, data=[])
495 self.assertEqual(cli.FormatQueryResult(response, header=True),
496 (cli.QR_NORMAL, ["ID Name"]))
498 def testNoDataWithUnknown(self):
500 objects.QueryFieldDefinition(name="id", title="ID",
501 kind=constants.QFT_NUMBER),
502 objects.QueryFieldDefinition(name="unk", title="unk",
503 kind=constants.QFT_UNKNOWN),
506 response = objects.QueryResponse(fields=fields, data=[])
508 self.assertEqual(cli.FormatQueryResult(response, header=False),
509 (cli.QR_UNKNOWN, []))
511 def testStatus(self):
513 objects.QueryFieldDefinition(name="id", title="ID",
514 kind=constants.QFT_NUMBER),
515 objects.QueryFieldDefinition(name="unavail", title="Unavail",
516 kind=constants.QFT_BOOL),
517 objects.QueryFieldDefinition(name="nodata", title="NoData",
518 kind=constants.QFT_TEXT),
519 objects.QueryFieldDefinition(name="offline", title="OffLine",
520 kind=constants.QFT_TEXT),
523 response = objects.QueryResponse(fields=fields, data=[
524 [(constants.RS_NORMAL, 1), (constants.RS_NORMAL, False),
525 (constants.RS_NORMAL, ""), (constants.RS_OFFLINE, None)],
526 [(constants.RS_NORMAL, 2), (constants.RS_NODATA, None),
527 (constants.RS_NORMAL, "x"), (constants.RS_NORMAL, "abc")],
528 [(constants.RS_NORMAL, 3), (constants.RS_NORMAL, False),
529 (constants.RS_UNAVAIL, None), (constants.RS_OFFLINE, None)],
532 self.assertEqual(cli.FormatQueryResult(response, header=False,
533 separator="|", verbose=True),
534 (cli.QR_INCOMPLETE, [
537 "3|N|(unavail)|(offline)",
539 self.assertEqual(cli.FormatQueryResult(response, header=False,
540 separator="|", verbose=False),
541 (cli.QR_INCOMPLETE, [
547 def testInvalidFieldType(self):
549 objects.QueryFieldDefinition(name="x", title="x",
550 kind="#some#other#type"),
553 response = objects.QueryResponse(fields=fields, data=[])
555 self.assertRaises(NotImplementedError, cli.FormatQueryResult, response)
557 def testInvalidFieldStatus(self):
559 objects.QueryFieldDefinition(name="x", title="x",
560 kind=constants.QFT_TEXT),
563 response = objects.QueryResponse(fields=fields, data=[[(-1, None)]])
564 self.assertRaises(NotImplementedError, cli.FormatQueryResult, response)
566 response = objects.QueryResponse(fields=fields, data=[[(-1, "x")]])
567 self.assertRaises(AssertionError, cli.FormatQueryResult, response)
569 def testEmptyFieldTitle(self):
571 objects.QueryFieldDefinition(name="x", title="",
572 kind=constants.QFT_TEXT),
575 response = objects.QueryResponse(fields=fields, data=[])
576 self.assertRaises(AssertionError, cli.FormatQueryResult, response)
579 class _MockJobPollCb(cli.JobPollCbBase, cli.JobPollReportCbBase):
580 def __init__(self, tc, job_id):
585 self._expect_notchanged = False
586 self._expect_log = []
def CheckEmpty(self):
    """Assert that nothing scripted or expected is left over

    Checks, in this order, the queued results for WaitForJobChangeOnce, the
    queued results for QueryJobs, the pending "not changed" flag and the
    pending log messages; each of them must be falsy.
    """
    for attr in ("_wfjcr", "_jobstatus", "_expect_notchanged", "_expect_log"):
        self.tc.assertFalse(getattr(self, attr))
def AddWfjcResult(self, *args):
    """Queue one scripted reply for WaitForJobChangeOnce

    The positional arguments are stored together as a single tuple, which
    WaitForJobChangeOnce later unpacks as (expected previous job info,
    expected previous log serial, result).
    """
    scripted_reply = args
    self._wfjcr.append(scripted_reply)
def AddQueryJobsResult(self, *args):
    """Queue one scripted reply for QueryJobs

    The positional arguments are stored together as a single tuple; QueryJobs
    takes entries from the front of the queue.
    """
    scripted_reply = args
    self._jobstatus.append(scripted_reply)
600 def WaitForJobChangeOnce(self, job_id, fields,
601 prev_job_info, prev_log_serial):
602 self.tc.assertEqual(job_id, self.job_id)
603 self.tc.assertEqualValues(fields, ["status"])
604 self.tc.assertFalse(self._expect_notchanged)
605 self.tc.assertFalse(self._expect_log)
607 (exp_prev_job_info, exp_prev_log_serial, result) = self._wfjcr.pop(0)
608 self.tc.assertEqualValues(prev_job_info, exp_prev_job_info)
609 self.tc.assertEqual(prev_log_serial, exp_prev_log_serial)
611 if result == constants.JOB_NOTCHANGED:
612 self._expect_notchanged = True
614 (_, logmsgs) = result
616 self._expect_log.extend(logmsgs)
620 def QueryJobs(self, job_ids, fields):
621 self.tc.assertEqual(job_ids, [self.job_id])
622 self.tc.assertEqualValues(fields, ["status", "opstatus", "opresult"])
623 self.tc.assertFalse(self._expect_notchanged)
624 self.tc.assertFalse(self._expect_log)
626 result = self._jobstatus.pop(0)
627 self.tc.assertEqual(len(fields), len(result))
def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Verify a reported log message against the next expected one

    The job ID must match the one this mock was created for.  The next
    expected entry is then removed from the front of the pending log list
    and compared with the reported (serial, timestamp, type, message).
    """
    self.tc.assertEqual(job_id, self.job_id)
    reported = (serial, timestamp, log_type, log_msg)
    expected = self._expect_log.pop(0)
    self.tc.assertEqualValues(reported, expected)
def ReportNotChanged(self, job_id, status):
    """Handle a "job not changed" report

    Checks that the report is for the expected job and that such a report
    was actually pending, then clears the pending flag.  The status argument
    is accepted but not inspected.
    """
    self.tc.assertEqual(job_id, self.job_id)
    # assertTrue replaces the deprecated assert_ alias; the other checks in
    # this file already use assertTrue/assertFalse.
    self.tc.assertTrue(self._expect_notchanged)
    self._expect_notchanged = False
641 class TestGenericPollJob(testutils.GanetiTestCase):
642 def testSuccessWithLog(self):
644 cbs = _MockJobPollCb(self, job_id)
646 cbs.AddWfjcResult(None, None, constants.JOB_NOTCHANGED)
648 cbs.AddWfjcResult(None, None,
649 ((constants.JOB_STATUS_QUEUED, ), None))
651 cbs.AddWfjcResult((constants.JOB_STATUS_QUEUED, ), None,
652 constants.JOB_NOTCHANGED)
654 cbs.AddWfjcResult((constants.JOB_STATUS_QUEUED, ), None,
655 ((constants.JOB_STATUS_RUNNING, ),
656 [(1, utils.SplitTime(1273491611.0),
657 constants.ELOG_MESSAGE, "Step 1"),
658 (2, utils.SplitTime(1273491615.9),
659 constants.ELOG_MESSAGE, "Step 2"),
660 (3, utils.SplitTime(1273491625.02),
661 constants.ELOG_MESSAGE, "Step 3"),
662 (4, utils.SplitTime(1273491635.05),
663 constants.ELOG_MESSAGE, "Step 4"),
664 (37, utils.SplitTime(1273491645.0),
665 constants.ELOG_MESSAGE, "Step 5"),
666 (203, utils.SplitTime(127349155.0),
667 constants.ELOG_MESSAGE, "Step 6")]))
669 cbs.AddWfjcResult((constants.JOB_STATUS_RUNNING, ), 203,
670 ((constants.JOB_STATUS_RUNNING, ),
671 [(300, utils.SplitTime(1273491711.01),
672 constants.ELOG_MESSAGE, "Step X"),
673 (302, utils.SplitTime(1273491815.8),
674 constants.ELOG_MESSAGE, "Step Y"),
675 (303, utils.SplitTime(1273491925.32),
676 constants.ELOG_MESSAGE, "Step Z")]))
678 cbs.AddWfjcResult((constants.JOB_STATUS_RUNNING, ), 303,
679 ((constants.JOB_STATUS_SUCCESS, ), None))
681 cbs.AddQueryJobsResult(constants.JOB_STATUS_SUCCESS,
682 [constants.OP_STATUS_SUCCESS,
683 constants.OP_STATUS_SUCCESS],
684 ["Hello World", "Foo man bar"])
686 self.assertEqual(["Hello World", "Foo man bar"],
687 cli.GenericPollJob(job_id, cbs, cbs))
690 def testJobLost(self):
693 cbs = _MockJobPollCb(self, job_id)
694 cbs.AddWfjcResult(None, None, constants.JOB_NOTCHANGED)
695 cbs.AddWfjcResult(None, None, None)
696 self.assertRaises(errors.JobLost, cli.GenericPollJob, job_id, cbs, cbs)
702 cbs = _MockJobPollCb(self, job_id)
703 cbs.AddWfjcResult(None, None, constants.JOB_NOTCHANGED)
704 cbs.AddWfjcResult(None, None, ((constants.JOB_STATUS_ERROR, ), None))
705 cbs.AddQueryJobsResult(constants.JOB_STATUS_ERROR,
706 [constants.OP_STATUS_SUCCESS,
707 constants.OP_STATUS_ERROR],
708 ["Hello World", "Error code 123"])
709 self.assertRaises(errors.OpExecError, cli.GenericPollJob, job_id, cbs, cbs)
712 def testError2(self):
715 cbs = _MockJobPollCb(self, job_id)
716 cbs.AddWfjcResult(None, None, ((constants.JOB_STATUS_ERROR, ), None))
717 encexc = errors.EncodeException(errors.LockError("problem"))
718 cbs.AddQueryJobsResult(constants.JOB_STATUS_ERROR,
719 [constants.OP_STATUS_ERROR], [encexc])
720 self.assertRaises(errors.LockError, cli.GenericPollJob, job_id, cbs, cbs)
723 def testWeirdError(self):
726 cbs = _MockJobPollCb(self, job_id)
727 cbs.AddWfjcResult(None, None, ((constants.JOB_STATUS_ERROR, ), None))
728 cbs.AddQueryJobsResult(constants.JOB_STATUS_ERROR,
729 [constants.OP_STATUS_RUNNING,
730 constants.OP_STATUS_RUNNING],
732 self.assertRaises(errors.OpExecError, cli.GenericPollJob, job_id, cbs, cbs)
735 def testCancel(self):
738 cbs = _MockJobPollCb(self, job_id)
739 cbs.AddWfjcResult(None, None, constants.JOB_NOTCHANGED)
740 cbs.AddWfjcResult(None, None, ((constants.JOB_STATUS_CANCELING, ), None))
741 cbs.AddQueryJobsResult(constants.JOB_STATUS_CANCELING,
742 [constants.OP_STATUS_CANCELING,
743 constants.OP_STATUS_CANCELING],
745 self.assertRaises(errors.OpExecError, cli.GenericPollJob, job_id, cbs, cbs)
749 class TestFormatLogMessage(unittest.TestCase):
751 self.assertEqual(cli.FormatLogMessage(constants.ELOG_MESSAGE,
754 self.assertRaises(TypeError, cli.FormatLogMessage,
755 constants.ELOG_MESSAGE, [1, 2, 3])
757 self.assert_(cli.FormatLogMessage("some other type", (1, 2, 3)))
760 class TestParseFields(unittest.TestCase):
762 self.assertEqual(cli.ParseFields(None, []), [])
763 self.assertEqual(cli.ParseFields("name,foo,hello", []),
764 ["name", "foo", "hello"])
765 self.assertEqual(cli.ParseFields(None, ["def", "ault", "fields", "here"]),
766 ["def", "ault", "fields", "here"])
767 self.assertEqual(cli.ParseFields("name,foo", ["def", "ault"]),
769 self.assertEqual(cli.ParseFields("+name,foo", ["def", "ault"]),
770 ["def", "ault", "name", "foo"])
class TestConstants(unittest.TestCase):
    """Consistency checks between cli tables and constants"""

    def testPriority(self):
        """Compare the cli priority tables with OP_PRIO_SUBMIT_VALID

        The values of cli._PRIONAME_TO_VALUE must form the same set as the
        valid submit priorities, and the values listed in
        cli._PRIORITY_NAMES must equal those priorities sorted in reverse.
        """
        valid = constants.OP_PRIO_SUBMIT_VALID
        self.assertEqual(set(cli._PRIONAME_TO_VALUE.values()), set(valid))

        listed = [value for (_, value) in cli._PRIORITY_NAMES]
        self.assertEqual(listed, sorted(valid, reverse=True))
781 class TestParseNicOption(unittest.TestCase):
783 self.assertEqual(cli.ParseNicOption([("0", { "link": "eth0", })]),
784 [{ "link": "eth0", }])
785 self.assertEqual(cli.ParseNicOption([("5", { "ip": "192.0.2.7", })]),
786 [{}, {}, {}, {}, {}, { "ip": "192.0.2.7", }])
788 def testErrors(self):
789 for i in [None, "", "abc", "zero", "Hello World", "\0", []]:
790 self.assertRaises(errors.OpPrereqError, cli.ParseNicOption,
791 [(i, { "link": "eth0", })])
792 self.assertRaises(errors.OpPrereqError, cli.ParseNicOption,
795 self.assertRaises(errors.TypeEnforcementError, cli.ParseNicOption,
796 [(0, { True: False, })])
798 self.assertRaises(errors.TypeEnforcementError, cli.ParseNicOption,
799 [(3, { "mode": [], })])
802 class TestFormatResultError(unittest.TestCase):
def testNormal(self):
    """FormatResultError must raise AssertionError for RS_NORMAL"""
    for verbose in (False, True):
        self.assertRaises(
            AssertionError,
            cli.FormatResultError,
            constants.RS_NORMAL,
            verbose,
        )
def testUnknown(self):
    """FormatResultError must raise NotImplementedError for a bogus status"""
    bogus_status = "#some!other!status#"
    for verbose in (False, True):
        self.assertRaises(
            NotImplementedError,
            cli.FormatResultError,
            bogus_status,
            verbose,
        )
814 for status in constants.RS_ALL:
815 if status == constants.RS_NORMAL:
818 self.assertNotEqual(cli.FormatResultError(status, False),
819 cli.FormatResultError(status, True))
821 result = cli.FormatResultError(status, True)
822 self.assertTrue(result.startswith("("))
823 self.assertTrue(result.endswith(")"))
826 class TestGetOnlineNodes(unittest.TestCase):
def AddQueryResult(self, *args):
    """Queue one scripted reply for Query

    The positional arguments are stored together as a single tuple, which
    Query later unpacks as (expected fields, filter check, result data).
    """
    scripted_reply = args
    self._query.append(scripted_reply)
def CountPending(self):
    """Return the number of queued query results not yet consumed"""
    pending = self._query
    return len(pending)
def Query(self, res, fields, qfilter):
    """Answer a query from the queue of scripted results

    Only node queries are accepted.  The next scripted entry is taken from
    the front of the queue; its expected field list must equal the requested
    one, and a filter, when given, must be accepted by the entry's check
    callable.  Any mismatch raises a plain Exception.

    Returns an objects.QueryResponse carrying the scripted data and no
    field definitions.
    """
    if res != constants.QR_NODE:
        raise Exception("Querying wrong resource")

    scripted = self._query.pop(0)
    (want_fields, filter_ok, rows) = scripted

    if want_fields != fields:
        raise Exception("Expected fields %s, got %s" % (want_fields, fields))

    # The check callable is only consulted when a filter was actually passed
    if qfilter is not None and not filter_ok(qfilter):
        raise Exception("Filter doesn't match expectations")

    return objects.QueryResponse(fields=None, data=rows)
852 cl = self._FakeClient()
854 cl.AddQueryResult(["name", "offline", "sip"], None, [])
855 self.assertEqual(cli.GetOnlineNodes(None, cl=cl), [])
856 self.assertEqual(cl.CountPending(), 0)
858 def testNoSpecialFilter(self):
859 cl = self._FakeClient()
861 cl.AddQueryResult(["name", "offline", "sip"], None, [
862 [(constants.RS_NORMAL, "master.example.com"),
863 (constants.RS_NORMAL, False),
864 (constants.RS_NORMAL, "192.0.2.1")],
865 [(constants.RS_NORMAL, "node2.example.com"),
866 (constants.RS_NORMAL, False),
867 (constants.RS_NORMAL, "192.0.2.2")],
869 self.assertEqual(cli.GetOnlineNodes(None, cl=cl),
870 ["master.example.com", "node2.example.com"])
871 self.assertEqual(cl.CountPending(), 0)
873 def testNoMaster(self):
874 cl = self._FakeClient()
876 def _CheckFilter(qfilter):
877 self.assertEqual(qfilter, [qlang.OP_NOT, [qlang.OP_TRUE, "master"]])
880 cl.AddQueryResult(["name", "offline", "sip"], _CheckFilter, [
881 [(constants.RS_NORMAL, "node2.example.com"),
882 (constants.RS_NORMAL, False),
883 (constants.RS_NORMAL, "192.0.2.2")],
885 self.assertEqual(cli.GetOnlineNodes(None, cl=cl, filter_master=True),
886 ["node2.example.com"])
887 self.assertEqual(cl.CountPending(), 0)
889 def testSecondaryIpAddress(self):
890 cl = self._FakeClient()
892 cl.AddQueryResult(["name", "offline", "sip"], None, [
893 [(constants.RS_NORMAL, "master.example.com"),
894 (constants.RS_NORMAL, False),
895 (constants.RS_NORMAL, "192.0.2.1")],
896 [(constants.RS_NORMAL, "node2.example.com"),
897 (constants.RS_NORMAL, False),
898 (constants.RS_NORMAL, "192.0.2.2")],
900 self.assertEqual(cli.GetOnlineNodes(None, cl=cl, secondary_ips=True),
901 ["192.0.2.1", "192.0.2.2"])
902 self.assertEqual(cl.CountPending(), 0)
904 def testNoMasterFilterNodeName(self):
905 cl = self._FakeClient()
907 def _CheckFilter(qfilter):
908 self.assertEqual(qfilter,
910 [qlang.OP_OR] + [[qlang.OP_EQUAL, "name", name]
911 for name in ["node2", "node3"]],
912 [qlang.OP_NOT, [qlang.OP_TRUE, "master"]]])
915 cl.AddQueryResult(["name", "offline", "sip"], _CheckFilter, [
916 [(constants.RS_NORMAL, "node2.example.com"),
917 (constants.RS_NORMAL, False),
918 (constants.RS_NORMAL, "192.0.2.12")],
919 [(constants.RS_NORMAL, "node3.example.com"),
920 (constants.RS_NORMAL, False),
921 (constants.RS_NORMAL, "192.0.2.13")],
923 self.assertEqual(cli.GetOnlineNodes(["node2", "node3"], cl=cl,
924 secondary_ips=True, filter_master=True),
925 ["192.0.2.12", "192.0.2.13"])
926 self.assertEqual(cl.CountPending(), 0)
928 def testOfflineNodes(self):
929 cl = self._FakeClient()
931 cl.AddQueryResult(["name", "offline", "sip"], None, [
932 [(constants.RS_NORMAL, "master.example.com"),
933 (constants.RS_NORMAL, False),
934 (constants.RS_NORMAL, "192.0.2.1")],
935 [(constants.RS_NORMAL, "node2.example.com"),
936 (constants.RS_NORMAL, True),
937 (constants.RS_NORMAL, "192.0.2.2")],
938 [(constants.RS_NORMAL, "node3.example.com"),
939 (constants.RS_NORMAL, True),
940 (constants.RS_NORMAL, "192.0.2.3")],
942 self.assertEqual(cli.GetOnlineNodes(None, cl=cl, nowarn=True),
943 ["master.example.com"])
944 self.assertEqual(cl.CountPending(), 0)
946 def testNodeGroup(self):
947 cl = self._FakeClient()
949 def _CheckFilter(qfilter):
950 self.assertEqual(qfilter,
951 [qlang.OP_OR, [qlang.OP_EQUAL, "group", "foobar"],
952 [qlang.OP_EQUAL, "group.uuid", "foobar"]])
955 cl.AddQueryResult(["name", "offline", "sip"], _CheckFilter, [
956 [(constants.RS_NORMAL, "master.example.com"),
957 (constants.RS_NORMAL, False),
958 (constants.RS_NORMAL, "192.0.2.1")],
959 [(constants.RS_NORMAL, "node3.example.com"),
960 (constants.RS_NORMAL, False),
961 (constants.RS_NORMAL, "192.0.2.3")],
963 self.assertEqual(cli.GetOnlineNodes(None, cl=cl, nodegroup="foobar"),
964 ["master.example.com", "node3.example.com"])
965 self.assertEqual(cl.CountPending(), 0)
968 class TestFormatTimestamp(unittest.TestCase):
970 self.assertEqual(cli.FormatTimestamp((0, 1)),
971 time.strftime("%F %T", time.localtime(0)) + ".000001")
972 self.assertEqual(cli.FormatTimestamp((1332944009, 17376)),
973 (time.strftime("%F %T", time.localtime(1332944009)) +
977 for i in [0, [], {}, "", [1]]:
978 self.assertEqual(cli.FormatTimestamp(i), "?")
981 class TestFormatUsage(unittest.TestCase):
983 binary = "gnt-unittest"
986 (NotImplemented, NotImplemented, NotImplemented, NotImplemented,
989 (NotImplemented, NotImplemented, NotImplemented, NotImplemented,
990 "Hello World," * 10),
992 (NotImplemented, NotImplemented, NotImplemented, NotImplemented,
993 "Another description"),
996 self.assertEqual(list(cli._FormatUsage(binary, commands)), [
997 "Usage: gnt-unittest {command} [options...] [argument...]",
998 "gnt-unittest <command> --help to see details, or man gnt-unittest",
1001 (" bbb - Hello World,Hello World,Hello World,Hello World,Hello"
1003 " World,Hello World,Hello World,Hello World,Hello World,",
1004 " cmdA - description of A",
1005 " longname - Another description",
1010 class TestParseArgs(unittest.TestCase):
1011 def testNoArguments(self):
1012 for argv in [[], ["gnt-unittest"]]:
1014 cli._ParseArgs("gnt-unittest", argv, {}, {}, set())
1015 except cli._ShowUsage, err:
1016 self.assertTrue(err.exit_error)
1018 self.fail("Did not raise exception")
1020 def testVersion(self):
1021 for argv in [["test", "--version"], ["test", "--version", "somethingelse"]]:
1023 cli._ParseArgs("test", argv, {}, {}, set())
1024 except cli._ShowVersion:
1027 self.fail("Did not raise exception")
1030 for argv in [["test", "--help"], ["test", "--help", "somethingelse"]]:
1032 cli._ParseArgs("test", argv, {}, {}, set())
1033 except cli._ShowUsage, err:
1034 self.assertFalse(err.exit_error)
1036 self.fail("Did not raise exception")
1038 def testUnknownCommandOrAlias(self):
1039 for argv in [["test", "list"], ["test", "somethingelse", "--help"]]:
1041 cli._ParseArgs("test", argv, {}, {}, set())
1042 except cli._ShowUsage, err:
1043 self.assertTrue(err.exit_error)
1045 self.fail("Did not raise exception")
1047 def testInvalidAliasList(self):
1049 "list": NotImplemented,
1050 "foo": NotImplemented,
1053 "list": NotImplemented,
1054 "foo": NotImplemented,
1056 assert sorted(cmd.keys()) == sorted(aliases.keys())
1057 self.assertRaises(AssertionError, cli._ParseArgs, "test",
1058 ["test", "list"], cmd, aliases, set())
1060 def testAliasForNonExistantCommand(self):
1063 "list": NotImplemented,
1065 self.assertRaises(errors.ProgrammerError, cli._ParseArgs, "test",
1066 ["test", "list"], cmd, aliases, set())
class TestQftNames(unittest.TestCase):
    """Sanity checks for the cli._QFT_NAMES table"""

    def testComplete(self):
        """The keys of the table must be exactly constants.QFT_ALL"""
        known_kinds = frozenset(cli._QFT_NAMES)
        self.assertEqual(known_kinds, constants.QFT_ALL)

    def testUnique(self):
        """No two names may be equal when compared case-insensitively"""
        lowered = [name.lower() for name in cli._QFT_NAMES.values()]
        self.assertFalse(utils.FindDuplicates(lowered))

    def testUppercase(self):
        """The first character of every name must equal its uppercase form"""
        for name in cli._QFT_NAMES.values():
            initial = name[0]
            self.assertEqual(initial, initial.upper())
1082 class TestFieldDescValues(unittest.TestCase):
1083 def testKnownKind(self):
1084 fdef = objects.QueryFieldDefinition(name="aname",
1086 kind=constants.QFT_TEXT,
1088 self.assertEqual(cli._FieldDescValues(fdef),
1089 ["aname", "Text", "Atitle", "aaa doc aaa"])
1091 def testUnknownKind(self):
1094 self.assertFalse(kind in constants.QFT_ALL)
1095 self.assertFalse(kind in cli._QFT_NAMES)
1097 fdef = objects.QueryFieldDefinition(name="zname", title="Ztitle",
1098 kind=kind, doc="zzz doc zzz")
1099 self.assertEqual(cli._FieldDescValues(fdef),
1100 ["zname", kind, "Ztitle", "zzz doc zzz"])
1103 class TestSerializeGenericInfo(unittest.TestCase):
1104 """Test case for cli._SerializeGenericInfo"""
1105 def _RunTest(self, data, expected):
1107 cli._SerializeGenericInfo(buf, data, 0)
1108 self.assertEqual(buf.getvalue(), expected)
1110 def testSimple(self):
1115 (["1", "2", "3"], "- 1\n- 2\n- 3\n"),
1116 ([("z", "26")], "z: 26\n"),
1117 ({"z": "26"}, "z: 26\n"),
1118 ([("z", "26"), ("a", "1")], "z: 26\na: 1\n"),
1119 ({"z": "26", "a": "1"}, "a: 1\nz: 26\n"),
1121 for (data, expected) in test_samples:
1122 self._RunTest(data, expected)
1124 def testLists(self):
1130 adict_exp = ("- aa: 11\n"
1138 anobj_exp = ("- zz: 11\n"
1141 alist = ["aa", "cc", "bb"]
1142 alist_exp = ("- - aa\n"
1150 for (base_data, base_expected) in test_samples:
1151 for k in range(1, 4):
1152 data = k * [base_data]
1153 expected = k * base_expected
1154 self._RunTest(data, expected)
1156 def testDictionaries(self):
1158 ("aaa", ["x", "y"]),
1168 expected = ("aaa: \n"
1177 self._RunTest(data, expected)
1178 self._RunTest(dict(data), expected)
1181 class TestCreateIPolicyFromOpts(unittest.TestCase):
1182 """Test case for cli.CreateIPolicyFromOpts."""
1184 # Policies are big, and we want to see the difference in case of an error
1187 def _RecursiveCheckMergedDicts(self, default_pol, diff_pol, merged_pol):
1188 self.assertTrue(type(default_pol) is dict)
1189 self.assertTrue(type(diff_pol) is dict)
1190 self.assertTrue(type(merged_pol) is dict)
1191 self.assertEqual(frozenset(default_pol.keys()),
1192 frozenset(merged_pol.keys()))
1193 for (key, val) in merged_pol.items():
1195 if type(val) is dict:
1196 self._RecursiveCheckMergedDicts(default_pol[key], diff_pol[key], val)
1198 self.assertEqual(val, diff_pol[key])
1200 self.assertEqual(val, default_pol[key])
1202 def testClusterPolicy(self):
1203 pol0 = cli.CreateIPolicyFromOpts(
1205 ispecs_cpu_count={},
1206 ispecs_disk_count={},
1207 ispecs_disk_size={},
1208 ispecs_nic_count={},
1209 ipolicy_disk_templates=None,
1210 ipolicy_vcpu_ratio=None,
1211 ipolicy_spindle_ratio=None,
1214 self.assertEqual(pol0, constants.IPOLICY_DEFAULTS)
1217 constants.ISPECS_MINMAX: {
1218 constants.ISPECS_MIN: {
1219 constants.ISPEC_CPU_COUNT: 2,
1220 constants.ISPEC_DISK_COUNT: 1,
1222 constants.ISPECS_MAX: {
1223 constants.ISPEC_MEM_SIZE: 12*1024,
1224 constants.ISPEC_DISK_COUNT: 2,
1227 constants.ISPECS_STD: {
1228 constants.ISPEC_CPU_COUNT: 2,
1229 constants.ISPEC_DISK_COUNT: 2,
1231 constants.IPOLICY_VCPU_RATIO: 3.1,
1233 pol1 = cli.CreateIPolicyFromOpts(
1234 ispecs_mem_size={"max": "12g"},
1235 ispecs_cpu_count={"min": 2, "std": 2},
1236 ispecs_disk_count={"min": 1, "max": 2, "std": 2},
1237 ispecs_disk_size={},
1238 ispecs_nic_count={},
1239 ipolicy_disk_templates=None,
1240 ipolicy_vcpu_ratio=3.1,
1241 ipolicy_spindle_ratio=None,
1244 self._RecursiveCheckMergedDicts(constants.IPOLICY_DEFAULTS,
1248 constants.ISPECS_MINMAX: {
1249 constants.ISPECS_MIN: {
1250 constants.ISPEC_DISK_SIZE: 512,
1251 constants.ISPEC_NIC_COUNT: 2,
1253 constants.ISPECS_MAX: {
1254 constants.ISPEC_NIC_COUNT: 3,
1257 constants.ISPECS_STD: {
1258 constants.ISPEC_CPU_COUNT: 2,
1259 constants.ISPEC_NIC_COUNT: 3,
1261 constants.IPOLICY_SPINDLE_RATIO: 1.3,
1262 constants.IPOLICY_DTS: ["templates"],
1264 pol2 = cli.CreateIPolicyFromOpts(
1266 ispecs_cpu_count={"std": 2},
1267 ispecs_disk_count={},
1268 ispecs_disk_size={"min": "0.5g"},
1269 ispecs_nic_count={"min": 2, "max": 3, "std": 3},
1270 ipolicy_disk_templates=["templates"],
1271 ipolicy_vcpu_ratio=None,
1272 ipolicy_spindle_ratio=1.3,
1275 self._RecursiveCheckMergedDicts(constants.IPOLICY_DEFAULTS,
1278 for fill_all in [False, True]:
1280 constants.ISPECS_STD: {
1281 constants.ISPEC_CPU_COUNT: 2,
1282 constants.ISPEC_NIC_COUNT: 3,
1285 pol3 = cli.CreateIPolicyFromOpts(
1287 constants.ISPEC_CPU_COUNT: "2",
1288 constants.ISPEC_NIC_COUNT: "3",
1290 ipolicy_disk_templates=None,
1291 ipolicy_vcpu_ratio=None,
1292 ipolicy_spindle_ratio=None,
1296 self._RecursiveCheckMergedDicts(constants.IPOLICY_DEFAULTS,
1299 self.assertEqual(pol3, exp_pol3)
1301 def testPartialPolicy(self):
1302 exp_pol0 = objects.MakeEmptyIPolicy()
1303 pol0 = cli.CreateIPolicyFromOpts(
1306 ipolicy_disk_templates=None,
1307 ipolicy_vcpu_ratio=None,
1308 ipolicy_spindle_ratio=None,
1311 self.assertEqual(pol0, exp_pol0)
1314 constants.IPOLICY_VCPU_RATIO: 3.1,
1316 pol1 = cli.CreateIPolicyFromOpts(
1319 ipolicy_disk_templates=None,
1320 ipolicy_vcpu_ratio=3.1,
1321 ipolicy_spindle_ratio=None,
1324 self.assertEqual(pol1, exp_pol1)
1327 constants.IPOLICY_SPINDLE_RATIO: 1.3,
1328 constants.IPOLICY_DTS: ["templates"],
1330 pol2 = cli.CreateIPolicyFromOpts(
1333 ipolicy_disk_templates=["templates"],
1334 ipolicy_vcpu_ratio=None,
1335 ipolicy_spindle_ratio=1.3,
1338 self.assertEqual(pol2, exp_pol2)
def _TestInvalidISpecs(self, minmax_ispecs, std_ispecs, fail=True):
  """Call C{cli.CreateIPolicyFromOpts} with the given specs.

  The call is made once with C{fill_all=False} and once with
  C{fill_all=True}.

  @param minmax_ispecs: value passed as C{minmax_ispecs}
  @param std_ispecs: value passed as C{std_ispecs}
  @param fail: if true, each call must raise one of C{OpPrereqError},
    C{UnitParseError} or C{TypeEnforcementError}; otherwise the call is
    simply made and any exception propagates

  """
  for fill_all in [False, True]:
    if fail:
      self.assertRaises((errors.OpPrereqError,
                         errors.UnitParseError,
                         errors.TypeEnforcementError),
                        cli.CreateIPolicyFromOpts,
                        minmax_ispecs=minmax_ispecs,
                        std_ispecs=std_ispecs,
                        fill_all=fill_all)
    else:
      cli.CreateIPolicyFromOpts(minmax_ispecs=minmax_ispecs,
                                std_ispecs=std_ispecs,
                                fill_all=fill_all)
1355 def testInvalidPolicies(self):
1356 self.assertRaises(AssertionError, cli.CreateIPolicyFromOpts,
1357 std_ispecs={constants.ISPEC_MEM_SIZE: 1024},
1358 ipolicy_disk_templates=None, ipolicy_vcpu_ratio=None,
1359 ipolicy_spindle_ratio=None, group_ipolicy=True)
1360 self.assertRaises(errors.OpPrereqError, cli.CreateIPolicyFromOpts,
1361 ispecs_mem_size={"wrong": "x"}, ispecs_cpu_count={},
1362 ispecs_disk_count={}, ispecs_disk_size={},
1363 ispecs_nic_count={}, ipolicy_disk_templates=None,
1364 ipolicy_vcpu_ratio=None, ipolicy_spindle_ratio=None,
1366 self.assertRaises(errors.TypeEnforcementError, cli.CreateIPolicyFromOpts,
1367 ispecs_mem_size={}, ispecs_cpu_count={"min": "default"},
1368 ispecs_disk_count={}, ispecs_disk_size={},
1369 ispecs_nic_count={}, ipolicy_disk_templates=None,
1370 ipolicy_vcpu_ratio=None, ipolicy_spindle_ratio=None,
1373 good_mmspecs = constants.ISPECS_MINMAX_DEFAULTS
1374 self._TestInvalidISpecs(good_mmspecs, None, fail=False)
1375 broken_mmspecs = copy.deepcopy(good_mmspecs)
1376 for key in constants.ISPECS_MINMAX_KEYS:
1377 for par in constants.ISPECS_PARAMETERS:
1378 old = broken_mmspecs[key][par]
1379 del broken_mmspecs[key][par]
1380 self._TestInvalidISpecs(broken_mmspecs, None)
1381 broken_mmspecs[key][par] = "invalid"
1382 self._TestInvalidISpecs(broken_mmspecs, None)
1383 broken_mmspecs[key][par] = old
1384 broken_mmspecs[key]["invalid_key"] = None
1385 self._TestInvalidISpecs(broken_mmspecs, None)
1386 del broken_mmspecs[key]["invalid_key"]
1387 assert broken_mmspecs == good_mmspecs
1389 good_stdspecs = constants.IPOLICY_DEFAULTS[constants.ISPECS_STD]
1390 self._TestInvalidISpecs(None, good_stdspecs, fail=False)
1391 broken_stdspecs = copy.deepcopy(good_stdspecs)
1392 for par in constants.ISPECS_PARAMETERS:
1393 old = broken_stdspecs[par]
1394 broken_stdspecs[par] = "invalid"
1395 self._TestInvalidISpecs(None, broken_stdspecs)
1396 broken_stdspecs[par] = old
1397 broken_stdspecs["invalid_key"] = None
1398 self._TestInvalidISpecs(None, broken_stdspecs)
1399 del broken_stdspecs["invalid_key"]
1400 assert broken_stdspecs == good_stdspecs
1402 def testAllowedValues(self):
1405 constants.ISPECS_MINMAX: allowedv,
1406 constants.IPOLICY_DTS: allowedv,
1407 constants.IPOLICY_VCPU_RATIO: allowedv,
1408 constants.IPOLICY_SPINDLE_RATIO: allowedv,
1410 pol1 = cli.CreateIPolicyFromOpts(minmax_ispecs={allowedv: {}},
1412 ipolicy_disk_templates=allowedv,
1413 ipolicy_vcpu_ratio=allowedv,
1414 ipolicy_spindle_ratio=allowedv,
1415 allowed_values=[allowedv])
1416 self.assertEqual(pol1, exp_pol1)
1419 def _ConvertSpecToStrings(spec):
1421 for (par, val) in spec.items():
def _CheckNewStyleSpecsCall(self, exp_ipolicy, minmax_ispecs, std_ispecs,
                            group_ipolicy, fill_all):
  """Build a policy from min/max and std specs and compare it.

  @param exp_ipolicy: policy that C{cli.CreateIPolicyFromOpts} must return
  @param minmax_ispecs: value passed as C{minmax_ispecs}
  @param std_ispecs: value passed as C{std_ispecs}
  @param group_ipolicy: value passed as C{group_ipolicy}
  @param fill_all: value passed as C{fill_all}

  """
  ipolicy = cli.CreateIPolicyFromOpts(minmax_ispecs=minmax_ispecs,
                                      std_ispecs=std_ispecs,
                                      group_ipolicy=group_ipolicy,
                                      fill_all=fill_all)
  self.assertEqual(ipolicy, exp_ipolicy)
1433 def _TestFullISpecsInner(self, skel_exp_ipol, exp_minmax, exp_std,
1434 group_ipolicy, fill_all):
1435 exp_ipol = skel_exp_ipol.copy()
1436 if exp_minmax is not None:
1438 for (key, spec) in exp_minmax.items():
1439 minmax_ispecs[key] = self._ConvertSpecToStrings(spec)
1440 exp_ipol[constants.ISPECS_MINMAX] = exp_minmax
1442 minmax_ispecs = None
1443 if exp_std is not None:
1444 std_ispecs = self._ConvertSpecToStrings(exp_std)
1445 exp_ipol[constants.ISPECS_STD] = exp_std
1449 self._CheckNewStyleSpecsCall(exp_ipol, minmax_ispecs, std_ispecs,
1450 group_ipolicy, fill_all)
1452 for (key, spec) in minmax_ispecs.items():
1453 for par in [constants.ISPEC_MEM_SIZE, constants.ISPEC_DISK_SIZE]:
1456 self._CheckNewStyleSpecsCall(exp_ipol, minmax_ispecs, std_ispecs,
1457 group_ipolicy, fill_all)
1459 for par in [constants.ISPEC_MEM_SIZE, constants.ISPEC_DISK_SIZE]:
1460 if par in std_ispecs:
1461 std_ispecs[par] += "m"
1462 self._CheckNewStyleSpecsCall(exp_ipol, minmax_ispecs, std_ispecs,
1463 group_ipolicy, fill_all)
1465 def testFullISpecs(self):
1467 constants.ISPECS_MIN: {
1468 constants.ISPEC_MEM_SIZE: 512,
1469 constants.ISPEC_CPU_COUNT: 2,
1470 constants.ISPEC_DISK_COUNT: 2,
1471 constants.ISPEC_DISK_SIZE: 512,
1472 constants.ISPEC_NIC_COUNT: 2,
1473 constants.ISPEC_SPINDLE_USE: 2,
1475 constants.ISPECS_MAX: {
1476 constants.ISPEC_MEM_SIZE: 768*1024,
1477 constants.ISPEC_CPU_COUNT: 7,
1478 constants.ISPEC_DISK_COUNT: 6,
1479 constants.ISPEC_DISK_SIZE: 2048*1024,
1480 constants.ISPEC_NIC_COUNT: 3,
1481 constants.ISPEC_SPINDLE_USE: 1,
1485 constants.ISPEC_MEM_SIZE: 768*1024,
1486 constants.ISPEC_CPU_COUNT: 7,
1487 constants.ISPEC_DISK_COUNT: 6,
1488 constants.ISPEC_DISK_SIZE: 2048*1024,
1489 constants.ISPEC_NIC_COUNT: 3,
1490 constants.ISPEC_SPINDLE_USE: 1,
1492 for fill_all in [False, True]:
1494 skel_ipolicy = constants.IPOLICY_DEFAULTS
1497 self._TestFullISpecsInner(skel_ipolicy, exp_minmax1, exp_std1,
1499 self._TestFullISpecsInner(skel_ipolicy, None, exp_std1,
1501 self._TestFullISpecsInner(skel_ipolicy, exp_minmax1, None,
1505 class TestPrintIPolicyCommand(unittest.TestCase):
1506 """Test case for cli.PrintIPolicyCommand"""
1511 _SPECS1_STR = "par1=42,par2=xyz"
1514 "another_param": 101,
1516 _SPECS2_STR = "another_param=101,param=10"
def _CheckPrintIPolicyCommand(self, ipolicy, isgroup, expected):
  """Run C{cli.PrintIPolicyCommand} and compare what it wrote.

  The command text is written into an in-memory buffer, whose whole content
  must equal C{expected}.

  @param ipolicy: policy passed to C{cli.PrintIPolicyCommand}
  @param isgroup: flag passed to C{cli.PrintIPolicyCommand}
  @param expected: exact text that has to be written

  """
  buf = StringIO()
  cli.PrintIPolicyCommand(buf, ipolicy, isgroup)
  self.assertEqual(buf.getvalue(), expected)
def testIgnoreStdForGroup(self):
  """Nothing is printed for a group policy holding only "std" specs"""
  std_only_policy = {"std": self._SPECS1}
  self._CheckPrintIPolicyCommand(std_only_policy, True, "")
1526 def testIgnoreEmpty(self):
1536 "min": self._SPECS1,
1540 for pol in policies:
1541 self._CheckPrintIPolicyCommand(pol, False, "")
1543 def testFullPolicies(self):
1545 ({"std": self._SPECS1},
1546 " %s %s" % (cli.IPOLICY_STD_SPECS_STR, self._SPECS1_STR)),
1548 "min": self._SPECS1,
1549 "max": self._SPECS2,
1551 " %s min:%s/max:%s" % (cli.IPOLICY_BOUNDS_SPECS_STR,
1552 self._SPECS1_STR, self._SPECS2_STR)),
1554 for (pol, exp) in cases:
1555 self._CheckPrintIPolicyCommand(pol, False, exp)
# Only start the tests when this file is executed directly, not on import
if __name__ == "__main__":
  testutils.GanetiTestProgram()