Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ 6e797216

History | View | Annotate | Download (29.4 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import os
30
import md5
31
import signal
32
import socket
33
import shutil
34
import re
35
import select
36

    
37
import ganeti
38
import testutils
39
from ganeti import constants
40
from ganeti import utils
41
from ganeti.utils import IsProcessAlive, RunCmd, \
42
     RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
43
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
44
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
45
     SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress
46
from ganeti.errors import LockError, UnitParseError, GenericError, \
47
     ProgrammerError
48

    
49

    
50
class TestIsProcessAlive(unittest.TestCase):
51
  """Testing case for IsProcessAlive"""
52

    
53
  def testExists(self):
54
    mypid = os.getpid()
55
    self.assert_(IsProcessAlive(mypid),
56
                 "can't find myself running")
57

    
58
  def testNotExisting(self):
59
    pid_non_existing = os.fork()
60
    if pid_non_existing == 0:
61
      os._exit(0)
62
    elif pid_non_existing < 0:
63
      raise SystemError("can't fork")
64
    os.waitpid(pid_non_existing, 0)
65
    self.assert_(not IsProcessAlive(pid_non_existing),
66
                 "nonexisting process detected")
67

    
68

    
69
class TestPidFileFunctions(unittest.TestCase):
70
  """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
71

    
72
  def setUp(self):
73
    self.dir = tempfile.mkdtemp()
74
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
75
    utils.DaemonPidFileName = self.f_dpn
76

    
77
  def testPidFileFunctions(self):
78
    pid_file = self.f_dpn('test')
79
    utils.WritePidFile('test')
80
    self.failUnless(os.path.exists(pid_file),
81
                    "PID file should have been created")
82
    read_pid = utils.ReadPidFile(pid_file)
83
    self.failUnlessEqual(read_pid, os.getpid())
84
    self.failUnless(utils.IsProcessAlive(read_pid))
85
    self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
86
    utils.RemovePidFile('test')
87
    self.failIf(os.path.exists(pid_file),
88
                "PID file should not exist anymore")
89
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
90
                         "ReadPidFile should return 0 for missing pid file")
91
    fh = open(pid_file, "w")
92
    fh.write("blah\n")
93
    fh.close()
94
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
95
                         "ReadPidFile should return 0 for invalid pid file")
96
    utils.RemovePidFile('test')
97
    self.failIf(os.path.exists(pid_file),
98
                "PID file should not exist anymore")
99

    
100
  def testKill(self):
101
    pid_file = self.f_dpn('child')
102
    r_fd, w_fd = os.pipe()
103
    new_pid = os.fork()
104
    if new_pid == 0: #child
105
      utils.WritePidFile('child')
106
      os.write(w_fd, 'a')
107
      signal.pause()
108
      os._exit(0)
109
      return
110
    # else we are in the parent
111
    # wait until the child has written the pid file
112
    os.read(r_fd, 1)
113
    read_pid = utils.ReadPidFile(pid_file)
114
    self.failUnlessEqual(read_pid, new_pid)
115
    self.failUnless(utils.IsProcessAlive(new_pid))
116
    utils.KillProcess(new_pid, waitpid=True)
117
    self.failIf(utils.IsProcessAlive(new_pid))
118
    utils.RemovePidFile('child')
119
    self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
120

    
121
  def tearDown(self):
122
    for name in os.listdir(self.dir):
123
      os.unlink(os.path.join(self.dir, name))
124
    os.rmdir(self.dir)
125

    
126

    
127
class TestRunCmd(testutils.GanetiTestCase):
128
  """Testing case for the RunCmd function"""
129

    
130
  def setUp(self):
131
    self.magic = time.ctime() + " ganeti test"
132
    fh, self.fname = tempfile.mkstemp()
133
    os.close(fh)
134

    
135
  def tearDown(self):
136
    if self.fname:
137
      utils.RemoveFile(self.fname)
138

    
139
  def testOk(self):
140
    """Test successful exit code"""
141
    result = RunCmd("/bin/sh -c 'exit 0'")
142
    self.assertEqual(result.exit_code, 0)
143
    self.assertEqual(result.output, "")
144

    
145
  def testFail(self):
146
    """Test fail exit code"""
147
    result = RunCmd("/bin/sh -c 'exit 1'")
148
    self.assertEqual(result.exit_code, 1)
149
    self.assertEqual(result.output, "")
150

    
151
  def testStdout(self):
152
    """Test standard output"""
153
    cmd = 'echo -n "%s"' % self.magic
154
    result = RunCmd("/bin/sh -c '%s'" % cmd)
155
    self.assertEqual(result.stdout, self.magic)
156
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
157
    self.assertEqual(result.output, "")
158
    self.assertFileContent(self.fname, self.magic)
159

    
160
  def testStderr(self):
161
    """Test standard error"""
162
    cmd = 'echo -n "%s"' % self.magic
163
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
164
    self.assertEqual(result.stderr, self.magic)
165
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
166
    self.assertEqual(result.output, "")
167
    self.assertFileContent(self.fname, self.magic)
168

    
169
  def testCombined(self):
170
    """Test combined output"""
171
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
172
    expected = "A" + self.magic + "B" + self.magic
173
    result = RunCmd("/bin/sh -c '%s'" % cmd)
174
    self.assertEqual(result.output, expected)
175
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
176
    self.assertEqual(result.output, "")
177
    self.assertFileContent(self.fname, expected)
178

    
179
  def testSignal(self):
180
    """Test signal"""
181
    result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
182
    self.assertEqual(result.signal, 15)
183
    self.assertEqual(result.output, "")
184

    
185
  def testListRun(self):
186
    """Test list runs"""
187
    result = RunCmd(["true"])
188
    self.assertEqual(result.signal, None)
189
    self.assertEqual(result.exit_code, 0)
190
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
191
    self.assertEqual(result.signal, None)
192
    self.assertEqual(result.exit_code, 1)
193
    result = RunCmd(["echo", "-n", self.magic])
194
    self.assertEqual(result.signal, None)
195
    self.assertEqual(result.exit_code, 0)
196
    self.assertEqual(result.stdout, self.magic)
197

    
198
  def testFileEmptyOutput(self):
199
    """Test file output"""
200
    result = RunCmd(["true"], output=self.fname)
201
    self.assertEqual(result.signal, None)
202
    self.assertEqual(result.exit_code, 0)
203
    self.assertFileContent(self.fname, "")
204

    
205
  def testLang(self):
206
    """Test locale environment"""
207
    old_env = os.environ.copy()
208
    try:
209
      os.environ["LANG"] = "en_US.UTF-8"
210
      os.environ["LC_ALL"] = "en_US.UTF-8"
211
      result = RunCmd(["locale"])
212
      for line in result.output.splitlines():
213
        key, value = line.split("=", 1)
214
        # Ignore these variables, they're overridden by LC_ALL
215
        if key == "LANG" or key == "LANGUAGE":
216
          continue
217
        self.failIf(value and value != "C" and value != '"C"',
218
            "Variable %s is set to the invalid value '%s'" % (key, value))
219
    finally:
220
      os.environ = old_env
221

    
222
  def testDefaultCwd(self):
223
    """Test default working directory"""
224
    self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
225

    
226
  def testCwd(self):
227
    """Test default working directory"""
228
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
229
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
230
    cwd = os.getcwd()
231
    self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
232

    
233

    
234
class TestRemoveFile(unittest.TestCase):
235
  """Test case for the RemoveFile function"""
236

    
237
  def setUp(self):
238
    """Create a temp dir and file for each case"""
239
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
240
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
241
    os.close(fd)
242

    
243
  def tearDown(self):
244
    if os.path.exists(self.tmpfile):
245
      os.unlink(self.tmpfile)
246
    os.rmdir(self.tmpdir)
247

    
248

    
249
  def testIgnoreDirs(self):
250
    """Test that RemoveFile() ignores directories"""
251
    self.assertEqual(None, RemoveFile(self.tmpdir))
252

    
253

    
254
  def testIgnoreNotExisting(self):
255
    """Test that RemoveFile() ignores non-existing files"""
256
    RemoveFile(self.tmpfile)
257
    RemoveFile(self.tmpfile)
258

    
259

    
260
  def testRemoveFile(self):
261
    """Test that RemoveFile does remove a file"""
262
    RemoveFile(self.tmpfile)
263
    if os.path.exists(self.tmpfile):
264
      self.fail("File '%s' not removed" % self.tmpfile)
265

    
266

    
267
  def testRemoveSymlink(self):
268
    """Test that RemoveFile does remove symlinks"""
269
    symlink = self.tmpdir + "/symlink"
270
    os.symlink("no-such-file", symlink)
271
    RemoveFile(symlink)
272
    if os.path.exists(symlink):
273
      self.fail("File '%s' not removed" % symlink)
274
    os.symlink(self.tmpfile, symlink)
275
    RemoveFile(symlink)
276
    if os.path.exists(symlink):
277
      self.fail("File '%s' not removed" % symlink)
278

    
279

    
280
class TestRename(unittest.TestCase):
281
  """Test case for RenameFile"""
282

    
283
  def setUp(self):
284
    """Create a temporary directory"""
285
    self.tmpdir = tempfile.mkdtemp()
286
    self.tmpfile = os.path.join(self.tmpdir, "test1")
287

    
288
    # Touch the file
289
    open(self.tmpfile, "w").close()
290

    
291
  def tearDown(self):
292
    """Remove temporary directory"""
293
    shutil.rmtree(self.tmpdir)
294

    
295
  def testSimpleRename1(self):
296
    """Simple rename 1"""
297
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
298

    
299
  def testSimpleRename2(self):
300
    """Simple rename 2"""
301
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
302
                     mkdir=True)
303

    
304
  def testRenameMkdir(self):
305
    """Rename with mkdir"""
306
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
307
                     mkdir=True)
308

    
309

    
310
class TestCheckdict(unittest.TestCase):
311
  """Test case for the CheckDict function"""
312

    
313
  def testAdd(self):
314
    """Test that CheckDict adds a missing key with the correct value"""
315

    
316
    tgt = {'a':1}
317
    tmpl = {'b': 2}
318
    CheckDict(tgt, tmpl)
319
    if 'b' not in tgt or tgt['b'] != 2:
320
      self.fail("Failed to update dict")
321

    
322

    
323
  def testNoUpdate(self):
324
    """Test that CheckDict does not overwrite an existing key"""
325
    tgt = {'a':1, 'b': 3}
326
    tmpl = {'b': 2}
327
    CheckDict(tgt, tmpl)
328
    self.failUnlessEqual(tgt['b'], 3)
329

    
330

    
331
class TestMatchNameComponent(unittest.TestCase):
332
  """Test case for the MatchNameComponent function"""
333

    
334
  def testEmptyList(self):
335
    """Test that there is no match against an empty list"""
336

    
337
    self.failUnlessEqual(MatchNameComponent("", []), None)
338
    self.failUnlessEqual(MatchNameComponent("test", []), None)
339

    
340
  def testSingleMatch(self):
341
    """Test that a single match is performed correctly"""
342
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
343
    for key in "test2", "test2.example", "test2.example.com":
344
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
345

    
346
  def testMultipleMatches(self):
347
    """Test that a multiple match is returned as None"""
348
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
349
    for key in "test1", "test1.example":
350
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
351

    
352

    
353
class TestFormatUnit(unittest.TestCase):
354
  """Test case for the FormatUnit function"""
355

    
356
  def testMiB(self):
357
    self.assertEqual(FormatUnit(1, 'h'), '1M')
358
    self.assertEqual(FormatUnit(100, 'h'), '100M')
359
    self.assertEqual(FormatUnit(1023, 'h'), '1023M')
360

    
361
    self.assertEqual(FormatUnit(1, 'm'), '1')
362
    self.assertEqual(FormatUnit(100, 'm'), '100')
363
    self.assertEqual(FormatUnit(1023, 'm'), '1023')
364

    
365
    self.assertEqual(FormatUnit(1024, 'm'), '1024')
366
    self.assertEqual(FormatUnit(1536, 'm'), '1536')
367
    self.assertEqual(FormatUnit(17133, 'm'), '17133')
368
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
369

    
370
  def testGiB(self):
371
    self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
372
    self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
373
    self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
374
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
375

    
376
    self.assertEqual(FormatUnit(1024, 'g'), '1.0')
377
    self.assertEqual(FormatUnit(1536, 'g'), '1.5')
378
    self.assertEqual(FormatUnit(17133, 'g'), '16.7')
379
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
380

    
381
    self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
382
    self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
383
    self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
384

    
385
  def testTiB(self):
386
    self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
387
    self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
388
    self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
389

    
390
    self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
391
    self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
392
    self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
393

    
394
class TestParseUnit(unittest.TestCase):
395
  """Test case for the ParseUnit function"""
396

    
397
  SCALES = (('', 1),
398
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
399
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
400
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
401

    
402
  def testRounding(self):
403
    self.assertEqual(ParseUnit('0'), 0)
404
    self.assertEqual(ParseUnit('1'), 4)
405
    self.assertEqual(ParseUnit('2'), 4)
406
    self.assertEqual(ParseUnit('3'), 4)
407

    
408
    self.assertEqual(ParseUnit('124'), 124)
409
    self.assertEqual(ParseUnit('125'), 128)
410
    self.assertEqual(ParseUnit('126'), 128)
411
    self.assertEqual(ParseUnit('127'), 128)
412
    self.assertEqual(ParseUnit('128'), 128)
413
    self.assertEqual(ParseUnit('129'), 132)
414
    self.assertEqual(ParseUnit('130'), 132)
415

    
416
  def testFloating(self):
417
    self.assertEqual(ParseUnit('0'), 0)
418
    self.assertEqual(ParseUnit('0.5'), 4)
419
    self.assertEqual(ParseUnit('1.75'), 4)
420
    self.assertEqual(ParseUnit('1.99'), 4)
421
    self.assertEqual(ParseUnit('2.00'), 4)
422
    self.assertEqual(ParseUnit('2.01'), 4)
423
    self.assertEqual(ParseUnit('3.99'), 4)
424
    self.assertEqual(ParseUnit('4.00'), 4)
425
    self.assertEqual(ParseUnit('4.01'), 8)
426
    self.assertEqual(ParseUnit('1.5G'), 1536)
427
    self.assertEqual(ParseUnit('1.8G'), 1844)
428
    self.assertEqual(ParseUnit('8.28T'), 8682212)
429

    
430
  def testSuffixes(self):
431
    for sep in ('', ' ', '   ', "\t", "\t "):
432
      for suffix, scale in TestParseUnit.SCALES:
433
        for func in (lambda x: x, str.lower, str.upper):
434
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
435
                           1024 * scale)
436

    
437
  def testInvalidInput(self):
438
    for sep in ('-', '_', ',', 'a'):
439
      for suffix, _ in TestParseUnit.SCALES:
440
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
441

    
442
    for suffix, _ in TestParseUnit.SCALES:
443
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
444

    
445

    
446
class TestSshKeys(testutils.GanetiTestCase):
447
  """Test case for the AddAuthorizedKey function"""
448

    
449
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
450
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
451
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
452

    
453
  def setUp(self):
454
    (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
455
    try:
456
      handle = os.fdopen(fd, 'w')
457
      try:
458
        handle.write("%s\n" % TestSshKeys.KEY_A)
459
        handle.write("%s\n" % TestSshKeys.KEY_B)
460
      finally:
461
        handle.close()
462
    except:
463
      utils.RemoveFile(self.tmpname)
464
      raise
465

    
466
  def tearDown(self):
467
    utils.RemoveFile(self.tmpname)
468
    del self.tmpname
469

    
470
  def testAddingNewKey(self):
471
    AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
472

    
473
    self.assertFileContent(self.tmpname,
474
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
475
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
476
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
477
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
478

    
479
  def testAddingAlmostButNotCompletelyTheSameKey(self):
480
    AddAuthorizedKey(self.tmpname,
481
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
482

    
483
    self.assertFileContent(self.tmpname,
484
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
485
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
486
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
487
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
488

    
489
  def testAddingExistingKeyWithSomeMoreSpaces(self):
490
    AddAuthorizedKey(self.tmpname,
491
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
492

    
493
    self.assertFileContent(self.tmpname,
494
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
495
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
496
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
497

    
498
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
499
    RemoveAuthorizedKey(self.tmpname,
500
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
501

    
502
    self.assertFileContent(self.tmpname,
503
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
504
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
505

    
506
  def testRemovingNonExistingKey(self):
507
    RemoveAuthorizedKey(self.tmpname,
508
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
509

    
510
    self.assertFileContent(self.tmpname,
511
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
512
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
513
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
514

    
515

    
516
class TestEtcHosts(testutils.GanetiTestCase):
517
  """Test functions modifying /etc/hosts"""
518

    
519
  def setUp(self):
520
    (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
521
    try:
522
      handle = os.fdopen(fd, 'w')
523
      try:
524
        handle.write('# This is a test file for /etc/hosts\n')
525
        handle.write('127.0.0.1\tlocalhost\n')
526
        handle.write('192.168.1.1 router gw\n')
527
      finally:
528
        handle.close()
529
    except:
530
      utils.RemoveFile(self.tmpname)
531
      raise
532

    
533
  def tearDown(self):
534
    utils.RemoveFile(self.tmpname)
535
    del self.tmpname
536

    
537
  def testSettingNewIp(self):
538
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
539

    
540
    self.assertFileContent(self.tmpname,
541
      "# This is a test file for /etc/hosts\n"
542
      "127.0.0.1\tlocalhost\n"
543
      "192.168.1.1 router gw\n"
544
      "1.2.3.4\tmyhost.domain.tld myhost\n")
545

    
546
  def testSettingExistingIp(self):
547
    SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
548
                     ['myhost'])
549

    
550
    self.assertFileContent(self.tmpname,
551
      "# This is a test file for /etc/hosts\n"
552
      "127.0.0.1\tlocalhost\n"
553
      "192.168.1.1\tmyhost.domain.tld myhost\n")
554

    
555
  def testSettingDuplicateName(self):
556
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
557

    
558
    self.assertFileContent(self.tmpname,
559
      "# This is a test file for /etc/hosts\n"
560
      "127.0.0.1\tlocalhost\n"
561
      "192.168.1.1 router gw\n"
562
      "1.2.3.4\tmyhost\n")
563

    
564
  def testRemovingExistingHost(self):
565
    RemoveEtcHostsEntry(self.tmpname, 'router')
566

    
567
    self.assertFileContent(self.tmpname,
568
      "# This is a test file for /etc/hosts\n"
569
      "127.0.0.1\tlocalhost\n"
570
      "192.168.1.1 gw\n")
571

    
572
  def testRemovingSingleExistingHost(self):
573
    RemoveEtcHostsEntry(self.tmpname, 'localhost')
574

    
575
    self.assertFileContent(self.tmpname,
576
      "# This is a test file for /etc/hosts\n"
577
      "192.168.1.1 router gw\n")
578

    
579
  def testRemovingNonExistingHost(self):
580
    RemoveEtcHostsEntry(self.tmpname, 'myhost')
581

    
582
    self.assertFileContent(self.tmpname,
583
      "# This is a test file for /etc/hosts\n"
584
      "127.0.0.1\tlocalhost\n"
585
      "192.168.1.1 router gw\n")
586

    
587
  def testRemovingAlias(self):
588
    RemoveEtcHostsEntry(self.tmpname, 'gw')
589

    
590
    self.assertFileContent(self.tmpname,
591
      "# This is a test file for /etc/hosts\n"
592
      "127.0.0.1\tlocalhost\n"
593
      "192.168.1.1 router\n")
594

    
595

    
596
class TestShellQuoting(unittest.TestCase):
597
  """Test case for shell quoting functions"""
598

    
599
  def testShellQuote(self):
600
    self.assertEqual(ShellQuote('abc'), "abc")
601
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
602
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
603
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
604
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
605

    
606
  def testShellQuoteArgs(self):
607
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
608
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
609
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
610

    
611

    
612
class TestTcpPing(unittest.TestCase):
613
  """Testcase for TCP version of ping - against listen(2)ing port"""
614

    
615
  def setUp(self):
616
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
617
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
618
    self.listenerport = self.listener.getsockname()[1]
619
    self.listener.listen(1)
620

    
621
  def tearDown(self):
622
    self.listener.shutdown(socket.SHUT_RDWR)
623
    del self.listener
624
    del self.listenerport
625

    
626
  def testTcpPingToLocalHostAccept(self):
627
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
628
                         self.listenerport,
629
                         timeout=10,
630
                         live_port_needed=True,
631
                         source=constants.LOCALHOST_IP_ADDRESS,
632
                         ),
633
                 "failed to connect to test listener")
634

    
635
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
636
                         self.listenerport,
637
                         timeout=10,
638
                         live_port_needed=True,
639
                         ),
640
                 "failed to connect to test listener (no source)")
641

    
642

    
643
class TestTcpPingDeaf(unittest.TestCase):
644
  """Testcase for TCP version of ping - against non listen(2)ing port"""
645

    
646
  def setUp(self):
647
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
648
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
649
    self.deaflistenerport = self.deaflistener.getsockname()[1]
650

    
651
  def tearDown(self):
652
    del self.deaflistener
653
    del self.deaflistenerport
654

    
655
  def testTcpPingToLocalHostAcceptDeaf(self):
656
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
657
                        self.deaflistenerport,
658
                        timeout=constants.TCP_PING_TIMEOUT,
659
                        live_port_needed=True,
660
                        source=constants.LOCALHOST_IP_ADDRESS,
661
                        ), # need successful connect(2)
662
                "successfully connected to deaf listener")
663

    
664
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
665
                        self.deaflistenerport,
666
                        timeout=constants.TCP_PING_TIMEOUT,
667
                        live_port_needed=True,
668
                        ), # need successful connect(2)
669
                "successfully connected to deaf listener (no source addr)")
670

    
671
  def testTcpPingToLocalHostNoAccept(self):
672
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
673
                         self.deaflistenerport,
674
                         timeout=constants.TCP_PING_TIMEOUT,
675
                         live_port_needed=False,
676
                         source=constants.LOCALHOST_IP_ADDRESS,
677
                         ), # ECONNREFUSED is OK
678
                 "failed to ping alive host on deaf port")
679

    
680
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
681
                         self.deaflistenerport,
682
                         timeout=constants.TCP_PING_TIMEOUT,
683
                         live_port_needed=False,
684
                         ), # ECONNREFUSED is OK
685
                 "failed to ping alive host on deaf port (no source addr)")
686

    
687

    
688
class TestOwnIpAddress(unittest.TestCase):
689
  """Testcase for OwnIpAddress"""
690

    
691
  def testOwnLoopback(self):
692
    """check having the loopback ip"""
693
    self.failUnless(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS),
694
                    "Should own the loopback address")
695

    
696
  def testNowOwnAddress(self):
697
    """check that I don't own an address"""
698

    
699
    # network 192.0.2.0/24 is reserved for test/documentation as per
700
    # rfc 3330, so we *should* not have an address of this range... if
701
    # this fails, we should extend the test to multiple addresses
702
    DST_IP = "192.0.2.1"
703
    self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP)
