Statistics
| Branch: | Tag: | Revision:

root / testing / ganeti.utils_unittest.py @ f6441c7c

History | View | Annotate | Download (14 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import md5
30

    
31
import ganeti
32
from ganeti.utils import IsProcessAlive, Lock, Unlock, RunCmd, \
33
     RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
34
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
35
     ShellQuote, ShellQuoteArgs, _ParseIpOutput
36
from ganeti.errors import LockError, UnitParseError
37

    
38
class TestIsProcessAlive(unittest.TestCase):
39
  """Testing case for IsProcessAlive"""
40
  def setUp(self):
41
    # create a zombie and a (hopefully) non-existing process id
42
    self.pid_zombie = os.fork()
43
    if self.pid_zombie == 0:
44
      os._exit(0)
45
    elif self.pid_zombie < 0:
46
      raise SystemError("can't fork")
47
    self.pid_non_existing = os.fork()
48
    if self.pid_non_existing == 0:
49
      os._exit(0)
50
    elif self.pid_non_existing > 0:
51
      os.waitpid(self.pid_non_existing, 0)
52
    else:
53
      raise SystemError("can't fork")
54

    
55

    
56
  def testExists(self):
57
    mypid = os.getpid()
58
    self.assert_(IsProcessAlive(mypid),
59
                 "can't find myself running")
60

    
61
  def testZombie(self):
62
    self.assert_(not IsProcessAlive(self.pid_zombie),
63
                 "zombie not detected as zombie")
64

    
65

    
66
  def testNotExisting(self):
67
    self.assert_(not IsProcessAlive(self.pid_non_existing),
68
                 "noexisting process detected")
69

    
70

    
71
class TestLocking(unittest.TestCase):
72
  """Testing case for the Lock/Unlock functions"""
73
  def clean_lock(self, name):
74
    try:
75
      ganeti.utils.Unlock("unittest")
76
    except LockError:
77
      pass
78

    
79

    
80
  def testLock(self):
81
    self.clean_lock("unittest")
82
    self.assertEqual(None, Lock("unittest"))
83

    
84

    
85
  def testUnlock(self):
86
    self.clean_lock("unittest")
87
    ganeti.utils.Lock("unittest")
88
    self.assertEqual(None, Unlock("unittest"))
89

    
90

    
91
  def testDoubleLock(self):
92
    self.clean_lock("unittest")
93
    ganeti.utils.Lock("unittest")
94
    self.assertRaises(LockError, Lock, "unittest")
95

    
96

    
97
class TestRunCmd(unittest.TestCase):
98
  """Testing case for the RunCmd function"""
99

    
100
  def setUp(self):
101
    self.magic = time.ctime() + " ganeti test"
102

    
103
  def testOk(self):
104
    """Test successful exit code"""
105
    result = RunCmd("/bin/sh -c 'exit 0'")
106
    self.assertEqual(result.exit_code, 0)
107

    
108
  def testFail(self):
109
    """Test fail exit code"""
110
    result = RunCmd("/bin/sh -c 'exit 1'")
111
    self.assertEqual(result.exit_code, 1)
112

    
113

    
114
  def testStdout(self):
115
    """Test standard output"""
116
    cmd = 'echo -n "%s"' % self.magic
117
    result = RunCmd("/bin/sh -c '%s'" % cmd)
118
    self.assertEqual(result.stdout, self.magic)
119

    
120

    
121
  def testStderr(self):
122
    """Test standard error"""
123
    cmd = 'echo -n "%s"' % self.magic
124
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
125
    self.assertEqual(result.stderr, self.magic)
126

    
127

    
128
  def testCombined(self):
129
    """Test combined output"""
130
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
131
    result = RunCmd("/bin/sh -c '%s'" % cmd)
132
    self.assertEqual(result.output, "A" + self.magic + "B" + self.magic)
133

    
134
  def testSignal(self):
135
    """Test standard error"""
136
    result = RunCmd("/bin/sh -c 'kill -15 $$'")
137
    self.assertEqual(result.signal, 15)
138

    
139
  def testListRun(self):
140
    """Test list runs"""
141
    result = RunCmd(["true"])
142
    self.assertEqual(result.signal, None)
143
    self.assertEqual(result.exit_code, 0)
144
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
145
    self.assertEqual(result.signal, None)
146
    self.assertEqual(result.exit_code, 1)
147
    result = RunCmd(["echo", "-n", self.magic])
148
    self.assertEqual(result.signal, None)
149
    self.assertEqual(result.exit_code, 0)
150
    self.assertEqual(result.stdout, self.magic)
151

    
152
  def testLang(self):
153
    """Test locale environment"""
154
    os.environ["LANG"] = "en_US.UTF-8"
155
    os.environ["LC_ALL"] = "en_US.UTF-8"
156
    result = RunCmd(["env"])
157
    for line in result.output.splitlines():
158
      key, val = line.split("=", 1)
159
      if key.startswith("LC_") or key == "LANG":
160
        self.fail("Unexpected language variable '%s' = '%s'" % (key, val))
161

    
162

    
163
class TestRemoveFile(unittest.TestCase):
164
  """Test case for the RemoveFile function"""
165

    
166
  def setUp(self):
167
    """Create a temp dir and file for each case"""
168
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
169
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
170
    os.close(fd)
171

    
172
  def tearDown(self):
173
    if os.path.exists(self.tmpfile):
174
      os.unlink(self.tmpfile)
175
    os.rmdir(self.tmpdir)
176

    
177

    
178
  def testIgnoreDirs(self):
179
    """Test that RemoveFile() ignores directories"""
180
    self.assertEqual(None, RemoveFile(self.tmpdir))
181

    
182

    
183
  def testIgnoreNotExisting(self):
184
    """Test that RemoveFile() ignores non-existing files"""
185
    RemoveFile(self.tmpfile)
186
    RemoveFile(self.tmpfile)
187

    
188

    
189
  def testRemoveFile(self):
190
    """Test that RemoveFile does remove a file"""
191
    RemoveFile(self.tmpfile)
192
    if os.path.exists(self.tmpfile):
193
      self.fail("File '%s' not removed" % self.tmpfile)
194

    
195

    
196
  def testRemoveSymlink(self):
197
    """Test that RemoveFile does remove symlinks"""
198
    symlink = self.tmpdir + "/symlink"
199
    os.symlink("no-such-file", symlink)
200
    RemoveFile(symlink)
201
    if os.path.exists(symlink):
202
      self.fail("File '%s' not removed" % symlink)
203
    os.symlink(self.tmpfile, symlink)
204
    RemoveFile(symlink)
205
    if os.path.exists(symlink):
206
      self.fail("File '%s' not removed" % symlink)
207

    
208

    
209
class TestCheckdict(unittest.TestCase):
210
  """Test case for the CheckDict function"""
211

    
212
  def testAdd(self):
213
    """Test that CheckDict adds a missing key with the correct value"""
214

    
215
    tgt = {'a':1}
216
    tmpl = {'b': 2}
217
    CheckDict(tgt, tmpl)
218
    if 'b' not in tgt or tgt['b'] != 2:
219
      self.fail("Failed to update dict")
220

    
221

    
222
  def testNoUpdate(self):
223
    """Test that CheckDict does not overwrite an existing key"""
224
    tgt = {'a':1, 'b': 3}
225
    tmpl = {'b': 2}
226
    CheckDict(tgt, tmpl)
227
    self.failUnlessEqual(tgt['b'], 3)
228

    
229

    
230
class TestMatchNameComponent(unittest.TestCase):
231
  """Test case for the MatchNameComponent function"""
232

    
233
  def testEmptyList(self):
234
    """Test that there is no match against an empty list"""
235

    
236
    self.failUnlessEqual(MatchNameComponent("", []), None)
237
    self.failUnlessEqual(MatchNameComponent("test", []), None)
238

    
239
  def testSingleMatch(self):
240
    """Test that a single match is performed correctly"""
241
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
242
    for key in "test2", "test2.example", "test2.example.com":
243
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
244

    
245
  def testMultipleMatches(self):
246
    """Test that a multiple match is returned as None"""
247
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
248
    for key in "test1", "test1.example":
249
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
250

    
251

    
252
class TestFormatUnit(unittest.TestCase):
253
  """Test case for the FormatUnit function"""
254

    
255
  def testMiB(self):
256
    self.assertEqual(FormatUnit(1), '1M')
257
    self.assertEqual(FormatUnit(100), '100M')
258
    self.assertEqual(FormatUnit(1023), '1023M')
259

    
260
  def testGiB(self):
261
    self.assertEqual(FormatUnit(1024), '1.0G')
262
    self.assertEqual(FormatUnit(1536), '1.5G')
263
    self.assertEqual(FormatUnit(17133), '16.7G')
264
    self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G')
265

    
266
  def testTiB(self):
267
    self.assertEqual(FormatUnit(1024 * 1024), '1.0T')
268
    self.assertEqual(FormatUnit(5120 * 1024), '5.0T')
269
    self.assertEqual(FormatUnit(29829 * 1024), '29.1T')
270

    
271

    
272
class TestParseUnit(unittest.TestCase):
273
  """Test case for the ParseUnit function"""
274

    
275
  SCALES = (('', 1),
276
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
277
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
278
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
279

    
280
  def testRounding(self):
281
    self.assertEqual(ParseUnit('0'), 0)
282
    self.assertEqual(ParseUnit('1'), 4)
283
    self.assertEqual(ParseUnit('2'), 4)
284
    self.assertEqual(ParseUnit('3'), 4)
285

    
286
    self.assertEqual(ParseUnit('124'), 124)
287
    self.assertEqual(ParseUnit('125'), 128)
288
    self.assertEqual(ParseUnit('126'), 128)
289
    self.assertEqual(ParseUnit('127'), 128)
290
    self.assertEqual(ParseUnit('128'), 128)
291
    self.assertEqual(ParseUnit('129'), 132)
292
    self.assertEqual(ParseUnit('130'), 132)
293

    
294
  def testFloating(self):
295
    self.assertEqual(ParseUnit('0'), 0)
296
    self.assertEqual(ParseUnit('0.5'), 4)
297
    self.assertEqual(ParseUnit('1.75'), 4)
298
    self.assertEqual(ParseUnit('1.99'), 4)
299
    self.assertEqual(ParseUnit('2.00'), 4)
300
    self.assertEqual(ParseUnit('2.01'), 4)
301
    self.assertEqual(ParseUnit('3.99'), 4)
302
    self.assertEqual(ParseUnit('4.00'), 4)
303
    self.assertEqual(ParseUnit('4.01'), 8)
304
    self.assertEqual(ParseUnit('1.5G'), 1536)
305
    self.assertEqual(ParseUnit('1.8G'), 1844)
306
    self.assertEqual(ParseUnit('8.28T'), 8682212)
307

    
308
  def testSuffixes(self):
309
    for sep in ('', ' ', '   ', "\t", "\t "):
310
      for suffix, scale in TestParseUnit.SCALES:
311
        for func in (lambda x: x, str.lower, str.upper):
312
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)), 1024 * scale)
313

    
314
  def testInvalidInput(self):
