Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ eedbda4b

History | View | Annotate | Download (17 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import md5
30
import socket
31
import shutil
32

    
33
import ganeti
34
from ganeti.utils import IsProcessAlive, Lock, Unlock, RunCmd, \
35
     RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
36
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
37
     ShellQuote, ShellQuoteArgs, _ParseIpOutput, TcpPing, \
38
     ListVisibleFiles
39
from ganeti.errors import LockError, UnitParseError
40

    
41

    
42
class TestIsProcessAlive(unittest.TestCase):
43
  """Testing case for IsProcessAlive"""
44
  def setUp(self):
45
    # create a zombie and a (hopefully) non-existing process id
46
    self.pid_zombie = os.fork()
47
    if self.pid_zombie == 0:
48
      os._exit(0)
49
    elif self.pid_zombie < 0:
50
      raise SystemError("can't fork")
51
    self.pid_non_existing = os.fork()
52
    if self.pid_non_existing == 0:
53
      os._exit(0)
54
    elif self.pid_non_existing > 0:
55
      os.waitpid(self.pid_non_existing, 0)
56
    else:
57
      raise SystemError("can't fork")
58

    
59

    
60
  def testExists(self):
61
    mypid = os.getpid()
62
    self.assert_(IsProcessAlive(mypid),
63
                 "can't find myself running")
64

    
65
  def testZombie(self):
66
    self.assert_(not IsProcessAlive(self.pid_zombie),
67
                 "zombie not detected as zombie")
68

    
69

    
70
  def testNotExisting(self):
71
    self.assert_(not IsProcessAlive(self.pid_non_existing),
72
                 "noexisting process detected")
73

    
74

    
75
class TestLocking(unittest.TestCase):
76
  """Testing case for the Lock/Unlock functions"""
77
  def clean_lock(self, name):
78
    try:
79
      ganeti.utils.Unlock("unittest")
80
    except LockError:
81
      pass
82

    
83

    
84
  def testLock(self):
85
    self.clean_lock("unittest")
86
    self.assertEqual(None, Lock("unittest"))
87

    
88

    
89
  def testUnlock(self):
90
    self.clean_lock("unittest")
91
    ganeti.utils.Lock("unittest")
92
    self.assertEqual(None, Unlock("unittest"))
93

    
94

    
95
  def testDoubleLock(self):
96
    self.clean_lock("unittest")
97
    ganeti.utils.Lock("unittest")
98
    self.assertRaises(LockError, Lock, "unittest")
99

    
100

    
101
class TestRunCmd(unittest.TestCase):
102
  """Testing case for the RunCmd function"""
103

    
104
  def setUp(self):
105
    self.magic = time.ctime() + " ganeti test"
106

    
107
  def testOk(self):
108
    """Test successful exit code"""
109
    result = RunCmd("/bin/sh -c 'exit 0'")
110
    self.assertEqual(result.exit_code, 0)
111

    
112
  def testFail(self):
113
    """Test fail exit code"""
114
    result = RunCmd("/bin/sh -c 'exit 1'")
115
    self.assertEqual(result.exit_code, 1)
116

    
117

    
118
  def testStdout(self):
119
    """Test standard output"""
120
    cmd = 'echo -n "%s"' % self.magic
121
    result = RunCmd("/bin/sh -c '%s'" % cmd)
122
    self.assertEqual(result.stdout, self.magic)
123

    
124

    
125
  def testStderr(self):
126
    """Test standard error"""
127
    cmd = 'echo -n "%s"' % self.magic
128
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
129
    self.assertEqual(result.stderr, self.magic)
130

    
131

    
132
  def testCombined(self):
133
    """Test combined output"""
134
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
135
    result = RunCmd("/bin/sh -c '%s'" % cmd)
136
    self.assertEqual(result.output, "A" + self.magic + "B" + self.magic)
137

    
138
  def testSignal(self):
139
    """Test standard error"""
140
    result = RunCmd("/bin/sh -c 'kill -15 $$'")
141
    self.assertEqual(result.signal, 15)
142

    
143
  def testListRun(self):
144
    """Test list runs"""
145
    result = RunCmd(["true"])
146
    self.assertEqual(result.signal, None)
147
    self.assertEqual(result.exit_code, 0)
148
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
149
    self.assertEqual(result.signal, None)
150
    self.assertEqual(result.exit_code, 1)
151
    result = RunCmd(["echo", "-n", self.magic])
152
    self.assertEqual(result.signal, None)
153
    self.assertEqual(result.exit_code, 0)
154
    self.assertEqual(result.stdout, self.magic)
155

    
156
  def testLang(self):
157
    """Test locale environment"""
158
    old_env = os.environ.copy()
159
    try:
160
      os.environ["LANG"] = "en_US.UTF-8"
161
      os.environ["LC_ALL"] = "en_US.UTF-8"
162
      result = RunCmd(["locale"])
163
      for line in result.output.splitlines():
164
        key, value = line.split("=", 1)
165
        # Ignore these variables, they're overridden by LC_ALL
166
        if key == "LANG" or key == "LANGUAGE":
167
          continue
168
        self.failIf(value and value != "C" and value != '"C"',
169
            "Variable %s is set to the invalid value '%s'" % (key, value))
170
    finally:
171
      os.environ = old_env
172

    
173

    
174
class TestRemoveFile(unittest.TestCase):
175
  """Test case for the RemoveFile function"""
176

    
177
  def setUp(self):
178
    """Create a temp dir and file for each case"""
179
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
180
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
181
    os.close(fd)
182

    
183
  def tearDown(self):
184
    if os.path.exists(self.tmpfile):
185
      os.unlink(self.tmpfile)
186
    os.rmdir(self.tmpdir)
187

    
188

    
189
  def testIgnoreDirs(self):
190
    """Test that RemoveFile() ignores directories"""
191
    self.assertEqual(None, RemoveFile(self.tmpdir))
192

    
193

    
194
  def testIgnoreNotExisting(self):
195
    """Test that RemoveFile() ignores non-existing files"""
196
    RemoveFile(self.tmpfile)
197
    RemoveFile(self.tmpfile)
198

    
199

    
200
  def testRemoveFile(self):
201
    """Test that RemoveFile does remove a file"""
202
    RemoveFile(self.tmpfile)
203
    if os.path.exists(self.tmpfile):
204
      self.fail("File '%s' not removed" % self.tmpfile)
205

    
206

    
207
  def testRemoveSymlink(self):
208
    """Test that RemoveFile does remove symlinks"""
209
    symlink = self.tmpdir + "/symlink"
210
    os.symlink("no-such-file", symlink)
211
    RemoveFile(symlink)
212
    if os.path.exists(symlink):
213
      self.fail("File '%s' not removed" % symlink)
214
    os.symlink(self.tmpfile, symlink)
215
    RemoveFile(symlink)
216
    if os.path.exists(symlink):
217
      self.fail("File '%s' not removed" % symlink)
218

    
219

    
220
class TestCheckdict(unittest.TestCase):
221
  """Test case for the CheckDict function"""
222

    
223
  def testAdd(self):
224
    """Test that CheckDict adds a missing key with the correct value"""
225

    
226
    tgt = {'a':1}
227
    tmpl = {'b': 2}
228
    CheckDict(tgt, tmpl)
229
    if 'b' not in tgt or tgt['b'] != 2:
230
      self.fail("Failed to update dict")
231

    
232

    
233
  def testNoUpdate(self):
234
    """Test that CheckDict does not overwrite an existing key"""
235
    tgt = {'a':1, 'b': 3}
236
    tmpl = {'b': 2}
237
    CheckDict(tgt, tmpl)
238
    self.failUnlessEqual(tgt['b'], 3)
239

    
240

    
241
class TestMatchNameComponent(unittest.TestCase):
242
  """Test case for the MatchNameComponent function"""
243

    
244
  def testEmptyList(self):
245
    """Test that there is no match against an empty list"""
246

    
247
    self.failUnlessEqual(MatchNameComponent("", []), None)
248
    self.failUnlessEqual(MatchNameComponent("test", []), None)
249

    
250
  def testSingleMatch(self):
251
    """Test that a single match is performed correctly"""
252
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
253
    for key in "test2", "test2.example", "test2.example.com":
254
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
255

    
256
  def testMultipleMatches(self):
257
    """Test that a multiple match is returned as None"""
258
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
259
    for key in "test1", "test1.example":
260
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
261

    
262

    
263
class TestFormatUnit(unittest.TestCase):
264
  """Test case for the FormatUnit function"""
265

    
266
  def testMiB(self):
267
    self.assertEqual(FormatUnit(1), '1M')
268
    self.assertEqual(FormatUnit(100), '100M')
269
    self.assertEqual(FormatUnit(1023), '1023M')
270

    
271
  def testGiB(self):
272
    self.assertEqual(FormatUnit(1024), '1.0G')
273
    self.assertEqual(FormatUnit(1536), '1.5G')
274
    self.assertEqual(FormatUnit(17133), '16.7G')
275
    self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G')
276

    
277
  def testTiB(self):
278
    self.assertEqual(FormatUnit(1024 * 1024), '1.0T')
279
    self.assertEqual(FormatUnit(5120 * 1024), '5.0T')
280
    self.assertEqual(FormatUnit(29829 * 1024), '29.1T')
281

    
282

    
283
class TestParseUnit(unittest.TestCase):
284
  """Test case for the ParseUnit function"""
285

    
286
  SCALES = (('', 1),
287
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
288
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
289
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
290

    
291
  def testRounding(self):
292
    self.assertEqual(ParseUnit('0'), 0)
293
    self.assertEqual(ParseUnit('1'), 4)
294
    self.assertEqual(ParseUnit('2'), 4)
295
    self.assertEqual(ParseUnit('3'), 4)
296

    
297
    self.assertEqual(ParseUnit('124'), 124)
298
    self.assertEqual(ParseUnit('125'), 128)
299
    self.assertEqual(ParseUnit('126'), 128)
300
    self.assertEqual(ParseUnit('127'), 128)
301
    self.assertEqual(ParseUnit('128'), 128)
302
    self.assertEqual(ParseUnit('129'), 132)
303
    self.assertEqual(ParseUnit('130'), 132)
304

    
305
  def testFloating(self):
306
    self.assertEqual(ParseUnit('0'), 0)
307
    self.assertEqual(ParseUnit('0.5'), 4)
308
    self.assertEqual(ParseUnit('1.75'), 4)
309
    self.assertEqual(ParseUnit('1.99'), 4)
310
    self.assertEqual(ParseUnit('2.00'), 4)
311
    self.assertEqual(ParseUnit('2.01'), 4)
312
    self.assertEqual(ParseUnit('3.99'), 4)
313
    self.assertEqual(ParseUnit('4.00'), 4)
314
    self.assertEqual(ParseUnit('4.01'), 8)
315
    self.assertEqual(ParseUnit('1.5G'), 1536)
316
    self.assertEqual(ParseUnit('1.8G'), 1844)
317
    self.assertEqual(ParseUnit('8.28T'), 8682212)
318

    
319
  def testSuffixes(self):
320
    for sep in ('', ' ', '   ', "\t", "\t "):
321
      for suffix, scale in TestParseUnit.SCALES:
322
        for func in (lambda x: x, str.lower, str.upper):
323
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)), 1024 * scale)
324

    
325
  def testInvalidInput(self):
