Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ e587b46a

History | View | Annotate | Download (59.9 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import os
30
import stat
31
import signal
32
import socket
33
import shutil
34
import re
35
import select
36
import string
37
import OpenSSL
38
import warnings
39
import distutils.version
40
import glob
41
import md5
42

    
43
import ganeti
44
import testutils
45
from ganeti import constants
46
from ganeti import utils
47
from ganeti import errors
48
from ganeti import serializer
49
from ganeti.utils import IsProcessAlive, RunCmd, \
50
     RemoveFile, MatchNameComponent, FormatUnit, \
51
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
52
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
53
     SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \
54
     TailFile, ForceDictType, SafeEncode, IsNormAbsPath, FormatTime, \
55
     UnescapeAndSplit, RunParts, PathJoin, HostInfo, ReadOneLineFile
56

    
57
from ganeti.errors import LockError, UnitParseError, GenericError, \
58
     ProgrammerError, OpPrereqError
59

    
60

    
61
class TestIsProcessAlive(unittest.TestCase):
62
  """Testing case for IsProcessAlive"""
63

    
64
  def testExists(self):
65
    mypid = os.getpid()
66
    self.assert_(IsProcessAlive(mypid),
67
                 "can't find myself running")
68

    
69
  def testNotExisting(self):
70
    pid_non_existing = os.fork()
71
    if pid_non_existing == 0:
72
      os._exit(0)
73
    elif pid_non_existing < 0:
74
      raise SystemError("can't fork")
75
    os.waitpid(pid_non_existing, 0)
76
    self.assert_(not IsProcessAlive(pid_non_existing),
77
                 "nonexisting process detected")
78

    
79

    
80
class TestPidFileFunctions(unittest.TestCase):
81
  """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
82

    
83
  def setUp(self):
84
    self.dir = tempfile.mkdtemp()
85
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
86
    utils.DaemonPidFileName = self.f_dpn
87

    
88
  def testPidFileFunctions(self):
89
    pid_file = self.f_dpn('test')
90
    utils.WritePidFile('test')
91
    self.failUnless(os.path.exists(pid_file),
92
                    "PID file should have been created")
93
    read_pid = utils.ReadPidFile(pid_file)
94
    self.failUnlessEqual(read_pid, os.getpid())
95
    self.failUnless(utils.IsProcessAlive(read_pid))
96
    self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
97
    utils.RemovePidFile('test')
98
    self.failIf(os.path.exists(pid_file),
99
                "PID file should not exist anymore")
100
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
101
                         "ReadPidFile should return 0 for missing pid file")
102
    fh = open(pid_file, "w")
103
    fh.write("blah\n")
104
    fh.close()
105
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
106
                         "ReadPidFile should return 0 for invalid pid file")
107
    utils.RemovePidFile('test')
108
    self.failIf(os.path.exists(pid_file),
109
                "PID file should not exist anymore")
110

    
111
  def testKill(self):
112
    pid_file = self.f_dpn('child')
113
    r_fd, w_fd = os.pipe()
114
    new_pid = os.fork()
115
    if new_pid == 0: #child
116
      utils.WritePidFile('child')
117
      os.write(w_fd, 'a')
118
      signal.pause()
119
      os._exit(0)
120
      return
121
    # else we are in the parent
122
    # wait until the child has written the pid file
123
    os.read(r_fd, 1)
124
    read_pid = utils.ReadPidFile(pid_file)
125
    self.failUnlessEqual(read_pid, new_pid)
126
    self.failUnless(utils.IsProcessAlive(new_pid))
127
    utils.KillProcess(new_pid, waitpid=True)
128
    self.failIf(utils.IsProcessAlive(new_pid))
129
    utils.RemovePidFile('child')
130
    self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
131

    
132
  def tearDown(self):
133
    for name in os.listdir(self.dir):
134
      os.unlink(os.path.join(self.dir, name))
135
    os.rmdir(self.dir)
136

    
137

    
138
class TestRunCmd(testutils.GanetiTestCase):
139
  """Testing case for the RunCmd function"""
140

    
141
  def setUp(self):
142
    testutils.GanetiTestCase.setUp(self)
143
    self.magic = time.ctime() + " ganeti test"
144
    self.fname = self._CreateTempFile()
145

    
146
  def testOk(self):
147
    """Test successful exit code"""
148
    result = RunCmd("/bin/sh -c 'exit 0'")
149
    self.assertEqual(result.exit_code, 0)
150
    self.assertEqual(result.output, "")
151

    
152
  def testFail(self):
153
    """Test fail exit code"""
154
    result = RunCmd("/bin/sh -c 'exit 1'")
155
    self.assertEqual(result.exit_code, 1)
156
    self.assertEqual(result.output, "")
157

    
158
  def testStdout(self):
159
    """Test standard output"""
160
    cmd = 'echo -n "%s"' % self.magic
161
    result = RunCmd("/bin/sh -c '%s'" % cmd)
162
    self.assertEqual(result.stdout, self.magic)
163
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
164
    self.assertEqual(result.output, "")
165
    self.assertFileContent(self.fname, self.magic)
166

    
167
  def testStderr(self):
168
    """Test standard error"""
169
    cmd = 'echo -n "%s"' % self.magic
170
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
171
    self.assertEqual(result.stderr, self.magic)
172
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
173
    self.assertEqual(result.output, "")
174
    self.assertFileContent(self.fname, self.magic)
175

    
176
  def testCombined(self):
177
    """Test combined output"""
178
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
179
    expected = "A" + self.magic + "B" + self.magic
180
    result = RunCmd("/bin/sh -c '%s'" % cmd)
181
    self.assertEqual(result.output, expected)
182
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
183
    self.assertEqual(result.output, "")
184
    self.assertFileContent(self.fname, expected)
185

    
186
  def testSignal(self):
187
    """Test signal"""
188
    result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
189
    self.assertEqual(result.signal, 15)
190
    self.assertEqual(result.output, "")
191

    
192
  def testListRun(self):
193
    """Test list runs"""
194
    result = RunCmd(["true"])
195
    self.assertEqual(result.signal, None)
196
    self.assertEqual(result.exit_code, 0)
197
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
198
    self.assertEqual(result.signal, None)
199
    self.assertEqual(result.exit_code, 1)
200
    result = RunCmd(["echo", "-n", self.magic])
201
    self.assertEqual(result.signal, None)
202
    self.assertEqual(result.exit_code, 0)
203
    self.assertEqual(result.stdout, self.magic)
204

    
205
  def testFileEmptyOutput(self):
206
    """Test file output"""
207
    result = RunCmd(["true"], output=self.fname)
208
    self.assertEqual(result.signal, None)
209
    self.assertEqual(result.exit_code, 0)
210
    self.assertFileContent(self.fname, "")
211

    
212
  def testLang(self):
213
    """Test locale environment"""
214
    old_env = os.environ.copy()
215
    try:
216
      os.environ["LANG"] = "en_US.UTF-8"
217
      os.environ["LC_ALL"] = "en_US.UTF-8"
218
      result = RunCmd(["locale"])
219
      for line in result.output.splitlines():
220
        key, value = line.split("=", 1)
221
        # Ignore these variables, they're overridden by LC_ALL
222
        if key == "LANG" or key == "LANGUAGE":
223
          continue
224
        self.failIf(value and value != "C" and value != '"C"',
225
            "Variable %s is set to the invalid value '%s'" % (key, value))
226
    finally:
227
      os.environ = old_env
228

    
229
  def testDefaultCwd(self):
230
    """Test default working directory"""
231
    self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
232

    
233
  def testCwd(self):
234
    """Test default working directory"""
235
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
236
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
237
    cwd = os.getcwd()
238
    self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
239

    
240
  def testResetEnv(self):
241
    """Test environment reset functionality"""
242
    self.failUnlessEqual(RunCmd(["env"], reset_env=True).stdout.strip(), "")
243
    self.failUnlessEqual(RunCmd(["env"], reset_env=True,
244
                                env={"FOO": "bar",}).stdout.strip(), "FOO=bar")
245

    
246

    
247
class TestRunParts(unittest.TestCase):
248
  """Testing case for the RunParts function"""
249

    
250
  def setUp(self):
251
    self.rundir = tempfile.mkdtemp(prefix="ganeti-test", suffix=".tmp")
252

    
253
  def tearDown(self):
254
    shutil.rmtree(self.rundir)
255

    
256
  def testEmpty(self):
257
    """Test on an empty dir"""
258
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True), [])
259

    
260
  def testSkipWrongName(self):
261
    """Test that wrong files are skipped"""
262
    fname = os.path.join(self.rundir, "00test.dot")
263
    utils.WriteFile(fname, data="")
264
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
265
    relname = os.path.basename(fname)
266
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
267
                         [(relname, constants.RUNPARTS_SKIP, None)])
268

    
269
  def testSkipNonExec(self):
270
    """Test that non executable files are skipped"""
271
    fname = os.path.join(self.rundir, "00test")
272
    utils.WriteFile(fname, data="")
273
    relname = os.path.basename(fname)
274
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
275
                         [(relname, constants.RUNPARTS_SKIP, None)])
276

    
277
  def testError(self):
278
    """Test error on a broken executable"""
279
    fname = os.path.join(self.rundir, "00test")
280
    utils.WriteFile(fname, data="")
281
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
282
    (relname, status, error) = RunParts(self.rundir, reset_env=True)[0]
283
    self.failUnlessEqual(relname, os.path.basename(fname))
284
    self.failUnlessEqual(status, constants.RUNPARTS_ERR)
285
    self.failUnless(error)
286

    
287
  def testSorted(self):
288
    """Test executions are sorted"""
289
    files = []
290
    files.append(os.path.join(self.rundir, "64test"))
291
    files.append(os.path.join(self.rundir, "00test"))
292
    files.append(os.path.join(self.rundir, "42test"))
293

    
294
    for fname in files:
295
      utils.WriteFile(fname, data="")
296

    
297
    results = RunParts(self.rundir, reset_env=True)
298

    
299
    for fname in sorted(files):
300
      self.failUnlessEqual(os.path.basename(fname), results.pop(0)[0])
301

    
302
  def testOk(self):
303
    """Test correct execution"""
304
    fname = os.path.join(self.rundir, "00test")
305
    utils.WriteFile(fname, data="#!/bin/sh\n\necho -n ciao")
306
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
307
    (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
308
    self.failUnlessEqual(relname, os.path.basename(fname))
309
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
310
    self.failUnlessEqual(runresult.stdout, "ciao")
311

    
312
  def testRunFail(self):
313
    """Test correct execution, with run failure"""
314
    fname = os.path.join(self.rundir, "00test")
315
    utils.WriteFile(fname, data="#!/bin/sh\n\nexit 1")
316
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
317
    (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
318
    self.failUnlessEqual(relname, os.path.basename(fname))
319
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
320
    self.failUnlessEqual(runresult.exit_code, 1)
321
    self.failUnless(runresult.failed)
322

    
323
  def testRunMix(self):
324
    files = []
325
    files.append(os.path.join(self.rundir, "00test"))
326
    files.append(os.path.join(self.rundir, "42test"))
327
    files.append(os.path.join(self.rundir, "64test"))
328
    files.append(os.path.join(self.rundir, "99test"))
329

    
330
    files.sort()
331

    
332
    # 1st has errors in execution
333
    utils.WriteFile(files[0], data="#!/bin/sh\n\nexit 1")
334
    os.chmod(files[0], stat.S_IREAD | stat.S_IEXEC)
335

    
336
    # 2nd is skipped
337
    utils.WriteFile(files[1], data="")
338

    
339
    # 3rd cannot execute properly
340
    utils.WriteFile(files[2], data="")
341
    os.chmod(files[2], stat.S_IREAD | stat.S_IEXEC)
342

    
343
    # 4th execs
344
    utils.WriteFile(files[3], data="#!/bin/sh\n\necho -n ciao")
345
    os.chmod(files[3], stat.S_IREAD | stat.S_IEXEC)
346

    
347
    results = RunParts(self.rundir, reset_env=True)
348

    
349
    (relname, status, runresult) = results[0]
350
    self.failUnlessEqual(relname, os.path.basename(files[0]))
351
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
352
    self.failUnlessEqual(runresult.exit_code, 1)
353
    self.failUnless(runresult.failed)
354

    
355
    (relname, status, runresult) = results[1]
356
    self.failUnlessEqual(relname, os.path.basename(files[1]))
357
    self.failUnlessEqual(status, constants.RUNPARTS_SKIP)
358
    self.failUnlessEqual(runresult, None)
359

    
360
    (relname, status, runresult) = results[2]
361
    self.failUnlessEqual(relname, os.path.basename(files[2]))
362
    self.failUnlessEqual(status, constants.RUNPARTS_ERR)
363
    self.failUnless(runresult)
364

    
365
    (relname, status, runresult) = results[3]
366
    self.failUnlessEqual(relname, os.path.basename(files[3]))
367
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
368
    self.failUnlessEqual(runresult.output, "ciao")
369
    self.failUnlessEqual(runresult.exit_code, 0)
370
    self.failUnless(not runresult.failed)
371

    
372

    
373
class TestRemoveFile(unittest.TestCase):
374
  """Test case for the RemoveFile function"""
375

    
376
  def setUp(self):
377
    """Create a temp dir and file for each case"""
378
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
379
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
380
    os.close(fd)
381

    
382
  def tearDown(self):
383
    if os.path.exists(self.tmpfile):
384
      os.unlink(self.tmpfile)
385
    os.rmdir(self.tmpdir)
386

    
387

    
388
  def testIgnoreDirs(self):
389
    """Test that RemoveFile() ignores directories"""
390
    self.assertEqual(None, RemoveFile(self.tmpdir))
391

    
392

    
393
  def testIgnoreNotExisting(self):
394
    """Test that RemoveFile() ignores non-existing files"""
395
    RemoveFile(self.tmpfile)
396
    RemoveFile(self.tmpfile)
397

    
398

    
399
  def testRemoveFile(self):
400
    """Test that RemoveFile does remove a file"""
401
    RemoveFile(self.tmpfile)
402
    if os.path.exists(self.tmpfile):
403
      self.fail("File '%s' not removed" % self.tmpfile)
404

    
405

    
406
  def testRemoveSymlink(self):
407
    """Test that RemoveFile does remove symlinks"""
408
    symlink = self.tmpdir + "/symlink"
409
    os.symlink("no-such-file", symlink)
410
    RemoveFile(symlink)
411
    if os.path.exists(symlink):
412
      self.fail("File '%s' not removed" % symlink)
413
    os.symlink(self.tmpfile, symlink)
414
    RemoveFile(symlink)
415
    if os.path.exists(symlink):
416
      self.fail("File '%s' not removed" % symlink)
417

    
418

    
419
class TestRename(unittest.TestCase):
420
  """Test case for RenameFile"""
421

    
422
  def setUp(self):
423
    """Create a temporary directory"""
424
    self.tmpdir = tempfile.mkdtemp()
425
    self.tmpfile = os.path.join(self.tmpdir, "test1")
426

    
427
    # Touch the file
428
    open(self.tmpfile, "w").close()
429

    
430
  def tearDown(self):
431
    """Remove temporary directory"""
432
    shutil.rmtree(self.tmpdir)
433

    
434
  def testSimpleRename1(self):
435
    """Simple rename 1"""
436
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
437
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
438

    
439
  def testSimpleRename2(self):
440
    """Simple rename 2"""
441
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
442
                     mkdir=True)
443
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
444

    
445
  def testRenameMkdir(self):
446
    """Rename with mkdir"""
447
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
448
                     mkdir=True)
449
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
450
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
451

    
452
    utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"),
453
                     os.path.join(self.tmpdir, "test/foo/bar/baz"),
454
                     mkdir=True)
455
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
456
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar")))
457
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz")))
458

    
459

    
460
class TestMatchNameComponent(unittest.TestCase):
461
  """Test case for the MatchNameComponent function"""
462

    
463
  def testEmptyList(self):
464
    """Test that there is no match against an empty list"""
465

    
466
    self.failUnlessEqual(MatchNameComponent("", []), None)
467
    self.failUnlessEqual(MatchNameComponent("test", []), None)
468

    
469
  def testSingleMatch(self):
470
    """Test that a single match is performed correctly"""
471
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
472
    for key in "test2", "test2.example", "test2.example.com":
473
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
474

    
475
  def testMultipleMatches(self):
476
    """Test that a multiple match is returned as None"""
477
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
478
    for key in "test1", "test1.example":
479
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
480

    
481
  def testFullMatch(self):
482
    """Test that a full match is returned correctly"""
483
    key1 = "test1"
484
    key2 = "test1.example"
485
    mlist = [key2, key2 + ".com"]
486
    self.failUnlessEqual(MatchNameComponent(key1, mlist), None)
487
    self.failUnlessEqual(MatchNameComponent(key2, mlist), key2)
488

    
489
  def testCaseInsensitivePartialMatch(self):
490
    """Test for the case_insensitive keyword"""
491
    mlist = ["test1.example.com", "test2.example.net"]
492
    self.assertEqual(MatchNameComponent("test2", mlist, case_sensitive=False),
493
                     "test2.example.net")
494
    self.assertEqual(MatchNameComponent("Test2", mlist, case_sensitive=False),
495
                     "test2.example.net")
496
    self.assertEqual(MatchNameComponent("teSt2", mlist, case_sensitive=False),
497
                     "test2.example.net")
498
    self.assertEqual(MatchNameComponent("TeSt2", mlist, case_sensitive=False),
499
                     "test2.example.net")
500

    
501

    
502
  def testCaseInsensitiveFullMatch(self):
503
    mlist = ["ts1.ex", "ts1.ex.org", "ts2.ex", "Ts2.ex"]
504
    # Between the two ts1 a full string match non-case insensitive should work
505
    self.assertEqual(MatchNameComponent("Ts1", mlist, case_sensitive=False),
506
                     None)
507
    self.assertEqual(MatchNameComponent("Ts1.ex", mlist, case_sensitive=False),
508
                     "ts1.ex")
509
    self.assertEqual(MatchNameComponent("ts1.ex", mlist, case_sensitive=False),
510
                     "ts1.ex")
511
    # Between the two ts2 only case differs, so only case-match works
512
    self.assertEqual(MatchNameComponent("ts2.ex", mlist, case_sensitive=False),
513
                     "ts2.ex")
514
    self.assertEqual(MatchNameComponent("Ts2.ex", mlist, case_sensitive=False),
515
                     "Ts2.ex")
516
    self.assertEqual(MatchNameComponent("TS2.ex", mlist, case_sensitive=False),
517
                     None)
518

    
519

    
520
class TestReadFile(testutils.GanetiTestCase):
521

    
522
  def testReadAll(self):
523
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"))
524
    self.assertEqual(len(data), 814)
525

    
526
    h = md5.new()
527
    h.update(data)
528
    self.assertEqual(h.hexdigest(), "a491efb3efe56a0535f924d5f8680fd4")
529

    
530
  def testReadSize(self):
531
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"),
532
                          size=100)
533
    self.assertEqual(len(data), 100)
534

    
535
    h = md5.new()
536
    h.update(data)
537
    self.assertEqual(h.hexdigest(), "893772354e4e690b9efd073eed433ce7")
538

    
539
  def testError(self):
540
    self.assertRaises(EnvironmentError, utils.ReadFile,
541
                      "/dev/null/does-not-exist")
542

    
543

    
544
class TestReadOneLineFile(testutils.GanetiTestCase):
545

    
546
  def setUp(self):
547
    testutils.GanetiTestCase.setUp(self)
548

    
549
  def testDefault(self):
550
    data = ReadOneLineFile(self._TestDataFilename("cert1.pem"))
551
    self.assertEqual(len(data), 27)
552
    self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
553

    
554
  def testNotStrict(self):
555
    data = ReadOneLineFile(self._TestDataFilename("cert1.pem"), strict=False)
556
    self.assertEqual(len(data), 27)
557
    self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
558

    
559
  def testStrictFailure(self):
560
    self.assertRaises(errors.GenericError, ReadOneLineFile,
561
                      self._TestDataFilename("cert1.pem"), strict=True)
562

    
563
  def testLongLine(self):
564
    dummydata = (1024 * "Hello World! ")
565
    myfile = self._CreateTempFile()
566
    utils.WriteFile(myfile, data=dummydata)
567
    datastrict = ReadOneLineFile(myfile, strict=True)
568
    datalax = ReadOneLineFile(myfile, strict=False)
569
    self.assertEqual(dummydata, datastrict)
570
    self.assertEqual(dummydata, datalax)
571

    
572
  def testNewline(self):
573
    myfile = self._CreateTempFile()
574
    myline = "myline"
575
    for nl in ["", "\n", "\r\n"]:
576
      dummydata = "%s%s" % (myline, nl)
577
      utils.WriteFile(myfile, data=dummydata)
578
      datalax = ReadOneLineFile(myfile, strict=False)
579
      self.assertEqual(myline, datalax)
580
      datastrict = ReadOneLineFile(myfile, strict=True)
581
      self.assertEqual(myline, datastrict)
582

    
583
  def testWhitespaceAndMultipleLines(self):
584
    myfile = self._CreateTempFile()
585
    for nl in ["", "\n", "\r\n"]:
586
      for ws in [" ", "\t", "\t\t  \t", "\t "]:
587
        dummydata = (1024 * ("Foo bar baz %s%s" % (ws, nl)))
588
        utils.WriteFile(myfile, data=dummydata)
589
        datalax = ReadOneLineFile(myfile, strict=False)
590
        if nl:
591
          self.assert_(set("\r\n") & set(dummydata))
592
          self.assertRaises(errors.GenericError, ReadOneLineFile,
593
                            myfile, strict=True)
594
          explen = len("Foo bar baz ") + len(ws)
595
          self.assertEqual(len(datalax), explen)
596
          self.assertEqual(datalax, dummydata[:explen])
597
          self.assertFalse(set("\r\n") & set(datalax))
598
        else:
599
          datastrict = ReadOneLineFile(myfile, strict=True)
600
          self.assertEqual(dummydata, datastrict)
601
          self.assertEqual(dummydata, datalax)
602

    
603
  def testEmptylines(self):
604
    myfile = self._CreateTempFile()
605
    myline = "myline"
606
    for nl in ["\n", "\r\n"]:
607
      for ol in ["", "otherline"]:
608
        dummydata = "%s%s%s%s%s%s" % (nl, nl, myline, nl, ol, nl)
609
        utils.WriteFile(myfile, data=dummydata)
610
        self.assert_(set("\r\n") & set(dummydata))
611
        datalax = ReadOneLineFile(myfile, strict=False)
612
        self.assertEqual(myline, datalax)
613
        if ol:
614
          self.assertRaises(errors.GenericError, ReadOneLineFile,
615
                            myfile, strict=True)
616
        else:
617
          datastrict = ReadOneLineFile(myfile, strict=True)
618
          self.assertEqual(myline, datastrict)
619

    
620

    
621
class TestTimestampForFilename(unittest.TestCase):
622
  def test(self):
623
    self.assert_("." not in utils.TimestampForFilename())
624
    self.assert_(":" not in utils.TimestampForFilename())
625

    
626

    
627
class TestCreateBackup(testutils.GanetiTestCase):
628
  def setUp(self):
629
    testutils.GanetiTestCase.setUp(self)
630

    
631
    self.tmpdir = tempfile.mkdtemp()
632

    
633
  def tearDown(self):
634
    testutils.GanetiTestCase.tearDown(self)
635

    
636
    shutil.rmtree(self.tmpdir)
637

    
638
  def testEmpty(self):
639
    filename = utils.PathJoin(self.tmpdir, "config.data")
640
    utils.WriteFile(filename, data="")
641
    bname = utils.CreateBackup(filename)
642
    self.assertFileContent(bname, "")
643
    self.assertEqual(len(glob.glob("%s*" % filename)), 2)
644
    utils.CreateBackup(filename)
645
    self.assertEqual(len(glob.glob("%s*" % filename)), 3)
646
    utils.CreateBackup(filename)
647
    self.assertEqual(len(glob.glob("%s*" % filename)), 4)
648

    
649
    fifoname = utils.PathJoin(self.tmpdir, "fifo")
650
    os.mkfifo(fifoname)
651
    self.assertRaises(errors.ProgrammerError, utils.CreateBackup, fifoname)
652

    
653
  def testContent(self):
654
    bkpcount = 0
655
    for data in ["", "X", "Hello World!\n" * 100, "Binary data\0\x01\x02\n"]:
656
      for rep in [1, 2, 10, 127]:
657
        testdata = data * rep
658

    
659
        filename = utils.PathJoin(self.tmpdir, "test.data_")
660
        utils.WriteFile(filename, data=testdata)
661
        self.assertFileContent(filename, testdata)
662

    
663
        for _ in range(3):
664
          bname = utils.CreateBackup(filename)
665
          bkpcount += 1
666
          self.assertFileContent(bname, testdata)
667
          self.assertEqual(len(glob.glob("%s*" % filename)), 1 + bkpcount)
668

    
669

    
670
class TestFormatUnit(unittest.TestCase):
671
  """Test case for the FormatUnit function"""
672

    
673
  def testMiB(self):
674
    self.assertEqual(FormatUnit(1, 'h'), '1M')
675
    self.assertEqual(FormatUnit(100, 'h'), '100M')
676
    self.assertEqual(FormatUnit(1023, 'h'), '1023M')
677

    
678
    self.assertEqual(FormatUnit(1, 'm'), '1')
679
    self.assertEqual(FormatUnit(100, 'm'), '100')
680
    self.assertEqual(FormatUnit(1023, 'm'), '1023')
681

    
682
    self.assertEqual(FormatUnit(1024, 'm'), '1024')
683
    self.assertEqual(FormatUnit(1536, 'm'), '1536')
684
    self.assertEqual(FormatUnit(17133, 'm'), '17133')
685
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
686

    
687
  def testGiB(self):
688
    self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
689
    self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
690
    self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
691
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
692

    
693
    self.assertEqual(FormatUnit(1024, 'g'), '1.0')
694
    self.assertEqual(FormatUnit(1536, 'g'), '1.5')
695
    self.assertEqual(FormatUnit(17133, 'g'), '16.7')
696
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
697

    
698
    self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
699
    self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
700
    self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
701

    
702
  def testTiB(self):
703
    self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
704
    self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
705
    self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
706

    
707
    self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
708
    self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
709
    self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
710

    
711
class TestParseUnit(unittest.TestCase):
712
  """Test case for the ParseUnit function"""
713

    
714
  SCALES = (('', 1),
715
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
716
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
717
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
718

    
719
  def testRounding(self):
720
    self.assertEqual(ParseUnit('0'), 0)
721
    self.assertEqual(ParseUnit('1'), 4)
722
    self.assertEqual(ParseUnit('2'), 4)
723
    self.assertEqual(ParseUnit('3'), 4)
724

    
725
    self.assertEqual(ParseUnit('124'), 124)
726
    self.assertEqual(ParseUnit('125'), 128)
727
    self.assertEqual(ParseUnit('126'), 128)
728
    self.assertEqual(ParseUnit('127'), 128)
729
    self.assertEqual(ParseUnit('128'), 128)
730
    self.assertEqual(ParseUnit('129'), 132)
731
    self.assertEqual(ParseUnit('130'), 132)
732

    
733
  def testFloating(self):
734
    self.assertEqual(ParseUnit('0'), 0)
735
    self.assertEqual(ParseUnit('0.5'), 4)
736
    self.assertEqual(ParseUnit('1.75'), 4)
737
    self.assertEqual(ParseUnit('1.99'), 4)
738
    self.assertEqual(ParseUnit('2.00'), 4)
739
    self.assertEqual(ParseUnit('2.01'), 4)
740
    self.assertEqual(ParseUnit('3.99'), 4)
741
    self.assertEqual(ParseUnit('4.00'), 4)
742
    self.assertEqual(ParseUnit('4.01'), 8)
743
    self.assertEqual(ParseUnit('1.5G'), 1536)
744
    self.assertEqual(ParseUnit('1.8G'), 1844)
745
    self.assertEqual(ParseUnit('8.28T'), 8682212)
746

    
747
  def testSuffixes(self):
748
    for sep in ('', ' ', '   ', "\t", "\t "):
749
      for suffix, scale in TestParseUnit.SCALES:
750
        for func in (lambda x: x, str.lower, str.upper):
751
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
752
                           1024 * scale)
753

    
754
  def testInvalidInput(self):
755
    for sep in ('-', '_', ',', 'a'):
756
      for suffix, _ in TestParseUnit.SCALES:
757
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
758

    
759
    for suffix, _ in TestParseUnit.SCALES:
760
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
761

    
762

    
763
class TestSshKeys(testutils.GanetiTestCase):
764
  """Test case for the AddAuthorizedKey function"""
765

    
766
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
767
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
768
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
769

    
770
  def setUp(self):
771
    testutils.GanetiTestCase.setUp(self)
772
    self.tmpname = self._CreateTempFile()
773
    handle = open(self.tmpname, 'w')
774
    try:
775
      handle.write("%s\n" % TestSshKeys.KEY_A)
776
      handle.write("%s\n" % TestSshKeys.KEY_B)
777
    finally:
778
      handle.close()
779

    
780
  def testAddingNewKey(self):
781
    AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
782

    
783
    self.assertFileContent(self.tmpname,
784
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
785
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
786
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
787
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
788

    
789
  def testAddingAlmostButNotCompletelyTheSameKey(self):
790
    AddAuthorizedKey(self.tmpname,
791
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
792

    
793
    self.assertFileContent(self.tmpname,
794
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
795
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
796
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
797
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
798

    
799
  def testAddingExistingKeyWithSomeMoreSpaces(self):
800
    AddAuthorizedKey(self.tmpname,
801
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
802

    
803
    self.assertFileContent(self.tmpname,
804
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
805
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
806
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
807

    
808
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
809
    RemoveAuthorizedKey(self.tmpname,
810
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
811

    
812
    self.assertFileContent(self.tmpname,
813
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
814
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
815

    
816
  def testRemovingNonExistingKey(self):
817
    RemoveAuthorizedKey(self.tmpname,
818
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
819

    
820
    self.assertFileContent(self.tmpname,
821
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
822
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
823
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
824

    
825

    
826
class TestEtcHosts(testutils.GanetiTestCase):
827
  """Test functions modifying /etc/hosts"""
828

    
829
  def setUp(self):
830
    testutils.GanetiTestCase.setUp(self)
831
    self.tmpname = self._CreateTempFile()
832
    handle = open(self.tmpname, 'w')
833
    try:
834
      handle.write('# This is a test file for /etc/hosts\n')
835
      handle.write('127.0.0.1\tlocalhost\n')
836
      handle.write('192.168.1.1 router gw\n')
837
    finally:
838
      handle.close()
839

    
840
  def testSettingNewIp(self):
841
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
842

    
843
    self.assertFileContent(self.tmpname,
844
      "# This is a test file for /etc/hosts\n"
845
      "127.0.0.1\tlocalhost\n"
846
      "192.168.1.1 router gw\n"
847
      "1.2.3.4\tmyhost.domain.tld myhost\n")
848
    self.assertFileMode(self.tmpname, 0644)
849

    
850
  def testSettingExistingIp(self):
851
    SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
852
                     ['myhost'])
853

    
854
    self.assertFileContent(self.tmpname,
855
      "# This is a test file for /etc/hosts\n"
856
      "127.0.0.1\tlocalhost\n"
857
      "192.168.1.1\tmyhost.domain.tld myhost\n")
858
    self.assertFileMode(self.tmpname, 0644)
859

    
860
  def testSettingDuplicateName(self):
861
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
862

    
863
    self.assertFileContent(self.tmpname,
864
      "# This is a test file for /etc/hosts\n"
865
      "127.0.0.1\tlocalhost\n"
866
      "192.168.1.1 router gw\n"
867
      "1.2.3.4\tmyhost\n")
868
    self.assertFileMode(self.tmpname, 0644)
869

    
870
  def testRemovingExistingHost(self):
871
    RemoveEtcHostsEntry(self.tmpname, 'router')
872

    
873
    self.assertFileContent(self.tmpname,
874
      "# This is a test file for /etc/hosts\n"
875
      "127.0.0.1\tlocalhost\n"
876
      "192.168.1.1 gw\n")
877
    self.assertFileMode(self.tmpname, 0644)
878

    
879
  def testRemovingSingleExistingHost(self):
880
    RemoveEtcHostsEntry(self.tmpname, 'localhost')
881

    
882
    self.assertFileContent(self.tmpname,
883
      "# This is a test file for /etc/hosts\n"
884
      "192.168.1.1 router gw\n")
885
    self.assertFileMode(self.tmpname, 0644)
886

    
887
  def testRemovingNonExistingHost(self):
888
    RemoveEtcHostsEntry(self.tmpname, 'myhost')
889

    
890
    self.assertFileContent(self.tmpname,
891
      "# This is a test file for /etc/hosts\n"
892
      "127.0.0.1\tlocalhost\n"
893
      "192.168.1.1 router gw\n")
894
    self.assertFileMode(self.tmpname, 0644)
895

    
896
  def testRemovingAlias(self):
897
    RemoveEtcHostsEntry(self.tmpname, 'gw')
898

    
899
    self.assertFileContent(self.tmpname,
900
      "# This is a test file for /etc/hosts\n"
901
      "127.0.0.1\tlocalhost\n"
902
      "192.168.1.1 router\n")
903
    self.assertFileMode(self.tmpname, 0644)
904

    
905

    
906
class TestShellQuoting(unittest.TestCase):
907
  """Test case for shell quoting functions"""
908

    
909
  def testShellQuote(self):
910
    self.assertEqual(ShellQuote('abc'), "abc")
911
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
912
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
913
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
914
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
915

    
916
  def testShellQuoteArgs(self):
917
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
918
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
919
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
920

    
921

    
922
class TestTcpPing(unittest.TestCase):
923
  """Testcase for TCP version of ping - against listen(2)ing port"""
924

    
925
  def setUp(self):
926
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
927
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
928
    self.listenerport = self.listener.getsockname()[1]
929
    self.listener.listen(1)
930

    
931
  def tearDown(self):
932
    self.listener.shutdown(socket.SHUT_RDWR)
933
    del self.listener
934
    del self.listenerport
935

    
936
  def testTcpPingToLocalHostAccept(self):
937
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
938
                         self.listenerport,
939
                         timeout=10,
940
                         live_port_needed=True,
941
                         source=constants.LOCALHOST_IP_ADDRESS,
942
                         ),
943
                 "failed to connect to test listener")
944

    
945
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
946
                         self.listenerport,
947
                         timeout=10,
948
                         live_port_needed=True,
949
                         ),
950
                 "failed to connect to test listener (no source)")
951

    
952

    
953
class TestTcpPingDeaf(unittest.TestCase):
954
  """Testcase for TCP version of ping - against non listen(2)ing port"""
955

    
956
  def setUp(self):
957
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
958
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
959
    self.deaflistenerport = self.deaflistener.getsockname()[1]
960

    
961
  def tearDown(self):
962
    del self.deaflistener
963
    del self.deaflistenerport
964

    
965
  def testTcpPingToLocalHostAcceptDeaf(self):
966
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
967
                        self.deaflistenerport,
968
                        timeout=constants.TCP_PING_TIMEOUT,
969
                        live_port_needed=True,
970
                        source=constants.LOCALHOST_IP_ADDRESS,
971
                        ), # need successful connect(2)
972
                "successfully connected to deaf listener")
973

    
974
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
975
                        self.deaflistenerport,
976
                        timeout=constants.TCP_PING_TIMEOUT,
977
                        live_port_needed=True,
978
                        ), # need successful connect(2)
979
                "successfully connected to deaf listener (no source addr)")
980

    
981
  def testTcpPingToLocalHostNoAccept(self):
982
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
983
                         self.deaflistenerport,
984
                         timeout=constants.TCP_PING_TIMEOUT,
985
                         live_port_needed=False,
986
                         source=constants.LOCALHOST_IP_ADDRESS,
987
                         ), # ECONNREFUSED is OK
988
                 "failed to ping alive host on deaf port")
989

    
990
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
991
                         self.deaflistenerport,
992
                         timeout=constants.TCP_PING_TIMEOUT,
993
                         live_port_needed=False,
994
                         ), # ECONNREFUSED is OK
995
                 "failed to ping alive host on deaf port (no source addr)")
996

    
997

    
998
class TestOwnIpAddress(unittest.TestCase):
999
  """Testcase for OwnIpAddress"""
1000

    
1001
  def testOwnLoopback(self):
1002
    """check having the loopback ip"""
1003
    self.failUnless(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS),
1004
                    "Should own the loopback address")
1005

    
1006
  def testNowOwnAddress(self):
1007
    """check that I don't own an address"""
1008

    
1009
    # network 192.0.2.0/24 is reserved for test/documentation as per
1010
    # rfc 3330, so we *should* not have an address of this range... if
1011
    # this fails, we should extend the test to multiple addresses
1012
    DST_IP = "192.0.2.1"
1013
    self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP)
