Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ 9440aeab

History | View | Annotate | Download (19.2 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import md5
30
import socket
31
import shutil
32
import re
33

    
34
import ganeti
35
from ganeti import constants
36
from ganeti import utils
37
from ganeti.utils import IsProcessAlive, Lock, Unlock, RunCmd, \
38
     RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
39
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
40
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
41
     SetEtcHostsEntry, RemoveEtcHostsEntry
42
from ganeti.errors import LockError, UnitParseError
43

    
44

    
45
class TestIsProcessAlive(unittest.TestCase):
46
  """Testing case for IsProcessAlive"""
47
  def setUp(self):
48
    # create a zombie and a (hopefully) non-existing process id
49
    self.pid_zombie = os.fork()
50
    if self.pid_zombie == 0:
51
      os._exit(0)
52
    elif self.pid_zombie < 0:
53
      raise SystemError("can't fork")
54
    self.pid_non_existing = os.fork()
55
    if self.pid_non_existing == 0:
56
      os._exit(0)
57
    elif self.pid_non_existing > 0:
58
      os.waitpid(self.pid_non_existing, 0)
59
    else:
60
      raise SystemError("can't fork")
61

    
62

    
63
  def testExists(self):
64
    mypid = os.getpid()
65
    self.assert_(IsProcessAlive(mypid),
66
                 "can't find myself running")
67

    
68
  def testZombie(self):
69
    self.assert_(not IsProcessAlive(self.pid_zombie),
70
                 "zombie not detected as zombie")
71

    
72

    
73
  def testNotExisting(self):
74
    self.assert_(not IsProcessAlive(self.pid_non_existing),
75
                 "noexisting process detected")
76

    
77

    
78
class TestLocking(unittest.TestCase):
79
  """Testing case for the Lock/Unlock functions"""
80
  def clean_lock(self, name):
81
    try:
82
      ganeti.utils.Unlock("unittest")
83
    except LockError:
84
      pass
85

    
86

    
87
  def testLock(self):
88
    self.clean_lock("unittest")
89
    self.assertEqual(None, Lock("unittest"))
90

    
91

    
92
  def testUnlock(self):
93
    self.clean_lock("unittest")
94
    ganeti.utils.Lock("unittest")
95
    self.assertEqual(None, Unlock("unittest"))
96

    
97

    
98
  def testDoubleLock(self):
99
    self.clean_lock("unittest")
100
    ganeti.utils.Lock("unittest")
101
    self.assertRaises(LockError, Lock, "unittest")
102

    
103

    
104
class TestRunCmd(unittest.TestCase):
105
  """Testing case for the RunCmd function"""
106

    
107
  def setUp(self):
108
    self.magic = time.ctime() + " ganeti test"
109

    
110
  def testOk(self):
111
    """Test successful exit code"""
112
    result = RunCmd("/bin/sh -c 'exit 0'")
113
    self.assertEqual(result.exit_code, 0)
114

    
115
  def testFail(self):
116
    """Test fail exit code"""
117
    result = RunCmd("/bin/sh -c 'exit 1'")
118
    self.assertEqual(result.exit_code, 1)
119

    
120

    
121
  def testStdout(self):
122
    """Test standard output"""
123
    cmd = 'echo -n "%s"' % self.magic
124
    result = RunCmd("/bin/sh -c '%s'" % cmd)
125
    self.assertEqual(result.stdout, self.magic)
126

    
127

    
128
  def testStderr(self):
129
    """Test standard error"""
130
    cmd = 'echo -n "%s"' % self.magic
131
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
132
    self.assertEqual(result.stderr, self.magic)
133

    
134

    
135
  def testCombined(self):
136
    """Test combined output"""
137
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
138
    result = RunCmd("/bin/sh -c '%s'" % cmd)
139
    self.assertEqual(result.output, "A" + self.magic + "B" + self.magic)
140

    
141
  def testSignal(self):
142
    """Test standard error"""
143
    result = RunCmd("/bin/sh -c 'kill -15 $$'")
144
    self.assertEqual(result.signal, 15)
145

    
146
  def testListRun(self):
147
    """Test list runs"""
148
    result = RunCmd(["true"])
149
    self.assertEqual(result.signal, None)
150
    self.assertEqual(result.exit_code, 0)
151
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
152
    self.assertEqual(result.signal, None)
153
    self.assertEqual(result.exit_code, 1)
154
    result = RunCmd(["echo", "-n", self.magic])
155
    self.assertEqual(result.signal, None)
156
    self.assertEqual(result.exit_code, 0)
157
    self.assertEqual(result.stdout, self.magic)
158

    
159
  def testLang(self):
160
    """Test locale environment"""
161
    old_env = os.environ.copy()
162
    try:
163
      os.environ["LANG"] = "en_US.UTF-8"
164
      os.environ["LC_ALL"] = "en_US.UTF-8"
165
      result = RunCmd(["locale"])
166
      for line in result.output.splitlines():
167
        key, value = line.split("=", 1)
168
        # Ignore these variables, they're overridden by LC_ALL
169
        if key == "LANG" or key == "LANGUAGE":
170
          continue
171
        self.failIf(value and value != "C" and value != '"C"',
172
            "Variable %s is set to the invalid value '%s'" % (key, value))
173
    finally:
174
      os.environ = old_env
175

    
176

    
177
class TestRemoveFile(unittest.TestCase):
178
  """Test case for the RemoveFile function"""
179

    
180
  def setUp(self):
181
    """Create a temp dir and file for each case"""
182
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
183
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
184
    os.close(fd)
185

    
186
  def tearDown(self):
187
    if os.path.exists(self.tmpfile):
188
      os.unlink(self.tmpfile)
189
    os.rmdir(self.tmpdir)
190

    
191

    
192
  def testIgnoreDirs(self):
193
    """Test that RemoveFile() ignores directories"""
194
    self.assertEqual(None, RemoveFile(self.tmpdir))
195

    
196

    
197
  def testIgnoreNotExisting(self):
198
    """Test that RemoveFile() ignores non-existing files"""
199
    RemoveFile(self.tmpfile)
200
    RemoveFile(self.tmpfile)
201

    
202

    
203
  def testRemoveFile(self):
204
    """Test that RemoveFile does remove a file"""
205
    RemoveFile(self.tmpfile)
206
    if os.path.exists(self.tmpfile):
207
      self.fail("File '%s' not removed" % self.tmpfile)
208

    
209

    
210
  def testRemoveSymlink(self):
211
    """Test that RemoveFile does remove symlinks"""
212
    symlink = self.tmpdir + "/symlink"
213
    os.symlink("no-such-file", symlink)
214
    RemoveFile(symlink)
215
    if os.path.exists(symlink):
216
      self.fail("File '%s' not removed" % symlink)
217
    os.symlink(self.tmpfile, symlink)
218
    RemoveFile(symlink)
219
    if os.path.exists(symlink):
220
      self.fail("File '%s' not removed" % symlink)
221

    
222

    
223
class TestCheckdict(unittest.TestCase):
224
  """Test case for the CheckDict function"""
225

    
226
  def testAdd(self):
227
    """Test that CheckDict adds a missing key with the correct value"""
228

    
229
    tgt = {'a':1}
230
    tmpl = {'b': 2}
231
    CheckDict(tgt, tmpl)
232
    if 'b' not in tgt or tgt['b'] != 2:
233
      self.fail("Failed to update dict")
234

    
235

    
236
  def testNoUpdate(self):
237
    """Test that CheckDict does not overwrite an existing key"""
238
    tgt = {'a':1, 'b': 3}
239
    tmpl = {'b': 2}
240
    CheckDict(tgt, tmpl)
241
    self.failUnlessEqual(tgt['b'], 3)
242

    
243

    
244
class TestMatchNameComponent(unittest.TestCase):
245
  """Test case for the MatchNameComponent function"""
246

    
247
  def testEmptyList(self):
248
    """Test that there is no match against an empty list"""
249

    
250
    self.failUnlessEqual(MatchNameComponent("", []), None)
251
    self.failUnlessEqual(MatchNameComponent("test", []), None)
252

    
253
  def testSingleMatch(self):
254
    """Test that a single match is performed correctly"""
255
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
256
    for key in "test2", "test2.example", "test2.example.com":
257
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
258

    
259
  def testMultipleMatches(self):
260
    """Test that a multiple match is returned as None"""
261
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
262
    for key in "test1", "test1.example":
263
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
264

    
265

    
266
class TestFormatUnit(unittest.TestCase):
267
  """Test case for the FormatUnit function"""
268

    
269
  def testMiB(self):
270
    self.assertEqual(FormatUnit(1), '1M')
271
    self.assertEqual(FormatUnit(100), '100M')
272
    self.assertEqual(FormatUnit(1023), '1023M')
273

    
274
  def testGiB(self):
275
    self.assertEqual(FormatUnit(1024), '1.0G')
276
    self.assertEqual(FormatUnit(1536), '1.5G')
277
    self.assertEqual(FormatUnit(17133), '16.7G')
278
    self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G')
279

    
280
  def testTiB(self):
281
    self.assertEqual(FormatUnit(1024 * 1024), '1.0T')
282
    self.assertEqual(FormatUnit(5120 * 1024), '5.0T')
283
    self.assertEqual(FormatUnit(29829 * 1024), '29.1T')
284

    
285

    
286
class TestParseUnit(unittest.TestCase):
287
  """Test case for the ParseUnit function"""
288

    
289
  SCALES = (('', 1),
290
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
291
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
292
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
293

    
294
  def testRounding(self):
295
    self.assertEqual(ParseUnit('0'), 0)
296
    self.assertEqual(ParseUnit('1'), 4)
297
    self.assertEqual(ParseUnit('2'), 4)
298
    self.assertEqual(ParseUnit('3'), 4)
299

    
300
    self.assertEqual(ParseUnit('124'), 124)
301
    self.assertEqual(ParseUnit('125'), 128)
302
    self.assertEqual(ParseUnit('126'), 128)
303
    self.assertEqual(ParseUnit('127'), 128)
304
    self.assertEqual(ParseUnit('128'), 128)
305
    self.assertEqual(ParseUnit('129'), 132)
306
    self.assertEqual(ParseUnit('130'), 132)
307

    
308
  def testFloating(self):
309
    self.assertEqual(ParseUnit('0'), 0)
310
    self.assertEqual(ParseUnit('0.5'), 4)
311
    self.assertEqual(ParseUnit('1.75'), 4)
312
    self.assertEqual(ParseUnit('1.99'), 4)
313
    self.assertEqual(ParseUnit('2.00'), 4)
314
    self.assertEqual(ParseUnit('2.01'), 4)
315
    self.assertEqual(ParseUnit('3.99'), 4)
316
    self.assertEqual(ParseUnit('4.00'), 4)
317
    self.assertEqual(ParseUnit('4.01'), 8)
318
    self.assertEqual(ParseUnit('1.5G'), 1536)
319
    self.assertEqual(ParseUnit('1.8G'), 1844)
320
    self.assertEqual(ParseUnit('8.28T'), 8682212)
321

    
322
  def testSuffixes(self):
323
    for sep in ('', ' ', '   ', "\t", "\t "):
324
      for suffix, scale in TestParseUnit.SCALES:
325
        for func in (lambda x: x, str.lower, str.upper):
326
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)), 1024 * scale)
327

    
328
  def testInvalidInput(self):
