Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ 7ebd876f

History | View | Annotate | Download (10.1 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import errno
25
import fcntl
26
import glob
27
import os
28
import os.path
29
import re
30
import shutil
31
import signal
32
import socket
33
import stat
34
import tempfile
35
import time
36
import unittest
37
import warnings
38
import random
39
import operator
40

    
41
import testutils
42
from ganeti import constants
43
from ganeti import compat
44
from ganeti import utils
45
from ganeti import errors
46
from ganeti.utils import RunCmd, \
47
     FirstFree, \
48
     RunParts
49

    
50

    
51
class TestParseCpuMask(unittest.TestCase):
52
  """Test case for the ParseCpuMask function."""
53

    
54
  def testWellFormed(self):
55
    self.assertEqual(utils.ParseCpuMask(""), [])
56
    self.assertEqual(utils.ParseCpuMask("1"), [1])
57
    self.assertEqual(utils.ParseCpuMask("0-2,4,5-5"), [0,1,2,4,5])
58

    
59
  def testInvalidInput(self):
60
    for data in ["garbage", "0,", "0-1-2", "2-1", "1-a"]:
61
      self.assertRaises(errors.ParseError, utils.ParseCpuMask, data)
62

    
63

    
64
class TestGetMounts(unittest.TestCase):
65
  """Test case for GetMounts()."""
66

    
67
  TESTDATA = (
68
    "rootfs /     rootfs rw 0 0\n"
69
    "none   /sys  sysfs  rw,nosuid,nodev,noexec,relatime 0 0\n"
70
    "none   /proc proc   rw,nosuid,nodev,noexec,relatime 0 0\n")
71

    
72
  def setUp(self):
73
    self.tmpfile = tempfile.NamedTemporaryFile()
74
    utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
75

    
76
  def testGetMounts(self):
77
    self.assertEqual(utils.GetMounts(filename=self.tmpfile.name),
78
      [
79
        ("rootfs", "/", "rootfs", "rw"),
80
        ("none", "/sys", "sysfs", "rw,nosuid,nodev,noexec,relatime"),
81
        ("none", "/proc", "proc", "rw,nosuid,nodev,noexec,relatime"),
82
      ])
83

    
84
class TestNewUUID(unittest.TestCase):
85
  """Test case for NewUUID"""
86

    
87
  def runTest(self):
88
    self.failUnless(utils.UUID_RE.match(utils.NewUUID()))
89

    
90

    
91
class TestFirstFree(unittest.TestCase):
92
  """Test case for the FirstFree function"""
93

    
94
  def test(self):
95
    """Test FirstFree"""
96
    self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
97
    self.failUnlessEqual(FirstFree([]), None)
98
    self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
99
    self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
100
    self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
101

    
102

    
103
class TestTimeFunctions(unittest.TestCase):
104
  """Test case for time functions"""
105

    
106
  def runTest(self):
107
    self.assertEqual(utils.SplitTime(1), (1, 0))
108
    self.assertEqual(utils.SplitTime(1.5), (1, 500000))
109
    self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
110
    self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
111
    self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
112
    self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
113
    self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
114
    self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
115

    
116
    self.assertRaises(AssertionError, utils.SplitTime, -1)
117

    
118
    self.assertEqual(utils.MergeTime((1, 0)), 1.0)
119
    self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
120
    self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
121

    
122
    self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3),
123
                     1218448917.481)
124
    self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
125

    
126
    self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
127
    self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
128
    self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
129
    self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
130
    self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
131

    
132

    
133
class FieldSetTestCase(unittest.TestCase):
134
  """Test case for FieldSets"""
135

    
136
  def testSimpleMatch(self):
137
    f = utils.FieldSet("a", "b", "c", "def")
138
    self.failUnless(f.Matches("a"))
139
    self.failIf(f.Matches("d"), "Substring matched")
140
    self.failIf(f.Matches("defghi"), "Prefix string matched")
141
    self.failIf(f.NonMatching(["b", "c"]))
142
    self.failIf(f.NonMatching(["a", "b", "c", "def"]))
143
    self.failUnless(f.NonMatching(["a", "d"]))
144

    
145
  def testRegexMatch(self):
146
    f = utils.FieldSet("a", "b([0-9]+)", "c")
147
    self.failUnless(f.Matches("b1"))
148
    self.failUnless(f.Matches("b99"))
149
    self.failIf(f.Matches("b/1"))
150
    self.failIf(f.NonMatching(["b12", "c"]))
151
    self.failUnless(f.NonMatching(["a", "1"]))
152

    
153
class TestForceDictType(unittest.TestCase):
154
  """Test case for ForceDictType"""
155
  KEY_TYPES = {
156
    "a": constants.VTYPE_INT,
157
    "b": constants.VTYPE_BOOL,
158
    "c": constants.VTYPE_STRING,
159
    "d": constants.VTYPE_SIZE,
160
    "e": constants.VTYPE_MAYBE_STRING,
161
    }
162

    
163
  def _fdt(self, dict, allowed_values=None):
164
    if allowed_values is None:
165
      utils.ForceDictType(dict, self.KEY_TYPES)
166
    else:
167
      utils.ForceDictType(dict, self.KEY_TYPES, allowed_values=allowed_values)
168

    
169
    return dict
170

    
171
  def testSimpleDict(self):
172
    self.assertEqual(self._fdt({}), {})
