# Copyright (C) 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Script for unittesting the cmdlib module"""
31 from ganeti import constants
32 from ganeti import mcpu
33 from ganeti import cmdlib
34 from ganeti import opcodes
35 from ganeti import errors
36 from ganeti import utils
37 from ganeti import luxi
39 from ganeti import objects
class TestCertVerification(testutils.GanetiTestCase):
  """Tests for C{cmdlib._VerifyCertificate}.

  """
  def setUp(self):
    """Runs the base-class setup and creates a scratch directory.

    """
    testutils.GanetiTestCase.setUp(self)

    self.tmpdir = tempfile.mkdtemp()

  def tearDown(self):
    """Removes the scratch directory created in C{setUp}.

    """
    shutil.rmtree(self.tmpdir)

  def testVerifyCertificate(self):
    """Feeds a test certificate, a missing path and a non-certificate file.

    The missing path and the non-certificate file must both be reported
    with C{cmdlib.LUClusterVerify.ETYPE_ERROR}.

    """
    # The result for the test-data certificate is not inspected; this call
    # only has to complete without raising
    cmdlib._VerifyCertificate(self._TestDataFilename("cert1.pem"))

    # Nothing is ever created under this name in the scratch directory
    nonexist_filename = os.path.join(self.tmpdir, "does-not-exist")

    # Only the error code is checked, the message is ignored
    (errcode, _) = cmdlib._VerifyCertificate(nonexist_filename)
    self.assertEqual(errcode, cmdlib.LUClusterVerify.ETYPE_ERROR)

    # Try to load non-certificate file
    invalid_cert = self._TestDataFilename("bdev-net.txt")
    (errcode, _) = cmdlib._VerifyCertificate(invalid_cert)
    self.assertEqual(errcode, cmdlib.LUClusterVerify.ETYPE_ERROR)
class TestOpcodeParams(testutils.GanetiTestCase):
  """Checks the entries of C{mcpu.Processor.DISPATCH_TABLE}.

  """
  # Attributes that no dispatch-table entry may define any more
  _OLD_STYLE_ATTRS = ("_OP_REQP", "_OP_DEFS", "_OP_PARAMS")

  def testParamsStructures(self):
    """Fails if any dispatch-table entry still has an old-style attribute.

    """
    for op in sorted(mcpu.Processor.DISPATCH_TABLE):
      lu = mcpu.Processor.DISPATCH_TABLE[op]
      # NOTE(review): assumes the table values are classes, so that
      # "__name__" is available for the failure message -- confirm
      lu_name = lu.__name__

      for attr in self._OLD_STYLE_ATTRS:
        self.assertFalse(hasattr(lu, attr),
                         msg=("LU '%s' has old-style %s" % (lu_name, attr)))
class TestIAllocatorChecks(testutils.GanetiTestCase):
  """Tests for C{cmdlib._CheckIAllocatorOrNode}.

  """
  def testFunction(self):
    """Runs the check for each combination of iallocator and node.

    The opcode is modified in place before each call, and the resulting
    slot values (or the raised C{errors.OpPrereqError}) are verified.

    """
    class TestLU(object):
      """Stand-in logical unit providing only C{cfg} and C{op}.

      """
      def __init__(self, opcode):
        self.cfg = mocks.FakeConfig()
        self.op = opcode

    class TestOpcode(opcodes.OpCode):
      """Opcode declaring just the two parameters under test.

      """
      # NOTE(review): identifier value is an assumption -- confirm
      OP_ID = "OP_TEST"
      OP_PARAMS = [
        ("iallocator", None, ht.NoType),
        ("node", None, ht.NoType),
        ]

    default_iallocator = mocks.FakeConfig().GetDefaultIAllocator()
    # Guaranteed to differ from the configured default
    other_iallocator = default_iallocator + "_not"

    op = TestOpcode()
    lu = TestLU(op)

    def _Check():
      """Invokes the function under test on the shared LU.

      """
      return cmdlib._CheckIAllocatorOrNode(lu, "iallocator", "node")

    # Neither node nor iallocator given
    op.iallocator = None
    op.node = None
    _Check()
    self.assertEqual(lu.op.iallocator, default_iallocator)
    self.assertEqual(lu.op.node, None)

    # Both, iallocator and node given
    op.iallocator = "test"
    op.node = "test"
    self.assertRaises(errors.OpPrereqError, _Check)

    # Only iallocator given
    op.iallocator = other_iallocator
    op.node = None
    _Check()
    self.assertEqual(lu.op.iallocator, other_iallocator)
    self.assertEqual(lu.op.node, None)

    # Only node given
    op.iallocator = None
    op.node = "node"
    _Check()
    self.assertEqual(lu.op.iallocator, None)
    self.assertEqual(lu.op.node, "node")

    # No node, iallocator or default iallocator
    op.iallocator = None
    op.node = None
    # Make the fake configuration report that no default is configured
    lu.cfg.GetDefaultIAllocator = lambda: None
    self.assertRaises(errors.OpPrereqError, _Check)