704

    
705

    
706
class TestListVisibleFiles(unittest.TestCase):
707
  """Test case for ListVisibleFiles"""
708

    
709
  def setUp(self):
710
    self.path = tempfile.mkdtemp()
711

    
712
  def tearDown(self):
713
    shutil.rmtree(self.path)
714

    
715
  def _test(self, files, expected):
716
    # Sort a copy
717
    expected = expected[:]
718
    expected.sort()
719

    
720
    for name in files:
721
      f = open(os.path.join(self.path, name), 'w')
722
      try:
723
        f.write("Test\n")
724
      finally:
725
        f.close()
726

    
727
    found = ListVisibleFiles(self.path)
728
    found.sort()
729

    
730
    self.assertEqual(found, expected)
731

    
732
  def testAllVisible(self):
733
    files = ["a", "b", "c"]
734
    expected = files
735
    self._test(files, expected)
736

    
737
  def testNoneVisible(self):
738
    files = [".a", ".b", ".c"]
739
    expected = []
740
    self._test(files, expected)
741

    
742
  def testSomeVisible(self):
743
    files = ["a", "b", ".c"]
744
    expected = ["a", "b"]
745
    self._test(files, expected)
746

    
747

    
748
class TestNewUUID(unittest.TestCase):
749
  """Test case for NewUUID"""
