Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ 31e22135

History | View | Annotate | Download (32.2 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import os
30
import md5
31
import signal
32
import socket
33
import shutil
34
import re
35
import select
36

    
37
import ganeti
38
import testutils
39
from ganeti import constants
40
from ganeti import utils
41
from ganeti import errors
42
from ganeti.utils import IsProcessAlive, RunCmd, \
43
     RemoveFile, MatchNameComponent, FormatUnit, \
44
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
45
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
46
     SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \
47
     TailFile, ForceDictType, IsNormAbsPath
48

    
49
from ganeti.errors import LockError, UnitParseError, GenericError, \
50
     ProgrammerError
51

    
52

    
53
class TestIsProcessAlive(unittest.TestCase):
54
  """Testing case for IsProcessAlive"""
55

    
56
  def testExists(self):
57
    mypid = os.getpid()
58
    self.assert_(IsProcessAlive(mypid),
59
                 "can't find myself running")
60

    
61
  def testNotExisting(self):
62
    pid_non_existing = os.fork()
63
    if pid_non_existing == 0:
64
      os._exit(0)
65
    elif pid_non_existing < 0:
66
      raise SystemError("can't fork")
67
    os.waitpid(pid_non_existing, 0)
68
    self.assert_(not IsProcessAlive(pid_non_existing),
69
                 "nonexisting process detected")
70

    
71

    
72
class TestPidFileFunctions(unittest.TestCase):
73
  """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
74

    
75
  def setUp(self):
76
    self.dir = tempfile.mkdtemp()
77
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
78
    utils.DaemonPidFileName = self.f_dpn
79

    
80
  def testPidFileFunctions(self):
81
    pid_file = self.f_dpn('test')
82
    utils.WritePidFile('test')
83
    self.failUnless(os.path.exists(pid_file),
84
                    "PID file should have been created")
85
    read_pid = utils.ReadPidFile(pid_file)
86
    self.failUnlessEqual(read_pid, os.getpid())
87
    self.failUnless(utils.IsProcessAlive(read_pid))
88
    self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
89
    utils.RemovePidFile('test')
90
    self.failIf(os.path.exists(pid_file),
91
                "PID file should not exist anymore")
92
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
93
                         "ReadPidFile should return 0 for missing pid file")
94
    fh = open(pid_file, "w")
95
    fh.write("blah\n")
96
    fh.close()
97
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
98
                         "ReadPidFile should return 0 for invalid pid file")
99
    utils.RemovePidFile('test')
100
    self.failIf(os.path.exists(pid_file),
101
                "PID file should not exist anymore")
102

    
103
  def testKill(self):
104
    pid_file = self.f_dpn('child')
105
    r_fd, w_fd = os.pipe()
106
    new_pid = os.fork()
107
    if new_pid == 0: #child
108
      utils.WritePidFile('child')
109
      os.write(w_fd, 'a')
110
      signal.pause()
111
      os._exit(0)
112
      return
113
    # else we are in the parent
114
    # wait until the child has written the pid file
115
    os.read(r_fd, 1)
116
    read_pid = utils.ReadPidFile(pid_file)
117
    self.failUnlessEqual(read_pid, new_pid)
118
    self.failUnless(utils.IsProcessAlive(new_pid))
119
    utils.KillProcess(new_pid, waitpid=True)
120
    self.failIf(utils.IsProcessAlive(new_pid))
121
    utils.RemovePidFile('child')
122
    self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
123

    
124
  def tearDown(self):
125
    for name in os.listdir(self.dir):
126
      os.unlink(os.path.join(self.dir, name))
127
    os.rmdir(self.dir)
128

    
129

    
130
class TestRunCmd(testutils.GanetiTestCase):
131
  """Testing case for the RunCmd function"""
132

    
133
  def setUp(self):
134
    testutils.GanetiTestCase.setUp(self)
135
    self.magic = time.ctime() + " ganeti test"
136
    self.fname = self._CreateTempFile()
137

    
138
  def testOk(self):
139
    """Test successful exit code"""
140
    result = RunCmd("/bin/sh -c 'exit 0'")
141
    self.assertEqual(result.exit_code, 0)
142
    self.assertEqual(result.output, "")
143

    
144
  def testFail(self):
145
    """Test fail exit code"""
146
    result = RunCmd("/bin/sh -c 'exit 1'")
147
    self.assertEqual(result.exit_code, 1)
148
    self.assertEqual(result.output, "")
149

    
150
  def testStdout(self):
151
    """Test standard output"""
152
    cmd = 'echo -n "%s"' % self.magic
153
    result = RunCmd("/bin/sh -c '%s'" % cmd)
154
    self.assertEqual(result.stdout, self.magic)
155
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
156
    self.assertEqual(result.output, "")
157
    self.assertFileContent(self.fname, self.magic)
158

    
159
  def testStderr(self):
160
    """Test standard error"""
161
    cmd = 'echo -n "%s"' % self.magic
162
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
163
    self.assertEqual(result.stderr, self.magic)
164
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
165
    self.assertEqual(result.output, "")
166
    self.assertFileContent(self.fname, self.magic)
167

    
168
  def testCombined(self):
169
    """Test combined output"""
170
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
171
    expected = "A" + self.magic + "B" + self.magic
172
    result = RunCmd("/bin/sh -c '%s'" % cmd)
173
    self.assertEqual(result.output, expected)
174
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
175
    self.assertEqual(result.output, "")
176
    self.assertFileContent(self.fname, expected)
177

    
178
  def testSignal(self):
179
    """Test signal"""
180
    result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
181
    self.assertEqual(result.signal, 15)
182
    self.assertEqual(result.output, "")
183

    
184
  def testListRun(self):
185
    """Test list runs"""
186
    result = RunCmd(["true"])
187
    self.assertEqual(result.signal, None)
188
    self.assertEqual(result.exit_code, 0)
189
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
190
    self.assertEqual(result.signal, None)
191
    self.assertEqual(result.exit_code, 1)
192
    result = RunCmd(["echo", "-n", self.magic])
193
    self.assertEqual(result.signal, None)
194
    self.assertEqual(result.exit_code, 0)
195
    self.assertEqual(result.stdout, self.magic)
196

    
197
  def testFileEmptyOutput(self):
198
    """Test file output"""
199
    result = RunCmd(["true"], output=self.fname)
200
    self.assertEqual(result.signal, None)
201
    self.assertEqual(result.exit_code, 0)
202
    self.assertFileContent(self.fname, "")
203

    
204
  def testLang(self):
205
    """Test locale environment"""
206
    old_env = os.environ.copy()
207
    try:
208
      os.environ["LANG"] = "en_US.UTF-8"
209
      os.environ["LC_ALL"] = "en_US.UTF-8"
210
      result = RunCmd(["locale"])
211
      for line in result.output.splitlines():
212
        key, value = line.split("=", 1)
213
        # Ignore these variables, they're overridden by LC_ALL
214
        if key == "LANG" or key == "LANGUAGE":
215
          continue
216
        self.failIf(value and value != "C" and value != '"C"',
217
            "Variable %s is set to the invalid value '%s'" % (key, value))
218
    finally:
219
      os.environ = old_env
220

    
221
  def testDefaultCwd(self):
222
    """Test default working directory"""
223
    self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
224

    
225
  def testCwd(self):
226
    """Test default working directory"""
227
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
228
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
229
    cwd = os.getcwd()
230
    self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
231

    
232

    
233
class TestRemoveFile(unittest.TestCase):
234
  """Test case for the RemoveFile function"""
235

    
236
  def setUp(self):
237
    """Create a temp dir and file for each case"""
238
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
239
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
240
    os.close(fd)
241

    
242
  def tearDown(self):
243
    if os.path.exists(self.tmpfile):
244
      os.unlink(self.tmpfile)
245
    os.rmdir(self.tmpdir)
246

    
247

    
248
  def testIgnoreDirs(self):
249
    """Test that RemoveFile() ignores directories"""
250
    self.assertEqual(None, RemoveFile(self.tmpdir))
251

    
252

    
253
  def testIgnoreNotExisting(self):
254
    """Test that RemoveFile() ignores non-existing files"""
255
    RemoveFile(self.tmpfile)
256
    RemoveFile(self.tmpfile)
257

    
258

    
259
  def testRemoveFile(self):
260
    """Test that RemoveFile does remove a file"""
261
    RemoveFile(self.tmpfile)
262
    if os.path.exists(self.tmpfile):
263
      self.fail("File '%s' not removed" % self.tmpfile)
264

    
265

    
266
  def testRemoveSymlink(self):
267
    """Test that RemoveFile does remove symlinks"""
268
    symlink = self.tmpdir + "/symlink"
269
    os.symlink("no-such-file", symlink)
270
    RemoveFile(symlink)
271
    if os.path.exists(symlink):
272
      self.fail("File '%s' not removed" % symlink)
273
    os.symlink(self.tmpfile, symlink)
274
    RemoveFile(symlink)
275
    if os.path.exists(symlink):
276
      self.fail("File '%s' not removed" % symlink)
277

    
278

    
279
class TestRename(unittest.TestCase):
280
  """Test case for RenameFile"""
281

    
282
  def setUp(self):
283
    """Create a temporary directory"""
284
    self.tmpdir = tempfile.mkdtemp()
285
    self.tmpfile = os.path.join(self.tmpdir, "test1")
286

    
287
    # Touch the file
288
    open(self.tmpfile, "w").close()
289

    
290
  def tearDown(self):
291
    """Remove temporary directory"""
292
    shutil.rmtree(self.tmpdir)
293

    
294
  def testSimpleRename1(self):
295
    """Simple rename 1"""
296
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
297

    
298
  def testSimpleRename2(self):
299
    """Simple rename 2"""
300
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
301
                     mkdir=True)
302

    
303
  def testRenameMkdir(self):
304
    """Rename with mkdir"""
305
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
306
                     mkdir=True)
307

    
308

    
309
class TestMatchNameComponent(unittest.TestCase):
310
  """Test case for the MatchNameComponent function"""
311

    
312
  def testEmptyList(self):
313
    """Test that there is no match against an empty list"""
314

    
315
    self.failUnlessEqual(MatchNameComponent("", []), None)
316
    self.failUnlessEqual(MatchNameComponent("test", []), None)
317

    
318
  def testSingleMatch(self):
319
    """Test that a single match is performed correctly"""
320
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
321
    for key in "test2", "test2.example", "test2.example.com":
322
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
323

    
324
  def testMultipleMatches(self):
325
    """Test that a multiple match is returned as None"""
326
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
327
    for key in "test1", "test1.example":
328
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
329

    
330

    
331
class TestFormatUnit(unittest.TestCase):
332
  """Test case for the FormatUnit function"""
333

    
334
  def testMiB(self):
335
    self.assertEqual(FormatUnit(1, 'h'), '1M')
336
    self.assertEqual(FormatUnit(100, 'h'), '100M')
337
    self.assertEqual(FormatUnit(1023, 'h'), '1023M')
338

    
339
    self.assertEqual(FormatUnit(1, 'm'), '1')
340
    self.assertEqual(FormatUnit(100, 'm'), '100')
341
    self.assertEqual(FormatUnit(1023, 'm'), '1023')
342

    
343
    self.assertEqual(FormatUnit(1024, 'm'), '1024')
344
    self.assertEqual(FormatUnit(1536, 'm'), '1536')
345
    self.assertEqual(FormatUnit(17133, 'm'), '17133')
346
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
347

    
348
  def testGiB(self):
349
    self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
350
    self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
351
    self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
352
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
353

    
354
    self.assertEqual(FormatUnit(1024, 'g'), '1.0')
355
    self.assertEqual(FormatUnit(1536, 'g'), '1.5')
356
    self.assertEqual(FormatUnit(17133, 'g'), '16.7')
357
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
358

    
359
    self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
360
    self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
361
    self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
362

    
363
  def testTiB(self):
364
    self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
365
    self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
366
    self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
367

    
368
    self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
369
    self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
370
    self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
371

    
372
class TestParseUnit(unittest.TestCase):
373
  """Test case for the ParseUnit function"""
374

    
375
  SCALES = (('', 1),
376
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
377
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
378
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
379

    
380
  def testRounding(self):
381
    self.assertEqual(ParseUnit('0'), 0)
382
    self.assertEqual(ParseUnit('1'), 4)
383
    self.assertEqual(ParseUnit('2'), 4)
384
    self.assertEqual(ParseUnit('3'), 4)
385

    
386
    self.assertEqual(ParseUnit('124'), 124)
387
    self.assertEqual(ParseUnit('125'), 128)
388
    self.assertEqual(ParseUnit('126'), 128)
389
    self.assertEqual(ParseUnit('127'), 128)
390
    self.assertEqual(ParseUnit('128'), 128)
391
    self.assertEqual(ParseUnit('129'), 132)
392
    self.assertEqual(ParseUnit('130'), 132)
393

    
394
  def testFloating(self):
395
    self.assertEqual(ParseUnit('0'), 0)
396
    self.assertEqual(ParseUnit('0.5'), 4)
397
    self.assertEqual(ParseUnit('1.75'), 4)
398
    self.assertEqual(ParseUnit('1.99'), 4)
399
    self.assertEqual(ParseUnit('2.00'), 4)
400
    self.assertEqual(ParseUnit('2.01'), 4)
401
    self.assertEqual(ParseUnit('3.99'), 4)
402
    self.assertEqual(ParseUnit('4.00'), 4)
403
    self.assertEqual(ParseUnit('4.01'), 8)
404
    self.assertEqual(ParseUnit('1.5G'), 1536)
405
    self.assertEqual(ParseUnit('1.8G'), 1844)
406
    self.assertEqual(ParseUnit('8.28T'), 8682212)
407

    
408
  def testSuffixes(self):
409
    for sep in ('', ' ', '   ', "\t", "\t "):
410
      for suffix, scale in TestParseUnit.SCALES:
411
        for func in (lambda x: x, str.lower, str.upper):
412
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
413
                           1024 * scale)
414

    
415
  def testInvalidInput(self):
416
    for sep in ('-', '_', ',', 'a'):
417
      for suffix, _ in TestParseUnit.SCALES:
418
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
419

    
420
    for suffix, _ in TestParseUnit.SCALES:
421
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
422

    
423

    
424
class TestSshKeys(testutils.GanetiTestCase):
425
  """Test case for the AddAuthorizedKey function"""
426

    
427
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
428
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
429
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
430

    
431
  def setUp(self):
432
    testutils.GanetiTestCase.setUp(self)
433
    self.tmpname = self._CreateTempFile()
434
    handle = open(self.tmpname, 'w')
435
    try:
436
      handle.write("%s\n" % TestSshKeys.KEY_A)
437
      handle.write("%s\n" % TestSshKeys.KEY_B)
438
    finally:
439
      handle.close()
440

    
441
  def testAddingNewKey(self):
442
    AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
443

    
444
    self.assertFileContent(self.tmpname,
445
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
446
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
447
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
448
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
449

    
450
  def testAddingAlmostButNotCompletelyTheSameKey(self):
451
    AddAuthorizedKey(self.tmpname,
452
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
453

    
454
    self.assertFileContent(self.tmpname,
455
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
456
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
457
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
458
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
459

    
460
  def testAddingExistingKeyWithSomeMoreSpaces(self):
461
    AddAuthorizedKey(self.tmpname,
462
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
463

    
464
    self.assertFileContent(self.tmpname,
465
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
466
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
467
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
468

    
469
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
470
    RemoveAuthorizedKey(self.tmpname,
471
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
472

    
473
    self.assertFileContent(self.tmpname,
474
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
475
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
476

    
477
  def testRemovingNonExistingKey(self):
478
    RemoveAuthorizedKey(self.tmpname,
479
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
480

    
481
    self.assertFileContent(self.tmpname,
482
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
483
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
484
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
485

    
486

    
487
class TestEtcHosts(testutils.GanetiTestCase):
488
  """Test functions modifying /etc/hosts"""
489

    
490
  def setUp(self):
491
    testutils.GanetiTestCase.setUp(self)
492
    self.tmpname = self._CreateTempFile()
493
    handle = open(self.tmpname, 'w')
494
    try:
495
      handle.write('# This is a test file for /etc/hosts\n')
496
      handle.write('127.0.0.1\tlocalhost\n')
497
      handle.write('192.168.1.1 router gw\n')
498
    finally:
499
      handle.close()
500

    
501
  def testSettingNewIp(self):
502
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
503

    
504
    self.assertFileContent(self.tmpname,
505
      "# This is a test file for /etc/hosts\n"
506
      "127.0.0.1\tlocalhost\n"
507
      "192.168.1.1 router gw\n"
508
      "1.2.3.4\tmyhost.domain.tld myhost\n")
509
    self.assertFileMode(self.tmpname, 0644)
510

    
511
  def testSettingExistingIp(self):
512
    SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
513
                     ['myhost'])
514

    
515
    self.assertFileContent(self.tmpname,
516
      "# This is a test file for /etc/hosts\n"
517
      "127.0.0.1\tlocalhost\n"
518
      "192.168.1.1\tmyhost.domain.tld myhost\n")
519
    self.assertFileMode(self.tmpname, 0644)
520

    
521
  def testSettingDuplicateName(self):
522
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
523

    
524
    self.assertFileContent(self.tmpname,
525
      "# This is a test file for /etc/hosts\n"
526
      "127.0.0.1\tlocalhost\n"
527
      "192.168.1.1 router gw\n"
528
      "1.2.3.4\tmyhost\n")
529
    self.assertFileMode(self.tmpname, 0644)
530

    
531
  def testRemovingExistingHost(self):
532
    RemoveEtcHostsEntry(self.tmpname, 'router')
533

    
534
    self.assertFileContent(self.tmpname,
535
      "# This is a test file for /etc/hosts\n"
536
      "127.0.0.1\tlocalhost\n"
537
      "192.168.1.1 gw\n")
538
    self.assertFileMode(self.tmpname, 0644)
539

    
540
  def testRemovingSingleExistingHost(self):
541
    RemoveEtcHostsEntry(self.tmpname, 'localhost')
542

    
543
    self.assertFileContent(self.tmpname,
544
      "# This is a test file for /etc/hosts\n"
545
      "192.168.1.1 router gw\n")
546
    self.assertFileMode(self.tmpname, 0644)
547

    
548
  def testRemovingNonExistingHost(self):
549
    RemoveEtcHostsEntry(self.tmpname, 'myhost')
550

    
551
    self.assertFileContent(self.tmpname,
552
      "# This is a test file for /etc/hosts\n"
553
      "127.0.0.1\tlocalhost\n"
554
      "192.168.1.1 router gw\n")
555
    self.assertFileMode(self.tmpname, 0644)
556

    
557
  def testRemovingAlias(self):
558
    RemoveEtcHostsEntry(self.tmpname, 'gw')
559

    
560
    self.assertFileContent(self.tmpname,
561
      "# This is a test file for /etc/hosts\n"
562
      "127.0.0.1\tlocalhost\n"
563
      "192.168.1.1 router\n")
564
    self.assertFileMode(self.tmpname, 0644)
565

    
566

    
567
class TestShellQuoting(unittest.TestCase):
568
  """Test case for shell quoting functions"""
569

    
570
  def testShellQuote(self):
571
    self.assertEqual(ShellQuote('abc'), "abc")
572
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
573
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
574
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
575
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
576

    
577
  def testShellQuoteArgs(self):
578
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
579
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
580
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
581

    
582

    
583
class TestTcpPing(unittest.TestCase):
584
  """Testcase for TCP version of ping - against listen(2)ing port"""
585

    
586
  def setUp(self):
587
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
588
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
589
    self.listenerport = self.listener.getsockname()[1]
590
    self.listener.listen(1)
591

    
592
  def tearDown(self):
593
    self.listener.shutdown(socket.SHUT_RDWR)
594
    del self.listener
595
    del self.listenerport
596

    
597
  def testTcpPingToLocalHostAccept(self):
598
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
599
                         self.listenerport,
600
                         timeout=10,
601
                         live_port_needed=True,
602
                         source=constants.LOCALHOST_IP_ADDRESS,
603
                         ),
604
                 "failed to connect to test listener")
605

    
606
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
607
                         self.listenerport,
608
                         timeout=10,
609
                         live_port_needed=True,
610
                         ),
611
                 "failed to connect to test listener (no source)")
612

    
613

    
614
class TestTcpPingDeaf(unittest.TestCase):
615
  """Testcase for TCP version of ping - against non listen(2)ing port"""
616

    
617
  def setUp(self):
618
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
619
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
620
    self.deaflistenerport = self.deaflistener.getsockname()[1]
621

    
622
  def tearDown(self):
623
    del self.deaflistener
624
    del self.deaflistenerport
625

    
626
  def testTcpPingToLocalHostAcceptDeaf(self):
627
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
628
                        self.deaflistenerport,
629
                        timeout=constants.TCP_PING_TIMEOUT,
630
                        live_port_needed=True,
631
                        source=constants.LOCALHOST_IP_ADDRESS,
632
                        ), # need successful connect(2)
633
                "successfully connected to deaf listener")
634

    
635
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
636
                        self.deaflistenerport,
637
                        timeout=constants.TCP_PING_TIMEOUT,
638
                        live_port_needed=True,
639
                        ), # need successful connect(2)
640
                "successfully connected to deaf listener (no source addr)")
641

    
642
  def testTcpPingToLocalHostNoAccept(self):
643
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
644
                         self.deaflistenerport,
645
                         timeout=constants.TCP_PING_TIMEOUT,
646
                         live_port_needed=False,
647
                         source=constants.LOCALHOST_IP_ADDRESS,
648
                         ), # ECONNREFUSED is OK
649
                 "failed to ping alive host on deaf port")
650

    
651
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
652
                         self.deaflistenerport,
653
                         timeout=constants.TCP_PING_TIMEOUT,
654
                         live_port_needed=False,
655
                         ), # ECONNREFUSED is OK
656
                 "failed to ping alive host on deaf port (no source addr)")
657

    
658

    
659
class TestOwnIpAddress(unittest.TestCase):
660
  """Testcase for OwnIpAddress"""
661

    
662
  def testOwnLoopback(self):
663
    """check having the loopback ip"""
664
    self.failUnless(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS),
665
                    "Should own the loopback address")
666

    
667
  def testNowOwnAddress(self):
668
    """check that I don't own an address"""
669

    
670
    # network 192.0.2.0/24 is reserved for test/documentation as per
671
    # rfc 3330, so we *should* not have an address of this range... if
672
    # this fails, we should extend the test to multiple addresses
673
    DST_IP = "192.0.2.1"
674
    self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP)
