Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ 3865ca48

History | View | Annotate | Download (41.6 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import distutils.version
25
import errno
26
import fcntl
27
import glob
28
import os
29
import os.path
30
import re
31
import shutil
32
import signal
33
import socket
34
import stat
35
import string
36
import tempfile
37
import time
38
import unittest
39
import warnings
40
import OpenSSL
41
import random
42
import operator
43

    
44
import testutils
45
from ganeti import constants
46
from ganeti import compat
47
from ganeti import utils
48
from ganeti import errors
49
from ganeti.utils import RunCmd, \
50
     FirstFree, \
51
     RunParts, \
52
     SetEtcHostsEntry, RemoveEtcHostsEntry
53

    
54

    
55
class TestIsProcessAlive(unittest.TestCase):
56
  """Testing case for IsProcessAlive"""
57

    
58
  def testExists(self):
59
    mypid = os.getpid()
60
    self.assert_(utils.IsProcessAlive(mypid), "can't find myself running")
61

    
62
  def testNotExisting(self):
63
    pid_non_existing = os.fork()
64
    if pid_non_existing == 0:
65
      os._exit(0)
66
    elif pid_non_existing < 0:
67
      raise SystemError("can't fork")
68
    os.waitpid(pid_non_existing, 0)
69
    self.assertFalse(utils.IsProcessAlive(pid_non_existing),
70
                     "nonexisting process detected")
71

    
72

    
73
class TestGetProcStatusPath(unittest.TestCase):
74
  def test(self):
75
    self.assert_("/1234/" in utils._GetProcStatusPath(1234))
76
    self.assertNotEqual(utils._GetProcStatusPath(1),
77
                        utils._GetProcStatusPath(2))
78

    
79

    
80
class TestIsProcessHandlingSignal(unittest.TestCase):
81
  def setUp(self):
82
    self.tmpdir = tempfile.mkdtemp()
83

    
84
  def tearDown(self):
85
    shutil.rmtree(self.tmpdir)
86

    
87
  def testParseSigsetT(self):
88
    self.assertEqual(len(utils._ParseSigsetT("0")), 0)
89
    self.assertEqual(utils._ParseSigsetT("1"), set([1]))
90
    self.assertEqual(utils._ParseSigsetT("1000a"), set([2, 4, 17]))
91
    self.assertEqual(utils._ParseSigsetT("810002"), set([2, 17, 24, ]))
92
    self.assertEqual(utils._ParseSigsetT("0000000180000202"),
93
                     set([2, 10, 32, 33]))
94
    self.assertEqual(utils._ParseSigsetT("0000000180000002"),
95
                     set([2, 32, 33]))
96
    self.assertEqual(utils._ParseSigsetT("0000000188000002"),
97
                     set([2, 28, 32, 33]))
98
    self.assertEqual(utils._ParseSigsetT("000000004b813efb"),
99
                     set([1, 2, 4, 5, 6, 7, 8, 10, 11, 12, 13, 14, 17,
100
                          24, 25, 26, 28, 31]))
101
    self.assertEqual(utils._ParseSigsetT("ffffff"), set(range(1, 25)))
102

    
103
  def testGetProcStatusField(self):
104
    for field in ["SigCgt", "Name", "FDSize"]:
105
      for value in ["", "0", "cat", "  1234 KB"]:
106
        pstatus = "\n".join([
107
          "VmPeak: 999 kB",
108
          "%s: %s" % (field, value),
109
          "TracerPid: 0",
110
          ])
111
        result = utils._GetProcStatusField(pstatus, field)
112
        self.assertEqual(result, value.strip())
113

    
114
  def test(self):
115
    sp = utils.PathJoin(self.tmpdir, "status")
116

    
117
    utils.WriteFile(sp, data="\n".join([
118
      "Name:   bash",
119
      "State:  S (sleeping)",
120
      "SleepAVG:       98%",
121
      "Pid:    22250",
122
      "PPid:   10858",
123
      "TracerPid:      0",
124
      "SigBlk: 0000000000010000",
125
      "SigIgn: 0000000000384004",
126
      "SigCgt: 000000004b813efb",
127
      "CapEff: 0000000000000000",
128
      ]))
129

    
130
    self.assert_(utils.IsProcessHandlingSignal(1234, 10, status_path=sp))
131

    
132
  def testNoSigCgt(self):
133
    sp = utils.PathJoin(self.tmpdir, "status")
134

    
135
    utils.WriteFile(sp, data="\n".join([
136
      "Name:   bash",
137
      ]))
138

    
139
    self.assertRaises(RuntimeError, utils.IsProcessHandlingSignal,
140
                      1234, 10, status_path=sp)
141

    
142
  def testNoSuchFile(self):
143
    sp = utils.PathJoin(self.tmpdir, "notexist")
144

    
145
    self.assertFalse(utils.IsProcessHandlingSignal(1234, 10, status_path=sp))
146

    
147
  @staticmethod
148
  def _TestRealProcess():
149
    signal.signal(signal.SIGUSR1, signal.SIG_DFL)
150
    if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
151
      raise Exception("SIGUSR1 is handled when it should not be")
152

    
153
    signal.signal(signal.SIGUSR1, lambda signum, frame: None)
154
    if not utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
155
      raise Exception("SIGUSR1 is not handled when it should be")
156

    
157
    signal.signal(signal.SIGUSR1, signal.SIG_IGN)
158
    if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
159
      raise Exception("SIGUSR1 is not handled when it should be")
160

    
161
    signal.signal(signal.SIGUSR1, signal.SIG_DFL)
162
    if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
163
      raise Exception("SIGUSR1 is handled when it should not be")
164

    
165
    return True
166

    
167
  def testRealProcess(self):
168
    self.assert_(utils.RunInSeparateProcess(self._TestRealProcess))
169

    
170

    
171
class TestRunCmd(testutils.GanetiTestCase):
172
  """Testing case for the RunCmd function"""
173

    
174
  def setUp(self):
175
    testutils.GanetiTestCase.setUp(self)
176
    self.magic = time.ctime() + " ganeti test"
177
    self.fname = self._CreateTempFile()
178
    self.fifo_tmpdir = tempfile.mkdtemp()
179
    self.fifo_file = os.path.join(self.fifo_tmpdir, "ganeti_test_fifo")
180
    os.mkfifo(self.fifo_file)
181

    
182
  def tearDown(self):
183
    shutil.rmtree(self.fifo_tmpdir)
184
    testutils.GanetiTestCase.tearDown(self)
185

    
186
  def testOk(self):
187
    """Test successful exit code"""
188
    result = RunCmd("/bin/sh -c 'exit 0'")
189
    self.assertEqual(result.exit_code, 0)
190
    self.assertEqual(result.output, "")
191

    
192
  def testFail(self):
193
    """Test fail exit code"""
194
    result = RunCmd("/bin/sh -c 'exit 1'")
195
    self.assertEqual(result.exit_code, 1)
196
    self.assertEqual(result.output, "")
197

    
198
  def testStdout(self):
199
    """Test standard output"""
200
    cmd = 'echo -n "%s"' % self.magic
201
    result = RunCmd("/bin/sh -c '%s'" % cmd)
202
    self.assertEqual(result.stdout, self.magic)
203
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
204
    self.assertEqual(result.output, "")
205
    self.assertFileContent(self.fname, self.magic)
206

    
207
  def testStderr(self):
208
    """Test standard error"""
209
    cmd = 'echo -n "%s"' % self.magic
210
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
211
    self.assertEqual(result.stderr, self.magic)
212
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
213
    self.assertEqual(result.output, "")
214
    self.assertFileContent(self.fname, self.magic)
215

    
216
  def testCombined(self):
217
    """Test combined output"""
218
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
219
    expected = "A" + self.magic + "B" + self.magic
220
    result = RunCmd("/bin/sh -c '%s'" % cmd)
221
    self.assertEqual(result.output, expected)
222
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
223
    self.assertEqual(result.output, "")
224
    self.assertFileContent(self.fname, expected)
225

    
226
  def testSignal(self):
227
    """Test signal"""
228
    result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
229
    self.assertEqual(result.signal, 15)
230
    self.assertEqual(result.output, "")
231

    
232
  def testTimeoutClean(self):
233
    cmd = "trap 'exit 0' TERM; read < %s" % self.fifo_file
234
    result = RunCmd(["/bin/sh", "-c", cmd], timeout=0.2)
235
    self.assertEqual(result.exit_code, 0)
236

    
237
  def testTimeoutKill(self):
238
    cmd = ["/bin/sh", "-c", "trap '' TERM; read < %s" % self.fifo_file]
239
    timeout = 0.2
240
    out, err, status, ta = utils._RunCmdPipe(cmd, {}, False, "/", False,
241
                                             timeout, _linger_timeout=0.2)
242
    self.assert_(status < 0)
243
    self.assertEqual(-status, signal.SIGKILL)
244

    
245
  def testTimeoutOutputAfterTerm(self):
246
    cmd = "trap 'echo sigtermed; exit 1' TERM; read < %s" % self.fifo_file
247
    result = RunCmd(["/bin/sh", "-c", cmd], timeout=0.2)
248
    self.assert_(result.failed)
249
    self.assertEqual(result.stdout, "sigtermed\n")
250

    
251
  def testListRun(self):
252
    """Test list runs"""
253
    result = RunCmd(["true"])
254
    self.assertEqual(result.signal, None)
255
    self.assertEqual(result.exit_code, 0)
256
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
257
    self.assertEqual(result.signal, None)
258
    self.assertEqual(result.exit_code, 1)
259
    result = RunCmd(["echo", "-n", self.magic])
260
    self.assertEqual(result.signal, None)
261
    self.assertEqual(result.exit_code, 0)
262
    self.assertEqual(result.stdout, self.magic)
263

    
264
  def testFileEmptyOutput(self):
265
    """Test file output"""
266
    result = RunCmd(["true"], output=self.fname)
267
    self.assertEqual(result.signal, None)
268
    self.assertEqual(result.exit_code, 0)
269
    self.assertFileContent(self.fname, "")
270

    
271
  def testLang(self):
272
    """Test locale environment"""
273
    old_env = os.environ.copy()
274
    try:
275
      os.environ["LANG"] = "en_US.UTF-8"
276
      os.environ["LC_ALL"] = "en_US.UTF-8"
277
      result = RunCmd(["locale"])
278
      for line in result.output.splitlines():
279
        key, value = line.split("=", 1)
280
        # Ignore these variables, they're overridden by LC_ALL
281
        if key == "LANG" or key == "LANGUAGE":
282
          continue
283
        self.failIf(value and value != "C" and value != '"C"',
284
            "Variable %s is set to the invalid value '%s'" % (key, value))
285
    finally:
286
      os.environ = old_env
287

    
288
  def testDefaultCwd(self):
289
    """Test default working directory"""
290
    self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
291

    
292
  def testCwd(self):
293
    """Test default working directory"""
294
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
295
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
296
    cwd = os.getcwd()
297
    self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
298

    
299
  def testResetEnv(self):
300
    """Test environment reset functionality"""
301
    self.failUnlessEqual(RunCmd(["env"], reset_env=True).stdout.strip(), "")
302
    self.failUnlessEqual(RunCmd(["env"], reset_env=True,
303
                                env={"FOO": "bar",}).stdout.strip(), "FOO=bar")
304

    
305
  def testNoFork(self):
306
    """Test that nofork raise an error"""
307
    self.assertFalse(utils._no_fork)
308
    utils.DisableFork()
309
    try:
310
      self.assertTrue(utils._no_fork)
311
      self.assertRaises(errors.ProgrammerError, RunCmd, ["true"])
312
    finally:
313
      utils._no_fork = False
314

    
315
  def testWrongParams(self):
316
    """Test wrong parameters"""
317
    self.assertRaises(errors.ProgrammerError, RunCmd, ["true"],
318
                      output="/dev/null", interactive=True)
319

    
320

    
321
class TestRunParts(testutils.GanetiTestCase):
322
  """Testing case for the RunParts function"""
323

    
324
  def setUp(self):
325
    self.rundir = tempfile.mkdtemp(prefix="ganeti-test", suffix=".tmp")
326

    
327
  def tearDown(self):
328
    shutil.rmtree(self.rundir)
329

    
330
  def testEmpty(self):
331
    """Test on an empty dir"""
332
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True), [])
333

    
334
  def testSkipWrongName(self):
