Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ d868edb4

History | View | Annotate | Download (26.9 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import os
30
import md5
31
import signal
32
import socket
33
import shutil
34
import re
35
import select
36

    
37
import ganeti
38
import testutils
39
from ganeti import constants
40
from ganeti import utils
41
from ganeti.utils import IsProcessAlive, RunCmd, \
42
     RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
43
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
44
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
45
     SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress
46
from ganeti.errors import LockError, UnitParseError, GenericError, \
47
     ProgrammerError
48

    
49

    
50
class TestIsProcessAlive(unittest.TestCase):
51
  """Testing case for IsProcessAlive"""
52

    
53
  def testExists(self):
54
    mypid = os.getpid()
55
    self.assert_(IsProcessAlive(mypid),
56
                 "can't find myself running")
57

    
58
  def testNotExisting(self):
59
    pid_non_existing = os.fork()
60
    if pid_non_existing == 0:
61
      os._exit(0)
62
    elif pid_non_existing < 0:
63
      raise SystemError("can't fork")
64
    os.waitpid(pid_non_existing, 0)
65
    self.assert_(not IsProcessAlive(pid_non_existing),
66
                 "nonexisting process detected")
67

    
68

    
69
class TestPidFileFunctions(unittest.TestCase):
70
  """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
71

    
72
  def setUp(self):
73
    self.dir = tempfile.mkdtemp()
74
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
75
    utils.DaemonPidFileName = self.f_dpn
76

    
77
  def testPidFileFunctions(self):
78
    pid_file = self.f_dpn('test')
79
    utils.WritePidFile('test')
80
    self.failUnless(os.path.exists(pid_file),
81
                    "PID file should have been created")
82
    read_pid = utils.ReadPidFile(pid_file)
83
    self.failUnlessEqual(read_pid, os.getpid())
84
    self.failUnless(utils.IsProcessAlive(read_pid))
85
    self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
86
    utils.RemovePidFile('test')
87
    self.failIf(os.path.exists(pid_file),
88
                "PID file should not exist anymore")
89
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
90
                         "ReadPidFile should return 0 for missing pid file")
91
    fh = open(pid_file, "w")
92
    fh.write("blah\n")
93
    fh.close()
94
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
95
                         "ReadPidFile should return 0 for invalid pid file")
96
    utils.RemovePidFile('test')
97
    self.failIf(os.path.exists(pid_file),
98
                "PID file should not exist anymore")
99

    
100
  def testKill(self):
101
    pid_file = self.f_dpn('child')
102
    r_fd, w_fd = os.pipe()
103
    new_pid = os.fork()
104
    if new_pid == 0: #child
105
      utils.WritePidFile('child')
106
      os.write(w_fd, 'a')
107
      signal.pause()
108
      os._exit(0)
109
      return
110
    # else we are in the parent
111
    # wait until the child has written the pid file
112
    os.read(r_fd, 1)
113
    read_pid = utils.ReadPidFile(pid_file)
114
    self.failUnlessEqual(read_pid, new_pid)
115
    self.failUnless(utils.IsProcessAlive(new_pid))
116
    utils.KillProcess(new_pid, waitpid=True)
117
    self.failIf(utils.IsProcessAlive(new_pid))
118
    utils.RemovePidFile('child')
119
    self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
120

    
121
  def tearDown(self):
122
    for name in os.listdir(self.dir):
123
      os.unlink(os.path.join(self.dir, name))
124
    os.rmdir(self.dir)
125

    
126

    
127
class TestRunCmd(testutils.GanetiTestCase):
128
  """Testing case for the RunCmd function"""
129

    
130
  def setUp(self):
131
    self.magic = time.ctime() + " ganeti test"
132
    fh, self.fname = tempfile.mkstemp()
133
    os.close(fh)
134

    
135
  def tearDown(self):
136
    if self.fname:
137
      utils.RemoveFile(self.fname)
138

    
139
  def testOk(self):
140
    """Test successful exit code"""
141
    result = RunCmd("/bin/sh -c 'exit 0'")
142
    self.assertEqual(result.exit_code, 0)
143
    self.assertEqual(result.output, "")
144

    
145
  def testFail(self):
146
    """Test fail exit code"""
147
    result = RunCmd("/bin/sh -c 'exit 1'")
148
    self.assertEqual(result.exit_code, 1)
149
    self.assertEqual(result.output, "")
150

    
151
  def testStdout(self):
152
    """Test standard output"""
153
    cmd = 'echo -n "%s"' % self.magic
154
    result = RunCmd("/bin/sh -c '%s'" % cmd)
155
    self.assertEqual(result.stdout, self.magic)
156
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
157
    self.assertEqual(result.output, "")
158
    self.assertFileContent(self.fname, self.magic)
159

    
160
  def testStderr(self):
161
    """Test standard error"""
162
    cmd = 'echo -n "%s"' % self.magic
163
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
164
    self.assertEqual(result.stderr, self.magic)
165
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
166
    self.assertEqual(result.output, "")
167
    self.assertFileContent(self.fname, self.magic)
168

    
169
  def testCombined(self):
170
    """Test combined output"""
171
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
172
    expected = "A" + self.magic + "B" + self.magic
173
    result = RunCmd("/bin/sh -c '%s'" % cmd)
174
    self.assertEqual(result.output, expected)
175
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
176
    self.assertEqual(result.output, "")
177
    self.assertFileContent(self.fname, expected)
178

    
179
  def testSignal(self):
180
    """Test signal"""
181
    result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
182
    self.assertEqual(result.signal, 15)
183
    self.assertEqual(result.output, "")
184

    
185
  def testListRun(self):
186
    """Test list runs"""
187
    result = RunCmd(["true"])
188
    self.assertEqual(result.signal, None)
189
    self.assertEqual(result.exit_code, 0)
190
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
191
    self.assertEqual(result.signal, None)
192
    self.assertEqual(result.exit_code, 1)
193
    result = RunCmd(["echo", "-n", self.magic])
194
    self.assertEqual(result.signal, None)
195
    self.assertEqual(result.exit_code, 0)
196
    self.assertEqual(result.stdout, self.magic)
197

    
198
  def testFileEmptyOutput(self):
199
    """Test file output"""
200
    result = RunCmd(["true"], output=self.fname)
201
    self.assertEqual(result.signal, None)
202
    self.assertEqual(result.exit_code, 0)
203
    self.assertFileContent(self.fname, "")
204

    
205
  def testLang(self):
206
    """Test locale environment"""
207
    old_env = os.environ.copy()
208
    try:
209
      os.environ["LANG"] = "en_US.UTF-8"
210
      os.environ["LC_ALL"] = "en_US.UTF-8"
211
      result = RunCmd(["locale"])
212
      for line in result.output.splitlines():
213
        key, value = line.split("=", 1)
214
        # Ignore these variables, they're overridden by LC_ALL
215
        if key == "LANG" or key == "LANGUAGE":
216
          continue
217
        self.failIf(value and value != "C" and value != '"C"',
218
            "Variable %s is set to the invalid value '%s'" % (key, value))
219
    finally:
220
      os.environ = old_env
221

    
222
  def testDefaultCwd(self):
223
    """Test default working directory"""
224
    self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
225

    
226
  def testCwd(self):
227
    """Test default working directory"""
228
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
229
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
230
    cwd = os.getcwd()
231
    self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
232

    
233

    
234
class TestRemoveFile(unittest.TestCase):
235
  """Test case for the RemoveFile function"""
236

    
237
  def setUp(self):
238
    """Create a temp dir and file for each case"""
239
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
240
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
241
    os.close(fd)
242

    
243
  def tearDown(self):
244
    if os.path.exists(self.tmpfile):
245
      os.unlink(self.tmpfile)
246
    os.rmdir(self.tmpdir)
247

    
248

    
249
  def testIgnoreDirs(self):
250
    """Test that RemoveFile() ignores directories"""
251
    self.assertEqual(None, RemoveFile(self.tmpdir))
252

    
253

    
254
  def testIgnoreNotExisting(self):
255
    """Test that RemoveFile() ignores non-existing files"""
256
    RemoveFile(self.tmpfile)
257
    RemoveFile(self.tmpfile)
258

    
259

    
260
  def testRemoveFile(self):
261
    """Test that RemoveFile does remove a file"""
262
    RemoveFile(self.tmpfile)
263
    if os.path.exists(self.tmpfile):
264
      self.fail("File '%s' not removed" % self.tmpfile)
265

    
266

    
267
  def testRemoveSymlink(self):
268
    """Test that RemoveFile does remove symlinks"""
269
    symlink = self.tmpdir + "/symlink"
270
    os.symlink("no-such-file", symlink)
271
    RemoveFile(symlink)
272
    if os.path.exists(symlink):
273
      self.fail("File '%s' not removed" % symlink)
274
    os.symlink(self.tmpfile, symlink)
275
    RemoveFile(symlink)
276
    if os.path.exists(symlink):
277
      self.fail("File '%s' not removed" % symlink)
278

    
279

    
280
class TestCheckdict(unittest.TestCase):
281
  """Test case for the CheckDict function"""
282

    
283
  def testAdd(self):
284
    """Test that CheckDict adds a missing key with the correct value"""
285

    
286
    tgt = {'a':1}
287
    tmpl = {'b': 2}
288
    CheckDict(tgt, tmpl)
289
    if 'b' not in tgt or tgt['b'] != 2:
290
      self.fail("Failed to update dict")
291

    
292

    
293
  def testNoUpdate(self):
294
    """Test that CheckDict does not overwrite an existing key"""
295
    tgt = {'a':1, 'b': 3}
296
    tmpl = {'b': 2}
297
    CheckDict(tgt, tmpl)
298
    self.failUnlessEqual(tgt['b'], 3)
299

    
300

    
301
class TestMatchNameComponent(unittest.TestCase):
302
  """Test case for the MatchNameComponent function"""
303

    
304
  def testEmptyList(self):
305
    """Test that there is no match against an empty list"""
306

    
307
    self.failUnlessEqual(MatchNameComponent("", []), None)
308
    self.failUnlessEqual(MatchNameComponent("test", []), None)
309

    
310
  def testSingleMatch(self):
311
    """Test that a single match is performed correctly"""
312
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
313
    for key in "test2", "test2.example", "test2.example.com":
314
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
315

    
316
  def testMultipleMatches(self):
317
    """Test that a multiple match is returned as None"""
318
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
319
    for key in "test1", "test1.example":
320
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
321

    
322

    
323
class TestFormatUnit(unittest.TestCase):
324
  """Test case for the FormatUnit function"""
325

    
326
  def testMiB(self):
327
    self.assertEqual(FormatUnit(1), '1M')
328
    self.assertEqual(FormatUnit(100), '100M')
329
    self.assertEqual(FormatUnit(1023), '1023M')
330

    
331
  def testGiB(self):
332
    self.assertEqual(FormatUnit(1024), '1.0G')
333
    self.assertEqual(FormatUnit(1536), '1.5G')
334
    self.assertEqual(FormatUnit(17133), '16.7G')
335
    self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G')
336

    
337
  def testTiB(self):
338
    self.assertEqual(FormatUnit(1024 * 1024), '1.0T')
339
    self.assertEqual(FormatUnit(5120 * 1024), '5.0T')
340
    self.assertEqual(FormatUnit(29829 * 1024), '29.1T')
341

    
342

    
343
class TestParseUnit(unittest.TestCase):
344
  """Test case for the ParseUnit function"""
345

    
346
  SCALES = (('', 1),
347
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
348
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
349
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
350

    
351
  def testRounding(self):
352
    self.assertEqual(ParseUnit('0'), 0)
353
    self.assertEqual(ParseUnit('1'), 4)
354
    self.assertEqual(ParseUnit('2'), 4)
355
    self.assertEqual(ParseUnit('3'), 4)
356

    
357
    self.assertEqual(ParseUnit('124'), 124)
358
    self.assertEqual(ParseUnit('125'), 128)
359
    self.assertEqual(ParseUnit('126'), 128)
360
    self.assertEqual(ParseUnit('127'), 128)
361
    self.assertEqual(ParseUnit('128'), 128)
362
    self.assertEqual(ParseUnit('129'), 132)
363
    self.assertEqual(ParseUnit('130'), 132)
364

    
365
  def testFloating(self):
366
    self.assertEqual(ParseUnit('0'), 0)
367
    self.assertEqual(ParseUnit('0.5'), 4)
368
    self.assertEqual(ParseUnit('1.75'), 4)
369
    self.assertEqual(ParseUnit('1.99'), 4)
370
    self.assertEqual(ParseUnit('2.00'), 4)
371
    self.assertEqual(ParseUnit('2.01'), 4)
372
    self.assertEqual(ParseUnit('3.99'), 4)
373
    self.assertEqual(ParseUnit('4.00'), 4)
374
    self.assertEqual(ParseUnit('4.01'), 8)
375
    self.assertEqual(ParseUnit('1.5G'), 1536)
376
    self.assertEqual(ParseUnit('1.8G'), 1844)
377
    self.assertEqual(ParseUnit('8.28T'), 8682212)
378

    
379
  def testSuffixes(self):
380
    for sep in ('', ' ', '   ', "\t", "\t "):
381
      for suffix, scale in TestParseUnit.SCALES:
382
        for func in (lambda x: x, str.lower, str.upper):
383
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
384
                           1024 * scale)
385

    
386
  def testInvalidInput(self):
387
    for sep in ('-', '_', ',', 'a'):
388
      for suffix, _ in TestParseUnit.SCALES:
389
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
390

    
391
    for suffix, _ in TestParseUnit.SCALES:
392
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
393

    
394

    
395
class TestSshKeys(testutils.GanetiTestCase):
396
  """Test case for the AddAuthorizedKey function"""
397

    
398
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
399
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
400
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
401

    
402
  def setUp(self):
403
    (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
404
    try:
405
      handle = os.fdopen(fd, 'w')
406
      try:
407
        handle.write("%s\n" % TestSshKeys.KEY_A)
408
        handle.write("%s\n" % TestSshKeys.KEY_B)
409
      finally:
410
        handle.close()
411
    except:
412
      utils.RemoveFile(self.tmpname)
413
      raise
414

    
415
  def tearDown(self):
416
    utils.RemoveFile(self.tmpname)
417
    del self.tmpname
418

    
419
  def testAddingNewKey(self):
420
    AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
421

    
422
    self.assertFileContent(self.tmpname,
423
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
424
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
425
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
426
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
427

    
428
  def testAddingAlmostButNotCompletelyTheSameKey(self):
429
    AddAuthorizedKey(self.tmpname,
430
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
431

    
432
    self.assertFileContent(self.tmpname,
433
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
434
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
435
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
436
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
437

    
438
  def testAddingExistingKeyWithSomeMoreSpaces(self):
439
    AddAuthorizedKey(self.tmpname,
440
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
441

    
442
    self.assertFileContent(self.tmpname,
443
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
444
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
445
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
446

    
447
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
448
    RemoveAuthorizedKey(self.tmpname,
449
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
450

    
451
    self.assertFileContent(self.tmpname,
452
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
453
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
454

    
455
  def testRemovingNonExistingKey(self):
456
    RemoveAuthorizedKey(self.tmpname,
457
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
458

    
459
    self.assertFileContent(self.tmpname,
460
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
461
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
462
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
463

    
464

    
465
class TestEtcHosts(testutils.GanetiTestCase):
466
  """Test functions modifying /etc/hosts"""
467

    
468
  def setUp(self):
469
    (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
470
    try:
471
      handle = os.fdopen(fd, 'w')
472
      try:
473
        handle.write('# This is a test file for /etc/hosts\n')
474
        handle.write('127.0.0.1\tlocalhost\n')
475
        handle.write('192.168.1.1 router gw\n')
476
      finally:
477
        handle.close()
478
    except:
479
      utils.RemoveFile(self.tmpname)
480
      raise
481

    
482
  def tearDown(self):
483
    utils.RemoveFile(self.tmpname)
484
    del self.tmpname
485

    
486
  def testSettingNewIp(self):
487
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
488

    
489
    self.assertFileContent(self.tmpname,
490
      "# This is a test file for /etc/hosts\n"
491
      "127.0.0.1\tlocalhost\n"
492
      "192.168.1.1 router gw\n"
493
      "1.2.3.4\tmyhost.domain.tld myhost\n")
494

    
495
  def testSettingExistingIp(self):
496
    SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
497
                     ['myhost'])
498

    
499
    self.assertFileContent(self.tmpname,
500
      "# This is a test file for /etc/hosts\n"
501
      "127.0.0.1\tlocalhost\n"
502
      "192.168.1.1\tmyhost.domain.tld myhost\n")
503

    
504
  def testSettingDuplicateName(self):
505
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
506

    
507
    self.assertFileContent(self.tmpname,
508
      "# This is a test file for /etc/hosts\n"
509
      "127.0.0.1\tlocalhost\n"
510
      "192.168.1.1 router gw\n"
511
      "1.2.3.4\tmyhost\n")
512

    
513
  def testRemovingExistingHost(self):
514
    RemoveEtcHostsEntry(self.tmpname, 'router')
515

    
516
    self.assertFileContent(self.tmpname,
517
      "# This is a test file for /etc/hosts\n"
518
      "127.0.0.1\tlocalhost\n"
519
      "192.168.1.1 gw\n")
520

    
521
  def testRemovingSingleExistingHost(self):
522
    RemoveEtcHostsEntry(self.tmpname, 'localhost')
523

    
524
    self.assertFileContent(self.tmpname,
525
      "# This is a test file for /etc/hosts\n"
526
      "192.168.1.1 router gw\n")
527

    
528
  def testRemovingNonExistingHost(self):
529
    RemoveEtcHostsEntry(self.tmpname, 'myhost')
530

    
531
    self.assertFileContent(self.tmpname,
532
      "# This is a test file for /etc/hosts\n"
533
      "127.0.0.1\tlocalhost\n"
534
      "192.168.1.1 router gw\n")
535

    
536
  def testRemovingAlias(self):
537
    RemoveEtcHostsEntry(self.tmpname, 'gw')
538

    
539
    self.assertFileContent(self.tmpname,
540
      "# This is a test file for /etc/hosts\n"
541
      "127.0.0.1\tlocalhost\n"
542
      "192.168.1.1 router\n")
543

    
544

    
545
class TestShellQuoting(unittest.TestCase):
546
  """Test case for shell quoting functions"""
547

    
548
  def testShellQuote(self):
549
    self.assertEqual(ShellQuote('abc'), "abc")
550
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
551
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
552
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
553
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
554

    
555
  def testShellQuoteArgs(self):
556
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
557
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
558
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
559

    
560

    
561
class TestTcpPing(unittest.TestCase):
562
  """Testcase for TCP version of ping - against listen(2)ing port"""
563

    
564
  def setUp(self):
565
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
566
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
567
    self.listenerport = self.listener.getsockname()[1]
568
    self.listener.listen(1)
569

    
570
  def tearDown(self):
571
    self.listener.shutdown(socket.SHUT_RDWR)
572
    del self.listener
573
    del self.listenerport
574

    
575
  def testTcpPingToLocalHostAccept(self):
576
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
577
                         self.listenerport,
578
                         timeout=10,
579
                         live_port_needed=True,
580
                         source=constants.LOCALHOST_IP_ADDRESS,
581
                         ),
582
                 "failed to connect to test listener")
583

    
584
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
585
                         self.listenerport,
586
                         timeout=10,
587
                         live_port_needed=True,
588
                         ),
589
                 "failed to connect to test listener (no source)")
590

    
591

    
592
class TestTcpPingDeaf(unittest.TestCase):
593
  """Testcase for TCP version of ping - against non listen(2)ing port"""
594

    
595
  def setUp(self):
596
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
597
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
598
    self.deaflistenerport = self.deaflistener.getsockname()[1]
599

    
600
  def tearDown(self):
601
    del self.deaflistener
602
    del self.deaflistenerport
603

    
604
  def testTcpPingToLocalHostAcceptDeaf(self):
605
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
606
                        self.deaflistenerport,
607
                        timeout=constants.TCP_PING_TIMEOUT,
608
                        live_port_needed=True,
609
                        source=constants.LOCALHOST_IP_ADDRESS,
610
                        ), # need successful connect(2)
611
                "successfully connected to deaf listener")
612

    
613
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
614
                        self.deaflistenerport,
615
                        timeout=constants.TCP_PING_TIMEOUT,
616
                        live_port_needed=True,
617
                        ), # need successful connect(2)
618
                "successfully connected to deaf listener (no source addr)")
619

    
620
  def testTcpPingToLocalHostNoAccept(self):
621
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
622
                         self.deaflistenerport,
623
                         timeout=constants.TCP_PING_TIMEOUT,
624
                         live_port_needed=False,
625
                         source=constants.LOCALHOST_IP_ADDRESS,
626
                         ), # ECONNREFUSED is OK
627
                 "failed to ping alive host on deaf port")
628

    
629
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
630
                         self.deaflistenerport,
631
                         timeout=constants.TCP_PING_TIMEOUT,
632
                         live_port_needed=False,
633
                         ), # ECONNREFUSED is OK
634
                 "failed to ping alive host on deaf port (no source addr)")
635

    
636

    
637
class TestOwnIpAddress(unittest.TestCase):
638
  """Testcase for OwnIpAddress"""
639

    
640
  def testOwnLoopback(self):
641
    """check having the loopback ip"""
642
    self.failUnless(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS),
643
                    "Should own the loopback address")
644

    
645
  def testNowOwnAddress(self):
646
    """check that I don't own an address"""
647

    
648
    # network 192.0.2.0/24 is reserved for test/documentation as per
649
    # rfc 3330, so we *should* not have an address of this range... if
650
    # this fails, we should extend the test to multiple addresses
651
    DST_IP = "192.0.2.1"
652
    self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP)
