Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ c9c4f19e

History | View | Annotate | Download (19.9 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import os
30
import md5
31
import socket
32
import shutil
33
import re
34

    
35
import ganeti
36
import testutils
37
from ganeti import constants
38
from ganeti import utils
39
from ganeti.utils import IsProcessAlive, Lock, Unlock, RunCmd, \
40
     RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
41
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
42
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
43
     SetEtcHostsEntry, RemoveEtcHostsEntry
44
from ganeti.errors import LockError, UnitParseError
45

    
46

    
47
class TestIsProcessAlive(unittest.TestCase):
48
  """Testing case for IsProcessAlive"""
49
  def setUp(self):
50
    # create a zombie and a (hopefully) non-existing process id
51
    self.pid_zombie = os.fork()
52
    if self.pid_zombie == 0:
53
      os._exit(0)
54
    elif self.pid_zombie < 0:
55
      raise SystemError("can't fork")
56
    self.pid_non_existing = os.fork()
57
    if self.pid_non_existing == 0:
58
      os._exit(0)
59
    elif self.pid_non_existing > 0:
60
      os.waitpid(self.pid_non_existing, 0)
61
    else:
62
      raise SystemError("can't fork")
63

    
64

    
65
  def testExists(self):
66
    mypid = os.getpid()
67
    self.assert_(IsProcessAlive(mypid),
68
                 "can't find myself running")
69

    
70
  def testZombie(self):
71
    self.assert_(not IsProcessAlive(self.pid_zombie),
72
                 "zombie not detected as zombie")
73

    
74

    
75
  def testNotExisting(self):
76
    self.assert_(not IsProcessAlive(self.pid_non_existing),
77
                 "noexisting process detected")
78

    
79

    
80
class TestLocking(unittest.TestCase):
81
  """Testing case for the Lock/Unlock functions"""
82

    
83
  def setUp(self):
84
    lock_dir = tempfile.mkdtemp(prefix="ganeti.unittest.",
85
                                suffix=".locking")
86
    self.old_lock_dir = constants.LOCK_DIR
87
    constants.LOCK_DIR = lock_dir
88

    
89
  def tearDown(self):
90
    try:
91
      ganeti.utils.Unlock("unittest")
92
    except LockError:
93
      pass
94
    shutil.rmtree(constants.LOCK_DIR, ignore_errors=True)
95
    constants.LOCK_DIR = self.old_lock_dir
96

    
97
  def clean_lock(self, name):
98
    try:
99
      ganeti.utils.Unlock("unittest")
100
    except LockError:
101
      pass
102

    
103

    
104
  def testLock(self):
105
    self.clean_lock("unittest")
106
    self.assertEqual(None, Lock("unittest"))
107

    
108

    
109
  def testUnlock(self):
110
    self.clean_lock("unittest")
111
    ganeti.utils.Lock("unittest")
112
    self.assertEqual(None, Unlock("unittest"))
113

    
114
  def testDoubleLock(self):
115
    self.clean_lock("unittest")
116
    ganeti.utils.Lock("unittest")
117
    self.assertRaises(LockError, Lock, "unittest")
118

    
119

    
120
class TestRunCmd(unittest.TestCase):
121
  """Testing case for the RunCmd function"""
122

    
123
  def setUp(self):
124
    self.magic = time.ctime() + " ganeti test"
125

    
126
  def testOk(self):
127
    """Test successful exit code"""
128
    result = RunCmd("/bin/sh -c 'exit 0'")
129
    self.assertEqual(result.exit_code, 0)
130

    
131
  def testFail(self):
132
    """Test fail exit code"""
133
    result = RunCmd("/bin/sh -c 'exit 1'")
134
    self.assertEqual(result.exit_code, 1)
135

    
136

    
137
  def testStdout(self):
138
    """Test standard output"""
139
    cmd = 'echo -n "%s"' % self.magic
140
    result = RunCmd("/bin/sh -c '%s'" % cmd)
141
    self.assertEqual(result.stdout, self.magic)
142

    
143

    
144
  def testStderr(self):
145
    """Test standard error"""
146
    cmd = 'echo -n "%s"' % self.magic
147
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
148
    self.assertEqual(result.stderr, self.magic)
149

    
150

    
151
  def testCombined(self):
152
    """Test combined output"""
153
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
154
    result = RunCmd("/bin/sh -c '%s'" % cmd)
155
    self.assertEqual(result.output, "A" + self.magic + "B" + self.magic)
156

    
157
  def testSignal(self):
158
    """Test standard error"""
159
    result = RunCmd("/bin/sh -c 'kill -15 $$'")
160
    self.assertEqual(result.signal, 15)
161

    
162
  def testListRun(self):
163
    """Test list runs"""
164
    result = RunCmd(["true"])
165
    self.assertEqual(result.signal, None)
166
    self.assertEqual(result.exit_code, 0)
167
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
168
    self.assertEqual(result.signal, None)
169
    self.assertEqual(result.exit_code, 1)
170
    result = RunCmd(["echo", "-n", self.magic])
171
    self.assertEqual(result.signal, None)
172
    self.assertEqual(result.exit_code, 0)
173
    self.assertEqual(result.stdout, self.magic)
174

    
175
  def testLang(self):
176
    """Test locale environment"""
177
    old_env = os.environ.copy()
178
    try:
179
      os.environ["LANG"] = "en_US.UTF-8"
180
      os.environ["LC_ALL"] = "en_US.UTF-8"
181
      result = RunCmd(["locale"])
182
      for line in result.output.splitlines():
183
        key, value = line.split("=", 1)
184
        # Ignore these variables, they're overridden by LC_ALL
185
        if key == "LANG" or key == "LANGUAGE":
186
          continue
187
        self.failIf(value and value != "C" and value != '"C"',
188
            "Variable %s is set to the invalid value '%s'" % (key, value))
189
    finally:
190
      os.environ = old_env
191

    
192

    
193
class TestRemoveFile(unittest.TestCase):
194
  """Test case for the RemoveFile function"""
195

    
196
  def setUp(self):
197
    """Create a temp dir and file for each case"""
198
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
199
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
200
    os.close(fd)
201

    
202
  def tearDown(self):
203
    if os.path.exists(self.tmpfile):
204
      os.unlink(self.tmpfile)
205
    os.rmdir(self.tmpdir)
206

    
207

    
208
  def testIgnoreDirs(self):
209
    """Test that RemoveFile() ignores directories"""
210
    self.assertEqual(None, RemoveFile(self.tmpdir))
211

    
212

    
213
  def testIgnoreNotExisting(self):
214
    """Test that RemoveFile() ignores non-existing files"""
215
    RemoveFile(self.tmpfile)
216
    RemoveFile(self.tmpfile)
217

    
218

    
219
  def testRemoveFile(self):
220
    """Test that RemoveFile does remove a file"""
221
    RemoveFile(self.tmpfile)
222
    if os.path.exists(self.tmpfile):
223
      self.fail("File '%s' not removed" % self.tmpfile)
224

    
225

    
226
  def testRemoveSymlink(self):
227
    """Test that RemoveFile does remove symlinks"""
228
    symlink = self.tmpdir + "/symlink"
229
    os.symlink("no-such-file", symlink)
230
    RemoveFile(symlink)
231
    if os.path.exists(symlink):
232
      self.fail("File '%s' not removed" % symlink)
233
    os.symlink(self.tmpfile, symlink)
234
    RemoveFile(symlink)
235
    if os.path.exists(symlink):
236
      self.fail("File '%s' not removed" % symlink)
237

    
238

    
239
class TestCheckdict(unittest.TestCase):
240
  """Test case for the CheckDict function"""
241

    
242
  def testAdd(self):
243
    """Test that CheckDict adds a missing key with the correct value"""
244

    
245
    tgt = {'a':1}
246
    tmpl = {'b': 2}
247
    CheckDict(tgt, tmpl)
248
    if 'b' not in tgt or tgt['b'] != 2:
249
      self.fail("Failed to update dict")
250

    
251

    
252
  def testNoUpdate(self):
253
    """Test that CheckDict does not overwrite an existing key"""
254
    tgt = {'a':1, 'b': 3}
255
    tmpl = {'b': 2}
256
    CheckDict(tgt, tmpl)
257
    self.failUnlessEqual(tgt['b'], 3)
258

    
259

    
260
class TestMatchNameComponent(unittest.TestCase):
261
  """Test case for the MatchNameComponent function"""
262

    
263
  def testEmptyList(self):
264
    """Test that there is no match against an empty list"""
265

    
266
    self.failUnlessEqual(MatchNameComponent("", []), None)
267
    self.failUnlessEqual(MatchNameComponent("test", []), None)
268

    
269
  def testSingleMatch(self):
270
    """Test that a single match is performed correctly"""
271
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
272
    for key in "test2", "test2.example", "test2.example.com":
273
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
274

    
275
  def testMultipleMatches(self):
276
    """Test that a multiple match is returned as None"""
277
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
278
    for key in "test1", "test1.example":
279
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
280

    
281

    
282
class TestFormatUnit(unittest.TestCase):
283
  """Test case for the FormatUnit function"""
284

    
285
  def testMiB(self):
286
    self.assertEqual(FormatUnit(1), '1M')
287
    self.assertEqual(FormatUnit(100), '100M')
288
    self.assertEqual(FormatUnit(1023), '1023M')
289

    
290
  def testGiB(self):
291
    self.assertEqual(FormatUnit(1024), '1.0G')
292
    self.assertEqual(FormatUnit(1536), '1.5G')
293
    self.assertEqual(FormatUnit(17133), '16.7G')
294
    self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G')
295

    
296
  def testTiB(self):
297
    self.assertEqual(FormatUnit(1024 * 1024), '1.0T')
298
    self.assertEqual(FormatUnit(5120 * 1024), '5.0T')
299
    self.assertEqual(FormatUnit(29829 * 1024), '29.1T')
300

    
301

    
302
class TestParseUnit(unittest.TestCase):
303
  """Test case for the ParseUnit function"""
304

    
305
  SCALES = (('', 1),
306
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
307
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
308
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
309

    
310
  def testRounding(self):
311
    self.assertEqual(ParseUnit('0'), 0)
312
    self.assertEqual(ParseUnit('1'), 4)
313
    self.assertEqual(ParseUnit('2'), 4)
314
    self.assertEqual(ParseUnit('3'), 4)
315

    
316
    self.assertEqual(ParseUnit('124'), 124)
317
    self.assertEqual(ParseUnit('125'), 128)
318
    self.assertEqual(ParseUnit('126'), 128)
319
    self.assertEqual(ParseUnit('127'), 128)
320
    self.assertEqual(ParseUnit('128'), 128)
321
    self.assertEqual(ParseUnit('129'), 132)
322
    self.assertEqual(ParseUnit('130'), 132)
323

    
324
  def testFloating(self):
325
    self.assertEqual(ParseUnit('0'), 0)
326
    self.assertEqual(ParseUnit('0.5'), 4)
327
    self.assertEqual(ParseUnit('1.75'), 4)
328
    self.assertEqual(ParseUnit('1.99'), 4)
329
    self.assertEqual(ParseUnit('2.00'), 4)
330
    self.assertEqual(ParseUnit('2.01'), 4)
331
    self.assertEqual(ParseUnit('3.99'), 4)
332
    self.assertEqual(ParseUnit('4.00'), 4)
333
    self.assertEqual(ParseUnit('4.01'), 8)
334
    self.assertEqual(ParseUnit('1.5G'), 1536)
335
    self.assertEqual(ParseUnit('1.8G'), 1844)
336
    self.assertEqual(ParseUnit('8.28T'), 8682212)
337

    
338
  def testSuffixes(self):
339
    for sep in ('', ' ', '   ', "\t", "\t "):
340
      for suffix, scale in TestParseUnit.SCALES:
341
        for func in (lambda x: x, str.lower, str.upper):
342
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
343
                           1024 * scale)
344

    
345
  def testInvalidInput(self):
346
    for sep in ('-', '_', ',', 'a'):
347
      for suffix, _ in TestParseUnit.SCALES:
348
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
349

    
350
    for suffix, _ in TestParseUnit.SCALES:
351
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
352

    
353

    
354
class TestSshKeys(testutils.GanetiTestCase):
355
  """Test case for the AddAuthorizedKey function"""
356

    
357
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
358
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
359
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
360

    
361
  def setUp(self):
362
    (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
363
    try:
364
      handle = os.fdopen(fd, 'w')
365
      try:
366
        handle.write("%s\n" % TestSshKeys.KEY_A)
367
        handle.write("%s\n" % TestSshKeys.KEY_B)
368
      finally:
369
        handle.close()
370
    except:
371
      utils.RemoveFile(self.tmpname)
372
      raise
373

    
374
  def tearDown(self):
375
    utils.RemoveFile(self.tmpname)
376
    del self.tmpname
377

    
378
  def testAddingNewKey(self):
379
    AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
380

    
381
    self.assertFileContent(self.tmpname,
382
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
383
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
384
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
385
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
386

    
387
  def testAddingAlmostButNotCompletelyTheSameKey(self):
388
    AddAuthorizedKey(self.tmpname,
389
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
390

    
391
    self.assertFileContent(self.tmpname,
392
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
393
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
394
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
395
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
396

    
397
  def testAddingExistingKeyWithSomeMoreSpaces(self):
398
    AddAuthorizedKey(self.tmpname,
399
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
400

    
401
    self.assertFileContent(self.tmpname,
402
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
403
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
404
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
405

    
406
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
407
    RemoveAuthorizedKey(self.tmpname,
408
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
409

    
410
    self.assertFileContent(self.tmpname,
411
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
412
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
413

    
414
  def testRemovingNonExistingKey(self):
415
    RemoveAuthorizedKey(self.tmpname,
416
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
417

    
418
    self.assertFileContent(self.tmpname,
419
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
420
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
421
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
422

    
423

    
424
class TestEtcHosts(testutils.GanetiTestCase):
425
  """Test functions modifying /etc/hosts"""
426

    
427
  def setUp(self):
428
    (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
429
    try:
430
      handle = os.fdopen(fd, 'w')
431
      try:
432
        handle.write('# This is a test file for /etc/hosts\n')
433
        handle.write('127.0.0.1\tlocalhost\n')
434
        handle.write('192.168.1.1 router gw\n')
435
      finally:
436
        handle.close()
437
    except:
438
      utils.RemoveFile(self.tmpname)
439
      raise
440

    
441
  def tearDown(self):
442
    utils.RemoveFile(self.tmpname)
443
    del self.tmpname
444

    
445
  def testSettingNewIp(self):
446
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
447

    
448
    self.assertFileContent(self.tmpname,
449
      "# This is a test file for /etc/hosts\n"
450
      "127.0.0.1\tlocalhost\n"
451
      "192.168.1.1 router gw\n"
452
      "1.2.3.4\tmyhost.domain.tld myhost\n")
453

    
454
  def testSettingExistingIp(self):
455
    SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
456
                     ['myhost'])
457

    
458
    self.assertFileContent(self.tmpname,
459
      "# This is a test file for /etc/hosts\n"
460
      "127.0.0.1\tlocalhost\n"
461
      "192.168.1.1\tmyhost.domain.tld myhost\n")
462

    
463
  def testSettingDuplicateName(self):
464
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
465

    
466
    self.assertFileContent(self.tmpname,
467
      "# This is a test file for /etc/hosts\n"
468
      "127.0.0.1\tlocalhost\n"
469
      "192.168.1.1 router gw\n"
470
      "1.2.3.4\tmyhost\n")
471

    
472
  def testRemovingExistingHost(self):
473
    RemoveEtcHostsEntry(self.tmpname, 'router')
474

    
475
    self.assertFileContent(self.tmpname,
476
      "# This is a test file for /etc/hosts\n"
477
      "127.0.0.1\tlocalhost\n"
478
      "192.168.1.1 gw\n")
479

    
480
  def testRemovingSingleExistingHost(self):
481
    RemoveEtcHostsEntry(self.tmpname, 'localhost')
482

    
483
    self.assertFileContent(self.tmpname,
484
      "# This is a test file for /etc/hosts\n"
485
      "192.168.1.1 router gw\n")
486

    
487
  def testRemovingNonExistingHost(self):
488
    RemoveEtcHostsEntry(self.tmpname, 'myhost')
489

    
490
    self.assertFileContent(self.tmpname,
491
      "# This is a test file for /etc/hosts\n"
492
      "127.0.0.1\tlocalhost\n"
493
      "192.168.1.1 router gw\n")
494

    
495
  def testRemovingAlias(self):
496
    RemoveEtcHostsEntry(self.tmpname, 'gw')
497

    
498
    self.assertFileContent(self.tmpname,
499
      "# This is a test file for /etc/hosts\n"
500
      "127.0.0.1\tlocalhost\n"
501
      "192.168.1.1 router\n")
502

    
503

    
504
class TestShellQuoting(unittest.TestCase):
505
  """Test case for shell quoting functions"""
506

    
507
  def testShellQuote(self):
508
    self.assertEqual(ShellQuote('abc'), "abc")
509
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
510
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
511
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
512
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
513

    
514
  def testShellQuoteArgs(self):
515
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
516
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
517
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
518

    
519

    
520
class TestTcpPing(unittest.TestCase):
521
  """Testcase for TCP version of ping - against listen(2)ing port"""
522

    
523
  def setUp(self):
524
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
525
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
526
    self.listenerport = self.listener.getsockname()[1]
527
    self.listener.listen(1)
528

    
529
  def tearDown(self):
530
    self.listener.shutdown(socket.SHUT_RDWR)
531
    del self.listener
532
    del self.listenerport
533

    
534
  def testTcpPingToLocalHostAccept(self):
535
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
536
                         constants.LOCALHOST_IP_ADDRESS,
537
                         self.listenerport,
538
                         timeout=10,
539
                         live_port_needed=True),
540
                 "failed to connect to test listener")
541

    
542

    
543
class TestTcpPingDeaf(unittest.TestCase):
544
  """Testcase for TCP version of ping - against non listen(2)ing port"""
545

    
546
  def setUp(self):
547
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
548
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
549
    self.deaflistenerport = self.deaflistener.getsockname()[1]
550

    
551
  def tearDown(self):
552
    del self.deaflistener
553
    del self.deaflistenerport
554

    
555
  def testTcpPingToLocalHostAcceptDeaf(self):
556
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
557
                        constants.LOCALHOST_IP_ADDRESS,
558
                        self.deaflistenerport,
559
                        timeout=constants.TCP_PING_TIMEOUT,
560
                        live_port_needed=True), # need successful connect(2)
561
                "successfully connected to deaf listener")
562

    
563
  def testTcpPingToLocalHostNoAccept(self):
564
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
565
                         constants.LOCALHOST_IP_ADDRESS,
566
                         self.deaflistenerport,
567
                         timeout=constants.TCP_PING_TIMEOUT,
568
                         live_port_needed=False), # ECONNREFUSED is OK
569
                 "failed to ping alive host on deaf port")
570

    
571

    
572
class TestListVisibleFiles(unittest.TestCase):
573
  """Test case for ListVisibleFiles"""
574

    
575
  def setUp(self):
576
    self.path = tempfile.mkdtemp()
577

    
578
  def tearDown(self):
579
    shutil.rmtree(self.path)
580

    
581
  def _test(self, files, expected):
582
    # Sort a copy
583
    expected = expected[:]
584
    expected.sort()
585

    
586
    for name in files:
587
      f = open(os.path.join(self.path, name), 'w')
588
      try:
589
        f.write("Test\n")
590
      finally:
591
        f.close()
592

    
593
    found = ListVisibleFiles(self.path)
594
    found.sort()
595

    
596
    self.assertEqual(found, expected)
597

    
598
  def testAllVisible(self):
599
    files = ["a", "b", "c"]
600
    expected = files
601
    self._test(files, expected)
602

    
603
  def testNoneVisible(self):
604
    files = [".a", ".b", ".c"]
605
    expected = []
606
    self._test(files, expected)
607

    
608
  def testSomeVisible(self):
609
    files = ["a", "b", ".c"]
610
    expected = ["a", "b"]
611
    self._test(files, expected)
612

    
613

    
614
class TestNewUUID(unittest.TestCase):
615
  """Test case for NewUUID"""
616

    
617
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
618
                        '[a-f0-9]{4}-[a-f0-9]{12}$')
619

    
620
  def runTest(self):
621
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
622

    
623

    
624
class TestUniqueSequence(unittest.TestCase):
625
  """Test case for UniqueSequence"""
626

    
627
  def _test(self, input, expected):
628
    self.assertEqual(utils.UniqueSequence(input), expected)
629

    
630
  def runTest(self):
631
    # Ordered input
632
    self._test([1, 2, 3], [1, 2, 3])
633
    self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
634
    self._test([1, 2, 2, 3], [1, 2, 3])
635
    self._test([1, 2, 3, 3], [1, 2, 3])
636

    
637
    # Unordered input
638
    self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
639
    self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
640

    
641
    # Strings
642
    self._test(["a", "a"], ["a"])
643
    self._test(["a", "b"], ["a", "b"])
644
    self._test(["a", "b", "a"], ["a", "b"])
645

    
646

    
647
if __name__ == '__main__':
648
  unittest.main()