Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ 4d4a651d

History | View | Annotate | Download (33.2 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import os
30
import md5
31
import signal
32
import socket
33
import shutil
34
import re
35
import select
36
import string
37

    
38
import ganeti
39
import testutils
40
from ganeti import constants
41
from ganeti import utils
42
from ganeti import errors
43
from ganeti.utils import IsProcessAlive, RunCmd, \
44
     RemoveFile, MatchNameComponent, FormatUnit, \
45
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
46
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
47
     SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \
48
     TailFile, ForceDictType, SafeEncode, IsNormAbsPath, FormatTime
49

    
50
from ganeti.errors import LockError, UnitParseError, GenericError, \
51
     ProgrammerError
52

    
53

    
54
class TestIsProcessAlive(unittest.TestCase):
55
  """Testing case for IsProcessAlive"""
56

    
57
  def testExists(self):
58
    mypid = os.getpid()
59
    self.assert_(IsProcessAlive(mypid),
60
                 "can't find myself running")
61

    
62
  def testNotExisting(self):
63
    pid_non_existing = os.fork()
64
    if pid_non_existing == 0:
65
      os._exit(0)
66
    elif pid_non_existing < 0:
67
      raise SystemError("can't fork")
68
    os.waitpid(pid_non_existing, 0)
69
    self.assert_(not IsProcessAlive(pid_non_existing),
70
                 "nonexisting process detected")
71

    
72

    
73
class TestPidFileFunctions(unittest.TestCase):
74
  """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
75

    
76
  def setUp(self):
77
    self.dir = tempfile.mkdtemp()
78
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
79
    utils.DaemonPidFileName = self.f_dpn
80

    
81
  def testPidFileFunctions(self):
82
    pid_file = self.f_dpn('test')
83
    utils.WritePidFile('test')
84
    self.failUnless(os.path.exists(pid_file),
85
                    "PID file should have been created")
86
    read_pid = utils.ReadPidFile(pid_file)
87
    self.failUnlessEqual(read_pid, os.getpid())
88
    self.failUnless(utils.IsProcessAlive(read_pid))
89
    self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
90
    utils.RemovePidFile('test')
91
    self.failIf(os.path.exists(pid_file),
92
                "PID file should not exist anymore")
93
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
94
                         "ReadPidFile should return 0 for missing pid file")
95
    fh = open(pid_file, "w")
96
    fh.write("blah\n")
97
    fh.close()
98
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
99
                         "ReadPidFile should return 0 for invalid pid file")
100
    utils.RemovePidFile('test')
101
    self.failIf(os.path.exists(pid_file),
102
                "PID file should not exist anymore")
103

    
104
  def testKill(self):
105
    pid_file = self.f_dpn('child')
106
    r_fd, w_fd = os.pipe()
107
    new_pid = os.fork()
108
    if new_pid == 0: #child
109
      utils.WritePidFile('child')
110
      os.write(w_fd, 'a')
111
      signal.pause()
112
      os._exit(0)
113
      return
114
    # else we are in the parent
115
    # wait until the child has written the pid file
116
    os.read(r_fd, 1)
117
    read_pid = utils.ReadPidFile(pid_file)
118
    self.failUnlessEqual(read_pid, new_pid)
119
    self.failUnless(utils.IsProcessAlive(new_pid))
120
    utils.KillProcess(new_pid, waitpid=True)
121
    self.failIf(utils.IsProcessAlive(new_pid))
122
    utils.RemovePidFile('child')
123
    self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
124

    
125
  def tearDown(self):
126
    for name in os.listdir(self.dir):
127
      os.unlink(os.path.join(self.dir, name))
128
    os.rmdir(self.dir)
129

    
130

    
131
class TestRunCmd(testutils.GanetiTestCase):
132
  """Testing case for the RunCmd function"""
133

    
134
  def setUp(self):
135
    testutils.GanetiTestCase.setUp(self)
136
    self.magic = time.ctime() + " ganeti test"
137
    self.fname = self._CreateTempFile()
138

    
139
  def testOk(self):
140
    """Test successful exit code"""
141
    result = RunCmd("/bin/sh -c 'exit 0'")
142
    self.assertEqual(result.exit_code, 0)
143
    self.assertEqual(result.output, "")
144

    
145
  def testFail(self):
146
    """Test fail exit code"""
147
    result = RunCmd("/bin/sh -c 'exit 1'")
148
    self.assertEqual(result.exit_code, 1)
149
    self.assertEqual(result.output, "")
150

    
151
  def testStdout(self):
152
    """Test standard output"""
153
    cmd = 'echo -n "%s"' % self.magic
154
    result = RunCmd("/bin/sh -c '%s'" % cmd)
155
    self.assertEqual(result.stdout, self.magic)
156
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
157
    self.assertEqual(result.output, "")
158
    self.assertFileContent(self.fname, self.magic)
159

    
160
  def testStderr(self):
161
    """Test standard error"""
162
    cmd = 'echo -n "%s"' % self.magic
163
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
164
    self.assertEqual(result.stderr, self.magic)
165
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
166
    self.assertEqual(result.output, "")
167
    self.assertFileContent(self.fname, self.magic)
168

    
169
  def testCombined(self):
170
    """Test combined output"""
171
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
172
    expected = "A" + self.magic + "B" + self.magic
173
    result = RunCmd("/bin/sh -c '%s'" % cmd)
174
    self.assertEqual(result.output, expected)
175
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
176
    self.assertEqual(result.output, "")
177
    self.assertFileContent(self.fname, expected)
178

    
179
  def testSignal(self):
180
    """Test signal"""
181
    result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
182
    self.assertEqual(result.signal, 15)
183
    self.assertEqual(result.output, "")
184

    
185
  def testListRun(self):
186
    """Test list runs"""
187
    result = RunCmd(["true"])
188
    self.assertEqual(result.signal, None)
189
    self.assertEqual(result.exit_code, 0)
190
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
191
    self.assertEqual(result.signal, None)
192
    self.assertEqual(result.exit_code, 1)
193
    result = RunCmd(["echo", "-n", self.magic])
194
    self.assertEqual(result.signal, None)
195
    self.assertEqual(result.exit_code, 0)
196
    self.assertEqual(result.stdout, self.magic)
197

    
198
  def testFileEmptyOutput(self):
199
    """Test file output"""
200
    result = RunCmd(["true"], output=self.fname)
201
    self.assertEqual(result.signal, None)
202
    self.assertEqual(result.exit_code, 0)
203
    self.assertFileContent(self.fname, "")
204

    
205
  def testLang(self):
206
    """Test locale environment"""
207
    old_env = os.environ.copy()
208
    try:
209
      os.environ["LANG"] = "en_US.UTF-8"
210
      os.environ["LC_ALL"] = "en_US.UTF-8"
211
      result = RunCmd(["locale"])
212
      for line in result.output.splitlines():
213
        key, value = line.split("=", 1)
214
        # Ignore these variables, they're overridden by LC_ALL
215
        if key == "LANG" or key == "LANGUAGE":
216
          continue
217
        self.failIf(value and value != "C" and value != '"C"',
218
            "Variable %s is set to the invalid value '%s'" % (key, value))
219
    finally:
220
      os.environ = old_env
221

    
222
  def testDefaultCwd(self):
223
    """Test default working directory"""
224
    self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
225

    
226
  def testCwd(self):
227
    """Test default working directory"""
228
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
229
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
230
    cwd = os.getcwd()
231
    self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
232

    
233

    
234
class TestRemoveFile(unittest.TestCase):
235
  """Test case for the RemoveFile function"""
236

    
237
  def setUp(self):
238
    """Create a temp dir and file for each case"""
239
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
240
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
241
    os.close(fd)
242

    
243
  def tearDown(self):
244
    if os.path.exists(self.tmpfile):
245
      os.unlink(self.tmpfile)
246
    os.rmdir(self.tmpdir)
247

    
248

    
249
  def testIgnoreDirs(self):
250
    """Test that RemoveFile() ignores directories"""
251
    self.assertEqual(None, RemoveFile(self.tmpdir))
252

    
253

    
254
  def testIgnoreNotExisting(self):
255
    """Test that RemoveFile() ignores non-existing files"""
256
    RemoveFile(self.tmpfile)
257
    RemoveFile(self.tmpfile)
258

    
259

    
260
  def testRemoveFile(self):
261
    """Test that RemoveFile does remove a file"""
262
    RemoveFile(self.tmpfile)
263
    if os.path.exists(self.tmpfile):
264
      self.fail("File '%s' not removed" % self.tmpfile)
265

    
266

    
267
  def testRemoveSymlink(self):
268
    """Test that RemoveFile does remove symlinks"""
269
    symlink = self.tmpdir + "/symlink"
270
    os.symlink("no-such-file", symlink)
271
    RemoveFile(symlink)
272
    if os.path.exists(symlink):
273
      self.fail("File '%s' not removed" % symlink)
274
    os.symlink(self.tmpfile, symlink)
275
    RemoveFile(symlink)
276
    if os.path.exists(symlink):
277
      self.fail("File '%s' not removed" % symlink)
278

    
279

    
280
class TestRename(unittest.TestCase):
281
  """Test case for RenameFile"""
282

    
283
  def setUp(self):
284
    """Create a temporary directory"""
285
    self.tmpdir = tempfile.mkdtemp()
286
    self.tmpfile = os.path.join(self.tmpdir, "test1")
287

    
288
    # Touch the file
289
    open(self.tmpfile, "w").close()
290

    
291
  def tearDown(self):
292
    """Remove temporary directory"""
293
    shutil.rmtree(self.tmpdir)
294

    
295
  def testSimpleRename1(self):
296
    """Simple rename 1"""
297
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
298

    
299
  def testSimpleRename2(self):
300
    """Simple rename 2"""
301
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
302
                     mkdir=True)
303

    
304
  def testRenameMkdir(self):
305
    """Rename with mkdir"""
306
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
307
                     mkdir=True)
308

    
309

    
310
class TestMatchNameComponent(unittest.TestCase):
311
  """Test case for the MatchNameComponent function"""
312

    
313
  def testEmptyList(self):
314
    """Test that there is no match against an empty list"""
315

    
316
    self.failUnlessEqual(MatchNameComponent("", []), None)
317
    self.failUnlessEqual(MatchNameComponent("test", []), None)
318

    
319
  def testSingleMatch(self):
320
    """Test that a single match is performed correctly"""
321
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
322
    for key in "test2", "test2.example", "test2.example.com":
323
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
324

    
325
  def testMultipleMatches(self):
326
    """Test that a multiple match is returned as None"""
327
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
328
    for key in "test1", "test1.example":
329
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
330

    
331

    
332
class TestFormatUnit(unittest.TestCase):
333
  """Test case for the FormatUnit function"""
334

    
335
  def testMiB(self):
336
    self.assertEqual(FormatUnit(1, 'h'), '1M')
337
    self.assertEqual(FormatUnit(100, 'h'), '100M')
338
    self.assertEqual(FormatUnit(1023, 'h'), '1023M')
339

    
340
    self.assertEqual(FormatUnit(1, 'm'), '1')
341
    self.assertEqual(FormatUnit(100, 'm'), '100')
342
    self.assertEqual(FormatUnit(1023, 'm'), '1023')
343

    
344
    self.assertEqual(FormatUnit(1024, 'm'), '1024')
345
    self.assertEqual(FormatUnit(1536, 'm'), '1536')
346
    self.assertEqual(FormatUnit(17133, 'm'), '17133')
347
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
348

    
349
  def testGiB(self):
350
    self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
351
    self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
352
    self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
353
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
354

    
355
    self.assertEqual(FormatUnit(1024, 'g'), '1.0')
356
    self.assertEqual(FormatUnit(1536, 'g'), '1.5')
357
    self.assertEqual(FormatUnit(17133, 'g'), '16.7')
358
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
359

    
360
    self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
361
    self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
362
    self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
363

    
364
  def testTiB(self):
365
    self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
366
    self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
367
    self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
368

    
369
    self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
370
    self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
371
    self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
372

    
373
class TestParseUnit(unittest.TestCase):
374
  """Test case for the ParseUnit function"""
375

    
376
  SCALES = (('', 1),
377
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
378
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
379
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
380

    
381
  def testRounding(self):
382
    self.assertEqual(ParseUnit('0'), 0)
383
    self.assertEqual(ParseUnit('1'), 4)
384
    self.assertEqual(ParseUnit('2'), 4)
385
    self.assertEqual(ParseUnit('3'), 4)
386

    
387
    self.assertEqual(ParseUnit('124'), 124)
388
    self.assertEqual(ParseUnit('125'), 128)
389
    self.assertEqual(ParseUnit('126'), 128)
390
    self.assertEqual(ParseUnit('127'), 128)
391
    self.assertEqual(ParseUnit('128'), 128)
392
    self.assertEqual(ParseUnit('129'), 132)
393
    self.assertEqual(ParseUnit('130'), 132)
394

    
395
  def testFloating(self):
396
    self.assertEqual(ParseUnit('0'), 0)
397
    self.assertEqual(ParseUnit('0.5'), 4)
398
    self.assertEqual(ParseUnit('1.75'), 4)
399
    self.assertEqual(ParseUnit('1.99'), 4)
400
    self.assertEqual(ParseUnit('2.00'), 4)
401
    self.assertEqual(ParseUnit('2.01'), 4)
402
    self.assertEqual(ParseUnit('3.99'), 4)
403
    self.assertEqual(ParseUnit('4.00'), 4)
404
    self.assertEqual(ParseUnit('4.01'), 8)
405
    self.assertEqual(ParseUnit('1.5G'), 1536)
406
    self.assertEqual(ParseUnit('1.8G'), 1844)
407
    self.assertEqual(ParseUnit('8.28T'), 8682212)
408

    
409
  def testSuffixes(self):
410
    for sep in ('', ' ', '   ', "\t", "\t "):
411
      for suffix, scale in TestParseUnit.SCALES:
412
        for func in (lambda x: x, str.lower, str.upper):
413
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
414
                           1024 * scale)
415

    
416
  def testInvalidInput(self):
417
    for sep in ('-', '_', ',', 'a'):
418
      for suffix, _ in TestParseUnit.SCALES:
419
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
420

    
421
    for suffix, _ in TestParseUnit.SCALES:
422
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
423

    
424

    
425
class TestSshKeys(testutils.GanetiTestCase):
426
  """Test case for the AddAuthorizedKey function"""
427

    
428
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
429
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
430
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
431

    
432
  def setUp(self):
433
    testutils.GanetiTestCase.setUp(self)
434
    self.tmpname = self._CreateTempFile()
435
    handle = open(self.tmpname, 'w')
436
    try:
437
      handle.write("%s\n" % TestSshKeys.KEY_A)
438
      handle.write("%s\n" % TestSshKeys.KEY_B)
439
    finally:
440
      handle.close()
441

    
442
  def testAddingNewKey(self):
443
    AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
444

    
445
    self.assertFileContent(self.tmpname,
446
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
447
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
448
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
449
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
450

    
451
  def testAddingAlmostButNotCompletelyTheSameKey(self):
452
    AddAuthorizedKey(self.tmpname,
453
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
454

    
455
    self.assertFileContent(self.tmpname,
456
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
457
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
458
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
459
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
460

    
461
  def testAddingExistingKeyWithSomeMoreSpaces(self):
462
    AddAuthorizedKey(self.tmpname,
463
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
464

    
465
    self.assertFileContent(self.tmpname,
466
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
467
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
468
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
469

    
470
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
471
    RemoveAuthorizedKey(self.tmpname,
472
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
473

    
474
    self.assertFileContent(self.tmpname,
475
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
476
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
477

    
478
  def testRemovingNonExistingKey(self):
479
    RemoveAuthorizedKey(self.tmpname,
480
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
481

    
482
    self.assertFileContent(self.tmpname,
483
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
484
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
485
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
486

    
487

    
488
class TestEtcHosts(testutils.GanetiTestCase):
489
  """Test functions modifying /etc/hosts"""
490

    
491
  def setUp(self):
492
    testutils.GanetiTestCase.setUp(self)
493
    self.tmpname = self._CreateTempFile()
494
    handle = open(self.tmpname, 'w')
495
    try:
496
      handle.write('# This is a test file for /etc/hosts\n')
497
      handle.write('127.0.0.1\tlocalhost\n')
498
      handle.write('192.168.1.1 router gw\n')
499
    finally:
500
      handle.close()
501

    
502
  def testSettingNewIp(self):
503
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
504

    
505
    self.assertFileContent(self.tmpname,
506
      "# This is a test file for /etc/hosts\n"
507
      "127.0.0.1\tlocalhost\n"
508
      "192.168.1.1 router gw\n"
509
      "1.2.3.4\tmyhost.domain.tld myhost\n")
510
    self.assertFileMode(self.tmpname, 0644)
511

    
512
  def testSettingExistingIp(self):
513
    SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
514
                     ['myhost'])
515

    
516
    self.assertFileContent(self.tmpname,
517
      "# This is a test file for /etc/hosts\n"
518
      "127.0.0.1\tlocalhost\n"
519
      "192.168.1.1\tmyhost.domain.tld myhost\n")
520
    self.assertFileMode(self.tmpname, 0644)
521

    
522
  def testSettingDuplicateName(self):
523
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
524

    
525
    self.assertFileContent(self.tmpname,
526
      "# This is a test file for /etc/hosts\n"
527
      "127.0.0.1\tlocalhost\n"
528
      "192.168.1.1 router gw\n"
529
      "1.2.3.4\tmyhost\n")
530
    self.assertFileMode(self.tmpname, 0644)
531

    
532
  def testRemovingExistingHost(self):
533
    RemoveEtcHostsEntry(self.tmpname, 'router')
534

    
535
    self.assertFileContent(self.tmpname,
536
      "# This is a test file for /etc/hosts\n"
537
      "127.0.0.1\tlocalhost\n"
538
      "192.168.1.1 gw\n")
539
    self.assertFileMode(self.tmpname, 0644)
540

    
541
  def testRemovingSingleExistingHost(self):
542
    RemoveEtcHostsEntry(self.tmpname, 'localhost')
543

    
544
    self.assertFileContent(self.tmpname,
545
      "# This is a test file for /etc/hosts\n"
546
      "192.168.1.1 router gw\n")
547
    self.assertFileMode(self.tmpname, 0644)
548

    
549
  def testRemovingNonExistingHost(self):
550
    RemoveEtcHostsEntry(self.tmpname, 'myhost')
551

    
552
    self.assertFileContent(self.tmpname,
553
      "# This is a test file for /etc/hosts\n"
554
      "127.0.0.1\tlocalhost\n"
555
      "192.168.1.1 router gw\n")
556
    self.assertFileMode(self.tmpname, 0644)
557

    
558
  def testRemovingAlias(self):
559
    RemoveEtcHostsEntry(self.tmpname, 'gw')
560

    
561
    self.assertFileContent(self.tmpname,
562
      "# This is a test file for /etc/hosts\n"
563
      "127.0.0.1\tlocalhost\n"
564
      "192.168.1.1 router\n")
565
    self.assertFileMode(self.tmpname, 0644)
566

    
567

    
568
class TestShellQuoting(unittest.TestCase):
569
  """Test case for shell quoting functions"""
570

    
571
  def testShellQuote(self):
572
    self.assertEqual(ShellQuote('abc'), "abc")
573
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
574
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
575
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
576
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
577

    
578
  def testShellQuoteArgs(self):
579
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
580
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
581
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
582

    
583

    
584
class TestTcpPing(unittest.TestCase):
585
  """Testcase for TCP version of ping - against listen(2)ing port"""
586

    
587
  def setUp(self):
588
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
589
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
590
    self.listenerport = self.listener.getsockname()[1]
591
    self.listener.listen(1)
592

    
593
  def tearDown(self):
594
    self.listener.shutdown(socket.SHUT_RDWR)
595
    del self.listener
596
    del self.listenerport
597

    
598
  def testTcpPingToLocalHostAccept(self):
599
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
600
                         self.listenerport,
601
                         timeout=10,
602
                         live_port_needed=True,
603
                         source=constants.LOCALHOST_IP_ADDRESS,
604
                         ),
605
                 "failed to connect to test listener")
606

    
607
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
608
                         self.listenerport,
609
                         timeout=10,
610
                         live_port_needed=True,
611
                         ),
612
                 "failed to connect to test listener (no source)")
613

    
614

    
615
class TestTcpPingDeaf(unittest.TestCase):
616
  """Testcase for TCP version of ping - against non listen(2)ing port"""
617

    
618
  def setUp(self):
619
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
620
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
621
    self.deaflistenerport = self.deaflistener.getsockname()[1]
622

    
623
  def tearDown(self):
624
    del self.deaflistener
625
    del self.deaflistenerport
626

    
627
  def testTcpPingToLocalHostAcceptDeaf(self):
628
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
629
                        self.deaflistenerport,
630
                        timeout=constants.TCP_PING_TIMEOUT,
631
                        live_port_needed=True,
632
                        source=constants.LOCALHOST_IP_ADDRESS,
633
                        ), # need successful connect(2)
634
                "successfully connected to deaf listener")
635

    
636
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
637
                        self.deaflistenerport,
638
                        timeout=constants.TCP_PING_TIMEOUT,
639
                        live_port_needed=True,
640
                        ), # need successful connect(2)
641
                "successfully connected to deaf listener (no source addr)")
642

    
643
  def testTcpPingToLocalHostNoAccept(self):
644
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
645
                         self.deaflistenerport,
646
                         timeout=constants.TCP_PING_TIMEOUT,
647
                         live_port_needed=False,
648
                         source=constants.LOCALHOST_IP_ADDRESS,
649
                         ), # ECONNREFUSED is OK
650
                 "failed to ping alive host on deaf port")
651

    
652
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
653
                         self.deaflistenerport,
654
                         timeout=constants.TCP_PING_TIMEOUT,
655
                         live_port_needed=False,
656
                         ), # ECONNREFUSED is OK
657
                 "failed to ping alive host on deaf port (no source addr)")
658

    
659

    
660
class TestOwnIpAddress(unittest.TestCase):
661
  """Testcase for OwnIpAddress"""
662

    
663
  def testOwnLoopback(self):
664
    """check having the loopback ip"""
665
    self.failUnless(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS),
666
                    "Should own the loopback address")
667

    
668
  def testNowOwnAddress(self):
669
    """check that I don't own an address"""
670

    
671
    # network 192.0.2.0/24 is reserved for test/documentation as per
672
    # rfc 3330, so we *should* not have an address of this range... if
673
    # this fails, we should extend the test to multiple addresses
674
    DST_IP = "192.0.2.1"
675
    self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP)
