Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ b774bb10

History | View | Annotate | Download (58.7 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import os
30
import stat
31
import signal
32
import socket
33
import shutil
34
import re
35
import select
36
import string
37
import OpenSSL
38
import warnings
39
import distutils.version
40
import glob
41
import md5
42

    
43
import ganeti
44
import testutils
45
from ganeti import constants
46
from ganeti import utils
47
from ganeti import errors
48
from ganeti import serializer
49
from ganeti.utils import IsProcessAlive, RunCmd, \
50
     RemoveFile, MatchNameComponent, FormatUnit, \
51
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
52
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
53
     SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \
54
     TailFile, ForceDictType, SafeEncode, IsNormAbsPath, FormatTime, \
55
     UnescapeAndSplit, RunParts, PathJoin, HostInfo
56

    
57
from ganeti.errors import LockError, UnitParseError, GenericError, \
58
     ProgrammerError, OpPrereqError
59

    
60

    
61
class TestIsProcessAlive(unittest.TestCase):
62
  """Testing case for IsProcessAlive"""
63

    
64
  def testExists(self):
65
    mypid = os.getpid()
66
    self.assert_(IsProcessAlive(mypid),
67
                 "can't find myself running")
68

    
69
  def testNotExisting(self):
70
    pid_non_existing = os.fork()
71
    if pid_non_existing == 0:
72
      os._exit(0)
73
    elif pid_non_existing < 0:
74
      raise SystemError("can't fork")
75
    os.waitpid(pid_non_existing, 0)
76
    self.assert_(not IsProcessAlive(pid_non_existing),
77
                 "nonexisting process detected")
78

    
79

    
80
class TestPidFileFunctions(unittest.TestCase):
81
  """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
82

    
83
  def setUp(self):
84
    self.dir = tempfile.mkdtemp()
85
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
86
    utils.DaemonPidFileName = self.f_dpn
87

    
88
  def testPidFileFunctions(self):
89
    pid_file = self.f_dpn('test')
90
    utils.WritePidFile('test')
91
    self.failUnless(os.path.exists(pid_file),
92
                    "PID file should have been created")
93
    read_pid = utils.ReadPidFile(pid_file)
94
    self.failUnlessEqual(read_pid, os.getpid())
95
    self.failUnless(utils.IsProcessAlive(read_pid))
96
    self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
97
    utils.RemovePidFile('test')
98
    self.failIf(os.path.exists(pid_file),
99
                "PID file should not exist anymore")
100
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
101
                         "ReadPidFile should return 0 for missing pid file")
102
    fh = open(pid_file, "w")
103
    fh.write("blah\n")
104
    fh.close()
105
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
106
                         "ReadPidFile should return 0 for invalid pid file")
107
    utils.RemovePidFile('test')
108
    self.failIf(os.path.exists(pid_file),
109
                "PID file should not exist anymore")
110

    
111
  def testKill(self):
112
    pid_file = self.f_dpn('child')
113
    r_fd, w_fd = os.pipe()
114
    new_pid = os.fork()
115
    if new_pid == 0: #child
116
      utils.WritePidFile('child')
117
      os.write(w_fd, 'a')
118
      signal.pause()
119
      os._exit(0)
120
      return
121
    # else we are in the parent
122
    # wait until the child has written the pid file
123
    os.read(r_fd, 1)
124
    read_pid = utils.ReadPidFile(pid_file)
125
    self.failUnlessEqual(read_pid, new_pid)
126
    self.failUnless(utils.IsProcessAlive(new_pid))
127
    utils.KillProcess(new_pid, waitpid=True)
128
    self.failIf(utils.IsProcessAlive(new_pid))
129
    utils.RemovePidFile('child')
130
    self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
131

    
132
  def tearDown(self):
133
    for name in os.listdir(self.dir):
134
      os.unlink(os.path.join(self.dir, name))
135
    os.rmdir(self.dir)
136

    
137

    
138
class TestRunCmd(testutils.GanetiTestCase):
139
  """Testing case for the RunCmd function"""
140

    
141
  def setUp(self):
142
    testutils.GanetiTestCase.setUp(self)
143
    self.magic = time.ctime() + " ganeti test"
144
    self.fname = self._CreateTempFile()
145

    
146
  def testOk(self):
147
    """Test successful exit code"""
148
    result = RunCmd("/bin/sh -c 'exit 0'")
149
    self.assertEqual(result.exit_code, 0)
150
    self.assertEqual(result.output, "")
151

    
152
  def testFail(self):
153
    """Test fail exit code"""
154
    result = RunCmd("/bin/sh -c 'exit 1'")
155
    self.assertEqual(result.exit_code, 1)
156
    self.assertEqual(result.output, "")
157

    
158
  def testStdout(self):
159
    """Test standard output"""
160
    cmd = 'echo -n "%s"' % self.magic
161
    result = RunCmd("/bin/sh -c '%s'" % cmd)
162
    self.assertEqual(result.stdout, self.magic)
163
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
164
    self.assertEqual(result.output, "")
165
    self.assertFileContent(self.fname, self.magic)
166

    
167
  def testStderr(self):
168
    """Test standard error"""
169
    cmd = 'echo -n "%s"' % self.magic
170
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
171
    self.assertEqual(result.stderr, self.magic)
172
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
173
    self.assertEqual(result.output, "")
174
    self.assertFileContent(self.fname, self.magic)
175

    
176
  def testCombined(self):
177
    """Test combined output"""
178
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
179
    expected = "A" + self.magic + "B" + self.magic
180
    result = RunCmd("/bin/sh -c '%s'" % cmd)
181
    self.assertEqual(result.output, expected)
182
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
183
    self.assertEqual(result.output, "")
184
    self.assertFileContent(self.fname, expected)
185

    
186
  def testSignal(self):
187
    """Test signal"""
188
    result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
189
    self.assertEqual(result.signal, 15)
190
    self.assertEqual(result.output, "")
191

    
192
  def testListRun(self):
193
    """Test list runs"""
194
    result = RunCmd(["true"])
195
    self.assertEqual(result.signal, None)
196
    self.assertEqual(result.exit_code, 0)
197
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
198
    self.assertEqual(result.signal, None)
199
    self.assertEqual(result.exit_code, 1)
200
    result = RunCmd(["echo", "-n", self.magic])
201
    self.assertEqual(result.signal, None)
202
    self.assertEqual(result.exit_code, 0)
203
    self.assertEqual(result.stdout, self.magic)
204

    
205
  def testFileEmptyOutput(self):
206
    """Test file output"""
207
    result = RunCmd(["true"], output=self.fname)
208
    self.assertEqual(result.signal, None)
209
    self.assertEqual(result.exit_code, 0)
210
    self.assertFileContent(self.fname, "")
211

    
212
  def testLang(self):
213
    """Test locale environment"""
214
    old_env = os.environ.copy()
215
    try:
216
      os.environ["LANG"] = "en_US.UTF-8"
217
      os.environ["LC_ALL"] = "en_US.UTF-8"
218
      result = RunCmd(["locale"])
219
      for line in result.output.splitlines():
220
        key, value = line.split("=", 1)
221
        # Ignore these variables, they're overridden by LC_ALL
222
        if key == "LANG" or key == "LANGUAGE":
223
          continue
224
        self.failIf(value and value != "C" and value != '"C"',
225
            "Variable %s is set to the invalid value '%s'" % (key, value))
226
    finally:
227
      os.environ = old_env
228

    
229
  def testDefaultCwd(self):
230
    """Test default working directory"""
231
    self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
232

    
233
  def testCwd(self):
234
    """Test default working directory"""
235
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
236
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
237
    cwd = os.getcwd()
238
    self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
239

    
240
  def testResetEnv(self):
241
    """Test environment reset functionality"""
242
    self.failUnlessEqual(RunCmd(["env"], reset_env=True).stdout.strip(), "")
243
    self.failUnlessEqual(RunCmd(["env"], reset_env=True,
244
                                env={"FOO": "bar",}).stdout.strip(), "FOO=bar")
245

    
246

    
247
class TestRunParts(unittest.TestCase):
248
  """Testing case for the RunParts function"""
249

    
250
  def setUp(self):
251
    self.rundir = tempfile.mkdtemp(prefix="ganeti-test", suffix=".tmp")
252

    
253
  def tearDown(self):
254
    shutil.rmtree(self.rundir)
255

    
256
  def testEmpty(self):
257
    """Test on an empty dir"""
258
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True), [])
259

    
260
  def testSkipWrongName(self):
261
    """Test that wrong files are skipped"""
262
    fname = os.path.join(self.rundir, "00test.dot")
263
    utils.WriteFile(fname, data="")
264
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
265
    relname = os.path.basename(fname)
266
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
267
                         [(relname, constants.RUNPARTS_SKIP, None)])
268

    
269
  def testSkipNonExec(self):
270
    """Test that non executable files are skipped"""
271
    fname = os.path.join(self.rundir, "00test")
272
    utils.WriteFile(fname, data="")
273
    relname = os.path.basename(fname)
274
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
275
                         [(relname, constants.RUNPARTS_SKIP, None)])
276

    
277
  def testError(self):
278
    """Test error on a broken executable"""
279
    fname = os.path.join(self.rundir, "00test")
280
    utils.WriteFile(fname, data="")
281
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
282
    (relname, status, error) = RunParts(self.rundir, reset_env=True)[0]
283
    self.failUnlessEqual(relname, os.path.basename(fname))
284
    self.failUnlessEqual(status, constants.RUNPARTS_ERR)
285
    self.failUnless(error)
286

    
287
  def testSorted(self):
288
    """Test executions are sorted"""
289
    files = []
290
    files.append(os.path.join(self.rundir, "64test"))
291
    files.append(os.path.join(self.rundir, "00test"))
292
    files.append(os.path.join(self.rundir, "42test"))
293

    
294
    for fname in files:
295
      utils.WriteFile(fname, data="")
296

    
297
    results = RunParts(self.rundir, reset_env=True)
298

    
299
    for fname in sorted(files):
300
      self.failUnlessEqual(os.path.basename(fname), results.pop(0)[0])
301

    
302
  def testOk(self):
303
    """Test correct execution"""
304
    fname = os.path.join(self.rundir, "00test")
305
    utils.WriteFile(fname, data="#!/bin/sh\n\necho -n ciao")
306
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
307
    (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
308
    self.failUnlessEqual(relname, os.path.basename(fname))
309
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
310
    self.failUnlessEqual(runresult.stdout, "ciao")
311

    
312
  def testRunFail(self):
313
    """Test correct execution, with run failure"""
314
    fname = os.path.join(self.rundir, "00test")
315
    utils.WriteFile(fname, data="#!/bin/sh\n\nexit 1")
316
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
317
    (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
318
    self.failUnlessEqual(relname, os.path.basename(fname))
319
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
320
    self.failUnlessEqual(runresult.exit_code, 1)
321
    self.failUnless(runresult.failed)
322

    
323
  def testRunMix(self):
324
    files = []
325
    files.append(os.path.join(self.rundir, "00test"))
326
    files.append(os.path.join(self.rundir, "42test"))
327
    files.append(os.path.join(self.rundir, "64test"))
328
    files.append(os.path.join(self.rundir, "99test"))
329

    
330
    files.sort()
331

    
332
    # 1st has errors in execution
333
    utils.WriteFile(files[0], data="#!/bin/sh\n\nexit 1")
334
    os.chmod(files[0], stat.S_IREAD | stat.S_IEXEC)
335

    
336
    # 2nd is skipped
337
    utils.WriteFile(files[1], data="")
338

    
339
    # 3rd cannot execute properly
340
    utils.WriteFile(files[2], data="")
341
    os.chmod(files[2], stat.S_IREAD | stat.S_IEXEC)
342

    
343
    # 4th execs
344
    utils.WriteFile(files[3], data="#!/bin/sh\n\necho -n ciao")
345
    os.chmod(files[3], stat.S_IREAD | stat.S_IEXEC)
346

    
347
    results = RunParts(self.rundir, reset_env=True)
348

    
349
    (relname, status, runresult) = results[0]
350
    self.failUnlessEqual(relname, os.path.basename(files[0]))
351
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
352
    self.failUnlessEqual(runresult.exit_code, 1)
353
    self.failUnless(runresult.failed)
354

    
355
    (relname, status, runresult) = results[1]
356
    self.failUnlessEqual(relname, os.path.basename(files[1]))
357
    self.failUnlessEqual(status, constants.RUNPARTS_SKIP)
358
    self.failUnlessEqual(runresult, None)
359

    
360
    (relname, status, runresult) = results[2]
361
    self.failUnlessEqual(relname, os.path.basename(files[2]))
362
    self.failUnlessEqual(status, constants.RUNPARTS_ERR)
363
    self.failUnless(runresult)
364

    
365
    (relname, status, runresult) = results[3]
366
    self.failUnlessEqual(relname, os.path.basename(files[3]))
367
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
368
    self.failUnlessEqual(runresult.output, "ciao")
369
    self.failUnlessEqual(runresult.exit_code, 0)
370
    self.failUnless(not runresult.failed)
371

    
372

    
373
class TestRemoveFile(unittest.TestCase):
374
  """Test case for the RemoveFile function"""
375

    
376
  def setUp(self):
377
    """Create a temp dir and file for each case"""
378
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
379
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
380
    os.close(fd)
381

    
382
  def tearDown(self):
383
    if os.path.exists(self.tmpfile):
384
      os.unlink(self.tmpfile)
385
    os.rmdir(self.tmpdir)
386

    
387

    
388
  def testIgnoreDirs(self):
389
    """Test that RemoveFile() ignores directories"""
390
    self.assertEqual(None, RemoveFile(self.tmpdir))
391

    
392

    
393
  def testIgnoreNotExisting(self):
394
    """Test that RemoveFile() ignores non-existing files"""
395
    RemoveFile(self.tmpfile)
396
    RemoveFile(self.tmpfile)
397

    
398

    
399
  def testRemoveFile(self):
400
    """Test that RemoveFile does remove a file"""
401
    RemoveFile(self.tmpfile)
402
    if os.path.exists(self.tmpfile):
403
      self.fail("File '%s' not removed" % self.tmpfile)
404

    
405

    
406
  def testRemoveSymlink(self):
407
    """Test that RemoveFile does remove symlinks"""
408
    symlink = self.tmpdir + "/symlink"
409
    os.symlink("no-such-file", symlink)
410
    RemoveFile(symlink)
411
    if os.path.exists(symlink):
412
      self.fail("File '%s' not removed" % symlink)
413
    os.symlink(self.tmpfile, symlink)
414
    RemoveFile(symlink)
415
    if os.path.exists(symlink):
416
      self.fail("File '%s' not removed" % symlink)
417

    
418

    
419
class TestRename(unittest.TestCase):
420
  """Test case for RenameFile"""
421

    
422
  def setUp(self):
423
    """Create a temporary directory"""
424
    self.tmpdir = tempfile.mkdtemp()
425
    self.tmpfile = os.path.join(self.tmpdir, "test1")
426

    
427
    # Touch the file
428
    open(self.tmpfile, "w").close()
429

    
430
  def tearDown(self):
431
    """Remove temporary directory"""
432
    shutil.rmtree(self.tmpdir)
433

    
434
  def testSimpleRename1(self):
435
    """Simple rename 1"""
436
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
437
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
438

    
439
  def testSimpleRename2(self):
440
    """Simple rename 2"""
441
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
442
                     mkdir=True)
443
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
444

    
445
  def testRenameMkdir(self):
446
    """Rename with mkdir"""
447
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
448
                     mkdir=True)
449
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
450
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
451

    
452
    utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"),
453
                     os.path.join(self.tmpdir, "test/foo/bar/baz"),
454
                     mkdir=True)
455
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
456
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar")))
457
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz")))
458

    
459

    
460
class TestMatchNameComponent(unittest.TestCase):
461
  """Test case for the MatchNameComponent function"""
462

    
463
  def testEmptyList(self):
464
    """Test that there is no match against an empty list"""
465

    
466
    self.failUnlessEqual(MatchNameComponent("", []), None)
467
    self.failUnlessEqual(MatchNameComponent("test", []), None)
468

    
469
  def testSingleMatch(self):
470
    """Test that a single match is performed correctly"""
471
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
472
    for key in "test2", "test2.example", "test2.example.com":
473
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
474

    
475
  def testMultipleMatches(self):
476
    """Test that a multiple match is returned as None"""
477
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
478
    for key in "test1", "test1.example":
479
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
480

    
481
  def testFullMatch(self):
482
    """Test that a full match is returned correctly"""
483
    key1 = "test1"
484
    key2 = "test1.example"
485
    mlist = [key2, key2 + ".com"]
486
    self.failUnlessEqual(MatchNameComponent(key1, mlist), None)
487
    self.failUnlessEqual(MatchNameComponent(key2, mlist), key2)
488

    
489
  def testCaseInsensitivePartialMatch(self):
490
    """Test for the case_insensitive keyword"""
491
    mlist = ["test1.example.com", "test2.example.net"]
492
    self.assertEqual(MatchNameComponent("test2", mlist, case_sensitive=False),
493
                     "test2.example.net")
494
    self.assertEqual(MatchNameComponent("Test2", mlist, case_sensitive=False),
495
                     "test2.example.net")
496
    self.assertEqual(MatchNameComponent("teSt2", mlist, case_sensitive=False),
497
                     "test2.example.net")
498
    self.assertEqual(MatchNameComponent("TeSt2", mlist, case_sensitive=False),
499
                     "test2.example.net")
500

    
501

    
502
  def testCaseInsensitiveFullMatch(self):
503
    mlist = ["ts1.ex", "ts1.ex.org", "ts2.ex", "Ts2.ex"]
504
    # Between the two ts1 a full string match non-case insensitive should work
505
    self.assertEqual(MatchNameComponent("Ts1", mlist, case_sensitive=False),
506
                     None)
507
    self.assertEqual(MatchNameComponent("Ts1.ex", mlist, case_sensitive=False),
508
                     "ts1.ex")
509
    self.assertEqual(MatchNameComponent("ts1.ex", mlist, case_sensitive=False),
510
                     "ts1.ex")
511
    # Between the two ts2 only case differs, so only case-match works
512
    self.assertEqual(MatchNameComponent("ts2.ex", mlist, case_sensitive=False),
513
                     "ts2.ex")
514
    self.assertEqual(MatchNameComponent("Ts2.ex", mlist, case_sensitive=False),
515
                     "Ts2.ex")
516
    self.assertEqual(MatchNameComponent("TS2.ex", mlist, case_sensitive=False),
517
                     None)
518

    
519

    
520
class TestReadFile(testutils.GanetiTestCase):
521
  def setUp(self):
522
    testutils.GanetiTestCase.setUp(self)
523

    
524
    self.tmpdir = tempfile.mkdtemp()
525
    self.fname = utils.PathJoin(self.tmpdir, "data1")
526

    
527
  def tearDown(self):
528
    testutils.GanetiTestCase.tearDown(self)
529

    
530
    shutil.rmtree(self.tmpdir)
531

    
532
  def testReadAll(self):
533
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"))
534
    self.assertEqual(len(data), 814)
535

    
536
    h = md5.new()
537
    h.update(data)
538
    self.assertEqual(h.hexdigest(), "a491efb3efe56a0535f924d5f8680fd4")
539

    
540
  def testReadSize(self):
541
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"),
542
                          size=100)
543
    self.assertEqual(len(data), 100)
544

    
545
    h = md5.new()
546
    h.update(data)
547
    self.assertEqual(h.hexdigest(), "893772354e4e690b9efd073eed433ce7")
548

    
549
  def testReadOneline(self):
550
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"),
551
                          oneline=True)
552
    self.assertEqual(len(data), 27)
553
    self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
554

    
555
  def testReadOnelineSize(self):
556
    dummydata = (1024 * "Hello World! ")
557
    self.assertFalse(set("\r\n") & set(dummydata))
558

    
559
    utils.WriteFile(self.fname, data=dummydata)
560

    
561
    data = utils.ReadFile(self.fname, oneline=True, size=555)
562
    self.assertEqual(len(data), 555)
563
    self.assertEqual(data, dummydata[:555])
564
    self.assertFalse(set("\r\n") & set(data))
565

    
566
  def testReadOnelineSize2(self):
567
    for end in ["\n", "\r\n"]:
568
      dummydata = (1024 * ("Hello World%s" % end))
569
      self.assert_(set("\r\n") & set(dummydata))
570

    
571
      utils.WriteFile(self.fname, data=dummydata)
572

    
573
      data = utils.ReadFile(self.fname, oneline=True, size=555)
574
      self.assertEqual(len(data), len("Hello World"))
575
      self.assertEqual(data, dummydata[:11])
576
      self.assertFalse(set("\r\n") & set(data))
577

    
578
  def testReadOnelineWhitespace(self):
579
    for ws in [" ", "\t", "\t\t  \t", "\t "]:
580
      dummydata = (1024 * ("Foo bar baz %s\n" % ws))
581
      self.assert_(set("\r\n") & set(dummydata))
582

    
583
      utils.WriteFile(self.fname, data=dummydata)
584

    
585
      data = utils.ReadFile(self.fname, oneline=True, size=555)
586
      explen = len("Foo bar baz ") + len(ws)
587
      self.assertEqual(len(data), explen)
588
      self.assertEqual(data, dummydata[:explen])
589
      self.assertFalse(set("\r\n") & set(data))
590

    
591
  def testError(self):
592
    self.assertRaises(EnvironmentError, utils.ReadFile,
593
                      utils.PathJoin(self.tmpdir, "does-not-exist"))
594

    
595

    
596
class TestTimestampForFilename(unittest.TestCase):
597
  def test(self):
598
    self.assert_("." not in utils.TimestampForFilename())
599
    self.assert_(":" not in utils.TimestampForFilename())
600

    
601

    
602
class TestCreateBackup(testutils.GanetiTestCase):
603
  def setUp(self):
604
    testutils.GanetiTestCase.setUp(self)
605

    
606
    self.tmpdir = tempfile.mkdtemp()
607

    
608
  def tearDown(self):
609
    testutils.GanetiTestCase.tearDown(self)
610

    
611
    shutil.rmtree(self.tmpdir)
612

    
613
  def testEmpty(self):
614
    filename = utils.PathJoin(self.tmpdir, "config.data")
615
    utils.WriteFile(filename, data="")
616
    bname = utils.CreateBackup(filename)
617
    self.assertFileContent(bname, "")
618
    self.assertEqual(len(glob.glob("%s*" % filename)), 2)
619
    utils.CreateBackup(filename)
620
    self.assertEqual(len(glob.glob("%s*" % filename)), 3)
621
    utils.CreateBackup(filename)
622
    self.assertEqual(len(glob.glob("%s*" % filename)), 4)
623

    
624
    fifoname = utils.PathJoin(self.tmpdir, "fifo")
625
    os.mkfifo(fifoname)
626
    self.assertRaises(errors.ProgrammerError, utils.CreateBackup, fifoname)
627

    
628
  def testContent(self):
629
    bkpcount = 0
630
    for data in ["", "X", "Hello World!\n" * 100, "Binary data\0\x01\x02\n"]:
631
      for rep in [1, 2, 10, 127]:
632
        testdata = data * rep
633

    
634
        filename = utils.PathJoin(self.tmpdir, "test.data_")
635
        utils.WriteFile(filename, data=testdata)
636
        self.assertFileContent(filename, testdata)
637

    
638
        for _ in range(3):
639
          bname = utils.CreateBackup(filename)
640
          bkpcount += 1
641
          self.assertFileContent(bname, testdata)
642
          self.assertEqual(len(glob.glob("%s*" % filename)), 1 + bkpcount)
643

    
644

    
645
class TestFormatUnit(unittest.TestCase):
646
  """Test case for the FormatUnit function"""
647

    
648
  def testMiB(self):
649
    self.assertEqual(FormatUnit(1, 'h'), '1M')
650
    self.assertEqual(FormatUnit(100, 'h'), '100M')
651
    self.assertEqual(FormatUnit(1023, 'h'), '1023M')
652

    
653
    self.assertEqual(FormatUnit(1, 'm'), '1')
654
    self.assertEqual(FormatUnit(100, 'm'), '100')
655
    self.assertEqual(FormatUnit(1023, 'm'), '1023')
656

    
657
    self.assertEqual(FormatUnit(1024, 'm'), '1024')
658
    self.assertEqual(FormatUnit(1536, 'm'), '1536')
659
    self.assertEqual(FormatUnit(17133, 'm'), '17133')
660
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
661

    
662
  def testGiB(self):
663
    self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
664
    self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
665
    self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
666
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
667

    
668
    self.assertEqual(FormatUnit(1024, 'g'), '1.0')
669
    self.assertEqual(FormatUnit(1536, 'g'), '1.5')
670
    self.assertEqual(FormatUnit(17133, 'g'), '16.7')
671
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
672

    
673
    self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
674
    self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
675
    self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
676

    
677
  def testTiB(self):
678
    self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
679
    self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
680
    self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
681

    
682
    self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
683
    self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
684
    self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
685

    
686
class TestParseUnit(unittest.TestCase):
687
  """Test case for the ParseUnit function"""
688

    
689
  SCALES = (('', 1),
690
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
691
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
692
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
693

    
694
  def testRounding(self):
695
    self.assertEqual(ParseUnit('0'), 0)
696
    self.assertEqual(ParseUnit('1'), 4)
697
    self.assertEqual(ParseUnit('2'), 4)
698
    self.assertEqual(ParseUnit('3'), 4)
699

    
700
    self.assertEqual(ParseUnit('124'), 124)
701
    self.assertEqual(ParseUnit('125'), 128)
702
    self.assertEqual(ParseUnit('126'), 128)
703
    self.assertEqual(ParseUnit('127'), 128)
704
    self.assertEqual(ParseUnit('128'), 128)
705
    self.assertEqual(ParseUnit('129'), 132)
706
    self.assertEqual(ParseUnit('130'), 132)
707

    
708
  def testFloating(self):
709
    self.assertEqual(ParseUnit('0'), 0)
710
    self.assertEqual(ParseUnit('0.5'), 4)
711
    self.assertEqual(ParseUnit('1.75'), 4)
712
    self.assertEqual(ParseUnit('1.99'), 4)
713
    self.assertEqual(ParseUnit('2.00'), 4)
714
    self.assertEqual(ParseUnit('2.01'), 4)
715
    self.assertEqual(ParseUnit('3.99'), 4)
716
    self.assertEqual(ParseUnit('4.00'), 4)
717
    self.assertEqual(ParseUnit('4.01'), 8)
718
    self.assertEqual(ParseUnit('1.5G'), 1536)
719
    self.assertEqual(ParseUnit('1.8G'), 1844)
720
    self.assertEqual(ParseUnit('8.28T'), 8682212)
721

    
722
  def testSuffixes(self):
723
    for sep in ('', ' ', '   ', "\t", "\t "):
724
      for suffix, scale in TestParseUnit.SCALES:
725
        for func in (lambda x: x, str.lower, str.upper):
726
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
727
                           1024 * scale)
728

    
729
  def testInvalidInput(self):
730
    for sep in ('-', '_', ',', 'a'):
731
      for suffix, _ in TestParseUnit.SCALES:
732
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
733

    
734
    for suffix, _ in TestParseUnit.SCALES:
735
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
736

    
737

    
738
class TestSshKeys(testutils.GanetiTestCase):
739
  """Test case for the AddAuthorizedKey function"""
740

    
741
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
742
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
743
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
744

    
745
  def setUp(self):
746
    testutils.GanetiTestCase.setUp(self)
747
    self.tmpname = self._CreateTempFile()
748
    handle = open(self.tmpname, 'w')
749
    try:
750
      handle.write("%s\n" % TestSshKeys.KEY_A)
751
      handle.write("%s\n" % TestSshKeys.KEY_B)
752
    finally:
753
      handle.close()
754

    
755
  def testAddingNewKey(self):
756
    AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
757

    
758
    self.assertFileContent(self.tmpname,
759
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
760
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
761
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
762
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
763

    
764
  def testAddingAlmostButNotCompletelyTheSameKey(self):
765
    AddAuthorizedKey(self.tmpname,
766
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
767

    
768
    self.assertFileContent(self.tmpname,
769
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
770
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
771
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
772
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
773

    
774
  def testAddingExistingKeyWithSomeMoreSpaces(self):
775
    AddAuthorizedKey(self.tmpname,
776
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
777

    
778
    self.assertFileContent(self.tmpname,
779
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
780
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
781
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
782

    
783
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
784
    RemoveAuthorizedKey(self.tmpname,
785
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
786

    
787
    self.assertFileContent(self.tmpname,
788
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
789
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
790

    
791
  def testRemovingNonExistingKey(self):
792
    RemoveAuthorizedKey(self.tmpname,
793
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
794

    
795
    self.assertFileContent(self.tmpname,
796
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
797
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
798
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
799

    
800

    
801
class TestEtcHosts(testutils.GanetiTestCase):
802
  """Test functions modifying /etc/hosts"""
803

    
804
  def setUp(self):
805
    testutils.GanetiTestCase.setUp(self)
806
    self.tmpname = self._CreateTempFile()
807
    handle = open(self.tmpname, 'w')
808
    try:
809
      handle.write('# This is a test file for /etc/hosts\n')
810
      handle.write('127.0.0.1\tlocalhost\n')
811
      handle.write('192.168.1.1 router gw\n')
812
    finally:
813
      handle.close()
814

    
815
  def testSettingNewIp(self):
816
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
817

    
818
    self.assertFileContent(self.tmpname,
819
      "# This is a test file for /etc/hosts\n"
820
      "127.0.0.1\tlocalhost\n"
821
      "192.168.1.1 router gw\n"
822
      "1.2.3.4\tmyhost.domain.tld myhost\n")
823
    self.assertFileMode(self.tmpname, 0644)
824

    
825
  def testSettingExistingIp(self):
826
    SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
827
                     ['myhost'])
828

    
829
    self.assertFileContent(self.tmpname,
830
      "# This is a test file for /etc/hosts\n"
831
      "127.0.0.1\tlocalhost\n"
832
      "192.168.1.1\tmyhost.domain.tld myhost\n")
833
    self.assertFileMode(self.tmpname, 0644)
834

    
835
  def testSettingDuplicateName(self):
836
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
837

    
838
    self.assertFileContent(self.tmpname,
839
      "# This is a test file for /etc/hosts\n"
840
      "127.0.0.1\tlocalhost\n"
841
      "192.168.1.1 router gw\n"
842
      "1.2.3.4\tmyhost\n")
843
    self.assertFileMode(self.tmpname, 0644)
844

    
845
  def testRemovingExistingHost(self):
846
    RemoveEtcHostsEntry(self.tmpname, 'router')
847

    
848
    self.assertFileContent(self.tmpname,
849
      "# This is a test file for /etc/hosts\n"
850
      "127.0.0.1\tlocalhost\n"
851
      "192.168.1.1 gw\n")
852
    self.assertFileMode(self.tmpname, 0644)
853

    
854
  def testRemovingSingleExistingHost(self):
855
    RemoveEtcHostsEntry(self.tmpname, 'localhost')
856

    
857
    self.assertFileContent(self.tmpname,
858
      "# This is a test file for /etc/hosts\n"
859
      "192.168.1.1 router gw\n")
860
    self.assertFileMode(self.tmpname, 0644)
861

    
862
  def testRemovingNonExistingHost(self):
863
    RemoveEtcHostsEntry(self.tmpname, 'myhost')
864

    
865
    self.assertFileContent(self.tmpname,
866
      "# This is a test file for /etc/hosts\n"
867
      "127.0.0.1\tlocalhost\n"
868
      "192.168.1.1 router gw\n")
869
    self.assertFileMode(self.tmpname, 0644)
870

    
871
  def testRemovingAlias(self):
872
    RemoveEtcHostsEntry(self.tmpname, 'gw')
873

    
874
    self.assertFileContent(self.tmpname,
875
      "# This is a test file for /etc/hosts\n"
876
      "127.0.0.1\tlocalhost\n"
877
      "192.168.1.1 router\n")
878
    self.assertFileMode(self.tmpname, 0644)
879

    
880

    
881
class TestShellQuoting(unittest.TestCase):
882
  """Test case for shell quoting functions"""
883

    
884
  def testShellQuote(self):
885
    self.assertEqual(ShellQuote('abc'), "abc")
886
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
887
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
888
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
889
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
890

    
891
  def testShellQuoteArgs(self):
892
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
893
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
894
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
895

    
896

    
897
class TestTcpPing(unittest.TestCase):
898
  """Testcase for TCP version of ping - against listen(2)ing port"""
899

    
900
  def setUp(self):
901
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
902
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
903
    self.listenerport = self.listener.getsockname()[1]
904
    self.listener.listen(1)
905

    
906
  def tearDown(self):
907
    self.listener.shutdown(socket.SHUT_RDWR)
908
    del self.listener
909
    del self.listenerport
910

    
911
  def testTcpPingToLocalHostAccept(self):
912
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
913
                         self.listenerport,
914
                         timeout=10,
915
                         live_port_needed=True,
916
                         source=constants.LOCALHOST_IP_ADDRESS,
917
                         ),
918
                 "failed to connect to test listener")
919

    
920
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
921
                         self.listenerport,
922
                         timeout=10,
923
                         live_port_needed=True,
924
                         ),
925
                 "failed to connect to test listener (no source)")
926

    
927

    
928
class TestTcpPingDeaf(unittest.TestCase):
929
  """Testcase for TCP version of ping - against non listen(2)ing port"""
930

    
931
  def setUp(self):
932
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
933
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
934
    self.deaflistenerport = self.deaflistener.getsockname()[1]
935

    
936
  def tearDown(self):
937
    del self.deaflistener
938
    del self.deaflistenerport
939

    
940
  def testTcpPingToLocalHostAcceptDeaf(self):
941
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
942
                        self.deaflistenerport,
943
                        timeout=constants.TCP_PING_TIMEOUT,
944
                        live_port_needed=True,
945
                        source=constants.LOCALHOST_IP_ADDRESS,
946
                        ), # need successful connect(2)
947
                "successfully connected to deaf listener")
948

    
949
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
950
                        self.deaflistenerport,
951
                        timeout=constants.TCP_PING_TIMEOUT,
952
                        live_port_needed=True,
953
                        ), # need successful connect(2)
954
                "successfully connected to deaf listener (no source addr)")
955

    
956
  def testTcpPingToLocalHostNoAccept(self):
957
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
958
                         self.deaflistenerport,
959
                         timeout=constants.TCP_PING_TIMEOUT,
960
                         live_port_needed=False,
961
                         source=constants.LOCALHOST_IP_ADDRESS,
962
                         ), # ECONNREFUSED is OK
963
                 "failed to ping alive host on deaf port")
964

    
965
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
966
                         self.deaflistenerport,
967
                         timeout=constants.TCP_PING_TIMEOUT,
968
                         live_port_needed=False,
969
                         ), # ECONNREFUSED is OK
970
                 "failed to ping alive host on deaf port (no source addr)")
971

    
972

    
973
class TestOwnIpAddress(unittest.TestCase):
974
  """Testcase for OwnIpAddress"""
975

    
976
  def testOwnLoopback(self):
977
    """check having the loopback ip"""
978
    self.failUnless(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS),
979
                    "Should own the loopback address")
980

    
981
  def testNowOwnAddress(self):
982
    """check that I don't own an address"""
983

    
984
    # network 192.0.2.0/24 is reserved for test/documentation as per
985
    # rfc 3330, so we *should* not have an address of this range... if
986
    # this fails, we should extend the test to multiple addresses
987
    DST_IP = "192.0.2.1"
988
    self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP)
