Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ a0638838

History | View | Annotate | Download (21.5 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import os
30
import md5
31
import signal
32
import socket
33
import shutil
34
import re
35
import tempfile
36

    
37
import ganeti
38
import testutils
39
from ganeti import constants
40
from ganeti import utils
41
from ganeti.utils import IsProcessAlive, RunCmd, \
42
     RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
43
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
44
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
45
     SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree
46
from ganeti.errors import LockError, UnitParseError
47

    
48
def _ChildHandler(signal, stack):
49
  global _ChildFlag
50
  _ChildFlag = True
51

    
52
class TestIsProcessAlive(unittest.TestCase):
53
  """Testing case for IsProcessAlive"""
54

    
55
  def setUp(self):
56
    global _ChildFlag
57
    # create a (most probably) non-existing process-id
58
    self.pid_non_existing = os.fork()
59
    if self.pid_non_existing == 0:
60
      os._exit(0)
61
    elif self.pid_non_existing > 0:
62
      os.waitpid(self.pid_non_existing, 0)
63
    else:
64
      raise SystemError("can't fork")
65
    _ChildFlag = False
66
    # Use _ChildHandler for SIGCHLD
67
    self.chldOrig = signal.signal(signal.SIGCHLD, _ChildHandler)
68
    # create a zombie
69
    self.pid_zombie = os.fork()
70
    if self.pid_zombie == 0:
71
      os._exit(0)
72
    elif self.pid_zombie < 0:
73
      raise SystemError("can't fork")
74

    
75
  def tearDown(self):
76
    signal.signal(signal.SIGCHLD, self.chldOrig)
77

    
78
  def testExists(self):
79
    mypid = os.getpid()
80
    self.assert_(IsProcessAlive(mypid),
81
                 "can't find myself running")
82

    
83
  def testZombie(self):
84
    global _ChildFlag
85
    timeout = 10
86

    
87
    while not _ChildFlag:
88
      if timeout >= 0:
89
        time.sleep(0.2)
90
        timeout -= 0.2
91
      else:
92
        self.fail("timed out waiting for child's signal")
93
        break # not executed...
94

    
95
    self.assert_(not IsProcessAlive(self.pid_zombie),
96
                 "zombie not detected as zombie")
97

    
98
  def testNotExisting(self):
99
    self.assert_(not IsProcessAlive(self.pid_non_existing),
100
                 "noexisting process detected")
101

    
102
class TestPidFileFunctions(unittest.TestCase):
103
  """Tests for WritePidFile, RemovePidFile and IsPidFileAlive"""
104

    
105
  def setUp(self):
106
    self.dir = tempfile.mkdtemp()
107
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
108
    utils._DaemonPidFileName = self.f_dpn
109

    
110
  def testPidFileFunctions(self):
111
    utils.WritePidFile('test')
112
    self.assert_(os.path.exists(self.f_dpn('test')))
113
    self.assert_(utils.IsPidFileAlive(self.f_dpn('test')))
114
    utils.RemovePidFile('test')
115
    self.assert_(not os.path.exists(self.f_dpn('test')))
116

    
117
  def tearDown(self):
118
    os.rmdir(self.dir)
119

    
120

    
121
class TestRunCmd(unittest.TestCase):
122
  """Testing case for the RunCmd function"""
123

    
124
  def setUp(self):
125
    self.magic = time.ctime() + " ganeti test"
126

    
127
  def testOk(self):
128
    """Test successful exit code"""
129
    result = RunCmd("/bin/sh -c 'exit 0'")
130
    self.assertEqual(result.exit_code, 0)
131

    
132
  def testFail(self):
133
    """Test fail exit code"""
134
    result = RunCmd("/bin/sh -c 'exit 1'")
135
    self.assertEqual(result.exit_code, 1)
136

    
137

    
138
  def testStdout(self):
139
    """Test standard output"""
140
    cmd = 'echo -n "%s"' % self.magic
141
    result = RunCmd("/bin/sh -c '%s'" % cmd)
142
    self.assertEqual(result.stdout, self.magic)
143

    
144

    
145
  def testStderr(self):
146
    """Test standard error"""
147
    cmd = 'echo -n "%s"' % self.magic
148
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
149
    self.assertEqual(result.stderr, self.magic)
150

    
151

    
152
  def testCombined(self):
153
    """Test combined output"""
154
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
155
    result = RunCmd("/bin/sh -c '%s'" % cmd)
156
    self.assertEqual(result.output, "A" + self.magic + "B" + self.magic)
157

    
158
  def testSignal(self):
159
    """Test signal"""
160
    result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
161
    self.assertEqual(result.signal, 15)
162

    
163
  def testListRun(self):
164
    """Test list runs"""
165
    result = RunCmd(["true"])
166
    self.assertEqual(result.signal, None)
167
    self.assertEqual(result.exit_code, 0)
168
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
169
    self.assertEqual(result.signal, None)
170
    self.assertEqual(result.exit_code, 1)
171
    result = RunCmd(["echo", "-n", self.magic])
172
    self.assertEqual(result.signal, None)
173
    self.assertEqual(result.exit_code, 0)
174
    self.assertEqual(result.stdout, self.magic)
175

    
176
  def testLang(self):
177
    """Test locale environment"""
178
    old_env = os.environ.copy()
179
    try:
180
      os.environ["LANG"] = "en_US.UTF-8"
181
      os.environ["LC_ALL"] = "en_US.UTF-8"
182
      result = RunCmd(["locale"])
183
      for line in result.output.splitlines():
184
        key, value = line.split("=", 1)
185
        # Ignore these variables, they're overridden by LC_ALL
186
        if key == "LANG" or key == "LANGUAGE":
187
          continue
188
        self.failIf(value and value != "C" and value != '"C"',
189
            "Variable %s is set to the invalid value '%s'" % (key, value))
190
    finally:
191
      os.environ = old_env
192

    
193

    
194
class TestRemoveFile(unittest.TestCase):
195
  """Test case for the RemoveFile function"""
196

    
197
  def setUp(self):
198
    """Create a temp dir and file for each case"""
199
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
200
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
201
    os.close(fd)
202

    
203
  def tearDown(self):
204
    if os.path.exists(self.tmpfile):
205
      os.unlink(self.tmpfile)
206
    os.rmdir(self.tmpdir)
207

    
208

    
209
  def testIgnoreDirs(self):
210
    """Test that RemoveFile() ignores directories"""
211
    self.assertEqual(None, RemoveFile(self.tmpdir))
212

    
213

    
214
  def testIgnoreNotExisting(self):
215
    """Test that RemoveFile() ignores non-existing files"""
216
    RemoveFile(self.tmpfile)
217
    RemoveFile(self.tmpfile)
218

    
219

    
220
  def testRemoveFile(self):
221
    """Test that RemoveFile does remove a file"""
222
    RemoveFile(self.tmpfile)
223
    if os.path.exists(self.tmpfile):
224
      self.fail("File '%s' not removed" % self.tmpfile)
225

    
226

    
227
  def testRemoveSymlink(self):
228
    """Test that RemoveFile does remove symlinks"""
229
    symlink = self.tmpdir + "/symlink"
230
    os.symlink("no-such-file", symlink)
231
    RemoveFile(symlink)
232
    if os.path.exists(symlink):
233
      self.fail("File '%s' not removed" % symlink)
234
    os.symlink(self.tmpfile, symlink)
235
    RemoveFile(symlink)
236
    if os.path.exists(symlink):
237
      self.fail("File '%s' not removed" % symlink)
238

    
239

    
240
class TestCheckdict(unittest.TestCase):
241
  """Test case for the CheckDict function"""
242

    
243
  def testAdd(self):
244
    """Test that CheckDict adds a missing key with the correct value"""
245

    
246
    tgt = {'a':1}
247
    tmpl = {'b': 2}
248
    CheckDict(tgt, tmpl)
249
    if 'b' not in tgt or tgt['b'] != 2:
250
      self.fail("Failed to update dict")
251

    
252

    
253
  def testNoUpdate(self):
254
    """Test that CheckDict does not overwrite an existing key"""
255
    tgt = {'a':1, 'b': 3}
256
    tmpl = {'b': 2}
257
    CheckDict(tgt, tmpl)
258
    self.failUnlessEqual(tgt['b'], 3)
259

    
260

    
261
class TestMatchNameComponent(unittest.TestCase):
262
  """Test case for the MatchNameComponent function"""
263

    
264
  def testEmptyList(self):
265
    """Test that there is no match against an empty list"""
266

    
267
    self.failUnlessEqual(MatchNameComponent("", []), None)
268
    self.failUnlessEqual(MatchNameComponent("test", []), None)
269

    
270
  def testSingleMatch(self):
271
    """Test that a single match is performed correctly"""
272
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
273
    for key in "test2", "test2.example", "test2.example.com":
274
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
275

    
276
  def testMultipleMatches(self):
277
    """Test that a multiple match is returned as None"""
278
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
279
    for key in "test1", "test1.example":
280
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
281

    
282

    
283
class TestFormatUnit(unittest.TestCase):
284
  """Test case for the FormatUnit function"""
285

    
286
  def testMiB(self):
287
    self.assertEqual(FormatUnit(1), '1M')
288
    self.assertEqual(FormatUnit(100), '100M')
289
    self.assertEqual(FormatUnit(1023), '1023M')
290

    
291
  def testGiB(self):
292
    self.assertEqual(FormatUnit(1024), '1.0G')
293
    self.assertEqual(FormatUnit(1536), '1.5G')
294
    self.assertEqual(FormatUnit(17133), '16.7G')
295
    self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G')
296

    
297
  def testTiB(self):
298
    self.assertEqual(FormatUnit(1024 * 1024), '1.0T')
299
    self.assertEqual(FormatUnit(5120 * 1024), '5.0T')
300
    self.assertEqual(FormatUnit(29829 * 1024), '29.1T')
301

    
302

    
303
class TestParseUnit(unittest.TestCase):
304
  """Test case for the ParseUnit function"""
305

    
306
  SCALES = (('', 1),
307
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
308
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
309
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
310

    
311
  def testRounding(self):
312
    self.assertEqual(ParseUnit('0'), 0)
313
    self.assertEqual(ParseUnit('1'), 4)
314
    self.assertEqual(ParseUnit('2'), 4)
315
    self.assertEqual(ParseUnit('3'), 4)
316

    
317
    self.assertEqual(ParseUnit('124'), 124)
318
    self.assertEqual(ParseUnit('125'), 128)
319
    self.assertEqual(ParseUnit('126'), 128)
320
    self.assertEqual(ParseUnit('127'), 128)
321
    self.assertEqual(ParseUnit('128'), 128)
322
    self.assertEqual(ParseUnit('129'), 132)
323
    self.assertEqual(ParseUnit('130'), 132)
324

    
325
  def testFloating(self):
326
    self.assertEqual(ParseUnit('0'), 0)
327
    self.assertEqual(ParseUnit('0.5'), 4)
328
    self.assertEqual(ParseUnit('1.75'), 4)
329
    self.assertEqual(ParseUnit('1.99'), 4)
330
    self.assertEqual(ParseUnit('2.00'), 4)
331
    self.assertEqual(ParseUnit('2.01'), 4)
332
    self.assertEqual(ParseUnit('3.99'), 4)
333
    self.assertEqual(ParseUnit('4.00'), 4)
334
    self.assertEqual(ParseUnit('4.01'), 8)
335
    self.assertEqual(ParseUnit('1.5G'), 1536)
336
    self.assertEqual(ParseUnit('1.8G'), 1844)
337
    self.assertEqual(ParseUnit('8.28T'), 8682212)
338

    
339
  def testSuffixes(self):
340
    for sep in ('', ' ', '   ', "\t", "\t "):
341
      for suffix, scale in TestParseUnit.SCALES:
342
        for func in (lambda x: x, str.lower, str.upper):
343
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
344
                           1024 * scale)
345

    
346
  def testInvalidInput(self):
347
    for sep in ('-', '_', ',', 'a'):
348
      for suffix, _ in TestParseUnit.SCALES:
349
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
350

    
351
    for suffix, _ in TestParseUnit.SCALES:
352
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
353

    
354

    
355
class TestSshKeys(testutils.GanetiTestCase):
356
  """Test case for the AddAuthorizedKey function"""
357

    
358
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
359
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
360
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
361

    
362
  def setUp(self):
363
    (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
364
    try:
365
      handle = os.fdopen(fd, 'w')
366
      try:
367
        handle.write("%s\n" % TestSshKeys.KEY_A)
368
        handle.write("%s\n" % TestSshKeys.KEY_B)
369
      finally:
370
        handle.close()
371
    except:
372
      utils.RemoveFile(self.tmpname)
373
      raise
374

    
375
  def tearDown(self):
376
    utils.RemoveFile(self.tmpname)
377
    del self.tmpname
378

    
379
  def testAddingNewKey(self):
380
    AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
381

    
382
    self.assertFileContent(self.tmpname,
383
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
384
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
385
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
386
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
387

    
388
  def testAddingAlmostButNotCompletelyTheSameKey(self):
389
    AddAuthorizedKey(self.tmpname,
390
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
391

    
392
    self.assertFileContent(self.tmpname,
393
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
394
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
395
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
396
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
397

    
398
  def testAddingExistingKeyWithSomeMoreSpaces(self):
399
    AddAuthorizedKey(self.tmpname,
400
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
401

    
402
    self.assertFileContent(self.tmpname,
403
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
404
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
405
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
406

    
407
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
408
    RemoveAuthorizedKey(self.tmpname,
409
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
410

    
411
    self.assertFileContent(self.tmpname,
412
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
413
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
414

    
415
  def testRemovingNonExistingKey(self):
416
    RemoveAuthorizedKey(self.tmpname,
417
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
418

    
419
    self.assertFileContent(self.tmpname,
420
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
421
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
422
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
423

    
424

    
425
class TestEtcHosts(testutils.GanetiTestCase):
426
  """Test functions modifying /etc/hosts"""
427

    
428
  def setUp(self):
429
    (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
430
    try:
431
      handle = os.fdopen(fd, 'w')
432
      try:
433
        handle.write('# This is a test file for /etc/hosts\n')
434
        handle.write('127.0.0.1\tlocalhost\n')
435
        handle.write('192.168.1.1 router gw\n')
436
      finally:
437
        handle.close()
438
    except:
439
      utils.RemoveFile(self.tmpname)
440
      raise
441

    
442
  def tearDown(self):
443
    utils.RemoveFile(self.tmpname)
444
    del self.tmpname
445

    
446
  def testSettingNewIp(self):
447
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
448

    
449
    self.assertFileContent(self.tmpname,
450
      "# This is a test file for /etc/hosts\n"
451
      "127.0.0.1\tlocalhost\n"
452
      "192.168.1.1 router gw\n"
453
      "1.2.3.4\tmyhost.domain.tld myhost\n")
454

    
455
  def testSettingExistingIp(self):
456
    SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
457
                     ['myhost'])
458

    
459
    self.assertFileContent(self.tmpname,
460
      "# This is a test file for /etc/hosts\n"
461
      "127.0.0.1\tlocalhost\n"
462
      "192.168.1.1\tmyhost.domain.tld myhost\n")
463

    
464
  def testSettingDuplicateName(self):
465
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
466

    
467
    self.assertFileContent(self.tmpname,
468
      "# This is a test file for /etc/hosts\n"
469
      "127.0.0.1\tlocalhost\n"
470
      "192.168.1.1 router gw\n"
471
      "1.2.3.4\tmyhost\n")
472

    
473
  def testRemovingExistingHost(self):
474
    RemoveEtcHostsEntry(self.tmpname, 'router')
475

    
476
    self.assertFileContent(self.tmpname,
477
      "# This is a test file for /etc/hosts\n"
478
      "127.0.0.1\tlocalhost\n"
479
      "192.168.1.1 gw\n")
480

    
481
  def testRemovingSingleExistingHost(self):
482
    RemoveEtcHostsEntry(self.tmpname, 'localhost')
483

    
484
    self.assertFileContent(self.tmpname,
485
      "# This is a test file for /etc/hosts\n"
486
      "192.168.1.1 router gw\n")
487

    
488
  def testRemovingNonExistingHost(self):
489
    RemoveEtcHostsEntry(self.tmpname, 'myhost')
490

    
491
    self.assertFileContent(self.tmpname,
492
      "# This is a test file for /etc/hosts\n"
493
      "127.0.0.1\tlocalhost\n"
494
      "192.168.1.1 router gw\n")
495

    
496
  def testRemovingAlias(self):
497
    RemoveEtcHostsEntry(self.tmpname, 'gw')
498

    
499
    self.assertFileContent(self.tmpname,
500
      "# This is a test file for /etc/hosts\n"
501
      "127.0.0.1\tlocalhost\n"
502
      "192.168.1.1 router\n")
503

    
504

    
505
class TestShellQuoting(unittest.TestCase):
506
  """Test case for shell quoting functions"""
507

    
508
  def testShellQuote(self):
509
    self.assertEqual(ShellQuote('abc'), "abc")
510
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
511
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
512
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
513
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
514

    
515
  def testShellQuoteArgs(self):
516
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
517
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
518
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
519

    
520

    
521
class TestTcpPing(unittest.TestCase):
522
  """Testcase for TCP version of ping - against listen(2)ing port"""
523

    
524
  def setUp(self):
525
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
526
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
527
    self.listenerport = self.listener.getsockname()[1]
528
    self.listener.listen(1)
529

    
530
  def tearDown(self):
531
    self.listener.shutdown(socket.SHUT_RDWR)
532
    del self.listener
533
    del self.listenerport
534

    
535
  def testTcpPingToLocalHostAccept(self):
536
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
537
                         self.listenerport,
538
                         timeout=10,
539
                         live_port_needed=True,
540
                         source=constants.LOCALHOST_IP_ADDRESS,
541
                         ),
542
                 "failed to connect to test listener")
543

    
544
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
545
                         self.listenerport,
546
                         timeout=10,
547
                         live_port_needed=True,
548
                         ),
549
                 "failed to connect to test listener (no source)")
550

    
551

    
552
class TestTcpPingDeaf(unittest.TestCase):
553
  """Testcase for TCP version of ping - against non listen(2)ing port"""
554

    
555
  def setUp(self):
556
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
557
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
558
    self.deaflistenerport = self.deaflistener.getsockname()[1]
559

    
560
  def tearDown(self):
561
    del self.deaflistener
562
    del self.deaflistenerport
563

    
564
  def testTcpPingToLocalHostAcceptDeaf(self):
565
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
566
                        self.deaflistenerport,
567
                        timeout=constants.TCP_PING_TIMEOUT,
568
                        live_port_needed=True,
569
                        source=constants.LOCALHOST_IP_ADDRESS,
570
                        ), # need successful connect(2)
571
                "successfully connected to deaf listener")
572

    
573
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
574
                        self.deaflistenerport,
575
                        timeout=constants.TCP_PING_TIMEOUT,
576
                        live_port_needed=True,
577
                        ), # need successful connect(2)
578
                "successfully connected to deaf listener (no source addr)")
579

    
580
  def testTcpPingToLocalHostNoAccept(self):
581
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
582
                         self.deaflistenerport,
583
                         timeout=constants.TCP_PING_TIMEOUT,
584
                         live_port_needed=False,
585
                         source=constants.LOCALHOST_IP_ADDRESS,
586
                         ), # ECONNREFUSED is OK
587
                 "failed to ping alive host on deaf port")
588

    
589
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
590
                         self.deaflistenerport,
591
                         timeout=constants.TCP_PING_TIMEOUT,
592
                         live_port_needed=False,
593
                         ), # ECONNREFUSED is OK