675

    
676

    
677
class TestListVisibleFiles(unittest.TestCase):
678
  """Test case for ListVisibleFiles"""
679

    
680
  def setUp(self):
681
    self.path = tempfile.mkdtemp()
682

    
683
  def tearDown(self):
684
    shutil.rmtree(self.path)
685

    
686
  def _test(self, files, expected):
687
    # Sort a copy
688
    expected = expected[:]
689
    expected.sort()
690

    
691
    for name in files:
692
      f = open(os.path.join(self.path, name), 'w')
693
      try:
694
        f.write("Test\n")
695
      finally:
696
        f.close()
697

    
698
    found = ListVisibleFiles(self.path)
699
    found.sort()
700

    
701
    self.assertEqual(found, expected)
702

    
703
  def testAllVisible(self):
704
    files = ["a", "b", "c"]
705
    expected = files
706
    self._test(files, expected)
707

    
708
  def testNoneVisible(self):
709
    files = [".a", ".b", ".c"]
710
    expected = []
711
    self._test(files, expected)
712

    
713
  def testSomeVisible(self):
714
    files = ["a", "b", ".c"]
715
    expected = ["a", "b"]
716
    self._test(files, expected)
717

    
718

    
719
class TestNewUUID(unittest.TestCase):
720
  """Test case for NewUUID"""