676

    
677

    
678
class TestListVisibleFiles(unittest.TestCase):
679
  """Test case for ListVisibleFiles"""
680

    
681
  def setUp(self):
682
    self.path = tempfile.mkdtemp()
683

    
684
  def tearDown(self):
685
    shutil.rmtree(self.path)
686

    
687
  def _test(self, files, expected):
688
    # Sort a copy
689
    expected = expected[:]
690
    expected.sort()
691

    
692
    for name in files:
693
      f = open(os.path.join(self.path, name), 'w')
694
      try:
695
        f.write("Test\n")
696
      finally:
697
        f.close()
698

    
699
    found = ListVisibleFiles(self.path)
700
    found.sort()
701

    
702
    self.assertEqual(found, expected)
703

    
704
  def testAllVisible(self):
705
    files = ["a", "b", "c"]
706
    expected = files
707
    self._test(files, expected)
708

    
709
  def testNoneVisible(self):
710
    files = [".a", ".b", ".c"]
711
    expected = []
712
    self._test(files, expected)
713

    
714
  def testSomeVisible(self):
715
    files = ["a", "b", ".c"]
716
    expected = ["a", "b"]
717
    self._test(files, expected)
718

    
719

    
720
class TestNewUUID(unittest.TestCase):
721
  """Test case for NewUUID"""
