Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ bdd5e420

History | View | Annotate | Download (40.4 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import os
30
import md5
31
import signal
32
import socket
33
import shutil
34
import re
35
import select
36
import string
37
import fcntl
38
import OpenSSL
39

    
40
import ganeti
41
import testutils
42
from ganeti import constants
43
from ganeti import utils
44
from ganeti import errors
45
from ganeti.utils import IsProcessAlive, RunCmd, \
46
     RemoveFile, MatchNameComponent, FormatUnit, \
47
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
48
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
49
     SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \
50
     TailFile, ForceDictType, SafeEncode, IsNormAbsPath, FormatTime, \
51
     UnescapeAndSplit
52

    
53
from ganeti.errors import LockError, UnitParseError, GenericError, \
54
     ProgrammerError
55

    
56

    
57
class TestIsProcessAlive(unittest.TestCase):
58
  """Testing case for IsProcessAlive"""
59

    
60
  def testExists(self):
61
    mypid = os.getpid()
62
    self.assert_(IsProcessAlive(mypid),
63
                 "can't find myself running")
64

    
65
  def testNotExisting(self):
66
    pid_non_existing = os.fork()
67
    if pid_non_existing == 0:
68
      os._exit(0)
69
    elif pid_non_existing < 0:
70
      raise SystemError("can't fork")
71
    os.waitpid(pid_non_existing, 0)
72
    self.assert_(not IsProcessAlive(pid_non_existing),
73
                 "nonexisting process detected")
74

    
75

    
76
class TestPidFileFunctions(unittest.TestCase):
77
  """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
78

    
79
  def setUp(self):
80
    self.dir = tempfile.mkdtemp()
81
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
82
    utils.DaemonPidFileName = self.f_dpn
83

    
84
  def testPidFileFunctions(self):
85
    pid_file = self.f_dpn('test')
86
    utils.WritePidFile('test')
87
    self.failUnless(os.path.exists(pid_file),
88
                    "PID file should have been created")
89
    read_pid = utils.ReadPidFile(pid_file)
90
    self.failUnlessEqual(read_pid, os.getpid())
91
    self.failUnless(utils.IsProcessAlive(read_pid))
92
    self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
93
    utils.RemovePidFile('test')
94
    self.failIf(os.path.exists(pid_file),
95
                "PID file should not exist anymore")
96
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
97
                         "ReadPidFile should return 0 for missing pid file")
98
    fh = open(pid_file, "w")
99
    fh.write("blah\n")
100
    fh.close()
101
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
102
                         "ReadPidFile should return 0 for invalid pid file")
103
    utils.RemovePidFile('test')
104
    self.failIf(os.path.exists(pid_file),
105
                "PID file should not exist anymore")
106

    
107
  def testKill(self):
108
    pid_file = self.f_dpn('child')
109
    r_fd, w_fd = os.pipe()
110
    new_pid = os.fork()
111
    if new_pid == 0: #child
112
      utils.WritePidFile('child')
113
      os.write(w_fd, 'a')
114
      signal.pause()
115
      os._exit(0)
116
      return
117
    # else we are in the parent
118
    # wait until the child has written the pid file
119
    os.read(r_fd, 1)
120
    read_pid = utils.ReadPidFile(pid_file)
121
    self.failUnlessEqual(read_pid, new_pid)
122
    self.failUnless(utils.IsProcessAlive(new_pid))
123
    utils.KillProcess(new_pid, waitpid=True)
124
    self.failIf(utils.IsProcessAlive(new_pid))
125
    utils.RemovePidFile('child')
126
    self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
127

    
128
  def tearDown(self):
129
    for name in os.listdir(self.dir):
130
      os.unlink(os.path.join(self.dir, name))
131
    os.rmdir(self.dir)
132

    
133

    
134
class TestRunCmd(testutils.GanetiTestCase):
135
  """Testing case for the RunCmd function"""
136

    
137
  def setUp(self):
138
    testutils.GanetiTestCase.setUp(self)
139
    self.magic = time.ctime() + " ganeti test"
140
    self.fname = self._CreateTempFile()
141

    
142
  def testOk(self):
143
    """Test successful exit code"""
144
    result = RunCmd("/bin/sh -c 'exit 0'")
145
    self.assertEqual(result.exit_code, 0)
146
    self.assertEqual(result.output, "")
147

    
148
  def testFail(self):
149
    """Test fail exit code"""
150
    result = RunCmd("/bin/sh -c 'exit 1'")
151
    self.assertEqual(result.exit_code, 1)
152
    self.assertEqual(result.output, "")
153

    
154
  def testStdout(self):
155
    """Test standard output"""
156
    cmd = 'echo -n "%s"' % self.magic
157
    result = RunCmd("/bin/sh -c '%s'" % cmd)
158
    self.assertEqual(result.stdout, self.magic)
159
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
160
    self.assertEqual(result.output, "")
161
    self.assertFileContent(self.fname, self.magic)
162

    
163
  def testStderr(self):
164
    """Test standard error"""
165
    cmd = 'echo -n "%s"' % self.magic
166
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
167
    self.assertEqual(result.stderr, self.magic)
168
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
169
    self.assertEqual(result.output, "")
170
    self.assertFileContent(self.fname, self.magic)
171

    
172
  def testCombined(self):
173
    """Test combined output"""
174
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
175
    expected = "A" + self.magic + "B" + self.magic
176
    result = RunCmd("/bin/sh -c '%s'" % cmd)
177
    self.assertEqual(result.output, expected)
178
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
179
    self.assertEqual(result.output, "")
180
    self.assertFileContent(self.fname, expected)
181

    
182
  def testSignal(self):
183
    """Test signal"""
184
    result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
185
    self.assertEqual(result.signal, 15)
186
    self.assertEqual(result.output, "")
187

    
188
  def testListRun(self):
189
    """Test list runs"""
190
    result = RunCmd(["true"])
191
    self.assertEqual(result.signal, None)
192
    self.assertEqual(result.exit_code, 0)
193
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
194
    self.assertEqual(result.signal, None)
195
    self.assertEqual(result.exit_code, 1)
196
    result = RunCmd(["echo", "-n", self.magic])
197
    self.assertEqual(result.signal, None)
198
    self.assertEqual(result.exit_code, 0)
199
    self.assertEqual(result.stdout, self.magic)
200

    
201
  def testFileEmptyOutput(self):
202
    """Test file output"""
203
    result = RunCmd(["true"], output=self.fname)
204
    self.assertEqual(result.signal, None)
205
    self.assertEqual(result.exit_code, 0)
206
    self.assertFileContent(self.fname, "")
207

    
208
  def testLang(self):
209
    """Test locale environment"""
210
    old_env = os.environ.copy()
211
    try:
212
      os.environ["LANG"] = "en_US.UTF-8"
213
      os.environ["LC_ALL"] = "en_US.UTF-8"
214
      result = RunCmd(["locale"])
215
      for line in result.output.splitlines():
216
        key, value = line.split("=", 1)
217
        # Ignore these variables, they're overridden by LC_ALL
218
        if key == "LANG" or key == "LANGUAGE":
219
          continue
220
        self.failIf(value and value != "C" and value != '"C"',
221
            "Variable %s is set to the invalid value '%s'" % (key, value))
222
    finally:
223
      os.environ = old_env
224

    
225
  def testDefaultCwd(self):
226
    """Test default working directory"""
227
    self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
228

    
229
  def testCwd(self):
230
    """Test default working directory"""
231
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
232
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
233
    cwd = os.getcwd()
234
    self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
235

    
236

    
237
class TestSetCloseOnExecFlag(unittest.TestCase):
238
  """Tests for SetCloseOnExecFlag"""
239

    
240
  def setUp(self):
241
    self.tmpfile = tempfile.TemporaryFile()
242

    
243
  def testEnable(self):
244
    utils.SetCloseOnExecFlag(self.tmpfile.fileno(), True)
245
    self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
246
                    fcntl.FD_CLOEXEC)
247

    
248
  def testDisable(self):
249
    utils.SetCloseOnExecFlag(self.tmpfile.fileno(), False)
250
    self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
251
                fcntl.FD_CLOEXEC)
252

    
253

    
254
class TestSetNonblockFlag(unittest.TestCase):
255
  def setUp(self):
256
    self.tmpfile = tempfile.TemporaryFile()
257

    
258
  def testEnable(self):
259
    utils.SetNonblockFlag(self.tmpfile.fileno(), True)
260
    self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
261
                    os.O_NONBLOCK)
262

    
263
  def testDisable(self):
264
    utils.SetNonblockFlag(self.tmpfile.fileno(), False)
265
    self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
266
                os.O_NONBLOCK)
267

    
268

    
269
class TestRemoveFile(unittest.TestCase):
270
  """Test case for the RemoveFile function"""
271

    
272
  def setUp(self):
273
    """Create a temp dir and file for each case"""
274
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
275
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
276
    os.close(fd)
277

    
278
  def tearDown(self):
279
    if os.path.exists(self.tmpfile):
280
      os.unlink(self.tmpfile)
281
    os.rmdir(self.tmpdir)
282

    
283
  def testIgnoreDirs(self):
284
    """Test that RemoveFile() ignores directories"""
285
    self.assertEqual(None, RemoveFile(self.tmpdir))
286

    
287
  def testIgnoreNotExisting(self):
288
    """Test that RemoveFile() ignores non-existing files"""
289
    RemoveFile(self.tmpfile)
290
    RemoveFile(self.tmpfile)
291

    
292
  def testRemoveFile(self):
293
    """Test that RemoveFile does remove a file"""
294
    RemoveFile(self.tmpfile)
295
    if os.path.exists(self.tmpfile):
296
      self.fail("File '%s' not removed" % self.tmpfile)
297

    
298
  def testRemoveSymlink(self):
299
    """Test that RemoveFile does remove symlinks"""
300
    symlink = self.tmpdir + "/symlink"
301
    os.symlink("no-such-file", symlink)
302
    RemoveFile(symlink)
303
    if os.path.exists(symlink):
304
      self.fail("File '%s' not removed" % symlink)
305
    os.symlink(self.tmpfile, symlink)
306
    RemoveFile(symlink)
307
    if os.path.exists(symlink):
308
      self.fail("File '%s' not removed" % symlink)
309

    
310

    
311
class TestRename(unittest.TestCase):
312
  """Test case for RenameFile"""
313

    
314
  def setUp(self):
315
    """Create a temporary directory"""
316
    self.tmpdir = tempfile.mkdtemp()
317
    self.tmpfile = os.path.join(self.tmpdir, "test1")
318

    
319
    # Touch the file
320
    open(self.tmpfile, "w").close()
321

    
322
  def tearDown(self):
323
    """Remove temporary directory"""
324
    shutil.rmtree(self.tmpdir)
325

    
326
  def testSimpleRename1(self):
327
    """Simple rename 1"""
328
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
329
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
330

    
331
  def testSimpleRename2(self):
332
    """Simple rename 2"""
333
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
334
                     mkdir=True)
335
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
336

    
337
  def testRenameMkdir(self):
338
    """Rename with mkdir"""
339
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
340
                     mkdir=True)
341
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
342
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
343

    
344
    utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"),
345
                     os.path.join(self.tmpdir, "test/foo/bar/baz"),
346
                     mkdir=True)
347
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
348
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar")))
349
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz")))
350

    
351

    
352
class TestMatchNameComponent(unittest.TestCase):
353
  """Test case for the MatchNameComponent function"""
354

    
355
  def testEmptyList(self):
356
    """Test that there is no match against an empty list"""
357

    
358
    self.failUnlessEqual(MatchNameComponent("", []), None)
359
    self.failUnlessEqual(MatchNameComponent("test", []), None)
360

    
361
  def testSingleMatch(self):
362
    """Test that a single match is performed correctly"""
363
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
364
    for key in "test2", "test2.example", "test2.example.com":
365
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
366

    
367
  def testMultipleMatches(self):
368
    """Test that a multiple match is returned as None"""
369
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
370
    for key in "test1", "test1.example":
371
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
372

    
373
  def testFullMatch(self):
374
    """Test that a full match is returned correctly"""
375
    key1 = "test1"
376
    key2 = "test1.example"
377
    mlist = [key2, key2 + ".com"]
378
    self.failUnlessEqual(MatchNameComponent(key1, mlist), None)
379
    self.failUnlessEqual(MatchNameComponent(key2, mlist), key2)
380

    
381
  def testCaseInsensitivePartialMatch(self):
382
    """Test for the case_insensitive keyword"""
383
    mlist = ["test1.example.com", "test2.example.net"]
384
    self.assertEqual(MatchNameComponent("test2", mlist, case_sensitive=False),
385
                     "test2.example.net")
386
    self.assertEqual(MatchNameComponent("Test2", mlist, case_sensitive=False),
387
                     "test2.example.net")
388
    self.assertEqual(MatchNameComponent("teSt2", mlist, case_sensitive=False),
389
                     "test2.example.net")
390
    self.assertEqual(MatchNameComponent("TeSt2", mlist, case_sensitive=False),
391
                     "test2.example.net")
392

    
393

    
394
  def testCaseInsensitiveFullMatch(self):
395
    mlist = ["ts1.ex", "ts1.ex.org", "ts2.ex", "Ts2.ex"]
396
    # Between the two ts1 a full string match non-case insensitive should work
397
    self.assertEqual(MatchNameComponent("Ts1", mlist, case_sensitive=False),
398
                     None)
399
    self.assertEqual(MatchNameComponent("Ts1.ex", mlist, case_sensitive=False),
400
                     "ts1.ex")
401
    self.assertEqual(MatchNameComponent("ts1.ex", mlist, case_sensitive=False),
402
                     "ts1.ex")
403
    # Between the two ts2 only case differs, so only case-match works
404
    self.assertEqual(MatchNameComponent("ts2.ex", mlist, case_sensitive=False),
405
                     "ts2.ex")
406
    self.assertEqual(MatchNameComponent("Ts2.ex", mlist, case_sensitive=False),
407
                     "Ts2.ex")
408
    self.assertEqual(MatchNameComponent("TS2.ex", mlist, case_sensitive=False),
409
                     None)
410

    
411

    
412
class TestFormatUnit(unittest.TestCase):
413
  """Test case for the FormatUnit function"""
414

    
415
  def testMiB(self):
416
    self.assertEqual(FormatUnit(1, 'h'), '1M')
417
    self.assertEqual(FormatUnit(100, 'h'), '100M')
418
    self.assertEqual(FormatUnit(1023, 'h'), '1023M')
419

    
420
    self.assertEqual(FormatUnit(1, 'm'), '1')
421
    self.assertEqual(FormatUnit(100, 'm'), '100')
422
    self.assertEqual(FormatUnit(1023, 'm'), '1023')
423

    
424
    self.assertEqual(FormatUnit(1024, 'm'), '1024')
425
    self.assertEqual(FormatUnit(1536, 'm'), '1536')
426
    self.assertEqual(FormatUnit(17133, 'm'), '17133')
427
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
428

    
429
  def testGiB(self):
430
    self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
431
    self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
432
    self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
433
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
434

    
435
    self.assertEqual(FormatUnit(1024, 'g'), '1.0')
436
    self.assertEqual(FormatUnit(1536, 'g'), '1.5')
437
    self.assertEqual(FormatUnit(17133, 'g'), '16.7')
438
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
439

    
440
    self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
441
    self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
442
    self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
443

    
444
  def testTiB(self):
445
    self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
446
    self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
447
    self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
448

    
449
    self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
450
    self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
451
    self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
452

    
453
class TestParseUnit(unittest.TestCase):
454
  """Test case for the ParseUnit function"""
455

    
456
  SCALES = (('', 1),
457
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
458
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
459
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
460

    
461
  def testRounding(self):
462
    self.assertEqual(ParseUnit('0'), 0)
463
    self.assertEqual(ParseUnit('1'), 4)
464
    self.assertEqual(ParseUnit('2'), 4)
465
    self.assertEqual(ParseUnit('3'), 4)
466

    
467
    self.assertEqual(ParseUnit('124'), 124)
468
    self.assertEqual(ParseUnit('125'), 128)
469
    self.assertEqual(ParseUnit('126'), 128)
470
    self.assertEqual(ParseUnit('127'), 128)
471
    self.assertEqual(ParseUnit('128'), 128)
472
    self.assertEqual(ParseUnit('129'), 132)
473
    self.assertEqual(ParseUnit('130'), 132)
474

    
475
  def testFloating(self):
476
    self.assertEqual(ParseUnit('0'), 0)
477
    self.assertEqual(ParseUnit('0.5'), 4)
478
    self.assertEqual(ParseUnit('1.75'), 4)
479
    self.assertEqual(ParseUnit('1.99'), 4)
480
    self.assertEqual(ParseUnit('2.00'), 4)
481
    self.assertEqual(ParseUnit('2.01'), 4)
482
    self.assertEqual(ParseUnit('3.99'), 4)
483
    self.assertEqual(ParseUnit('4.00'), 4)
484
    self.assertEqual(ParseUnit('4.01'), 8)
485
    self.assertEqual(ParseUnit('1.5G'), 1536)
486
    self.assertEqual(ParseUnit('1.8G'), 1844)
487
    self.assertEqual(ParseUnit('8.28T'), 8682212)
488

    
489
  def testSuffixes(self):
490
    for sep in ('', ' ', '   ', "\t", "\t "):
491
      for suffix, scale in TestParseUnit.SCALES:
492
        for func in (lambda x: x, str.lower, str.upper):
493
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
494
                           1024 * scale)
495

    
496
  def testInvalidInput(self):
497
    for sep in ('-', '_', ',', 'a'):
498
      for suffix, _ in TestParseUnit.SCALES:
499
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
500

    
501
    for suffix, _ in TestParseUnit.SCALES:
502
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
503

    
504

    
505
class TestSshKeys(testutils.GanetiTestCase):
506
  """Test case for the AddAuthorizedKey function"""
507

    
508
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
509
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
510
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
511

    
512
  def setUp(self):
513
    testutils.GanetiTestCase.setUp(self)
514
    self.tmpname = self._CreateTempFile()
515
    handle = open(self.tmpname, 'w')
516
    try:
517
      handle.write("%s\n" % TestSshKeys.KEY_A)
518
      handle.write("%s\n" % TestSshKeys.KEY_B)
519
    finally:
520
      handle.close()
521

    
522
  def testAddingNewKey(self):
523
    AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
524

    
525
    self.assertFileContent(self.tmpname,
526
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
527
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
528
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
529
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
530

    
531
  def testAddingAlmostButNotCompletelyTheSameKey(self):
532
    AddAuthorizedKey(self.tmpname,
533
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
534

    
535
    self.assertFileContent(self.tmpname,
536
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
537
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
538
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
539
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
540

    
541
  def testAddingExistingKeyWithSomeMoreSpaces(self):
542
    AddAuthorizedKey(self.tmpname,
543
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
544

    
545
    self.assertFileContent(self.tmpname,
546
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
547
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
548
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
549

    
550
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
551
    RemoveAuthorizedKey(self.tmpname,
552
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
553

    
554
    self.assertFileContent(self.tmpname,
555
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
556
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
557

    
558
  def testRemovingNonExistingKey(self):
559
    RemoveAuthorizedKey(self.tmpname,
560
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
561

    
562
    self.assertFileContent(self.tmpname,
563
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
564
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
565
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
566

    
567

    
568
class TestEtcHosts(testutils.GanetiTestCase):
569
  """Test functions modifying /etc/hosts"""
570

    
571
  def setUp(self):
572
    testutils.GanetiTestCase.setUp(self)
573
    self.tmpname = self._CreateTempFile()
574
    handle = open(self.tmpname, 'w')
575
    try:
576
      handle.write('# This is a test file for /etc/hosts\n')
577
      handle.write('127.0.0.1\tlocalhost\n')
578
      handle.write('192.168.1.1 router gw\n')
579
    finally:
580
      handle.close()
581

    
582
  def testSettingNewIp(self):
583
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
584

    
585
    self.assertFileContent(self.tmpname,
586
      "# This is a test file for /etc/hosts\n"
587
      "127.0.0.1\tlocalhost\n"
588
      "192.168.1.1 router gw\n"
589
      "1.2.3.4\tmyhost.domain.tld myhost\n")
590
    self.assertFileMode(self.tmpname, 0644)
591

    
592
  def testSettingExistingIp(self):
593
    SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
594
                     ['myhost'])
595

    
596
    self.assertFileContent(self.tmpname,
597
      "# This is a test file for /etc/hosts\n"
598
      "127.0.0.1\tlocalhost\n"
599
      "192.168.1.1\tmyhost.domain.tld myhost\n")
600
    self.assertFileMode(self.tmpname, 0644)
601

    
602
  def testSettingDuplicateName(self):
603
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
604

    
605
    self.assertFileContent(self.tmpname,
606
      "# This is a test file for /etc/hosts\n"
607
      "127.0.0.1\tlocalhost\n"
608
      "192.168.1.1 router gw\n"
609
      "1.2.3.4\tmyhost\n")
610
    self.assertFileMode(self.tmpname, 0644)
611

    
612
  def testRemovingExistingHost(self):
613
    RemoveEtcHostsEntry(self.tmpname, 'router')
614

    
615
    self.assertFileContent(self.tmpname,
616
      "# This is a test file for /etc/hosts\n"
617
      "127.0.0.1\tlocalhost\n"
618
      "192.168.1.1 gw\n")
619
    self.assertFileMode(self.tmpname, 0644)
620

    
621
  def testRemovingSingleExistingHost(self):
622
    RemoveEtcHostsEntry(self.tmpname, 'localhost')
623

    
624
    self.assertFileContent(self.tmpname,
625
      "# This is a test file for /etc/hosts\n"
626
      "192.168.1.1 router gw\n")
627
    self.assertFileMode(self.tmpname, 0644)
628

    
629
  def testRemovingNonExistingHost(self):
630
    RemoveEtcHostsEntry(self.tmpname, 'myhost')
631

    
632
    self.assertFileContent(self.tmpname,
633
      "# This is a test file for /etc/hosts\n"
634
      "127.0.0.1\tlocalhost\n"
635
      "192.168.1.1 router gw\n")
636
    self.assertFileMode(self.tmpname, 0644)
637

    
638
  def testRemovingAlias(self):
639
    RemoveEtcHostsEntry(self.tmpname, 'gw')
640

    
641
    self.assertFileContent(self.tmpname,
642
      "# This is a test file for /etc/hosts\n"
643
      "127.0.0.1\tlocalhost\n"
644
      "192.168.1.1 router\n")
645
    self.assertFileMode(self.tmpname, 0644)
646

    
647

    
648
class TestShellQuoting(unittest.TestCase):
649
  """Test case for shell quoting functions"""
650

    
651
  def testShellQuote(self):
652
    self.assertEqual(ShellQuote('abc'), "abc")
653
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
654
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
655
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
656
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
657

    
658
  def testShellQuoteArgs(self):
659
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
660
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
661
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
662

    
663

    
664
class TestTcpPing(unittest.TestCase):
665
  """Testcase for TCP version of ping - against listen(2)ing port"""
666

    
667
  def setUp(self):
668
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
669
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
670
    self.listenerport = self.listener.getsockname()[1]
671
    self.listener.listen(1)
672

    
673
  def tearDown(self):
674
    self.listener.shutdown(socket.SHUT_RDWR)
675
    del self.listener
676
    del self.listenerport
677

    
678
  def testTcpPingToLocalHostAccept(self):
679
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
680
                         self.listenerport,
681
                         timeout=10,
682
                         live_port_needed=True,
683
                         source=constants.LOCALHOST_IP_ADDRESS,
684
                         ),
685
                 "failed to connect to test listener")
686

    
687
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
688
                         self.listenerport,
689
                         timeout=10,
690
                         live_port_needed=True,
691
                         ),
692
                 "failed to connect to test listener (no source)")
693

    
694

    
695
class TestTcpPingDeaf(unittest.TestCase):
696
  """Testcase for TCP version of ping - against non listen(2)ing port"""
697

    
698
  def setUp(self):
699
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
700
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
701
    self.deaflistenerport = self.deaflistener.getsockname()[1]
702

    
703
  def tearDown(self):
704
    del self.deaflistener
705
    del self.deaflistenerport
706

    
707
  def testTcpPingToLocalHostAcceptDeaf(self):
708
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
709
                        self.deaflistenerport,
710
                        timeout=constants.TCP_PING_TIMEOUT,
711
                        live_port_needed=True,
712
                        source=constants.LOCALHOST_IP_ADDRESS,
713
                        ), # need successful connect(2)
714
                "successfully connected to deaf listener")
715

    
716
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
717
                        self.deaflistenerport,
718
                        timeout=constants.TCP_PING_TIMEOUT,
719
                        live_port_needed=True,
720
                        ), # need successful connect(2)
721
                "successfully connected to deaf listener (no source addr)")
722

    
723
  def testTcpPingToLocalHostNoAccept(self):
724
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
725
                         self.deaflistenerport,
726
                         timeout=constants.TCP_PING_TIMEOUT,
727
                         live_port_needed=False,
728
                         source=constants.LOCALHOST_IP_ADDRESS,
729
                         ), # ECONNREFUSED is OK
730
                 "failed to ping alive host on deaf port")
731

    
732
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
733
                         self.deaflistenerport,
734
                         timeout=constants.TCP_PING_TIMEOUT,
735
                         live_port_needed=False,
736
                         ), # ECONNREFUSED is OK
737
                 "failed to ping alive host on deaf port (no source addr)")
738

    
739

    
740
class TestOwnIpAddress(unittest.TestCase):
741
  """Testcase for OwnIpAddress"""
742

    
743
  def testOwnLoopback(self):
744
    """check having the loopback ip"""
745
    self.failUnless(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS),
746
                    "Should own the loopback address")
747

    
748
  def testNowOwnAddress(self):
749
    """check that I don't own an address"""
750

    
751
    # network 192.0.2.0/24 is reserved for test/documentation as per
752
    # rfc 3330, so we *should* not have an address of this range... if
753
    # this fails, we should extend the test to multiple addresses
754
    DST_IP = "192.0.2.1"
755
    self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP)