335
    """Test that wrong files are skipped"""
336
    fname = os.path.join(self.rundir, "00test.dot")
337
    utils.WriteFile(fname, data="")
338
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
339
    relname = os.path.basename(fname)
340
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
341
                         [(relname, constants.RUNPARTS_SKIP, None)])
342

    
343
  def testSkipNonExec(self):
344
    """Test that non executable files are skipped"""
345
    fname = os.path.join(self.rundir, "00test")
346
    utils.WriteFile(fname, data="")
347
    relname = os.path.basename(fname)
348
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
349
                         [(relname, constants.RUNPARTS_SKIP, None)])
350

    
351
  def testError(self):
352
    """Test error on a broken executable"""
353
    fname = os.path.join(self.rundir, "00test")
354
    utils.WriteFile(fname, data="")
355
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
356
    (relname, status, error) = RunParts(self.rundir, reset_env=True)[0]
357
    self.failUnlessEqual(relname, os.path.basename(fname))
358
    self.failUnlessEqual(status, constants.RUNPARTS_ERR)
359
    self.failUnless(error)
360

    
361
  def testSorted(self):
362
    """Test executions are sorted"""
363
    files = []
364
    files.append(os.path.join(self.rundir, "64test"))
365
    files.append(os.path.join(self.rundir, "00test"))
