Statistics
| Branch: | Tag: | Revision:

root / test / py / ganeti.utils_unittest.py @ 24711492

History | View | Annotate | Download (12.7 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import errno
25
import fcntl
26
import glob
27
import os
28
import os.path
29
import re
30
import shutil
31
import signal
32
import socket
33
import stat
34
import tempfile
35
import time
36
import unittest
37
import warnings
38
import random
39
import operator
40

    
41
import testutils
42
from ganeti import constants
43
from ganeti import compat
44
from ganeti import utils
45
from ganeti import errors
46
from ganeti.utils import RunCmd, \
47
     FirstFree, \
48
     RunParts
49

    
50

    
51
class TestParseCpuMask(unittest.TestCase):
52
  """Test case for the ParseCpuMask function."""
53

    
54
  def testWellFormed(self):
55
    self.assertEqual(utils.ParseCpuMask(""), [])
56
    self.assertEqual(utils.ParseCpuMask("1"), [1])
57
    self.assertEqual(utils.ParseCpuMask("0-2,4,5-5"), [0,1,2,4,5])
58

    
59
  def testInvalidInput(self):
60
    for data in ["garbage", "0,", "0-1-2", "2-1", "1-a"]:
61
      self.assertRaises(errors.ParseError, utils.ParseCpuMask, data)
62

    
63

    
64
class TestParseMultiCpuMask(unittest.TestCase):
65
  """Test case for the ParseMultiCpuMask function."""
66

    
67
  def testWellFormed(self):
68
    self.assertEqual(utils.ParseMultiCpuMask(""), [])
69
    self.assertEqual(utils.ParseMultiCpuMask("1"), [[1]])
70
    self.assertEqual(utils.ParseMultiCpuMask("0-2,4,5-5"), [[0, 1, 2, 4, 5]])
71
    self.assertEqual(utils.ParseMultiCpuMask("all"), [[-1]])
72
    self.assertEqual(utils.ParseMultiCpuMask("0-2:all:4,6-8"),
73
      [[0, 1, 2], [-1], [4, 6, 7, 8]])
74

    
75
  def testInvalidInput(self):
76
    for data in ["garbage", "0,", "0-1-2", "2-1", "1-a", "all-all"]:
77
      self.assertRaises(errors.ParseError, utils.ParseCpuMask, data)
78

    
79

    
80
class TestGetMounts(unittest.TestCase):
81
  """Test case for GetMounts()."""
82

    
83
  TESTDATA = (
84
    "rootfs /     rootfs rw 0 0\n"
85
    "none   /sys  sysfs  rw,nosuid,nodev,noexec,relatime 0 0\n"
86
    "none   /proc proc   rw,nosuid,nodev,noexec,relatime 0 0\n")
87

    
88
  def setUp(self):
89
    self.tmpfile = tempfile.NamedTemporaryFile()
90
    utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
91

    
92
  def testGetMounts(self):
93
    self.assertEqual(utils.GetMounts(filename=self.tmpfile.name),
94
      [
95
        ("rootfs", "/", "rootfs", "rw"),
96
        ("none", "/sys", "sysfs", "rw,nosuid,nodev,noexec,relatime"),
97
        ("none", "/proc", "proc", "rw,nosuid,nodev,noexec,relatime"),
98
      ])
99

    
100

    
101
class TestFirstFree(unittest.TestCase):
102
  """Test case for the FirstFree function"""
103

    
104
  def test(self):
105
    """Test FirstFree"""
106
    self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
107
    self.failUnlessEqual(FirstFree([]), None)
108
    self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
109
    self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
110
    self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
111

    
112

    
113
class TestTimeFunctions(unittest.TestCase):
114
  """Test case for time functions"""
115

    
116
  def runTest(self):
117
    self.assertEqual(utils.SplitTime(1), (1, 0))
118
    self.assertEqual(utils.SplitTime(1.5), (1, 500000))
119
    self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
120
    self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
121
    self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
122
    self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
123
    self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
124
    self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
125

    
126
    self.assertRaises(AssertionError, utils.SplitTime, -1)
127

    
128
    self.assertEqual(utils.MergeTime((1, 0)), 1.0)
129
    self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
130
    self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
131

    
132
    self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3),
133
                     1218448917.481)
134
    self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
135

    
136
    self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
137
    self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
138
    self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
139
    self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
140
    self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
141

    
142

    
143
class FieldSetTestCase(unittest.TestCase):
144
  """Test case for FieldSets"""
145

    
146
  def testSimpleMatch(self):
147
    f = utils.FieldSet("a", "b", "c", "def")
148
    self.failUnless(f.Matches("a"))
149
    self.failIf(f.Matches("d"), "Substring matched")
150
    self.failIf(f.Matches("defghi"), "Prefix string matched")
151
    self.failIf(f.NonMatching(["b", "c"]))
152
    self.failIf(f.NonMatching(["a", "b", "c", "def"]))
153
    self.failUnless(f.NonMatching(["a", "d"]))
154

    
155
  def testRegexMatch(self):
