Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ 5ca84bdd

History | View | Annotate | Download (14.2 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import md5
30

    
31
import ganeti
32
from ganeti.utils import IsProcessAlive, Lock, Unlock, RunCmd, \
33
     RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
34
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
35
     ShellQuote, ShellQuoteArgs, _ParseIpOutput
36
from ganeti.errors import LockError, UnitParseError
37

    
38
class TestIsProcessAlive(unittest.TestCase):
39
  """Testing case for IsProcessAlive"""
40
  def setUp(self):
41
    # create a zombie and a (hopefully) non-existing process id
42
    self.pid_zombie = os.fork()
43
    if self.pid_zombie == 0:
44
      os._exit(0)
45
    elif self.pid_zombie < 0:
46
      raise SystemError("can't fork")
47
    self.pid_non_existing = os.fork()
48
    if self.pid_non_existing == 0:
49
      os._exit(0)
50
    elif self.pid_non_existing > 0:
51
      os.waitpid(self.pid_non_existing, 0)
52
    else:
53
      raise SystemError("can't fork")
54

    
55

    
56
  def testExists(self):
57
    mypid = os.getpid()
58
    self.assert_(IsProcessAlive(mypid),
59
                 "can't find myself running")
60

    
61
  def testZombie(self):
62
    self.assert_(not IsProcessAlive(self.pid_zombie),
63
                 "zombie not detected as zombie")
64

    
65

    
66
  def testNotExisting(self):
67
    self.assert_(not IsProcessAlive(self.pid_non_existing),
68
                 "noexisting process detected")
69

    
70

    
71
class TestLocking(unittest.TestCase):
72
  """Testing case for the Lock/Unlock functions"""
73
  def clean_lock(self, name):
74
    try:
75
      ganeti.utils.Unlock("unittest")
76
    except LockError:
77
      pass
78

    
79

    
80
  def testLock(self):
81
    self.clean_lock("unittest")
82
    self.assertEqual(None, Lock("unittest"))
83

    
84

    
85
  def testUnlock(self):
86
    self.clean_lock("unittest")
87
    ganeti.utils.Lock("unittest")
88
    self.assertEqual(None, Unlock("unittest"))
89

    
90

    
91
  def testDoubleLock(self):
92
    self.clean_lock("unittest")
93
    ganeti.utils.Lock("unittest")
94
    self.assertRaises(LockError, Lock, "unittest")
95

    
96

    
97
class TestRunCmd(unittest.TestCase):
98
  """Testing case for the RunCmd function"""
99

    
100
  def setUp(self):
101
    self.magic = time.ctime() + " ganeti test"
102

    
103
  def testOk(self):
104
    """Test successful exit code"""
105
    result = RunCmd("/bin/sh -c 'exit 0'")
106
    self.assertEqual(result.exit_code, 0)
107

    
108
  def testFail(self):
109
    """Test fail exit code"""
110
    result = RunCmd("/bin/sh -c 'exit 1'")
111
    self.assertEqual(result.exit_code, 1)
112

    
113

    
114
  def testStdout(self):
115
    """Test standard output"""
116
    cmd = 'echo -n "%s"' % self.magic
117
    result = RunCmd("/bin/sh -c '%s'" % cmd)
118
    self.assertEqual(result.stdout, self.magic)
119

    
120

    
121
  def testStderr(self):
122
    """Test standard error"""
123
    cmd = 'echo -n "%s"' % self.magic
124
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
125
    self.assertEqual(result.stderr, self.magic)
126

    
127

    
128
  def testCombined(self):
129
    """Test combined output"""
130
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
131
    result = RunCmd("/bin/sh -c '%s'" % cmd)
132
    self.assertEqual(result.output, "A" + self.magic + "B" + self.magic)
133

    
134
  def testSignal(self):
135
    """Test standard error"""
136
    result = RunCmd("/bin/sh -c 'kill -15 $$'")
137
    self.assertEqual(result.signal, 15)
138

    
139
  def testListRun(self):
140
    """Test list runs"""
141
    result = RunCmd(["true"])
142
    self.assertEqual(result.signal, None)
143
    self.assertEqual(result.exit_code, 0)
144
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
145
    self.assertEqual(result.signal, None)
146
    self.assertEqual(result.exit_code, 1)
147
    result = RunCmd(["echo", "-n", self.magic])
148
    self.assertEqual(result.signal, None)
149
    self.assertEqual(result.exit_code, 0)
150
    self.assertEqual(result.stdout, self.magic)
151

    
152
  def testLang(self):
153
    """Test locale environment"""
154
    old_env = os.environ.copy()
155
    try:
156
      os.environ["LANG"] = "en_US.UTF-8"
157
      os.environ["LC_ALL"] = "en_US.UTF-8"
158
      result = RunCmd(["locale"])
159
      for line in result.output.splitlines():
160
        key, value = line.split("=", 1)
161
        # Ignore these variables, they're overridden by LC_ALL
162
        if key == "LANG" or key == "LANGUAGE":
163
          continue
164
        self.failIf(value and value != "C" and value != '"C"',
165
            "Variable %s is set to the invalid value '%s'" % (key, value))
166
    finally:
167
      os.environ = old_env
168

    
169

    
170
class TestRemoveFile(unittest.TestCase):
171
  """Test case for the RemoveFile function"""
172

    
173
  def setUp(self):
174
    """Create a temp dir and file for each case"""
175
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
176
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
177
    os.close(fd)
178

    
179
  def tearDown(self):
180
    if os.path.exists(self.tmpfile):
181
      os.unlink(self.tmpfile)
182
    os.rmdir(self.tmpdir)
183

    
184

    
185
  def testIgnoreDirs(self):
186
    """Test that RemoveFile() ignores directories"""
187
    self.assertEqual(None, RemoveFile(self.tmpdir))
188

    
189

    
190
  def testIgnoreNotExisting(self):
191
    """Test that RemoveFile() ignores non-existing files"""
192
    RemoveFile(self.tmpfile)
193
    RemoveFile(self.tmpfile)
194

    
195

    
196
  def testRemoveFile(self):
197
    """Test that RemoveFile does remove a file"""
198
    RemoveFile(self.tmpfile)
199
    if os.path.exists(self.tmpfile):
200
      self.fail("File '%s' not removed" % self.tmpfile)
201

    
202

    
203
  def testRemoveSymlink(self):
204
    """Test that RemoveFile does remove symlinks"""
205
    symlink = self.tmpdir + "/symlink"
206
    os.symlink("no-such-file", symlink)
207
    RemoveFile(symlink)
208
    if os.path.exists(symlink):
209
      self.fail("File '%s' not removed" % symlink)
210
    os.symlink(self.tmpfile, symlink)
211
    RemoveFile(symlink)
212
    if os.path.exists(symlink):
213
      self.fail("File '%s' not removed" % symlink)
214

    
215

    
216
class TestCheckdict(unittest.TestCase):
217
  """Test case for the CheckDict function"""
218

    
219
  def testAdd(self):
220
    """Test that CheckDict adds a missing key with the correct value"""
221

    
222
    tgt = {'a':1}
223
    tmpl = {'b': 2}
224
    CheckDict(tgt, tmpl)
225
    if 'b' not in tgt or tgt['b'] != 2:
226
      self.fail("Failed to update dict")
227

    
228

    
229
  def testNoUpdate(self):
230
    """Test that CheckDict does not overwrite an existing key"""
231
    tgt = {'a':1, 'b': 3}
232
    tmpl = {'b': 2}
233
    CheckDict(tgt, tmpl)
234
    self.failUnlessEqual(tgt['b'], 3)
235

    
236

    
237
class TestMatchNameComponent(unittest.TestCase):
238
  """Test case for the MatchNameComponent function"""
239

    
240
  def testEmptyList(self):
241
    """Test that there is no match against an empty list"""
242

    
243
    self.failUnlessEqual(MatchNameComponent("", []), None)
244
    self.failUnlessEqual(MatchNameComponent("test", []), None)
245

    
246
  def testSingleMatch(self):
247
    """Test that a single match is performed correctly"""
248
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
249
    for key in "test2", "test2.example", "test2.example.com":
250
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
251

    
252
  def testMultipleMatches(self):
253
    """Test that a multiple match is returned as None"""
254
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
255
    for key in "test1", "test1.example":
256
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
257

    
258

    
259
class TestFormatUnit(unittest.TestCase):
260
  """Test case for the FormatUnit function"""
261

    
262
  def testMiB(self):
263
    self.assertEqual(FormatUnit(1), '1M')
264
    self.assertEqual(FormatUnit(100), '100M')
265
    self.assertEqual(FormatUnit(1023), '1023M')
266

    
267
  def testGiB(self):
268
    self.assertEqual(FormatUnit(1024), '1.0G')
269
    self.assertEqual(FormatUnit(1536), '1.5G')
270
    self.assertEqual(FormatUnit(17133), '16.7G')
271
    self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G')
272

    
273
  def testTiB(self):
274
    self.assertEqual(FormatUnit(1024 * 1024), '1.0T')
275
    self.assertEqual(FormatUnit(5120 * 1024), '5.0T')
276
    self.assertEqual(FormatUnit(29829 * 1024), '29.1T')
277

    
278

    
279
class TestParseUnit(unittest.TestCase):
280
  """Test case for the ParseUnit function"""
281

    
282
  SCALES = (('', 1),
283
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
284
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
285
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
286

    
287
  def testRounding(self):
288
    self.assertEqual(ParseUnit('0'), 0)
289
    self.assertEqual(ParseUnit('1'), 4)
290
    self.assertEqual(ParseUnit('2'), 4)
291
    self.assertEqual(ParseUnit('3'), 4)
292

    
293
    self.assertEqual(ParseUnit('124'), 124)
294
    self.assertEqual(ParseUnit('125'), 128)
295
    self.assertEqual(ParseUnit('126'), 128)
296
    self.assertEqual(ParseUnit('127'), 128)
297
    self.assertEqual(ParseUnit('128'), 128)
298
    self.assertEqual(ParseUnit('129'), 132)
299
    self.assertEqual(ParseUnit('130'), 132)
300

    
301
  def testFloating(self):
302
    self.assertEqual(ParseUnit('0'), 0)
303
    self.assertEqual(ParseUnit('0.5'), 4)
304
    self.assertEqual(ParseUnit('1.75'), 4)
305
    self.assertEqual(ParseUnit('1.99'), 4)
306
    self.assertEqual(ParseUnit('2.00'), 4)
307
    self.assertEqual(ParseUnit('2.01'), 4)
308
    self.assertEqual(ParseUnit('3.99'), 4)
309
    self.assertEqual(ParseUnit('4.00'), 4)
310
    self.assertEqual(ParseUnit('4.01'), 8)
311
    self.assertEqual(ParseUnit('1.5G'), 1536)
312
    self.assertEqual(ParseUnit('1.8G'), 1844)
313
    self.assertEqual(ParseUnit('8.28T'), 8682212)
314

    
315
  def testSuffixes(self):
316
    for sep in ('', ' ', '   ', "\t", "\t "):
317
      for suffix, scale in TestParseUnit.SCALES:
318
        for func in (lambda x: x, str.lower, str.upper):
319
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)), 1024 * scale)
320

    
321
  def testInvalidInput(self):
