4 # Copyright (C) 2008, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the cmdlib module"""
31 from ganeti import constants
32 from ganeti import mcpu
33 from ganeti import cmdlib
34 from ganeti import opcodes
35 from ganeti import errors
36 from ganeti import utils
37 from ganeti import luxi
39 from ganeti import objects
45 class TestCertVerification(testutils.GanetiTestCase):
47 testutils.GanetiTestCase.setUp(self)
49 self.tmpdir = tempfile.mkdtemp()
52 shutil.rmtree(self.tmpdir)
54 def testVerifyCertificate(self):
55 cmdlib._VerifyCertificate(self._TestDataFilename("cert1.pem"))
57 nonexist_filename = os.path.join(self.tmpdir, "does-not-exist")
59 (errcode, msg) = cmdlib._VerifyCertificate(nonexist_filename)
60 self.assertEqual(errcode, cmdlib.LUClusterVerifyConfig.ETYPE_ERROR)
62 # Try to load non-certificate file
63 invalid_cert = self._TestDataFilename("bdev-net.txt")
64 (errcode, msg) = cmdlib._VerifyCertificate(invalid_cert)
65 self.assertEqual(errcode, cmdlib.LUClusterVerifyConfig.ETYPE_ERROR)
68 class TestOpcodeParams(testutils.GanetiTestCase):
69 def testParamsStructures(self):
70 for op in sorted(mcpu.Processor.DISPATCH_TABLE):
71 lu = mcpu.Processor.DISPATCH_TABLE[op]
73 self.failIf(hasattr(lu, "_OP_REQP"),
74 msg=("LU '%s' has old-style _OP_REQP" % lu_name))
75 self.failIf(hasattr(lu, "_OP_DEFS"),
76 msg=("LU '%s' has old-style _OP_DEFS" % lu_name))
77 self.failIf(hasattr(lu, "_OP_PARAMS"),
78 msg=("LU '%s' has old-style _OP_PARAMS" % lu_name))
81 class TestIAllocatorChecks(testutils.GanetiTestCase):
82 def testFunction(self):
84 def __init__(self, opcode):
85 self.cfg = mocks.FakeConfig()
88 class OpTest(opcodes.OpCode):
90 ("iallocator", None, ht.NoType, None),
91 ("node", None, ht.NoType, None),
94 default_iallocator = mocks.FakeConfig().GetDefaultIAllocator()
95 other_iallocator = default_iallocator + "_not"
100 c_i = lambda: cmdlib._CheckIAllocatorOrNode(lu, "iallocator", "node")
102 # Neither node nor iallocator given
106 self.assertEqual(lu.op.iallocator, default_iallocator)
107 self.assertEqual(lu.op.node, None)
109 # Both, iallocator and node given
110 op.iallocator = "test"
112 self.assertRaises(errors.OpPrereqError, c_i)
114 # Only iallocator given
115 op.iallocator = other_iallocator
118 self.assertEqual(lu.op.iallocator, other_iallocator)
119 self.assertEqual(lu.op.node, None)
125 self.assertEqual(lu.op.iallocator, None)
126 self.assertEqual(lu.op.node, "node")
128 # No node, iallocator or default iallocator
131 lu.cfg.GetDefaultIAllocator = lambda: None
132 self.assertRaises(errors.OpPrereqError, c_i)
135 class TestLUTestJqueue(unittest.TestCase):
137 self.assert_(cmdlib.LUTestJqueue._CLIENT_CONNECT_TIMEOUT <
138 (luxi.WFJC_TIMEOUT * 0.75),
139 msg=("Client timeout too high, might not notice bugs"
140 " in WaitForJobChange"))
143 class TestLUQuery(unittest.TestCase):
145 self.assertEqual(sorted(cmdlib._QUERY_IMPL.keys()),
146 sorted(constants.QR_VIA_OP))
148 assert constants.QR_NODE in constants.QR_VIA_OP
149 assert constants.QR_INSTANCE in constants.QR_VIA_OP
151 for i in constants.QR_VIA_OP:
152 self.assert_(cmdlib._GetQueryImplementation(i))
154 self.assertRaises(errors.OpPrereqError, cmdlib._GetQueryImplementation, "")
155 self.assertRaises(errors.OpPrereqError, cmdlib._GetQueryImplementation,
159 class TestLUGroupAssignNodes(unittest.TestCase):
161 def testCheckAssignmentForSplitInstances(self):
162 node_data = dict((name, objects.Node(name=name, group=group))
163 for (name, group) in [("n1a", "g1"), ("n1b", "g1"),
164 ("n2a", "g2"), ("n2b", "g2"),
165 ("n3a", "g3"), ("n3b", "g3"),
169 def Instance(name, pnode, snode):
172 disk_template = constants.DT_DISKLESS
174 disks = [objects.Disk(dev_type=constants.LD_DRBD8,
175 logical_id=[pnode, snode, 1, 17, 17])]
176 disk_template = constants.DT_DRBD8
178 return objects.Instance(name=name, primary_node=pnode, disks=disks,
179 disk_template=disk_template)
181 instance_data = dict((name, Instance(name, pnode, snode))
182 for name, pnode, snode in [("inst1a", "n1a", "n1b"),
183 ("inst1b", "n1b", "n1a"),
184 ("inst2a", "n2a", "n2b"),
185 ("inst3a", "n3a", None),
186 ("inst3b", "n3b", "n1b"),
187 ("inst3c", "n3b", "n2b"),
190 # Test first with the existing state.
192 cmdlib.LUGroupAssignNodes.CheckAssignmentForSplitInstances([],
196 self.assertEqual([], new)
197 self.assertEqual(set(["inst3b", "inst3c"]), set(prev))
199 # And now some changes.
201 cmdlib.LUGroupAssignNodes.CheckAssignmentForSplitInstances([("n1b",
206 self.assertEqual(set(["inst1a", "inst1b"]), set(new))
207 self.assertEqual(set(["inst3c"]), set(prev))
210 if __name__ == "__main__":
211 testutils.GanetiTestProgram()