653

    
654

    
655
class TestListVisibleFiles(unittest.TestCase):
656
  """Test case for ListVisibleFiles"""
657

    
658
  def setUp(self):
659
    self.path = tempfile.mkdtemp()
660

    
661
  def tearDown(self):
662
    shutil.rmtree(self.path)
663

    
664
  def _test(self, files, expected):
665
    # Sort a copy
666
    expected = expected[:]
667
    expected.sort()
668

    
669
    for name in files:
670
      f = open(os.path.join(self.path, name), 'w')
671
      try:
672
        f.write("Test\n")
673
      finally:
674
        f.close()
675

    
676
    found = ListVisibleFiles(self.path)
677
    found.sort()
678

    
679
    self.assertEqual(found, expected)
680

    
681
  def testAllVisible(self):
682
    files = ["a", "b", "c"]
683
    expected = files
684
    self._test(files, expected)
685

    
686
  def testNoneVisible(self):
687
    files = [".a", ".b", ".c"]
688
    expected = []
689
    self._test(files, expected)
690

    
691
  def testSomeVisible(self):
692
    files = ["a", "b", ".c"]
693
    expected = ["a", "b"]
694
    self._test(files, expected)
695

    
696

    
697
class TestNewUUID(unittest.TestCase):
698
  """Test case for NewUUID"""
