Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ 4ca1b175

History | View | Annotate | Download (16.1 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import md5
30
import socket
31

    
32

    
33
import ganeti
34
from ganeti.utils import IsProcessAlive, Lock, Unlock, RunCmd, \
35
     RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
36
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
37
     ShellQuote, ShellQuoteArgs, _ParseIpOutput, TcpPing
38
from ganeti.errors import LockError, UnitParseError
39

    
40

    
41
class TestIsProcessAlive(unittest.TestCase):
42
  """Testing case for IsProcessAlive"""
43
  def setUp(self):
44
    # create a zombie and a (hopefully) non-existing process id
45
    self.pid_zombie = os.fork()
46
    if self.pid_zombie == 0:
47
      os._exit(0)
48
    elif self.pid_zombie < 0:
49
      raise SystemError("can't fork")
50
    self.pid_non_existing = os.fork()
51
    if self.pid_non_existing == 0:
52
      os._exit(0)
53
    elif self.pid_non_existing > 0:
54
      os.waitpid(self.pid_non_existing, 0)
55
    else:
56
      raise SystemError("can't fork")
57

    
58

    
59
  def testExists(self):
60
    mypid = os.getpid()
61
    self.assert_(IsProcessAlive(mypid),
62
                 "can't find myself running")
63

    
64
  def testZombie(self):
65
    self.assert_(not IsProcessAlive(self.pid_zombie),
66
                 "zombie not detected as zombie")
67

    
68

    
69
  def testNotExisting(self):
70
    self.assert_(not IsProcessAlive(self.pid_non_existing),
71
                 "noexisting process detected")
72

    
73

    
74
class TestLocking(unittest.TestCase):
75
  """Testing case for the Lock/Unlock functions"""
76
  def clean_lock(self, name):
77
    try:
78
      ganeti.utils.Unlock("unittest")
79
    except LockError:
80
      pass
81

    
82

    
83
  def testLock(self):
84
    self.clean_lock("unittest")
85
    self.assertEqual(None, Lock("unittest"))
86

    
87

    
88
  def testUnlock(self):
89
    self.clean_lock("unittest")
90
    ganeti.utils.Lock("unittest")
91
    self.assertEqual(None, Unlock("unittest"))
92

    
93

    
94
  def testDoubleLock(self):
95
    self.clean_lock("unittest")
96
    ganeti.utils.Lock("unittest")
97
    self.assertRaises(LockError, Lock, "unittest")
98

    
99

    
100
class TestRunCmd(unittest.TestCase):
101
  """Testing case for the RunCmd function"""
102

    
103
  def setUp(self):
104
    self.magic = time.ctime() + " ganeti test"
105

    
106
  def testOk(self):
107
    """Test successful exit code"""
108
    result = RunCmd("/bin/sh -c 'exit 0'")
109
    self.assertEqual(result.exit_code, 0)
110

    
111
  def testFail(self):
112
    """Test fail exit code"""
113
    result = RunCmd("/bin/sh -c 'exit 1'")
114
    self.assertEqual(result.exit_code, 1)
115

    
116

    
117
  def testStdout(self):
118
    """Test standard output"""
119
    cmd = 'echo -n "%s"' % self.magic
120
    result = RunCmd("/bin/sh -c '%s'" % cmd)
121
    self.assertEqual(result.stdout, self.magic)
122

    
123

    
124
  def testStderr(self):
125
    """Test standard error"""
126
    cmd = 'echo -n "%s"' % self.magic
127
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
128
    self.assertEqual(result.stderr, self.magic)
129

    
130

    
131
  def testCombined(self):
132
    """Test combined output"""
133
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
134
    result = RunCmd("/bin/sh -c '%s'" % cmd)
135
    self.assertEqual(result.output, "A" + self.magic + "B" + self.magic)
136

    
137
  def testSignal(self):
138
    """Test standard error"""
139
    result = RunCmd("/bin/sh -c 'kill -15 $$'")
140
    self.assertEqual(result.signal, 15)
141

    
142
  def testListRun(self):
143
    """Test list runs"""
144
    result = RunCmd(["true"])
145
    self.assertEqual(result.signal, None)
146
    self.assertEqual(result.exit_code, 0)
147
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
148
    self.assertEqual(result.signal, None)
149
    self.assertEqual(result.exit_code, 1)
150
    result = RunCmd(["echo", "-n", self.magic])
151
    self.assertEqual(result.signal, None)
152
    self.assertEqual(result.exit_code, 0)
153
    self.assertEqual(result.stdout, self.magic)
154

    
155
  def testLang(self):
156
    """Test locale environment"""
157
    old_env = os.environ.copy()
158
    try:
159
      os.environ["LANG"] = "en_US.UTF-8"
160
      os.environ["LC_ALL"] = "en_US.UTF-8"
161
      result = RunCmd(["locale"])
162
      for line in result.output.splitlines():
163
        key, value = line.split("=", 1)
164
        # Ignore these variables, they're overridden by LC_ALL
165
        if key == "LANG" or key == "LANGUAGE":
166
          continue
167
        self.failIf(value and value != "C" and value != '"C"',
168
            "Variable %s is set to the invalid value '%s'" % (key, value))
169
    finally:
170
      os.environ = old_env
171

    
172

    
173
class TestRemoveFile(unittest.TestCase):
174
  """Test case for the RemoveFile function"""
175

    
176
  def setUp(self):
177
    """Create a temp dir and file for each case"""
178
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
179
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
180
    os.close(fd)
181

    
182
  def tearDown(self):
183
    if os.path.exists(self.tmpfile):
184
      os.unlink(self.tmpfile)
185
    os.rmdir(self.tmpdir)
186

    
187

    
188
  def testIgnoreDirs(self):
189
    """Test that RemoveFile() ignores directories"""
190
    self.assertEqual(None, RemoveFile(self.tmpdir))
191

    
192

    
193
  def testIgnoreNotExisting(self):
194
    """Test that RemoveFile() ignores non-existing files"""
195
    RemoveFile(self.tmpfile)
196
    RemoveFile(self.tmpfile)
197

    
198

    
199
  def testRemoveFile(self):
200
    """Test that RemoveFile does remove a file"""
201
    RemoveFile(self.tmpfile)
202
    if os.path.exists(self.tmpfile):
203
      self.fail("File '%s' not removed" % self.tmpfile)
204

    
205

    
206
  def testRemoveSymlink(self):
207
    """Test that RemoveFile does remove symlinks"""
208
    symlink = self.tmpdir + "/symlink"
209
    os.symlink("no-such-file", symlink)
210
    RemoveFile(symlink)
211
    if os.path.exists(symlink):
212
      self.fail("File '%s' not removed" % symlink)
213
    os.symlink(self.tmpfile, symlink)
214
    RemoveFile(symlink)
215
    if os.path.exists(symlink):
216
      self.fail("File '%s' not removed" % symlink)
217

    
218

    
219
class TestCheckdict(unittest.TestCase):
220
  """Test case for the CheckDict function"""
221

    
222
  def testAdd(self):
223
    """Test that CheckDict adds a missing key with the correct value"""
224

    
225
    tgt = {'a':1}
226
    tmpl = {'b': 2}
227
    CheckDict(tgt, tmpl)
228
    if 'b' not in tgt or tgt['b'] != 2:
229
      self.fail("Failed to update dict")
230

    
231

    
232
  def testNoUpdate(self):
233
    """Test that CheckDict does not overwrite an existing key"""
234
    tgt = {'a':1, 'b': 3}
235
    tmpl = {'b': 2}
236
    CheckDict(tgt, tmpl)
237
    self.failUnlessEqual(tgt['b'], 3)
238

    
239

    
240
class TestMatchNameComponent(unittest.TestCase):
241
  """Test case for the MatchNameComponent function"""
242

    
243
  def testEmptyList(self):
244
    """Test that there is no match against an empty list"""
245

    
246
    self.failUnlessEqual(MatchNameComponent("", []), None)
247
    self.failUnlessEqual(MatchNameComponent("test", []), None)
248

    
249
  def testSingleMatch(self):
250
    """Test that a single match is performed correctly"""
251
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
252
    for key in "test2", "test2.example", "test2.example.com":
253
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
254

    
255
  def testMultipleMatches(self):
256
    """Test that a multiple match is returned as None"""
257
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
258
    for key in "test1", "test1.example":
259
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
260

    
261

    
262
class TestFormatUnit(unittest.TestCase):
263
  """Test case for the FormatUnit function"""
264

    
265
  def testMiB(self):
266
    self.assertEqual(FormatUnit(1), '1M')
267
    self.assertEqual(FormatUnit(100), '100M')
268
    self.assertEqual(FormatUnit(1023), '1023M')
269

    
270
  def testGiB(self):
271
    self.assertEqual(FormatUnit(1024), '1.0G')
272
    self.assertEqual(FormatUnit(1536), '1.5G')
273
    self.assertEqual(FormatUnit(17133), '16.7G')
274
    self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G')
275

    
276
  def testTiB(self):
277
    self.assertEqual(FormatUnit(1024 * 1024), '1.0T')
278
    self.assertEqual(FormatUnit(5120 * 1024), '5.0T')
279
    self.assertEqual(FormatUnit(29829 * 1024), '29.1T')
280

    
281

    
282
class TestParseUnit(unittest.TestCase):
283
  """Test case for the ParseUnit function"""
284

    
285
  SCALES = (('', 1),
286
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
287
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
288
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
289

    
290
  def testRounding(self):
291
    self.assertEqual(ParseUnit('0'), 0)
292
    self.assertEqual(ParseUnit('1'), 4)
293
    self.assertEqual(ParseUnit('2'), 4)
294
    self.assertEqual(ParseUnit('3'), 4)
295

    
296
    self.assertEqual(ParseUnit('124'), 124)
297
    self.assertEqual(ParseUnit('125'), 128)
298
    self.assertEqual(ParseUnit('126'), 128)
299
    self.assertEqual(ParseUnit('127'), 128)
300
    self.assertEqual(ParseUnit('128'), 128)
301
    self.assertEqual(ParseUnit('129'), 132)
302
    self.assertEqual(ParseUnit('130'), 132)
303

    
304
  def testFloating(self):
305
    self.assertEqual(ParseUnit('0'), 0)
306
    self.assertEqual(ParseUnit('0.5'), 4)
307
    self.assertEqual(ParseUnit('1.75'), 4)
308
    self.assertEqual(ParseUnit('1.99'), 4)
309
    self.assertEqual(ParseUnit('2.00'), 4)
310
    self.assertEqual(ParseUnit('2.01'), 4)
311
    self.assertEqual(ParseUnit('3.99'), 4)
312
    self.assertEqual(ParseUnit('4.00'), 4)
313
    self.assertEqual(ParseUnit('4.01'), 8)
314
    self.assertEqual(ParseUnit('1.5G'), 1536)
315
    self.assertEqual(ParseUnit('1.8G'), 1844)
316
    self.assertEqual(ParseUnit('8.28T'), 8682212)
317

    
318
  def testSuffixes(self):
319
    for sep in ('', ' ', '   ', "\t", "\t "):
320
      for suffix, scale in TestParseUnit.SCALES:
321
        for func in (lambda x: x, str.lower, str.upper):
322
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)), 1024 * scale)
323

    
324
  def testInvalidInput(self):