722

    
723
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
724
                        '[a-f0-9]{4}-[a-f0-9]{12}$')
725

    
726
  def runTest(self):
727
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
728

    
729

    
730
class TestUniqueSequence(unittest.TestCase):
731
  """Test case for UniqueSequence"""
732

    
733
  def _test(self, input, expected):
734
    self.assertEqual(utils.UniqueSequence(input), expected)
735

    
736
  def runTest(self):
737
    # Ordered input
738
    self._test([1, 2, 3], [1, 2, 3])
739
    self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
740
    self._test([1, 2, 2, 3], [1, 2, 3])
741
    self._test([1, 2, 3, 3], [1, 2, 3])
742

    
743
    # Unordered input
744
    self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
745
    self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
746

    
747
    # Strings
748
    self._test(["a", "a"], ["a"])
749
    self._test(["a", "b"], ["a", "b"])
750
    self._test(["a", "b", "a"], ["a", "b"])
751

    
752

    
753
class TestFirstFree(unittest.TestCase):
754
  """Test case for the FirstFree function"""
755

    
756
  def test(self):
757
    """Test FirstFree"""
758
    self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
759
    self.failUnlessEqual(FirstFree([]), None)
760
    self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
761
    self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
762
    self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
763

    
764

    
765
class TestTailFile(testutils.GanetiTestCase):
766
  """Test case for the TailFile function"""