721

    
722
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
723
                        '[a-f0-9]{4}-[a-f0-9]{12}$')
724

    
725
  def runTest(self):
726
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
727

    
728

    
729
class TestUniqueSequence(unittest.TestCase):
730
  """Test case for UniqueSequence"""
731

    
732
  def _test(self, input, expected):
733
    self.assertEqual(utils.UniqueSequence(input), expected)
734

    
735
  def runTest(self):
736
    # Ordered input
737
    self._test([1, 2, 3], [1, 2, 3])
738
    self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
739
    self._test([1, 2, 2, 3], [1, 2, 3])
740
    self._test([1, 2, 3, 3], [1, 2, 3])
741

    
742
    # Unordered input
743
    self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
744
    self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
745

    
746
    # Strings
747
    self._test(["a", "a"], ["a"])
748
    self._test(["a", "b"], ["a", "b"])
749
    self._test(["a", "b", "a"], ["a", "b"])
750

    
751

    
752
class TestFirstFree(unittest.TestCase):
753
  """Test case for the FirstFree function"""
754

    
755
  def test(self):
756
    """Test FirstFree"""
757
    self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
758
    self.failUnlessEqual(FirstFree([]), None)
759
    self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
760
    self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
761
    self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
762

    
763

    
764
class TestTailFile(testutils.GanetiTestCase):
765
  """Test case for the TailFile function"""
