#!/usr/bin/python
#
-# Copyright (C) 2010 Google Inc.
+# Copyright (C) 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
from ganeti import utils
from ganeti import opcodes
from ganeti import ht
+from ganeti import constants
+from ganeti import errors
+from ganeti import compat
import testutils
+#: Unless an opcode is included in the following list it must have a result
+#: check of some sort
+MISSING_RESULT_CHECK = frozenset([
+ opcodes.OpTestAllocator,
+ opcodes.OpTestDelay,
+ opcodes.OpTestDummy,
+ opcodes.OpTestJqueue,
+ ])
+
+
class TestOpcodes(unittest.TestCase):
def test(self):
self.assertRaises(ValueError, opcodes.OpCode.LoadOpCode, None)
for cls in opcodes.OP_MAPPING.values():
self.assert_(cls.OP_ID.startswith("OP_"))
+ self.assert_(len(cls.OP_ID) > 3)
self.assertEqual(cls.OP_ID, cls.OP_ID.upper())
+ self.assertEqual(cls.OP_ID, opcodes._NameToId(cls.__name__))
+ self.assertFalse(compat.any(cls.OP_ID.startswith(prefix)
+ for prefix in opcodes._SUMMARY_PREFIX.keys()))
+ if cls in MISSING_RESULT_CHECK:
+ self.assertTrue(cls.OP_RESULT is None,
+ msg=("%s is listed to not have a result check" %
+ cls.OP_ID))
+ else:
+ self.assertTrue(callable(cls.OP_RESULT),
+ msg=("%s should have a result check" % cls.OP_ID))
self.assertRaises(TypeError, cls, unsupported_parameter="some value")
{"dry_run": False, "debug_level": 0, },
# All variables
- dict([(name, False) for name in cls._all_slots()])
+ dict([(name, []) for name in cls.GetAllSlots()])
]
for i in args:
self._checkSummary(restored)
for name in ["x_y_z", "hello_world"]:
- assert name not in cls._all_slots()
+ assert name not in cls.GetAllSlots()
for value in [None, True, False, [], "Hello World"]:
self.assertRaises(AttributeError, setattr, op, name, value)
else:
self.assertEqual("OP_%s" % summary, op.OP_ID)
+ def testSummary(self):
+ class OpTest(opcodes.OpCode):
+ OP_DSC_FIELD = "data"
+ OP_PARAMS = [
+ ("data", ht.NoDefault, ht.TString, None),
+ ]
+
+ self.assertEqual(OpTest(data="").Summary(), "TEST()")
+ self.assertEqual(OpTest(data="Hello World").Summary(),
+ "TEST(Hello World)")
+ self.assertEqual(OpTest(data="node1.example.com").Summary(),
+ "TEST(node1.example.com)")
+
+ def testTinySummary(self):
+ self.assertFalse(utils.FindDuplicates(opcodes._SUMMARY_PREFIX.values()))
+ self.assertTrue(compat.all(prefix.endswith("_") and supplement.endswith("_")
+ for (prefix, supplement) in
+ opcodes._SUMMARY_PREFIX.items()))
+
+ self.assertEqual(opcodes.OpClusterPostInit().TinySummary(), "C_POST_INIT")
+ self.assertEqual(opcodes.OpNodeRemove().TinySummary(), "N_REMOVE")
+ self.assertEqual(opcodes.OpInstanceMigrate().TinySummary(), "I_MIGRATE")
+ self.assertEqual(opcodes.OpGroupQuery().TinySummary(), "G_QUERY")
+ self.assertEqual(opcodes.OpTestJqueue().TinySummary(), "TEST_JQUEUE")
+
+ def testListSummary(self):
+ class OpTest(opcodes.OpCode):
+ OP_DSC_FIELD = "data"
+ OP_PARAMS = [
+ ("data", ht.NoDefault, ht.TList, None),
+ ]
+
+ self.assertEqual(OpTest(data=["a", "b", "c"]).Summary(),
+ "TEST(a,b,c)")
+ self.assertEqual(OpTest(data=["a", None, "c"]).Summary(),
+ "TEST(a,None,c)")
+ self.assertEqual(OpTest(data=[1, 2, 3, 4]).Summary(), "TEST(1,2,3,4)")
+
+ def testOpId(self):
+ self.assertFalse(utils.FindDuplicates(cls.OP_ID
+ for cls in opcodes._GetOpList()))
+ self.assertEqual(len(opcodes._GetOpList()), len(opcodes.OP_MAPPING))
+
def testParams(self):
supported_by_all = set(["debug_level", "dry_run", "priority"])
- self.assert_(opcodes.BaseOpCode not in opcodes.OP_MAPPING.values())
- self.assert_(opcodes.OpCode in opcodes.OP_MAPPING.values())
+ self.assertTrue(opcodes.BaseOpCode not in opcodes.OP_MAPPING.values())
+ self.assertTrue(opcodes.OpCode not in opcodes.OP_MAPPING.values())
- for cls in opcodes.OP_MAPPING.values():
- all_slots = cls._all_slots()
+ for cls in opcodes.OP_MAPPING.values() + [opcodes.OpCode]:
+ all_slots = cls.GetAllSlots()
self.assertEqual(len(set(all_slots) & supported_by_all), 3,
msg=("Opcode %s doesn't support all base"
self.assert_(hasattr(cls, "OP_PARAMS"),
msg="%s doesn't have OP_PARAMS" % cls.OP_ID)
- param_names = [name for (name, _, _) in cls.GetAllParams()]
+ param_names = [name for (name, _, _, _) in cls.GetAllParams()]
self.assertEqual(all_slots, param_names)
# Without inheritance
- self.assertEqual(cls.__slots__, [name for (name, _, _) in cls.OP_PARAMS])
+ self.assertEqual(cls.__slots__,
+ [name for (name, _, _, _) in cls.OP_PARAMS])
# This won't work if parameters are converted to a dictionary
duplicates = utils.FindDuplicates(param_names)
(duplicates, cls.OP_ID)))
# Check parameter definitions
- for attr_name, aval, test in cls.GetAllParams():
+ for attr_name, aval, test, doc in cls.GetAllParams():
self.assert_(attr_name)
self.assert_(test is None or test is ht.NoType or callable(test),
msg=("Invalid type check for %s.%s" %
(cls.OP_ID, attr_name)))
+ self.assertTrue(doc is None or isinstance(doc, basestring))
if callable(aval):
self.assertFalse(callable(aval()),
msg="Default value returned by function is callable")
+ # If any parameter has documentation, all others need to have it as well
+ has_doc = [doc is not None for (_, _, _, doc) in cls.OP_PARAMS]
+ self.assertTrue(not compat.any(has_doc) or compat.all(has_doc),
+ msg="%s does not document all parameters" % cls)
+
+ def testValidateNoModification(self):
+ class OpTest(opcodes.OpCode):
+ OP_PARAMS = [
+ ("nodef", ht.NoDefault, ht.TMaybeString, None),
+ ("wdef", "default", ht.TMaybeString, None),
+ ("number", 0, ht.TInt, None),
+ ("notype", None, ht.NoType, None),
+ ]
+
+ # Missing required parameter "nodef"
+ op = OpTest()
+ before = op.__getstate__()
+ self.assertRaises(errors.OpPrereqError, op.Validate, False)
+ self.assertFalse(hasattr(op, "nodef"))
+ self.assertFalse(hasattr(op, "wdef"))
+ self.assertFalse(hasattr(op, "number"))
+ self.assertFalse(hasattr(op, "notype"))
+ self.assertEqual(op.__getstate__(), before, msg="Opcode was modified")
+
+ # Required parameter "nodef" is provided
+ op = OpTest(nodef="foo")
+ before = op.__getstate__()
+ op.Validate(False)
+ self.assertEqual(op.__getstate__(), before, msg="Opcode was modified")
+ self.assertEqual(op.nodef, "foo")
+ self.assertFalse(hasattr(op, "wdef"))
+ self.assertFalse(hasattr(op, "number"))
+ self.assertFalse(hasattr(op, "notype"))
+
+ # Missing required parameter "nodef"
+ op = OpTest(wdef="hello", number=999)
+ before = op.__getstate__()
+ self.assertRaises(errors.OpPrereqError, op.Validate, False)
+ self.assertFalse(hasattr(op, "nodef"))
+ self.assertFalse(hasattr(op, "notype"))
+ self.assertEqual(op.__getstate__(), before, msg="Opcode was modified")
+
+ # Wrong type for "nodef"
+ op = OpTest(nodef=987)
+ before = op.__getstate__()
+ self.assertRaises(errors.OpPrereqError, op.Validate, False)
+ self.assertEqual(op.nodef, 987)
+ self.assertFalse(hasattr(op, "notype"))
+ self.assertEqual(op.__getstate__(), before, msg="Opcode was modified")
+
+ # Testing different types for "notype"
+ op = OpTest(nodef="foo", notype=[1, 2, 3])
+ before = op.__getstate__()
+ op.Validate(False)
+ self.assertEqual(op.nodef, "foo")
+ self.assertEqual(op.notype, [1, 2, 3])
+ self.assertEqual(op.__getstate__(), before, msg="Opcode was modified")
+
+ op = OpTest(nodef="foo", notype="Hello World")
+ before = op.__getstate__()
+ op.Validate(False)
+ self.assertEqual(op.nodef, "foo")
+ self.assertEqual(op.notype, "Hello World")
+ self.assertEqual(op.__getstate__(), before, msg="Opcode was modified")
+
+ def testValidateSetDefaults(self):
+ class OpTest(opcodes.OpCode):
+ OP_PARAMS = [
+ # Static default value
+ ("value1", "default", ht.TMaybeString, None),
+
+ # Default value callback
+ ("value2", lambda: "result", ht.TMaybeString, None),
+ ]
+
+ op = OpTest()
+ before = op.__getstate__()
+ op.Validate(True)
+ self.assertNotEqual(op.__getstate__(), before,
+ msg="Opcode was not modified")
+ self.assertEqual(op.value1, "default")
+ self.assertEqual(op.value2, "result")
+ self.assert_(op.dry_run is None)
+ self.assert_(op.debug_level is None)
+ self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
+
+ op = OpTest(value1="hello", value2="world", debug_level=123)
+ before = op.__getstate__()
+ op.Validate(True)
+ self.assertNotEqual(op.__getstate__(), before,
+ msg="Opcode was not modified")
+ self.assertEqual(op.value1, "hello")
+ self.assertEqual(op.value2, "world")
+ self.assertEqual(op.debug_level, 123)
+
+
+ def testOpInstanceMultiAlloc(self):
+ inst = dict([(name, []) for name in opcodes.OpInstanceCreate.GetAllSlots()])
+ inst_op = opcodes.OpInstanceCreate(**inst)
+ inst_state = inst_op.__getstate__()
+
+ multialloc = opcodes.OpInstanceMultiAlloc(instances=[inst_op])
+ state = multialloc.__getstate__()
+ self.assertEquals(state["instances"], [inst_state])
+ loaded_multialloc = opcodes.OpCode.LoadOpCode(state)
+ (loaded_inst,) = loaded_multialloc.instances
+ self.assertNotEquals(loaded_inst, inst_op)
+ self.assertEquals(loaded_inst.__getstate__(), inst_state)
+
+
+class TestOpcodeDepends(unittest.TestCase):
+ def test(self):
+ check_relative = opcodes._BuildJobDepCheck(True)
+ check_norelative = opcodes.TNoRelativeJobDependencies
+
+ for fn in [check_relative, check_norelative]:
+ self.assertTrue(fn(None))
+ self.assertTrue(fn([]))
+ self.assertTrue(fn([(1, [])]))
+ self.assertTrue(fn([(719833, [])]))
+ self.assertTrue(fn([("24879", [])]))
+ self.assertTrue(fn([(2028, [constants.JOB_STATUS_ERROR])]))
+ self.assertTrue(fn([
+ (2028, [constants.JOB_STATUS_ERROR]),
+ (18750, []),
+ (5063, [constants.JOB_STATUS_SUCCESS, constants.JOB_STATUS_ERROR]),
+ ]))
+
+ self.assertFalse(fn(1))
+ self.assertFalse(fn([(9, )]))
+ self.assertFalse(fn([(15194, constants.JOB_STATUS_ERROR)]))
+
+ for i in [
+ [(-1, [])],
+ [(-27740, [constants.JOB_STATUS_CANCELED, constants.JOB_STATUS_ERROR]),
+ (-1, [constants.JOB_STATUS_ERROR]),
+ (9921, [])],
+ ]:
+ self.assertTrue(check_relative(i))
+ self.assertFalse(check_norelative(i))
+
+
+class TestResultChecks(unittest.TestCase):
+ def testJobIdList(self):
+ for i in [[], [(False, "error")], [(False, "")],
+ [(True, 123), (True, "999")]]:
+ self.assertTrue(opcodes.TJobIdList(i))
+
+ for i in ["", [("x", 1)], [[], []], [[False, "", None], [True, 123]]]:
+ self.assertFalse(opcodes.TJobIdList(i))
+
+ def testJobIdListOnly(self):
+ self.assertTrue(opcodes.TJobIdListOnly({
+ constants.JOB_IDS_KEY: [],
+ }))
+ self.assertTrue(opcodes.TJobIdListOnly({
+ constants.JOB_IDS_KEY: [(True, "9282")],
+ }))
+
+ self.assertFalse(opcodes.TJobIdListOnly({
+ "x": None,
+ }))
+ self.assertFalse(opcodes.TJobIdListOnly({
+ constants.JOB_IDS_KEY: [],
+ "x": None,
+ }))
+ self.assertFalse(opcodes.TJobIdListOnly({
+ constants.JOB_IDS_KEY: [("foo", "bar")],
+ }))
+ self.assertFalse(opcodes.TJobIdListOnly({
+ constants.JOB_IDS_KEY: [("one", "two", "three")],
+ }))
+
+
+class TestClusterOsList(unittest.TestCase):
+ def test(self):
+ good = [
+ None,
+ [],
+ [(constants.DDM_ADD, "dos"),
+ (constants.DDM_REMOVE, "linux")],
+ ]
+
+ for i in good:
+ self.assertTrue(opcodes._TestClusterOsList(i))
+
+ wrong = ["", 0, "xy", ["Hello World"], object(),
+ [("foo", "bar")],
+ [("", "")],
+ [[constants.DDM_ADD]],
+ [(constants.DDM_ADD, "")],
+ [(constants.DDM_REMOVE, "")],
+ [(constants.DDM_ADD, None)],
+ [(constants.DDM_REMOVE, None)],
+ ]
+
+ for i in wrong:
+ self.assertFalse(opcodes._TestClusterOsList(i))
+
+
+class TestOpInstanceSetParams(unittest.TestCase):
+ def _GenericTests(self, fn):
+ self.assertTrue(fn([]))
+ self.assertTrue(fn([(constants.DDM_ADD, {})]))
+ self.assertTrue(fn([(constants.DDM_REMOVE, {})]))
+ for i in [0, 1, 2, 3, 9, 10, 1024]:
+ self.assertTrue(fn([(i, {})]))
+
+ self.assertFalse(fn(None))
+ self.assertFalse(fn({}))
+ self.assertFalse(fn(""))
+ self.assertFalse(fn(0))
+ self.assertFalse(fn([(-100, {})]))
+ self.assertFalse(fn([(constants.DDM_ADD, 2, 3)]))
+ self.assertFalse(fn([[constants.DDM_ADD]]))
+
+ def testNicModifications(self):
+ fn = opcodes.OpInstanceSetParams.TestNicModifications
+ self._GenericTests(fn)
+
+ for param in constants.INIC_PARAMS:
+ self.assertTrue(fn([[constants.DDM_ADD, {param: None}]]))
+ self.assertTrue(fn([[constants.DDM_ADD, {param: param}]]))
+
+ def testDiskModifications(self):
+ fn = opcodes.OpInstanceSetParams.TestDiskModifications
+ self._GenericTests(fn)
+
+ for param in constants.IDISK_PARAMS:
+ self.assertTrue(fn([[constants.DDM_ADD, {param: 0}]]))
+ self.assertTrue(fn([[constants.DDM_ADD, {param: param}]]))
+
if __name__ == "__main__":
testutils.GanetiTestProgram()