Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ 59072e7e

History | View | Annotate | Download (16.6 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import md5
30
import socket
31
import shutil
32
import re
33

    
34
import ganeti
35
from ganeti import constants
36
from ganeti import utils
37
from ganeti.utils import IsProcessAlive, Lock, Unlock, RunCmd, \
38
     RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
39
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
40
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles
41
from ganeti.errors import LockError, UnitParseError
42

    
43

    
44
class TestIsProcessAlive(unittest.TestCase):
45
  """Testing case for IsProcessAlive"""
46
  def setUp(self):
47
    # create a zombie and a (hopefully) non-existing process id
48
    self.pid_zombie = os.fork()
49
    if self.pid_zombie == 0:
50
      os._exit(0)
51
    elif self.pid_zombie < 0:
52
      raise SystemError("can't fork")
53
    self.pid_non_existing = os.fork()
54
    if self.pid_non_existing == 0:
55
      os._exit(0)
56
    elif self.pid_non_existing > 0:
57
      os.waitpid(self.pid_non_existing, 0)
58
    else:
59
      raise SystemError("can't fork")
60

    
61

    
62
  def testExists(self):
63
    mypid = os.getpid()
64
    self.assert_(IsProcessAlive(mypid),
65
                 "can't find myself running")
66

    
67
  def testZombie(self):
68
    self.assert_(not IsProcessAlive(self.pid_zombie),
69
                 "zombie not detected as zombie")
70

    
71

    
72
  def testNotExisting(self):
73
    self.assert_(not IsProcessAlive(self.pid_non_existing),
74
                 "noexisting process detected")
75

    
76

    
77
class TestLocking(unittest.TestCase):
78
  """Testing case for the Lock/Unlock functions"""
79
  def clean_lock(self, name):
80
    try:
81
      ganeti.utils.Unlock("unittest")
82
    except LockError:
83
      pass
84

    
85

    
86
  def testLock(self):
87
    self.clean_lock("unittest")
88
    self.assertEqual(None, Lock("unittest"))
89

    
90

    
91
  def testUnlock(self):
92
    self.clean_lock("unittest")
93
    ganeti.utils.Lock("unittest")
94
    self.assertEqual(None, Unlock("unittest"))
95

    
96

    
97
  def testDoubleLock(self):
98
    self.clean_lock("unittest")
99
    ganeti.utils.Lock("unittest")
100
    self.assertRaises(LockError, Lock, "unittest")
101

    
102

    
103
class TestRunCmd(unittest.TestCase):
104
  """Testing case for the RunCmd function"""
105

    
106
  def setUp(self):
107
    self.magic = time.ctime() + " ganeti test"
108

    
109
  def testOk(self):
110
    """Test successful exit code"""
111
    result = RunCmd("/bin/sh -c 'exit 0'")
112
    self.assertEqual(result.exit_code, 0)
113

    
114
  def testFail(self):
115
    """Test fail exit code"""
116
    result = RunCmd("/bin/sh -c 'exit 1'")
117
    self.assertEqual(result.exit_code, 1)
118

    
119

    
120
  def testStdout(self):
121
    """Test standard output"""
122
    cmd = 'echo -n "%s"' % self.magic
123
    result = RunCmd("/bin/sh -c '%s'" % cmd)
124
    self.assertEqual(result.stdout, self.magic)
125

    
126

    
127
  def testStderr(self):
128
    """Test standard error"""
129
    cmd = 'echo -n "%s"' % self.magic
130
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
131
    self.assertEqual(result.stderr, self.magic)
132

    
133

    
134
  def testCombined(self):
135
    """Test combined output"""
136
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
137
    result = RunCmd("/bin/sh -c '%s'" % cmd)
138
    self.assertEqual(result.output, "A" + self.magic + "B" + self.magic)
139

    
140
  def testSignal(self):
141
    """Test standard error"""
142
    result = RunCmd("/bin/sh -c 'kill -15 $$'")
143
    self.assertEqual(result.signal, 15)
144

    
145
  def testListRun(self):
146
    """Test list runs"""
147
    result = RunCmd(["true"])
148
    self.assertEqual(result.signal, None)
149
    self.assertEqual(result.exit_code, 0)
150
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
151
    self.assertEqual(result.signal, None)
152
    self.assertEqual(result.exit_code, 1)
153
    result = RunCmd(["echo", "-n", self.magic])
154
    self.assertEqual(result.signal, None)
155
    self.assertEqual(result.exit_code, 0)
156
    self.assertEqual(result.stdout, self.magic)
157

    
158
  def testLang(self):
159
    """Test locale environment"""
160
    old_env = os.environ.copy()
161
    try:
162
      os.environ["LANG"] = "en_US.UTF-8"
163
      os.environ["LC_ALL"] = "en_US.UTF-8"
164
      result = RunCmd(["locale"])
165
      for line in result.output.splitlines():
166
        key, value = line.split("=", 1)
167
        # Ignore these variables, they're overridden by LC_ALL
168
        if key == "LANG" or key == "LANGUAGE":
169
          continue
170
        self.failIf(value and value != "C" and value != '"C"',
171
            "Variable %s is set to the invalid value '%s'" % (key, value))
172
    finally:
173
      os.environ = old_env
174

    
175

    
176
class TestRemoveFile(unittest.TestCase):
177
  """Test case for the RemoveFile function"""
178

    
179
  def setUp(self):
180
    """Create a temp dir and file for each case"""
181
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
182
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
183
    os.close(fd)
184

    
185
  def tearDown(self):
186
    if os.path.exists(self.tmpfile):
187
      os.unlink(self.tmpfile)
188
    os.rmdir(self.tmpdir)
189

    
190

    
191
  def testIgnoreDirs(self):
192
    """Test that RemoveFile() ignores directories"""
193
    self.assertEqual(None, RemoveFile(self.tmpdir))
194

    
195

    
196
  def testIgnoreNotExisting(self):
197
    """Test that RemoveFile() ignores non-existing files"""
198
    RemoveFile(self.tmpfile)
199
    RemoveFile(self.tmpfile)
200

    
201

    
202
  def testRemoveFile(self):
203
    """Test that RemoveFile does remove a file"""
204
    RemoveFile(self.tmpfile)
205
    if os.path.exists(self.tmpfile):
206
      self.fail("File '%s' not removed" % self.tmpfile)
207

    
208

    
209
  def testRemoveSymlink(self):
210
    """Test that RemoveFile does remove symlinks"""
211
    symlink = self.tmpdir + "/symlink"
212
    os.symlink("no-such-file", symlink)
213
    RemoveFile(symlink)
214
    if os.path.exists(symlink):
215
      self.fail("File '%s' not removed" % symlink)
216
    os.symlink(self.tmpfile, symlink)
217
    RemoveFile(symlink)
218
    if os.path.exists(symlink):
219
      self.fail("File '%s' not removed" % symlink)
220

    
221

    
222
class TestCheckdict(unittest.TestCase):
223
  """Test case for the CheckDict function"""
224

    
225
  def testAdd(self):
226
    """Test that CheckDict adds a missing key with the correct value"""
227

    
228
    tgt = {'a':1}
229
    tmpl = {'b': 2}
230
    CheckDict(tgt, tmpl)
231
    if 'b' not in tgt or tgt['b'] != 2:
232
      self.fail("Failed to update dict")
233

    
234

    
235
  def testNoUpdate(self):
236
    """Test that CheckDict does not overwrite an existing key"""
237
    tgt = {'a':1, 'b': 3}
238
    tmpl = {'b': 2}
239
    CheckDict(tgt, tmpl)
240
    self.failUnlessEqual(tgt['b'], 3)
241

    
242

    
243
class TestMatchNameComponent(unittest.TestCase):
244
  """Test case for the MatchNameComponent function"""
245

    
246
  def testEmptyList(self):
247
    """Test that there is no match against an empty list"""
248

    
249
    self.failUnlessEqual(MatchNameComponent("", []), None)
250
    self.failUnlessEqual(MatchNameComponent("test", []), None)
251

    
252
  def testSingleMatch(self):
253
    """Test that a single match is performed correctly"""
254
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
255
    for key in "test2", "test2.example", "test2.example.com":
256
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
257

    
258
  def testMultipleMatches(self):
259
    """Test that a multiple match is returned as None"""
260
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
261
    for key in "test1", "test1.example":
262
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
263

    
264

    
265
class TestFormatUnit(unittest.TestCase):
266
  """Test case for the FormatUnit function"""
267

    
268
  def testMiB(self):
269
    self.assertEqual(FormatUnit(1), '1M')
270
    self.assertEqual(FormatUnit(100), '100M')
271
    self.assertEqual(FormatUnit(1023), '1023M')
272

    
273
  def testGiB(self):
274
    self.assertEqual(FormatUnit(1024), '1.0G')
275
    self.assertEqual(FormatUnit(1536), '1.5G')
276
    self.assertEqual(FormatUnit(17133), '16.7G')
277
    self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G')
278

    
279
  def testTiB(self):
280
    self.assertEqual(FormatUnit(1024 * 1024), '1.0T')
281
    self.assertEqual(FormatUnit(5120 * 1024), '5.0T')
282
    self.assertEqual(FormatUnit(29829 * 1024), '29.1T')
283

    
284

    
285
class TestParseUnit(unittest.TestCase):
286
  """Test case for the ParseUnit function"""
287

    
288
  SCALES = (('', 1),
289
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
290
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
291
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
292

    
293
  def testRounding(self):
294
    self.assertEqual(ParseUnit('0'), 0)
295
    self.assertEqual(ParseUnit('1'), 4)
296
    self.assertEqual(ParseUnit('2'), 4)
297
    self.assertEqual(ParseUnit('3'), 4)
298

    
299
    self.assertEqual(ParseUnit('124'), 124)
300
    self.assertEqual(ParseUnit('125'), 128)
301
    self.assertEqual(ParseUnit('126'), 128)
302
    self.assertEqual(ParseUnit('127'), 128)
303
    self.assertEqual(ParseUnit('128'), 128)
304
    self.assertEqual(ParseUnit('129'), 132)
305
    self.assertEqual(ParseUnit('130'), 132)
306

    
307
  def testFloating(self):
308
    self.assertEqual(ParseUnit('0'), 0)
309
    self.assertEqual(ParseUnit('0.5'), 4)
310
    self.assertEqual(ParseUnit('1.75'), 4)
311
    self.assertEqual(ParseUnit('1.99'), 4)
312
    self.assertEqual(ParseUnit('2.00'), 4)
313
    self.assertEqual(ParseUnit('2.01'), 4)
314
    self.assertEqual(ParseUnit('3.99'), 4)
315
    self.assertEqual(ParseUnit('4.00'), 4)
316
    self.assertEqual(ParseUnit('4.01'), 8)
317
    self.assertEqual(ParseUnit('1.5G'), 1536)
318
    self.assertEqual(ParseUnit('1.8G'), 1844)
319
    self.assertEqual(ParseUnit('8.28T'), 8682212)
320

    
321
  def testSuffixes(self):
322
    for sep in ('', ' ', '   ', "\t", "\t "):
323
      for suffix, scale in TestParseUnit.SCALES:
324
        for func in (lambda x: x, str.lower, str.upper):
325
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)), 1024 * scale)
326

    
327
  def testInvalidInput(self):
