Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ 51596eb2

History | View | Annotate | Download (29.1 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import os
30
import md5
31
import signal
32
import socket
33
import shutil
34
import re
35
import select
36

    
37
import ganeti
38
import testutils
39
from ganeti import constants
40
from ganeti import utils
41
from ganeti.utils import IsProcessAlive, RunCmd, \
42
     RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
43
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
44
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
45
     SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress
46
from ganeti.errors import LockError, UnitParseError, GenericError, \
47
     ProgrammerError
48

    
49

    
50
class TestIsProcessAlive(unittest.TestCase):
51
  """Testing case for IsProcessAlive"""
52

    
53
  def testExists(self):
54
    mypid = os.getpid()
55
    self.assert_(IsProcessAlive(mypid),
56
                 "can't find myself running")
57

    
58
  def testNotExisting(self):
59
    pid_non_existing = os.fork()
60
    if pid_non_existing == 0:
61
      os._exit(0)
62
    elif pid_non_existing < 0:
63
      raise SystemError("can't fork")
64
    os.waitpid(pid_non_existing, 0)
65
    self.assert_(not IsProcessAlive(pid_non_existing),
66
                 "nonexisting process detected")
67

    
68

    
69
class TestPidFileFunctions(unittest.TestCase):
70
  """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
71

    
72
  def setUp(self):
73
    self.dir = tempfile.mkdtemp()
74
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
75
    utils.DaemonPidFileName = self.f_dpn
76

    
77
  def testPidFileFunctions(self):
78
    pid_file = self.f_dpn('test')
79
    utils.WritePidFile('test')
80
    self.failUnless(os.path.exists(pid_file),
81
                    "PID file should have been created")
82
    read_pid = utils.ReadPidFile(pid_file)
83
    self.failUnlessEqual(read_pid, os.getpid())
84
    self.failUnless(utils.IsProcessAlive(read_pid))
85
    self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
86
    utils.RemovePidFile('test')
87
    self.failIf(os.path.exists(pid_file),
88
                "PID file should not exist anymore")
89
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
90
                         "ReadPidFile should return 0 for missing pid file")
91
    fh = open(pid_file, "w")
92
    fh.write("blah\n")
93
    fh.close()
94
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
95
                         "ReadPidFile should return 0 for invalid pid file")
96
    utils.RemovePidFile('test')
97
    self.failIf(os.path.exists(pid_file),
98
                "PID file should not exist anymore")
99

    
100
  def testKill(self):
101
    pid_file = self.f_dpn('child')
102
    r_fd, w_fd = os.pipe()
103
    new_pid = os.fork()
104
    if new_pid == 0: #child
105
      utils.WritePidFile('child')
106
      os.write(w_fd, 'a')
107
      signal.pause()
108
      os._exit(0)
109
      return
110
    # else we are in the parent
111
    # wait until the child has written the pid file
112
    os.read(r_fd, 1)
113
    read_pid = utils.ReadPidFile(pid_file)
114
    self.failUnlessEqual(read_pid, new_pid)
115
    self.failUnless(utils.IsProcessAlive(new_pid))
116
    utils.KillProcess(new_pid, waitpid=True)
117
    self.failIf(utils.IsProcessAlive(new_pid))
118
    utils.RemovePidFile('child')
119
    self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
120

    
121
  def tearDown(self):
122
    for name in os.listdir(self.dir):
123
      os.unlink(os.path.join(self.dir, name))
124
    os.rmdir(self.dir)
125

    
126

    
127
class TestRunCmd(testutils.GanetiTestCase):
128
  """Testing case for the RunCmd function"""
129

    
130
  def setUp(self):
131
    testutils.GanetiTestCase.setUp(self)
132
    self.magic = time.ctime() + " ganeti test"
133
    self.fname = self._CreateTempFile()
134

    
135
  def testOk(self):
136
    """Test successful exit code"""
137
    result = RunCmd("/bin/sh -c 'exit 0'")
138
    self.assertEqual(result.exit_code, 0)
139
    self.assertEqual(result.output, "")
140

    
141
  def testFail(self):
142
    """Test fail exit code"""
143
    result = RunCmd("/bin/sh -c 'exit 1'")
144
    self.assertEqual(result.exit_code, 1)
145
    self.assertEqual(result.output, "")
146

    
147
  def testStdout(self):
148
    """Test standard output"""
149
    cmd = 'echo -n "%s"' % self.magic
150
    result = RunCmd("/bin/sh -c '%s'" % cmd)
151
    self.assertEqual(result.stdout, self.magic)
152
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
153
    self.assertEqual(result.output, "")
154
    self.assertFileContent(self.fname, self.magic)
155

    
156
  def testStderr(self):
157
    """Test standard error"""
158
    cmd = 'echo -n "%s"' % self.magic
159
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
160
    self.assertEqual(result.stderr, self.magic)
161
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
162
    self.assertEqual(result.output, "")
163
    self.assertFileContent(self.fname, self.magic)
164

    
165
  def testCombined(self):
166
    """Test combined output"""
167
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
168
    expected = "A" + self.magic + "B" + self.magic
169
    result = RunCmd("/bin/sh -c '%s'" % cmd)
170
    self.assertEqual(result.output, expected)
171
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
172
    self.assertEqual(result.output, "")
173
    self.assertFileContent(self.fname, expected)
174

    
175
  def testSignal(self):
176
    """Test signal"""
177
    result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
178
    self.assertEqual(result.signal, 15)
179
    self.assertEqual(result.output, "")
180

    
181
  def testListRun(self):
182
    """Test list runs"""
183
    result = RunCmd(["true"])
184
    self.assertEqual(result.signal, None)
185
    self.assertEqual(result.exit_code, 0)
186
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
187
    self.assertEqual(result.signal, None)
188
    self.assertEqual(result.exit_code, 1)
189
    result = RunCmd(["echo", "-n", self.magic])
190
    self.assertEqual(result.signal, None)
191
    self.assertEqual(result.exit_code, 0)
192
    self.assertEqual(result.stdout, self.magic)
193

    
194
  def testFileEmptyOutput(self):
195
    """Test file output"""
196
    result = RunCmd(["true"], output=self.fname)
197
    self.assertEqual(result.signal, None)
198
    self.assertEqual(result.exit_code, 0)
199
    self.assertFileContent(self.fname, "")
200

    
201
  def testLang(self):
202
    """Test locale environment"""
203
    old_env = os.environ.copy()
204
    try:
205
      os.environ["LANG"] = "en_US.UTF-8"
206
      os.environ["LC_ALL"] = "en_US.UTF-8"
207
      result = RunCmd(["locale"])
208
      for line in result.output.splitlines():
209
        key, value = line.split("=", 1)
210
        # Ignore these variables, they're overridden by LC_ALL
211
        if key == "LANG" or key == "LANGUAGE":
212
          continue
213
        self.failIf(value and value != "C" and value != '"C"',
214
            "Variable %s is set to the invalid value '%s'" % (key, value))
215
    finally:
216
      os.environ = old_env
217

    
218
  def testDefaultCwd(self):
219
    """Test default working directory"""
220
    self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
221

    
222
  def testCwd(self):
223
    """Test default working directory"""
224
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
225
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
226
    cwd = os.getcwd()
227
    self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
228

    
229

    
230
class TestRemoveFile(unittest.TestCase):
231
  """Test case for the RemoveFile function"""
232

    
233
  def setUp(self):
234
    """Create a temp dir and file for each case"""
235
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
236
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
237
    os.close(fd)
238

    
239
  def tearDown(self):
240
    if os.path.exists(self.tmpfile):
241
      os.unlink(self.tmpfile)
242
    os.rmdir(self.tmpdir)
243

    
244

    
245
  def testIgnoreDirs(self):
246
    """Test that RemoveFile() ignores directories"""
247
    self.assertEqual(None, RemoveFile(self.tmpdir))
248

    
249

    
250
  def testIgnoreNotExisting(self):
251
    """Test that RemoveFile() ignores non-existing files"""
252
    RemoveFile(self.tmpfile)
253
    RemoveFile(self.tmpfile)
254

    
255

    
256
  def testRemoveFile(self):
257
    """Test that RemoveFile does remove a file"""
258
    RemoveFile(self.tmpfile)
259
    if os.path.exists(self.tmpfile):
260
      self.fail("File '%s' not removed" % self.tmpfile)
261

    
262

    
263
  def testRemoveSymlink(self):
264
    """Test that RemoveFile does remove symlinks"""
265
    symlink = self.tmpdir + "/symlink"
266
    os.symlink("no-such-file", symlink)
267
    RemoveFile(symlink)
268
    if os.path.exists(symlink):
269
      self.fail("File '%s' not removed" % symlink)
270
    os.symlink(self.tmpfile, symlink)
271
    RemoveFile(symlink)
272
    if os.path.exists(symlink):
273
      self.fail("File '%s' not removed" % symlink)
274

    
275

    
276
class TestRename(unittest.TestCase):
277
  """Test case for RenameFile"""
278

    
279
  def setUp(self):
280
    """Create a temporary directory"""
281
    self.tmpdir = tempfile.mkdtemp()
282
    self.tmpfile = os.path.join(self.tmpdir, "test1")
283

    
284
    # Touch the file
285
    open(self.tmpfile, "w").close()
286

    
287
  def tearDown(self):
288
    """Remove temporary directory"""
289
    shutil.rmtree(self.tmpdir)
290

    
291
  def testSimpleRename1(self):
292
    """Simple rename 1"""
293
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
294

    
295
  def testSimpleRename2(self):
296
    """Simple rename 2"""
297
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
298
                     mkdir=True)
299

    
300
  def testRenameMkdir(self):
301
    """Rename with mkdir"""
302
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
303
                     mkdir=True)
304

    
305

    
306
class TestCheckdict(unittest.TestCase):
307
  """Test case for the CheckDict function"""
308

    
309
  def testAdd(self):
310
    """Test that CheckDict adds a missing key with the correct value"""
311

    
312
    tgt = {'a':1}
313
    tmpl = {'b': 2}
314
    CheckDict(tgt, tmpl)
315
    if 'b' not in tgt or tgt['b'] != 2:
316
      self.fail("Failed to update dict")
317

    
318

    
319
  def testNoUpdate(self):
320
    """Test that CheckDict does not overwrite an existing key"""
321
    tgt = {'a':1, 'b': 3}
322
    tmpl = {'b': 2}
323
    CheckDict(tgt, tmpl)
324
    self.failUnlessEqual(tgt['b'], 3)
325

    
326

    
327
class TestMatchNameComponent(unittest.TestCase):
328
  """Test case for the MatchNameComponent function"""
329

    
330
  def testEmptyList(self):
331
    """Test that there is no match against an empty list"""
332

    
333
    self.failUnlessEqual(MatchNameComponent("", []), None)
334
    self.failUnlessEqual(MatchNameComponent("test", []), None)
335

    
336
  def testSingleMatch(self):
337
    """Test that a single match is performed correctly"""
338
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
339
    for key in "test2", "test2.example", "test2.example.com":
340
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
341

    
342
  def testMultipleMatches(self):
343
    """Test that a multiple match is returned as None"""
344
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
345
    for key in "test1", "test1.example":
346
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
347

    
348

    
349
class TestFormatUnit(unittest.TestCase):
350
  """Test case for the FormatUnit function"""
351

    
352
  def testMiB(self):
353
    self.assertEqual(FormatUnit(1, 'h'), '1M')
354
    self.assertEqual(FormatUnit(100, 'h'), '100M')
355
    self.assertEqual(FormatUnit(1023, 'h'), '1023M')
356

    
357
    self.assertEqual(FormatUnit(1, 'm'), '1')
358
    self.assertEqual(FormatUnit(100, 'm'), '100')
359
    self.assertEqual(FormatUnit(1023, 'm'), '1023')
360

    
361
    self.assertEqual(FormatUnit(1024, 'm'), '1024')
362
    self.assertEqual(FormatUnit(1536, 'm'), '1536')
363
    self.assertEqual(FormatUnit(17133, 'm'), '17133')
364
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
365

    
366
  def testGiB(self):
367
    self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
368
    self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
369
    self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
370
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
371

    
372
    self.assertEqual(FormatUnit(1024, 'g'), '1.0')
373
    self.assertEqual(FormatUnit(1536, 'g'), '1.5')
374
    self.assertEqual(FormatUnit(17133, 'g'), '16.7')
375
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
376

    
377
    self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
378
    self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
379
    self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
380

    
381
  def testTiB(self):
382
    self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
383
    self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
384
    self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
385

    
386
    self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
387
    self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
388
    self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
389

    
390
class TestParseUnit(unittest.TestCase):
391
  """Test case for the ParseUnit function"""
392

    
393
  SCALES = (('', 1),
394
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
395
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
396
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
397

    
398
  def testRounding(self):
399
    self.assertEqual(ParseUnit('0'), 0)
400
    self.assertEqual(ParseUnit('1'), 4)
401
    self.assertEqual(ParseUnit('2'), 4)
402
    self.assertEqual(ParseUnit('3'), 4)
403

    
404
    self.assertEqual(ParseUnit('124'), 124)
405
    self.assertEqual(ParseUnit('125'), 128)
406
    self.assertEqual(ParseUnit('126'), 128)
407
    self.assertEqual(ParseUnit('127'), 128)
408
    self.assertEqual(ParseUnit('128'), 128)
409
    self.assertEqual(ParseUnit('129'), 132)
410
    self.assertEqual(ParseUnit('130'), 132)
411

    
412
  def testFloating(self):
413
    self.assertEqual(ParseUnit('0'), 0)
414
    self.assertEqual(ParseUnit('0.5'), 4)
415
    self.assertEqual(ParseUnit('1.75'), 4)
416
    self.assertEqual(ParseUnit('1.99'), 4)
417
    self.assertEqual(ParseUnit('2.00'), 4)
418
    self.assertEqual(ParseUnit('2.01'), 4)
419
    self.assertEqual(ParseUnit('3.99'), 4)
420
    self.assertEqual(ParseUnit('4.00'), 4)
421
    self.assertEqual(ParseUnit('4.01'), 8)
422
    self.assertEqual(ParseUnit('1.5G'), 1536)
423
    self.assertEqual(ParseUnit('1.8G'), 1844)
424
    self.assertEqual(ParseUnit('8.28T'), 8682212)
425

    
426
  def testSuffixes(self):
427
    for sep in ('', ' ', '   ', "\t", "\t "):
428
      for suffix, scale in TestParseUnit.SCALES:
429
        for func in (lambda x: x, str.lower, str.upper):
430
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
431
                           1024 * scale)
432

    
433
  def testInvalidInput(self):
434
    for sep in ('-', '_', ',', 'a'):
435
      for suffix, _ in TestParseUnit.SCALES:
436
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
437

    
438
    for suffix, _ in TestParseUnit.SCALES:
439
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
440

    
441

    
442
class TestSshKeys(testutils.GanetiTestCase):
443
  """Test case for the AddAuthorizedKey function"""
444

    
445
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
446
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
447
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
448

    
449
  def setUp(self):
450
    testutils.GanetiTestCase.setUp(self)
451
    self.tmpname = self._CreateTempFile()
452
    handle = open(self.tmpname, 'w')
453
    try:
454
      handle.write("%s\n" % TestSshKeys.KEY_A)
455
      handle.write("%s\n" % TestSshKeys.KEY_B)
456
    finally:
457
      handle.close()
458

    
459
  def testAddingNewKey(self):
460
    AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
461

    
462
    self.assertFileContent(self.tmpname,
463
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
464
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
465
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
466
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
467

    
468
  def testAddingAlmostButNotCompletelyTheSameKey(self):
469
    AddAuthorizedKey(self.tmpname,
470
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
471

    
472
    self.assertFileContent(self.tmpname,
473
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
474
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
475
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
476
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
477

    
478
  def testAddingExistingKeyWithSomeMoreSpaces(self):
479
    AddAuthorizedKey(self.tmpname,
480
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
481

    
482
    self.assertFileContent(self.tmpname,
483
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
484
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
485
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
486

    
487
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
488
    RemoveAuthorizedKey(self.tmpname,
489
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
490

    
491
    self.assertFileContent(self.tmpname,
492
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
493
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
494

    
495
  def testRemovingNonExistingKey(self):
496
    RemoveAuthorizedKey(self.tmpname,
497
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
498

    
499
    self.assertFileContent(self.tmpname,
500
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
501
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
502
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
503

    
504

    
505
class TestEtcHosts(testutils.GanetiTestCase):
506
  """Test functions modifying /etc/hosts"""
507

    
508
  def setUp(self):
509
    testutils.GanetiTestCase.setUp(self)
510
    self.tmpname = self._CreateTempFile()
511
    handle = open(self.tmpname, 'w')
512
    try:
513
      handle.write('# This is a test file for /etc/hosts\n')
514
      handle.write('127.0.0.1\tlocalhost\n')
515
      handle.write('192.168.1.1 router gw\n')
516
    finally:
517
      handle.close()
518

    
519
  def testSettingNewIp(self):
520
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
521

    
522
    self.assertFileContent(self.tmpname,
523
      "# This is a test file for /etc/hosts\n"
524
      "127.0.0.1\tlocalhost\n"
525
      "192.168.1.1 router gw\n"
526
      "1.2.3.4\tmyhost.domain.tld myhost\n")
527

    
528
  def testSettingExistingIp(self):
529
    SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
530
                     ['myhost'])
531

    
532
    self.assertFileContent(self.tmpname,
533
      "# This is a test file for /etc/hosts\n"
534
      "127.0.0.1\tlocalhost\n"
535
      "192.168.1.1\tmyhost.domain.tld myhost\n")
536

    
537
  def testSettingDuplicateName(self):
538
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
539

    
540
    self.assertFileContent(self.tmpname,
541
      "# This is a test file for /etc/hosts\n"
542
      "127.0.0.1\tlocalhost\n"
543
      "192.168.1.1 router gw\n"
544
      "1.2.3.4\tmyhost\n")
545

    
546
  def testRemovingExistingHost(self):
547
    RemoveEtcHostsEntry(self.tmpname, 'router')
548

    
549
    self.assertFileContent(self.tmpname,
550
      "# This is a test file for /etc/hosts\n"
551
      "127.0.0.1\tlocalhost\n"
552
      "192.168.1.1 gw\n")
553

    
554
  def testRemovingSingleExistingHost(self):
555
    RemoveEtcHostsEntry(self.tmpname, 'localhost')
556

    
557
    self.assertFileContent(self.tmpname,
558
      "# This is a test file for /etc/hosts\n"
559
      "192.168.1.1 router gw\n")
560

    
561
  def testRemovingNonExistingHost(self):
562
    RemoveEtcHostsEntry(self.tmpname, 'myhost')
563

    
564
    self.assertFileContent(self.tmpname,
565
      "# This is a test file for /etc/hosts\n"
566
      "127.0.0.1\tlocalhost\n"
567
      "192.168.1.1 router gw\n")
568

    
569
  def testRemovingAlias(self):
570
    RemoveEtcHostsEntry(self.tmpname, 'gw')
571

    
572
    self.assertFileContent(self.tmpname,
573
      "# This is a test file for /etc/hosts\n"
574
      "127.0.0.1\tlocalhost\n"
575
      "192.168.1.1 router\n")
576

    
577

    
578
class TestShellQuoting(unittest.TestCase):
579
  """Test case for shell quoting functions"""
580

    
581
  def testShellQuote(self):
582
    self.assertEqual(ShellQuote('abc'), "abc")
583
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
584
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
585
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
586
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
587

    
588
  def testShellQuoteArgs(self):
589
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
590
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
591
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
592

    
593

    
594
class TestTcpPing(unittest.TestCase):
595
  """Testcase for TCP version of ping - against listen(2)ing port"""
596

    
597
  def setUp(self):
598
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
599
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
600
    self.listenerport = self.listener.getsockname()[1]
601
    self.listener.listen(1)
602

    
603
  def tearDown(self):
604
    self.listener.shutdown(socket.SHUT_RDWR)
605
    del self.listener
606
    del self.listenerport
607

    
608
  def testTcpPingToLocalHostAccept(self):
609
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
610
                         self.listenerport,
611
                         timeout=10,
612
                         live_port_needed=True,
613
                         source=constants.LOCALHOST_IP_ADDRESS,
614
                         ),
615
                 "failed to connect to test listener")
616

    
617
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
618
                         self.listenerport,
619
                         timeout=10,
620
                         live_port_needed=True,
621
                         ),
622
                 "failed to connect to test listener (no source)")
623

    
624

    
625
class TestTcpPingDeaf(unittest.TestCase):
626
  """Testcase for TCP version of ping - against non listen(2)ing port"""
627

    
628
  def setUp(self):
629
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
630
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
631
    self.deaflistenerport = self.deaflistener.getsockname()[1]
632

    
633
  def tearDown(self):
634
    del self.deaflistener
635
    del self.deaflistenerport
636

    
637
  def testTcpPingToLocalHostAcceptDeaf(self):
638
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
639
                        self.deaflistenerport,
640
                        timeout=constants.TCP_PING_TIMEOUT,
641
                        live_port_needed=True,
642
                        source=constants.LOCALHOST_IP_ADDRESS,
643
                        ), # need successful connect(2)
644
                "successfully connected to deaf listener")
645

    
646
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
647
                        self.deaflistenerport,
648
                        timeout=constants.TCP_PING_TIMEOUT,
649
                        live_port_needed=True,
650
                        ), # need successful connect(2)
651
                "successfully connected to deaf listener (no source addr)")
652

    
653
  def testTcpPingToLocalHostNoAccept(self):
654
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
655
                         self.deaflistenerport,
656
                         timeout=constants.TCP_PING_TIMEOUT,
657
                         live_port_needed=False,
658
                         source=constants.LOCALHOST_IP_ADDRESS,
659
                         ), # ECONNREFUSED is OK
660
                 "failed to ping alive host on deaf port")
661

    
662
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
663
                         self.deaflistenerport,
664
                         timeout=constants.TCP_PING_TIMEOUT,
665
                         live_port_needed=False,
666
                         ), # ECONNREFUSED is OK
667
                 "failed to ping alive host on deaf port (no source addr)")
668

    
669

    
670
class TestOwnIpAddress(unittest.TestCase):
671
  """Testcase for OwnIpAddress"""
672

    
673
  def testOwnLoopback(self):
674
    """check having the loopback ip"""
675
    self.failUnless(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS),
676
                    "Should own the loopback address")
677

    
678
  def testNowOwnAddress(self):
679
    """check that I don't own an address"""
680

    
681
    # network 192.0.2.0/24 is reserved for test/documentation as per
682
    # rfc 3330, so we *should* not have an address of this range... if
683
    # this fails, we should extend the test to multiple addresses
684
    DST_IP = "192.0.2.1"
685
    self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP)
