X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/f2c6673df4ecedbbaeb67a5185e7fecef34d66dd..48aaca91efa214b37dba94f28582be73f3c90dbd:/test/ganeti.cli_unittest.py diff --git a/test/ganeti.cli_unittest.py b/test/ganeti.cli_unittest.py index a587a17..5cb4869 100755 --- a/test/ganeti.cli_unittest.py +++ b/test/ganeti.cli_unittest.py @@ -22,6 +22,7 @@ """Script for unittesting the cli module""" import unittest +import time from cStringIO import StringIO import ganeti @@ -32,6 +33,7 @@ from ganeti import cli from ganeti import errors from ganeti import utils from ganeti import objects +from ganeti import qlang from ganeti.errors import OpPrereqError, ParameterError @@ -41,14 +43,14 @@ class TestParseTimespec(unittest.TestCase): def testValidTimes(self): """Test valid timespecs""" test_data = [ - ('1s', 1), - ('1', 1), - ('1m', 60), - ('1h', 60 * 60), - ('1d', 60 * 60 * 24), - ('1w', 60 * 60 * 24 * 7), - ('4h', 4 * 60 * 60), - ('61m', 61 * 60), + ("1s", 1), + ("1", 1), + ("1m", 60), + ("1h", 60 * 60), + ("1d", 60 * 60 * 24), + ("1w", 60 * 60 * 24 * 7), + ("4h", 4 * 60 * 60), + ("61m", 61 * 60), ] for value, expected_result in test_data: self.failUnlessEqual(cli.ParseTimespec(value), expected_result) @@ -56,10 +58,10 @@ class TestParseTimespec(unittest.TestCase): def testInvalidTime(self): """Test invalid timespecs""" test_data = [ - '1y', - '', - 'aaa', - 's', + "1y", + "", + "aaa", + "s", ] for value in test_data: self.failUnlessRaises(OpPrereqError, cli.ParseTimespec, value) @@ -84,6 +86,7 @@ class TestSplitKeyVal(unittest.TestCase): """Test how we handle splitting an empty string""" self.failUnlessEqual(cli._SplitKeyVal("option", ""), {}) + class TestIdentKeyVal(unittest.TestCase): """Testing case for cli.check_ident_key_val""" @@ -101,6 +104,17 @@ class TestIdentKeyVal(unittest.TestCase): self.assertEqual(cikv("-foo"), ("foo", None)) self.assertRaises(ParameterError, cikv, "-foo:a=c") + # Check negative numbers + self.assertEqual(cikv("-1:remove"), ("-1", { + "remove": True, + })) + self.assertEqual(cikv("-29447:add,size=4G"), ("-29447", { + "add": True, + "size": "4G", + })) + for i in ["-:", "-"]: + self.assertEqual(cikv(i), ("", None)) + class TestToStream(unittest.TestCase): """Test the ToStream functions""" @@ -114,7 +128,7 @@ class TestToStream(unittest.TestCase): ]: buf = StringIO() cli._ToStream(buf, data) - self.failUnlessEqual(buf.getvalue(), data+'\n') + self.failUnlessEqual(buf.getvalue(), data + "\n") def testParams(self): buf = StringIO() @@ -753,5 +767,248 @@ class TestFormatResultError(unittest.TestCase): self.assertTrue(result.endswith(")")) -if __name__ == '__main__': +class TestGetOnlineNodes(unittest.TestCase): + class _FakeClient: + def __init__(self): + self._query = [] + + def AddQueryResult(self, *args): + self._query.append(args) + + def CountPending(self): + return len(self._query) + + def Query(self, res, fields, qfilter): + if res != constants.QR_NODE: + raise Exception("Querying wrong resource") + + (exp_fields, check_filter, result) = self._query.pop(0) + + if exp_fields != fields: + raise Exception("Expected fields %s, got %s" % (exp_fields, fields)) + + if not (qfilter is None or check_filter(qfilter)): + raise Exception("Filter doesn't match expectations") + + return objects.QueryResponse(fields=None, data=result) + + def testEmpty(self): + cl = self._FakeClient() + + cl.AddQueryResult(["name", "offline", "sip"], None, []) + self.assertEqual(cli.GetOnlineNodes(None, cl=cl), []) + self.assertEqual(cl.CountPending(), 0) + + def testNoSpecialFilter(self): + cl = self._FakeClient() + + cl.AddQueryResult(["name", "offline", "sip"], None, [ + [(constants.RS_NORMAL, "master.example.com"), + (constants.RS_NORMAL, False), + (constants.RS_NORMAL, "192.0.2.1")], + [(constants.RS_NORMAL, "node2.example.com"), + (constants.RS_NORMAL, False), + (constants.RS_NORMAL, "192.0.2.2")], + ]) + self.assertEqual(cli.GetOnlineNodes(None, cl=cl), + ["master.example.com", "node2.example.com"]) + self.assertEqual(cl.CountPending(), 0) + + def testNoMaster(self): + cl = self._FakeClient() + + def _CheckFilter(qfilter): + self.assertEqual(qfilter, [qlang.OP_NOT, [qlang.OP_TRUE, "master"]]) + return True + + cl.AddQueryResult(["name", "offline", "sip"], _CheckFilter, [ + [(constants.RS_NORMAL, "node2.example.com"), + (constants.RS_NORMAL, False), + (constants.RS_NORMAL, "192.0.2.2")], + ]) + self.assertEqual(cli.GetOnlineNodes(None, cl=cl, filter_master=True), + ["node2.example.com"]) + self.assertEqual(cl.CountPending(), 0) + + def testSecondaryIpAddress(self): + cl = self._FakeClient() + + cl.AddQueryResult(["name", "offline", "sip"], None, [ + [(constants.RS_NORMAL, "master.example.com"), + (constants.RS_NORMAL, False), + (constants.RS_NORMAL, "192.0.2.1")], + [(constants.RS_NORMAL, "node2.example.com"), + (constants.RS_NORMAL, False), + (constants.RS_NORMAL, "192.0.2.2")], + ]) + self.assertEqual(cli.GetOnlineNodes(None, cl=cl, secondary_ips=True), + ["192.0.2.1", "192.0.2.2"]) + self.assertEqual(cl.CountPending(), 0) + + def testNoMasterFilterNodeName(self): + cl = self._FakeClient() + + def _CheckFilter(qfilter): + self.assertEqual(qfilter, + [qlang.OP_AND, + [qlang.OP_OR] + [[qlang.OP_EQUAL, "name", name] + for name in ["node2", "node3"]], + [qlang.OP_NOT, [qlang.OP_TRUE, "master"]]]) + return True + + cl.AddQueryResult(["name", "offline", "sip"], _CheckFilter, [ + [(constants.RS_NORMAL, "node2.example.com"), + (constants.RS_NORMAL, False), + (constants.RS_NORMAL, "192.0.2.12")], + [(constants.RS_NORMAL, "node3.example.com"), + (constants.RS_NORMAL, False), + (constants.RS_NORMAL, "192.0.2.13")], + ]) + self.assertEqual(cli.GetOnlineNodes(["node2", "node3"], cl=cl, + secondary_ips=True, filter_master=True), + ["192.0.2.12", "192.0.2.13"]) + self.assertEqual(cl.CountPending(), 0) + + def testOfflineNodes(self): + cl = self._FakeClient() + + cl.AddQueryResult(["name", "offline", "sip"], None, [ + [(constants.RS_NORMAL, "master.example.com"), + (constants.RS_NORMAL, False), + (constants.RS_NORMAL, "192.0.2.1")], + [(constants.RS_NORMAL, "node2.example.com"), + (constants.RS_NORMAL, True), + (constants.RS_NORMAL, "192.0.2.2")], + [(constants.RS_NORMAL, "node3.example.com"), + (constants.RS_NORMAL, True), + (constants.RS_NORMAL, "192.0.2.3")], + ]) + self.assertEqual(cli.GetOnlineNodes(None, cl=cl, nowarn=True), + ["master.example.com"]) + self.assertEqual(cl.CountPending(), 0) + + def testNodeGroup(self): + cl = self._FakeClient() + + def _CheckFilter(qfilter): + self.assertEqual(qfilter, + [qlang.OP_OR, [qlang.OP_EQUAL, "group", "foobar"], + [qlang.OP_EQUAL, "group.uuid", "foobar"]]) + return True + + cl.AddQueryResult(["name", "offline", "sip"], _CheckFilter, [ + [(constants.RS_NORMAL, "master.example.com"), + (constants.RS_NORMAL, False), + (constants.RS_NORMAL, "192.0.2.1")], + [(constants.RS_NORMAL, "node3.example.com"), + (constants.RS_NORMAL, False), + (constants.RS_NORMAL, "192.0.2.3")], + ]) + self.assertEqual(cli.GetOnlineNodes(None, cl=cl, nodegroup="foobar"), + ["master.example.com", "node3.example.com"]) + self.assertEqual(cl.CountPending(), 0) + + +class TestFormatTimestamp(unittest.TestCase): + def testGood(self): + self.assertEqual(cli.FormatTimestamp((0, 1)), + time.strftime("%F %T", time.localtime(0)) + ".000001") + self.assertEqual(cli.FormatTimestamp((1332944009, 17376)), + (time.strftime("%F %T", time.localtime(1332944009)) + + ".017376")) + + def testWrong(self): + for i in [0, [], {}, "", [1]]: + self.assertEqual(cli.FormatTimestamp(i), "?") + + +class TestFormatUsage(unittest.TestCase): + def test(self): + binary = "gnt-unittest" + commands = { + "cmdA": + (NotImplemented, NotImplemented, NotImplemented, NotImplemented, + "description of A"), + "bbb": + (NotImplemented, NotImplemented, NotImplemented, NotImplemented, + "Hello World," * 10), + "longname": + (NotImplemented, NotImplemented, NotImplemented, NotImplemented, + "Another description"), + } + + self.assertEqual(list(cli._FormatUsage(binary, commands)), [ + "Usage: gnt-unittest {command} [options...] [argument...]", + "gnt-unittest --help to see details, or man gnt-unittest", + "", + "Commands:", + (" bbb - Hello World,Hello World,Hello World,Hello World,Hello" + " World,Hello"), + " World,Hello World,Hello World,Hello World,Hello World,", + " cmdA - description of A", + " longname - Another description", + "", + ]) + + +class TestParseArgs(unittest.TestCase): + def testNoArguments(self): + for argv in [[], ["gnt-unittest"]]: + try: + cli._ParseArgs("gnt-unittest", argv, {}, {}, set()) + except cli._ShowUsage, err: + self.assertTrue(err.exit_error) + else: + self.fail("Did not raise exception") + + def testVersion(self): + for argv in [["test", "--version"], ["test", "--version", "somethingelse"]]: + try: + cli._ParseArgs("test", argv, {}, {}, set()) + except cli._ShowVersion: + pass + else: + self.fail("Did not raise exception") + + def testHelp(self): + for argv in [["test", "--help"], ["test", "--help", "somethingelse"]]: + try: + cli._ParseArgs("test", argv, {}, {}, set()) + except cli._ShowUsage, err: + self.assertFalse(err.exit_error) + else: + self.fail("Did not raise exception") + + def testUnknownCommandOrAlias(self): + for argv in [["test", "list"], ["test", "somethingelse", "--help"]]: + try: + cli._ParseArgs("test", argv, {}, {}, set()) + except cli._ShowUsage, err: + self.assertTrue(err.exit_error) + else: + self.fail("Did not raise exception") + + def testInvalidAliasList(self): + cmd = { + "list": NotImplemented, + "foo": NotImplemented, + } + aliases = { + "list": NotImplemented, + "foo": NotImplemented, + } + assert sorted(cmd.keys()) == sorted(aliases.keys()) + self.assertRaises(AssertionError, cli._ParseArgs, "test", + ["test", "list"], cmd, aliases, set()) + + def testAliasForNonExistantCommand(self): + cmd = {} + aliases = { + "list": NotImplemented, + } + self.assertRaises(errors.ProgrammerError, cli._ParseArgs, "test", + ["test", "list"], cmd, aliases, set()) + + +if __name__ == "__main__": testutils.GanetiTestProgram()