328
    for sep in ('-', '_', ',', 'a'):
329
      for suffix, _ in TestParseUnit.SCALES:
330
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
331

    
332
    for suffix, _ in TestParseUnit.SCALES:
333
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
334

    
335

    
336
class TestSshKeys(unittest.TestCase):
337
  """Test case for the AddAuthorizedKey function"""
338

    
339
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
340
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
341
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
342

    
343
  # NOTE: The MD5 sums below were calculated after manually
344
  #       checking the output files.
345

    
346
  def writeTestFile(self):
347
    (fd, tmpname) = tempfile.mkstemp(prefix = 'ganeti-test')
348
    f = os.fdopen(fd, 'w')
349
    try:
350
      f.write(TestSshKeys.KEY_A)
351
      f.write("\n")
352
      f.write(TestSshKeys.KEY_B)
353
      f.write("\n")
354
    finally:
355
      f.close()
356

    
357
    return tmpname
358

    
359
  def testAddingNewKey(self):
360
    tmpname = self.writeTestFile()
361
    try:
362
      AddAuthorizedKey(tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
363

    
364
      f = open(tmpname, 'r')
365
      try:
366
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
367
                         'ccc71523108ca6e9d0343797dc3e9f16')
368
      finally:
369
        f.close()
370
    finally:
371
      os.unlink(tmpname)
372

    
373
  def testAddingAlmostButNotCompletlyTheSameKey(self):
374
    tmpname = self.writeTestFile()
375
    try:
376
      AddAuthorizedKey(tmpname,
377
          'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
378

    
379
      f = open(tmpname, 'r')
380
      try:
381
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
382
                         'f2c939d57addb5b3a6846884be896b46')
383
      finally:
384
        f.close()
385
    finally:
386
      os.unlink(tmpname)
387

    
388
  def testAddingExistingKeyWithSomeMoreSpaces(self):
389
    tmpname = self.writeTestFile()
390
    try:
391
      AddAuthorizedKey(tmpname,
392
          'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
393

    
394
      f = open(tmpname, 'r')
395
      try:
396
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
397
                         '4e612764808bd46337eb0f575415fc30')
398
      finally:
399
        f.close()
400
    finally:
401
      os.unlink(tmpname)
402

    
403
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
404
    tmpname = self.writeTestFile()
405
    try:
406
      RemoveAuthorizedKey(tmpname,
407
          'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
408

    
409
      f = open(tmpname, 'r')
410
      try:
411
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
412
                         '77516d987fca07f70e30b830b3e4f2ed')
413
      finally:
414
        f.close()
415
    finally:
416
      os.unlink(tmpname)
