Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ ff5251bc

History | View | Annotate | Download (27.8 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import os
30
import md5
31
import signal
32
import socket
33
import shutil
34
import re
35
import select
36

    
37
import ganeti
38
import testutils
39
from ganeti import constants
40
from ganeti import utils
41
from ganeti.utils import IsProcessAlive, RunCmd, \
42
     RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
43
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
44
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
45
     SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress
46
from ganeti.errors import LockError, UnitParseError, GenericError, \
47
     ProgrammerError
48

    
49

    
50
class TestIsProcessAlive(unittest.TestCase):
51
  """Testing case for IsProcessAlive"""
52

    
53
  def _CreateZombie(self):
54
    # create a zombie
55
    r_fd, w_fd = os.pipe()
56
    pid_zombie = os.fork()
57
    if pid_zombie == 0:
58
      # explicit close of read, write end will be closed only due to exit
59
      os.close(r_fd)
60
      os._exit(0)
61
    elif pid_zombie < 0:
62
      raise SystemError("can't fork")
63
    # parent: we close our end of the w_fd, so reads will error as
64
    # soon as the OS cleans the child's filedescriptors on its exit
65
    os.close(w_fd)
66
    # wait for 60 seconds at max for the exit (pathological case, just
67
    # so that the test doesn't hang indefinitely)
68
    r_set, w_set, e_set = select.select([r_fd], [], [], 60)
69
    if not r_set and not e_set:
70
      self.fail("Timeout exceeded in zombie creation")
71
    return pid_zombie
72

    
73
  def testExists(self):
74
    mypid = os.getpid()
75
    self.assert_(IsProcessAlive(mypid),
76
                 "can't find myself running")
77

    
78
  def testZombie(self):
79
    pid_zombie = self._CreateZombie()
80
    is_zombie = not IsProcessAlive(pid_zombie)
81
    self.assert_(is_zombie, "zombie not detected as zombie")
82
    os.waitpid(pid_zombie, os.WNOHANG)
83

    
84
  def testNotExisting(self):
85
    pid_non_existing = os.fork()
86
    if pid_non_existing == 0:
87
      os._exit(0)
88
    elif pid_non_existing < 0:
89
      raise SystemError("can't fork")
90
    os.waitpid(pid_non_existing, 0)
91
    self.assert_(not IsProcessAlive(pid_non_existing),
92
                 "nonexisting process detected")
93

    
94

    
95
class TestPidFileFunctions(unittest.TestCase):
96
  """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
97

    
98
  def setUp(self):
99
    self.dir = tempfile.mkdtemp()
100
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
101
    utils.DaemonPidFileName = self.f_dpn
102

    
103
  def testPidFileFunctions(self):
104
    pid_file = self.f_dpn('test')
105
    utils.WritePidFile('test')
106
    self.failUnless(os.path.exists(pid_file),
107
                    "PID file should have been created")
108
    read_pid = utils.ReadPidFile(pid_file)
109
    self.failUnlessEqual(read_pid, os.getpid())
110
    self.failUnless(utils.IsProcessAlive(read_pid))
111
    self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
112
    utils.RemovePidFile('test')
113
    self.failIf(os.path.exists(pid_file),
114
                "PID file should not exist anymore")
115
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
116
                         "ReadPidFile should return 0 for missing pid file")
117
    fh = open(pid_file, "w")
118
    fh.write("blah\n")
119
    fh.close()
120
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
121
                         "ReadPidFile should return 0 for invalid pid file")
122
    utils.RemovePidFile('test')
123
    self.failIf(os.path.exists(pid_file),
124
                "PID file should not exist anymore")
125

    
126
  def testKill(self):
127
    pid_file = self.f_dpn('child')
128
    r_fd, w_fd = os.pipe()
129
    new_pid = os.fork()
130
    if new_pid == 0: #child
131
      utils.WritePidFile('child')
132
      os.write(w_fd, 'a')
133
      signal.pause()
134
      os._exit(0)
135
      return
136
    # else we are in the parent
137
    # wait until the child has written the pid file
138
    os.read(r_fd, 1)
139
    read_pid = utils.ReadPidFile(pid_file)
140
    self.failUnlessEqual(read_pid, new_pid)
141
    self.failUnless(utils.IsProcessAlive(new_pid))
142
    utils.KillProcess(new_pid, waitpid=True)
143
    self.failIf(utils.IsProcessAlive(new_pid))
144
    utils.RemovePidFile('child')
145
    self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
146

    
147
  def tearDown(self):
148
    for name in os.listdir(self.dir):
149
      os.unlink(os.path.join(self.dir, name))
150
    os.rmdir(self.dir)
151

    
152

    
153
class TestRunCmd(testutils.GanetiTestCase):
154
  """Testing case for the RunCmd function"""
155

    
156
  def setUp(self):
157
    self.magic = time.ctime() + " ganeti test"
158
    fh, self.fname = tempfile.mkstemp()
159
    os.close(fh)
160

    
161
  def tearDown(self):
162
    if self.fname:
163
      utils.RemoveFile(self.fname)
164

    
165
  def testOk(self):
166
    """Test successful exit code"""
167
    result = RunCmd("/bin/sh -c 'exit 0'")
168
    self.assertEqual(result.exit_code, 0)
169
    self.assertEqual(result.output, "")
170

    
171
  def testFail(self):
172
    """Test fail exit code"""
173
    result = RunCmd("/bin/sh -c 'exit 1'")
174
    self.assertEqual(result.exit_code, 1)
175
    self.assertEqual(result.output, "")
176

    
177
  def testStdout(self):
178
    """Test standard output"""
179
    cmd = 'echo -n "%s"' % self.magic
180
    result = RunCmd("/bin/sh -c '%s'" % cmd)
181
    self.assertEqual(result.stdout, self.magic)
182
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
183
    self.assertEqual(result.output, "")
184
    self.assertFileContent(self.fname, self.magic)
185

    
186
  def testStderr(self):
187
    """Test standard error"""
188
    cmd = 'echo -n "%s"' % self.magic
189
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
190
    self.assertEqual(result.stderr, self.magic)
191
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
192
    self.assertEqual(result.output, "")
193
    self.assertFileContent(self.fname, self.magic)
194

    
195
  def testCombined(self):
196
    """Test combined output"""
197
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
198
    expected = "A" + self.magic + "B" + self.magic
199
    result = RunCmd("/bin/sh -c '%s'" % cmd)
200
    self.assertEqual(result.output, expected)
201
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
202
    self.assertEqual(result.output, "")
203
    self.assertFileContent(self.fname, expected)
204

    
205
  def testSignal(self):
206
    """Test signal"""
207
    result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
208
    self.assertEqual(result.signal, 15)
209
    self.assertEqual(result.output, "")
210

    
211
  def testListRun(self):
212
    """Test list runs"""
213
    result = RunCmd(["true"])
214
    self.assertEqual(result.signal, None)
215
    self.assertEqual(result.exit_code, 0)
216
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
217
    self.assertEqual(result.signal, None)
218
    self.assertEqual(result.exit_code, 1)
219
    result = RunCmd(["echo", "-n", self.magic])
220
    self.assertEqual(result.signal, None)
221
    self.assertEqual(result.exit_code, 0)
222
    self.assertEqual(result.stdout, self.magic)
223

    
224
  def testFileEmptyOutput(self):
225
    """Test file output"""
226
    result = RunCmd(["true"], output=self.fname)
227
    self.assertEqual(result.signal, None)
228
    self.assertEqual(result.exit_code, 0)
229
    self.assertFileContent(self.fname, "")
230

    
231
  def testLang(self):
232
    """Test locale environment"""
233
    old_env = os.environ.copy()
234
    try:
235
      os.environ["LANG"] = "en_US.UTF-8"
236
      os.environ["LC_ALL"] = "en_US.UTF-8"
237
      result = RunCmd(["locale"])
238
      for line in result.output.splitlines():
239
        key, value = line.split("=", 1)
240
        # Ignore these variables, they're overridden by LC_ALL
241
        if key == "LANG" or key == "LANGUAGE":
242
          continue
243
        self.failIf(value and value != "C" and value != '"C"',
244
            "Variable %s is set to the invalid value '%s'" % (key, value))
245
    finally:
246
      os.environ = old_env
247

    
248
  def testDefaultCwd(self):
249
    """Test default working directory"""
250
    self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
251

    
252
  def testCwd(self):
253
    """Test default working directory"""
254
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
255
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
256
    cwd = os.getcwd()
257
    self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
258

    
259

    
260
class TestRemoveFile(unittest.TestCase):
261
  """Test case for the RemoveFile function"""
262

    
263
  def setUp(self):
264
    """Create a temp dir and file for each case"""
265
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
266
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
267
    os.close(fd)
268

    
269
  def tearDown(self):
270
    if os.path.exists(self.tmpfile):
271
      os.unlink(self.tmpfile)
272
    os.rmdir(self.tmpdir)
273

    
274

    
275
  def testIgnoreDirs(self):
276
    """Test that RemoveFile() ignores directories"""
277
    self.assertEqual(None, RemoveFile(self.tmpdir))
278

    
279

    
280
  def testIgnoreNotExisting(self):
281
    """Test that RemoveFile() ignores non-existing files"""
282
    RemoveFile(self.tmpfile)
283
    RemoveFile(self.tmpfile)
284

    
285

    
286
  def testRemoveFile(self):
287
    """Test that RemoveFile does remove a file"""
288
    RemoveFile(self.tmpfile)
289
    if os.path.exists(self.tmpfile):
290
      self.fail("File '%s' not removed" % self.tmpfile)
291

    
292

    
293
  def testRemoveSymlink(self):
294
    """Test that RemoveFile does remove symlinks"""
295
    symlink = self.tmpdir + "/symlink"
296
    os.symlink("no-such-file", symlink)
297
    RemoveFile(symlink)
298
    if os.path.exists(symlink):
299
      self.fail("File '%s' not removed" % symlink)
300
    os.symlink(self.tmpfile, symlink)
301
    RemoveFile(symlink)
302
    if os.path.exists(symlink):
303
      self.fail("File '%s' not removed" % symlink)
304

    
305

    
306
class TestCheckdict(unittest.TestCase):
307
  """Test case for the CheckDict function"""
308

    
309
  def testAdd(self):
310
    """Test that CheckDict adds a missing key with the correct value"""
311

    
312
    tgt = {'a':1}
313
    tmpl = {'b': 2}
314
    CheckDict(tgt, tmpl)
315
    if 'b' not in tgt or tgt['b'] != 2:
316
      self.fail("Failed to update dict")
317

    
318

    
319
  def testNoUpdate(self):
320
    """Test that CheckDict does not overwrite an existing key"""
321
    tgt = {'a':1, 'b': 3}
322
    tmpl = {'b': 2}
323
    CheckDict(tgt, tmpl)
324
    self.failUnlessEqual(tgt['b'], 3)
325

    
326

    
327
class TestMatchNameComponent(unittest.TestCase):
328
  """Test case for the MatchNameComponent function"""
329

    
330
  def testEmptyList(self):
331
    """Test that there is no match against an empty list"""
332

    
333
    self.failUnlessEqual(MatchNameComponent("", []), None)
334
    self.failUnlessEqual(MatchNameComponent("test", []), None)
335

    
336
  def testSingleMatch(self):
337
    """Test that a single match is performed correctly"""
338
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
339
    for key in "test2", "test2.example", "test2.example.com":
340
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
341

    
342
  def testMultipleMatches(self):
343
    """Test that a multiple match is returned as None"""
344
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
345
    for key in "test1", "test1.example":
346
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
347

    
348

    
349
class TestFormatUnit(unittest.TestCase):
350
  """Test case for the FormatUnit function"""
351

    
352
  def testMiB(self):
353
    self.assertEqual(FormatUnit(1), '1M')
354
    self.assertEqual(FormatUnit(100), '100M')
355
    self.assertEqual(FormatUnit(1023), '1023M')
356

    
357
  def testGiB(self):
358
    self.assertEqual(FormatUnit(1024), '1.0G')
359
    self.assertEqual(FormatUnit(1536), '1.5G')
360
    self.assertEqual(FormatUnit(17133), '16.7G')
361
    self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G')
362

    
363
  def testTiB(self):
364
    self.assertEqual(FormatUnit(1024 * 1024), '1.0T')
365
    self.assertEqual(FormatUnit(5120 * 1024), '5.0T')
366
    self.assertEqual(FormatUnit(29829 * 1024), '29.1T')
367

    
368

    
369
class TestParseUnit(unittest.TestCase):
370
  """Test case for the ParseUnit function"""
371

    
372
  SCALES = (('', 1),
373
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
374
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
375
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
376

    
377
  def testRounding(self):
378
    self.assertEqual(ParseUnit('0'), 0)
379
    self.assertEqual(ParseUnit('1'), 4)
380
    self.assertEqual(ParseUnit('2'), 4)
381
    self.assertEqual(ParseUnit('3'), 4)
382

    
383
    self.assertEqual(ParseUnit('124'), 124)
384
    self.assertEqual(ParseUnit('125'), 128)
385
    self.assertEqual(ParseUnit('126'), 128)
386
    self.assertEqual(ParseUnit('127'), 128)
387
    self.assertEqual(ParseUnit('128'), 128)
388
    self.assertEqual(ParseUnit('129'), 132)
389
    self.assertEqual(ParseUnit('130'), 132)
390

    
391
  def testFloating(self):
392
    self.assertEqual(ParseUnit('0'), 0)
393
    self.assertEqual(ParseUnit('0.5'), 4)
394
    self.assertEqual(ParseUnit('1.75'), 4)
395
    self.assertEqual(ParseUnit('1.99'), 4)
396
    self.assertEqual(ParseUnit('2.00'), 4)
397
    self.assertEqual(ParseUnit('2.01'), 4)
398
    self.assertEqual(ParseUnit('3.99'), 4)
399
    self.assertEqual(ParseUnit('4.00'), 4)
400
    self.assertEqual(ParseUnit('4.01'), 8)
401
    self.assertEqual(ParseUnit('1.5G'), 1536)
402
    self.assertEqual(ParseUnit('1.8G'), 1844)
403
    self.assertEqual(ParseUnit('8.28T'), 8682212)
404

    
405
  def testSuffixes(self):
406
    for sep in ('', ' ', '   ', "\t", "\t "):
407
      for suffix, scale in TestParseUnit.SCALES:
408
        for func in (lambda x: x, str.lower, str.upper):
409
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
410
                           1024 * scale)
411

    
412
  def testInvalidInput(self):
413
    for sep in ('-', '_', ',', 'a'):
414
      for suffix, _ in TestParseUnit.SCALES:
415
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
416

    
417
    for suffix, _ in TestParseUnit.SCALES:
418
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
419

    
420

    
421
class TestSshKeys(testutils.GanetiTestCase):
422
  """Test case for the AddAuthorizedKey function"""
423

    
424
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
425
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
426
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
427

    
428
  def setUp(self):
429
    (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
430
    try:
431
      handle = os.fdopen(fd, 'w')
432
      try:
433
        handle.write("%s\n" % TestSshKeys.KEY_A)
434
        handle.write("%s\n" % TestSshKeys.KEY_B)
435
      finally:
436
        handle.close()
437
    except:
438
      utils.RemoveFile(self.tmpname)
439
      raise
440

    
441
  def tearDown(self):
442
    utils.RemoveFile(self.tmpname)
443
    del self.tmpname
444

    
445
  def testAddingNewKey(self):
446
    AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
447

    
448
    self.assertFileContent(self.tmpname,
449
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
450
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
451
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
452
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
453

    
454
  def testAddingAlmostButNotCompletelyTheSameKey(self):
455
    AddAuthorizedKey(self.tmpname,
456
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
457

    
458
    self.assertFileContent(self.tmpname,
459
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
460
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
461
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
462
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
463

    
464
  def testAddingExistingKeyWithSomeMoreSpaces(self):
465
    AddAuthorizedKey(self.tmpname,
466
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
467

    
468
    self.assertFileContent(self.tmpname,
469
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
470
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
471
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
472

    
473
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
474
    RemoveAuthorizedKey(self.tmpname,
475
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
476

    
477
    self.assertFileContent(self.tmpname,
478
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
479
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
480

    
481
  def testRemovingNonExistingKey(self):
482
    RemoveAuthorizedKey(self.tmpname,
483
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
484

    
485
    self.assertFileContent(self.tmpname,
486
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
487
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
488
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
489

    
490

    
491
class TestEtcHosts(testutils.GanetiTestCase):
492
  """Test functions modifying /etc/hosts"""
493

    
494
  def setUp(self):
495
    (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
496
    try:
497
      handle = os.fdopen(fd, 'w')
498
      try:
499
        handle.write('# This is a test file for /etc/hosts\n')
500
        handle.write('127.0.0.1\tlocalhost\n')
501
        handle.write('192.168.1.1 router gw\n')
502
      finally:
503
        handle.close()
504
    except:
505
      utils.RemoveFile(self.tmpname)
506
      raise
507

    
508
  def tearDown(self):
509
    utils.RemoveFile(self.tmpname)
510
    del self.tmpname
511

    
512
  def testSettingNewIp(self):
513
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
514

    
515
    self.assertFileContent(self.tmpname,
516
      "# This is a test file for /etc/hosts\n"
517
      "127.0.0.1\tlocalhost\n"
518
      "192.168.1.1 router gw\n"
519
      "1.2.3.4\tmyhost.domain.tld myhost\n")
520

    
521
  def testSettingExistingIp(self):
522
    SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
523
                     ['myhost'])
524

    
525
    self.assertFileContent(self.tmpname,
526
      "# This is a test file for /etc/hosts\n"
527
      "127.0.0.1\tlocalhost\n"
528
      "192.168.1.1\tmyhost.domain.tld myhost\n")
529

    
530
  def testSettingDuplicateName(self):
531
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
532

    
533
    self.assertFileContent(self.tmpname,
534
      "# This is a test file for /etc/hosts\n"
535
      "127.0.0.1\tlocalhost\n"
536
      "192.168.1.1 router gw\n"
537
      "1.2.3.4\tmyhost\n")
538

    
539
  def testRemovingExistingHost(self):
540
    RemoveEtcHostsEntry(self.tmpname, 'router')
541

    
542
    self.assertFileContent(self.tmpname,
543
      "# This is a test file for /etc/hosts\n"
544
      "127.0.0.1\tlocalhost\n"
545
      "192.168.1.1 gw\n")
546

    
547
  def testRemovingSingleExistingHost(self):
548
    RemoveEtcHostsEntry(self.tmpname, 'localhost')
549

    
550
    self.assertFileContent(self.tmpname,
551
      "# This is a test file for /etc/hosts\n"
552
      "192.168.1.1 router gw\n")
553

    
554
  def testRemovingNonExistingHost(self):
555
    RemoveEtcHostsEntry(self.tmpname, 'myhost')
556

    
557
    self.assertFileContent(self.tmpname,
558
      "# This is a test file for /etc/hosts\n"
559
      "127.0.0.1\tlocalhost\n"
560
      "192.168.1.1 router gw\n")
561

    
562
  def testRemovingAlias(self):
563
    RemoveEtcHostsEntry(self.tmpname, 'gw')
564

    
565
    self.assertFileContent(self.tmpname,
566
      "# This is a test file for /etc/hosts\n"
567
      "127.0.0.1\tlocalhost\n"
568
      "192.168.1.1 router\n")
569

    
570

    
571
class TestShellQuoting(unittest.TestCase):
572
  """Test case for shell quoting functions"""
573

    
574
  def testShellQuote(self):
575
    self.assertEqual(ShellQuote('abc'), "abc")
576
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
577
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
578
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
579
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
580

    
581
  def testShellQuoteArgs(self):
582
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
583
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
584
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
585

    
586

    
587
class TestTcpPing(unittest.TestCase):
588
  """Testcase for TCP version of ping - against listen(2)ing port"""
589

    
590
  def setUp(self):
591
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
592
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
593
    self.listenerport = self.listener.getsockname()[1]
594
    self.listener.listen(1)
595

    
596
  def tearDown(self):
597
    self.listener.shutdown(socket.SHUT_RDWR)
598
    del self.listener
599
    del self.listenerport
600

    
601
  def testTcpPingToLocalHostAccept(self):
602
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
603
                         self.listenerport,
604
                         timeout=10,
605
                         live_port_needed=True,
606
                         source=constants.LOCALHOST_IP_ADDRESS,
607
                         ),
608
                 "failed to connect to test listener")
609

    
610
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
611
                         self.listenerport,
612
                         timeout=10,
613
                         live_port_needed=True,
614
                         ),
615
                 "failed to connect to test listener (no source)")
616

    
617

    
618
class TestTcpPingDeaf(unittest.TestCase):
619
  """Testcase for TCP version of ping - against non listen(2)ing port"""
620

    
621
  def setUp(self):
622
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
623
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
624
    self.deaflistenerport = self.deaflistener.getsockname()[1]
625

    
626
  def tearDown(self):
627
    del self.deaflistener
628
    del self.deaflistenerport
629

    
630
  def testTcpPingToLocalHostAcceptDeaf(self):
631
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
632
                        self.deaflistenerport,
633
                        timeout=constants.TCP_PING_TIMEOUT,
634
                        live_port_needed=True,
635
                        source=constants.LOCALHOST_IP_ADDRESS,
636
                        ), # need successful connect(2)
637
                "successfully connected to deaf listener")
638

    
639
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
640
                        self.deaflistenerport,
641
                        timeout=constants.TCP_PING_TIMEOUT,
642
                        live_port_needed=True,
643
                        ), # need successful connect(2)
644
                "successfully connected to deaf listener (no source addr)")
645

    
646
  def testTcpPingToLocalHostNoAccept(self):
647
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
648
                         self.deaflistenerport,
649
                         timeout=constants.TCP_PING_TIMEOUT,
650
                         live_port_needed=False,
651
                         source=constants.LOCALHOST_IP_ADDRESS,
652
                         ), # ECONNREFUSED is OK
653
                 "failed to ping alive host on deaf port")
654

    
655
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
656
                         self.deaflistenerport,
657
                         timeout=constants.TCP_PING_TIMEOUT,
658
                         live_port_needed=False,
659
                         ), # ECONNREFUSED is OK
660
                 "failed to ping alive host on deaf port (no source addr)")
661

    
662

    
663
class TestOwnIpAddress(unittest.TestCase):
664
  """Testcase for OwnIpAddress"""
665

    
666
  def testOwnLoopback(self):
667
    """check having the loopback ip"""
668
    self.failUnless(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS),
669
                    "Should own the loopback address")
670

    
671
  def testNowOwnAddress(self):
672
    """check that I don't own an address"""
673

    
674
    # network 192.0.2.0/24 is reserved for test/documentation as per
675
    # rfc 3330, so we *should* not have an address of this range... if
676
    # this fails, we should extend the test to multiple addresses
677
    DST_IP = "192.0.2.1"
678
    self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP)