329
    for sep in ('-', '_', ',', 'a'):
330
      for suffix, _ in TestParseUnit.SCALES:
331
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
332

    
333
    for suffix, _ in TestParseUnit.SCALES:
334
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
335

    
336

    
337
class TestSshKeys(unittest.TestCase):
338
  """Test case for the AddAuthorizedKey function"""
339

    
340
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
341
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
342
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
343

    
344
  # NOTE: The MD5 sums below were calculated after manually
345
  #       checking the output files.
346

    
347
  def writeTestFile(self):
348
    (fd, tmpname) = tempfile.mkstemp(prefix = 'ganeti-test')
349
    f = os.fdopen(fd, 'w')
350
    try:
351
      f.write(TestSshKeys.KEY_A)
352
      f.write("\n")
353
      f.write(TestSshKeys.KEY_B)
354
      f.write("\n")
355
    finally:
356
      f.close()
357

    
358
    return tmpname
359

    
360
  def testAddingNewKey(self):
361
    tmpname = self.writeTestFile()
362
    try:
363
      AddAuthorizedKey(tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
364

    
365
      f = open(tmpname, 'r')
366
      try:
367
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
368
                         'ccc71523108ca6e9d0343797dc3e9f16')
369
      finally:
370
        f.close()
371
    finally:
372
      os.unlink(tmpname)
373

    
374
  def testAddingAlmostButNotCompletlyTheSameKey(self):
375
    tmpname = self.writeTestFile()
376
    try:
377
      AddAuthorizedKey(tmpname,
378
          'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
379

    
380
      f = open(tmpname, 'r')
381
      try:
382
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
383
                         'f2c939d57addb5b3a6846884be896b46')
384
      finally:
385
        f.close()
386
    finally:
387
      os.unlink(tmpname)
388

    
389
  def testAddingExistingKeyWithSomeMoreSpaces(self):
390
    tmpname = self.writeTestFile()
391
    try:
392
      AddAuthorizedKey(tmpname,
393
          'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
394

    
395
      f = open(tmpname, 'r')
396
      try:
397
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
398
                         '4e612764808bd46337eb0f575415fc30')
399
      finally:
400
        f.close()
401
    finally:
402
      os.unlink(tmpname)
403

    
404
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
405
    tmpname = self.writeTestFile()
406
    try:
407
      RemoveAuthorizedKey(tmpname,
408
          'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
409

    
410
      f = open(tmpname, 'r')
411
      try:
412
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
413
                         '77516d987fca07f70e30b830b3e4f2ed')
414
      finally:
415
        f.close()
416
    finally:
417
      os.unlink(tmpname)
