Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ 667479d5

History | View | Annotate | Download (19.4 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import md5
30
import socket
31
import shutil
32
import re
33

    
34
import ganeti
35
from ganeti import constants
36
from ganeti import utils
37
from ganeti.utils import IsProcessAlive, Lock, Unlock, RunCmd, \
38
     RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
39
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
40
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
41
     SetEtcHostsEntry, RemoveEtcHostsEntry
42
from ganeti.errors import LockError, UnitParseError
43

    
44

    
45
class GanetiTestCase(unittest.TestCase):
46
  def assertFileContent(self, file_name, content):
47
    """Checks the content of a file.
48

49
    """
50
    handle = open(file_name, 'r')
51
    try:
52
      self.assertEqual(handle.read(), content)
53
    finally:
54
      handle.close()
55

    
56

    
57
class TestIsProcessAlive(unittest.TestCase):
58
  """Testing case for IsProcessAlive"""
59
  def setUp(self):
60
    # create a zombie and a (hopefully) non-existing process id
61
    self.pid_zombie = os.fork()
62
    if self.pid_zombie == 0:
63
      os._exit(0)
64
    elif self.pid_zombie < 0:
65
      raise SystemError("can't fork")
66
    self.pid_non_existing = os.fork()
67
    if self.pid_non_existing == 0:
68
      os._exit(0)
69
    elif self.pid_non_existing > 0:
70
      os.waitpid(self.pid_non_existing, 0)
71
    else:
72
      raise SystemError("can't fork")
73

    
74

    
75
  def testExists(self):
76
    mypid = os.getpid()
77
    self.assert_(IsProcessAlive(mypid),
78
                 "can't find myself running")
79

    
80
  def testZombie(self):
81
    self.assert_(not IsProcessAlive(self.pid_zombie),
82
                 "zombie not detected as zombie")
83

    
84

    
85
  def testNotExisting(self):
86
    self.assert_(not IsProcessAlive(self.pid_non_existing),
87
                 "noexisting process detected")
88

    
89

    
90
class TestLocking(unittest.TestCase):
91
  """Testing case for the Lock/Unlock functions"""
92
  def clean_lock(self, name):
93
    try:
94
      ganeti.utils.Unlock("unittest")
95
    except LockError:
96
      pass
97

    
98

    
99
  def testLock(self):
100
    self.clean_lock("unittest")
101
    self.assertEqual(None, Lock("unittest"))
102

    
103

    
104
  def testUnlock(self):
105
    self.clean_lock("unittest")
106
    ganeti.utils.Lock("unittest")
107
    self.assertEqual(None, Unlock("unittest"))
108

    
109

    
110
  def testDoubleLock(self):
111
    self.clean_lock("unittest")
112
    ganeti.utils.Lock("unittest")
113
    self.assertRaises(LockError, Lock, "unittest")
114

    
115

    
116
class TestRunCmd(unittest.TestCase):
117
  """Testing case for the RunCmd function"""
118

    
119
  def setUp(self):
120
    self.magic = time.ctime() + " ganeti test"
121

    
122
  def testOk(self):
123
    """Test successful exit code"""
124
    result = RunCmd("/bin/sh -c 'exit 0'")
125
    self.assertEqual(result.exit_code, 0)
126

    
127
  def testFail(self):
128
    """Test fail exit code"""
129
    result = RunCmd("/bin/sh -c 'exit 1'")
130
    self.assertEqual(result.exit_code, 1)
131

    
132

    
133
  def testStdout(self):
134
    """Test standard output"""
135
    cmd = 'echo -n "%s"' % self.magic
136
    result = RunCmd("/bin/sh -c '%s'" % cmd)
137
    self.assertEqual(result.stdout, self.magic)
138

    
139

    
140
  def testStderr(self):
141
    """Test standard error"""
142
    cmd = 'echo -n "%s"' % self.magic
143
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
144
    self.assertEqual(result.stderr, self.magic)
145

    
146

    
147
  def testCombined(self):
148
    """Test combined output"""
149
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
150
    result = RunCmd("/bin/sh -c '%s'" % cmd)
151
    self.assertEqual(result.output, "A" + self.magic + "B" + self.magic)
152

    
153
  def testSignal(self):
154
    """Test standard error"""
155
    result = RunCmd("/bin/sh -c 'kill -15 $$'")
156
    self.assertEqual(result.signal, 15)
157

    
158
  def testListRun(self):
159
    """Test list runs"""
160
    result = RunCmd(["true"])
161
    self.assertEqual(result.signal, None)
162
    self.assertEqual(result.exit_code, 0)
163
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
164
    self.assertEqual(result.signal, None)
165
    self.assertEqual(result.exit_code, 1)
166
    result = RunCmd(["echo", "-n", self.magic])
167
    self.assertEqual(result.signal, None)
168
    self.assertEqual(result.exit_code, 0)
169
    self.assertEqual(result.stdout, self.magic)
170

    
171
  def testLang(self):
172
    """Test locale environment"""
173
    old_env = os.environ.copy()
174
    try:
175
      os.environ["LANG"] = "en_US.UTF-8"
176
      os.environ["LC_ALL"] = "en_US.UTF-8"
177
      result = RunCmd(["locale"])
178
      for line in result.output.splitlines():
179
        key, value = line.split("=", 1)
180
        # Ignore these variables, they're overridden by LC_ALL
181
        if key == "LANG" or key == "LANGUAGE":
182
          continue
183
        self.failIf(value and value != "C" and value != '"C"',
184
            "Variable %s is set to the invalid value '%s'" % (key, value))
185
    finally:
186
      os.environ = old_env
187

    
188

    
189
class TestRemoveFile(unittest.TestCase):
190
  """Test case for the RemoveFile function"""
191

    
192
  def setUp(self):
193
    """Create a temp dir and file for each case"""
194
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
195
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
196
    os.close(fd)
197

    
198
  def tearDown(self):
199
    if os.path.exists(self.tmpfile):
200
      os.unlink(self.tmpfile)
201
    os.rmdir(self.tmpdir)
202

    
203

    
204
  def testIgnoreDirs(self):
205
    """Test that RemoveFile() ignores directories"""
206
    self.assertEqual(None, RemoveFile(self.tmpdir))
207

    
208

    
209
  def testIgnoreNotExisting(self):
210
    """Test that RemoveFile() ignores non-existing files"""
211
    RemoveFile(self.tmpfile)
212
    RemoveFile(self.tmpfile)
213

    
214

    
215
  def testRemoveFile(self):
216
    """Test that RemoveFile does remove a file"""
217
    RemoveFile(self.tmpfile)
218
    if os.path.exists(self.tmpfile):
219
      self.fail("File '%s' not removed" % self.tmpfile)
220

    
221

    
222
  def testRemoveSymlink(self):
223
    """Test that RemoveFile does remove symlinks"""
224
    symlink = self.tmpdir + "/symlink"
225
    os.symlink("no-such-file", symlink)
226
    RemoveFile(symlink)
227
    if os.path.exists(symlink):
228
      self.fail("File '%s' not removed" % symlink)
229
    os.symlink(self.tmpfile, symlink)
230
    RemoveFile(symlink)
231
    if os.path.exists(symlink):
232
      self.fail("File '%s' not removed" % symlink)
233

    
234

    
235
class TestCheckdict(unittest.TestCase):
236
  """Test case for the CheckDict function"""
237

    
238
  def testAdd(self):
239
    """Test that CheckDict adds a missing key with the correct value"""
240

    
241
    tgt = {'a':1}
242
    tmpl = {'b': 2}
243
    CheckDict(tgt, tmpl)
244
    if 'b' not in tgt or tgt['b'] != 2:
245
      self.fail("Failed to update dict")
246

    
247

    
248
  def testNoUpdate(self):
249
    """Test that CheckDict does not overwrite an existing key"""
250
    tgt = {'a':1, 'b': 3}
251
    tmpl = {'b': 2}
252
    CheckDict(tgt, tmpl)
253
    self.failUnlessEqual(tgt['b'], 3)
254

    
255

    
256
class TestMatchNameComponent(unittest.TestCase):
257
  """Test case for the MatchNameComponent function"""
258

    
259
  def testEmptyList(self):
260
    """Test that there is no match against an empty list"""
261

    
262
    self.failUnlessEqual(MatchNameComponent("", []), None)
263
    self.failUnlessEqual(MatchNameComponent("test", []), None)
264

    
265
  def testSingleMatch(self):
266
    """Test that a single match is performed correctly"""
267
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
268
    for key in "test2", "test2.example", "test2.example.com":
269
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
270

    
271
  def testMultipleMatches(self):
272
    """Test that a multiple match is returned as None"""
273
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
274
    for key in "test1", "test1.example":
275
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
276

    
277

    
278
class TestFormatUnit(unittest.TestCase):
279
  """Test case for the FormatUnit function"""
280

    
281
  def testMiB(self):
282
    self.assertEqual(FormatUnit(1), '1M')
283
    self.assertEqual(FormatUnit(100), '100M')
284
    self.assertEqual(FormatUnit(1023), '1023M')
285

    
286
  def testGiB(self):
287
    self.assertEqual(FormatUnit(1024), '1.0G')
288
    self.assertEqual(FormatUnit(1536), '1.5G')
289
    self.assertEqual(FormatUnit(17133), '16.7G')
290
    self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G')
291

    
292
  def testTiB(self):
293
    self.assertEqual(FormatUnit(1024 * 1024), '1.0T')
294
    self.assertEqual(FormatUnit(5120 * 1024), '5.0T')
295
    self.assertEqual(FormatUnit(29829 * 1024), '29.1T')
296

    
297

    
298
class TestParseUnit(unittest.TestCase):
299
  """Test case for the ParseUnit function"""
300

    
301
  SCALES = (('', 1),
302
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
303
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
304
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
305

    
306
  def testRounding(self):
307
    self.assertEqual(ParseUnit('0'), 0)
308
    self.assertEqual(ParseUnit('1'), 4)
309
    self.assertEqual(ParseUnit('2'), 4)
310
    self.assertEqual(ParseUnit('3'), 4)
311

    
312
    self.assertEqual(ParseUnit('124'), 124)
313
    self.assertEqual(ParseUnit('125'), 128)
314
    self.assertEqual(ParseUnit('126'), 128)
315
    self.assertEqual(ParseUnit('127'), 128)
316
    self.assertEqual(ParseUnit('128'), 128)
317
    self.assertEqual(ParseUnit('129'), 132)
318
    self.assertEqual(ParseUnit('130'), 132)
319

    
320
  def testFloating(self):
321
    self.assertEqual(ParseUnit('0'), 0)
322
    self.assertEqual(ParseUnit('0.5'), 4)
323
    self.assertEqual(ParseUnit('1.75'), 4)
324
    self.assertEqual(ParseUnit('1.99'), 4)
325
    self.assertEqual(ParseUnit('2.00'), 4)
326
    self.assertEqual(ParseUnit('2.01'), 4)
327
    self.assertEqual(ParseUnit('3.99'), 4)
328
    self.assertEqual(ParseUnit('4.00'), 4)
329
    self.assertEqual(ParseUnit('4.01'), 8)
330
    self.assertEqual(ParseUnit('1.5G'), 1536)
331
    self.assertEqual(ParseUnit('1.8G'), 1844)
332
    self.assertEqual(ParseUnit('8.28T'), 8682212)
333

    
334
  def testSuffixes(self):
335
    for sep in ('', ' ', '   ', "\t", "\t "):
336
      for suffix, scale in TestParseUnit.SCALES:
337
        for func in (lambda x: x, str.lower, str.upper):
338
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
339
                           1024 * scale)
340

    
341
  def testInvalidInput(self):
342
    for sep in ('-', '_', ',', 'a'):
343
      for suffix, _ in TestParseUnit.SCALES:
344
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
345

    
346
    for suffix, _ in TestParseUnit.SCALES:
347
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
348

    
349

    
350
class TestSshKeys(GanetiTestCase):
351
  """Test case for the AddAuthorizedKey function"""
352

    
353
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
354
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
355
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
356

    
357
  def writeTestFile(self):
358
    (fd, tmpname) = tempfile.mkstemp(prefix = 'ganeti-test')
359
    f = os.fdopen(fd, 'w')
360
    try:
361
      f.write(TestSshKeys.KEY_A)
362
      f.write("\n")
363
      f.write(TestSshKeys.KEY_B)
364
      f.write("\n")
365
    finally:
366
      f.close()
367

    
368
    return tmpname
369

    
370
  def testAddingNewKey(self):
371
    tmpname = self.writeTestFile()
372
    try:
373
      AddAuthorizedKey(tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
374

    
375
      self.assertFileContent(tmpname,
376
        "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
377
        'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
378
        " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
379
        "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
380
    finally:
381
      os.unlink(tmpname)
382

    
383
  def testAddingAlmostButNotCompletlyTheSameKey(self):
384
    tmpname = self.writeTestFile()
385
    try:
386
      AddAuthorizedKey(tmpname,
387
          'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
388

    
389
      self.assertFileContent(tmpname,
390
        "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
391
        'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
392
        " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
393
        "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
394
    finally:
395
      os.unlink(tmpname)
396

    
397
  def testAddingExistingKeyWithSomeMoreSpaces(self):
398
    tmpname = self.writeTestFile()
399
    try:
400
      AddAuthorizedKey(tmpname,
401
          'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
402

    
403
      self.assertFileContent(tmpname,
404
        "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
405
        'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
406
        " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
407
    finally:
408
      os.unlink(tmpname)
409

    
410
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
411
    tmpname = self.writeTestFile()
412
    try:
413
      RemoveAuthorizedKey(tmpname,
414
          'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
415

    
416
      self.assertFileContent(tmpname,
417
        'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
418
        " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
419
    finally:
420
      os.unlink(tmpname)
421

    
422
  def testRemovingNonExistingKey(self):
423
    tmpname = self.writeTestFile()
424
    try:
425
      RemoveAuthorizedKey(tmpname,
426
          'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
427

    
428
      self.assertFileContent(tmpname,
429
        "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
430
        'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
431
        " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
432
    finally:
433
      os.unlink(tmpname)
434

    
435

    
436
class TestEtcHosts(GanetiTestCase):
437
  """Test functions modifying /etc/hosts"""
438

    
439
  def writeTestFile(self):
440
    (fd, tmpname) = tempfile.mkstemp(prefix = 'ganeti-test')
441
    f = os.fdopen(fd, 'w')
442
    try:
443
      f.write('# This is a test file for /etc/hosts\n')
444
      f.write('127.0.0.1\tlocalhost\n')
445
      f.write('192.168.1.1 router gw\n')
446
    finally:
447
      f.close()
448

    
449
    return tmpname
450

    
451
  def testSettingNewIp(self):
452
    tmpname = self.writeTestFile()
453
    try:
454
      SetEtcHostsEntry(tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
455

    
456
      self.assertFileContent(tmpname,
457
        "# This is a test file for /etc/hosts\n"
458
        "127.0.0.1\tlocalhost\n"
459
        "192.168.1.1 router gw\n"
460
        "1.2.3.4\tmyhost.domain.tld myhost\n")
461
    finally:
462
      os.unlink(tmpname)
463

    
464
  def testSettingExistingIp(self):
465
    tmpname = self.writeTestFile()
466
    try:
467
      SetEtcHostsEntry(tmpname, '192.168.1.1', 'myhost.domain.tld', ['myhost'])
468

    
469
      self.assertFileContent(tmpname,
470
        "# This is a test file for /etc/hosts\n"
471
        "127.0.0.1\tlocalhost\n"
472
        "192.168.1.1\tmyhost.domain.tld myhost\n")
473
    finally:
474
      os.unlink(tmpname)
475

    
476
  def testRemovingExistingHost(self):
477
    tmpname = self.writeTestFile()
478
    try:
479
      RemoveEtcHostsEntry(tmpname, 'router')
480

    
481
      self.assertFileContent(tmpname,
482
        "# This is a test file for /etc/hosts\n"
483
        "127.0.0.1\tlocalhost\n"
484
        "192.168.1.1 gw\n")
485
    finally:
486
      os.unlink(tmpname)
487

    
488
  def testRemovingSingleExistingHost(self):
489
    tmpname = self.writeTestFile()
490
    try:
491
      RemoveEtcHostsEntry(tmpname, 'localhost')
492

    
493
      self.assertFileContent(tmpname,
494
        "# This is a test file for /etc/hosts\n"
495
        "192.168.1.1 router gw\n")
496
    finally:
497
      os.unlink(tmpname)
498

    
499
  def testRemovingNonExistingHost(self):
500
    tmpname = self.writeTestFile()
501
    try:
502
      RemoveEtcHostsEntry(tmpname, 'myhost')
503

    
504
      self.assertFileContent(tmpname,
505
        "# This is a test file for /etc/hosts\n"
506
        "127.0.0.1\tlocalhost\n"
507
        "192.168.1.1 router gw\n")
508
    finally:
509
      os.unlink(tmpname)
510

    
511
  def testRemovingAlias(self):
512
    tmpname = self.writeTestFile()
513
    try:
514
      RemoveEtcHostsEntry(tmpname, 'gw')
515

    
516
      self.assertFileContent(tmpname,
517
        "# This is a test file for /etc/hosts\n"
518
        "127.0.0.1\tlocalhost\n"
519
        "192.168.1.1 router\n")
520
    finally:
521
      os.unlink(tmpname)
522

    
523

    
524
class TestShellQuoting(unittest.TestCase):
525
  """Test case for shell quoting functions"""
526

    
527
  def testShellQuote(self):
528
    self.assertEqual(ShellQuote('abc'), "abc")
529
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
530
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
531
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
532
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
533

    
534
  def testShellQuoteArgs(self):
535
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
536
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
537
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
538

    
539

    
540
class TestTcpPing(unittest.TestCase):
541
  """Testcase for TCP version of ping - against listen(2)ing port"""
542

    
543
  def setUp(self):
544
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
545
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
546
    self.listenerport = self.listener.getsockname()[1]
547
    self.listener.listen(1)
548

    
549
  def tearDown(self):
550
    self.listener.shutdown(socket.SHUT_RDWR)
551
    del self.listener
552
    del self.listenerport
553

    
554
  def testTcpPingToLocalHostAccept(self):
555
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
556
                         constants.LOCALHOST_IP_ADDRESS,
557
                         self.listenerport,
558
                         timeout=10,
559
                         live_port_needed=True),
560
                 "failed to connect to test listener")
561

    
562

    
563
class TestTcpPingDeaf(unittest.TestCase):
564
  """Testcase for TCP version of ping - against non listen(2)ing port"""
565

    
566
  def setUp(self):
567
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
568
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
569
    self.deaflistenerport = self.deaflistener.getsockname()[1]
570

    
571
  def tearDown(self):
572
    del self.deaflistener
573
    del self.deaflistenerport
574

    
575
  def testTcpPingToLocalHostAcceptDeaf(self):
576
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
577
                        constants.LOCALHOST_IP_ADDRESS,
578
                        self.deaflistenerport,
579
                        timeout=constants.TCP_PING_TIMEOUT,
580
                        live_port_needed=True), # need successful connect(2)
581
                "successfully connected to deaf listener")
582

    
583
  def testTcpPingToLocalHostNoAccept(self):
584
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
585
                         constants.LOCALHOST_IP_ADDRESS,
586
                         self.deaflistenerport,
587
                         timeout=constants.TCP_PING_TIMEOUT,
588
                         live_port_needed=False), # ECONNREFUSED is OK
589
                 "failed to ping alive host on deaf port")
590

    
591

    
592
class TestListVisibleFiles(unittest.TestCase):
593
  """Test case for ListVisibleFiles"""
594

    
595
  def setUp(self):
596
    self.path = tempfile.mkdtemp()
597

    
598
  def tearDown(self):
599
    shutil.rmtree(self.path)
600

    
601
  def _test(self, files, expected):
602
    # Sort a copy
603
    expected = expected[:]
604
    expected.sort()
605

    
606
    for name in files:
607
      f = open(os.path.join(self.path, name), 'w')
608
      try:
609
        f.write("Test\n")
610
      finally:
611
        f.close()
612

    
613
    found = ListVisibleFiles(self.path)
614
    found.sort()
615

    
616
    self.assertEqual(found, expected)
617

    
618
  def testAllVisible(self):
619
    files = ["a", "b", "c"]
620
    expected = files
621
    self._test(files, expected)
622

    
623
  def testNoneVisible(self):
624
    files = [".a", ".b", ".c"]
625
    expected = []
626
    self._test(files, expected)
627

    
628
  def testSomeVisible(self):
629
    files = ["a", "b", ".c"]
630
    expected = ["a", "b"]
631
    self._test(files, expected)
632

    
633

    
634
class TestNewUUID(unittest.TestCase):
635
  """Test case for NewUUID"""
636

    
637
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
638
                        '[a-f0-9]{4}-[a-f0-9]{12}$')
639

    
640
  def runTest(self):
641
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
642

    
643

    
644
if __name__ == '__main__':
645
  unittest.main()