Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ f0990e0c

History | View | Annotate | Download (19.3 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import md5
30
import socket
31
import shutil
32
import re
33

    
34
import ganeti
35
from ganeti import constants
36
from ganeti import utils
37
from ganeti.utils import IsProcessAlive, Lock, Unlock, RunCmd, \
38
     RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
39
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
40
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
41
     SetEtcHostsEntry, RemoveEtcHostsEntry
42
from ganeti.errors import LockError, UnitParseError
43

    
44

    
45
class GanetiTestCase(unittest.TestCase):
46
  def assertFileContent(self, file_name, content):
47
    """Checks the content of a file.
48

49
    """
50
    handle = open(file_name, 'r')
51
    try:
52
      self.assertEqual(handle.read(), content)
53
    finally:
54
      handle.close()
55

    
56

    
57
class TestIsProcessAlive(unittest.TestCase):
58
  """Testing case for IsProcessAlive"""
59
  def setUp(self):
60
    # create a zombie and a (hopefully) non-existing process id
61
    self.pid_zombie = os.fork()
62
    if self.pid_zombie == 0:
63
      os._exit(0)
64
    elif self.pid_zombie < 0:
65
      raise SystemError("can't fork")
66
    self.pid_non_existing = os.fork()
67
    if self.pid_non_existing == 0:
68
      os._exit(0)
69
    elif self.pid_non_existing > 0:
70
      os.waitpid(self.pid_non_existing, 0)
71
    else:
72
      raise SystemError("can't fork")
73

    
74

    
75
  def testExists(self):
76
    mypid = os.getpid()
77
    self.assert_(IsProcessAlive(mypid),
78
                 "can't find myself running")
79

    
80
  def testZombie(self):
81
    self.assert_(not IsProcessAlive(self.pid_zombie),
82
                 "zombie not detected as zombie")
83

    
84

    
85
  def testNotExisting(self):
86
    self.assert_(not IsProcessAlive(self.pid_non_existing),
87
                 "noexisting process detected")
88

    
89

    
90
class TestLocking(unittest.TestCase):
91
  """Testing case for the Lock/Unlock functions"""
92
  def clean_lock(self, name):
93
    try:
94
      ganeti.utils.Unlock("unittest")
95
    except LockError:
96
      pass
97

    
98

    
99
  def testLock(self):
100
    self.clean_lock("unittest")
101
    self.assertEqual(None, Lock("unittest"))
102

    
103

    
104
  def testUnlock(self):
105
    self.clean_lock("unittest")
106
    ganeti.utils.Lock("unittest")
107
    self.assertEqual(None, Unlock("unittest"))
108

    
109

    
110
  def testDoubleLock(self):
111
    self.clean_lock("unittest")
112
    ganeti.utils.Lock("unittest")
113
    self.assertRaises(LockError, Lock, "unittest")
114

    
115

    
116
class TestRunCmd(unittest.TestCase):
117
  """Testing case for the RunCmd function"""
118

    
119
  def setUp(self):
120
    self.magic = time.ctime() + " ganeti test"
121

    
122
  def testOk(self):
123
    """Test successful exit code"""
124
    result = RunCmd("/bin/sh -c 'exit 0'")
125
    self.assertEqual(result.exit_code, 0)
126

    
127
  def testFail(self):
128
    """Test fail exit code"""
129
    result = RunCmd("/bin/sh -c 'exit 1'")
130
    self.assertEqual(result.exit_code, 1)
131

    
132

    
133
  def testStdout(self):
134
    """Test standard output"""
135
    cmd = 'echo -n "%s"' % self.magic
136
    result = RunCmd("/bin/sh -c '%s'" % cmd)
137
    self.assertEqual(result.stdout, self.magic)
138

    
139

    
140
  def testStderr(self):
141
    """Test standard error"""
142
    cmd = 'echo -n "%s"' % self.magic
143
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
144
    self.assertEqual(result.stderr, self.magic)
145

    
146

    
147
  def testCombined(self):
148
    """Test combined output"""
149
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
150
    result = RunCmd("/bin/sh -c '%s'" % cmd)
151
    self.assertEqual(result.output, "A" + self.magic + "B" + self.magic)
152

    
153
  def testSignal(self):
154
    """Test standard error"""
155
    result = RunCmd("/bin/sh -c 'kill -15 $$'")
156
    self.assertEqual(result.signal, 15)
157

    
158
  def testListRun(self):
159
    """Test list runs"""
160
    result = RunCmd(["true"])
161
    self.assertEqual(result.signal, None)
162
    self.assertEqual(result.exit_code, 0)
163
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
164
    self.assertEqual(result.signal, None)
165
    self.assertEqual(result.exit_code, 1)
166
    result = RunCmd(["echo", "-n", self.magic])
167
    self.assertEqual(result.signal, None)
168
    self.assertEqual(result.exit_code, 0)
169
    self.assertEqual(result.stdout, self.magic)
170

    
171
  def testLang(self):
172
    """Test locale environment"""
173
    old_env = os.environ.copy()
174
    try:
175
      os.environ["LANG"] = "en_US.UTF-8"
176
      os.environ["LC_ALL"] = "en_US.UTF-8"
177
      result = RunCmd(["locale"])
178
      for line in result.output.splitlines():
179
        key, value = line.split("=", 1)
180
        # Ignore these variables, they're overridden by LC_ALL
181
        if key == "LANG" or key == "LANGUAGE":
182
          continue
183
        self.failIf(value and value != "C" and value != '"C"',
184
            "Variable %s is set to the invalid value '%s'" % (key, value))
185
    finally:
186
      os.environ = old_env
187

    
188

    
189
class TestRemoveFile(unittest.TestCase):
190
  """Test case for the RemoveFile function"""
191

    
192
  def setUp(self):
193
    """Create a temp dir and file for each case"""
194
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
195
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
196
    os.close(fd)
197

    
198
  def tearDown(self):
199
    if os.path.exists(self.tmpfile):
200
      os.unlink(self.tmpfile)
201
    os.rmdir(self.tmpdir)
202

    
203

    
204
  def testIgnoreDirs(self):
205
    """Test that RemoveFile() ignores directories"""
206
    self.assertEqual(None, RemoveFile(self.tmpdir))
207

    
208

    
209
  def testIgnoreNotExisting(self):
210
    """Test that RemoveFile() ignores non-existing files"""
211
    RemoveFile(self.tmpfile)
212
    RemoveFile(self.tmpfile)
213

    
214

    
215
  def testRemoveFile(self):
216
    """Test that RemoveFile does remove a file"""
217
    RemoveFile(self.tmpfile)
218
    if os.path.exists(self.tmpfile):
219
      self.fail("File '%s' not removed" % self.tmpfile)
220

    
221

    
222
  def testRemoveSymlink(self):
223
    """Test that RemoveFile does remove symlinks"""
224
    symlink = self.tmpdir + "/symlink"
225
    os.symlink("no-such-file", symlink)
226
    RemoveFile(symlink)
227
    if os.path.exists(symlink):
228
      self.fail("File '%s' not removed" % symlink)
229
    os.symlink(self.tmpfile, symlink)
230
    RemoveFile(symlink)
231
    if os.path.exists(symlink):
232
      self.fail("File '%s' not removed" % symlink)
233

    
234

    
235
class TestCheckdict(unittest.TestCase):
236
  """Test case for the CheckDict function"""
237

    
238
  def testAdd(self):
239
    """Test that CheckDict adds a missing key with the correct value"""
240

    
241
    tgt = {'a':1}
242
    tmpl = {'b': 2}
243
    CheckDict(tgt, tmpl)
244
    if 'b' not in tgt or tgt['b'] != 2:
245
      self.fail("Failed to update dict")
246

    
247

    
248
  def testNoUpdate(self):
249
    """Test that CheckDict does not overwrite an existing key"""
250
    tgt = {'a':1, 'b': 3}
251
    tmpl = {'b': 2}
252
    CheckDict(tgt, tmpl)
253
    self.failUnlessEqual(tgt['b'], 3)
254

    
255

    
256
class TestMatchNameComponent(unittest.TestCase):
257
  """Test case for the MatchNameComponent function"""
258

    
259
  def testEmptyList(self):
260
    """Test that there is no match against an empty list"""
261

    
262
    self.failUnlessEqual(MatchNameComponent("", []), None)
263
    self.failUnlessEqual(MatchNameComponent("test", []), None)
264

    
265
  def testSingleMatch(self):
266
    """Test that a single match is performed correctly"""
267
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
268
    for key in "test2", "test2.example", "test2.example.com":
269
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
270

    
271
  def testMultipleMatches(self):
272
    """Test that a multiple match is returned as None"""
273
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
274
    for key in "test1", "test1.example":
275
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
276

    
277

    
278
class TestFormatUnit(unittest.TestCase):
279
  """Test case for the FormatUnit function"""
280

    
281
  def testMiB(self):
282
    self.assertEqual(FormatUnit(1), '1M')
283
    self.assertEqual(FormatUnit(100), '100M')
284
    self.assertEqual(FormatUnit(1023), '1023M')
285

    
286
  def testGiB(self):
287
    self.assertEqual(FormatUnit(1024), '1.0G')
288
    self.assertEqual(FormatUnit(1536), '1.5G')
289
    self.assertEqual(FormatUnit(17133), '16.7G')
290
    self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G')
291

    
292
  def testTiB(self):
293
    self.assertEqual(FormatUnit(1024 * 1024), '1.0T')
294
    self.assertEqual(FormatUnit(5120 * 1024), '5.0T')
295
    self.assertEqual(FormatUnit(29829 * 1024), '29.1T')
296

    
297

    
298
class TestParseUnit(unittest.TestCase):
299
  """Test case for the ParseUnit function"""
300

    
301
  SCALES = (('', 1),
302
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
303
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
304
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
305

    
306
  def testRounding(self):
307
    self.assertEqual(ParseUnit('0'), 0)
308
    self.assertEqual(ParseUnit('1'), 4)
309
    self.assertEqual(ParseUnit('2'), 4)
310
    self.assertEqual(ParseUnit('3'), 4)
311

    
312
    self.assertEqual(ParseUnit('124'), 124)
313
    self.assertEqual(ParseUnit('125'), 128)
314
    self.assertEqual(ParseUnit('126'), 128)
315
    self.assertEqual(ParseUnit('127'), 128)
316
    self.assertEqual(ParseUnit('128'), 128)
317
    self.assertEqual(ParseUnit('129'), 132)
318
    self.assertEqual(ParseUnit('130'), 132)
319

    
320
  def testFloating(self):
321
    self.assertEqual(ParseUnit('0'), 0)
322
    self.assertEqual(ParseUnit('0.5'), 4)
323
    self.assertEqual(ParseUnit('1.75'), 4)
324
    self.assertEqual(ParseUnit('1.99'), 4)
325
    self.assertEqual(ParseUnit('2.00'), 4)
326
    self.assertEqual(ParseUnit('2.01'), 4)
327
    self.assertEqual(ParseUnit('3.99'), 4)
328
    self.assertEqual(ParseUnit('4.00'), 4)
329
    self.assertEqual(ParseUnit('4.01'), 8)
330
    self.assertEqual(ParseUnit('1.5G'), 1536)
331
    self.assertEqual(ParseUnit('1.8G'), 1844)
332
    self.assertEqual(ParseUnit('8.28T'), 8682212)
333

    
334
  def testSuffixes(self):
335
    for sep in ('', ' ', '   ', "\t", "\t "):
336
      for suffix, scale in TestParseUnit.SCALES:
337
        for func in (lambda x: x, str.lower, str.upper):
338
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)), 1024 * scale)
339

    
340
  def testInvalidInput(self):