1014

    
1015

    
1016
def _GetSocketCredentials(path):
1017
  """Connect to a Unix socket and return remote credentials.
1018

1019
  """
1020
  sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1021
  try:
1022
    sock.settimeout(10)
1023
    sock.connect(path)
1024
    return utils.GetSocketCredentials(sock)
1025
  finally:
1026
    sock.close()
1027

    
1028

    
1029
class TestGetSocketCredentials(unittest.TestCase):
1030
  def setUp(self):
1031
    self.tmpdir = tempfile.mkdtemp()
1032
    self.sockpath = utils.PathJoin(self.tmpdir, "sock")
1033

    
1034
    self.listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1035
    self.listener.settimeout(10)
1036
    self.listener.bind(self.sockpath)
1037
    self.listener.listen(1)
1038

    
1039
  def tearDown(self):
1040
    self.listener.shutdown(socket.SHUT_RDWR)
1041
    self.listener.close()
1042
    shutil.rmtree(self.tmpdir)
1043

    
1044
  def test(self):
1045
    (c2pr, c2pw) = os.pipe()
1046

    
1047
    # Start child process
1048
    child = os.fork()
1049
    if child == 0:
1050
      try:
1051
        data = serializer.DumpJson(_GetSocketCredentials(self.sockpath))
1052

    
1053
        os.write(c2pw, data)