418

    
419
  def testRemovingNonExistingKey(self):
420
    tmpname = self.writeTestFile()
421
    try:
422
      RemoveAuthorizedKey(tmpname,
423
          'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
424

    
425
      f = open(tmpname, 'r')
426
      try:
427
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
428
                         '4e612764808bd46337eb0f575415fc30')
429
      finally:
430
        f.close()
431
    finally:
432
      os.unlink(tmpname)
433

    
434

    
435
class TestEtcHosts(unittest.TestCase):
436
  """Test functions modifying /etc/hosts"""
437

    
438
  def writeTestFile(self):
439
    (fd, tmpname) = tempfile.mkstemp(prefix = 'ganeti-test')
440
    f = os.fdopen(fd, 'w')
441
    try:
442
      f.write('# This is a test file for /etc/hosts\n')
443
      f.write('127.0.0.1\tlocalhost\n')
444
      f.write('192.168.1.1 router gw\n')
445
    finally:
446
      f.close()
447

    
448
    return tmpname
449

    
450
  def testSettingNewIp(self):
451
    tmpname = self.writeTestFile()
452
    try:
453
      SetEtcHostsEntry(tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
454

    
455
      f = open(tmpname, 'r')
456
      try:
457
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
458
                         '410c141dcafffd505f662a41713d2eab')
459
      finally:
460
        f.close()
461
    finally:
462
      os.unlink(tmpname)
463

    
464
  def testSettingExistingIp(self):
465
    tmpname = self.writeTestFile()
466
    try:
467
      SetEtcHostsEntry(tmpname, '192.168.1.1', 'myhost.domain.tld', ['myhost'])
468

    
469
      f = open(tmpname, 'r')
470
      try:
471
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
472
                         'bbf60c542dec949f3968b59522ec0d7b')
473
      finally:
474
        f.close()
475
    finally:
476
      os.unlink(tmpname)
477

    
478
  def testRemovingExistingHost(self):
479
    tmpname = self.writeTestFile()
480
    try:
481
      RemoveEtcHostsEntry(tmpname, 'router')
482

    
483
      f = open(tmpname, 'r')
484
      try:
485
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
486
                         '8b09207a23709d60240674601a3548b2')
487
      finally:
488
        f.close()
489
    finally:
490
      os.unlink(tmpname)
491

    
492
  def testRemovingSingleExistingHost(self):
493
    tmpname = self.writeTestFile()
494
    try:
495
      RemoveEtcHostsEntry(tmpname, 'localhost')
496

    
497
      f = open(tmpname, 'r')
498
      try:
499
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
500
                         'ec4e4589b56f82fdb88db5675de011b1')
501
      finally:
502
        f.close()
503
    finally:
504
      os.unlink(tmpname)
505

    
506
  def testRemovingNonExistingHost(self):
507
    tmpname = self.writeTestFile()
508
    try:
509
      RemoveEtcHostsEntry(tmpname, 'myhost')
510

    
511
      f = open(tmpname, 'r')
512
      try:
513
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
514
                         'aa005bddc6d9ee399c296953f194929e')
515
      finally:
516
        f.close()