767

    
768
  def testEmpty(self):
769
    fname = self._CreateTempFile()
770
    self.failUnlessEqual(TailFile(fname), [])
771
    self.failUnlessEqual(TailFile(fname, lines=25), [])
772

    
773
  def testAllLines(self):
774
    data = ["test %d" % i for i in range(30)]
775
    for i in range(30):
776
      fname = self._CreateTempFile()
777
      fd = open(fname, "w")
778
      fd.write("\n".join(data[:i]))
779
      if i > 0:
780
        fd.write("\n")
781
      fd.close()
782
      self.failUnlessEqual(TailFile(fname, lines=i), data[:i])
783

    
784
  def testPartialLines(self):
785
    data = ["test %d" % i for i in range(30)]
786
    fname = self._CreateTempFile()
787
    fd = open(fname, "w")
788
    fd.write("\n".join(data))
789
    fd.write("\n")
790
    fd.close()
791
    for i in range(1, 30):
792
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
793

    
794
  def testBigFile(self):
795
    data = ["test %d" % i for i in range(30)]
796
    fname = self._CreateTempFile()
797
    fd = open(fname, "w")
798
    fd.write("X" * 1048576)
799
    fd.write("\n")
800
    fd.write("\n".join(data))
801
    fd.write("\n")
802
    fd.close()