156
    f = utils.FieldSet("a", "b([0-9]+)", "c")
157
    self.failUnless(f.Matches("b1"))
158
    self.failUnless(f.Matches("b99"))
159
    self.failIf(f.Matches("b/1"))
160
    self.failIf(f.NonMatching(["b12", "c"]))
161
    self.failUnless(f.NonMatching(["a", "1"]))
162

    
163
class TestForceDictType(unittest.TestCase):
164
  """Test case for ForceDictType"""
165
  KEY_TYPES = {
166
    "a": constants.VTYPE_INT,
167
    "b": constants.VTYPE_BOOL,
168
    "c": constants.VTYPE_STRING,
169
    "d": constants.VTYPE_SIZE,
170
    "e": constants.VTYPE_MAYBE_STRING,
171
    }
172

    
173
  def _fdt(self, dict, allowed_values=None):
174
    if allowed_values is None:
175
      utils.ForceDictType(dict, self.KEY_TYPES)
176
    else:
177
      utils.ForceDictType(dict, self.KEY_TYPES, allowed_values=allowed_values)
178

    
179
    return dict
180

    
181
  def testSimpleDict(self):
182
    self.assertEqual(self._fdt({}), {})
183
    self.assertEqual(self._fdt({"a": 1}), {"a": 1})
184
    self.assertEqual(self._fdt({"a": "1"}), {"a": 1})
185
    self.assertEqual(self._fdt({"a": 1, "b": 1}), {"a":1, "b": True})
186
    self.assertEqual(self._fdt({"b": 1, "c": "foo"}), {"b": True, "c": "foo"})
187
    self.assertEqual(self._fdt({"b": 1, "c": False}), {"b": True, "c": ""})
188
    self.assertEqual(self._fdt({"b": "false"}), {"b": False})
189
    self.assertEqual(self._fdt({"b": "False"}), {"b": False})
190
    self.assertEqual(self._fdt({"b": False}), {"b": False})
191
    self.assertEqual(self._fdt({"b": "true"}), {"b": True})
192
    self.assertEqual(self._fdt({"b": "True"}), {"b": True})
193
    self.assertEqual(self._fdt({"d": "4"}), {"d": 4})
194
    self.assertEqual(self._fdt({"d": "4M"}), {"d": 4})
195
    self.assertEqual(self._fdt({"e": constants.VALUE_HS_NOTHING, }), {"e":
196
                               constants.VALUE_HS_NOTHING, })
197
    self.assertEqual(self._fdt({"e": "Hello World", }), {"e": "Hello World", })
198
    self.assertEqual(self._fdt({"e": False, }), {"e": "", })
199
    self.assertEqual(self._fdt({"b": "hello", }, ["hello"]), {"b": "hello"})
200

    
201
  def testErrors(self):
202
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {"a": "astring"})
203
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {"b": "hello"})
204
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {"c": True})
205
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {"d": "astring"})
206
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {"d": "4 L"})
207
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {"e": object(), })
208
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {"e": [], })
209
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {"x": None, })
210
    self.assertRaises(errors.TypeEnforcementError, self._fdt, [])
211
    self.assertRaises(errors.ProgrammerError, utils.ForceDictType,
212
                      {"b": "hello"}, {"b": "no-such-type"})
213

    
214

    
215
class TestValidateServiceName(unittest.TestCase):
216
  def testValid(self):
217
    testnames = [
218
      0, 1, 2, 3, 1024, 65000, 65534, 65535,
219
      "ganeti",
220
      "gnt-masterd",
221
      "HELLO_WORLD_SVC",
222
      "hello.world.1",
223
      "0", "80", "1111", "65535",
224
      ]
225

    
226
    for name in testnames:
227
      self.assertEqual(utils.ValidateServiceName(name), name)
228

    
229
  def testInvalid(self):
230
    testnames = [
231
      -15756, -1, 65536, 133428083,
232
      "", "Hello World!", "!", "'", "\"", "\t", "\n", "`",
233
      "-8546", "-1", "65536",
234
      (129 * "A"),
235
      ]
236

    
237
    for name in testnames:
238
      self.assertRaises(errors.OpPrereqError, utils.ValidateServiceName, name)
239

    
240

    
241
class TestReadLockedPidFile(unittest.TestCase):
242
  def setUp(self):
243
    self.tmpdir = tempfile.mkdtemp()
244

    
245
  def tearDown(self):
246
    shutil.rmtree(self.tmpdir)
247

    
248
  def testNonExistent(self):
249
    path = utils.PathJoin(self.tmpdir, "nonexist")
250
    self.assert_(utils.ReadLockedPidFile(path) is None)
251

    
252
  def testUnlocked(self):
253
    path = utils.PathJoin(self.tmpdir, "pid")
254
    utils.WriteFile(path, data="123")
255
    self.assert_(utils.ReadLockedPidFile(path) is None)
256

    
257
  def testLocked(self):
258
    path = utils.PathJoin(self.tmpdir, "pid")
259
    utils.WriteFile(path, data="123")
260

    
261
    fl = utils.FileLock.Open(path)