1054
        os.close(c2pw)
1055

    
1056
        os._exit(0)
1057
      finally:
1058
        os._exit(1)
1059

    
1060
    os.close(c2pw)
1061

    
1062
    # Wait for one connection
1063
    (conn, _) = self.listener.accept()
1064
    conn.recv(1)
1065
    conn.close()
1066

    
1067
    # Wait for result
1068
    result = os.read(c2pr, 4096)
1069
    os.close(c2pr)
1070

    
1071
    # Check child's exit code
1072
    (_, status) = os.waitpid(child, 0)
1073
    self.assertFalse(os.WIFSIGNALED(status))
1074
    self.assertEqual(os.WEXITSTATUS(status), 0)
1075

    
1076
    # Check result
1077
    (pid, uid, gid) = serializer.LoadJson(result)
1078
    self.assertEqual(pid, os.getpid())
1079
    self.assertEqual(uid, os.getuid())
1080
    self.assertEqual(gid, os.getgid())
1081

    
1082

    
1083
class TestListVisibleFiles(unittest.TestCase):
1084
  """Test case for ListVisibleFiles"""
1085

    
1086
  def setUp(self):
1087
    self.path = tempfile.mkdtemp()
1088

    
1089
  def tearDown(self):
1090
    shutil.rmtree(self.path)