341
    for sep in ('-', '_', ',', 'a'):
342
      for suffix, _ in TestParseUnit.SCALES:
343
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
344

    
345
    for suffix, _ in TestParseUnit.SCALES:
346
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
347

    
348

    
349
class TestSshKeys(GanetiTestCase):
350
  """Test case for the AddAuthorizedKey function"""
351

    
352
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
353
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
354
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
355

    
356
  def writeTestFile(self):
357
    (fd, tmpname) = tempfile.mkstemp(prefix = 'ganeti-test')
358
    f = os.fdopen(fd, 'w')
359
    try:
360
      f.write(TestSshKeys.KEY_A)
361
      f.write("\n")
362
      f.write(TestSshKeys.KEY_B)
363
      f.write("\n")
364
    finally:
365
      f.close()
366

    
367
    return tmpname
368

    
369
  def testAddingNewKey(self):
370
    tmpname = self.writeTestFile()
371
    try:
372
      AddAuthorizedKey(tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
373

    
374
      self.assertFileContent(tmpname,
375
        "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
376
        'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
377
        " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
378
        "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
379
    finally:
380
      os.unlink(tmpname)
381

    
382
  def testAddingAlmostButNotCompletlyTheSameKey(self):
383
    tmpname = self.writeTestFile()
384
    try:
385
      AddAuthorizedKey(tmpname,
386
          'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
387

    
388
      self.assertFileContent(tmpname,
389
        "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
390
        'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
391
        " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
392
        "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
393
    finally:
394
      os.unlink(tmpname)
395

    
396
  def testAddingExistingKeyWithSomeMoreSpaces(self):
397
    tmpname = self.writeTestFile()
398
    try:
399
      AddAuthorizedKey(tmpname,
400
          'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
401

    
402
      self.assertFileContent(tmpname,
403
        "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
404
        'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
405
        " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
406
    finally:
407
      os.unlink(tmpname)
408

    
409
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
410
    tmpname = self.writeTestFile()
411
    try:
412
      RemoveAuthorizedKey(tmpname,
413
          'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
414

    
415
      self.assertFileContent(tmpname,
416
        'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
417
        " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
418
    finally:
419
      os.unlink(tmpname)
420

    
421
  def testRemovingNonExistingKey(self):
422
    tmpname = self.writeTestFile()
423
    try:
424
      RemoveAuthorizedKey(tmpname,
425
          'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
426

    
427
      self.assertFileContent(tmpname,
428
        "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
429
        'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
430
        " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
431
    finally:
432
      os.unlink(tmpname)
433

    
434

    
435
class TestEtcHosts(GanetiTestCase):
436
  """Test functions modifying /etc/hosts"""
437

    
438
  def writeTestFile(self):
439
    (fd, tmpname) = tempfile.mkstemp(prefix = 'ganeti-test')
440
    f = os.fdopen(fd, 'w')
441
    try:
442
      f.write('# This is a test file for /etc/hosts\n')
443
      f.write('127.0.0.1\tlocalhost\n')
444
      f.write('192.168.1.1 router gw\n')
445
    finally:
446
      f.close()
447

    
448
    return tmpname
449

    
450
  def testSettingNewIp(self):
451
    tmpname = self.writeTestFile()
452
    try:
453
      SetEtcHostsEntry(tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
454

    
455
      self.assertFileContent(tmpname,
456
        "# This is a test file for /etc/hosts\n"
457
        "127.0.0.1\tlocalhost\n"
458
        "192.168.1.1 router gw\n"
459
        "1.2.3.4\tmyhost.domain.tld myhost\n")
460
    finally:
461
      os.unlink(tmpname)
462

    
463
  def testSettingExistingIp(self):
464
    tmpname = self.writeTestFile()
465
    try:
466
      SetEtcHostsEntry(tmpname, '192.168.1.1', 'myhost.domain.tld', ['myhost'])
467

    
468
      self.assertFileContent(tmpname,
469
        "# This is a test file for /etc/hosts\n"
470
        "127.0.0.1\tlocalhost\n"
471
        "192.168.1.1\tmyhost.domain.tld myhost\n")
472
    finally:
473
      os.unlink(tmpname)
474

    
475
  def testRemovingExistingHost(self):
476
    tmpname = self.writeTestFile()
477
    try:
478
      RemoveEtcHostsEntry(tmpname, 'router')
479

    
480
      self.assertFileContent(tmpname,
481
        "# This is a test file for /etc/hosts\n"
482
        "127.0.0.1\tlocalhost\n"
483
        "192.168.1.1 gw\n")
484
    finally:
485
      os.unlink(tmpname)
486

    
487
  def testRemovingSingleExistingHost(self):
488
    tmpname = self.writeTestFile()
489
    try:
490
      RemoveEtcHostsEntry(tmpname, 'localhost')
491

    
492
      self.assertFileContent(tmpname,
493
        "# This is a test file for /etc/hosts\n"
494
        "192.168.1.1 router gw\n")
495
    finally:
496
      os.unlink(tmpname)
497

    
498
  def testRemovingNonExistingHost(self):
499
    tmpname = self.writeTestFile()
500
    try:
501
      RemoveEtcHostsEntry(tmpname, 'myhost')
502

    
503
      self.assertFileContent(tmpname,
504
        "# This is a test file for /etc/hosts\n"
505
        "127.0.0.1\tlocalhost\n"
506
        "192.168.1.1 router gw\n")
507
    finally:
508
      os.unlink(tmpname)
509

    
510
  def testRemovingAlias(self):
511
    tmpname = self.writeTestFile()
512
    try:
513
      RemoveEtcHostsEntry(tmpname, 'gw')
514

    
515
      self.assertFileContent(tmpname,
516
        "# This is a test file for /etc/hosts\n"
517
        "127.0.0.1\tlocalhost\n"
518
        "192.168.1.1 router\n")
519
    finally:
520
      os.unlink(tmpname)
521

    
522

    
523
class TestShellQuoting(unittest.TestCase):
524
  """Test case for shell quoting functions"""
525

    
526
  def testShellQuote(self):
527
    self.assertEqual(ShellQuote('abc'), "abc")
528
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
529
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
530
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
531
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
532

    
533
  def testShellQuoteArgs(self):
534
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
535
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
536
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
537

    
538

    
539
class TestTcpPing(unittest.TestCase):
540
  """Testcase for TCP version of ping - against listen(2)ing port"""
541

    
542
  def setUp(self):
543
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
544
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
545
    self.listenerport = self.listener.getsockname()[1]
546
    self.listener.listen(1)
547

    
548
  def tearDown(self):
549
    self.listener.shutdown(socket.SHUT_RDWR)
550
    del self.listener
551
    del self.listenerport
552

    
553
  def testTcpPingToLocalHostAccept(self):
554
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
555
                         constants.LOCALHOST_IP_ADDRESS,
556
                         self.listenerport,
557
                         timeout=10,
558
                         live_port_needed=True),
559
                 "failed to connect to test listener")
560

    
561

    
562
class TestTcpPingDeaf(unittest.TestCase):
563
  """Testcase for TCP version of ping - against non listen(2)ing port"""
564

    
565
  def setUp(self):
566
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
567
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
568
    self.deaflistenerport = self.deaflistener.getsockname()[1]
569

    
570
  def tearDown(self):
571
    del self.deaflistener
572
    del self.deaflistenerport
573

    
574
  def testTcpPingToLocalHostAcceptDeaf(self):
575
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
576
                        constants.LOCALHOST_IP_ADDRESS,
577
                        self.deaflistenerport,
578
                        timeout=constants.TCP_PING_TIMEOUT,
579
                        live_port_needed=True), # need successful connect(2)
580
                "successfully connected to deaf listener")
581

    
582
  def testTcpPingToLocalHostNoAccept(self):
583
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
584
                         constants.LOCALHOST_IP_ADDRESS,
585
                         self.deaflistenerport,
586
                         timeout=constants.TCP_PING_TIMEOUT,
587
                         live_port_needed=False), # ECONNREFUSED is OK
588
                 "failed to ping alive host on deaf port")
589

    
590

    
591
class TestListVisibleFiles(unittest.TestCase):
592
  """Test case for ListVisibleFiles"""
593

    
594
  def setUp(self):
595
    self.path = tempfile.mkdtemp()
596

    
597
  def tearDown(self):
598
    shutil.rmtree(self.path)
599

    
600
  def _test(self, files, expected):
601
    # Sort a copy
602
    expected = expected[:]
603
    expected.sort()
604

    
605
    for name in files:
606
      f = open(os.path.join(self.path, name), 'w')
607
      try:
608
        f.write("Test\n")
609
      finally:
610
        f.close()
611

    
612
    found = ListVisibleFiles(self.path)
613
    found.sort()
614

    
615
    self.assertEqual(found, expected)
616

    
617
  def testAllVisible(self):
618
    files = ["a", "b", "c"]
619
    expected = files
620
    self._test(files, expected)
621

    
622
  def testNoneVisible(self):
623
    files = [".a", ".b", ".c"]
624
    expected = []
625
    self._test(files, expected)
626

    
627
  def testSomeVisible(self):
628
    files = ["a", "b", ".c"]
629
    expected = ["a", "b"]
630
    self._test(files, expected)
631

    
632

    
633
class TestNewUUID(unittest.TestCase):
634
  """Test case for NewUUID"""
635

    
636
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
637
                        '[a-f0-9]{4}-[a-f0-9]{12}$')
638

    
639
  def runTest(self):
640
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
641

    
642

    
643
if __name__ == '__main__':
644
  unittest.main()