Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ 9fbfbb7b

History | View | Annotate | Download (28.6 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import os
30
import md5
31
import signal
32
import socket
33
import shutil
34
import re
35
import select
36

    
37
import ganeti
38
import testutils
39
from ganeti import constants
40
from ganeti import utils
41
from ganeti.utils import IsProcessAlive, RunCmd, \
42
     RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
43
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
44
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
45
     SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress
46
from ganeti.errors import LockError, UnitParseError, GenericError, \
47
     ProgrammerError
48

    
49

    
50
class TestIsProcessAlive(unittest.TestCase):
51
  """Testing case for IsProcessAlive"""
52

    
53
  def testExists(self):
54
    mypid = os.getpid()
55
    self.assert_(IsProcessAlive(mypid),
56
                 "can't find myself running")
57

    
58
  def testNotExisting(self):
59
    pid_non_existing = os.fork()
60
    if pid_non_existing == 0:
61
      os._exit(0)
62
    elif pid_non_existing < 0:
63
      raise SystemError("can't fork")
64
    os.waitpid(pid_non_existing, 0)
65
    self.assert_(not IsProcessAlive(pid_non_existing),
66
                 "nonexisting process detected")
67

    
68

    
69
class TestPidFileFunctions(unittest.TestCase):
70
  """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
71

    
72
  def setUp(self):
73
    self.dir = tempfile.mkdtemp()
74
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
75
    utils.DaemonPidFileName = self.f_dpn
76

    
77
  def testPidFileFunctions(self):
78
    pid_file = self.f_dpn('test')
79
    utils.WritePidFile('test')
80
    self.failUnless(os.path.exists(pid_file),
81
                    "PID file should have been created")
82
    read_pid = utils.ReadPidFile(pid_file)
83
    self.failUnlessEqual(read_pid, os.getpid())
84
    self.failUnless(utils.IsProcessAlive(read_pid))
85
    self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
86
    utils.RemovePidFile('test')
87
    self.failIf(os.path.exists(pid_file),
88
                "PID file should not exist anymore")
89
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
90
                         "ReadPidFile should return 0 for missing pid file")
91
    fh = open(pid_file, "w")
92
    fh.write("blah\n")
93
    fh.close()
94
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
95
                         "ReadPidFile should return 0 for invalid pid file")
96
    utils.RemovePidFile('test')
97
    self.failIf(os.path.exists(pid_file),
98
                "PID file should not exist anymore")
99

    
100
  def testKill(self):
101
    pid_file = self.f_dpn('child')
102
    r_fd, w_fd = os.pipe()
103
    new_pid = os.fork()
104
    if new_pid == 0: #child
105
      utils.WritePidFile('child')
106
      os.write(w_fd, 'a')
107
      signal.pause()
108
      os._exit(0)
109
      return
110
    # else we are in the parent
111
    # wait until the child has written the pid file
112
    os.read(r_fd, 1)
113
    read_pid = utils.ReadPidFile(pid_file)
114
    self.failUnlessEqual(read_pid, new_pid)
115
    self.failUnless(utils.IsProcessAlive(new_pid))
116
    utils.KillProcess(new_pid, waitpid=True)
117
    self.failIf(utils.IsProcessAlive(new_pid))
118
    utils.RemovePidFile('child')
119
    self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
120

    
121
  def tearDown(self):
122
    for name in os.listdir(self.dir):
123
      os.unlink(os.path.join(self.dir, name))
124
    os.rmdir(self.dir)
125

    
126

    
127
class TestRunCmd(testutils.GanetiTestCase):
128
  """Testing case for the RunCmd function"""
129

    
130
  def setUp(self):
131
    self.magic = time.ctime() + " ganeti test"
132
    fh, self.fname = tempfile.mkstemp()
133
    os.close(fh)
134

    
135
  def tearDown(self):
136
    if self.fname:
137
      utils.RemoveFile(self.fname)
138

    
139
  def testOk(self):
140
    """Test successful exit code"""
141
    result = RunCmd("/bin/sh -c 'exit 0'")
142
    self.assertEqual(result.exit_code, 0)
143
    self.assertEqual(result.output, "")
144

    
145
  def testFail(self):
146
    """Test fail exit code"""
147
    result = RunCmd("/bin/sh -c 'exit 1'")
148
    self.assertEqual(result.exit_code, 1)
149
    self.assertEqual(result.output, "")
150

    
151
  def testStdout(self):
152
    """Test standard output"""
153
    cmd = 'echo -n "%s"' % self.magic
154
    result = RunCmd("/bin/sh -c '%s'" % cmd)
155
    self.assertEqual(result.stdout, self.magic)
156
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
157
    self.assertEqual(result.output, "")
158
    self.assertFileContent(self.fname, self.magic)
159

    
160
  def testStderr(self):
161
    """Test standard error"""
162
    cmd = 'echo -n "%s"' % self.magic
163
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
164
    self.assertEqual(result.stderr, self.magic)
165
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
166
    self.assertEqual(result.output, "")
167
    self.assertFileContent(self.fname, self.magic)
168

    
169
  def testCombined(self):
170
    """Test combined output"""
171
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
172
    expected = "A" + self.magic + "B" + self.magic
173
    result = RunCmd("/bin/sh -c '%s'" % cmd)
174
    self.assertEqual(result.output, expected)
175
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
176
    self.assertEqual(result.output, "")
177
    self.assertFileContent(self.fname, expected)
178

    
179
  def testSignal(self):
180
    """Test signal"""
181
    result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
182
    self.assertEqual(result.signal, 15)
183
    self.assertEqual(result.output, "")
184

    
185
  def testListRun(self):
186
    """Test list runs"""
187
    result = RunCmd(["true"])
188
    self.assertEqual(result.signal, None)
189
    self.assertEqual(result.exit_code, 0)
190
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
191
    self.assertEqual(result.signal, None)
192
    self.assertEqual(result.exit_code, 1)
193
    result = RunCmd(["echo", "-n", self.magic])
194
    self.assertEqual(result.signal, None)
195
    self.assertEqual(result.exit_code, 0)
196
    self.assertEqual(result.stdout, self.magic)
197

    
198
  def testFileEmptyOutput(self):
199
    """Test file output"""
200
    result = RunCmd(["true"], output=self.fname)
201
    self.assertEqual(result.signal, None)
202
    self.assertEqual(result.exit_code, 0)
203
    self.assertFileContent(self.fname, "")
204

    
205
  def testLang(self):
206
    """Test locale environment"""
207
    old_env = os.environ.copy()
208
    try:
209
      os.environ["LANG"] = "en_US.UTF-8"
210
      os.environ["LC_ALL"] = "en_US.UTF-8"
211
      result = RunCmd(["locale"])
212
      for line in result.output.splitlines():
213
        key, value = line.split("=", 1)
214
        # Ignore these variables, they're overridden by LC_ALL
215
        if key == "LANG" or key == "LANGUAGE":
216
          continue
217
        self.failIf(value and value != "C" and value != '"C"',
218
            "Variable %s is set to the invalid value '%s'" % (key, value))
219
    finally:
220
      os.environ = old_env
221

    
222
  def testDefaultCwd(self):
223
    """Test default working directory"""
224
    self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
225

    
226
  def testCwd(self):
227
    """Test default working directory"""
228
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
229
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
230
    cwd = os.getcwd()
231
    self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
232

    
233

    
234
class TestRemoveFile(unittest.TestCase):
235
  """Test case for the RemoveFile function"""
236

    
237
  def setUp(self):
238
    """Create a temp dir and file for each case"""
239
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
240
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
241
    os.close(fd)
242

    
243
  def tearDown(self):
244
    if os.path.exists(self.tmpfile):
245
      os.unlink(self.tmpfile)
246
    os.rmdir(self.tmpdir)
247

    
248

    
249
  def testIgnoreDirs(self):
250
    """Test that RemoveFile() ignores directories"""
251
    self.assertEqual(None, RemoveFile(self.tmpdir))
252

    
253

    
254
  def testIgnoreNotExisting(self):
255
    """Test that RemoveFile() ignores non-existing files"""
256
    RemoveFile(self.tmpfile)
257
    RemoveFile(self.tmpfile)
258

    
259

    
260
  def testRemoveFile(self):
261
    """Test that RemoveFile does remove a file"""
262
    RemoveFile(self.tmpfile)
263
    if os.path.exists(self.tmpfile):
264
      self.fail("File '%s' not removed" % self.tmpfile)
265

    
266

    
267
  def testRemoveSymlink(self):
268
    """Test that RemoveFile does remove symlinks"""
269
    symlink = self.tmpdir + "/symlink"
270
    os.symlink("no-such-file", symlink)
271
    RemoveFile(symlink)
272
    if os.path.exists(symlink):
273
      self.fail("File '%s' not removed" % symlink)
274
    os.symlink(self.tmpfile, symlink)
275
    RemoveFile(symlink)
276
    if os.path.exists(symlink):
277
      self.fail("File '%s' not removed" % symlink)
278

    
279

    
280
class TestCheckdict(unittest.TestCase):
281
  """Test case for the CheckDict function"""
282

    
283
  def testAdd(self):
284
    """Test that CheckDict adds a missing key with the correct value"""
285

    
286
    tgt = {'a':1}
287
    tmpl = {'b': 2}
288
    CheckDict(tgt, tmpl)
289
    if 'b' not in tgt or tgt['b'] != 2:
290
      self.fail("Failed to update dict")
291

    
292

    
293
  def testNoUpdate(self):
294
    """Test that CheckDict does not overwrite an existing key"""
295
    tgt = {'a':1, 'b': 3}
296
    tmpl = {'b': 2}
297
    CheckDict(tgt, tmpl)
298
    self.failUnlessEqual(tgt['b'], 3)
299

    
300

    
301
class TestMatchNameComponent(unittest.TestCase):
302
  """Test case for the MatchNameComponent function"""
303

    
304
  def testEmptyList(self):
305
    """Test that there is no match against an empty list"""
306

    
307
    self.failUnlessEqual(MatchNameComponent("", []), None)
308
    self.failUnlessEqual(MatchNameComponent("test", []), None)
309

    
310
  def testSingleMatch(self):
311
    """Test that a single match is performed correctly"""
312
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
313
    for key in "test2", "test2.example", "test2.example.com":
314
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
315

    
316
  def testMultipleMatches(self):
317
    """Test that a multiple match is returned as None"""
318
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
319
    for key in "test1", "test1.example":
320
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
321

    
322

    
323
class TestFormatUnit(unittest.TestCase):
324
  """Test case for the FormatUnit function"""
325

    
326
  def testMiB(self):
327
    self.assertEqual(FormatUnit(1, 'h'), '1M')
328
    self.assertEqual(FormatUnit(100, 'h'), '100M')
329
    self.assertEqual(FormatUnit(1023, 'h'), '1023M')
330

    
331
    self.assertEqual(FormatUnit(1, 'm'), '1')
332
    self.assertEqual(FormatUnit(100, 'm'), '100')
333
    self.assertEqual(FormatUnit(1023, 'm'), '1023')
334

    
335
    self.assertEqual(FormatUnit(1024, 'm'), '1024')
336
    self.assertEqual(FormatUnit(1536, 'm'), '1536')
337
    self.assertEqual(FormatUnit(17133, 'm'), '17133')
338
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
339

    
340
  def testGiB(self):
341
    self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
342
    self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
343
    self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
344
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
345

    
346
    self.assertEqual(FormatUnit(1024, 'g'), '1.0')
347
    self.assertEqual(FormatUnit(1536, 'g'), '1.5')
348
    self.assertEqual(FormatUnit(17133, 'g'), '16.7')
349
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
350

    
351
    self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
352
    self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
353
    self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
354

    
355
  def testTiB(self):
356
    self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
357
    self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
358
    self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
359

    
360
    self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
361
    self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
362
    self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
363

    
364
class TestParseUnit(unittest.TestCase):
365
  """Test case for the ParseUnit function"""
366

    
367
  SCALES = (('', 1),
368
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
369
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
370
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
371

    
372
  def testRounding(self):
373
    self.assertEqual(ParseUnit('0'), 0)
374
    self.assertEqual(ParseUnit('1'), 4)
375
    self.assertEqual(ParseUnit('2'), 4)
376
    self.assertEqual(ParseUnit('3'), 4)
377

    
378
    self.assertEqual(ParseUnit('124'), 124)
379
    self.assertEqual(ParseUnit('125'), 128)
380
    self.assertEqual(ParseUnit('126'), 128)
381
    self.assertEqual(ParseUnit('127'), 128)
382
    self.assertEqual(ParseUnit('128'), 128)
383
    self.assertEqual(ParseUnit('129'), 132)
384
    self.assertEqual(ParseUnit('130'), 132)
385

    
386
  def testFloating(self):
387
    self.assertEqual(ParseUnit('0'), 0)
388
    self.assertEqual(ParseUnit('0.5'), 4)
389
    self.assertEqual(ParseUnit('1.75'), 4)
390
    self.assertEqual(ParseUnit('1.99'), 4)
391
    self.assertEqual(ParseUnit('2.00'), 4)
392
    self.assertEqual(ParseUnit('2.01'), 4)
393
    self.assertEqual(ParseUnit('3.99'), 4)
394
    self.assertEqual(ParseUnit('4.00'), 4)
395
    self.assertEqual(ParseUnit('4.01'), 8)
396
    self.assertEqual(ParseUnit('1.5G'), 1536)
397
    self.assertEqual(ParseUnit('1.8G'), 1844)
398
    self.assertEqual(ParseUnit('8.28T'), 8682212)
399

    
400
  def testSuffixes(self):
401
    for sep in ('', ' ', '   ', "\t", "\t "):
402
      for suffix, scale in TestParseUnit.SCALES:
403
        for func in (lambda x: x, str.lower, str.upper):
404
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
405
                           1024 * scale)
406

    
407
  def testInvalidInput(self):
408
    for sep in ('-', '_', ',', 'a'):
409
      for suffix, _ in TestParseUnit.SCALES:
410
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
411

    
412
    for suffix, _ in TestParseUnit.SCALES:
413
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
414

    
415

    
416
class TestSshKeys(testutils.GanetiTestCase):
417
  """Test case for the AddAuthorizedKey function"""
418

    
419
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
420
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
421
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
422

    
423
  def setUp(self):
424
    (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
425
    try:
426
      handle = os.fdopen(fd, 'w')
427
      try:
428
        handle.write("%s\n" % TestSshKeys.KEY_A)
429
        handle.write("%s\n" % TestSshKeys.KEY_B)
430
      finally:
431
        handle.close()
432
    except:
433
      utils.RemoveFile(self.tmpname)
434
      raise
435

    
436
  def tearDown(self):
437
    utils.RemoveFile(self.tmpname)
438
    del self.tmpname
439

    
440
  def testAddingNewKey(self):
441
    AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
442

    
443
    self.assertFileContent(self.tmpname,
444
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
445
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
446
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
447
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
448

    
449
  def testAddingAlmostButNotCompletelyTheSameKey(self):
450
    AddAuthorizedKey(self.tmpname,
451
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
452

    
453
    self.assertFileContent(self.tmpname,
454
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
455
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
456
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
457
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
458

    
459
  def testAddingExistingKeyWithSomeMoreSpaces(self):
460
    AddAuthorizedKey(self.tmpname,
461
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
462

    
463
    self.assertFileContent(self.tmpname,
464
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
465
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
466
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
467

    
468
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
469
    RemoveAuthorizedKey(self.tmpname,
470
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
471

    
472
    self.assertFileContent(self.tmpname,
473
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
474
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
475

    
476
  def testRemovingNonExistingKey(self):
477
    RemoveAuthorizedKey(self.tmpname,
478
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
479

    
480
    self.assertFileContent(self.tmpname,
481
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
482
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
483
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
484

    
485

    
486
class TestEtcHosts(testutils.GanetiTestCase):
487
  """Test functions modifying /etc/hosts"""
488

    
489
  def setUp(self):
490
    (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
491
    try:
492
      handle = os.fdopen(fd, 'w')
493
      try:
494
        handle.write('# This is a test file for /etc/hosts\n')
495
        handle.write('127.0.0.1\tlocalhost\n')
496
        handle.write('192.168.1.1 router gw\n')
497
      finally:
498
        handle.close()
499
    except:
500
      utils.RemoveFile(self.tmpname)
501
      raise
502

    
503
  def tearDown(self):
504
    utils.RemoveFile(self.tmpname)
505
    del self.tmpname
506

    
507
  def testSettingNewIp(self):
508
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
509

    
510
    self.assertFileContent(self.tmpname,
511
      "# This is a test file for /etc/hosts\n"
512
      "127.0.0.1\tlocalhost\n"
513
      "192.168.1.1 router gw\n"
514
      "1.2.3.4\tmyhost.domain.tld myhost\n")
515

    
516
  def testSettingExistingIp(self):
517
    SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
518
                     ['myhost'])
519

    
520
    self.assertFileContent(self.tmpname,
521
      "# This is a test file for /etc/hosts\n"
522
      "127.0.0.1\tlocalhost\n"
523
      "192.168.1.1\tmyhost.domain.tld myhost\n")
524

    
525
  def testSettingDuplicateName(self):
526
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
527

    
528
    self.assertFileContent(self.tmpname,
529
      "# This is a test file for /etc/hosts\n"
530
      "127.0.0.1\tlocalhost\n"
531
      "192.168.1.1 router gw\n"
532
      "1.2.3.4\tmyhost\n")
533

    
534
  def testRemovingExistingHost(self):
535
    RemoveEtcHostsEntry(self.tmpname, 'router')
536

    
537
    self.assertFileContent(self.tmpname,
538
      "# This is a test file for /etc/hosts\n"
539
      "127.0.0.1\tlocalhost\n"
540
      "192.168.1.1 gw\n")
541

    
542
  def testRemovingSingleExistingHost(self):
543
    RemoveEtcHostsEntry(self.tmpname, 'localhost')
544

    
545
    self.assertFileContent(self.tmpname,
546
      "# This is a test file for /etc/hosts\n"
547
      "192.168.1.1 router gw\n")
548

    
549
  def testRemovingNonExistingHost(self):
550
    RemoveEtcHostsEntry(self.tmpname, 'myhost')
551

    
552
    self.assertFileContent(self.tmpname,
553
      "# This is a test file for /etc/hosts\n"
554
      "127.0.0.1\tlocalhost\n"
555
      "192.168.1.1 router gw\n")
556

    
557
  def testRemovingAlias(self):
558
    RemoveEtcHostsEntry(self.tmpname, 'gw')
559

    
560
    self.assertFileContent(self.tmpname,
561
      "# This is a test file for /etc/hosts\n"
562
      "127.0.0.1\tlocalhost\n"
563
      "192.168.1.1 router\n")
564

    
565

    
566
class TestShellQuoting(unittest.TestCase):
567
  """Test case for shell quoting functions"""
568

    
569
  def testShellQuote(self):
570
    self.assertEqual(ShellQuote('abc'), "abc")
571
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
572
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
573
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
574
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
575

    
576
  def testShellQuoteArgs(self):
577
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
578
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
579
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
580

    
581

    
582
class TestTcpPing(unittest.TestCase):
583
  """Testcase for TCP version of ping - against listen(2)ing port"""
584

    
585
  def setUp(self):
586
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
587
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
588
    self.listenerport = self.listener.getsockname()[1]
589
    self.listener.listen(1)
590

    
591
  def tearDown(self):
592
    self.listener.shutdown(socket.SHUT_RDWR)
593
    del self.listener
594
    del self.listenerport
595

    
596
  def testTcpPingToLocalHostAccept(self):
597
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
598
                         self.listenerport,
599
                         timeout=10,
600
                         live_port_needed=True,
601
                         source=constants.LOCALHOST_IP_ADDRESS,
602
                         ),
603
                 "failed to connect to test listener")
604

    
605
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
606
                         self.listenerport,
607
                         timeout=10,
608
                         live_port_needed=True,
609
                         ),
610
                 "failed to connect to test listener (no source)")
611

    
612

    
613
class TestTcpPingDeaf(unittest.TestCase):
614
  """Testcase for TCP version of ping - against non listen(2)ing port"""
615

    
616
  def setUp(self):
617
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
618
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
619
    self.deaflistenerport = self.deaflistener.getsockname()[1]
620

    
621
  def tearDown(self):
622
    del self.deaflistener
623
    del self.deaflistenerport
624

    
625
  def testTcpPingToLocalHostAcceptDeaf(self):
626
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
627
                        self.deaflistenerport,
628
                        timeout=constants.TCP_PING_TIMEOUT,
629
                        live_port_needed=True,
630
                        source=constants.LOCALHOST_IP_ADDRESS,
631
                        ), # need successful connect(2)
632
                "successfully connected to deaf listener")
633

    
634
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
635
                        self.deaflistenerport,
636
                        timeout=constants.TCP_PING_TIMEOUT,
637
                        live_port_needed=True,
638
                        ), # need successful connect(2)
639
                "successfully connected to deaf listener (no source addr)")
640

    
641
  def testTcpPingToLocalHostNoAccept(self):
642
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
643
                         self.deaflistenerport,
644
                         timeout=constants.TCP_PING_TIMEOUT,
645
                         live_port_needed=False,
646
                         source=constants.LOCALHOST_IP_ADDRESS,
647
                         ), # ECONNREFUSED is OK
648
                 "failed to ping alive host on deaf port")
649

    
650
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
651
                         self.deaflistenerport,
652
                         timeout=constants.TCP_PING_TIMEOUT,
653
                         live_port_needed=False,
654
                         ), # ECONNREFUSED is OK
655
                 "failed to ping alive host on deaf port (no source addr)")
656

    
657

    
658
class TestOwnIpAddress(unittest.TestCase):
659
  """Testcase for OwnIpAddress"""
660

    
661
  def testOwnLoopback(self):
662
    """check having the loopback ip"""
663
    self.failUnless(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS),
664
                    "Should own the loopback address")
665

    
666
  def testNowOwnAddress(self):
667
    """check that I don't own an address"""
668

    
669
    # network 192.0.2.0/24 is reserved for test/documentation as per
670
    # rfc 3330, so we *should* not have an address of this range... if
671
    # this fails, we should extend the test to multiple addresses
672
    DST_IP = "192.0.2.1"
673
    self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP)