1091

    
1092
  def _test(self, files, expected):
1093
    # Sort a copy
1094
    expected = expected[:]
1095
    expected.sort()
1096

    
1097
    for name in files:
1098
      f = open(os.path.join(self.path, name), 'w')
1099
      try:
1100
        f.write("Test\n")
1101
      finally:
1102
        f.close()
1103

    
1104
    found = ListVisibleFiles(self.path)
1105
    found.sort()
1106

    
1107
    self.assertEqual(found, expected)
1108

    
1109
  def testAllVisible(self):
1110
    files = ["a", "b", "c"]
1111
    expected = files
1112
    self._test(files, expected)
1113

    
1114
  def testNoneVisible(self):
1115
    files = [".a", ".b", ".c"]
1116
    expected = []
1117
    self._test(files, expected)
1118

    
1119
  def testSomeVisible(self):
1120
    files = ["a", "b", ".c"]
1121
    expected = ["a", "b"]
1122
    self._test(files, expected)
1123

    
1124
  def testNonAbsolutePath(self):
1125
    self.failUnlessRaises(errors.ProgrammerError, ListVisibleFiles, "abc")
1126

    
1127
  def testNonNormalizedPath(self):
1128
    self.failUnlessRaises(errors.ProgrammerError, ListVisibleFiles,
1129
                          "/bin/../tmp")