686

    
687

    
688
class TestListVisibleFiles(unittest.TestCase):
689
  """Test case for ListVisibleFiles"""
690

    
691
  def setUp(self):
692
    self.path = tempfile.mkdtemp()
693

    
694
  def tearDown(self):
695
    shutil.rmtree(self.path)
696

    
697
  def _test(self, files, expected):
698
    # Sort a copy
699
    expected = expected[:]
700
    expected.sort()
701

    
702
    for name in files:
703
      f = open(os.path.join(self.path, name), 'w')
704
      try:
705
        f.write("Test\n")
706
      finally:
707
        f.close()
708

    
709
    found = ListVisibleFiles(self.path)
710
    found.sort()
711

    
712
    self.assertEqual(found, expected)
713

    
714
  def testAllVisible(self):
715
    files = ["a", "b", "c"]
716
    expected = files
717
    self._test(files, expected)
718

    
719
  def testNoneVisible(self):
720
    files = [".a", ".b", ".c"]
721
    expected = []
722
    self._test(files, expected)
723

    
724
  def testSomeVisible(self):
725
    files = ["a", "b", ".c"]
726
    expected = ["a", "b"]
727
    self._test(files, expected)
728

    
729

    
730
class TestNewUUID(unittest.TestCase):
731
  """Test case for NewUUID"""