766

    
767
  def testEmpty(self):
768
    fname = self._CreateTempFile()
769
    self.failUnlessEqual(TailFile(fname), [])
770
    self.failUnlessEqual(TailFile(fname, lines=25), [])
771

    
772
  def testAllLines(self):
773
    data = ["test %d" % i for i in range(30)]
774
    for i in range(30):
775
      fname = self._CreateTempFile()
776
      fd = open(fname, "w")
777
      fd.write("\n".join(data[:i]))
778
      if i > 0:
779
        fd.write("\n")
780
      fd.close()
781
      self.failUnlessEqual(TailFile(fname, lines=i), data[:i])
782

    
783
  def testPartialLines(self):
784
    data = ["test %d" % i for i in range(30)]
785
    fname = self._CreateTempFile()
786
    fd = open(fname, "w")
787
    fd.write("\n".join(data))
788
    fd.write("\n")
789
    fd.close()
790
    for i in range(1, 30):
791
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
792

    
793
  def testBigFile(self):
794
    data = ["test %d" % i for i in range(30)]
795
    fname = self._CreateTempFile()
796
    fd = open(fname, "w")
797
    fd.write("X" * 1048576)
798
    fd.write("\n")
799
    fd.write("\n".join(data))
800
    fd.write("\n")
801
    fd.close()