417

    
418
  def testRemovingNonExistingKey(self):
419
    tmpname = self.writeTestFile()
420
    try:
421
      RemoveAuthorizedKey(tmpname,
422
          'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
423

    
424
      f = open(tmpname, 'r')
425
      try:
426
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
427
                         '4e612764808bd46337eb0f575415fc30')
428
      finally:
429
        f.close()
430
    finally:
431
      os.unlink(tmpname)
432

    
433

    
434
class TestShellQuoting(unittest.TestCase):
435
  """Test case for shell quoting functions"""
436

    
437
  def testShellQuote(self):
438
    self.assertEqual(ShellQuote('abc'), "abc")
439
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
440
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
441
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
442
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
443

    
444
  def testShellQuoteArgs(self):
445
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
446
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
447
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
448

    
449

    
450
class TestTcpPing(unittest.TestCase):
451
  """Testcase for TCP version of ping - against listen(2)ing port"""
452

    
453
  def setUp(self):
454
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
455
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
456
    self.listenerport = self.listener.getsockname()[1]
457
    self.listener.listen(1)
458

    
459
  def tearDown(self):
460
    self.listener.shutdown(socket.SHUT_RDWR)
461
    del self.listener
462
    del self.listenerport
463

    
464
  def testTcpPingToLocalHostAccept(self):
465
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
466
                         constants.LOCALHOST_IP_ADDRESS,
467
                         self.listenerport,
468
                         timeout=10,
469
                         live_port_needed=True),
470
                 "failed to connect to test listener")
471

    
472

    
473
class TestTcpPingDeaf(unittest.TestCase):
474
  """Testcase for TCP version of ping - against non listen(2)ing port"""
475

    
476
  def setUp(self):
477
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
478
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
479
    self.deaflistenerport = self.deaflistener.getsockname()[1]
480

    
481
  def tearDown(self):
482
    del self.deaflistener
483
    del self.deaflistenerport
484

    
485
  def testTcpPingToLocalHostAcceptDeaf(self):
486
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
487
                        constants.LOCALHOST_IP_ADDRESS,
488
                        self.deaflistenerport,
489
                        timeout=constants.TCP_PING_TIMEOUT,
490
                        live_port_needed=True), # need successful connect(2)
491
                "successfully connected to deaf listener")
492

    
493
  def testTcpPingToLocalHostNoAccept(self):
494
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
495
                         constants.LOCALHOST_IP_ADDRESS,
496
                         self.deaflistenerport,
497
                         timeout=constants.TCP_PING_TIMEOUT,
498
                         live_port_needed=False), # ECONNREFUSED is OK
499
                 "failed to ping alive host on deaf port")
500

    
501

    
502
class TestListVisibleFiles(unittest.TestCase):
503
  """Test case for ListVisibleFiles"""
504

    
505
  def setUp(self):
506
    self.path = tempfile.mkdtemp()
507

    
508
  def tearDown(self):
509
    shutil.rmtree(self.path)
510

    
511
  def _test(self, files, expected):
512
    # Sort a copy
513
    expected = expected[:]
514
    expected.sort()
515

    
516
    for name in files:
517
      f = open(os.path.join(self.path, name), 'w')
518
      try:
519
        f.write("Test\n")
520
      finally:
521
        f.close()
522

    
523
    found = ListVisibleFiles(self.path)
524
    found.sort()
525

    
526
    self.assertEqual(found, expected)
527

    
528
  def testAllVisible(self):
529
    files = ["a", "b", "c"]
530
    expected = files
531
    self._test(files, expected)
532

    
533
  def testNoneVisible(self):
534
    files = [".a", ".b", ".c"]
535
    expected = []
536
    self._test(files, expected)
537

    
538
  def testSomeVisible(self):
539
    files = ["a", "b", ".c"]
540
    expected = ["a", "b"]
541
    self._test(files, expected)
542

    
543

    
544
class TestGetUUID(unittest.TestCase):
545
  """Test case for GetUUID"""
546

    
547
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
548
                        '[a-f0-9]{4}-[a-f0-9]{12}$')
549

    
550
  def runTest(self):
551
    self.failUnless(self._re_uuid.match(utils.GetUUID()))
552

    
553

    
554
if __name__ == '__main__':
555
  unittest.main()