732

    
733
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
734
                        '[a-f0-9]{4}-[a-f0-9]{12}$')
735

    
736
  def runTest(self):
737
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
738

    
739

    
740
class TestUniqueSequence(unittest.TestCase):
741
  """Test case for UniqueSequence"""
742

    
743
  def _test(self, input, expected):
744
    self.assertEqual(utils.UniqueSequence(input), expected)
745

    
746
  def runTest(self):
747
    # Ordered input
748
    self._test([1, 2, 3], [1, 2, 3])
749
    self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
750
    self._test([1, 2, 2, 3], [1, 2, 3])
751
    self._test([1, 2, 3, 3], [1, 2, 3])
752

    
753
    # Unordered input
754
    self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
755
    self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
756

    
757
    # Strings
758
    self._test(["a", "a"], ["a"])
759
    self._test(["a", "b"], ["a", "b"])
760
    self._test(["a", "b", "a"], ["a", "b"])
761

    
762

    
763
class TestFirstFree(unittest.TestCase):
764
  """Test case for the FirstFree function"""
765

    
766
  def test(self):
767
    """Test FirstFree"""
768
    self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
769
    self.failUnlessEqual(FirstFree([]), None)
770
    self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
771
    self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
772
    self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
773

    
774

    
775
class TestFileLock(unittest.TestCase):
776
  """Test case for the FileLock class"""