802
    for i in range(1, 30):
803
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
804

    
805

    
806
class TestFileLock(unittest.TestCase):
807
  """Test case for the FileLock class"""
808

    
809
  def setUp(self):
810
    self.tmpfile = tempfile.NamedTemporaryFile()
811
    self.lock = utils.FileLock(self.tmpfile.name)
812

    
813
  def testSharedNonblocking(self):
814
    self.lock.Shared(blocking=False)
815
    self.lock.Close()
816

    
817
  def testExclusiveNonblocking(self):
818
    self.lock.Exclusive(blocking=False)
819
    self.lock.Close()
820

    
821
  def testUnlockNonblocking(self):
822
    self.lock.Unlock(blocking=False)
823
    self.lock.Close()
824

    
825
  def testSharedBlocking(self):
826
    self.lock.Shared(blocking=True)
827
    self.lock.Close()
828

    
829
  def testExclusiveBlocking(self):
830
    self.lock.Exclusive(blocking=True)
831
    self.lock.Close()
832

    
833
  def testUnlockBlocking(self):
834
    self.lock.Unlock(blocking=True)
835
    self.lock.Close()
836

    
837
  def testSharedExclusiveUnlock(self):
838
    self.lock.Shared(blocking=False)
839
    self.lock.Exclusive(blocking=False)