750

    
751
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
752
                        '[a-f0-9]{4}-[a-f0-9]{12}$')
753

    
754
  def runTest(self):
755
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
756

    
757

    
758
class TestUniqueSequence(unittest.TestCase):
759
  """Test case for UniqueSequence"""
760

    
761
  def _test(self, input, expected):
762
    self.assertEqual(utils.UniqueSequence(input), expected)
763

    
764
  def runTest(self):
765
    # Ordered input
766
    self._test([1, 2, 3], [1, 2, 3])
767
    self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
768
    self._test([1, 2, 2, 3], [1, 2, 3])
769
    self._test([1, 2, 3, 3], [1, 2, 3])
770

    
771
    # Unordered input
772
    self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
773
    self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
774

    
775
    # Strings
776
    self._test(["a", "a"], ["a"])
777
    self._test(["a", "b"], ["a", "b"])
778
    self._test(["a", "b", "a"], ["a", "b"])
779

    
780

    
781
class TestFirstFree(unittest.TestCase):
782
  """Test case for the FirstFree function"""
783

    
784
  def test(self):
785
    """Test FirstFree"""
786
    self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
787
    self.failUnlessEqual(FirstFree([]), None)
788
    self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
789
    self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
790
    self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
791

    
792

    
793
class TestFileLock(unittest.TestCase):
794
  """Test case for the FileLock class"""