325
    for sep in ('-', '_', ',', 'a'):
326
      for suffix, _ in TestParseUnit.SCALES:
327
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
328

    
329
    for suffix, _ in TestParseUnit.SCALES:
330
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
331

    
332

    
333
class TestSshKeys(unittest.TestCase):
334
  """Test case for the AddAuthorizedKey function"""
335

    
336
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
337
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
338
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
339

    
340
  # NOTE: The MD5 sums below were calculated after manually
341
  #       checking the output files.
342

    
343
  def writeTestFile(self):
344
    (fd, tmpname) = tempfile.mkstemp(prefix = 'ganeti-test')
345
    f = os.fdopen(fd, 'w')
346
    try:
347
      f.write(TestSshKeys.KEY_A)
348
      f.write("\n")
349
      f.write(TestSshKeys.KEY_B)
350
      f.write("\n")
351
    finally:
352
      f.close()
353

    
354
    return tmpname
355

    
356
  def testAddingNewKey(self):
357
    tmpname = self.writeTestFile()
358
    try:
359
      AddAuthorizedKey(tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
360

    
361
      f = open(tmpname, 'r')
362
      try:
363
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
364
                         'ccc71523108ca6e9d0343797dc3e9f16')
365
      finally:
366
        f.close()
367
    finally:
368
      os.unlink(tmpname)
369

    
370
  def testAddingAlmostButNotCompletlyTheSameKey(self):
371
    tmpname = self.writeTestFile()
372
    try:
373
      AddAuthorizedKey(tmpname,
374
          'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
375

    
376
      f = open(tmpname, 'r')
377
      try:
378
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
379
                         'f2c939d57addb5b3a6846884be896b46')
380
      finally:
381
        f.close()
382
    finally:
383
      os.unlink(tmpname)
384

    
385
  def testAddingExistingKeyWithSomeMoreSpaces(self):
386
    tmpname = self.writeTestFile()
387
    try:
388
      AddAuthorizedKey(tmpname,
389
          'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
390

    
391
      f = open(tmpname, 'r')
392
      try:
393
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
394
                         '4e612764808bd46337eb0f575415fc30')
395
      finally:
396
        f.close()
397
    finally:
398
      os.unlink(tmpname)
399

    
400
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
401
    tmpname = self.writeTestFile()
402
    try:
403
      RemoveAuthorizedKey(tmpname,
404
          'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
405

    
406
      f = open(tmpname, 'r')
407
      try:
408
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
409
                         '77516d987fca07f70e30b830b3e4f2ed')
410
      finally:
411
        f.close()
412
    finally:
413
      os.unlink(tmpname)
