Add unittest for cli.FormatResultError
[ganeti-local] / test / ganeti.cmdlib_unittest.py
index 1fcf43a..4404d80 100755 (executable)
@@ -1,7 +1,7 @@
 #!/usr/bin/python
 #
 
-# Copyright (C) 2008 Google Inc.
+# Copyright (C) 2008, 2011 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 import os
 import unittest
 import time
-import Queue
+import tempfile
+import shutil
 
+from ganeti import constants
+from ganeti import mcpu
 from ganeti import cmdlib
+from ganeti import opcodes
 from ganeti import errors
+from ganeti import utils
+from ganeti import luxi
+from ganeti import ht
+from ganeti import objects
 
+import testutils
+import mocks
 
-class FieldSetTestCase(unittest.TestCase):
-  """Test case for FieldSets"""
-
-  def testSimpleMatch(self):
-    f = cmdlib._FieldSet("a", "b", "c", "def")
-    self.failUnless(f.Matches("a"))
-    self.failIf(f.Matches("d"), "Substring matched")
-    self.failIf(f.Matches("defghi"), "Prefix string matched")
-    self.failIf(f.NonMatching(["b", "c"]))
-    self.failIf(f.NonMatching(["a", "b", "c", "def"]))
-    self.failUnless(f.NonMatching(["a", "d"]))
-
-  def testRegexMatch(self):
-    f = cmdlib._FieldSet("a", "b([0-9]+)", "c")
-    self.failUnless(f.Matches("b1"))
-    self.failUnless(f.Matches("b99"))
-    self.failIf(f.Matches("b/1"))
-    self.failIf(f.NonMatching(["b12", "c"]))
-    self.failUnless(f.NonMatching(["a", "1"]))
-
-if __name__ == '__main__':
-  unittest.main()
+
+class TestCertVerification(testutils.GanetiTestCase):
+  """Tests for cmdlib._VerifyCertificate."""
+
+  def setUp(self):
+    """Runs the base class setup and creates a scratch directory."""
+    testutils.GanetiTestCase.setUp(self)
+
+    self.tmpdir = tempfile.mkdtemp()
+
+  def tearDown(self):
+    """Removes the scratch directory and runs the base class cleanup."""
+    try:
+      shutil.rmtree(self.tmpdir)
+    finally:
+      # setUp chains to the base class, so tearDown has to do the same;
+      # otherwise whatever cleanup the base class performs is skipped. The
+      # "finally" makes sure it runs even if removing the directory fails.
+      testutils.GanetiTestCase.tearDown(self)
+
+  def testVerifyCertificate(self):
+    """Checks a certificate file, a missing file and a non-certificate."""
+    # The result for the certificate file is not inspected; this call only
+    # has to return without raising
+    cmdlib._VerifyCertificate(self._TestDataFilename("cert1.pem"))
+
+    # A path inside the fresh scratch directory that was never created
+    nonexist_filename = os.path.join(self.tmpdir, "does-not-exist")
+
+    (errcode, _) = cmdlib._VerifyCertificate(nonexist_filename)
+    self.assertEqual(errcode, cmdlib.LUClusterVerify.ETYPE_ERROR)
+
+    # Try to load non-certificate file
+    invalid_cert = self._TestDataFilename("bdev-net.txt")
+    (errcode, _) = cmdlib._VerifyCertificate(invalid_cert)
+    self.assertEqual(errcode, cmdlib.LUClusterVerify.ETYPE_ERROR)
+
+
+class TestOpcodeParams(testutils.GanetiTestCase):
+  """Guards against LUs that still carry old-style parameter attributes."""
+
+  def testParamsStructures(self):
+    """Checks every entry of mcpu.Processor.DISPATCH_TABLE in sorted order."""
+    dispatch = mcpu.Processor.DISPATCH_TABLE
+    old_style_attrs = ["_OP_REQP", "_OP_DEFS", "_OP_PARAMS"]
+
+    for key in sorted(dispatch):
+      lu_cls = dispatch[key]
+      lu_name = lu_cls.__name__
+
+      # None of the old-style attributes may be present on the LU
+      for attr in old_style_attrs:
+        self.failIf(hasattr(lu_cls, attr),
+                    msg=("LU '%s' has old-style %s" % (lu_name, attr)))
+
+
+class TestIAllocatorChecks(testutils.GanetiTestCase):
+  """Tests for cmdlib._CheckIAllocatorOrNode."""
+
+  def testFunction(self):
+    """Walks through the combinations of iallocator and node being set."""
+    class OpTest(opcodes.OpCode):
+      OP_PARAMS = [
+        ("iallocator", None, ht.NoType, None),
+        ("node", None, ht.NoType, None),
+        ]
+
+    class TestLU(object):
+      """Minimal stand-in offering only the "cfg" and "op" attributes."""
+      def __init__(self, opcode):
+        self.cfg = mocks.FakeConfig()
+        self.op = opcode
+
+    default_iallocator = mocks.FakeConfig().GetDefaultIAllocator()
+    other_iallocator = default_iallocator + "_not"
+
+    opcode = OpTest()
+    lu = TestLU(opcode)
+
+    def _SetParams(iallocator, node):
+      """Sets both opcode slots examined by the function under test."""
+      opcode.iallocator = iallocator
+      opcode.node = node
+
+    def _Check():
+      """Invokes the function under test on the fake LU."""
+      return cmdlib._CheckIAllocatorOrNode(lu, "iallocator", "node")
+
+    # Neither node nor iallocator given; the default iallocator is filled in
+    _SetParams(None, None)
+    _Check()
+    self.assertEqual(lu.op.iallocator, default_iallocator)
+    self.assertEqual(lu.op.node, None)
+
+    # Both, iallocator and node given
+    _SetParams("test", "test")
+    self.assertRaises(errors.OpPrereqError, _Check)
+
+    # Only iallocator given; it must be left untouched
+    _SetParams(other_iallocator, None)
+    _Check()
+    self.assertEqual(lu.op.iallocator, other_iallocator)
+    self.assertEqual(lu.op.node, None)
+
+    # Only node given; no iallocator must be filled in
+    _SetParams(None, "node")
+    _Check()
+    self.assertEqual(lu.op.iallocator, None)
+    self.assertEqual(lu.op.node, "node")
+
+    # No node, iallocator or default iallocator
+    _SetParams(None, None)
+    lu.cfg.GetDefaultIAllocator = lambda: None
+    self.assertRaises(errors.OpPrereqError, _Check)
+
+
+class TestLUTestJqueue(unittest.TestCase):
+  """Sanity check for the timeout constant of cmdlib.LUTestJqueue."""
+
+  def test(self):
+    """Checks the client connect timeout is below 75% of luxi.WFJC_TIMEOUT."""
+    client_timeout = cmdlib.LUTestJqueue._CLIENT_CONNECT_TIMEOUT
+    upper_bound = luxi.WFJC_TIMEOUT * 0.75
+
+    self.assert_(client_timeout < upper_bound,
+                 msg=("Client timeout too high, might not notice bugs"
+                      " in WaitForJobChange"))
+
+
+class TestLUQuery(unittest.TestCase):
+  """Tests the lookup of query implementations in cmdlib."""
+
+  def test(self):
+    """Compares cmdlib._QUERY_IMPL with constants.QR_OP_QUERY."""
+    implemented = sorted(cmdlib._QUERY_IMPL.keys())
+    expected = sorted(constants.QR_OP_QUERY)
+    self.assertEqual(implemented, expected)
+
+    # Sanity checks on the constants themselves
+    assert constants.QR_NODE in constants.QR_OP_QUERY
+    assert constants.QR_INSTANCE in constants.QR_OP_QUERY
+
+    # Every listed resource must yield a true value
+    for resource in constants.QR_OP_QUERY:
+      self.assert_(cmdlib._GetQueryImplementation(resource))
+
+    # Names that are not listed must be rejected
+    for unknown in ("", "xyz"):
+      self.assertRaises(errors.OpPrereqError,
+                        cmdlib._GetQueryImplementation, unknown)
+
+
+class TestLUGroupAssignNodes(unittest.TestCase):
+  """Tests for cmdlib.LUGroupAssignNodes."""
+
+  def testCheckAssignmentForSplitInstances(self):
+    """Runs the check on the existing state and after moving one node."""
+    # (node name, group) pairs
+    node_groups = [
+      ("n1a", "g1"), ("n1b", "g1"),
+      ("n2a", "g2"), ("n2b", "g2"),
+      ("n3a", "g3"), ("n3b", "g3"),
+      ("n3c", "g3"),
+      ]
+    node_data = dict((name, objects.Node(name=name, group=group))
+                     for (name, group) in node_groups)
+
+    def _MakeInstance(name, pnode, snode):
+      """Builds a diskless instance, or a DRBD8 one if snode is given."""
+      if snode is None:
+        return objects.Instance(name=name, primary_node=pnode, disks=[],
+                                disk_template=constants.DT_DISKLESS)
+
+      drbd_disk = objects.Disk(dev_type=constants.LD_DRBD8,
+                               logical_id=[pnode, snode, 1, 17, 17])
+      return objects.Instance(name=name, primary_node=pnode,
+                              disks=[drbd_disk],
+                              disk_template=constants.DT_DRBD8)
+
+    # (instance name, primary node, secondary node or None) triples
+    instance_specs = [
+      ("inst1a", "n1a", "n1b"),
+      ("inst1b", "n1b", "n1a"),
+      ("inst2a", "n2a", "n2b"),
+      ("inst3a", "n3a", None),
+      ("inst3b", "n3b", "n1b"),
+      ("inst3c", "n3b", "n2b"),
+      ]
+    instance_data = dict((name, _MakeInstance(name, pnode, snode))
+                         for (name, pnode, snode) in instance_specs)
+
+    check_fn = cmdlib.LUGroupAssignNodes.CheckAssignmentForSplitInstances
+
+    # Test first with the existing state: inst3b and inst3c already have
+    # their two nodes in different groups
+    (new, prev) = check_fn([], node_data, instance_data)
+
+    self.assertEqual([], new)
+    self.assertEqual(set(["inst3b", "inst3c"]), set(prev))
+
+    # And now some changes: n1b is assigned to g3
+    (new, prev) = check_fn([("n1b", "g3")], node_data, instance_data)
+
+    self.assertEqual(set(["inst1a", "inst1b"]), set(new))
+    self.assertEqual(set(["inst3c"]), set(prev))
+
+
+# Run the tests through testutils.GanetiTestProgram when executed as a script
+if __name__ == "__main__":
+  testutils.GanetiTestProgram()