Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ d4fa5c23

History | View | Annotate | Download (20.9 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import os
30
import md5
31
import signal
32
import socket
33
import shutil
34
import re
35

    
36
import ganeti
37
import testutils
38
from ganeti import constants
39
from ganeti import utils
40
from ganeti.utils import IsProcessAlive, Lock, Unlock, RunCmd, \
41
     RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
42
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
43
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
44
     SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree
45
from ganeti.errors import LockError, UnitParseError
46

    
47
def _ChildHandler(signal, stack):
48
  global _ChildFlag
49
  _ChildFlag = True
50

    
51
class TestIsProcessAlive(unittest.TestCase):
52
  """Testing case for IsProcessAlive"""
53

    
54
  def setUp(self):
55
    global _ChildFlag
56
    # create a (most probably) non-existing process-id
57
    self.pid_non_existing = os.fork()
58
    if self.pid_non_existing == 0:
59
      os._exit(0)
60
    elif self.pid_non_existing > 0:
61
      os.waitpid(self.pid_non_existing, 0)
62
    else:
63
      raise SystemError("can't fork")
64
    _ChildFlag = False
65
    # Use _ChildHandler for SIGCHLD
66
    self.chldOrig = signal.signal(signal.SIGCHLD, _ChildHandler)
67
    # create a zombie
68
    self.pid_zombie = os.fork()
69
    if self.pid_zombie == 0:
70
      os._exit(0)
71
    elif self.pid_zombie < 0:
72
      raise SystemError("can't fork")
73

    
74
  def tearDown(self):
75
    signal.signal(signal.SIGCHLD, self.chldOrig)
76

    
77
  def testExists(self):
78
    mypid = os.getpid()
79
    self.assert_(IsProcessAlive(mypid),
80
                 "can't find myself running")
81

    
82
  def testZombie(self):
83
    global _ChildFlag
84
    timeout = 10
85

    
86
    while not _ChildFlag:
87
      if timeout >= 0:
88
        time.sleep(0.2)
89
        timeout -= 0.2
90
      else:
91
        self.fail("timed out waiting for child's signal")
92
        break # not executed...
93

    
94
    self.assert_(not IsProcessAlive(self.pid_zombie),
95
                 "zombie not detected as zombie")
96

    
97
  def testNotExisting(self):
98
    self.assert_(not IsProcessAlive(self.pid_non_existing),
99
                 "noexisting process detected")
100

    
101

    
102
class TestRunCmd(unittest.TestCase):
103
  """Testing case for the RunCmd function"""
104

    
105
  def setUp(self):
106
    self.magic = time.ctime() + " ganeti test"
107

    
108
  def testOk(self):
109
    """Test successful exit code"""
110
    result = RunCmd("/bin/sh -c 'exit 0'")
111
    self.assertEqual(result.exit_code, 0)
112

    
113
  def testFail(self):
114
    """Test fail exit code"""
115
    result = RunCmd("/bin/sh -c 'exit 1'")
116
    self.assertEqual(result.exit_code, 1)
117

    
118

    
119
  def testStdout(self):
120
    """Test standard output"""
121
    cmd = 'echo -n "%s"' % self.magic
122
    result = RunCmd("/bin/sh -c '%s'" % cmd)
123
    self.assertEqual(result.stdout, self.magic)
124

    
125

    
126
  def testStderr(self):
127
    """Test standard error"""
128
    cmd = 'echo -n "%s"' % self.magic
129
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
130
    self.assertEqual(result.stderr, self.magic)
131

    
132

    
133
  def testCombined(self):
134
    """Test combined output"""
135
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
136
    result = RunCmd("/bin/sh -c '%s'" % cmd)
137
    self.assertEqual(result.output, "A" + self.magic + "B" + self.magic)
138

    
139
  def testSignal(self):
140
    """Test signal"""
141
    result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
142
    self.assertEqual(result.signal, 15)
143

    
144
  def testListRun(self):
145
    """Test list runs"""
146
    result = RunCmd(["true"])
147
    self.assertEqual(result.signal, None)
148
    self.assertEqual(result.exit_code, 0)
149
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
150
    self.assertEqual(result.signal, None)
151
    self.assertEqual(result.exit_code, 1)
152
    result = RunCmd(["echo", "-n", self.magic])
153
    self.assertEqual(result.signal, None)
154
    self.assertEqual(result.exit_code, 0)
155
    self.assertEqual(result.stdout, self.magic)
156

    
157
  def testLang(self):
158
    """Test locale environment"""
159
    old_env = os.environ.copy()
160
    try:
161
      os.environ["LANG"] = "en_US.UTF-8"
162
      os.environ["LC_ALL"] = "en_US.UTF-8"
163
      result = RunCmd(["locale"])
164
      for line in result.output.splitlines():
165
        key, value = line.split("=", 1)
166
        # Ignore these variables, they're overridden by LC_ALL
167
        if key == "LANG" or key == "LANGUAGE":
168
          continue
169
        self.failIf(value and value != "C" and value != '"C"',
170
            "Variable %s is set to the invalid value '%s'" % (key, value))
171
    finally:
172
      os.environ = old_env
173

    
174

    
175
class TestRemoveFile(unittest.TestCase):
176
  """Test case for the RemoveFile function"""
177

    
178
  def setUp(self):
179
    """Create a temp dir and file for each case"""
180
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
181
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
182
    os.close(fd)
183

    
184
  def tearDown(self):
185
    if os.path.exists(self.tmpfile):
186
      os.unlink(self.tmpfile)
187
    os.rmdir(self.tmpdir)
188

    
189

    
190
  def testIgnoreDirs(self):
191
    """Test that RemoveFile() ignores directories"""
192
    self.assertEqual(None, RemoveFile(self.tmpdir))
193

    
194

    
195
  def testIgnoreNotExisting(self):
196
    """Test that RemoveFile() ignores non-existing files"""
197
    RemoveFile(self.tmpfile)
198
    RemoveFile(self.tmpfile)
199

    
200

    
201
  def testRemoveFile(self):
202
    """Test that RemoveFile does remove a file"""
203
    RemoveFile(self.tmpfile)
204
    if os.path.exists(self.tmpfile):
205
      self.fail("File '%s' not removed" % self.tmpfile)
206

    
207

    
208
  def testRemoveSymlink(self):
209
    """Test that RemoveFile does remove symlinks"""
210
    symlink = self.tmpdir + "/symlink"
211
    os.symlink("no-such-file", symlink)
212
    RemoveFile(symlink)
213
    if os.path.exists(symlink):
214
      self.fail("File '%s' not removed" % symlink)
215
    os.symlink(self.tmpfile, symlink)
216
    RemoveFile(symlink)
217
    if os.path.exists(symlink):
218
      self.fail("File '%s' not removed" % symlink)
219

    
220

    
221
class TestCheckdict(unittest.TestCase):
222
  """Test case for the CheckDict function"""
223

    
224
  def testAdd(self):
225
    """Test that CheckDict adds a missing key with the correct value"""
226

    
227
    tgt = {'a':1}
228
    tmpl = {'b': 2}
229
    CheckDict(tgt, tmpl)
230
    if 'b' not in tgt or tgt['b'] != 2:
231
      self.fail("Failed to update dict")
232

    
233

    
234
  def testNoUpdate(self):
235
    """Test that CheckDict does not overwrite an existing key"""
236
    tgt = {'a':1, 'b': 3}
237
    tmpl = {'b': 2}
238
    CheckDict(tgt, tmpl)
239
    self.failUnlessEqual(tgt['b'], 3)
240

    
241

    
242
class TestMatchNameComponent(unittest.TestCase):
243
  """Test case for the MatchNameComponent function"""
244

    
245
  def testEmptyList(self):
246
    """Test that there is no match against an empty list"""
247

    
248
    self.failUnlessEqual(MatchNameComponent("", []), None)
249
    self.failUnlessEqual(MatchNameComponent("test", []), None)
250

    
251
  def testSingleMatch(self):
252
    """Test that a single match is performed correctly"""
253
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
254
    for key in "test2", "test2.example", "test2.example.com":
255
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
256

    
257
  def testMultipleMatches(self):
258
    """Test that a multiple match is returned as None"""
259
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
260
    for key in "test1", "test1.example":
261
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
262

    
263

    
264
class TestFormatUnit(unittest.TestCase):
265
  """Test case for the FormatUnit function"""
266

    
267
  def testMiB(self):
268
    self.assertEqual(FormatUnit(1), '1M')
269
    self.assertEqual(FormatUnit(100), '100M')
270
    self.assertEqual(FormatUnit(1023), '1023M')
271

    
272
  def testGiB(self):
273
    self.assertEqual(FormatUnit(1024), '1.0G')
274
    self.assertEqual(FormatUnit(1536), '1.5G')
275
    self.assertEqual(FormatUnit(17133), '16.7G')
276
    self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G')
277

    
278
  def testTiB(self):
279
    self.assertEqual(FormatUnit(1024 * 1024), '1.0T')
280
    self.assertEqual(FormatUnit(5120 * 1024), '5.0T')
281
    self.assertEqual(FormatUnit(29829 * 1024), '29.1T')
282

    
283

    
284
class TestParseUnit(unittest.TestCase):
285
  """Test case for the ParseUnit function"""
286

    
287
  SCALES = (('', 1),
288
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
289
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
290
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
291

    
292
  def testRounding(self):
293
    self.assertEqual(ParseUnit('0'), 0)
294
    self.assertEqual(ParseUnit('1'), 4)
295
    self.assertEqual(ParseUnit('2'), 4)
296
    self.assertEqual(ParseUnit('3'), 4)
297

    
298
    self.assertEqual(ParseUnit('124'), 124)
299
    self.assertEqual(ParseUnit('125'), 128)
300
    self.assertEqual(ParseUnit('126'), 128)
301
    self.assertEqual(ParseUnit('127'), 128)
302
    self.assertEqual(ParseUnit('128'), 128)
303
    self.assertEqual(ParseUnit('129'), 132)
304
    self.assertEqual(ParseUnit('130'), 132)
305

    
306
  def testFloating(self):
307
    self.assertEqual(ParseUnit('0'), 0)
308
    self.assertEqual(ParseUnit('0.5'), 4)
309
    self.assertEqual(ParseUnit('1.75'), 4)
310
    self.assertEqual(ParseUnit('1.99'), 4)
311
    self.assertEqual(ParseUnit('2.00'), 4)
312
    self.assertEqual(ParseUnit('2.01'), 4)
313
    self.assertEqual(ParseUnit('3.99'), 4)
314
    self.assertEqual(ParseUnit('4.00'), 4)
315
    self.assertEqual(ParseUnit('4.01'), 8)
316
    self.assertEqual(ParseUnit('1.5G'), 1536)
317
    self.assertEqual(ParseUnit('1.8G'), 1844)
318
    self.assertEqual(ParseUnit('8.28T'), 8682212)
319

    
320
  def testSuffixes(self):
321
    for sep in ('', ' ', '   ', "\t", "\t "):
322
      for suffix, scale in TestParseUnit.SCALES:
323
        for func in (lambda x: x, str.lower, str.upper):
324
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
325
                           1024 * scale)
326

    
327
  def testInvalidInput(self):
328
    for sep in ('-', '_', ',', 'a'):
329
      for suffix, _ in TestParseUnit.SCALES:
330
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
331

    
332
    for suffix, _ in TestParseUnit.SCALES:
333
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
334

    
335

    
336
class TestSshKeys(testutils.GanetiTestCase):
337
  """Test case for the AddAuthorizedKey function"""
338

    
339
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
340
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
341
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
342

    
343
  def setUp(self):
344
    (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
345
    try:
346
      handle = os.fdopen(fd, 'w')
347
      try:
348
        handle.write("%s\n" % TestSshKeys.KEY_A)
349
        handle.write("%s\n" % TestSshKeys.KEY_B)
350
      finally:
351
        handle.close()
352
    except:
353
      utils.RemoveFile(self.tmpname)
354
      raise
355

    
356
  def tearDown(self):
357
    utils.RemoveFile(self.tmpname)
358
    del self.tmpname
359

    
360
  def testAddingNewKey(self):
361
    AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
362

    
363
    self.assertFileContent(self.tmpname,
364
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
365
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
366
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
367
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
368

    
369
  def testAddingAlmostButNotCompletelyTheSameKey(self):
370
    AddAuthorizedKey(self.tmpname,
371
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
372

    
373
    self.assertFileContent(self.tmpname,
374
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
375
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
376
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
377
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
378

    
379
  def testAddingExistingKeyWithSomeMoreSpaces(self):
380
    AddAuthorizedKey(self.tmpname,
381
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
382

    
383
    self.assertFileContent(self.tmpname,
384
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
385
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
386
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
387

    
388
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
389
    RemoveAuthorizedKey(self.tmpname,
390
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
391

    
392
    self.assertFileContent(self.tmpname,
393
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
394
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
395

    
396
  def testRemovingNonExistingKey(self):
397
    RemoveAuthorizedKey(self.tmpname,
398
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
399

    
400
    self.assertFileContent(self.tmpname,
401
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
402
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
403
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
404

    
405

    
406
class TestEtcHosts(testutils.GanetiTestCase):
407
  """Test functions modifying /etc/hosts"""
408

    
409
  def setUp(self):
410
    (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
411
    try:
412
      handle = os.fdopen(fd, 'w')
413
      try:
414
        handle.write('# This is a test file for /etc/hosts\n')
415
        handle.write('127.0.0.1\tlocalhost\n')
416
        handle.write('192.168.1.1 router gw\n')
417
      finally:
418
        handle.close()
419
    except:
420
      utils.RemoveFile(self.tmpname)
421
      raise
422

    
423
  def tearDown(self):
424
    utils.RemoveFile(self.tmpname)
425
    del self.tmpname
426

    
427
  def testSettingNewIp(self):
428
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
429

    
430
    self.assertFileContent(self.tmpname,
431
      "# This is a test file for /etc/hosts\n"
432
      "127.0.0.1\tlocalhost\n"
433
      "192.168.1.1 router gw\n"
434
      "1.2.3.4\tmyhost.domain.tld myhost\n")
435

    
436
  def testSettingExistingIp(self):
437
    SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
438
                     ['myhost'])
439

    
440
    self.assertFileContent(self.tmpname,
441
      "# This is a test file for /etc/hosts\n"
442
      "127.0.0.1\tlocalhost\n"
443
      "192.168.1.1\tmyhost.domain.tld myhost\n")
444

    
445
  def testSettingDuplicateName(self):
446
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
447

    
448
    self.assertFileContent(self.tmpname,
449
      "# This is a test file for /etc/hosts\n"
450
      "127.0.0.1\tlocalhost\n"
451
      "192.168.1.1 router gw\n"
452
      "1.2.3.4\tmyhost\n")
453

    
454
  def testRemovingExistingHost(self):
455
    RemoveEtcHostsEntry(self.tmpname, 'router')
456

    
457
    self.assertFileContent(self.tmpname,
458
      "# This is a test file for /etc/hosts\n"
459
      "127.0.0.1\tlocalhost\n"
460
      "192.168.1.1 gw\n")
461

    
462
  def testRemovingSingleExistingHost(self):
463
    RemoveEtcHostsEntry(self.tmpname, 'localhost')
464

    
465
    self.assertFileContent(self.tmpname,
466
      "# This is a test file for /etc/hosts\n"
467
      "192.168.1.1 router gw\n")
468

    
469
  def testRemovingNonExistingHost(self):
470
    RemoveEtcHostsEntry(self.tmpname, 'myhost')
471

    
472
    self.assertFileContent(self.tmpname,
473
      "# This is a test file for /etc/hosts\n"
474
      "127.0.0.1\tlocalhost\n"
475
      "192.168.1.1 router gw\n")
476

    
477
  def testRemovingAlias(self):
478
    RemoveEtcHostsEntry(self.tmpname, 'gw')
479

    
480
    self.assertFileContent(self.tmpname,
481
      "# This is a test file for /etc/hosts\n"
482
      "127.0.0.1\tlocalhost\n"
483
      "192.168.1.1 router\n")
484

    
485

    
486
class TestShellQuoting(unittest.TestCase):
487
  """Test case for shell quoting functions"""
488

    
489
  def testShellQuote(self):
490
    self.assertEqual(ShellQuote('abc'), "abc")
491
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
492
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
493
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
494
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
495

    
496
  def testShellQuoteArgs(self):
497
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
498
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
499
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
500

    
501

    
502
class TestTcpPing(unittest.TestCase):
503
  """Testcase for TCP version of ping - against listen(2)ing port"""
504

    
505
  def setUp(self):
506
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
507
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
508
    self.listenerport = self.listener.getsockname()[1]
509
    self.listener.listen(1)
510

    
511
  def tearDown(self):
512
    self.listener.shutdown(socket.SHUT_RDWR)
513
    del self.listener
514
    del self.listenerport
515

    
516
  def testTcpPingToLocalHostAccept(self):
517
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
518
                         self.listenerport,
519
                         timeout=10,
520
                         live_port_needed=True,
521
                         source=constants.LOCALHOST_IP_ADDRESS,
522
                         ),
523
                 "failed to connect to test listener")
524

    
525
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
526
                         self.listenerport,
527
                         timeout=10,
528
                         live_port_needed=True,
529
                         ),
530
                 "failed to connect to test listener (no source)")
531

    
532

    
533
class TestTcpPingDeaf(unittest.TestCase):
534
  """Testcase for TCP version of ping - against non listen(2)ing port"""
535

    
536
  def setUp(self):
537
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
538
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
539
    self.deaflistenerport = self.deaflistener.getsockname()[1]
540

    
541
  def tearDown(self):
542
    del self.deaflistener
543
    del self.deaflistenerport
544

    
545
  def testTcpPingToLocalHostAcceptDeaf(self):
546
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
547
                        self.deaflistenerport,
548
                        timeout=constants.TCP_PING_TIMEOUT,
549
                        live_port_needed=True,
550
                        source=constants.LOCALHOST_IP_ADDRESS,
551
                        ), # need successful connect(2)
552
                "successfully connected to deaf listener")
553

    
554
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
555
                        self.deaflistenerport,
556
                        timeout=constants.TCP_PING_TIMEOUT,
557
                        live_port_needed=True,
558
                        ), # need successful connect(2)
559
                "successfully connected to deaf listener (no source addr)")
560

    
561
  def testTcpPingToLocalHostNoAccept(self):
562
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
563
                         self.deaflistenerport,
564
                         timeout=constants.TCP_PING_TIMEOUT,
565
                         live_port_needed=False,
566
                         source=constants.LOCALHOST_IP_ADDRESS,
567
                         ), # ECONNREFUSED is OK
568
                 "failed to ping alive host on deaf port")
569

    
570
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
571
                         self.deaflistenerport,
572
                         timeout=constants.TCP_PING_TIMEOUT,
573
                         live_port_needed=False,
574
                         ), # ECONNREFUSED is OK