795

    
796
  def setUp(self):
797
    self.tmpfile = tempfile.NamedTemporaryFile()
798
    self.lock = utils.FileLock(self.tmpfile.name)
799

    
800
  def testSharedNonblocking(self):
801
    self.lock.Shared(blocking=False)
802
    self.lock.Close()
803

    
804
  def testExclusiveNonblocking(self):
805
    self.lock.Exclusive(blocking=False)
806
    self.lock.Close()
807

    
808
  def testUnlockNonblocking(self):
809
    self.lock.Unlock(blocking=False)
810
    self.lock.Close()
811

    
812
  def testSharedBlocking(self):
813
    self.lock.Shared(blocking=True)
814
    self.lock.Close()
815

    
816
  def testExclusiveBlocking(self):
817
    self.lock.Exclusive(blocking=True)
818
    self.lock.Close()
819

    
820
  def testUnlockBlocking(self):
821
    self.lock.Unlock(blocking=True)
822
    self.lock.Close()
823

    
824
  def testSharedExclusiveUnlock(self):
825
    self.lock.Shared(blocking=False)
826
    self.lock.Exclusive(blocking=False)
827
    self.lock.Unlock(blocking=False)
828
    self.lock.Close()
829

    
830
  def testExclusiveSharedUnlock(self):