803
    for i in range(1, 30):
804
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
805

    
806

    
807
class TestFileLock(unittest.TestCase):
808
  """Test case for the FileLock class"""
809

    
810
  def setUp(self):
811
    self.tmpfile = tempfile.NamedTemporaryFile()
812
    self.lock = utils.FileLock(self.tmpfile.name)
813

    
814
  def testSharedNonblocking(self):
815
    self.lock.Shared(blocking=False)
816
    self.lock.Close()
817

    
818
  def testExclusiveNonblocking(self):
819
    self.lock.Exclusive(blocking=False)
820
    self.lock.Close()
821

    
822
  def testUnlockNonblocking(self):
823
    self.lock.Unlock(blocking=False)
824
    self.lock.Close()
825

    
826
  def testSharedBlocking(self):
827
    self.lock.Shared(blocking=True)
828
    self.lock.Close()
829

    
830
  def testExclusiveBlocking(self):
831
    self.lock.Exclusive(blocking=True)
832
    self.lock.Close()
833

    
834
  def testUnlockBlocking(self):
835
    self.lock.Unlock(blocking=True)
836
    self.lock.Close()
837

    
838
  def testSharedExclusiveUnlock(self):
839
    self.lock.Shared(blocking=False)
840
    self.lock.Exclusive(blocking=False)