366
    files.append(os.path.join(self.rundir, "42test"))
367

    
368
    for fname in files:
369
      utils.WriteFile(fname, data="")
370

    
371
    results = RunParts(self.rundir, reset_env=True)
372

    
373
    for fname in sorted(files):
374
      self.failUnlessEqual(os.path.basename(fname), results.pop(0)[0])
375

    
376
  def testOk(self):
377
    """Test correct execution"""
378
    fname = os.path.join(self.rundir, "00test")
379
    utils.WriteFile(fname, data="#!/bin/sh\n\necho -n ciao")
380
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
381
    (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
382
    self.failUnlessEqual(relname, os.path.basename(fname))
383
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
384
    self.failUnlessEqual(runresult.stdout, "ciao")
385

    
386
  def testRunFail(self):
387
    """Test correct execution, with run failure"""
388
    fname = os.path.join(self.rundir, "00test")
389
    utils.WriteFile(fname, data="#!/bin/sh\n\nexit 1")
390
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
391
    (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
392
    self.failUnlessEqual(relname, os.path.basename(fname))
393
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
394
    self.failUnlessEqual(runresult.exit_code, 1)
395
    self.failUnless(runresult.failed)
396

    
397
  def testRunMix(self):
398
    files = []
399
    files.append(os.path.join(self.rundir, "00test"))
400
    files.append(os.path.join(self.rundir, "42test"))
401
    files.append(os.path.join(self.rundir, "64test"))
402
    files.append(os.path.join(self.rundir, "99test"))
403

    
404
    files.sort()
405

    
406
    # 1st has errors in execution
407
    utils.WriteFile(files[0], data="#!/bin/sh\n\nexit 1")
408
    os.chmod(files[0], stat.S_IREAD | stat.S_IEXEC)
409

    
410
    # 2nd is skipped
411
    utils.WriteFile(files[1], data="")
412

    
413
    # 3rd cannot execute properly
414
    utils.WriteFile(files[2], data="")
415
    os.chmod(files[2], stat.S_IREAD | stat.S_IEXEC)
416

    
417
    # 4th execs
418
    utils.WriteFile(files[3], data="#!/bin/sh\n\necho -n ciao")
419
    os.chmod(files[3], stat.S_IREAD | stat.S_IEXEC)
420

    
421
    results = RunParts(self.rundir, reset_env=True)
422

    
423
    (relname, status, runresult) = results[0]
424
    self.failUnlessEqual(relname, os.path.basename(files[0]))
425
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
426
    self.failUnlessEqual(runresult.exit_code, 1)
427
    self.failUnless(runresult.failed)
428

    
429
    (relname, status, runresult) = results[1]
430
    self.failUnlessEqual(relname, os.path.basename(files[1]))
431
    self.failUnlessEqual(status, constants.RUNPARTS_SKIP)
432
    self.failUnlessEqual(runresult, None)
433

    
434
    (relname, status, runresult) = results[2]
435
    self.failUnlessEqual(relname, os.path.basename(files[2]))
436
    self.failUnlessEqual(status, constants.RUNPARTS_ERR)
437
    self.failUnless(runresult)
438

    
439
    (relname, status, runresult) = results[3]
440
    self.failUnlessEqual(relname, os.path.basename(files[3]))
441
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
442
    self.failUnlessEqual(runresult.output, "ciao")
443
    self.failUnlessEqual(runresult.exit_code, 0)
444
    self.failUnless(not runresult.failed)
445

    
446
  def testMissingDirectory(self):
447
    nosuchdir = utils.PathJoin(self.rundir, "no/such/directory")
448
    self.assertEqual(RunParts(nosuchdir), [])
449

    
450

    
451
class TestStartDaemon(testutils.GanetiTestCase):
452
  def setUp(self):
453
    self.tmpdir = tempfile.mkdtemp(prefix="ganeti-test")
454
    self.tmpfile = os.path.join(self.tmpdir, "test")
455

    
456
  def tearDown(self):
457
    shutil.rmtree(self.tmpdir)
458

    
459
  def testShell(self):
460
    utils.StartDaemon("echo Hello World > %s" % self.tmpfile)
461
    self._wait(self.tmpfile, 60.0, "Hello World")
462

    
463
  def testShellOutput(self):
464
    utils.StartDaemon("echo Hello World", output=self.tmpfile)
465
    self._wait(self.tmpfile, 60.0, "Hello World")
466

    
467
  def testNoShellNoOutput(self):
468
    utils.StartDaemon(["pwd"])
469

    
470
  def testNoShellNoOutputTouch(self):
471
    testfile = os.path.join(self.tmpdir, "check")
472
    self.failIf(os.path.exists(testfile))
473
    utils.StartDaemon(["touch", testfile])
474
    self._wait(testfile, 60.0, "")
475

    
476
  def testNoShellOutput(self):
477
    utils.StartDaemon(["pwd"], output=self.tmpfile)
478
    self._wait(self.tmpfile, 60.0, "/")
479

    
480
  def testNoShellOutputCwd(self):
481
    utils.StartDaemon(["pwd"], output=self.tmpfile, cwd=os.getcwd())
482
    self._wait(self.tmpfile, 60.0, os.getcwd())
483

    
484
  def testShellEnv(self):
485
    utils.StartDaemon("echo \"$GNT_TEST_VAR\"", output=self.tmpfile,
486
                      env={ "GNT_TEST_VAR": "Hello World", })
487
    self._wait(self.tmpfile, 60.0, "Hello World")
488

    
489
  def testNoShellEnv(self):
490
    utils.StartDaemon(["printenv", "GNT_TEST_VAR"], output=self.tmpfile,
491
                      env={ "GNT_TEST_VAR": "Hello World", })
492
    self._wait(self.tmpfile, 60.0, "Hello World")
493

    
494
  def testOutputFd(self):
495
    fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
496
    try:
497
      utils.StartDaemon(["pwd"], output_fd=fd, cwd=os.getcwd())
498
    finally:
499
      os.close(fd)
500
    self._wait(self.tmpfile, 60.0, os.getcwd())
501

    
502
  def testPid(self):
503
    pid = utils.StartDaemon("echo $$ > %s" % self.tmpfile)
504
    self._wait(self.tmpfile, 60.0, str(pid))
505

    
506
  def testPidFile(self):
507
    pidfile = os.path.join(self.tmpdir, "pid")
508
    checkfile = os.path.join(self.tmpdir, "abort")
509

    
510
    pid = utils.StartDaemon("while sleep 5; do :; done", pidfile=pidfile,
511
                            output=self.tmpfile)
512
    try:
513
      fd = os.open(pidfile, os.O_RDONLY)
514
      try:
515
        # Check file is locked
516
        self.assertRaises(errors.LockError, utils.LockFile, fd)
517

    
518
        pidtext = os.read(fd, 100)
519
      finally:
520
        os.close(fd)
521

    
522
      self.assertEqual(int(pidtext.strip()), pid)
523

    
524
      self.assert_(utils.IsProcessAlive(pid))
525
    finally:
526
      # No matter what happens, kill daemon
527
      utils.KillProcess(pid, timeout=5.0, waitpid=False)
528
      self.failIf(utils.IsProcessAlive(pid))
529

    
530
    self.assertEqual(utils.ReadFile(self.tmpfile), "")
531

    
532
  def _wait(self, path, timeout, expected):
533
    # Due to the asynchronous nature of daemon processes, polling is necessary.
534
    # A timeout makes sure the test doesn't hang forever.
535
    def _CheckFile():
536
      if not (os.path.isfile(path) and
537
              utils.ReadFile(path).strip() == expected):
538
        raise utils.RetryAgain()
539

    
540
    try:
541
      utils.Retry(_CheckFile, (0.01, 1.5, 1.0), timeout)
542
    except utils.RetryTimeout:
543
      self.fail("Apparently the daemon didn't run in %s seconds and/or"
544
                " didn't write the correct output" % timeout)
545

    
546
  def testError(self):
547
    self.assertRaises(errors.OpExecError, utils.StartDaemon,
548
                      ["./does-NOT-EXIST/here/0123456789"])
549
    self.assertRaises(errors.OpExecError, utils.StartDaemon,
550
                      ["./does-NOT-EXIST/here/0123456789"],
551
                      output=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
552
    self.assertRaises(errors.OpExecError, utils.StartDaemon,
553
                      ["./does-NOT-EXIST/here/0123456789"],
554
                      cwd=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
555
    self.assertRaises(errors.OpExecError, utils.StartDaemon,
556
                      ["./does-NOT-EXIST/here/0123456789"],
557
                      output=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
558

    
559
    fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
560
    try:
561
      self.assertRaises(errors.ProgrammerError, utils.StartDaemon,
562
                        ["./does-NOT-EXIST/here/0123456789"],
563
                        output=self.tmpfile, output_fd=fd)
564
    finally:
565
      os.close(fd)
566

    
567

    
568
class TestParseCpuMask(unittest.TestCase):
569
  """Test case for the ParseCpuMask function."""
570

    
571
  def testWellFormed(self):
572
    self.assertEqual(utils.ParseCpuMask(""), [])
573
    self.assertEqual(utils.ParseCpuMask("1"), [1])
574
    self.assertEqual(utils.ParseCpuMask("0-2,4,5-5"), [0,1,2,4,5])
575

    
576
  def testInvalidInput(self):
577
    for data in ["garbage", "0,", "0-1-2", "2-1", "1-a"]:
578
      self.assertRaises(errors.ParseError, utils.ParseCpuMask, data)
579

    
580

    
581
class TestEtcHosts(testutils.GanetiTestCase):
582
  """Test functions modifying /etc/hosts"""
583

    
584
  def setUp(self):
585
    testutils.GanetiTestCase.setUp(self)
586
    self.tmpname = self._CreateTempFile()
587
    handle = open(self.tmpname, 'w')
588
    try:
589
      handle.write('# This is a test file for /etc/hosts\n')
590
      handle.write('127.0.0.1\tlocalhost\n')
591
      handle.write('192.0.2.1 router gw\n')
592
    finally:
593
      handle.close()
594

    
595
  def testSettingNewIp(self):
596
    SetEtcHostsEntry(self.tmpname, '198.51.100.4', 'myhost.example.com',
597
                     ['myhost'])
598

    
599
    self.assertFileContent(self.tmpname,
600
      "# This is a test file for /etc/hosts\n"
601
      "127.0.0.1\tlocalhost\n"
602
      "192.0.2.1 router gw\n"
603
      "198.51.100.4\tmyhost.example.com myhost\n")
604
    self.assertFileMode(self.tmpname, 0644)
605

    
606
  def testSettingExistingIp(self):
607
    SetEtcHostsEntry(self.tmpname, '192.0.2.1', 'myhost.example.com',
608
                     ['myhost'])
609

    
610
    self.assertFileContent(self.tmpname,
611
      "# This is a test file for /etc/hosts\n"
612
      "127.0.0.1\tlocalhost\n"
613
      "192.0.2.1\tmyhost.example.com myhost\n")
614
    self.assertFileMode(self.tmpname, 0644)
615

    
616
  def testSettingDuplicateName(self):
617
    SetEtcHostsEntry(self.tmpname, '198.51.100.4', 'myhost', ['myhost'])
618

    
619
    self.assertFileContent(self.tmpname,
620
      "# This is a test file for /etc/hosts\n"
621
      "127.0.0.1\tlocalhost\n"
622
      "192.0.2.1 router gw\n"
623
      "198.51.100.4\tmyhost\n")
624
    self.assertFileMode(self.tmpname, 0644)
625

    
626
  def testRemovingExistingHost(self):
627
    RemoveEtcHostsEntry(self.tmpname, 'router')
628

    
629
    self.assertFileContent(self.tmpname,
630
      "# This is a test file for /etc/hosts\n"
631
      "127.0.0.1\tlocalhost\n"
632
      "192.0.2.1 gw\n")
633
    self.assertFileMode(self.tmpname, 0644)
634

    
635
  def testRemovingSingleExistingHost(self):
636
    RemoveEtcHostsEntry(self.tmpname, 'localhost')
637

    
638
    self.assertFileContent(self.tmpname,
639
      "# This is a test file for /etc/hosts\n"
640
      "192.0.2.1 router gw\n")
641
    self.assertFileMode(self.tmpname, 0644)
642

    
643
  def testRemovingNonExistingHost(self):
644
    RemoveEtcHostsEntry(self.tmpname, 'myhost')
645

    
646
    self.assertFileContent(self.tmpname,
647
      "# This is a test file for /etc/hosts\n"
648
      "127.0.0.1\tlocalhost\n"
649
      "192.0.2.1 router gw\n")
650
    self.assertFileMode(self.tmpname, 0644)
651

    
652
  def testRemovingAlias(self):
653
    RemoveEtcHostsEntry(self.tmpname, 'gw')
654

    
655
    self.assertFileContent(self.tmpname,
656
      "# This is a test file for /etc/hosts\n"
657
      "127.0.0.1\tlocalhost\n"
658
      "192.0.2.1 router\n")
659
    self.assertFileMode(self.tmpname, 0644)
660

    
661

    
662
class TestGetMounts(unittest.TestCase):
663
  """Test case for GetMounts()."""
664

    
665
  TESTDATA = (
666
    "rootfs /     rootfs rw 0 0\n"
667
    "none   /sys  sysfs  rw,nosuid,nodev,noexec,relatime 0 0\n"
668
    "none   /proc proc   rw,nosuid,nodev,noexec,relatime 0 0\n")
669

    
670
  def setUp(self):
671
    self.tmpfile = tempfile.NamedTemporaryFile()
672
    utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
673

    
674
  def testGetMounts(self):
675
    self.assertEqual(utils.GetMounts(filename=self.tmpfile.name),
676
      [
677
        ("rootfs", "/", "rootfs", "rw"),
678
        ("none", "/sys", "sysfs", "rw,nosuid,nodev,noexec,relatime"),
679
        ("none", "/proc", "proc", "rw,nosuid,nodev,noexec,relatime"),
680
      ])
681

    
682
class TestNewUUID(unittest.TestCase):
683
  """Test case for NewUUID"""
684

    
685
  def runTest(self):
686
    self.failUnless(utils.UUID_RE.match(utils.NewUUID()))
687

    
688

    
689
class TestFirstFree(unittest.TestCase):
690
  """Test case for the FirstFree function"""
691

    
692
  def test(self):
693
    """Test FirstFree"""
694
    self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
695
    self.failUnlessEqual(FirstFree([]), None)
696
    self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
697
    self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
698
    self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
699

    
700

    
701
class TestTimeFunctions(unittest.TestCase):
702
  """Test case for time functions"""
703

    
704
  def runTest(self):
705
    self.assertEqual(utils.SplitTime(1), (1, 0))
706
    self.assertEqual(utils.SplitTime(1.5), (1, 500000))
707
    self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
708
    self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
709
    self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
710
    self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
711
    self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
712
    self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
713

    
714
    self.assertRaises(AssertionError, utils.SplitTime, -1)
715

    
716
    self.assertEqual(utils.MergeTime((1, 0)), 1.0)
717
    self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
718
    self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
719

    
720
    self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3),
721
                     1218448917.481)
722
    self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
723

    
724
    self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
725
    self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
726
    self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
727
    self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
728
    self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
729

    
730

    
731
class FieldSetTestCase(unittest.TestCase):
732
  """Test case for FieldSets"""
733

    
734
  def testSimpleMatch(self):
735
    f = utils.FieldSet("a", "b", "c", "def")
736
    self.failUnless(f.Matches("a"))
737
    self.failIf(f.Matches("d"), "Substring matched")
738
    self.failIf(f.Matches("defghi"), "Prefix string matched")
739
    self.failIf(f.NonMatching(["b", "c"]))
740
    self.failIf(f.NonMatching(["a", "b", "c", "def"]))
741
    self.failUnless(f.NonMatching(["a", "d"]))
742

    
743
  def testRegexMatch(self):
744
    f = utils.FieldSet("a", "b([0-9]+)", "c")
745
    self.failUnless(f.Matches("b1"))
746
    self.failUnless(f.Matches("b99"))
747
    self.failIf(f.Matches("b/1"))
748
    self.failIf(f.NonMatching(["b12", "c"]))
749
    self.failUnless(f.NonMatching(["a", "1"]))
750

    
751
class TestForceDictType(unittest.TestCase):
752
  """Test case for ForceDictType"""
753
  KEY_TYPES = {
754
    "a": constants.VTYPE_INT,
755
    "b": constants.VTYPE_BOOL,
756
    "c": constants.VTYPE_STRING,
757
    "d": constants.VTYPE_SIZE,
758
    "e": constants.VTYPE_MAYBE_STRING,
759
    }
760

    
761
  def _fdt(self, dict, allowed_values=None):
762
    if allowed_values is None:
763
      utils.ForceDictType(dict, self.KEY_TYPES)
764
    else:
765
      utils.ForceDictType(dict, self.KEY_TYPES, allowed_values=allowed_values)
766

    
767
    return dict
768

    
769
  def testSimpleDict(self):
770
    self.assertEqual(self._fdt({}), {})
771
    self.assertEqual(self._fdt({'a': 1}), {'a': 1})
772
    self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
773
    self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
774
    self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
775
    self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
776
    self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
777
    self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
778
    self.assertEqual(self._fdt({'b': False}), {'b': False})
779
    self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
780
    self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
781
    self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
782
    self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
783
    self.assertEqual(self._fdt({"e": None, }), {"e": None, })
784
    self.assertEqual(self._fdt({"e": "Hello World", }), {"e": "Hello World", })
785
    self.assertEqual(self._fdt({"e": False, }), {"e": '', })
786
    self.assertEqual(self._fdt({"b": "hello", }, ["hello"]), {"b": "hello"})
787

    
788
  def testErrors(self):
789
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
790
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {"b": "hello"})
791
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
792
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
793
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
794
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {"e": object(), })
795
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {"e": [], })
796
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {"x": None, })
797
    self.assertRaises(errors.TypeEnforcementError, self._fdt, [])