674

    
675

    
676
class TestListVisibleFiles(unittest.TestCase):
677
  """Test case for ListVisibleFiles"""
678

    
679
  def setUp(self):
680
    self.path = tempfile.mkdtemp()
681

    
682
  def tearDown(self):
683
    shutil.rmtree(self.path)
684

    
685
  def _test(self, files, expected):
686
    # Sort a copy
687
    expected = expected[:]
688
    expected.sort()
689

    
690
    for name in files:
691
      f = open(os.path.join(self.path, name), 'w')
692
      try:
693
        f.write("Test\n")
694
      finally:
695
        f.close()
696

    
697
    found = ListVisibleFiles(self.path)
698
    found.sort()
699

    
700
    self.assertEqual(found, expected)
701

    
702
  def testAllVisible(self):
703
    files = ["a", "b", "c"]
704
    expected = files
705
    self._test(files, expected)
706

    
707
  def testNoneVisible(self):
708
    files = [".a", ".b", ".c"]
709
    expected = []
710
    self._test(files, expected)
711

    
712
  def testSomeVisible(self):
713
    files = ["a", "b", ".c"]
714
    expected = ["a", "b"]
715
    self._test(files, expected)
716

    
717

    
718
class TestNewUUID(unittest.TestCase):
719
  """Test case for NewUUID"""