315
    for sep in ('-', '_', ',', 'a'):
316
      for suffix, _ in TestParseUnit.SCALES:
317
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
318

    
319
    for suffix, _ in TestParseUnit.SCALES:
320
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
321

    
322

    
323
class TestSshKeys(unittest.TestCase):
324
  """Test case for the AddAuthorizedKey function"""
325

    
326
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
327
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
328
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
329

    
330
  # NOTE: The MD5 sums below were calculated after manually
331
  #       checking the output files.
332

    
333
  def writeTestFile(self):
334
    (fd, tmpname) = tempfile.mkstemp(prefix = 'ganeti-test')
335
    f = os.fdopen(fd, 'w')
336
    try:
337
      f.write(TestSshKeys.KEY_A)
338
      f.write("\n")
339
      f.write(TestSshKeys.KEY_B)
340
      f.write("\n")
341
    finally:
342
      f.close()
343

    
344
    return tmpname
345

    
346
  def testAddingNewKey(self):
347
    tmpname = self.writeTestFile()
348
    try:
349
      AddAuthorizedKey(tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
350

    
351
      f = open(tmpname, 'r')
352
      try:
353
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
354
                         'ccc71523108ca6e9d0343797dc3e9f16')
355
      finally:
356
        f.close()
357
    finally:
358
      os.unlink(tmpname)
359

    
360
  def testAddingAlmostButNotCompletlyTheSameKey(self):
361
    tmpname = self.writeTestFile()
362
    try:
363
      AddAuthorizedKey(tmpname,
364
          'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
365

    
366
      f = open(tmpname, 'r')
367
      try:
368
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
369
                         'f2c939d57addb5b3a6846884be896b46')
370
      finally:
371
        f.close()
372
    finally:
373
      os.unlink(tmpname)
374

    
375
  def testAddingExistingKeyWithSomeMoreSpaces(self):
376
    tmpname = self.writeTestFile()
377
    try:
378
      AddAuthorizedKey(tmpname,
379
          'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
380

    
381
      f = open(tmpname, 'r')
382
      try:
383
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
384
                         '4e612764808bd46337eb0f575415fc30')
385
      finally:
386
        f.close()
387
    finally:
388
      os.unlink(tmpname)
389

    
390
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
391
    tmpname = self.writeTestFile()
392
    try:
393
      RemoveAuthorizedKey(tmpname,
394
          'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
395

    
396
      f = open(tmpname, 'r')
397
      try:
398
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
399
                         '77516d987fca07f70e30b830b3e4f2ed')
400
      finally:
401
        f.close()
402
    finally:
403
      os.unlink(tmpname)