798
    self.assertRaises(errors.ProgrammerError, utils.ForceDictType,
799
                      {"b": "hello"}, {"b": "no-such-type"})
800

    
801

    
802
class RunInSeparateProcess(unittest.TestCase):
803
  def test(self):
804
    for exp in [True, False]:
805
      def _child():
806
        return exp
807

    
808
      self.assertEqual(exp, utils.RunInSeparateProcess(_child))
809

    
810
  def testArgs(self):
811
    for arg in [0, 1, 999, "Hello World", (1, 2, 3)]:
812
      def _child(carg1, carg2):
813
        return carg1 == "Foo" and carg2 == arg
814

    
815
      self.assert_(utils.RunInSeparateProcess(_child, "Foo", arg))
816

    
817
  def testPid(self):
818
    parent_pid = os.getpid()
819

    
820
    def _check():
821
      return os.getpid() == parent_pid
822

    
823
    self.failIf(utils.RunInSeparateProcess(_check))
824

    
825
  def testSignal(self):
826
    def _kill():
827
      os.kill(os.getpid(), signal.SIGTERM)
828

    
829
    self.assertRaises(errors.GenericError,
830
                      utils.RunInSeparateProcess, _kill)
831

    
832
  def testException(self):
833
    def _exc():
834
      raise errors.GenericError("This is a test")