575
                 "failed to ping alive host on deaf port (no source addr)")
576

    
577

    
578
class TestListVisibleFiles(unittest.TestCase):
579
  """Test case for ListVisibleFiles"""
580

    
581
  def setUp(self):
582
    self.path = tempfile.mkdtemp()
583

    
584
  def tearDown(self):
585
    shutil.rmtree(self.path)
586

    
587
  def _test(self, files, expected):
588
    # Sort a copy
589
    expected = expected[:]
590
    expected.sort()
591

    
592
    for name in files:
593
      f = open(os.path.join(self.path, name), 'w')
594
      try:
595
        f.write("Test\n")
596
      finally:
597
        f.close()
598

    
599
    found = ListVisibleFiles(self.path)
600
    found.sort()
601

    
602
    self.assertEqual(found, expected)
603

    
604
  def testAllVisible(self):
605
    files = ["a", "b", "c"]
606
    expected = files
607
    self._test(files, expected)
608

    
609
  def testNoneVisible(self):
610
    files = [".a", ".b", ".c"]
611
    expected = []
612
    self._test(files, expected)
613

    
614
  def testSomeVisible(self):
615
    files = ["a", "b", ".c"]
616
    expected = ["a", "b"]
617
    self._test(files, expected)
618

    
619

    
620
class TestNewUUID(unittest.TestCase):
621
  """Test case for NewUUID"""
