Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ b15d625f

History | View | Annotate | Download (20.9 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import os
30
import md5
31
import socket
32
import shutil
33
import re
34

    
35
import ganeti
36
import testutils
37
from ganeti import constants
38
from ganeti import utils
39
from ganeti.utils import IsProcessAlive, Lock, Unlock, RunCmd, \
40
     RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
41
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
42
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
43
     SetEtcHostsEntry, RemoveEtcHostsEntry
44
from ganeti.errors import LockError, UnitParseError
45

    
46

    
47
class TestIsProcessAlive(unittest.TestCase):
48
  """Testing case for IsProcessAlive"""
49
  def setUp(self):
50
    # create a zombie and a (hopefully) non-existing process id
51
    self.pid_zombie = os.fork()
52
    if self.pid_zombie == 0:
53
      os._exit(0)
54
    elif self.pid_zombie < 0:
55
      raise SystemError("can't fork")
56
    self.pid_non_existing = os.fork()
57
    if self.pid_non_existing == 0:
58
      os._exit(0)
59
    elif self.pid_non_existing > 0:
60
      os.waitpid(self.pid_non_existing, 0)
61
    else:
62
      raise SystemError("can't fork")
63

    
64

    
65
  def testExists(self):
66
    mypid = os.getpid()
67
    self.assert_(IsProcessAlive(mypid),
68
                 "can't find myself running")
69

    
70
  def testZombie(self):
71
    self.assert_(not IsProcessAlive(self.pid_zombie),
72
                 "zombie not detected as zombie")
73

    
74

    
75
  def testNotExisting(self):
76
    self.assert_(not IsProcessAlive(self.pid_non_existing),
77
                 "noexisting process detected")
78

    
79

    
80
class TestLocking(unittest.TestCase):
81
  """Testing case for the Lock/Unlock functions"""
82

    
83
  def setUp(self):
84
    lock_dir = tempfile.mkdtemp(prefix="ganeti.unittest.",
85
                                suffix=".locking")
86
    self.old_lock_dir = constants.LOCK_DIR
87
    constants.LOCK_DIR = lock_dir
88

    
89
  def tearDown(self):
90
    try:
91
      ganeti.utils.Unlock("unittest")
92
    except LockError:
93
      pass
94
    shutil.rmtree(constants.LOCK_DIR, ignore_errors=True)
95
    constants.LOCK_DIR = self.old_lock_dir
96

    
97
  def clean_lock(self, name):
98
    try:
99
      ganeti.utils.Unlock("unittest")
100
    except LockError:
101
      pass
102

    
103

    
104
  def testLock(self):
105
    self.clean_lock("unittest")
106
    self.assertEqual(None, Lock("unittest"))
107

    
108

    
109
  def testUnlock(self):
110
    self.clean_lock("unittest")
111
    ganeti.utils.Lock("unittest")
112
    self.assertEqual(None, Unlock("unittest"))
113

    
114
  def testDoubleLock(self):
115
    self.clean_lock("unittest")
116
    ganeti.utils.Lock("unittest")
117
    self.assertRaises(LockError, Lock, "unittest")
118

    
119

    
120
class TestRunCmd(unittest.TestCase):
121
  """Testing case for the RunCmd function"""
122

    
123
  def setUp(self):
124
    self.magic = time.ctime() + " ganeti test"
125

    
126
  def testOk(self):
127
    """Test successful exit code"""
128
    result = RunCmd("/bin/sh -c 'exit 0'")
129
    self.assertEqual(result.exit_code, 0)
130

    
131
  def testFail(self):
132
    """Test fail exit code"""
133
    result = RunCmd("/bin/sh -c 'exit 1'")
134
    self.assertEqual(result.exit_code, 1)
135

    
136

    
137
  def testStdout(self):
138
    """Test standard output"""
139
    cmd = 'echo -n "%s"' % self.magic
140
    result = RunCmd("/bin/sh -c '%s'" % cmd)
141
    self.assertEqual(result.stdout, self.magic)
142

    
143

    
144
  def testStderr(self):
145
    """Test standard error"""
146
    cmd = 'echo -n "%s"' % self.magic
147
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
148
    self.assertEqual(result.stderr, self.magic)
149

    
150

    
151
  def testCombined(self):
152
    """Test combined output"""
153
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
154
    result = RunCmd("/bin/sh -c '%s'" % cmd)
155
    self.assertEqual(result.output, "A" + self.magic + "B" + self.magic)
156

    
157
  def testSignal(self):
158
    """Test standard error"""
159
    result = RunCmd("/bin/sh -c 'kill -15 $$'")
160
    self.assertEqual(result.signal, 15)
161

    
162
  def testListRun(self):
163
    """Test list runs"""
164
    result = RunCmd(["true"])
165
    self.assertEqual(result.signal, None)
166
    self.assertEqual(result.exit_code, 0)
167
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
168
    self.assertEqual(result.signal, None)
169
    self.assertEqual(result.exit_code, 1)
170
    result = RunCmd(["echo", "-n", self.magic])
171
    self.assertEqual(result.signal, None)
172
    self.assertEqual(result.exit_code, 0)
173
    self.assertEqual(result.stdout, self.magic)
174

    
175
  def testLang(self):
176
    """Test locale environment"""
177
    old_env = os.environ.copy()
178
    try:
179
      os.environ["LANG"] = "en_US.UTF-8"
180
      os.environ["LC_ALL"] = "en_US.UTF-8"
181
      result = RunCmd(["locale"])
182
      for line in result.output.splitlines():
183
        key, value = line.split("=", 1)
184
        # Ignore these variables, they're overridden by LC_ALL
185
        if key == "LANG" or key == "LANGUAGE":
186
          continue
187
        self.failIf(value and value != "C" and value != '"C"',
188
            "Variable %s is set to the invalid value '%s'" % (key, value))
189
    finally:
190
      os.environ = old_env
191

    
192

    
193
class TestRemoveFile(unittest.TestCase):
194
  """Test case for the RemoveFile function"""
195

    
196
  def setUp(self):
197
    """Create a temp dir and file for each case"""
198
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
199
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
200
    os.close(fd)
201

    
202
  def tearDown(self):
203
    if os.path.exists(self.tmpfile):
204
      os.unlink(self.tmpfile)
205
    os.rmdir(self.tmpdir)
206

    
207

    
208
  def testIgnoreDirs(self):
209
    """Test that RemoveFile() ignores directories"""
210
    self.assertEqual(None, RemoveFile(self.tmpdir))
211

    
212

    
213
  def testIgnoreNotExisting(self):
214
    """Test that RemoveFile() ignores non-existing files"""
215
    RemoveFile(self.tmpfile)
216
    RemoveFile(self.tmpfile)
217

    
218

    
219
  def testRemoveFile(self):
220
    """Test that RemoveFile does remove a file"""
221
    RemoveFile(self.tmpfile)
222
    if os.path.exists(self.tmpfile):
223
      self.fail("File '%s' not removed" % self.tmpfile)
224

    
225

    
226
  def testRemoveSymlink(self):
227
    """Test that RemoveFile does remove symlinks"""
228
    symlink = self.tmpdir + "/symlink"
229
    os.symlink("no-such-file", symlink)
230
    RemoveFile(symlink)
231
    if os.path.exists(symlink):
232
      self.fail("File '%s' not removed" % symlink)
233
    os.symlink(self.tmpfile, symlink)
234
    RemoveFile(symlink)
235
    if os.path.exists(symlink):
236
      self.fail("File '%s' not removed" % symlink)
237

    
238

    
239
class TestCheckdict(unittest.TestCase):
240
  """Test case for the CheckDict function"""
241

    
242
  def testAdd(self):
243
    """Test that CheckDict adds a missing key with the correct value"""
244

    
245
    tgt = {'a':1}
246
    tmpl = {'b': 2}
247
    CheckDict(tgt, tmpl)
248
    if 'b' not in tgt or tgt['b'] != 2:
249
      self.fail("Failed to update dict")
250

    
251

    
252
  def testNoUpdate(self):
253
    """Test that CheckDict does not overwrite an existing key"""
254
    tgt = {'a':1, 'b': 3}
255
    tmpl = {'b': 2}
256
    CheckDict(tgt, tmpl)
257
    self.failUnlessEqual(tgt['b'], 3)
258

    
259

    
260
class TestMatchNameComponent(unittest.TestCase):
261
  """Test case for the MatchNameComponent function"""
262

    
263
  def testEmptyList(self):
264
    """Test that there is no match against an empty list"""
265

    
266
    self.failUnlessEqual(MatchNameComponent("", []), None)
267
    self.failUnlessEqual(MatchNameComponent("test", []), None)
268

    
269
  def testSingleMatch(self):
270
    """Test that a single match is performed correctly"""
271
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
272
    for key in "test2", "test2.example", "test2.example.com":
273
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
274

    
275
  def testMultipleMatches(self):
276
    """Test that a multiple match is returned as None"""
277
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
278
    for key in "test1", "test1.example":
279
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
280

    
281

    
282
class TestFormatUnit(unittest.TestCase):
283
  """Test case for the FormatUnit function"""
284

    
285
  def testMiB(self):
286
    self.assertEqual(FormatUnit(1), '1M')
287
    self.assertEqual(FormatUnit(100), '100M')
288
    self.assertEqual(FormatUnit(1023), '1023M')
289

    
290
  def testGiB(self):
291
    self.assertEqual(FormatUnit(1024), '1.0G')
292
    self.assertEqual(FormatUnit(1536), '1.5G')
293
    self.assertEqual(FormatUnit(17133), '16.7G')
294
    self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G')
295

    
296
  def testTiB(self):
297
    self.assertEqual(FormatUnit(1024 * 1024), '1.0T')
298
    self.assertEqual(FormatUnit(5120 * 1024), '5.0T')
299
    self.assertEqual(FormatUnit(29829 * 1024), '29.1T')
300

    
301

    
302
class TestParseUnit(unittest.TestCase):
303
  """Test case for the ParseUnit function"""
304

    
305
  SCALES = (('', 1),
306
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
307
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
308
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
309

    
310
  def testRounding(self):
311
    self.assertEqual(ParseUnit('0'), 0)
312
    self.assertEqual(ParseUnit('1'), 4)
313
    self.assertEqual(ParseUnit('2'), 4)
314
    self.assertEqual(ParseUnit('3'), 4)
315

    
316
    self.assertEqual(ParseUnit('124'), 124)
317
    self.assertEqual(ParseUnit('125'), 128)
318
    self.assertEqual(ParseUnit('126'), 128)
319
    self.assertEqual(ParseUnit('127'), 128)
320
    self.assertEqual(ParseUnit('128'), 128)
321
    self.assertEqual(ParseUnit('129'), 132)
322
    self.assertEqual(ParseUnit('130'), 132)
323

    
324
  def testFloating(self):
325
    self.assertEqual(ParseUnit('0'), 0)
326
    self.assertEqual(ParseUnit('0.5'), 4)
327
    self.assertEqual(ParseUnit('1.75'), 4)
328
    self.assertEqual(ParseUnit('1.99'), 4)
329
    self.assertEqual(ParseUnit('2.00'), 4)
330
    self.assertEqual(ParseUnit('2.01'), 4)
331
    self.assertEqual(ParseUnit('3.99'), 4)
332
    self.assertEqual(ParseUnit('4.00'), 4)
333
    self.assertEqual(ParseUnit('4.01'), 8)
334
    self.assertEqual(ParseUnit('1.5G'), 1536)
335
    self.assertEqual(ParseUnit('1.8G'), 1844)
336
    self.assertEqual(ParseUnit('8.28T'), 8682212)
337

    
338
  def testSuffixes(self):
339
    for sep in ('', ' ', '   ', "\t", "\t "):
340
      for suffix, scale in TestParseUnit.SCALES:
341
        for func in (lambda x: x, str.lower, str.upper):
342
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
343
                           1024 * scale)
344

    
345
  def testInvalidInput(self):
346
    for sep in ('-', '_', ',', 'a'):
347
      for suffix, _ in TestParseUnit.SCALES:
348
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
349

    
350
    for suffix, _ in TestParseUnit.SCALES:
351
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
352

    
353

    
354
class TestSshKeys(testutils.GanetiTestCase):
355
  """Test case for the AddAuthorizedKey function"""
356

    
357
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
358
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
359
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
360

    
361
  def setUp(self):
362
    (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
363
    try:
364
      handle = os.fdopen(fd, 'w')
365
      try:
366
        handle.write("%s\n" % TestSshKeys.KEY_A)
367
        handle.write("%s\n" % TestSshKeys.KEY_B)
368
      finally:
369
        handle.close()
370
    except:
371
      utils.RemoveFile(self.tmpname)
372
      raise
373

    
374
  def tearDown(self):
375
    utils.RemoveFile(self.tmpname)
376
    del self.tmpname
377

    
378
  def testAddingNewKey(self):
379
    AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
380

    
381
    self.assertFileContent(self.tmpname,
382
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
383
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
384
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
385
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
386

    
387
  def testAddingAlmostButNotCompletelyTheSameKey(self):
388
    AddAuthorizedKey(self.tmpname,
389
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
390

    
391
    self.assertFileContent(self.tmpname,
392
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
393
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
394
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
395
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
396

    
397
  def testAddingExistingKeyWithSomeMoreSpaces(self):
398
    AddAuthorizedKey(self.tmpname,
399
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
400

    
401
    self.assertFileContent(self.tmpname,
402
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
403
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
404
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
405

    
406
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
407
    RemoveAuthorizedKey(self.tmpname,
408
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
409

    
410
    self.assertFileContent(self.tmpname,
411
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
412
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
413

    
414
  def testRemovingNonExistingKey(self):
415
    RemoveAuthorizedKey(self.tmpname,
416
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
417

    
418
    self.assertFileContent(self.tmpname,
419
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
420
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
421
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
422

    
423

    
424
class TestEtcHosts(testutils.GanetiTestCase):
425
  """Test functions modifying /etc/hosts"""
426

    
427
  def setUp(self):
428
    (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
429
    try:
430
      handle = os.fdopen(fd, 'w')
431
      try:
432
        handle.write('# This is a test file for /etc/hosts\n')
433
        handle.write('127.0.0.1\tlocalhost\n')
434
        handle.write('192.168.1.1 router gw\n')
435
      finally:
436
        handle.close()
437
    except:
438
      utils.RemoveFile(self.tmpname)
439
      raise
440

    
441
  def tearDown(self):
442
    utils.RemoveFile(self.tmpname)
443
    del self.tmpname
444

    
445
  def testSettingNewIp(self):
446
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
447

    
448
    self.assertFileContent(self.tmpname,
449
      "# This is a test file for /etc/hosts\n"
450
      "127.0.0.1\tlocalhost\n"
451
      "192.168.1.1 router gw\n"
452
      "1.2.3.4\tmyhost.domain.tld myhost\n")
453

    
454
  def testSettingExistingIp(self):
455
    SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
456
                     ['myhost'])
457

    
458
    self.assertFileContent(self.tmpname,
459
      "# This is a test file for /etc/hosts\n"
460
      "127.0.0.1\tlocalhost\n"
461
      "192.168.1.1\tmyhost.domain.tld myhost\n")
462

    
463
  def testSettingDuplicateName(self):
464
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
465

    
466
    self.assertFileContent(self.tmpname,
467
      "# This is a test file for /etc/hosts\n"
468
      "127.0.0.1\tlocalhost\n"
469
      "192.168.1.1 router gw\n"
470
      "1.2.3.4\tmyhost\n")
471

    
472
  def testRemovingExistingHost(self):
473
    RemoveEtcHostsEntry(self.tmpname, 'router')
474

    
475
    self.assertFileContent(self.tmpname,
476
      "# This is a test file for /etc/hosts\n"
477
      "127.0.0.1\tlocalhost\n"
478
      "192.168.1.1 gw\n")
479

    
480
  def testRemovingSingleExistingHost(self):
481
    RemoveEtcHostsEntry(self.tmpname, 'localhost')
482

    
483
    self.assertFileContent(self.tmpname,
484
      "# This is a test file for /etc/hosts\n"
485
      "192.168.1.1 router gw\n")
486

    
487
  def testRemovingNonExistingHost(self):
488
    RemoveEtcHostsEntry(self.tmpname, 'myhost')
489

    
490
    self.assertFileContent(self.tmpname,
491
      "# This is a test file for /etc/hosts\n"
492
      "127.0.0.1\tlocalhost\n"
493
      "192.168.1.1 router gw\n")
494

    
495
  def testRemovingAlias(self):
496
    RemoveEtcHostsEntry(self.tmpname, 'gw')
497

    
498
    self.assertFileContent(self.tmpname,
499
      "# This is a test file for /etc/hosts\n"
500
      "127.0.0.1\tlocalhost\n"
501
      "192.168.1.1 router\n")
502

    
503

    
504
class TestShellQuoting(unittest.TestCase):
505
  """Test case for shell quoting functions"""
506

    
507
  def testShellQuote(self):
508
    self.assertEqual(ShellQuote('abc'), "abc")
509
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
510
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
511
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
512
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
513

    
514
  def testShellQuoteArgs(self):
515
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
516
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
517
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
518

    
519

    
520
class TestTcpPing(unittest.TestCase):
521
  """Testcase for TCP version of ping - against listen(2)ing port"""
522

    
523
  def setUp(self):
524
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
525
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
526
    self.listenerport = self.listener.getsockname()[1]
527
    self.listener.listen(1)
528

    
529
  def tearDown(self):
530
    self.listener.shutdown(socket.SHUT_RDWR)
531
    del self.listener
532
    del self.listenerport
533

    
534
  def testTcpPingToLocalHostAccept(self):
535
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
536
                         self.listenerport,
537
                         timeout=10,
538
                         live_port_needed=True,
539
                         source=constants.LOCALHOST_IP_ADDRESS,
540
                         ),
541
                 "failed to connect to test listener")
542

    
543
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
544
                         self.listenerport,
545
                         timeout=10,
546
                         live_port_needed=True,
547
                         ),
548
                 "failed to connect to test listener (no source)")
549

    
550

    
551
class TestTcpPingDeaf(unittest.TestCase):
552
  """Testcase for TCP version of ping - against non listen(2)ing port"""
553

    
554
  def setUp(self):
555
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
556
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
557
    self.deaflistenerport = self.deaflistener.getsockname()[1]
558

    
559
  def tearDown(self):
560
    del self.deaflistener
561
    del self.deaflistenerport
562

    
563
  def testTcpPingToLocalHostAcceptDeaf(self):
564
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
565
                        self.deaflistenerport,
566
                        timeout=constants.TCP_PING_TIMEOUT,
567
                        live_port_needed=True,
568
                        source=constants.LOCALHOST_IP_ADDRESS,
569
                        ), # need successful connect(2)
570
                "successfully connected to deaf listener")
571

    
572
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
573
                        self.deaflistenerport,
574
                        timeout=constants.TCP_PING_TIMEOUT,
575
                        live_port_needed=True,
576
                        ), # need successful connect(2)
577
                "successfully connected to deaf listener (no source addr)")
578

    
579
  def testTcpPingToLocalHostNoAccept(self):
580
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
581
                         self.deaflistenerport,
582
                         timeout=constants.TCP_PING_TIMEOUT,
583
                         live_port_needed=False,
584
                         source=constants.LOCALHOST_IP_ADDRESS,
585
                         ), # ECONNREFUSED is OK
586
                 "failed to ping alive host on deaf port")
587

    
588
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
589
                         self.deaflistenerport,
590
                         timeout=constants.TCP_PING_TIMEOUT,
591
                         live_port_needed=False,
592
                         ), # ECONNREFUSED is OK