756

    
757

    
758
class TestListVisibleFiles(unittest.TestCase):
759
  """Test case for ListVisibleFiles"""
760

    
761
  def setUp(self):
762
    self.path = tempfile.mkdtemp()
763

    
764
  def tearDown(self):
765
    shutil.rmtree(self.path)
766

    
767
  def _test(self, files, expected):
768
    # Sort a copy
769
    expected = expected[:]
770
    expected.sort()
771

    
772
    for name in files:
773
      f = open(os.path.join(self.path, name), 'w')
774
      try:
775
        f.write("Test\n")
776
      finally:
777
        f.close()
778

    
779
    found = ListVisibleFiles(self.path)
780
    found.sort()
781

    
782
    self.assertEqual(found, expected)
783

    
784
  def testAllVisible(self):
785
    files = ["a", "b", "c"]
786
    expected = files
787
    self._test(files, expected)
788

    
789
  def testNoneVisible(self):
790
    files = [".a", ".b", ".c"]
791
    expected = []
792
    self._test(files, expected)
793

    
794
  def testSomeVisible(self):
795
    files = ["a", "b", ".c"]
796
    expected = ["a", "b"]
797
    self._test(files, expected)
798

    
799

    
800
class TestNewUUID(unittest.TestCase):
801
  """Test case for NewUUID"""