622

    
623
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
624
                        '[a-f0-9]{4}-[a-f0-9]{12}$')
625

    
626
  def runTest(self):
627
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
628

    
629

    
630
class TestUniqueSequence(unittest.TestCase):
631
  """Test case for UniqueSequence"""
632

    
633
  def _test(self, input, expected):
634
    self.assertEqual(utils.UniqueSequence(input), expected)
635

    
636
  def runTest(self):
637
    # Ordered input
638
    self._test([1, 2, 3], [1, 2, 3])
639
    self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
640
    self._test([1, 2, 2, 3], [1, 2, 3])
641
    self._test([1, 2, 3, 3], [1, 2, 3])
642

    
643
    # Unordered input
644
    self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
645
    self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
646

    
647
    # Strings
648
    self._test(["a", "a"], ["a"])
649
    self._test(["a", "b"], ["a", "b"])
650
    self._test(["a", "b", "a"], ["a", "b"])
651

    
652
class TestFirstFree(unittest.TestCase):
653
  """Test case for the FirstFree function"""
654

    
655
  def test(self):
656
    """Test FirstFree"""
657
    self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
658
    self.failUnlessEqual(FirstFree([]), None)
659
    self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
660
    self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
661
    self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
662

    
663
if __name__ == '__main__':
664
  unittest.main()