835

    
836
    self.assertRaises(errors.GenericError,
837
                      utils.RunInSeparateProcess, _exc)
838

    
839

    
840
class TestGenerateSelfSignedX509Cert(unittest.TestCase):
841
  def setUp(self):
842
    self.tmpdir = tempfile.mkdtemp()
843

    
844
  def tearDown(self):
845
    shutil.rmtree(self.tmpdir)
846

    
847
  def _checkRsaPrivateKey(self, key):
848
    lines = key.splitlines()
849
    return ("-----BEGIN RSA PRIVATE KEY-----" in lines and
850
            "-----END RSA PRIVATE KEY-----" in lines)
851

    
852
  def _checkCertificate(self, cert):
853
    lines = cert.splitlines()
854
    return ("-----BEGIN CERTIFICATE-----" in lines and
855
            "-----END CERTIFICATE-----" in lines)
856

    
857
  def test(self):
858
    for common_name in [None, ".", "Ganeti", "node1.example.com"]:
859
      (key_pem, cert_pem) = utils.GenerateSelfSignedX509Cert(common_name, 300)
860
      self._checkRsaPrivateKey(key_pem)
861
      self._checkCertificate(cert_pem)
862

    
863
      key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
864
                                           key_pem)
865
      self.assert_(key.bits() >= 1024)