802

    
803
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
804
                        '[a-f0-9]{4}-[a-f0-9]{12}$')
805

    
806
  def runTest(self):
807
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
808

    
809

    
810
class TestUniqueSequence(unittest.TestCase):
811
  """Test case for UniqueSequence"""
812

    
813
  def _test(self, input, expected):
814
    self.assertEqual(utils.UniqueSequence(input), expected)
815

    
816
  def runTest(self):
817
    # Ordered input
818
    self._test([1, 2, 3], [1, 2, 3])
819
    self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
820
    self._test([1, 2, 2, 3], [1, 2, 3])
821
    self._test([1, 2, 3, 3], [1, 2, 3])
822

    
823
    # Unordered input
824
    self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
825
    self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
826

    
827
    # Strings
828
    self._test(["a", "a"], ["a"])
829
    self._test(["a", "b"], ["a", "b"])
830
    self._test(["a", "b", "a"], ["a", "b"])
831

    
832

    
833
class TestFirstFree(unittest.TestCase):
834
  """Test case for the FirstFree function"""
835

    
836
  def test(self):
837
    """Test FirstFree"""
838
    self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
839
    self.failUnlessEqual(FirstFree([]), None)
840
    self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
841
    self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
842
    self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
843

    
844

    
845
class TestTailFile(testutils.GanetiTestCase):
846
  """Test case for the TailFile function"""
