Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ f65f63ef

History | View | Annotate | Download (30.3 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import os
30
import md5
31
import signal
32
import socket
33
import shutil
34
import re
35
import select
36

    
37
import ganeti
38
import testutils
39
from ganeti import constants
40
from ganeti import utils
41
from ganeti.utils import IsProcessAlive, RunCmd, \
42
     RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
43
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
44
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
45
     SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \
46
     TailFile
47

    
48
from ganeti.errors import LockError, UnitParseError, GenericError, \
49
     ProgrammerError
50

    
51

    
52
class TestIsProcessAlive(unittest.TestCase):
53
  """Testing case for IsProcessAlive"""
54

    
55
  def testExists(self):
56
    mypid = os.getpid()
57
    self.assert_(IsProcessAlive(mypid),
58
                 "can't find myself running")
59

    
60
  def testNotExisting(self):
61
    pid_non_existing = os.fork()
62
    if pid_non_existing == 0:
63
      os._exit(0)
64
    elif pid_non_existing < 0:
65
      raise SystemError("can't fork")
66
    os.waitpid(pid_non_existing, 0)
67
    self.assert_(not IsProcessAlive(pid_non_existing),
68
                 "nonexisting process detected")
69

    
70

    
71
class TestPidFileFunctions(unittest.TestCase):
72
  """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
73

    
74
  def setUp(self):
75
    self.dir = tempfile.mkdtemp()
76
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
77
    utils.DaemonPidFileName = self.f_dpn
78

    
79
  def testPidFileFunctions(self):
80
    pid_file = self.f_dpn('test')
81
    utils.WritePidFile('test')
82
    self.failUnless(os.path.exists(pid_file),
83
                    "PID file should have been created")
84
    read_pid = utils.ReadPidFile(pid_file)
85
    self.failUnlessEqual(read_pid, os.getpid())
86
    self.failUnless(utils.IsProcessAlive(read_pid))
87
    self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
88
    utils.RemovePidFile('test')
89
    self.failIf(os.path.exists(pid_file),
90
                "PID file should not exist anymore")
91
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
92
                         "ReadPidFile should return 0 for missing pid file")
93
    fh = open(pid_file, "w")
94
    fh.write("blah\n")
95
    fh.close()
96
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
97
                         "ReadPidFile should return 0 for invalid pid file")
98
    utils.RemovePidFile('test')
99
    self.failIf(os.path.exists(pid_file),
100
                "PID file should not exist anymore")
101

    
102
  def testKill(self):
103
    pid_file = self.f_dpn('child')
104
    r_fd, w_fd = os.pipe()
105
    new_pid = os.fork()
106
    if new_pid == 0: #child
107
      utils.WritePidFile('child')
108
      os.write(w_fd, 'a')
109
      signal.pause()
110
      os._exit(0)
111
      return
112
    # else we are in the parent
113
    # wait until the child has written the pid file
114
    os.read(r_fd, 1)
115
    read_pid = utils.ReadPidFile(pid_file)
116
    self.failUnlessEqual(read_pid, new_pid)
117
    self.failUnless(utils.IsProcessAlive(new_pid))
118
    utils.KillProcess(new_pid, waitpid=True)
119
    self.failIf(utils.IsProcessAlive(new_pid))
120
    utils.RemovePidFile('child')
121
    self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
122

    
123
  def tearDown(self):
124
    for name in os.listdir(self.dir):
125
      os.unlink(os.path.join(self.dir, name))
126
    os.rmdir(self.dir)
127

    
128

    
129
class TestRunCmd(testutils.GanetiTestCase):
130
  """Testing case for the RunCmd function"""
131

    
132
  def setUp(self):
133
    testutils.GanetiTestCase.setUp(self)
134
    self.magic = time.ctime() + " ganeti test"
135
    self.fname = self._CreateTempFile()
136

    
137
  def testOk(self):
138
    """Test successful exit code"""
139
    result = RunCmd("/bin/sh -c 'exit 0'")
140
    self.assertEqual(result.exit_code, 0)
141
    self.assertEqual(result.output, "")
142

    
143
  def testFail(self):
144
    """Test fail exit code"""
145
    result = RunCmd("/bin/sh -c 'exit 1'")
146
    self.assertEqual(result.exit_code, 1)
147
    self.assertEqual(result.output, "")
148

    
149
  def testStdout(self):
150
    """Test standard output"""
151
    cmd = 'echo -n "%s"' % self.magic
152
    result = RunCmd("/bin/sh -c '%s'" % cmd)
153
    self.assertEqual(result.stdout, self.magic)
154
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
155
    self.assertEqual(result.output, "")
156
    self.assertFileContent(self.fname, self.magic)
157

    
158
  def testStderr(self):
159
    """Test standard error"""
160
    cmd = 'echo -n "%s"' % self.magic
161
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
162
    self.assertEqual(result.stderr, self.magic)
163
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
164
    self.assertEqual(result.output, "")
165
    self.assertFileContent(self.fname, self.magic)
166

    
167
  def testCombined(self):
168
    """Test combined output"""
169
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
170
    expected = "A" + self.magic + "B" + self.magic
171
    result = RunCmd("/bin/sh -c '%s'" % cmd)
172
    self.assertEqual(result.output, expected)
173
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
174
    self.assertEqual(result.output, "")
175
    self.assertFileContent(self.fname, expected)
176

    
177
  def testSignal(self):
178
    """Test signal"""
179
    result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
180
    self.assertEqual(result.signal, 15)
181
    self.assertEqual(result.output, "")
182

    
183
  def testListRun(self):
184
    """Test list runs"""
185
    result = RunCmd(["true"])
186
    self.assertEqual(result.signal, None)
187
    self.assertEqual(result.exit_code, 0)
188
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
189
    self.assertEqual(result.signal, None)
190
    self.assertEqual(result.exit_code, 1)
191
    result = RunCmd(["echo", "-n", self.magic])
192
    self.assertEqual(result.signal, None)
193
    self.assertEqual(result.exit_code, 0)
194
    self.assertEqual(result.stdout, self.magic)
195

    
196
  def testFileEmptyOutput(self):
197
    """Test file output"""
198
    result = RunCmd(["true"], output=self.fname)
199
    self.assertEqual(result.signal, None)
200
    self.assertEqual(result.exit_code, 0)
201
    self.assertFileContent(self.fname, "")
202

    
203
  def testLang(self):
204
    """Test locale environment"""
205
    old_env = os.environ.copy()
206
    try:
207
      os.environ["LANG"] = "en_US.UTF-8"
208
      os.environ["LC_ALL"] = "en_US.UTF-8"
209
      result = RunCmd(["locale"])
210
      for line in result.output.splitlines():
211
        key, value = line.split("=", 1)
212
        # Ignore these variables, they're overridden by LC_ALL
213
        if key == "LANG" or key == "LANGUAGE":
214
          continue
215
        self.failIf(value and value != "C" and value != '"C"',
216
            "Variable %s is set to the invalid value '%s'" % (key, value))
217
    finally:
218
      os.environ = old_env
219

    
220
  def testDefaultCwd(self):
221
    """Test default working directory"""
222
    self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
223

    
224
  def testCwd(self):
225
    """Test default working directory"""
226
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
227
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
228
    cwd = os.getcwd()
229
    self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
230

    
231

    
232
class TestRemoveFile(unittest.TestCase):
233
  """Test case for the RemoveFile function"""
234

    
235
  def setUp(self):
236
    """Create a temp dir and file for each case"""
237
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
238
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
239
    os.close(fd)
240

    
241
  def tearDown(self):
242
    if os.path.exists(self.tmpfile):
243
      os.unlink(self.tmpfile)
244
    os.rmdir(self.tmpdir)
245

    
246

    
247
  def testIgnoreDirs(self):
248
    """Test that RemoveFile() ignores directories"""
249
    self.assertEqual(None, RemoveFile(self.tmpdir))
250

    
251

    
252
  def testIgnoreNotExisting(self):
253
    """Test that RemoveFile() ignores non-existing files"""
254
    RemoveFile(self.tmpfile)
255
    RemoveFile(self.tmpfile)
256

    
257

    
258
  def testRemoveFile(self):
259
    """Test that RemoveFile does remove a file"""
260
    RemoveFile(self.tmpfile)
261
    if os.path.exists(self.tmpfile):
262
      self.fail("File '%s' not removed" % self.tmpfile)
263

    
264

    
265
  def testRemoveSymlink(self):
266
    """Test that RemoveFile does remove symlinks"""
267
    symlink = self.tmpdir + "/symlink"
268
    os.symlink("no-such-file", symlink)
269
    RemoveFile(symlink)
270
    if os.path.exists(symlink):
271
      self.fail("File '%s' not removed" % symlink)
272
    os.symlink(self.tmpfile, symlink)
273
    RemoveFile(symlink)
274
    if os.path.exists(symlink):
275
      self.fail("File '%s' not removed" % symlink)
276

    
277

    
278
class TestRename(unittest.TestCase):
279
  """Test case for RenameFile"""
280

    
281
  def setUp(self):
282
    """Create a temporary directory"""
283
    self.tmpdir = tempfile.mkdtemp()
284
    self.tmpfile = os.path.join(self.tmpdir, "test1")
285

    
286
    # Touch the file
287
    open(self.tmpfile, "w").close()
288

    
289
  def tearDown(self):
290
    """Remove temporary directory"""
291
    shutil.rmtree(self.tmpdir)
292

    
293
  def testSimpleRename1(self):
294
    """Simple rename 1"""
295
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
296

    
297
  def testSimpleRename2(self):
298
    """Simple rename 2"""
299
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
300
                     mkdir=True)
301

    
302
  def testRenameMkdir(self):
303
    """Rename with mkdir"""
304
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
305
                     mkdir=True)
306

    
307

    
308
class TestCheckdict(unittest.TestCase):
309
  """Test case for the CheckDict function"""
310

    
311
  def testAdd(self):
312
    """Test that CheckDict adds a missing key with the correct value"""
313

    
314
    tgt = {'a':1}
315
    tmpl = {'b': 2}
316
    CheckDict(tgt, tmpl)
317
    if 'b' not in tgt or tgt['b'] != 2:
318
      self.fail("Failed to update dict")
319

    
320

    
321
  def testNoUpdate(self):
322
    """Test that CheckDict does not overwrite an existing key"""
323
    tgt = {'a':1, 'b': 3}
324
    tmpl = {'b': 2}
325
    CheckDict(tgt, tmpl)
326
    self.failUnlessEqual(tgt['b'], 3)
327

    
328

    
329
class TestMatchNameComponent(unittest.TestCase):
330
  """Test case for the MatchNameComponent function"""
331

    
332
  def testEmptyList(self):
333
    """Test that there is no match against an empty list"""
334

    
335
    self.failUnlessEqual(MatchNameComponent("", []), None)
336
    self.failUnlessEqual(MatchNameComponent("test", []), None)
337

    
338
  def testSingleMatch(self):
339
    """Test that a single match is performed correctly"""
340
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
341
    for key in "test2", "test2.example", "test2.example.com":
342
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
343

    
344
  def testMultipleMatches(self):
345
    """Test that a multiple match is returned as None"""
346
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
347
    for key in "test1", "test1.example":
348
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
349

    
350

    
351
class TestFormatUnit(unittest.TestCase):
352
  """Test case for the FormatUnit function"""
353

    
354
  def testMiB(self):
355
    self.assertEqual(FormatUnit(1, 'h'), '1M')
356
    self.assertEqual(FormatUnit(100, 'h'), '100M')
357
    self.assertEqual(FormatUnit(1023, 'h'), '1023M')
358

    
359
    self.assertEqual(FormatUnit(1, 'm'), '1')
360
    self.assertEqual(FormatUnit(100, 'm'), '100')
361
    self.assertEqual(FormatUnit(1023, 'm'), '1023')
362

    
363
    self.assertEqual(FormatUnit(1024, 'm'), '1024')
364
    self.assertEqual(FormatUnit(1536, 'm'), '1536')
365
    self.assertEqual(FormatUnit(17133, 'm'), '17133')
366
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
367

    
368
  def testGiB(self):
369
    self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
370
    self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
371
    self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
372
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
373

    
374
    self.assertEqual(FormatUnit(1024, 'g'), '1.0')
375
    self.assertEqual(FormatUnit(1536, 'g'), '1.5')
376
    self.assertEqual(FormatUnit(17133, 'g'), '16.7')
377
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
378

    
379
    self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
380
    self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
381
    self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
382

    
383
  def testTiB(self):
384
    self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
385
    self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
386
    self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
387

    
388
    self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
389
    self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
390
    self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
391

    
392
class TestParseUnit(unittest.TestCase):
393
  """Test case for the ParseUnit function"""
394

    
395
  SCALES = (('', 1),
396
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
397
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
398
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
399

    
400
  def testRounding(self):
401
    self.assertEqual(ParseUnit('0'), 0)
402
    self.assertEqual(ParseUnit('1'), 4)
403
    self.assertEqual(ParseUnit('2'), 4)
404
    self.assertEqual(ParseUnit('3'), 4)
405

    
406
    self.assertEqual(ParseUnit('124'), 124)
407
    self.assertEqual(ParseUnit('125'), 128)
408
    self.assertEqual(ParseUnit('126'), 128)
409
    self.assertEqual(ParseUnit('127'), 128)
410
    self.assertEqual(ParseUnit('128'), 128)
411
    self.assertEqual(ParseUnit('129'), 132)
412
    self.assertEqual(ParseUnit('130'), 132)
413

    
414
  def testFloating(self):
415
    self.assertEqual(ParseUnit('0'), 0)
416
    self.assertEqual(ParseUnit('0.5'), 4)
417
    self.assertEqual(ParseUnit('1.75'), 4)
418
    self.assertEqual(ParseUnit('1.99'), 4)
419
    self.assertEqual(ParseUnit('2.00'), 4)
420
    self.assertEqual(ParseUnit('2.01'), 4)
421
    self.assertEqual(ParseUnit('3.99'), 4)
422
    self.assertEqual(ParseUnit('4.00'), 4)
423
    self.assertEqual(ParseUnit('4.01'), 8)
424
    self.assertEqual(ParseUnit('1.5G'), 1536)
425
    self.assertEqual(ParseUnit('1.8G'), 1844)
426
    self.assertEqual(ParseUnit('8.28T'), 8682212)
427

    
428
  def testSuffixes(self):
429
    for sep in ('', ' ', '   ', "\t", "\t "):
430
      for suffix, scale in TestParseUnit.SCALES:
431
        for func in (lambda x: x, str.lower, str.upper):
432
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
433
                           1024 * scale)
434

    
435
  def testInvalidInput(self):
436
    for sep in ('-', '_', ',', 'a'):
437
      for suffix, _ in TestParseUnit.SCALES:
438
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
439

    
440
    for suffix, _ in TestParseUnit.SCALES:
441
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
442

    
443

    
444
class TestSshKeys(testutils.GanetiTestCase):
445
  """Test case for the AddAuthorizedKey function"""
446

    
447
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
448
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
449
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
450

    
451
  def setUp(self):
452
    testutils.GanetiTestCase.setUp(self)
453
    self.tmpname = self._CreateTempFile()
454
    handle = open(self.tmpname, 'w')
455
    try:
456
      handle.write("%s\n" % TestSshKeys.KEY_A)
457
      handle.write("%s\n" % TestSshKeys.KEY_B)
458
    finally:
459
      handle.close()
460

    
461
  def testAddingNewKey(self):
462
    AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
463

    
464
    self.assertFileContent(self.tmpname,
465
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
466
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
467
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
468
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
469

    
470
  def testAddingAlmostButNotCompletelyTheSameKey(self):
471
    AddAuthorizedKey(self.tmpname,
472
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
473

    
474
    self.assertFileContent(self.tmpname,
475
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
476
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
477
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
478
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
479

    
480
  def testAddingExistingKeyWithSomeMoreSpaces(self):
481
    AddAuthorizedKey(self.tmpname,
482
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
483

    
484
    self.assertFileContent(self.tmpname,
485
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
486
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
487
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
488

    
489
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
490
    RemoveAuthorizedKey(self.tmpname,
491
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
492

    
493
    self.assertFileContent(self.tmpname,
494
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
495
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
496

    
497
  def testRemovingNonExistingKey(self):
498
    RemoveAuthorizedKey(self.tmpname,
499
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
500

    
501
    self.assertFileContent(self.tmpname,
502
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
503
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
504
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
505

    
506

    
507
class TestEtcHosts(testutils.GanetiTestCase):
508
  """Test functions modifying /etc/hosts"""
509

    
510
  def setUp(self):
511
    testutils.GanetiTestCase.setUp(self)
512
    self.tmpname = self._CreateTempFile()
513
    handle = open(self.tmpname, 'w')
514
    try:
515
      handle.write('# This is a test file for /etc/hosts\n')
516
      handle.write('127.0.0.1\tlocalhost\n')
517
      handle.write('192.168.1.1 router gw\n')
518
    finally:
519
      handle.close()
520

    
521
  def testSettingNewIp(self):
522
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
523

    
524
    self.assertFileContent(self.tmpname,
525
      "# This is a test file for /etc/hosts\n"
526
      "127.0.0.1\tlocalhost\n"
527
      "192.168.1.1 router gw\n"
528
      "1.2.3.4\tmyhost.domain.tld myhost\n")
529

    
530
  def testSettingExistingIp(self):
531
    SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
532
                     ['myhost'])
533

    
534
    self.assertFileContent(self.tmpname,
535
      "# This is a test file for /etc/hosts\n"
536
      "127.0.0.1\tlocalhost\n"
537
      "192.168.1.1\tmyhost.domain.tld myhost\n")
538

    
539
  def testSettingDuplicateName(self):
540
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
541

    
542
    self.assertFileContent(self.tmpname,
543
      "# This is a test file for /etc/hosts\n"
544
      "127.0.0.1\tlocalhost\n"
545
      "192.168.1.1 router gw\n"
546
      "1.2.3.4\tmyhost\n")
547

    
548
  def testRemovingExistingHost(self):
549
    RemoveEtcHostsEntry(self.tmpname, 'router')
550

    
551
    self.assertFileContent(self.tmpname,
552
      "# This is a test file for /etc/hosts\n"
553
      "127.0.0.1\tlocalhost\n"
554
      "192.168.1.1 gw\n")
555

    
556
  def testRemovingSingleExistingHost(self):
557
    RemoveEtcHostsEntry(self.tmpname, 'localhost')
558

    
559
    self.assertFileContent(self.tmpname,
560
      "# This is a test file for /etc/hosts\n"
561
      "192.168.1.1 router gw\n")
562

    
563
  def testRemovingNonExistingHost(self):
564
    RemoveEtcHostsEntry(self.tmpname, 'myhost')
565

    
566
    self.assertFileContent(self.tmpname,
567
      "# This is a test file for /etc/hosts\n"
568
      "127.0.0.1\tlocalhost\n"
569
      "192.168.1.1 router gw\n")
570

    
571
  def testRemovingAlias(self):
572
    RemoveEtcHostsEntry(self.tmpname, 'gw')
573

    
574
    self.assertFileContent(self.tmpname,
575
      "# This is a test file for /etc/hosts\n"
576
      "127.0.0.1\tlocalhost\n"
577
      "192.168.1.1 router\n")
578

    
579

    
580
class TestShellQuoting(unittest.TestCase):
581
  """Test case for shell quoting functions"""
582

    
583
  def testShellQuote(self):
584
    self.assertEqual(ShellQuote('abc'), "abc")
585
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
586
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
587
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
588
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
589

    
590
  def testShellQuoteArgs(self):
591
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
592
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
593
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
594

    
595

    
596
class TestTcpPing(unittest.TestCase):
597
  """Testcase for TCP version of ping - against listen(2)ing port"""
598

    
599
  def setUp(self):
600
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
601
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
602
    self.listenerport = self.listener.getsockname()[1]
603
    self.listener.listen(1)
604

    
605
  def tearDown(self):
606
    self.listener.shutdown(socket.SHUT_RDWR)
607
    del self.listener
608
    del self.listenerport
609

    
610
  def testTcpPingToLocalHostAccept(self):
611
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
612
                         self.listenerport,
613
                         timeout=10,
614
                         live_port_needed=True,
615
                         source=constants.LOCALHOST_IP_ADDRESS,
616
                         ),
617
                 "failed to connect to test listener")
618

    
619
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
620
                         self.listenerport,
621
                         timeout=10,
622
                         live_port_needed=True,
623
                         ),
624
                 "failed to connect to test listener (no source)")
625

    
626

    
627
class TestTcpPingDeaf(unittest.TestCase):
628
  """Testcase for TCP version of ping - against non listen(2)ing port"""
629

    
630
  def setUp(self):
631
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
632
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
633
    self.deaflistenerport = self.deaflistener.getsockname()[1]
634

    
635
  def tearDown(self):
636
    del self.deaflistener
637
    del self.deaflistenerport
638

    
639
  def testTcpPingToLocalHostAcceptDeaf(self):
640
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
641
                        self.deaflistenerport,
642
                        timeout=constants.TCP_PING_TIMEOUT,
643
                        live_port_needed=True,
644
                        source=constants.LOCALHOST_IP_ADDRESS,
645
                        ), # need successful connect(2)
646
                "successfully connected to deaf listener")
647

    
648
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
649
                        self.deaflistenerport,
650
                        timeout=constants.TCP_PING_TIMEOUT,
651
                        live_port_needed=True,
652
                        ), # need successful connect(2)
653
                "successfully connected to deaf listener (no source addr)")
654

    
655
  def testTcpPingToLocalHostNoAccept(self):
656
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
657
                         self.deaflistenerport,
658
                         timeout=constants.TCP_PING_TIMEOUT,
659
                         live_port_needed=False,
660
                         source=constants.LOCALHOST_IP_ADDRESS,
661
                         ), # ECONNREFUSED is OK
662
                 "failed to ping alive host on deaf port")
663

    
664
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
665
                         self.deaflistenerport,
666
                         timeout=constants.TCP_PING_TIMEOUT,
667
                         live_port_needed=False,
668
                         ), # ECONNREFUSED is OK
669
                 "failed to ping alive host on deaf port (no source addr)")
670

    
671

    
672
class TestOwnIpAddress(unittest.TestCase):
673
  """Testcase for OwnIpAddress"""
674

    
675
  def testOwnLoopback(self):
676
    """check having the loopback ip"""
677
    self.failUnless(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS),
678
                    "Should own the loopback address")
679

    
680
  def testNowOwnAddress(self):
681
    """check that I don't own an address"""
682

    
683
    # network 192.0.2.0/24 is reserved for test/documentation as per
684
    # rfc 3330, so we *should* not have an address of this range... if
685
    # this fails, we should extend the test to multiple addresses
686
    DST_IP = "192.0.2.1"
687
    self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP)