517
    finally:
518
      os.unlink(tmpname)
519

    
520
  def testRemovingAlias(self):
521
    tmpname = self.writeTestFile()
522
    try:
523
      RemoveEtcHostsEntry(tmpname, 'gw')
524

    
525
      f = open(tmpname, 'r')
526
      try:
527
        self.assertEqual(md5.new(f.read(8192)).hexdigest(),
528
                         '156dd3980a17b2ef934e2d0bf670aca2')
529
      finally:
530
        f.close()
531
    finally:
532
      os.unlink(tmpname)
533

    
534

    
535
class TestShellQuoting(unittest.TestCase):
536
  """Test case for shell quoting functions"""
537

    
538
  def testShellQuote(self):
539
    self.assertEqual(ShellQuote('abc'), "abc")
540
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
541
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
542
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
543
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
544

    
545
  def testShellQuoteArgs(self):
546
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
547
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
548
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
549

    
550

    
551
class TestTcpPing(unittest.TestCase):
552
  """Testcase for TCP version of ping - against listen(2)ing port"""
553

    
554
  def setUp(self):
555
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
556
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
557
    self.listenerport = self.listener.getsockname()[1]
558
    self.listener.listen(1)
559

    
560
  def tearDown(self):
561
    self.listener.shutdown(socket.SHUT_RDWR)
562
    del self.listener
563
    del self.listenerport
564

    
565
  def testTcpPingToLocalHostAccept(self):
566
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
567
                         constants.LOCALHOST_IP_ADDRESS,
568
                         self.listenerport,
569
                         timeout=10,
570
                         live_port_needed=True),
571
                 "failed to connect to test listener")
572

    
573

    
574
class TestTcpPingDeaf(unittest.TestCase):
575
  """Testcase for TCP version of ping - against non listen(2)ing port"""
576

    
577
  def setUp(self):
578
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
579
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
580
    self.deaflistenerport = self.deaflistener.getsockname()[1]
581

    
582
  def tearDown(self):
583
    del self.deaflistener
584
    del self.deaflistenerport
585

    
586
  def testTcpPingToLocalHostAcceptDeaf(self):
587
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
588
                        constants.LOCALHOST_IP_ADDRESS,
589
                        self.deaflistenerport,
590
                        timeout=constants.TCP_PING_TIMEOUT,
591
                        live_port_needed=True), # need successful connect(2)
592
                "successfully connected to deaf listener")
593

    
594
  def testTcpPingToLocalHostNoAccept(self):
595
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
596
                         constants.LOCALHOST_IP_ADDRESS,
597
                         self.deaflistenerport,
598
                         timeout=constants.TCP_PING_TIMEOUT,
599
                         live_port_needed=False), # ECONNREFUSED is OK
600
                 "failed to ping alive host on deaf port")
601

    
602

    
603
class TestListVisibleFiles(unittest.TestCase):
604
  """Test case for ListVisibleFiles"""
605

    
606
  def setUp(self):
607
    self.path = tempfile.mkdtemp()
608

    
609
  def tearDown(self):
610
    shutil.rmtree(self.path)
611

    
612
  def _test(self, files, expected):
613
    # Sort a copy
614
    expected = expected[:]
615
    expected.sort()
616

    
617
    for name in files:
618
      f = open(os.path.join(self.path, name), 'w')
619
      try:
620
        f.write("Test\n")
621
      finally:
622
        f.close()
623

    
624
    found = ListVisibleFiles(self.path)
625
    found.sort()
626

    
627
    self.assertEqual(found, expected)
628

    
629
  def testAllVisible(self):
630
    files = ["a", "b", "c"]
631
    expected = files
632
    self._test(files, expected)
633

    
634
  def testNoneVisible(self):
635
    files = [".a", ".b", ".c"]
636
    expected = []
637
    self._test(files, expected)
638

    
639
  def testSomeVisible(self):
640
    files = ["a", "b", ".c"]
641
    expected = ["a", "b"]
642
    self._test(files, expected)
643

    
644

    
645
class TestNewUUID(unittest.TestCase):
646
  """Test case for NewUUID"""
647

    
648
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
649
                        '[a-f0-9]{4}-[a-f0-9]{12}$')
650

    
651
  def runTest(self):
652
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
653

    
654

    
655
if __name__ == '__main__':
656
  unittest.main()