866
      self.assertEqual(key.bits(), constants.RSA_KEY_BITS)
867
      self.assertEqual(key.type(), OpenSSL.crypto.TYPE_RSA)
868

    
869
      x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
870
                                             cert_pem)
871
      self.failIf(x509.has_expired())
872
      self.assertEqual(x509.get_issuer().CN, common_name)
873
      self.assertEqual(x509.get_subject().CN, common_name)
874
      self.assertEqual(x509.get_pubkey().bits(), constants.RSA_KEY_BITS)
875

    
876
  def testLegacy(self):
877
    cert1_filename = os.path.join(self.tmpdir, "cert1.pem")
878

    
879
    utils.GenerateSelfSignedSslCert(cert1_filename, validity=1)
880

    
881
    cert1 = utils.ReadFile(cert1_filename)
882

    
883
    self.assert_(self._checkRsaPrivateKey(cert1))
884
    self.assert_(self._checkCertificate(cert1))
885

    
886

    
887
class TestValidateServiceName(unittest.TestCase):
888
  def testValid(self):
889
    testnames = [
890
      0, 1, 2, 3, 1024, 65000, 65534, 65535,
891
      "ganeti",
892
      "gnt-masterd",
893
      "HELLO_WORLD_SVC",
894
      "hello.world.1",
895
      "0", "80", "1111", "65535",
896
      ]
897

    
898
    for name in testnames:
899
      self.assertEqual(utils.ValidateServiceName(name), name)
900

    
901
  def testInvalid(self):
902
    testnames = [
903
      -15756, -1, 65536, 133428083,
904
      "", "Hello World!", "!", "'", "\"", "\t", "\n", "`",
905
      "-8546", "-1", "65536",
906
      (129 * "A"),
907
      ]
908

    
909
    for name in testnames:
910
      self.assertRaises(errors.OpPrereqError, utils.ValidateServiceName, name)
911

    
912

    
913
class TestParseAsn1Generalizedtime(unittest.TestCase):
914
  def test(self):
915
    # UTC
916
    self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000Z"), 0)
917
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152Z"),
918
                     1266860512)
919
    self.assertEqual(utils._ParseAsn1Generalizedtime("20380119031407Z"),
920
                     (2**31) - 1)
921

    
922
    # With offset
923
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152+0000"),
924
                     1266860512)