688

    
689

    
690
class TestListVisibleFiles(unittest.TestCase):
691
  """Test case for ListVisibleFiles"""
692

    
693
  def setUp(self):
694
    self.path = tempfile.mkdtemp()
695

    
696
  def tearDown(self):
697
    shutil.rmtree(self.path)
698

    
699
  def _test(self, files, expected):
700
    # Sort a copy
701
    expected = expected[:]
702
    expected.sort()
703

    
704
    for name in files:
705
      f = open(os.path.join(self.path, name), 'w')
706
      try:
707
        f.write("Test\n")
708
      finally:
709
        f.close()
710

    
711
    found = ListVisibleFiles(self.path)
712
    found.sort()
713

    
714
    self.assertEqual(found, expected)
715

    
716
  def testAllVisible(self):
717
    files = ["a", "b", "c"]
718
    expected = files
719
    self._test(files, expected)
720

    
721
  def testNoneVisible(self):
722
    files = [".a", ".b", ".c"]
723
    expected = []
724
    self._test(files, expected)
725

    
726
  def testSomeVisible(self):
727
    files = ["a", "b", ".c"]
728
    expected = ["a", "b"]
729
    self._test(files, expected)
730

    
731

    
732
class TestNewUUID(unittest.TestCase):
733
  """Test case for NewUUID"""