720

    
721
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
722
                        '[a-f0-9]{4}-[a-f0-9]{12}$')
723

    
724
  def runTest(self):
725
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
726

    
727

    
728
class TestUniqueSequence(unittest.TestCase):
729
  """Test case for UniqueSequence"""
730

    
731
  def _test(self, input, expected):
732
    self.assertEqual(utils.UniqueSequence(input), expected)
733

    
734
  def runTest(self):
735
    # Ordered input
736
    self._test([1, 2, 3], [1, 2, 3])
737
    self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
738
    self._test([1, 2, 2, 3], [1, 2, 3])
739
    self._test([1, 2, 3, 3], [1, 2, 3])
740

    
741
    # Unordered input
742
    self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
743
    self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
744

    
745
    # Strings
746
    self._test(["a", "a"], ["a"])
747
    self._test(["a", "b"], ["a", "b"])
748
    self._test(["a", "b", "a"], ["a", "b"])
749

    
750

    
751
class TestFirstFree(unittest.TestCase):
752
  """Test case for the FirstFree function"""
753

    
754
  def test(self):
755
    """Test FirstFree"""
756
    self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
757
    self.failUnlessEqual(FirstFree([]), None)
758
    self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
759
    self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
760
    self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
761

    
762

    
763
class TestFileLock(unittest.TestCase):
764
  """Test case for the FileLock class"""
