Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ 90e234a6

History | View | Annotate | Download (10 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import errno
25
import fcntl
26
import glob
27
import os
28
import os.path
29
import re
30
import shutil
31
import signal
32
import socket
33
import stat
34
import tempfile
35
import time
36
import unittest
37
import warnings
38
import random
39
import operator
40

    
41
import testutils
42
from ganeti import constants
43
from ganeti import compat
44
from ganeti import utils
45
from ganeti import errors
46
from ganeti.utils import RunCmd, \
47
     FirstFree, \
48
     RunParts
49

    
50

    
51
class TestParseCpuMask(unittest.TestCase):
52
  """Test case for the ParseCpuMask function."""
53

    
54
  def testWellFormed(self):
55
    self.assertEqual(utils.ParseCpuMask(""), [])
56
    self.assertEqual(utils.ParseCpuMask("1"), [1])
57
    self.assertEqual(utils.ParseCpuMask("0-2,4,5-5"), [0,1,2,4,5])
58

    
59
  def testInvalidInput(self):
60
    for data in ["garbage", "0,", "0-1-2", "2-1", "1-a"]:
61
      self.assertRaises(errors.ParseError, utils.ParseCpuMask, data)
62

    
63

    
64
class TestGetMounts(unittest.TestCase):
65
  """Test case for GetMounts()."""
66

    
67
  TESTDATA = (
68
    "rootfs /     rootfs rw 0 0\n"
69
    "none   /sys  sysfs  rw,nosuid,nodev,noexec,relatime 0 0\n"
70
    "none   /proc proc   rw,nosuid,nodev,noexec,relatime 0 0\n")
71

    
72
  def setUp(self):
73
    self.tmpfile = tempfile.NamedTemporaryFile()
74
    utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
75

    
76
  def testGetMounts(self):
77
    self.assertEqual(utils.GetMounts(filename=self.tmpfile.name),
78
      [
79
        ("rootfs", "/", "rootfs", "rw"),
80
        ("none", "/sys", "sysfs", "rw,nosuid,nodev,noexec,relatime"),
81
        ("none", "/proc", "proc", "rw,nosuid,nodev,noexec,relatime"),
82
      ])
83

    
84

    
85
class TestFirstFree(unittest.TestCase):
86
  """Test case for the FirstFree function"""
87

    
88
  def test(self):
89
    """Test FirstFree"""
90
    self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
91
    self.failUnlessEqual(FirstFree([]), None)
92
    self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
93
    self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
94
    self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
95

    
96

    
97
class TestTimeFunctions(unittest.TestCase):
98
  """Test case for time functions"""
99

    
100
  def runTest(self):
101
    self.assertEqual(utils.SplitTime(1), (1, 0))
102
    self.assertEqual(utils.SplitTime(1.5), (1, 500000))
103
    self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
104
    self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
105
    self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
106
    self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
107
    self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
108
    self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
109

    
110
    self.assertRaises(AssertionError, utils.SplitTime, -1)
111

    
112
    self.assertEqual(utils.MergeTime((1, 0)), 1.0)
113
    self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
114
    self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
115

    
116
    self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3),
117
                     1218448917.481)
118
    self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
119

    
120
    self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
121
    self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
122
    self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
123
    self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
124
    self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
125

    
126

    
127
class FieldSetTestCase(unittest.TestCase):
128
  """Test case for FieldSets"""
129

    
130
  def testSimpleMatch(self):
131
    f = utils.FieldSet("a", "b", "c", "def")
132
    self.failUnless(f.Matches("a"))
133
    self.failIf(f.Matches("d"), "Substring matched")
134
    self.failIf(f.Matches("defghi"), "Prefix string matched")
135
    self.failIf(f.NonMatching(["b", "c"]))
136
    self.failIf(f.NonMatching(["a", "b", "c", "def"]))
137
    self.failUnless(f.NonMatching(["a", "d"]))
138

    
139
  def testRegexMatch(self):
140
    f = utils.FieldSet("a", "b([0-9]+)", "c")
141
    self.failUnless(f.Matches("b1"))
142
    self.failUnless(f.Matches("b99"))
143
    self.failIf(f.Matches("b/1"))
144
    self.failIf(f.NonMatching(["b12", "c"]))
145
    self.failUnless(f.NonMatching(["a", "1"]))
146

    
147
class TestForceDictType(unittest.TestCase):
148
  """Test case for ForceDictType"""
149
  KEY_TYPES = {
150
    "a": constants.VTYPE_INT,
151
    "b": constants.VTYPE_BOOL,
152
    "c": constants.VTYPE_STRING,
153
    "d": constants.VTYPE_SIZE,
154
    "e": constants.VTYPE_MAYBE_STRING,
155
    }
156

    
157
  def _fdt(self, dict, allowed_values=None):
158
    if allowed_values is None:
159
      utils.ForceDictType(dict, self.KEY_TYPES)
160
    else:
161
      utils.ForceDictType(dict, self.KEY_TYPES, allowed_values=allowed_values)
162

    
163
    return dict
164

    
165
  def testSimpleDict(self):
166
    self.assertEqual(self._fdt({}), {})
167
    self.assertEqual(self._fdt({'a': 1}), {'a': 1})
168
    self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