989

    
990

    
991
def _GetSocketCredentials(path):
992
  """Connect to a Unix socket and return remote credentials.
993

994
  """
995
  sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
996
  try:
997
    sock.settimeout(10)
998
    sock.connect(path)
999
    return utils.GetSocketCredentials(sock)
1000
  finally:
1001
    sock.close()
1002

    
1003

    
1004
class TestGetSocketCredentials(unittest.TestCase):
1005
  def setUp(self):
1006
    self.tmpdir = tempfile.mkdtemp()
1007
    self.sockpath = utils.PathJoin(self.tmpdir, "sock")
1008

    
1009
    self.listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1010
    self.listener.settimeout(10)
1011
    self.listener.bind(self.sockpath)
1012
    self.listener.listen(1)
1013

    
1014
  def tearDown(self):
1015
    self.listener.shutdown(socket.SHUT_RDWR)
1016
    self.listener.close()
1017
    shutil.rmtree(self.tmpdir)
1018

    
1019
  def test(self):
1020
    (c2pr, c2pw) = os.pipe()
1021

    
1022
    # Start child process
1023
    child = os.fork()
1024
    if child == 0:
1025
      try:
1026
        data = serializer.DumpJson(_GetSocketCredentials(self.sockpath))
1027

    
1028
        os.write(c2pw, data)
