Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ b2a1f511

History | View | Annotate | Download (23 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import os
30
import md5
31
import signal
32
import socket
33
import shutil
34
import re
35
import tempfile
36

    
37
import ganeti
38
import testutils
39
from ganeti import constants
40
from ganeti import utils
41
from ganeti.utils import IsProcessAlive, RunCmd, \
42
     RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
43
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
44
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
45
     SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree
46
from ganeti.errors import LockError, UnitParseError, GenericError, \
47
     ProgrammerError
48

    
49
def _ChildHandler(signal, stack):
50
  global _ChildFlag
51
  _ChildFlag = True
52

    
53

    
54
class TestIsProcessAlive(unittest.TestCase):
55
  """Testing case for IsProcessAlive"""
56

    
57
  def setUp(self):
58
    global _ChildFlag
59
    # create a (most probably) non-existing process-id
60
    self.pid_non_existing = os.fork()
61
    if self.pid_non_existing == 0:
62
      os._exit(0)
63
    elif self.pid_non_existing > 0:
64
      os.waitpid(self.pid_non_existing, 0)
65
    else:
66
      raise SystemError("can't fork")
67
    _ChildFlag = False
68
    # Use _ChildHandler for SIGCHLD
69
    self.chldOrig = signal.signal(signal.SIGCHLD, _ChildHandler)
70
    # create a zombie
71
    self.pid_zombie = os.fork()
72
    if self.pid_zombie == 0:
73
      os._exit(0)
74
    elif self.pid_zombie < 0:
75
      raise SystemError("can't fork")
76

    
77
  def tearDown(self):
78
    signal.signal(signal.SIGCHLD, self.chldOrig)
79

    
80
  def testExists(self):
81
    mypid = os.getpid()
82
    self.assert_(IsProcessAlive(mypid),
83
                 "can't find myself running")
84

    
85
  def testZombie(self):
86
    global _ChildFlag
87
    timeout = 10
88

    
89
    while not _ChildFlag:
90
      if timeout >= 0:
91
        time.sleep(0.2)
92
        timeout -= 0.2
93
      else:
94
        self.fail("timed out waiting for child's signal")
95
        break # not executed...
96

    
97
    self.assert_(not IsProcessAlive(self.pid_zombie),
98
                 "zombie not detected as zombie")
99

    
100
  def testNotExisting(self):
101
    self.assert_(not IsProcessAlive(self.pid_non_existing),
102
                 "noexisting process detected")
103

    
104

    
105
class TestPidFileFunctions(unittest.TestCase):
106
  """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
107

    
108
  def setUp(self):
109
    self.dir = tempfile.mkdtemp()
110
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
111
    utils._DaemonPidFileName = self.f_dpn
112

    
113
  def testPidFileFunctions(self):
114
    pid_file = self.f_dpn('test')
115
    utils.WritePidFile('test')
116
    self.failUnless(os.path.exists(pid_file),
117
                    "PID file should have been created")
118
    read_pid = utils.ReadPidFile(pid_file)
119
    self.failUnlessEqual(read_pid, os.getpid())
120
    self.failUnless(utils.IsProcessAlive(read_pid))
121
    self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
122
    utils.RemovePidFile('test')
123
    self.failIf(os.path.exists(pid_file),
124
                "PID file should not exist anymore")
125
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
126
                         "ReadPidFile should return 0 for missing pid file")
127
    fh = open(pid_file, "w")
128
    fh.write("blah\n")
129
    fh.close()
130
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
131
                         "ReadPidFile should return 0 for invalid pid file")
132
    utils.RemovePidFile('test')
133
    self.failIf(os.path.exists(pid_file),
134
                "PID file should not exist anymore")
135

    
136
  def testKill(self):
137
    pid_file = self.f_dpn('child')
138
    r_fd, w_fd = os.pipe()
139
    new_pid = os.fork()
140
    if new_pid == 0: #child
141
      utils.WritePidFile('child')
142
      os.write(w_fd, 'a')
143
      signal.pause()
144
      os._exit(0)
145
      return
146
    # else we are in the parent
147
    # wait until the child has written the pid file
148
    os.read(r_fd, 1)
149
    read_pid = utils.ReadPidFile(pid_file)
150
    self.failUnlessEqual(read_pid, new_pid)
151
    self.failUnless(utils.IsProcessAlive(new_pid))
152
    utils.KillProcess(new_pid)
153
    self.failIf(utils.IsProcessAlive(new_pid))
154
    utils.RemovePidFile('child')
155
    self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
156

    
157
  def tearDown(self):
158
    for name in os.listdir(self.dir):
159
      os.unlink(os.path.join(self.dir, name))
160
    os.rmdir(self.dir)
161

    
162

    
163
class TestRunCmd(unittest.TestCase):
164
  """Testing case for the RunCmd function"""
165

    
166
  def setUp(self):
167
    self.magic = time.ctime() + " ganeti test"
168

    
169
  def testOk(self):
170
    """Test successful exit code"""
171
    result = RunCmd("/bin/sh -c 'exit 0'")
172
    self.assertEqual(result.exit_code, 0)
173

    
174
  def testFail(self):
175
    """Test fail exit code"""
176
    result = RunCmd("/bin/sh -c 'exit 1'")
177
    self.assertEqual(result.exit_code, 1)
178

    
179

    
180
  def testStdout(self):
181
    """Test standard output"""
182
    cmd = 'echo -n "%s"' % self.magic
183
    result = RunCmd("/bin/sh -c '%s'" % cmd)
184
    self.assertEqual(result.stdout, self.magic)
185

    
186

    
187
  def testStderr(self):
188
    """Test standard error"""
189
    cmd = 'echo -n "%s"' % self.magic
190
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
191
    self.assertEqual(result.stderr, self.magic)
192

    
193

    
194
  def testCombined(self):
195
    """Test combined output"""
196
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
197
    result = RunCmd("/bin/sh -c '%s'" % cmd)
198
    self.assertEqual(result.output, "A" + self.magic + "B" + self.magic)
199

    
200
  def testSignal(self):
201
    """Test signal"""
202
    result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
203
    self.assertEqual(result.signal, 15)
204

    
205
  def testListRun(self):
206
    """Test list runs"""
207
    result = RunCmd(["true"])
208
    self.assertEqual(result.signal, None)
209
    self.assertEqual(result.exit_code, 0)
210
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
211
    self.assertEqual(result.signal, None)
212
    self.assertEqual(result.exit_code, 1)
213
    result = RunCmd(["echo", "-n", self.magic])
214
    self.assertEqual(result.signal, None)
215
    self.assertEqual(result.exit_code, 0)
216
    self.assertEqual(result.stdout, self.magic)
217

    
218
  def testLang(self):
219
    """Test locale environment"""
220
    old_env = os.environ.copy()
221
    try:
222
      os.environ["LANG"] = "en_US.UTF-8"
223
      os.environ["LC_ALL"] = "en_US.UTF-8"
224
      result = RunCmd(["locale"])
225
      for line in result.output.splitlines():
226
        key, value = line.split("=", 1)
227
        # Ignore these variables, they're overridden by LC_ALL
228
        if key == "LANG" or key == "LANGUAGE":
229
          continue
230
        self.failIf(value and value != "C" and value != '"C"',
231
            "Variable %s is set to the invalid value '%s'" % (key, value))
232
    finally:
233
      os.environ = old_env
234

    
235

    
236
class TestRemoveFile(unittest.TestCase):
237
  """Test case for the RemoveFile function"""
238

    
239
  def setUp(self):
240
    """Create a temp dir and file for each case"""
241
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
242
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
243
    os.close(fd)
244

    
245
  def tearDown(self):
246
    if os.path.exists(self.tmpfile):
247
      os.unlink(self.tmpfile)
248
    os.rmdir(self.tmpdir)
249

    
250

    
251
  def testIgnoreDirs(self):
252
    """Test that RemoveFile() ignores directories"""
253
    self.assertEqual(None, RemoveFile(self.tmpdir))
254

    
255

    
256
  def testIgnoreNotExisting(self):
257
    """Test that RemoveFile() ignores non-existing files"""
258
    RemoveFile(self.tmpfile)
259
    RemoveFile(self.tmpfile)
260

    
261

    
262
  def testRemoveFile(self):
263
    """Test that RemoveFile does remove a file"""
264
    RemoveFile(self.tmpfile)
265
    if os.path.exists(self.tmpfile):
266
      self.fail("File '%s' not removed" % self.tmpfile)
267

    
268

    
269
  def testRemoveSymlink(self):
270
    """Test that RemoveFile does remove symlinks"""
271
    symlink = self.tmpdir + "/symlink"
272
    os.symlink("no-such-file", symlink)
273
    RemoveFile(symlink)
274
    if os.path.exists(symlink):
275
      self.fail("File '%s' not removed" % symlink)
276
    os.symlink(self.tmpfile, symlink)
277
    RemoveFile(symlink)
278
    if os.path.exists(symlink):
279
      self.fail("File '%s' not removed" % symlink)
280

    
281

    
282
class TestCheckdict(unittest.TestCase):
283
  """Test case for the CheckDict function"""
284

    
285
  def testAdd(self):
286
    """Test that CheckDict adds a missing key with the correct value"""
287

    
288
    tgt = {'a':1}
289
    tmpl = {'b': 2}
290
    CheckDict(tgt, tmpl)
291
    if 'b' not in tgt or tgt['b'] != 2:
292
      self.fail("Failed to update dict")
293

    
294

    
295
  def testNoUpdate(self):
296
    """Test that CheckDict does not overwrite an existing key"""
297
    tgt = {'a':1, 'b': 3}
298
    tmpl = {'b': 2}
299
    CheckDict(tgt, tmpl)
300
    self.failUnlessEqual(tgt['b'], 3)
301

    
302

    
303
class TestMatchNameComponent(unittest.TestCase):
304
  """Test case for the MatchNameComponent function"""
305

    
306
  def testEmptyList(self):
307
    """Test that there is no match against an empty list"""
308

    
309
    self.failUnlessEqual(MatchNameComponent("", []), None)
310
    self.failUnlessEqual(MatchNameComponent("test", []), None)
311

    
312
  def testSingleMatch(self):
313
    """Test that a single match is performed correctly"""
314
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
315
    for key in "test2", "test2.example", "test2.example.com":
316
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
317

    
318
  def testMultipleMatches(self):
319
    """Test that a multiple match is returned as None"""
320
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
321
    for key in "test1", "test1.example":
322
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
323

    
324

    
325
class TestFormatUnit(unittest.TestCase):
326
  """Test case for the FormatUnit function"""
327

    
328
  def testMiB(self):
329
    self.assertEqual(FormatUnit(1), '1M')
330
    self.assertEqual(FormatUnit(100), '100M')
331
    self.assertEqual(FormatUnit(1023), '1023M')
332

    
333
  def testGiB(self):
334
    self.assertEqual(FormatUnit(1024), '1.0G')
335
    self.assertEqual(FormatUnit(1536), '1.5G')
336
    self.assertEqual(FormatUnit(17133), '16.7G')
337
    self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G')
338

    
339
  def testTiB(self):
340
    self.assertEqual(FormatUnit(1024 * 1024), '1.0T')
341
    self.assertEqual(FormatUnit(5120 * 1024), '5.0T')
342
    self.assertEqual(FormatUnit(29829 * 1024), '29.1T')
343

    
344

    
345
class TestParseUnit(unittest.TestCase):
346
  """Test case for the ParseUnit function"""
347

    
348
  SCALES = (('', 1),
349
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
350
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
351
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
352

    
353
  def testRounding(self):
354
    self.assertEqual(ParseUnit('0'), 0)
355
    self.assertEqual(ParseUnit('1'), 4)
356
    self.assertEqual(ParseUnit('2'), 4)
357
    self.assertEqual(ParseUnit('3'), 4)
358

    
359
    self.assertEqual(ParseUnit('124'), 124)
360
    self.assertEqual(ParseUnit('125'), 128)
361
    self.assertEqual(ParseUnit('126'), 128)
362
    self.assertEqual(ParseUnit('127'), 128)
363
    self.assertEqual(ParseUnit('128'), 128)
364
    self.assertEqual(ParseUnit('129'), 132)
365
    self.assertEqual(ParseUnit('130'), 132)
366

    
367
  def testFloating(self):
368
    self.assertEqual(ParseUnit('0'), 0)
369
    self.assertEqual(ParseUnit('0.5'), 4)
370
    self.assertEqual(ParseUnit('1.75'), 4)
371
    self.assertEqual(ParseUnit('1.99'), 4)
372
    self.assertEqual(ParseUnit('2.00'), 4)
373
    self.assertEqual(ParseUnit('2.01'), 4)
374
    self.assertEqual(ParseUnit('3.99'), 4)
375
    self.assertEqual(ParseUnit('4.00'), 4)
376
    self.assertEqual(ParseUnit('4.01'), 8)
377
    self.assertEqual(ParseUnit('1.5G'), 1536)
378
    self.assertEqual(ParseUnit('1.8G'), 1844)
379
    self.assertEqual(ParseUnit('8.28T'), 8682212)
380

    
381
  def testSuffixes(self):
382
    for sep in ('', ' ', '   ', "\t", "\t "):
383
      for suffix, scale in TestParseUnit.SCALES:
384
        for func in (lambda x: x, str.lower, str.upper):
385
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
386
                           1024 * scale)
387

    
388
  def testInvalidInput(self):
389
    for sep in ('-', '_', ',', 'a'):
390
      for suffix, _ in TestParseUnit.SCALES:
391
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
392

    
393
    for suffix, _ in TestParseUnit.SCALES:
394
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
395

    
396

    
397
class TestSshKeys(testutils.GanetiTestCase):
398
  """Test case for the AddAuthorizedKey function"""
399

    
400
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
401
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
402
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
403

    
404
  def setUp(self):
405
    (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
406
    try:
407
      handle = os.fdopen(fd, 'w')
408
      try:
409
        handle.write("%s\n" % TestSshKeys.KEY_A)
410
        handle.write("%s\n" % TestSshKeys.KEY_B)
411
      finally:
412
        handle.close()
413
    except:
414
      utils.RemoveFile(self.tmpname)
415
      raise
416

    
417
  def tearDown(self):
418
    utils.RemoveFile(self.tmpname)
419
    del self.tmpname
420

    
421
  def testAddingNewKey(self):
422
    AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
423

    
424
    self.assertFileContent(self.tmpname,
425
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
426
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
427
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
428
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
429

    
430
  def testAddingAlmostButNotCompletelyTheSameKey(self):
431
    AddAuthorizedKey(self.tmpname,
432
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
433

    
434
    self.assertFileContent(self.tmpname,
435
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
436
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
437
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
438
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
439

    
440
  def testAddingExistingKeyWithSomeMoreSpaces(self):
441
    AddAuthorizedKey(self.tmpname,
442
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
443

    
444
    self.assertFileContent(self.tmpname,
445
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
446
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
447
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
448

    
449
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
450
    RemoveAuthorizedKey(self.tmpname,
451
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
452

    
453
    self.assertFileContent(self.tmpname,
454
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
455
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
456

    
457
  def testRemovingNonExistingKey(self):
458
    RemoveAuthorizedKey(self.tmpname,
459
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
460

    
461
    self.assertFileContent(self.tmpname,
462
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
463
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
464
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
465

    
466

    
467
class TestEtcHosts(testutils.GanetiTestCase):
468
  """Test functions modifying /etc/hosts"""
469

    
470
  def setUp(self):
471
    (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
472
    try:
473
      handle = os.fdopen(fd, 'w')
474
      try:
475
        handle.write('# This is a test file for /etc/hosts\n')
476
        handle.write('127.0.0.1\tlocalhost\n')
477
        handle.write('192.168.1.1 router gw\n')
478
      finally:
479
        handle.close()
480
    except:
481
      utils.RemoveFile(self.tmpname)
482
      raise
483

    
484
  def tearDown(self):
485
    utils.RemoveFile(self.tmpname)
486
    del self.tmpname
487

    
488
  def testSettingNewIp(self):
489
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
490

    
491
    self.assertFileContent(self.tmpname,
492
      "# This is a test file for /etc/hosts\n"
493
      "127.0.0.1\tlocalhost\n"
494
      "192.168.1.1 router gw\n"
495
      "1.2.3.4\tmyhost.domain.tld myhost\n")
496

    
497
  def testSettingExistingIp(self):
498
    SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
499
                     ['myhost'])
500

    
501
    self.assertFileContent(self.tmpname,
502
      "# This is a test file for /etc/hosts\n"
503
      "127.0.0.1\tlocalhost\n"
504
      "192.168.1.1\tmyhost.domain.tld myhost\n")
505

    
506
  def testSettingDuplicateName(self):
507
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
508

    
509
    self.assertFileContent(self.tmpname,
510
      "# This is a test file for /etc/hosts\n"
511
      "127.0.0.1\tlocalhost\n"
512
      "192.168.1.1 router gw\n"
513
      "1.2.3.4\tmyhost\n")
514

    
515
  def testRemovingExistingHost(self):
516
    RemoveEtcHostsEntry(self.tmpname, 'router')
517

    
518
    self.assertFileContent(self.tmpname,
519
      "# This is a test file for /etc/hosts\n"
520
      "127.0.0.1\tlocalhost\n"
521
      "192.168.1.1 gw\n")
522

    
523
  def testRemovingSingleExistingHost(self):
524
    RemoveEtcHostsEntry(self.tmpname, 'localhost')
525

    
526
    self.assertFileContent(self.tmpname,
527
      "# This is a test file for /etc/hosts\n"
528
      "192.168.1.1 router gw\n")
529

    
530
  def testRemovingNonExistingHost(self):
531
    RemoveEtcHostsEntry(self.tmpname, 'myhost')
532

    
533
    self.assertFileContent(self.tmpname,
534
      "# This is a test file for /etc/hosts\n"
535
      "127.0.0.1\tlocalhost\n"
536
      "192.168.1.1 router gw\n")
537

    
538
  def testRemovingAlias(self):
539
    RemoveEtcHostsEntry(self.tmpname, 'gw')
540

    
541
    self.assertFileContent(self.tmpname,
542
      "# This is a test file for /etc/hosts\n"
543
      "127.0.0.1\tlocalhost\n"
544
      "192.168.1.1 router\n")
545

    
546

    
547
class TestShellQuoting(unittest.TestCase):
548
  """Test case for shell quoting functions"""
549

    
550
  def testShellQuote(self):
551
    self.assertEqual(ShellQuote('abc'), "abc")
552
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
553
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
554
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
555
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
556

    
557
  def testShellQuoteArgs(self):
558
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
559
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
560
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
561

    
562

    
563
class TestTcpPing(unittest.TestCase):
564
  """Testcase for TCP version of ping - against listen(2)ing port"""
565

    
566
  def setUp(self):
567
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
568
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
569
    self.listenerport = self.listener.getsockname()[1]
570
    self.listener.listen(1)
571

    
572
  def tearDown(self):
573
    self.listener.shutdown(socket.SHUT_RDWR)
574
    del self.listener
575
    del self.listenerport
576

    
577
  def testTcpPingToLocalHostAccept(self):
578
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
579
                         self.listenerport,
580
                         timeout=10,
581
                         live_port_needed=True,
582
                         source=constants.LOCALHOST_IP_ADDRESS,
583
                         ),
584
                 "failed to connect to test listener")
585

    
586
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
587
                         self.listenerport,
588
                         timeout=10,
589
                         live_port_needed=True,
590
                         ),
591
                 "failed to connect to test listener (no source)")
592

    
593

    
594
class TestTcpPingDeaf(unittest.TestCase):
595
  """Testcase for TCP version of ping - against non listen(2)ing port"""
596

    
597
  def setUp(self):
598
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
599
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
600
    self.deaflistenerport = self.deaflistener.getsockname()[1]
601

    
602
  def tearDown(self):
603
    del self.deaflistener
604
    del self.deaflistenerport
605

    
606
  def testTcpPingToLocalHostAcceptDeaf(self):
607
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
608
                        self.deaflistenerport,
609
                        timeout=constants.TCP_PING_TIMEOUT,
610
                        live_port_needed=True,
611
                        source=constants.LOCALHOST_IP_ADDRESS,
612
                        ), # need successful connect(2)
613
                "successfully connected to deaf listener")
614

    
615
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
616
                        self.deaflistenerport,
617
                        timeout=constants.TCP_PING_TIMEOUT,
618
                        live_port_needed=True,
619
                        ), # need successful connect(2)
620
                "successfully connected to deaf listener (no source addr)")
621

    
622
  def testTcpPingToLocalHostNoAccept(self):
623
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
624
                         self.deaflistenerport,
625
                         timeout=constants.TCP_PING_TIMEOUT,
626
                         live_port_needed=False,
627
                         source=constants.LOCALHOST_IP_ADDRESS,
628
                         ), # ECONNREFUSED is OK
629
                 "failed to ping alive host on deaf port")
630

    
631
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
632
                         self.deaflistenerport,
633
                         timeout=constants.TCP_PING_TIMEOUT,
634
                         live_port_needed=False,
635
                         ), # ECONNREFUSED is OK
636
                 "failed to ping alive host on deaf port (no source addr)")
637

    
638

    
639
class TestListVisibleFiles(unittest.TestCase):
640
  """Test case for ListVisibleFiles"""
641

    
642
  def setUp(self):
643
    self.path = tempfile.mkdtemp()
644

    
645
  def tearDown(self):
646
    shutil.rmtree(self.path)
647

    
648
  def _test(self, files, expected):
649
    # Sort a copy
650
    expected = expected[:]
651
    expected.sort()
652

    
653
    for name in files:
654
      f = open(os.path.join(self.path, name), 'w')
655
      try:
656
        f.write("Test\n")
657
      finally:
658
        f.close()
659

    
660
    found = ListVisibleFiles(self.path)
661
    found.sort()
662

    
663
    self.assertEqual(found, expected)
664

    
665
  def testAllVisible(self):
666
    files = ["a", "b", "c"]
667
    expected = files
668
    self._test(files, expected)
669

    
670
  def testNoneVisible(self):
671
    files = [".a", ".b", ".c"]
672
    expected = []
673
    self._test(files, expected)
674

    
675
  def testSomeVisible(self):
676
    files = ["a", "b", ".c"]
677
    expected = ["a", "b"]
678
    self._test(files, expected)
679

    
680

    
681
class TestNewUUID(unittest.TestCase):
682
  """Test case for NewUUID"""
683

    
684
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
685
                        '[a-f0-9]{4}-[a-f0-9]{12}$')
686

    
687
  def runTest(self):
688
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
689

    
690

    
691
class TestUniqueSequence(unittest.TestCase):
692
  """Test case for UniqueSequence"""
693

    
694
  def _test(self, input, expected):
695
    self.assertEqual(utils.UniqueSequence(input), expected)
696

    
697
  def runTest(self):
698
    # Ordered input
699
    self._test([1, 2, 3], [1, 2, 3])
700
    self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
701
    self._test([1, 2, 2, 3], [1, 2, 3])
702
    self._test([1, 2, 3, 3], [1, 2, 3])
703

    
704
    # Unordered input
705
    self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
706
    self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
707

    
708
    # Strings
709
    self._test(["a", "a"], ["a"])
710
    self._test(["a", "b"], ["a", "b"])
711
    self._test(["a", "b", "a"], ["a", "b"])
712

    
713
class TestFirstFree(unittest.TestCase):
714
  """Test case for the FirstFree function"""
715

    
716
  def test(self):
717
    """Test FirstFree"""
718
    self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
719
    self.failUnlessEqual(FirstFree([]), None)
720
    self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
721
    self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
722
    self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
723

    
724
if __name__ == '__main__':
725
  unittest.main()