594
                 "failed to ping alive host on deaf port (no source addr)")
595

    
596

    
597
class TestListVisibleFiles(unittest.TestCase):
598
  """Test case for ListVisibleFiles"""
599

    
600
  def setUp(self):
601
    self.path = tempfile.mkdtemp()
602

    
603
  def tearDown(self):
604
    shutil.rmtree(self.path)
605

    
606
  def _test(self, files, expected):
607
    # Sort a copy
608
    expected = expected[:]
609
    expected.sort()
610

    
611
    for name in files:
612
      f = open(os.path.join(self.path, name), 'w')
613
      try:
614
        f.write("Test\n")
615
      finally:
616
        f.close()
617

    
618
    found = ListVisibleFiles(self.path)
619
    found.sort()
620

    
621
    self.assertEqual(found, expected)
622

    
623
  def testAllVisible(self):
624
    files = ["a", "b", "c"]
625
    expected = files
626
    self._test(files, expected)
627

    
628
  def testNoneVisible(self):
629
    files = [".a", ".b", ".c"]
630
    expected = []
631
    self._test(files, expected)
632

    
633
  def testSomeVisible(self):
634
    files = ["a", "b", ".c"]
635
    expected = ["a", "b"]
636
    self._test(files, expected)
637

    
638

    
639
class TestNewUUID(unittest.TestCase):
640
  """Test case for NewUUID"""