1029
        os.close(c2pw)
1030

    
1031
        os._exit(0)
1032
      finally:
1033
        os._exit(1)
1034

    
1035
    os.close(c2pw)
1036

    
1037
    # Wait for one connection
1038
    (conn, _) = self.listener.accept()
1039
    conn.recv(1)
1040
    conn.close()
1041

    
1042
    # Wait for result
1043
    result = os.read(c2pr, 4096)
1044
    os.close(c2pr)
1045

    
1046
    # Check child's exit code
1047
    (_, status) = os.waitpid(child, 0)
1048
    self.assertFalse(os.WIFSIGNALED(status))
1049
    self.assertEqual(os.WEXITSTATUS(status), 0)
1050

    
1051
    # Check result
1052
    (pid, uid, gid) = serializer.LoadJson(result)
1053
    self.assertEqual(pid, os.getpid())
1054
    self.assertEqual(uid, os.getuid())
1055
    self.assertEqual(gid, os.getgid())
1056

    
1057

    
1058
class TestListVisibleFiles(unittest.TestCase):
1059
  """Test case for ListVisibleFiles"""
1060

    
1061
  def setUp(self):
1062
    self.path = tempfile.mkdtemp()
1063

    
1064
  def tearDown(self):
1065
    shutil.rmtree(self.path)
