Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ 73027ed2

History | View | Annotate | Download (39.1 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import os
30
import md5
31
import signal
32
import socket
33
import shutil
34
import re
35
import select
36
import string
37
import fcntl
38

    
39
import ganeti
40
import testutils
41
from ganeti import constants
42
from ganeti import utils
43
from ganeti import errors
44
from ganeti.utils import IsProcessAlive, RunCmd, \
45
     RemoveFile, MatchNameComponent, FormatUnit, \
46
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
47
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
48
     SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \
49
     TailFile, ForceDictType, SafeEncode, IsNormAbsPath, FormatTime, \
50
     UnescapeAndSplit
51

    
52
from ganeti.errors import LockError, UnitParseError, GenericError, \
53
     ProgrammerError
54

    
55

    
56
class TestIsProcessAlive(unittest.TestCase):
57
  """Testing case for IsProcessAlive"""
58

    
59
  def testExists(self):
60
    mypid = os.getpid()
61
    self.assert_(IsProcessAlive(mypid),
62
                 "can't find myself running")
63

    
64
  def testNotExisting(self):
65
    pid_non_existing = os.fork()
66
    if pid_non_existing == 0:
67
      os._exit(0)
68
    elif pid_non_existing < 0:
69
      raise SystemError("can't fork")
70
    os.waitpid(pid_non_existing, 0)
71
    self.assert_(not IsProcessAlive(pid_non_existing),
72
                 "nonexisting process detected")
73

    
74

    
75
class TestPidFileFunctions(unittest.TestCase):
76
  """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
77

    
78
  def setUp(self):
79
    self.dir = tempfile.mkdtemp()
80
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
81
    utils.DaemonPidFileName = self.f_dpn
82

    
83
  def testPidFileFunctions(self):
84
    pid_file = self.f_dpn('test')
85
    utils.WritePidFile('test')
86
    self.failUnless(os.path.exists(pid_file),
87
                    "PID file should have been created")
88
    read_pid = utils.ReadPidFile(pid_file)
89
    self.failUnlessEqual(read_pid, os.getpid())
90
    self.failUnless(utils.IsProcessAlive(read_pid))
91
    self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
92
    utils.RemovePidFile('test')
93
    self.failIf(os.path.exists(pid_file),
94
                "PID file should not exist anymore")
95
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
96
                         "ReadPidFile should return 0 for missing pid file")
97
    fh = open(pid_file, "w")
98
    fh.write("blah\n")
99
    fh.close()
100
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
101
                         "ReadPidFile should return 0 for invalid pid file")
102
    utils.RemovePidFile('test')
103
    self.failIf(os.path.exists(pid_file),
104
                "PID file should not exist anymore")
105

    
106
  def testKill(self):
107
    pid_file = self.f_dpn('child')
108
    r_fd, w_fd = os.pipe()
109
    new_pid = os.fork()
110
    if new_pid == 0: #child
111
      utils.WritePidFile('child')
112
      os.write(w_fd, 'a')
113
      signal.pause()
114
      os._exit(0)
115
      return
116
    # else we are in the parent
117
    # wait until the child has written the pid file
118
    os.read(r_fd, 1)
119
    read_pid = utils.ReadPidFile(pid_file)
120
    self.failUnlessEqual(read_pid, new_pid)
121
    self.failUnless(utils.IsProcessAlive(new_pid))
122
    utils.KillProcess(new_pid, waitpid=True)
123
    self.failIf(utils.IsProcessAlive(new_pid))
124
    utils.RemovePidFile('child')
125
    self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
126

    
127
  def tearDown(self):
128
    for name in os.listdir(self.dir):
129
      os.unlink(os.path.join(self.dir, name))
130
    os.rmdir(self.dir)
131

    
132

    
133
class TestRunCmd(testutils.GanetiTestCase):
134
  """Testing case for the RunCmd function"""
135

    
136
  def setUp(self):
137
    testutils.GanetiTestCase.setUp(self)
138
    self.magic = time.ctime() + " ganeti test"
139
    self.fname = self._CreateTempFile()
140

    
141
  def testOk(self):
142
    """Test successful exit code"""
143
    result = RunCmd("/bin/sh -c 'exit 0'")
144
    self.assertEqual(result.exit_code, 0)
145
    self.assertEqual(result.output, "")
146

    
147
  def testFail(self):
148
    """Test fail exit code"""
149
    result = RunCmd("/bin/sh -c 'exit 1'")
150
    self.assertEqual(result.exit_code, 1)
151
    self.assertEqual(result.output, "")
152

    
153
  def testStdout(self):
154
    """Test standard output"""
155
    cmd = 'echo -n "%s"' % self.magic
156
    result = RunCmd("/bin/sh -c '%s'" % cmd)
157
    self.assertEqual(result.stdout, self.magic)
158
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
159
    self.assertEqual(result.output, "")
160
    self.assertFileContent(self.fname, self.magic)
161

    
162
  def testStderr(self):
163
    """Test standard error"""
164
    cmd = 'echo -n "%s"' % self.magic
165
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
166
    self.assertEqual(result.stderr, self.magic)
167
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
168
    self.assertEqual(result.output, "")
169
    self.assertFileContent(self.fname, self.magic)
170

    
171
  def testCombined(self):
172
    """Test combined output"""
173
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
174
    expected = "A" + self.magic + "B" + self.magic
175
    result = RunCmd("/bin/sh -c '%s'" % cmd)
176
    self.assertEqual(result.output, expected)
177
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
178
    self.assertEqual(result.output, "")
179
    self.assertFileContent(self.fname, expected)
180

    
181
  def testSignal(self):
182
    """Test signal"""
183
    result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
184
    self.assertEqual(result.signal, 15)
185
    self.assertEqual(result.output, "")
186

    
187
  def testListRun(self):
188
    """Test list runs"""
189
    result = RunCmd(["true"])
190
    self.assertEqual(result.signal, None)
191
    self.assertEqual(result.exit_code, 0)
192
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
193
    self.assertEqual(result.signal, None)
194
    self.assertEqual(result.exit_code, 1)
195
    result = RunCmd(["echo", "-n", self.magic])
196
    self.assertEqual(result.signal, None)
197
    self.assertEqual(result.exit_code, 0)
198
    self.assertEqual(result.stdout, self.magic)
199

    
200
  def testFileEmptyOutput(self):
201
    """Test file output"""
202
    result = RunCmd(["true"], output=self.fname)
203
    self.assertEqual(result.signal, None)
204
    self.assertEqual(result.exit_code, 0)
205
    self.assertFileContent(self.fname, "")
206

    
207
  def testLang(self):
208
    """Test locale environment"""
209
    old_env = os.environ.copy()
210
    try:
211
      os.environ["LANG"] = "en_US.UTF-8"
212
      os.environ["LC_ALL"] = "en_US.UTF-8"
213
      result = RunCmd(["locale"])
214
      for line in result.output.splitlines():
215
        key, value = line.split("=", 1)
216
        # Ignore these variables, they're overridden by LC_ALL
217
        if key == "LANG" or key == "LANGUAGE":
218
          continue
219
        self.failIf(value and value != "C" and value != '"C"',
220
            "Variable %s is set to the invalid value '%s'" % (key, value))
221
    finally:
222
      os.environ = old_env
223

    
224
  def testDefaultCwd(self):
225
    """Test default working directory"""
226
    self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
227

    
228
  def testCwd(self):
229
    """Test default working directory"""
230
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
231
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
232
    cwd = os.getcwd()
233
    self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
234

    
235

    
236
class TestSetCloseOnExecFlag(unittest.TestCase):
237
  """Tests for SetCloseOnExecFlag"""
238

    
239
  def setUp(self):
240
    self.tmpfile = tempfile.TemporaryFile()
241

    
242
  def testEnable(self):
243
    utils.SetCloseOnExecFlag(self.tmpfile.fileno(), True)
244
    self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
245
                    fcntl.FD_CLOEXEC)
246

    
247
  def testDisable(self):
248
    utils.SetCloseOnExecFlag(self.tmpfile.fileno(), False)
249
    self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
250
                fcntl.FD_CLOEXEC)
251

    
252

    
253
class TestRemoveFile(unittest.TestCase):
254
  """Test case for the RemoveFile function"""
255

    
256
  def setUp(self):
257
    """Create a temp dir and file for each case"""
258
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
259
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
260
    os.close(fd)
261

    
262
  def tearDown(self):
263
    if os.path.exists(self.tmpfile):
264
      os.unlink(self.tmpfile)
265
    os.rmdir(self.tmpdir)
266

    
267
  def testIgnoreDirs(self):
268
    """Test that RemoveFile() ignores directories"""
269
    self.assertEqual(None, RemoveFile(self.tmpdir))
270

    
271
  def testIgnoreNotExisting(self):
272
    """Test that RemoveFile() ignores non-existing files"""
273
    RemoveFile(self.tmpfile)
274
    RemoveFile(self.tmpfile)
275

    
276
  def testRemoveFile(self):
277
    """Test that RemoveFile does remove a file"""
278
    RemoveFile(self.tmpfile)
279
    if os.path.exists(self.tmpfile):
280
      self.fail("File '%s' not removed" % self.tmpfile)
281

    
282
  def testRemoveSymlink(self):
283
    """Test that RemoveFile does remove symlinks"""
284
    symlink = self.tmpdir + "/symlink"
285
    os.symlink("no-such-file", symlink)
286
    RemoveFile(symlink)
287
    if os.path.exists(symlink):
288
      self.fail("File '%s' not removed" % symlink)
289
    os.symlink(self.tmpfile, symlink)
290
    RemoveFile(symlink)
291
    if os.path.exists(symlink):
292
      self.fail("File '%s' not removed" % symlink)
293

    
294

    
295
class TestRename(unittest.TestCase):
296
  """Test case for RenameFile"""
297

    
298
  def setUp(self):
299
    """Create a temporary directory"""
300
    self.tmpdir = tempfile.mkdtemp()
301
    self.tmpfile = os.path.join(self.tmpdir, "test1")
302

    
303
    # Touch the file
304
    open(self.tmpfile, "w").close()
305

    
306
  def tearDown(self):
307
    """Remove temporary directory"""
308
    shutil.rmtree(self.tmpdir)
309

    
310
  def testSimpleRename1(self):
311
    """Simple rename 1"""
312
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
313
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
314

    
315
  def testSimpleRename2(self):
316
    """Simple rename 2"""
317
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
318
                     mkdir=True)
319
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
320

    
321
  def testRenameMkdir(self):
322
    """Rename with mkdir"""
323
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
324
                     mkdir=True)
325
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
326
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
327

    
328
    utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"),
329
                     os.path.join(self.tmpdir, "test/foo/bar/baz"),
330
                     mkdir=True)
331
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
332
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar")))
333
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz")))
334

    
335

    
336
class TestMatchNameComponent(unittest.TestCase):
337
  """Test case for the MatchNameComponent function"""
338

    
339
  def testEmptyList(self):
340
    """Test that there is no match against an empty list"""
341

    
342
    self.failUnlessEqual(MatchNameComponent("", []), None)
343
    self.failUnlessEqual(MatchNameComponent("test", []), None)
344

    
345
  def testSingleMatch(self):
346
    """Test that a single match is performed correctly"""
347
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
348
    for key in "test2", "test2.example", "test2.example.com":
349
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
350

    
351
  def testMultipleMatches(self):
352
    """Test that a multiple match is returned as None"""
353
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
354
    for key in "test1", "test1.example":
355
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
356

    
357
  def testFullMatch(self):
358
    """Test that a full match is returned correctly"""
359
    key1 = "test1"
360
    key2 = "test1.example"
361
    mlist = [key2, key2 + ".com"]
362
    self.failUnlessEqual(MatchNameComponent(key1, mlist), None)
363
    self.failUnlessEqual(MatchNameComponent(key2, mlist), key2)
364

    
365
  def testCaseInsensitivePartialMatch(self):
366
    """Test for the case_insensitive keyword"""
367
    mlist = ["test1.example.com", "test2.example.net"]
368
    self.assertEqual(MatchNameComponent("test2", mlist, case_sensitive=False),
369
                     "test2.example.net")
370
    self.assertEqual(MatchNameComponent("Test2", mlist, case_sensitive=False),
371
                     "test2.example.net")
372
    self.assertEqual(MatchNameComponent("teSt2", mlist, case_sensitive=False),
373
                     "test2.example.net")
374
    self.assertEqual(MatchNameComponent("TeSt2", mlist, case_sensitive=False),
375
                     "test2.example.net")
376

    
377

    
378
  def testCaseInsensitiveFullMatch(self):
379
    mlist = ["ts1.ex", "ts1.ex.org", "ts2.ex", "Ts2.ex"]
380
    # Between the two ts1 a full string match non-case insensitive should work
381
    self.assertEqual(MatchNameComponent("Ts1", mlist, case_sensitive=False),
382
                     None)
383
    self.assertEqual(MatchNameComponent("Ts1.ex", mlist, case_sensitive=False),
384
                     "ts1.ex")
385
    self.assertEqual(MatchNameComponent("ts1.ex", mlist, case_sensitive=False),
386
                     "ts1.ex")
387
    # Between the two ts2 only case differs, so only case-match works
388
    self.assertEqual(MatchNameComponent("ts2.ex", mlist, case_sensitive=False),
389
                     "ts2.ex")
390
    self.assertEqual(MatchNameComponent("Ts2.ex", mlist, case_sensitive=False),
391
                     "Ts2.ex")
392
    self.assertEqual(MatchNameComponent("TS2.ex", mlist, case_sensitive=False),
393
                     None)
394

    
395

    
396
class TestFormatUnit(unittest.TestCase):
397
  """Test case for the FormatUnit function"""
398

    
399
  def testMiB(self):
400
    self.assertEqual(FormatUnit(1, 'h'), '1M')
401
    self.assertEqual(FormatUnit(100, 'h'), '100M')
402
    self.assertEqual(FormatUnit(1023, 'h'), '1023M')
403

    
404
    self.assertEqual(FormatUnit(1, 'm'), '1')
405
    self.assertEqual(FormatUnit(100, 'm'), '100')
406
    self.assertEqual(FormatUnit(1023, 'm'), '1023')
407

    
408
    self.assertEqual(FormatUnit(1024, 'm'), '1024')
409
    self.assertEqual(FormatUnit(1536, 'm'), '1536')
410
    self.assertEqual(FormatUnit(17133, 'm'), '17133')
411
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
412

    
413
  def testGiB(self):
414
    self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
415
    self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
416
    self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
417
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
418

    
419
    self.assertEqual(FormatUnit(1024, 'g'), '1.0')
420
    self.assertEqual(FormatUnit(1536, 'g'), '1.5')
421
    self.assertEqual(FormatUnit(17133, 'g'), '16.7')
422
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
423

    
424
    self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
425
    self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
426
    self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
427

    
428
  def testTiB(self):
429
    self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
430
    self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
431
    self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
432

    
433
    self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
434
    self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
435
    self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
436

    
437
class TestParseUnit(unittest.TestCase):
438
  """Test case for the ParseUnit function"""
439

    
440
  SCALES = (('', 1),
441
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
442
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
443
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
444

    
445
  def testRounding(self):
446
    self.assertEqual(ParseUnit('0'), 0)
447
    self.assertEqual(ParseUnit('1'), 4)
448
    self.assertEqual(ParseUnit('2'), 4)
449
    self.assertEqual(ParseUnit('3'), 4)
450

    
451
    self.assertEqual(ParseUnit('124'), 124)
452
    self.assertEqual(ParseUnit('125'), 128)
453
    self.assertEqual(ParseUnit('126'), 128)
454
    self.assertEqual(ParseUnit('127'), 128)
455
    self.assertEqual(ParseUnit('128'), 128)
456
    self.assertEqual(ParseUnit('129'), 132)
457
    self.assertEqual(ParseUnit('130'), 132)
458

    
459
  def testFloating(self):
460
    self.assertEqual(ParseUnit('0'), 0)
461
    self.assertEqual(ParseUnit('0.5'), 4)
462
    self.assertEqual(ParseUnit('1.75'), 4)
463
    self.assertEqual(ParseUnit('1.99'), 4)
464
    self.assertEqual(ParseUnit('2.00'), 4)
465
    self.assertEqual(ParseUnit('2.01'), 4)
466
    self.assertEqual(ParseUnit('3.99'), 4)
467
    self.assertEqual(ParseUnit('4.00'), 4)
468
    self.assertEqual(ParseUnit('4.01'), 8)
469
    self.assertEqual(ParseUnit('1.5G'), 1536)
470
    self.assertEqual(ParseUnit('1.8G'), 1844)
471
    self.assertEqual(ParseUnit('8.28T'), 8682212)
472

    
473
  def testSuffixes(self):
474
    for sep in ('', ' ', '   ', "\t", "\t "):
475
      for suffix, scale in TestParseUnit.SCALES:
476
        for func in (lambda x: x, str.lower, str.upper):
477
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
478
                           1024 * scale)
479

    
480
  def testInvalidInput(self):
481
    for sep in ('-', '_', ',', 'a'):
482
      for suffix, _ in TestParseUnit.SCALES:
483
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
484

    
485
    for suffix, _ in TestParseUnit.SCALES:
486
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
487

    
488

    
489
class TestSshKeys(testutils.GanetiTestCase):
490
  """Test case for the AddAuthorizedKey function"""
491

    
492
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
493
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
494
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
495

    
496
  def setUp(self):
497
    testutils.GanetiTestCase.setUp(self)
498
    self.tmpname = self._CreateTempFile()
499
    handle = open(self.tmpname, 'w')
500
    try:
501
      handle.write("%s\n" % TestSshKeys.KEY_A)
502
      handle.write("%s\n" % TestSshKeys.KEY_B)
503
    finally:
504
      handle.close()
505

    
506
  def testAddingNewKey(self):
507
    AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
508

    
509
    self.assertFileContent(self.tmpname,
510
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
511
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
512
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
513
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
514

    
515
  def testAddingAlmostButNotCompletelyTheSameKey(self):
516
    AddAuthorizedKey(self.tmpname,
517
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
518

    
519
    self.assertFileContent(self.tmpname,
520
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
521
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
522
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
523
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
524

    
525
  def testAddingExistingKeyWithSomeMoreSpaces(self):
526
    AddAuthorizedKey(self.tmpname,
527
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
528

    
529
    self.assertFileContent(self.tmpname,
530
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
531
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
532
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
533

    
534
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
535
    RemoveAuthorizedKey(self.tmpname,
536
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
537

    
538
    self.assertFileContent(self.tmpname,
539
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
540
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
541

    
542
  def testRemovingNonExistingKey(self):
543
    RemoveAuthorizedKey(self.tmpname,
544
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
545

    
546
    self.assertFileContent(self.tmpname,
547
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
548
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
549
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
550

    
551

    
552
class TestEtcHosts(testutils.GanetiTestCase):
553
  """Test functions modifying /etc/hosts"""
554

    
555
  def setUp(self):
556
    testutils.GanetiTestCase.setUp(self)
557
    self.tmpname = self._CreateTempFile()
558
    handle = open(self.tmpname, 'w')
559
    try:
560
      handle.write('# This is a test file for /etc/hosts\n')
561
      handle.write('127.0.0.1\tlocalhost\n')
562
      handle.write('192.168.1.1 router gw\n')
563
    finally:
564
      handle.close()
565

    
566
  def testSettingNewIp(self):
567
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
568

    
569
    self.assertFileContent(self.tmpname,
570
      "# This is a test file for /etc/hosts\n"
571
      "127.0.0.1\tlocalhost\n"
572
      "192.168.1.1 router gw\n"
573
      "1.2.3.4\tmyhost.domain.tld myhost\n")
574
    self.assertFileMode(self.tmpname, 0644)
575

    
576
  def testSettingExistingIp(self):
577
    SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
578
                     ['myhost'])
579

    
580
    self.assertFileContent(self.tmpname,
581
      "# This is a test file for /etc/hosts\n"
582
      "127.0.0.1\tlocalhost\n"
583
      "192.168.1.1\tmyhost.domain.tld myhost\n")
584
    self.assertFileMode(self.tmpname, 0644)
585

    
586
  def testSettingDuplicateName(self):
587
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
588

    
589
    self.assertFileContent(self.tmpname,
590
      "# This is a test file for /etc/hosts\n"
591
      "127.0.0.1\tlocalhost\n"
592
      "192.168.1.1 router gw\n"
593
      "1.2.3.4\tmyhost\n")
594
    self.assertFileMode(self.tmpname, 0644)
595

    
596
  def testRemovingExistingHost(self):
597
    RemoveEtcHostsEntry(self.tmpname, 'router')
598

    
599
    self.assertFileContent(self.tmpname,
600
      "# This is a test file for /etc/hosts\n"
601
      "127.0.0.1\tlocalhost\n"
602
      "192.168.1.1 gw\n")
603
    self.assertFileMode(self.tmpname, 0644)
604

    
605
  def testRemovingSingleExistingHost(self):
606
    RemoveEtcHostsEntry(self.tmpname, 'localhost')
607

    
608
    self.assertFileContent(self.tmpname,
609
      "# This is a test file for /etc/hosts\n"
610
      "192.168.1.1 router gw\n")
611
    self.assertFileMode(self.tmpname, 0644)
612

    
613
  def testRemovingNonExistingHost(self):
614
    RemoveEtcHostsEntry(self.tmpname, 'myhost')
615

    
616
    self.assertFileContent(self.tmpname,
617
      "# This is a test file for /etc/hosts\n"
618
      "127.0.0.1\tlocalhost\n"
619
      "192.168.1.1 router gw\n")
620
    self.assertFileMode(self.tmpname, 0644)
621

    
622
  def testRemovingAlias(self):
623
    RemoveEtcHostsEntry(self.tmpname, 'gw')
624

    
625
    self.assertFileContent(self.tmpname,
626
      "# This is a test file for /etc/hosts\n"
627
      "127.0.0.1\tlocalhost\n"
628
      "192.168.1.1 router\n")
629
    self.assertFileMode(self.tmpname, 0644)
630

    
631

    
632
class TestShellQuoting(unittest.TestCase):
633
  """Test case for shell quoting functions"""
634

    
635
  def testShellQuote(self):
636
    self.assertEqual(ShellQuote('abc'), "abc")
637
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
638
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
639
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
640
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
641

    
642
  def testShellQuoteArgs(self):
643
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
644
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
645
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
646

    
647

    
648
class TestTcpPing(unittest.TestCase):
649
  """Testcase for TCP version of ping - against listen(2)ing port"""
650

    
651
  def setUp(self):
652
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
653
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
654
    self.listenerport = self.listener.getsockname()[1]
655
    self.listener.listen(1)
656

    
657
  def tearDown(self):
658
    self.listener.shutdown(socket.SHUT_RDWR)
659
    del self.listener
660
    del self.listenerport
661

    
662
  def testTcpPingToLocalHostAccept(self):
663
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
664
                         self.listenerport,
665
                         timeout=10,
666
                         live_port_needed=True,
667
                         source=constants.LOCALHOST_IP_ADDRESS,
668
                         ),
669
                 "failed to connect to test listener")
670

    
671
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
672
                         self.listenerport,
673
                         timeout=10,
674
                         live_port_needed=True,
675
                         ),
676
                 "failed to connect to test listener (no source)")
677

    
678

    
679
class TestTcpPingDeaf(unittest.TestCase):
680
  """Testcase for TCP version of ping - against non listen(2)ing port"""
681

    
682
  def setUp(self):
683
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
684
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
685
    self.deaflistenerport = self.deaflistener.getsockname()[1]
686

    
687
  def tearDown(self):
688
    del self.deaflistener
689
    del self.deaflistenerport
690

    
691
  def testTcpPingToLocalHostAcceptDeaf(self):
692
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
693
                        self.deaflistenerport,
694
                        timeout=constants.TCP_PING_TIMEOUT,
695
                        live_port_needed=True,
696
                        source=constants.LOCALHOST_IP_ADDRESS,
697
                        ), # need successful connect(2)
698
                "successfully connected to deaf listener")
699

    
700
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
701
                        self.deaflistenerport,
702
                        timeout=constants.TCP_PING_TIMEOUT,
703
                        live_port_needed=True,
704
                        ), # need successful connect(2)
705
                "successfully connected to deaf listener (no source addr)")
706

    
707
  def testTcpPingToLocalHostNoAccept(self):
708
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
709
                         self.deaflistenerport,
710
                         timeout=constants.TCP_PING_TIMEOUT,
711
                         live_port_needed=False,
712
                         source=constants.LOCALHOST_IP_ADDRESS,
713
                         ), # ECONNREFUSED is OK
714
                 "failed to ping alive host on deaf port")
715

    
716
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
717
                         self.deaflistenerport,
718
                         timeout=constants.TCP_PING_TIMEOUT,
719
                         live_port_needed=False,
720
                         ), # ECONNREFUSED is OK
721
                 "failed to ping alive host on deaf port (no source addr)")
722

    
723

    
724
class TestOwnIpAddress(unittest.TestCase):
725
  """Testcase for OwnIpAddress"""
726

    
727
  def testOwnLoopback(self):
728
    """check having the loopback ip"""
729
    self.failUnless(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS),
730
                    "Should own the loopback address")
731

    
732
  def testNowOwnAddress(self):
733
    """check that I don't own an address"""
734

    
735
    # network 192.0.2.0/24 is reserved for test/documentation as per
736
    # rfc 3330, so we *should* not have an address of this range... if
737
    # this fails, we should extend the test to multiple addresses
738
    DST_IP = "192.0.2.1"
739
    self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP)
