Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ 2d6cfa31

History | View | Annotate | Download (20 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import md5
30
import socket
31
import shutil
32
import re
33

    
34
import ganeti
35
from ganeti import constants
36
from ganeti import utils
37
from ganeti.utils import IsProcessAlive, Lock, Unlock, RunCmd, \
38
     RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
39
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
40
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
41
     AddEtcHostsEntry, RemoveEtcHostsEntry, \
42
     AddKnownHost, RemoveKnownHost
43
from ganeti.errors import LockError, UnitParseError
44

    
45

    
46
class TestIsProcessAlive(unittest.TestCase):
47
  """Testing case for IsProcessAlive"""
48
  def setUp(self):
49
    # create a zombie and a (hopefully) non-existing process id
50
    self.pid_zombie = os.fork()
51
    if self.pid_zombie == 0:
52
      os._exit(0)
53
    elif self.pid_zombie < 0:
54
      raise SystemError("can't fork")
55
    self.pid_non_existing = os.fork()
56
    if self.pid_non_existing == 0:
57
      os._exit(0)
58
    elif self.pid_non_existing > 0:
59
      os.waitpid(self.pid_non_existing, 0)
60
    else:
61
      raise SystemError("can't fork")
62

    
63

    
64
  def testExists(self):
65
    mypid = os.getpid()
66
    self.assert_(IsProcessAlive(mypid),
67
                 "can't find myself running")
68

    
69
  def testZombie(self):
70
    self.assert_(not IsProcessAlive(self.pid_zombie),
71
                 "zombie not detected as zombie")
72

    
73

    
74
  def testNotExisting(self):
75
    self.assert_(not IsProcessAlive(self.pid_non_existing),
76
                 "noexisting process detected")
77

    
78

    
79
class TestLocking(unittest.TestCase):
80
  """Testing case for the Lock/Unlock functions"""
81
  def clean_lock(self, name):
82
    try:
83
      ganeti.utils.Unlock("unittest")
84
    except LockError:
85
      pass
86

    
87

    
88
  def testLock(self):
89
    self.clean_lock("unittest")
90
    self.assertEqual(None, Lock("unittest"))
91

    
92

    
93
  def testUnlock(self):
94
    self.clean_lock("unittest")
95
    ganeti.utils.Lock("unittest")
96
    self.assertEqual(None, Unlock("unittest"))
97

    
98

    
99
  def testDoubleLock(self):
100
    self.clean_lock("unittest")
101
    ganeti.utils.Lock("unittest")
102
    self.assertRaises(LockError, Lock, "unittest")
103

    
104

    
105
class TestRunCmd(unittest.TestCase):
106
  """Testing case for the RunCmd function"""
107

    
108
  def setUp(self):
109
    self.magic = time.ctime() + " ganeti test"
110

    
111
  def testOk(self):
112
    """Test successful exit code"""
113
    result = RunCmd("/bin/sh -c 'exit 0'")
114
    self.assertEqual(result.exit_code, 0)
115

    
116
  def testFail(self):
117
    """Test fail exit code"""
118
    result = RunCmd("/bin/sh -c 'exit 1'")
119
    self.assertEqual(result.exit_code, 1)
120

    
121

    
122
  def testStdout(self):
123
    """Test standard output"""
124
    cmd = 'echo -n "%s"' % self.magic
125
    result = RunCmd("/bin/sh -c '%s'" % cmd)
126
    self.assertEqual(result.stdout, self.magic)
127

    
128

    
129
  def testStderr(self):
130
    """Test standard error"""
131
    cmd = 'echo -n "%s"' % self.magic
132
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
133
    self.assertEqual(result.stderr, self.magic)
134

    
135

    
136
  def testCombined(self):
137
    """Test combined output"""
138
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
139
    result = RunCmd("/bin/sh -c '%s'" % cmd)
140
    self.assertEqual(result.output, "A" + self.magic + "B" + self.magic)
141

    
142
  def testSignal(self):
143
    """Test standard error"""
144
    result = RunCmd("/bin/sh -c 'kill -15 $$'")
145
    self.assertEqual(result.signal, 15)
146

    
147
  def testListRun(self):
148
    """Test list runs"""
149
    result = RunCmd(["true"])
150
    self.assertEqual(result.signal, None)
151
    self.assertEqual(result.exit_code, 0)
152
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
153
    self.assertEqual(result.signal, None)
154
    self.assertEqual(result.exit_code, 1)
155
    result = RunCmd(["echo", "-n", self.magic])
156
    self.assertEqual(result.signal, None)
157
    self.assertEqual(result.exit_code, 0)
158
    self.assertEqual(result.stdout, self.magic)
159

    
160
  def testLang(self):
161
    """Test locale environment"""
162
    old_env = os.environ.copy()
163
    try:
164
      os.environ["LANG"] = "en_US.UTF-8"
165
      os.environ["LC_ALL"] = "en_US.UTF-8"
166
      result = RunCmd(["locale"])
167
      for line in result.output.splitlines():
168
        key, value = line.split("=", 1)
169
        # Ignore these variables, they're overridden by LC_ALL
170
        if key == "LANG" or key == "LANGUAGE":
171
          continue
172
        self.failIf(value and value != "C" and value != '"C"',
173
            "Variable %s is set to the invalid value '%s'" % (key, value))
174
    finally:
175
      os.environ = old_env
176

    
177

    
178
class TestRemoveFile(unittest.TestCase):
179
  """Test case for the RemoveFile function"""
180

    
181
  def setUp(self):
182
    """Create a temp dir and file for each case"""
183
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
184
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
185
    os.close(fd)
186

    
187
  def tearDown(self):
188
    if os.path.exists(self.tmpfile):
189
      os.unlink(self.tmpfile)
190
    os.rmdir(self.tmpdir)
191

    
192

    
193
  def testIgnoreDirs(self):
194
    """Test that RemoveFile() ignores directories"""
195
    self.assertEqual(None, RemoveFile(self.tmpdir))
196

    
197

    
198
  def testIgnoreNotExisting(self):
199
    """Test that RemoveFile() ignores non-existing files"""
200
    RemoveFile(self.tmpfile)
201
    RemoveFile(self.tmpfile)
202

    
203

    
204
  def testRemoveFile(self):
205
    """Test that RemoveFile does remove a file"""
206
    RemoveFile(self.tmpfile)
207
    if os.path.exists(self.tmpfile):
208
      self.fail("File '%s' not removed" % self.tmpfile)
209

    
210

    
211
  def testRemoveSymlink(self):
212
    """Test that RemoveFile does remove symlinks"""
213
    symlink = self.tmpdir + "/symlink"
214
    os.symlink("no-such-file", symlink)
215
    RemoveFile(symlink)
216
    if os.path.exists(symlink):
217
      self.fail("File '%s' not removed" % symlink)
218
    os.symlink(self.tmpfile, symlink)
219
    RemoveFile(symlink)
220
    if os.path.exists(symlink):
221
      self.fail("File '%s' not removed" % symlink)
222

    
223

    
224
class TestCheckdict(unittest.TestCase):
225
  """Test case for the CheckDict function"""
226

    
227
  def testAdd(self):
228
    """Test that CheckDict adds a missing key with the correct value"""
229

    
230
    tgt = {'a':1}
231
    tmpl = {'b': 2}
232
    CheckDict(tgt, tmpl)
233
    if 'b' not in tgt or tgt['b'] != 2:
234
      self.fail("Failed to update dict")
235

    
236

    
237
  def testNoUpdate(self):
238
    """Test that CheckDict does not overwrite an existing key"""
239
    tgt = {'a':1, 'b': 3}
240
    tmpl = {'b': 2}
241
    CheckDict(tgt, tmpl)
242
    self.failUnlessEqual(tgt['b'], 3)
243

    
244

    
245
class TestMatchNameComponent(unittest.TestCase):
246
  """Test case for the MatchNameComponent function"""
247

    
248
  def testEmptyList(self):
249
    """Test that there is no match against an empty list"""
250

    
251
    self.failUnlessEqual(MatchNameComponent("", []), None)
252
    self.failUnlessEqual(MatchNameComponent("test", []), None)
253

    
254
  def testSingleMatch(self):
255
    """Test that a single match is performed correctly"""
256
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
257
    for key in "test2", "test2.example", "test2.example.com":
258
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
259

    
260
  def testMultipleMatches(self):
261
    """Test that a multiple match is returned as None"""
262
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
263
    for key in "test1", "test1.example":
264
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
265

    
266

    
267
class TestFormatUnit(unittest.TestCase):
268
  """Test case for the FormatUnit function"""
269

    
270
  def testMiB(self):
271
    self.assertEqual(FormatUnit(1), '1M')
272
    self.assertEqual(FormatUnit(100), '100M')
273
    self.assertEqual(FormatUnit(1023), '1023M')
274

    
275
  def testGiB(self):
276
    self.assertEqual(FormatUnit(1024), '1.0G')
277
    self.assertEqual(FormatUnit(1536), '1.5G')
278
    self.assertEqual(FormatUnit(17133), '16.7G')
279
    self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G')
280

    
281
  def testTiB(self):
282
    self.assertEqual(FormatUnit(1024 * 1024), '1.0T')
283
    self.assertEqual(FormatUnit(5120 * 1024), '5.0T')
284
    self.assertEqual(FormatUnit(29829 * 1024), '29.1T')
285

    
286

    
287
class TestParseUnit(unittest.TestCase):
288
  """Test case for the ParseUnit function"""
289

    
290
  SCALES = (('', 1),
291
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
292
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
293
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
294

    
295
  def testRounding(self):
296
    self.assertEqual(ParseUnit('0'), 0)
297
    self.assertEqual(ParseUnit('1'), 4)
298
    self.assertEqual(ParseUnit('2'), 4)
299
    self.assertEqual(ParseUnit('3'), 4)
300

    
301
    self.assertEqual(ParseUnit('124'), 124)
302
    self.assertEqual(ParseUnit('125'), 128)
303
    self.assertEqual(ParseUnit('126'), 128)
304
    self.assertEqual(ParseUnit('127'), 128)
305
    self.assertEqual(ParseUnit('128'), 128)
306
    self.assertEqual(ParseUnit('129'), 132)
307
    self.assertEqual(ParseUnit('130'), 132)
308

    
309
  def testFloating(self):
310
    self.assertEqual(ParseUnit('0'), 0)
311
    self.assertEqual(ParseUnit('0.5'), 4)
312
    self.assertEqual(ParseUnit('1.75'), 4)
313
    self.assertEqual(ParseUnit('1.99'), 4)
314
    self.assertEqual(ParseUnit('2.00'), 4)
315
    self.assertEqual(ParseUnit('2.01'), 4)
316
    self.assertEqual(ParseUnit('3.99'), 4)
317
    self.assertEqual(ParseUnit('4.00'), 4)
318
    self.assertEqual(ParseUnit('4.01'), 8)
319
    self.assertEqual(ParseUnit('1.5G'), 1536)
320
    self.assertEqual(ParseUnit('1.8G'), 1844)
321
    self.assertEqual(ParseUnit('8.28T'), 8682212)
322

    
323
  def testSuffixes(self):
324
    for sep in ('', ' ', '   ', "\t", "\t "):
325
      for suffix, scale in TestParseUnit.SCALES:
326
        for func in (lambda x: x, str.lower, str.upper):
327
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)), 1024 * scale)
328

    
329
  def testInvalidInput(self):