169
    self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
170
    self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
171
    self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
172
    self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
173
    self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
174
    self.assertEqual(self._fdt({'b': False}), {'b': False})
175
    self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
176
    self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
177
    self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
178
    self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
179
    self.assertEqual(self._fdt({"e": None, }), {"e": None, })
180
    self.assertEqual(self._fdt({"e": "Hello World", }), {"e": "Hello World", })
181
    self.assertEqual(self._fdt({"e": False, }), {"e": '', })
182
    self.assertEqual(self._fdt({"b": "hello", }, ["hello"]), {"b": "hello"})
183

    
184
  def testErrors(self):
185
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
186
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {"b": "hello"})
187
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
188
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
189
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
190
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {"e": object(), })
191
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {"e": [], })
192
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {"x": None, })
193
    self.assertRaises(errors.TypeEnforcementError, self._fdt, [])
194
    self.assertRaises(errors.ProgrammerError, utils.ForceDictType,
195
                      {"b": "hello"}, {"b": "no-such-type"})
196

    
197

    
198
class TestValidateServiceName(unittest.TestCase):
199
  def testValid(self):
200
    testnames = [
201
      0, 1, 2, 3, 1024, 65000, 65534, 65535,
202
      "ganeti",
203
      "gnt-masterd",
204
      "HELLO_WORLD_SVC",
205
      "hello.world.1",
206
      "0", "80", "1111", "65535",
207
      ]
208

    
209
    for name in testnames:
210
      self.assertEqual(utils.ValidateServiceName(name), name)
211

    
212
  def testInvalid(self):
213
    testnames = [
214
      -15756, -1, 65536, 133428083,
215
      "", "Hello World!", "!", "'", "\"", "\t", "\n", "`",
216
      "-8546", "-1", "65536",
217
      (129 * "A"),
218
      ]
219

    
220
    for name in testnames:
221
      self.assertRaises(errors.OpPrereqError, utils.ValidateServiceName, name)
222

    
223

    
224
class TestReadLockedPidFile(unittest.TestCase):
225
  def setUp(self):
226
    self.tmpdir = tempfile.mkdtemp()
227

    
228
  def tearDown(self):
229
    shutil.rmtree(self.tmpdir)
230

    
231
  def testNonExistent(self):
232
    path = utils.PathJoin(self.tmpdir, "nonexist")
233
    self.assert_(utils.ReadLockedPidFile(path) is None)
234

    
235
  def testUnlocked(self):
236
    path = utils.PathJoin(self.tmpdir, "pid")
237
    utils.WriteFile(path, data="123")
238
    self.assert_(utils.ReadLockedPidFile(path) is None)
239

    
240
  def testLocked(self):
241
    path = utils.PathJoin(self.tmpdir, "pid")
242
    utils.WriteFile(path, data="123")
243

    
244
    fl = utils.FileLock.Open(path)
245
    try:
246
      fl.Exclusive(blocking=True)
247

    
248
      self.assertEqual(utils.ReadLockedPidFile(path), 123)
249
    finally:
250
      fl.Close()
251

    
252
    self.assert_(utils.ReadLockedPidFile(path) is None)
253

    
254
  def testError(self):
255
    path = utils.PathJoin(self.tmpdir, "foobar", "pid")
256
    utils.WriteFile(utils.PathJoin(self.tmpdir, "foobar"), data="")
257
    # open(2) should return ENOTDIR
258
    self.assertRaises(EnvironmentError, utils.ReadLockedPidFile, path)
259

    
260

    
261
class TestFindMatch(unittest.TestCase):
262
  def test(self):
263
    data = {
264
      "aaaa": "Four A",
265
      "bb": {"Two B": True},
266
      re.compile(r"^x(foo|bar|bazX)([0-9]+)$"): (1, 2, 3),
267
      }
268

    
269
    self.assertEqual(utils.FindMatch(data, "aaaa"), ("Four A", []))
270
    self.assertEqual(utils.FindMatch(data, "bb"), ({"Two B": True}, []))
271

    
272
    for i in ["foo", "bar", "bazX"]:
273
      for j in range(1, 100, 7):
274
        self.assertEqual(utils.FindMatch(data, "x%s%s" % (i, j)),
275
                         ((1, 2, 3), [i, str(j)]))
276

    
277
  def testNoMatch(self):
278
    self.assert_(utils.FindMatch({}, "") is None)
279
    self.assert_(utils.FindMatch({}, "foo") is None)
280
    self.assert_(utils.FindMatch({}, 1234) is None)
281

    
282
    data = {
283
      "X": "Hello World",
284
      re.compile("^(something)$"): "Hello World",
285
      }
286

    
287
    self.assert_(utils.FindMatch(data, "") is None)
288
    self.assert_(utils.FindMatch(data, "Hello World") is None)
289

    
290

    
291
class TestTryConvert(unittest.TestCase):
292
  def test(self):
293
    for src, fn, result in [
294
      ("1", int, 1),
295
      ("a", int, "a"),
296
      ("", bool, False),
297
      ("a", bool, True),
298
      ]:
299
      self.assertEqual(utils.TryConvert(fn, src), result)
300

    
301

    
302
if __name__ == '__main__':
303
  testutils.GanetiTestProgram()