1066

    
1067
  def _test(self, files, expected):
1068
    # Sort a copy
1069
    expected = expected[:]
1070
    expected.sort()
1071

    
1072
    for name in files:
1073
      f = open(os.path.join(self.path, name), 'w')
1074
      try:
1075
        f.write("Test\n")
1076
      finally:
1077
        f.close()
1078

    
1079
    found = ListVisibleFiles(self.path)
1080
    found.sort()
1081

    
1082
    self.assertEqual(found, expected)
1083

    
1084
  def testAllVisible(self):
1085
    files = ["a", "b", "c"]
1086
    expected = files
1087
    self._test(files, expected)
1088

    
1089
  def testNoneVisible(self):
1090
    files = [".a", ".b", ".c"]
1091
    expected = []
1092
    self._test(files, expected)
1093

    
1094
  def testSomeVisible(self):
1095
    files = ["a", "b", ".c"]
1096
    expected = ["a", "b"]
1097
    self._test(files, expected)
1098

    
1099
  def testNonAbsolutePath(self):
1100
    self.failUnlessRaises(errors.ProgrammerError, ListVisibleFiles, "abc")
1101

    
1102
  def testNonNormalizedPath(self):
1103
    self.failUnlessRaises(errors.ProgrammerError, ListVisibleFiles,
1104
                          "/bin/../tmp")