847

    
848
  def testEmpty(self):
849
    fname = self._CreateTempFile()
850
    self.failUnlessEqual(TailFile(fname), [])
851
    self.failUnlessEqual(TailFile(fname, lines=25), [])
852

    
853
  def testAllLines(self):
854
    data = ["test %d" % i for i in range(30)]
855
    for i in range(30):
856
      fname = self._CreateTempFile()
857
      fd = open(fname, "w")
858
      fd.write("\n".join(data[:i]))
859
      if i > 0:
860
        fd.write("\n")
861
      fd.close()
862
      self.failUnlessEqual(TailFile(fname, lines=i), data[:i])
863

    
864
  def testPartialLines(self):
865
    data = ["test %d" % i for i in range(30)]
866
    fname = self._CreateTempFile()
867
    fd = open(fname, "w")
868
    fd.write("\n".join(data))
869
    fd.write("\n")
870
    fd.close()
871
    for i in range(1, 30):
872
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
873

    
874
  def testBigFile(self):
875
    data = ["test %d" % i for i in range(30)]
876
    fname = self._CreateTempFile()
877
    fd = open(fname, "w")
878
    fd.write("X" * 1048576)
879
    fd.write("\n")
880
    fd.write("\n".join(data))
881
    fd.write("\n")
882
    fd.close()