699

    
700
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
701
                        '[a-f0-9]{4}-[a-f0-9]{12}$')
702

    
703
  def runTest(self):
704
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
705

    
706

    
707
class TestUniqueSequence(unittest.TestCase):
708
  """Test case for UniqueSequence"""
709

    
710
  def _test(self, input, expected):
711
    self.assertEqual(utils.UniqueSequence(input), expected)
712

    
713
  def runTest(self):
714
    # Ordered input
715
    self._test([1, 2, 3], [1, 2, 3])
716
    self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
717
    self._test([1, 2, 2, 3], [1, 2, 3])
718
    self._test([1, 2, 3, 3], [1, 2, 3])
719

    
720
    # Unordered input
721
    self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
722
    self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
723

    
724
    # Strings
725
    self._test(["a", "a"], ["a"])
726
    self._test(["a", "b"], ["a", "b"])
727
    self._test(["a", "b", "a"], ["a", "b"])
728

    
729

    
730
class TestFirstFree(unittest.TestCase):
731
  """Test case for the FirstFree function"""
732

    
733
  def test(self):
734
    """Test FirstFree"""
735
    self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
736
    self.failUnlessEqual(FirstFree([]), None)
737
    self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
738
    self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
739
    self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
740

    
741

    
742
class TestFileLock(unittest.TestCase):
743
  """Test case for the FileLock class"""