740

    
741

    
742
class TestListVisibleFiles(unittest.TestCase):
743
  """Test case for ListVisibleFiles"""
744

    
745
  def setUp(self):
746
    self.path = tempfile.mkdtemp()
747

    
748
  def tearDown(self):
749
    shutil.rmtree(self.path)
750

    
751
  def _test(self, files, expected):
752
    # Sort a copy
753
    expected = expected[:]
754
    expected.sort()
755

    
756
    for name in files:
757
      f = open(os.path.join(self.path, name), 'w')
758
      try:
759
        f.write("Test\n")
760
      finally:
761
        f.close()
762

    
763
    found = ListVisibleFiles(self.path)
764
    found.sort()
765

    
766
    self.assertEqual(found, expected)
767

    
768
  def testAllVisible(self):
769
    files = ["a", "b", "c"]
770
    expected = files
771
    self._test(files, expected)
772

    
773
  def testNoneVisible(self):
774
    files = [".a", ".b", ".c"]
775
    expected = []
776
    self._test(files, expected)
777

    
778
  def testSomeVisible(self):
779
    files = ["a", "b", ".c"]
780
    expected = ["a", "b"]
781
    self._test(files, expected)
782

    
783

    
784
class TestNewUUID(unittest.TestCase):
785
  """Test case for NewUUID"""