883
    for i in range(1, 30):
884
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
885

    
886

    
887
class TestFileLock(unittest.TestCase):
888
  """Test case for the FileLock class"""
889

    
890
  def setUp(self):
891
    self.tmpfile = tempfile.NamedTemporaryFile()
892
    self.lock = utils.FileLock(self.tmpfile.name)
893

    
894
  def testSharedNonblocking(self):
895
    self.lock.Shared(blocking=False)
896
    self.lock.Close()
897

    
898
  def testExclusiveNonblocking(self):
899
    self.lock.Exclusive(blocking=False)
900
    self.lock.Close()
901

    
902
  def testUnlockNonblocking(self):
903
    self.lock.Unlock(blocking=False)
904
    self.lock.Close()
905

    
906
  def testSharedBlocking(self):
907
    self.lock.Shared(blocking=True)
908
    self.lock.Close()
909

    
910
  def testExclusiveBlocking(self):
911
    self.lock.Exclusive(blocking=True)
912
    self.lock.Close()
913

    
914
  def testUnlockBlocking(self):
915
    self.lock.Unlock(blocking=True)
916
    self.lock.Close()
917

    
918
  def testSharedExclusiveUnlock(self):
919
    self.lock.Shared(blocking=False)
920
    self.lock.Exclusive(blocking=False)
