Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ d9f311d7

History | View | Annotate | Download (22.3 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import os
30
import md5
31
import signal
32
import socket
33
import shutil
34
import re
35
import tempfile
36

    
37
import ganeti
38
import testutils
39
from ganeti import constants
40
from ganeti import utils
41
from ganeti.utils import IsProcessAlive, RunCmd, \
42
     RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
43
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
44
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
45
     SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree
46
from ganeti.errors import LockError, UnitParseError, GenericError
47

    
48
def _ChildHandler(signal, stack):
49
  global _ChildFlag
50
  _ChildFlag = True
51

    
52

    
53
class TestIsProcessAlive(unittest.TestCase):
54
  """Testing case for IsProcessAlive"""
55

    
56
  def setUp(self):
57
    global _ChildFlag
58
    # create a (most probably) non-existing process-id
59
    self.pid_non_existing = os.fork()
60
    if self.pid_non_existing == 0:
61
      os._exit(0)
62
    elif self.pid_non_existing > 0:
63
      os.waitpid(self.pid_non_existing, 0)
64
    else:
65
      raise SystemError("can't fork")
66
    _ChildFlag = False
67
    # Use _ChildHandler for SIGCHLD
68
    self.chldOrig = signal.signal(signal.SIGCHLD, _ChildHandler)
69
    # create a zombie
70
    self.pid_zombie = os.fork()
71
    if self.pid_zombie == 0:
72
      os._exit(0)
73
    elif self.pid_zombie < 0:
74
      raise SystemError("can't fork")
75

    
76
  def tearDown(self):
77
    signal.signal(signal.SIGCHLD, self.chldOrig)
78

    
79
  def testExists(self):
80
    mypid = os.getpid()
81
    self.assert_(IsProcessAlive(mypid),
82
                 "can't find myself running")
83

    
84
  def testZombie(self):
85
    global _ChildFlag
86
    timeout = 10
87

    
88
    while not _ChildFlag:
89
      if timeout >= 0:
90
        time.sleep(0.2)
91
        timeout -= 0.2
92
      else:
93
        self.fail("timed out waiting for child's signal")
94
        break # not executed...
95

    
96
    self.assert_(not IsProcessAlive(self.pid_zombie),
97
                 "zombie not detected as zombie")
98

    
99
  def testNotExisting(self):
100
    self.assert_(not IsProcessAlive(self.pid_non_existing),
101
                 "noexisting process detected")
102

    
103

    
104
class TestPidFileFunctions(unittest.TestCase):
105
  """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
106

    
107
  def setUp(self):
108
    self.dir = tempfile.mkdtemp()
109
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
110
    utils._DaemonPidFileName = self.f_dpn
111

    
112
  def testPidFileFunctions(self):
113
    pid_file = self.f_dpn('test')
114
    utils.WritePidFile('test')
115
    self.failUnless(os.path.exists(pid_file),
116
                    "PID file should have been created")
117
    read_pid = utils.ReadPidFile(pid_file)
118
    self.failUnlessEqual(read_pid, os.getpid())
119
    self.failUnless(utils.IsProcessAlive(read_pid))
120
    self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
121
    utils.RemovePidFile('test')
122
    self.failIf(os.path.exists(pid_file),
123
                "PID file should not exist anymore")
124
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
125
                         "ReadPidFile should return 0 for missing pid file")
126
    fh = open(pid_file, "w")
127
    fh.write("blah\n")
128
    fh.close()
129
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
130
                         "ReadPidFile should return 0 for invalid pid file")
131
    utils.RemovePidFile('test')
132
    self.failIf(os.path.exists(pid_file),
133
                "PID file should not exist anymore")
134

    
135
  def tearDown(self):
136
    for name in os.listdir(self.dir):
137
      os.unlink(os.path.join(self.dir, name))
138
    os.rmdir(self.dir)
139

    
140

    
141
class TestRunCmd(unittest.TestCase):
142
  """Testing case for the RunCmd function"""
143

    
144
  def setUp(self):
145
    self.magic = time.ctime() + " ganeti test"
146

    
147
  def testOk(self):
148
    """Test successful exit code"""
149
    result = RunCmd("/bin/sh -c 'exit 0'")
150
    self.assertEqual(result.exit_code, 0)
151

    
152
  def testFail(self):
153
    """Test fail exit code"""
154
    result = RunCmd("/bin/sh -c 'exit 1'")
155
    self.assertEqual(result.exit_code, 1)
156

    
157

    
158
  def testStdout(self):
159
    """Test standard output"""
160
    cmd = 'echo -n "%s"' % self.magic
161
    result = RunCmd("/bin/sh -c '%s'" % cmd)
162
    self.assertEqual(result.stdout, self.magic)
163

    
164

    
165
  def testStderr(self):
166
    """Test standard error"""
167
    cmd = 'echo -n "%s"' % self.magic
168
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
169
    self.assertEqual(result.stderr, self.magic)
170

    
171

    
172
  def testCombined(self):
173
    """Test combined output"""
174
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
175
    result = RunCmd("/bin/sh -c '%s'" % cmd)
176
    self.assertEqual(result.output, "A" + self.magic + "B" + self.magic)
177

    
178
  def testSignal(self):
179
    """Test signal"""
180
    result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
181
    self.assertEqual(result.signal, 15)
182

    
183
  def testListRun(self):
184
    """Test list runs"""
185
    result = RunCmd(["true"])
186
    self.assertEqual(result.signal, None)
187
    self.assertEqual(result.exit_code, 0)
188
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
189
    self.assertEqual(result.signal, None)
190
    self.assertEqual(result.exit_code, 1)
191
    result = RunCmd(["echo", "-n", self.magic])
192
    self.assertEqual(result.signal, None)
193
    self.assertEqual(result.exit_code, 0)
194
    self.assertEqual(result.stdout, self.magic)
195

    
196
  def testLang(self):
197
    """Test locale environment"""
198
    old_env = os.environ.copy()
199
    try:
200
      os.environ["LANG"] = "en_US.UTF-8"
201
      os.environ["LC_ALL"] = "en_US.UTF-8"
202
      result = RunCmd(["locale"])
203
      for line in result.output.splitlines():
204
        key, value = line.split("=", 1)
205
        # Ignore these variables, they're overridden by LC_ALL
206
        if key == "LANG" or key == "LANGUAGE":
207
          continue
208
        self.failIf(value and value != "C" and value != '"C"',
209
            "Variable %s is set to the invalid value '%s'" % (key, value))
210
    finally:
211
      os.environ = old_env
212

    
213

    
214
class TestRemoveFile(unittest.TestCase):
215
  """Test case for the RemoveFile function"""
216

    
217
  def setUp(self):
218
    """Create a temp dir and file for each case"""
219
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
220
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
221
    os.close(fd)
222

    
223
  def tearDown(self):
224
    if os.path.exists(self.tmpfile):
225
      os.unlink(self.tmpfile)
226
    os.rmdir(self.tmpdir)
227

    
228

    
229
  def testIgnoreDirs(self):
230
    """Test that RemoveFile() ignores directories"""
231
    self.assertEqual(None, RemoveFile(self.tmpdir))
232

    
233

    
234
  def testIgnoreNotExisting(self):
235
    """Test that RemoveFile() ignores non-existing files"""
236
    RemoveFile(self.tmpfile)
237
    RemoveFile(self.tmpfile)
238

    
239

    
240
  def testRemoveFile(self):
241
    """Test that RemoveFile does remove a file"""
242
    RemoveFile(self.tmpfile)
243
    if os.path.exists(self.tmpfile):
244
      self.fail("File '%s' not removed" % self.tmpfile)
245

    
246

    
247
  def testRemoveSymlink(self):
248
    """Test that RemoveFile does remove symlinks"""
249
    symlink = self.tmpdir + "/symlink"
250
    os.symlink("no-such-file", symlink)
251
    RemoveFile(symlink)
252
    if os.path.exists(symlink):
253
      self.fail("File '%s' not removed" % symlink)
254
    os.symlink(self.tmpfile, symlink)
255
    RemoveFile(symlink)
256
    if os.path.exists(symlink):
257
      self.fail("File '%s' not removed" % symlink)
258

    
259

    
260
class TestCheckdict(unittest.TestCase):
261
  """Test case for the CheckDict function"""
262

    
263
  def testAdd(self):
264
    """Test that CheckDict adds a missing key with the correct value"""
265

    
266
    tgt = {'a':1}
267
    tmpl = {'b': 2}
268
    CheckDict(tgt, tmpl)
269
    if 'b' not in tgt or tgt['b'] != 2:
270
      self.fail("Failed to update dict")
271

    
272

    
273
  def testNoUpdate(self):
274
    """Test that CheckDict does not overwrite an existing key"""
275
    tgt = {'a':1, 'b': 3}
276
    tmpl = {'b': 2}
277
    CheckDict(tgt, tmpl)
278
    self.failUnlessEqual(tgt['b'], 3)
279

    
280

    
281
class TestMatchNameComponent(unittest.TestCase):
282
  """Test case for the MatchNameComponent function"""
283

    
284
  def testEmptyList(self):
285
    """Test that there is no match against an empty list"""
286

    
287
    self.failUnlessEqual(MatchNameComponent("", []), None)
288
    self.failUnlessEqual(MatchNameComponent("test", []), None)
289

    
290
  def testSingleMatch(self):
291
    """Test that a single match is performed correctly"""
292
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
293
    for key in "test2", "test2.example", "test2.example.com":
294
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
295

    
296
  def testMultipleMatches(self):
297
    """Test that a multiple match is returned as None"""
298
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
299
    for key in "test1", "test1.example":
300
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
301

    
302

    
303
class TestFormatUnit(unittest.TestCase):
304
  """Test case for the FormatUnit function"""
305

    
306
  def testMiB(self):
307
    self.assertEqual(FormatUnit(1), '1M')
308
    self.assertEqual(FormatUnit(100), '100M')
309
    self.assertEqual(FormatUnit(1023), '1023M')
310

    
311
  def testGiB(self):
312
    self.assertEqual(FormatUnit(1024), '1.0G')
313
    self.assertEqual(FormatUnit(1536), '1.5G')
314
    self.assertEqual(FormatUnit(17133), '16.7G')
315
    self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G')
316

    
317
  def testTiB(self):
318
    self.assertEqual(FormatUnit(1024 * 1024), '1.0T')
319
    self.assertEqual(FormatUnit(5120 * 1024), '5.0T')
320
    self.assertEqual(FormatUnit(29829 * 1024), '29.1T')
321

    
322

    
323
class TestParseUnit(unittest.TestCase):
324
  """Test case for the ParseUnit function"""
325

    
326
  SCALES = (('', 1),
327
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
328
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
329
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
330

    
331
  def testRounding(self):
332
    self.assertEqual(ParseUnit('0'), 0)
333
    self.assertEqual(ParseUnit('1'), 4)
334
    self.assertEqual(ParseUnit('2'), 4)
335
    self.assertEqual(ParseUnit('3'), 4)
336

    
337
    self.assertEqual(ParseUnit('124'), 124)
338
    self.assertEqual(ParseUnit('125'), 128)
339
    self.assertEqual(ParseUnit('126'), 128)
340
    self.assertEqual(ParseUnit('127'), 128)
341
    self.assertEqual(ParseUnit('128'), 128)
342
    self.assertEqual(ParseUnit('129'), 132)
343
    self.assertEqual(ParseUnit('130'), 132)
344

    
345
  def testFloating(self):
346
    self.assertEqual(ParseUnit('0'), 0)
347
    self.assertEqual(ParseUnit('0.5'), 4)
348
    self.assertEqual(ParseUnit('1.75'), 4)
349
    self.assertEqual(ParseUnit('1.99'), 4)
350
    self.assertEqual(ParseUnit('2.00'), 4)
351
    self.assertEqual(ParseUnit('2.01'), 4)
352
    self.assertEqual(ParseUnit('3.99'), 4)
353
    self.assertEqual(ParseUnit('4.00'), 4)
354
    self.assertEqual(ParseUnit('4.01'), 8)
355
    self.assertEqual(ParseUnit('1.5G'), 1536)
356
    self.assertEqual(ParseUnit('1.8G'), 1844)
357
    self.assertEqual(ParseUnit('8.28T'), 8682212)
358

    
359
  def testSuffixes(self):
360
    for sep in ('', ' ', '   ', "\t", "\t "):
361
      for suffix, scale in TestParseUnit.SCALES:
362
        for func in (lambda x: x, str.lower, str.upper):
363
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
364
                           1024 * scale)
365

    
366
  def testInvalidInput(self):
367
    for sep in ('-', '_', ',', 'a'):
368
      for suffix, _ in TestParseUnit.SCALES:
369
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
370

    
371
    for suffix, _ in TestParseUnit.SCALES:
372
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
373

    
374

    
375
class TestSshKeys(testutils.GanetiTestCase):
376
  """Test case for the AddAuthorizedKey function"""
377

    
378
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
379
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
380
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
381

    
382
  def setUp(self):
383
    (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
384
    try:
385
      handle = os.fdopen(fd, 'w')
386
      try:
387
        handle.write("%s\n" % TestSshKeys.KEY_A)
388
        handle.write("%s\n" % TestSshKeys.KEY_B)
389
      finally:
390
        handle.close()
391
    except:
392
      utils.RemoveFile(self.tmpname)
393
      raise
394

    
395
  def tearDown(self):
396
    utils.RemoveFile(self.tmpname)
397
    del self.tmpname
398

    
399
  def testAddingNewKey(self):
400
    AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
401

    
402
    self.assertFileContent(self.tmpname,
403
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
404
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
405
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
406
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
407

    
408
  def testAddingAlmostButNotCompletelyTheSameKey(self):
409
    AddAuthorizedKey(self.tmpname,
410
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
411

    
412
    self.assertFileContent(self.tmpname,
413
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
414
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
415
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
416
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
417

    
418
  def testAddingExistingKeyWithSomeMoreSpaces(self):
419
    AddAuthorizedKey(self.tmpname,
420
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
421

    
422
    self.assertFileContent(self.tmpname,
423
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
424
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
425
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
426

    
427
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
428
    RemoveAuthorizedKey(self.tmpname,
429
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
430

    
431
    self.assertFileContent(self.tmpname,
432
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
433
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
434

    
435
  def testRemovingNonExistingKey(self):
436
    RemoveAuthorizedKey(self.tmpname,
437
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
438

    
439
    self.assertFileContent(self.tmpname,
440
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
441
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
442
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
443

    
444

    
445
class TestEtcHosts(testutils.GanetiTestCase):
446
  """Test functions modifying /etc/hosts"""
447

    
448
  def setUp(self):
449
    (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
450
    try:
451
      handle = os.fdopen(fd, 'w')
452
      try:
453
        handle.write('# This is a test file for /etc/hosts\n')
454
        handle.write('127.0.0.1\tlocalhost\n')
455
        handle.write('192.168.1.1 router gw\n')
456
      finally:
457
        handle.close()
458
    except:
459
      utils.RemoveFile(self.tmpname)
460
      raise
461

    
462
  def tearDown(self):
463
    utils.RemoveFile(self.tmpname)
464
    del self.tmpname
465

    
466
  def testSettingNewIp(self):
467
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
468

    
469
    self.assertFileContent(self.tmpname,
470
      "# This is a test file for /etc/hosts\n"
471
      "127.0.0.1\tlocalhost\n"
472
      "192.168.1.1 router gw\n"
473
      "1.2.3.4\tmyhost.domain.tld myhost\n")
474

    
475
  def testSettingExistingIp(self):
476
    SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
477
                     ['myhost'])
478

    
479
    self.assertFileContent(self.tmpname,
480
      "# This is a test file for /etc/hosts\n"
481
      "127.0.0.1\tlocalhost\n"
482
      "192.168.1.1\tmyhost.domain.tld myhost\n")
483

    
484
  def testSettingDuplicateName(self):
485
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
486

    
487
    self.assertFileContent(self.tmpname,
488
      "# This is a test file for /etc/hosts\n"
489
      "127.0.0.1\tlocalhost\n"
490
      "192.168.1.1 router gw\n"
491
      "1.2.3.4\tmyhost\n")
492

    
493
  def testRemovingExistingHost(self):
494
    RemoveEtcHostsEntry(self.tmpname, 'router')
495

    
496
    self.assertFileContent(self.tmpname,
497
      "# This is a test file for /etc/hosts\n"
498
      "127.0.0.1\tlocalhost\n"
499
      "192.168.1.1 gw\n")
500

    
501
  def testRemovingSingleExistingHost(self):
502
    RemoveEtcHostsEntry(self.tmpname, 'localhost')
503

    
504
    self.assertFileContent(self.tmpname,
505
      "# This is a test file for /etc/hosts\n"
506
      "192.168.1.1 router gw\n")
507

    
508
  def testRemovingNonExistingHost(self):
509
    RemoveEtcHostsEntry(self.tmpname, 'myhost')
510

    
511
    self.assertFileContent(self.tmpname,
512
      "# This is a test file for /etc/hosts\n"
513
      "127.0.0.1\tlocalhost\n"
514
      "192.168.1.1 router gw\n")
515

    
516
  def testRemovingAlias(self):
517
    RemoveEtcHostsEntry(self.tmpname, 'gw')
518

    
519
    self.assertFileContent(self.tmpname,
520
      "# This is a test file for /etc/hosts\n"
521
      "127.0.0.1\tlocalhost\n"
522
      "192.168.1.1 router\n")
523

    
524

    
525
class TestShellQuoting(unittest.TestCase):
526
  """Test case for shell quoting functions"""
527

    
528
  def testShellQuote(self):
529
    self.assertEqual(ShellQuote('abc'), "abc")
530
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
531
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
532
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
533
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
534

    
535
  def testShellQuoteArgs(self):
536
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
537
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
538
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
539

    
540

    
541
class TestTcpPing(unittest.TestCase):
542
  """Testcase for TCP version of ping - against listen(2)ing port"""
543

    
544
  def setUp(self):
545
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
546
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
547
    self.listenerport = self.listener.getsockname()[1]
548
    self.listener.listen(1)
549

    
550
  def tearDown(self):
551
    self.listener.shutdown(socket.SHUT_RDWR)
552
    del self.listener
553
    del self.listenerport
554

    
555
  def testTcpPingToLocalHostAccept(self):
556
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
557
                         self.listenerport,
558
                         timeout=10,
559
                         live_port_needed=True,
560
                         source=constants.LOCALHOST_IP_ADDRESS,
561
                         ),
562
                 "failed to connect to test listener")
563

    
564
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
565
                         self.listenerport,
566
                         timeout=10,
567
                         live_port_needed=True,
568
                         ),
569
                 "failed to connect to test listener (no source)")
570

    
571

    
572
class TestTcpPingDeaf(unittest.TestCase):
573
  """Testcase for TCP version of ping - against non listen(2)ing port"""
574

    
575
  def setUp(self):
576
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
577
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
578
    self.deaflistenerport = self.deaflistener.getsockname()[1]
579

    
580
  def tearDown(self):
581
    del self.deaflistener
582
    del self.deaflistenerport
583

    
584
  def testTcpPingToLocalHostAcceptDeaf(self):
585
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
586
                        self.deaflistenerport,
587
                        timeout=constants.TCP_PING_TIMEOUT,
588
                        live_port_needed=True,
589
                        source=constants.LOCALHOST_IP_ADDRESS,
590
                        ), # need successful connect(2)
591
                "successfully connected to deaf listener")
592

    
593
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
594
                        self.deaflistenerport,
595
                        timeout=constants.TCP_PING_TIMEOUT,
596
                        live_port_needed=True,
597
                        ), # need successful connect(2)
598
                "successfully connected to deaf listener (no source addr)")
599

    
600
  def testTcpPingToLocalHostNoAccept(self):
601
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
602
                         self.deaflistenerport,
603
                         timeout=constants.TCP_PING_TIMEOUT,
604
                         live_port_needed=False,
605
                         source=constants.LOCALHOST_IP_ADDRESS,
606
                         ), # ECONNREFUSED is OK
607
                 "failed to ping alive host on deaf port")
608

    
609
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
610
                         self.deaflistenerport,
611
                         timeout=constants.TCP_PING_TIMEOUT,
612
                         live_port_needed=False,
613
                         ), # ECONNREFUSED is OK
614
                 "failed to ping alive host on deaf port (no source addr)")
615

    
616

    
617
class TestListVisibleFiles(unittest.TestCase):
618
  """Test case for ListVisibleFiles"""
619

    
620
  def setUp(self):
621
    self.path = tempfile.mkdtemp()
622

    
623
  def tearDown(self):
624
    shutil.rmtree(self.path)
625

    
626
  def _test(self, files, expected):
627
    # Sort a copy
628
    expected = expected[:]
629
    expected.sort()
630

    
631
    for name in files:
632
      f = open(os.path.join(self.path, name), 'w')
633
      try:
634
        f.write("Test\n")
635
      finally:
636
        f.close()
637

    
638
    found = ListVisibleFiles(self.path)
639
    found.sort()
640

    
641
    self.assertEqual(found, expected)
642

    
643
  def testAllVisible(self):
644
    files = ["a", "b", "c"]
645
    expected = files
646
    self._test(files, expected)
647

    
648
  def testNoneVisible(self):
649
    files = [".a", ".b", ".c"]
650
    expected = []
651
    self._test(files, expected)
652

    
653
  def testSomeVisible(self):
654
    files = ["a", "b", ".c"]
655
    expected = ["a", "b"]
656
    self._test(files, expected)
657

    
658

    
659
class TestNewUUID(unittest.TestCase):
660
  """Test case for NewUUID"""
661

    
662
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
663
                        '[a-f0-9]{4}-[a-f0-9]{12}$')
664

    
665
  def runTest(self):
666
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
667

    
668

    
669
class TestUniqueSequence(unittest.TestCase):
670
  """Test case for UniqueSequence"""
671

    
672
  def _test(self, input, expected):
673
    self.assertEqual(utils.UniqueSequence(input), expected)
674

    
675
  def runTest(self):
676
    # Ordered input
677
    self._test([1, 2, 3], [1, 2, 3])
678
    self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
679
    self._test([1, 2, 2, 3], [1, 2, 3])
680
    self._test([1, 2, 3, 3], [1, 2, 3])
681

    
682
    # Unordered input
683
    self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
684
    self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
685

    
686
    # Strings
687
    self._test(["a", "a"], ["a"])
688
    self._test(["a", "b"], ["a", "b"])
689
    self._test(["a", "b", "a"], ["a", "b"])
690

    
691
class TestFirstFree(unittest.TestCase):
692
  """Test case for the FirstFree function"""
693

    
694
  def test(self):
695
    """Test FirstFree"""
696
    self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
697
    self.failUnlessEqual(FirstFree([]), None)
698
    self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
699
    self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
700
    self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
701

    
702
if __name__ == '__main__':
703
  unittest.main()