831
    self.lock.Exclusive(blocking=False)
832
    self.lock.Shared(blocking=False)
833
    self.lock.Unlock(blocking=False)
834
    self.lock.Close()
835

    
836
  def testCloseShared(self):
837
    self.lock.Close()
838
    self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
839

    
840
  def testCloseExclusive(self):
841
    self.lock.Close()
842
    self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
843

    
844
  def testCloseUnlock(self):
845
    self.lock.Close()
846
    self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
847

    
848

    
849
class TestTimeFunctions(unittest.TestCase):
850
  """Test case for time functions"""
851

    
852
  def runTest(self):
853
    self.assertEqual(utils.SplitTime(1), (1, 0))
854
    self.assertEqual(utils.SplitTime(1.5), (1, 500000))
855
    self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
856
    self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
857
    self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
858
    self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
859
    self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
860
    self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
861

    
862
    self.assertRaises(AssertionError, utils.SplitTime, -1)
863

    
864
    self.assertEqual(utils.MergeTime((1, 0)), 1.0)
865
    self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
866
    self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
867

    
868
    self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3), 1218448917.481)
869
    self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
870

    
871
    self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
872
    self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
873
    self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
874
    self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
875
    self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
876

    
877

    
878
class FieldSetTestCase(unittest.TestCase):
879
  """Test case for FieldSets"""
880

    
881
  def testSimpleMatch(self):
882
    f = utils.FieldSet("a", "b", "c", "def")
883
    self.failUnless(f.Matches("a"))
884
    self.failIf(f.Matches("d"), "Substring matched")
885
    self.failIf(f.Matches("defghi"), "Prefix string matched")
886
    self.failIf(f.NonMatching(["b", "c"]))
887
    self.failIf(f.NonMatching(["a", "b", "c", "def"]))
888
    self.failUnless(f.NonMatching(["a", "d"]))
889

    
890
  def testRegexMatch(self):
891
    f = utils.FieldSet("a", "b([0-9]+)", "c")
892
    self.failUnless(f.Matches("b1"))
893
    self.failUnless(f.Matches("b99"))
894
    self.failIf(f.Matches("b/1"))
895
    self.failIf(f.NonMatching(["b12", "c"]))
896
    self.failUnless(f.NonMatching(["a", "1"]))
897

    
898

    
899
if __name__ == '__main__':
900
  unittest.main()