414

    
415
  def testRemovingNonExistingKey(self):
416
    tmpname = self.writeTestFile()
417
    try:
418
      RemoveAuthorizedKey(tmpname,
419
          'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
420

    
421
      f = open(tmpname, 'r')
422
      try:
423
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
424
                         '4e612764808bd46337eb0f575415fc30')
425
      finally:
426
        f.close()
427
    finally:
428
      os.unlink(tmpname)
429

    
430

    
431
class TestShellQuoting(unittest.TestCase):
432
  """Test case for shell quoting functions"""
433

    
434
  def testShellQuote(self):
435
    self.assertEqual(ShellQuote('abc'), "abc")
436
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
437
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
438
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
439
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
440

    
441
  def testShellQuoteArgs(self):
442
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
443
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
444
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
445

    
446

    
447
class TestIpAdressList(unittest.TestCase):
448
  """Test case for local IP addresses"""
449

    
450
  def _test(self, output, required):
451
    ips = _ParseIpOutput(output)
452

    
453
    # Sort the output, so our check below works in all cases
454
    ips.sort()
455
    required.sort()
456

    
457
    self.assertEqual(required, ips)
458

    
459
  def testSingleIpAddress(self):
460
    output = \
461
      ("3: lo    inet 127.0.0.1/8 brd 127.255.255.255 scope host lo\n"
462
       "5: eth0    inet 10.0.0.1/24 brd 172.30.15.127 scope global eth0\n")
463
    self._test(output, ['127.0.0.1', '10.0.0.1'])
464

    
465
  def testMultipleIpAddresses(self):
466
    output = \
467
      ("3: lo    inet 127.0.0.1/8 brd 127.255.255.255 scope host lo\n"
468
       "5: eth0    inet 10.0.0.1/24 brd 172.30.15.127 scope global eth0\n"
469
       "5: eth0    inet 1.2.3.4/8 brd 1.255.255.255 scope global eth0:test\n")
470
    self._test(output, ['127.0.0.1', '10.0.0.1', '1.2.3.4'])
471

    
472

    
473
class TestTcpPing(unittest.TestCase):
474
  """Testcase for TCP version of ping - against listen(2)ing port"""
475

    
476
  def setUp(self):
477
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
478
    self.listener.bind(("127.0.0.1", 0))
479
    self.listenerport = self.listener.getsockname()[1]
480
    self.listener.listen(1)
481

    
482
  def tearDown(self):
483
    self.listener.shutdown(socket.SHUT_RDWR)
484
    del self.listener
485
    del self.listenerport
486

    
487
  def testTcpPingToLocalHostAccept(self):
488
    self.assert_(TcpPing("127.0.0.1",
489
                         "127.0.0.1",
490
                         self.listenerport,
491
                         timeout=10,
492
                         live_port_needed=True),
493
                 "failed to connect to test listener")
494

    
495

    
496
class TestTcpPingDeaf(unittest.TestCase):
497
  """Testcase for TCP version of ping - against non listen(2)ing port"""
498

    
499
  def setUp(self):
500
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
501
    self.deaflistener.bind(("127.0.0.1", 0))
502
    self.deaflistenerport = self.deaflistener.getsockname()[1]
503

    
504
  def tearDown(self):
505
    del self.deaflistener
506
    del self.deaflistenerport
507

    
508
  def testTcpPingToLocalHostAcceptDeaf(self):
509
    self.failIf(TcpPing("127.0.0.1",
510
                        "127.0.0.1",
511
                        self.deaflistenerport,
512
                        timeout=10,  # timeout for blocking operations
513
                        live_port_needed=True), # need successful connect(2)
514
                "successfully connected to deaf listener")
515

    
516
  def testTcpPingToLocalHostNoAccept(self):
517
    self.assert_(TcpPing("127.0.0.1",
518
                         "127.0.0.1",
519
                         self.deaflistenerport,
520
                         timeout=10, # timeout for blocking operations
521
                         live_port_needed=False), # ECONNREFUSED is OK
522
                 "failed to ping alive host on deaf port")
523

    
524

    
525
if __name__ == '__main__':
526
  unittest.main()