786

    
787
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
788
                        '[a-f0-9]{4}-[a-f0-9]{12}$')
789

    
790
  def runTest(self):
791
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
792

    
793

    
794
class TestUniqueSequence(unittest.TestCase):
795
  """Test case for UniqueSequence"""
796

    
797
  def _test(self, input, expected):
798
    self.assertEqual(utils.UniqueSequence(input), expected)
799

    
800
  def runTest(self):
801
    # Ordered input
802
    self._test([1, 2, 3], [1, 2, 3])
803
    self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
804
    self._test([1, 2, 2, 3], [1, 2, 3])
805
    self._test([1, 2, 3, 3], [1, 2, 3])
806

    
807
    # Unordered input
808
    self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
809
    self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
810

    
811
    # Strings
812
    self._test(["a", "a"], ["a"])
813
    self._test(["a", "b"], ["a", "b"])
814
    self._test(["a", "b", "a"], ["a", "b"])
815

    
816

    
817
class TestFirstFree(unittest.TestCase):
818
  """Test case for the FirstFree function"""
819

    
820
  def test(self):
821
    """Test FirstFree"""
822
    self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
823
    self.failUnlessEqual(FirstFree([]), None)
824
    self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
825
    self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
826
    self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
827

    
828

    
829
class TestTailFile(testutils.GanetiTestCase):
830
  """Test case for the TailFile function"""
