Statistics
| Branch: | Tag: | Revision:

root / test / py / ganeti.utils_unittest.py @ 651cc3e2

History | View | Annotate | Download (12.6 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import errno
25
import fcntl
26
import glob
27
import os
28
import os.path
29
import re
30
import shutil
31
import signal
32
import socket
33
import stat
34
import tempfile
35
import time
36
import unittest
37
import warnings
38
import random
39
import operator
40

    
41
import testutils
42
from ganeti import constants
43
from ganeti import compat
44
from ganeti import utils
45
from ganeti import errors
46
from ganeti.utils import RunCmd, \
47
     FirstFree, \
48
     RunParts
49

    
50

    
51
class TestParseCpuMask(unittest.TestCase):
52
  """Test case for the ParseCpuMask function."""
53

    
54
  def testWellFormed(self):
55
    self.assertEqual(utils.ParseCpuMask(""), [])
56
    self.assertEqual(utils.ParseCpuMask("1"), [1])
57
    self.assertEqual(utils.ParseCpuMask("0-2,4,5-5"), [0,1,2,4,5])
58

    
59
  def testInvalidInput(self):
60
    for data in ["garbage", "0,", "0-1-2", "2-1", "1-a"]:
61
      self.assertRaises(errors.ParseError, utils.ParseCpuMask, data)
62

    
63

    
64
class TestParseMultiCpuMask(unittest.TestCase):
65
  """Test case for the ParseMultiCpuMask function."""
66

    
67
  def testWellFormed(self):
68
    self.assertEqual(utils.ParseMultiCpuMask(""), [])
69
    self.assertEqual(utils.ParseMultiCpuMask("1"), [[1]])
70
    self.assertEqual(utils.ParseMultiCpuMask("0-2,4,5-5"), [[0, 1, 2, 4, 5]])
71
    self.assertEqual(utils.ParseMultiCpuMask("all"), [[-1]])
72
    self.assertEqual(utils.ParseMultiCpuMask("0-2:all:4,6-8"),
73
      [[0, 1, 2], [-1], [4, 6, 7, 8]])
74

    
75
  def testInvalidInput(self):
76
    for data in ["garbage", "0,", "0-1-2", "2-1", "1-a", "all-all"]:
77
      self.assertRaises(errors.ParseError, utils.ParseCpuMask, data)
78

    
79

    
80
class TestGetMounts(unittest.TestCase):
81
  """Test case for GetMounts()."""
82

    
83
  TESTDATA = (
84
    "rootfs /     rootfs rw 0 0\n"
85
    "none   /sys  sysfs  rw,nosuid,nodev,noexec,relatime 0 0\n"
86
    "none   /proc proc   rw,nosuid,nodev,noexec,relatime 0 0\n")
87

    
88
  def setUp(self):
89
    self.tmpfile = tempfile.NamedTemporaryFile()
90
    utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
91

    
92
  def testGetMounts(self):
93
    self.assertEqual(utils.GetMounts(filename=self.tmpfile.name),
94
      [
95
        ("rootfs", "/", "rootfs", "rw"),
96
        ("none", "/sys", "sysfs", "rw,nosuid,nodev,noexec,relatime"),
97
        ("none", "/proc", "proc", "rw,nosuid,nodev,noexec,relatime"),
98
      ])
99

    
100

    
101
class TestFirstFree(unittest.TestCase):
102
  """Test case for the FirstFree function"""
103

    
104
  def test(self):
105
    """Test FirstFree"""
106
    self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
107
    self.failUnlessEqual(FirstFree([]), None)
108
    self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
109
    self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
110
    self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
111

    
112

    
113
class TestTimeFunctions(unittest.TestCase):
114
  """Test case for time functions"""
115

    
116
  def runTest(self):
117
    self.assertEqual(utils.SplitTime(1), (1, 0))
118
    self.assertEqual(utils.SplitTime(1.5), (1, 500000))
119
    self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
120
    self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
121
    self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
122
    self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
123
    self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
124
    self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
125

    
126
    self.assertRaises(AssertionError, utils.SplitTime, -1)
127

    
128
    self.assertEqual(utils.MergeTime((1, 0)), 1.0)
129
    self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
130
    self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
131

    
132
    self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3),
133
                     1218448917.481)
134
    self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
135

    
136
    self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
137
    self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
138
    self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
139
    self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
140
    self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
141

    
142

    
143
class FieldSetTestCase(unittest.TestCase):
144
  """Test case for FieldSets"""
145

    
146
  def testSimpleMatch(self):
147
    f = utils.FieldSet("a", "b", "c", "def")
148
    self.failUnless(f.Matches("a"))
149
    self.failIf(f.Matches("d"), "Substring matched")
150
    self.failIf(f.Matches("defghi"), "Prefix string matched")
151
    self.failIf(f.NonMatching(["b", "c"]))
152
    self.failIf(f.NonMatching(["a", "b", "c", "def"]))
153
    self.failUnless(f.NonMatching(["a", "d"]))
154

    
155
  def testRegexMatch(self):
