Statistics
| Branch: | Tag: | Revision:

root / testing / ganeti.utils_unittest.py @ 88d14415

History | View | Annotate | Download (13.7 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import md5
30

    
31
import ganeti
32
from ganeti.utils import IsProcessAlive, Lock, Unlock, RunCmd, \
33
     RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
34
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
35
     ShellQuote, ShellQuoteArgs, _ParseIpOutput
36
from ganeti.errors import LockError, UnitParseError
37

    
38
class TestIsProcessAlive(unittest.TestCase):
39
  """Testing case for IsProcessAlive"""
40
  def setUp(self):
41
    # create a zombie and a (hopefully) non-existing process id
42
    self.pid_zombie = os.fork()
43
    if self.pid_zombie == 0:
44
      os._exit(0)
45
    elif self.pid_zombie < 0:
46
      raise SystemError("can't fork")
47
    self.pid_non_existing = os.fork()
48
    if self.pid_non_existing == 0:
49
      os._exit(0)
50
    elif self.pid_non_existing > 0:
51
      os.waitpid(self.pid_non_existing, 0)
52
    else:
53
      raise SystemError("can't fork")
54

    
55

    
56
  def testExists(self):
57
    mypid = os.getpid()
58
    self.assert_(IsProcessAlive(mypid),
59
                 "can't find myself running")
60

    
61
  def testZombie(self):
62
    self.assert_(not IsProcessAlive(self.pid_zombie),
63
                 "zombie not detected as zombie")
64

    
65

    
66
  def testNotExisting(self):
67
    self.assert_(not IsProcessAlive(self.pid_non_existing),
68
                 "noexisting process detected")
69

    
70

    
71
class TestLocking(unittest.TestCase):
72
  """Testing case for the Lock/Unlock functions"""
73
  def clean_lock(self, name):
74
    try:
75
      ganeti.utils.Unlock("unittest")
76
    except LockError:
77
      pass
78

    
79

    
80
  def testLock(self):
81
    self.clean_lock("unittest")
82
    self.assertEqual(None, Lock("unittest"))
83

    
84

    
85
  def testUnlock(self):
86
    self.clean_lock("unittest")
87
    ganeti.utils.Lock("unittest")
88
    self.assertEqual(None, Unlock("unittest"))
89

    
90

    
91
  def testDoubleLock(self):
92
    self.clean_lock("unittest")
93
    ganeti.utils.Lock("unittest")
94
    self.assertRaises(LockError, Lock, "unittest")
95

    
96

    
97
class TestRunCmd(unittest.TestCase):
98
  """Testing case for the RunCmd function"""
99

    
100
  def setUp(self):
101
    self.magic = time.ctime() + " ganeti test"
102

    
103
  def testOk(self):
104
    """Test successful exit code"""
105
    result = RunCmd("/bin/sh -c 'exit 0'")
106
    self.assertEqual(result.exit_code, 0)
107

    
108
  def testFail(self):
109
    """Test fail exit code"""
110
    result = RunCmd("/bin/sh -c 'exit 1'")
111
    self.assertEqual(result.exit_code, 1)
112

    
113

    
114
  def testStdout(self):
115
    """Test standard output"""
116
    cmd = 'echo -n "%s"' % self.magic
117
    result = RunCmd("/bin/sh -c '%s'" % cmd)
118
    self.assertEqual(result.stdout, self.magic)
119

    
120

    
121
  def testStderr(self):
122
    """Test standard error"""
123
    cmd = 'echo -n "%s"' % self.magic
124
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
125
    self.assertEqual(result.stderr, self.magic)
126

    
127

    
128
  def testCombined(self):
129
    """Test combined output"""
130
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
131
    result = RunCmd("/bin/sh -c '%s'" % cmd)
132
    self.assertEqual(result.output, "A" + self.magic + "B" + self.magic)
133

    
134
  def testSignal(self):
135
    """Test standard error"""
136
    result = RunCmd("/bin/sh -c 'kill -15 $$'")
137
    self.assertEqual(result.signal, 15)
138

    
139
  def testListRun(self):
140
    """Test list runs"""
141
    result = RunCmd(["true"])
142
    self.assertEqual(result.signal, None)
143
    self.assertEqual(result.exit_code, 0)
144
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
145
    self.assertEqual(result.signal, None)
146
    self.assertEqual(result.exit_code, 1)
147
    result = RunCmd(["echo", "-n", self.magic])
148
    self.assertEqual(result.signal, None)
149
    self.assertEqual(result.exit_code, 0)
150
    self.assertEqual(result.stdout, self.magic)
151

    
152

    
153
class TestRemoveFile(unittest.TestCase):
154
  """Test case for the RemoveFile function"""
155

    
156
  def setUp(self):
157
    """Create a temp dir and file for each case"""
158
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
159
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
160
    os.close(fd)
161

    
162
  def tearDown(self):
163
    if os.path.exists(self.tmpfile):
164
      os.unlink(self.tmpfile)
165
    os.rmdir(self.tmpdir)
166

    
167

    
168
  def testIgnoreDirs(self):
169
    """Test that RemoveFile() ignores directories"""
170
    self.assertEqual(None, RemoveFile(self.tmpdir))
171

    
172

    
173
  def testIgnoreNotExisting(self):
174
    """Test that RemoveFile() ignores non-existing files"""
175
    RemoveFile(self.tmpfile)
176
    RemoveFile(self.tmpfile)
177

    
178

    
179
  def testRemoveFile(self):
180
    """Test that RemoveFile does remove a file"""
181
    RemoveFile(self.tmpfile)
182
    if os.path.exists(self.tmpfile):
183
      self.fail("File '%s' not removed" % self.tmpfile)
184

    
185

    
186
  def testRemoveSymlink(self):
187
    """Test that RemoveFile does remove symlinks"""
188
    symlink = self.tmpdir + "/symlink"
189
    os.symlink("no-such-file", symlink)
190
    RemoveFile(symlink)
191
    if os.path.exists(symlink):
192
      self.fail("File '%s' not removed" % symlink)
193
    os.symlink(self.tmpfile, symlink)
194
    RemoveFile(symlink)
195
    if os.path.exists(symlink):
196
      self.fail("File '%s' not removed" % symlink)
197

    
198

    
199
class TestCheckdict(unittest.TestCase):
200
  """Test case for the CheckDict function"""
201

    
202
  def testAdd(self):
203
    """Test that CheckDict adds a missing key with the correct value"""
204

    
205
    tgt = {'a':1}
206
    tmpl = {'b': 2}
207
    CheckDict(tgt, tmpl)
208
    if 'b' not in tgt or tgt['b'] != 2:
209
      self.fail("Failed to update dict")
210

    
211

    
212
  def testNoUpdate(self):
213
    """Test that CheckDict does not overwrite an existing key"""
214
    tgt = {'a':1, 'b': 3}
215
    tmpl = {'b': 2}
216
    CheckDict(tgt, tmpl)
217
    self.failUnlessEqual(tgt['b'], 3)
218

    
219

    
220
class TestMatchNameComponent(unittest.TestCase):
221
  """Test case for the MatchNameComponent function"""
222

    
223
  def testEmptyList(self):
224
    """Test that there is no match against an empty list"""
225

    
226
    self.failUnlessEqual(MatchNameComponent("", []), None)
227
    self.failUnlessEqual(MatchNameComponent("test", []), None)
228

    
229
  def testSingleMatch(self):
230
    """Test that a single match is performed correctly"""
231
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
232
    for key in "test2", "test2.example", "test2.example.com":
233
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
234

    
235
  def testMultipleMatches(self):
236
    """Test that a multiple match is returned as None"""
237
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
238
    for key in "test1", "test1.example":
239
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
240

    
241

    
242
class TestFormatUnit(unittest.TestCase):
243
  """Test case for the FormatUnit function"""
244

    
245
  def testMiB(self):
246
    self.assertEqual(FormatUnit(1), '1M')
247
    self.assertEqual(FormatUnit(100), '100M')
248
    self.assertEqual(FormatUnit(1023), '1023M')
249

    
250
  def testGiB(self):
251
    self.assertEqual(FormatUnit(1024), '1.0G')
252
    self.assertEqual(FormatUnit(1536), '1.5G')
253
    self.assertEqual(FormatUnit(17133), '16.7G')
254
    self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G')
255

    
256
  def testTiB(self):
257
    self.assertEqual(FormatUnit(1024 * 1024), '1.0T')
258
    self.assertEqual(FormatUnit(5120 * 1024), '5.0T')
259
    self.assertEqual(FormatUnit(29829 * 1024), '29.1T')
260

    
261

    
262
class TestParseUnit(unittest.TestCase):
263
  """Test case for the ParseUnit function"""
264

    
265
  SCALES = (('', 1),
266
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
267
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
268
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
269

    
270
  def testRounding(self):
271
    self.assertEqual(ParseUnit('0'), 0)
272
    self.assertEqual(ParseUnit('1'), 4)
273
    self.assertEqual(ParseUnit('2'), 4)
274
    self.assertEqual(ParseUnit('3'), 4)
275

    
276
    self.assertEqual(ParseUnit('124'), 124)
277
    self.assertEqual(ParseUnit('125'), 128)
278
    self.assertEqual(ParseUnit('126'), 128)
279
    self.assertEqual(ParseUnit('127'), 128)
280
    self.assertEqual(ParseUnit('128'), 128)
281
    self.assertEqual(ParseUnit('129'), 132)
282
    self.assertEqual(ParseUnit('130'), 132)
283

    
284
  def testFloating(self):
285
    self.assertEqual(ParseUnit('0'), 0)
286
    self.assertEqual(ParseUnit('0.5'), 4)
287
    self.assertEqual(ParseUnit('1.75'), 4)
288
    self.assertEqual(ParseUnit('1.99'), 4)
289
    self.assertEqual(ParseUnit('2.00'), 4)
290
    self.assertEqual(ParseUnit('2.01'), 4)
291
    self.assertEqual(ParseUnit('3.99'), 4)
292
    self.assertEqual(ParseUnit('4.00'), 4)
293
    self.assertEqual(ParseUnit('4.01'), 8)
294
    self.assertEqual(ParseUnit('1.5G'), 1536)
295
    self.assertEqual(ParseUnit('1.8G'), 1844)
296
    self.assertEqual(ParseUnit('8.28T'), 8682212)
297

    
298
  def testSuffixes(self):
299
    for sep in ('', ' ', '   ', "\t", "\t "):
300
      for suffix, scale in TestParseUnit.SCALES:
301
        for func in (lambda x: x, str.lower, str.upper):
302
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)), 1024 * scale)
303

    
304
  def testInvalidInput(self):