831

    
832
  def testEmpty(self):
833
    fname = self._CreateTempFile()
834
    self.failUnlessEqual(TailFile(fname), [])
835
    self.failUnlessEqual(TailFile(fname, lines=25), [])
836

    
837
  def testAllLines(self):
838
    data = ["test %d" % i for i in range(30)]
839
    for i in range(30):
840
      fname = self._CreateTempFile()
841
      fd = open(fname, "w")
842
      fd.write("\n".join(data[:i]))
843
      if i > 0:
844
        fd.write("\n")
845
      fd.close()
846
      self.failUnlessEqual(TailFile(fname, lines=i), data[:i])
847

    
848
  def testPartialLines(self):
849
    data = ["test %d" % i for i in range(30)]
850
    fname = self._CreateTempFile()
851
    fd = open(fname, "w")
852
    fd.write("\n".join(data))
853
    fd.write("\n")
854
    fd.close()
855
    for i in range(1, 30):
856
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
857

    
858
  def testBigFile(self):
859
    data = ["test %d" % i for i in range(30)]
860
    fname = self._CreateTempFile()
861
    fd = open(fname, "w")
862
    fd.write("X" * 1048576)
863
    fd.write("\n")
864
    fd.write("\n".join(data))
865
    fd.write("\n")
866
    fd.close()