744

    
745
  def setUp(self):
746
    self.tmpfile = tempfile.NamedTemporaryFile()
747
    self.lock = utils.FileLock(self.tmpfile.name)
748

    
749
  def testSharedNonblocking(self):
750
    self.lock.Shared(blocking=False)
751
    self.lock.Close()
752

    
753
  def testExclusiveNonblocking(self):
754
    self.lock.Exclusive(blocking=False)
755
    self.lock.Close()
756

    
757
  def testUnlockNonblocking(self):
758
    self.lock.Unlock(blocking=False)
759
    self.lock.Close()
760

    
761
  def testSharedBlocking(self):
762
    self.lock.Shared(blocking=True)
763
    self.lock.Close()
764

    
765
  def testExclusiveBlocking(self):
766
    self.lock.Exclusive(blocking=True)
767
    self.lock.Close()
768

    
769
  def testUnlockBlocking(self):
770
    self.lock.Unlock(blocking=True)
771
    self.lock.Close()
772

    
773
  def testSharedExclusiveUnlock(self):
774
    self.lock.Shared(blocking=False)
775
    self.lock.Exclusive(blocking=False)
776
    self.lock.Unlock(blocking=False)
777
    self.lock.Close()
778

    
779
  def testExclusiveSharedUnlock(self):