305
    for sep in ('-', '_', ',', 'a'):
306
      for suffix, _ in TestParseUnit.SCALES:
307
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
308

    
309
    for suffix, _ in TestParseUnit.SCALES:
310
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
311

    
312

    
313
class TestSshKeys(unittest.TestCase):
314
  """Test case for the AddAuthorizedKey function"""
315

    
316
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
317
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
318
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
319

    
320
  # NOTE: The MD5 sums below were calculated after manually
321
  #       checking the output files.
322

    
323
  def writeTestFile(self):
324
    (fd, tmpname) = tempfile.mkstemp(prefix = 'ganeti-test')
325
    f = os.fdopen(fd, 'w')
326
    try:
327
      f.write(TestSshKeys.KEY_A)
328
      f.write("\n")
329
      f.write(TestSshKeys.KEY_B)
330
      f.write("\n")
331
    finally:
332
      f.close()
333

    
334
    return tmpname
335

    
336
  def testAddingNewKey(self):
337
    tmpname = self.writeTestFile()
338
    try:
339
      AddAuthorizedKey(tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
340

    
341
      f = open(tmpname, 'r')
342
      try:
343
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
344
                         'ccc71523108ca6e9d0343797dc3e9f16')
345
      finally:
346
        f.close()
347
    finally:
348
      os.unlink(tmpname)
349

    
350
  def testAddingAlmostButNotCompletlyTheSameKey(self):
351
    tmpname = self.writeTestFile()
352
    try:
353
      AddAuthorizedKey(tmpname,
354
          'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
355

    
356
      f = open(tmpname, 'r')
357
      try:
358
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
359
                         'f2c939d57addb5b3a6846884be896b46')
360
      finally:
361
        f.close()
362
    finally:
363
      os.unlink(tmpname)
364

    
365
  def testAddingExistingKeyWithSomeMoreSpaces(self):
366
    tmpname = self.writeTestFile()
367
    try:
368
      AddAuthorizedKey(tmpname,
369
          'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
370

    
371
      f = open(tmpname, 'r')
372
      try:
373
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
374
                         '4e612764808bd46337eb0f575415fc30')
375
      finally:
376
        f.close()
377
    finally:
378
      os.unlink(tmpname)
379

    
380
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
381
    tmpname = self.writeTestFile()
382
    try:
383
      RemoveAuthorizedKey(tmpname,
384
          'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
385

    
386
      f = open(tmpname, 'r')
387
      try:
388
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
389
                         '77516d987fca07f70e30b830b3e4f2ed')
390
      finally:
391
        f.close()
392
    finally:
393
      os.unlink(tmpname)