777

    
778
  def setUp(self):
779
    self.tmpfile = tempfile.NamedTemporaryFile()
780
    self.lock = utils.FileLock(self.tmpfile.name)
781

    
782
  def testSharedNonblocking(self):
783
    self.lock.Shared(blocking=False)
784
    self.lock.Close()
785

    
786
  def testExclusiveNonblocking(self):
787
    self.lock.Exclusive(blocking=False)
788
    self.lock.Close()
789

    
790
  def testUnlockNonblocking(self):
791
    self.lock.Unlock(blocking=False)
792
    self.lock.Close()
793

    
794
  def testSharedBlocking(self):
795
    self.lock.Shared(blocking=True)
796
    self.lock.Close()
797

    
798
  def testExclusiveBlocking(self):
799
    self.lock.Exclusive(blocking=True)
800
    self.lock.Close()
801

    
802
  def testUnlockBlocking(self):
803
    self.lock.Unlock(blocking=True)
804
    self.lock.Close()
805

    
806
  def testSharedExclusiveUnlock(self):
807
    self.lock.Shared(blocking=False)
808
    self.lock.Exclusive(blocking=False)
809
    self.lock.Unlock(blocking=False)
810
    self.lock.Close()
811

    
812
  def testExclusiveSharedUnlock(self):