840
    self.lock.Unlock(blocking=False)
841
    self.lock.Close()
842

    
843
  def testExclusiveSharedUnlock(self):
844
    self.lock.Exclusive(blocking=False)
845
    self.lock.Shared(blocking=False)
846
    self.lock.Unlock(blocking=False)
847
    self.lock.Close()
848

    
849
  def testCloseShared(self):
850
    self.lock.Close()
851
    self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
852

    
853
  def testCloseExclusive(self):
854
    self.lock.Close()
855
    self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
856

    
857
  def testCloseUnlock(self):
858
    self.lock.Close()
859
    self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
860

    
861

    
862
class TestTimeFunctions(unittest.TestCase):
863
  """Test case for time functions"""
864

    
865
  def runTest(self):
866
    self.assertEqual(utils.SplitTime(1), (1, 0))
867
    self.assertEqual(utils.SplitTime(1.5), (1, 500000))
868
    self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
869
    self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
870
    self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
871
    self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
872
    self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
873
    self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
874

    
875
    self.assertRaises(AssertionError, utils.SplitTime, -1)
876

    
877
    self.assertEqual(utils.MergeTime((1, 0)), 1.0)
878
    self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
879
    self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
880

    
881
    self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3), 1218448917.481)
882
    self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
883

    
884
    self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
