Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ 16abfbc2

History | View | Annotate | Download (16.3 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import md5
30
import socket
31
import shutil
32

    
33
import ganeti
34
from ganeti import constants
35
from ganeti.utils import IsProcessAlive, Lock, Unlock, RunCmd, \
36
     RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
37
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
38
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles
39
from ganeti.errors import LockError, UnitParseError
40

    
41

    
42
class TestIsProcessAlive(unittest.TestCase):
43
  """Testing case for IsProcessAlive"""
44
  def setUp(self):
45
    # create a zombie and a (hopefully) non-existing process id
46
    self.pid_zombie = os.fork()
47
    if self.pid_zombie == 0:
48
      os._exit(0)
49
    elif self.pid_zombie < 0:
50
      raise SystemError("can't fork")
51
    self.pid_non_existing = os.fork()
52
    if self.pid_non_existing == 0:
53
      os._exit(0)
54
    elif self.pid_non_existing > 0:
55
      os.waitpid(self.pid_non_existing, 0)
56
    else:
57
      raise SystemError("can't fork")
58

    
59

    
60
  def testExists(self):
61
    mypid = os.getpid()
62
    self.assert_(IsProcessAlive(mypid),
63
                 "can't find myself running")
64

    
65
  def testZombie(self):
66
    self.assert_(not IsProcessAlive(self.pid_zombie),
67
                 "zombie not detected as zombie")
68

    
69

    
70
  def testNotExisting(self):
71
    self.assert_(not IsProcessAlive(self.pid_non_existing),
72
                 "noexisting process detected")
73

    
74

    
75
class TestLocking(unittest.TestCase):
76
  """Testing case for the Lock/Unlock functions"""
77
  def clean_lock(self, name):
78
    try:
79
      ganeti.utils.Unlock("unittest")
80
    except LockError:
81
      pass
82

    
83

    
84
  def testLock(self):
85
    self.clean_lock("unittest")
86
    self.assertEqual(None, Lock("unittest"))
87

    
88

    
89
  def testUnlock(self):
90
    self.clean_lock("unittest")
91
    ganeti.utils.Lock("unittest")
92
    self.assertEqual(None, Unlock("unittest"))
93

    
94

    
95
  def testDoubleLock(self):
96
    self.clean_lock("unittest")
97
    ganeti.utils.Lock("unittest")
98
    self.assertRaises(LockError, Lock, "unittest")
99

    
100

    
101
class TestRunCmd(unittest.TestCase):
102
  """Testing case for the RunCmd function"""
103

    
104
  def setUp(self):
105
    self.magic = time.ctime() + " ganeti test"
106

    
107
  def testOk(self):
108
    """Test successful exit code"""
109
    result = RunCmd("/bin/sh -c 'exit 0'")
110
    self.assertEqual(result.exit_code, 0)
111

    
112
  def testFail(self):
113
    """Test fail exit code"""
114
    result = RunCmd("/bin/sh -c 'exit 1'")
115
    self.assertEqual(result.exit_code, 1)
116

    
117

    
118
  def testStdout(self):
119
    """Test standard output"""
120
    cmd = 'echo -n "%s"' % self.magic
121
    result = RunCmd("/bin/sh -c '%s'" % cmd)
122
    self.assertEqual(result.stdout, self.magic)
123

    
124

    
125
  def testStderr(self):
126
    """Test standard error"""
127
    cmd = 'echo -n "%s"' % self.magic
128
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
129
    self.assertEqual(result.stderr, self.magic)
130

    
131

    
132
  def testCombined(self):
133
    """Test combined output"""
134
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
135
    result = RunCmd("/bin/sh -c '%s'" % cmd)
136
    self.assertEqual(result.output, "A" + self.magic + "B" + self.magic)
137

    
138
  def testSignal(self):
139
    """Test standard error"""
140
    result = RunCmd("/bin/sh -c 'kill -15 $$'")
141
    self.assertEqual(result.signal, 15)
142

    
143
  def testListRun(self):
144
    """Test list runs"""
145
    result = RunCmd(["true"])
146
    self.assertEqual(result.signal, None)
147
    self.assertEqual(result.exit_code, 0)
148
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
149
    self.assertEqual(result.signal, None)
150
    self.assertEqual(result.exit_code, 1)
151
    result = RunCmd(["echo", "-n", self.magic])
152
    self.assertEqual(result.signal, None)
153
    self.assertEqual(result.exit_code, 0)
154
    self.assertEqual(result.stdout, self.magic)
155

    
156
  def testLang(self):
157
    """Test locale environment"""
158
    old_env = os.environ.copy()
159
    try:
160
      os.environ["LANG"] = "en_US.UTF-8"
161
      os.environ["LC_ALL"] = "en_US.UTF-8"
162
      result = RunCmd(["locale"])
163
      for line in result.output.splitlines():
164
        key, value = line.split("=", 1)
165
        # Ignore these variables, they're overridden by LC_ALL
166
        if key == "LANG" or key == "LANGUAGE":
167
          continue
168
        self.failIf(value and value != "C" and value != '"C"',
169
            "Variable %s is set to the invalid value '%s'" % (key, value))
170
    finally:
171
      os.environ = old_env
172

    
173

    
174
class TestRemoveFile(unittest.TestCase):
175
  """Test case for the RemoveFile function"""
176

    
177
  def setUp(self):
178
    """Create a temp dir and file for each case"""
179
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
180
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
181
    os.close(fd)
182

    
183
  def tearDown(self):
184
    if os.path.exists(self.tmpfile):
185
      os.unlink(self.tmpfile)
186
    os.rmdir(self.tmpdir)
187

    
188

    
189
  def testIgnoreDirs(self):
190
    """Test that RemoveFile() ignores directories"""
191
    self.assertEqual(None, RemoveFile(self.tmpdir))
192

    
193

    
194
  def testIgnoreNotExisting(self):
195
    """Test that RemoveFile() ignores non-existing files"""
196
    RemoveFile(self.tmpfile)
197
    RemoveFile(self.tmpfile)
198

    
199

    
200
  def testRemoveFile(self):
201
    """Test that RemoveFile does remove a file"""
202
    RemoveFile(self.tmpfile)
203
    if os.path.exists(self.tmpfile):
204
      self.fail("File '%s' not removed" % self.tmpfile)
205

    
206

    
207
  def testRemoveSymlink(self):
208
    """Test that RemoveFile does remove symlinks"""
209
    symlink = self.tmpdir + "/symlink"
210
    os.symlink("no-such-file", symlink)
211
    RemoveFile(symlink)
212
    if os.path.exists(symlink):
213
      self.fail("File '%s' not removed" % symlink)
214
    os.symlink(self.tmpfile, symlink)
215
    RemoveFile(symlink)
216
    if os.path.exists(symlink):
217
      self.fail("File '%s' not removed" % symlink)
218

    
219

    
220
class TestCheckdict(unittest.TestCase):
221
  """Test case for the CheckDict function"""
222

    
223
  def testAdd(self):
224
    """Test that CheckDict adds a missing key with the correct value"""
225

    
226
    tgt = {'a':1}
227
    tmpl = {'b': 2}
228
    CheckDict(tgt, tmpl)
229
    if 'b' not in tgt or tgt['b'] != 2:
230
      self.fail("Failed to update dict")
231

    
232

    
233
  def testNoUpdate(self):
234
    """Test that CheckDict does not overwrite an existing key"""
235
    tgt = {'a':1, 'b': 3}
236
    tmpl = {'b': 2}
237
    CheckDict(tgt, tmpl)
238
    self.failUnlessEqual(tgt['b'], 3)
239

    
240

    
241
class TestMatchNameComponent(unittest.TestCase):
242
  """Test case for the MatchNameComponent function"""
243

    
244
  def testEmptyList(self):
245
    """Test that there is no match against an empty list"""
246

    
247
    self.failUnlessEqual(MatchNameComponent("", []), None)
248
    self.failUnlessEqual(MatchNameComponent("test", []), None)
249

    
250
  def testSingleMatch(self):
251
    """Test that a single match is performed correctly"""
252
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
253
    for key in "test2", "test2.example", "test2.example.com":
254
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
255

    
256
  def testMultipleMatches(self):
257
    """Test that a multiple match is returned as None"""
258
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
259
    for key in "test1", "test1.example":
260
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
261

    
262

    
263
class TestFormatUnit(unittest.TestCase):
264
  """Test case for the FormatUnit function"""
265

    
266
  def testMiB(self):
267
    self.assertEqual(FormatUnit(1), '1M')
268
    self.assertEqual(FormatUnit(100), '100M')
269
    self.assertEqual(FormatUnit(1023), '1023M')
270

    
271
  def testGiB(self):
272
    self.assertEqual(FormatUnit(1024), '1.0G')
273
    self.assertEqual(FormatUnit(1536), '1.5G')
274
    self.assertEqual(FormatUnit(17133), '16.7G')
275
    self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G')
276

    
277
  def testTiB(self):
278
    self.assertEqual(FormatUnit(1024 * 1024), '1.0T')
279
    self.assertEqual(FormatUnit(5120 * 1024), '5.0T')
280
    self.assertEqual(FormatUnit(29829 * 1024), '29.1T')
281

    
282

    
283
class TestParseUnit(unittest.TestCase):
284
  """Test case for the ParseUnit function"""
285

    
286
  SCALES = (('', 1),
287
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
288
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
289
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
290

    
291
  def testRounding(self):
292
    self.assertEqual(ParseUnit('0'), 0)
293
    self.assertEqual(ParseUnit('1'), 4)
294
    self.assertEqual(ParseUnit('2'), 4)
295
    self.assertEqual(ParseUnit('3'), 4)
296

    
297
    self.assertEqual(ParseUnit('124'), 124)
298
    self.assertEqual(ParseUnit('125'), 128)
299
    self.assertEqual(ParseUnit('126'), 128)
300
    self.assertEqual(ParseUnit('127'), 128)
301
    self.assertEqual(ParseUnit('128'), 128)
302
    self.assertEqual(ParseUnit('129'), 132)
303
    self.assertEqual(ParseUnit('130'), 132)
304

    
305
  def testFloating(self):
306
    self.assertEqual(ParseUnit('0'), 0)
307
    self.assertEqual(ParseUnit('0.5'), 4)
308
    self.assertEqual(ParseUnit('1.75'), 4)
309
    self.assertEqual(ParseUnit('1.99'), 4)
310
    self.assertEqual(ParseUnit('2.00'), 4)
311
    self.assertEqual(ParseUnit('2.01'), 4)
312
    self.assertEqual(ParseUnit('3.99'), 4)
313
    self.assertEqual(ParseUnit('4.00'), 4)
314
    self.assertEqual(ParseUnit('4.01'), 8)
315
    self.assertEqual(ParseUnit('1.5G'), 1536)
316
    self.assertEqual(ParseUnit('1.8G'), 1844)
317
    self.assertEqual(ParseUnit('8.28T'), 8682212)
318

    
319
  def testSuffixes(self):
320
    for sep in ('', ' ', '   ', "\t", "\t "):
321
      for suffix, scale in TestParseUnit.SCALES:
322
        for func in (lambda x: x, str.lower, str.upper):
323
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)), 1024 * scale)
324

    
325
  def testInvalidInput(self):