1130

    
1131

    
1132
class TestNewUUID(unittest.TestCase):
1133
  """Test case for NewUUID"""
1134

    
1135
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
1136
                        '[a-f0-9]{4}-[a-f0-9]{12}$')
1137

    
1138
  def runTest(self):
1139
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
1140

    
1141

    
1142
class TestUniqueSequence(unittest.TestCase):
1143
  """Test case for UniqueSequence"""
1144

    
1145
  def _test(self, input, expected):
1146
    self.assertEqual(utils.UniqueSequence(input), expected)
1147

    
1148
  def runTest(self):
1149
    # Ordered input
1150
    self._test([1, 2, 3], [1, 2, 3])
1151
    self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
1152
    self._test([1, 2, 2, 3], [1, 2, 3])
1153
    self._test([1, 2, 3, 3], [1, 2, 3])
1154

    
1155
    # Unordered input
1156
    self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
1157
    self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
1158

    
1159
    # Strings
1160
    self._test(["a", "a"], ["a"])
1161
    self._test(["a", "b"], ["a", "b"])
1162
    self._test(["a", "b", "a"], ["a", "b"])
1163

    
1164

    
1165
class TestFirstFree(unittest.TestCase):
1166
  """Test case for the FirstFree function"""
1167

    
1168
  def test(self):