330
    for sep in ('-', '_', ',', 'a'):
331
      for suffix, _ in TestParseUnit.SCALES:
332
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
333

    
334
    for suffix, _ in TestParseUnit.SCALES:
335
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
336

    
337

    
338
class TestSshKeys(unittest.TestCase):
339
  """Test case for the AddAuthorizedKey function"""
340

    
341
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
342
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
343
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
344

    
345
  # NOTE: The MD5 sums below were calculated after manually
346
  #       checking the output files.
347

    
348
  def writeTestFile(self):
349
    (fd, tmpname) = tempfile.mkstemp(prefix = 'ganeti-test')
350
    f = os.fdopen(fd, 'w')
351
    try:
352
      f.write(TestSshKeys.KEY_A)
353
      f.write("\n")
354
      f.write(TestSshKeys.KEY_B)
355
      f.write("\n")
356
    finally:
357
      f.close()
358

    
359
    return tmpname
360

    
361
  def testAddingNewKey(self):
362
    tmpname = self.writeTestFile()
363
    try:
364
      AddAuthorizedKey(tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
365

    
366
      f = open(tmpname, 'r')
367
      try:
368
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
369
                         'ccc71523108ca6e9d0343797dc3e9f16')
370
      finally:
371
        f.close()
372
    finally:
373
      os.unlink(tmpname)
374

    
375
  def testAddingAlmostButNotCompletlyTheSameKey(self):
376
    tmpname = self.writeTestFile()
377
    try:
378
      AddAuthorizedKey(tmpname,
379
          'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
380

    
381
      f = open(tmpname, 'r')
382
      try:
383
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
384
                         'f2c939d57addb5b3a6846884be896b46')
385
      finally:
386
        f.close()
387
    finally:
388
      os.unlink(tmpname)
389

    
390
  def testAddingExistingKeyWithSomeMoreSpaces(self):
391
    tmpname = self.writeTestFile()
392
    try:
393
      AddAuthorizedKey(tmpname,
394
          'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
395

    
396
      f = open(tmpname, 'r')
397
      try:
398
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
399
                         '4e612764808bd46337eb0f575415fc30')
400
      finally:
401
        f.close()
402
    finally:
403
      os.unlink(tmpname)
404

    
405
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
406
    tmpname = self.writeTestFile()
407
    try:
408
      RemoveAuthorizedKey(tmpname,
409
          'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
410

    
411
      f = open(tmpname, 'r')
412
      try:
413
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
414
                         '77516d987fca07f70e30b830b3e4f2ed')
415
      finally:
416
        f.close()
417
    finally:
418
      os.unlink(tmpname)
