Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.opcodes_unittest.py @ ff0d18e6

History | View | Annotate | Download (8.7 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.backend"""
23

    
24
import os
25
import sys
26
import unittest
27

    
28
from ganeti import utils
29
from ganeti import opcodes
30
from ganeti import ht
31
from ganeti import constants
32
from ganeti import errors
33

    
34
import testutils
35

    
36

    
37
class TestOpcodes(unittest.TestCase):
38
  def test(self):
39
    self.assertRaises(ValueError, opcodes.OpCode.LoadOpCode, None)
40
    self.assertRaises(ValueError, opcodes.OpCode.LoadOpCode, "")
41
    self.assertRaises(ValueError, opcodes.OpCode.LoadOpCode, {})
42
    self.assertRaises(ValueError, opcodes.OpCode.LoadOpCode, {"OP_ID": ""})
43

    
44
    for cls in opcodes.OP_MAPPING.values():
45
      self.assert_(cls.OP_ID.startswith("OP_"))
46
      self.assert_(len(cls.OP_ID) > 3)
47
      self.assertEqual(cls.OP_ID, cls.OP_ID.upper())
48
      self.assertEqual(cls.OP_ID, opcodes._NameToId(cls.__name__))
49

    
50
      self.assertRaises(TypeError, cls, unsupported_parameter="some value")
51

    
52
      args = [
53
        # No variables
54
        {},
55

    
56
        # Variables supported by all opcodes
57
        {"dry_run": False, "debug_level": 0, },
58

    
59
        # All variables
60
        dict([(name, False) for name in cls._all_slots()])
61
        ]
62

    
63
      for i in args:
64
        op = cls(**i)
65

    
66
        self.assertEqual(op.OP_ID, cls.OP_ID)
67
        self._checkSummary(op)
68

    
69
        # Try a restore
70
        state = op.__getstate__()
71
        self.assert_(isinstance(state, dict))
72

    
73
        restored = opcodes.OpCode.LoadOpCode(state)
74
        self.assert_(isinstance(restored, cls))
75
        self._checkSummary(restored)
76

    
77
        for name in ["x_y_z", "hello_world"]:
78
          assert name not in cls._all_slots()
79
          for value in [None, True, False, [], "Hello World"]:
80
            self.assertRaises(AttributeError, setattr, op, name, value)
81

    
82
  def _checkSummary(self, op):
83
    summary = op.Summary()
84

    
85
    if hasattr(op, "OP_DSC_FIELD"):
86
      self.assert_(("OP_%s" % summary).startswith("%s(" % op.OP_ID))
87
      self.assert_(summary.endswith(")"))
88
    else:
89
      self.assertEqual("OP_%s" % summary, op.OP_ID)
90

    
91
  def testSummary(self):
92
    class OpTest(opcodes.OpCode):
93
      OP_DSC_FIELD = "data"
94
      OP_PARAMS = [
95
        ("data", ht.NoDefault, ht.TString),
96
        ]
97

    
98
    self.assertEqual(OpTest(data="").Summary(), "TEST()")
99
    self.assertEqual(OpTest(data="Hello World").Summary(),
100
                     "TEST(Hello World)")
101
    self.assertEqual(OpTest(data="node1.example.com").Summary(),
102
                     "TEST(node1.example.com)")
103

    
104
  def testListSummary(self):
105
    class OpTest(opcodes.OpCode):
106
      OP_DSC_FIELD = "data"
107
      OP_PARAMS = [
108
        ("data", ht.NoDefault, ht.TList),
109
        ]
110

    
111
    self.assertEqual(OpTest(data=["a", "b", "c"]).Summary(),
112
                     "TEST(a,b,c)")
113
    self.assertEqual(OpTest(data=["a", None, "c"]).Summary(),
114
                     "TEST(a,None,c)")
115
    self.assertEqual(OpTest(data=[1, 2, 3, 4]).Summary(), "TEST(1,2,3,4)")
116

    
117
  def testOpId(self):
118
    self.assertFalse(utils.FindDuplicates(cls.OP_ID
119
                                          for cls in opcodes._GetOpList()))
120
    self.assertEqual(len(opcodes._GetOpList()), len(opcodes.OP_MAPPING))
121

    
122
  def testParams(self):
123
    supported_by_all = set(["debug_level", "dry_run", "priority"])
124

    
125
    self.assertTrue(opcodes.BaseOpCode not in opcodes.OP_MAPPING.values())
126
    self.assertTrue(opcodes.OpCode not in opcodes.OP_MAPPING.values())
127

    
128
    for cls in opcodes.OP_MAPPING.values() + [opcodes.OpCode]:
129
      all_slots = cls._all_slots()
130

    
131
      self.assertEqual(len(set(all_slots) & supported_by_all), 3,
132
                       msg=("Opcode %s doesn't support all base"
133
                            " parameters (%r)" % (cls.OP_ID, supported_by_all)))
134

    
135
      # All opcodes must have OP_PARAMS
136
      self.assert_(hasattr(cls, "OP_PARAMS"),
137
                   msg="%s doesn't have OP_PARAMS" % cls.OP_ID)
138

    
139
      param_names = [name for (name, _, _) in cls.GetAllParams()]
140

    
141
      self.assertEqual(all_slots, param_names)
142

    
143
      # Without inheritance
144
      self.assertEqual(cls.__slots__, [name for (name, _, _) in cls.OP_PARAMS])
145

    
146
      # This won't work if parameters are converted to a dictionary
147
      duplicates = utils.FindDuplicates(param_names)
148
      self.assertFalse(duplicates,
149
                       msg=("Found duplicate parameters %r in %s" %
150
                            (duplicates, cls.OP_ID)))
151

    
152
      # Check parameter definitions
153
      for attr_name, aval, test in cls.GetAllParams():
154
        self.assert_(attr_name)
155
        self.assert_(test is None or test is ht.NoType or callable(test),
156
                     msg=("Invalid type check for %s.%s" %
157
                          (cls.OP_ID, attr_name)))
158

    
159
        if callable(aval):
160
          self.assertFalse(callable(aval()),
161
                           msg="Default value returned by function is callable")
162

    
163
  def testValidateNoModification(self):
164
    class OpTest(opcodes.OpCode):
165
      OP_PARAMS = [
166
        ("nodef", ht.NoDefault, ht.TMaybeString),
167
        ("wdef", "default", ht.TMaybeString),
168
        ("number", 0, ht.TInt),
169
        ("notype", None, ht.NoType),
170
        ]
171

    
172
    # Missing required parameter "nodef"
173
    op = OpTest()
174
    before = op.__getstate__()
175
    self.assertRaises(errors.OpPrereqError, op.Validate, False)
176
    self.assertFalse(hasattr(op, "nodef"))
177
    self.assertFalse(hasattr(op, "wdef"))
178
    self.assertFalse(hasattr(op, "number"))
179
    self.assertFalse(hasattr(op, "notype"))
180
    self.assertEqual(op.__getstate__(), before, msg="Opcode was modified")
181

    
182
    # Required parameter "nodef" is provided
183
    op = OpTest(nodef="foo")
184
    before = op.__getstate__()
185
    op.Validate(False)
186
    self.assertEqual(op.__getstate__(), before, msg="Opcode was modified")
187
    self.assertEqual(op.nodef, "foo")
188
    self.assertFalse(hasattr(op, "wdef"))
189
    self.assertFalse(hasattr(op, "number"))
190
    self.assertFalse(hasattr(op, "notype"))
191

    
192
    # Missing required parameter "nodef"
193
    op = OpTest(wdef="hello", number=999)
194
    before = op.__getstate__()
195
    self.assertRaises(errors.OpPrereqError, op.Validate, False)
196
    self.assertFalse(hasattr(op, "nodef"))
197
    self.assertFalse(hasattr(op, "notype"))
198
    self.assertEqual(op.__getstate__(), before, msg="Opcode was modified")
199

    
200
    # Wrong type for "nodef"
201
    op = OpTest(nodef=987)
202
    before = op.__getstate__()
203
    self.assertRaises(errors.OpPrereqError, op.Validate, False)
204
    self.assertEqual(op.nodef, 987)
205
    self.assertFalse(hasattr(op, "notype"))
206
    self.assertEqual(op.__getstate__(), before, msg="Opcode was modified")
207

    
208
    # Testing different types for "notype"
209
    op = OpTest(nodef="foo", notype=[1, 2, 3])
210
    before = op.__getstate__()
211
    op.Validate(False)
212
    self.assertEqual(op.nodef, "foo")
213
    self.assertEqual(op.notype, [1, 2, 3])
214
    self.assertEqual(op.__getstate__(), before, msg="Opcode was modified")
215

    
216
    op = OpTest(nodef="foo", notype="Hello World")
217
    before = op.__getstate__()
218
    op.Validate(False)
219
    self.assertEqual(op.nodef, "foo")
220
    self.assertEqual(op.notype, "Hello World")
221
    self.assertEqual(op.__getstate__(), before, msg="Opcode was modified")
222

    
223
  def testValidateSetDefaults(self):
224
    class OpTest(opcodes.OpCode):
225
      OP_PARAMS = [
226
        # Static default value
227
        ("value1", "default", ht.TMaybeString),
228

    
229
        # Default value callback
230
        ("value2", lambda: "result", ht.TMaybeString),
231
        ]
232

    
233
    op = OpTest()
234
    before = op.__getstate__()
235
    op.Validate(True)
236
    self.assertNotEqual(op.__getstate__(), before,
237
                        msg="Opcode was not modified")
238
    self.assertEqual(op.value1, "default")
239
    self.assertEqual(op.value2, "result")
240
    self.assert_(op.dry_run is None)
241
    self.assert_(op.debug_level is None)
242
    self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
243

    
244
    op = OpTest(value1="hello", value2="world", debug_level=123)
245
    before = op.__getstate__()
246
    op.Validate(True)
247
    self.assertNotEqual(op.__getstate__(), before,
248
                        msg="Opcode was not modified")
249
    self.assertEqual(op.value1, "hello")
250
    self.assertEqual(op.value2, "world")
251
    self.assertEqual(op.debug_level, 123)
252

    
253

    
254
if __name__ == "__main__":
255
  testutils.GanetiTestProgram()