Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ 815bf6d5

History | View | Annotate | Download (82.7 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import distutils.version
25
import errno
26
import fcntl
27
import glob
28
import os
29
import os.path
30
import re
31
import shutil
32
import signal
33
import socket
34
import stat
35
import string
36
import tempfile
37
import time
38
import unittest
39
import warnings
40
import OpenSSL
41
from cStringIO import StringIO
42

    
43
import testutils
44
from ganeti import constants
45
from ganeti import compat
46
from ganeti import utils
47
from ganeti import errors
48
from ganeti.utils import RunCmd, RemoveFile, MatchNameComponent, FormatUnit, \
49
     ParseUnit, ShellQuote, ShellQuoteArgs, ListVisibleFiles, FirstFree, \
50
     TailFile, SafeEncode, FormatTime, UnescapeAndSplit, RunParts, PathJoin, \
51
     ReadOneLineFile, SetEtcHostsEntry, RemoveEtcHostsEntry
52

    
53

    
54
class TestIsProcessAlive(unittest.TestCase):
55
  """Testing case for IsProcessAlive"""
56

    
57
  def testExists(self):
58
    mypid = os.getpid()
59
    self.assert_(utils.IsProcessAlive(mypid), "can't find myself running")
60

    
61
  def testNotExisting(self):
62
    pid_non_existing = os.fork()
63
    if pid_non_existing == 0:
64
      os._exit(0)
65
    elif pid_non_existing < 0:
66
      raise SystemError("can't fork")
67
    os.waitpid(pid_non_existing, 0)
68
    self.assertFalse(utils.IsProcessAlive(pid_non_existing),
69
                     "nonexisting process detected")
70

    
71

    
72
class TestGetProcStatusPath(unittest.TestCase):
73
  def test(self):
74
    self.assert_("/1234/" in utils._GetProcStatusPath(1234))
75
    self.assertNotEqual(utils._GetProcStatusPath(1),
76
                        utils._GetProcStatusPath(2))
77

    
78

    
79
class TestIsProcessHandlingSignal(unittest.TestCase):
80
  def setUp(self):
81
    self.tmpdir = tempfile.mkdtemp()
82

    
83
  def tearDown(self):
84
    shutil.rmtree(self.tmpdir)
85

    
86
  def testParseSigsetT(self):
87
    self.assertEqual(len(utils._ParseSigsetT("0")), 0)
88
    self.assertEqual(utils._ParseSigsetT("1"), set([1]))
89
    self.assertEqual(utils._ParseSigsetT("1000a"), set([2, 4, 17]))
90
    self.assertEqual(utils._ParseSigsetT("810002"), set([2, 17, 24, ]))
91
    self.assertEqual(utils._ParseSigsetT("0000000180000202"),
92
                     set([2, 10, 32, 33]))
93
    self.assertEqual(utils._ParseSigsetT("0000000180000002"),
94
                     set([2, 32, 33]))
95
    self.assertEqual(utils._ParseSigsetT("0000000188000002"),
96
                     set([2, 28, 32, 33]))
97
    self.assertEqual(utils._ParseSigsetT("000000004b813efb"),
98
                     set([1, 2, 4, 5, 6, 7, 8, 10, 11, 12, 13, 14, 17,
99
                          24, 25, 26, 28, 31]))
100
    self.assertEqual(utils._ParseSigsetT("ffffff"), set(range(1, 25)))
101

    
102
  def testGetProcStatusField(self):
103
    for field in ["SigCgt", "Name", "FDSize"]:
104
      for value in ["", "0", "cat", "  1234 KB"]:
105
        pstatus = "\n".join([
106
          "VmPeak: 999 kB",
107
          "%s: %s" % (field, value),
108
          "TracerPid: 0",
109
          ])
110
        result = utils._GetProcStatusField(pstatus, field)
111
        self.assertEqual(result, value.strip())
112

    
113
  def test(self):
114
    sp = PathJoin(self.tmpdir, "status")
115

    
116
    utils.WriteFile(sp, data="\n".join([
117
      "Name:   bash",
118
      "State:  S (sleeping)",
119
      "SleepAVG:       98%",
120
      "Pid:    22250",
121
      "PPid:   10858",
122
      "TracerPid:      0",
123
      "SigBlk: 0000000000010000",
124
      "SigIgn: 0000000000384004",
125
      "SigCgt: 000000004b813efb",
126
      "CapEff: 0000000000000000",
127
      ]))
128

    
129
    self.assert_(utils.IsProcessHandlingSignal(1234, 10, status_path=sp))
130

    
131
  def testNoSigCgt(self):
132
    sp = PathJoin(self.tmpdir, "status")
133

    
134
    utils.WriteFile(sp, data="\n".join([
135
      "Name:   bash",
136
      ]))
137

    
138
    self.assertRaises(RuntimeError, utils.IsProcessHandlingSignal,
139
                      1234, 10, status_path=sp)
140

    
141
  def testNoSuchFile(self):
142
    sp = PathJoin(self.tmpdir, "notexist")
143

    
144
    self.assertFalse(utils.IsProcessHandlingSignal(1234, 10, status_path=sp))
145

    
146
  @staticmethod
147
  def _TestRealProcess():
148
    signal.signal(signal.SIGUSR1, signal.SIG_DFL)
149
    if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
150
      raise Exception("SIGUSR1 is handled when it should not be")
151

    
152
    signal.signal(signal.SIGUSR1, lambda signum, frame: None)
153
    if not utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
154
      raise Exception("SIGUSR1 is not handled when it should be")
155

    
156
    signal.signal(signal.SIGUSR1, signal.SIG_IGN)
157
    if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
158
      raise Exception("SIGUSR1 is not handled when it should be")
159

    
160
    signal.signal(signal.SIGUSR1, signal.SIG_DFL)
161
    if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
162
      raise Exception("SIGUSR1 is handled when it should not be")
163

    
164
    return True
165

    
166
  def testRealProcess(self):
167
    self.assert_(utils.RunInSeparateProcess(self._TestRealProcess))
168

    
169

    
170
class TestPidFileFunctions(unittest.TestCase):
171
  """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
172

    
173
  def setUp(self):
174
    self.dir = tempfile.mkdtemp()
175
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
176
    utils.DaemonPidFileName = self.f_dpn
177

    
178
  def testPidFileFunctions(self):
179
    pid_file = self.f_dpn('test')
180
    fd = utils.WritePidFile(self.f_dpn('test'))
181
    self.failUnless(os.path.exists(pid_file),
182
                    "PID file should have been created")
183
    read_pid = utils.ReadPidFile(pid_file)
184
    self.failUnlessEqual(read_pid, os.getpid())
185
    self.failUnless(utils.IsProcessAlive(read_pid))
186
    self.failUnlessRaises(errors.LockError, utils.WritePidFile,
187
                          self.f_dpn('test'))
188
    os.close(fd)
189
    utils.RemovePidFile('test')
190
    self.failIf(os.path.exists(pid_file),
191
                "PID file should not exist anymore")
192
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
193
                         "ReadPidFile should return 0 for missing pid file")
194
    fh = open(pid_file, "w")
195
    fh.write("blah\n")
196
    fh.close()
197
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
198
                         "ReadPidFile should return 0 for invalid pid file")
199
    # but now, even with the file existing, we should be able to lock it
200
    fd = utils.WritePidFile(self.f_dpn('test'))
201
    os.close(fd)
202
    utils.RemovePidFile('test')
203
    self.failIf(os.path.exists(pid_file),
204
                "PID file should not exist anymore")
205

    
206
  def testKill(self):
207
    pid_file = self.f_dpn('child')
208
    r_fd, w_fd = os.pipe()
209
    new_pid = os.fork()
210
    if new_pid == 0: #child
211
      utils.WritePidFile(self.f_dpn('child'))
212
      os.write(w_fd, 'a')
213
      signal.pause()
214
      os._exit(0)
215
      return
216
    # else we are in the parent
217
    # wait until the child has written the pid file
218
    os.read(r_fd, 1)
219
    read_pid = utils.ReadPidFile(pid_file)
220
    self.failUnlessEqual(read_pid, new_pid)
221
    self.failUnless(utils.IsProcessAlive(new_pid))
222
    utils.KillProcess(new_pid, waitpid=True)
223
    self.failIf(utils.IsProcessAlive(new_pid))
224
    utils.RemovePidFile('child')
225
    self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0)
226

    
227
  def tearDown(self):
228
    for name in os.listdir(self.dir):
229
      os.unlink(os.path.join(self.dir, name))
230
    os.rmdir(self.dir)
231

    
232

    
233
class TestRunCmd(testutils.GanetiTestCase):
234
  """Testing case for the RunCmd function"""
235

    
236
  def setUp(self):
237
    testutils.GanetiTestCase.setUp(self)
238
    self.magic = time.ctime() + " ganeti test"
239
    self.fname = self._CreateTempFile()
240
    self.fifo_tmpdir = tempfile.mkdtemp()
241
    self.fifo_file = os.path.join(self.fifo_tmpdir, "ganeti_test_fifo")
242
    os.mkfifo(self.fifo_file)
243

    
244
  def tearDown(self):
245
    shutil.rmtree(self.fifo_tmpdir)
246

    
247
  def testOk(self):
248
    """Test successful exit code"""
249
    result = RunCmd("/bin/sh -c 'exit 0'")
250
    self.assertEqual(result.exit_code, 0)
251
    self.assertEqual(result.output, "")
252

    
253
  def testFail(self):
254
    """Test fail exit code"""
255
    result = RunCmd("/bin/sh -c 'exit 1'")
256
    self.assertEqual(result.exit_code, 1)
257
    self.assertEqual(result.output, "")
258

    
259
  def testStdout(self):
260
    """Test standard output"""
261
    cmd = 'echo -n "%s"' % self.magic
262
    result = RunCmd("/bin/sh -c '%s'" % cmd)
263
    self.assertEqual(result.stdout, self.magic)
264
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
265
    self.assertEqual(result.output, "")
266
    self.assertFileContent(self.fname, self.magic)
267

    
268
  def testStderr(self):
269
    """Test standard error"""
270
    cmd = 'echo -n "%s"' % self.magic
271
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
272
    self.assertEqual(result.stderr, self.magic)
273
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
274
    self.assertEqual(result.output, "")
275
    self.assertFileContent(self.fname, self.magic)
276

    
277
  def testCombined(self):
278
    """Test combined output"""
279
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
280
    expected = "A" + self.magic + "B" + self.magic
281
    result = RunCmd("/bin/sh -c '%s'" % cmd)
282
    self.assertEqual(result.output, expected)
283
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
284
    self.assertEqual(result.output, "")
285
    self.assertFileContent(self.fname, expected)
286

    
287
  def testSignal(self):
288
    """Test signal"""
289
    result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
290
    self.assertEqual(result.signal, 15)
291
    self.assertEqual(result.output, "")
292

    
293
  def testTimeoutClean(self):
294
    cmd = "trap 'exit 0' TERM; read < %s" % self.fifo_file
295
    result = RunCmd(["/bin/sh", "-c", cmd], timeout=0.2)
296
    self.assertEqual(result.exit_code, 0)
297

    
298
  def testTimeoutKill(self):
299
    cmd = ["/bin/sh", "-c", "trap '' TERM; read < %s" % self.fifo_file]
300
    timeout = 0.2
301
    out, err, status, ta = utils._RunCmdPipe(cmd, {}, False, "/", False,
302
                                             timeout, _linger_timeout=0.2)
303
    self.assert_(status < 0)
304
    self.assertEqual(-status, signal.SIGKILL)
305

    
306
  def testTimeoutOutputAfterTerm(self):
307
    cmd = "trap 'echo sigtermed; exit 1' TERM; read < %s" % self.fifo_file
308
    result = RunCmd(["/bin/sh", "-c", cmd], timeout=0.2)
309
    self.assert_(result.failed)
310
    self.assertEqual(result.stdout, "sigtermed\n")
311

    
312
  def testListRun(self):
313
    """Test list runs"""
314
    result = RunCmd(["true"])
315
    self.assertEqual(result.signal, None)
316
    self.assertEqual(result.exit_code, 0)
317
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
318
    self.assertEqual(result.signal, None)
319
    self.assertEqual(result.exit_code, 1)
320
    result = RunCmd(["echo", "-n", self.magic])
321
    self.assertEqual(result.signal, None)
322
    self.assertEqual(result.exit_code, 0)
323
    self.assertEqual(result.stdout, self.magic)
324

    
325
  def testFileEmptyOutput(self):
326
    """Test file output"""
327
    result = RunCmd(["true"], output=self.fname)
328
    self.assertEqual(result.signal, None)
329
    self.assertEqual(result.exit_code, 0)
330
    self.assertFileContent(self.fname, "")
331

    
332
  def testLang(self):
333
    """Test locale environment"""
334
    old_env = os.environ.copy()
335
    try:
336
      os.environ["LANG"] = "en_US.UTF-8"
337
      os.environ["LC_ALL"] = "en_US.UTF-8"
338
      result = RunCmd(["locale"])
339
      for line in result.output.splitlines():
340
        key, value = line.split("=", 1)
341
        # Ignore these variables, they're overridden by LC_ALL
342
        if key == "LANG" or key == "LANGUAGE":
343
          continue
344
        self.failIf(value and value != "C" and value != '"C"',
345
            "Variable %s is set to the invalid value '%s'" % (key, value))
346
    finally:
347
      os.environ = old_env
348

    
349
  def testDefaultCwd(self):
350
    """Test default working directory"""
351
    self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
352

    
353
  def testCwd(self):
354
    """Test default working directory"""
355
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
356
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
357
    cwd = os.getcwd()
358
    self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
359

    
360
  def testResetEnv(self):
361
    """Test environment reset functionality"""
362
    self.failUnlessEqual(RunCmd(["env"], reset_env=True).stdout.strip(), "")
363
    self.failUnlessEqual(RunCmd(["env"], reset_env=True,
364
                                env={"FOO": "bar",}).stdout.strip(), "FOO=bar")
365

    
366

    
367
class TestRunParts(unittest.TestCase):
368
  """Testing case for the RunParts function"""
369

    
370
  def setUp(self):
371
    self.rundir = tempfile.mkdtemp(prefix="ganeti-test", suffix=".tmp")
372

    
373
  def tearDown(self):
374
    shutil.rmtree(self.rundir)
375

    
376
  def testEmpty(self):
377
    """Test on an empty dir"""
378
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True), [])
379

    
380
  def testSkipWrongName(self):
381
    """Test that wrong files are skipped"""
382
    fname = os.path.join(self.rundir, "00test.dot")
383
    utils.WriteFile(fname, data="")
384
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
385
    relname = os.path.basename(fname)
386
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
387
                         [(relname, constants.RUNPARTS_SKIP, None)])
388

    
389
  def testSkipNonExec(self):
390
    """Test that non executable files are skipped"""
391
    fname = os.path.join(self.rundir, "00test")
392
    utils.WriteFile(fname, data="")
393
    relname = os.path.basename(fname)
394
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
395
                         [(relname, constants.RUNPARTS_SKIP, None)])
396

    
397
  def testError(self):
398
    """Test error on a broken executable"""
399
    fname = os.path.join(self.rundir, "00test")
400
    utils.WriteFile(fname, data="")
401
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
402
    (relname, status, error) = RunParts(self.rundir, reset_env=True)[0]
403
    self.failUnlessEqual(relname, os.path.basename(fname))
404
    self.failUnlessEqual(status, constants.RUNPARTS_ERR)
405
    self.failUnless(error)
406

    
407
  def testSorted(self):
408
    """Test executions are sorted"""
409
    files = []
410
    files.append(os.path.join(self.rundir, "64test"))
411
    files.append(os.path.join(self.rundir, "00test"))
412
    files.append(os.path.join(self.rundir, "42test"))
413

    
414
    for fname in files:
415
      utils.WriteFile(fname, data="")
416

    
417
    results = RunParts(self.rundir, reset_env=True)
418

    
419
    for fname in sorted(files):
420
      self.failUnlessEqual(os.path.basename(fname), results.pop(0)[0])
421

    
422
  def testOk(self):
423
    """Test correct execution"""
424
    fname = os.path.join(self.rundir, "00test")
425
    utils.WriteFile(fname, data="#!/bin/sh\n\necho -n ciao")
426
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
427
    (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
428
    self.failUnlessEqual(relname, os.path.basename(fname))
429
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
430
    self.failUnlessEqual(runresult.stdout, "ciao")
431

    
432
  def testRunFail(self):
433
    """Test correct execution, with run failure"""
434
    fname = os.path.join(self.rundir, "00test")
435
    utils.WriteFile(fname, data="#!/bin/sh\n\nexit 1")
436
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
437
    (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
438
    self.failUnlessEqual(relname, os.path.basename(fname))
439
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
440
    self.failUnlessEqual(runresult.exit_code, 1)
441
    self.failUnless(runresult.failed)
442

    
443
  def testRunMix(self):
444
    files = []
445
    files.append(os.path.join(self.rundir, "00test"))
446
    files.append(os.path.join(self.rundir, "42test"))
447
    files.append(os.path.join(self.rundir, "64test"))
448
    files.append(os.path.join(self.rundir, "99test"))
449

    
450
    files.sort()
451

    
452
    # 1st has errors in execution
453
    utils.WriteFile(files[0], data="#!/bin/sh\n\nexit 1")
454
    os.chmod(files[0], stat.S_IREAD | stat.S_IEXEC)
455

    
456
    # 2nd is skipped
457
    utils.WriteFile(files[1], data="")
458

    
459
    # 3rd cannot execute properly
460
    utils.WriteFile(files[2], data="")
461
    os.chmod(files[2], stat.S_IREAD | stat.S_IEXEC)
462

    
463
    # 4th execs
464
    utils.WriteFile(files[3], data="#!/bin/sh\n\necho -n ciao")
465
    os.chmod(files[3], stat.S_IREAD | stat.S_IEXEC)
466

    
467
    results = RunParts(self.rundir, reset_env=True)
468

    
469
    (relname, status, runresult) = results[0]
470
    self.failUnlessEqual(relname, os.path.basename(files[0]))
471
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
472
    self.failUnlessEqual(runresult.exit_code, 1)
473
    self.failUnless(runresult.failed)
474

    
475
    (relname, status, runresult) = results[1]
476
    self.failUnlessEqual(relname, os.path.basename(files[1]))
477
    self.failUnlessEqual(status, constants.RUNPARTS_SKIP)
478
    self.failUnlessEqual(runresult, None)
479

    
480
    (relname, status, runresult) = results[2]
481
    self.failUnlessEqual(relname, os.path.basename(files[2]))
482
    self.failUnlessEqual(status, constants.RUNPARTS_ERR)
483
    self.failUnless(runresult)
484

    
485
    (relname, status, runresult) = results[3]
486
    self.failUnlessEqual(relname, os.path.basename(files[3]))
487
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
488
    self.failUnlessEqual(runresult.output, "ciao")
489
    self.failUnlessEqual(runresult.exit_code, 0)
490
    self.failUnless(not runresult.failed)
491

    
492

    
493
class TestStartDaemon(testutils.GanetiTestCase):
494
  def setUp(self):
495
    self.tmpdir = tempfile.mkdtemp(prefix="ganeti-test")
496
    self.tmpfile = os.path.join(self.tmpdir, "test")
497

    
498
  def tearDown(self):
499
    shutil.rmtree(self.tmpdir)
500

    
501
  def testShell(self):
502
    utils.StartDaemon("echo Hello World > %s" % self.tmpfile)
503
    self._wait(self.tmpfile, 60.0, "Hello World")
504

    
505
  def testShellOutput(self):
506
    utils.StartDaemon("echo Hello World", output=self.tmpfile)
507
    self._wait(self.tmpfile, 60.0, "Hello World")
508

    
509
  def testNoShellNoOutput(self):
510
    utils.StartDaemon(["pwd"])
511

    
512
  def testNoShellNoOutputTouch(self):
513
    testfile = os.path.join(self.tmpdir, "check")
514
    self.failIf(os.path.exists(testfile))
515
    utils.StartDaemon(["touch", testfile])
516
    self._wait(testfile, 60.0, "")
517

    
518
  def testNoShellOutput(self):
519
    utils.StartDaemon(["pwd"], output=self.tmpfile)
520
    self._wait(self.tmpfile, 60.0, "/")
521

    
522
  def testNoShellOutputCwd(self):
523
    utils.StartDaemon(["pwd"], output=self.tmpfile, cwd=os.getcwd())
524
    self._wait(self.tmpfile, 60.0, os.getcwd())
525

    
526
  def testShellEnv(self):
527
    utils.StartDaemon("echo \"$GNT_TEST_VAR\"", output=self.tmpfile,
528
                      env={ "GNT_TEST_VAR": "Hello World", })
529
    self._wait(self.tmpfile, 60.0, "Hello World")
530

    
531
  def testNoShellEnv(self):
532
    utils.StartDaemon(["printenv", "GNT_TEST_VAR"], output=self.tmpfile,
533
                      env={ "GNT_TEST_VAR": "Hello World", })
534
    self._wait(self.tmpfile, 60.0, "Hello World")
535

    
536
  def testOutputFd(self):
537
    fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
538
    try:
539
      utils.StartDaemon(["pwd"], output_fd=fd, cwd=os.getcwd())
540
    finally:
541
      os.close(fd)
542
    self._wait(self.tmpfile, 60.0, os.getcwd())
543

    
544
  def testPid(self):
545
    pid = utils.StartDaemon("echo $$ > %s" % self.tmpfile)
546
    self._wait(self.tmpfile, 60.0, str(pid))
547

    
548
  def testPidFile(self):
549
    pidfile = os.path.join(self.tmpdir, "pid")
550
    checkfile = os.path.join(self.tmpdir, "abort")
551

    
552
    pid = utils.StartDaemon("while sleep 5; do :; done", pidfile=pidfile,
553
                            output=self.tmpfile)
554
    try:
555
      fd = os.open(pidfile, os.O_RDONLY)
556
      try:
557
        # Check file is locked
558
        self.assertRaises(errors.LockError, utils.LockFile, fd)
559

    
560
        pidtext = os.read(fd, 100)
561
      finally:
562
        os.close(fd)
563

    
564
      self.assertEqual(int(pidtext.strip()), pid)
565

    
566
      self.assert_(utils.IsProcessAlive(pid))
567
    finally:
568
      # No matter what happens, kill daemon
569
      utils.KillProcess(pid, timeout=5.0, waitpid=False)
570
      self.failIf(utils.IsProcessAlive(pid))
571

    
572
    self.assertEqual(utils.ReadFile(self.tmpfile), "")
573

    
574
  def _wait(self, path, timeout, expected):
575
    # Due to the asynchronous nature of daemon processes, polling is necessary.
576
    # A timeout makes sure the test doesn't hang forever.
577
    def _CheckFile():
578
      if not (os.path.isfile(path) and
579
              utils.ReadFile(path).strip() == expected):
580
        raise utils.RetryAgain()
581

    
582
    try:
583
      utils.Retry(_CheckFile, (0.01, 1.5, 1.0), timeout)
584
    except utils.RetryTimeout:
585
      self.fail("Apparently the daemon didn't run in %s seconds and/or"
586
                " didn't write the correct output" % timeout)
587

    
588
  def testError(self):
589
    self.assertRaises(errors.OpExecError, utils.StartDaemon,
590
                      ["./does-NOT-EXIST/here/0123456789"])
591
    self.assertRaises(errors.OpExecError, utils.StartDaemon,
592
                      ["./does-NOT-EXIST/here/0123456789"],
593
                      output=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
594
    self.assertRaises(errors.OpExecError, utils.StartDaemon,
595
                      ["./does-NOT-EXIST/here/0123456789"],
596
                      cwd=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
597
    self.assertRaises(errors.OpExecError, utils.StartDaemon,
598
                      ["./does-NOT-EXIST/here/0123456789"],
599
                      output=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
600

    
601
    fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
602
    try:
603
      self.assertRaises(errors.ProgrammerError, utils.StartDaemon,
604
                        ["./does-NOT-EXIST/here/0123456789"],
605
                        output=self.tmpfile, output_fd=fd)
606
    finally:
607
      os.close(fd)
608

    
609

    
610
class TestSetCloseOnExecFlag(unittest.TestCase):
611
  """Tests for SetCloseOnExecFlag"""
612

    
613
  def setUp(self):
614
    self.tmpfile = tempfile.TemporaryFile()
615

    
616
  def testEnable(self):
617
    utils.SetCloseOnExecFlag(self.tmpfile.fileno(), True)
618
    self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
619
                    fcntl.FD_CLOEXEC)
620

    
621
  def testDisable(self):
622
    utils.SetCloseOnExecFlag(self.tmpfile.fileno(), False)
623
    self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
624
                fcntl.FD_CLOEXEC)
625

    
626

    
627
class TestSetNonblockFlag(unittest.TestCase):
628
  def setUp(self):
629
    self.tmpfile = tempfile.TemporaryFile()
630

    
631
  def testEnable(self):
632
    utils.SetNonblockFlag(self.tmpfile.fileno(), True)
633
    self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
634
                    os.O_NONBLOCK)
635

    
636
  def testDisable(self):
637
    utils.SetNonblockFlag(self.tmpfile.fileno(), False)
638
    self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
639
                os.O_NONBLOCK)
640

    
641

    
642
class TestRemoveFile(unittest.TestCase):
643
  """Test case for the RemoveFile function"""
644

    
645
  def setUp(self):
646
    """Create a temp dir and file for each case"""
647
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
648
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
649
    os.close(fd)
650

    
651
  def tearDown(self):
652
    if os.path.exists(self.tmpfile):
653
      os.unlink(self.tmpfile)
654
    os.rmdir(self.tmpdir)
655

    
656
  def testIgnoreDirs(self):
657
    """Test that RemoveFile() ignores directories"""
658
    self.assertEqual(None, RemoveFile(self.tmpdir))
659

    
660
  def testIgnoreNotExisting(self):
661
    """Test that RemoveFile() ignores non-existing files"""
662
    RemoveFile(self.tmpfile)
663
    RemoveFile(self.tmpfile)
664

    
665
  def testRemoveFile(self):
666
    """Test that RemoveFile does remove a file"""
667
    RemoveFile(self.tmpfile)
668
    if os.path.exists(self.tmpfile):
669
      self.fail("File '%s' not removed" % self.tmpfile)
670

    
671
  def testRemoveSymlink(self):
672
    """Test that RemoveFile does remove symlinks"""
673
    symlink = self.tmpdir + "/symlink"
674
    os.symlink("no-such-file", symlink)
675
    RemoveFile(symlink)
676
    if os.path.exists(symlink):
677
      self.fail("File '%s' not removed" % symlink)
678
    os.symlink(self.tmpfile, symlink)
679
    RemoveFile(symlink)
680
    if os.path.exists(symlink):
681
      self.fail("File '%s' not removed" % symlink)
682

    
683

    
684
class TestRename(unittest.TestCase):
685
  """Test case for RenameFile"""
686

    
687
  def setUp(self):
688
    """Create a temporary directory"""
689
    self.tmpdir = tempfile.mkdtemp()
690
    self.tmpfile = os.path.join(self.tmpdir, "test1")
691

    
692
    # Touch the file
693
    open(self.tmpfile, "w").close()
694

    
695
  def tearDown(self):
696
    """Remove temporary directory"""
697
    shutil.rmtree(self.tmpdir)
698

    
699
  def testSimpleRename1(self):
700
    """Simple rename 1"""
701
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
702
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
703

    
704
  def testSimpleRename2(self):
705
    """Simple rename 2"""
706
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
707
                     mkdir=True)
708
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
709

    
710
  def testRenameMkdir(self):
711
    """Rename with mkdir"""
712
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
713
                     mkdir=True)
714
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
715
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
716

    
717
    utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"),
718
                     os.path.join(self.tmpdir, "test/foo/bar/baz"),
719
                     mkdir=True)
720
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
721
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar")))
722
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz")))
723

    
724

    
725
class TestMatchNameComponent(unittest.TestCase):
726
  """Test case for the MatchNameComponent function"""
727

    
728
  def testEmptyList(self):
729
    """Test that there is no match against an empty list"""
730

    
731
    self.failUnlessEqual(MatchNameComponent("", []), None)
732
    self.failUnlessEqual(MatchNameComponent("test", []), None)
733

    
734
  def testSingleMatch(self):
735
    """Test that a single match is performed correctly"""
736
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
737
    for key in "test2", "test2.example", "test2.example.com":
738
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
739

    
740
  def testMultipleMatches(self):
741
    """Test that a multiple match is returned as None"""
742
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
743
    for key in "test1", "test1.example":
744
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
745

    
746
  def testFullMatch(self):
747
    """Test that a full match is returned correctly"""
748
    key1 = "test1"
749
    key2 = "test1.example"
750
    mlist = [key2, key2 + ".com"]
751
    self.failUnlessEqual(MatchNameComponent(key1, mlist), None)
752
    self.failUnlessEqual(MatchNameComponent(key2, mlist), key2)
753

    
754
  def testCaseInsensitivePartialMatch(self):
755
    """Test for the case_insensitive keyword"""
756
    mlist = ["test1.example.com", "test2.example.net"]
757
    self.assertEqual(MatchNameComponent("test2", mlist, case_sensitive=False),
758
                     "test2.example.net")
759
    self.assertEqual(MatchNameComponent("Test2", mlist, case_sensitive=False),
760
                     "test2.example.net")
761
    self.assertEqual(MatchNameComponent("teSt2", mlist, case_sensitive=False),
762
                     "test2.example.net")
763
    self.assertEqual(MatchNameComponent("TeSt2", mlist, case_sensitive=False),
764
                     "test2.example.net")
765

    
766

    
767
  def testCaseInsensitiveFullMatch(self):
768
    mlist = ["ts1.ex", "ts1.ex.org", "ts2.ex", "Ts2.ex"]
769
    # Between the two ts1 a full string match non-case insensitive should work
770
    self.assertEqual(MatchNameComponent("Ts1", mlist, case_sensitive=False),
771
                     None)
772
    self.assertEqual(MatchNameComponent("Ts1.ex", mlist, case_sensitive=False),
773
                     "ts1.ex")
774
    self.assertEqual(MatchNameComponent("ts1.ex", mlist, case_sensitive=False),
775
                     "ts1.ex")
776
    # Between the two ts2 only case differs, so only case-match works
777
    self.assertEqual(MatchNameComponent("ts2.ex", mlist, case_sensitive=False),
778
                     "ts2.ex")
779
    self.assertEqual(MatchNameComponent("Ts2.ex", mlist, case_sensitive=False),
780
                     "Ts2.ex")
781
    self.assertEqual(MatchNameComponent("TS2.ex", mlist, case_sensitive=False),
782
                     None)
783

    
784

    
785
class TestReadFile(testutils.GanetiTestCase):
786

    
787
  def testReadAll(self):
788
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"))
789
    self.assertEqual(len(data), 814)
790

    
791
    h = compat.md5_hash()
792
    h.update(data)
793
    self.assertEqual(h.hexdigest(), "a491efb3efe56a0535f924d5f8680fd4")
794

    
795
  def testReadSize(self):
796
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"),
797
                          size=100)
798
    self.assertEqual(len(data), 100)
799

    
800
    h = compat.md5_hash()
801
    h.update(data)
802
    self.assertEqual(h.hexdigest(), "893772354e4e690b9efd073eed433ce7")
803

    
804
  def testError(self):
805
    self.assertRaises(EnvironmentError, utils.ReadFile,
806
                      "/dev/null/does-not-exist")
807

    
808

    
809
class TestReadOneLineFile(testutils.GanetiTestCase):
810

    
811
  def setUp(self):
812
    testutils.GanetiTestCase.setUp(self)
813

    
814
  def testDefault(self):
815
    data = ReadOneLineFile(self._TestDataFilename("cert1.pem"))
816
    self.assertEqual(len(data), 27)
817
    self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
818

    
819
  def testNotStrict(self):
820
    data = ReadOneLineFile(self._TestDataFilename("cert1.pem"), strict=False)
821
    self.assertEqual(len(data), 27)
822
    self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
823

    
824
  def testStrictFailure(self):
825
    self.assertRaises(errors.GenericError, ReadOneLineFile,
826
                      self._TestDataFilename("cert1.pem"), strict=True)
827

    
828
  def testLongLine(self):
829
    dummydata = (1024 * "Hello World! ")
830
    myfile = self._CreateTempFile()
831
    utils.WriteFile(myfile, data=dummydata)
832
    datastrict = ReadOneLineFile(myfile, strict=True)
833
    datalax = ReadOneLineFile(myfile, strict=False)
834
    self.assertEqual(dummydata, datastrict)
835
    self.assertEqual(dummydata, datalax)
836

    
837
  def testNewline(self):
838
    myfile = self._CreateTempFile()
839
    myline = "myline"
840
    for nl in ["", "\n", "\r\n"]:
841
      dummydata = "%s%s" % (myline, nl)
842
      utils.WriteFile(myfile, data=dummydata)
843
      datalax = ReadOneLineFile(myfile, strict=False)
844
      self.assertEqual(myline, datalax)
845
      datastrict = ReadOneLineFile(myfile, strict=True)
846
      self.assertEqual(myline, datastrict)
847

    
848
  def testWhitespaceAndMultipleLines(self):
849
    myfile = self._CreateTempFile()
850
    for nl in ["", "\n", "\r\n"]:
851
      for ws in [" ", "\t", "\t\t  \t", "\t "]:
852
        dummydata = (1024 * ("Foo bar baz %s%s" % (ws, nl)))
853
        utils.WriteFile(myfile, data=dummydata)
854
        datalax = ReadOneLineFile(myfile, strict=False)
855
        if nl:
856
          self.assert_(set("\r\n") & set(dummydata))
857
          self.assertRaises(errors.GenericError, ReadOneLineFile,
858
                            myfile, strict=True)
859
          explen = len("Foo bar baz ") + len(ws)
860
          self.assertEqual(len(datalax), explen)
861
          self.assertEqual(datalax, dummydata[:explen])
862
          self.assertFalse(set("\r\n") & set(datalax))
863
        else:
864
          datastrict = ReadOneLineFile(myfile, strict=True)
865
          self.assertEqual(dummydata, datastrict)
866
          self.assertEqual(dummydata, datalax)
867

    
868
  def testEmptylines(self):
869
    myfile = self._CreateTempFile()
870
    myline = "myline"
871
    for nl in ["\n", "\r\n"]:
872
      for ol in ["", "otherline"]:
873
        dummydata = "%s%s%s%s%s%s" % (nl, nl, myline, nl, ol, nl)
874
        utils.WriteFile(myfile, data=dummydata)
875
        self.assert_(set("\r\n") & set(dummydata))
876
        datalax = ReadOneLineFile(myfile, strict=False)
877
        self.assertEqual(myline, datalax)
878
        if ol:
879
          self.assertRaises(errors.GenericError, ReadOneLineFile,
880
                            myfile, strict=True)
881
        else:
882
          datastrict = ReadOneLineFile(myfile, strict=True)
883
          self.assertEqual(myline, datastrict)
884

    
885

    
886
class TestTimestampForFilename(unittest.TestCase):
887
  def test(self):
888
    self.assert_("." not in utils.TimestampForFilename())
889
    self.assert_(":" not in utils.TimestampForFilename())
890

    
891

    
892
class TestCreateBackup(testutils.GanetiTestCase):
893
  def setUp(self):
894
    testutils.GanetiTestCase.setUp(self)
895

    
896
    self.tmpdir = tempfile.mkdtemp()
897

    
898
  def tearDown(self):
899
    testutils.GanetiTestCase.tearDown(self)
900

    
901
    shutil.rmtree(self.tmpdir)
902

    
903
  def testEmpty(self):
904
    filename = PathJoin(self.tmpdir, "config.data")
905
    utils.WriteFile(filename, data="")
906
    bname = utils.CreateBackup(filename)
907
    self.assertFileContent(bname, "")
908
    self.assertEqual(len(glob.glob("%s*" % filename)), 2)
909
    utils.CreateBackup(filename)
910
    self.assertEqual(len(glob.glob("%s*" % filename)), 3)
911
    utils.CreateBackup(filename)
912
    self.assertEqual(len(glob.glob("%s*" % filename)), 4)
913

    
914
    fifoname = PathJoin(self.tmpdir, "fifo")
915
    os.mkfifo(fifoname)
916
    self.assertRaises(errors.ProgrammerError, utils.CreateBackup, fifoname)
917

    
918
  def testContent(self):
919
    bkpcount = 0
920
    for data in ["", "X", "Hello World!\n" * 100, "Binary data\0\x01\x02\n"]:
921
      for rep in [1, 2, 10, 127]:
922
        testdata = data * rep
923

    
924
        filename = PathJoin(self.tmpdir, "test.data_")
925
        utils.WriteFile(filename, data=testdata)
926
        self.assertFileContent(filename, testdata)
927

    
928
        for _ in range(3):
929
          bname = utils.CreateBackup(filename)
930
          bkpcount += 1
931
          self.assertFileContent(bname, testdata)
932
          self.assertEqual(len(glob.glob("%s*" % filename)), 1 + bkpcount)
933

    
934

    
935
class TestFormatUnit(unittest.TestCase):
936
  """Test case for the FormatUnit function"""
937

    
938
  def testMiB(self):
939
    self.assertEqual(FormatUnit(1, 'h'), '1M')
940
    self.assertEqual(FormatUnit(100, 'h'), '100M')
941
    self.assertEqual(FormatUnit(1023, 'h'), '1023M')
942

    
943
    self.assertEqual(FormatUnit(1, 'm'), '1')
944
    self.assertEqual(FormatUnit(100, 'm'), '100')
945
    self.assertEqual(FormatUnit(1023, 'm'), '1023')
946

    
947
    self.assertEqual(FormatUnit(1024, 'm'), '1024')
948
    self.assertEqual(FormatUnit(1536, 'm'), '1536')
949
    self.assertEqual(FormatUnit(17133, 'm'), '17133')
950
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
951

    
952
  def testGiB(self):
953
    self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
954
    self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
955
    self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
956
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
957

    
958
    self.assertEqual(FormatUnit(1024, 'g'), '1.0')
959
    self.assertEqual(FormatUnit(1536, 'g'), '1.5')
960
    self.assertEqual(FormatUnit(17133, 'g'), '16.7')
961
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
962

    
963
    self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
964
    self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
965
    self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
966

    
967
  def testTiB(self):
968
    self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
969
    self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
970
    self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
971

    
972
    self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
973
    self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
974
    self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
975

    
976

    
977
class TestParseUnit(unittest.TestCase):
978
  """Test case for the ParseUnit function"""
979

    
980
  SCALES = (('', 1),
981
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
982
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
983
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
984

    
985
  def testRounding(self):
986
    self.assertEqual(ParseUnit('0'), 0)
987
    self.assertEqual(ParseUnit('1'), 4)
988
    self.assertEqual(ParseUnit('2'), 4)
989
    self.assertEqual(ParseUnit('3'), 4)
990

    
991
    self.assertEqual(ParseUnit('124'), 124)
992
    self.assertEqual(ParseUnit('125'), 128)
993
    self.assertEqual(ParseUnit('126'), 128)
994
    self.assertEqual(ParseUnit('127'), 128)
995
    self.assertEqual(ParseUnit('128'), 128)
996
    self.assertEqual(ParseUnit('129'), 132)
997
    self.assertEqual(ParseUnit('130'), 132)
998

    
999
  def testFloating(self):
1000
    self.assertEqual(ParseUnit('0'), 0)
1001
    self.assertEqual(ParseUnit('0.5'), 4)
1002
    self.assertEqual(ParseUnit('1.75'), 4)
1003
    self.assertEqual(ParseUnit('1.99'), 4)
1004
    self.assertEqual(ParseUnit('2.00'), 4)
1005
    self.assertEqual(ParseUnit('2.01'), 4)
1006
    self.assertEqual(ParseUnit('3.99'), 4)
1007
    self.assertEqual(ParseUnit('4.00'), 4)
1008
    self.assertEqual(ParseUnit('4.01'), 8)
1009
    self.assertEqual(ParseUnit('1.5G'), 1536)
1010
    self.assertEqual(ParseUnit('1.8G'), 1844)
1011
    self.assertEqual(ParseUnit('8.28T'), 8682212)
1012

    
1013
  def testSuffixes(self):
1014
    for sep in ('', ' ', '   ', "\t", "\t "):
1015
      for suffix, scale in TestParseUnit.SCALES:
1016
        for func in (lambda x: x, str.lower, str.upper):
1017
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
1018
                           1024 * scale)
1019

    
1020
  def testInvalidInput(self):
1021
    for sep in ('-', '_', ',', 'a'):
1022
      for suffix, _ in TestParseUnit.SCALES:
1023
        self.assertRaises(errors.UnitParseError, ParseUnit, '1' + sep + suffix)
1024

    
1025
    for suffix, _ in TestParseUnit.SCALES:
1026
      self.assertRaises(errors.UnitParseError, ParseUnit, '1,3' + suffix)
1027

    
1028

    
1029
class TestParseCpuMask(unittest.TestCase):
1030
  """Test case for the ParseCpuMask function."""
1031

    
1032
  def testWellFormed(self):
1033
    self.assertEqual(utils.ParseCpuMask(""), [])
1034
    self.assertEqual(utils.ParseCpuMask("1"), [1])
1035
    self.assertEqual(utils.ParseCpuMask("0-2,4,5-5"), [0,1,2,4,5])
1036

    
1037
  def testInvalidInput(self):
1038
    self.assertRaises(errors.ParseError,
1039
                      utils.ParseCpuMask,
1040
                      "garbage")
1041
    self.assertRaises(errors.ParseError,
1042
                      utils.ParseCpuMask,
1043
                      "0,")
1044
    self.assertRaises(errors.ParseError,
1045
                      utils.ParseCpuMask,
1046
                      "0-1-2")
1047
    self.assertRaises(errors.ParseError,
1048
                      utils.ParseCpuMask,
1049
                      "2-1")
1050

    
1051
class TestSshKeys(testutils.GanetiTestCase):
1052
  """Test case for the AddAuthorizedKey function"""
1053

    
1054
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
1055
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="198.51.100.4" '
1056
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
1057

    
1058
  def setUp(self):
1059
    testutils.GanetiTestCase.setUp(self)
1060
    self.tmpname = self._CreateTempFile()
1061
    handle = open(self.tmpname, 'w')
1062
    try:
1063
      handle.write("%s\n" % TestSshKeys.KEY_A)
1064
      handle.write("%s\n" % TestSshKeys.KEY_B)
1065
    finally:
1066
      handle.close()
1067

    
1068
  def testAddingNewKey(self):
1069
    utils.AddAuthorizedKey(self.tmpname,
1070
                           'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
1071

    
1072
    self.assertFileContent(self.tmpname,
1073
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
1074
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
1075
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
1076
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
1077

    
1078
  def testAddingAlmostButNotCompletelyTheSameKey(self):
1079
    utils.AddAuthorizedKey(self.tmpname,
1080
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
1081

    
1082
    self.assertFileContent(self.tmpname,
1083
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
1084
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
1085
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
1086
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
1087

    
1088
  def testAddingExistingKeyWithSomeMoreSpaces(self):
1089
    utils.AddAuthorizedKey(self.tmpname,
1090
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
1091

    
1092
    self.assertFileContent(self.tmpname,
1093
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
1094
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
1095
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
1096

    
1097
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
1098
    utils.RemoveAuthorizedKey(self.tmpname,
1099
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
1100

    
1101
    self.assertFileContent(self.tmpname,
1102
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
1103
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
1104

    
1105
  def testRemovingNonExistingKey(self):
1106
    utils.RemoveAuthorizedKey(self.tmpname,
1107
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
1108

    
1109
    self.assertFileContent(self.tmpname,
1110
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
1111
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
1112
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
1113

    
1114

    
1115
class TestEtcHosts(testutils.GanetiTestCase):
1116
  """Test functions modifying /etc/hosts"""
1117

    
1118
  def setUp(self):
1119
    testutils.GanetiTestCase.setUp(self)
1120
    self.tmpname = self._CreateTempFile()
1121
    handle = open(self.tmpname, 'w')
1122
    try:
1123
      handle.write('# This is a test file for /etc/hosts\n')
1124
      handle.write('127.0.0.1\tlocalhost\n')
1125
      handle.write('192.0.2.1 router gw\n')
1126
    finally:
1127
      handle.close()
1128

    
1129
  def testSettingNewIp(self):
1130
    SetEtcHostsEntry(self.tmpname, '198.51.100.4', 'myhost.example.com',
1131
                     ['myhost'])
1132

    
1133
    self.assertFileContent(self.tmpname,
1134
      "# This is a test file for /etc/hosts\n"
1135
      "127.0.0.1\tlocalhost\n"
1136
      "192.0.2.1 router gw\n"
1137
      "198.51.100.4\tmyhost.example.com myhost\n")
1138
    self.assertFileMode(self.tmpname, 0644)
1139

    
1140
  def testSettingExistingIp(self):
1141
    SetEtcHostsEntry(self.tmpname, '192.0.2.1', 'myhost.example.com',
1142
                     ['myhost'])
1143

    
1144
    self.assertFileContent(self.tmpname,
1145
      "# This is a test file for /etc/hosts\n"
1146
      "127.0.0.1\tlocalhost\n"
1147
      "192.0.2.1\tmyhost.example.com myhost\n")
1148
    self.assertFileMode(self.tmpname, 0644)
1149

    
1150
  def testSettingDuplicateName(self):
1151
    SetEtcHostsEntry(self.tmpname, '198.51.100.4', 'myhost', ['myhost'])
1152

    
1153
    self.assertFileContent(self.tmpname,
1154
      "# This is a test file for /etc/hosts\n"
1155
      "127.0.0.1\tlocalhost\n"
1156
      "192.0.2.1 router gw\n"
1157
      "198.51.100.4\tmyhost\n")
1158
    self.assertFileMode(self.tmpname, 0644)
1159

    
1160
  def testRemovingExistingHost(self):
1161
    RemoveEtcHostsEntry(self.tmpname, 'router')
1162

    
1163
    self.assertFileContent(self.tmpname,
1164
      "# This is a test file for /etc/hosts\n"
1165
      "127.0.0.1\tlocalhost\n"
1166
      "192.0.2.1 gw\n")
1167
    self.assertFileMode(self.tmpname, 0644)
1168

    
1169
  def testRemovingSingleExistingHost(self):
1170
    RemoveEtcHostsEntry(self.tmpname, 'localhost')
1171

    
1172
    self.assertFileContent(self.tmpname,
1173
      "# This is a test file for /etc/hosts\n"
1174
      "192.0.2.1 router gw\n")
1175
    self.assertFileMode(self.tmpname, 0644)
1176

    
1177
  def testRemovingNonExistingHost(self):
1178
    RemoveEtcHostsEntry(self.tmpname, 'myhost')
1179

    
1180
    self.assertFileContent(self.tmpname,
1181
      "# This is a test file for /etc/hosts\n"
1182
      "127.0.0.1\tlocalhost\n"
1183
      "192.0.2.1 router gw\n")
1184
    self.assertFileMode(self.tmpname, 0644)
1185

    
1186
  def testRemovingAlias(self):
1187
    RemoveEtcHostsEntry(self.tmpname, 'gw')
1188

    
1189
    self.assertFileContent(self.tmpname,
1190
      "# This is a test file for /etc/hosts\n"
1191
      "127.0.0.1\tlocalhost\n"
1192
      "192.0.2.1 router\n")
1193
    self.assertFileMode(self.tmpname, 0644)
1194

    
1195

    
1196
class TestGetMounts(unittest.TestCase):
1197
  """Test case for GetMounts()."""
1198

    
1199
  TESTDATA = (
1200
    "rootfs /     rootfs rw 0 0\n"
1201
    "none   /sys  sysfs  rw,nosuid,nodev,noexec,relatime 0 0\n"
1202
    "none   /proc proc   rw,nosuid,nodev,noexec,relatime 0 0\n")
1203

    
1204
  def setUp(self):
1205
    self.tmpfile = tempfile.NamedTemporaryFile()
1206
    utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
1207

    
1208
  def testGetMounts(self):
1209
    self.assertEqual(utils.GetMounts(filename=self.tmpfile.name),
1210
      [
1211
        ("rootfs", "/", "rootfs", "rw"),
1212
        ("none", "/sys", "sysfs", "rw,nosuid,nodev,noexec,relatime"),
1213
        ("none", "/proc", "proc", "rw,nosuid,nodev,noexec,relatime"),
1214
      ])
1215

    
1216

    
1217
class TestShellQuoting(unittest.TestCase):
1218
  """Test case for shell quoting functions"""
1219

    
1220
  def testShellQuote(self):
1221
    self.assertEqual(ShellQuote('abc'), "abc")
1222
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
1223
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
1224
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
1225
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
1226

    
1227
  def testShellQuoteArgs(self):
1228
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
1229
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
1230
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
1231

    
1232

    
1233
class TestListVisibleFiles(unittest.TestCase):
1234
  """Test case for ListVisibleFiles"""
1235

    
1236
  def setUp(self):
1237
    self.path = tempfile.mkdtemp()
1238

    
1239
  def tearDown(self):
1240
    shutil.rmtree(self.path)
1241

    
1242
  def _CreateFiles(self, files):
1243
    for name in files:
1244
      utils.WriteFile(os.path.join(self.path, name), data="test")
1245

    
1246
  def _test(self, files, expected):
1247
    self._CreateFiles(files)
1248
    found = ListVisibleFiles(self.path)
1249
    self.assertEqual(set(found), set(expected))
1250

    
1251
  def testAllVisible(self):
1252
    files = ["a", "b", "c"]
1253
    expected = files
1254
    self._test(files, expected)
1255

    
1256
  def testNoneVisible(self):
1257
    files = [".a", ".b", ".c"]
1258
    expected = []
1259
    self._test(files, expected)
1260

    
1261
  def testSomeVisible(self):
1262
    files = ["a", "b", ".c"]
1263
    expected = ["a", "b"]
1264
    self._test(files, expected)
1265

    
1266
  def testNonAbsolutePath(self):
1267
    self.failUnlessRaises(errors.ProgrammerError, ListVisibleFiles, "abc")
1268

    
1269
  def testNonNormalizedPath(self):
1270
    self.failUnlessRaises(errors.ProgrammerError, ListVisibleFiles,
1271
                          "/bin/../tmp")
1272

    
1273

    
1274
class TestNewUUID(unittest.TestCase):
1275
  """Test case for NewUUID"""
1276

    
1277
  def runTest(self):
1278
    self.failUnless(utils.UUID_RE.match(utils.NewUUID()))
1279

    
1280

    
1281
class TestUniqueSequence(unittest.TestCase):
1282
  """Test case for UniqueSequence"""
1283

    
1284
  def _test(self, input, expected):
1285
    self.assertEqual(utils.UniqueSequence(input), expected)
1286

    
1287
  def runTest(self):
1288
    # Ordered input
1289
    self._test([1, 2, 3], [1, 2, 3])
1290
    self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
1291
    self._test([1, 2, 2, 3], [1, 2, 3])
1292
    self._test([1, 2, 3, 3], [1, 2, 3])
1293

    
1294
    # Unordered input
1295
    self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
1296
    self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
1297

    
1298
    # Strings
1299
    self._test(["a", "a"], ["a"])
1300
    self._test(["a", "b"], ["a", "b"])
1301
    self._test(["a", "b", "a"], ["a", "b"])
1302

    
1303

    
1304
class TestFirstFree(unittest.TestCase):
1305
  """Test case for the FirstFree function"""
1306

    
1307
  def test(self):
1308
    """Test FirstFree"""
1309
    self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
1310
    self.failUnlessEqual(FirstFree([]), None)
1311
    self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
1312
    self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
1313
    self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
1314

    
1315

    
1316
class TestTailFile(testutils.GanetiTestCase):
1317
  """Test case for the TailFile function"""
1318

    
1319
  def testEmpty(self):
1320
    fname = self._CreateTempFile()
1321
    self.failUnlessEqual(TailFile(fname), [])
1322
    self.failUnlessEqual(TailFile(fname, lines=25), [])
1323

    
1324
  def testAllLines(self):
1325
    data = ["test %d" % i for i in range(30)]
1326
    for i in range(30):
1327
      fname = self._CreateTempFile()
1328
      fd = open(fname, "w")
1329
      fd.write("\n".join(data[:i]))
1330
      if i > 0:
1331
        fd.write("\n")
1332
      fd.close()
1333
      self.failUnlessEqual(TailFile(fname, lines=i), data[:i])
1334

    
1335
  def testPartialLines(self):
1336
    data = ["test %d" % i for i in range(30)]
1337
    fname = self._CreateTempFile()
1338
    fd = open(fname, "w")
1339
    fd.write("\n".join(data))
1340
    fd.write("\n")
1341
    fd.close()
1342
    for i in range(1, 30):
1343
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
1344

    
1345
  def testBigFile(self):
1346
    data = ["test %d" % i for i in range(30)]
1347
    fname = self._CreateTempFile()
1348
    fd = open(fname, "w")
1349
    fd.write("X" * 1048576)
1350
    fd.write("\n")
1351
    fd.write("\n".join(data))
1352
    fd.write("\n")
1353
    fd.close()
1354
    for i in range(1, 30):
1355
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
1356

    
1357

    
1358
class _BaseFileLockTest:
1359
  """Test case for the FileLock class"""
1360

    
1361
  def testSharedNonblocking(self):
1362
    self.lock.Shared(blocking=False)
1363
    self.lock.Close()
1364

    
1365
  def testExclusiveNonblocking(self):
1366
    self.lock.Exclusive(blocking=False)
1367
    self.lock.Close()
1368

    
1369
  def testUnlockNonblocking(self):
1370
    self.lock.Unlock(blocking=False)
1371
    self.lock.Close()
1372

    
1373
  def testSharedBlocking(self):
1374
    self.lock.Shared(blocking=True)
1375
    self.lock.Close()
1376

    
1377
  def testExclusiveBlocking(self):
1378
    self.lock.Exclusive(blocking=True)
1379
    self.lock.Close()
1380

    
1381
  def testUnlockBlocking(self):
1382
    self.lock.Unlock(blocking=True)
1383
    self.lock.Close()
1384

    
1385
  def testSharedExclusiveUnlock(self):
1386
    self.lock.Shared(blocking=False)
1387
    self.lock.Exclusive(blocking=False)
1388
    self.lock.Unlock(blocking=False)
1389
    self.lock.Close()
1390

    
1391
  def testExclusiveSharedUnlock(self):
1392
    self.lock.Exclusive(blocking=False)
1393
    self.lock.Shared(blocking=False)
1394
    self.lock.Unlock(blocking=False)
1395
    self.lock.Close()
1396

    
1397
  def testSimpleTimeout(self):
1398
    # These will succeed on the first attempt, hence a short timeout
1399
    self.lock.Shared(blocking=True, timeout=10.0)
1400
    self.lock.Exclusive(blocking=False, timeout=10.0)
1401
    self.lock.Unlock(blocking=True, timeout=10.0)
1402
    self.lock.Close()
1403

    
1404
  @staticmethod
1405
  def _TryLockInner(filename, shared, blocking):
1406
    lock = utils.FileLock.Open(filename)
1407

    
1408
    if shared:
1409
      fn = lock.Shared
1410
    else:
1411
      fn = lock.Exclusive
1412

    
1413
    try:
1414
      # The timeout doesn't really matter as the parent process waits for us to
1415
      # finish anyway.
1416
      fn(blocking=blocking, timeout=0.01)
1417
    except errors.LockError, err:
1418
      return False
1419

    
1420
    return True
1421

    
1422
  def _TryLock(self, *args):
1423
    return utils.RunInSeparateProcess(self._TryLockInner, self.tmpfile.name,
1424
                                      *args)
1425

    
1426
  def testTimeout(self):
1427
    for blocking in [True, False]:
1428
      self.lock.Exclusive(blocking=True)
1429
      self.failIf(self._TryLock(False, blocking))
1430
      self.failIf(self._TryLock(True, blocking))
1431

    
1432
      self.lock.Shared(blocking=True)
1433
      self.assert_(self._TryLock(True, blocking))
1434
      self.failIf(self._TryLock(False, blocking))
1435

    
1436
  def testCloseShared(self):
1437
    self.lock.Close()
1438
    self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
1439

    
1440
  def testCloseExclusive(self):
1441
    self.lock.Close()
1442
    self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
1443

    
1444
  def testCloseUnlock(self):
1445
    self.lock.Close()
1446
    self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
1447

    
1448

    
1449
class TestFileLockWithFilename(testutils.GanetiTestCase, _BaseFileLockTest):
1450
  TESTDATA = "Hello World\n" * 10
1451

    
1452
  def setUp(self):
1453
    testutils.GanetiTestCase.setUp(self)
1454

    
1455
    self.tmpfile = tempfile.NamedTemporaryFile()
1456
    utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
1457
    self.lock = utils.FileLock.Open(self.tmpfile.name)
1458

    
1459
    # Ensure "Open" didn't truncate file
1460
    self.assertFileContent(self.tmpfile.name, self.TESTDATA)
1461

    
1462
  def tearDown(self):
1463
    self.assertFileContent(self.tmpfile.name, self.TESTDATA)
1464

    
1465
    testutils.GanetiTestCase.tearDown(self)
1466

    
1467

    
1468
class TestFileLockWithFileObject(unittest.TestCase, _BaseFileLockTest):
1469
  def setUp(self):
1470
    self.tmpfile = tempfile.NamedTemporaryFile()
1471
    self.lock = utils.FileLock(open(self.tmpfile.name, "w"), self.tmpfile.name)
1472

    
1473

    
1474
class TestTimeFunctions(unittest.TestCase):
1475
  """Test case for time functions"""
1476

    
1477
  def runTest(self):
1478
    self.assertEqual(utils.SplitTime(1), (1, 0))
1479
    self.assertEqual(utils.SplitTime(1.5), (1, 500000))
1480
    self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
1481
    self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
1482
    self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
1483
    self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
1484
    self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
1485
    self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
1486

    
1487
    self.assertRaises(AssertionError, utils.SplitTime, -1)
1488

    
1489
    self.assertEqual(utils.MergeTime((1, 0)), 1.0)
1490
    self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
1491
    self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
1492

    
1493
    self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3),
1494
                     1218448917.481)
1495
    self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
1496

    
1497
    self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
1498
    self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
1499
    self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
1500
    self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
1501
    self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
1502

    
1503

    
1504
class FieldSetTestCase(unittest.TestCase):
1505
  """Test case for FieldSets"""
1506

    
1507
  def testSimpleMatch(self):
1508
    f = utils.FieldSet("a", "b", "c", "def")
1509
    self.failUnless(f.Matches("a"))
1510
    self.failIf(f.Matches("d"), "Substring matched")
1511
    self.failIf(f.Matches("defghi"), "Prefix string matched")
1512
    self.failIf(f.NonMatching(["b", "c"]))
1513
    self.failIf(f.NonMatching(["a", "b", "c", "def"]))
1514
    self.failUnless(f.NonMatching(["a", "d"]))
1515

    
1516
  def testRegexMatch(self):
1517
    f = utils.FieldSet("a", "b([0-9]+)", "c")
1518
    self.failUnless(f.Matches("b1"))
1519
    self.failUnless(f.Matches("b99"))
1520
    self.failIf(f.Matches("b/1"))
1521
    self.failIf(f.NonMatching(["b12", "c"]))
1522
    self.failUnless(f.NonMatching(["a", "1"]))
1523

    
1524
class TestForceDictType(unittest.TestCase):
1525
  """Test case for ForceDictType"""
1526

    
1527
  def setUp(self):
1528
    self.key_types = {
1529
      'a': constants.VTYPE_INT,
1530
      'b': constants.VTYPE_BOOL,
1531
      'c': constants.VTYPE_STRING,
1532
      'd': constants.VTYPE_SIZE,
1533
      "e": constants.VTYPE_MAYBE_STRING,
1534
      }
1535

    
1536
  def _fdt(self, dict, allowed_values=None):
1537
    if allowed_values is None:
1538
      utils.ForceDictType(dict, self.key_types)
1539
    else:
1540
      utils.ForceDictType(dict, self.key_types, allowed_values=allowed_values)
1541

    
1542
    return dict
1543

    
1544
  def testSimpleDict(self):
1545
    self.assertEqual(self._fdt({}), {})
1546
    self.assertEqual(self._fdt({'a': 1}), {'a': 1})
1547
    self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
1548
    self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
1549
    self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
1550
    self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
1551
    self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
1552
    self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
1553
    self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
1554
    self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
1555
    self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
1556
    self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
1557
    self.assertEqual(self._fdt({"e": None, }), {"e": None, })
1558
    self.assertEqual(self._fdt({"e": "Hello World", }), {"e": "Hello World", })
1559
    self.assertEqual(self._fdt({"e": False, }), {"e": '', })
1560

    
1561
  def testErrors(self):
1562
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
1563
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
1564
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
1565
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
1566
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {"e": object(), })
1567
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {"e": [], })
1568

    
1569

    
1570
class TestIsNormAbsPath(unittest.TestCase):
1571
  """Testing case for IsNormAbsPath"""
1572

    
1573
  def _pathTestHelper(self, path, result):
1574
    if result:
1575
      self.assert_(utils.IsNormAbsPath(path),
1576
          "Path %s should result absolute and normalized" % path)
1577
    else:
1578
      self.assertFalse(utils.IsNormAbsPath(path),
1579
          "Path %s should not result absolute and normalized" % path)
1580

    
1581
  def testBase(self):
1582
    self._pathTestHelper('/etc', True)
1583
    self._pathTestHelper('/srv', True)
1584
    self._pathTestHelper('etc', False)
1585
    self._pathTestHelper('/etc/../root', False)
1586
    self._pathTestHelper('/etc/', False)
1587

    
1588

    
1589
class TestSafeEncode(unittest.TestCase):
1590
  """Test case for SafeEncode"""
1591

    
1592
  def testAscii(self):
1593
    for txt in [string.digits, string.letters, string.punctuation]:
1594
      self.failUnlessEqual(txt, SafeEncode(txt))
1595

    
1596
  def testDoubleEncode(self):
1597
    for i in range(255):
1598
      txt = SafeEncode(chr(i))
1599
      self.failUnlessEqual(txt, SafeEncode(txt))
1600

    
1601
  def testUnicode(self):
1602
    # 1024 is high enough to catch non-direct ASCII mappings
1603
    for i in range(1024):
1604
      txt = SafeEncode(unichr(i))
1605
      self.failUnlessEqual(txt, SafeEncode(txt))
1606

    
1607

    
1608
class TestFormatTime(unittest.TestCase):
1609
  """Testing case for FormatTime"""
1610

    
1611
  def testNone(self):
1612
    self.failUnlessEqual(FormatTime(None), "N/A")
1613

    
1614
  def testInvalid(self):
1615
    self.failUnlessEqual(FormatTime(()), "N/A")
1616

    
1617
  def testNow(self):
1618
    # tests that we accept time.time input
1619
    FormatTime(time.time())
1620
    # tests that we accept int input
1621
    FormatTime(int(time.time()))
1622

    
1623

    
1624
class RunInSeparateProcess(unittest.TestCase):
1625
  def test(self):
1626
    for exp in [True, False]:
1627
      def _child():
1628
        return exp
1629

    
1630
      self.assertEqual(exp, utils.RunInSeparateProcess(_child))
1631

    
1632
  def testArgs(self):
1633
    for arg in [0, 1, 999, "Hello World", (1, 2, 3)]:
1634
      def _child(carg1, carg2):
1635
        return carg1 == "Foo" and carg2 == arg
1636

    
1637
      self.assert_(utils.RunInSeparateProcess(_child, "Foo", arg))
1638

    
1639
  def testPid(self):
1640
    parent_pid = os.getpid()
1641

    
1642
    def _check():
1643
      return os.getpid() == parent_pid
1644

    
1645
    self.failIf(utils.RunInSeparateProcess(_check))
1646

    
1647
  def testSignal(self):
1648
    def _kill():
1649
      os.kill(os.getpid(), signal.SIGTERM)
1650

    
1651
    self.assertRaises(errors.GenericError,
1652
                      utils.RunInSeparateProcess, _kill)
1653

    
1654
  def testException(self):
1655
    def _exc():
1656
      raise errors.GenericError("This is a test")
1657

    
1658
    self.assertRaises(errors.GenericError,
1659
                      utils.RunInSeparateProcess, _exc)
1660

    
1661

    
1662
class TestFingerprintFile(unittest.TestCase):
1663
  def setUp(self):
1664
    self.tmpfile = tempfile.NamedTemporaryFile()
1665

    
1666
  def test(self):
1667
    self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1668
                     "da39a3ee5e6b4b0d3255bfef95601890afd80709")
1669

    
1670
    utils.WriteFile(self.tmpfile.name, data="Hello World\n")
1671
    self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1672
                     "648a6a6ffffdaa0badb23b8baf90b6168dd16b3a")
1673

    
1674

    
1675
class TestUnescapeAndSplit(unittest.TestCase):
1676
  """Testing case for UnescapeAndSplit"""
1677

    
1678
  def setUp(self):
1679
    # testing more that one separator for regexp safety
1680
    self._seps = [",", "+", "."]
1681

    
1682
  def testSimple(self):
1683
    a = ["a", "b", "c", "d"]
1684
    for sep in self._seps:
1685
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), a)
1686

    
1687
  def testEscape(self):
1688
    for sep in self._seps:
1689
      a = ["a", "b\\" + sep + "c", "d"]
1690
      b = ["a", "b" + sep + "c", "d"]
1691
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1692

    
1693
  def testDoubleEscape(self):
1694
    for sep in self._seps:
1695
      a = ["a", "b\\\\", "c", "d"]
1696
      b = ["a", "b\\", "c", "d"]
1697
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1698

    
1699
  def testThreeEscape(self):
1700
    for sep in self._seps:
1701
      a = ["a", "b\\\\\\" + sep + "c", "d"]
1702
      b = ["a", "b\\" + sep + "c", "d"]
1703
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1704

    
1705

    
1706
class TestGenerateSelfSignedX509Cert(unittest.TestCase):
1707
  def setUp(self):
1708
    self.tmpdir = tempfile.mkdtemp()
1709

    
1710
  def tearDown(self):
1711
    shutil.rmtree(self.tmpdir)
1712

    
1713
  def _checkRsaPrivateKey(self, key):
1714
    lines = key.splitlines()
1715
    return ("-----BEGIN RSA PRIVATE KEY-----" in lines and
1716
            "-----END RSA PRIVATE KEY-----" in lines)
1717

    
1718
  def _checkCertificate(self, cert):
1719
    lines = cert.splitlines()
1720
    return ("-----BEGIN CERTIFICATE-----" in lines and
1721
            "-----END CERTIFICATE-----" in lines)
1722

    
1723
  def test(self):
1724
    for common_name in [None, ".", "Ganeti", "node1.example.com"]:
1725
      (key_pem, cert_pem) = utils.GenerateSelfSignedX509Cert(common_name, 300)
1726
      self._checkRsaPrivateKey(key_pem)
1727
      self._checkCertificate(cert_pem)
1728

    
1729
      key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
1730
                                           key_pem)
1731
      self.assert_(key.bits() >= 1024)
1732
      self.assertEqual(key.bits(), constants.RSA_KEY_BITS)
1733
      self.assertEqual(key.type(), OpenSSL.crypto.TYPE_RSA)
1734

    
1735
      x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1736
                                             cert_pem)
1737
      self.failIf(x509.has_expired())
1738
      self.assertEqual(x509.get_issuer().CN, common_name)
1739
      self.assertEqual(x509.get_subject().CN, common_name)
1740
      self.assertEqual(x509.get_pubkey().bits(), constants.RSA_KEY_BITS)
1741

    
1742
  def testLegacy(self):
1743
    cert1_filename = os.path.join(self.tmpdir, "cert1.pem")
1744

    
1745
    utils.GenerateSelfSignedSslCert(cert1_filename, validity=1)
1746

    
1747
    cert1 = utils.ReadFile(cert1_filename)
1748

    
1749
    self.assert_(self._checkRsaPrivateKey(cert1))
1750
    self.assert_(self._checkCertificate(cert1))
1751

    
1752

    
1753
class TestPathJoin(unittest.TestCase):
1754
  """Testing case for PathJoin"""
1755

    
1756
  def testBasicItems(self):
1757
    mlist = ["/a", "b", "c"]
1758
    self.failUnlessEqual(PathJoin(*mlist), "/".join(mlist))
1759

    
1760
  def testNonAbsPrefix(self):
1761
    self.failUnlessRaises(ValueError, PathJoin, "a", "b")
1762

    
1763
  def testBackTrack(self):
1764
    self.failUnlessRaises(ValueError, PathJoin, "/a", "b/../c")
1765

    
1766
  def testMultiAbs(self):
1767
    self.failUnlessRaises(ValueError, PathJoin, "/a", "/b")
1768

    
1769

    
1770
class TestValidateServiceName(unittest.TestCase):
1771
  def testValid(self):
1772
    testnames = [
1773
      0, 1, 2, 3, 1024, 65000, 65534, 65535,
1774
      "ganeti",
1775
      "gnt-masterd",
1776
      "HELLO_WORLD_SVC",
1777
      "hello.world.1",
1778
      "0", "80", "1111", "65535",
1779
      ]
1780

    
1781
    for name in testnames:
1782
      self.assertEqual(utils.ValidateServiceName(name), name)
1783

    
1784
  def testInvalid(self):
1785
    testnames = [
1786
      -15756, -1, 65536, 133428083,
1787
      "", "Hello World!", "!", "'", "\"", "\t", "\n", "`",
1788
      "-8546", "-1", "65536",
1789
      (129 * "A"),
1790
      ]
1791

    
1792
    for name in testnames:
1793
      self.assertRaises(errors.OpPrereqError, utils.ValidateServiceName, name)
1794

    
1795

    
1796
class TestParseAsn1Generalizedtime(unittest.TestCase):
1797
  def test(self):
1798
    # UTC
1799
    self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000Z"), 0)
1800
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152Z"),
1801
                     1266860512)
1802
    self.assertEqual(utils._ParseAsn1Generalizedtime("20380119031407Z"),
1803
                     (2**31) - 1)
1804

    
1805
    # With offset
1806
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152+0000"),
1807
                     1266860512)
1808
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100223131652+0000"),
1809
                     1266931012)
1810
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100223051808-0800"),
1811
                     1266931088)
1812
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100224002135+1100"),
1813
                     1266931295)
1814
    self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000-0100"),
1815
                     3600)
1816

    
1817
    # Leap seconds are not supported by datetime.datetime
1818
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1819
                      "19841231235960+0000")
1820
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1821
                      "19920630235960+0000")
1822

    
1823
    # Errors
1824
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "")
1825
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "invalid")
1826
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1827
                      "20100222174152")
1828
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1829
                      "Mon Feb 22 17:47:02 UTC 2010")
1830
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1831
                      "2010-02-22 17:42:02")
1832

    
1833

    
1834
class TestGetX509CertValidity(testutils.GanetiTestCase):
1835
  def setUp(self):
1836
    testutils.GanetiTestCase.setUp(self)
1837

    
1838
    pyopenssl_version = distutils.version.LooseVersion(OpenSSL.__version__)
1839

    
1840
    # Test whether we have pyOpenSSL 0.7 or above
1841
    self.pyopenssl0_7 = (pyopenssl_version >= "0.7")
1842

    
1843
    if not self.pyopenssl0_7:
1844
      warnings.warn("This test requires pyOpenSSL 0.7 or above to"
1845
                    " function correctly")
1846

    
1847
  def _LoadCert(self, name):
1848
    return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1849
                                           self._ReadTestData(name))
1850

    
1851
  def test(self):
1852
    validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem"))
1853
    if self.pyopenssl0_7:
1854
      self.assertEqual(validity, (1266919967, 1267524767))
1855
    else:
1856
      self.assertEqual(validity, (None, None))
1857

    
1858

    
1859
class TestSignX509Certificate(unittest.TestCase):
1860
  KEY = "My private key!"
1861
  KEY_OTHER = "Another key"
1862

    
1863
  def test(self):
1864
    # Generate certificate valid for 5 minutes
1865
    (_, cert_pem) = utils.GenerateSelfSignedX509Cert(None, 300)
1866

    
1867
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1868
                                           cert_pem)
1869

    
1870
    # No signature at all
1871
    self.assertRaises(errors.GenericError,
1872
                      utils.LoadSignedX509Certificate, cert_pem, self.KEY)
1873

    
1874
    # Invalid input
1875
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1876
                      "", self.KEY)
1877
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1878
                      "X-Ganeti-Signature: \n", self.KEY)
1879
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1880
                      "X-Ganeti-Sign: $1234$abcdef\n", self.KEY)
1881
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1882
                      "X-Ganeti-Signature: $1234567890$abcdef\n", self.KEY)
1883
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1884
                      "X-Ganeti-Signature: $1234$abc\n\n" + cert_pem, self.KEY)
1885

    
1886
    # Invalid salt
1887
    for salt in list("-_@$,:;/\\ \t\n"):
1888
      self.assertRaises(errors.GenericError, utils.SignX509Certificate,
1889
                        cert_pem, self.KEY, "foo%sbar" % salt)
1890

    
1891
    for salt in ["HelloWorld", "salt", string.letters, string.digits,
1892
                 utils.GenerateSecret(numbytes=4),
1893
                 utils.GenerateSecret(numbytes=16),
1894
                 "{123:456}".encode("hex")]:
1895
      signed_pem = utils.SignX509Certificate(cert, self.KEY, salt)
1896

    
1897
      self._Check(cert, salt, signed_pem)
1898

    
1899
      self._Check(cert, salt, "X-Another-Header: with a value\n" + signed_pem)
1900
      self._Check(cert, salt, (10 * "Hello World!\n") + signed_pem)
1901
      self._Check(cert, salt, (signed_pem + "\n\na few more\n"
1902
                               "lines----\n------ at\nthe end!"))
1903

    
1904
  def _Check(self, cert, salt, pem):
1905
    (cert2, salt2) = utils.LoadSignedX509Certificate(pem, self.KEY)
1906
    self.assertEqual(salt, salt2)
1907
    self.assertEqual(cert.digest("sha1"), cert2.digest("sha1"))
1908

    
1909
    # Other key
1910
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1911
                      pem, self.KEY_OTHER)
1912

    
1913

    
1914
class TestMakedirs(unittest.TestCase):
1915
  def setUp(self):
1916
    self.tmpdir = tempfile.mkdtemp()
1917

    
1918
  def tearDown(self):
1919
    shutil.rmtree(self.tmpdir)
1920

    
1921
  def testNonExisting(self):
1922
    path = PathJoin(self.tmpdir, "foo")
1923
    utils.Makedirs(path)
1924
    self.assert_(os.path.isdir(path))
1925

    
1926
  def testExisting(self):
1927
    path = PathJoin(self.tmpdir, "foo")
1928
    os.mkdir(path)
1929
    utils.Makedirs(path)
1930
    self.assert_(os.path.isdir(path))
1931

    
1932
  def testRecursiveNonExisting(self):
1933
    path = PathJoin(self.tmpdir, "foo/bar/baz")
1934
    utils.Makedirs(path)
1935
    self.assert_(os.path.isdir(path))
1936

    
1937
  def testRecursiveExisting(self):
1938
    path = PathJoin(self.tmpdir, "B/moo/xyz")
1939
    self.assertFalse(os.path.exists(path))
1940
    os.mkdir(PathJoin(self.tmpdir, "B"))
1941
    utils.Makedirs(path)
1942
    self.assert_(os.path.isdir(path))
1943

    
1944

    
1945
class TestRetry(testutils.GanetiTestCase):
1946
  def setUp(self):
1947
    testutils.GanetiTestCase.setUp(self)
1948
    self.retries = 0
1949

    
1950
  @staticmethod
1951
  def _RaiseRetryAgain():
1952
    raise utils.RetryAgain()
1953

    
1954
  @staticmethod
1955
  def _RaiseRetryAgainWithArg(args):
1956
    raise utils.RetryAgain(*args)
1957

    
1958
  def _WrongNestedLoop(self):
1959
    return utils.Retry(self._RaiseRetryAgain, 0.01, 0.02)
1960

    
1961
  def _RetryAndSucceed(self, retries):
1962
    if self.retries < retries:
1963
      self.retries += 1
1964
      raise utils.RetryAgain()
1965
    else:
1966
      return True
1967

    
1968
  def testRaiseTimeout(self):
1969
    self.failUnlessRaises(utils.RetryTimeout, utils.Retry,
1970
                          self._RaiseRetryAgain, 0.01, 0.02)
1971
    self.failUnlessRaises(utils.RetryTimeout, utils.Retry,
1972
                          self._RetryAndSucceed, 0.01, 0, args=[1])
1973
    self.failUnlessEqual(self.retries, 1)
1974

    
1975
  def testComplete(self):
1976
    self.failUnlessEqual(utils.Retry(lambda: True, 0, 1), True)
1977
    self.failUnlessEqual(utils.Retry(self._RetryAndSucceed, 0, 1, args=[2]),
1978
                         True)
1979
    self.failUnlessEqual(self.retries, 2)
1980

    
1981
  def testNestedLoop(self):
1982
    try:
1983
      self.failUnlessRaises(errors.ProgrammerError, utils.Retry,
1984
                            self._WrongNestedLoop, 0, 1)
1985
    except utils.RetryTimeout:
1986
      self.fail("Didn't detect inner loop's exception")
1987

    
1988
  def testTimeoutArgument(self):
1989
    retry_arg="my_important_debugging_message"
1990
    try:
1991
      utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02, args=[[retry_arg]])
1992
    except utils.RetryTimeout, err:
1993
      self.failUnlessEqual(err.args, (retry_arg, ))
1994
    else:
1995
      self.fail("Expected timeout didn't happen")
1996

    
1997
  def testRaiseInnerWithExc(self):
1998
    retry_arg="my_important_debugging_message"
1999
    try:
2000
      try:
2001
        utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02,
2002
                    args=[[errors.GenericError(retry_arg, retry_arg)]])
2003
      except utils.RetryTimeout, err:
2004
        err.RaiseInner()
2005
      else:
2006
        self.fail("Expected timeout didn't happen")
2007
    except errors.GenericError, err:
2008
      self.failUnlessEqual(err.args, (retry_arg, retry_arg))
2009
    else:
2010
      self.fail("Expected GenericError didn't happen")
2011

    
2012
  def testRaiseInnerWithMsg(self):
2013
    retry_arg="my_important_debugging_message"
2014
    try:
2015
      try:
2016
        utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02,
2017
                    args=[[retry_arg, retry_arg]])
2018
      except utils.RetryTimeout, err:
2019
        err.RaiseInner()
2020
      else:
2021
        self.fail("Expected timeout didn't happen")
2022
    except utils.RetryTimeout, err:
2023
      self.failUnlessEqual(err.args, (retry_arg, retry_arg))
2024
    else:
2025
      self.fail("Expected RetryTimeout didn't happen")
2026

    
2027

    
2028
class TestLineSplitter(unittest.TestCase):
2029
  def test(self):
2030
    lines = []
2031
    ls = utils.LineSplitter(lines.append)
2032
    ls.write("Hello World\n")
2033
    self.assertEqual(lines, [])
2034
    ls.write("Foo\n Bar\r\n ")
2035
    ls.write("Baz")
2036
    ls.write("Moo")
2037
    self.assertEqual(lines, [])
2038
    ls.flush()
2039
    self.assertEqual(lines, ["Hello World", "Foo", " Bar"])
2040
    ls.close()
2041
    self.assertEqual(lines, ["Hello World", "Foo", " Bar", " BazMoo"])
2042

    
2043
  def _testExtra(self, line, all_lines, p1, p2):
2044
    self.assertEqual(p1, 999)
2045
    self.assertEqual(p2, "extra")
2046
    all_lines.append(line)
2047

    
2048
  def testExtraArgsNoFlush(self):
2049
    lines = []
2050
    ls = utils.LineSplitter(self._testExtra, lines, 999, "extra")
2051
    ls.write("\n\nHello World\n")
2052
    ls.write("Foo\n Bar\r\n ")
2053
    ls.write("")
2054
    ls.write("Baz")
2055
    ls.write("Moo\n\nx\n")
2056
    self.assertEqual(lines, [])
2057
    ls.close()
2058
    self.assertEqual(lines, ["", "", "Hello World", "Foo", " Bar", " BazMoo",
2059
                             "", "x"])
2060

    
2061

    
2062
class TestReadLockedPidFile(unittest.TestCase):
2063
  def setUp(self):
2064
    self.tmpdir = tempfile.mkdtemp()
2065

    
2066
  def tearDown(self):
2067
    shutil.rmtree(self.tmpdir)
2068

    
2069
  def testNonExistent(self):
2070
    path = PathJoin(self.tmpdir, "nonexist")
2071
    self.assert_(utils.ReadLockedPidFile(path) is None)
2072

    
2073
  def testUnlocked(self):
2074
    path = PathJoin(self.tmpdir, "pid")
2075
    utils.WriteFile(path, data="123")
2076
    self.assert_(utils.ReadLockedPidFile(path) is None)
2077

    
2078
  def testLocked(self):
2079
    path = PathJoin(self.tmpdir, "pid")
2080
    utils.WriteFile(path, data="123")
2081

    
2082
    fl = utils.FileLock.Open(path)
2083
    try:
2084
      fl.Exclusive(blocking=True)
2085

    
2086
      self.assertEqual(utils.ReadLockedPidFile(path), 123)
2087
    finally:
2088
      fl.Close()
2089

    
2090
    self.assert_(utils.ReadLockedPidFile(path) is None)
2091

    
2092
  def testError(self):
2093
    path = PathJoin(self.tmpdir, "foobar", "pid")
2094
    utils.WriteFile(PathJoin(self.tmpdir, "foobar"), data="")
2095
    # open(2) should return ENOTDIR
2096
    self.assertRaises(EnvironmentError, utils.ReadLockedPidFile, path)
2097

    
2098

    
2099
class TestCertVerification(testutils.GanetiTestCase):
2100
  def setUp(self):
2101
    testutils.GanetiTestCase.setUp(self)
2102

    
2103
    self.tmpdir = tempfile.mkdtemp()
2104

    
2105
  def tearDown(self):
2106
    shutil.rmtree(self.tmpdir)
2107

    
2108
  def testVerifyCertificate(self):
2109
    cert_pem = utils.ReadFile(self._TestDataFilename("cert1.pem"))
2110
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
2111
                                           cert_pem)
2112

    
2113
    # Not checking return value as this certificate is expired
2114
    utils.VerifyX509Certificate(cert, 30, 7)
2115

    
2116

    
2117
class TestVerifyCertificateInner(unittest.TestCase):
2118
  def test(self):
2119
    vci = utils._VerifyCertificateInner
2120

    
2121
    # Valid
2122
    self.assertEqual(vci(False, 1263916313, 1298476313, 1266940313, 30, 7),
2123
                     (None, None))
2124

    
2125
    # Not yet valid
2126
    (errcode, msg) = vci(False, 1266507600, 1267544400, 1266075600, 30, 7)
2127
    self.assertEqual(errcode, utils.CERT_WARNING)
2128

    
2129
    # Expiring soon
2130
    (errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 7)
2131
    self.assertEqual(errcode, utils.CERT_ERROR)
2132

    
2133
    (errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 1)
2134
    self.assertEqual(errcode, utils.CERT_WARNING)
2135

    
2136
    (errcode, msg) = vci(False, 1266507600, None, 1266939600, 30, 7)
2137
    self.assertEqual(errcode, None)
2138

    
2139
    # Expired
2140
    (errcode, msg) = vci(True, 1266507600, 1267544400, 1266939600, 30, 7)
2141
    self.assertEqual(errcode, utils.CERT_ERROR)
2142

    
2143
    (errcode, msg) = vci(True, None, 1267544400, 1266939600, 30, 7)
2144
    self.assertEqual(errcode, utils.CERT_ERROR)
2145

    
2146
    (errcode, msg) = vci(True, 1266507600, None, 1266939600, 30, 7)
2147
    self.assertEqual(errcode, utils.CERT_ERROR)
2148

    
2149
    (errcode, msg) = vci(True, None, None, 1266939600, 30, 7)
2150
    self.assertEqual(errcode, utils.CERT_ERROR)
2151

    
2152

    
2153
class TestHmacFunctions(unittest.TestCase):
2154
  # Digests can be checked with "openssl sha1 -hmac $key"
2155
  def testSha1Hmac(self):
2156
    self.assertEqual(utils.Sha1Hmac("", ""),
2157
                     "fbdb1d1b18aa6c08324b7d64b71fb76370690e1d")
2158
    self.assertEqual(utils.Sha1Hmac("3YzMxZWE", "Hello World"),
2159
                     "ef4f3bda82212ecb2f7ce868888a19092481f1fd")
2160
    self.assertEqual(utils.Sha1Hmac("TguMTA2K", ""),
2161
                     "f904c2476527c6d3e6609ab683c66fa0652cb1dc")
2162

    
2163
    longtext = 1500 * "The quick brown fox jumps over the lazy dog\n"
2164
    self.assertEqual(utils.Sha1Hmac("3YzMxZWE", longtext),
2165
                     "35901b9a3001a7cdcf8e0e9d7c2e79df2223af54")
2166

    
2167
  def testSha1HmacSalt(self):
2168
    self.assertEqual(utils.Sha1Hmac("TguMTA2K", "", salt="abc0"),
2169
                     "4999bf342470eadb11dfcd24ca5680cf9fd7cdce")
2170
    self.assertEqual(utils.Sha1Hmac("TguMTA2K", "", salt="abc9"),
2171
                     "17a4adc34d69c0d367d4ffbef96fd41d4df7a6e8")
2172
    self.assertEqual(utils.Sha1Hmac("3YzMxZWE", "Hello World", salt="xyz0"),
2173
                     "7f264f8114c9066afc9bb7636e1786d996d3cc0d")
2174

    
2175
  def testVerifySha1Hmac(self):
2176
    self.assert_(utils.VerifySha1Hmac("", "", ("fbdb1d1b18aa6c08324b"
2177
                                               "7d64b71fb76370690e1d")))
2178
    self.assert_(utils.VerifySha1Hmac("TguMTA2K", "",
2179
                                      ("f904c2476527c6d3e660"
2180
                                       "9ab683c66fa0652cb1dc")))
2181

    
2182
    digest = "ef4f3bda82212ecb2f7ce868888a19092481f1fd"
2183
    self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World", digest))
2184
    self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
2185
                                      digest.lower()))
2186
    self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
2187
                                      digest.upper()))
2188
    self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
2189
                                      digest.title()))
2190

    
2191
  def testVerifySha1HmacSalt(self):
2192
    self.assert_(utils.VerifySha1Hmac("TguMTA2K", "",
2193
                                      ("17a4adc34d69c0d367d4"
2194
                                       "ffbef96fd41d4df7a6e8"),
2195
                                      salt="abc9"))
2196
    self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
2197
                                      ("7f264f8114c9066afc9b"
2198
                                       "b7636e1786d996d3cc0d"),
2199
                                      salt="xyz0"))
2200

    
2201

    
2202
class TestIgnoreSignals(unittest.TestCase):
2203
  """Test the IgnoreSignals decorator"""
2204

    
2205
  @staticmethod
2206
  def _Raise(exception):
2207
    raise exception
2208

    
2209
  @staticmethod
2210
  def _Return(rval):
2211
    return rval
2212

    
2213
  def testIgnoreSignals(self):
2214
    sock_err_intr = socket.error(errno.EINTR, "Message")
2215
    sock_err_inval = socket.error(errno.EINVAL, "Message")
2216

    
2217
    env_err_intr = EnvironmentError(errno.EINTR, "Message")
2218
    env_err_inval = EnvironmentError(errno.EINVAL, "Message")
2219

    
2220
    self.assertRaises(socket.error, self._Raise, sock_err_intr)
2221
    self.assertRaises(socket.error, self._Raise, sock_err_inval)
2222
    self.assertRaises(EnvironmentError, self._Raise, env_err_intr)
2223
    self.assertRaises(EnvironmentError, self._Raise, env_err_inval)
2224

    
2225
    self.assertEquals(utils.IgnoreSignals(self._Raise, sock_err_intr), None)
2226
    self.assertEquals(utils.IgnoreSignals(self._Raise, env_err_intr), None)
2227
    self.assertRaises(socket.error, utils.IgnoreSignals, self._Raise,
2228
                      sock_err_inval)
2229
    self.assertRaises(EnvironmentError, utils.IgnoreSignals, self._Raise,
2230
                      env_err_inval)
2231

    
2232
    self.assertEquals(utils.IgnoreSignals(self._Return, True), True)
2233
    self.assertEquals(utils.IgnoreSignals(self._Return, 33), 33)
2234

    
2235

    
2236
class TestEnsureDirs(unittest.TestCase):
2237
  """Tests for EnsureDirs"""
2238

    
2239
  def setUp(self):
2240
    self.dir = tempfile.mkdtemp()
2241
    self.old_umask = os.umask(0777)
2242

    
2243
  def testEnsureDirs(self):
2244
    utils.EnsureDirs([
2245
        (PathJoin(self.dir, "foo"), 0777),
2246
        (PathJoin(self.dir, "bar"), 0000),
2247
        ])
2248
    self.assertEquals(os.stat(PathJoin(self.dir, "foo"))[0] & 0777, 0777)
2249
    self.assertEquals(os.stat(PathJoin(self.dir, "bar"))[0] & 0777, 0000)
2250

    
2251
  def tearDown(self):
2252
    os.rmdir(PathJoin(self.dir, "foo"))
2253
    os.rmdir(PathJoin(self.dir, "bar"))
2254
    os.rmdir(self.dir)
2255
    os.umask(self.old_umask)
2256

    
2257

    
2258
class TestFormatSeconds(unittest.TestCase):
2259
  def test(self):
2260
    self.assertEqual(utils.FormatSeconds(1), "1s")
2261
    self.assertEqual(utils.FormatSeconds(3600), "1h 0m 0s")
2262
    self.assertEqual(utils.FormatSeconds(3599), "59m 59s")
2263
    self.assertEqual(utils.FormatSeconds(7200), "2h 0m 0s")
2264
    self.assertEqual(utils.FormatSeconds(7201), "2h 0m 1s")
2265
    self.assertEqual(utils.FormatSeconds(7281), "2h 1m 21s")
2266
    self.assertEqual(utils.FormatSeconds(29119), "8h 5m 19s")
2267
    self.assertEqual(utils.FormatSeconds(19431228), "224d 21h 33m 48s")
2268
    self.assertEqual(utils.FormatSeconds(-1), "-1s")
2269
    self.assertEqual(utils.FormatSeconds(-282), "-282s")
2270
    self.assertEqual(utils.FormatSeconds(-29119), "-29119s")
2271

    
2272
  def testFloat(self):
2273
    self.assertEqual(utils.FormatSeconds(1.3), "1s")
2274
    self.assertEqual(utils.FormatSeconds(1.9), "2s")
2275
    self.assertEqual(utils.FormatSeconds(3912.12311), "1h 5m 12s")
2276
    self.assertEqual(utils.FormatSeconds(3912.8), "1h 5m 13s")
2277

    
2278

    
2279
class TestIgnoreProcessNotFound(unittest.TestCase):
2280
  @staticmethod
2281
  def _WritePid(fd):
2282
    os.write(fd, str(os.getpid()))
2283
    os.close(fd)
2284
    return True
2285

    
2286
  def test(self):
2287
    (pid_read_fd, pid_write_fd) = os.pipe()
2288

    
2289
    # Start short-lived process which writes its PID to pipe
2290
    self.assert_(utils.RunInSeparateProcess(self._WritePid, pid_write_fd))
2291
    os.close(pid_write_fd)
2292

    
2293
    # Read PID from pipe
2294
    pid = int(os.read(pid_read_fd, 1024))
2295
    os.close(pid_read_fd)
2296

    
2297
    # Try to send signal to process which exited recently
2298
    self.assertFalse(utils.IgnoreProcessNotFound(os.kill, pid, 0))
2299

    
2300

    
2301
class TestShellWriter(unittest.TestCase):
2302
  def test(self):
2303
    buf = StringIO()
2304
    sw = utils.ShellWriter(buf)
2305
    sw.Write("#!/bin/bash")
2306
    sw.Write("if true; then")
2307
    sw.IncIndent()
2308
    try:
2309
      sw.Write("echo true")
2310

    
2311
      sw.Write("for i in 1 2 3")
2312
      sw.Write("do")
2313
      sw.IncIndent()
2314
      try:
2315
        self.assertEqual(sw._indent, 2)
2316
        sw.Write("date")
2317
      finally:
2318
        sw.DecIndent()
2319
      sw.Write("done")
2320
    finally:
2321
      sw.DecIndent()
2322
    sw.Write("echo %s", utils.ShellQuote("Hello World"))
2323
    sw.Write("exit 0")
2324

    
2325
    self.assertEqual(sw._indent, 0)
2326

    
2327
    output = buf.getvalue()
2328

    
2329
    self.assert_(output.endswith("\n"))
2330

    
2331
    lines = output.splitlines()
2332
    self.assertEqual(len(lines), 9)
2333
    self.assertEqual(lines[0], "#!/bin/bash")
2334
    self.assert_(re.match(r"^\s+date$", lines[5]))
2335
    self.assertEqual(lines[7], "echo 'Hello World'")
2336

    
2337
  def testEmpty(self):
2338
    buf = StringIO()
2339
    sw = utils.ShellWriter(buf)
2340
    sw = None
2341
    self.assertEqual(buf.getvalue(), "")
2342

    
2343

    
2344
class TestCommaJoin(unittest.TestCase):
2345
  def test(self):
2346
    self.assertEqual(utils.CommaJoin([]), "")
2347
    self.assertEqual(utils.CommaJoin([1, 2, 3]), "1, 2, 3")
2348
    self.assertEqual(utils.CommaJoin(["Hello"]), "Hello")
2349
    self.assertEqual(utils.CommaJoin(["Hello", "World"]), "Hello, World")
2350
    self.assertEqual(utils.CommaJoin(["Hello", "World", 99]),
2351
                     "Hello, World, 99")
2352

    
2353

    
2354
class TestFindMatch(unittest.TestCase):
2355
  def test(self):
2356
    data = {
2357
      "aaaa": "Four A",
2358
      "bb": {"Two B": True},
2359
      re.compile(r"^x(foo|bar|bazX)([0-9]+)$"): (1, 2, 3),
2360
      }
2361

    
2362
    self.assertEqual(utils.FindMatch(data, "aaaa"), ("Four A", []))
2363
    self.assertEqual(utils.FindMatch(data, "bb"), ({"Two B": True}, []))
2364

    
2365
    for i in ["foo", "bar", "bazX"]:
2366
      for j in range(1, 100, 7):
2367
        self.assertEqual(utils.FindMatch(data, "x%s%s" % (i, j)),
2368
                         ((1, 2, 3), [i, str(j)]))
2369

    
2370
  def testNoMatch(self):
2371
    self.assert_(utils.FindMatch({}, "") is None)
2372
    self.assert_(utils.FindMatch({}, "foo") is None)
2373
    self.assert_(utils.FindMatch({}, 1234) is None)
2374

    
2375
    data = {
2376
      "X": "Hello World",
2377
      re.compile("^(something)$"): "Hello World",
2378
      }
2379

    
2380
    self.assert_(utils.FindMatch(data, "") is None)
2381
    self.assert_(utils.FindMatch(data, "Hello World") is None)
2382

    
2383

    
2384
class TestFileID(testutils.GanetiTestCase):
2385
  def testEquality(self):
2386
    name = self._CreateTempFile()
2387
    oldi = utils.GetFileID(path=name)
2388
    self.failUnless(utils.VerifyFileID(oldi, oldi))
2389

    
2390
  def testUpdate(self):
2391
    name = self._CreateTempFile()
2392
    oldi = utils.GetFileID(path=name)
2393
    os.utime(name, None)
2394
    fd = os.open(name, os.O_RDWR)
2395
    try:
2396
      newi = utils.GetFileID(fd=fd)
2397
      self.failUnless(utils.VerifyFileID(oldi, newi))
2398
      self.failUnless(utils.VerifyFileID(newi, oldi))
2399
    finally:
2400
      os.close(fd)
2401

    
2402
  def testWriteFile(self):
2403
    name = self._CreateTempFile()
2404
    oldi = utils.GetFileID(path=name)
2405
    mtime = oldi[2]
2406
    os.utime(name, (mtime + 10, mtime + 10))
2407
    self.assertRaises(errors.LockError, utils.SafeWriteFile, name,
2408
                      oldi, data="")
2409
    os.utime(name, (mtime - 10, mtime - 10))
2410
    utils.SafeWriteFile(name, oldi, data="")
2411
    oldi = utils.GetFileID(path=name)
2412
    mtime = oldi[2]
2413
    os.utime(name, (mtime + 10, mtime + 10))
2414
    # this doesn't raise, since we passed None
2415
    utils.SafeWriteFile(name, None, data="")
2416

    
2417

    
2418
class TimeMock:
2419
  def __init__(self, values):
2420
    self.values = values
2421

    
2422
  def __call__(self):
2423
    return self.values.pop(0)
2424

    
2425

    
2426
class TestRunningTimeout(unittest.TestCase):
2427
  def setUp(self):
2428
    self.time_fn = TimeMock([0.0, 0.3, 4.6, 6.5])
2429

    
2430
  def testRemainingFloat(self):
2431
    timeout = utils.RunningTimeout(5.0, True, _time_fn=self.time_fn)
2432
    self.assertAlmostEqual(timeout.Remaining(), 4.7)
2433
    self.assertAlmostEqual(timeout.Remaining(), 0.4)
2434
    self.assertAlmostEqual(timeout.Remaining(), -1.5)
2435

    
2436
  def testRemaining(self):
2437
    self.time_fn = TimeMock([0, 2, 4, 5, 6])
2438
    timeout = utils.RunningTimeout(5, True, _time_fn=self.time_fn)
2439
    self.assertEqual(timeout.Remaining(), 3)
2440
    self.assertEqual(timeout.Remaining(), 1)
2441
    self.assertEqual(timeout.Remaining(), 0)
2442
    self.assertEqual(timeout.Remaining(), -1)
2443

    
2444
  def testRemainingNonNegative(self):
2445
    timeout = utils.RunningTimeout(5.0, False, _time_fn=self.time_fn)
2446
    self.assertAlmostEqual(timeout.Remaining(), 4.7)
2447
    self.assertAlmostEqual(timeout.Remaining(), 0.4)
2448
    self.assertEqual(timeout.Remaining(), 0.0)
2449

    
2450
  def testNegativeTimeout(self):
2451
    self.assertRaises(ValueError, utils.RunningTimeout, -1.0, True)
2452

    
2453

    
2454
if __name__ == '__main__':
2455
  testutils.GanetiTestProgram()