921
    self.lock.Unlock(blocking=False)
922
    self.lock.Close()
923

    
924
  def testExclusiveSharedUnlock(self):
925
    self.lock.Exclusive(blocking=False)
926
    self.lock.Shared(blocking=False)
927
    self.lock.Unlock(blocking=False)
928
    self.lock.Close()
929

    
930
  def testCloseShared(self):
931
    self.lock.Close()
932
    self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
933

    
934
  def testCloseExclusive(self):
935
    self.lock.Close()
936
    self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
937

    
938
  def testCloseUnlock(self):
939
    self.lock.Close()
940
    self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
941

    
942

    
943
class TestTimeFunctions(unittest.TestCase):
944
  """Test case for time functions"""
945

    
946
  def runTest(self):
947
    self.assertEqual(utils.SplitTime(1), (1, 0))
948
    self.assertEqual(utils.SplitTime(1.5), (1, 500000))
949
    self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
950
    self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
951
    self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
952
    self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
953
    self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
954
    self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
955

    
956
    self.assertRaises(AssertionError, utils.SplitTime, -1)
957

    
958
    self.assertEqual(utils.MergeTime((1, 0)), 1.0)
959
    self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
960
    self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
961

    
962
    self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3),
963
                     1218448917.481)
964
    self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
965

    
966
    self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