679

    
680

    
681
class TestListVisibleFiles(unittest.TestCase):
682
  """Test case for ListVisibleFiles"""
683

    
684
  def setUp(self):
685
    self.path = tempfile.mkdtemp()
686

    
687
  def tearDown(self):
688
    shutil.rmtree(self.path)
689

    
690
  def _test(self, files, expected):
691
    # Sort a copy
692
    expected = expected[:]
693
    expected.sort()
694

    
695
    for name in files:
696
      f = open(os.path.join(self.path, name), 'w')
697
      try:
698
        f.write("Test\n")
699
      finally:
700
        f.close()
701

    
702
    found = ListVisibleFiles(self.path)
703
    found.sort()
704

    
705
    self.assertEqual(found, expected)
706

    
707
  def testAllVisible(self):
708
    files = ["a", "b", "c"]
709
    expected = files
710
    self._test(files, expected)
711

    
712
  def testNoneVisible(self):
713
    files = [".a", ".b", ".c"]
714
    expected = []
715
    self._test(files, expected)
716

    
717
  def testSomeVisible(self):
718
    files = ["a", "b", ".c"]
719
    expected = ["a", "b"]
720
    self._test(files, expected)
721

    
722

    
723
class TestNewUUID(unittest.TestCase):
724
  """Test case for NewUUID"""