841
    self.lock.Unlock(blocking=False)
842
    self.lock.Close()
843

    
844
  def testExclusiveSharedUnlock(self):
845
    self.lock.Exclusive(blocking=False)
846
    self.lock.Shared(blocking=False)
847
    self.lock.Unlock(blocking=False)
848
    self.lock.Close()
849

    
850
  def testCloseShared(self):
851
    self.lock.Close()
852
    self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
853

    
854
  def testCloseExclusive(self):
855
    self.lock.Close()
856
    self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
857

    
858
  def testCloseUnlock(self):
859
    self.lock.Close()
860
    self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
861

    
862

    
863
class TestTimeFunctions(unittest.TestCase):
864
  """Test case for time functions"""
865

    
866
  def runTest(self):
867
    self.assertEqual(utils.SplitTime(1), (1, 0))
868
    self.assertEqual(utils.SplitTime(1.5), (1, 500000))
869
    self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
870
    self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
871
    self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
872
    self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
873
    self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
874
    self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
875

    
876
    self.assertRaises(AssertionError, utils.SplitTime, -1)
877

    
878
    self.assertEqual(utils.MergeTime((1, 0)), 1.0)
879
    self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
880
    self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
881

    
882
    self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3),
883
                     1218448917.481)
884
    self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
885

    
886
    self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
