Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ a744b676

History | View | Annotate | Download (75.9 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import distutils.version
25
import errno
26
import fcntl
27
import glob
28
import os
29
import os.path
30
import re
31
import shutil
32
import signal
33
import socket
34
import stat
35
import string
36
import tempfile
37
import time
38
import unittest
39
import warnings
40
import OpenSSL
41

    
42
import testutils
43
from ganeti import constants
44
from ganeti import compat
45
from ganeti import utils
46
from ganeti import errors
47
from ganeti.utils import RunCmd, RemoveFile, MatchNameComponent, FormatUnit, \
48
     ParseUnit, ShellQuote, ShellQuoteArgs, ListVisibleFiles, FirstFree, \
49
     TailFile, SafeEncode, FormatTime, UnescapeAndSplit, RunParts, PathJoin, \
50
     ReadOneLineFile, SetEtcHostsEntry, RemoveEtcHostsEntry
51

    
52

    
53
class TestIsProcessAlive(unittest.TestCase):
54
  """Testing case for IsProcessAlive"""
55

    
56
  def testExists(self):
57
    mypid = os.getpid()
58
    self.assert_(utils.IsProcessAlive(mypid), "can't find myself running")
59

    
60
  def testNotExisting(self):
61
    pid_non_existing = os.fork()
62
    if pid_non_existing == 0:
63
      os._exit(0)
64
    elif pid_non_existing < 0:
65
      raise SystemError("can't fork")
66
    os.waitpid(pid_non_existing, 0)
67
    self.assertFalse(utils.IsProcessAlive(pid_non_existing),
68
                     "nonexisting process detected")
69

    
70

    
71
class TestGetProcStatusPath(unittest.TestCase):
72
  def test(self):
73
    self.assert_("/1234/" in utils._GetProcStatusPath(1234))
74
    self.assertNotEqual(utils._GetProcStatusPath(1),
75
                        utils._GetProcStatusPath(2))
76

    
77

    
78
class TestIsProcessHandlingSignal(unittest.TestCase):
79
  def setUp(self):
80
    self.tmpdir = tempfile.mkdtemp()
81

    
82
  def tearDown(self):
83
    shutil.rmtree(self.tmpdir)
84

    
85
  def testParseSigsetT(self):
86
    self.assertEqual(len(utils._ParseSigsetT("0")), 0)
87
    self.assertEqual(utils._ParseSigsetT("1"), set([1]))
88
    self.assertEqual(utils._ParseSigsetT("1000a"), set([2, 4, 17]))
89
    self.assertEqual(utils._ParseSigsetT("810002"), set([2, 17, 24, ]))
90
    self.assertEqual(utils._ParseSigsetT("0000000180000202"),
91
                     set([2, 10, 32, 33]))
92
    self.assertEqual(utils._ParseSigsetT("0000000180000002"),
93
                     set([2, 32, 33]))
94
    self.assertEqual(utils._ParseSigsetT("0000000188000002"),
95
                     set([2, 28, 32, 33]))
96
    self.assertEqual(utils._ParseSigsetT("000000004b813efb"),
97
                     set([1, 2, 4, 5, 6, 7, 8, 10, 11, 12, 13, 14, 17,
98
                          24, 25, 26, 28, 31]))
99
    self.assertEqual(utils._ParseSigsetT("ffffff"), set(range(1, 25)))
100

    
101
  def testGetProcStatusField(self):
102
    for field in ["SigCgt", "Name", "FDSize"]:
103
      for value in ["", "0", "cat", "  1234 KB"]:
104
        pstatus = "\n".join([
105
          "VmPeak: 999 kB",
106
          "%s: %s" % (field, value),
107
          "TracerPid: 0",
108
          ])
109
        result = utils._GetProcStatusField(pstatus, field)
110
        self.assertEqual(result, value.strip())
111

    
112
  def test(self):
113
    sp = PathJoin(self.tmpdir, "status")
114

    
115
    utils.WriteFile(sp, data="\n".join([
116
      "Name:   bash",
117
      "State:  S (sleeping)",
118
      "SleepAVG:       98%",
119
      "Pid:    22250",
120
      "PPid:   10858",
121
      "TracerPid:      0",
122
      "SigBlk: 0000000000010000",
123
      "SigIgn: 0000000000384004",
124
      "SigCgt: 000000004b813efb",
125
      "CapEff: 0000000000000000",
126
      ]))
127

    
128
    self.assert_(utils.IsProcessHandlingSignal(1234, 10, status_path=sp))
129

    
130
  def testNoSigCgt(self):
131
    sp = PathJoin(self.tmpdir, "status")
132

    
133
    utils.WriteFile(sp, data="\n".join([
134
      "Name:   bash",
135
      ]))
136

    
137
    self.assertRaises(RuntimeError, utils.IsProcessHandlingSignal,
138
                      1234, 10, status_path=sp)
139

    
140
  def testNoSuchFile(self):
141
    sp = PathJoin(self.tmpdir, "notexist")
142

    
143
    self.assertFalse(utils.IsProcessHandlingSignal(1234, 10, status_path=sp))
144

    
145
  @staticmethod
146
  def _TestRealProcess():
147
    signal.signal(signal.SIGUSR1, signal.SIG_DFL)
148
    if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
149
      raise Exception("SIGUSR1 is handled when it should not be")
150

    
151
    signal.signal(signal.SIGUSR1, lambda signum, frame: None)
152
    if not utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
153
      raise Exception("SIGUSR1 is not handled when it should be")
154

    
155
    signal.signal(signal.SIGUSR1, signal.SIG_IGN)
156
    if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
157
      raise Exception("SIGUSR1 is not handled when it should be")
158

    
159
    signal.signal(signal.SIGUSR1, signal.SIG_DFL)
160
    if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
161
      raise Exception("SIGUSR1 is handled when it should not be")
162

    
163
    return True
164

    
165
  def testRealProcess(self):
166
    self.assert_(utils.RunInSeparateProcess(self._TestRealProcess))
167

    
168

    
169
class TestPidFileFunctions(unittest.TestCase):
170
  """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
171

    
172
  def setUp(self):
173
    self.dir = tempfile.mkdtemp()
174
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
175
    utils.DaemonPidFileName = self.f_dpn
176

    
177
  def testPidFileFunctions(self):
178
    pid_file = self.f_dpn('test')
179
    utils.WritePidFile('test')
180
    self.failUnless(os.path.exists(pid_file),
181
                    "PID file should have been created")
182
    read_pid = utils.ReadPidFile(pid_file)
183
    self.failUnlessEqual(read_pid, os.getpid())
184
    self.failUnless(utils.IsProcessAlive(read_pid))
185
    self.failUnlessRaises(errors.GenericError, utils.WritePidFile, 'test')
186
    utils.RemovePidFile('test')
187
    self.failIf(os.path.exists(pid_file),
188
                "PID file should not exist anymore")
189
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
190
                         "ReadPidFile should return 0 for missing pid file")
191
    fh = open(pid_file, "w")
192
    fh.write("blah\n")
193
    fh.close()
194
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
195
                         "ReadPidFile should return 0 for invalid pid file")
196
    utils.RemovePidFile('test')
197
    self.failIf(os.path.exists(pid_file),
198
                "PID file should not exist anymore")
199

    
200
  def testKill(self):
201
    pid_file = self.f_dpn('child')
202
    r_fd, w_fd = os.pipe()
203
    new_pid = os.fork()
204
    if new_pid == 0: #child
205
      utils.WritePidFile('child')
206
      os.write(w_fd, 'a')
207
      signal.pause()
208
      os._exit(0)
209
      return
210
    # else we are in the parent
211
    # wait until the child has written the pid file
212
    os.read(r_fd, 1)
213
    read_pid = utils.ReadPidFile(pid_file)
214
    self.failUnlessEqual(read_pid, new_pid)
215
    self.failUnless(utils.IsProcessAlive(new_pid))
216
    utils.KillProcess(new_pid, waitpid=True)
217
    self.failIf(utils.IsProcessAlive(new_pid))
218
    utils.RemovePidFile('child')
219
    self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0)
220

    
221
  def tearDown(self):
222
    for name in os.listdir(self.dir):
223
      os.unlink(os.path.join(self.dir, name))
224
    os.rmdir(self.dir)
225

    
226

    
227
class TestRunCmd(testutils.GanetiTestCase):
228
  """Testing case for the RunCmd function"""
229

    
230
  def setUp(self):
231
    testutils.GanetiTestCase.setUp(self)
232
    self.magic = time.ctime() + " ganeti test"
233
    self.fname = self._CreateTempFile()
234

    
235
  def testOk(self):
236
    """Test successful exit code"""
237
    result = RunCmd("/bin/sh -c 'exit 0'")
238
    self.assertEqual(result.exit_code, 0)
239
    self.assertEqual(result.output, "")
240

    
241
  def testFail(self):
242
    """Test fail exit code"""
243
    result = RunCmd("/bin/sh -c 'exit 1'")
244
    self.assertEqual(result.exit_code, 1)
245
    self.assertEqual(result.output, "")
246

    
247
  def testStdout(self):
248
    """Test standard output"""
249
    cmd = 'echo -n "%s"' % self.magic
250
    result = RunCmd("/bin/sh -c '%s'" % cmd)
251
    self.assertEqual(result.stdout, self.magic)
252
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
253
    self.assertEqual(result.output, "")
254
    self.assertFileContent(self.fname, self.magic)
255

    
256
  def testStderr(self):
257
    """Test standard error"""
258
    cmd = 'echo -n "%s"' % self.magic
259
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
260
    self.assertEqual(result.stderr, self.magic)
261
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
262
    self.assertEqual(result.output, "")
263
    self.assertFileContent(self.fname, self.magic)
264

    
265
  def testCombined(self):
266
    """Test combined output"""
267
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
268
    expected = "A" + self.magic + "B" + self.magic
269
    result = RunCmd("/bin/sh -c '%s'" % cmd)
270
    self.assertEqual(result.output, expected)
271
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
272
    self.assertEqual(result.output, "")
273
    self.assertFileContent(self.fname, expected)
274

    
275
  def testSignal(self):
276
    """Test signal"""
277
    result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
278
    self.assertEqual(result.signal, 15)
279
    self.assertEqual(result.output, "")
280

    
281
  def testListRun(self):
282
    """Test list runs"""
283
    result = RunCmd(["true"])
284
    self.assertEqual(result.signal, None)
285
    self.assertEqual(result.exit_code, 0)
286
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
287
    self.assertEqual(result.signal, None)
288
    self.assertEqual(result.exit_code, 1)
289
    result = RunCmd(["echo", "-n", self.magic])
290
    self.assertEqual(result.signal, None)
291
    self.assertEqual(result.exit_code, 0)
292
    self.assertEqual(result.stdout, self.magic)
293

    
294
  def testFileEmptyOutput(self):
295
    """Test file output"""
296
    result = RunCmd(["true"], output=self.fname)
297
    self.assertEqual(result.signal, None)
298
    self.assertEqual(result.exit_code, 0)
299
    self.assertFileContent(self.fname, "")
300

    
301
  def testLang(self):
302
    """Test locale environment"""
303
    old_env = os.environ.copy()
304
    try:
305
      os.environ["LANG"] = "en_US.UTF-8"
306
      os.environ["LC_ALL"] = "en_US.UTF-8"
307
      result = RunCmd(["locale"])
308
      for line in result.output.splitlines():
309
        key, value = line.split("=", 1)
310
        # Ignore these variables, they're overridden by LC_ALL
311
        if key == "LANG" or key == "LANGUAGE":
312
          continue
313
        self.failIf(value and value != "C" and value != '"C"',
314
            "Variable %s is set to the invalid value '%s'" % (key, value))
315
    finally:
316
      os.environ = old_env
317

    
318
  def testDefaultCwd(self):
319
    """Test default working directory"""
320
    self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
321

    
322
  def testCwd(self):
323
    """Test default working directory"""
324
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
325
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
326
    cwd = os.getcwd()
327
    self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
328

    
329
  def testResetEnv(self):
330
    """Test environment reset functionality"""
331
    self.failUnlessEqual(RunCmd(["env"], reset_env=True).stdout.strip(), "")
332
    self.failUnlessEqual(RunCmd(["env"], reset_env=True,
333
                                env={"FOO": "bar",}).stdout.strip(), "FOO=bar")
334

    
335

    
336
class TestRunParts(unittest.TestCase):
337
  """Testing case for the RunParts function"""
338

    
339
  def setUp(self):
340
    self.rundir = tempfile.mkdtemp(prefix="ganeti-test", suffix=".tmp")
341

    
342
  def tearDown(self):
343
    shutil.rmtree(self.rundir)
344

    
345
  def testEmpty(self):
346
    """Test on an empty dir"""
347
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True), [])
348

    
349
  def testSkipWrongName(self):
350
    """Test that wrong files are skipped"""
351
    fname = os.path.join(self.rundir, "00test.dot")
352
    utils.WriteFile(fname, data="")
353
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
354
    relname = os.path.basename(fname)
355
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
356
                         [(relname, constants.RUNPARTS_SKIP, None)])
357

    
358
  def testSkipNonExec(self):
359
    """Test that non executable files are skipped"""
360
    fname = os.path.join(self.rundir, "00test")
361
    utils.WriteFile(fname, data="")
362
    relname = os.path.basename(fname)
363
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
364
                         [(relname, constants.RUNPARTS_SKIP, None)])
365

    
366
  def testError(self):
367
    """Test error on a broken executable"""
368
    fname = os.path.join(self.rundir, "00test")
369
    utils.WriteFile(fname, data="")
370
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
371
    (relname, status, error) = RunParts(self.rundir, reset_env=True)[0]
372
    self.failUnlessEqual(relname, os.path.basename(fname))
373
    self.failUnlessEqual(status, constants.RUNPARTS_ERR)
374
    self.failUnless(error)
375

    
376
  def testSorted(self):
377
    """Test executions are sorted"""
378
    files = []
379
    files.append(os.path.join(self.rundir, "64test"))
380
    files.append(os.path.join(self.rundir, "00test"))
381
    files.append(os.path.join(self.rundir, "42test"))
382

    
383
    for fname in files:
384
      utils.WriteFile(fname, data="")
385

    
386
    results = RunParts(self.rundir, reset_env=True)
387

    
388
    for fname in sorted(files):
389
      self.failUnlessEqual(os.path.basename(fname), results.pop(0)[0])
390

    
391
  def testOk(self):
392
    """Test correct execution"""
393
    fname = os.path.join(self.rundir, "00test")
394
    utils.WriteFile(fname, data="#!/bin/sh\n\necho -n ciao")
395
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
396
    (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
397
    self.failUnlessEqual(relname, os.path.basename(fname))
398
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
399
    self.failUnlessEqual(runresult.stdout, "ciao")
400

    
401
  def testRunFail(self):
402
    """Test correct execution, with run failure"""
403
    fname = os.path.join(self.rundir, "00test")
404
    utils.WriteFile(fname, data="#!/bin/sh\n\nexit 1")
405
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
406
    (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
407
    self.failUnlessEqual(relname, os.path.basename(fname))
408
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
409
    self.failUnlessEqual(runresult.exit_code, 1)
410
    self.failUnless(runresult.failed)
411

    
412
  def testRunMix(self):
413
    files = []
414
    files.append(os.path.join(self.rundir, "00test"))
415
    files.append(os.path.join(self.rundir, "42test"))
416
    files.append(os.path.join(self.rundir, "64test"))
417
    files.append(os.path.join(self.rundir, "99test"))
418

    
419
    files.sort()
420

    
421
    # 1st has errors in execution
422
    utils.WriteFile(files[0], data="#!/bin/sh\n\nexit 1")
423
    os.chmod(files[0], stat.S_IREAD | stat.S_IEXEC)
424

    
425
    # 2nd is skipped
426
    utils.WriteFile(files[1], data="")
427

    
428
    # 3rd cannot execute properly
429
    utils.WriteFile(files[2], data="")
430
    os.chmod(files[2], stat.S_IREAD | stat.S_IEXEC)
431

    
432
    # 4th execs
433
    utils.WriteFile(files[3], data="#!/bin/sh\n\necho -n ciao")
434
    os.chmod(files[3], stat.S_IREAD | stat.S_IEXEC)
435

    
436
    results = RunParts(self.rundir, reset_env=True)
437

    
438
    (relname, status, runresult) = results[0]
439
    self.failUnlessEqual(relname, os.path.basename(files[0]))
440
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
441
    self.failUnlessEqual(runresult.exit_code, 1)
442
    self.failUnless(runresult.failed)
443

    
444
    (relname, status, runresult) = results[1]
445
    self.failUnlessEqual(relname, os.path.basename(files[1]))
446
    self.failUnlessEqual(status, constants.RUNPARTS_SKIP)
447
    self.failUnlessEqual(runresult, None)
448

    
449
    (relname, status, runresult) = results[2]
450
    self.failUnlessEqual(relname, os.path.basename(files[2]))
451
    self.failUnlessEqual(status, constants.RUNPARTS_ERR)
452
    self.failUnless(runresult)
453

    
454
    (relname, status, runresult) = results[3]
455
    self.failUnlessEqual(relname, os.path.basename(files[3]))
456
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
457
    self.failUnlessEqual(runresult.output, "ciao")
458
    self.failUnlessEqual(runresult.exit_code, 0)
459
    self.failUnless(not runresult.failed)
460

    
461

    
462
class TestStartDaemon(testutils.GanetiTestCase):
463
  def setUp(self):
464
    self.tmpdir = tempfile.mkdtemp(prefix="ganeti-test")
465
    self.tmpfile = os.path.join(self.tmpdir, "test")
466

    
467
  def tearDown(self):
468
    shutil.rmtree(self.tmpdir)
469

    
470
  def testShell(self):
471
    utils.StartDaemon("echo Hello World > %s" % self.tmpfile)
472
    self._wait(self.tmpfile, 60.0, "Hello World")
473

    
474
  def testShellOutput(self):
475
    utils.StartDaemon("echo Hello World", output=self.tmpfile)
476
    self._wait(self.tmpfile, 60.0, "Hello World")
477

    
478
  def testNoShellNoOutput(self):
479
    utils.StartDaemon(["pwd"])
480

    
481
  def testNoShellNoOutputTouch(self):
482
    testfile = os.path.join(self.tmpdir, "check")
483
    self.failIf(os.path.exists(testfile))
484
    utils.StartDaemon(["touch", testfile])
485
    self._wait(testfile, 60.0, "")
486

    
487
  def testNoShellOutput(self):
488
    utils.StartDaemon(["pwd"], output=self.tmpfile)
489
    self._wait(self.tmpfile, 60.0, "/")
490

    
491
  def testNoShellOutputCwd(self):
492
    utils.StartDaemon(["pwd"], output=self.tmpfile, cwd=os.getcwd())
493
    self._wait(self.tmpfile, 60.0, os.getcwd())
494

    
495
  def testShellEnv(self):
496
    utils.StartDaemon("echo \"$GNT_TEST_VAR\"", output=self.tmpfile,
497
                      env={ "GNT_TEST_VAR": "Hello World", })
498
    self._wait(self.tmpfile, 60.0, "Hello World")
499

    
500
  def testNoShellEnv(self):
501
    utils.StartDaemon(["printenv", "GNT_TEST_VAR"], output=self.tmpfile,
502
                      env={ "GNT_TEST_VAR": "Hello World", })
503
    self._wait(self.tmpfile, 60.0, "Hello World")
504

    
505
  def testOutputFd(self):
506
    fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
507
    try:
508
      utils.StartDaemon(["pwd"], output_fd=fd, cwd=os.getcwd())
509
    finally:
510
      os.close(fd)
511
    self._wait(self.tmpfile, 60.0, os.getcwd())
512

    
513
  def testPid(self):
514
    pid = utils.StartDaemon("echo $$ > %s" % self.tmpfile)
515
    self._wait(self.tmpfile, 60.0, str(pid))
516

    
517
  def testPidFile(self):
518
    pidfile = os.path.join(self.tmpdir, "pid")
519
    checkfile = os.path.join(self.tmpdir, "abort")
520

    
521
    pid = utils.StartDaemon("while sleep 5; do :; done", pidfile=pidfile,
522
                            output=self.tmpfile)
523
    try:
524
      fd = os.open(pidfile, os.O_RDONLY)
525
      try:
526
        # Check file is locked
527
        self.assertRaises(errors.LockError, utils.LockFile, fd)
528

    
529
        pidtext = os.read(fd, 100)
530
      finally:
531
        os.close(fd)
532

    
533
      self.assertEqual(int(pidtext.strip()), pid)
534

    
535
      self.assert_(utils.IsProcessAlive(pid))
536
    finally:
537
      # No matter what happens, kill daemon
538
      utils.KillProcess(pid, timeout=5.0, waitpid=False)
539
      self.failIf(utils.IsProcessAlive(pid))
540

    
541
    self.assertEqual(utils.ReadFile(self.tmpfile), "")
542

    
543
  def _wait(self, path, timeout, expected):
544
    # Due to the asynchronous nature of daemon processes, polling is necessary.
545
    # A timeout makes sure the test doesn't hang forever.
546
    def _CheckFile():
547
      if not (os.path.isfile(path) and
548
              utils.ReadFile(path).strip() == expected):
549
        raise utils.RetryAgain()
550

    
551
    try:
552
      utils.Retry(_CheckFile, (0.01, 1.5, 1.0), timeout)
553
    except utils.RetryTimeout:
554
      self.fail("Apparently the daemon didn't run in %s seconds and/or"
555
                " didn't write the correct output" % timeout)
556

    
557
  def testError(self):
558
    self.assertRaises(errors.OpExecError, utils.StartDaemon,
559
                      ["./does-NOT-EXIST/here/0123456789"])
560
    self.assertRaises(errors.OpExecError, utils.StartDaemon,
561
                      ["./does-NOT-EXIST/here/0123456789"],
562
                      output=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
563
    self.assertRaises(errors.OpExecError, utils.StartDaemon,
564
                      ["./does-NOT-EXIST/here/0123456789"],
565
                      cwd=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
566
    self.assertRaises(errors.OpExecError, utils.StartDaemon,
567
                      ["./does-NOT-EXIST/here/0123456789"],
568
                      output=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
569

    
570
    fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
571
    try:
572
      self.assertRaises(errors.ProgrammerError, utils.StartDaemon,
573
                        ["./does-NOT-EXIST/here/0123456789"],
574
                        output=self.tmpfile, output_fd=fd)
575
    finally:
576
      os.close(fd)
577

    
578

    
579
class TestSetCloseOnExecFlag(unittest.TestCase):
580
  """Tests for SetCloseOnExecFlag"""
581

    
582
  def setUp(self):
583
    self.tmpfile = tempfile.TemporaryFile()
584

    
585
  def testEnable(self):
586
    utils.SetCloseOnExecFlag(self.tmpfile.fileno(), True)
587
    self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
588
                    fcntl.FD_CLOEXEC)
589

    
590
  def testDisable(self):
591
    utils.SetCloseOnExecFlag(self.tmpfile.fileno(), False)
592
    self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
593
                fcntl.FD_CLOEXEC)
594

    
595

    
596
class TestSetNonblockFlag(unittest.TestCase):
597
  def setUp(self):
598
    self.tmpfile = tempfile.TemporaryFile()
599

    
600
  def testEnable(self):
601
    utils.SetNonblockFlag(self.tmpfile.fileno(), True)
602
    self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
603
                    os.O_NONBLOCK)
604

    
605
  def testDisable(self):
606
    utils.SetNonblockFlag(self.tmpfile.fileno(), False)
607
    self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
608
                os.O_NONBLOCK)
609

    
610

    
611
class TestRemoveFile(unittest.TestCase):
612
  """Test case for the RemoveFile function"""
613

    
614
  def setUp(self):
615
    """Create a temp dir and file for each case"""
616
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
617
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
618
    os.close(fd)
619

    
620
  def tearDown(self):
621
    if os.path.exists(self.tmpfile):
622
      os.unlink(self.tmpfile)
623
    os.rmdir(self.tmpdir)
624

    
625
  def testIgnoreDirs(self):
626
    """Test that RemoveFile() ignores directories"""
627
    self.assertEqual(None, RemoveFile(self.tmpdir))
628

    
629
  def testIgnoreNotExisting(self):
630
    """Test that RemoveFile() ignores non-existing files"""
631
    RemoveFile(self.tmpfile)
632
    RemoveFile(self.tmpfile)
633

    
634
  def testRemoveFile(self):
635
    """Test that RemoveFile does remove a file"""
636
    RemoveFile(self.tmpfile)
637
    if os.path.exists(self.tmpfile):
638
      self.fail("File '%s' not removed" % self.tmpfile)
639

    
640
  def testRemoveSymlink(self):
641
    """Test that RemoveFile does remove symlinks"""
642
    symlink = self.tmpdir + "/symlink"
643
    os.symlink("no-such-file", symlink)
644
    RemoveFile(symlink)
645
    if os.path.exists(symlink):
646
      self.fail("File '%s' not removed" % symlink)
647
    os.symlink(self.tmpfile, symlink)
648
    RemoveFile(symlink)
649
    if os.path.exists(symlink):
650
      self.fail("File '%s' not removed" % symlink)
651

    
652

    
653
class TestRename(unittest.TestCase):
654
  """Test case for RenameFile"""
655

    
656
  def setUp(self):
657
    """Create a temporary directory"""
658
    self.tmpdir = tempfile.mkdtemp()
659
    self.tmpfile = os.path.join(self.tmpdir, "test1")
660

    
661
    # Touch the file
662
    open(self.tmpfile, "w").close()
663

    
664
  def tearDown(self):
665
    """Remove temporary directory"""
666
    shutil.rmtree(self.tmpdir)
667

    
668
  def testSimpleRename1(self):
669
    """Simple rename 1"""
670
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
671
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
672

    
673
  def testSimpleRename2(self):
674
    """Simple rename 2"""
675
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
676
                     mkdir=True)
677
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
678

    
679
  def testRenameMkdir(self):
680
    """Rename with mkdir"""
681
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
682
                     mkdir=True)
683
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
684
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
685

    
686
    utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"),
687
                     os.path.join(self.tmpdir, "test/foo/bar/baz"),
688
                     mkdir=True)
689
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
690
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar")))
691
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz")))
692

    
693

    
694
class TestMatchNameComponent(unittest.TestCase):
695
  """Test case for the MatchNameComponent function"""
696

    
697
  def testEmptyList(self):
698
    """Test that there is no match against an empty list"""
699

    
700
    self.failUnlessEqual(MatchNameComponent("", []), None)
701
    self.failUnlessEqual(MatchNameComponent("test", []), None)
702

    
703
  def testSingleMatch(self):
704
    """Test that a single match is performed correctly"""
705
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
706
    for key in "test2", "test2.example", "test2.example.com":
707
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
708

    
709
  def testMultipleMatches(self):
710
    """Test that a multiple match is returned as None"""
711
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
712
    for key in "test1", "test1.example":
713
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
714

    
715
  def testFullMatch(self):
716
    """Test that a full match is returned correctly"""
717
    key1 = "test1"
718
    key2 = "test1.example"
719
    mlist = [key2, key2 + ".com"]
720
    self.failUnlessEqual(MatchNameComponent(key1, mlist), None)
721
    self.failUnlessEqual(MatchNameComponent(key2, mlist), key2)
722

    
723
  def testCaseInsensitivePartialMatch(self):
724
    """Test for the case_insensitive keyword"""
725
    mlist = ["test1.example.com", "test2.example.net"]
726
    self.assertEqual(MatchNameComponent("test2", mlist, case_sensitive=False),
727
                     "test2.example.net")
728
    self.assertEqual(MatchNameComponent("Test2", mlist, case_sensitive=False),
729
                     "test2.example.net")
730
    self.assertEqual(MatchNameComponent("teSt2", mlist, case_sensitive=False),
731
                     "test2.example.net")
732
    self.assertEqual(MatchNameComponent("TeSt2", mlist, case_sensitive=False),
733
                     "test2.example.net")
734

    
735

    
736
  def testCaseInsensitiveFullMatch(self):
737
    mlist = ["ts1.ex", "ts1.ex.org", "ts2.ex", "Ts2.ex"]
738
    # Between the two ts1 a full string match non-case insensitive should work
739
    self.assertEqual(MatchNameComponent("Ts1", mlist, case_sensitive=False),
740
                     None)
741
    self.assertEqual(MatchNameComponent("Ts1.ex", mlist, case_sensitive=False),
742
                     "ts1.ex")
743
    self.assertEqual(MatchNameComponent("ts1.ex", mlist, case_sensitive=False),
744
                     "ts1.ex")
745
    # Between the two ts2 only case differs, so only case-match works
746
    self.assertEqual(MatchNameComponent("ts2.ex", mlist, case_sensitive=False),
747
                     "ts2.ex")
748
    self.assertEqual(MatchNameComponent("Ts2.ex", mlist, case_sensitive=False),
749
                     "Ts2.ex")
750
    self.assertEqual(MatchNameComponent("TS2.ex", mlist, case_sensitive=False),
751
                     None)
752

    
753

    
754
class TestReadFile(testutils.GanetiTestCase):
755

    
756
  def testReadAll(self):
757
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"))
758
    self.assertEqual(len(data), 814)
759

    
760
    h = compat.md5_hash()
761
    h.update(data)
762
    self.assertEqual(h.hexdigest(), "a491efb3efe56a0535f924d5f8680fd4")
763

    
764
  def testReadSize(self):
765
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"),
766
                          size=100)
767
    self.assertEqual(len(data), 100)
768

    
769
    h = compat.md5_hash()
770
    h.update(data)
771
    self.assertEqual(h.hexdigest(), "893772354e4e690b9efd073eed433ce7")
772

    
773
  def testError(self):
774
    self.assertRaises(EnvironmentError, utils.ReadFile,
775
                      "/dev/null/does-not-exist")
776

    
777

    
778
class TestReadOneLineFile(testutils.GanetiTestCase):
779

    
780
  def setUp(self):
781
    testutils.GanetiTestCase.setUp(self)
782

    
783
  def testDefault(self):
784
    data = ReadOneLineFile(self._TestDataFilename("cert1.pem"))
785
    self.assertEqual(len(data), 27)
786
    self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
787

    
788
  def testNotStrict(self):
789
    data = ReadOneLineFile(self._TestDataFilename("cert1.pem"), strict=False)
790
    self.assertEqual(len(data), 27)
791
    self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
792

    
793
  def testStrictFailure(self):
794
    self.assertRaises(errors.GenericError, ReadOneLineFile,
795
                      self._TestDataFilename("cert1.pem"), strict=True)
796

    
797
  def testLongLine(self):
798
    dummydata = (1024 * "Hello World! ")
799
    myfile = self._CreateTempFile()
800
    utils.WriteFile(myfile, data=dummydata)
801
    datastrict = ReadOneLineFile(myfile, strict=True)
802
    datalax = ReadOneLineFile(myfile, strict=False)
803
    self.assertEqual(dummydata, datastrict)
804
    self.assertEqual(dummydata, datalax)
805

    
806
  def testNewline(self):
807
    myfile = self._CreateTempFile()
808
    myline = "myline"
809
    for nl in ["", "\n", "\r\n"]:
810
      dummydata = "%s%s" % (myline, nl)
811
      utils.WriteFile(myfile, data=dummydata)
812
      datalax = ReadOneLineFile(myfile, strict=False)
813
      self.assertEqual(myline, datalax)
814
      datastrict = ReadOneLineFile(myfile, strict=True)
815
      self.assertEqual(myline, datastrict)
816

    
817
  def testWhitespaceAndMultipleLines(self):
818
    myfile = self._CreateTempFile()
819
    for nl in ["", "\n", "\r\n"]:
820
      for ws in [" ", "\t", "\t\t  \t", "\t "]:
821
        dummydata = (1024 * ("Foo bar baz %s%s" % (ws, nl)))
822
        utils.WriteFile(myfile, data=dummydata)
823
        datalax = ReadOneLineFile(myfile, strict=False)
824
        if nl:
825
          self.assert_(set("\r\n") & set(dummydata))
826
          self.assertRaises(errors.GenericError, ReadOneLineFile,
827
                            myfile, strict=True)
828
          explen = len("Foo bar baz ") + len(ws)
829
          self.assertEqual(len(datalax), explen)
830
          self.assertEqual(datalax, dummydata[:explen])
831
          self.assertFalse(set("\r\n") & set(datalax))
832
        else:
833
          datastrict = ReadOneLineFile(myfile, strict=True)
834
          self.assertEqual(dummydata, datastrict)
835
          self.assertEqual(dummydata, datalax)
836

    
837
  def testEmptylines(self):
838
    myfile = self._CreateTempFile()
839
    myline = "myline"
840
    for nl in ["\n", "\r\n"]:
841
      for ol in ["", "otherline"]:
842
        dummydata = "%s%s%s%s%s%s" % (nl, nl, myline, nl, ol, nl)
843
        utils.WriteFile(myfile, data=dummydata)
844
        self.assert_(set("\r\n") & set(dummydata))
845
        datalax = ReadOneLineFile(myfile, strict=False)
846
        self.assertEqual(myline, datalax)
847
        if ol:
848
          self.assertRaises(errors.GenericError, ReadOneLineFile,
849
                            myfile, strict=True)
850
        else:
851
          datastrict = ReadOneLineFile(myfile, strict=True)
852
          self.assertEqual(myline, datastrict)
853

    
854

    
855
class TestTimestampForFilename(unittest.TestCase):
856
  def test(self):
857
    self.assert_("." not in utils.TimestampForFilename())
858
    self.assert_(":" not in utils.TimestampForFilename())
859

    
860

    
861
class TestCreateBackup(testutils.GanetiTestCase):
862
  def setUp(self):
863
    testutils.GanetiTestCase.setUp(self)
864

    
865
    self.tmpdir = tempfile.mkdtemp()
866

    
867
  def tearDown(self):
868
    testutils.GanetiTestCase.tearDown(self)
869

    
870
    shutil.rmtree(self.tmpdir)
871

    
872
  def testEmpty(self):
873
    filename = PathJoin(self.tmpdir, "config.data")
874
    utils.WriteFile(filename, data="")
875
    bname = utils.CreateBackup(filename)
876
    self.assertFileContent(bname, "")
877
    self.assertEqual(len(glob.glob("%s*" % filename)), 2)
878
    utils.CreateBackup(filename)
879
    self.assertEqual(len(glob.glob("%s*" % filename)), 3)
880
    utils.CreateBackup(filename)
881
    self.assertEqual(len(glob.glob("%s*" % filename)), 4)
882

    
883
    fifoname = PathJoin(self.tmpdir, "fifo")
884
    os.mkfifo(fifoname)
885
    self.assertRaises(errors.ProgrammerError, utils.CreateBackup, fifoname)
886

    
887
  def testContent(self):
888
    bkpcount = 0
889
    for data in ["", "X", "Hello World!\n" * 100, "Binary data\0\x01\x02\n"]:
890
      for rep in [1, 2, 10, 127]:
891
        testdata = data * rep
892

    
893
        filename = PathJoin(self.tmpdir, "test.data_")
894
        utils.WriteFile(filename, data=testdata)
895
        self.assertFileContent(filename, testdata)
896

    
897
        for _ in range(3):
898
          bname = utils.CreateBackup(filename)
899
          bkpcount += 1
900
          self.assertFileContent(bname, testdata)
901
          self.assertEqual(len(glob.glob("%s*" % filename)), 1 + bkpcount)
902

    
903

    
904
class TestFormatUnit(unittest.TestCase):
905
  """Test case for the FormatUnit function"""
906

    
907
  def testMiB(self):
908
    self.assertEqual(FormatUnit(1, 'h'), '1M')
909
    self.assertEqual(FormatUnit(100, 'h'), '100M')
910
    self.assertEqual(FormatUnit(1023, 'h'), '1023M')
911

    
912
    self.assertEqual(FormatUnit(1, 'm'), '1')
913
    self.assertEqual(FormatUnit(100, 'm'), '100')
914
    self.assertEqual(FormatUnit(1023, 'm'), '1023')
915

    
916
    self.assertEqual(FormatUnit(1024, 'm'), '1024')
917
    self.assertEqual(FormatUnit(1536, 'm'), '1536')
918
    self.assertEqual(FormatUnit(17133, 'm'), '17133')
919
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
920

    
921
  def testGiB(self):
922
    self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
923
    self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
924
    self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
925
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
926

    
927
    self.assertEqual(FormatUnit(1024, 'g'), '1.0')
928
    self.assertEqual(FormatUnit(1536, 'g'), '1.5')
929
    self.assertEqual(FormatUnit(17133, 'g'), '16.7')
930
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
931

    
932
    self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
933
    self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
934
    self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
935

    
936
  def testTiB(self):
937
    self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
938
    self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
939
    self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
940

    
941
    self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
942
    self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
943
    self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
944

    
945

    
946
class TestParseUnit(unittest.TestCase):
947
  """Test case for the ParseUnit function"""
948

    
949
  SCALES = (('', 1),
950
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
951
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
952
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
953

    
954
  def testRounding(self):
955
    self.assertEqual(ParseUnit('0'), 0)
956
    self.assertEqual(ParseUnit('1'), 4)
957
    self.assertEqual(ParseUnit('2'), 4)
958
    self.assertEqual(ParseUnit('3'), 4)
959

    
960
    self.assertEqual(ParseUnit('124'), 124)
961
    self.assertEqual(ParseUnit('125'), 128)
962
    self.assertEqual(ParseUnit('126'), 128)
963
    self.assertEqual(ParseUnit('127'), 128)
964
    self.assertEqual(ParseUnit('128'), 128)
965
    self.assertEqual(ParseUnit('129'), 132)
966
    self.assertEqual(ParseUnit('130'), 132)
967

    
968
  def testFloating(self):
969
    self.assertEqual(ParseUnit('0'), 0)
970
    self.assertEqual(ParseUnit('0.5'), 4)
971
    self.assertEqual(ParseUnit('1.75'), 4)
972
    self.assertEqual(ParseUnit('1.99'), 4)
973
    self.assertEqual(ParseUnit('2.00'), 4)
974
    self.assertEqual(ParseUnit('2.01'), 4)
975
    self.assertEqual(ParseUnit('3.99'), 4)
976
    self.assertEqual(ParseUnit('4.00'), 4)
977
    self.assertEqual(ParseUnit('4.01'), 8)
978
    self.assertEqual(ParseUnit('1.5G'), 1536)
979
    self.assertEqual(ParseUnit('1.8G'), 1844)
980
    self.assertEqual(ParseUnit('8.28T'), 8682212)
981

    
982
  def testSuffixes(self):
983
    for sep in ('', ' ', '   ', "\t", "\t "):
984
      for suffix, scale in TestParseUnit.SCALES:
985
        for func in (lambda x: x, str.lower, str.upper):
986
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
987
                           1024 * scale)
988

    
989
  def testInvalidInput(self):
990
    for sep in ('-', '_', ',', 'a'):
991
      for suffix, _ in TestParseUnit.SCALES:
992
        self.assertRaises(errors.UnitParseError, ParseUnit, '1' + sep + suffix)
993

    
994
    for suffix, _ in TestParseUnit.SCALES:
995
      self.assertRaises(errors.UnitParseError, ParseUnit, '1,3' + suffix)
996

    
997

    
998
class TestSshKeys(testutils.GanetiTestCase):
999
  """Test case for the AddAuthorizedKey function"""
1000

    
1001
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
1002
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
1003
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
1004

    
1005
  def setUp(self):
1006
    testutils.GanetiTestCase.setUp(self)
1007
    self.tmpname = self._CreateTempFile()
1008
    handle = open(self.tmpname, 'w')
1009
    try:
1010
      handle.write("%s\n" % TestSshKeys.KEY_A)
1011
      handle.write("%s\n" % TestSshKeys.KEY_B)
1012
    finally:
1013
      handle.close()
1014

    
1015
  def testAddingNewKey(self):
1016
    utils.AddAuthorizedKey(self.tmpname,
1017
                           'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
1018

    
1019
    self.assertFileContent(self.tmpname,
1020
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
1021
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
1022
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
1023
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
1024

    
1025
  def testAddingAlmostButNotCompletelyTheSameKey(self):
1026
    utils.AddAuthorizedKey(self.tmpname,
1027
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
1028

    
1029
    self.assertFileContent(self.tmpname,
1030
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
1031
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
1032
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
1033
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
1034

    
1035
  def testAddingExistingKeyWithSomeMoreSpaces(self):
1036
    utils.AddAuthorizedKey(self.tmpname,
1037
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
1038

    
1039
    self.assertFileContent(self.tmpname,
1040
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
1041
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
1042
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
1043

    
1044
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
1045
    utils.RemoveAuthorizedKey(self.tmpname,
1046
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
1047

    
1048
    self.assertFileContent(self.tmpname,
1049
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
1050
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
1051

    
1052
  def testRemovingNonExistingKey(self):
1053
    utils.RemoveAuthorizedKey(self.tmpname,
1054
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
1055

    
1056
    self.assertFileContent(self.tmpname,
1057
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
1058
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
1059
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
1060

    
1061

    
1062
class TestEtcHosts(testutils.GanetiTestCase):
1063
  """Test functions modifying /etc/hosts"""
1064

    
1065
  def setUp(self):
1066
    testutils.GanetiTestCase.setUp(self)
1067
    self.tmpname = self._CreateTempFile()
1068
    handle = open(self.tmpname, 'w')
1069
    try:
1070
      handle.write('# This is a test file for /etc/hosts\n')
1071
      handle.write('127.0.0.1\tlocalhost\n')
1072
      handle.write('192.168.1.1 router gw\n')
1073
    finally:
1074
      handle.close()
1075

    
1076
  def testSettingNewIp(self):
1077
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
1078

    
1079
    self.assertFileContent(self.tmpname,
1080
      "# This is a test file for /etc/hosts\n"
1081
      "127.0.0.1\tlocalhost\n"
1082
      "192.168.1.1 router gw\n"
1083
      "1.2.3.4\tmyhost.domain.tld myhost\n")
1084
    self.assertFileMode(self.tmpname, 0644)
1085

    
1086
  def testSettingExistingIp(self):
1087
    SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
1088
                     ['myhost'])
1089

    
1090
    self.assertFileContent(self.tmpname,
1091
      "# This is a test file for /etc/hosts\n"
1092
      "127.0.0.1\tlocalhost\n"
1093
      "192.168.1.1\tmyhost.domain.tld myhost\n")
1094
    self.assertFileMode(self.tmpname, 0644)
1095

    
1096
  def testSettingDuplicateName(self):
1097
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
1098

    
1099
    self.assertFileContent(self.tmpname,
1100
      "# This is a test file for /etc/hosts\n"
1101
      "127.0.0.1\tlocalhost\n"
1102
      "192.168.1.1 router gw\n"
1103
      "1.2.3.4\tmyhost\n")
1104
    self.assertFileMode(self.tmpname, 0644)
1105

    
1106
  def testRemovingExistingHost(self):
1107
    RemoveEtcHostsEntry(self.tmpname, 'router')
1108

    
1109
    self.assertFileContent(self.tmpname,
1110
      "# This is a test file for /etc/hosts\n"
1111
      "127.0.0.1\tlocalhost\n"
1112
      "192.168.1.1 gw\n")
1113
    self.assertFileMode(self.tmpname, 0644)
1114

    
1115
  def testRemovingSingleExistingHost(self):
1116
    RemoveEtcHostsEntry(self.tmpname, 'localhost')
1117

    
1118
    self.assertFileContent(self.tmpname,
1119
      "# This is a test file for /etc/hosts\n"
1120
      "192.168.1.1 router gw\n")
1121
    self.assertFileMode(self.tmpname, 0644)
1122

    
1123
  def testRemovingNonExistingHost(self):
1124
    RemoveEtcHostsEntry(self.tmpname, 'myhost')
1125

    
1126
    self.assertFileContent(self.tmpname,
1127
      "# This is a test file for /etc/hosts\n"
1128
      "127.0.0.1\tlocalhost\n"
1129
      "192.168.1.1 router gw\n")
1130
    self.assertFileMode(self.tmpname, 0644)
1131

    
1132
  def testRemovingAlias(self):
1133
    RemoveEtcHostsEntry(self.tmpname, 'gw')
1134

    
1135
    self.assertFileContent(self.tmpname,
1136
      "# This is a test file for /etc/hosts\n"
1137
      "127.0.0.1\tlocalhost\n"
1138
      "192.168.1.1 router\n")
1139
    self.assertFileMode(self.tmpname, 0644)
1140

    
1141

    
1142
class TestGetMounts(unittest.TestCase):
1143
  """Test case for GetMounts()."""
1144

    
1145
  TESTDATA = (
1146
    "rootfs /     rootfs rw 0 0\n"
1147
    "none   /sys  sysfs  rw,nosuid,nodev,noexec,relatime 0 0\n"
1148
    "none   /proc proc   rw,nosuid,nodev,noexec,relatime 0 0\n")
1149

    
1150
  def setUp(self):
1151
    self.tmpfile = tempfile.NamedTemporaryFile()
1152
    utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
1153

    
1154
  def testGetMounts(self):
1155
    self.assertEqual(utils.GetMounts(filename=self.tmpfile.name),
1156
      [
1157
        ("rootfs", "/", "rootfs", "rw"),
1158
        ("none", "/sys", "sysfs", "rw,nosuid,nodev,noexec,relatime"),
1159
        ("none", "/proc", "proc", "rw,nosuid,nodev,noexec,relatime"),
1160
      ])
1161

    
1162

    
1163
class TestShellQuoting(unittest.TestCase):
1164
  """Test case for shell quoting functions"""
1165

    
1166
  def testShellQuote(self):
1167
    self.assertEqual(ShellQuote('abc'), "abc")
1168
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
1169
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
1170
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
1171
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
1172

    
1173
  def testShellQuoteArgs(self):
1174
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
1175
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
1176
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
1177

    
1178

    
1179
class TestListVisibleFiles(unittest.TestCase):
1180
  """Test case for ListVisibleFiles"""
1181

    
1182
  def setUp(self):
1183
    self.path = tempfile.mkdtemp()
1184

    
1185
  def tearDown(self):
1186
    shutil.rmtree(self.path)
1187

    
1188
  def _CreateFiles(self, files):
1189
    for name in files:
1190
      utils.WriteFile(os.path.join(self.path, name), data="test")
1191

    
1192
  def _test(self, files, expected):
1193
    self._CreateFiles(files)
1194
    found = ListVisibleFiles(self.path)
1195
    self.assertEqual(set(found), set(expected))
1196

    
1197
  def testAllVisible(self):
1198
    files = ["a", "b", "c"]
1199
    expected = files
1200
    self._test(files, expected)
1201

    
1202
  def testNoneVisible(self):
1203
    files = [".a", ".b", ".c"]
1204
    expected = []
1205
    self._test(files, expected)
1206

    
1207
  def testSomeVisible(self):
1208
    files = ["a", "b", ".c"]
1209
    expected = ["a", "b"]
1210
    self._test(files, expected)
1211

    
1212
  def testNonAbsolutePath(self):
1213
    self.failUnlessRaises(errors.ProgrammerError, ListVisibleFiles, "abc")
1214

    
1215
  def testNonNormalizedPath(self):
1216
    self.failUnlessRaises(errors.ProgrammerError, ListVisibleFiles,
1217
                          "/bin/../tmp")
1218

    
1219

    
1220
class TestNewUUID(unittest.TestCase):
1221
  """Test case for NewUUID"""
1222

    
1223
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
1224
                        '[a-f0-9]{4}-[a-f0-9]{12}$')
1225

    
1226
  def runTest(self):
1227
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
1228

    
1229

    
1230
class TestUniqueSequence(unittest.TestCase):
1231
  """Test case for UniqueSequence"""
1232

    
1233
  def _test(self, input, expected):
1234
    self.assertEqual(utils.UniqueSequence(input), expected)
1235

    
1236
  def runTest(self):
1237
    # Ordered input
1238
    self._test([1, 2, 3], [1, 2, 3])
1239
    self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
1240
    self._test([1, 2, 2, 3], [1, 2, 3])
1241
    self._test([1, 2, 3, 3], [1, 2, 3])
1242

    
1243
    # Unordered input
1244
    self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
1245
    self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
1246

    
1247
    # Strings
1248
    self._test(["a", "a"], ["a"])
1249
    self._test(["a", "b"], ["a", "b"])
1250
    self._test(["a", "b", "a"], ["a", "b"])
1251

    
1252

    
1253
class TestFirstFree(unittest.TestCase):
1254
  """Test case for the FirstFree function"""
1255

    
1256
  def test(self):
1257
    """Test FirstFree"""
1258
    self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
1259
    self.failUnlessEqual(FirstFree([]), None)
1260
    self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
1261
    self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
1262
    self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
1263

    
1264

    
1265
class TestTailFile(testutils.GanetiTestCase):
1266
  """Test case for the TailFile function"""
1267

    
1268
  def testEmpty(self):
1269
    fname = self._CreateTempFile()
1270
    self.failUnlessEqual(TailFile(fname), [])
1271
    self.failUnlessEqual(TailFile(fname, lines=25), [])
1272

    
1273
  def testAllLines(self):
1274
    data = ["test %d" % i for i in range(30)]
1275
    for i in range(30):
1276
      fname = self._CreateTempFile()
1277
      fd = open(fname, "w")
1278
      fd.write("\n".join(data[:i]))
1279
      if i > 0:
1280
        fd.write("\n")
1281
      fd.close()
1282
      self.failUnlessEqual(TailFile(fname, lines=i), data[:i])
1283

    
1284
  def testPartialLines(self):
1285
    data = ["test %d" % i for i in range(30)]
1286
    fname = self._CreateTempFile()
1287
    fd = open(fname, "w")
1288
    fd.write("\n".join(data))
1289
    fd.write("\n")
1290
    fd.close()
1291
    for i in range(1, 30):
1292
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
1293

    
1294
  def testBigFile(self):
1295
    data = ["test %d" % i for i in range(30)]
1296
    fname = self._CreateTempFile()
1297
    fd = open(fname, "w")
1298
    fd.write("X" * 1048576)
1299
    fd.write("\n")
1300
    fd.write("\n".join(data))
1301
    fd.write("\n")
1302
    fd.close()
1303
    for i in range(1, 30):
1304
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
1305

    
1306

    
1307
class _BaseFileLockTest:
1308
  """Test case for the FileLock class"""
1309

    
1310
  def testSharedNonblocking(self):
1311
    self.lock.Shared(blocking=False)
1312
    self.lock.Close()
1313

    
1314
  def testExclusiveNonblocking(self):
1315
    self.lock.Exclusive(blocking=False)
1316
    self.lock.Close()
1317

    
1318
  def testUnlockNonblocking(self):
1319
    self.lock.Unlock(blocking=False)
1320
    self.lock.Close()
1321

    
1322
  def testSharedBlocking(self):
1323
    self.lock.Shared(blocking=True)
1324
    self.lock.Close()
1325

    
1326
  def testExclusiveBlocking(self):
1327
    self.lock.Exclusive(blocking=True)
1328
    self.lock.Close()
1329

    
1330
  def testUnlockBlocking(self):
1331
    self.lock.Unlock(blocking=True)
1332
    self.lock.Close()
1333

    
1334
  def testSharedExclusiveUnlock(self):
1335
    self.lock.Shared(blocking=False)
1336
    self.lock.Exclusive(blocking=False)
1337
    self.lock.Unlock(blocking=False)
1338
    self.lock.Close()
1339

    
1340
  def testExclusiveSharedUnlock(self):
1341
    self.lock.Exclusive(blocking=False)
1342
    self.lock.Shared(blocking=False)
1343
    self.lock.Unlock(blocking=False)
1344
    self.lock.Close()
1345

    
1346
  def testSimpleTimeout(self):
1347
    # These will succeed on the first attempt, hence a short timeout
1348
    self.lock.Shared(blocking=True, timeout=10.0)
1349
    self.lock.Exclusive(blocking=False, timeout=10.0)
1350
    self.lock.Unlock(blocking=True, timeout=10.0)
1351
    self.lock.Close()
1352

    
1353
  @staticmethod
1354
  def _TryLockInner(filename, shared, blocking):
1355
    lock = utils.FileLock.Open(filename)
1356

    
1357
    if shared:
1358
      fn = lock.Shared
1359
    else:
1360
      fn = lock.Exclusive
1361

    
1362
    try:
1363
      # The timeout doesn't really matter as the parent process waits for us to
1364
      # finish anyway.
1365
      fn(blocking=blocking, timeout=0.01)
1366
    except errors.LockError, err:
1367
      return False
1368

    
1369
    return True
1370

    
1371
  def _TryLock(self, *args):
1372
    return utils.RunInSeparateProcess(self._TryLockInner, self.tmpfile.name,
1373
                                      *args)
1374

    
1375
  def testTimeout(self):
1376
    for blocking in [True, False]:
1377
      self.lock.Exclusive(blocking=True)
1378
      self.failIf(self._TryLock(False, blocking))
1379
      self.failIf(self._TryLock(True, blocking))
1380

    
1381
      self.lock.Shared(blocking=True)
1382
      self.assert_(self._TryLock(True, blocking))
1383
      self.failIf(self._TryLock(False, blocking))
1384

    
1385
  def testCloseShared(self):
1386
    self.lock.Close()
1387
    self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
1388

    
1389
  def testCloseExclusive(self):
1390
    self.lock.Close()
1391
    self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
1392

    
1393
  def testCloseUnlock(self):
1394
    self.lock.Close()
1395
    self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
1396

    
1397

    
1398
class TestFileLockWithFilename(testutils.GanetiTestCase, _BaseFileLockTest):
1399
  TESTDATA = "Hello World\n" * 10
1400

    
1401
  def setUp(self):
1402
    testutils.GanetiTestCase.setUp(self)
1403

    
1404
    self.tmpfile = tempfile.NamedTemporaryFile()
1405
    utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
1406
    self.lock = utils.FileLock.Open(self.tmpfile.name)
1407

    
1408
    # Ensure "Open" didn't truncate file
1409
    self.assertFileContent(self.tmpfile.name, self.TESTDATA)
1410

    
1411
  def tearDown(self):
1412
    self.assertFileContent(self.tmpfile.name, self.TESTDATA)
1413

    
1414
    testutils.GanetiTestCase.tearDown(self)
1415

    
1416

    
1417
class TestFileLockWithFileObject(unittest.TestCase, _BaseFileLockTest):
1418
  def setUp(self):
1419
    self.tmpfile = tempfile.NamedTemporaryFile()
1420
    self.lock = utils.FileLock(open(self.tmpfile.name, "w"), self.tmpfile.name)
1421

    
1422

    
1423
class TestTimeFunctions(unittest.TestCase):
1424
  """Test case for time functions"""
1425

    
1426
  def runTest(self):
1427
    self.assertEqual(utils.SplitTime(1), (1, 0))
1428
    self.assertEqual(utils.SplitTime(1.5), (1, 500000))
1429
    self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
1430
    self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
1431
    self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
1432
    self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
1433
    self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
1434
    self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
1435

    
1436
    self.assertRaises(AssertionError, utils.SplitTime, -1)
1437

    
1438
    self.assertEqual(utils.MergeTime((1, 0)), 1.0)
1439
    self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
1440
    self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
1441

    
1442
    self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3),
1443
                     1218448917.481)
1444
    self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
1445

    
1446
    self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
1447
    self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
1448
    self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
1449
    self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
1450
    self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
1451

    
1452

    
1453
class FieldSetTestCase(unittest.TestCase):
1454
  """Test case for FieldSets"""
1455

    
1456
  def testSimpleMatch(self):
1457
    f = utils.FieldSet("a", "b", "c", "def")
1458
    self.failUnless(f.Matches("a"))
1459
    self.failIf(f.Matches("d"), "Substring matched")
1460
    self.failIf(f.Matches("defghi"), "Prefix string matched")
1461
    self.failIf(f.NonMatching(["b", "c"]))
1462
    self.failIf(f.NonMatching(["a", "b", "c", "def"]))
1463
    self.failUnless(f.NonMatching(["a", "d"]))
1464

    
1465
  def testRegexMatch(self):
1466
    f = utils.FieldSet("a", "b([0-9]+)", "c")
1467
    self.failUnless(f.Matches("b1"))
1468
    self.failUnless(f.Matches("b99"))
1469
    self.failIf(f.Matches("b/1"))
1470
    self.failIf(f.NonMatching(["b12", "c"]))
1471
    self.failUnless(f.NonMatching(["a", "1"]))
1472

    
1473
class TestForceDictType(unittest.TestCase):
1474
  """Test case for ForceDictType"""
1475

    
1476
  def setUp(self):
1477
    self.key_types = {
1478
      'a': constants.VTYPE_INT,
1479
      'b': constants.VTYPE_BOOL,
1480
      'c': constants.VTYPE_STRING,
1481
      'd': constants.VTYPE_SIZE,
1482
      }
1483

    
1484
  def _fdt(self, dict, allowed_values=None):
1485
    if allowed_values is None:
1486
      utils.ForceDictType(dict, self.key_types)
1487
    else:
1488
      utils.ForceDictType(dict, self.key_types, allowed_values=allowed_values)
1489

    
1490
    return dict
1491

    
1492
  def testSimpleDict(self):
1493
    self.assertEqual(self._fdt({}), {})
1494
    self.assertEqual(self._fdt({'a': 1}), {'a': 1})
1495
    self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
1496
    self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
1497
    self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
1498
    self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
1499
    self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
1500
    self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
1501
    self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
1502
    self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
1503
    self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
1504
    self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
1505

    
1506
  def testErrors(self):
1507
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
1508
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
1509
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
1510
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
1511

    
1512

    
1513
class TestIsNormAbsPath(unittest.TestCase):
1514
  """Testing case for IsNormAbsPath"""
1515

    
1516
  def _pathTestHelper(self, path, result):
1517
    if result:
1518
      self.assert_(utils.IsNormAbsPath(path),
1519
          "Path %s should result absolute and normalized" % path)
1520
    else:
1521
      self.assertFalse(utils.IsNormAbsPath(path),
1522
          "Path %s should not result absolute and normalized" % path)
1523

    
1524
  def testBase(self):
1525
    self._pathTestHelper('/etc', True)
1526
    self._pathTestHelper('/srv', True)
1527
    self._pathTestHelper('etc', False)
1528
    self._pathTestHelper('/etc/../root', False)
1529
    self._pathTestHelper('/etc/', False)
1530

    
1531

    
1532
class TestSafeEncode(unittest.TestCase):
1533
  """Test case for SafeEncode"""
1534

    
1535
  def testAscii(self):
1536
    for txt in [string.digits, string.letters, string.punctuation]:
1537
      self.failUnlessEqual(txt, SafeEncode(txt))
1538

    
1539
  def testDoubleEncode(self):
1540
    for i in range(255):
1541
      txt = SafeEncode(chr(i))
1542
      self.failUnlessEqual(txt, SafeEncode(txt))
1543

    
1544
  def testUnicode(self):
1545
    # 1024 is high enough to catch non-direct ASCII mappings
1546
    for i in range(1024):
1547
      txt = SafeEncode(unichr(i))
1548
      self.failUnlessEqual(txt, SafeEncode(txt))
1549

    
1550

    
1551
class TestFormatTime(unittest.TestCase):
1552
  """Testing case for FormatTime"""
1553

    
1554
  def testNone(self):
1555
    self.failUnlessEqual(FormatTime(None), "N/A")
1556

    
1557
  def testInvalid(self):
1558
    self.failUnlessEqual(FormatTime(()), "N/A")
1559

    
1560
  def testNow(self):
1561
    # tests that we accept time.time input
1562
    FormatTime(time.time())
1563
    # tests that we accept int input
1564
    FormatTime(int(time.time()))
1565

    
1566

    
1567
class RunInSeparateProcess(unittest.TestCase):
1568
  def test(self):
1569
    for exp in [True, False]:
1570
      def _child():
1571
        return exp
1572

    
1573
      self.assertEqual(exp, utils.RunInSeparateProcess(_child))
1574

    
1575
  def testArgs(self):
1576
    for arg in [0, 1, 999, "Hello World", (1, 2, 3)]:
1577
      def _child(carg1, carg2):
1578
        return carg1 == "Foo" and carg2 == arg
1579

    
1580
      self.assert_(utils.RunInSeparateProcess(_child, "Foo", arg))
1581

    
1582
  def testPid(self):
1583
    parent_pid = os.getpid()
1584

    
1585
    def _check():
1586
      return os.getpid() == parent_pid
1587

    
1588
    self.failIf(utils.RunInSeparateProcess(_check))
1589

    
1590
  def testSignal(self):
1591
    def _kill():
1592
      os.kill(os.getpid(), signal.SIGTERM)
1593

    
1594
    self.assertRaises(errors.GenericError,
1595
                      utils.RunInSeparateProcess, _kill)
1596

    
1597
  def testException(self):
1598
    def _exc():
1599
      raise errors.GenericError("This is a test")
1600

    
1601
    self.assertRaises(errors.GenericError,
1602
                      utils.RunInSeparateProcess, _exc)
1603

    
1604

    
1605
class TestFingerprintFile(unittest.TestCase):
1606
  def setUp(self):
1607
    self.tmpfile = tempfile.NamedTemporaryFile()
1608

    
1609
  def test(self):
1610
    self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1611
                     "da39a3ee5e6b4b0d3255bfef95601890afd80709")
1612

    
1613
    utils.WriteFile(self.tmpfile.name, data="Hello World\n")
1614
    self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1615
                     "648a6a6ffffdaa0badb23b8baf90b6168dd16b3a")
1616

    
1617

    
1618
class TestUnescapeAndSplit(unittest.TestCase):
1619
  """Testing case for UnescapeAndSplit"""
1620

    
1621
  def setUp(self):
1622
    # testing more that one separator for regexp safety
1623
    self._seps = [",", "+", "."]
1624

    
1625
  def testSimple(self):
1626
    a = ["a", "b", "c", "d"]
1627
    for sep in self._seps:
1628
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), a)
1629

    
1630
  def testEscape(self):
1631
    for sep in self._seps:
1632
      a = ["a", "b\\" + sep + "c", "d"]
1633
      b = ["a", "b" + sep + "c", "d"]
1634
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1635

    
1636
  def testDoubleEscape(self):
1637
    for sep in self._seps:
1638
      a = ["a", "b\\\\", "c", "d"]
1639
      b = ["a", "b\\", "c", "d"]
1640
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1641

    
1642
  def testThreeEscape(self):
1643
    for sep in self._seps:
1644
      a = ["a", "b\\\\\\" + sep + "c", "d"]
1645
      b = ["a", "b\\" + sep + "c", "d"]
1646
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1647

    
1648

    
1649
class TestGenerateSelfSignedX509Cert(unittest.TestCase):
1650
  def setUp(self):
1651
    self.tmpdir = tempfile.mkdtemp()
1652

    
1653
  def tearDown(self):
1654
    shutil.rmtree(self.tmpdir)
1655

    
1656
  def _checkRsaPrivateKey(self, key):
1657
    lines = key.splitlines()
1658
    return ("-----BEGIN RSA PRIVATE KEY-----" in lines and
1659
            "-----END RSA PRIVATE KEY-----" in lines)
1660

    
1661
  def _checkCertificate(self, cert):
1662
    lines = cert.splitlines()
1663
    return ("-----BEGIN CERTIFICATE-----" in lines and
1664
            "-----END CERTIFICATE-----" in lines)
1665

    
1666
  def test(self):
1667
    for common_name in [None, ".", "Ganeti", "node1.example.com"]:
1668
      (key_pem, cert_pem) = utils.GenerateSelfSignedX509Cert(common_name, 300)
1669
      self._checkRsaPrivateKey(key_pem)
1670
      self._checkCertificate(cert_pem)
1671

    
1672
      key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
1673
                                           key_pem)
1674
      self.assert_(key.bits() >= 1024)
1675
      self.assertEqual(key.bits(), constants.RSA_KEY_BITS)
1676
      self.assertEqual(key.type(), OpenSSL.crypto.TYPE_RSA)
1677

    
1678
      x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1679
                                             cert_pem)
1680
      self.failIf(x509.has_expired())
1681
      self.assertEqual(x509.get_issuer().CN, common_name)
1682
      self.assertEqual(x509.get_subject().CN, common_name)
1683
      self.assertEqual(x509.get_pubkey().bits(), constants.RSA_KEY_BITS)
1684

    
1685
  def testLegacy(self):
1686
    cert1_filename = os.path.join(self.tmpdir, "cert1.pem")
1687

    
1688
    utils.GenerateSelfSignedSslCert(cert1_filename, validity=1)
1689

    
1690
    cert1 = utils.ReadFile(cert1_filename)
1691

    
1692
    self.assert_(self._checkRsaPrivateKey(cert1))
1693
    self.assert_(self._checkCertificate(cert1))
1694

    
1695

    
1696
class TestPathJoin(unittest.TestCase):
1697
  """Testing case for PathJoin"""
1698

    
1699
  def testBasicItems(self):
1700
    mlist = ["/a", "b", "c"]
1701
    self.failUnlessEqual(PathJoin(*mlist), "/".join(mlist))
1702

    
1703
  def testNonAbsPrefix(self):
1704
    self.failUnlessRaises(ValueError, PathJoin, "a", "b")
1705

    
1706
  def testBackTrack(self):
1707
    self.failUnlessRaises(ValueError, PathJoin, "/a", "b/../c")
1708

    
1709
  def testMultiAbs(self):
1710
    self.failUnlessRaises(ValueError, PathJoin, "/a", "/b")
1711

    
1712

    
1713
class TestValidateServiceName(unittest.TestCase):
1714
  def testValid(self):
1715
    testnames = [
1716
      0, 1, 2, 3, 1024, 65000, 65534, 65535,
1717
      "ganeti",
1718
      "gnt-masterd",
1719
      "HELLO_WORLD_SVC",
1720
      "hello.world.1",
1721
      "0", "80", "1111", "65535",
1722
      ]
1723

    
1724
    for name in testnames:
1725
      self.assertEqual(utils.ValidateServiceName(name), name)
1726

    
1727
  def testInvalid(self):
1728
    testnames = [
1729
      -15756, -1, 65536, 133428083,
1730
      "", "Hello World!", "!", "'", "\"", "\t", "\n", "`",
1731
      "-8546", "-1", "65536",
1732
      (129 * "A"),
1733
      ]
1734

    
1735
    for name in testnames:
1736
      self.assertRaises(errors.OpPrereqError, utils.ValidateServiceName, name)
1737

    
1738

    
1739
class TestParseAsn1Generalizedtime(unittest.TestCase):
1740
  def test(self):
1741
    # UTC
1742
    self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000Z"), 0)
1743
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152Z"),
1744
                     1266860512)
1745
    self.assertEqual(utils._ParseAsn1Generalizedtime("20380119031407Z"),
1746
                     (2**31) - 1)
1747

    
1748
    # With offset
1749
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152+0000"),
1750
                     1266860512)
1751
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100223131652+0000"),
1752
                     1266931012)
1753
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100223051808-0800"),
1754
                     1266931088)
1755
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100224002135+1100"),
1756
                     1266931295)
1757
    self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000-0100"),
1758
                     3600)
1759

    
1760
    # Leap seconds are not supported by datetime.datetime
1761
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1762
                      "19841231235960+0000")
1763
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1764
                      "19920630235960+0000")
1765

    
1766
    # Errors
1767
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "")
1768
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "invalid")
1769
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1770
                      "20100222174152")
1771
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1772
                      "Mon Feb 22 17:47:02 UTC 2010")
1773
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1774
                      "2010-02-22 17:42:02")
1775

    
1776

    
1777
class TestGetX509CertValidity(testutils.GanetiTestCase):
1778
  def setUp(self):
1779
    testutils.GanetiTestCase.setUp(self)
1780

    
1781
    pyopenssl_version = distutils.version.LooseVersion(OpenSSL.__version__)
1782

    
1783
    # Test whether we have pyOpenSSL 0.7 or above
1784
    self.pyopenssl0_7 = (pyopenssl_version >= "0.7")
1785

    
1786
    if not self.pyopenssl0_7:
1787
      warnings.warn("This test requires pyOpenSSL 0.7 or above to"
1788
                    " function correctly")
1789

    
1790
  def _LoadCert(self, name):
1791
    return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1792
                                           self._ReadTestData(name))
1793

    
1794
  def test(self):
1795
    validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem"))
1796
    if self.pyopenssl0_7:
1797
      self.assertEqual(validity, (1266919967, 1267524767))
1798
    else:
1799
      self.assertEqual(validity, (None, None))
1800

    
1801

    
1802
class TestSignX509Certificate(unittest.TestCase):
1803
  KEY = "My private key!"
1804
  KEY_OTHER = "Another key"
1805

    
1806
  def test(self):
1807
    # Generate certificate valid for 5 minutes
1808
    (_, cert_pem) = utils.GenerateSelfSignedX509Cert(None, 300)
1809

    
1810
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1811
                                           cert_pem)
1812

    
1813
    # No signature at all
1814
    self.assertRaises(errors.GenericError,
1815
                      utils.LoadSignedX509Certificate, cert_pem, self.KEY)
1816

    
1817
    # Invalid input
1818
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1819
                      "", self.KEY)
1820
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1821
                      "X-Ganeti-Signature: \n", self.KEY)
1822
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1823
                      "X-Ganeti-Sign: $1234$abcdef\n", self.KEY)
1824
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1825
                      "X-Ganeti-Signature: $1234567890$abcdef\n", self.KEY)
1826
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1827
                      "X-Ganeti-Signature: $1234$abc\n\n" + cert_pem, self.KEY)
1828

    
1829
    # Invalid salt
1830
    for salt in list("-_@$,:;/\\ \t\n"):
1831
      self.assertRaises(errors.GenericError, utils.SignX509Certificate,
1832
                        cert_pem, self.KEY, "foo%sbar" % salt)
1833

    
1834
    for salt in ["HelloWorld", "salt", string.letters, string.digits,
1835
                 utils.GenerateSecret(numbytes=4),
1836
                 utils.GenerateSecret(numbytes=16),
1837
                 "{123:456}".encode("hex")]:
1838
      signed_pem = utils.SignX509Certificate(cert, self.KEY, salt)
1839

    
1840
      self._Check(cert, salt, signed_pem)
1841

    
1842
      self._Check(cert, salt, "X-Another-Header: with a value\n" + signed_pem)
1843
      self._Check(cert, salt, (10 * "Hello World!\n") + signed_pem)
1844
      self._Check(cert, salt, (signed_pem + "\n\na few more\n"
1845
                               "lines----\n------ at\nthe end!"))
1846

    
1847
  def _Check(self, cert, salt, pem):
1848
    (cert2, salt2) = utils.LoadSignedX509Certificate(pem, self.KEY)
1849
    self.assertEqual(salt, salt2)
1850
    self.assertEqual(cert.digest("sha1"), cert2.digest("sha1"))
1851

    
1852
    # Other key
1853
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1854
                      pem, self.KEY_OTHER)
1855

    
1856

    
1857
class TestMakedirs(unittest.TestCase):
1858
  def setUp(self):
1859
    self.tmpdir = tempfile.mkdtemp()
1860

    
1861
  def tearDown(self):
1862
    shutil.rmtree(self.tmpdir)
1863

    
1864
  def testNonExisting(self):
1865
    path = PathJoin(self.tmpdir, "foo")
1866
    utils.Makedirs(path)
1867
    self.assert_(os.path.isdir(path))
1868

    
1869
  def testExisting(self):
1870
    path = PathJoin(self.tmpdir, "foo")
1871
    os.mkdir(path)
1872
    utils.Makedirs(path)
1873
    self.assert_(os.path.isdir(path))
1874

    
1875
  def testRecursiveNonExisting(self):
1876
    path = PathJoin(self.tmpdir, "foo/bar/baz")
1877
    utils.Makedirs(path)
1878
    self.assert_(os.path.isdir(path))
1879

    
1880
  def testRecursiveExisting(self):
1881
    path = PathJoin(self.tmpdir, "B/moo/xyz")
1882
    self.assertFalse(os.path.exists(path))
1883
    os.mkdir(PathJoin(self.tmpdir, "B"))
1884
    utils.Makedirs(path)
1885
    self.assert_(os.path.isdir(path))
1886

    
1887

    
1888
class TestRetry(testutils.GanetiTestCase):
1889
  def setUp(self):
1890
    testutils.GanetiTestCase.setUp(self)
1891
    self.retries = 0
1892

    
1893
  @staticmethod
1894
  def _RaiseRetryAgain():
1895
    raise utils.RetryAgain()
1896

    
1897
  @staticmethod
1898
  def _RaiseRetryAgainWithArg(args):
1899
    raise utils.RetryAgain(*args)
1900

    
1901
  def _WrongNestedLoop(self):
1902
    return utils.Retry(self._RaiseRetryAgain, 0.01, 0.02)
1903

    
1904
  def _RetryAndSucceed(self, retries):
1905
    if self.retries < retries:
1906
      self.retries += 1
1907
      raise utils.RetryAgain()
1908
    else:
1909
      return True
1910

    
1911
  def testRaiseTimeout(self):
1912
    self.failUnlessRaises(utils.RetryTimeout, utils.Retry,
1913
                          self._RaiseRetryAgain, 0.01, 0.02)
1914
    self.failUnlessRaises(utils.RetryTimeout, utils.Retry,
1915
                          self._RetryAndSucceed, 0.01, 0, args=[1])
1916
    self.failUnlessEqual(self.retries, 1)
1917

    
1918
  def testComplete(self):
1919
    self.failUnlessEqual(utils.Retry(lambda: True, 0, 1), True)
1920
    self.failUnlessEqual(utils.Retry(self._RetryAndSucceed, 0, 1, args=[2]),
1921
                         True)
1922
    self.failUnlessEqual(self.retries, 2)
1923

    
1924
  def testNestedLoop(self):
1925
    try:
1926
      self.failUnlessRaises(errors.ProgrammerError, utils.Retry,
1927
                            self._WrongNestedLoop, 0, 1)
1928
    except utils.RetryTimeout:
1929
      self.fail("Didn't detect inner loop's exception")
1930

    
1931
  def testTimeoutArgument(self):
1932
    retry_arg="my_important_debugging_message"
1933
    try:
1934
      utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02, args=[[retry_arg]])
1935
    except utils.RetryTimeout, err:
1936
      self.failUnlessEqual(err.args, (retry_arg, ))
1937
    else:
1938
      self.fail("Expected timeout didn't happen")
1939

    
1940
  def testRaiseInnerWithExc(self):
1941
    retry_arg="my_important_debugging_message"
1942
    try:
1943
      try:
1944
        utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02,
1945
                    args=[[errors.GenericError(retry_arg, retry_arg)]])
1946
      except utils.RetryTimeout, err:
1947
        err.RaiseInner()
1948
      else:
1949
        self.fail("Expected timeout didn't happen")
1950
    except errors.GenericError, err:
1951
      self.failUnlessEqual(err.args, (retry_arg, retry_arg))
1952
    else:
1953
      self.fail("Expected GenericError didn't happen")
1954

    
1955
  def testRaiseInnerWithMsg(self):
1956
    retry_arg="my_important_debugging_message"
1957
    try:
1958
      try:
1959
        utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02,
1960
                    args=[[retry_arg, retry_arg]])
1961
      except utils.RetryTimeout, err:
1962
        err.RaiseInner()
1963
      else:
1964
        self.fail("Expected timeout didn't happen")
1965
    except utils.RetryTimeout, err:
1966
      self.failUnlessEqual(err.args, (retry_arg, retry_arg))
1967
    else:
1968
      self.fail("Expected RetryTimeout didn't happen")
1969

    
1970

    
1971
class TestLineSplitter(unittest.TestCase):
1972
  def test(self):
1973
    lines = []
1974
    ls = utils.LineSplitter(lines.append)
1975
    ls.write("Hello World\n")
1976
    self.assertEqual(lines, [])
1977
    ls.write("Foo\n Bar\r\n ")
1978
    ls.write("Baz")
1979
    ls.write("Moo")
1980
    self.assertEqual(lines, [])
1981
    ls.flush()
1982
    self.assertEqual(lines, ["Hello World", "Foo", " Bar"])
1983
    ls.close()
1984
    self.assertEqual(lines, ["Hello World", "Foo", " Bar", " BazMoo"])
1985

    
1986
  def _testExtra(self, line, all_lines, p1, p2):
1987
    self.assertEqual(p1, 999)
1988
    self.assertEqual(p2, "extra")
1989
    all_lines.append(line)
1990

    
1991
  def testExtraArgsNoFlush(self):
1992
    lines = []
1993
    ls = utils.LineSplitter(self._testExtra, lines, 999, "extra")
1994
    ls.write("\n\nHello World\n")
1995
    ls.write("Foo\n Bar\r\n ")
1996
    ls.write("")
1997
    ls.write("Baz")
1998
    ls.write("Moo\n\nx\n")
1999
    self.assertEqual(lines, [])
2000
    ls.close()
2001
    self.assertEqual(lines, ["", "", "Hello World", "Foo", " Bar", " BazMoo",
2002
                             "", "x"])
2003

    
2004

    
2005
class TestReadLockedPidFile(unittest.TestCase):
2006
  def setUp(self):
2007
    self.tmpdir = tempfile.mkdtemp()
2008

    
2009
  def tearDown(self):
2010
    shutil.rmtree(self.tmpdir)
2011

    
2012
  def testNonExistent(self):
2013
    path = PathJoin(self.tmpdir, "nonexist")
2014
    self.assert_(utils.ReadLockedPidFile(path) is None)
2015

    
2016
  def testUnlocked(self):
2017
    path = PathJoin(self.tmpdir, "pid")
2018
    utils.WriteFile(path, data="123")
2019
    self.assert_(utils.ReadLockedPidFile(path) is None)
2020

    
2021
  def testLocked(self):
2022
    path = PathJoin(self.tmpdir, "pid")
2023
    utils.WriteFile(path, data="123")
2024

    
2025
    fl = utils.FileLock.Open(path)
2026
    try:
2027
      fl.Exclusive(blocking=True)
2028

    
2029
      self.assertEqual(utils.ReadLockedPidFile(path), 123)
2030
    finally:
2031
      fl.Close()
2032

    
2033
    self.assert_(utils.ReadLockedPidFile(path) is None)
2034

    
2035
  def testError(self):
2036
    path = PathJoin(self.tmpdir, "foobar", "pid")
2037
    utils.WriteFile(PathJoin(self.tmpdir, "foobar"), data="")
2038
    # open(2) should return ENOTDIR
2039
    self.assertRaises(EnvironmentError, utils.ReadLockedPidFile, path)
2040

    
2041

    
2042
class TestCertVerification(testutils.GanetiTestCase):
2043
  def setUp(self):
2044
    testutils.GanetiTestCase.setUp(self)
2045

    
2046
    self.tmpdir = tempfile.mkdtemp()
2047

    
2048
  def tearDown(self):
2049
    shutil.rmtree(self.tmpdir)
2050

    
2051
  def testVerifyCertificate(self):
2052
    cert_pem = utils.ReadFile(self._TestDataFilename("cert1.pem"))
2053
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
2054
                                           cert_pem)
2055

    
2056
    # Not checking return value as this certificate is expired
2057
    utils.VerifyX509Certificate(cert, 30, 7)
2058

    
2059

    
2060
class TestVerifyCertificateInner(unittest.TestCase):
2061
  def test(self):
2062
    vci = utils._VerifyCertificateInner
2063

    
2064
    # Valid
2065
    self.assertEqual(vci(False, 1263916313, 1298476313, 1266940313, 30, 7),
2066
                     (None, None))
2067

    
2068
    # Not yet valid
2069
    (errcode, msg) = vci(False, 1266507600, 1267544400, 1266075600, 30, 7)
2070
    self.assertEqual(errcode, utils.CERT_WARNING)
2071

    
2072
    # Expiring soon
2073
    (errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 7)
2074
    self.assertEqual(errcode, utils.CERT_ERROR)
2075

    
2076
    (errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 1)
2077
    self.assertEqual(errcode, utils.CERT_WARNING)
2078

    
2079
    (errcode, msg) = vci(False, 1266507600, None, 1266939600, 30, 7)
2080
    self.assertEqual(errcode, None)
2081

    
2082
    # Expired
2083
    (errcode, msg) = vci(True, 1266507600, 1267544400, 1266939600, 30, 7)
2084
    self.assertEqual(errcode, utils.CERT_ERROR)
2085

    
2086
    (errcode, msg) = vci(True, None, 1267544400, 1266939600, 30, 7)
2087
    self.assertEqual(errcode, utils.CERT_ERROR)
2088

    
2089
    (errcode, msg) = vci(True, 1266507600, None, 1266939600, 30, 7)
2090
    self.assertEqual(errcode, utils.CERT_ERROR)
2091

    
2092
    (errcode, msg) = vci(True, None, None, 1266939600, 30, 7)
2093
    self.assertEqual(errcode, utils.CERT_ERROR)
2094

    
2095

    
2096
class TestHmacFunctions(unittest.TestCase):
2097
  # Digests can be checked with "openssl sha1 -hmac $key"
2098
  def testSha1Hmac(self):
2099
    self.assertEqual(utils.Sha1Hmac("", ""),
2100
                     "fbdb1d1b18aa6c08324b7d64b71fb76370690e1d")
2101
    self.assertEqual(utils.Sha1Hmac("3YzMxZWE", "Hello World"),
2102
                     "ef4f3bda82212ecb2f7ce868888a19092481f1fd")
2103
    self.assertEqual(utils.Sha1Hmac("TguMTA2K", ""),
2104
                     "f904c2476527c6d3e6609ab683c66fa0652cb1dc")
2105

    
2106
    longtext = 1500 * "The quick brown fox jumps over the lazy dog\n"
2107
    self.assertEqual(utils.Sha1Hmac("3YzMxZWE", longtext),
2108
                     "35901b9a3001a7cdcf8e0e9d7c2e79df2223af54")
2109

    
2110
  def testSha1HmacSalt(self):
2111
    self.assertEqual(utils.Sha1Hmac("TguMTA2K", "", salt="abc0"),
2112
                     "4999bf342470eadb11dfcd24ca5680cf9fd7cdce")
2113
    self.assertEqual(utils.Sha1Hmac("TguMTA2K", "", salt="abc9"),
2114
                     "17a4adc34d69c0d367d4ffbef96fd41d4df7a6e8")
2115
    self.assertEqual(utils.Sha1Hmac("3YzMxZWE", "Hello World", salt="xyz0"),
2116
                     "7f264f8114c9066afc9bb7636e1786d996d3cc0d")
2117

    
2118
  def testVerifySha1Hmac(self):
2119
    self.assert_(utils.VerifySha1Hmac("", "", ("fbdb1d1b18aa6c08324b"
2120
                                               "7d64b71fb76370690e1d")))
2121
    self.assert_(utils.VerifySha1Hmac("TguMTA2K", "",
2122
                                      ("f904c2476527c6d3e660"
2123
                                       "9ab683c66fa0652cb1dc")))
2124

    
2125
    digest = "ef4f3bda82212ecb2f7ce868888a19092481f1fd"
2126
    self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World", digest))
2127
    self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
2128
                                      digest.lower()))
2129
    self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
2130
                                      digest.upper()))
2131
    self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
2132
                                      digest.title()))
2133

    
2134
  def testVerifySha1HmacSalt(self):
2135
    self.assert_(utils.VerifySha1Hmac("TguMTA2K", "",
2136
                                      ("17a4adc34d69c0d367d4"
2137
                                       "ffbef96fd41d4df7a6e8"),
2138
                                      salt="abc9"))
2139
    self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
2140
                                      ("7f264f8114c9066afc9b"
2141
                                       "b7636e1786d996d3cc0d"),
2142
                                      salt="xyz0"))
2143

    
2144

    
2145
class TestIgnoreSignals(unittest.TestCase):
2146
  """Test the IgnoreSignals decorator"""
2147

    
2148
  @staticmethod
2149
  def _Raise(exception):
2150
    raise exception
2151

    
2152
  @staticmethod
2153
  def _Return(rval):
2154
    return rval
2155

    
2156
  def testIgnoreSignals(self):
2157
    sock_err_intr = socket.error(errno.EINTR, "Message")
2158
    sock_err_inval = socket.error(errno.EINVAL, "Message")
2159

    
2160
    env_err_intr = EnvironmentError(errno.EINTR, "Message")
2161
    env_err_inval = EnvironmentError(errno.EINVAL, "Message")
2162

    
2163
    self.assertRaises(socket.error, self._Raise, sock_err_intr)
2164
    self.assertRaises(socket.error, self._Raise, sock_err_inval)
2165
    self.assertRaises(EnvironmentError, self._Raise, env_err_intr)
2166
    self.assertRaises(EnvironmentError, self._Raise, env_err_inval)
2167

    
2168
    self.assertEquals(utils.IgnoreSignals(self._Raise, sock_err_intr), None)
2169
    self.assertEquals(utils.IgnoreSignals(self._Raise, env_err_intr), None)
2170
    self.assertRaises(socket.error, utils.IgnoreSignals, self._Raise,
2171
                      sock_err_inval)
2172
    self.assertRaises(EnvironmentError, utils.IgnoreSignals, self._Raise,
2173
                      env_err_inval)
2174

    
2175
    self.assertEquals(utils.IgnoreSignals(self._Return, True), True)
2176
    self.assertEquals(utils.IgnoreSignals(self._Return, 33), 33)
2177

    
2178

    
2179
class TestEnsureDirs(unittest.TestCase):
2180
  """Tests for EnsureDirs"""
2181

    
2182
  def setUp(self):
2183
    self.dir = tempfile.mkdtemp()
2184
    self.old_umask = os.umask(0777)
2185

    
2186
  def testEnsureDirs(self):
2187
    utils.EnsureDirs([
2188
        (PathJoin(self.dir, "foo"), 0777),
2189
        (PathJoin(self.dir, "bar"), 0000),
2190
        ])
2191
    self.assertEquals(os.stat(PathJoin(self.dir, "foo"))[0] & 0777, 0777)
2192
    self.assertEquals(os.stat(PathJoin(self.dir, "bar"))[0] & 0777, 0000)
2193

    
2194
  def tearDown(self):
2195
    os.rmdir(PathJoin(self.dir, "foo"))
2196
    os.rmdir(PathJoin(self.dir, "bar"))
2197
    os.rmdir(self.dir)
2198
    os.umask(self.old_umask)
2199

    
2200

    
2201
class TestFormatSeconds(unittest.TestCase):
2202
  def test(self):
2203
    self.assertEqual(utils.FormatSeconds(1), "1s")
2204
    self.assertEqual(utils.FormatSeconds(3600), "1h 0m 0s")
2205
    self.assertEqual(utils.FormatSeconds(3599), "59m 59s")
2206
    self.assertEqual(utils.FormatSeconds(7200), "2h 0m 0s")
2207
    self.assertEqual(utils.FormatSeconds(7201), "2h 0m 1s")
2208
    self.assertEqual(utils.FormatSeconds(7281), "2h 1m 21s")
2209
    self.assertEqual(utils.FormatSeconds(29119), "8h 5m 19s")
2210
    self.assertEqual(utils.FormatSeconds(19431228), "224d 21h 33m 48s")
2211
    self.assertEqual(utils.FormatSeconds(-1), "-1s")
2212
    self.assertEqual(utils.FormatSeconds(-282), "-282s")
2213
    self.assertEqual(utils.FormatSeconds(-29119), "-29119s")
2214

    
2215
  def testFloat(self):
2216
    self.assertEqual(utils.FormatSeconds(1.3), "1s")
2217
    self.assertEqual(utils.FormatSeconds(1.9), "2s")
2218
    self.assertEqual(utils.FormatSeconds(3912.12311), "1h 5m 12s")
2219
    self.assertEqual(utils.FormatSeconds(3912.8), "1h 5m 13s")
2220

    
2221

    
2222
class RunIgnoreProcessNotFound(unittest.TestCase):
2223
  @staticmethod
2224
  def _WritePid(fd):
2225
    os.write(fd, str(os.getpid()))
2226
    os.close(fd)
2227
    return True
2228

    
2229
  def test(self):
2230
    (pid_read_fd, pid_write_fd) = os.pipe()
2231

    
2232
    # Start short-lived process which writes its PID to pipe
2233
    self.assert_(utils.RunInSeparateProcess(self._WritePid, pid_write_fd))
2234
    os.close(pid_write_fd)
2235

    
2236
    # Read PID from pipe
2237
    pid = int(os.read(pid_read_fd, 1024))
2238
    os.close(pid_read_fd)
2239

    
2240
    # Try to send signal to process which exited recently
2241
    self.assertFalse(utils.IgnoreProcessNotFound(os.kill, pid, 0))
2242

    
2243

    
2244
if __name__ == '__main__':
2245
  testutils.GanetiTestProgram()