725

    
726
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
727
                        '[a-f0-9]{4}-[a-f0-9]{12}$')
728

    
729
  def runTest(self):
730
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
731

    
732

    
733
class TestUniqueSequence(unittest.TestCase):
734
  """Test case for UniqueSequence"""
735

    
736
  def _test(self, input, expected):
737
    self.assertEqual(utils.UniqueSequence(input), expected)
738

    
739
  def runTest(self):
740
    # Ordered input
741
    self._test([1, 2, 3], [1, 2, 3])
742
    self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
743
    self._test([1, 2, 2, 3], [1, 2, 3])
744
    self._test([1, 2, 3, 3], [1, 2, 3])
745

    
746
    # Unordered input
747
    self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
748
    self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
749

    
750
    # Strings
751
    self._test(["a", "a"], ["a"])
752
    self._test(["a", "b"], ["a", "b"])
753
    self._test(["a", "b", "a"], ["a", "b"])
754

    
755

    
756
class TestFirstFree(unittest.TestCase):
757
  """Test case for the FirstFree function"""
758

    
759
  def test(self):
760
    """Test FirstFree"""
761
    self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
762
    self.failUnlessEqual(FirstFree([]), None)
763
    self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
764
    self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
765
    self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
766

    
767

    
768
class TestFileLock(unittest.TestCase):
769
  """Test case for the FileLock class"""