156
    f = utils.FieldSet("a", "b([0-9]+)", "c")
157
    self.failUnless(f.Matches("b1"))
158
    self.failUnless(f.Matches("b99"))
159
    self.failIf(f.Matches("b/1"))
160
    self.failIf(f.NonMatching(["b12", "c"]))
161
    self.failUnless(f.NonMatching(["a", "1"]))
162

    
163
class TestForceDictType(unittest.TestCase):
164
  """Test case for ForceDictType"""
165
  KEY_TYPES = {
166
    "a": constants.VTYPE_INT,
167
    "b": constants.VTYPE_BOOL,
168
    "c": constants.VTYPE_STRING,
169
    "d": constants.VTYPE_SIZE,
170
    "e": constants.VTYPE_MAYBE_STRING,
171
    }
172

    
173
  def _fdt(self, dict, allowed_values=None):
174
    if allowed_values is None:
175
      utils.ForceDictType(dict, self.KEY_TYPES)
176
    else:
177
      utils.ForceDictType(dict, self.KEY_TYPES, allowed_values=allowed_values)
178

    
179
    return dict
180

    
181
  def testSimpleDict(self):
182
    self.assertEqual(self._fdt({}), {})
183
    self.assertEqual(self._fdt({"a": 1}), {"a": 1})
184
    self.assertEqual(self._fdt({"a": "1"}), {"a": 1})
185
    self.assertEqual(self._fdt({"a": 1, "b": 1}), {"a":1, "b": True})
186
    self.assertEqual(self._fdt({"b": 1, "c": "foo"}), {"b": True, "c": "foo"})
187
    self.assertEqual(self._fdt({"b": 1, "c": False}), {"b": True, "c": ""})
188
    self.assertEqual(self._fdt({"b": "false"}), {"b": False})
189
    self.assertEqual(self._fdt({"b": "False"}), {"b": False})
190
    self.assertEqual(self._fdt({"b": False}), {"b": False})
191
    self.assertEqual(self._fdt({"b": "true"}), {"b": True})
192
    self.assertEqual(self._fdt({"b": "True"}), {"b": True})
193
    self.assertEqual(self._fdt({"d": "4"}), {"d": 4})
194
    self.assertEqual(self._fdt({"d": "4M"}), {"d": 4})
195
    self.assertEqual(self._fdt({"e": None, }), {"e": None, })
196
    self.assertEqual(self._fdt({"e": "Hello World", }), {"e": "Hello World", })
197
    self.assertEqual(self._fdt({"e": False, }), {"e": "", })
198
    self.assertEqual(self._fdt({"b": "hello", }, ["hello"]), {"b": "hello"})
199

    
200
  def testErrors(self):
201
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {"a": "astring"})
202
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {"b": "hello"})
203
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {"c": True})
204
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {"d": "astring"})
205
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {"d": "4 L"})
206
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {"e": object(), })
207
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {"e": [], })
208
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {"x": None, })
209
    self.assertRaises(errors.TypeEnforcementError, self._fdt, [])
210
    self.assertRaises(errors.ProgrammerError, utils.ForceDictType,
211
                      {"b": "hello"}, {"b": "no-such-type"})
212

    
213

    
214
class TestValidateServiceName(unittest.TestCase):
215
  def testValid(self):
216
    testnames = [
217
      0, 1, 2, 3, 1024, 65000, 65534, 65535,
218
      "ganeti",
219
      "gnt-masterd",
220
      "HELLO_WORLD_SVC",
221
      "hello.world.1",
222
      "0", "80", "1111", "65535",
223
      ]
224

    
225
    for name in testnames:
226
      self.assertEqual(utils.ValidateServiceName(name), name)
227

    
228
  def testInvalid(self):
229
    testnames = [
230
      -15756, -1, 65536, 133428083,
231
      "", "Hello World!", "!", "'", "\"", "\t", "\n", "`",
232
      "-8546", "-1", "65536",
233
      (129 * "A"),
234
      ]
235

    
236
    for name in testnames:
237
      self.assertRaises(errors.OpPrereqError, utils.ValidateServiceName, name)
238

    
239

    
240
class TestReadLockedPidFile(unittest.TestCase):
241
  def setUp(self):
242
    self.tmpdir = tempfile.mkdtemp()
243

    
244
  def tearDown(self):
245
    shutil.rmtree(self.tmpdir)
246

    
247
  def testNonExistent(self):
248
    path = utils.PathJoin(self.tmpdir, "nonexist")
249
    self.assert_(utils.ReadLockedPidFile(path) is None)
250

    
251
  def testUnlocked(self):
252
    path = utils.PathJoin(self.tmpdir, "pid")
253
    utils.WriteFile(path, data="123")
254
    self.assert_(utils.ReadLockedPidFile(path) is None)
255

    
256
  def testLocked(self):
257
    path = utils.PathJoin(self.tmpdir, "pid")
258
    utils.WriteFile(path, data="123")
259

    
260
    fl = utils.FileLock.Open(path)