173
    self.assertEqual(self._fdt({'a': 1}), {'a': 1})
174
    self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
175
    self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
176
    self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
177
    self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
178
    self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
179
    self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
180
    self.assertEqual(self._fdt({'b': False}), {'b': False})
181
    self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
182
    self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
183
    self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
184
    self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
185
    self.assertEqual(self._fdt({"e": None, }), {"e": None, })
186
    self.assertEqual(self._fdt({"e": "Hello World", }), {"e": "Hello World", })
187
    self.assertEqual(self._fdt({"e": False, }), {"e": '', })
188
    self.assertEqual(self._fdt({"b": "hello", }, ["hello"]), {"b": "hello"})
189

    
190
  def testErrors(self):
191
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
192
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {"b": "hello"})
193
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
194
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
195
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
196
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {"e": object(), })
197
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {"e": [], })
198
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {"x": None, })
199
    self.assertRaises(errors.TypeEnforcementError, self._fdt, [])
200
    self.assertRaises(errors.ProgrammerError, utils.ForceDictType,
201
                      {"b": "hello"}, {"b": "no-such-type"})
202

    
203

    
204
class TestValidateServiceName(unittest.TestCase):
205
  def testValid(self):
206
    testnames = [
207
      0, 1, 2, 3, 1024, 65000, 65534, 65535,
208
      "ganeti",
209
      "gnt-masterd",
210
      "HELLO_WORLD_SVC",
211
      "hello.world.1",
212
      "0", "80", "1111", "65535",
213
      ]
214

    
215
    for name in testnames:
216
      self.assertEqual(utils.ValidateServiceName(name), name)
217

    
218
  def testInvalid(self):
219
    testnames = [
220
      -15756, -1, 65536, 133428083,
221
      "", "Hello World!", "!", "'", "\"", "\t", "\n", "`",
222
      "-8546", "-1", "65536",
223
      (129 * "A"),
224
      ]
225

    
226
    for name in testnames:
227
      self.assertRaises(errors.OpPrereqError, utils.ValidateServiceName, name)
228

    
229

    
230
class TestReadLockedPidFile(unittest.TestCase):
231
  def setUp(self):
232
    self.tmpdir = tempfile.mkdtemp()
233

    
234
  def tearDown(self):
235
    shutil.rmtree(self.tmpdir)
236

    
237
  def testNonExistent(self):
238
    path = utils.PathJoin(self.tmpdir, "nonexist")
239
    self.assert_(utils.ReadLockedPidFile(path) is None)
240

    
241
  def testUnlocked(self):
242
    path = utils.PathJoin(self.tmpdir, "pid")
243
    utils.WriteFile(path, data="123")
244
    self.assert_(utils.ReadLockedPidFile(path) is None)
245

    
246
  def testLocked(self):
247
    path = utils.PathJoin(self.tmpdir, "pid")
248
    utils.WriteFile(path, data="123")
249

    
250
    fl = utils.FileLock.Open(path)
251
    try:
252
      fl.Exclusive(blocking=True)
253

    
254
      self.assertEqual(utils.ReadLockedPidFile(path), 123)
255
    finally:
256
      fl.Close()
257

    
258
    self.assert_(utils.ReadLockedPidFile(path) is None)
259

    
260
  def testError(self):
261
    path = utils.PathJoin(self.tmpdir, "foobar", "pid")
262
    utils.WriteFile(utils.PathJoin(self.tmpdir, "foobar"), data="")
263
    # open(2) should return ENOTDIR
264
    self.assertRaises(EnvironmentError, utils.ReadLockedPidFile, path)
265

    
266

    
267
class TestFindMatch(unittest.TestCase):
268
  def test(self):
269
    data = {
270
      "aaaa": "Four A",
271
      "bb": {"Two B": True},
272
      re.compile(r"^x(foo|bar|bazX)([0-9]+)$"): (1, 2, 3),
273
      }
274

    
275
    self.assertEqual(utils.FindMatch(data, "aaaa"), ("Four A", []))
276
    self.assertEqual(utils.FindMatch(data, "bb"), ({"Two B": True}, []))
277

    
278
    for i in ["foo", "bar", "bazX"]:
279
      for j in range(1, 100, 7):
280
        self.assertEqual(utils.FindMatch(data, "x%s%s" % (i, j)),
281
                         ((1, 2, 3), [i, str(j)]))
282

    
283
  def testNoMatch(self):
284
    self.assert_(utils.FindMatch({}, "") is None)
285
    self.assert_(utils.FindMatch({}, "foo") is None)
286
    self.assert_(utils.FindMatch({}, 1234) is None)
287

    
288
    data = {
289
      "X": "Hello World",
290
      re.compile("^(something)$"): "Hello World",
291
      }
292

    
293
    self.assert_(utils.FindMatch(data, "") is None)
294
    self.assert_(utils.FindMatch(data, "Hello World") is None)
295

    
296

    
297
class TestTryConvert(unittest.TestCase):
298
  def test(self):
299
    for src, fn, result in [
300
      ("1", int, 1),
301
      ("a", int, "a"),
302
      ("", bool, False),
303
      ("a", bool, True),
304
      ]:
305
      self.assertEqual(utils.TryConvert(fn, src), result)
306

    
307

    
308
if __name__ == '__main__':
309
  testutils.GanetiTestProgram()