885
    self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
886
    self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
887
    self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
888
    self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
889

    
890

    
891
class FieldSetTestCase(unittest.TestCase):
892
  """Test case for FieldSets"""
893

    
894
  def testSimpleMatch(self):
895
    f = utils.FieldSet("a", "b", "c", "def")
896
    self.failUnless(f.Matches("a"))
897
    self.failIf(f.Matches("d"), "Substring matched")
898
    self.failIf(f.Matches("defghi"), "Prefix string matched")
899
    self.failIf(f.NonMatching(["b", "c"]))
900
    self.failIf(f.NonMatching(["a", "b", "c", "def"]))
901
    self.failUnless(f.NonMatching(["a", "d"]))
902

    
903
  def testRegexMatch(self):
904
    f = utils.FieldSet("a", "b([0-9]+)", "c")
905
    self.failUnless(f.Matches("b1"))
906
    self.failUnless(f.Matches("b99"))
907
    self.failIf(f.Matches("b/1"))
908
    self.failIf(f.NonMatching(["b12", "c"]))
909
    self.failUnless(f.NonMatching(["a", "1"]))
910

    
911
class TestForceDictType(unittest.TestCase):
912
  """Test case for ForceDictType"""
913

    
914
  def setUp(self):
915
    self.key_types = {
916
      'a': constants.VTYPE_INT,
917
      'b': constants.VTYPE_BOOL,
918
      'c': constants.VTYPE_STRING,
919
      'd': constants.VTYPE_SIZE,
920
      }