867
    for i in range(1, 30):
868
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
869

    
870

    
871
class TestFileLock(unittest.TestCase):
872
  """Test case for the FileLock class"""
873

    
874
  def setUp(self):
875
    self.tmpfile = tempfile.NamedTemporaryFile()
876
    self.lock = utils.FileLock(self.tmpfile.name)
877

    
878
  def testSharedNonblocking(self):
879
    self.lock.Shared(blocking=False)
880
    self.lock.Close()
881

    
882
  def testExclusiveNonblocking(self):
883
    self.lock.Exclusive(blocking=False)
884
    self.lock.Close()
885

    
886
  def testUnlockNonblocking(self):
887
    self.lock.Unlock(blocking=False)
888
    self.lock.Close()
889

    
890
  def testSharedBlocking(self):
891
    self.lock.Shared(blocking=True)
892
    self.lock.Close()
893

    
894
  def testExclusiveBlocking(self):
895
    self.lock.Exclusive(blocking=True)
896
    self.lock.Close()
897

    
898
  def testUnlockBlocking(self):
899
    self.lock.Unlock(blocking=True)
900
    self.lock.Close()
901

    
902
  def testSharedExclusiveUnlock(self):
903
    self.lock.Shared(blocking=False)
904
    self.lock.Exclusive(blocking=False)