261
    try:
262
      fl.Exclusive(blocking=True)
263

    
264
      self.assertEqual(utils.ReadLockedPidFile(path), 123)
265
    finally:
266
      fl.Close()
267

    
268
    self.assert_(utils.ReadLockedPidFile(path) is None)
269

    
270
  def testError(self):
271
    path = utils.PathJoin(self.tmpdir, "foobar", "pid")
272
    utils.WriteFile(utils.PathJoin(self.tmpdir, "foobar"), data="")
273
    # open(2) should return ENOTDIR
274
    self.assertRaises(EnvironmentError, utils.ReadLockedPidFile, path)
275

    
276

    
277
class TestFindMatch(unittest.TestCase):
278
  def test(self):
279
    data = {
280
      "aaaa": "Four A",
281
      "bb": {"Two B": True},
282
      re.compile(r"^x(foo|bar|bazX)([0-9]+)$"): (1, 2, 3),
283
      }
284

    
285
    self.assertEqual(utils.FindMatch(data, "aaaa"), ("Four A", []))
286
    self.assertEqual(utils.FindMatch(data, "bb"), ({"Two B": True}, []))
287

    
288
    for i in ["foo", "bar", "bazX"]:
289
      for j in range(1, 100, 7):
290
        self.assertEqual(utils.FindMatch(data, "x%s%s" % (i, j)),
291
                         ((1, 2, 3), [i, str(j)]))
292

    
293
  def testNoMatch(self):
294
    self.assert_(utils.FindMatch({}, "") is None)
295
    self.assert_(utils.FindMatch({}, "foo") is None)
296
    self.assert_(utils.FindMatch({}, 1234) is None)
297

    
298
    data = {
299
      "X": "Hello World",
300
      re.compile("^(something)$"): "Hello World",
301
      }
302

    
303
    self.assert_(utils.FindMatch(data, "") is None)
304
    self.assert_(utils.FindMatch(data, "Hello World") is None)
305

    
306

    
307
class TestTryConvert(unittest.TestCase):
308
  def test(self):
309
    for src, fn, result in [
310
      ("1", int, 1),
311
      ("a", int, "a"),
312
      ("", bool, False),
313
      ("a", bool, True),
314
      ]:
315
      self.assertEqual(utils.TryConvert(fn, src), result)
316

    
317

    
318
class TestVerifyDictOptions(unittest.TestCase):
319
  def setUp(self):
320
    self.defaults = {
321
      "first_key": "foobar",
322
      "foobar": {
323
        "key1": "value2",
324
        "key2": "value1",
325
        },
326
      "another_key": "another_value",
327
      }
328

    
329
  def test(self):
330
    some_keys = {
331
      "first_key": "blubb",
332
      "foobar": {
333
        "key2": "foo",
334
        },
335
      }
336
    utils.VerifyDictOptions(some_keys, self.defaults)
337

    
338
  def testInvalid(self):
339
    some_keys = {
340
      "invalid_key": "blubb",
341
      "foobar": {
342
        "key2": "foo",
343
        },
344
      }
345
    self.assertRaises(errors.OpPrereqError, utils.VerifyDictOptions,
346
                      some_keys, self.defaults)
347

    
348
  def testNestedInvalid(self):
349
    some_keys = {
350
      "foobar": {
351
        "key2": "foo",
352
        "key3": "blibb"
353
        },
354
      }
355
    self.assertRaises(errors.OpPrereqError, utils.VerifyDictOptions,
356
                      some_keys, self.defaults)
357

    
358
  def testMultiInvalid(self):
359
    some_keys = {
360
        "foobar": {
361
          "key1": "value3",
362
          "key6": "Right here",
363
        },
364
        "invalid_with_sub": {
365
          "sub1": "value3",
366
        },
367
      }
368
    self.assertRaises(errors.OpPrereqError, utils.VerifyDictOptions,
369
                      some_keys, self.defaults)
370

    
371

    
372
class TestValidateDeviceNames(unittest.TestCase):
373
  def testEmpty(self):
374
    utils.ValidateDeviceNames("NIC", [])
375
    utils.ValidateDeviceNames("disk", [])
376

    
377
  def testNoName(self):
378
    nics = [{}, {}]
379
    utils.ValidateDeviceNames("NIC", nics)
380

    
381
  def testInvalidName(self):
382
    self.assertRaises(errors.OpPrereqError, utils.ValidateDeviceNames,
383
                      "disk", [{constants.IDISK_NAME: "42"}])
384
    self.assertRaises(errors.OpPrereqError, utils.ValidateDeviceNames,
385
                      "NIC", [{constants.INIC_NAME: "42"}])
386

    
387
  def testUsedName(self):
388
    disks = [{constants.IDISK_NAME: "name1"}, {constants.IDISK_NAME: "name1"}]
389
    self.assertRaises(errors.OpPrereqError, utils.ValidateDeviceNames,
390
                      "disk", disks)
391

    
392

    
393
if __name__ == "__main__":
394
  testutils.GanetiTestProgram()