813
    self.lock.Exclusive(blocking=False)
814
    self.lock.Shared(blocking=False)
815
    self.lock.Unlock(blocking=False)
816
    self.lock.Close()
817

    
818
  def testCloseShared(self):
819
    self.lock.Close()
820
    self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
821

    
822
  def testCloseExclusive(self):
823
    self.lock.Close()
824
    self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
825

    
826
  def testCloseUnlock(self):
827
    self.lock.Close()
828
    self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
829

    
830

    
831
class TestTimeFunctions(unittest.TestCase):
832
  """Test case for time functions"""
833

    
834
  def runTest(self):
835
    self.assertEqual(utils.SplitTime(1), (1, 0))
836
    self.assertEqual(utils.SplitTime(1.5), (1, 500000))
837
    self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
838
    self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
839
    self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
840
    self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
841
    self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
842
    self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
843

    
844
    self.assertRaises(AssertionError, utils.SplitTime, -1)
845

    
846
    self.assertEqual(utils.MergeTime((1, 0)), 1.0)
847
    self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
848
    self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
849

    
850
    self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3), 1218448917.481)
851
    self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
852

    
853
    self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
854
    self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
855
    self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
856
    self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
857
    self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
858

    
859

    
860
class FieldSetTestCase(unittest.TestCase):
861
  """Test case for FieldSets"""
862

    
863
  def testSimpleMatch(self):
864
    f = utils.FieldSet("a", "b", "c", "def")
865
    self.failUnless(f.Matches("a"))
866
    self.failIf(f.Matches("d"), "Substring matched")
867
    self.failIf(f.Matches("defghi"), "Prefix string matched")
868
    self.failIf(f.NonMatching(["b", "c"]))
869
    self.failIf(f.NonMatching(["a", "b", "c", "def"]))
870
    self.failUnless(f.NonMatching(["a", "d"]))
871

    
872
  def testRegexMatch(self):
873
    f = utils.FieldSet("a", "b([0-9]+)", "c")
874
    self.failUnless(f.Matches("b1"))
875
    self.failUnless(f.Matches("b99"))
876
    self.failIf(f.Matches("b/1"))
877
    self.failIf(f.NonMatching(["b12", "c"]))
878
    self.failUnless(f.NonMatching(["a", "1"]))
879

    
880

    
881
if __name__ == '__main__':
882
  unittest.main()