394

    
395
  def testRemovingNonExistingKey(self):
396
    tmpname = self.writeTestFile()
397
    try:
398
      RemoveAuthorizedKey(tmpname,
399
          'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
400

    
401
      f = open(tmpname, 'r')
402
      try:
403
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
404
                         '4e612764808bd46337eb0f575415fc30')
405
      finally:
406
        f.close()
407
    finally:
408
      os.unlink(tmpname)
409

    
410

    
411
class TestShellQuoting(unittest.TestCase):
412
  """Test case for shell quoting functions"""
413

    
414
  def testShellQuote(self):
415
    self.assertEqual(ShellQuote('abc'), "abc")
416
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
417
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
418
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
419
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
420

    
421
  def testShellQuoteArgs(self):
422
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
423
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
424
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
425

    
426

    
427
class TestIpAdressList(unittest.TestCase):
428
  """Test case for local IP addresses"""
429

    
430
  def _test(self, output, required):
431
    ips = _ParseIpOutput(output)
432

    
433
    # Sort the output, so our check below works in all cases
434
    ips.sort()
435
    required.sort()
436

    
437
    self.assertEqual(required, ips)
438

    
439
  def testSingleIpAddress(self):
440
    output = \
441
      ("3: lo    inet 127.0.0.1/8 brd 127.255.255.255 scope host lo\n"
442
       "5: eth0    inet 10.0.0.1/24 brd 172.30.15.127 scope global eth0\n")
443
    self._test(output, ['127.0.0.1', '10.0.0.1'])
444

    
445
  def testMultipleIpAddresses(self):
446
    output = \
447
      ("3: lo    inet 127.0.0.1/8 brd 127.255.255.255 scope host lo\n"
448
       "5: eth0    inet 10.0.0.1/24 brd 172.30.15.127 scope global eth0\n"
449
       "5: eth0    inet 1.2.3.4/8 brd 1.255.255.255 scope global eth0:test\n")
450
    self._test(output, ['127.0.0.1', '10.0.0.1', '1.2.3.4'])
451

    
452

    
453
if __name__ == '__main__':
454
  unittest.main()