887
    self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
888
    self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
889
    self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
890
    self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
891

    
892

    
893
class FieldSetTestCase(unittest.TestCase):
894
  """Test case for FieldSets"""
895

    
896
  def testSimpleMatch(self):
897
    f = utils.FieldSet("a", "b", "c", "def")
898
    self.failUnless(f.Matches("a"))
899
    self.failIf(f.Matches("d"), "Substring matched")
900
    self.failIf(f.Matches("defghi"), "Prefix string matched")
901
    self.failIf(f.NonMatching(["b", "c"]))
902
    self.failIf(f.NonMatching(["a", "b", "c", "def"]))
903
    self.failUnless(f.NonMatching(["a", "d"]))
904

    
905
  def testRegexMatch(self):
906
    f = utils.FieldSet("a", "b([0-9]+)", "c")
907
    self.failUnless(f.Matches("b1"))
908
    self.failUnless(f.Matches("b99"))
909
    self.failIf(f.Matches("b/1"))
910
    self.failIf(f.NonMatching(["b12", "c"]))
911
    self.failUnless(f.NonMatching(["a", "1"]))
912

    
913
class TestForceDictType(unittest.TestCase):
914
  """Test case for ForceDictType"""
915

    
916
  def setUp(self):
917
    self.key_types = {
918
      'a': constants.VTYPE_INT,
919
      'b': constants.VTYPE_BOOL,
920
      'c': constants.VTYPE_STRING,
921
      'd': constants.VTYPE_SIZE,
922
      }