734

    
735
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
736
                        '[a-f0-9]{4}-[a-f0-9]{12}$')
737

    
738
  def runTest(self):
739
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
740

    
741

    
742
class TestUniqueSequence(unittest.TestCase):
743
  """Test case for UniqueSequence"""
744

    
745
  def _test(self, input, expected):
746
    self.assertEqual(utils.UniqueSequence(input), expected)
747

    
748
  def runTest(self):
749
    # Ordered input
750
    self._test([1, 2, 3], [1, 2, 3])
751
    self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
752
    self._test([1, 2, 2, 3], [1, 2, 3])
753
    self._test([1, 2, 3, 3], [1, 2, 3])
754

    
755
    # Unordered input
756
    self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
757
    self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
758

    
759
    # Strings
760
    self._test(["a", "a"], ["a"])
761
    self._test(["a", "b"], ["a", "b"])
762
    self._test(["a", "b", "a"], ["a", "b"])
763

    
764

    
765
class TestFirstFree(unittest.TestCase):
766
  """Test case for the FirstFree function"""
767

    
768
  def test(self):
769
    """Test FirstFree"""
770
    self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
771
    self.failUnlessEqual(FirstFree([]), None)
772
    self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
773
    self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
774
    self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
775

    
776

    
777
class TestTailFile(testutils.GanetiTestCase):
778
  """Test case for the TailFile function"""