262
    try:
263
      fl.Exclusive(blocking=True)
264

    
265
      self.assertEqual(utils.ReadLockedPidFile(path), 123)
266
    finally:
267
      fl.Close()
268

    
269
    self.assert_(utils.ReadLockedPidFile(path) is None)
270

    
271
  def testError(self):
272
    path = utils.PathJoin(self.tmpdir, "foobar", "pid")
273
    utils.WriteFile(utils.PathJoin(self.tmpdir, "foobar"), data="")
274
    # open(2) should return ENOTDIR
275
    self.assertRaises(EnvironmentError, utils.ReadLockedPidFile, path)
276

    
277

    
278
class TestFindMatch(unittest.TestCase):
279
  def test(self):
280
    data = {
281
      "aaaa": "Four A",
282
      "bb": {"Two B": True},
283
      re.compile(r"^x(foo|bar|bazX)([0-9]+)$"): (1, 2, 3),
284
      }
285

    
286
    self.assertEqual(utils.FindMatch(data, "aaaa"), ("Four A", []))
287
    self.assertEqual(utils.FindMatch(data, "bb"), ({"Two B": True}, []))
288

    
289
    for i in ["foo", "bar", "bazX"]:
290
      for j in range(1, 100, 7):
291
        self.assertEqual(utils.FindMatch(data, "x%s%s" % (i, j)),
292
                         ((1, 2, 3), [i, str(j)]))
293

    
294
  def testNoMatch(self):
295
    self.assert_(utils.FindMatch({}, "") is None)
296
    self.assert_(utils.FindMatch({}, "foo") is None)
297
    self.assert_(utils.FindMatch({}, 1234) is None)
298

    
299
    data = {
300
      "X": "Hello World",
301
      re.compile("^(something)$"): "Hello World",
302
      }
303

    
304
    self.assert_(utils.FindMatch(data, "") is None)
305
    self.assert_(utils.FindMatch(data, "Hello World") is None)
306

    
307

    
308
class TestTryConvert(unittest.TestCase):
309
  def test(self):
310
    for src, fn, result in [
311
      ("1", int, 1),
312
      ("a", int, "a"),
313
      ("", bool, False),
314
      ("a", bool, True),
315
      ]:
316
      self.assertEqual(utils.TryConvert(fn, src), result)
317

    
318

    
319
class TestVerifyDictOptions(unittest.TestCase):
320
  def setUp(self):
321
    self.defaults = {
322
      "first_key": "foobar",
323
      "foobar": {
324
        "key1": "value2",
325
        "key2": "value1",
326
        },
327
      "another_key": "another_value",
328
      }
329

    
330
  def test(self):
331
    some_keys = {
332
      "first_key": "blubb",
333
      "foobar": {
334
        "key2": "foo",
335
        },
336
      }
337
    utils.VerifyDictOptions(some_keys, self.defaults)
338

    
339
  def testInvalid(self):
340
    some_keys = {
341
      "invalid_key": "blubb",
342
      "foobar": {
343
        "key2": "foo",
344
        },
345
      }
346
    self.assertRaises(errors.OpPrereqError, utils.VerifyDictOptions,
347
                      some_keys, self.defaults)
348

    
349
  def testNestedInvalid(self):
350
    some_keys = {
351
      "foobar": {
352
        "key2": "foo",
353
        "key3": "blibb"
354
        },
355
      }
356
    self.assertRaises(errors.OpPrereqError, utils.VerifyDictOptions,
357
                      some_keys, self.defaults)
358

    
359
  def testMultiInvalid(self):
360
    some_keys = {
361
        "foobar": {
362
          "key1": "value3",
363
          "key6": "Right here",
364
        },
365
        "invalid_with_sub": {
366
          "sub1": "value3",
367
        },
368
      }
369
    self.assertRaises(errors.OpPrereqError, utils.VerifyDictOptions,
370
                      some_keys, self.defaults)
371

    
372

    
373
class TestValidateDeviceNames(unittest.TestCase):
374
  def testEmpty(self):
375
    utils.ValidateDeviceNames("NIC", [])
376
    utils.ValidateDeviceNames("disk", [])
377

    
378
  def testNoName(self):
379
    nics = [{}, {}]
380
    utils.ValidateDeviceNames("NIC", nics)
381

    
382
  def testInvalidName(self):
383
    self.assertRaises(errors.OpPrereqError, utils.ValidateDeviceNames,
384
                      "disk", [{constants.IDISK_NAME: "42"}])
385
    self.assertRaises(errors.OpPrereqError, utils.ValidateDeviceNames,
386
                      "NIC", [{constants.INIC_NAME: "42"}])
387

    
388
  def testUsedName(self):
389
    disks = [{constants.IDISK_NAME: "name1"}, {constants.IDISK_NAME: "name1"}]
390
    self.assertRaises(errors.OpPrereqError, utils.ValidateDeviceNames,
391
                      "disk", disks)
392

    
393

    
394
if __name__ == "__main__":
395
  testutils.GanetiTestProgram()