1105

    
1106

    
1107
class TestNewUUID(unittest.TestCase):
1108
  """Test case for NewUUID"""
1109

    
1110
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
1111
                        '[a-f0-9]{4}-[a-f0-9]{12}$')
1112

    
1113
  def runTest(self):
1114
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
1115

    
1116

    
1117
class TestUniqueSequence(unittest.TestCase):
1118
  """Test case for UniqueSequence"""
1119

    
1120
  def _test(self, input, expected):
1121
    self.assertEqual(utils.UniqueSequence(input), expected)
1122

    
1123
  def runTest(self):
1124
    # Ordered input
1125
    self._test([1, 2, 3], [1, 2, 3])
1126
    self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
1127
    self._test([1, 2, 2, 3], [1, 2, 3])
1128
    self._test([1, 2, 3, 3], [1, 2, 3])
1129

    
1130
    # Unordered input
1131
    self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
1132
    self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
1133

    
1134
    # Strings
1135
    self._test(["a", "a"], ["a"])
1136
    self._test(["a", "b"], ["a", "b"])
1137
    self._test(["a", "b", "a"], ["a", "b"])
1138

    
1139

    
1140
class TestFirstFree(unittest.TestCase):
1141
  """Test case for the FirstFree function"""
1142

    
1143
  def test(self):
1144
    """Test FirstFree"""