779

    
780
  def testEmpty(self):
781
    fname = self._CreateTempFile()
782
    self.failUnlessEqual(TailFile(fname), [])
783
    self.failUnlessEqual(TailFile(fname, lines=25), [])
784

    
785
  def testAllLines(self):
786
    data = ["test %d" % i for i in range(30)]
787
    for i in range(30):
788
      fname = self._CreateTempFile()
789
      fd = open(fname, "w")
790
      fd.write("\n".join(data[:i]))
791
      if i > 0:
792
        fd.write("\n")
793
      fd.close()
794
      self.failUnlessEqual(TailFile(fname, lines=i), data[:i])
795

    
796
  def testPartialLines(self):
797
    data = ["test %d" % i for i in range(30)]
798
    fname = self._CreateTempFile()
799
    fd = open(fname, "w")
800
    fd.write("\n".join(data))
801
    fd.write("\n")
802
    fd.close()
803
    for i in range(1, 30):
804
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
805

    
806
  def testBigFile(self):
807
    data = ["test %d" % i for i in range(30)]
808
    fname = self._CreateTempFile()
809
    fd = open(fname, "w")
810
    fd.write("X" * 1048576)
811
    fd.write("\n")
812
    fd.write("\n".join(data))
813
    fd.write("\n")
814
    fd.close()