1169
    """Test FirstFree"""
1170
    self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
1171
    self.failUnlessEqual(FirstFree([]), None)
1172
    self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
1173
    self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
1174
    self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
1175

    
1176

    
1177
class TestTailFile(testutils.GanetiTestCase):
1178
  """Test case for the TailFile function"""
1179

    
1180
  def testEmpty(self):
1181
    fname = self._CreateTempFile()
1182
    self.failUnlessEqual(TailFile(fname), [])
1183
    self.failUnlessEqual(TailFile(fname, lines=25), [])
1184

    
1185
  def testAllLines(self):
1186
    data = ["test %d" % i for i in range(30)]
1187
    for i in range(30):
1188
      fname = self._CreateTempFile()
1189
      fd = open(fname, "w")
1190
      fd.write("\n".join(data[:i]))
1191
      if i > 0:
1192
        fd.write("\n")
1193
      fd.close()
1194
      self.failUnlessEqual(TailFile(fname, lines=i), data[:i])
1195

    
1196
  def testPartialLines(self):
1197
    data = ["test %d" % i for i in range(30)]
1198
    fname = self._CreateTempFile()
1199
    fd = open(fname, "w")
1200
    fd.write("\n".join(data))
1201
    fd.write("\n")
1202
    fd.close()
1203
    for i in range(1, 30):
1204
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
1205

    
1206
  def testBigFile(self):
1207
    data = ["test %d" % i for i in range(30)]
1208
    fname = self._CreateTempFile()
1209
    fd = open(fname, "w")
1210
    fd.write("X" * 1048576)
1211
    fd.write("\n")
1212
    fd.write("\n".join(data))
1213
    fd.write("\n")
1214
    fd.close()
1215
    for i in range(1, 30):
1216
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
1217

    
1218

    
1219
class _BaseFileLockTest:
1220
  """Test case for the FileLock class"""
1221

    
1222
  def testSharedNonblocking(self):
1223
    self.lock.Shared(blocking=False)
1224
    self.lock.Close()
1225

    
1226
  def testExclusiveNonblocking(self):
1227
    self.lock.Exclusive(blocking=False)
1228
    self.lock.Close()
1229

    
1230
  def testUnlockNonblocking(self):
1231
    self.lock.Unlock(blocking=False)
1232
    self.lock.Close()
1233

    
1234
  def testSharedBlocking(self):
1235
    self.lock.Shared(blocking=True)
1236
    self.lock.Close()
1237

    
1238
  def testExclusiveBlocking(self):
1239
    self.lock.Exclusive(blocking=True)
1240
    self.lock.Close()
1241

    
1242
  def testUnlockBlocking(self):
1243
    self.lock.Unlock(blocking=True)
1244
    self.lock.Close()
1245

    
1246
  def testSharedExclusiveUnlock(self):
1247
    self.lock.Shared(blocking=False)
1248
    self.lock.Exclusive(blocking=False)
1249
    self.lock.Unlock(blocking=False)
1250
    self.lock.Close()
1251

    
1252
  def testExclusiveSharedUnlock(self):
1253
    self.lock.Exclusive(blocking=False)
1254
    self.lock.Shared(blocking=False)
1255
    self.lock.Unlock(blocking=False)
1256
    self.lock.Close()
1257

    
1258
  def testSimpleTimeout(self):
1259
    # These will succeed on the first attempt, hence a short timeout
1260
    self.lock.Shared(blocking=True, timeout=10.0)
1261
    self.lock.Exclusive(blocking=False, timeout=10.0)
1262
    self.lock.Unlock(blocking=True, timeout=10.0)
1263
    self.lock.Close()
1264

    
1265
  @staticmethod
1266
  def _TryLockInner(filename, shared, blocking):
1267
    lock = utils.FileLock.Open(filename)
1268

    
1269
    if shared:
1270
      fn = lock.Shared
1271
    else:
1272
      fn = lock.Exclusive
1273

    
1274
    try:
1275
      # The timeout doesn't really matter as the parent process waits for us to
1276
      # finish anyway.
1277
      fn(blocking=blocking, timeout=0.01)
1278
    except errors.LockError, err:
1279
      return False
1280

    
1281
    return True
1282

    
1283
  def _TryLock(self, *args):
1284
    return utils.RunInSeparateProcess(self._TryLockInner, self.tmpfile.name,
1285
                                      *args)
1286

    
1287
  def testTimeout(self):
1288
    for blocking in [True, False]:
1289
      self.lock.Exclusive(blocking=True)
1290
      self.failIf(self._TryLock(False, blocking))
1291
      self.failIf(self._TryLock(True, blocking))
1292

    
1293
      self.lock.Shared(blocking=True)
1294
      self.assert_(self._TryLock(True, blocking))
1295
      self.failIf(self._TryLock(False, blocking))
1296

    
1297
  def testCloseShared(self):
1298
    self.lock.Close()
1299
    self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
1300

    
1301
  def testCloseExclusive(self):
1302
    self.lock.Close()
1303
    self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
1304

    
1305
  def testCloseUnlock(self):
1306
    self.lock.Close()
1307
    self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
1308

    
1309

    
1310
class TestFileLockWithFilename(testutils.GanetiTestCase, _BaseFileLockTest):
1311
  TESTDATA = "Hello World\n" * 10
1312

    
1313
  def setUp(self):
1314
    testutils.GanetiTestCase.setUp(self)
1315

    
1316
    self.tmpfile = tempfile.NamedTemporaryFile()
1317
    utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
1318
    self.lock = utils.FileLock.Open(self.tmpfile.name)
1319

    
1320
    # Ensure "Open" didn't truncate file
1321
    self.assertFileContent(self.tmpfile.name, self.TESTDATA)
1322

    
1323
  def tearDown(self):
1324
    self.assertFileContent(self.tmpfile.name, self.TESTDATA)
1325

    
1326
    testutils.GanetiTestCase.tearDown(self)
1327

    
1328

    
1329
class TestFileLockWithFileObject(unittest.TestCase, _BaseFileLockTest):
1330
  def setUp(self):
1331
    self.tmpfile = tempfile.NamedTemporaryFile()
1332
    self.lock = utils.FileLock(open(self.tmpfile.name, "w"), self.tmpfile.name)
1333

    
1334

    
1335
class TestTimeFunctions(unittest.TestCase):
1336
  """Test case for time functions"""
1337

    
1338
  def runTest(self):
1339
    self.assertEqual(utils.SplitTime(1), (1, 0))
1340
    self.assertEqual(utils.SplitTime(1.5), (1, 500000))
1341
    self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
1342
    self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
1343
    self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
1344
    self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
1345
    self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
1346
    self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
1347

    
1348
    self.assertRaises(AssertionError, utils.SplitTime, -1)
1349

    
1350
    self.assertEqual(utils.MergeTime((1, 0)), 1.0)
1351
    self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
1352
    self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
1353

    
1354
    self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3),
1355
                     1218448917.481)
1356
    self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
1357

    
1358
    self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
1359
    self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
1360
    self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
1361
    self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
1362
    self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
1363

    
1364

    
1365
class FieldSetTestCase(unittest.TestCase):
1366
  """Test case for FieldSets"""
1367

    
1368
  def testSimpleMatch(self):
1369
    f = utils.FieldSet("a", "b", "c", "def")
1370
    self.failUnless(f.Matches("a"))
1371
    self.failIf(f.Matches("d"), "Substring matched")
1372
    self.failIf(f.Matches("defghi"), "Prefix string matched")
1373
    self.failIf(f.NonMatching(["b", "c"]))
1374
    self.failIf(f.NonMatching(["a", "b", "c", "def"]))
1375
    self.failUnless(f.NonMatching(["a", "d"]))
1376

    
1377
  def testRegexMatch(self):
1378
    f = utils.FieldSet("a", "b([0-9]+)", "c")
1379
    self.failUnless(f.Matches("b1"))
1380
    self.failUnless(f.Matches("b99"))
1381
    self.failIf(f.Matches("b/1"))