326
    for sep in ('-', '_', ',', 'a'):
327
      for suffix, _ in TestParseUnit.SCALES:
328
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
329

    
330
    for suffix, _ in TestParseUnit.SCALES:
331
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
332

    
333

    
334
class TestSshKeys(unittest.TestCase):
335
  """Test case for the AddAuthorizedKey function"""
336

    
337
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
338
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
339
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
340

    
341
  # NOTE: The MD5 sums below were calculated after manually
342
  #       checking the output files.
343

    
344
  def writeTestFile(self):
345
    (fd, tmpname) = tempfile.mkstemp(prefix = 'ganeti-test')
346
    f = os.fdopen(fd, 'w')
347
    try:
348
      f.write(TestSshKeys.KEY_A)
349
      f.write("\n")
350
      f.write(TestSshKeys.KEY_B)
351
      f.write("\n")
352
    finally:
353
      f.close()
354

    
355
    return tmpname
356

    
357
  def testAddingNewKey(self):
358
    tmpname = self.writeTestFile()
359
    try:
360
      AddAuthorizedKey(tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
361

    
362
      f = open(tmpname, 'r')
363
      try:
364
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
365
                         'ccc71523108ca6e9d0343797dc3e9f16')
366
      finally:
367
        f.close()
368
    finally:
369
      os.unlink(tmpname)
370

    
371
  def testAddingAlmostButNotCompletlyTheSameKey(self):
372
    tmpname = self.writeTestFile()
373
    try:
374
      AddAuthorizedKey(tmpname,
375
          'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
376

    
377
      f = open(tmpname, 'r')
378
      try:
379
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
380
                         'f2c939d57addb5b3a6846884be896b46')
381
      finally:
382
        f.close()
383
    finally:
384
      os.unlink(tmpname)
385

    
386
  def testAddingExistingKeyWithSomeMoreSpaces(self):
387
    tmpname = self.writeTestFile()
388
    try:
389
      AddAuthorizedKey(tmpname,
390
          'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
391

    
392
      f = open(tmpname, 'r')
393
      try:
394
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
395
                         '4e612764808bd46337eb0f575415fc30')
396
      finally:
397
        f.close()
398
    finally:
399
      os.unlink(tmpname)
400

    
401
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
402
    tmpname = self.writeTestFile()
403
    try:
404
      RemoveAuthorizedKey(tmpname,
405
          'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
406

    
407
      f = open(tmpname, 'r')
408
      try:
409
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
410
                         '77516d987fca07f70e30b830b3e4f2ed')
411
      finally:
412
        f.close()
413
    finally:
414
      os.unlink(tmpname)