593
                 "failed to ping alive host on deaf port (no source addr)")
594

    
595

    
596
class TestListVisibleFiles(unittest.TestCase):
597
  """Test case for ListVisibleFiles"""
598

    
599
  def setUp(self):
600
    self.path = tempfile.mkdtemp()
601

    
602
  def tearDown(self):
603
    shutil.rmtree(self.path)
604

    
605
  def _test(self, files, expected):
606
    # Sort a copy
607
    expected = expected[:]
608
    expected.sort()
609

    
610
    for name in files:
611
      f = open(os.path.join(self.path, name), 'w')
612
      try:
613
        f.write("Test\n")
614
      finally:
615
        f.close()
616

    
617
    found = ListVisibleFiles(self.path)
618
    found.sort()
619

    
620
    self.assertEqual(found, expected)
621

    
622
  def testAllVisible(self):
623
    files = ["a", "b", "c"]
624
    expected = files
625
    self._test(files, expected)
626

    
627
  def testNoneVisible(self):
628
    files = [".a", ".b", ".c"]
629
    expected = []
630
    self._test(files, expected)
631

    
632
  def testSomeVisible(self):
633
    files = ["a", "b", ".c"]
634
    expected = ["a", "b"]
635
    self._test(files, expected)
636

    
637

    
638
class TestNewUUID(unittest.TestCase):
639
  """Test case for NewUUID"""
640

    
641
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
642
                        '[a-f0-9]{4}-[a-f0-9]{12}$')
643

    
644
  def runTest(self):
645
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
646

    
647

    
648
class TestUniqueSequence(unittest.TestCase):
649
  """Test case for UniqueSequence"""
650

    
651
  def _test(self, input, expected):
652
    self.assertEqual(utils.UniqueSequence(input), expected)
653

    
654
  def runTest(self):
655
    # Ordered input
656
    self._test([1, 2, 3], [1, 2, 3])
657
    self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
658
    self._test([1, 2, 2, 3], [1, 2, 3])
659
    self._test([1, 2, 3, 3], [1, 2, 3])
660

    
661
    # Unordered input
662
    self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
663
    self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
664

    
665
    # Strings
666
    self._test(["a", "a"], ["a"])
667
    self._test(["a", "b"], ["a", "b"])
668
    self._test(["a", "b", "a"], ["a", "b"])
669

    
670

    
671
if __name__ == '__main__':
672
  unittest.main()