1382
    self.failIf(f.NonMatching(["b12", "c"]))
1383
    self.failUnless(f.NonMatching(["a", "1"]))
1384

    
1385
class TestForceDictType(unittest.TestCase):
1386
  """Test case for ForceDictType"""
1387

    
1388
  def setUp(self):
1389
    self.key_types = {
1390
      'a': constants.VTYPE_INT,
1391
      'b': constants.VTYPE_BOOL,
1392
      'c': constants.VTYPE_STRING,
1393
      'd': constants.VTYPE_SIZE,
1394
      }
1395

    
1396
  def _fdt(self, dict, allowed_values=None):
1397
    if allowed_values is None:
1398
      ForceDictType(dict, self.key_types)
1399
    else:
1400
      ForceDictType(dict, self.key_types, allowed_values=allowed_values)
1401

    
1402
    return dict
1403

    
1404
  def testSimpleDict(self):
1405
    self.assertEqual(self._fdt({}), {})
1406
    self.assertEqual(self._fdt({'a': 1}), {'a': 1})
1407
    self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
1408
    self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
1409
    self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
1410
    self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
1411
    self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
1412
    self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
1413
    self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
1414
    self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
1415
    self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
1416
    self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
1417

    
1418
  def testErrors(self):
1419
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
1420
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
1421
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
1422
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
1423

    
1424

    
1425
class TestIsNormAbsPath(unittest.TestCase):
1426
  """Testing case for IsNormAbsPath"""
1427

    
1428
  def _pathTestHelper(self, path, result):
1429
    if result:
1430
      self.assert_(IsNormAbsPath(path),
1431
          "Path %s should result absolute and normalized" % path)
1432
    else:
1433
      self.assert_(not IsNormAbsPath(path),
1434
          "Path %s should not result absolute and normalized" % path)
1435

    
1436
  def testBase(self):
1437
    self._pathTestHelper('/etc', True)
1438
    self._pathTestHelper('/srv', True)
1439
    self._pathTestHelper('etc', False)
1440
    self._pathTestHelper('/etc/../root', False)
1441
    self._pathTestHelper('/etc/', False)
1442

    
1443

    
1444
class TestSafeEncode(unittest.TestCase):
1445
  """Test case for SafeEncode"""
1446

    
1447
  def testAscii(self):
1448
    for txt in [string.digits, string.letters, string.punctuation]:
1449
      self.failUnlessEqual(txt, SafeEncode(txt))
1450

    
1451
  def testDoubleEncode(self):
1452
    for i in range(255):
1453
      txt = SafeEncode(chr(i))
1454
      self.failUnlessEqual(txt, SafeEncode(txt))
1455

    
1456
  def testUnicode(self):
1457
    # 1024 is high enough to catch non-direct ASCII mappings
1458
    for i in range(1024):
1459
      txt = SafeEncode(unichr(i))
1460
      self.failUnlessEqual(txt, SafeEncode(txt))
1461

    
1462

    
1463
class TestFormatTime(unittest.TestCase):
1464
  """Testing case for FormatTime"""
1465

    
1466
  def testNone(self):
1467
    self.failUnlessEqual(FormatTime(None), "N/A")
1468

    
1469
  def testInvalid(self):
1470
    self.failUnlessEqual(FormatTime(()), "N/A")
1471

    
1472
  def testNow(self):
1473
    # tests that we accept time.time input
1474
    FormatTime(time.time())
1475
    # tests that we accept int input
1476
    FormatTime(int(time.time()))
1477

    
1478

    
1479
class RunInSeparateProcess(unittest.TestCase):
1480
  def test(self):
1481
    for exp in [True, False]:
1482
      def _child():
1483
        return exp
1484

    
1485
      self.assertEqual(exp, utils.RunInSeparateProcess(_child))
1486

    
1487
  def testArgs(self):
1488
    for arg in [0, 1, 999, "Hello World", (1, 2, 3)]:
1489
      def _child(carg1, carg2):
1490
        return carg1 == "Foo" and carg2 == arg
1491

    
1492
      self.assert_(utils.RunInSeparateProcess(_child, "Foo", arg))
1493

    
1494
  def testPid(self):
1495
    parent_pid = os.getpid()
1496

    
1497
    def _check():
1498
      return os.getpid() == parent_pid
1499

    
1500
    self.failIf(utils.RunInSeparateProcess(_check))
1501

    
1502
  def testSignal(self):
1503
    def _kill():
1504
      os.kill(os.getpid(), signal.SIGTERM)
1505

    
1506
    self.assertRaises(errors.GenericError,
1507
                      utils.RunInSeparateProcess, _kill)
1508

    
1509
  def testException(self):
1510
    def _exc():
1511
      raise errors.GenericError("This is a test")
1512

    
1513
    self.assertRaises(errors.GenericError,
1514
                      utils.RunInSeparateProcess, _exc)
1515

    
1516

    
1517
class TestFingerprintFile(unittest.TestCase):
1518
  def setUp(self):
1519
    self.tmpfile = tempfile.NamedTemporaryFile()
1520

    
1521
  def test(self):
1522
    self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1523
                     "da39a3ee5e6b4b0d3255bfef95601890afd80709")
1524

    
1525
    utils.WriteFile(self.tmpfile.name, data="Hello World\n")
1526
    self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1527
                     "648a6a6ffffdaa0badb23b8baf90b6168dd16b3a")
1528

    
1529

    
1530
class TestUnescapeAndSplit(unittest.TestCase):
1531
  """Testing case for UnescapeAndSplit"""
1532

    
1533
  def setUp(self):
1534
    # testing more that one separator for regexp safety
1535
    self._seps = [",", "+", "."]
1536

    
1537
  def testSimple(self):
1538
    a = ["a", "b", "c", "d"]
1539
    for sep in self._seps:
1540
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), a)
1541

    
1542
  def testEscape(self):
1543
    for sep in self._seps:
1544
      a = ["a", "b\\" + sep + "c", "d"]
1545
      b = ["a", "b" + sep + "c", "d"]
1546
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1547

    
1548
  def testDoubleEscape(self):
1549
    for sep in self._seps:
1550
      a = ["a", "b\\\\", "c", "d"]
1551
      b = ["a", "b\\", "c", "d"]
1552
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1553

    
1554
  def testThreeEscape(self):
1555
    for sep in self._seps:
1556
      a = ["a", "b\\\\\\" + sep + "c", "d"]
1557
      b = ["a", "b\\" + sep + "c", "d"]
1558
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1559

    
1560

    
1561
class TestPathJoin(unittest.TestCase):
1562
  """Testing case for PathJoin"""
1563

    
1564
  def testBasicItems(self):
1565
    mlist = ["/a", "b", "c"]
1566
    self.failUnlessEqual(PathJoin(*mlist), "/".join(mlist))
1567

    
1568
  def testNonAbsPrefix(self):
1569
    self.failUnlessRaises(ValueError, PathJoin, "a", "b")
1570

    
1571
  def testBackTrack(self):
1572
    self.failUnlessRaises(ValueError, PathJoin, "/a", "b/../c")
1573

    
1574
  def testMultiAbs(self):
1575
    self.failUnlessRaises(ValueError, PathJoin, "/a", "/b")
1576

    
1577

    
1578
class TestHostInfo(unittest.TestCase):
1579
  """Testing case for HostInfo"""
1580

    
1581
  def testUppercase(self):
1582
    data = "AbC.example.com"
1583
    self.failUnlessEqual(HostInfo.NormalizeName(data), data.lower())
1584

    
1585
  def testTooLongName(self):
1586
    data = "a.b." + "c" * 255
1587
    self.failUnlessRaises(OpPrereqError, HostInfo.NormalizeName, data)
1588

    
1589
  def testTrailingDot(self):
1590
    data = "a.b.c"
1591
    self.failUnlessEqual(HostInfo.NormalizeName(data + "."), data)
1592

    
1593
  def testInvalidName(self):
1594
    data = [
1595
      "a b",
1596
      "a/b",
1597
      ".a.b",
1598
      "a..b",
1599
      ]
1600
    for value in data:
1601
      self.failUnlessRaises(OpPrereqError, HostInfo.NormalizeName, value)
1602

    
1603
  def testValidName(self):
1604
    data = [
1605
      "a.b",
1606
      "a-b",
1607
      "a_b",
1608
      "a.b.c",
1609
      ]
1610
    for value in data:
1611
      HostInfo.NormalizeName(value)
1612

    
1613

    
1614
class TestParseAsn1Generalizedtime(unittest.TestCase):
1615
  def test(self):