326
    for sep in ('-', '_', ',', 'a'):
327
      for suffix, _ in TestParseUnit.SCALES:
328
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
329

    
330
    for suffix, _ in TestParseUnit.SCALES:
331
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
332

    
333

    
334
class TestSshKeys(unittest.TestCase):
335
  """Test case for the AddAuthorizedKey function"""
336

    
337
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
338
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
339
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
340

    
341
  # NOTE: The MD5 sums below were calculated after manually
342
  #       checking the output files.
343

    
344
  def writeTestFile(self):
345
    (fd, tmpname) = tempfile.mkstemp(prefix = 'ganeti-test')
346
    f = os.fdopen(fd, 'w')
347
    try:
348
      f.write(TestSshKeys.KEY_A)
349
      f.write("\n")
350
      f.write(TestSshKeys.KEY_B)
351
      f.write("\n")
352
    finally:
353
      f.close()
354

    
355
    return tmpname
356

    
357
  def testAddingNewKey(self):
358
    tmpname = self.writeTestFile()
359
    try:
360
      AddAuthorizedKey(tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
361

    
362
      f = open(tmpname, 'r')
363
      try:
364
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
365
                         'ccc71523108ca6e9d0343797dc3e9f16')
366
      finally:
367
        f.close()
368
    finally:
369
      os.unlink(tmpname)
370

    
371
  def testAddingAlmostButNotCompletlyTheSameKey(self):
372
    tmpname = self.writeTestFile()
373
    try:
374
      AddAuthorizedKey(tmpname,
375
          'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
376

    
377
      f = open(tmpname, 'r')
378
      try:
379
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
380
                         'f2c939d57addb5b3a6846884be896b46')
381
      finally:
382
        f.close()
383
    finally:
384
      os.unlink(tmpname)
385

    
386
  def testAddingExistingKeyWithSomeMoreSpaces(self):
387
    tmpname = self.writeTestFile()
388
    try:
389
      AddAuthorizedKey(tmpname,
390
          'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
391

    
392
      f = open(tmpname, 'r')
393
      try:
394
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
395
                         '4e612764808bd46337eb0f575415fc30')
396
      finally:
397
        f.close()
398
    finally:
399
      os.unlink(tmpname)
400

    
401
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
402
    tmpname = self.writeTestFile()
403
    try:
404
      RemoveAuthorizedKey(tmpname,
405
          'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
406

    
407
      f = open(tmpname, 'r')
408
      try:
409
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
410
                         '77516d987fca07f70e30b830b3e4f2ed')
411
      finally:
412
        f.close()
413
    finally:
414
      os.unlink(tmpname)
