Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.opcodes_unittest.py @ 7578ab0a

History | View | Annotate | Download (9.1 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.backend"""
23

    
24
import os
25
import sys
26
import unittest
27

    
28
from ganeti import utils
29
from ganeti import opcodes
30
from ganeti import ht
31
from ganeti import constants
32
from ganeti import errors
33
from ganeti import compat
34

    
35
import testutils
36

    
37

    
38
class TestOpcodes(unittest.TestCase):
39
  def test(self):
40
    self.assertRaises(ValueError, opcodes.OpCode.LoadOpCode, None)
41
    self.assertRaises(ValueError, opcodes.OpCode.LoadOpCode, "")
42
    self.assertRaises(ValueError, opcodes.OpCode.LoadOpCode, {})
43
    self.assertRaises(ValueError, opcodes.OpCode.LoadOpCode, {"OP_ID": ""})
44

    
45
    for cls in opcodes.OP_MAPPING.values():
46
      self.assert_(cls.OP_ID.startswith("OP_"))
47
      self.assert_(len(cls.OP_ID) > 3)
48
      self.assertEqual(cls.OP_ID, cls.OP_ID.upper())
49
      self.assertEqual(cls.OP_ID, opcodes._NameToId(cls.__name__))
50

    
51
      self.assertRaises(TypeError, cls, unsupported_parameter="some value")
52

    
53
      args = [
54
        # No variables
55
        {},
56

    
57
        # Variables supported by all opcodes
58
        {"dry_run": False, "debug_level": 0, },
59

    
60
        # All variables
61
        dict([(name, False) for name in cls._all_slots()])
62
        ]
63

    
64
      for i in args:
65
        op = cls(**i)
66

    
67
        self.assertEqual(op.OP_ID, cls.OP_ID)
68
        self._checkSummary(op)
69

    
70
        # Try a restore
71
        state = op.__getstate__()
72
        self.assert_(isinstance(state, dict))
73

    
74
        restored = opcodes.OpCode.LoadOpCode(state)
75
        self.assert_(isinstance(restored, cls))
76
        self._checkSummary(restored)
77

    
78
        for name in ["x_y_z", "hello_world"]:
79
          assert name not in cls._all_slots()
80
          for value in [None, True, False, [], "Hello World"]:
81
            self.assertRaises(AttributeError, setattr, op, name, value)
82

    
83
  def _checkSummary(self, op):
84
    summary = op.Summary()
85

    
86
    if hasattr(op, "OP_DSC_FIELD"):
87
      self.assert_(("OP_%s" % summary).startswith("%s(" % op.OP_ID))
88
      self.assert_(summary.endswith(")"))
89
    else:
90
      self.assertEqual("OP_%s" % summary, op.OP_ID)
91

    
92
  def testSummary(self):
93
    class OpTest(opcodes.OpCode):
94
      OP_DSC_FIELD = "data"
95
      OP_PARAMS = [
96
        ("data", ht.NoDefault, ht.TString, None),
97
        ]
98

    
99
    self.assertEqual(OpTest(data="").Summary(), "TEST()")
100
    self.assertEqual(OpTest(data="Hello World").Summary(),
101
                     "TEST(Hello World)")
102
    self.assertEqual(OpTest(data="node1.example.com").Summary(),
103
                     "TEST(node1.example.com)")
104

    
105
  def testListSummary(self):
106
    class OpTest(opcodes.OpCode):
107
      OP_DSC_FIELD = "data"
108
      OP_PARAMS = [
109
        ("data", ht.NoDefault, ht.TList, None),
110
        ]
111

    
112
    self.assertEqual(OpTest(data=["a", "b", "c"]).Summary(),
113
                     "TEST(a,b,c)")
114
    self.assertEqual(OpTest(data=["a", None, "c"]).Summary(),
115
                     "TEST(a,None,c)")
116
    self.assertEqual(OpTest(data=[1, 2, 3, 4]).Summary(), "TEST(1,2,3,4)")
117

    
118
  def testOpId(self):
119
    self.assertFalse(utils.FindDuplicates(cls.OP_ID
120
                                          for cls in opcodes._GetOpList()))
121
    self.assertEqual(len(opcodes._GetOpList()), len(opcodes.OP_MAPPING))
122

    
123
  def testParams(self):
124
    supported_by_all = set(["debug_level", "dry_run", "priority"])
125

    
126
    self.assertTrue(opcodes.BaseOpCode not in opcodes.OP_MAPPING.values())
127
    self.assertTrue(opcodes.OpCode not in opcodes.OP_MAPPING.values())
128

    
129
    for cls in opcodes.OP_MAPPING.values() + [opcodes.OpCode]:
130
      all_slots = cls._all_slots()
131

    
132
      self.assertEqual(len(set(all_slots) & supported_by_all), 3,
133
                       msg=("Opcode %s doesn't support all base"
134
                            " parameters (%r)" % (cls.OP_ID, supported_by_all)))
135

    
136
      # All opcodes must have OP_PARAMS
137
      self.assert_(hasattr(cls, "OP_PARAMS"),
138
                   msg="%s doesn't have OP_PARAMS" % cls.OP_ID)
139

    
140
      param_names = [name for (name, _, _, _) in cls.GetAllParams()]
141

    
142
      self.assertEqual(all_slots, param_names)
143

    
144
      # Without inheritance
145
      self.assertEqual(cls.__slots__,
146
                       [name for (name, _, _, _) in cls.OP_PARAMS])
147

    
148
      # This won't work if parameters are converted to a dictionary
149
      duplicates = utils.FindDuplicates(param_names)
150
      self.assertFalse(duplicates,
151
                       msg=("Found duplicate parameters %r in %s" %
152
                            (duplicates, cls.OP_ID)))
153

    
154
      # Check parameter definitions
155
      for attr_name, aval, test, doc in cls.GetAllParams():
156
        self.assert_(attr_name)
157
        self.assert_(test is None or test is ht.NoType or callable(test),
158
                     msg=("Invalid type check for %s.%s" %
159
                          (cls.OP_ID, attr_name)))
160
        self.assertTrue(doc is None or isinstance(doc, basestring))
161

    
162
        if callable(aval):
163
          self.assertFalse(callable(aval()),
164
                           msg="Default value returned by function is callable")
165

    
166
      # If any parameter has documentation, all others need to have it as well
167
      has_doc = [doc is not None for (_, _, _, doc) in cls.OP_PARAMS]
168
      self.assertTrue(not compat.any(has_doc) or compat.all(has_doc),
169
                      msg="%s does not document all parameters" % cls)
170

    
171
  def testValidateNoModification(self):
172
    class OpTest(opcodes.OpCode):
173
      OP_PARAMS = [
174
        ("nodef", ht.NoDefault, ht.TMaybeString, None),
175
        ("wdef", "default", ht.TMaybeString, None),
176
        ("number", 0, ht.TInt, None),
177
        ("notype", None, ht.NoType, None),
178
        ]
179

    
180
    # Missing required parameter "nodef"
181
    op = OpTest()
182
    before = op.__getstate__()
183
    self.assertRaises(errors.OpPrereqError, op.Validate, False)
184
    self.assertFalse(hasattr(op, "nodef"))
185
    self.assertFalse(hasattr(op, "wdef"))
186
    self.assertFalse(hasattr(op, "number"))
187
    self.assertFalse(hasattr(op, "notype"))
188
    self.assertEqual(op.__getstate__(), before, msg="Opcode was modified")
189

    
190
    # Required parameter "nodef" is provided
191
    op = OpTest(nodef="foo")
192
    before = op.__getstate__()
193
    op.Validate(False)
194
    self.assertEqual(op.__getstate__(), before, msg="Opcode was modified")
195
    self.assertEqual(op.nodef, "foo")
196
    self.assertFalse(hasattr(op, "wdef"))
197
    self.assertFalse(hasattr(op, "number"))
198
    self.assertFalse(hasattr(op, "notype"))
199

    
200
    # Missing required parameter "nodef"
201
    op = OpTest(wdef="hello", number=999)
202
    before = op.__getstate__()
203
    self.assertRaises(errors.OpPrereqError, op.Validate, False)
204
    self.assertFalse(hasattr(op, "nodef"))
205
    self.assertFalse(hasattr(op, "notype"))
206
    self.assertEqual(op.__getstate__(), before, msg="Opcode was modified")
207

    
208
    # Wrong type for "nodef"
209
    op = OpTest(nodef=987)
210
    before = op.__getstate__()
211
    self.assertRaises(errors.OpPrereqError, op.Validate, False)
212
    self.assertEqual(op.nodef, 987)
213
    self.assertFalse(hasattr(op, "notype"))
214
    self.assertEqual(op.__getstate__(), before, msg="Opcode was modified")
215

    
216
    # Testing different types for "notype"
217
    op = OpTest(nodef="foo", notype=[1, 2, 3])
218
    before = op.__getstate__()
219
    op.Validate(False)
220
    self.assertEqual(op.nodef, "foo")
221
    self.assertEqual(op.notype, [1, 2, 3])
222
    self.assertEqual(op.__getstate__(), before, msg="Opcode was modified")
223

    
224
    op = OpTest(nodef="foo", notype="Hello World")
225
    before = op.__getstate__()
226
    op.Validate(False)
227
    self.assertEqual(op.nodef, "foo")
228
    self.assertEqual(op.notype, "Hello World")
229
    self.assertEqual(op.__getstate__(), before, msg="Opcode was modified")
230

    
231
  def testValidateSetDefaults(self):
232
    class OpTest(opcodes.OpCode):
233
      OP_PARAMS = [
234
        # Static default value
235
        ("value1", "default", ht.TMaybeString, None),
236

    
237
        # Default value callback
238
        ("value2", lambda: "result", ht.TMaybeString, None),
239
        ]
240

    
241
    op = OpTest()
242
    before = op.__getstate__()
243
    op.Validate(True)
244
    self.assertNotEqual(op.__getstate__(), before,
245
                        msg="Opcode was not modified")
246
    self.assertEqual(op.value1, "default")
247
    self.assertEqual(op.value2, "result")
248
    self.assert_(op.dry_run is None)
249
    self.assert_(op.debug_level is None)
250
    self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
251

    
252
    op = OpTest(value1="hello", value2="world", debug_level=123)
253
    before = op.__getstate__()
254
    op.Validate(True)
255
    self.assertNotEqual(op.__getstate__(), before,
256
                        msg="Opcode was not modified")
257
    self.assertEqual(op.value1, "hello")
258
    self.assertEqual(op.value2, "world")
259
    self.assertEqual(op.debug_level, 123)
260

    
261

    
262
if __name__ == "__main__":
263
  testutils.GanetiTestProgram()