967
    self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
968
    self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
969
    self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
970
    self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
971

    
972

    
973
class FieldSetTestCase(unittest.TestCase):
974
  """Test case for FieldSets"""
975

    
976
  def testSimpleMatch(self):
977
    f = utils.FieldSet("a", "b", "c", "def")
978
    self.failUnless(f.Matches("a"))
979
    self.failIf(f.Matches("d"), "Substring matched")
980
    self.failIf(f.Matches("defghi"), "Prefix string matched")
981
    self.failIf(f.NonMatching(["b", "c"]))
982
    self.failIf(f.NonMatching(["a", "b", "c", "def"]))
983
    self.failUnless(f.NonMatching(["a", "d"]))
984

    
985
  def testRegexMatch(self):
986
    f = utils.FieldSet("a", "b([0-9]+)", "c")
987
    self.failUnless(f.Matches("b1"))
988
    self.failUnless(f.Matches("b99"))
989
    self.failIf(f.Matches("b/1"))
990
    self.failIf(f.NonMatching(["b12", "c"]))
991
    self.failUnless(f.NonMatching(["a", "1"]))
992

    
993
class TestForceDictType(unittest.TestCase):
994
  """Test case for ForceDictType"""
995

    
996
  def setUp(self):
997
    self.key_types = {
998
      'a': constants.VTYPE_INT,
999
      'b': constants.VTYPE_BOOL,
1000
      'c': constants.VTYPE_STRING,
1001
      'd': constants.VTYPE_SIZE,
1002
      }
1003

    
1004
  def _fdt(self, dict, allowed_values=None):
1005
    if allowed_values is None:
1006
      ForceDictType(dict, self.key_types)
1007
    else:
1008
      ForceDictType(dict, self.key_types, allowed_values=allowed_values)
1009

    
1010
    return dict
1011

    
1012
  def testSimpleDict(self):
1013
    self.assertEqual(self._fdt({}), {})
1014
    self.assertEqual(self._fdt({'a': 1}), {'a': 1})
1015
    self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
1016
    self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
1017
    self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
1018
    self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
1019
    self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
1020
    self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
1021
    self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
1022
    self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
1023
    self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
1024
    self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
1025

    
1026
  def testErrors(self):
1027
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
1028
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
1029
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
1030
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
1031

    
1032

    
1033
class TestIsAbsNormPath(unittest.TestCase):
1034
  """Testing case for IsProcessAlive"""
1035

    
1036
  def _pathTestHelper(self, path, result):
1037
    if result:
1038
      self.assert_(IsNormAbsPath(path),
1039
          "Path %s should result absolute and normalized" % path)
1040
    else:
1041
      self.assert_(not IsNormAbsPath(path),
1042
          "Path %s should not result absolute and normalized" % path)
1043

    
1044
  def testBase(self):
1045
    self._pathTestHelper('/etc', True)
1046
    self._pathTestHelper('/srv', True)
1047
    self._pathTestHelper('etc', False)
1048
    self._pathTestHelper('/etc/../root', False)
1049
    self._pathTestHelper('/etc/', False)
1050

    
1051

    
1052
class TestSafeEncode(unittest.TestCase):
1053
  """Test case for SafeEncode"""
1054

    
1055
  def testAscii(self):
1056
    for txt in [string.digits, string.letters, string.punctuation]:
1057
      self.failUnlessEqual(txt, SafeEncode(txt))
1058

    
1059
  def testDoubleEncode(self):
1060
    for i in range(255):
1061
      txt = SafeEncode(chr(i))
1062
      self.failUnlessEqual(txt, SafeEncode(txt))
1063

    
1064
  def testUnicode(self):
1065
    # 1024 is high enough to catch non-direct ASCII mappings
1066
    for i in range(1024):
1067
      txt = SafeEncode(unichr(i))
1068
      self.failUnlessEqual(txt, SafeEncode(txt))
1069

    
1070

    
1071
class TestFormatTime(unittest.TestCase):
1072
  """Testing case for FormatTime"""
1073

    
1074
  def testNone(self):
1075
    self.failUnlessEqual(FormatTime(None), "N/A")