641

    
642
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
643
                        '[a-f0-9]{4}-[a-f0-9]{12}$')
644

    
645
  def runTest(self):
646
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
647

    
648

    
649
class TestUniqueSequence(unittest.TestCase):
650
  """Test case for UniqueSequence"""
651

    
652
  def _test(self, input, expected):
653
    self.assertEqual(utils.UniqueSequence(input), expected)
654

    
655
  def runTest(self):
656
    # Ordered input
657
    self._test([1, 2, 3], [1, 2, 3])
658
    self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
659
    self._test([1, 2, 2, 3], [1, 2, 3])
660
    self._test([1, 2, 3, 3], [1, 2, 3])
661

    
662
    # Unordered input
663
    self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
664
    self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
665

    
666
    # Strings
667
    self._test(["a", "a"], ["a"])
668
    self._test(["a", "b"], ["a", "b"])
669
    self._test(["a", "b", "a"], ["a", "b"])
670

    
671
class TestFirstFree(unittest.TestCase):
672
  """Test case for the FirstFree function"""
673

    
674
  def test(self):
675
    """Test FirstFree"""
676
    self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
677
    self.failUnlessEqual(FirstFree([]), None)
678
    self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
679
    self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
680
    self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
681

    
682
if __name__ == '__main__':
683
  unittest.main()