905
    self.lock.Unlock(blocking=False)
906
    self.lock.Close()
907

    
908
  def testExclusiveSharedUnlock(self):
909
    self.lock.Exclusive(blocking=False)
910
    self.lock.Shared(blocking=False)
911
    self.lock.Unlock(blocking=False)
912
    self.lock.Close()
913

    
914
  def testCloseShared(self):
915
    self.lock.Close()
916
    self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
917

    
918
  def testCloseExclusive(self):
919
    self.lock.Close()
920
    self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
921

    
922
  def testCloseUnlock(self):
923
    self.lock.Close()
924
    self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
925

    
926

    
927
class TestTimeFunctions(unittest.TestCase):
928
  """Test case for time functions"""
929

    
930
  def runTest(self):
931
    self.assertEqual(utils.SplitTime(1), (1, 0))
932
    self.assertEqual(utils.SplitTime(1.5), (1, 500000))
933
    self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
934
    self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
935
    self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
936
    self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
937
    self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
938
    self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
939

    
940
    self.assertRaises(AssertionError, utils.SplitTime, -1)
941

    
942
    self.assertEqual(utils.MergeTime((1, 0)), 1.0)
943
    self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
944
    self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
945

    
946
    self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3),
947
                     1218448917.481)
948
    self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
949

    
950
    self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
