Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.opcodes_unittest.py @ 8572f1fe

History | View | Annotate | Download (8.7 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.backend"""
23

    
24
import os
25
import sys
26
import unittest
27

    
28
from ganeti import utils
29
from ganeti import opcodes
30
from ganeti import ht
31
from ganeti import constants
32
from ganeti import errors
33

    
34
import testutils
35

    
36

    
37
class TestOpcodes(unittest.TestCase):
38
  def test(self):
39
    self.assertRaises(ValueError, opcodes.OpCode.LoadOpCode, None)
40
    self.assertRaises(ValueError, opcodes.OpCode.LoadOpCode, "")
41
    self.assertRaises(ValueError, opcodes.OpCode.LoadOpCode, {})
42
    self.assertRaises(ValueError, opcodes.OpCode.LoadOpCode, {"OP_ID": ""})
43

    
44
    for cls in opcodes.OP_MAPPING.values():
45
      self.assert_(cls.OP_ID.startswith("OP_"))
46
      self.assert_(len(cls.OP_ID) > 3)
47
      self.assertEqual(cls.OP_ID, cls.OP_ID.upper())
48

    
49
      self.assertRaises(TypeError, cls, unsupported_parameter="some value")
50

    
51
      args = [
52
        # No variables
53
        {},
54

    
55
        # Variables supported by all opcodes
56
        {"dry_run": False, "debug_level": 0, },
57

    
58
        # All variables
59
        dict([(name, False) for name in cls._all_slots()])
60
        ]
61

    
62
      for i in args:
63
        op = cls(**i)
64

    
65
        self.assertEqual(op.OP_ID, cls.OP_ID)
66
        self._checkSummary(op)
67

    
68
        # Try a restore
69
        state = op.__getstate__()
70
        self.assert_(isinstance(state, dict))
71

    
72
        restored = opcodes.OpCode.LoadOpCode(state)
73
        self.assert_(isinstance(restored, cls))
74
        self._checkSummary(restored)
75

    
76
        for name in ["x_y_z", "hello_world"]:
77
          assert name not in cls._all_slots()
78
          for value in [None, True, False, [], "Hello World"]:
79
            self.assertRaises(AttributeError, setattr, op, name, value)
80

    
81
  def _checkSummary(self, op):
82
    summary = op.Summary()
83

    
84
    if hasattr(op, "OP_DSC_FIELD"):
85
      self.assert_(("OP_%s" % summary).startswith("%s(" % op.OP_ID))
86
      self.assert_(summary.endswith(")"))
87
    else:
88
      self.assertEqual("OP_%s" % summary, op.OP_ID)
89

    
90
  def testSummary(self):
91
    class _TestOp(opcodes.OpCode):
92
      OP_ID = "OP_TEST"
93
      OP_DSC_FIELD = "data"
94
      OP_PARAMS = [
95
        ("data", ht.NoDefault, ht.TString),
96
        ]
97

    
98
    self.assertEqual(_TestOp(data="").Summary(), "TEST()")
99
    self.assertEqual(_TestOp(data="Hello World").Summary(),
100
                     "TEST(Hello World)")
101
    self.assertEqual(_TestOp(data="node1.example.com").Summary(),
102
                     "TEST(node1.example.com)")
103

    
104
  def testListSummary(self):
105
    class _TestOp(opcodes.OpCode):
106
      OP_ID = "OP_TEST"
107
      OP_DSC_FIELD = "data"
108
      OP_PARAMS = [
109
        ("data", ht.NoDefault, ht.TList),
110
        ]
111

    
112
    self.assertEqual(_TestOp(data=["a", "b", "c"]).Summary(),
113
                     "TEST(a,b,c)")
114
    self.assertEqual(_TestOp(data=["a", None, "c"]).Summary(),
115
                     "TEST(a,None,c)")
116
    self.assertEqual(_TestOp(data=[1, 2, 3, 4]).Summary(), "TEST(1,2,3,4)")
117

    
118
  def testOpId(self):
119
    self.assertFalse(utils.FindDuplicates(cls.OP_ID
120
                                          for cls in opcodes._GetOpList()))
121
    self.assertEqual(len(opcodes._GetOpList()), len(opcodes.OP_MAPPING))
122

    
123
  def testParams(self):
124
    supported_by_all = set(["debug_level", "dry_run", "priority"])
125

    
126
    self.assertTrue(opcodes.BaseOpCode not in opcodes.OP_MAPPING.values())
127
    self.assertTrue(opcodes.OpCode not in opcodes.OP_MAPPING.values())
128

    
129
    for cls in opcodes.OP_MAPPING.values() + [opcodes.OpCode]:
130
      all_slots = cls._all_slots()
131

    
132
      self.assertEqual(len(set(all_slots) & supported_by_all), 3,
133
                       msg=("Opcode %s doesn't support all base"
134
                            " parameters (%r)" % (cls.OP_ID, supported_by_all)))
135

    
136
      # All opcodes must have OP_PARAMS
137
      self.assert_(hasattr(cls, "OP_PARAMS"),
138
                   msg="%s doesn't have OP_PARAMS" % cls.OP_ID)
139

    
140
      param_names = [name for (name, _, _) in cls.GetAllParams()]
141

    
142
      self.assertEqual(all_slots, param_names)
143

    
144
      # Without inheritance
145
      self.assertEqual(cls.__slots__, [name for (name, _, _) in cls.OP_PARAMS])
146

    
147
      # This won't work if parameters are converted to a dictionary
148
      duplicates = utils.FindDuplicates(param_names)
149
      self.assertFalse(duplicates,
150
                       msg=("Found duplicate parameters %r in %s" %
151
                            (duplicates, cls.OP_ID)))
152

    
153
      # Check parameter definitions
154
      for attr_name, aval, test in cls.GetAllParams():
155
        self.assert_(attr_name)
156
        self.assert_(test is None or test is ht.NoType or callable(test),
157
                     msg=("Invalid type check for %s.%s" %
158
                          (cls.OP_ID, attr_name)))
159

    
160
        if callable(aval):
161
          self.assertFalse(callable(aval()),
162
                           msg="Default value returned by function is callable")
163

    
164
  def testValidateNoModification(self):
165
    class _TestOp(opcodes.OpCode):
166
      OP_ID = "OP_TEST"
167
      OP_PARAMS = [
168
        ("nodef", ht.NoDefault, ht.TMaybeString),
169
        ("wdef", "default", ht.TMaybeString),
170
        ("number", 0, ht.TInt),
171
        ("notype", None, ht.NoType),
172
        ]
173

    
174
    # Missing required parameter "nodef"
175
    op = _TestOp()
176
    before = op.__getstate__()
177
    self.assertRaises(errors.OpPrereqError, op.Validate, False)
178
    self.assertFalse(hasattr(op, "nodef"))
179
    self.assertFalse(hasattr(op, "wdef"))
180
    self.assertFalse(hasattr(op, "number"))
181
    self.assertFalse(hasattr(op, "notype"))
182
    self.assertEqual(op.__getstate__(), before, msg="Opcode was modified")
183

    
184
    # Required parameter "nodef" is provided
185
    op = _TestOp(nodef="foo")
186
    before = op.__getstate__()
187
    op.Validate(False)
188
    self.assertEqual(op.__getstate__(), before, msg="Opcode was modified")
189
    self.assertEqual(op.nodef, "foo")
190
    self.assertFalse(hasattr(op, "wdef"))
191
    self.assertFalse(hasattr(op, "number"))
192
    self.assertFalse(hasattr(op, "notype"))
193

    
194
    # Missing required parameter "nodef"
195
    op = _TestOp(wdef="hello", number=999)
196
    before = op.__getstate__()
197
    self.assertRaises(errors.OpPrereqError, op.Validate, False)
198
    self.assertFalse(hasattr(op, "nodef"))
199
    self.assertFalse(hasattr(op, "notype"))
200
    self.assertEqual(op.__getstate__(), before, msg="Opcode was modified")
201

    
202
    # Wrong type for "nodef"
203
    op = _TestOp(nodef=987)
204
    before = op.__getstate__()
205
    self.assertRaises(errors.OpPrereqError, op.Validate, False)
206
    self.assertEqual(op.nodef, 987)
207
    self.assertFalse(hasattr(op, "notype"))
208
    self.assertEqual(op.__getstate__(), before, msg="Opcode was modified")
209

    
210
    # Testing different types for "notype"
211
    op = _TestOp(nodef="foo", notype=[1, 2, 3])
212
    before = op.__getstate__()
213
    op.Validate(False)
214
    self.assertEqual(op.nodef, "foo")
215
    self.assertEqual(op.notype, [1, 2, 3])
216
    self.assertEqual(op.__getstate__(), before, msg="Opcode was modified")
217

    
218
    op = _TestOp(nodef="foo", notype="Hello World")
219
    before = op.__getstate__()
220
    op.Validate(False)
221
    self.assertEqual(op.nodef, "foo")
222
    self.assertEqual(op.notype, "Hello World")
223
    self.assertEqual(op.__getstate__(), before, msg="Opcode was modified")
224

    
225
  def testValidateSetDefaults(self):
226
    class _TestOp(opcodes.OpCode):
227
      OP_ID = "OP_TEST"
228
      OP_PARAMS = [
229
        # Static default value
230
        ("value1", "default", ht.TMaybeString),
231

    
232
        # Default value callback
233
        ("value2", lambda: "result", ht.TMaybeString),
234
        ]
235

    
236
    op = _TestOp()
237
    before = op.__getstate__()
238
    op.Validate(True)
239
    self.assertNotEqual(op.__getstate__(), before,
240
                        msg="Opcode was not modified")
241
    self.assertEqual(op.value1, "default")
242
    self.assertEqual(op.value2, "result")
243
    self.assert_(op.dry_run is None)
244
    self.assert_(op.debug_level is None)
245
    self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
246

    
247
    op = _TestOp(value1="hello", value2="world", debug_level=123)
248
    before = op.__getstate__()
249
    op.Validate(True)
250
    self.assertNotEqual(op.__getstate__(), before,
251
                        msg="Opcode was not modified")
252
    self.assertEqual(op.value1, "hello")
253
    self.assertEqual(op.value2, "world")
254
    self.assertEqual(op.debug_level, 123)
255

    
256

    
257
if __name__ == "__main__":
258
  testutils.GanetiTestProgram()