923

    
924
  def _fdt(self, dict, allowed_values=None):
925
    if allowed_values is None:
926
      ForceDictType(dict, self.key_types)
927
    else:
928
      ForceDictType(dict, self.key_types, allowed_values=allowed_values)
929

    
930
    return dict
931

    
932
  def testSimpleDict(self):
933
    self.assertEqual(self._fdt({}), {})
934
    self.assertEqual(self._fdt({'a': 1}), {'a': 1})
935
    self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
936
    self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
937
    self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
938
    self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
939
    self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
940
    self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
941
    self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
942
    self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
943
    self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
944
    self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
945

    
946
  def testErrors(self):
947
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
948
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
949
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
950
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
951

    
952

    
953
class TestIsAbsNormPath(unittest.TestCase):
954
  """Testing case for IsProcessAlive"""
955

    
956
  def _pathTestHelper(self, path, result):
957
    if result:
958
      self.assert_(IsNormAbsPath(path),
959
          "Path %s should result absolute and normalized" % path)
960
    else:
961
      self.assert_(not IsNormAbsPath(path),
962
          "Path %s should not result absolute and normalized" % path)
963

    
964
  def testBase(self):
965
    self._pathTestHelper('/etc', True)
966
    self._pathTestHelper('/srv', True)
967
    self._pathTestHelper('etc', False)
968
    self._pathTestHelper('/etc/../root', False)
969
    self._pathTestHelper('/etc/', False)
970

    
971

    
972
class TestSafeEncode(unittest.TestCase):
973
  """Test case for SafeEncode"""
974

    
975
  def testAscii(self):
976
    for txt in [string.digits, string.letters, string.punctuation]:
977
      self.failUnlessEqual(txt, SafeEncode(txt))
978

    
979
  def testDoubleEncode(self):
980
    for i in range(255):
981
      txt = SafeEncode(chr(i))
982
      self.failUnlessEqual(txt, SafeEncode(txt))
983

    
984
  def testUnicode(self):
985
    # 1024 is high enough to catch non-direct ASCII mappings
986
    for i in range(1024):
987
      txt = SafeEncode(unichr(i))
988
      self.failUnlessEqual(txt, SafeEncode(txt))
989

    
990

    
991
class TestFormatTime(unittest.TestCase):
992
  """Testing case for FormatTime"""
993

    
994
  def testNone(self):
995
    self.failUnlessEqual(FormatTime(None), "N/A")
996

    
997
  def testInvalid(self):
998
    self.failUnlessEqual(FormatTime(()), "N/A")
999

    
1000
  def testNow(self):
1001
    # tests that we accept time.time input
1002
    FormatTime(time.time())
1003
    # tests that we accept int input
1004
    FormatTime(int(time.time()))
1005

    
1006

    
1007
if __name__ == '__main__':
1008
  unittest.main()