415

    
416
  def testRemovingNonExistingKey(self):
417
    tmpname = self.writeTestFile()
418
    try:
419
      RemoveAuthorizedKey(tmpname,
420
          'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
421

    
422
      f = open(tmpname, 'r')
423
      try:
424
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
425
                         '4e612764808bd46337eb0f575415fc30')
426
      finally:
427
        f.close()
428
    finally:
429
      os.unlink(tmpname)
430

    
431

    
432
class TestShellQuoting(unittest.TestCase):
433
  """Test case for shell quoting functions"""
434

    
435
  def testShellQuote(self):
436
    self.assertEqual(ShellQuote('abc'), "abc")
437
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
438
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
439
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
440
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
441

    
442
  def testShellQuoteArgs(self):
443
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
444
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
445
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
446

    
447

    
448
class TestTcpPing(unittest.TestCase):
449
  """Testcase for TCP version of ping - against listen(2)ing port"""
450

    
451
  def setUp(self):
452
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
453
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
454
    self.listenerport = self.listener.getsockname()[1]
455
    self.listener.listen(1)
456

    
457
  def tearDown(self):
458
    self.listener.shutdown(socket.SHUT_RDWR)
459
    del self.listener
460
    del self.listenerport
461

    
462
  def testTcpPingToLocalHostAccept(self):
463
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
464
                         constants.LOCALHOST_IP_ADDRESS,
465
                         self.listenerport,
466
                         timeout=10,
467
                         live_port_needed=True),
468
                 "failed to connect to test listener")
469

    
470

    
471
class TestTcpPingDeaf(unittest.TestCase):
472
  """Testcase for TCP version of ping - against non listen(2)ing port"""
473

    
474
  def setUp(self):
475
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
476
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
477
    self.deaflistenerport = self.deaflistener.getsockname()[1]
478

    
479
  def tearDown(self):
480
    del self.deaflistener
481
    del self.deaflistenerport
482

    
483
  def testTcpPingToLocalHostAcceptDeaf(self):
484
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
485
                        constants.LOCALHOST_IP_ADDRESS,
486
                        self.deaflistenerport,
487
                        timeout=constants.TCP_PING_TIMEOUT,
488
                        live_port_needed=True), # need successful connect(2)
489
                "successfully connected to deaf listener")
490

    
491
  def testTcpPingToLocalHostNoAccept(self):
492
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
493
                         constants.LOCALHOST_IP_ADDRESS,
494
                         self.deaflistenerport,
495
                         timeout=constants.TCP_PING_TIMEOUT,
496
                         live_port_needed=False), # ECONNREFUSED is OK
497
                 "failed to ping alive host on deaf port")
498

    
499

    
500
class TestListVisibleFiles(unittest.TestCase):
501
  """Test case for ListVisibleFiles"""
502

    
503
  def setUp(self):
504
    self.path = tempfile.mkdtemp()
505

    
506
  def tearDown(self):
507
    shutil.rmtree(self.path)
508

    
509
  def _test(self, files, expected):
510
    # Sort a copy
511
    expected = expected[:]
512
    expected.sort()
513

    
514
    for name in files:
515
      f = open(os.path.join(self.path, name), 'w')
516
      try:
517
        f.write("Test\n")
518
      finally:
519
        f.close()
520

    
521
    found = ListVisibleFiles(self.path)
522
    found.sort()
523

    
524
    self.assertEqual(found, expected)
525

    
526
  def testAllVisible(self):
527
    files = ["a", "b", "c"]
528
    expected = files
529
    self._test(files, expected)
530

    
531
  def testNoneVisible(self):
532
    files = [".a", ".b", ".c"]
533
    expected = []
534
    self._test(files, expected)
535

    
536
  def testSomeVisible(self):
537
    files = ["a", "b", ".c"]
538
    expected = ["a", "b"]
539
    self._test(files, expected)
540

    
541

    
542
if __name__ == '__main__':
543
  unittest.main()