951
    self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
952
    self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
953
    self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
954
    self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
955

    
956

    
957
class FieldSetTestCase(unittest.TestCase):
958
  """Test case for FieldSets"""
959

    
960
  def testSimpleMatch(self):
961
    f = utils.FieldSet("a", "b", "c", "def")
962
    self.failUnless(f.Matches("a"))
963
    self.failIf(f.Matches("d"), "Substring matched")
964
    self.failIf(f.Matches("defghi"), "Prefix string matched")
965
    self.failIf(f.NonMatching(["b", "c"]))
966
    self.failIf(f.NonMatching(["a", "b", "c", "def"]))
967
    self.failUnless(f.NonMatching(["a", "d"]))
968

    
969
  def testRegexMatch(self):
970
    f = utils.FieldSet("a", "b([0-9]+)", "c")
971
    self.failUnless(f.Matches("b1"))
972
    self.failUnless(f.Matches("b99"))
973
    self.failIf(f.Matches("b/1"))
974
    self.failIf(f.NonMatching(["b12", "c"]))
975
    self.failUnless(f.NonMatching(["a", "1"]))
976

    
977
class TestForceDictType(unittest.TestCase):
978
  """Test case for ForceDictType"""
979

    
980
  def setUp(self):
981
    self.key_types = {
982
      'a': constants.VTYPE_INT,
983
      'b': constants.VTYPE_BOOL,
984
      'c': constants.VTYPE_STRING,
985
      'd': constants.VTYPE_SIZE,
986
      }
987

    
988
  def _fdt(self, dict, allowed_values=None):
989
    if allowed_values is None:
990
      ForceDictType(dict, self.key_types)
991
    else:
992
      ForceDictType(dict, self.key_types, allowed_values=allowed_values)
993

    
994
    return dict
995

    
996
  def testSimpleDict(self):
997
    self.assertEqual(self._fdt({}), {})
998
    self.assertEqual(self._fdt({'a': 1}), {'a': 1})
999
    self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
1000
    self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
1001
    self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
1002
    self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
1003
    self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
1004
    self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
1005
    self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
1006
    self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
1007
    self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
1008
    self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
1009

    
1010
  def testErrors(self):
1011
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
1012
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
1013
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
1014
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
1015

    
1016

    
1017
class TestIsAbsNormPath(unittest.TestCase):
1018
  """Testing case for IsProcessAlive"""
1019

    
1020
  def _pathTestHelper(self, path, result):
1021
    if result:
1022
      self.assert_(IsNormAbsPath(path),
1023
          "Path %s should result absolute and normalized" % path)
1024
    else:
1025
      self.assert_(not IsNormAbsPath(path),
1026
          "Path %s should not result absolute and normalized" % path)
1027

    
1028
  def testBase(self):
1029
    self._pathTestHelper('/etc', True)
1030
    self._pathTestHelper('/srv', True)
1031
    self._pathTestHelper('etc', False)
1032
    self._pathTestHelper('/etc/../root', False)
1033
    self._pathTestHelper('/etc/', False)
1034

    
1035

    
1036
class TestSafeEncode(unittest.TestCase):
1037
  """Test case for SafeEncode"""
1038

    
1039
  def testAscii(self):
1040
    for txt in [string.digits, string.letters, string.punctuation]:
1041
      self.failUnlessEqual(txt, SafeEncode(txt))