415

    
416
  def testRemovingNonExistingKey(self):
417
    tmpname = self.writeTestFile()
418
    try:
419
      RemoveAuthorizedKey(tmpname,
420
          'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
421

    
422
      f = open(tmpname, 'r')
423
      try:
424
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
425
                         '4e612764808bd46337eb0f575415fc30')
426
      finally:
427
        f.close()
428
    finally:
429
      os.unlink(tmpname)
430

    
431

    
432
class TestShellQuoting(unittest.TestCase):
433
  """Test case for shell quoting functions"""
434

    
435
  def testShellQuote(self):
436
    self.assertEqual(ShellQuote('abc'), "abc")
437
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
438
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
439
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
440
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
441

    
442
  def testShellQuoteArgs(self):
443
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
444
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
445
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
446

    
447

    
448
class TestIpAdressList(unittest.TestCase):
449
  """Test case for local IP addresses"""
450

    
451
  def _test(self, output, required):
452
    ips = _ParseIpOutput(output)
453

    
454
    # Sort the output, so our check below works in all cases
455
    ips.sort()
456
    required.sort()
457

    
458
    self.assertEqual(required, ips)
459

    
460
  def testSingleIpAddress(self):
461
    output = \
462
      ("3: lo    inet 127.0.0.1/8 brd 127.255.255.255 scope host lo\n"
463
       "5: eth0    inet 10.0.0.1/24 brd 172.30.15.127 scope global eth0\n")