1145
    self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
1146
    self.failUnlessEqual(FirstFree([]), None)
1147
    self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
1148
    self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
1149
    self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
1150

    
1151

    
1152
class TestTailFile(testutils.GanetiTestCase):
1153
  """Test case for the TailFile function"""
1154

    
1155
  def testEmpty(self):
1156
    fname = self._CreateTempFile()
1157
    self.failUnlessEqual(TailFile(fname), [])
1158
    self.failUnlessEqual(TailFile(fname, lines=25), [])
1159

    
1160
  def testAllLines(self):
1161
    data = ["test %d" % i for i in range(30)]
1162
    for i in range(30):
1163
      fname = self._CreateTempFile()
1164
      fd = open(fname, "w")
1165
      fd.write("\n".join(data[:i]))
1166
      if i > 0:
1167
        fd.write("\n")
1168
      fd.close()
1169
      self.failUnlessEqual(TailFile(fname, lines=i), data[:i])
1170

    
1171
  def testPartialLines(self):
1172
    data = ["test %d" % i for i in range(30)]
1173
    fname = self._CreateTempFile()
1174
    fd = open(fname, "w")
1175
    fd.write("\n".join(data))
1176
    fd.write("\n")
1177
    fd.close()
1178
    for i in range(1, 30):
1179
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
1180

    
1181
  def testBigFile(self):
1182
    data = ["test %d" % i for i in range(30)]
1183
    fname = self._CreateTempFile()
1184
    fd = open(fname, "w")
1185
    fd.write("X" * 1048576)
1186
    fd.write("\n")
1187
    fd.write("\n".join(data))
1188
    fd.write("\n")
1189
    fd.close()
1190
    for i in range(1, 30):
1191
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
1192

    
1193

    
1194
class _BaseFileLockTest:
1195
  """Test case for the FileLock class"""
1196

    
1197
  def testSharedNonblocking(self):
1198
    self.lock.Shared(blocking=False)
1199
    self.lock.Close()
1200

    
1201
  def testExclusiveNonblocking(self):
1202
    self.lock.Exclusive(blocking=False)
1203
    self.lock.Close()
1204

    
1205
  def testUnlockNonblocking(self):
1206
    self.lock.Unlock(blocking=False)
1207
    self.lock.Close()
1208

    
1209
  def testSharedBlocking(self):
1210
    self.lock.Shared(blocking=True)
1211
    self.lock.Close()
1212

    
1213
  def testExclusiveBlocking(self):
1214
    self.lock.Exclusive(blocking=True)
1215
    self.lock.Close()
1216

    
1217
  def testUnlockBlocking(self):
1218
    self.lock.Unlock(blocking=True)
1219
    self.lock.Close()
1220

    
1221
  def testSharedExclusiveUnlock(self):
1222
    self.lock.Shared(blocking=False)
1223
    self.lock.Exclusive(blocking=False)
1224
    self.lock.Unlock(blocking=False)
1225
    self.lock.Close()
1226

    
1227
  def testExclusiveSharedUnlock(self):
1228
    self.lock.Exclusive(blocking=False)
1229
    self.lock.Shared(blocking=False)
1230
    self.lock.Unlock(blocking=False)
1231
    self.lock.Close()
1232

    
1233
  def testSimpleTimeout(self):
1234
    # These will succeed on the first attempt, hence a short timeout
1235
    self.lock.Shared(blocking=True, timeout=10.0)
1236
    self.lock.Exclusive(blocking=False, timeout=10.0)
1237
    self.lock.Unlock(blocking=True, timeout=10.0)
1238
    self.lock.Close()
1239

    
1240
  @staticmethod
1241
  def _TryLockInner(filename, shared, blocking):
1242
    lock = utils.FileLock.Open(filename)
1243

    
1244
    if shared:
1245
      fn = lock.Shared
1246
    else:
1247
      fn = lock.Exclusive
1248

    
1249
    try:
1250
      # The timeout doesn't really matter as the parent process waits for us to
1251
      # finish anyway.
1252
      fn(blocking=blocking, timeout=0.01)
1253
    except errors.LockError, err:
1254
      return False
1255

    
1256
    return True
1257

    
1258
  def _TryLock(self, *args):
1259
    return utils.RunInSeparateProcess(self._TryLockInner, self.tmpfile.name,
1260
                                      *args)
1261

    
1262
  def testTimeout(self):
1263
    for blocking in [True, False]:
1264
      self.lock.Exclusive(blocking=True)
1265
      self.failIf(self._TryLock(False, blocking))
1266
      self.failIf(self._TryLock(True, blocking))
1267

    
1268
      self.lock.Shared(blocking=True)
1269
      self.assert_(self._TryLock(True, blocking))
1270
      self.failIf(self._TryLock(False, blocking))
1271

    
1272
  def testCloseShared(self):
1273
    self.lock.Close()
1274
    self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
1275

    
1276
  def testCloseExclusive(self):
1277
    self.lock.Close()
1278
    self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
1279

    
1280
  def testCloseUnlock(self):
1281
    self.lock.Close()
1282
    self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
1283

    
1284

    
1285
class TestFileLockWithFilename(testutils.GanetiTestCase, _BaseFileLockTest):
1286
  TESTDATA = "Hello World\n" * 10
1287

    
1288
  def setUp(self):
1289
    testutils.GanetiTestCase.setUp(self)
1290

    
1291
    self.tmpfile = tempfile.NamedTemporaryFile()
1292
    utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
1293
    self.lock = utils.FileLock.Open(self.tmpfile.name)
1294

    
1295
    # Ensure "Open" didn't truncate file
1296
    self.assertFileContent(self.tmpfile.name, self.TESTDATA)
1297

    
1298
  def tearDown(self):
1299
    self.assertFileContent(self.tmpfile.name, self.TESTDATA)
1300

    
1301
    testutils.GanetiTestCase.tearDown(self)
1302

    
1303

    
1304
class TestFileLockWithFileObject(unittest.TestCase, _BaseFileLockTest):
1305
  def setUp(self):
1306
    self.tmpfile = tempfile.NamedTemporaryFile()
1307
    self.lock = utils.FileLock(open(self.tmpfile.name, "w"), self.tmpfile.name)
1308

    
1309

    
1310
class TestTimeFunctions(unittest.TestCase):
1311
  """Test case for time functions"""
1312

    
1313
  def runTest(self):
1314
    self.assertEqual(utils.SplitTime(1), (1, 0))
1315
    self.assertEqual(utils.SplitTime(1.5), (1, 500000))
1316
    self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
1317
    self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
1318
    self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
1319
    self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
1320
    self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
1321
    self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
1322

    
1323
    self.assertRaises(AssertionError, utils.SplitTime, -1)
1324

    
1325
    self.assertEqual(utils.MergeTime((1, 0)), 1.0)
1326
    self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
1327
    self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
1328

    
1329
    self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3),
1330
                     1218448917.481)
1331
    self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
1332

    
1333
    self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
1334
    self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
1335
    self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
1336
    self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
1337
    self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
1338

    
1339

    
1340
class FieldSetTestCase(unittest.TestCase):
1341
  """Test case for FieldSets"""
1342

    
1343
  def testSimpleMatch(self):
1344
    f = utils.FieldSet("a", "b", "c", "def")
1345
    self.failUnless(f.Matches("a"))
1346
    self.failIf(f.Matches("d"), "Substring matched")
1347
    self.failIf(f.Matches("defghi"), "Prefix string matched")
1348
    self.failIf(f.NonMatching(["b", "c"]))
1349
    self.failIf(f.NonMatching(["a", "b", "c", "def"]))
1350
    self.failUnless(f.NonMatching(["a", "d"]))
1351

    
1352
  def testRegexMatch(self):
1353
    f = utils.FieldSet("a", "b([0-9]+)", "c")
1354
    self.failUnless(f.Matches("b1"))
1355
    self.failUnless(f.Matches("b99"))
1356
    self.failIf(f.Matches("b/1"))
1357
    self.failIf(f.NonMatching(["b12", "c"]))
1358
    self.failUnless(f.NonMatching(["a", "1"]))
1359

    
1360
class TestForceDictType(unittest.TestCase):
1361
  """Test case for ForceDictType"""
1362

    
1363
  def setUp(self):
