Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ b77ba978

History | View | Annotate | Download (25.5 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import os
30
import md5
31
import signal
32
import socket
33
import shutil
34
import re
35

    
36
import ganeti
37
import testutils
38
from ganeti import constants
39
from ganeti import utils
40
from ganeti.utils import IsProcessAlive, RunCmd, \
41
     RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
42
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
43
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
44
     SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree
45
from ganeti.errors import LockError, UnitParseError, GenericError, \
46
     ProgrammerError
47

    
48
def _ChildHandler(signal, stack):
49
  global _ChildFlag
50
  _ChildFlag = True
51

    
52

    
53
class TestIsProcessAlive(unittest.TestCase):
54
  """Testing case for IsProcessAlive"""
55

    
56
  def setUp(self):
57
    global _ChildFlag
58
    # create a (most probably) non-existing process-id
59
    self.pid_non_existing = os.fork()
60
    if self.pid_non_existing == 0:
61
      os._exit(0)
62
    elif self.pid_non_existing > 0:
63
      os.waitpid(self.pid_non_existing, 0)
64
    else:
65
      raise SystemError("can't fork")
66
    _ChildFlag = False
67
    # Use _ChildHandler for SIGCHLD
68
    self.chldOrig = signal.signal(signal.SIGCHLD, _ChildHandler)
69
    # create a zombie
70
    self.pid_zombie = os.fork()
71
    if self.pid_zombie == 0:
72
      os._exit(0)
73
    elif self.pid_zombie < 0:
74
      raise SystemError("can't fork")
75

    
76
  def tearDown(self):
77
    signal.signal(signal.SIGCHLD, self.chldOrig)
78

    
79
  def testExists(self):
80
    mypid = os.getpid()
81
    self.assert_(IsProcessAlive(mypid),
82
                 "can't find myself running")
83

    
84
  def testZombie(self):
85
    global _ChildFlag
86
    timeout = 10
87

    
88
    while not _ChildFlag:
89
      if timeout >= 0:
90
        time.sleep(0.2)
91
        timeout -= 0.2
92
      else:
93
        self.fail("timed out waiting for child's signal")
94
        break # not executed...
95

    
96
    self.assert_(not IsProcessAlive(self.pid_zombie),
97
                 "zombie not detected as zombie")
98

    
99
  def testNotExisting(self):
100
    self.assert_(not IsProcessAlive(self.pid_non_existing),
101
                 "noexisting process detected")
102

    
103

    
104
class TestPidFileFunctions(unittest.TestCase):
105
  """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
106

    
107
  def setUp(self):
108
    self.dir = tempfile.mkdtemp()
109
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
110
    utils.DaemonPidFileName = self.f_dpn
111

    
112
  def testPidFileFunctions(self):
113
    pid_file = self.f_dpn('test')
114
    utils.WritePidFile('test')
115
    self.failUnless(os.path.exists(pid_file),
116
                    "PID file should have been created")
117
    read_pid = utils.ReadPidFile(pid_file)
118
    self.failUnlessEqual(read_pid, os.getpid())
119
    self.failUnless(utils.IsProcessAlive(read_pid))
120
    self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
121
    utils.RemovePidFile('test')
122
    self.failIf(os.path.exists(pid_file),
123
                "PID file should not exist anymore")
124
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
125
                         "ReadPidFile should return 0 for missing pid file")
126
    fh = open(pid_file, "w")
127
    fh.write("blah\n")
128
    fh.close()
129
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
130
                         "ReadPidFile should return 0 for invalid pid file")
131
    utils.RemovePidFile('test')
132
    self.failIf(os.path.exists(pid_file),
133
                "PID file should not exist anymore")
134

    
135
  def testKill(self):
136
    pid_file = self.f_dpn('child')
137
    r_fd, w_fd = os.pipe()
138
    new_pid = os.fork()
139
    if new_pid == 0: #child
140
      utils.WritePidFile('child')
141
      os.write(w_fd, 'a')
142
      signal.pause()
143
      os._exit(0)
144
      return
145
    # else we are in the parent
146
    # wait until the child has written the pid file
147
    os.read(r_fd, 1)
148
    read_pid = utils.ReadPidFile(pid_file)
149
    self.failUnlessEqual(read_pid, new_pid)
150
    self.failUnless(utils.IsProcessAlive(new_pid))
151
    utils.KillProcess(new_pid)
152
    self.failIf(utils.IsProcessAlive(new_pid))
153
    utils.RemovePidFile('child')
154
    self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
155

    
156
  def tearDown(self):
157
    for name in os.listdir(self.dir):
158
      os.unlink(os.path.join(self.dir, name))
159
    os.rmdir(self.dir)
160

    
161

    
162
class TestRunCmd(unittest.TestCase):
163
  """Testing case for the RunCmd function"""
164

    
165
  def setUp(self):
166
    self.magic = time.ctime() + " ganeti test"
167

    
168
  def testOk(self):
169
    """Test successful exit code"""
170
    result = RunCmd("/bin/sh -c 'exit 0'")
171
    self.assertEqual(result.exit_code, 0)
172

    
173
  def testFail(self):
174
    """Test fail exit code"""
175
    result = RunCmd("/bin/sh -c 'exit 1'")
176
    self.assertEqual(result.exit_code, 1)
177

    
178

    
179
  def testStdout(self):
180
    """Test standard output"""
181
    cmd = 'echo -n "%s"' % self.magic
182
    result = RunCmd("/bin/sh -c '%s'" % cmd)
183
    self.assertEqual(result.stdout, self.magic)
184

    
185

    
186
  def testStderr(self):
187
    """Test standard error"""
188
    cmd = 'echo -n "%s"' % self.magic
189
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
190
    self.assertEqual(result.stderr, self.magic)
191

    
192

    
193
  def testCombined(self):
194
    """Test combined output"""
195
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
196
    result = RunCmd("/bin/sh -c '%s'" % cmd)
197
    self.assertEqual(result.output, "A" + self.magic + "B" + self.magic)
198

    
199
  def testSignal(self):
200
    """Test signal"""
201
    result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
202
    self.assertEqual(result.signal, 15)
203

    
204
  def testListRun(self):
205
    """Test list runs"""
206
    result = RunCmd(["true"])
207
    self.assertEqual(result.signal, None)
208
    self.assertEqual(result.exit_code, 0)
209
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
210
    self.assertEqual(result.signal, None)
211
    self.assertEqual(result.exit_code, 1)
212
    result = RunCmd(["echo", "-n", self.magic])
213
    self.assertEqual(result.signal, None)
214
    self.assertEqual(result.exit_code, 0)
215
    self.assertEqual(result.stdout, self.magic)
216

    
217
  def testLang(self):
218
    """Test locale environment"""
219
    old_env = os.environ.copy()
220
    try:
221
      os.environ["LANG"] = "en_US.UTF-8"
222
      os.environ["LC_ALL"] = "en_US.UTF-8"
223
      result = RunCmd(["locale"])
224
      for line in result.output.splitlines():
225
        key, value = line.split("=", 1)
226
        # Ignore these variables, they're overridden by LC_ALL
227
        if key == "LANG" or key == "LANGUAGE":
228
          continue
229
        self.failIf(value and value != "C" and value != '"C"',
230
            "Variable %s is set to the invalid value '%s'" % (key, value))
231
    finally:
232
      os.environ = old_env
233

    
234

    
235
class TestRemoveFile(unittest.TestCase):
236
  """Test case for the RemoveFile function"""
237

    
238
  def setUp(self):
239
    """Create a temp dir and file for each case"""
240
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
241
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
242
    os.close(fd)
243

    
244
  def tearDown(self):
245
    if os.path.exists(self.tmpfile):
246
      os.unlink(self.tmpfile)
247
    os.rmdir(self.tmpdir)
248

    
249

    
250
  def testIgnoreDirs(self):
251
    """Test that RemoveFile() ignores directories"""
252
    self.assertEqual(None, RemoveFile(self.tmpdir))
253

    
254

    
255
  def testIgnoreNotExisting(self):
256
    """Test that RemoveFile() ignores non-existing files"""
257
    RemoveFile(self.tmpfile)
258
    RemoveFile(self.tmpfile)
259

    
260

    
261
  def testRemoveFile(self):
262
    """Test that RemoveFile does remove a file"""
263
    RemoveFile(self.tmpfile)
264
    if os.path.exists(self.tmpfile):
265
      self.fail("File '%s' not removed" % self.tmpfile)
266

    
267

    
268
  def testRemoveSymlink(self):
269
    """Test that RemoveFile does remove symlinks"""
270
    symlink = self.tmpdir + "/symlink"
271
    os.symlink("no-such-file", symlink)
272
    RemoveFile(symlink)
273
    if os.path.exists(symlink):
274
      self.fail("File '%s' not removed" % symlink)
275
    os.symlink(self.tmpfile, symlink)
276
    RemoveFile(symlink)
277
    if os.path.exists(symlink):
278
      self.fail("File '%s' not removed" % symlink)
279

    
280

    
281
class TestCheckdict(unittest.TestCase):
282
  """Test case for the CheckDict function"""
283

    
284
  def testAdd(self):
285
    """Test that CheckDict adds a missing key with the correct value"""
286

    
287
    tgt = {'a':1}
288
    tmpl = {'b': 2}
289
    CheckDict(tgt, tmpl)
290
    if 'b' not in tgt or tgt['b'] != 2:
291
      self.fail("Failed to update dict")
292

    
293

    
294
  def testNoUpdate(self):
295
    """Test that CheckDict does not overwrite an existing key"""
296
    tgt = {'a':1, 'b': 3}
297
    tmpl = {'b': 2}
298
    CheckDict(tgt, tmpl)
299
    self.failUnlessEqual(tgt['b'], 3)
300

    
301

    
302
class TestMatchNameComponent(unittest.TestCase):
303
  """Test case for the MatchNameComponent function"""
304

    
305
  def testEmptyList(self):
306
    """Test that there is no match against an empty list"""
307

    
308
    self.failUnlessEqual(MatchNameComponent("", []), None)
309
    self.failUnlessEqual(MatchNameComponent("test", []), None)
310

    
311
  def testSingleMatch(self):
312
    """Test that a single match is performed correctly"""
313
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
314
    for key in "test2", "test2.example", "test2.example.com":
315
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
316

    
317
  def testMultipleMatches(self):
318
    """Test that a multiple match is returned as None"""
319
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
320
    for key in "test1", "test1.example":
321
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
322

    
323

    
324
class TestFormatUnit(unittest.TestCase):
325
  """Test case for the FormatUnit function"""
326

    
327
  def testMiB(self):
328
    self.assertEqual(FormatUnit(1), '1M')
329
    self.assertEqual(FormatUnit(100), '100M')
330
    self.assertEqual(FormatUnit(1023), '1023M')
331

    
332
  def testGiB(self):
333
    self.assertEqual(FormatUnit(1024), '1.0G')
334
    self.assertEqual(FormatUnit(1536), '1.5G')
335
    self.assertEqual(FormatUnit(17133), '16.7G')
336
    self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G')
337

    
338
  def testTiB(self):
339
    self.assertEqual(FormatUnit(1024 * 1024), '1.0T')
340
    self.assertEqual(FormatUnit(5120 * 1024), '5.0T')
341
    self.assertEqual(FormatUnit(29829 * 1024), '29.1T')
342

    
343

    
344
class TestParseUnit(unittest.TestCase):
345
  """Test case for the ParseUnit function"""
346

    
347
  SCALES = (('', 1),
348
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
349
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
350
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
351

    
352
  def testRounding(self):
353
    self.assertEqual(ParseUnit('0'), 0)
354
    self.assertEqual(ParseUnit('1'), 4)
355
    self.assertEqual(ParseUnit('2'), 4)
356
    self.assertEqual(ParseUnit('3'), 4)
357

    
358
    self.assertEqual(ParseUnit('124'), 124)
359
    self.assertEqual(ParseUnit('125'), 128)
360
    self.assertEqual(ParseUnit('126'), 128)
361
    self.assertEqual(ParseUnit('127'), 128)
362
    self.assertEqual(ParseUnit('128'), 128)
363
    self.assertEqual(ParseUnit('129'), 132)
364
    self.assertEqual(ParseUnit('130'), 132)
365

    
366
  def testFloating(self):
367
    self.assertEqual(ParseUnit('0'), 0)
368
    self.assertEqual(ParseUnit('0.5'), 4)
369
    self.assertEqual(ParseUnit('1.75'), 4)
370
    self.assertEqual(ParseUnit('1.99'), 4)
371
    self.assertEqual(ParseUnit('2.00'), 4)
372
    self.assertEqual(ParseUnit('2.01'), 4)
373
    self.assertEqual(ParseUnit('3.99'), 4)
374
    self.assertEqual(ParseUnit('4.00'), 4)
375
    self.assertEqual(ParseUnit('4.01'), 8)
376
    self.assertEqual(ParseUnit('1.5G'), 1536)
377
    self.assertEqual(ParseUnit('1.8G'), 1844)
378
    self.assertEqual(ParseUnit('8.28T'), 8682212)
379

    
380
  def testSuffixes(self):
381
    for sep in ('', ' ', '   ', "\t", "\t "):
382
      for suffix, scale in TestParseUnit.SCALES:
383
        for func in (lambda x: x, str.lower, str.upper):
384
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
385
                           1024 * scale)
386

    
387
  def testInvalidInput(self):
388
    for sep in ('-', '_', ',', 'a'):
389
      for suffix, _ in TestParseUnit.SCALES:
390
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
391

    
392
    for suffix, _ in TestParseUnit.SCALES:
393
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
394

    
395

    
396
class TestSshKeys(testutils.GanetiTestCase):
397
  """Test case for the AddAuthorizedKey function"""
398

    
399
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
400
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
401
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
402

    
403
  def setUp(self):
404
    (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
405
    try:
406
      handle = os.fdopen(fd, 'w')
407
      try:
408
        handle.write("%s\n" % TestSshKeys.KEY_A)
409
        handle.write("%s\n" % TestSshKeys.KEY_B)
410
      finally:
411
        handle.close()
412
    except:
413
      utils.RemoveFile(self.tmpname)
414
      raise
415

    
416
  def tearDown(self):
417
    utils.RemoveFile(self.tmpname)
418
    del self.tmpname
419

    
420
  def testAddingNewKey(self):
421
    AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
422

    
423
    self.assertFileContent(self.tmpname,
424
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
425
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
426
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
427
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
428

    
429
  def testAddingAlmostButNotCompletelyTheSameKey(self):
430
    AddAuthorizedKey(self.tmpname,
431
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
432

    
433
    self.assertFileContent(self.tmpname,
434
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
435
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
436
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
437
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
438

    
439
  def testAddingExistingKeyWithSomeMoreSpaces(self):
440
    AddAuthorizedKey(self.tmpname,
441
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
442

    
443
    self.assertFileContent(self.tmpname,
444
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
445
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
446
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
447

    
448
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
449
    RemoveAuthorizedKey(self.tmpname,
450
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
451

    
452
    self.assertFileContent(self.tmpname,
453
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
454
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
455

    
456
  def testRemovingNonExistingKey(self):
457
    RemoveAuthorizedKey(self.tmpname,
458
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
459

    
460
    self.assertFileContent(self.tmpname,
461
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
462
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
463
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
464

    
465

    
466
class TestEtcHosts(testutils.GanetiTestCase):
467
  """Test functions modifying /etc/hosts"""
468

    
469
  def setUp(self):
470
    (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
471
    try:
472
      handle = os.fdopen(fd, 'w')
473
      try:
474
        handle.write('# This is a test file for /etc/hosts\n')
475
        handle.write('127.0.0.1\tlocalhost\n')
476
        handle.write('192.168.1.1 router gw\n')
477
      finally:
478
        handle.close()
479
    except:
480
      utils.RemoveFile(self.tmpname)
481
      raise
482

    
483
  def tearDown(self):
484
    utils.RemoveFile(self.tmpname)
485
    del self.tmpname
486

    
487
  def testSettingNewIp(self):
488
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
489

    
490
    self.assertFileContent(self.tmpname,
491
      "# This is a test file for /etc/hosts\n"
492
      "127.0.0.1\tlocalhost\n"
493
      "192.168.1.1 router gw\n"
494
      "1.2.3.4\tmyhost.domain.tld myhost\n")
495

    
496
  def testSettingExistingIp(self):
497
    SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
498
                     ['myhost'])
499

    
500
    self.assertFileContent(self.tmpname,
501
      "# This is a test file for /etc/hosts\n"
502
      "127.0.0.1\tlocalhost\n"
503
      "192.168.1.1\tmyhost.domain.tld myhost\n")
504

    
505
  def testSettingDuplicateName(self):
506
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
507

    
508
    self.assertFileContent(self.tmpname,
509
      "# This is a test file for /etc/hosts\n"
510
      "127.0.0.1\tlocalhost\n"
511
      "192.168.1.1 router gw\n"
512
      "1.2.3.4\tmyhost\n")
513

    
514
  def testRemovingExistingHost(self):
515
    RemoveEtcHostsEntry(self.tmpname, 'router')
516

    
517
    self.assertFileContent(self.tmpname,
518
      "# This is a test file for /etc/hosts\n"
519
      "127.0.0.1\tlocalhost\n"
520
      "192.168.1.1 gw\n")
521

    
522
  def testRemovingSingleExistingHost(self):
523
    RemoveEtcHostsEntry(self.tmpname, 'localhost')
524

    
525
    self.assertFileContent(self.tmpname,
526
      "# This is a test file for /etc/hosts\n"
527
      "192.168.1.1 router gw\n")
528

    
529
  def testRemovingNonExistingHost(self):
530
    RemoveEtcHostsEntry(self.tmpname, 'myhost')
531

    
532
    self.assertFileContent(self.tmpname,
533
      "# This is a test file for /etc/hosts\n"
534
      "127.0.0.1\tlocalhost\n"
535
      "192.168.1.1 router gw\n")
536

    
537
  def testRemovingAlias(self):
538
    RemoveEtcHostsEntry(self.tmpname, 'gw')
539

    
540
    self.assertFileContent(self.tmpname,
541
      "# This is a test file for /etc/hosts\n"
542
      "127.0.0.1\tlocalhost\n"
543
      "192.168.1.1 router\n")
544

    
545

    
546
class TestShellQuoting(unittest.TestCase):
547
  """Test case for shell quoting functions"""
548

    
549
  def testShellQuote(self):
550
    self.assertEqual(ShellQuote('abc'), "abc")
551
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
552
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
553
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
554
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
555

    
556
  def testShellQuoteArgs(self):
557
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
558
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
559
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
560

    
561

    
562
class TestTcpPing(unittest.TestCase):
563
  """Testcase for TCP version of ping - against listen(2)ing port"""
564

    
565
  def setUp(self):
566
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
567
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
568
    self.listenerport = self.listener.getsockname()[1]
569
    self.listener.listen(1)
570

    
571
  def tearDown(self):
572
    self.listener.shutdown(socket.SHUT_RDWR)
573
    del self.listener
574
    del self.listenerport
575

    
576
  def testTcpPingToLocalHostAccept(self):
577
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
578
                         self.listenerport,
579
                         timeout=10,
580
                         live_port_needed=True,
581
                         source=constants.LOCALHOST_IP_ADDRESS,
582
                         ),
583
                 "failed to connect to test listener")
584

    
585
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
586
                         self.listenerport,
587
                         timeout=10,
588
                         live_port_needed=True,
589
                         ),
590
                 "failed to connect to test listener (no source)")
591

    
592

    
593
class TestTcpPingDeaf(unittest.TestCase):
594
  """Testcase for TCP version of ping - against non listen(2)ing port"""
595

    
596
  def setUp(self):
597
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
598
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
599
    self.deaflistenerport = self.deaflistener.getsockname()[1]
600

    
601
  def tearDown(self):
602
    del self.deaflistener
603
    del self.deaflistenerport
604

    
605
  def testTcpPingToLocalHostAcceptDeaf(self):
606
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
607
                        self.deaflistenerport,
608
                        timeout=constants.TCP_PING_TIMEOUT,
609
                        live_port_needed=True,
610
                        source=constants.LOCALHOST_IP_ADDRESS,
611
                        ), # need successful connect(2)
612
                "successfully connected to deaf listener")
613

    
614
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
615
                        self.deaflistenerport,
616
                        timeout=constants.TCP_PING_TIMEOUT,
617
                        live_port_needed=True,
618
                        ), # need successful connect(2)
619
                "successfully connected to deaf listener (no source addr)")
620

    
621
  def testTcpPingToLocalHostNoAccept(self):
622
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
623
                         self.deaflistenerport,
624
                         timeout=constants.TCP_PING_TIMEOUT,
625
                         live_port_needed=False,
626
                         source=constants.LOCALHOST_IP_ADDRESS,
627
                         ), # ECONNREFUSED is OK
628
                 "failed to ping alive host on deaf port")
629

    
630
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
631
                         self.deaflistenerport,
632
                         timeout=constants.TCP_PING_TIMEOUT,
633
                         live_port_needed=False,
634
                         ), # ECONNREFUSED is OK
635
                 "failed to ping alive host on deaf port (no source addr)")
636

    
637

    
638
class TestListVisibleFiles(unittest.TestCase):
639
  """Test case for ListVisibleFiles"""
640

    
641
  def setUp(self):
642
    self.path = tempfile.mkdtemp()
643

    
644
  def tearDown(self):
645
    shutil.rmtree(self.path)
646

    
647
  def _test(self, files, expected):
648
    # Sort a copy
649
    expected = expected[:]
650
    expected.sort()
651

    
652
    for name in files:
653
      f = open(os.path.join(self.path, name), 'w')
654
      try:
655
        f.write("Test\n")
656
      finally:
657
        f.close()
658

    
659
    found = ListVisibleFiles(self.path)
660
    found.sort()
661

    
662
    self.assertEqual(found, expected)
663

    
664
  def testAllVisible(self):
665
    files = ["a", "b", "c"]
666
    expected = files
667
    self._test(files, expected)
668

    
669
  def testNoneVisible(self):
670
    files = [".a", ".b", ".c"]
671
    expected = []
672
    self._test(files, expected)
673

    
674
  def testSomeVisible(self):
675
    files = ["a", "b", ".c"]
676
    expected = ["a", "b"]
677
    self._test(files, expected)
678

    
679

    
680
class TestNewUUID(unittest.TestCase):
681
  """Test case for NewUUID"""
682

    
683
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
684
                        '[a-f0-9]{4}-[a-f0-9]{12}$')
685

    
686
  def runTest(self):
687
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
688

    
689

    
690
class TestUniqueSequence(unittest.TestCase):
691
  """Test case for UniqueSequence"""
692

    
693
  def _test(self, input, expected):
694
    self.assertEqual(utils.UniqueSequence(input), expected)
695

    
696
  def runTest(self):
697
    # Ordered input
698
    self._test([1, 2, 3], [1, 2, 3])
699
    self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
700
    self._test([1, 2, 2, 3], [1, 2, 3])
701
    self._test([1, 2, 3, 3], [1, 2, 3])
702

    
703
    # Unordered input
704
    self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
705
    self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
706

    
707
    # Strings
708
    self._test(["a", "a"], ["a"])
709
    self._test(["a", "b"], ["a", "b"])
710
    self._test(["a", "b", "a"], ["a", "b"])
711

    
712

    
713
class TestFirstFree(unittest.TestCase):
714
  """Test case for the FirstFree function"""
715

    
716
  def test(self):
717
    """Test FirstFree"""
718
    self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
719
    self.failUnlessEqual(FirstFree([]), None)
720
    self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
721
    self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
722
    self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
723

    
724

    
725
class TestFileLock(unittest.TestCase):
726
  """Test case for the FileLock class"""
727

    
728
  def setUp(self):
729
    self.tmpfile = tempfile.NamedTemporaryFile()
730
    self.lock = utils.FileLock(self.tmpfile.name)
731

    
732
  def testSharedNonblocking(self):
733
    self.lock.Shared(blocking=False)
734
    self.lock.Close()
735

    
736
  def testExclusiveNonblocking(self):
737
    self.lock.Exclusive(blocking=False)
738
    self.lock.Close()
739

    
740
  def testUnlockNonblocking(self):
741
    self.lock.Unlock(blocking=False)
742
    self.lock.Close()
743

    
744
  def testSharedBlocking(self):
745
    self.lock.Shared(blocking=True)
746
    self.lock.Close()
747

    
748
  def testExclusiveBlocking(self):
749
    self.lock.Exclusive(blocking=True)
750
    self.lock.Close()
751

    
752
  def testUnlockBlocking(self):
753
    self.lock.Unlock(blocking=True)
754
    self.lock.Close()
755

    
756
  def testSharedExclusiveUnlock(self):
757
    self.lock.Shared(blocking=False)
758
    self.lock.Exclusive(blocking=False)
759
    self.lock.Unlock(blocking=False)
760
    self.lock.Close()
761

    
762
  def testExclusiveSharedUnlock(self):
763
    self.lock.Exclusive(blocking=False)
764
    self.lock.Shared(blocking=False)
765
    self.lock.Unlock(blocking=False)
766
    self.lock.Close()
767

    
768
  def testCloseShared(self):
769
    self.lock.Close()
770
    self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
771

    
772
  def testCloseExclusive(self):
773
    self.lock.Close()
774
    self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
775

    
776
  def testCloseUnlock(self):
777
    self.lock.Close()
778
    self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
779

    
780

    
781
class TestTimeFunctions(unittest.TestCase):
782
  """Test case for time functions"""
783

    
784
  def runTest(self):
785
    self.assertEqual(utils.SplitTime(1), (1, 0))
786
    self.assertEqual(utils.SplitTime(1.5), (1, 500))
787
    self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 481))
788
    self.assertEqual(utils.SplitTime(123.48012), (123, 480))
789
    self.assertEqual(utils.SplitTime(123.9995), (124, 0))
790
    self.assertEqual(utils.SplitTime(123.999999999), (124, 0))
791

    
792
    self.assertEqual(utils.MergeTime((1, 0)), 1.0)
793
    self.assertEqual(utils.MergeTime((1, 500)), 1.5)
794
    self.assertEqual(utils.MergeTime((1218448917, 500)), 1218448917.5)
795

    
796
    self.assertEqual(round(utils.MergeTime((1218448917, 481)), 3), 1218448917.481)
797
    self.assertEqual(round(utils.MergeTime((1, 801)), 3), 1.801)
798

    
799
    self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
800
    self.assertRaises(AssertionError, utils.MergeTime, (0, 1000))
801
    self.assertRaises(AssertionError, utils.MergeTime, (0, 9999))
802
    self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
803
    self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
804

    
805

    
806
if __name__ == '__main__':
807
  unittest.main()