322
    for sep in ('-', '_', ',', 'a'):
323
      for suffix, _ in TestParseUnit.SCALES:
324
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
325

    
326
    for suffix, _ in TestParseUnit.SCALES:
327
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
328

    
329

    
330
class TestSshKeys(unittest.TestCase):
331
  """Test case for the AddAuthorizedKey function"""
332

    
333
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
334
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
335
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
336

    
337
  # NOTE: The MD5 sums below were calculated after manually
338
  #       checking the output files.
339

    
340
  def writeTestFile(self):
341
    (fd, tmpname) = tempfile.mkstemp(prefix = 'ganeti-test')
342
    f = os.fdopen(fd, 'w')
343
    try:
344
      f.write(TestSshKeys.KEY_A)
345
      f.write("\n")
346
      f.write(TestSshKeys.KEY_B)
347
      f.write("\n")
348
    finally:
349
      f.close()
350

    
351
    return tmpname
352

    
353
  def testAddingNewKey(self):
354
    tmpname = self.writeTestFile()
355
    try:
356
      AddAuthorizedKey(tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
357

    
358
      f = open(tmpname, 'r')
359
      try:
360
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
361
                         'ccc71523108ca6e9d0343797dc3e9f16')
362
      finally:
363
        f.close()
364
    finally:
