Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ d6646186

History | View | Annotate | Download (20.1 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import os
30
import md5
31
import socket
32
import shutil
33
import re
34

    
35
import ganeti
36
from ganeti import constants
37
from ganeti import utils
38
from ganeti.utils import IsProcessAlive, Lock, Unlock, RunCmd, \
39
     RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
40
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
41
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
42
     SetEtcHostsEntry, RemoveEtcHostsEntry
43
from ganeti.errors import LockError, UnitParseError
44

    
45

    
46
class GanetiTestCase(unittest.TestCase):
47
  def assertFileContent(self, file_name, content):
48
    """Checks the content of a file.
49

50
    """
51
    handle = open(file_name, 'r')
52
    try:
53
      self.assertEqual(handle.read(), content)
54
    finally:
55
      handle.close()
56

    
57

    
58
class TestIsProcessAlive(unittest.TestCase):
59
  """Testing case for IsProcessAlive"""
60
  def setUp(self):
61
    # create a zombie and a (hopefully) non-existing process id
62
    self.pid_zombie = os.fork()
63
    if self.pid_zombie == 0:
64
      os._exit(0)
65
    elif self.pid_zombie < 0:
66
      raise SystemError("can't fork")
67
    self.pid_non_existing = os.fork()
68
    if self.pid_non_existing == 0:
69
      os._exit(0)
70
    elif self.pid_non_existing > 0:
71
      os.waitpid(self.pid_non_existing, 0)
72
    else:
73
      raise SystemError("can't fork")
74

    
75

    
76
  def testExists(self):
77
    mypid = os.getpid()
78
    self.assert_(IsProcessAlive(mypid),
79
                 "can't find myself running")
80

    
81
  def testZombie(self):
82
    self.assert_(not IsProcessAlive(self.pid_zombie),
83
                 "zombie not detected as zombie")
84

    
85

    
86
  def testNotExisting(self):
87
    self.assert_(not IsProcessAlive(self.pid_non_existing),
88
                 "noexisting process detected")
89

    
90

    
91
class TestLocking(unittest.TestCase):
92
  """Testing case for the Lock/Unlock functions"""
93

    
94
  def setUp(self):
95
    lock_dir = tempfile.mkdtemp(prefix="ganeti.unittest.",
96
                                suffix=".locking")
97
    self.old_lock_dir = constants.LOCK_DIR
98
    constants.LOCK_DIR = lock_dir
99

    
100
  def tearDown(self):
101
    try:
102
      ganeti.utils.Unlock("unittest")
103
    except LockError:
104
      pass
105
    shutil.rmtree(constants.LOCK_DIR, ignore_errors=True)
106
    constants.LOCK_DIR = self.old_lock_dir
107

    
108
  def clean_lock(self, name):
109
    try:
110
      ganeti.utils.Unlock("unittest")
111
    except LockError:
112
      pass
113

    
114

    
115
  def testLock(self):
116
    self.clean_lock("unittest")
117
    self.assertEqual(None, Lock("unittest"))
118

    
119

    
120
  def testUnlock(self):
121
    self.clean_lock("unittest")
122
    ganeti.utils.Lock("unittest")
123
    self.assertEqual(None, Unlock("unittest"))
124

    
125
  def testDoubleLock(self):
126
    self.clean_lock("unittest")
127
    ganeti.utils.Lock("unittest")
128
    self.assertRaises(LockError, Lock, "unittest")
129

    
130

    
131
class TestRunCmd(unittest.TestCase):
132
  """Testing case for the RunCmd function"""
133

    
134
  def setUp(self):
135
    self.magic = time.ctime() + " ganeti test"
136

    
137
  def testOk(self):
138
    """Test successful exit code"""
139
    result = RunCmd("/bin/sh -c 'exit 0'")
140
    self.assertEqual(result.exit_code, 0)
141

    
142
  def testFail(self):
143
    """Test fail exit code"""
144
    result = RunCmd("/bin/sh -c 'exit 1'")
145
    self.assertEqual(result.exit_code, 1)
146

    
147

    
148
  def testStdout(self):
149
    """Test standard output"""
150
    cmd = 'echo -n "%s"' % self.magic
151
    result = RunCmd("/bin/sh -c '%s'" % cmd)
152
    self.assertEqual(result.stdout, self.magic)
153

    
154

    
155
  def testStderr(self):
156
    """Test standard error"""
157
    cmd = 'echo -n "%s"' % self.magic
158
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
159
    self.assertEqual(result.stderr, self.magic)
160

    
161

    
162
  def testCombined(self):
163
    """Test combined output"""
164
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
165
    result = RunCmd("/bin/sh -c '%s'" % cmd)
166
    self.assertEqual(result.output, "A" + self.magic + "B" + self.magic)
167

    
168
  def testSignal(self):
169
    """Test standard error"""
170
    result = RunCmd("/bin/sh -c 'kill -15 $$'")
171
    self.assertEqual(result.signal, 15)
172

    
173
  def testListRun(self):
174
    """Test list runs"""
175
    result = RunCmd(["true"])
176
    self.assertEqual(result.signal, None)
177
    self.assertEqual(result.exit_code, 0)
178
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
179
    self.assertEqual(result.signal, None)
180
    self.assertEqual(result.exit_code, 1)
181
    result = RunCmd(["echo", "-n", self.magic])
182
    self.assertEqual(result.signal, None)
183
    self.assertEqual(result.exit_code, 0)
184
    self.assertEqual(result.stdout, self.magic)
185

    
186
  def testLang(self):
187
    """Test locale environment"""
188
    old_env = os.environ.copy()
189
    try:
190
      os.environ["LANG"] = "en_US.UTF-8"
191
      os.environ["LC_ALL"] = "en_US.UTF-8"
192
      result = RunCmd(["locale"])
193
      for line in result.output.splitlines():
194
        key, value = line.split("=", 1)
195
        # Ignore these variables, they're overridden by LC_ALL
196
        if key == "LANG" or key == "LANGUAGE":
197
          continue
198
        self.failIf(value and value != "C" and value != '"C"',
199
            "Variable %s is set to the invalid value '%s'" % (key, value))
200
    finally:
201
      os.environ = old_env
202

    
203

    
204
class TestRemoveFile(unittest.TestCase):
205
  """Test case for the RemoveFile function"""
206

    
207
  def setUp(self):
208
    """Create a temp dir and file for each case"""
209
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
210
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
211
    os.close(fd)
212

    
213
  def tearDown(self):
214
    if os.path.exists(self.tmpfile):
215
      os.unlink(self.tmpfile)
216
    os.rmdir(self.tmpdir)
217

    
218

    
219
  def testIgnoreDirs(self):
220
    """Test that RemoveFile() ignores directories"""
221
    self.assertEqual(None, RemoveFile(self.tmpdir))
222

    
223

    
224
  def testIgnoreNotExisting(self):
225
    """Test that RemoveFile() ignores non-existing files"""
226
    RemoveFile(self.tmpfile)
227
    RemoveFile(self.tmpfile)
228

    
229

    
230
  def testRemoveFile(self):
231
    """Test that RemoveFile does remove a file"""
232
    RemoveFile(self.tmpfile)
233
    if os.path.exists(self.tmpfile):
234
      self.fail("File '%s' not removed" % self.tmpfile)
235

    
236

    
237
  def testRemoveSymlink(self):
238
    """Test that RemoveFile does remove symlinks"""
239
    symlink = self.tmpdir + "/symlink"
240
    os.symlink("no-such-file", symlink)
241
    RemoveFile(symlink)
242
    if os.path.exists(symlink):
243
      self.fail("File '%s' not removed" % symlink)
244
    os.symlink(self.tmpfile, symlink)
245
    RemoveFile(symlink)
246
    if os.path.exists(symlink):
247
      self.fail("File '%s' not removed" % symlink)
248

    
249

    
250
class TestCheckdict(unittest.TestCase):
251
  """Test case for the CheckDict function"""
252

    
253
  def testAdd(self):
254
    """Test that CheckDict adds a missing key with the correct value"""
255

    
256
    tgt = {'a':1}
257
    tmpl = {'b': 2}
258
    CheckDict(tgt, tmpl)
259
    if 'b' not in tgt or tgt['b'] != 2:
260
      self.fail("Failed to update dict")
261

    
262

    
263
  def testNoUpdate(self):
264
    """Test that CheckDict does not overwrite an existing key"""
265
    tgt = {'a':1, 'b': 3}
266
    tmpl = {'b': 2}
267
    CheckDict(tgt, tmpl)
268
    self.failUnlessEqual(tgt['b'], 3)
269

    
270

    
271
class TestMatchNameComponent(unittest.TestCase):
272
  """Test case for the MatchNameComponent function"""
273

    
274
  def testEmptyList(self):
275
    """Test that there is no match against an empty list"""
276

    
277
    self.failUnlessEqual(MatchNameComponent("", []), None)
278
    self.failUnlessEqual(MatchNameComponent("test", []), None)
279

    
280
  def testSingleMatch(self):
281
    """Test that a single match is performed correctly"""
282
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
283
    for key in "test2", "test2.example", "test2.example.com":
284
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
285

    
286
  def testMultipleMatches(self):
287
    """Test that a multiple match is returned as None"""
288
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
289
    for key in "test1", "test1.example":
290
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
291

    
292

    
293
class TestFormatUnit(unittest.TestCase):
294
  """Test case for the FormatUnit function"""
295

    
296
  def testMiB(self):
297
    self.assertEqual(FormatUnit(1), '1M')
298
    self.assertEqual(FormatUnit(100), '100M')
299
    self.assertEqual(FormatUnit(1023), '1023M')
300

    
301
  def testGiB(self):
302
    self.assertEqual(FormatUnit(1024), '1.0G')
303
    self.assertEqual(FormatUnit(1536), '1.5G')
304
    self.assertEqual(FormatUnit(17133), '16.7G')
305
    self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G')
306

    
307
  def testTiB(self):
308
    self.assertEqual(FormatUnit(1024 * 1024), '1.0T')
309
    self.assertEqual(FormatUnit(5120 * 1024), '5.0T')
310
    self.assertEqual(FormatUnit(29829 * 1024), '29.1T')
311

    
312

    
313
class TestParseUnit(unittest.TestCase):
314
  """Test case for the ParseUnit function"""
315

    
316
  SCALES = (('', 1),
317
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
318
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
319
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
320

    
321
  def testRounding(self):
322
    self.assertEqual(ParseUnit('0'), 0)
323
    self.assertEqual(ParseUnit('1'), 4)
324
    self.assertEqual(ParseUnit('2'), 4)
325
    self.assertEqual(ParseUnit('3'), 4)
326

    
327
    self.assertEqual(ParseUnit('124'), 124)
328
    self.assertEqual(ParseUnit('125'), 128)
329
    self.assertEqual(ParseUnit('126'), 128)
330
    self.assertEqual(ParseUnit('127'), 128)
331
    self.assertEqual(ParseUnit('128'), 128)
332
    self.assertEqual(ParseUnit('129'), 132)
333
    self.assertEqual(ParseUnit('130'), 132)
334

    
335
  def testFloating(self):
336
    self.assertEqual(ParseUnit('0'), 0)
337
    self.assertEqual(ParseUnit('0.5'), 4)
338
    self.assertEqual(ParseUnit('1.75'), 4)
339
    self.assertEqual(ParseUnit('1.99'), 4)
340
    self.assertEqual(ParseUnit('2.00'), 4)
341
    self.assertEqual(ParseUnit('2.01'), 4)
342
    self.assertEqual(ParseUnit('3.99'), 4)
343
    self.assertEqual(ParseUnit('4.00'), 4)
344
    self.assertEqual(ParseUnit('4.01'), 8)
345
    self.assertEqual(ParseUnit('1.5G'), 1536)
346
    self.assertEqual(ParseUnit('1.8G'), 1844)
347
    self.assertEqual(ParseUnit('8.28T'), 8682212)
348

    
349
  def testSuffixes(self):
350
    for sep in ('', ' ', '   ', "\t", "\t "):
351
      for suffix, scale in TestParseUnit.SCALES:
352
        for func in (lambda x: x, str.lower, str.upper):
353
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
354
                           1024 * scale)
355

    
356
  def testInvalidInput(self):
357
    for sep in ('-', '_', ',', 'a'):
358
      for suffix, _ in TestParseUnit.SCALES:
359
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
360

    
361
    for suffix, _ in TestParseUnit.SCALES:
362
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
363

    
364

    
365
class TestSshKeys(GanetiTestCase):
366
  """Test case for the AddAuthorizedKey function"""
367

    
368
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
369
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
370
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
371

    
372
  def setUp(self):
373
    (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
374
    try:
375
      handle = os.fdopen(fd, 'w')
376
      try:
377
        handle.write("%s\n" % TestSshKeys.KEY_A)
378
        handle.write("%s\n" % TestSshKeys.KEY_B)
379
      finally:
380
        handle.close()
381
    except:
382
      utils.RemoveFile(self.tmpname)
383
      raise
384

    
385
  def tearDown(self):
386
    utils.RemoveFile(self.tmpname)
387
    del self.tmpname
388

    
389
  def testAddingNewKey(self):
390
    AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
391

    
392
    self.assertFileContent(self.tmpname,
393
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
394
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
395
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
396
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
397

    
398
  def testAddingAlmostButNotCompletelyTheSameKey(self):
399
    AddAuthorizedKey(self.tmpname,
400
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
401

    
402
    self.assertFileContent(self.tmpname,
403
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
404
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
405
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
406
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
407

    
408
  def testAddingExistingKeyWithSomeMoreSpaces(self):
409
    AddAuthorizedKey(self.tmpname,
410
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
411

    
412
    self.assertFileContent(self.tmpname,
413
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
414
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
415
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
416

    
417
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
418
    RemoveAuthorizedKey(self.tmpname,
419
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
420

    
421
    self.assertFileContent(self.tmpname,
422
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
423
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
424

    
425
  def testRemovingNonExistingKey(self):
426
    RemoveAuthorizedKey(self.tmpname,
427
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
428

    
429
    self.assertFileContent(self.tmpname,
430
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
431
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
432
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
433

    
434

    
435
class TestEtcHosts(GanetiTestCase):
436
  """Test functions modifying /etc/hosts"""
437

    
438
  def setUp(self):
439
    (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
440
    try:
441
      handle = os.fdopen(fd, 'w')
442
      try:
443
        handle.write('# This is a test file for /etc/hosts\n')
444
        handle.write('127.0.0.1\tlocalhost\n')
445
        handle.write('192.168.1.1 router gw\n')
446
      finally:
447
        handle.close()
448
    except:
449
      utils.RemoveFile(self.tmpname)
450
      raise
451

    
452
  def tearDown(self):
453
    utils.RemoveFile(self.tmpname)
454
    del self.tmpname
455

    
456
  def testSettingNewIp(self):
457
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
458

    
459
    self.assertFileContent(self.tmpname,
460
      "# This is a test file for /etc/hosts\n"
461
      "127.0.0.1\tlocalhost\n"
462
      "192.168.1.1 router gw\n"
463
      "1.2.3.4\tmyhost.domain.tld myhost\n")
464

    
465
  def testSettingExistingIp(self):
466
    SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
467
                     ['myhost'])
468

    
469
    self.assertFileContent(self.tmpname,
470
      "# This is a test file for /etc/hosts\n"
471
      "127.0.0.1\tlocalhost\n"
472
      "192.168.1.1\tmyhost.domain.tld myhost\n")
473

    
474
  def testSettingDuplicateName(self):
475
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
476

    
477
    self.assertFileContent(self.tmpname,
478
      "# This is a test file for /etc/hosts\n"
479
      "127.0.0.1\tlocalhost\n"
480
      "192.168.1.1 router gw\n"
481
      "1.2.3.4\tmyhost\n")
482

    
483
  def testRemovingExistingHost(self):
484
    RemoveEtcHostsEntry(self.tmpname, 'router')
485

    
486
    self.assertFileContent(self.tmpname,
487
      "# This is a test file for /etc/hosts\n"
488
      "127.0.0.1\tlocalhost\n"
489
      "192.168.1.1 gw\n")
490

    
491
  def testRemovingSingleExistingHost(self):
492
    RemoveEtcHostsEntry(self.tmpname, 'localhost')
493

    
494
    self.assertFileContent(self.tmpname,
495
      "# This is a test file for /etc/hosts\n"
496
      "192.168.1.1 router gw\n")
497

    
498
  def testRemovingNonExistingHost(self):
499
    RemoveEtcHostsEntry(self.tmpname, 'myhost')
500

    
501
    self.assertFileContent(self.tmpname,
502
      "# This is a test file for /etc/hosts\n"
503
      "127.0.0.1\tlocalhost\n"
504
      "192.168.1.1 router gw\n")
505

    
506
  def testRemovingAlias(self):
507
    RemoveEtcHostsEntry(self.tmpname, 'gw')
508

    
509
    self.assertFileContent(self.tmpname,
510
      "# This is a test file for /etc/hosts\n"
511
      "127.0.0.1\tlocalhost\n"
512
      "192.168.1.1 router\n")
513

    
514

    
515
class TestShellQuoting(unittest.TestCase):
516
  """Test case for shell quoting functions"""
517

    
518
  def testShellQuote(self):
519
    self.assertEqual(ShellQuote('abc'), "abc")
520
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
521
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
522
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
523
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
524

    
525
  def testShellQuoteArgs(self):
526
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
527
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
528
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
529

    
530

    
531
class TestTcpPing(unittest.TestCase):
532
  """Testcase for TCP version of ping - against listen(2)ing port"""
533

    
534
  def setUp(self):
535
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
536
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
537
    self.listenerport = self.listener.getsockname()[1]
538
    self.listener.listen(1)
539

    
540
  def tearDown(self):
541
    self.listener.shutdown(socket.SHUT_RDWR)
542
    del self.listener
543
    del self.listenerport
544

    
545
  def testTcpPingToLocalHostAccept(self):
546
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
547
                         constants.LOCALHOST_IP_ADDRESS,
548
                         self.listenerport,
549
                         timeout=10,
550
                         live_port_needed=True),
551
                 "failed to connect to test listener")
552

    
553

    
554
class TestTcpPingDeaf(unittest.TestCase):
555
  """Testcase for TCP version of ping - against non listen(2)ing port"""
556

    
557
  def setUp(self):
558
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
559
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
560
    self.deaflistenerport = self.deaflistener.getsockname()[1]
561

    
562
  def tearDown(self):
563
    del self.deaflistener
564
    del self.deaflistenerport
565

    
566
  def testTcpPingToLocalHostAcceptDeaf(self):
567
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
568
                        constants.LOCALHOST_IP_ADDRESS,
569
                        self.deaflistenerport,
570
                        timeout=constants.TCP_PING_TIMEOUT,
571
                        live_port_needed=True), # need successful connect(2)
572
                "successfully connected to deaf listener")
573

    
574
  def testTcpPingToLocalHostNoAccept(self):
575
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
576
                         constants.LOCALHOST_IP_ADDRESS,
577
                         self.deaflistenerport,
578
                         timeout=constants.TCP_PING_TIMEOUT,
579
                         live_port_needed=False), # ECONNREFUSED is OK
580
                 "failed to ping alive host on deaf port")
581

    
582

    
583
class TestListVisibleFiles(unittest.TestCase):
584
  """Test case for ListVisibleFiles"""
585

    
586
  def setUp(self):
587
    self.path = tempfile.mkdtemp()
588

    
589
  def tearDown(self):
590
    shutil.rmtree(self.path)
591

    
592
  def _test(self, files, expected):
593
    # Sort a copy
594
    expected = expected[:]
595
    expected.sort()
596

    
597
    for name in files:
598
      f = open(os.path.join(self.path, name), 'w')
599
      try:
600
        f.write("Test\n")
601
      finally:
602
        f.close()
603

    
604
    found = ListVisibleFiles(self.path)
605
    found.sort()
606

    
607
    self.assertEqual(found, expected)
608

    
609
  def testAllVisible(self):
610
    files = ["a", "b", "c"]
611
    expected = files
612
    self._test(files, expected)
613

    
614
  def testNoneVisible(self):
615
    files = [".a", ".b", ".c"]
616
    expected = []
617
    self._test(files, expected)
618

    
619
  def testSomeVisible(self):
620
    files = ["a", "b", ".c"]
621
    expected = ["a", "b"]
622
    self._test(files, expected)
623

    
624

    
625
class TestNewUUID(unittest.TestCase):
626
  """Test case for NewUUID"""
627

    
628
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
629
                        '[a-f0-9]{4}-[a-f0-9]{12}$')
630

    
631
  def runTest(self):
632
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
633

    
634

    
635
class TestUniqueSequence(unittest.TestCase):
636
  """Test case for UniqueSequence"""
637

    
638
  def _test(self, input, expected):
639
    self.assertEqual(utils.UniqueSequence(input), expected)
640

    
641
  def runTest(self):
642
    # Ordered input
643
    self._test([1, 2, 3], [1, 2, 3])
644
    self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
645
    self._test([1, 2, 2, 3], [1, 2, 3])
646
    self._test([1, 2, 3, 3], [1, 2, 3])
647

    
648
    # Unordered input
649
    self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
650
    self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
651

    
652
    # Strings
653
    self._test(["a", "a"], ["a"])
654
    self._test(["a", "b"], ["a", "b"])
655
    self._test(["a", "b", "a"], ["a", "b"])
656

    
657

    
658
if __name__ == '__main__':
659
  unittest.main()