780
    self.lock.Exclusive(blocking=False)
781
    self.lock.Shared(blocking=False)
782
    self.lock.Unlock(blocking=False)
783
    self.lock.Close()
784

    
785
  def testCloseShared(self):
786
    self.lock.Close()
787
    self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
788

    
789
  def testCloseExclusive(self):
790
    self.lock.Close()
791
    self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
792

    
793
  def testCloseUnlock(self):
794
    self.lock.Close()
795
    self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
796

    
797

    
798
class TestTimeFunctions(unittest.TestCase):
799
  """Test case for time functions"""
800

    
801
  def runTest(self):
802
    self.assertEqual(utils.SplitTime(1), (1, 0))
803
    self.assertEqual(utils.SplitTime(1.5), (1, 500000))
804
    self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
805
    self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
806
    self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
807
    self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
808
    self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
809
    self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
810

    
811
    self.assertRaises(AssertionError, utils.SplitTime, -1)
812

    
813
    self.assertEqual(utils.MergeTime((1, 0)), 1.0)
814
    self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
815
    self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
816

    
817
    self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3), 1218448917.481)
818
    self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
819

    
820
    self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
821
    self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
822
    self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
823
    self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
824
    self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
825

    
826

    
827
if __name__ == '__main__':
828
  unittest.main()