Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ 7fbb1f65

History | View | Annotate | Download (19.7 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import md5
30
import socket
31
import shutil
32
import re
33

    
34
import ganeti
35
from ganeti import constants
36
from ganeti import utils
37
from ganeti.utils import IsProcessAlive, Lock, Unlock, RunCmd, \
38
     RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
39
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
40
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
41
     SetEtcHostsEntry, RemoveEtcHostsEntry
42
from ganeti.errors import LockError, UnitParseError
43

    
44

    
45
class GanetiTestCase(unittest.TestCase):
46
  def assertFileContent(self, file_name, content):
47
    """Checks the content of a file.
48

49
    """
50
    handle = open(file_name, 'r')
51
    try:
52
      self.assertEqual(handle.read(), content)
53
    finally:
54
      handle.close()
55

    
56

    
57
class TestIsProcessAlive(unittest.TestCase):
58
  """Testing case for IsProcessAlive"""
59
  def setUp(self):
60
    # create a zombie and a (hopefully) non-existing process id
61
    self.pid_zombie = os.fork()
62
    if self.pid_zombie == 0:
63
      os._exit(0)
64
    elif self.pid_zombie < 0:
65
      raise SystemError("can't fork")
66
    self.pid_non_existing = os.fork()
67
    if self.pid_non_existing == 0:
68
      os._exit(0)
69
    elif self.pid_non_existing > 0:
70
      os.waitpid(self.pid_non_existing, 0)
71
    else:
72
      raise SystemError("can't fork")
73

    
74

    
75
  def testExists(self):
76
    mypid = os.getpid()
77
    self.assert_(IsProcessAlive(mypid),
78
                 "can't find myself running")
79

    
80
  def testZombie(self):
81
    self.assert_(not IsProcessAlive(self.pid_zombie),
82
                 "zombie not detected as zombie")
83

    
84

    
85
  def testNotExisting(self):
86
    self.assert_(not IsProcessAlive(self.pid_non_existing),
87
                 "noexisting process detected")
88

    
89

    
90
class TestLocking(unittest.TestCase):
91
  """Testing case for the Lock/Unlock functions"""
92
  def clean_lock(self, name):
93
    try:
94
      ganeti.utils.Unlock("unittest")
95
    except LockError:
96
      pass
97

    
98

    
99
  def testLock(self):
100
    self.clean_lock("unittest")
101
    self.assertEqual(None, Lock("unittest"))
102

    
103

    
104
  def testUnlock(self):
105
    self.clean_lock("unittest")
106
    ganeti.utils.Lock("unittest")
107
    self.assertEqual(None, Unlock("unittest"))
108

    
109

    
110
  def testDoubleLock(self):
111
    self.clean_lock("unittest")
112
    ganeti.utils.Lock("unittest")
113
    self.assertRaises(LockError, Lock, "unittest")
114

    
115

    
116
class TestRunCmd(unittest.TestCase):
117
  """Testing case for the RunCmd function"""
118

    
119
  def setUp(self):
120
    self.magic = time.ctime() + " ganeti test"
121

    
122
  def testOk(self):
123
    """Test successful exit code"""
124
    result = RunCmd("/bin/sh -c 'exit 0'")
125
    self.assertEqual(result.exit_code, 0)
126

    
127
  def testFail(self):
128
    """Test fail exit code"""
129
    result = RunCmd("/bin/sh -c 'exit 1'")
130
    self.assertEqual(result.exit_code, 1)
131

    
132

    
133
  def testStdout(self):
134
    """Test standard output"""
135
    cmd = 'echo -n "%s"' % self.magic
136
    result = RunCmd("/bin/sh -c '%s'" % cmd)
137
    self.assertEqual(result.stdout, self.magic)
138

    
139

    
140
  def testStderr(self):
141
    """Test standard error"""
142
    cmd = 'echo -n "%s"' % self.magic
143
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
144
    self.assertEqual(result.stderr, self.magic)
145

    
146

    
147
  def testCombined(self):
148
    """Test combined output"""
149
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
150
    result = RunCmd("/bin/sh -c '%s'" % cmd)
151
    self.assertEqual(result.output, "A" + self.magic + "B" + self.magic)
152

    
153
  def testSignal(self):
154
    """Test standard error"""
155
    result = RunCmd("/bin/sh -c 'kill -15 $$'")
156
    self.assertEqual(result.signal, 15)
157

    
158
  def testListRun(self):
159
    """Test list runs"""
160
    result = RunCmd(["true"])
161
    self.assertEqual(result.signal, None)
162
    self.assertEqual(result.exit_code, 0)
163
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
164
    self.assertEqual(result.signal, None)
165
    self.assertEqual(result.exit_code, 1)
166
    result = RunCmd(["echo", "-n", self.magic])
167
    self.assertEqual(result.signal, None)
168
    self.assertEqual(result.exit_code, 0)
169
    self.assertEqual(result.stdout, self.magic)
170

    
171
  def testLang(self):
172
    """Test locale environment"""
173
    old_env = os.environ.copy()
174
    try:
175
      os.environ["LANG"] = "en_US.UTF-8"
176
      os.environ["LC_ALL"] = "en_US.UTF-8"
177
      result = RunCmd(["locale"])
178
      for line in result.output.splitlines():
179
        key, value = line.split("=", 1)
180
        # Ignore these variables, they're overridden by LC_ALL
181
        if key == "LANG" or key == "LANGUAGE":
182
          continue
183
        self.failIf(value and value != "C" and value != '"C"',
184
            "Variable %s is set to the invalid value '%s'" % (key, value))
185
    finally:
186
      os.environ = old_env
187

    
188

    
189
class TestRemoveFile(unittest.TestCase):
190
  """Test case for the RemoveFile function"""
191

    
192
  def setUp(self):
193
    """Create a temp dir and file for each case"""
194
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
195
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
196
    os.close(fd)
197

    
198
  def tearDown(self):
199
    if os.path.exists(self.tmpfile):
200
      os.unlink(self.tmpfile)
201
    os.rmdir(self.tmpdir)
202

    
203

    
204
  def testIgnoreDirs(self):
205
    """Test that RemoveFile() ignores directories"""
206
    self.assertEqual(None, RemoveFile(self.tmpdir))
207

    
208

    
209
  def testIgnoreNotExisting(self):
210
    """Test that RemoveFile() ignores non-existing files"""
211
    RemoveFile(self.tmpfile)
212
    RemoveFile(self.tmpfile)
213

    
214

    
215
  def testRemoveFile(self):
216
    """Test that RemoveFile does remove a file"""
217
    RemoveFile(self.tmpfile)
218
    if os.path.exists(self.tmpfile):
219
      self.fail("File '%s' not removed" % self.tmpfile)
220

    
221

    
222
  def testRemoveSymlink(self):
223
    """Test that RemoveFile does remove symlinks"""
224
    symlink = self.tmpdir + "/symlink"
225
    os.symlink("no-such-file", symlink)
226
    RemoveFile(symlink)
227
    if os.path.exists(symlink):
228
      self.fail("File '%s' not removed" % symlink)
229
    os.symlink(self.tmpfile, symlink)
230
    RemoveFile(symlink)
231
    if os.path.exists(symlink):
232
      self.fail("File '%s' not removed" % symlink)
233

    
234

    
235
class TestCheckdict(unittest.TestCase):
236
  """Test case for the CheckDict function"""
237

    
238
  def testAdd(self):
239
    """Test that CheckDict adds a missing key with the correct value"""
240

    
241
    tgt = {'a':1}
242
    tmpl = {'b': 2}
243
    CheckDict(tgt, tmpl)
244
    if 'b' not in tgt or tgt['b'] != 2:
245
      self.fail("Failed to update dict")
246

    
247

    
248
  def testNoUpdate(self):
249
    """Test that CheckDict does not overwrite an existing key"""
250
    tgt = {'a':1, 'b': 3}
251
    tmpl = {'b': 2}
252
    CheckDict(tgt, tmpl)
253
    self.failUnlessEqual(tgt['b'], 3)
254

    
255

    
256
class TestMatchNameComponent(unittest.TestCase):
257
  """Test case for the MatchNameComponent function"""
258

    
259
  def testEmptyList(self):
260
    """Test that there is no match against an empty list"""
261

    
262
    self.failUnlessEqual(MatchNameComponent("", []), None)
263
    self.failUnlessEqual(MatchNameComponent("test", []), None)
264

    
265
  def testSingleMatch(self):
266
    """Test that a single match is performed correctly"""
267
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
268
    for key in "test2", "test2.example", "test2.example.com":
269
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
270

    
271
  def testMultipleMatches(self):
272
    """Test that a multiple match is returned as None"""
273
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
274
    for key in "test1", "test1.example":
275
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
276

    
277

    
278
class TestFormatUnit(unittest.TestCase):
279
  """Test case for the FormatUnit function"""
280

    
281
  def testMiB(self):
282
    self.assertEqual(FormatUnit(1), '1M')
283
    self.assertEqual(FormatUnit(100), '100M')
284
    self.assertEqual(FormatUnit(1023), '1023M')
285

    
286
  def testGiB(self):
287
    self.assertEqual(FormatUnit(1024), '1.0G')
288
    self.assertEqual(FormatUnit(1536), '1.5G')
289
    self.assertEqual(FormatUnit(17133), '16.7G')
290
    self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G')
291

    
292
  def testTiB(self):
293
    self.assertEqual(FormatUnit(1024 * 1024), '1.0T')
294
    self.assertEqual(FormatUnit(5120 * 1024), '5.0T')
295
    self.assertEqual(FormatUnit(29829 * 1024), '29.1T')
296

    
297

    
298
class TestParseUnit(unittest.TestCase):
299
  """Test case for the ParseUnit function"""
300

    
301
  SCALES = (('', 1),
302
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
303
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
304
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
305

    
306
  def testRounding(self):
307
    self.assertEqual(ParseUnit('0'), 0)
308
    self.assertEqual(ParseUnit('1'), 4)
309
    self.assertEqual(ParseUnit('2'), 4)
310
    self.assertEqual(ParseUnit('3'), 4)
311

    
312
    self.assertEqual(ParseUnit('124'), 124)
313
    self.assertEqual(ParseUnit('125'), 128)
314
    self.assertEqual(ParseUnit('126'), 128)
315
    self.assertEqual(ParseUnit('127'), 128)
316
    self.assertEqual(ParseUnit('128'), 128)
317
    self.assertEqual(ParseUnit('129'), 132)
318
    self.assertEqual(ParseUnit('130'), 132)
319

    
320
  def testFloating(self):
321
    self.assertEqual(ParseUnit('0'), 0)
322
    self.assertEqual(ParseUnit('0.5'), 4)
323
    self.assertEqual(ParseUnit('1.75'), 4)
324
    self.assertEqual(ParseUnit('1.99'), 4)
325
    self.assertEqual(ParseUnit('2.00'), 4)
326
    self.assertEqual(ParseUnit('2.01'), 4)
327
    self.assertEqual(ParseUnit('3.99'), 4)
328
    self.assertEqual(ParseUnit('4.00'), 4)
329
    self.assertEqual(ParseUnit('4.01'), 8)
330
    self.assertEqual(ParseUnit('1.5G'), 1536)
331
    self.assertEqual(ParseUnit('1.8G'), 1844)
332
    self.assertEqual(ParseUnit('8.28T'), 8682212)
333

    
334
  def testSuffixes(self):
335
    for sep in ('', ' ', '   ', "\t", "\t "):
336
      for suffix, scale in TestParseUnit.SCALES:
337
        for func in (lambda x: x, str.lower, str.upper):
338
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
339
                           1024 * scale)
340

    
341
  def testInvalidInput(self):
342
    for sep in ('-', '_', ',', 'a'):
343
      for suffix, _ in TestParseUnit.SCALES:
344
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
345

    
346
    for suffix, _ in TestParseUnit.SCALES:
347
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
348

    
349

    
350
class TestSshKeys(GanetiTestCase):
351
  """Test case for the AddAuthorizedKey function"""
352

    
353
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
354
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
355
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
356

    
357
  def setUp(self):
358
    (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
359
    try:
360
      handle = os.fdopen(fd, 'w')
361
      try:
362
        handle.write("%s\n" % TestSshKeys.KEY_A)
363
        handle.write("%s\n" % TestSshKeys.KEY_B)
364
      finally:
365
        handle.close()
366
    except:
367
      utils.RemoveFile(self.tmpname)
368
      raise
369

    
370
  def tearDown(self):
371
    utils.RemoveFile(self.tmpname)
372
    del self.tmpname
373

    
374
  def testAddingNewKey(self):
375
    AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
376

    
377
    self.assertFileContent(self.tmpname,
378
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
379
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
380
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
381
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
382

    
383
  def testAddingAlmostButNotCompletelyTheSameKey(self):
384
    AddAuthorizedKey(self.tmpname,
385
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
386

    
387
    self.assertFileContent(self.tmpname,
388
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
389
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
390
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
391
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
392

    
393
  def testAddingExistingKeyWithSomeMoreSpaces(self):
394
    AddAuthorizedKey(self.tmpname,
395
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
396

    
397
    self.assertFileContent(self.tmpname,
398
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
399
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
400
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
401

    
402
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
403
    RemoveAuthorizedKey(self.tmpname,
404
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
405

    
406
    self.assertFileContent(self.tmpname,
407
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
408
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
409

    
410
  def testRemovingNonExistingKey(self):
411
    RemoveAuthorizedKey(self.tmpname,
412
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
413

    
414
    self.assertFileContent(self.tmpname,
415
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
416
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
417
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
418

    
419

    
420
class TestEtcHosts(GanetiTestCase):
421
  """Test functions modifying /etc/hosts"""
422

    
423
  def setUp(self):
424
    (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
425
    try:
426
      handle = os.fdopen(fd, 'w')
427
      try:
428
        handle.write('# This is a test file for /etc/hosts\n')
429
        handle.write('127.0.0.1\tlocalhost\n')
430
        handle.write('192.168.1.1 router gw\n')
431
      finally:
432
        handle.close()
433
    except:
434
      utils.RemoveFile(self.tmpname)
435
      raise
436

    
437
  def tearDown(self):
438
    utils.RemoveFile(self.tmpname)
439
    del self.tmpname
440

    
441
  def testSettingNewIp(self):
442
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
443

    
444
    self.assertFileContent(self.tmpname,
445
      "# This is a test file for /etc/hosts\n"
446
      "127.0.0.1\tlocalhost\n"
447
      "192.168.1.1 router gw\n"
448
      "1.2.3.4\tmyhost.domain.tld myhost\n")
449

    
450
  def testSettingExistingIp(self):
451
    SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
452
                     ['myhost'])
453

    
454
    self.assertFileContent(self.tmpname,
455
      "# This is a test file for /etc/hosts\n"
456
      "127.0.0.1\tlocalhost\n"
457
      "192.168.1.1\tmyhost.domain.tld myhost\n")
458

    
459
  def testSettingDuplicateName(self):
460
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
461

    
462
    self.assertFileContent(self.tmpname,
463
      "# This is a test file for /etc/hosts\n"
464
      "127.0.0.1\tlocalhost\n"
465
      "192.168.1.1 router gw\n"
466
      "1.2.3.4\tmyhost\n")
467

    
468
  def testRemovingExistingHost(self):
469
    RemoveEtcHostsEntry(self.tmpname, 'router')
470

    
471
    self.assertFileContent(self.tmpname,
472
      "# This is a test file for /etc/hosts\n"
473
      "127.0.0.1\tlocalhost\n"
474
      "192.168.1.1 gw\n")
475

    
476
  def testRemovingSingleExistingHost(self):
477
    RemoveEtcHostsEntry(self.tmpname, 'localhost')
478

    
479
    self.assertFileContent(self.tmpname,
480
      "# This is a test file for /etc/hosts\n"
481
      "192.168.1.1 router gw\n")
482

    
483
  def testRemovingNonExistingHost(self):
484
    RemoveEtcHostsEntry(self.tmpname, 'myhost')
485

    
486
    self.assertFileContent(self.tmpname,
487
      "# This is a test file for /etc/hosts\n"
488
      "127.0.0.1\tlocalhost\n"
489
      "192.168.1.1 router gw\n")
490

    
491
  def testRemovingAlias(self):
492
    RemoveEtcHostsEntry(self.tmpname, 'gw')
493

    
494
    self.assertFileContent(self.tmpname,
495
      "# This is a test file for /etc/hosts\n"
496
      "127.0.0.1\tlocalhost\n"
497
      "192.168.1.1 router\n")
498

    
499

    
500
class TestShellQuoting(unittest.TestCase):
501
  """Test case for shell quoting functions"""
502

    
503
  def testShellQuote(self):
504
    self.assertEqual(ShellQuote('abc'), "abc")
505
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
506
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
507
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
508
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
509

    
510
  def testShellQuoteArgs(self):
511
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
512
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
513
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
514

    
515

    
516
class TestTcpPing(unittest.TestCase):
517
  """Testcase for TCP version of ping - against listen(2)ing port"""
518

    
519
  def setUp(self):
520
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
521
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
522
    self.listenerport = self.listener.getsockname()[1]
523
    self.listener.listen(1)
524

    
525
  def tearDown(self):
526
    self.listener.shutdown(socket.SHUT_RDWR)
527
    del self.listener
528
    del self.listenerport
529

    
530
  def testTcpPingToLocalHostAccept(self):
531
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
532
                         constants.LOCALHOST_IP_ADDRESS,
533
                         self.listenerport,
534
                         timeout=10,
535
                         live_port_needed=True),
536
                 "failed to connect to test listener")
537

    
538

    
539
class TestTcpPingDeaf(unittest.TestCase):
540
  """Testcase for TCP version of ping - against non listen(2)ing port"""
541

    
542
  def setUp(self):
543
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
544
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
545
    self.deaflistenerport = self.deaflistener.getsockname()[1]
546

    
547
  def tearDown(self):
548
    del self.deaflistener
549
    del self.deaflistenerport
550

    
551
  def testTcpPingToLocalHostAcceptDeaf(self):
552
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
553
                        constants.LOCALHOST_IP_ADDRESS,
554
                        self.deaflistenerport,
555
                        timeout=constants.TCP_PING_TIMEOUT,
556
                        live_port_needed=True), # need successful connect(2)
557
                "successfully connected to deaf listener")
558

    
559
  def testTcpPingToLocalHostNoAccept(self):
560
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
561
                         constants.LOCALHOST_IP_ADDRESS,
562
                         self.deaflistenerport,
563
                         timeout=constants.TCP_PING_TIMEOUT,
564
                         live_port_needed=False), # ECONNREFUSED is OK
565
                 "failed to ping alive host on deaf port")
566

    
567

    
568
class TestListVisibleFiles(unittest.TestCase):
569
  """Test case for ListVisibleFiles"""
570

    
571
  def setUp(self):
572
    self.path = tempfile.mkdtemp()
573

    
574
  def tearDown(self):
575
    shutil.rmtree(self.path)
576

    
577
  def _test(self, files, expected):
578
    # Sort a copy
579
    expected = expected[:]
580
    expected.sort()
581

    
582
    for name in files:
583
      f = open(os.path.join(self.path, name), 'w')
584
      try:
585
        f.write("Test\n")
586
      finally:
587
        f.close()
588

    
589
    found = ListVisibleFiles(self.path)
590
    found.sort()
591

    
592
    self.assertEqual(found, expected)
593

    
594
  def testAllVisible(self):
595
    files = ["a", "b", "c"]
596
    expected = files
597
    self._test(files, expected)
598

    
599
  def testNoneVisible(self):
600
    files = [".a", ".b", ".c"]
601
    expected = []
602
    self._test(files, expected)
603

    
604
  def testSomeVisible(self):
605
    files = ["a", "b", ".c"]
606
    expected = ["a", "b"]
607
    self._test(files, expected)
608

    
609

    
610
class TestNewUUID(unittest.TestCase):
611
  """Test case for NewUUID"""
612

    
613
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
614
                        '[a-f0-9]{4}-[a-f0-9]{12}$')
615

    
616
  def runTest(self):
617
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
618

    
619

    
620
class TestUniqueSequence(unittest.TestCase):
621
  """Test case for UniqueSequence"""
622

    
623
  def _test(self, input, expected):
624
    self.assertEqual(utils.UniqueSequence(input), expected)
625

    
626
  def runTest(self):
627
    # Ordered input
628
    self._test([1, 2, 3], [1, 2, 3])
629
    self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
630
    self._test([1, 2, 2, 3], [1, 2, 3])
631
    self._test([1, 2, 3, 3], [1, 2, 3])
632

    
633
    # Unordered input
634
    self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
635
    self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
636

    
637
    # Strings
638
    self._test(["a", "a"], ["a"])
639
    self._test(["a", "b"], ["a", "b"])
640
    self._test(["a", "b", "a"], ["a", "b"])
641

    
642

    
643
if __name__ == '__main__':
644
  unittest.main()