365
      os.unlink(tmpname)
366

    
367
  def testAddingAlmostButNotCompletlyTheSameKey(self):
368
    tmpname = self.writeTestFile()
369
    try:
370
      AddAuthorizedKey(tmpname,
371
          'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
372

    
373
      f = open(tmpname, 'r')
374
      try:
375
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
376
                         'f2c939d57addb5b3a6846884be896b46')
377
      finally:
378
        f.close()
379
    finally:
380
      os.unlink(tmpname)
381

    
382
  def testAddingExistingKeyWithSomeMoreSpaces(self):
383
    tmpname = self.writeTestFile()
384
    try:
385
      AddAuthorizedKey(tmpname,
386
          'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
387

    
388
      f = open(tmpname, 'r')
389
      try:
390
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
391
                         '4e612764808bd46337eb0f575415fc30')
392
      finally:
393
        f.close()
394
    finally:
395
      os.unlink(tmpname)
396

    
397
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
398
    tmpname = self.writeTestFile()
399
    try:
400
      RemoveAuthorizedKey(tmpname,
401
          'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
402

    
403
      f = open(tmpname, 'r')
404
      try:
405
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
406
                         '77516d987fca07f70e30b830b3e4f2ed')
407
      finally:
408
        f.close()
409
    finally:
410
      os.unlink(tmpname)