1364
    self.key_types = {
1365
      'a': constants.VTYPE_INT,
1366
      'b': constants.VTYPE_BOOL,
1367
      'c': constants.VTYPE_STRING,
1368
      'd': constants.VTYPE_SIZE,
1369
      }
1370

    
1371
  def _fdt(self, dict, allowed_values=None):
1372
    if allowed_values is None:
1373
      ForceDictType(dict, self.key_types)
1374
    else:
1375
      ForceDictType(dict, self.key_types, allowed_values=allowed_values)
1376

    
1377
    return dict
1378

    
1379
  def testSimpleDict(self):
1380
    self.assertEqual(self._fdt({}), {})
1381
    self.assertEqual(self._fdt({'a': 1}), {'a': 1})
1382
    self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
1383
    self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
1384
    self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
1385
    self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
1386
    self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
1387
    self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
1388
    self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
1389
    self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
1390
    self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
1391
    self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
1392

    
1393
  def testErrors(self):
1394
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
1395
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
1396
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
1397
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
1398

    
1399

    
1400
class TestIsNormAbsPath(unittest.TestCase):
1401
  """Testing case for IsNormAbsPath"""
1402

    
1403
  def _pathTestHelper(self, path, result):
1404
    if result:
1405
      self.assert_(IsNormAbsPath(path),
1406
          "Path %s should result absolute and normalized" % path)
1407
    else:
1408
      self.assert_(not IsNormAbsPath(path),
1409
          "Path %s should not result absolute and normalized" % path)
1410

    
1411
  def testBase(self):
1412
    self._pathTestHelper('/etc', True)
1413
    self._pathTestHelper('/srv', True)
1414
    self._pathTestHelper('etc', False)
1415
    self._pathTestHelper('/etc/../root', False)
1416
    self._pathTestHelper('/etc/', False)
1417

    
1418

    
1419
class TestSafeEncode(unittest.TestCase):
1420
  """Test case for SafeEncode"""
1421

    
1422
  def testAscii(self):
1423
    for txt in [string.digits, string.letters, string.punctuation]:
1424
      self.failUnlessEqual(txt, SafeEncode(txt))
1425

    
1426
  def testDoubleEncode(self):
1427
    for i in range(255):
1428
      txt = SafeEncode(chr(i))
1429
      self.failUnlessEqual(txt, SafeEncode(txt))
1430

    
1431
  def testUnicode(self):
1432
    # 1024 is high enough to catch non-direct ASCII mappings
1433
    for i in range(1024):
1434
      txt = SafeEncode(unichr(i))
1435
      self.failUnlessEqual(txt, SafeEncode(txt))
1436

    
1437

    
1438
class TestFormatTime(unittest.TestCase):
1439
  """Testing case for FormatTime"""
1440

    
1441
  def testNone(self):
1442
    self.failUnlessEqual(FormatTime(None), "N/A")
1443

    
1444
  def testInvalid(self):
1445
    self.failUnlessEqual(FormatTime(()), "N/A")
1446

    
1447
  def testNow(self):
1448
    # tests that we accept time.time input
1449
    FormatTime(time.time())
1450
    # tests that we accept int input
1451
    FormatTime(int(time.time()))
1452

    
1453

    
1454
class RunInSeparateProcess(unittest.TestCase):
1455
  def test(self):
1456
    for exp in [True, False]:
1457
      def _child():
1458
        return exp
1459

    
1460
      self.assertEqual(exp, utils.RunInSeparateProcess(_child))
1461

    
1462
  def testArgs(self):
1463
    for arg in [0, 1, 999, "Hello World", (1, 2, 3)]:
1464
      def _child(carg1, carg2):
1465
        return carg1 == "Foo" and carg2 == arg
1466

    
1467
      self.assert_(utils.RunInSeparateProcess(_child, "Foo", arg))
1468

    
1469
  def testPid(self):
1470
    parent_pid = os.getpid()
1471

    
1472
    def _check():
1473
      return os.getpid() == parent_pid
1474

    
1475
    self.failIf(utils.RunInSeparateProcess(_check))
1476

    
1477
  def testSignal(self):
1478
    def _kill():
1479
      os.kill(os.getpid(), signal.SIGTERM)
1480

    
1481
    self.assertRaises(errors.GenericError,
1482
                      utils.RunInSeparateProcess, _kill)
1483

    
1484
  def testException(self):
1485
    def _exc():
1486
      raise errors.GenericError("This is a test")
1487

    
1488
    self.assertRaises(errors.GenericError,
1489
                      utils.RunInSeparateProcess, _exc)
1490

    
1491

    
1492
class TestFingerprintFile(unittest.TestCase):
1493
  def setUp(self):
1494
    self.tmpfile = tempfile.NamedTemporaryFile()
1495

    
1496
  def test(self):
1497
    self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1498
                     "da39a3ee5e6b4b0d3255bfef95601890afd80709")
1499

    
1500
    utils.WriteFile(self.tmpfile.name, data="Hello World\n")
1501
    self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1502
                     "648a6a6ffffdaa0badb23b8baf90b6168dd16b3a")
1503

    
1504

    
1505
class TestUnescapeAndSplit(unittest.TestCase):
1506
  """Testing case for UnescapeAndSplit"""
1507

    
1508
  def setUp(self):
1509
    # testing more that one separator for regexp safety
1510
    self._seps = [",", "+", "."]
1511

    
1512
  def testSimple(self):
1513
    a = ["a", "b", "c", "d"]
1514
    for sep in self._seps:
1515
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), a)
1516

    
1517
  def testEscape(self):
1518
    for sep in self._seps:
1519
      a = ["a", "b\\" + sep + "c", "d"]
1520
      b = ["a", "b" + sep + "c", "d"]
1521
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1522

    
1523
  def testDoubleEscape(self):
1524
    for sep in self._seps:
1525
      a = ["a", "b\\\\", "c", "d"]
1526
      b = ["a", "b\\", "c", "d"]
1527
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1528

    
1529
  def testThreeEscape(self):
1530
    for sep in self._seps:
1531
      a = ["a", "b\\\\\\" + sep + "c", "d"]
1532
      b = ["a", "b\\" + sep + "c", "d"]
1533
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1534

    
1535

    
1536
class TestPathJoin(unittest.TestCase):
1537
  """Testing case for PathJoin"""
1538

    
1539
  def testBasicItems(self):
1540
    mlist = ["/a", "b", "c"]
1541
    self.failUnlessEqual(PathJoin(*mlist), "/".join(mlist))
1542

    
1543
  def testNonAbsPrefix(self):
1544
    self.failUnlessRaises(ValueError, PathJoin, "a", "b")
1545

    
1546
  def testBackTrack(self):
1547
    self.failUnlessRaises(ValueError, PathJoin, "/a", "b/../c")
1548

    
1549
  def testMultiAbs(self):
1550
    self.failUnlessRaises(ValueError, PathJoin, "/a", "/b")
1551

    
1552

    
1553
class TestHostInfo(unittest.TestCase):
1554
  """Testing case for HostInfo"""
1555

    
1556
  def testUppercase(self):
1557
    data = "AbC.example.com"
1558
    self.failUnlessEqual(HostInfo.NormalizeName(data), data.lower())
1559

    
1560
  def testTooLongName(self):
1561
    data = "a.b." + "c" * 255
1562
    self.failUnlessRaises(OpPrereqError, HostInfo.NormalizeName, data)
1563

    
1564
  def testTrailingDot(self):
1565
    data = "a.b.c"
1566
    self.failUnlessEqual(HostInfo.NormalizeName(data + "."), data)
1567

    
1568
  def testInvalidName(self):
1569
    data = [
1570
      "a b",
1571
      "a/b",
1572
      ".a.b",
1573
      "a..b",
1574
      ]
1575
    for value in data:
1576
      self.failUnlessRaises(OpPrereqError, HostInfo.NormalizeName, value)
1577

    
1578
  def testValidName(self):
1579
    data = [
1580
      "a.b",
1581
      "a-b",
1582
      "a_b",
1583
      "a.b.c",
1584
      ]
1585
    for value in data:
1586
      HostInfo.NormalizeName(value)
1587

    
1588

    
1589
class TestParseAsn1Generalizedtime(unittest.TestCase):
1590
  def test(self):
1591
    # UTC
1592
    self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000Z"), 0)
1593
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152Z"),
1594
                     1266860512)