1076

    
1077
  def testInvalid(self):
1078
    self.failUnlessEqual(FormatTime(()), "N/A")
1079

    
1080
  def testNow(self):
1081
    # tests that we accept time.time input
1082
    FormatTime(time.time())
1083
    # tests that we accept int input
1084
    FormatTime(int(time.time()))
1085

    
1086

    
1087
class RunInSeparateProcess(unittest.TestCase):
1088
  def test(self):
1089
    for exp in [True, False]:
1090
      def _child():
1091
        return exp
1092

    
1093
      self.assertEqual(exp, utils.RunInSeparateProcess(_child))
1094

    
1095
  def testPid(self):
1096
    parent_pid = os.getpid()
1097

    
1098
    def _check():
1099
      return os.getpid() == parent_pid
1100

    
1101
    self.failIf(utils.RunInSeparateProcess(_check))
1102

    
1103
  def testSignal(self):
1104
    def _kill():
1105
      os.kill(os.getpid(), signal.SIGTERM)
1106

    
1107
    self.assertRaises(errors.GenericError,
1108
                      utils.RunInSeparateProcess, _kill)
1109

    
1110
  def testException(self):
1111
    def _exc():
1112
      raise errors.GenericError("This is a test")
1113

    
1114
    self.assertRaises(errors.GenericError,
1115
                      utils.RunInSeparateProcess, _exc)
1116

    
1117

    
1118
class TestFingerprintFile(unittest.TestCase):
1119
  def setUp(self):
1120
    self.tmpfile = tempfile.NamedTemporaryFile()
1121

    
1122
  def test(self):
1123
    self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1124
                     "da39a3ee5e6b4b0d3255bfef95601890afd80709")
1125

    
1126
    utils.WriteFile(self.tmpfile.name, data="Hello World\n")
1127
    self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1128
                     "648a6a6ffffdaa0badb23b8baf90b6168dd16b3a")
1129

    
1130

    
1131
class TestUnescapeAndSplit(unittest.TestCase):
1132
  """Testing case for UnescapeAndSplit"""
1133

    
1134
  def setUp(self):
1135
    # testing more that one separator for regexp safety
1136
    self._seps = [",", "+", "."]
1137

    
1138
  def testSimple(self):
1139
    a = ["a", "b", "c", "d"]
1140
    for sep in self._seps:
1141
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), a)
1142

    
1143
  def testEscape(self):
1144
    for sep in self._seps:
1145
      a = ["a", "b\\" + sep + "c", "d"]
1146
      b = ["a", "b" + sep + "c", "d"]
1147
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1148

    
1149
  def testDoubleEscape(self):
1150
    for sep in self._seps:
1151
      a = ["a", "b\\\\", "c", "d"]
1152
      b = ["a", "b\\", "c", "d"]
1153
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1154

    
1155
  def testThreeEscape(self):
1156
    for sep in self._seps:
1157
      a = ["a", "b\\\\\\" + sep + "c", "d"]
1158
      b = ["a", "b\\" + sep + "c", "d"]
1159
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1160

    
1161

    
1162
class TestGenerateSelfSignedX509Cert(unittest.TestCase):
1163
  def setUp(self):
1164
    self.tmpdir = tempfile.mkdtemp()
1165

    
1166
  def tearDown(self):
1167
    shutil.rmtree(self.tmpdir)
1168

    
1169
  def _checkRsaPrivateKey(self, key):
1170
    lines = key.splitlines()
1171
    return ("-----BEGIN RSA PRIVATE KEY-----" in lines and
1172
            "-----END RSA PRIVATE KEY-----" in lines)
1173

    
1174
  def _checkCertificate(self, cert):
1175
    lines = cert.splitlines()
1176
    return ("-----BEGIN CERTIFICATE-----" in lines and
1177
            "-----END CERTIFICATE-----" in lines)
1178

    
1179
  def test(self):
1180
    for common_name in [None, ".", "Ganeti", "node1.example.com"]:
1181
      (key_pem, cert_pem) = utils.GenerateSelfSignedX509Cert(common_name, 300)
1182
      self._checkRsaPrivateKey(key_pem)
1183
      self._checkCertificate(cert_pem)
1184

    
1185
      key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
1186
                                           key_pem)
1187
      self.assert_(key.bits() >= 1024)
1188
      self.assertEqual(key.bits(), constants.RSA_KEY_BITS)
1189
      self.assertEqual(key.type(), OpenSSL.crypto.TYPE_RSA)
1190

    
1191
      x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1192
                                             cert_pem)
1193
      self.failIf(x509.has_expired())
1194
      self.assertEqual(x509.get_issuer().CN, common_name)
1195
      self.assertEqual(x509.get_subject().CN, common_name)
1196
      self.assertEqual(x509.get_pubkey().bits(), constants.RSA_KEY_BITS)
1197

    
1198
  def testLegacy(self):
1199
    cert1_filename = os.path.join(self.tmpdir, "cert1.pem")
1200

    
1201
    utils.GenerateSelfSignedSslCert(cert1_filename, validity=1)
1202

    
1203
    cert1 = utils.ReadFile(cert1_filename)
1204

    
1205
    self.assert_(self._checkRsaPrivateKey(cert1))
1206
    self.assert_(self._checkCertificate(cert1))
1207

    
1208

    
1209
if __name__ == '__main__':
1210
  testutils.GanetiTestProgram()