464
    self._test(output, ['127.0.0.1', '10.0.0.1'])
465

    
466
  def testMultipleIpAddresses(self):
467
    output = \
468
      ("3: lo    inet 127.0.0.1/8 brd 127.255.255.255 scope host lo\n"
469
       "5: eth0    inet 10.0.0.1/24 brd 172.30.15.127 scope global eth0\n"
470
       "5: eth0    inet 1.2.3.4/8 brd 1.255.255.255 scope global eth0:test\n")
471
    self._test(output, ['127.0.0.1', '10.0.0.1', '1.2.3.4'])
472

    
473

    
474
class TestTcpPing(unittest.TestCase):
475
  """Testcase for TCP version of ping - against listen(2)ing port"""
476

    
477
  def setUp(self):
478
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
479
    self.listener.bind(("127.0.0.1", 0))
480
    self.listenerport = self.listener.getsockname()[1]
481
    self.listener.listen(1)
482

    
483
  def tearDown(self):
484
    self.listener.shutdown(socket.SHUT_RDWR)
485
    del self.listener
486
    del self.listenerport
487

    
488
  def testTcpPingToLocalHostAccept(self):
489
    self.assert_(TcpPing("127.0.0.1",
490
                         "127.0.0.1",
491
                         self.listenerport,
492
                         timeout=10,
493
                         live_port_needed=True),
494
                 "failed to connect to test listener")
495

    
496

    
497
class TestTcpPingDeaf(unittest.TestCase):
498
  """Testcase for TCP version of ping - against non listen(2)ing port"""
499

    
500
  def setUp(self):
501
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
502
    self.deaflistener.bind(("127.0.0.1", 0))
503
    self.deaflistenerport = self.deaflistener.getsockname()[1]
504

    
505
  def tearDown(self):
506
    del self.deaflistener
507
    del self.deaflistenerport
508

    
509
  def testTcpPingToLocalHostAcceptDeaf(self):
510
    self.failIf(TcpPing("127.0.0.1",
511
                        "127.0.0.1",
512
                        self.deaflistenerport,
513
                        timeout=10,  # timeout for blocking operations
514
                        live_port_needed=True), # need successful connect(2)
515
                "successfully connected to deaf listener")
516

    
517
  def testTcpPingToLocalHostNoAccept(self):
518
    self.assert_(TcpPing("127.0.0.1",
519
                         "127.0.0.1",
520
                         self.deaflistenerport,
521
                         timeout=10, # timeout for blocking operations
522
                         live_port_needed=False), # ECONNREFUSED is OK
523
                 "failed to ping alive host on deaf port")
524

    
525

    
526
class TestListVisibleFiles(unittest.TestCase):
527
  """Test case for ListVisibleFiles"""
528

    
529
  def setUp(self):
530
    self.path = tempfile.mkdtemp()
531

    
532
  def tearDown(self):
533
    shutil.rmtree(self.path)
534

    
535
  def _test(self, files, expected):
536
    # Sort a copy
537
    expected = expected[:]
538
    expected.sort()
539

    
540
    for name in files:
541
      f = open(os.path.join(self.path, name), 'w')
542
      try:
543
        f.write("Test\n")
544
      finally:
545
        f.close()
546

    
547
    found = ListVisibleFiles(self.path)
548
    found.sort()
549

    
550
    self.assertEqual(found, expected)
551

    
552
  def testAllVisible(self):
553
    files = ["a", "b", "c"]
554
    expected = files
555
    self._test(files, expected)
556

    
557
  def testNoneVisible(self):
558
    files = [".a", ".b", ".c"]
559
    expected = []
560
    self._test(files, expected)
561

    
562
  def testSomeVisible(self):
563
    files = ["a", "b", ".c"]
564
    expected = ["a", "b"]
565
    self._test(files, expected)
566

    
567

    
568
if __name__ == '__main__':
569
  unittest.main()