925
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100223131652+0000"),
926
                     1266931012)
927
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100223051808-0800"),
928
                     1266931088)
929
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100224002135+1100"),
930
                     1266931295)
931
    self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000-0100"),
932
                     3600)
933

    
934
    # Leap seconds are not supported by datetime.datetime
935
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
936
                      "19841231235960+0000")
937
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
938
                      "19920630235960+0000")
939

    
940
    # Errors
941
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "")
942
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "invalid")
943
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
944
                      "20100222174152")
945
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
946
                      "Mon Feb 22 17:47:02 UTC 2010")
947
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
948
                      "2010-02-22 17:42:02")
949

    
950

    
951
class TestGetX509CertValidity(testutils.GanetiTestCase):
952
  def setUp(self):
953
    testutils.GanetiTestCase.setUp(self)
954

    
955
    pyopenssl_version = distutils.version.LooseVersion(OpenSSL.__version__)
956

    
957
    # Test whether we have pyOpenSSL 0.7 or above
958
    self.pyopenssl0_7 = (pyopenssl_version >= "0.7")
959

    
960
    if not self.pyopenssl0_7:
961
      warnings.warn("This test requires pyOpenSSL 0.7 or above to"
962
                    " function correctly")
963

    
964
  def _LoadCert(self, name):
965
    return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
966
                                           self._ReadTestData(name))
967

    
968
  def test(self):
969
    validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem"))
970
    if self.pyopenssl0_7:
971
      self.assertEqual(validity, (1266919967, 1267524767))
972
    else:
973
      self.assertEqual(validity, (None, None))
974

    
975

    
976
class TestSignX509Certificate(unittest.TestCase):
977
  KEY = "My private key!"
978
  KEY_OTHER = "Another key"
979

    
980
  def test(self):
981
    # Generate certificate valid for 5 minutes
982
    (_, cert_pem) = utils.GenerateSelfSignedX509Cert(None, 300)
983

    
984
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
985
                                           cert_pem)
986

    
987
    # No signature at all
988
    self.assertRaises(errors.GenericError,
989
                      utils.LoadSignedX509Certificate, cert_pem, self.KEY)
990

    
991
    # Invalid input
992
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
993
                      "", self.KEY)
994
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
995
                      "X-Ganeti-Signature: \n", self.KEY)
996
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
997
                      "X-Ganeti-Sign: $1234$abcdef\n", self.KEY)
998
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
999
                      "X-Ganeti-Signature: $1234567890$abcdef\n", self.KEY)
1000
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1001
                      "X-Ganeti-Signature: $1234$abc\n\n" + cert_pem, self.KEY)
1002

    
1003
    # Invalid salt
1004
    for salt in list("-_@$,:;/\\ \t\n"):
1005
      self.assertRaises(errors.GenericError, utils.SignX509Certificate,
1006
                        cert_pem, self.KEY, "foo%sbar" % salt)
1007

    
1008
    for salt in ["HelloWorld", "salt", string.letters, string.digits,
1009
                 utils.GenerateSecret(numbytes=4),
1010
                 utils.GenerateSecret(numbytes=16),
1011
                 "{123:456}".encode("hex")]:
1012
      signed_pem = utils.SignX509Certificate(cert, self.KEY, salt)
1013

    
1014
      self._Check(cert, salt, signed_pem)
1015

    
1016
      self._Check(cert, salt, "X-Another-Header: with a value\n" + signed_pem)
1017
      self._Check(cert, salt, (10 * "Hello World!\n") + signed_pem)
1018
      self._Check(cert, salt, (signed_pem + "\n\na few more\n"
1019
                               "lines----\n------ at\nthe end!"))
1020

    
1021
  def _Check(self, cert, salt, pem):
1022
    (cert2, salt2) = utils.LoadSignedX509Certificate(pem, self.KEY)
1023
    self.assertEqual(salt, salt2)
1024
    self.assertEqual(cert.digest("sha1"), cert2.digest("sha1"))
1025

    
1026
    # Other key
1027
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1028
                      pem, self.KEY_OTHER)
1029

    
1030

    
1031
class TestReadLockedPidFile(unittest.TestCase):
1032
  def setUp(self):
1033
    self.tmpdir = tempfile.mkdtemp()
1034

    
1035
  def tearDown(self):
1036
    shutil.rmtree(self.tmpdir)
1037

    
1038
  def testNonExistent(self):
1039
    path = utils.PathJoin(self.tmpdir, "nonexist")
1040
    self.assert_(utils.ReadLockedPidFile(path) is None)
1041

    
1042
  def testUnlocked(self):
1043
    path = utils.PathJoin(self.tmpdir, "pid")
1044
    utils.WriteFile(path, data="123")
1045
    self.assert_(utils.ReadLockedPidFile(path) is None)
1046

    
1047
  def testLocked(self):
1048
    path = utils.PathJoin(self.tmpdir, "pid")
1049
    utils.WriteFile(path, data="123")
1050

    
1051
    fl = utils.FileLock.Open(path)
1052
    try:
1053
      fl.Exclusive(blocking=True)
1054

    
1055
      self.assertEqual(utils.ReadLockedPidFile(path), 123)
1056
    finally:
1057
      fl.Close()
1058

    
1059
    self.assert_(utils.ReadLockedPidFile(path) is None)
1060

    
1061
  def testError(self):
1062
    path = utils.PathJoin(self.tmpdir, "foobar", "pid")
1063
    utils.WriteFile(utils.PathJoin(self.tmpdir, "foobar"), data="")
1064
    # open(2) should return ENOTDIR
1065
    self.assertRaises(EnvironmentError, utils.ReadLockedPidFile, path)
1066

    
1067

    
1068
class TestCertVerification(testutils.GanetiTestCase):
1069
  def setUp(self):
1070
    testutils.GanetiTestCase.setUp(self)
1071

    
1072
    self.tmpdir = tempfile.mkdtemp()
1073

    
1074
  def tearDown(self):
1075
    shutil.rmtree(self.tmpdir)
1076

    
1077
  def testVerifyCertificate(self):
1078
    cert_pem = utils.ReadFile(self._TestDataFilename("cert1.pem"))
1079
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1080
                                           cert_pem)