1042

    
1043
  def testDoubleEncode(self):
1044
    for i in range(255):
1045
      txt = SafeEncode(chr(i))
1046
      self.failUnlessEqual(txt, SafeEncode(txt))
1047

    
1048
  def testUnicode(self):
1049
    # 1024 is high enough to catch non-direct ASCII mappings
1050
    for i in range(1024):
1051
      txt = SafeEncode(unichr(i))
1052
      self.failUnlessEqual(txt, SafeEncode(txt))
1053

    
1054

    
1055
class TestFormatTime(unittest.TestCase):
1056
  """Testing case for FormatTime"""
1057

    
1058
  def testNone(self):
1059
    self.failUnlessEqual(FormatTime(None), "N/A")
1060

    
1061
  def testInvalid(self):
1062
    self.failUnlessEqual(FormatTime(()), "N/A")
1063

    
1064
  def testNow(self):
1065
    # tests that we accept time.time input
1066
    FormatTime(time.time())
1067
    # tests that we accept int input
1068
    FormatTime(int(time.time()))
1069

    
1070

    
1071
class RunInSeparateProcess(unittest.TestCase):
1072
  def test(self):
1073
    for exp in [True, False]:
1074
      def _child():
1075
        return exp
1076

    
1077
      self.assertEqual(exp, utils.RunInSeparateProcess(_child))
1078

    
1079
  def testPid(self):
1080
    parent_pid = os.getpid()
1081

    
1082
    def _check():
1083
      return os.getpid() == parent_pid
1084

    
1085
    self.failIf(utils.RunInSeparateProcess(_check))
1086

    
1087
  def testSignal(self):
1088
    def _kill():
1089
      os.kill(os.getpid(), signal.SIGTERM)
1090

    
1091
    self.assertRaises(errors.GenericError,
1092
                      utils.RunInSeparateProcess, _kill)
1093

    
1094
  def testException(self):
1095
    def _exc():
1096
      raise errors.GenericError("This is a test")
1097

    
1098
    self.assertRaises(errors.GenericError,
1099
                      utils.RunInSeparateProcess, _exc)
1100

    
1101

    
1102
class TestFingerprintFile(unittest.TestCase):
1103
  def setUp(self):
1104
    self.tmpfile = tempfile.NamedTemporaryFile()
1105

    
1106
  def test(self):
1107
    self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1108
                     "da39a3ee5e6b4b0d3255bfef95601890afd80709")
1109

    
1110
    utils.WriteFile(self.tmpfile.name, data="Hello World\n")
1111
    self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1112
                     "648a6a6ffffdaa0badb23b8baf90b6168dd16b3a")
1113

    
1114

    
1115
class TestUnescapeAndSplit(unittest.TestCase):
1116
  """Testing case for UnescapeAndSplit"""
1117

    
1118
  def setUp(self):
1119
    # testing more that one separator for regexp safety
1120
    self._seps = [",", "+", "."]
1121

    
1122
  def testSimple(self):
1123
    a = ["a", "b", "c", "d"]
1124
    for sep in self._seps:
1125
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), a)
1126

    
1127
  def testEscape(self):
1128
    for sep in self._seps:
1129
      a = ["a", "b\\" + sep + "c", "d"]
1130
      b = ["a", "b" + sep + "c", "d"]
1131
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1132

    
1133
  def testDoubleEscape(self):
1134
    for sep in self._seps:
1135
      a = ["a", "b\\\\", "c", "d"]
1136
      b = ["a", "b\\", "c", "d"]
1137
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1138

    
1139
  def testThreeEscape(self):
1140
    for sep in self._seps:
1141
      a = ["a", "b\\\\\\" + sep + "c", "d"]
1142
      b = ["a", "b\\" + sep + "c", "d"]
1143
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1144

    
1145

    
1146
class TestGenerateSelfSignedSslCert(unittest.TestCase):
1147
  def setUp(self):
1148
    self.tmpdir = tempfile.mkdtemp()
1149

    
1150
  def tearDown(self):
1151
    shutil.rmtree(self.tmpdir)
1152

    
1153
  def _checkPrivateRsaKey(self, key):
1154
    lines = key.splitlines()
1155
    self.assert_("-----BEGIN RSA PRIVATE KEY-----" in lines)
1156
    self.assert_("-----END RSA PRIVATE KEY-----" in lines)
1157

    
1158
  def _checkRsaCertificate(self, cert):
1159
    lines = cert.splitlines()
1160
    self.assert_("-----BEGIN CERTIFICATE-----" in lines)
1161
    self.assert_("-----END CERTIFICATE-----" in lines)
1162

    
1163
  def testSingleFile(self):
1164
    cert1_filename = os.path.join(self.tmpdir, "cert1.pem")
1165

    
1166
    utils.GenerateSelfSignedSslCert(cert1_filename, validity=1)
1167

    
1168
    cert1 = utils.ReadFile(cert1_filename)
1169

    
1170
    self._checkPrivateRsaKey(cert1)
1171
    self._checkRsaCertificate(cert1)
1172

    
1173

    
1174
if __name__ == '__main__':
1175
  testutils.GanetiTestProgram()