Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ 7fcffe27

History | View | Annotate | Download (72 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import distutils.version
25
import errno
26
import fcntl
27
import glob
28
import os
29
import os.path
30
import re
31
import shutil
32
import signal
33
import socket
34
import stat
35
import string
36
import tempfile
37
import time
38
import unittest
39
import warnings
40
import OpenSSL
41
import random
42
import operator
43

    
44
import testutils
45
from ganeti import constants
46
from ganeti import compat
47
from ganeti import utils
48
from ganeti import errors
49
from ganeti.utils import RunCmd, RemoveFile, \
50
     ListVisibleFiles, FirstFree, \
51
     TailFile, RunParts, PathJoin, \
52
     ReadOneLineFile, SetEtcHostsEntry, RemoveEtcHostsEntry
53

    
54

    
55
class TestIsProcessAlive(unittest.TestCase):
56
  """Testing case for IsProcessAlive"""
57

    
58
  def testExists(self):
59
    mypid = os.getpid()
60
    self.assert_(utils.IsProcessAlive(mypid), "can't find myself running")
61

    
62
  def testNotExisting(self):
63
    pid_non_existing = os.fork()
64
    if pid_non_existing == 0:
65
      os._exit(0)
66
    elif pid_non_existing < 0:
67
      raise SystemError("can't fork")
68
    os.waitpid(pid_non_existing, 0)
69
    self.assertFalse(utils.IsProcessAlive(pid_non_existing),
70
                     "nonexisting process detected")
71

    
72

    
73
class TestGetProcStatusPath(unittest.TestCase):
74
  def test(self):
75
    self.assert_("/1234/" in utils._GetProcStatusPath(1234))
76
    self.assertNotEqual(utils._GetProcStatusPath(1),
77
                        utils._GetProcStatusPath(2))
78

    
79

    
80
class TestIsProcessHandlingSignal(unittest.TestCase):
81
  def setUp(self):
82
    self.tmpdir = tempfile.mkdtemp()
83

    
84
  def tearDown(self):
85
    shutil.rmtree(self.tmpdir)
86

    
87
  def testParseSigsetT(self):
88
    self.assertEqual(len(utils._ParseSigsetT("0")), 0)
89
    self.assertEqual(utils._ParseSigsetT("1"), set([1]))
90
    self.assertEqual(utils._ParseSigsetT("1000a"), set([2, 4, 17]))
91
    self.assertEqual(utils._ParseSigsetT("810002"), set([2, 17, 24, ]))
92
    self.assertEqual(utils._ParseSigsetT("0000000180000202"),
93
                     set([2, 10, 32, 33]))
94
    self.assertEqual(utils._ParseSigsetT("0000000180000002"),
95
                     set([2, 32, 33]))
96
    self.assertEqual(utils._ParseSigsetT("0000000188000002"),
97
                     set([2, 28, 32, 33]))
98
    self.assertEqual(utils._ParseSigsetT("000000004b813efb"),
99
                     set([1, 2, 4, 5, 6, 7, 8, 10, 11, 12, 13, 14, 17,
100
                          24, 25, 26, 28, 31]))
101
    self.assertEqual(utils._ParseSigsetT("ffffff"), set(range(1, 25)))
102

    
103
  def testGetProcStatusField(self):
104
    for field in ["SigCgt", "Name", "FDSize"]:
105
      for value in ["", "0", "cat", "  1234 KB"]:
106
        pstatus = "\n".join([
107
          "VmPeak: 999 kB",
108
          "%s: %s" % (field, value),
109
          "TracerPid: 0",
110
          ])
111
        result = utils._GetProcStatusField(pstatus, field)
112
        self.assertEqual(result, value.strip())
113

    
114
  def test(self):
115
    sp = PathJoin(self.tmpdir, "status")
116

    
117
    utils.WriteFile(sp, data="\n".join([
118
      "Name:   bash",
119
      "State:  S (sleeping)",
120
      "SleepAVG:       98%",
121
      "Pid:    22250",
122
      "PPid:   10858",
123
      "TracerPid:      0",
124
      "SigBlk: 0000000000010000",
125
      "SigIgn: 0000000000384004",
126
      "SigCgt: 000000004b813efb",
127
      "CapEff: 0000000000000000",
128
      ]))
129

    
130
    self.assert_(utils.IsProcessHandlingSignal(1234, 10, status_path=sp))
131

    
132
  def testNoSigCgt(self):
133
    sp = PathJoin(self.tmpdir, "status")
134

    
135
    utils.WriteFile(sp, data="\n".join([
136
      "Name:   bash",
137
      ]))
138

    
139
    self.assertRaises(RuntimeError, utils.IsProcessHandlingSignal,
140
                      1234, 10, status_path=sp)
141

    
142
  def testNoSuchFile(self):
143
    sp = PathJoin(self.tmpdir, "notexist")
144

    
145
    self.assertFalse(utils.IsProcessHandlingSignal(1234, 10, status_path=sp))
146

    
147
  @staticmethod
148
  def _TestRealProcess():
149
    signal.signal(signal.SIGUSR1, signal.SIG_DFL)
150
    if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
151
      raise Exception("SIGUSR1 is handled when it should not be")
152

    
153
    signal.signal(signal.SIGUSR1, lambda signum, frame: None)
154
    if not utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
155
      raise Exception("SIGUSR1 is not handled when it should be")
156

    
157
    signal.signal(signal.SIGUSR1, signal.SIG_IGN)
158
    if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
159
      raise Exception("SIGUSR1 is not handled when it should be")
160

    
161
    signal.signal(signal.SIGUSR1, signal.SIG_DFL)
162
    if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
163
      raise Exception("SIGUSR1 is handled when it should not be")
164

    
165
    return True
166

    
167
  def testRealProcess(self):
168
    self.assert_(utils.RunInSeparateProcess(self._TestRealProcess))
169

    
170

    
171
class TestPidFileFunctions(unittest.TestCase):
172
  """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
173

    
174
  def setUp(self):
175
    self.dir = tempfile.mkdtemp()
176
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
177
    utils.DaemonPidFileName = self.f_dpn
178

    
179
  def testPidFileFunctions(self):
180
    pid_file = self.f_dpn('test')
181
    fd = utils.WritePidFile(self.f_dpn('test'))
182
    self.failUnless(os.path.exists(pid_file),
183
                    "PID file should have been created")
184
    read_pid = utils.ReadPidFile(pid_file)
185
    self.failUnlessEqual(read_pid, os.getpid())
186
    self.failUnless(utils.IsProcessAlive(read_pid))
187
    self.failUnlessRaises(errors.LockError, utils.WritePidFile,
188
                          self.f_dpn('test'))
189
    os.close(fd)
190
    utils.RemovePidFile('test')
191
    self.failIf(os.path.exists(pid_file),
192
                "PID file should not exist anymore")
193
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
194
                         "ReadPidFile should return 0 for missing pid file")
195
    fh = open(pid_file, "w")
196
    fh.write("blah\n")
197
    fh.close()
198
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
199
                         "ReadPidFile should return 0 for invalid pid file")
200
    # but now, even with the file existing, we should be able to lock it
201
    fd = utils.WritePidFile(self.f_dpn('test'))
202
    os.close(fd)
203
    utils.RemovePidFile('test')
204
    self.failIf(os.path.exists(pid_file),
205
                "PID file should not exist anymore")
206

    
207
  def testKill(self):
208
    pid_file = self.f_dpn('child')
209
    r_fd, w_fd = os.pipe()
210
    new_pid = os.fork()
211
    if new_pid == 0: #child
212
      utils.WritePidFile(self.f_dpn('child'))
213
      os.write(w_fd, 'a')
214
      signal.pause()
215
      os._exit(0)
216
      return
217
    # else we are in the parent
218
    # wait until the child has written the pid file
219
    os.read(r_fd, 1)
220
    read_pid = utils.ReadPidFile(pid_file)
221
    self.failUnlessEqual(read_pid, new_pid)
222
    self.failUnless(utils.IsProcessAlive(new_pid))
223
    utils.KillProcess(new_pid, waitpid=True)
224
    self.failIf(utils.IsProcessAlive(new_pid))
225
    utils.RemovePidFile('child')
226
    self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0)
227

    
228
  def tearDown(self):
229
    for name in os.listdir(self.dir):
230
      os.unlink(os.path.join(self.dir, name))
231
    os.rmdir(self.dir)
232

    
233

    
234
class TestRunCmd(testutils.GanetiTestCase):
235
  """Testing case for the RunCmd function"""
236

    
237
  def setUp(self):
238
    testutils.GanetiTestCase.setUp(self)
239
    self.magic = time.ctime() + " ganeti test"
240
    self.fname = self._CreateTempFile()
241
    self.fifo_tmpdir = tempfile.mkdtemp()
242
    self.fifo_file = os.path.join(self.fifo_tmpdir, "ganeti_test_fifo")
243
    os.mkfifo(self.fifo_file)
244

    
245
  def tearDown(self):
246
    shutil.rmtree(self.fifo_tmpdir)
247
    testutils.GanetiTestCase.tearDown(self)
248

    
249
  def testOk(self):
250
    """Test successful exit code"""
251
    result = RunCmd("/bin/sh -c 'exit 0'")
252
    self.assertEqual(result.exit_code, 0)
253
    self.assertEqual(result.output, "")
254

    
255
  def testFail(self):
256
    """Test fail exit code"""
257
    result = RunCmd("/bin/sh -c 'exit 1'")
258
    self.assertEqual(result.exit_code, 1)
259
    self.assertEqual(result.output, "")
260

    
261
  def testStdout(self):
262
    """Test standard output"""
263
    cmd = 'echo -n "%s"' % self.magic
264
    result = RunCmd("/bin/sh -c '%s'" % cmd)
265
    self.assertEqual(result.stdout, self.magic)
266
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
267
    self.assertEqual(result.output, "")
268
    self.assertFileContent(self.fname, self.magic)
269

    
270
  def testStderr(self):
271
    """Test standard error"""
272
    cmd = 'echo -n "%s"' % self.magic
273
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
274
    self.assertEqual(result.stderr, self.magic)
275
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
276
    self.assertEqual(result.output, "")
277
    self.assertFileContent(self.fname, self.magic)
278

    
279
  def testCombined(self):
280
    """Test combined output"""
281
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
282
    expected = "A" + self.magic + "B" + self.magic
283
    result = RunCmd("/bin/sh -c '%s'" % cmd)
284
    self.assertEqual(result.output, expected)
285
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
286
    self.assertEqual(result.output, "")
287
    self.assertFileContent(self.fname, expected)
288

    
289
  def testSignal(self):
290
    """Test signal"""
291
    result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
292
    self.assertEqual(result.signal, 15)
293
    self.assertEqual(result.output, "")
294

    
295
  def testTimeoutClean(self):
296
    cmd = "trap 'exit 0' TERM; read < %s" % self.fifo_file
297
    result = RunCmd(["/bin/sh", "-c", cmd], timeout=0.2)
298
    self.assertEqual(result.exit_code, 0)
299

    
300
  def testTimeoutKill(self):
301
    cmd = ["/bin/sh", "-c", "trap '' TERM; read < %s" % self.fifo_file]
302
    timeout = 0.2
303
    out, err, status, ta = utils._RunCmdPipe(cmd, {}, False, "/", False,
304
                                             timeout, _linger_timeout=0.2)
305
    self.assert_(status < 0)
306
    self.assertEqual(-status, signal.SIGKILL)
307

    
308
  def testTimeoutOutputAfterTerm(self):
309
    cmd = "trap 'echo sigtermed; exit 1' TERM; read < %s" % self.fifo_file
310
    result = RunCmd(["/bin/sh", "-c", cmd], timeout=0.2)
311
    self.assert_(result.failed)
312
    self.assertEqual(result.stdout, "sigtermed\n")
313

    
314
  def testListRun(self):
315
    """Test list runs"""
316
    result = RunCmd(["true"])
317
    self.assertEqual(result.signal, None)
318
    self.assertEqual(result.exit_code, 0)
319
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
320
    self.assertEqual(result.signal, None)
321
    self.assertEqual(result.exit_code, 1)
322
    result = RunCmd(["echo", "-n", self.magic])
323
    self.assertEqual(result.signal, None)
324
    self.assertEqual(result.exit_code, 0)
325
    self.assertEqual(result.stdout, self.magic)
326

    
327
  def testFileEmptyOutput(self):
328
    """Test file output"""
329
    result = RunCmd(["true"], output=self.fname)
330
    self.assertEqual(result.signal, None)
331
    self.assertEqual(result.exit_code, 0)
332
    self.assertFileContent(self.fname, "")
333

    
334
  def testLang(self):
335
    """Test locale environment"""
336
    old_env = os.environ.copy()
337
    try:
338
      os.environ["LANG"] = "en_US.UTF-8"
339
      os.environ["LC_ALL"] = "en_US.UTF-8"
340
      result = RunCmd(["locale"])
341
      for line in result.output.splitlines():
342
        key, value = line.split("=", 1)
343
        # Ignore these variables, they're overridden by LC_ALL
344
        if key == "LANG" or key == "LANGUAGE":
345
          continue
346
        self.failIf(value and value != "C" and value != '"C"',
347
            "Variable %s is set to the invalid value '%s'" % (key, value))
348
    finally:
349
      os.environ = old_env
350

    
351
  def testDefaultCwd(self):
352
    """Test default working directory"""
353
    self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
354

    
355
  def testCwd(self):
356
    """Test default working directory"""
357
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
358
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
359
    cwd = os.getcwd()
360
    self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
361

    
362
  def testResetEnv(self):
363
    """Test environment reset functionality"""
364
    self.failUnlessEqual(RunCmd(["env"], reset_env=True).stdout.strip(), "")
365
    self.failUnlessEqual(RunCmd(["env"], reset_env=True,
366
                                env={"FOO": "bar",}).stdout.strip(), "FOO=bar")
367

    
368
  def testNoFork(self):
369
    """Test that nofork raise an error"""
370
    assert not utils.no_fork
371
    utils.no_fork = True
372
    try:
373
      self.assertRaises(errors.ProgrammerError, RunCmd, ["true"])
374
    finally:
375
      utils.no_fork = False
376

    
377
  def testWrongParams(self):
378
    """Test wrong parameters"""
379
    self.assertRaises(errors.ProgrammerError, RunCmd, ["true"],
380
                      output="/dev/null", interactive=True)
381

    
382

    
383
class TestRunParts(testutils.GanetiTestCase):
384
  """Testing case for the RunParts function"""
385

    
386
  def setUp(self):
387
    self.rundir = tempfile.mkdtemp(prefix="ganeti-test", suffix=".tmp")
388

    
389
  def tearDown(self):
390
    shutil.rmtree(self.rundir)
391

    
392
  def testEmpty(self):
393
    """Test on an empty dir"""
394
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True), [])
395

    
396
  def testSkipWrongName(self):
397
    """Test that wrong files are skipped"""
398
    fname = os.path.join(self.rundir, "00test.dot")
399
    utils.WriteFile(fname, data="")
400
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
401
    relname = os.path.basename(fname)
402
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
403
                         [(relname, constants.RUNPARTS_SKIP, None)])
404

    
405
  def testSkipNonExec(self):
406
    """Test that non executable files are skipped"""
407
    fname = os.path.join(self.rundir, "00test")
408
    utils.WriteFile(fname, data="")
409
    relname = os.path.basename(fname)
410
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
411
                         [(relname, constants.RUNPARTS_SKIP, None)])
412

    
413
  def testError(self):
414
    """Test error on a broken executable"""
415
    fname = os.path.join(self.rundir, "00test")
416
    utils.WriteFile(fname, data="")
417
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
418
    (relname, status, error) = RunParts(self.rundir, reset_env=True)[0]
419
    self.failUnlessEqual(relname, os.path.basename(fname))
420
    self.failUnlessEqual(status, constants.RUNPARTS_ERR)
421
    self.failUnless(error)
422

    
423
  def testSorted(self):
424
    """Test executions are sorted"""
425
    files = []
426
    files.append(os.path.join(self.rundir, "64test"))
427
    files.append(os.path.join(self.rundir, "00test"))
428
    files.append(os.path.join(self.rundir, "42test"))
429

    
430
    for fname in files:
431
      utils.WriteFile(fname, data="")
432

    
433
    results = RunParts(self.rundir, reset_env=True)
434

    
435
    for fname in sorted(files):
436
      self.failUnlessEqual(os.path.basename(fname), results.pop(0)[0])
437

    
438
  def testOk(self):
439
    """Test correct execution"""
440
    fname = os.path.join(self.rundir, "00test")
441
    utils.WriteFile(fname, data="#!/bin/sh\n\necho -n ciao")
442
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
443
    (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
444
    self.failUnlessEqual(relname, os.path.basename(fname))
445
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
446
    self.failUnlessEqual(runresult.stdout, "ciao")
447

    
448
  def testRunFail(self):
449
    """Test correct execution, with run failure"""
450
    fname = os.path.join(self.rundir, "00test")
451
    utils.WriteFile(fname, data="#!/bin/sh\n\nexit 1")
452
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
453
    (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
454
    self.failUnlessEqual(relname, os.path.basename(fname))
455
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
456
    self.failUnlessEqual(runresult.exit_code, 1)
457
    self.failUnless(runresult.failed)
458

    
459
  def testRunMix(self):
460
    files = []
461
    files.append(os.path.join(self.rundir, "00test"))
462
    files.append(os.path.join(self.rundir, "42test"))
463
    files.append(os.path.join(self.rundir, "64test"))
464
    files.append(os.path.join(self.rundir, "99test"))
465

    
466
    files.sort()
467

    
468
    # 1st has errors in execution
469
    utils.WriteFile(files[0], data="#!/bin/sh\n\nexit 1")
470
    os.chmod(files[0], stat.S_IREAD | stat.S_IEXEC)
471

    
472
    # 2nd is skipped
473
    utils.WriteFile(files[1], data="")
474

    
475
    # 3rd cannot execute properly
476
    utils.WriteFile(files[2], data="")
477
    os.chmod(files[2], stat.S_IREAD | stat.S_IEXEC)
478

    
479
    # 4th execs
480
    utils.WriteFile(files[3], data="#!/bin/sh\n\necho -n ciao")
481
    os.chmod(files[3], stat.S_IREAD | stat.S_IEXEC)
482

    
483
    results = RunParts(self.rundir, reset_env=True)
484

    
485
    (relname, status, runresult) = results[0]
486
    self.failUnlessEqual(relname, os.path.basename(files[0]))
487
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
488
    self.failUnlessEqual(runresult.exit_code, 1)
489
    self.failUnless(runresult.failed)
490

    
491
    (relname, status, runresult) = results[1]
492
    self.failUnlessEqual(relname, os.path.basename(files[1]))
493
    self.failUnlessEqual(status, constants.RUNPARTS_SKIP)
494
    self.failUnlessEqual(runresult, None)
495

    
496
    (relname, status, runresult) = results[2]
497
    self.failUnlessEqual(relname, os.path.basename(files[2]))
498
    self.failUnlessEqual(status, constants.RUNPARTS_ERR)
499
    self.failUnless(runresult)
500

    
501
    (relname, status, runresult) = results[3]
502
    self.failUnlessEqual(relname, os.path.basename(files[3]))
503
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
504
    self.failUnlessEqual(runresult.output, "ciao")
505
    self.failUnlessEqual(runresult.exit_code, 0)
506
    self.failUnless(not runresult.failed)
507

    
508
  def testMissingDirectory(self):
509
    nosuchdir = utils.PathJoin(self.rundir, "no/such/directory")
510
    self.assertEqual(RunParts(nosuchdir), [])
511

    
512

    
513
class TestStartDaemon(testutils.GanetiTestCase):
514
  def setUp(self):
515
    self.tmpdir = tempfile.mkdtemp(prefix="ganeti-test")
516
    self.tmpfile = os.path.join(self.tmpdir, "test")
517

    
518
  def tearDown(self):
519
    shutil.rmtree(self.tmpdir)
520

    
521
  def testShell(self):
522
    utils.StartDaemon("echo Hello World > %s" % self.tmpfile)
523
    self._wait(self.tmpfile, 60.0, "Hello World")
524

    
525
  def testShellOutput(self):
526
    utils.StartDaemon("echo Hello World", output=self.tmpfile)
527
    self._wait(self.tmpfile, 60.0, "Hello World")
528

    
529
  def testNoShellNoOutput(self):
530
    utils.StartDaemon(["pwd"])
531

    
532
  def testNoShellNoOutputTouch(self):
533
    testfile = os.path.join(self.tmpdir, "check")
534
    self.failIf(os.path.exists(testfile))
535
    utils.StartDaemon(["touch", testfile])
536
    self._wait(testfile, 60.0, "")
537

    
538
  def testNoShellOutput(self):
539
    utils.StartDaemon(["pwd"], output=self.tmpfile)
540
    self._wait(self.tmpfile, 60.0, "/")
541

    
542
  def testNoShellOutputCwd(self):
543
    utils.StartDaemon(["pwd"], output=self.tmpfile, cwd=os.getcwd())
544
    self._wait(self.tmpfile, 60.0, os.getcwd())
545

    
546
  def testShellEnv(self):
547
    utils.StartDaemon("echo \"$GNT_TEST_VAR\"", output=self.tmpfile,
548
                      env={ "GNT_TEST_VAR": "Hello World", })
549
    self._wait(self.tmpfile, 60.0, "Hello World")
550

    
551
  def testNoShellEnv(self):
552
    utils.StartDaemon(["printenv", "GNT_TEST_VAR"], output=self.tmpfile,
553
                      env={ "GNT_TEST_VAR": "Hello World", })
554
    self._wait(self.tmpfile, 60.0, "Hello World")
555

    
556
  def testOutputFd(self):
557
    fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
558
    try:
559
      utils.StartDaemon(["pwd"], output_fd=fd, cwd=os.getcwd())
560
    finally:
561
      os.close(fd)
562
    self._wait(self.tmpfile, 60.0, os.getcwd())
563

    
564
  def testPid(self):
565
    pid = utils.StartDaemon("echo $$ > %s" % self.tmpfile)
566
    self._wait(self.tmpfile, 60.0, str(pid))
567

    
568
  def testPidFile(self):
569
    pidfile = os.path.join(self.tmpdir, "pid")
570
    checkfile = os.path.join(self.tmpdir, "abort")
571

    
572
    pid = utils.StartDaemon("while sleep 5; do :; done", pidfile=pidfile,
573
                            output=self.tmpfile)
574
    try:
575
      fd = os.open(pidfile, os.O_RDONLY)
576
      try:
577
        # Check file is locked
578
        self.assertRaises(errors.LockError, utils.LockFile, fd)
579

    
580
        pidtext = os.read(fd, 100)
581
      finally:
582
        os.close(fd)
583

    
584
      self.assertEqual(int(pidtext.strip()), pid)
585

    
586
      self.assert_(utils.IsProcessAlive(pid))
587
    finally:
588
      # No matter what happens, kill daemon
589
      utils.KillProcess(pid, timeout=5.0, waitpid=False)
590
      self.failIf(utils.IsProcessAlive(pid))
591

    
592
    self.assertEqual(utils.ReadFile(self.tmpfile), "")
593

    
594
  def _wait(self, path, timeout, expected):
595
    # Due to the asynchronous nature of daemon processes, polling is necessary.
596
    # A timeout makes sure the test doesn't hang forever.
597
    def _CheckFile():
598
      if not (os.path.isfile(path) and
599
              utils.ReadFile(path).strip() == expected):
600
        raise utils.RetryAgain()
601

    
602
    try:
603
      utils.Retry(_CheckFile, (0.01, 1.5, 1.0), timeout)
604
    except utils.RetryTimeout:
605
      self.fail("Apparently the daemon didn't run in %s seconds and/or"
606
                " didn't write the correct output" % timeout)
607

    
608
  def testError(self):
609
    self.assertRaises(errors.OpExecError, utils.StartDaemon,
610
                      ["./does-NOT-EXIST/here/0123456789"])
611
    self.assertRaises(errors.OpExecError, utils.StartDaemon,
612
                      ["./does-NOT-EXIST/here/0123456789"],
613
                      output=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
614
    self.assertRaises(errors.OpExecError, utils.StartDaemon,
615
                      ["./does-NOT-EXIST/here/0123456789"],
616
                      cwd=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
617
    self.assertRaises(errors.OpExecError, utils.StartDaemon,
618
                      ["./does-NOT-EXIST/here/0123456789"],
619
                      output=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
620

    
621
    fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
622
    try:
623
      self.assertRaises(errors.ProgrammerError, utils.StartDaemon,
624
                        ["./does-NOT-EXIST/here/0123456789"],
625
                        output=self.tmpfile, output_fd=fd)
626
    finally:
627
      os.close(fd)
628

    
629

    
630
class TestSetCloseOnExecFlag(unittest.TestCase):
631
  """Tests for SetCloseOnExecFlag"""
632

    
633
  def setUp(self):
634
    self.tmpfile = tempfile.TemporaryFile()
635

    
636
  def testEnable(self):
637
    utils.SetCloseOnExecFlag(self.tmpfile.fileno(), True)
638
    self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
639
                    fcntl.FD_CLOEXEC)
640

    
641
  def testDisable(self):
642
    utils.SetCloseOnExecFlag(self.tmpfile.fileno(), False)
643
    self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
644
                fcntl.FD_CLOEXEC)
645

    
646

    
647
class TestSetNonblockFlag(unittest.TestCase):
648
  def setUp(self):
649
    self.tmpfile = tempfile.TemporaryFile()
650

    
651
  def testEnable(self):
652
    utils.SetNonblockFlag(self.tmpfile.fileno(), True)
653
    self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
654
                    os.O_NONBLOCK)
655

    
656
  def testDisable(self):
657
    utils.SetNonblockFlag(self.tmpfile.fileno(), False)
658
    self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
659
                os.O_NONBLOCK)
660

    
661

    
662
class TestRemoveFile(unittest.TestCase):
663
  """Test case for the RemoveFile function"""
664

    
665
  def setUp(self):
666
    """Create a temp dir and file for each case"""
667
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
668
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
669
    os.close(fd)
670

    
671
  def tearDown(self):
672
    if os.path.exists(self.tmpfile):
673
      os.unlink(self.tmpfile)
674
    os.rmdir(self.tmpdir)
675

    
676
  def testIgnoreDirs(self):
677
    """Test that RemoveFile() ignores directories"""
678
    self.assertEqual(None, RemoveFile(self.tmpdir))
679

    
680
  def testIgnoreNotExisting(self):
681
    """Test that RemoveFile() ignores non-existing files"""
682
    RemoveFile(self.tmpfile)
683
    RemoveFile(self.tmpfile)
684

    
685
  def testRemoveFile(self):
686
    """Test that RemoveFile does remove a file"""
687
    RemoveFile(self.tmpfile)
688
    if os.path.exists(self.tmpfile):
689
      self.fail("File '%s' not removed" % self.tmpfile)
690

    
691
  def testRemoveSymlink(self):
692
    """Test that RemoveFile does remove symlinks"""
693
    symlink = self.tmpdir + "/symlink"
694
    os.symlink("no-such-file", symlink)
695
    RemoveFile(symlink)
696
    if os.path.exists(symlink):
697
      self.fail("File '%s' not removed" % symlink)
698
    os.symlink(self.tmpfile, symlink)
699
    RemoveFile(symlink)
700
    if os.path.exists(symlink):
701
      self.fail("File '%s' not removed" % symlink)
702

    
703

    
704
class TestRemoveDir(unittest.TestCase):
705
  def setUp(self):
706
    self.tmpdir = tempfile.mkdtemp()
707

    
708
  def tearDown(self):
709
    try:
710
      shutil.rmtree(self.tmpdir)
711
    except EnvironmentError:
712
      pass
713

    
714
  def testEmptyDir(self):
715
    utils.RemoveDir(self.tmpdir)
716
    self.assertFalse(os.path.isdir(self.tmpdir))
717

    
718
  def testNonEmptyDir(self):
719
    self.tmpfile = os.path.join(self.tmpdir, "test1")
720
    open(self.tmpfile, "w").close()
721
    self.assertRaises(EnvironmentError, utils.RemoveDir, self.tmpdir)
722

    
723

    
724
class TestRename(unittest.TestCase):
725
  """Test case for RenameFile"""
726

    
727
  def setUp(self):
728
    """Create a temporary directory"""
729
    self.tmpdir = tempfile.mkdtemp()
730
    self.tmpfile = os.path.join(self.tmpdir, "test1")
731

    
732
    # Touch the file
733
    open(self.tmpfile, "w").close()
734

    
735
  def tearDown(self):
736
    """Remove temporary directory"""
737
    shutil.rmtree(self.tmpdir)
738

    
739
  def testSimpleRename1(self):
740
    """Simple rename 1"""
741
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
742
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
743

    
744
  def testSimpleRename2(self):
745
    """Simple rename 2"""
746
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
747
                     mkdir=True)
748
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
749

    
750
  def testRenameMkdir(self):
751
    """Rename with mkdir"""
752
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
753
                     mkdir=True)
754
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
755
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
756

    
757
    utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"),
758
                     os.path.join(self.tmpdir, "test/foo/bar/baz"),
759
                     mkdir=True)
760
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
761
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar")))
762
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz")))
763

    
764

    
765
class TestReadFile(testutils.GanetiTestCase):
766

    
767
  def testReadAll(self):
768
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"))
769
    self.assertEqual(len(data), 814)
770

    
771
    h = compat.md5_hash()
772
    h.update(data)
773
    self.assertEqual(h.hexdigest(), "a491efb3efe56a0535f924d5f8680fd4")
774

    
775
  def testReadSize(self):
776
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"),
777
                          size=100)
778
    self.assertEqual(len(data), 100)
779

    
780
    h = compat.md5_hash()
781
    h.update(data)
782
    self.assertEqual(h.hexdigest(), "893772354e4e690b9efd073eed433ce7")
783

    
784
  def testError(self):
785
    self.assertRaises(EnvironmentError, utils.ReadFile,
786
                      "/dev/null/does-not-exist")
787

    
788

    
789
class TestReadOneLineFile(testutils.GanetiTestCase):
790

    
791
  def setUp(self):
792
    testutils.GanetiTestCase.setUp(self)
793

    
794
  def testDefault(self):
795
    data = ReadOneLineFile(self._TestDataFilename("cert1.pem"))
796
    self.assertEqual(len(data), 27)
797
    self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
798

    
799
  def testNotStrict(self):
800
    data = ReadOneLineFile(self._TestDataFilename("cert1.pem"), strict=False)
801
    self.assertEqual(len(data), 27)
802
    self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
803

    
804
  def testStrictFailure(self):
805
    self.assertRaises(errors.GenericError, ReadOneLineFile,
806
                      self._TestDataFilename("cert1.pem"), strict=True)
807

    
808
  def testLongLine(self):
809
    dummydata = (1024 * "Hello World! ")
810
    myfile = self._CreateTempFile()
811
    utils.WriteFile(myfile, data=dummydata)
812
    datastrict = ReadOneLineFile(myfile, strict=True)
813
    datalax = ReadOneLineFile(myfile, strict=False)
814
    self.assertEqual(dummydata, datastrict)
815
    self.assertEqual(dummydata, datalax)
816

    
817
  def testNewline(self):
818
    myfile = self._CreateTempFile()
819
    myline = "myline"
820
    for nl in ["", "\n", "\r\n"]:
821
      dummydata = "%s%s" % (myline, nl)
822
      utils.WriteFile(myfile, data=dummydata)
823
      datalax = ReadOneLineFile(myfile, strict=False)
824
      self.assertEqual(myline, datalax)
825
      datastrict = ReadOneLineFile(myfile, strict=True)
826
      self.assertEqual(myline, datastrict)
827

    
828
  def testWhitespaceAndMultipleLines(self):
829
    myfile = self._CreateTempFile()
830
    for nl in ["", "\n", "\r\n"]:
831
      for ws in [" ", "\t", "\t\t  \t", "\t "]:
832
        dummydata = (1024 * ("Foo bar baz %s%s" % (ws, nl)))
833
        utils.WriteFile(myfile, data=dummydata)
834
        datalax = ReadOneLineFile(myfile, strict=False)
835
        if nl:
836
          self.assert_(set("\r\n") & set(dummydata))
837
          self.assertRaises(errors.GenericError, ReadOneLineFile,
838
                            myfile, strict=True)
839
          explen = len("Foo bar baz ") + len(ws)
840
          self.assertEqual(len(datalax), explen)
841
          self.assertEqual(datalax, dummydata[:explen])
842
          self.assertFalse(set("\r\n") & set(datalax))
843
        else:
844
          datastrict = ReadOneLineFile(myfile, strict=True)
845
          self.assertEqual(dummydata, datastrict)
846
          self.assertEqual(dummydata, datalax)
847

    
848
  def testEmptylines(self):
849
    myfile = self._CreateTempFile()
850
    myline = "myline"
851
    for nl in ["\n", "\r\n"]:
852
      for ol in ["", "otherline"]:
853
        dummydata = "%s%s%s%s%s%s" % (nl, nl, myline, nl, ol, nl)
854
        utils.WriteFile(myfile, data=dummydata)
855
        self.assert_(set("\r\n") & set(dummydata))
856
        datalax = ReadOneLineFile(myfile, strict=False)
857
        self.assertEqual(myline, datalax)
858
        if ol:
859
          self.assertRaises(errors.GenericError, ReadOneLineFile,
860
                            myfile, strict=True)
861
        else:
862
          datastrict = ReadOneLineFile(myfile, strict=True)
863
          self.assertEqual(myline, datastrict)
864

    
865
  def testEmptyfile(self):
866
    myfile = self._CreateTempFile()
867
    self.assertRaises(errors.GenericError, ReadOneLineFile, myfile)
868

    
869

    
870
class TestTimestampForFilename(unittest.TestCase):
871
  def test(self):
872
    self.assert_("." not in utils.TimestampForFilename())
873
    self.assert_(":" not in utils.TimestampForFilename())
874

    
875

    
876
class TestCreateBackup(testutils.GanetiTestCase):
877
  def setUp(self):
878
    testutils.GanetiTestCase.setUp(self)
879

    
880
    self.tmpdir = tempfile.mkdtemp()
881

    
882
  def tearDown(self):
883
    testutils.GanetiTestCase.tearDown(self)
884

    
885
    shutil.rmtree(self.tmpdir)
886

    
887
  def testEmpty(self):
888
    filename = PathJoin(self.tmpdir, "config.data")
889
    utils.WriteFile(filename, data="")
890
    bname = utils.CreateBackup(filename)
891
    self.assertFileContent(bname, "")
892
    self.assertEqual(len(glob.glob("%s*" % filename)), 2)
893
    utils.CreateBackup(filename)
894
    self.assertEqual(len(glob.glob("%s*" % filename)), 3)
895
    utils.CreateBackup(filename)
896
    self.assertEqual(len(glob.glob("%s*" % filename)), 4)
897

    
898
    fifoname = PathJoin(self.tmpdir, "fifo")
899
    os.mkfifo(fifoname)
900
    self.assertRaises(errors.ProgrammerError, utils.CreateBackup, fifoname)
901

    
902
  def testContent(self):
903
    bkpcount = 0
904
    for data in ["", "X", "Hello World!\n" * 100, "Binary data\0\x01\x02\n"]:
905
      for rep in [1, 2, 10, 127]:
906
        testdata = data * rep
907

    
908
        filename = PathJoin(self.tmpdir, "test.data_")
909
        utils.WriteFile(filename, data=testdata)
910
        self.assertFileContent(filename, testdata)
911

    
912
        for _ in range(3):
913
          bname = utils.CreateBackup(filename)
914
          bkpcount += 1
915
          self.assertFileContent(bname, testdata)
916
          self.assertEqual(len(glob.glob("%s*" % filename)), 1 + bkpcount)
917

    
918

    
919
class TestParseCpuMask(unittest.TestCase):
920
  """Test case for the ParseCpuMask function."""
921

    
922
  def testWellFormed(self):
923
    self.assertEqual(utils.ParseCpuMask(""), [])
924
    self.assertEqual(utils.ParseCpuMask("1"), [1])
925
    self.assertEqual(utils.ParseCpuMask("0-2,4,5-5"), [0,1,2,4,5])
926

    
927
  def testInvalidInput(self):
928
    for data in ["garbage", "0,", "0-1-2", "2-1", "1-a"]:
929
      self.assertRaises(errors.ParseError, utils.ParseCpuMask, data)
930

    
931

    
932
class TestSshKeys(testutils.GanetiTestCase):
933
  """Test case for the AddAuthorizedKey function"""
934

    
935
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
936
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="198.51.100.4" '
937
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
938

    
939
  def setUp(self):
940
    testutils.GanetiTestCase.setUp(self)
941
    self.tmpname = self._CreateTempFile()
942
    handle = open(self.tmpname, 'w')
943
    try:
944
      handle.write("%s\n" % TestSshKeys.KEY_A)
945
      handle.write("%s\n" % TestSshKeys.KEY_B)
946
    finally:
947
      handle.close()
948

    
949
  def testAddingNewKey(self):
950
    utils.AddAuthorizedKey(self.tmpname,
951
                           'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
952

    
953
    self.assertFileContent(self.tmpname,
954
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
955
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
956
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
957
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
958

    
959
  def testAddingAlmostButNotCompletelyTheSameKey(self):
960
    utils.AddAuthorizedKey(self.tmpname,
961
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
962

    
963
    self.assertFileContent(self.tmpname,
964
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
965
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
966
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
967
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
968

    
969
  def testAddingExistingKeyWithSomeMoreSpaces(self):
970
    utils.AddAuthorizedKey(self.tmpname,
971
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
972

    
973
    self.assertFileContent(self.tmpname,
974
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
975
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
976
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
977

    
978
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
979
    utils.RemoveAuthorizedKey(self.tmpname,
980
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
981

    
982
    self.assertFileContent(self.tmpname,
983
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
984
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
985

    
986
  def testRemovingNonExistingKey(self):
987
    utils.RemoveAuthorizedKey(self.tmpname,
988
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
989

    
990
    self.assertFileContent(self.tmpname,
991
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
992
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
993
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
994

    
995

    
996
class TestEtcHosts(testutils.GanetiTestCase):
997
  """Test functions modifying /etc/hosts"""
998

    
999
  def setUp(self):
1000
    testutils.GanetiTestCase.setUp(self)
1001
    self.tmpname = self._CreateTempFile()
1002
    handle = open(self.tmpname, 'w')
1003
    try:
1004
      handle.write('# This is a test file for /etc/hosts\n')
1005
      handle.write('127.0.0.1\tlocalhost\n')
1006
      handle.write('192.0.2.1 router gw\n')
1007
    finally:
1008
      handle.close()
1009

    
1010
  def testSettingNewIp(self):
1011
    SetEtcHostsEntry(self.tmpname, '198.51.100.4', 'myhost.example.com',
1012
                     ['myhost'])
1013

    
1014
    self.assertFileContent(self.tmpname,
1015
      "# This is a test file for /etc/hosts\n"
1016
      "127.0.0.1\tlocalhost\n"
1017
      "192.0.2.1 router gw\n"
1018
      "198.51.100.4\tmyhost.example.com myhost\n")
1019
    self.assertFileMode(self.tmpname, 0644)
1020

    
1021
  def testSettingExistingIp(self):
1022
    SetEtcHostsEntry(self.tmpname, '192.0.2.1', 'myhost.example.com',
1023
                     ['myhost'])
1024

    
1025
    self.assertFileContent(self.tmpname,
1026
      "# This is a test file for /etc/hosts\n"
1027
      "127.0.0.1\tlocalhost\n"
1028
      "192.0.2.1\tmyhost.example.com myhost\n")
1029
    self.assertFileMode(self.tmpname, 0644)
1030

    
1031
  def testSettingDuplicateName(self):
1032
    SetEtcHostsEntry(self.tmpname, '198.51.100.4', 'myhost', ['myhost'])
1033

    
1034
    self.assertFileContent(self.tmpname,
1035
      "# This is a test file for /etc/hosts\n"
1036
      "127.0.0.1\tlocalhost\n"
1037
      "192.0.2.1 router gw\n"
1038
      "198.51.100.4\tmyhost\n")
1039
    self.assertFileMode(self.tmpname, 0644)
1040

    
1041
  def testRemovingExistingHost(self):
1042
    RemoveEtcHostsEntry(self.tmpname, 'router')
1043

    
1044
    self.assertFileContent(self.tmpname,
1045
      "# This is a test file for /etc/hosts\n"
1046
      "127.0.0.1\tlocalhost\n"
1047
      "192.0.2.1 gw\n")
1048
    self.assertFileMode(self.tmpname, 0644)
1049

    
1050
  def testRemovingSingleExistingHost(self):
1051
    RemoveEtcHostsEntry(self.tmpname, 'localhost')
1052

    
1053
    self.assertFileContent(self.tmpname,
1054
      "# This is a test file for /etc/hosts\n"
1055
      "192.0.2.1 router gw\n")
1056
    self.assertFileMode(self.tmpname, 0644)
1057

    
1058
  def testRemovingNonExistingHost(self):
1059
    RemoveEtcHostsEntry(self.tmpname, 'myhost')
1060

    
1061
    self.assertFileContent(self.tmpname,
1062
      "# This is a test file for /etc/hosts\n"
1063
      "127.0.0.1\tlocalhost\n"
1064
      "192.0.2.1 router gw\n")
1065
    self.assertFileMode(self.tmpname, 0644)
1066

    
1067
  def testRemovingAlias(self):
1068
    RemoveEtcHostsEntry(self.tmpname, 'gw')
1069

    
1070
    self.assertFileContent(self.tmpname,
1071
      "# This is a test file for /etc/hosts\n"
1072
      "127.0.0.1\tlocalhost\n"
1073
      "192.0.2.1 router\n")
1074
    self.assertFileMode(self.tmpname, 0644)
1075

    
1076

    
1077
class TestGetMounts(unittest.TestCase):
1078
  """Test case for GetMounts()."""
1079

    
1080
  TESTDATA = (
1081
    "rootfs /     rootfs rw 0 0\n"
1082
    "none   /sys  sysfs  rw,nosuid,nodev,noexec,relatime 0 0\n"
1083
    "none   /proc proc   rw,nosuid,nodev,noexec,relatime 0 0\n")
1084

    
1085
  def setUp(self):
1086
    self.tmpfile = tempfile.NamedTemporaryFile()
1087
    utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
1088

    
1089
  def testGetMounts(self):
1090
    self.assertEqual(utils.GetMounts(filename=self.tmpfile.name),
1091
      [
1092
        ("rootfs", "/", "rootfs", "rw"),
1093
        ("none", "/sys", "sysfs", "rw,nosuid,nodev,noexec,relatime"),
1094
        ("none", "/proc", "proc", "rw,nosuid,nodev,noexec,relatime"),
1095
      ])
1096

    
1097

    
1098
class TestListVisibleFiles(unittest.TestCase):
1099
  """Test case for ListVisibleFiles"""
1100

    
1101
  def setUp(self):
1102
    self.path = tempfile.mkdtemp()
1103

    
1104
  def tearDown(self):
1105
    shutil.rmtree(self.path)
1106

    
1107
  def _CreateFiles(self, files):
1108
    for name in files:
1109
      utils.WriteFile(os.path.join(self.path, name), data="test")
1110

    
1111
  def _test(self, files, expected):
1112
    self._CreateFiles(files)
1113
    found = ListVisibleFiles(self.path)
1114
    self.assertEqual(set(found), set(expected))
1115

    
1116
  def testAllVisible(self):
1117
    files = ["a", "b", "c"]
1118
    expected = files
1119
    self._test(files, expected)
1120

    
1121
  def testNoneVisible(self):
1122
    files = [".a", ".b", ".c"]
1123
    expected = []
1124
    self._test(files, expected)
1125

    
1126
  def testSomeVisible(self):
1127
    files = ["a", "b", ".c"]
1128
    expected = ["a", "b"]
1129
    self._test(files, expected)
1130

    
1131
  def testNonAbsolutePath(self):
1132
    self.failUnlessRaises(errors.ProgrammerError, ListVisibleFiles, "abc")
1133

    
1134
  def testNonNormalizedPath(self):
1135
    self.failUnlessRaises(errors.ProgrammerError, ListVisibleFiles,
1136
                          "/bin/../tmp")
1137

    
1138

    
1139
class TestNewUUID(unittest.TestCase):
1140
  """Test case for NewUUID"""
1141

    
1142
  def runTest(self):
1143
    self.failUnless(utils.UUID_RE.match(utils.NewUUID()))
1144

    
1145

    
1146
class TestFirstFree(unittest.TestCase):
1147
  """Test case for the FirstFree function"""
1148

    
1149
  def test(self):
1150
    """Test FirstFree"""
1151
    self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
1152
    self.failUnlessEqual(FirstFree([]), None)
1153
    self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
1154
    self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
1155
    self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
1156

    
1157

    
1158
class TestTailFile(testutils.GanetiTestCase):
1159
  """Test case for the TailFile function"""
1160

    
1161
  def testEmpty(self):
1162
    fname = self._CreateTempFile()
1163
    self.failUnlessEqual(TailFile(fname), [])
1164
    self.failUnlessEqual(TailFile(fname, lines=25), [])
1165

    
1166
  def testAllLines(self):
1167
    data = ["test %d" % i for i in range(30)]
1168
    for i in range(30):
1169
      fname = self._CreateTempFile()
1170
      fd = open(fname, "w")
1171
      fd.write("\n".join(data[:i]))
1172
      if i > 0:
1173
        fd.write("\n")
1174
      fd.close()
1175
      self.failUnlessEqual(TailFile(fname, lines=i), data[:i])
1176

    
1177
  def testPartialLines(self):
1178
    data = ["test %d" % i for i in range(30)]
1179
    fname = self._CreateTempFile()
1180
    fd = open(fname, "w")
1181
    fd.write("\n".join(data))
1182
    fd.write("\n")
1183
    fd.close()
1184
    for i in range(1, 30):
1185
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
1186

    
1187
  def testBigFile(self):
1188
    data = ["test %d" % i for i in range(30)]
1189
    fname = self._CreateTempFile()
1190
    fd = open(fname, "w")
1191
    fd.write("X" * 1048576)
1192
    fd.write("\n")
1193
    fd.write("\n".join(data))
1194
    fd.write("\n")
1195
    fd.close()
1196
    for i in range(1, 30):
1197
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
1198

    
1199

    
1200
class _BaseFileLockTest:
1201
  """Test case for the FileLock class"""
1202

    
1203
  def testSharedNonblocking(self):
1204
    self.lock.Shared(blocking=False)
1205
    self.lock.Close()
1206

    
1207
  def testExclusiveNonblocking(self):
1208
    self.lock.Exclusive(blocking=False)
1209
    self.lock.Close()
1210

    
1211
  def testUnlockNonblocking(self):
1212
    self.lock.Unlock(blocking=False)
1213
    self.lock.Close()
1214

    
1215
  def testSharedBlocking(self):
1216
    self.lock.Shared(blocking=True)
1217
    self.lock.Close()
1218

    
1219
  def testExclusiveBlocking(self):
1220
    self.lock.Exclusive(blocking=True)
1221
    self.lock.Close()
1222

    
1223
  def testUnlockBlocking(self):
1224
    self.lock.Unlock(blocking=True)
1225
    self.lock.Close()
1226

    
1227
  def testSharedExclusiveUnlock(self):
1228
    self.lock.Shared(blocking=False)
1229
    self.lock.Exclusive(blocking=False)
1230
    self.lock.Unlock(blocking=False)
1231
    self.lock.Close()
1232

    
1233
  def testExclusiveSharedUnlock(self):
1234
    self.lock.Exclusive(blocking=False)
1235
    self.lock.Shared(blocking=False)
1236
    self.lock.Unlock(blocking=False)
1237
    self.lock.Close()
1238

    
1239
  def testSimpleTimeout(self):
1240
    # These will succeed on the first attempt, hence a short timeout
1241
    self.lock.Shared(blocking=True, timeout=10.0)
1242
    self.lock.Exclusive(blocking=False, timeout=10.0)
1243
    self.lock.Unlock(blocking=True, timeout=10.0)
1244
    self.lock.Close()
1245

    
1246
  @staticmethod
1247
  def _TryLockInner(filename, shared, blocking):
1248
    lock = utils.FileLock.Open(filename)
1249

    
1250
    if shared:
1251
      fn = lock.Shared
1252
    else:
1253
      fn = lock.Exclusive
1254

    
1255
    try:
1256
      # The timeout doesn't really matter as the parent process waits for us to
1257
      # finish anyway.
1258
      fn(blocking=blocking, timeout=0.01)
1259
    except errors.LockError, err:
1260
      return False
1261

    
1262
    return True
1263

    
1264
  def _TryLock(self, *args):
1265
    return utils.RunInSeparateProcess(self._TryLockInner, self.tmpfile.name,
1266
                                      *args)
1267

    
1268
  def testTimeout(self):
1269
    for blocking in [True, False]:
1270
      self.lock.Exclusive(blocking=True)
1271
      self.failIf(self._TryLock(False, blocking))
1272
      self.failIf(self._TryLock(True, blocking))
1273

    
1274
      self.lock.Shared(blocking=True)
1275
      self.assert_(self._TryLock(True, blocking))
1276
      self.failIf(self._TryLock(False, blocking))
1277

    
1278
  def testCloseShared(self):
1279
    self.lock.Close()
1280
    self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
1281

    
1282
  def testCloseExclusive(self):
1283
    self.lock.Close()
1284
    self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
1285

    
1286
  def testCloseUnlock(self):
1287
    self.lock.Close()
1288
    self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
1289

    
1290

    
1291
class TestFileLockWithFilename(testutils.GanetiTestCase, _BaseFileLockTest):
1292
  TESTDATA = "Hello World\n" * 10
1293

    
1294
  def setUp(self):
1295
    testutils.GanetiTestCase.setUp(self)
1296

    
1297
    self.tmpfile = tempfile.NamedTemporaryFile()
1298
    utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
1299
    self.lock = utils.FileLock.Open(self.tmpfile.name)
1300

    
1301
    # Ensure "Open" didn't truncate file
1302
    self.assertFileContent(self.tmpfile.name, self.TESTDATA)
1303

    
1304
  def tearDown(self):
1305
    self.assertFileContent(self.tmpfile.name, self.TESTDATA)
1306

    
1307
    testutils.GanetiTestCase.tearDown(self)
1308

    
1309

    
1310
class TestFileLockWithFileObject(unittest.TestCase, _BaseFileLockTest):
1311
  def setUp(self):
1312
    self.tmpfile = tempfile.NamedTemporaryFile()
1313
    self.lock = utils.FileLock(open(self.tmpfile.name, "w"), self.tmpfile.name)
1314

    
1315

    
1316
class TestTimeFunctions(unittest.TestCase):
1317
  """Test case for time functions"""
1318

    
1319
  def runTest(self):
1320
    self.assertEqual(utils.SplitTime(1), (1, 0))
1321
    self.assertEqual(utils.SplitTime(1.5), (1, 500000))
1322
    self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
1323
    self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
1324
    self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
1325
    self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
1326
    self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
1327
    self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
1328

    
1329
    self.assertRaises(AssertionError, utils.SplitTime, -1)
1330

    
1331
    self.assertEqual(utils.MergeTime((1, 0)), 1.0)
1332
    self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
1333
    self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
1334

    
1335
    self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3),
1336
                     1218448917.481)
1337
    self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
1338

    
1339
    self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
1340
    self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
1341
    self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
1342
    self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
1343
    self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
1344

    
1345

    
1346
class FieldSetTestCase(unittest.TestCase):
1347
  """Test case for FieldSets"""
1348

    
1349
  def testSimpleMatch(self):
1350
    f = utils.FieldSet("a", "b", "c", "def")
1351
    self.failUnless(f.Matches("a"))
1352
    self.failIf(f.Matches("d"), "Substring matched")
1353
    self.failIf(f.Matches("defghi"), "Prefix string matched")
1354
    self.failIf(f.NonMatching(["b", "c"]))
1355
    self.failIf(f.NonMatching(["a", "b", "c", "def"]))
1356
    self.failUnless(f.NonMatching(["a", "d"]))
1357

    
1358
  def testRegexMatch(self):
1359
    f = utils.FieldSet("a", "b([0-9]+)", "c")
1360
    self.failUnless(f.Matches("b1"))
1361
    self.failUnless(f.Matches("b99"))
1362
    self.failIf(f.Matches("b/1"))
1363
    self.failIf(f.NonMatching(["b12", "c"]))
1364
    self.failUnless(f.NonMatching(["a", "1"]))
1365

    
1366
class TestForceDictType(unittest.TestCase):
1367
  """Test case for ForceDictType"""
1368
  KEY_TYPES = {
1369
    "a": constants.VTYPE_INT,
1370
    "b": constants.VTYPE_BOOL,
1371
    "c": constants.VTYPE_STRING,
1372
    "d": constants.VTYPE_SIZE,
1373
    "e": constants.VTYPE_MAYBE_STRING,
1374
    }
1375

    
1376
  def _fdt(self, dict, allowed_values=None):
1377
    if allowed_values is None:
1378
      utils.ForceDictType(dict, self.KEY_TYPES)
1379
    else:
1380
      utils.ForceDictType(dict, self.KEY_TYPES, allowed_values=allowed_values)
1381

    
1382
    return dict
1383

    
1384
  def testSimpleDict(self):
1385
    self.assertEqual(self._fdt({}), {})
1386
    self.assertEqual(self._fdt({'a': 1}), {'a': 1})
1387
    self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
1388
    self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
1389
    self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
1390
    self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
1391
    self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
1392
    self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
1393
    self.assertEqual(self._fdt({'b': False}), {'b': False})
1394
    self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
1395
    self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
1396
    self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
1397
    self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
1398
    self.assertEqual(self._fdt({"e": None, }), {"e": None, })
1399
    self.assertEqual(self._fdt({"e": "Hello World", }), {"e": "Hello World", })
1400
    self.assertEqual(self._fdt({"e": False, }), {"e": '', })
1401
    self.assertEqual(self._fdt({"b": "hello", }, ["hello"]), {"b": "hello"})
1402

    
1403
  def testErrors(self):
1404
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
1405
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {"b": "hello"})
1406
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
1407
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
1408
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
1409
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {"e": object(), })
1410
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {"e": [], })
1411
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {"x": None, })
1412
    self.assertRaises(errors.TypeEnforcementError, self._fdt, [])
1413
    self.assertRaises(errors.ProgrammerError, utils.ForceDictType,
1414
                      {"b": "hello"}, {"b": "no-such-type"})
1415

    
1416

    
1417
class TestIsNormAbsPath(unittest.TestCase):
1418
  """Testing case for IsNormAbsPath"""
1419

    
1420
  def _pathTestHelper(self, path, result):
1421
    if result:
1422
      self.assert_(utils.IsNormAbsPath(path),
1423
          "Path %s should result absolute and normalized" % path)
1424
    else:
1425
      self.assertFalse(utils.IsNormAbsPath(path),
1426
          "Path %s should not result absolute and normalized" % path)
1427

    
1428
  def testBase(self):
1429
    self._pathTestHelper('/etc', True)
1430
    self._pathTestHelper('/srv', True)
1431
    self._pathTestHelper('etc', False)
1432
    self._pathTestHelper('/etc/../root', False)
1433
    self._pathTestHelper('/etc/', False)
1434

    
1435

    
1436
class RunInSeparateProcess(unittest.TestCase):
1437
  def test(self):
1438
    for exp in [True, False]:
1439
      def _child():
1440
        return exp
1441

    
1442
      self.assertEqual(exp, utils.RunInSeparateProcess(_child))
1443

    
1444
  def testArgs(self):
1445
    for arg in [0, 1, 999, "Hello World", (1, 2, 3)]:
1446
      def _child(carg1, carg2):
1447
        return carg1 == "Foo" and carg2 == arg
1448

    
1449
      self.assert_(utils.RunInSeparateProcess(_child, "Foo", arg))
1450

    
1451
  def testPid(self):
1452
    parent_pid = os.getpid()
1453

    
1454
    def _check():
1455
      return os.getpid() == parent_pid
1456

    
1457
    self.failIf(utils.RunInSeparateProcess(_check))
1458

    
1459
  def testSignal(self):
1460
    def _kill():
1461
      os.kill(os.getpid(), signal.SIGTERM)
1462

    
1463
    self.assertRaises(errors.GenericError,
1464
                      utils.RunInSeparateProcess, _kill)
1465

    
1466
  def testException(self):
1467
    def _exc():
1468
      raise errors.GenericError("This is a test")
1469

    
1470
    self.assertRaises(errors.GenericError,
1471
                      utils.RunInSeparateProcess, _exc)
1472

    
1473

    
1474
class TestFingerprintFiles(unittest.TestCase):
1475
  def setUp(self):
1476
    self.tmpfile = tempfile.NamedTemporaryFile()
1477
    self.tmpfile2 = tempfile.NamedTemporaryFile()
1478
    utils.WriteFile(self.tmpfile2.name, data="Hello World\n")
1479
    self.results = {
1480
      self.tmpfile.name: "da39a3ee5e6b4b0d3255bfef95601890afd80709",
1481
      self.tmpfile2.name: "648a6a6ffffdaa0badb23b8baf90b6168dd16b3a",
1482
      }
1483

    
1484
  def testSingleFile(self):
1485
    self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1486
                     self.results[self.tmpfile.name])
1487

    
1488
    self.assertEqual(utils._FingerprintFile("/no/such/file"), None)
1489

    
1490
  def testBigFile(self):
1491
    self.tmpfile.write("A" * 8192)
1492
    self.tmpfile.flush()
1493
    self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1494
                     "35b6795ca20d6dc0aff8c7c110c96cd1070b8c38")
1495

    
1496
  def testMultiple(self):
1497
    all_files = self.results.keys()
1498
    all_files.append("/no/such/file")
1499
    self.assertEqual(utils.FingerprintFiles(self.results.keys()), self.results)
1500

    
1501

    
1502
class TestGenerateSelfSignedX509Cert(unittest.TestCase):
1503
  def setUp(self):
1504
    self.tmpdir = tempfile.mkdtemp()
1505

    
1506
  def tearDown(self):
1507
    shutil.rmtree(self.tmpdir)
1508

    
1509
  def _checkRsaPrivateKey(self, key):
1510
    lines = key.splitlines()
1511
    return ("-----BEGIN RSA PRIVATE KEY-----" in lines and
1512
            "-----END RSA PRIVATE KEY-----" in lines)
1513

    
1514
  def _checkCertificate(self, cert):
1515
    lines = cert.splitlines()
1516
    return ("-----BEGIN CERTIFICATE-----" in lines and
1517
            "-----END CERTIFICATE-----" in lines)
1518

    
1519
  def test(self):
1520
    for common_name in [None, ".", "Ganeti", "node1.example.com"]:
1521
      (key_pem, cert_pem) = utils.GenerateSelfSignedX509Cert(common_name, 300)
1522
      self._checkRsaPrivateKey(key_pem)
1523
      self._checkCertificate(cert_pem)
1524

    
1525
      key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
1526
                                           key_pem)
1527
      self.assert_(key.bits() >= 1024)
1528
      self.assertEqual(key.bits(), constants.RSA_KEY_BITS)
1529
      self.assertEqual(key.type(), OpenSSL.crypto.TYPE_RSA)
1530

    
1531
      x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1532
                                             cert_pem)
1533
      self.failIf(x509.has_expired())
1534
      self.assertEqual(x509.get_issuer().CN, common_name)
1535
      self.assertEqual(x509.get_subject().CN, common_name)
1536
      self.assertEqual(x509.get_pubkey().bits(), constants.RSA_KEY_BITS)
1537

    
1538
  def testLegacy(self):
1539
    cert1_filename = os.path.join(self.tmpdir, "cert1.pem")
1540

    
1541
    utils.GenerateSelfSignedSslCert(cert1_filename, validity=1)
1542

    
1543
    cert1 = utils.ReadFile(cert1_filename)
1544

    
1545
    self.assert_(self._checkRsaPrivateKey(cert1))
1546
    self.assert_(self._checkCertificate(cert1))
1547

    
1548

    
1549
class TestPathJoin(unittest.TestCase):
1550
  """Testing case for PathJoin"""
1551

    
1552
  def testBasicItems(self):
1553
    mlist = ["/a", "b", "c"]
1554
    self.failUnlessEqual(PathJoin(*mlist), "/".join(mlist))
1555

    
1556
  def testNonAbsPrefix(self):
1557
    self.failUnlessRaises(ValueError, PathJoin, "a", "b")
1558

    
1559
  def testBackTrack(self):
1560
    self.failUnlessRaises(ValueError, PathJoin, "/a", "b/../c")
1561

    
1562
  def testMultiAbs(self):
1563
    self.failUnlessRaises(ValueError, PathJoin, "/a", "/b")
1564

    
1565

    
1566
class TestValidateServiceName(unittest.TestCase):
1567
  def testValid(self):
1568
    testnames = [
1569
      0, 1, 2, 3, 1024, 65000, 65534, 65535,
1570
      "ganeti",
1571
      "gnt-masterd",
1572
      "HELLO_WORLD_SVC",
1573
      "hello.world.1",
1574
      "0", "80", "1111", "65535",
1575
      ]
1576

    
1577
    for name in testnames:
1578
      self.assertEqual(utils.ValidateServiceName(name), name)
1579

    
1580
  def testInvalid(self):
1581
    testnames = [
1582
      -15756, -1, 65536, 133428083,
1583
      "", "Hello World!", "!", "'", "\"", "\t", "\n", "`",
1584
      "-8546", "-1", "65536",
1585
      (129 * "A"),
1586
      ]
1587

    
1588
    for name in testnames:
1589
      self.assertRaises(errors.OpPrereqError, utils.ValidateServiceName, name)
1590

    
1591

    
1592
class TestParseAsn1Generalizedtime(unittest.TestCase):
1593
  def test(self):
1594
    # UTC
1595
    self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000Z"), 0)
1596
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152Z"),
1597
                     1266860512)
1598
    self.assertEqual(utils._ParseAsn1Generalizedtime("20380119031407Z"),
1599
                     (2**31) - 1)
1600

    
1601
    # With offset
1602
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152+0000"),
1603
                     1266860512)
1604
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100223131652+0000"),
1605
                     1266931012)
1606
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100223051808-0800"),
1607
                     1266931088)
1608
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100224002135+1100"),
1609
                     1266931295)
1610
    self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000-0100"),
1611
                     3600)
1612

    
1613
    # Leap seconds are not supported by datetime.datetime
1614
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1615
                      "19841231235960+0000")
1616
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1617
                      "19920630235960+0000")
1618

    
1619
    # Errors
1620
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "")
1621
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "invalid")
1622
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1623
                      "20100222174152")
1624
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1625
                      "Mon Feb 22 17:47:02 UTC 2010")
1626
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1627
                      "2010-02-22 17:42:02")
1628

    
1629

    
1630
class TestGetX509CertValidity(testutils.GanetiTestCase):
1631
  def setUp(self):
1632
    testutils.GanetiTestCase.setUp(self)
1633

    
1634
    pyopenssl_version = distutils.version.LooseVersion(OpenSSL.__version__)
1635

    
1636
    # Test whether we have pyOpenSSL 0.7 or above
1637
    self.pyopenssl0_7 = (pyopenssl_version >= "0.7")
1638

    
1639
    if not self.pyopenssl0_7:
1640
      warnings.warn("This test requires pyOpenSSL 0.7 or above to"
1641
                    " function correctly")
1642

    
1643
  def _LoadCert(self, name):
1644
    return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1645
                                           self._ReadTestData(name))
1646

    
1647
  def test(self):
1648
    validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem"))
1649
    if self.pyopenssl0_7:
1650
      self.assertEqual(validity, (1266919967, 1267524767))
1651
    else:
1652
      self.assertEqual(validity, (None, None))
1653

    
1654

    
1655
class TestSignX509Certificate(unittest.TestCase):
1656
  KEY = "My private key!"
1657
  KEY_OTHER = "Another key"
1658

    
1659
  def test(self):
1660
    # Generate certificate valid for 5 minutes
1661
    (_, cert_pem) = utils.GenerateSelfSignedX509Cert(None, 300)
1662

    
1663
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1664
                                           cert_pem)
1665

    
1666
    # No signature at all
1667
    self.assertRaises(errors.GenericError,
1668
                      utils.LoadSignedX509Certificate, cert_pem, self.KEY)
1669

    
1670
    # Invalid input
1671
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1672
                      "", self.KEY)
1673
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1674
                      "X-Ganeti-Signature: \n", self.KEY)
1675
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1676
                      "X-Ganeti-Sign: $1234$abcdef\n", self.KEY)
1677
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1678
                      "X-Ganeti-Signature: $1234567890$abcdef\n", self.KEY)
1679
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1680
                      "X-Ganeti-Signature: $1234$abc\n\n" + cert_pem, self.KEY)
1681

    
1682
    # Invalid salt
1683
    for salt in list("-_@$,:;/\\ \t\n"):
1684
      self.assertRaises(errors.GenericError, utils.SignX509Certificate,
1685
                        cert_pem, self.KEY, "foo%sbar" % salt)
1686

    
1687
    for salt in ["HelloWorld", "salt", string.letters, string.digits,
1688
                 utils.GenerateSecret(numbytes=4),
1689
                 utils.GenerateSecret(numbytes=16),
1690
                 "{123:456}".encode("hex")]:
1691
      signed_pem = utils.SignX509Certificate(cert, self.KEY, salt)
1692

    
1693
      self._Check(cert, salt, signed_pem)
1694

    
1695
      self._Check(cert, salt, "X-Another-Header: with a value\n" + signed_pem)
1696
      self._Check(cert, salt, (10 * "Hello World!\n") + signed_pem)
1697
      self._Check(cert, salt, (signed_pem + "\n\na few more\n"
1698
                               "lines----\n------ at\nthe end!"))
1699

    
1700
  def _Check(self, cert, salt, pem):
1701
    (cert2, salt2) = utils.LoadSignedX509Certificate(pem, self.KEY)
1702
    self.assertEqual(salt, salt2)
1703
    self.assertEqual(cert.digest("sha1"), cert2.digest("sha1"))
1704

    
1705
    # Other key
1706
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1707
                      pem, self.KEY_OTHER)
1708

    
1709

    
1710
class TestMakedirs(unittest.TestCase):
1711
  def setUp(self):
1712
    self.tmpdir = tempfile.mkdtemp()
1713

    
1714
  def tearDown(self):
1715
    shutil.rmtree(self.tmpdir)
1716

    
1717
  def testNonExisting(self):
1718
    path = PathJoin(self.tmpdir, "foo")
1719
    utils.Makedirs(path)
1720
    self.assert_(os.path.isdir(path))
1721

    
1722
  def testExisting(self):
1723
    path = PathJoin(self.tmpdir, "foo")
1724
    os.mkdir(path)
1725
    utils.Makedirs(path)
1726
    self.assert_(os.path.isdir(path))
1727

    
1728
  def testRecursiveNonExisting(self):
1729
    path = PathJoin(self.tmpdir, "foo/bar/baz")
1730
    utils.Makedirs(path)
1731
    self.assert_(os.path.isdir(path))
1732

    
1733
  def testRecursiveExisting(self):
1734
    path = PathJoin(self.tmpdir, "B/moo/xyz")
1735
    self.assertFalse(os.path.exists(path))
1736
    os.mkdir(PathJoin(self.tmpdir, "B"))
1737
    utils.Makedirs(path)
1738
    self.assert_(os.path.isdir(path))
1739

    
1740

    
1741
class TestReadLockedPidFile(unittest.TestCase):
1742
  def setUp(self):
1743
    self.tmpdir = tempfile.mkdtemp()
1744

    
1745
  def tearDown(self):
1746
    shutil.rmtree(self.tmpdir)
1747

    
1748
  def testNonExistent(self):
1749
    path = PathJoin(self.tmpdir, "nonexist")
1750
    self.assert_(utils.ReadLockedPidFile(path) is None)
1751

    
1752
  def testUnlocked(self):
1753
    path = PathJoin(self.tmpdir, "pid")
1754
    utils.WriteFile(path, data="123")
1755
    self.assert_(utils.ReadLockedPidFile(path) is None)
1756

    
1757
  def testLocked(self):
1758
    path = PathJoin(self.tmpdir, "pid")
1759
    utils.WriteFile(path, data="123")
1760

    
1761
    fl = utils.FileLock.Open(path)
1762
    try:
1763
      fl.Exclusive(blocking=True)
1764

    
1765
      self.assertEqual(utils.ReadLockedPidFile(path), 123)
1766
    finally:
1767
      fl.Close()
1768

    
1769
    self.assert_(utils.ReadLockedPidFile(path) is None)
1770

    
1771
  def testError(self):
1772
    path = PathJoin(self.tmpdir, "foobar", "pid")
1773
    utils.WriteFile(PathJoin(self.tmpdir, "foobar"), data="")
1774
    # open(2) should return ENOTDIR
1775
    self.assertRaises(EnvironmentError, utils.ReadLockedPidFile, path)
1776

    
1777

    
1778
class TestCertVerification(testutils.GanetiTestCase):
1779
  def setUp(self):
1780
    testutils.GanetiTestCase.setUp(self)
1781

    
1782
    self.tmpdir = tempfile.mkdtemp()
1783

    
1784
  def tearDown(self):
1785
    shutil.rmtree(self.tmpdir)
1786

    
1787
  def testVerifyCertificate(self):
1788
    cert_pem = utils.ReadFile(self._TestDataFilename("cert1.pem"))
1789
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1790
                                           cert_pem)
1791

    
1792
    # Not checking return value as this certificate is expired
1793
    utils.VerifyX509Certificate(cert, 30, 7)
1794

    
1795

    
1796
class TestVerifyCertificateInner(unittest.TestCase):
1797
  def test(self):
1798
    vci = utils._VerifyCertificateInner
1799

    
1800
    # Valid
1801
    self.assertEqual(vci(False, 1263916313, 1298476313, 1266940313, 30, 7),
1802
                     (None, None))
1803

    
1804
    # Not yet valid
1805
    (errcode, msg) = vci(False, 1266507600, 1267544400, 1266075600, 30, 7)
1806
    self.assertEqual(errcode, utils.CERT_WARNING)
1807

    
1808
    # Expiring soon
1809
    (errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 7)
1810
    self.assertEqual(errcode, utils.CERT_ERROR)
1811

    
1812
    (errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 1)
1813
    self.assertEqual(errcode, utils.CERT_WARNING)
1814

    
1815
    (errcode, msg) = vci(False, 1266507600, None, 1266939600, 30, 7)
1816
    self.assertEqual(errcode, None)
1817

    
1818
    # Expired
1819
    (errcode, msg) = vci(True, 1266507600, 1267544400, 1266939600, 30, 7)
1820
    self.assertEqual(errcode, utils.CERT_ERROR)
1821

    
1822
    (errcode, msg) = vci(True, None, 1267544400, 1266939600, 30, 7)
1823
    self.assertEqual(errcode, utils.CERT_ERROR)
1824

    
1825
    (errcode, msg) = vci(True, 1266507600, None, 1266939600, 30, 7)
1826
    self.assertEqual(errcode, utils.CERT_ERROR)
1827

    
1828
    (errcode, msg) = vci(True, None, None, 1266939600, 30, 7)
1829
    self.assertEqual(errcode, utils.CERT_ERROR)
1830

    
1831

    
1832
class TestHmacFunctions(unittest.TestCase):
1833
  # Digests can be checked with "openssl sha1 -hmac $key"
1834
  def testSha1Hmac(self):
1835
    self.assertEqual(utils.Sha1Hmac("", ""),
1836
                     "fbdb1d1b18aa6c08324b7d64b71fb76370690e1d")
1837
    self.assertEqual(utils.Sha1Hmac("3YzMxZWE", "Hello World"),
1838
                     "ef4f3bda82212ecb2f7ce868888a19092481f1fd")
1839
    self.assertEqual(utils.Sha1Hmac("TguMTA2K", ""),
1840
                     "f904c2476527c6d3e6609ab683c66fa0652cb1dc")
1841

    
1842
    longtext = 1500 * "The quick brown fox jumps over the lazy dog\n"
1843
    self.assertEqual(utils.Sha1Hmac("3YzMxZWE", longtext),
1844
                     "35901b9a3001a7cdcf8e0e9d7c2e79df2223af54")
1845

    
1846
  def testSha1HmacSalt(self):
1847
    self.assertEqual(utils.Sha1Hmac("TguMTA2K", "", salt="abc0"),
1848
                     "4999bf342470eadb11dfcd24ca5680cf9fd7cdce")
1849
    self.assertEqual(utils.Sha1Hmac("TguMTA2K", "", salt="abc9"),
1850
                     "17a4adc34d69c0d367d4ffbef96fd41d4df7a6e8")
1851
    self.assertEqual(utils.Sha1Hmac("3YzMxZWE", "Hello World", salt="xyz0"),
1852
                     "7f264f8114c9066afc9bb7636e1786d996d3cc0d")
1853

    
1854
  def testVerifySha1Hmac(self):
1855
    self.assert_(utils.VerifySha1Hmac("", "", ("fbdb1d1b18aa6c08324b"
1856
                                               "7d64b71fb76370690e1d")))
1857
    self.assert_(utils.VerifySha1Hmac("TguMTA2K", "",
1858
                                      ("f904c2476527c6d3e660"
1859
                                       "9ab683c66fa0652cb1dc")))
1860

    
1861
    digest = "ef4f3bda82212ecb2f7ce868888a19092481f1fd"
1862
    self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World", digest))
1863
    self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
1864
                                      digest.lower()))
1865
    self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
1866
                                      digest.upper()))
1867
    self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
1868
                                      digest.title()))
1869

    
1870
  def testVerifySha1HmacSalt(self):
1871
    self.assert_(utils.VerifySha1Hmac("TguMTA2K", "",
1872
                                      ("17a4adc34d69c0d367d4"
1873
                                       "ffbef96fd41d4df7a6e8"),
1874
                                      salt="abc9"))
1875
    self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
1876
                                      ("7f264f8114c9066afc9b"
1877
                                       "b7636e1786d996d3cc0d"),
1878
                                      salt="xyz0"))
1879

    
1880

    
1881
class TestIgnoreSignals(unittest.TestCase):
1882
  """Test the IgnoreSignals decorator"""
1883

    
1884
  @staticmethod
1885
  def _Raise(exception):
1886
    raise exception
1887

    
1888
  @staticmethod
1889
  def _Return(rval):
1890
    return rval
1891

    
1892
  def testIgnoreSignals(self):
1893
    sock_err_intr = socket.error(errno.EINTR, "Message")
1894
    sock_err_inval = socket.error(errno.EINVAL, "Message")
1895

    
1896
    env_err_intr = EnvironmentError(errno.EINTR, "Message")
1897
    env_err_inval = EnvironmentError(errno.EINVAL, "Message")
1898

    
1899
    self.assertRaises(socket.error, self._Raise, sock_err_intr)
1900
    self.assertRaises(socket.error, self._Raise, sock_err_inval)
1901
    self.assertRaises(EnvironmentError, self._Raise, env_err_intr)
1902
    self.assertRaises(EnvironmentError, self._Raise, env_err_inval)
1903

    
1904
    self.assertEquals(utils.IgnoreSignals(self._Raise, sock_err_intr), None)
1905
    self.assertEquals(utils.IgnoreSignals(self._Raise, env_err_intr), None)
1906
    self.assertRaises(socket.error, utils.IgnoreSignals, self._Raise,
1907
                      sock_err_inval)
1908
    self.assertRaises(EnvironmentError, utils.IgnoreSignals, self._Raise,
1909
                      env_err_inval)
1910

    
1911
    self.assertEquals(utils.IgnoreSignals(self._Return, True), True)
1912
    self.assertEquals(utils.IgnoreSignals(self._Return, 33), 33)
1913

    
1914

    
1915
class TestEnsureDirs(unittest.TestCase):
1916
  """Tests for EnsureDirs"""
1917

    
1918
  def setUp(self):
1919
    self.dir = tempfile.mkdtemp()
1920
    self.old_umask = os.umask(0777)
1921

    
1922
  def testEnsureDirs(self):
1923
    utils.EnsureDirs([
1924
        (PathJoin(self.dir, "foo"), 0777),
1925
        (PathJoin(self.dir, "bar"), 0000),
1926
        ])
1927
    self.assertEquals(os.stat(PathJoin(self.dir, "foo"))[0] & 0777, 0777)
1928
    self.assertEquals(os.stat(PathJoin(self.dir, "bar"))[0] & 0777, 0000)
1929

    
1930
  def tearDown(self):
1931
    os.rmdir(PathJoin(self.dir, "foo"))
1932
    os.rmdir(PathJoin(self.dir, "bar"))
1933
    os.rmdir(self.dir)
1934
    os.umask(self.old_umask)
1935

    
1936

    
1937
class TestIgnoreProcessNotFound(unittest.TestCase):
1938
  @staticmethod
1939
  def _WritePid(fd):
1940
    os.write(fd, str(os.getpid()))
1941
    os.close(fd)
1942
    return True
1943

    
1944
  def test(self):
1945
    (pid_read_fd, pid_write_fd) = os.pipe()
1946

    
1947
    # Start short-lived process which writes its PID to pipe
1948
    self.assert_(utils.RunInSeparateProcess(self._WritePid, pid_write_fd))
1949
    os.close(pid_write_fd)
1950

    
1951
    # Read PID from pipe
1952
    pid = int(os.read(pid_read_fd, 1024))
1953
    os.close(pid_read_fd)
1954

    
1955
    # Try to send signal to process which exited recently
1956
    self.assertFalse(utils.IgnoreProcessNotFound(os.kill, pid, 0))
1957

    
1958

    
1959
class TestFindMatch(unittest.TestCase):
1960
  def test(self):
1961
    data = {
1962
      "aaaa": "Four A",
1963
      "bb": {"Two B": True},
1964
      re.compile(r"^x(foo|bar|bazX)([0-9]+)$"): (1, 2, 3),
1965
      }
1966

    
1967
    self.assertEqual(utils.FindMatch(data, "aaaa"), ("Four A", []))
1968
    self.assertEqual(utils.FindMatch(data, "bb"), ({"Two B": True}, []))
1969

    
1970
    for i in ["foo", "bar", "bazX"]:
1971
      for j in range(1, 100, 7):
1972
        self.assertEqual(utils.FindMatch(data, "x%s%s" % (i, j)),
1973
                         ((1, 2, 3), [i, str(j)]))
1974

    
1975
  def testNoMatch(self):
1976
    self.assert_(utils.FindMatch({}, "") is None)
1977
    self.assert_(utils.FindMatch({}, "foo") is None)
1978
    self.assert_(utils.FindMatch({}, 1234) is None)
1979

    
1980
    data = {
1981
      "X": "Hello World",
1982
      re.compile("^(something)$"): "Hello World",
1983
      }
1984

    
1985
    self.assert_(utils.FindMatch(data, "") is None)
1986
    self.assert_(utils.FindMatch(data, "Hello World") is None)
1987

    
1988

    
1989
class TestFileID(testutils.GanetiTestCase):
1990
  def testEquality(self):
1991
    name = self._CreateTempFile()
1992
    oldi = utils.GetFileID(path=name)
1993
    self.failUnless(utils.VerifyFileID(oldi, oldi))
1994

    
1995
  def testUpdate(self):
1996
    name = self._CreateTempFile()
1997
    oldi = utils.GetFileID(path=name)
1998
    os.utime(name, None)
1999
    fd = os.open(name, os.O_RDWR)
2000
    try:
2001
      newi = utils.GetFileID(fd=fd)
2002
      self.failUnless(utils.VerifyFileID(oldi, newi))
2003
      self.failUnless(utils.VerifyFileID(newi, oldi))
2004
    finally:
2005
      os.close(fd)
2006

    
2007
  def testWriteFile(self):
2008
    name = self._CreateTempFile()
2009
    oldi = utils.GetFileID(path=name)
2010
    mtime = oldi[2]
2011
    os.utime(name, (mtime + 10, mtime + 10))
2012
    self.assertRaises(errors.LockError, utils.SafeWriteFile, name,
2013
                      oldi, data="")
2014
    os.utime(name, (mtime - 10, mtime - 10))
2015
    utils.SafeWriteFile(name, oldi, data="")
2016
    oldi = utils.GetFileID(path=name)
2017
    mtime = oldi[2]
2018
    os.utime(name, (mtime + 10, mtime + 10))
2019
    # this doesn't raise, since we passed None
2020
    utils.SafeWriteFile(name, None, data="")
2021

    
2022
  def testError(self):
2023
    t = tempfile.NamedTemporaryFile()
2024
    self.assertRaises(errors.ProgrammerError, utils.GetFileID,
2025
                      path=t.name, fd=t.fileno())
2026

    
2027

    
2028
class TimeMock:
2029
  def __init__(self, values):
2030
    self.values = values
2031

    
2032
  def __call__(self):
2033
    return self.values.pop(0)
2034

    
2035

    
2036
class TestRunningTimeout(unittest.TestCase):
2037
  def setUp(self):
2038
    self.time_fn = TimeMock([0.0, 0.3, 4.6, 6.5])
2039

    
2040
  def testRemainingFloat(self):
2041
    timeout = utils.RunningTimeout(5.0, True, _time_fn=self.time_fn)
2042
    self.assertAlmostEqual(timeout.Remaining(), 4.7)
2043
    self.assertAlmostEqual(timeout.Remaining(), 0.4)
2044
    self.assertAlmostEqual(timeout.Remaining(), -1.5)
2045

    
2046
  def testRemaining(self):
2047
    self.time_fn = TimeMock([0, 2, 4, 5, 6])
2048
    timeout = utils.RunningTimeout(5, True, _time_fn=self.time_fn)
2049
    self.assertEqual(timeout.Remaining(), 3)
2050
    self.assertEqual(timeout.Remaining(), 1)
2051
    self.assertEqual(timeout.Remaining(), 0)
2052
    self.assertEqual(timeout.Remaining(), -1)
2053

    
2054
  def testRemainingNonNegative(self):
2055
    timeout = utils.RunningTimeout(5.0, False, _time_fn=self.time_fn)
2056
    self.assertAlmostEqual(timeout.Remaining(), 4.7)
2057
    self.assertAlmostEqual(timeout.Remaining(), 0.4)
2058
    self.assertEqual(timeout.Remaining(), 0.0)
2059

    
2060
  def testNegativeTimeout(self):
2061
    self.assertRaises(ValueError, utils.RunningTimeout, -1.0, True)
2062

    
2063

    
2064
class TestTryConvert(unittest.TestCase):
2065
  def test(self):
2066
    for src, fn, result in [
2067
      ("1", int, 1),
2068
      ("a", int, "a"),
2069
      ("", bool, False),
2070
      ("a", bool, True),
2071
      ]:
2072
      self.assertEqual(utils.TryConvert(fn, src), result)
2073

    
2074

    
2075
class TestIsValidShellParam(unittest.TestCase):
2076
  def test(self):
2077
    for val, result in [
2078
      ("abc", True),
2079
      ("ab;cd", False),
2080
      ]:
2081
      self.assertEqual(utils.IsValidShellParam(val), result)
2082

    
2083

    
2084
class TestBuildShellCmd(unittest.TestCase):
2085
  def test(self):
2086
    self.assertRaises(errors.ProgrammerError, utils.BuildShellCmd,
2087
                      "ls %s", "ab;cd")
2088
    self.assertEqual(utils.BuildShellCmd("ls %s", "ab"), "ls ab")
2089

    
2090

    
2091
class TestWriteFile(unittest.TestCase):
2092
  def setUp(self):
2093
    self.tfile = tempfile.NamedTemporaryFile()
2094
    self.did_pre = False
2095
    self.did_post = False
2096
    self.did_write = False
2097

    
2098
  def markPre(self, fd):
2099
    self.did_pre = True
2100

    
2101
  def markPost(self, fd):
2102
    self.did_post = True
2103

    
2104
  def markWrite(self, fd):
2105
    self.did_write = True
2106

    
2107
  def testWrite(self):
2108
    data = "abc"
2109
    utils.WriteFile(self.tfile.name, data=data)
2110
    self.assertEqual(utils.ReadFile(self.tfile.name), data)
2111

    
2112
  def testErrors(self):
2113
    self.assertRaises(errors.ProgrammerError, utils.WriteFile,
2114
                      self.tfile.name, data="test", fn=lambda fd: None)
2115
    self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name)
2116
    self.assertRaises(errors.ProgrammerError, utils.WriteFile,
2117
                      self.tfile.name, data="test", atime=0)
2118

    
2119
  def testCalls(self):
2120
    utils.WriteFile(self.tfile.name, fn=self.markWrite,
2121
                    prewrite=self.markPre, postwrite=self.markPost)
2122
    self.assertTrue(self.did_pre)
2123
    self.assertTrue(self.did_post)
2124
    self.assertTrue(self.did_write)
2125

    
2126
  def testDryRun(self):
2127
    orig = "abc"
2128
    self.tfile.write(orig)
2129
    self.tfile.flush()
2130
    utils.WriteFile(self.tfile.name, data="hello", dry_run=True)
2131
    self.assertEqual(utils.ReadFile(self.tfile.name), orig)
2132

    
2133
  def testTimes(self):
2134
    f = self.tfile.name
2135
    for at, mt in [(0, 0), (1000, 1000), (2000, 3000),
2136
                   (int(time.time()), 5000)]:
2137
      utils.WriteFile(f, data="hello", atime=at, mtime=mt)
2138
      st = os.stat(f)
2139
      self.assertEqual(st.st_atime, at)
2140
      self.assertEqual(st.st_mtime, mt)
2141

    
2142

    
2143
  def testNoClose(self):
2144
    data = "hello"
2145
    self.assertEqual(utils.WriteFile(self.tfile.name, data="abc"), None)
2146
    fd = utils.WriteFile(self.tfile.name, data=data, close=False)
2147
    try:
2148
      os.lseek(fd, 0, 0)
2149
      self.assertEqual(os.read(fd, 4096), data)
2150
    finally:
2151
      os.close(fd)
2152

    
2153

    
2154
if __name__ == '__main__':
2155
  testutils.GanetiTestProgram()