921

    
922
  def _fdt(self, dict, allowed_values=None):
923
    if allowed_values is None:
924
      ForceDictType(dict, self.key_types)
925
    else:
926
      ForceDictType(dict, self.key_types, allowed_values=allowed_values)
927

    
928
    return dict
929

    
930
  def testSimpleDict(self):
931
    self.assertEqual(self._fdt({}), {})
932
    self.assertEqual(self._fdt({'a': 1}), {'a': 1})
933
    self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
934
    self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
935
    self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
936
    self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
937
    self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
938
    self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
939
    self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
940
    self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
941
    self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
942
    self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
943

    
944
  def testErrors(self):
945
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
946
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
947
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
948
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
949

    
950

    
951
class TestIsAbsNormPath(unittest.TestCase):
952
  """Testing case for IsProcessAlive"""
953

    
954
  def _pathTestHelper(self, path, result):
955
    if result:
956
      self.assert_(IsNormAbsPath(path),
957
          "Path %s should result absolute and normalized" % path)
958
    else:
959
      self.assert_(not IsNormAbsPath(path),
960
          "Path %s should not result absolute and normalized" % path)
961

    
962
  def testBase(self):
963
    self._pathTestHelper('/etc', True)
964
    self._pathTestHelper('/srv', True)
965
    self._pathTestHelper('etc', False)
966
    self._pathTestHelper('/etc/../root', False)
967
    self._pathTestHelper('/etc/', False)
968

    
969
if __name__ == '__main__':
970
  unittest.main()