770

    
771
  def setUp(self):
772
    self.tmpfile = tempfile.NamedTemporaryFile()
773
    self.lock = utils.FileLock(self.tmpfile.name)
774

    
775
  def testSharedNonblocking(self):
776
    self.lock.Shared(blocking=False)
777
    self.lock.Close()
778

    
779
  def testExclusiveNonblocking(self):
780
    self.lock.Exclusive(blocking=False)
781
    self.lock.Close()
782

    
783
  def testUnlockNonblocking(self):
784
    self.lock.Unlock(blocking=False)
785
    self.lock.Close()
786

    
787
  def testSharedBlocking(self):
788
    self.lock.Shared(blocking=True)
789
    self.lock.Close()
790

    
791
  def testExclusiveBlocking(self):
792
    self.lock.Exclusive(blocking=True)
793
    self.lock.Close()
794

    
795
  def testUnlockBlocking(self):
796
    self.lock.Unlock(blocking=True)
797
    self.lock.Close()
798

    
799
  def testSharedExclusiveUnlock(self):
800
    self.lock.Shared(blocking=False)
801
    self.lock.Exclusive(blocking=False)
802
    self.lock.Unlock(blocking=False)
803
    self.lock.Close()
804

    
805
  def testExclusiveSharedUnlock(self):