404

    
405
  def testRemovingNonExistingKey(self):
406
    tmpname = self.writeTestFile()
407
    try:
408
      RemoveAuthorizedKey(tmpname,
409
          'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
410

    
411
      f = open(tmpname, 'r')
412
      try:
413
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
414
                         '4e612764808bd46337eb0f575415fc30')
415
      finally:
416
        f.close()
417
    finally:
418
      os.unlink(tmpname)
419

    
420

    
421
class TestShellQuoting(unittest.TestCase):
422
  """Test case for shell quoting functions"""
423

    
424
  def testShellQuote(self):
425
    self.assertEqual(ShellQuote('abc'), "abc")
426
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
427
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
428
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
429
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
430

    
431
  def testShellQuoteArgs(self):
432
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
433
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
434
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
435

    
436

    
437
class TestIpAdressList(unittest.TestCase):
438
  """Test case for local IP addresses"""
439

    
440
  def _test(self, output, required):
441
    ips = _ParseIpOutput(output)
442

    
443
    # Sort the output, so our check below works in all cases
444
    ips.sort()
445
    required.sort()
446

    
447
    self.assertEqual(required, ips)
448

    
449
  def testSingleIpAddress(self):
450
    output = \
451
      ("3: lo    inet 127.0.0.1/8 brd 127.255.255.255 scope host lo\n"
452
       "5: eth0    inet 10.0.0.1/24 brd 172.30.15.127 scope global eth0\n")
453
    self._test(output, ['127.0.0.1', '10.0.0.1'])
454

    
455
  def testMultipleIpAddresses(self):
456
    output = \
457
      ("3: lo    inet 127.0.0.1/8 brd 127.255.255.255 scope host lo\n"
458
       "5: eth0    inet 10.0.0.1/24 brd 172.30.15.127 scope global eth0\n"
459
       "5: eth0    inet 1.2.3.4/8 brd 1.255.255.255 scope global eth0:test\n")
460
    self._test(output, ['127.0.0.1', '10.0.0.1', '1.2.3.4'])
461

    
462

    
463
if __name__ == '__main__':
464
  unittest.main()