1081

    
1082
    # Not checking return value as this certificate is expired
1083
    utils.VerifyX509Certificate(cert, 30, 7)
1084

    
1085

    
1086
class TestVerifyCertificateInner(unittest.TestCase):
1087
  def test(self):
1088
    vci = utils._VerifyCertificateInner
1089

    
1090
    # Valid
1091
    self.assertEqual(vci(False, 1263916313, 1298476313, 1266940313, 30, 7),
1092
                     (None, None))
1093

    
1094
    # Not yet valid
1095
    (errcode, msg) = vci(False, 1266507600, 1267544400, 1266075600, 30, 7)
1096
    self.assertEqual(errcode, utils.CERT_WARNING)
1097

    
1098
    # Expiring soon
1099
    (errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 7)
1100
    self.assertEqual(errcode, utils.CERT_ERROR)
1101

    
1102
    (errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 1)
1103
    self.assertEqual(errcode, utils.CERT_WARNING)
1104

    
1105
    (errcode, msg) = vci(False, 1266507600, None, 1266939600, 30, 7)
1106
    self.assertEqual(errcode, None)
1107

    
1108
    # Expired
1109
    (errcode, msg) = vci(True, 1266507600, 1267544400, 1266939600, 30, 7)
1110
    self.assertEqual(errcode, utils.CERT_ERROR)
1111

    
1112
    (errcode, msg) = vci(True, None, 1267544400, 1266939600, 30, 7)
1113
    self.assertEqual(errcode, utils.CERT_ERROR)
1114

    
1115
    (errcode, msg) = vci(True, 1266507600, None, 1266939600, 30, 7)
1116
    self.assertEqual(errcode, utils.CERT_ERROR)
1117

    
1118
    (errcode, msg) = vci(True, None, None, 1266939600, 30, 7)
1119
    self.assertEqual(errcode, utils.CERT_ERROR)
1120

    
1121

    
1122
class TestFindMatch(unittest.TestCase):
1123
  def test(self):
1124
    data = {
1125
      "aaaa": "Four A",
1126
      "bb": {"Two B": True},
1127
      re.compile(r"^x(foo|bar|bazX)([0-9]+)$"): (1, 2, 3),
1128
      }
1129

    
1130
    self.assertEqual(utils.FindMatch(data, "aaaa"), ("Four A", []))
1131
    self.assertEqual(utils.FindMatch(data, "bb"), ({"Two B": True}, []))
1132

    
1133
    for i in ["foo", "bar", "bazX"]:
1134
      for j in range(1, 100, 7):
1135
        self.assertEqual(utils.FindMatch(data, "x%s%s" % (i, j)),
1136
                         ((1, 2, 3), [i, str(j)]))
1137

    
1138
  def testNoMatch(self):
1139
    self.assert_(utils.FindMatch({}, "") is None)
1140
    self.assert_(utils.FindMatch({}, "foo") is None)
1141
    self.assert_(utils.FindMatch({}, 1234) is None)
1142

    
1143
    data = {
1144
      "X": "Hello World",
1145
      re.compile("^(something)$"): "Hello World",
1146
      }
1147

    
1148
    self.assert_(utils.FindMatch(data, "") is None)
1149
    self.assert_(utils.FindMatch(data, "Hello World") is None)
1150

    
1151

    
1152
class TimeMock:
1153
  def __init__(self, values):
1154
    self.values = values
1155

    
1156
  def __call__(self):
1157
    return self.values.pop(0)
1158

    
1159

    
1160
class TestRunningTimeout(unittest.TestCase):
1161
  def setUp(self):
1162
    self.time_fn = TimeMock([0.0, 0.3, 4.6, 6.5])
1163

    
1164
  def testRemainingFloat(self):
1165
    timeout = utils.RunningTimeout(5.0, True, _time_fn=self.time_fn)
1166
    self.assertAlmostEqual(timeout.Remaining(), 4.7)
1167
    self.assertAlmostEqual(timeout.Remaining(), 0.4)
1168
    self.assertAlmostEqual(timeout.Remaining(), -1.5)
1169

    
1170
  def testRemaining(self):
1171
    self.time_fn = TimeMock([0, 2, 4, 5, 6])
1172
    timeout = utils.RunningTimeout(5, True, _time_fn=self.time_fn)
1173
    self.assertEqual(timeout.Remaining(), 3)
1174
    self.assertEqual(timeout.Remaining(), 1)
1175
    self.assertEqual(timeout.Remaining(), 0)
1176
    self.assertEqual(timeout.Remaining(), -1)
1177

    
1178
  def testRemainingNonNegative(self):
1179
    timeout = utils.RunningTimeout(5.0, False, _time_fn=self.time_fn)
1180
    self.assertAlmostEqual(timeout.Remaining(), 4.7)
1181
    self.assertAlmostEqual(timeout.Remaining(), 0.4)
1182
    self.assertEqual(timeout.Remaining(), 0.0)
1183

    
1184
  def testNegativeTimeout(self):
1185
    self.assertRaises(ValueError, utils.RunningTimeout, -1.0, True)
1186

    
1187

    
1188
class TestTryConvert(unittest.TestCase):
1189
  def test(self):
1190
    for src, fn, result in [
1191
      ("1", int, 1),
1192
      ("a", int, "a"),
1193
      ("", bool, False),
1194
      ("a", bool, True),
1195
      ]:
1196
      self.assertEqual(utils.TryConvert(fn, src), result)
1197

    
1198

    
1199
class TestIsValidShellParam(unittest.TestCase):
1200
  def test(self):
1201
    for val, result in [
1202
      ("abc", True),
1203
      ("ab;cd", False),
1204
      ]:
1205
      self.assertEqual(utils.IsValidShellParam(val), result)
1206

    
1207

    
1208
class TestBuildShellCmd(unittest.TestCase):
1209
  def test(self):
1210
    self.assertRaises(errors.ProgrammerError, utils.BuildShellCmd,
1211
                      "ls %s", "ab;cd")
1212
    self.assertEqual(utils.BuildShellCmd("ls %s", "ab"), "ls ab")
1213

    
1214

    
1215
if __name__ == '__main__':
1216
  testutils.GanetiTestProgram()