765

    
766
  def setUp(self):
767
    self.tmpfile = tempfile.NamedTemporaryFile()
768
    self.lock = utils.FileLock(self.tmpfile.name)
769

    
770
  def testSharedNonblocking(self):
771
    self.lock.Shared(blocking=False)
772
    self.lock.Close()
773

    
774
  def testExclusiveNonblocking(self):
775
    self.lock.Exclusive(blocking=False)
776
    self.lock.Close()
777

    
778
  def testUnlockNonblocking(self):
779
    self.lock.Unlock(blocking=False)
780
    self.lock.Close()
781

    
782
  def testSharedBlocking(self):
783
    self.lock.Shared(blocking=True)
784
    self.lock.Close()
785

    
786
  def testExclusiveBlocking(self):
787
    self.lock.Exclusive(blocking=True)
788
    self.lock.Close()
789

    
790
  def testUnlockBlocking(self):
791
    self.lock.Unlock(blocking=True)
792
    self.lock.Close()
793

    
794
  def testSharedExclusiveUnlock(self):
795
    self.lock.Shared(blocking=False)
796
    self.lock.Exclusive(blocking=False)
797
    self.lock.Unlock(blocking=False)
798
    self.lock.Close()
799

    
800
  def testExclusiveSharedUnlock(self):