419

    
420
  def testRemovingNonExistingKey(self):
421
    tmpname = self.writeTestFile()
422
    try:
423
      RemoveAuthorizedKey(tmpname,
424
          'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
425

    
426
      f = open(tmpname, 'r')
427
      try:
428
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
429
                         '4e612764808bd46337eb0f575415fc30')
430
      finally:
431
        f.close()
432
    finally:
433
      os.unlink(tmpname)
434

    
435

    
436
class TestEtcHosts(unittest.TestCase):
437
  """Test functions modifying /etc/hosts"""
438

    
439
  def writeTestFile(self):
440
    (fd, tmpname) = tempfile.mkstemp(prefix = 'ganeti-test')
441
    f = os.fdopen(fd, 'w')
442
    try:
443
      f.write('# This is a test file for /etc/hosts\n')
444
      f.write('127.0.0.1\tlocalhost\n')
445
      f.write('192.168.1.1 router gw\n')
446
    finally:
447
      f.close()
448

    
449
    return tmpname
450

    
451
  def testAddingNewIp(self):
452
    tmpname = self.writeTestFile()
453
    try:
454
      AddEtcHostsEntry(tmpname, 'myhost.domain.tld', '1.2.3.4')
455

    
456
      f = open(tmpname, 'r')
457
      try:
458
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
459
                         '00e0e88250482e7449743c89a49e9349')
460
      finally:
461
        f.close()
462
    finally:
463
      os.unlink(tmpname)
464

    
465
  def testAddingExistingIp(self):
466
    tmpname = self.writeTestFile()
467
    try:
468
      AddEtcHostsEntry(tmpname, 'myhost.domain.tld', '192.168.1.1')
469

    
470
      f = open(tmpname, 'r')
471
      try:
472
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
473
                         '4dc04c0acdd247175e0b980c6beea822')
474
      finally:
475
        f.close()
476
    finally:
477
      os.unlink(tmpname)
478

    
479
  def testRemovingExistingHost(self):
480
    tmpname = self.writeTestFile()
481
    try:
482
      RemoveEtcHostsEntry(tmpname, 'router')
483

    
484
      f = open(tmpname, 'r')
485
      try:
486
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
487
                         '7d1e7a559eedbc25e0dff67d33ccac84')
488
      finally:
489
        f.close()
490
    finally:
491
      os.unlink(tmpname)
492

    
493
  def testRemovingSingleExistingHost(self):
494
    tmpname = self.writeTestFile()
495
    try:
496
      RemoveEtcHostsEntry(tmpname, 'localhost')
497

    
498
      f = open(tmpname, 'r')
499
      try:
500
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
501
                         'ec4e4589b56f82fdb88db5675de011b1')
502
      finally:
503
        f.close()
504
    finally:
505
      os.unlink(tmpname)
506

    
507
  def testRemovingNonExistingHost(self):
508
    tmpname = self.writeTestFile()
509
    try:
510
      RemoveEtcHostsEntry(tmpname, 'myhost')
511

    
512
      f = open(tmpname, 'r')
513
      try:
514
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
515
                         'aa005bddc6d9ee399c296953f194929e')
516
      finally:
517
        f.close()
518
    finally:
519
      os.unlink(tmpname)
520

    
521

    
522
class TestKnownHosts(unittest.TestCase):
523
  """Test functions modifying known_hosts files"""
524

    
525
  def writeTestFile(self):
526
    (fd, tmpname) = tempfile.mkstemp(prefix = 'ganeti-test')
527
    f = os.fdopen(fd, 'w')
528
    try:
529
      f.write('node1.tld,node1\tssh-rsa AAAA1234567890=\n')
530
      f.write('node2,node2.tld ssh-rsa AAAA1234567890=\n')
531
    finally:
532
      f.close()
533

    
534
    return tmpname
535

    
536
  def testAddingNewHost(self):
537
    tmpname = self.writeTestFile()
538
    try:
539
      AddKnownHost(tmpname, 'node3.tld', 'AAAA0987654321=')
