Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ f89f17a8

History | View | Annotate | Download (18.8 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import md5
30
import socket
31
import shutil
32
import re
33

    
34
import ganeti
35
from ganeti import constants
36
from ganeti import utils
37
from ganeti.utils import IsProcessAlive, Lock, Unlock, RunCmd, \
38
     RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
39
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
40
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
41
     SetEtcHostsEntry, RemoveEtcHostsEntry
42
from ganeti.errors import LockError, UnitParseError
43

    
44

    
45
class GanetiTestCase(unittest.TestCase):
46
  def assertFileContent(self, file_name, content):
47
    """Checks the content of a file.
48

49
    """
50
    handle = open(file_name, 'r')
51
    try:
52
      self.assertEqual(handle.read(), content)
53
    finally:
54
      handle.close()
55

    
56

    
57
class TestIsProcessAlive(unittest.TestCase):
58
  """Testing case for IsProcessAlive"""
59
  def setUp(self):
60
    # create a zombie and a (hopefully) non-existing process id
61
    self.pid_zombie = os.fork()
62
    if self.pid_zombie == 0:
63
      os._exit(0)
64
    elif self.pid_zombie < 0:
65
      raise SystemError("can't fork")
66
    self.pid_non_existing = os.fork()
67
    if self.pid_non_existing == 0:
68
      os._exit(0)
69
    elif self.pid_non_existing > 0:
70
      os.waitpid(self.pid_non_existing, 0)
71
    else:
72
      raise SystemError("can't fork")
73

    
74

    
75
  def testExists(self):
76
    mypid = os.getpid()
77
    self.assert_(IsProcessAlive(mypid),
78
                 "can't find myself running")
79

    
80
  def testZombie(self):
81
    self.assert_(not IsProcessAlive(self.pid_zombie),
82
                 "zombie not detected as zombie")
83

    
84

    
85
  def testNotExisting(self):
86
    self.assert_(not IsProcessAlive(self.pid_non_existing),
87
                 "noexisting process detected")
88

    
89

    
90
class TestLocking(unittest.TestCase):
91
  """Testing case for the Lock/Unlock functions"""
92
  def clean_lock(self, name):
93
    try:
94
      ganeti.utils.Unlock("unittest")
95
    except LockError:
96
      pass
97

    
98

    
99
  def testLock(self):
100
    self.clean_lock("unittest")
101
    self.assertEqual(None, Lock("unittest"))
102

    
103

    
104
  def testUnlock(self):
105
    self.clean_lock("unittest")
106
    ganeti.utils.Lock("unittest")
107
    self.assertEqual(None, Unlock("unittest"))
108

    
109

    
110
  def testDoubleLock(self):
111
    self.clean_lock("unittest")
112
    ganeti.utils.Lock("unittest")
113
    self.assertRaises(LockError, Lock, "unittest")
114

    
115

    
116
class TestRunCmd(unittest.TestCase):
117
  """Testing case for the RunCmd function"""
118

    
119
  def setUp(self):
120
    self.magic = time.ctime() + " ganeti test"
121

    
122
  def testOk(self):
123
    """Test successful exit code"""
124
    result = RunCmd("/bin/sh -c 'exit 0'")
125
    self.assertEqual(result.exit_code, 0)
126

    
127
  def testFail(self):
128
    """Test fail exit code"""
129
    result = RunCmd("/bin/sh -c 'exit 1'")
130
    self.assertEqual(result.exit_code, 1)
131

    
132

    
133
  def testStdout(self):
134
    """Test standard output"""
135
    cmd = 'echo -n "%s"' % self.magic
136
    result = RunCmd("/bin/sh -c '%s'" % cmd)
137
    self.assertEqual(result.stdout, self.magic)
138

    
139

    
140
  def testStderr(self):
141
    """Test standard error"""
142
    cmd = 'echo -n "%s"' % self.magic
143
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
144
    self.assertEqual(result.stderr, self.magic)
145

    
146

    
147
  def testCombined(self):
148
    """Test combined output"""
149
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
150
    result = RunCmd("/bin/sh -c '%s'" % cmd)
151
    self.assertEqual(result.output, "A" + self.magic + "B" + self.magic)
152

    
153
  def testSignal(self):
154
    """Test standard error"""
155
    result = RunCmd("/bin/sh -c 'kill -15 $$'")
156
    self.assertEqual(result.signal, 15)
157

    
158
  def testListRun(self):
159
    """Test list runs"""
160
    result = RunCmd(["true"])
161
    self.assertEqual(result.signal, None)
162
    self.assertEqual(result.exit_code, 0)
163
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
164
    self.assertEqual(result.signal, None)
165
    self.assertEqual(result.exit_code, 1)
166
    result = RunCmd(["echo", "-n", self.magic])
167
    self.assertEqual(result.signal, None)
168
    self.assertEqual(result.exit_code, 0)
169
    self.assertEqual(result.stdout, self.magic)
170

    
171
  def testLang(self):
172
    """Test locale environment"""
173
    old_env = os.environ.copy()
174
    try:
175
      os.environ["LANG"] = "en_US.UTF-8"
176
      os.environ["LC_ALL"] = "en_US.UTF-8"
177
      result = RunCmd(["locale"])
178
      for line in result.output.splitlines():
179
        key, value = line.split("=", 1)
180
        # Ignore these variables, they're overridden by LC_ALL
181
        if key == "LANG" or key == "LANGUAGE":
182
          continue
183
        self.failIf(value and value != "C" and value != '"C"',
184
            "Variable %s is set to the invalid value '%s'" % (key, value))
185
    finally:
186
      os.environ = old_env
187

    
188

    
189
class TestRemoveFile(unittest.TestCase):
190
  """Test case for the RemoveFile function"""
191

    
192
  def setUp(self):
193
    """Create a temp dir and file for each case"""
194
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
195
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
196
    os.close(fd)
197

    
198
  def tearDown(self):
199
    if os.path.exists(self.tmpfile):
200
      os.unlink(self.tmpfile)
201
    os.rmdir(self.tmpdir)
202

    
203

    
204
  def testIgnoreDirs(self):
205
    """Test that RemoveFile() ignores directories"""
206
    self.assertEqual(None, RemoveFile(self.tmpdir))
207

    
208

    
209
  def testIgnoreNotExisting(self):
210
    """Test that RemoveFile() ignores non-existing files"""
211
    RemoveFile(self.tmpfile)
212
    RemoveFile(self.tmpfile)
213

    
214

    
215
  def testRemoveFile(self):
216
    """Test that RemoveFile does remove a file"""
217
    RemoveFile(self.tmpfile)
218
    if os.path.exists(self.tmpfile):
219
      self.fail("File '%s' not removed" % self.tmpfile)
220

    
221

    
222
  def testRemoveSymlink(self):
223
    """Test that RemoveFile does remove symlinks"""
224
    symlink = self.tmpdir + "/symlink"
225
    os.symlink("no-such-file", symlink)
226
    RemoveFile(symlink)
227
    if os.path.exists(symlink):
228
      self.fail("File '%s' not removed" % symlink)
229
    os.symlink(self.tmpfile, symlink)
230
    RemoveFile(symlink)
231
    if os.path.exists(symlink):
232
      self.fail("File '%s' not removed" % symlink)
233

    
234

    
235
class TestCheckdict(unittest.TestCase):
236
  """Test case for the CheckDict function"""
237

    
238
  def testAdd(self):
239
    """Test that CheckDict adds a missing key with the correct value"""
240

    
241
    tgt = {'a':1}
242
    tmpl = {'b': 2}
243
    CheckDict(tgt, tmpl)
244
    if 'b' not in tgt or tgt['b'] != 2:
245
      self.fail("Failed to update dict")
246

    
247

    
248
  def testNoUpdate(self):
249
    """Test that CheckDict does not overwrite an existing key"""
250
    tgt = {'a':1, 'b': 3}
251
    tmpl = {'b': 2}
252
    CheckDict(tgt, tmpl)
253
    self.failUnlessEqual(tgt['b'], 3)
254

    
255

    
256
class TestMatchNameComponent(unittest.TestCase):
257
  """Test case for the MatchNameComponent function"""
258

    
259
  def testEmptyList(self):
260
    """Test that there is no match against an empty list"""
261

    
262
    self.failUnlessEqual(MatchNameComponent("", []), None)
263
    self.failUnlessEqual(MatchNameComponent("test", []), None)
264

    
265
  def testSingleMatch(self):
266
    """Test that a single match is performed correctly"""
267
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
268
    for key in "test2", "test2.example", "test2.example.com":
269
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
270

    
271
  def testMultipleMatches(self):
272
    """Test that a multiple match is returned as None"""
273
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
274
    for key in "test1", "test1.example":
275
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
276

    
277

    
278
class TestFormatUnit(unittest.TestCase):
279
  """Test case for the FormatUnit function"""
280

    
281
  def testMiB(self):
282
    self.assertEqual(FormatUnit(1), '1M')
283
    self.assertEqual(FormatUnit(100), '100M')
284
    self.assertEqual(FormatUnit(1023), '1023M')
285

    
286
  def testGiB(self):
287
    self.assertEqual(FormatUnit(1024), '1.0G')
288
    self.assertEqual(FormatUnit(1536), '1.5G')
289
    self.assertEqual(FormatUnit(17133), '16.7G')
290
    self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G')
291

    
292
  def testTiB(self):
293
    self.assertEqual(FormatUnit(1024 * 1024), '1.0T')
294
    self.assertEqual(FormatUnit(5120 * 1024), '5.0T')
295
    self.assertEqual(FormatUnit(29829 * 1024), '29.1T')
296

    
297

    
298
class TestParseUnit(unittest.TestCase):
299
  """Test case for the ParseUnit function"""
300

    
301
  SCALES = (('', 1),
302
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
303
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
304
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
305

    
306
  def testRounding(self):
307
    self.assertEqual(ParseUnit('0'), 0)
308
    self.assertEqual(ParseUnit('1'), 4)
309
    self.assertEqual(ParseUnit('2'), 4)
310
    self.assertEqual(ParseUnit('3'), 4)
311

    
312
    self.assertEqual(ParseUnit('124'), 124)
313
    self.assertEqual(ParseUnit('125'), 128)
314
    self.assertEqual(ParseUnit('126'), 128)
315
    self.assertEqual(ParseUnit('127'), 128)
316
    self.assertEqual(ParseUnit('128'), 128)
317
    self.assertEqual(ParseUnit('129'), 132)
318
    self.assertEqual(ParseUnit('130'), 132)
319

    
320
  def testFloating(self):
321
    self.assertEqual(ParseUnit('0'), 0)
322
    self.assertEqual(ParseUnit('0.5'), 4)
323
    self.assertEqual(ParseUnit('1.75'), 4)
324
    self.assertEqual(ParseUnit('1.99'), 4)
325
    self.assertEqual(ParseUnit('2.00'), 4)
326
    self.assertEqual(ParseUnit('2.01'), 4)
327
    self.assertEqual(ParseUnit('3.99'), 4)
328
    self.assertEqual(ParseUnit('4.00'), 4)
329
    self.assertEqual(ParseUnit('4.01'), 8)
330
    self.assertEqual(ParseUnit('1.5G'), 1536)
331
    self.assertEqual(ParseUnit('1.8G'), 1844)
332
    self.assertEqual(ParseUnit('8.28T'), 8682212)
333

    
334
  def testSuffixes(self):
335
    for sep in ('', ' ', '   ', "\t", "\t "):
336
      for suffix, scale in TestParseUnit.SCALES:
337
        for func in (lambda x: x, str.lower, str.upper):
338
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
339
                           1024 * scale)
340

    
341
  def testInvalidInput(self):
342
    for sep in ('-', '_', ',', 'a'):
343
      for suffix, _ in TestParseUnit.SCALES:
344
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
345

    
346
    for suffix, _ in TestParseUnit.SCALES:
347
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
348

    
349

    
350
class TestSshKeys(GanetiTestCase):
351
  """Test case for the AddAuthorizedKey function"""
352

    
353
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
354
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
355
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
356

    
357
  def setUp(self):
358
    (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
359
    try:
360
      handle = os.fdopen(fd, 'w')
361
      try:
362
        handle.write("%s\n" % TestSshKeys.KEY_A)
363
        handle.write("%s\n" % TestSshKeys.KEY_B)
364
      finally:
365
        handle.close()
366
    except:
367
      utils.RemoveFile(self.tmpname)
368
      raise
369

    
370
  def tearDown(self):
371
    utils.RemoveFile(self.tmpname)
372
    del self.tmpname
373

    
374
  def testAddingNewKey(self):
375
    AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
376

    
377
    self.assertFileContent(self.tmpname,
378
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
379
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
380
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
381
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
382

    
383
  def testAddingAlmostButNotCompletelyTheSameKey(self):
384
    AddAuthorizedKey(self.tmpname,
385
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
386

    
387
    self.assertFileContent(self.tmpname,
388
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
389
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
390
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
391
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
392

    
393
  def testAddingExistingKeyWithSomeMoreSpaces(self):
394
    AddAuthorizedKey(self.tmpname,
395
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
396

    
397
    self.assertFileContent(self.tmpname,
398
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
399
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
400
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
401

    
402
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
403
    RemoveAuthorizedKey(self.tmpname,
404
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
405

    
406
    self.assertFileContent(self.tmpname,
407
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
408
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
409

    
410
  def testRemovingNonExistingKey(self):
411
    RemoveAuthorizedKey(self.tmpname,
412
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
413

    
414
    self.assertFileContent(self.tmpname,
415
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
416
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
417
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
418

    
419

    
420
class TestEtcHosts(GanetiTestCase):
421
  """Test functions modifying /etc/hosts"""
422

    
423
  def setUp(self):
424
    (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
425
    try:
426
      handle = os.fdopen(fd, 'w')
427
      try:
428
        handle.write('# This is a test file for /etc/hosts\n')
429
        handle.write('127.0.0.1\tlocalhost\n')
430
        handle.write('192.168.1.1 router gw\n')
431
      finally:
432
        handle.close()
433
    except:
434
      utils.RemoveFile(self.tmpname)
435
      raise
436

    
437
  def tearDown(self):
438
    utils.RemoveFile(self.tmpname)
439
    del self.tmpname
440

    
441
  def testSettingNewIp(self):
442
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
443

    
444
    self.assertFileContent(self.tmpname,
445
      "# This is a test file for /etc/hosts\n"
446
      "127.0.0.1\tlocalhost\n"
447
      "192.168.1.1 router gw\n"
448
      "1.2.3.4\tmyhost.domain.tld myhost\n")
449

    
450
  def testSettingExistingIp(self):
451
    SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
452
                     ['myhost'])
453

    
454
    self.assertFileContent(self.tmpname,
455
      "# This is a test file for /etc/hosts\n"
456
      "127.0.0.1\tlocalhost\n"
457
      "192.168.1.1\tmyhost.domain.tld myhost\n")
458

    
459
  def testRemovingExistingHost(self):
460
    RemoveEtcHostsEntry(self.tmpname, 'router')
461

    
462
    self.assertFileContent(self.tmpname,
463
      "# This is a test file for /etc/hosts\n"
464
      "127.0.0.1\tlocalhost\n"
465
      "192.168.1.1 gw\n")
466

    
467
  def testRemovingSingleExistingHost(self):
468
    RemoveEtcHostsEntry(self.tmpname, 'localhost')
469

    
470
    self.assertFileContent(self.tmpname,
471
      "# This is a test file for /etc/hosts\n"
472
      "192.168.1.1 router gw\n")
473

    
474
  def testRemovingNonExistingHost(self):
475
    RemoveEtcHostsEntry(self.tmpname, 'myhost')
476

    
477
    self.assertFileContent(self.tmpname,
478
      "# This is a test file for /etc/hosts\n"
479
      "127.0.0.1\tlocalhost\n"
480
      "192.168.1.1 router gw\n")
481

    
482
  def testRemovingAlias(self):
483
    RemoveEtcHostsEntry(self.tmpname, 'gw')
484

    
485
    self.assertFileContent(self.tmpname,
486
      "# This is a test file for /etc/hosts\n"
487
      "127.0.0.1\tlocalhost\n"
488
      "192.168.1.1 router\n")
489

    
490

    
491
class TestShellQuoting(unittest.TestCase):
492
  """Test case for shell quoting functions"""
493

    
494
  def testShellQuote(self):
495
    self.assertEqual(ShellQuote('abc'), "abc")
496
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
497
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
498
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
499
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
500

    
501
  def testShellQuoteArgs(self):
502
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
503
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
504
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
505

    
506

    
507
class TestTcpPing(unittest.TestCase):
508
  """Testcase for TCP version of ping - against listen(2)ing port"""
509

    
510
  def setUp(self):
511
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
512
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
513
    self.listenerport = self.listener.getsockname()[1]
514
    self.listener.listen(1)
515

    
516
  def tearDown(self):
517
    self.listener.shutdown(socket.SHUT_RDWR)
518
    del self.listener
519
    del self.listenerport
520

    
521
  def testTcpPingToLocalHostAccept(self):
522
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
523
                         constants.LOCALHOST_IP_ADDRESS,
524
                         self.listenerport,
525
                         timeout=10,
526
                         live_port_needed=True),
527
                 "failed to connect to test listener")
528

    
529

    
530
class TestTcpPingDeaf(unittest.TestCase):
531
  """Testcase for TCP version of ping - against non listen(2)ing port"""
532

    
533
  def setUp(self):
534
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
535
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
536
    self.deaflistenerport = self.deaflistener.getsockname()[1]
537

    
538
  def tearDown(self):
539
    del self.deaflistener
540
    del self.deaflistenerport
541

    
542
  def testTcpPingToLocalHostAcceptDeaf(self):
543
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
544
                        constants.LOCALHOST_IP_ADDRESS,
545
                        self.deaflistenerport,
546
                        timeout=constants.TCP_PING_TIMEOUT,
547
                        live_port_needed=True), # need successful connect(2)
548
                "successfully connected to deaf listener")
549

    
550
  def testTcpPingToLocalHostNoAccept(self):
551
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
552
                         constants.LOCALHOST_IP_ADDRESS,
553
                         self.deaflistenerport,
554
                         timeout=constants.TCP_PING_TIMEOUT,
555
                         live_port_needed=False), # ECONNREFUSED is OK
556
                 "failed to ping alive host on deaf port")
557

    
558

    
559
class TestListVisibleFiles(unittest.TestCase):
560
  """Test case for ListVisibleFiles"""
561

    
562
  def setUp(self):
563
    self.path = tempfile.mkdtemp()
564

    
565
  def tearDown(self):
566
    shutil.rmtree(self.path)
567

    
568
  def _test(self, files, expected):
569
    # Sort a copy
570
    expected = expected[:]
571
    expected.sort()
572

    
573
    for name in files:
574
      f = open(os.path.join(self.path, name), 'w')
575
      try:
576
        f.write("Test\n")
577
      finally:
578
        f.close()
579

    
580
    found = ListVisibleFiles(self.path)
581
    found.sort()
582

    
583
    self.assertEqual(found, expected)
584

    
585
  def testAllVisible(self):
586
    files = ["a", "b", "c"]
587
    expected = files
588
    self._test(files, expected)
589

    
590
  def testNoneVisible(self):
591
    files = [".a", ".b", ".c"]
592
    expected = []
593
    self._test(files, expected)
594

    
595
  def testSomeVisible(self):
596
    files = ["a", "b", ".c"]
597
    expected = ["a", "b"]
598
    self._test(files, expected)
599

    
600

    
601
class TestNewUUID(unittest.TestCase):
602
  """Test case for NewUUID"""
603

    
604
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
605
                        '[a-f0-9]{4}-[a-f0-9]{12}$')
606

    
607
  def runTest(self):
608
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
609

    
610

    
611
if __name__ == '__main__':
612
  unittest.main()