801
    self.lock.Exclusive(blocking=False)
802
    self.lock.Shared(blocking=False)
803
    self.lock.Unlock(blocking=False)
804
    self.lock.Close()
805

    
806
  def testCloseShared(self):
807
    self.lock.Close()
808
    self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
809

    
810
  def testCloseExclusive(self):
811
    self.lock.Close()
812
    self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
813

    
814
  def testCloseUnlock(self):
815
    self.lock.Close()
816
    self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
817

    
818

    
819
class TestTimeFunctions(unittest.TestCase):
820
  """Test case for time functions"""
821

    
822
  def runTest(self):
823
    self.assertEqual(utils.SplitTime(1), (1, 0))
824
    self.assertEqual(utils.SplitTime(1.5), (1, 500000))
825
    self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
826
    self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
827
    self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
828
    self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
829
    self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
830
    self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
831

    
832
    self.assertRaises(AssertionError, utils.SplitTime, -1)
833

    
834
    self.assertEqual(utils.MergeTime((1, 0)), 1.0)
835
    self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
836
    self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
837

    
838
    self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3), 1218448917.481)
839
    self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
840

    
841
    self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
842
    self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
843
    self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
844
    self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
845
    self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
846

    
847

    
848
class FieldSetTestCase(unittest.TestCase):
849
  """Test case for FieldSets"""
850

    
851
  def testSimpleMatch(self):
852
    f = utils.FieldSet("a", "b", "c", "def")
853
    self.failUnless(f.Matches("a"))
854
    self.failIf(f.Matches("d"), "Substring matched")
855
    self.failIf(f.Matches("defghi"), "Prefix string matched")
856
    self.failIf(f.NonMatching(["b", "c"]))
857
    self.failIf(f.NonMatching(["a", "b", "c", "def"]))
858
    self.failUnless(f.NonMatching(["a", "d"]))
859

    
860
  def testRegexMatch(self):
861
    f = utils.FieldSet("a", "b([0-9]+)", "c")
862
    self.failUnless(f.Matches("b1"))
863
    self.failUnless(f.Matches("b99"))
864
    self.failIf(f.Matches("b/1"))
865
    self.failIf(f.NonMatching(["b12", "c"]))
866
    self.failUnless(f.NonMatching(["a", "1"]))
867

    
868

    
869
if __name__ == '__main__':
870
  unittest.main()