Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.opcodes_unittest.py @ 197b323b

History | View | Annotate | Download (8.8 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.backend"""
23

    
24
import os
25
import sys
26
import unittest
27

    
28
from ganeti import utils
29
from ganeti import opcodes
30
from ganeti import ht
31
from ganeti import constants
32
from ganeti import errors
33

    
34
import testutils
35

    
36

    
37
class TestOpcodes(unittest.TestCase):
38
  def test(self):
39
    self.assertRaises(ValueError, opcodes.OpCode.LoadOpCode, None)
40
    self.assertRaises(ValueError, opcodes.OpCode.LoadOpCode, "")
41
    self.assertRaises(ValueError, opcodes.OpCode.LoadOpCode, {})
42
    self.assertRaises(ValueError, opcodes.OpCode.LoadOpCode, {"OP_ID": ""})
43

    
44
    for cls in opcodes.OP_MAPPING.values():
45
      self.assert_(cls.OP_ID.startswith("OP_"))
46
      self.assert_(len(cls.OP_ID) > 3)
47
      self.assertEqual(cls.OP_ID, cls.OP_ID.upper())
48
      self.assertEqual(cls.OP_ID, opcodes._NameToId(cls.__name__))
49

    
50
      self.assertRaises(TypeError, cls, unsupported_parameter="some value")
51

    
52
      args = [
53
        # No variables
54
        {},
55

    
56
        # Variables supported by all opcodes
57
        {"dry_run": False, "debug_level": 0, },
58

    
59
        # All variables
60
        dict([(name, False) for name in cls._all_slots()])
61
        ]
62

    
63
      for i in args:
64
        op = cls(**i)
65

    
66
        self.assertEqual(op.OP_ID, cls.OP_ID)
67
        self._checkSummary(op)
68

    
69
        # Try a restore
70
        state = op.__getstate__()
71
        self.assert_(isinstance(state, dict))
72

    
73
        restored = opcodes.OpCode.LoadOpCode(state)
74
        self.assert_(isinstance(restored, cls))
75
        self._checkSummary(restored)
76

    
77
        for name in ["x_y_z", "hello_world"]:
78
          assert name not in cls._all_slots()
79
          for value in [None, True, False, [], "Hello World"]:
80
            self.assertRaises(AttributeError, setattr, op, name, value)
81

    
82
  def _checkSummary(self, op):
83
    summary = op.Summary()
84

    
85
    if hasattr(op, "OP_DSC_FIELD"):
86
      self.assert_(("OP_%s" % summary).startswith("%s(" % op.OP_ID))
87
      self.assert_(summary.endswith(")"))
88
    else:
89
      self.assertEqual("OP_%s" % summary, op.OP_ID)
90

    
91
  def testSummary(self):
92
    class OpTest(opcodes.OpCode):
93
      OP_DSC_FIELD = "data"
94
      OP_PARAMS = [
95
        ("data", ht.NoDefault, ht.TString, None),
96
        ]
97

    
98
    self.assertEqual(OpTest(data="").Summary(), "TEST()")
99
    self.assertEqual(OpTest(data="Hello World").Summary(),
100
                     "TEST(Hello World)")
101
    self.assertEqual(OpTest(data="node1.example.com").Summary(),
102
                     "TEST(node1.example.com)")
103

    
104
  def testListSummary(self):
105
    class OpTest(opcodes.OpCode):
106
      OP_DSC_FIELD = "data"
107
      OP_PARAMS = [
108
        ("data", ht.NoDefault, ht.TList, None),
109
        ]
110

    
111
    self.assertEqual(OpTest(data=["a", "b", "c"]).Summary(),
112
                     "TEST(a,b,c)")
113
    self.assertEqual(OpTest(data=["a", None, "c"]).Summary(),
114
                     "TEST(a,None,c)")
115
    self.assertEqual(OpTest(data=[1, 2, 3, 4]).Summary(), "TEST(1,2,3,4)")
116

    
117
  def testOpId(self):
118
    self.assertFalse(utils.FindDuplicates(cls.OP_ID
119
                                          for cls in opcodes._GetOpList()))
120
    self.assertEqual(len(opcodes._GetOpList()), len(opcodes.OP_MAPPING))
121

    
122
  def testParams(self):
123
    supported_by_all = set(["debug_level", "dry_run", "priority"])
124

    
125
    self.assertTrue(opcodes.BaseOpCode not in opcodes.OP_MAPPING.values())
126
    self.assertTrue(opcodes.OpCode not in opcodes.OP_MAPPING.values())
127

    
128
    for cls in opcodes.OP_MAPPING.values() + [opcodes.OpCode]:
129
      all_slots = cls._all_slots()
130

    
131
      self.assertEqual(len(set(all_slots) & supported_by_all), 3,
132
                       msg=("Opcode %s doesn't support all base"
133
                            " parameters (%r)" % (cls.OP_ID, supported_by_all)))
134

    
135
      # All opcodes must have OP_PARAMS
136
      self.assert_(hasattr(cls, "OP_PARAMS"),
137
                   msg="%s doesn't have OP_PARAMS" % cls.OP_ID)
138

    
139
      param_names = [name for (name, _, _, _) in cls.GetAllParams()]
140

    
141
      self.assertEqual(all_slots, param_names)
142

    
143
      # Without inheritance
144
      self.assertEqual(cls.__slots__,
145
                       [name for (name, _, _, _) in cls.OP_PARAMS])
146

    
147
      # This won't work if parameters are converted to a dictionary
148
      duplicates = utils.FindDuplicates(param_names)
149
      self.assertFalse(duplicates,
150
                       msg=("Found duplicate parameters %r in %s" %
151
                            (duplicates, cls.OP_ID)))
152

    
153
      # Check parameter definitions
154
      for attr_name, aval, test, doc in cls.GetAllParams():
155
        self.assert_(attr_name)
156
        self.assert_(test is None or test is ht.NoType or callable(test),
157
                     msg=("Invalid type check for %s.%s" %
158
                          (cls.OP_ID, attr_name)))
159
        self.assertTrue(doc is None)
160

    
161
        if callable(aval):
162
          self.assertFalse(callable(aval()),
163
                           msg="Default value returned by function is callable")
164

    
165
  def testValidateNoModification(self):
166
    class OpTest(opcodes.OpCode):
167
      OP_PARAMS = [
168
        ("nodef", ht.NoDefault, ht.TMaybeString, None),
169
        ("wdef", "default", ht.TMaybeString, None),
170
        ("number", 0, ht.TInt, None),
171
        ("notype", None, ht.NoType, None),
172
        ]
173

    
174
    # Missing required parameter "nodef"
175
    op = OpTest()
176
    before = op.__getstate__()
177
    self.assertRaises(errors.OpPrereqError, op.Validate, False)
178
    self.assertFalse(hasattr(op, "nodef"))
179
    self.assertFalse(hasattr(op, "wdef"))
180
    self.assertFalse(hasattr(op, "number"))
181
    self.assertFalse(hasattr(op, "notype"))
182
    self.assertEqual(op.__getstate__(), before, msg="Opcode was modified")
183

    
184
    # Required parameter "nodef" is provided
185
    op = OpTest(nodef="foo")
186
    before = op.__getstate__()
187
    op.Validate(False)
188
    self.assertEqual(op.__getstate__(), before, msg="Opcode was modified")
189
    self.assertEqual(op.nodef, "foo")
190
    self.assertFalse(hasattr(op, "wdef"))
191
    self.assertFalse(hasattr(op, "number"))
192
    self.assertFalse(hasattr(op, "notype"))
193

    
194
    # Missing required parameter "nodef"
195
    op = OpTest(wdef="hello", number=999)
196
    before = op.__getstate__()
197
    self.assertRaises(errors.OpPrereqError, op.Validate, False)
198
    self.assertFalse(hasattr(op, "nodef"))
199
    self.assertFalse(hasattr(op, "notype"))
200
    self.assertEqual(op.__getstate__(), before, msg="Opcode was modified")
201

    
202
    # Wrong type for "nodef"
203
    op = OpTest(nodef=987)
204
    before = op.__getstate__()
205
    self.assertRaises(errors.OpPrereqError, op.Validate, False)
206
    self.assertEqual(op.nodef, 987)
207
    self.assertFalse(hasattr(op, "notype"))
208
    self.assertEqual(op.__getstate__(), before, msg="Opcode was modified")
209

    
210
    # Testing different types for "notype"
211
    op = OpTest(nodef="foo", notype=[1, 2, 3])
212
    before = op.__getstate__()
213
    op.Validate(False)
214
    self.assertEqual(op.nodef, "foo")
215
    self.assertEqual(op.notype, [1, 2, 3])
216
    self.assertEqual(op.__getstate__(), before, msg="Opcode was modified")
217

    
218
    op = OpTest(nodef="foo", notype="Hello World")
219
    before = op.__getstate__()
220
    op.Validate(False)
221
    self.assertEqual(op.nodef, "foo")
222
    self.assertEqual(op.notype, "Hello World")
223
    self.assertEqual(op.__getstate__(), before, msg="Opcode was modified")
224

    
225
  def testValidateSetDefaults(self):
226
    class OpTest(opcodes.OpCode):
227
      OP_PARAMS = [
228
        # Static default value
229
        ("value1", "default", ht.TMaybeString, None),
230

    
231
        # Default value callback
232
        ("value2", lambda: "result", ht.TMaybeString, None),
233
        ]
234

    
235
    op = OpTest()
236
    before = op.__getstate__()
237
    op.Validate(True)
238
    self.assertNotEqual(op.__getstate__(), before,
239
                        msg="Opcode was not modified")
240
    self.assertEqual(op.value1, "default")
241
    self.assertEqual(op.value2, "result")
242
    self.assert_(op.dry_run is None)
243
    self.assert_(op.debug_level is None)
244
    self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
245

    
246
    op = OpTest(value1="hello", value2="world", debug_level=123)
247
    before = op.__getstate__()
248
    op.Validate(True)
249
    self.assertNotEqual(op.__getstate__(), before,
250
                        msg="Opcode was not modified")
251
    self.assertEqual(op.value1, "hello")
252
    self.assertEqual(op.value2, "world")
253
    self.assertEqual(op.debug_level, 123)
254

    
255

    
256
if __name__ == "__main__":
257
  testutils.GanetiTestProgram()