540

    
541
      f = open(tmpname, 'r')
542
      try:
543
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
544
                         '86cf3c7c7983a3bd5c475c4c1a3e5678')
545
      finally:
546
        f.close()
547
    finally:
548
      os.unlink(tmpname)
549

    
550
  def testAddingOldHost(self):
551
    tmpname = self.writeTestFile()
552
    try:
553
      AddKnownHost(tmpname, 'node2.tld', 'AAAA0987654321=')
554

    
555
      f = open(tmpname, 'r')
556
      try:
557
        os.system("vim %s" % tmpname)
558
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
559
                         '86cf3c7c7983a3bd5c475c4c1a3e5678')
560
      finally:
561
        f.close()
562
    finally:
563
      os.unlink(tmpname)
564

    
565

    
566

    
567
class TestShellQuoting(unittest.TestCase):
568
  """Test case for shell quoting functions"""
569

    
570
  def testShellQuote(self):
571
    self.assertEqual(ShellQuote('abc'), "abc")
572
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
573
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
574
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
575
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
576

    
577
  def testShellQuoteArgs(self):
578
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
579
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
580
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
581

    
582

    
583
class TestTcpPing(unittest.TestCase):
584
  """Testcase for TCP version of ping - against listen(2)ing port"""
585

    
586
  def setUp(self):
587
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
588
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
589
    self.listenerport = self.listener.getsockname()[1]
590
    self.listener.listen(1)
591

    
592
  def tearDown(self):
593
    self.listener.shutdown(socket.SHUT_RDWR)
594
    del self.listener
595
    del self.listenerport
596

    
597
  def testTcpPingToLocalHostAccept(self):
598
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
599
                         constants.LOCALHOST_IP_ADDRESS,
600
                         self.listenerport,
601
                         timeout=10,
602
                         live_port_needed=True),
603
                 "failed to connect to test listener")
604

    
605

    
606
class TestTcpPingDeaf(unittest.TestCase):
607
  """Testcase for TCP version of ping - against non listen(2)ing port"""
608

    
609
  def setUp(self):
610
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
611
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
612
    self.deaflistenerport = self.deaflistener.getsockname()[1]
613

    
614
  def tearDown(self):
615
    del self.deaflistener
616
    del self.deaflistenerport
617

    
618
  def testTcpPingToLocalHostAcceptDeaf(self):
619
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
620
                        constants.LOCALHOST_IP_ADDRESS,
621
                        self.deaflistenerport,
622
                        timeout=constants.TCP_PING_TIMEOUT,
623
                        live_port_needed=True), # need successful connect(2)
624
                "successfully connected to deaf listener")
625

    
626
  def testTcpPingToLocalHostNoAccept(self):
627
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
628
                         constants.LOCALHOST_IP_ADDRESS,
629
                         self.deaflistenerport,
630
                         timeout=constants.TCP_PING_TIMEOUT,
631
                         live_port_needed=False), # ECONNREFUSED is OK
632
                 "failed to ping alive host on deaf port")
633

    
634

    
635
class TestListVisibleFiles(unittest.TestCase):
636
  """Test case for ListVisibleFiles"""
637

    
638
  def setUp(self):
639
    self.path = tempfile.mkdtemp()
640

    
641
  def tearDown(self):
642
    shutil.rmtree(self.path)
643

    
644
  def _test(self, files, expected):
645
    # Sort a copy
646
    expected = expected[:]
647
    expected.sort()
648

    
649
    for name in files:
650
      f = open(os.path.join(self.path, name), 'w')
651
      try:
652
        f.write("Test\n")
653
      finally:
654
        f.close()
655

    
656
    found = ListVisibleFiles(self.path)
657
    found.sort()
658

    
659
    self.assertEqual(found, expected)
660

    
661
  def testAllVisible(self):
662
    files = ["a", "b", "c"]
663
    expected = files
664
    self._test(files, expected)
665

    
666
  def testNoneVisible(self):
667
    files = [".a", ".b", ".c"]
668
    expected = []
669
    self._test(files, expected)
670

    
671
  def testSomeVisible(self):
672
    files = ["a", "b", ".c"]
673
    expected = ["a", "b"]
674
    self._test(files, expected)
675

    
676

    
677
class TestNewUUID(unittest.TestCase):
678
  """Test case for NewUUID"""
679

    
680
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
681
                        '[a-f0-9]{4}-[a-f0-9]{12}$')
682

    
683
  def runTest(self):
684
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
685

    
686

    
687
if __name__ == '__main__':
688
  unittest.main()