815
    for i in range(1, 30):
816
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
817

    
818

    
819
class TestFileLock(unittest.TestCase):
820
  """Test case for the FileLock class"""
821

    
822
  def setUp(self):
823
    self.tmpfile = tempfile.NamedTemporaryFile()
824
    self.lock = utils.FileLock(self.tmpfile.name)
825

    
826
  def testSharedNonblocking(self):
827
    self.lock.Shared(blocking=False)
828
    self.lock.Close()
829

    
830
  def testExclusiveNonblocking(self):
831
    self.lock.Exclusive(blocking=False)
832
    self.lock.Close()
833

    
834
  def testUnlockNonblocking(self):
835
    self.lock.Unlock(blocking=False)
836
    self.lock.Close()
837

    
838
  def testSharedBlocking(self):
839
    self.lock.Shared(blocking=True)
840
    self.lock.Close()
841

    
842
  def testExclusiveBlocking(self):
843
    self.lock.Exclusive(blocking=True)
844
    self.lock.Close()
845

    
846
  def testUnlockBlocking(self):
847
    self.lock.Unlock(blocking=True)
848
    self.lock.Close()
849

    
850
  def testSharedExclusiveUnlock(self):
851
    self.lock.Shared(blocking=False)
852
    self.lock.Exclusive(blocking=False)