411

    
412
  def testRemovingNonExistingKey(self):
413
    tmpname = self.writeTestFile()
414
    try:
415
      RemoveAuthorizedKey(tmpname,
416
          'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
417

    
418
      f = open(tmpname, 'r')
419
      try:
420
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
421
                         '4e612764808bd46337eb0f575415fc30')
422
      finally:
423
        f.close()
424
    finally:
425
      os.unlink(tmpname)
426

    
427

    
428
class TestShellQuoting(unittest.TestCase):
429
  """Test case for shell quoting functions"""
430

    
431
  def testShellQuote(self):
432
    self.assertEqual(ShellQuote('abc'), "abc")
433
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
434
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
435
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
436
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
437

    
438
  def testShellQuoteArgs(self):
439
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
440
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
441
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
442

    
443

    
444
class TestIpAdressList(unittest.TestCase):
445
  """Test case for local IP addresses"""
446

    
447
  def _test(self, output, required):
448
    ips = _ParseIpOutput(output)
449

    
450
    # Sort the output, so our check below works in all cases
451
    ips.sort()
452
    required.sort()
453

    
454
    self.assertEqual(required, ips)
455

    
456
  def testSingleIpAddress(self):
457
    output = \
458
      ("3: lo    inet 127.0.0.1/8 brd 127.255.255.255 scope host lo\n"
459
       "5: eth0    inet 10.0.0.1/24 brd 172.30.15.127 scope global eth0\n")
460
    self._test(output, ['127.0.0.1', '10.0.0.1'])
461

    
462
  def testMultipleIpAddresses(self):
463
    output = \
464
      ("3: lo    inet 127.0.0.1/8 brd 127.255.255.255 scope host lo\n"
465
       "5: eth0    inet 10.0.0.1/24 brd 172.30.15.127 scope global eth0\n"
466
       "5: eth0    inet 1.2.3.4/8 brd 1.255.255.255 scope global eth0:test\n")
467
    self._test(output, ['127.0.0.1', '10.0.0.1', '1.2.3.4'])
468

    
469

    
470
if __name__ == '__main__':
471
  unittest.main()