1616
    # UTC
1617
    self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000Z"), 0)
1618
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152Z"),
1619
                     1266860512)
1620
    self.assertEqual(utils._ParseAsn1Generalizedtime("20380119031407Z"),
1621
                     (2**31) - 1)
1622

    
1623
    # With offset
1624
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152+0000"),
1625
                     1266860512)
1626
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100223131652+0000"),
1627
                     1266931012)
1628
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100223051808-0800"),
1629
                     1266931088)
1630
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100224002135+1100"),
1631
                     1266931295)
1632
    self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000-0100"),
1633
                     3600)
1634

    
1635
    # Leap seconds are not supported by datetime.datetime
1636
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1637
                      "19841231235960+0000")
1638
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1639
                      "19920630235960+0000")
1640

    
1641
    # Errors
1642
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "")
1643
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "invalid")
1644
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1645
                      "20100222174152")
1646
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1647
                      "Mon Feb 22 17:47:02 UTC 2010")
1648
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1649
                      "2010-02-22 17:42:02")
1650

    
1651

    
1652
class TestGetX509CertValidity(testutils.GanetiTestCase):
1653
  def setUp(self):
1654
    testutils.GanetiTestCase.setUp(self)
1655

    
1656
    pyopenssl_version = distutils.version.LooseVersion(OpenSSL.__version__)
1657

    
1658
    # Test whether we have pyOpenSSL 0.7 or above
1659
    self.pyopenssl0_7 = (pyopenssl_version >= "0.7")
1660

    
1661
    if not self.pyopenssl0_7:
1662
      warnings.warn("This test requires pyOpenSSL 0.7 or above to"
1663
                    " function correctly")
1664

    
1665
  def _LoadCert(self, name):
1666
    return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1667
                                           self._ReadTestData(name))
1668

    
1669
  def test(self):
1670
    validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem"))
1671
    if self.pyopenssl0_7:
1672
      self.assertEqual(validity, (1266919967, 1267524767))
1673
    else:
1674
      self.assertEqual(validity, (None, None))
1675

    
1676

    
1677
class TestMakedirs(unittest.TestCase):
1678
  def setUp(self):
1679
    self.tmpdir = tempfile.mkdtemp()
1680

    
1681
  def tearDown(self):
1682
    shutil.rmtree(self.tmpdir)
1683

    
1684
  def testNonExisting(self):
1685
    path = utils.PathJoin(self.tmpdir, "foo")
1686
    utils.Makedirs(path)
1687
    self.assert_(os.path.isdir(path))
1688

    
1689
  def testExisting(self):
1690
    path = utils.PathJoin(self.tmpdir, "foo")
1691
    os.mkdir(path)
1692
    utils.Makedirs(path)
1693
    self.assert_(os.path.isdir(path))
1694

    
1695
  def testRecursiveNonExisting(self):
1696
    path = utils.PathJoin(self.tmpdir, "foo/bar/baz")
1697
    utils.Makedirs(path)
1698
    self.assert_(os.path.isdir(path))
1699

    
1700
  def testRecursiveExisting(self):
1701
    path = utils.PathJoin(self.tmpdir, "B/moo/xyz")
1702
    self.assert_(not os.path.exists(path))
1703
    os.mkdir(utils.PathJoin(self.tmpdir, "B"))
1704
    utils.Makedirs(path)
1705
    self.assert_(os.path.isdir(path))
1706

    
1707

    
1708
class TestRetry(testutils.GanetiTestCase):
1709
  def setUp(self):
1710
    testutils.GanetiTestCase.setUp(self)
1711
    self.retries = 0
1712

    
1713
  @staticmethod
1714
  def _RaiseRetryAgain():
1715
    raise utils.RetryAgain()
1716

    
1717
  @staticmethod
1718
  def _RaiseRetryAgainWithArg(args):
1719
    raise utils.RetryAgain(*args)
1720

    
1721
  def _WrongNestedLoop(self):
1722
    return utils.Retry(self._RaiseRetryAgain, 0.01, 0.02)
1723

    
1724
  def _RetryAndSucceed(self, retries):
1725
    if self.retries < retries:
1726
      self.retries += 1
1727
      raise utils.RetryAgain()
1728
    else:
1729
      return True
1730

    
1731
  def testRaiseTimeout(self):
1732
    self.failUnlessRaises(utils.RetryTimeout, utils.Retry,
1733
                          self._RaiseRetryAgain, 0.01, 0.02)
1734
    self.failUnlessRaises(utils.RetryTimeout, utils.Retry,
1735
                          self._RetryAndSucceed, 0.01, 0, args=[1])
1736
    self.failUnlessEqual(self.retries, 1)
1737

    
1738
  def testComplete(self):
1739
    self.failUnlessEqual(utils.Retry(lambda: True, 0, 1), True)
1740
    self.failUnlessEqual(utils.Retry(self._RetryAndSucceed, 0, 1, args=[2]),
1741
                         True)
1742
    self.failUnlessEqual(self.retries, 2)
1743

    
1744
  def testNestedLoop(self):
1745
    try:
1746
      self.failUnlessRaises(errors.ProgrammerError, utils.Retry,
1747
                            self._WrongNestedLoop, 0, 1)
1748
    except utils.RetryTimeout:
1749
      self.fail("Didn't detect inner loop's exception")
1750

    
1751
  def testTimeoutArgument(self):
1752
    retry_arg="my_important_debugging_message"
1753
    try:
1754
      utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02, args=[[retry_arg]])
1755
    except utils.RetryTimeout, err:
1756
      self.failUnlessEqual(err.args, (retry_arg, ))
1757
    else:
1758
      self.fail("Expected timeout didn't happen")
1759

    
1760
  def testRaiseInnerWithExc(self):
1761
    retry_arg="my_important_debugging_message"
1762
    try:
1763
      try:
1764
        utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02,
1765
                    args=[[errors.GenericError(retry_arg, retry_arg)]])
1766
      except utils.RetryTimeout, err:
1767
        err.RaiseInner()
1768
      else:
1769
        self.fail("Expected timeout didn't happen")
1770
    except errors.GenericError, err:
1771
      self.failUnlessEqual(err.args, (retry_arg, retry_arg))
1772
    else:
1773
      self.fail("Expected GenericError didn't happen")
1774

    
1775
  def testRaiseInnerWithMsg(self):
1776
    retry_arg="my_important_debugging_message"
1777
    try:
1778
      try:
1779
        utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02,
1780
                    args=[[retry_arg, retry_arg]])
1781
      except utils.RetryTimeout, err:
1782
        err.RaiseInner()
1783
      else:
1784
        self.fail("Expected timeout didn't happen")
1785
    except utils.RetryTimeout, err:
1786
      self.failUnlessEqual(err.args, (retry_arg, retry_arg))
1787
    else:
1788
      self.fail("Expected RetryTimeout didn't happen")
1789

    
1790

    
1791
class TestLineSplitter(unittest.TestCase):
1792
  def test(self):
1793
    lines = []
1794
    ls = utils.LineSplitter(lines.append)
1795
    ls.write("Hello World\n")
1796
    self.assertEqual(lines, [])
1797
    ls.write("Foo\n Bar\r\n ")
1798
    ls.write("Baz")
1799
    ls.write("Moo")
1800
    self.assertEqual(lines, [])
1801
    ls.flush()
1802
    self.assertEqual(lines, ["Hello World", "Foo", " Bar"])
1803
    ls.close()
1804
    self.assertEqual(lines, ["Hello World", "Foo", " Bar", " BazMoo"])
1805

    
1806
  def _testExtra(self, line, all_lines, p1, p2):
1807
    self.assertEqual(p1, 999)
1808
    self.assertEqual(p2, "extra")
1809
    all_lines.append(line)
1810

    
1811
  def testExtraArgsNoFlush(self):
1812
    lines = []
1813
    ls = utils.LineSplitter(self._testExtra, lines, 999, "extra")
1814
    ls.write("\n\nHello World\n")
1815
    ls.write("Foo\n Bar\r\n ")
1816
    ls.write("")
1817
    ls.write("Baz")
1818
    ls.write("Moo\n\nx\n")
1819
    self.assertEqual(lines, [])
1820
    ls.close()
1821
    self.assertEqual(lines, ["", "", "Hello World", "Foo", " Bar", " BazMoo",
1822
                             "", "x"])
1823

    
1824

    
1825
if __name__ == '__main__':
1826
  testutils.GanetiTestProgram()