806
    self.lock.Exclusive(blocking=False)
807
    self.lock.Shared(blocking=False)
808
    self.lock.Unlock(blocking=False)
809
    self.lock.Close()
810

    
811
  def testCloseShared(self):
812
    self.lock.Close()
813
    self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
814

    
815
  def testCloseExclusive(self):
816
    self.lock.Close()
817
    self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
818

    
819
  def testCloseUnlock(self):
820
    self.lock.Close()
821
    self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
822

    
823

    
824
class TestTimeFunctions(unittest.TestCase):
825
  """Test case for time functions"""
826

    
827
  def runTest(self):
828
    self.assertEqual(utils.SplitTime(1), (1, 0))
829
    self.assertEqual(utils.SplitTime(1.5), (1, 500000))
830
    self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
831
    self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
832
    self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
833
    self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
834
    self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
835
    self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
836

    
837
    self.assertRaises(AssertionError, utils.SplitTime, -1)
838

    
839
    self.assertEqual(utils.MergeTime((1, 0)), 1.0)
840
    self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
841
    self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
842

    
843
    self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3), 1218448917.481)
844
    self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
845

    
846
    self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
847
    self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
848
    self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
849
    self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
850
    self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
851

    
852

    
853
if __name__ == '__main__':
854
  unittest.main()