1595
    self.assertEqual(utils._ParseAsn1Generalizedtime("20380119031407Z"),
1596
                     (2**31) - 1)
1597

    
1598
    # With offset
1599
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152+0000"),
1600
                     1266860512)
1601
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100223131652+0000"),
1602
                     1266931012)
1603
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100223051808-0800"),
1604
                     1266931088)
1605
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100224002135+1100"),
1606
                     1266931295)
1607
    self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000-0100"),
1608
                     3600)
1609

    
1610
    # Leap seconds are not supported by datetime.datetime
1611
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1612
                      "19841231235960+0000")
1613
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1614
                      "19920630235960+0000")
1615

    
1616
    # Errors
1617
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "")
1618
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "invalid")
1619
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1620
                      "20100222174152")
1621
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1622
                      "Mon Feb 22 17:47:02 UTC 2010")
1623
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1624
                      "2010-02-22 17:42:02")
1625

    
1626

    
1627
class TestGetX509CertValidity(testutils.GanetiTestCase):
1628
  def setUp(self):
1629
    testutils.GanetiTestCase.setUp(self)
1630

    
1631
    pyopenssl_version = distutils.version.LooseVersion(OpenSSL.__version__)
1632

    
1633
    # Test whether we have pyOpenSSL 0.7 or above
1634
    self.pyopenssl0_7 = (pyopenssl_version >= "0.7")
1635

    
1636
    if not self.pyopenssl0_7:
1637
      warnings.warn("This test requires pyOpenSSL 0.7 or above to"
1638
                    " function correctly")
1639

    
1640
  def _LoadCert(self, name):
1641
    return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1642
                                           self._ReadTestData(name))
1643

    
1644
  def test(self):
1645
    validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem"))
1646
    if self.pyopenssl0_7:
1647
      self.assertEqual(validity, (1266919967, 1267524767))
1648
    else:
1649
      self.assertEqual(validity, (None, None))
1650

    
1651

    
1652
class TestMakedirs(unittest.TestCase):
1653
  def setUp(self):
1654
    self.tmpdir = tempfile.mkdtemp()
1655

    
1656
  def tearDown(self):
1657
    shutil.rmtree(self.tmpdir)
1658

    
1659
  def testNonExisting(self):
1660
    path = utils.PathJoin(self.tmpdir, "foo")
1661
    utils.Makedirs(path)
1662
    self.assert_(os.path.isdir(path))
1663

    
1664
  def testExisting(self):
1665
    path = utils.PathJoin(self.tmpdir, "foo")
1666
    os.mkdir(path)
1667
    utils.Makedirs(path)
1668
    self.assert_(os.path.isdir(path))
1669

    
1670
  def testRecursiveNonExisting(self):
1671
    path = utils.PathJoin(self.tmpdir, "foo/bar/baz")
1672
    utils.Makedirs(path)
1673
    self.assert_(os.path.isdir(path))
1674

    
1675
  def testRecursiveExisting(self):
1676
    path = utils.PathJoin(self.tmpdir, "B/moo/xyz")
1677
    self.assert_(not os.path.exists(path))
1678
    os.mkdir(utils.PathJoin(self.tmpdir, "B"))
1679
    utils.Makedirs(path)
1680
    self.assert_(os.path.isdir(path))
1681

    
1682

    
1683
class TestRetry(testutils.GanetiTestCase):
1684
  def setUp(self):
1685
    testutils.GanetiTestCase.setUp(self)
1686
    self.retries = 0
1687

    
1688
  @staticmethod
1689
  def _RaiseRetryAgain():
1690
    raise utils.RetryAgain()
1691

    
1692
  @staticmethod
1693
  def _RaiseRetryAgainWithArg(args):
1694
    raise utils.RetryAgain(*args)
1695

    
1696
  def _WrongNestedLoop(self):
1697
    return utils.Retry(self._RaiseRetryAgain, 0.01, 0.02)
1698

    
1699
  def _RetryAndSucceed(self, retries):
1700
    if self.retries < retries:
1701
      self.retries += 1
1702
      raise utils.RetryAgain()
1703
    else:
1704
      return True
1705

    
1706
  def testRaiseTimeout(self):
1707
    self.failUnlessRaises(utils.RetryTimeout, utils.Retry,
1708
                          self._RaiseRetryAgain, 0.01, 0.02)
1709
    self.failUnlessRaises(utils.RetryTimeout, utils.Retry,
1710
                          self._RetryAndSucceed, 0.01, 0, args=[1])
1711
    self.failUnlessEqual(self.retries, 1)
1712

    
1713
  def testComplete(self):
1714
    self.failUnlessEqual(utils.Retry(lambda: True, 0, 1), True)
1715
    self.failUnlessEqual(utils.Retry(self._RetryAndSucceed, 0, 1, args=[2]),
1716
                         True)
1717
    self.failUnlessEqual(self.retries, 2)
1718

    
1719
  def testNestedLoop(self):
1720
    try:
1721
      self.failUnlessRaises(errors.ProgrammerError, utils.Retry,
1722
                            self._WrongNestedLoop, 0, 1)
1723
    except utils.RetryTimeout:
1724
      self.fail("Didn't detect inner loop's exception")
1725

    
1726
  def testTimeoutArgument(self):
1727
    retry_arg="my_important_debugging_message"
1728
    try:
1729
      utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02, args=[[retry_arg]])
1730
    except utils.RetryTimeout, err:
1731
      self.failUnlessEqual(err.args, (retry_arg, ))
1732
    else:
1733
      self.fail("Expected timeout didn't happen")
1734

    
1735
  def testRaiseInnerWithExc(self):
1736
    retry_arg="my_important_debugging_message"
1737
    try:
1738
      try:
1739
        utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02,
1740
                    args=[[errors.GenericError(retry_arg, retry_arg)]])
1741
      except utils.RetryTimeout, err:
1742
        err.RaiseInner()
1743
      else:
1744
        self.fail("Expected timeout didn't happen")
1745
    except errors.GenericError, err:
1746
      self.failUnlessEqual(err.args, (retry_arg, retry_arg))
1747
    else:
1748
      self.fail("Expected GenericError didn't happen")
1749

    
1750
  def testRaiseInnerWithMsg(self):
1751
    retry_arg="my_important_debugging_message"
1752
    try:
1753
      try:
1754
        utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02,
1755
                    args=[[retry_arg, retry_arg]])
1756
      except utils.RetryTimeout, err:
1757
        err.RaiseInner()
1758
      else:
1759
        self.fail("Expected timeout didn't happen")
1760
    except utils.RetryTimeout, err:
1761
      self.failUnlessEqual(err.args, (retry_arg, retry_arg))
1762
    else:
1763
      self.fail("Expected RetryTimeout didn't happen")
1764

    
1765

    
1766
class TestLineSplitter(unittest.TestCase):
1767
  def test(self):
1768
    lines = []
1769
    ls = utils.LineSplitter(lines.append)
1770
    ls.write("Hello World\n")
1771
    self.assertEqual(lines, [])
1772
    ls.write("Foo\n Bar\r\n ")
1773
    ls.write("Baz")
1774
    ls.write("Moo")
1775
    self.assertEqual(lines, [])
1776
    ls.flush()
1777
    self.assertEqual(lines, ["Hello World", "Foo", " Bar"])
1778
    ls.close()
1779
    self.assertEqual(lines, ["Hello World", "Foo", " Bar", " BazMoo"])
1780

    
1781
  def _testExtra(self, line, all_lines, p1, p2):
1782
    self.assertEqual(p1, 999)
1783
    self.assertEqual(p2, "extra")
1784
    all_lines.append(line)
1785

    
1786
  def testExtraArgsNoFlush(self):
1787
    lines = []
1788
    ls = utils.LineSplitter(self._testExtra, lines, 999, "extra")
1789
    ls.write("\n\nHello World\n")
1790
    ls.write("Foo\n Bar\r\n ")
1791
    ls.write("")
1792
    ls.write("Baz")
1793
    ls.write("Moo\n\nx\n")
1794
    self.assertEqual(lines, [])
1795
    ls.close()
1796
    self.assertEqual(lines, ["", "", "Hello World", "Foo", " Bar", " BazMoo",
1797
                             "", "x"])
1798

    
1799

    
1800
if __name__ == '__main__':
1801
  testutils.GanetiTestProgram()