class TestLUTestJobqueue(unittest.TestCase):
  """Sanity check for the timeout constant of C{cmdlib.LUTestJobqueue}.

  """
  def test(self):
    """Requires the client timeout to stay below 75% of C{luxi.WFJC_TIMEOUT}.

    """
    self.assertTrue(cmdlib.LUTestJobqueue._CLIENT_CONNECT_TIMEOUT <
                    (luxi.WFJC_TIMEOUT * 0.75),
                    msg=("Client timeout too high, might not notice bugs"
                         " in WaitForJobChange"))
class TestLUQuery(unittest.TestCase):
  """Tests the query implementation lookup in C{cmdlib}.

  """
  def test(self):
    """Compares C{cmdlib._QUERY_IMPL} against C{constants.QR_OP_QUERY}.

    Every resource listed in C{constants.QR_OP_QUERY} must resolve through
    C{cmdlib._GetQueryImplementation}; unknown names must raise
    C{errors.OpPrereqError}.

    """
    self.assertEqual(sorted(cmdlib._QUERY_IMPL.keys()),
                     sorted(constants.QR_OP_QUERY))

    # Checked via the test case (not a bare "assert") so the checks are
    # not stripped when running with "python -O"
    self.assertTrue(constants.QR_NODE in constants.QR_OP_QUERY)
    self.assertTrue(constants.QR_INSTANCE in constants.QR_OP_QUERY)

    for i in constants.QR_OP_QUERY:
      self.assertTrue(cmdlib._GetQueryImplementation(i))

    self.assertRaises(errors.OpPrereqError, cmdlib._GetQueryImplementation, "")
    # NOTE(review): any name that is not a known query resource will do
    # here; "xyz" is assumed to be one -- confirm
    self.assertRaises(errors.OpPrereqError, cmdlib._GetQueryImplementation,
                      "xyz")
class TestLUGroupAssignNodes(unittest.TestCase):
  """Tests C{cmdlib.LUGroupAssignNodes.CheckAssignmentForSplitInstances}.

  """
  def testCheckAssignmentForSplitInstances(self):
    """Checks the reported instances for no change and for one node move.

    The fixture has six nodes in three groups and six instances. Only
    "inst3b" and "inst3c" start with their two nodes in different groups.

    """
    node_data = dict((name, objects.Node(name=name, group=group))
                     for (name, group) in [("n1a", "g1"), ("n1b", "g1"),
                                           ("n2a", "g2"), ("n2b", "g2"),
                                           ("n3a", "g3"), ("n3b", "g3"),
                                           ])

    def Instance(name, pnode, snode):
      """Builds a diskless instance, or a DRBD8 one if C{snode} is given.

      """
      if snode is None:
        disks = []
        disk_template = constants.DT_DISKLESS
      else:
        # The logical ID starts with the two node names
        disks = [objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=[pnode, snode, 1, 17, 17])]
        disk_template = constants.DT_DRBD8

      return objects.Instance(name=name, primary_node=pnode, disks=disks,
                              disk_template=disk_template)

    instance_data = dict((name, Instance(name, pnode, snode))
                         for name, pnode, snode in [("inst1a", "n1a", "n1b"),
                                                    ("inst1b", "n1b", "n1a"),
                                                    ("inst2a", "n2a", "n2b"),
                                                    ("inst3a", "n3a", None),
                                                    ("inst3b", "n3b", "n1b"),
                                                    ("inst3c", "n3b", "n2b"),
                                                    ])

    check_fn = cmdlib.LUGroupAssignNodes.CheckAssignmentForSplitInstances

    # Test first with the existing state.
    (new, prev) = check_fn([], node_data, instance_data)

    self.assertEqual([], new)
    self.assertEqual(set(["inst3b", "inst3c"]), set(prev))

    # And now some changes.
    # Moving "n1b" into "g3" separates it from "n1a" (affecting "inst1a" and
    # "inst1b") and puts both nodes of "inst3b" into the same group
    (new, prev) = check_fn([("n1b", "g3")], node_data, instance_data)

    self.assertEqual(set(["inst1a", "inst1b"]), set(new))
    self.assertEqual(set(["inst3c"]), set(prev))
# Run the tests through the project's test program when executed as a script
if __name__ == "__main__":
  testutils.GanetiTestProgram()