853
    self.lock.Unlock(blocking=False)
854
    self.lock.Close()
855

    
856
  def testExclusiveSharedUnlock(self):
857
    self.lock.Exclusive(blocking=False)
858
    self.lock.Shared(blocking=False)
859
    self.lock.Unlock(blocking=False)
860
    self.lock.Close()
861

    
862
  def testCloseShared(self):
863
    self.lock.Close()
864
    self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
865

    
866
  def testCloseExclusive(self):
867
    self.lock.Close()
868
    self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
869

    
870
  def testCloseUnlock(self):
871
    self.lock.Close()
872
    self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
873

    
874

    
875
class TestTimeFunctions(unittest.TestCase):
876
  """Test case for time functions"""
877

    
878
  def runTest(self):
879
    self.assertEqual(utils.SplitTime(1), (1, 0))
880
    self.assertEqual(utils.SplitTime(1.5), (1, 500000))
881
    self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
882
    self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
883
    self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
884
    self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
885
    self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
886
    self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
887

    
888
    self.assertRaises(AssertionError, utils.SplitTime, -1)
889

    
890
    self.assertEqual(utils.MergeTime((1, 0)), 1.0)
891
    self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
892
    self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
893

    
894
    self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3), 1218448917.481)
895
    self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
896

    
897
    self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
898
    self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
899
    self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
900
    self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
901
    self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
902

    
903

    
904
class FieldSetTestCase(unittest.TestCase):
905
  """Test case for FieldSets"""
906

    
907
  def testSimpleMatch(self):
908
    f = utils.FieldSet("a", "b", "c", "def")
909
    self.failUnless(f.Matches("a"))
910
    self.failIf(f.Matches("d"), "Substring matched")
911
    self.failIf(f.Matches("defghi"), "Prefix string matched")
912
    self.failIf(f.NonMatching(["b", "c"]))
913
    self.failIf(f.NonMatching(["a", "b", "c", "def"]))
914
    self.failUnless(f.NonMatching(["a", "d"]))
915

    
916
  def testRegexMatch(self):
917
    f = utils.FieldSet("a", "b([0-9]+)", "c")
918
    self.failUnless(f.Matches("b1"))
919
    self.failUnless(f.Matches("b99"))
920
    self.failIf(f.Matches("b/1"))
921
    self.failIf(f.NonMatching(["b12", "c"]))
922
    self.failUnless(f.NonMatching(["a", "1"]))
923

    
924

    
925
if __name__ == '__main__':
926
  unittest.main()