X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/0a9a0e5a6641c44e13b383e6425a129c0b8cb4cb..796b51522c2cf68ea6b6a2944a1a2373d51d72cb:/test/ganeti.utils.algo_unittest.py diff --git a/test/ganeti.utils.algo_unittest.py b/test/ganeti.utils.algo_unittest.py index 96b4ff5..73475ec 100755 --- a/test/ganeti.utils.algo_unittest.py +++ b/test/ganeti.utils.algo_unittest.py @@ -1,7 +1,7 @@ #!/usr/bin/python # -# Copyright (C) 2011 Google Inc. +# Copyright (C) 2011, 2012 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -26,6 +26,7 @@ import random import operator from ganeti import constants +from ganeti import compat from ganeti.utils import algo import testutils @@ -236,6 +237,16 @@ class TestInvertDict(unittest.TestCase): { 1: "foo", 2: "bar", 5: "baz"}) +class TestInsertAtPos(unittest.TestCase): + def test(self): + a = [1, 5, 6] + b = [2, 3, 4] + self.assertEqual(algo.InsertAtPos(a, 1, b), [1, 2, 3, 4, 5, 6]) + self.assertEqual(algo.InsertAtPos(a, 0, b), b + a) + self.assertEqual(algo.InsertAtPos(a, len(a), b), a + b) + self.assertEqual(algo.InsertAtPos(a, 2, b), [1, 5, 2, 3, 4, 6]) + + class TimeMock: def __init__(self, values): self.values = values @@ -272,5 +283,90 @@ class TestRunningTimeout(unittest.TestCase): self.assertRaises(ValueError, algo.RunningTimeout, -1.0, True) +class TestJoinDisjointDicts(unittest.TestCase): + def setUp(self): + self.non_empty_dict = {"a": 1, "b": 2} + self.empty_dict = {} + + def testWithEmptyDicts(self): + self.assertEqual(self.empty_dict, algo.JoinDisjointDicts(self.empty_dict, + self.empty_dict)) + self.assertEqual(self.non_empty_dict, algo.JoinDisjointDicts( + self.empty_dict, self.non_empty_dict)) + self.assertEqual(self.non_empty_dict, algo.JoinDisjointDicts( + self.non_empty_dict, self.empty_dict)) + + def testNonDisjoint(self): + self.assertRaises(AssertionError, algo.JoinDisjointDicts, + self.non_empty_dict, self.non_empty_dict) + + def testCommonCase(self): + dict_a = {"TEST1": 1, "TEST2": 2} + dict_b = {"TEST3": 3, "TEST4": 4} + + result = dict_a.copy() + result.update(dict_b) + + self.assertEqual(result, algo.JoinDisjointDicts(dict_a, dict_b)) + self.assertEqual(result, algo.JoinDisjointDicts(dict_b, dict_a)) + + +class TestSequenceToDict(unittest.TestCase): + def testEmpty(self): + self.assertEqual(algo.SequenceToDict([]), {}) + self.assertEqual(algo.SequenceToDict({}), {}) + + def testSimple(self): + data = [(i, str(i), "test%s" % i) for i in range(391)] + self.assertEqual(algo.SequenceToDict(data), + dict((i, (i, str(i), "test%s" % i)) + for i in range(391))) + + def testCustomKey(self): + data = [(i, hex(i), "test%s" % i) for i in range(100)] + self.assertEqual(algo.SequenceToDict(data, key=compat.snd), + dict((hex(i), (i, hex(i), "test%s" % i)) + for i in range(100))) + self.assertEqual(algo.SequenceToDict(data, + key=lambda (a, b, val): hash(val)), + dict((hash("test%s" % i), (i, hex(i), "test%s" % i)) + for i in range(100))) + + def testDuplicate(self): + self.assertRaises(ValueError, algo.SequenceToDict, + [(0, 0), (0, 0)]) + self.assertRaises(ValueError, algo.SequenceToDict, + [(i, ) for i in range(200)] + [(10, )]) + + +class TestFlatToDict(unittest.TestCase): + def testNormal(self): + data = [ + ("lv/xenvg", {"foo": "bar", "bar": "baz"}), + ("lv/xenfoo", {"foo": "bar", "baz": "blubb"}), + ("san/foo", {"ip": "127.0.0.1", "port": 1337}), + ("san/blubb/blibb", 54), + ] + reference = { + "lv": { + "xenvg": {"foo": "bar", "bar": "baz"}, + "xenfoo": {"foo": "bar", "baz": "blubb"}, + }, + "san": { + "foo": {"ip": "127.0.0.1", "port": 1337}, + "blubb": {"blibb": 54}, + }, + } + self.assertEqual(algo.FlatToDict(data), reference) + + def testUnlikeDepth(self): + data = [ + ("san/foo", {"ip": "127.0.0.1", "port": 1337}), + ("san/foo/blubb", 23), # Another foo entry under san + ("san/blubb/blibb", 54), + ] + self.assertRaises(AssertionError, algo.FlatToDict, data) + + if __name__ == "__main__": testutils.GanetiTestProgram()