Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ 9dc71d5a

History | View | Annotate | Download (75.9 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import distutils.version
25
import errno
26
import fcntl
27
import glob
28
import os
29
import os.path
30
import re
31
import shutil
32
import signal
33
import socket
34
import stat
35
import string
36
import tempfile
37
import time
38
import unittest
39
import warnings
40
import OpenSSL
41

    
42
import testutils
43
from ganeti import constants
44
from ganeti import compat
45
from ganeti import utils
46
from ganeti import errors
47
from ganeti.utils import RunCmd, RemoveFile, MatchNameComponent, FormatUnit, \
48
     ParseUnit, ShellQuote, ShellQuoteArgs, ListVisibleFiles, FirstFree, \
49
     TailFile, SafeEncode, FormatTime, UnescapeAndSplit, RunParts, PathJoin, \
50
     ReadOneLineFile, SetEtcHostsEntry, RemoveEtcHostsEntry
51

    
52

    
53
class TestIsProcessAlive(unittest.TestCase):
54
  """Testing case for IsProcessAlive"""
55

    
56
  def testExists(self):
57
    mypid = os.getpid()
58
    self.assert_(utils.IsProcessAlive(mypid), "can't find myself running")
59

    
60
  def testNotExisting(self):
61
    pid_non_existing = os.fork()
62
    if pid_non_existing == 0:
63
      os._exit(0)
64
    elif pid_non_existing < 0:
65
      raise SystemError("can't fork")
66
    os.waitpid(pid_non_existing, 0)
67
    self.assertFalse(utils.IsProcessAlive(pid_non_existing),
68
                     "nonexisting process detected")
69

    
70

    
71
class TestGetProcStatusPath(unittest.TestCase):
72
  def test(self):
73
    self.assert_("/1234/" in utils._GetProcStatusPath(1234))
74
    self.assertNotEqual(utils._GetProcStatusPath(1),
75
                        utils._GetProcStatusPath(2))
76

    
77

    
78
class TestIsProcessHandlingSignal(unittest.TestCase):
79
  def setUp(self):
80
    self.tmpdir = tempfile.mkdtemp()
81

    
82
  def tearDown(self):
83
    shutil.rmtree(self.tmpdir)
84

    
85
  def testParseSigsetT(self):
86
    self.assertEqual(len(utils._ParseSigsetT("0")), 0)
87
    self.assertEqual(utils._ParseSigsetT("1"), set([1]))
88
    self.assertEqual(utils._ParseSigsetT("1000a"), set([2, 4, 17]))
89
    self.assertEqual(utils._ParseSigsetT("810002"), set([2, 17, 24, ]))
90
    self.assertEqual(utils._ParseSigsetT("0000000180000202"),
91
                     set([2, 10, 32, 33]))
92
    self.assertEqual(utils._ParseSigsetT("0000000180000002"),
93
                     set([2, 32, 33]))
94
    self.assertEqual(utils._ParseSigsetT("0000000188000002"),
95
                     set([2, 28, 32, 33]))
96
    self.assertEqual(utils._ParseSigsetT("000000004b813efb"),
97
                     set([1, 2, 4, 5, 6, 7, 8, 10, 11, 12, 13, 14, 17,
98
                          24, 25, 26, 28, 31]))
99
    self.assertEqual(utils._ParseSigsetT("ffffff"), set(range(1, 25)))
100

    
101
  def testGetProcStatusField(self):
102
    for field in ["SigCgt", "Name", "FDSize"]:
103
      for value in ["", "0", "cat", "  1234 KB"]:
104
        pstatus = "\n".join([
105
          "VmPeak: 999 kB",
106
          "%s: %s" % (field, value),
107
          "TracerPid: 0",
108
          ])
109
        result = utils._GetProcStatusField(pstatus, field)
110
        self.assertEqual(result, value.strip())
111

    
112
  def test(self):
113
    sp = PathJoin(self.tmpdir, "status")
114

    
115
    utils.WriteFile(sp, data="\n".join([
116
      "Name:   bash",
117
      "State:  S (sleeping)",
118
      "SleepAVG:       98%",
119
      "Pid:    22250",
120
      "PPid:   10858",
121
      "TracerPid:      0",
122
      "SigBlk: 0000000000010000",
123
      "SigIgn: 0000000000384004",
124
      "SigCgt: 000000004b813efb",
125
      "CapEff: 0000000000000000",
126
      ]))
127

    
128
    self.assert_(utils.IsProcessHandlingSignal(1234, 10, status_path=sp))
129

    
130
  def testNoSigCgt(self):
131
    sp = PathJoin(self.tmpdir, "status")
132

    
133
    utils.WriteFile(sp, data="\n".join([
134
      "Name:   bash",
135
      ]))
136

    
137
    self.assertRaises(RuntimeError, utils.IsProcessHandlingSignal,
138
                      1234, 10, status_path=sp)
139

    
140
  def testNoSuchFile(self):
141
    sp = PathJoin(self.tmpdir, "notexist")
142

    
143
    self.assertFalse(utils.IsProcessHandlingSignal(1234, 10, status_path=sp))
144

    
145
  @staticmethod
146
  def _TestRealProcess():
147
    signal.signal(signal.SIGUSR1, signal.SIG_DFL)
148
    if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
149
      raise Exception("SIGUSR1 is handled when it should not be")
150

    
151
    signal.signal(signal.SIGUSR1, lambda signum, frame: None)
152
    if not utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
153
      raise Exception("SIGUSR1 is not handled when it should be")
154

    
155
    signal.signal(signal.SIGUSR1, signal.SIG_IGN)
156
    if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
157
      raise Exception("SIGUSR1 is not handled when it should be")
158

    
159
    signal.signal(signal.SIGUSR1, signal.SIG_DFL)
160
    if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
161
      raise Exception("SIGUSR1 is handled when it should not be")
162

    
163
    return True
164

    
165
  def testRealProcess(self):
166
    self.assert_(utils.RunInSeparateProcess(self._TestRealProcess))
167

    
168

    
169
class TestPidFileFunctions(unittest.TestCase):
170
  """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
171

    
172
  def setUp(self):
173
    self.dir = tempfile.mkdtemp()
174
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
175
    utils.DaemonPidFileName = self.f_dpn
176

    
177
  def testPidFileFunctions(self):
178
    pid_file = self.f_dpn('test')
179
    utils.WritePidFile('test')
180
    self.failUnless(os.path.exists(pid_file),
181
                    "PID file should have been created")
182
    read_pid = utils.ReadPidFile(pid_file)
183
    self.failUnlessEqual(read_pid, os.getpid())
184
    self.failUnless(utils.IsProcessAlive(read_pid))
185
    self.failUnlessRaises(errors.GenericError, utils.WritePidFile, 'test')
186
    utils.RemovePidFile('test')
187
    self.failIf(os.path.exists(pid_file),
188
                "PID file should not exist anymore")
189
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
190
                         "ReadPidFile should return 0 for missing pid file")
191
    fh = open(pid_file, "w")
192
    fh.write("blah\n")
193
    fh.close()
194
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
195
                         "ReadPidFile should return 0 for invalid pid file")
196
    utils.RemovePidFile('test')
197
    self.failIf(os.path.exists(pid_file),
198
                "PID file should not exist anymore")
199

    
200
  def testKill(self):
201
    pid_file = self.f_dpn('child')
202
    r_fd, w_fd = os.pipe()
203
    new_pid = os.fork()
204
    if new_pid == 0: #child
205
      utils.WritePidFile('child')
206
      os.write(w_fd, 'a')
207
      signal.pause()
208
      os._exit(0)
209
      return
210
    # else we are in the parent
211
    # wait until the child has written the pid file
212
    os.read(r_fd, 1)
213
    read_pid = utils.ReadPidFile(pid_file)
214
    self.failUnlessEqual(read_pid, new_pid)
215
    self.failUnless(utils.IsProcessAlive(new_pid))
216
    utils.KillProcess(new_pid, waitpid=True)
217
    self.failIf(utils.IsProcessAlive(new_pid))
218
    utils.RemovePidFile('child')
219
    self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0)
220

    
221
  def tearDown(self):
222
    for name in os.listdir(self.dir):
223
      os.unlink(os.path.join(self.dir, name))
224
    os.rmdir(self.dir)
225

    
226

    
227
class TestRunCmd(testutils.GanetiTestCase):
228
  """Testing case for the RunCmd function"""
229

    
230
  def setUp(self):
231
    testutils.GanetiTestCase.setUp(self)
232
    self.magic = time.ctime() + " ganeti test"
233
    self.fname = self._CreateTempFile()
234

    
235
  def testOk(self):
236
    """Test successful exit code"""
237
    result = RunCmd("/bin/sh -c 'exit 0'")
238
    self.assertEqual(result.exit_code, 0)
239
    self.assertEqual(result.output, "")
240

    
241
  def testFail(self):
242
    """Test fail exit code"""
243
    result = RunCmd("/bin/sh -c 'exit 1'")
244
    self.assertEqual(result.exit_code, 1)
245
    self.assertEqual(result.output, "")
246

    
247
  def testStdout(self):
248
    """Test standard output"""
249
    cmd = 'echo -n "%s"' % self.magic
250
    result = RunCmd("/bin/sh -c '%s'" % cmd)
251
    self.assertEqual(result.stdout, self.magic)
252
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
253
    self.assertEqual(result.output, "")
254
    self.assertFileContent(self.fname, self.magic)
255

    
256
  def testStderr(self):
257
    """Test standard error"""
258
    cmd = 'echo -n "%s"' % self.magic
259
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
260
    self.assertEqual(result.stderr, self.magic)
261
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
262
    self.assertEqual(result.output, "")
263
    self.assertFileContent(self.fname, self.magic)
264

    
265
  def testCombined(self):
266
    """Test combined output"""
267
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
268
    expected = "A" + self.magic + "B" + self.magic
269
    result = RunCmd("/bin/sh -c '%s'" % cmd)
270
    self.assertEqual(result.output, expected)
271
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
272
    self.assertEqual(result.output, "")
273
    self.assertFileContent(self.fname, expected)
274

    
275
  def testSignal(self):
276
    """Test signal"""
277
    result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
278
    self.assertEqual(result.signal, 15)
279
    self.assertEqual(result.output, "")
280

    
281
  def testListRun(self):
282
    """Test list runs"""
283
    result = RunCmd(["true"])
284
    self.assertEqual(result.signal, None)
285
    self.assertEqual(result.exit_code, 0)
286
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
287
    self.assertEqual(result.signal, None)
288
    self.assertEqual(result.exit_code, 1)
289
    result = RunCmd(["echo", "-n", self.magic])
290
    self.assertEqual(result.signal, None)
291
    self.assertEqual(result.exit_code, 0)
292
    self.assertEqual(result.stdout, self.magic)
293

    
294
  def testFileEmptyOutput(self):
295
    """Test file output"""
296
    result = RunCmd(["true"], output=self.fname)
297
    self.assertEqual(result.signal, None)
298
    self.assertEqual(result.exit_code, 0)
299
    self.assertFileContent(self.fname, "")
300

    
301
  def testLang(self):
302
    """Test locale environment"""
303
    old_env = os.environ.copy()
304
    try:
305
      os.environ["LANG"] = "en_US.UTF-8"
306
      os.environ["LC_ALL"] = "en_US.UTF-8"
307
      result = RunCmd(["locale"])
308
      for line in result.output.splitlines():
309
        key, value = line.split("=", 1)
310
        # Ignore these variables, they're overridden by LC_ALL
311
        if key == "LANG" or key == "LANGUAGE":
312
          continue
313
        self.failIf(value and value != "C" and value != '"C"',
314
            "Variable %s is set to the invalid value '%s'" % (key, value))
315
    finally:
316
      os.environ = old_env
317

    
318
  def testDefaultCwd(self):
319
    """Test default working directory"""
320
    self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
321

    
322
  def testCwd(self):
323
    """Test default working directory"""
324
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
325
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
326
    cwd = os.getcwd()
327
    self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
328

    
329
  def testResetEnv(self):
330
    """Test environment reset functionality"""
331
    self.failUnlessEqual(RunCmd(["env"], reset_env=True).stdout.strip(), "")
332
    self.failUnlessEqual(RunCmd(["env"], reset_env=True,
333
                                env={"FOO": "bar",}).stdout.strip(), "FOO=bar")
334

    
335

    
336
class TestRunParts(unittest.TestCase):
337
  """Testing case for the RunParts function"""
338

    
339
  def setUp(self):
340
    self.rundir = tempfile.mkdtemp(prefix="ganeti-test", suffix=".tmp")
341

    
342
  def tearDown(self):
343
    shutil.rmtree(self.rundir)
344

    
345
  def testEmpty(self):
346
    """Test on an empty dir"""
347
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True), [])
348

    
349
  def testSkipWrongName(self):
350
    """Test that wrong files are skipped"""
351
    fname = os.path.join(self.rundir, "00test.dot")
352
    utils.WriteFile(fname, data="")
353
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
354
    relname = os.path.basename(fname)
355
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
356
                         [(relname, constants.RUNPARTS_SKIP, None)])
357

    
358
  def testSkipNonExec(self):
359
    """Test that non executable files are skipped"""
360
    fname = os.path.join(self.rundir, "00test")
361
    utils.WriteFile(fname, data="")
362
    relname = os.path.basename(fname)
363
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
364
                         [(relname, constants.RUNPARTS_SKIP, None)])
365

    
366
  def testError(self):
367
    """Test error on a broken executable"""
368
    fname = os.path.join(self.rundir, "00test")
369
    utils.WriteFile(fname, data="")
370
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
371
    (relname, status, error) = RunParts(self.rundir, reset_env=True)[0]
372
    self.failUnlessEqual(relname, os.path.basename(fname))
373
    self.failUnlessEqual(status, constants.RUNPARTS_ERR)
374
    self.failUnless(error)
375

    
376
  def testSorted(self):
377
    """Test executions are sorted"""
378
    files = []
379
    files.append(os.path.join(self.rundir, "64test"))
380
    files.append(os.path.join(self.rundir, "00test"))
381
    files.append(os.path.join(self.rundir, "42test"))
382

    
383
    for fname in files:
384
      utils.WriteFile(fname, data="")
385

    
386
    results = RunParts(self.rundir, reset_env=True)
387

    
388
    for fname in sorted(files):
389
      self.failUnlessEqual(os.path.basename(fname), results.pop(0)[0])
390

    
391
  def testOk(self):
392
    """Test correct execution"""
393
    fname = os.path.join(self.rundir, "00test")
394
    utils.WriteFile(fname, data="#!/bin/sh\n\necho -n ciao")
395
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
396
    (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
397
    self.failUnlessEqual(relname, os.path.basename(fname))
398
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
399
    self.failUnlessEqual(runresult.stdout, "ciao")
400

    
401
  def testRunFail(self):
402
    """Test correct execution, with run failure"""
403
    fname = os.path.join(self.rundir, "00test")
404
    utils.WriteFile(fname, data="#!/bin/sh\n\nexit 1")
405
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
406
    (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
407
    self.failUnlessEqual(relname, os.path.basename(fname))
408
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
409
    self.failUnlessEqual(runresult.exit_code, 1)
410
    self.failUnless(runresult.failed)
411

    
412
  def testRunMix(self):
413
    files = []
414
    files.append(os.path.join(self.rundir, "00test"))
415
    files.append(os.path.join(self.rundir, "42test"))
416
    files.append(os.path.join(self.rundir, "64test"))
417
    files.append(os.path.join(self.rundir, "99test"))
418

    
419
    files.sort()
420

    
421
    # 1st has errors in execution
422
    utils.WriteFile(files[0], data="#!/bin/sh\n\nexit 1")
423
    os.chmod(files[0], stat.S_IREAD | stat.S_IEXEC)
424

    
425
    # 2nd is skipped
426
    utils.WriteFile(files[1], data="")
427

    
428
    # 3rd cannot execute properly
429
    utils.WriteFile(files[2], data="")
430
    os.chmod(files[2], stat.S_IREAD | stat.S_IEXEC)
431

    
432
    # 4th execs
433
    utils.WriteFile(files[3], data="#!/bin/sh\n\necho -n ciao")
434
    os.chmod(files[3], stat.S_IREAD | stat.S_IEXEC)
435

    
436
    results = RunParts(self.rundir, reset_env=True)
437

    
438
    (relname, status, runresult) = results[0]
439
    self.failUnlessEqual(relname, os.path.basename(files[0]))
440
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
441
    self.failUnlessEqual(runresult.exit_code, 1)
442
    self.failUnless(runresult.failed)
443

    
444
    (relname, status, runresult) = results[1]
445
    self.failUnlessEqual(relname, os.path.basename(files[1]))
446
    self.failUnlessEqual(status, constants.RUNPARTS_SKIP)
447
    self.failUnlessEqual(runresult, None)
448

    
449
    (relname, status, runresult) = results[2]
450
    self.failUnlessEqual(relname, os.path.basename(files[2]))
451
    self.failUnlessEqual(status, constants.RUNPARTS_ERR)
452
    self.failUnless(runresult)
453

    
454
    (relname, status, runresult) = results[3]
455
    self.failUnlessEqual(relname, os.path.basename(files[3]))
456
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
457
    self.failUnlessEqual(runresult.output, "ciao")
458
    self.failUnlessEqual(runresult.exit_code, 0)
459
    self.failUnless(not runresult.failed)
460

    
461

    
462
class TestStartDaemon(testutils.GanetiTestCase):
463
  def setUp(self):
464
    self.tmpdir = tempfile.mkdtemp(prefix="ganeti-test")
465
    self.tmpfile = os.path.join(self.tmpdir, "test")
466

    
467
  def tearDown(self):
468
    shutil.rmtree(self.tmpdir)
469

    
470
  def testShell(self):
471
    utils.StartDaemon("echo Hello World > %s" % self.tmpfile)
472
    self._wait(self.tmpfile, 60.0, "Hello World")
473

    
474
  def testShellOutput(self):
475
    utils.StartDaemon("echo Hello World", output=self.tmpfile)
476
    self._wait(self.tmpfile, 60.0, "Hello World")
477

    
478
  def testNoShellNoOutput(self):
479
    utils.StartDaemon(["pwd"])
480

    
481
  def testNoShellNoOutputTouch(self):
482
    testfile = os.path.join(self.tmpdir, "check")
483
    self.failIf(os.path.exists(testfile))
484
    utils.StartDaemon(["touch", testfile])
485
    self._wait(testfile, 60.0, "")
486

    
487
  def testNoShellOutput(self):
488
    utils.StartDaemon(["pwd"], output=self.tmpfile)
489
    self._wait(self.tmpfile, 60.0, "/")
490

    
491
  def testNoShellOutputCwd(self):
492
    utils.StartDaemon(["pwd"], output=self.tmpfile, cwd=os.getcwd())
493
    self._wait(self.tmpfile, 60.0, os.getcwd())
494

    
495
  def testShellEnv(self):
496
    utils.StartDaemon("echo \"$GNT_TEST_VAR\"", output=self.tmpfile,
497
                      env={ "GNT_TEST_VAR": "Hello World", })
498
    self._wait(self.tmpfile, 60.0, "Hello World")
499

    
500
  def testNoShellEnv(self):
501
    utils.StartDaemon(["printenv", "GNT_TEST_VAR"], output=self.tmpfile,
502
                      env={ "GNT_TEST_VAR": "Hello World", })
503
    self._wait(self.tmpfile, 60.0, "Hello World")
504

    
505
  def testOutputFd(self):
506
    fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
507
    try:
508
      utils.StartDaemon(["pwd"], output_fd=fd, cwd=os.getcwd())
509
    finally:
510
      os.close(fd)
511
    self._wait(self.tmpfile, 60.0, os.getcwd())
512

    
513
  def testPid(self):
514
    pid = utils.StartDaemon("echo $$ > %s" % self.tmpfile)
515
    self._wait(self.tmpfile, 60.0, str(pid))
516

    
517
  def testPidFile(self):
518
    pidfile = os.path.join(self.tmpdir, "pid")
519
    checkfile = os.path.join(self.tmpdir, "abort")
520

    
521
    pid = utils.StartDaemon("while sleep 5; do :; done", pidfile=pidfile,
522
                            output=self.tmpfile)
523
    try:
524
      fd = os.open(pidfile, os.O_RDONLY)
525
      try:
526
        # Check file is locked
527
        self.assertRaises(errors.LockError, utils.LockFile, fd)
528

    
529
        pidtext = os.read(fd, 100)
530
      finally:
531
        os.close(fd)
532

    
533
      self.assertEqual(int(pidtext.strip()), pid)
534

    
535
      self.assert_(utils.IsProcessAlive(pid))
536
    finally:
537
      # No matter what happens, kill daemon
538
      utils.KillProcess(pid, timeout=5.0, waitpid=False)
539
      self.failIf(utils.IsProcessAlive(pid))
540

    
541
    self.assertEqual(utils.ReadFile(self.tmpfile), "")
542

    
543
  def _wait(self, path, timeout, expected):
544
    # Due to the asynchronous nature of daemon processes, polling is necessary.
545
    # A timeout makes sure the test doesn't hang forever.
546
    def _CheckFile():
547
      if not (os.path.isfile(path) and
548
              utils.ReadFile(path).strip() == expected):
549
        raise utils.RetryAgain()
550

    
551
    try:
552
      utils.Retry(_CheckFile, (0.01, 1.5, 1.0), timeout)
553
    except utils.RetryTimeout:
554
      self.fail("Apparently the daemon didn't run in %s seconds and/or"
555
                " didn't write the correct output" % timeout)
556

    
557
  def testError(self):
558
    self.assertRaises(errors.OpExecError, utils.StartDaemon,
559
                      ["./does-NOT-EXIST/here/0123456789"])
560
    self.assertRaises(errors.OpExecError, utils.StartDaemon,
561
                      ["./does-NOT-EXIST/here/0123456789"],
562
                      output=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
563
    self.assertRaises(errors.OpExecError, utils.StartDaemon,
564
                      ["./does-NOT-EXIST/here/0123456789"],
565
                      cwd=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
566
    self.assertRaises(errors.OpExecError, utils.StartDaemon,
567
                      ["./does-NOT-EXIST/here/0123456789"],
568
                      output=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
569

    
570
    fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
571
    try:
572
      self.assertRaises(errors.ProgrammerError, utils.StartDaemon,
573
                        ["./does-NOT-EXIST/here/0123456789"],
574
                        output=self.tmpfile, output_fd=fd)
575
    finally:
576
      os.close(fd)
577

    
578

    
579
class TestSetCloseOnExecFlag(unittest.TestCase):
580
  """Tests for SetCloseOnExecFlag"""
581

    
582
  def setUp(self):
583
    self.tmpfile = tempfile.TemporaryFile()
584

    
585
  def testEnable(self):
586
    utils.SetCloseOnExecFlag(self.tmpfile.fileno(), True)
587
    self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
588
                    fcntl.FD_CLOEXEC)
589

    
590
  def testDisable(self):
591
    utils.SetCloseOnExecFlag(self.tmpfile.fileno(), False)
592
    self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
593
                fcntl.FD_CLOEXEC)
594

    
595

    
596
class TestSetNonblockFlag(unittest.TestCase):
597
  def setUp(self):
598
    self.tmpfile = tempfile.TemporaryFile()
599

    
600
  def testEnable(self):
601
    utils.SetNonblockFlag(self.tmpfile.fileno(), True)
602
    self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
603
                    os.O_NONBLOCK)
604

    
605
  def testDisable(self):
606
    utils.SetNonblockFlag(self.tmpfile.fileno(), False)
607
    self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
608
                os.O_NONBLOCK)
609

    
610

    
611
class TestRemoveFile(unittest.TestCase):
612
  """Test case for the RemoveFile function"""
613

    
614
  def setUp(self):
615
    """Create a temp dir and file for each case"""
616
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
617
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
618
    os.close(fd)
619

    
620
  def tearDown(self):
621
    if os.path.exists(self.tmpfile):
622
      os.unlink(self.tmpfile)
623
    os.rmdir(self.tmpdir)
624

    
625
  def testIgnoreDirs(self):
626
    """Test that RemoveFile() ignores directories"""
627
    self.assertEqual(None, RemoveFile(self.tmpdir))
628

    
629
  def testIgnoreNotExisting(self):
630
    """Test that RemoveFile() ignores non-existing files"""
631
    RemoveFile(self.tmpfile)
632
    RemoveFile(self.tmpfile)
633

    
634
  def testRemoveFile(self):
635
    """Test that RemoveFile does remove a file"""
636
    RemoveFile(self.tmpfile)
637
    if os.path.exists(self.tmpfile):
638
      self.fail("File '%s' not removed" % self.tmpfile)
639

    
640
  def testRemoveSymlink(self):
641
    """Test that RemoveFile does remove symlinks"""
642
    symlink = self.tmpdir + "/symlink"
643
    os.symlink("no-such-file", symlink)
644
    RemoveFile(symlink)
645
    if os.path.exists(symlink):
646
      self.fail("File '%s' not removed" % symlink)
647
    os.symlink(self.tmpfile, symlink)
648
    RemoveFile(symlink)
649
    if os.path.exists(symlink):
650
      self.fail("File '%s' not removed" % symlink)
651

    
652

    
653
class TestRename(unittest.TestCase):
654
  """Test case for RenameFile"""
655

    
656
  def setUp(self):
657
    """Create a temporary directory"""
658
    self.tmpdir = tempfile.mkdtemp()
659
    self.tmpfile = os.path.join(self.tmpdir, "test1")
660

    
661
    # Touch the file
662
    open(self.tmpfile, "w").close()
663

    
664
  def tearDown(self):
665
    """Remove temporary directory"""
666
    shutil.rmtree(self.tmpdir)
667

    
668
  def testSimpleRename1(self):
669
    """Simple rename 1"""
670
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
671
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
672

    
673
  def testSimpleRename2(self):
674
    """Simple rename 2"""
675
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
676
                     mkdir=True)
677
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
678

    
679
  def testRenameMkdir(self):
680
    """Rename with mkdir"""
681
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
682
                     mkdir=True)
683
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
684
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
685

    
686
    utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"),
687
                     os.path.join(self.tmpdir, "test/foo/bar/baz"),
688
                     mkdir=True)
689
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
690
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar")))
691
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz")))
692

    
693

    
694
class TestMatchNameComponent(unittest.TestCase):
695
  """Test case for the MatchNameComponent function"""
696

    
697
  def testEmptyList(self):
698
    """Test that there is no match against an empty list"""
699

    
700
    self.failUnlessEqual(MatchNameComponent("", []), None)
701
    self.failUnlessEqual(MatchNameComponent("test", []), None)
702

    
703
  def testSingleMatch(self):
704
    """Test that a single match is performed correctly"""
705
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
706
    for key in "test2", "test2.example", "test2.example.com":
707
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
708

    
709
  def testMultipleMatches(self):
710
    """Test that a multiple match is returned as None"""
711
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
712
    for key in "test1", "test1.example":
713
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
714

    
715
  def testFullMatch(self):
716
    """Test that a full match is returned correctly"""
717
    key1 = "test1"
718
    key2 = "test1.example"
719
    mlist = [key2, key2 + ".com"]
720
    self.failUnlessEqual(MatchNameComponent(key1, mlist), None)
721
    self.failUnlessEqual(MatchNameComponent(key2, mlist), key2)
722

    
723
  def testCaseInsensitivePartialMatch(self):
724
    """Test for the case_insensitive keyword"""
725
    mlist = ["test1.example.com", "test2.example.net"]
726
    self.assertEqual(MatchNameComponent("test2", mlist, case_sensitive=False),
727
                     "test2.example.net")
728
    self.assertEqual(MatchNameComponent("Test2", mlist, case_sensitive=False),
729
                     "test2.example.net")
730
    self.assertEqual(MatchNameComponent("teSt2", mlist, case_sensitive=False),
731
                     "test2.example.net")
732
    self.assertEqual(MatchNameComponent("TeSt2", mlist, case_sensitive=False),
733
                     "test2.example.net")
734

    
735

    
736
  def testCaseInsensitiveFullMatch(self):
737
    mlist = ["ts1.ex", "ts1.ex.org", "ts2.ex", "Ts2.ex"]
738
    # Between the two ts1 a full string match non-case insensitive should work
739
    self.assertEqual(MatchNameComponent("Ts1", mlist, case_sensitive=False),
740
                     None)
741
    self.assertEqual(MatchNameComponent("Ts1.ex", mlist, case_sensitive=False),
742
                     "ts1.ex")
743
    self.assertEqual(MatchNameComponent("ts1.ex", mlist, case_sensitive=False),
744
                     "ts1.ex")
745
    # Between the two ts2 only case differs, so only case-match works
746
    self.assertEqual(MatchNameComponent("ts2.ex", mlist, case_sensitive=False),
747
                     "ts2.ex")
748
    self.assertEqual(MatchNameComponent("Ts2.ex", mlist, case_sensitive=False),
749
                     "Ts2.ex")
750
    self.assertEqual(MatchNameComponent("TS2.ex", mlist, case_sensitive=False),
751
                     None)
752

    
753

    
754
class TestReadFile(testutils.GanetiTestCase):
755

    
756
  def testReadAll(self):
757
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"))
758
    self.assertEqual(len(data), 814)
759

    
760
    h = compat.md5_hash()
761
    h.update(data)
762
    self.assertEqual(h.hexdigest(), "a491efb3efe56a0535f924d5f8680fd4")
763

    
764
  def testReadSize(self):
765
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"),
766
                          size=100)
767
    self.assertEqual(len(data), 100)
768

    
769
    h = compat.md5_hash()
770
    h.update(data)
771
    self.assertEqual(h.hexdigest(), "893772354e4e690b9efd073eed433ce7")
772

    
773
  def testError(self):
774
    self.assertRaises(EnvironmentError, utils.ReadFile,
775
                      "/dev/null/does-not-exist")
776

    
777

    
778
class TestReadOneLineFile(testutils.GanetiTestCase):
779

    
780
  def setUp(self):
781
    testutils.GanetiTestCase.setUp(self)
782

    
783
  def testDefault(self):
784
    data = ReadOneLineFile(self._TestDataFilename("cert1.pem"))
785
    self.assertEqual(len(data), 27)
786
    self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
787

    
788
  def testNotStrict(self):
789
    data = ReadOneLineFile(self._TestDataFilename("cert1.pem"), strict=False)
790
    self.assertEqual(len(data), 27)
791
    self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
792

    
793
  def testStrictFailure(self):
794
    self.assertRaises(errors.GenericError, ReadOneLineFile,
795
                      self._TestDataFilename("cert1.pem"), strict=True)
796

    
797
  def testLongLine(self):
798
    dummydata = (1024 * "Hello World! ")
799
    myfile = self._CreateTempFile()
800
    utils.WriteFile(myfile, data=dummydata)
801
    datastrict = ReadOneLineFile(myfile, strict=True)
802
    datalax = ReadOneLineFile(myfile, strict=False)
803
    self.assertEqual(dummydata, datastrict)
804
    self.assertEqual(dummydata, datalax)
805

    
806
  def testNewline(self):
807
    myfile = self._CreateTempFile()
808
    myline = "myline"
809
    for nl in ["", "\n", "\r\n"]:
810
      dummydata = "%s%s" % (myline, nl)
811
      utils.WriteFile(myfile, data=dummydata)
812
      datalax = ReadOneLineFile(myfile, strict=False)
813
      self.assertEqual(myline, datalax)
814
      datastrict = ReadOneLineFile(myfile, strict=True)
815
      self.assertEqual(myline, datastrict)
816

    
817
  def testWhitespaceAndMultipleLines(self):
818
    myfile = self._CreateTempFile()
819
    for nl in ["", "\n", "\r\n"]:
820
      for ws in [" ", "\t", "\t\t  \t", "\t "]:
821
        dummydata = (1024 * ("Foo bar baz %s%s" % (ws, nl)))
822
        utils.WriteFile(myfile, data=dummydata)
823
        datalax = ReadOneLineFile(myfile, strict=False)
824
        if nl:
825
          self.assert_(set("\r\n") & set(dummydata))
826
          self.assertRaises(errors.GenericError, ReadOneLineFile,
827
                            myfile, strict=True)
828
          explen = len("Foo bar baz ") + len(ws)
829
          self.assertEqual(len(datalax), explen)
830
          self.assertEqual(datalax, dummydata[:explen])
831
          self.assertFalse(set("\r\n") & set(datalax))
832
        else:
833
          datastrict = ReadOneLineFile(myfile, strict=True)
834
          self.assertEqual(dummydata, datastrict)
835
          self.assertEqual(dummydata, datalax)
836

    
837
  def testEmptylines(self):
838
    myfile = self._CreateTempFile()
839
    myline = "myline"
840
    for nl in ["\n", "\r\n"]:
841
      for ol in ["", "otherline"]:
842
        dummydata = "%s%s%s%s%s%s" % (nl, nl, myline, nl, ol, nl)
843
        utils.WriteFile(myfile, data=dummydata)
844
        self.assert_(set("\r\n") & set(dummydata))
845
        datalax = ReadOneLineFile(myfile, strict=False)
846
        self.assertEqual(myline, datalax)
847
        if ol:
848
          self.assertRaises(errors.GenericError, ReadOneLineFile,
849
                            myfile, strict=True)
850
        else:
851
          datastrict = ReadOneLineFile(myfile, strict=True)
852
          self.assertEqual(myline, datastrict)
853

    
854

    
855
class TestTimestampForFilename(unittest.TestCase):
856
  def test(self):
857
    self.assert_("." not in utils.TimestampForFilename())
858
    self.assert_(":" not in utils.TimestampForFilename())
859

    
860

    
861
class TestCreateBackup(testutils.GanetiTestCase):
862
  def setUp(self):
863
    testutils.GanetiTestCase.setUp(self)
864

    
865
    self.tmpdir = tempfile.mkdtemp()
866

    
867
  def tearDown(self):
868
    testutils.GanetiTestCase.tearDown(self)
869

    
870
    shutil.rmtree(self.tmpdir)
871

    
872
  def testEmpty(self):
873
    filename = PathJoin(self.tmpdir, "config.data")
874
    utils.WriteFile(filename, data="")
875
    bname = utils.CreateBackup(filename)
876
    self.assertFileContent(bname, "")
877
    self.assertEqual(len(glob.glob("%s*" % filename)), 2)
878
    utils.CreateBackup(filename)
879
    self.assertEqual(len(glob.glob("%s*" % filename)), 3)
880
    utils.CreateBackup(filename)
881
    self.assertEqual(len(glob.glob("%s*" % filename)), 4)
882

    
883
    fifoname = PathJoin(self.tmpdir, "fifo")
884
    os.mkfifo(fifoname)
885
    self.assertRaises(errors.ProgrammerError, utils.CreateBackup, fifoname)
886

    
887
  def testContent(self):
888
    bkpcount = 0
889
    for data in ["", "X", "Hello World!\n" * 100, "Binary data\0\x01\x02\n"]:
890
      for rep in [1, 2, 10, 127]:
891
        testdata = data * rep
892

    
893
        filename = PathJoin(self.tmpdir, "test.data_")
894
        utils.WriteFile(filename, data=testdata)
895
        self.assertFileContent(filename, testdata)
896

    
897
        for _ in range(3):
898
          bname = utils.CreateBackup(filename)
899
          bkpcount += 1
900
          self.assertFileContent(bname, testdata)
901
          self.assertEqual(len(glob.glob("%s*" % filename)), 1 + bkpcount)
902

    
903

    
904
class TestFormatUnit(unittest.TestCase):
905
  """Test case for the FormatUnit function"""
906

    
907
  def testMiB(self):
908
    self.assertEqual(FormatUnit(1, 'h'), '1M')
909
    self.assertEqual(FormatUnit(100, 'h'), '100M')
910
    self.assertEqual(FormatUnit(1023, 'h'), '1023M')
911

    
912
    self.assertEqual(FormatUnit(1, 'm'), '1')
913
    self.assertEqual(FormatUnit(100, 'm'), '100')
914
    self.assertEqual(FormatUnit(1023, 'm'), '1023')
915

    
916
    self.assertEqual(FormatUnit(1024, 'm'), '1024')
917
    self.assertEqual(FormatUnit(1536, 'm'), '1536')
918
    self.assertEqual(FormatUnit(17133, 'm'), '17133')
919
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
920

    
921
  def testGiB(self):
922
    self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
923
    self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
924
    self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
925
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
926

    
927
    self.assertEqual(FormatUnit(1024, 'g'), '1.0')
928
    self.assertEqual(FormatUnit(1536, 'g'), '1.5')
929
    self.assertEqual(FormatUnit(17133, 'g'), '16.7')
930
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
931

    
932
    self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
933
    self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
934
    self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
935

    
936
  def testTiB(self):
937
    self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
938
    self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
939
    self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
940

    
941
    self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
942
    self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
943
    self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
944

    
945

    
946
class TestParseUnit(unittest.TestCase):
947
  """Test case for the ParseUnit function"""
948

    
949
  SCALES = (('', 1),
950
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
951
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
952
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
953

    
954
  def testRounding(self):
955
    self.assertEqual(ParseUnit('0'), 0)
956
    self.assertEqual(ParseUnit('1'), 4)
957
    self.assertEqual(ParseUnit('2'), 4)
958
    self.assertEqual(ParseUnit('3'), 4)
959

    
960
    self.assertEqual(ParseUnit('124'), 124)
961
    self.assertEqual(ParseUnit('125'), 128)
962
    self.assertEqual(ParseUnit('126'), 128)
963
    self.assertEqual(ParseUnit('127'), 128)
964
    self.assertEqual(ParseUnit('128'), 128)
965
    self.assertEqual(ParseUnit('129'), 132)
966
    self.assertEqual(ParseUnit('130'), 132)
967

    
968
  def testFloating(self):
969
    self.assertEqual(ParseUnit('0'), 0)
970
    self.assertEqual(ParseUnit('0.5'), 4)
971
    self.assertEqual(ParseUnit('1.75'), 4)
972
    self.assertEqual(ParseUnit('1.99'), 4)
973
    self.assertEqual(ParseUnit('2.00'), 4)
974
    self.assertEqual(ParseUnit('2.01'), 4)
975
    self.assertEqual(ParseUnit('3.99'), 4)
976
    self.assertEqual(ParseUnit('4.00'), 4)
977
    self.assertEqual(ParseUnit('4.01'), 8)
978
    self.assertEqual(ParseUnit('1.5G'), 1536)
979
    self.assertEqual(ParseUnit('1.8G'), 1844)
980
    self.assertEqual(ParseUnit('8.28T'), 8682212)
981

    
982
  def testSuffixes(self):
983
    for sep in ('', ' ', '   ', "\t", "\t "):
984
      for suffix, scale in TestParseUnit.SCALES:
985
        for func in (lambda x: x, str.lower, str.upper):
986
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
987
                           1024 * scale)
988

    
989
  def testInvalidInput(self):
990
    for sep in ('-', '_', ',', 'a'):
991
      for suffix, _ in TestParseUnit.SCALES:
992
        self.assertRaises(errors.UnitParseError, ParseUnit, '1' + sep + suffix)
993

    
994
    for suffix, _ in TestParseUnit.SCALES:
995
      self.assertRaises(errors.UnitParseError, ParseUnit, '1,3' + suffix)
996

    
997

    
998
class TestSshKeys(testutils.GanetiTestCase):
999
  """Test case for the AddAuthorizedKey function"""
1000

    
1001
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
1002
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="198.51.100.4" '
1003
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
1004

    
1005
  def setUp(self):
1006
    testutils.GanetiTestCase.setUp(self)
1007
    self.tmpname = self._CreateTempFile()
1008
    handle = open(self.tmpname, 'w')
1009
    try:
1010
      handle.write("%s\n" % TestSshKeys.KEY_A)
1011
      handle.write("%s\n" % TestSshKeys.KEY_B)
1012
    finally:
1013
      handle.close()
1014

    
1015
  def testAddingNewKey(self):
1016
    utils.AddAuthorizedKey(self.tmpname,
1017
                           'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
1018

    
1019
    self.assertFileContent(self.tmpname,
1020
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
1021
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
1022
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
1023
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
1024

    
1025
  def testAddingAlmostButNotCompletelyTheSameKey(self):
1026
    utils.AddAuthorizedKey(self.tmpname,
1027
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
1028

    
1029
    self.assertFileContent(self.tmpname,
1030
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
1031
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
1032
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
1033
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
1034

    
1035
  def testAddingExistingKeyWithSomeMoreSpaces(self):
1036
    utils.AddAuthorizedKey(self.tmpname,
1037
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
1038

    
1039
    self.assertFileContent(self.tmpname,
1040
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
1041
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
1042
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
1043

    
1044
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
1045
    utils.RemoveAuthorizedKey(self.tmpname,
1046
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
1047

    
1048
    self.assertFileContent(self.tmpname,
1049
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
1050
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
1051

    
1052
  def testRemovingNonExistingKey(self):
1053
    utils.RemoveAuthorizedKey(self.tmpname,
1054
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
1055

    
1056
    self.assertFileContent(self.tmpname,
1057
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
1058
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
1059
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
1060

    
1061

    
1062
class TestEtcHosts(testutils.GanetiTestCase):
1063
  """Test functions modifying /etc/hosts"""
1064

    
1065
  def setUp(self):
1066
    testutils.GanetiTestCase.setUp(self)
1067
    self.tmpname = self._CreateTempFile()
1068
    handle = open(self.tmpname, 'w')
1069
    try:
1070
      handle.write('# This is a test file for /etc/hosts\n')
1071
      handle.write('127.0.0.1\tlocalhost\n')
1072
      handle.write('192.0.2.1 router gw\n')
1073
    finally:
1074
      handle.close()
1075

    
1076
  def testSettingNewIp(self):
1077
    SetEtcHostsEntry(self.tmpname, '198.51.100.4', 'myhost.example.com',
1078
                     ['myhost'])
1079

    
1080
    self.assertFileContent(self.tmpname,
1081
      "# This is a test file for /etc/hosts\n"
1082
      "127.0.0.1\tlocalhost\n"
1083
      "192.0.2.1 router gw\n"
1084
      "198.51.100.4\tmyhost.example.com myhost\n")
1085
    self.assertFileMode(self.tmpname, 0644)
1086

    
1087
  def testSettingExistingIp(self):
1088
    SetEtcHostsEntry(self.tmpname, '192.0.2.1', 'myhost.example.com',
1089
                     ['myhost'])
1090

    
1091
    self.assertFileContent(self.tmpname,
1092
      "# This is a test file for /etc/hosts\n"
1093
      "127.0.0.1\tlocalhost\n"
1094
      "192.0.2.1\tmyhost.example.com myhost\n")
1095
    self.assertFileMode(self.tmpname, 0644)
1096

    
1097
  def testSettingDuplicateName(self):
1098
    SetEtcHostsEntry(self.tmpname, '198.51.100.4', 'myhost', ['myhost'])
1099

    
1100
    self.assertFileContent(self.tmpname,
1101
      "# This is a test file for /etc/hosts\n"
1102
      "127.0.0.1\tlocalhost\n"
1103
      "192.0.2.1 router gw\n"
1104
      "198.51.100.4\tmyhost\n")
1105
    self.assertFileMode(self.tmpname, 0644)
1106

    
1107
  def testRemovingExistingHost(self):
1108
    RemoveEtcHostsEntry(self.tmpname, 'router')
1109

    
1110
    self.assertFileContent(self.tmpname,
1111
      "# This is a test file for /etc/hosts\n"
1112
      "127.0.0.1\tlocalhost\n"
1113
      "192.0.2.1 gw\n")
1114
    self.assertFileMode(self.tmpname, 0644)
1115

    
1116
  def testRemovingSingleExistingHost(self):
1117
    RemoveEtcHostsEntry(self.tmpname, 'localhost')
1118

    
1119
    self.assertFileContent(self.tmpname,
1120
      "# This is a test file for /etc/hosts\n"
1121
      "192.0.2.1 router gw\n")
1122
    self.assertFileMode(self.tmpname, 0644)
1123

    
1124
  def testRemovingNonExistingHost(self):
1125
    RemoveEtcHostsEntry(self.tmpname, 'myhost')
1126

    
1127
    self.assertFileContent(self.tmpname,
1128
      "# This is a test file for /etc/hosts\n"
1129
      "127.0.0.1\tlocalhost\n"
1130
      "192.0.2.1 router gw\n")
1131
    self.assertFileMode(self.tmpname, 0644)
1132

    
1133
  def testRemovingAlias(self):
1134
    RemoveEtcHostsEntry(self.tmpname, 'gw')
1135

    
1136
    self.assertFileContent(self.tmpname,
1137
      "# This is a test file for /etc/hosts\n"
1138
      "127.0.0.1\tlocalhost\n"
1139
      "192.0.2.1 router\n")
1140
    self.assertFileMode(self.tmpname, 0644)
1141

    
1142

    
1143
class TestGetMounts(unittest.TestCase):
1144
  """Test case for GetMounts()."""
1145

    
1146
  TESTDATA = (
1147
    "rootfs /     rootfs rw 0 0\n"
1148
    "none   /sys  sysfs  rw,nosuid,nodev,noexec,relatime 0 0\n"
1149
    "none   /proc proc   rw,nosuid,nodev,noexec,relatime 0 0\n")
1150

    
1151
  def setUp(self):
1152
    self.tmpfile = tempfile.NamedTemporaryFile()
1153
    utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
1154

    
1155
  def testGetMounts(self):
1156
    self.assertEqual(utils.GetMounts(filename=self.tmpfile.name),
1157
      [
1158
        ("rootfs", "/", "rootfs", "rw"),
1159
        ("none", "/sys", "sysfs", "rw,nosuid,nodev,noexec,relatime"),
1160
        ("none", "/proc", "proc", "rw,nosuid,nodev,noexec,relatime"),
1161
      ])
1162

    
1163

    
1164
class TestShellQuoting(unittest.TestCase):
1165
  """Test case for shell quoting functions"""
1166

    
1167
  def testShellQuote(self):
1168
    self.assertEqual(ShellQuote('abc'), "abc")
1169
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
1170
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
1171
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
1172
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
1173

    
1174
  def testShellQuoteArgs(self):
1175
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
1176
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
1177
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
1178

    
1179

    
1180
class TestListVisibleFiles(unittest.TestCase):
1181
  """Test case for ListVisibleFiles"""
1182

    
1183
  def setUp(self):
1184
    self.path = tempfile.mkdtemp()
1185

    
1186
  def tearDown(self):
1187
    shutil.rmtree(self.path)
1188

    
1189
  def _CreateFiles(self, files):
1190
    for name in files:
1191
      utils.WriteFile(os.path.join(self.path, name), data="test")
1192

    
1193
  def _test(self, files, expected):
1194
    self._CreateFiles(files)
1195
    found = ListVisibleFiles(self.path)
1196
    self.assertEqual(set(found), set(expected))
1197

    
1198
  def testAllVisible(self):
1199
    files = ["a", "b", "c"]
1200
    expected = files
1201
    self._test(files, expected)
1202

    
1203
  def testNoneVisible(self):
1204
    files = [".a", ".b", ".c"]
1205
    expected = []
1206
    self._test(files, expected)
1207

    
1208
  def testSomeVisible(self):
1209
    files = ["a", "b", ".c"]
1210
    expected = ["a", "b"]
1211
    self._test(files, expected)
1212

    
1213
  def testNonAbsolutePath(self):
1214
    self.failUnlessRaises(errors.ProgrammerError, ListVisibleFiles, "abc")
1215

    
1216
  def testNonNormalizedPath(self):
1217
    self.failUnlessRaises(errors.ProgrammerError, ListVisibleFiles,
1218
                          "/bin/../tmp")
1219

    
1220

    
1221
class TestNewUUID(unittest.TestCase):
1222
  """Test case for NewUUID"""
1223

    
1224
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
1225
                        '[a-f0-9]{4}-[a-f0-9]{12}$')
1226

    
1227
  def runTest(self):
1228
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
1229

    
1230

    
1231
class TestUniqueSequence(unittest.TestCase):
1232
  """Test case for UniqueSequence"""
1233

    
1234
  def _test(self, input, expected):
1235
    self.assertEqual(utils.UniqueSequence(input), expected)
1236

    
1237
  def runTest(self):
1238
    # Ordered input
1239
    self._test([1, 2, 3], [1, 2, 3])
1240
    self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
1241
    self._test([1, 2, 2, 3], [1, 2, 3])
1242
    self._test([1, 2, 3, 3], [1, 2, 3])
1243

    
1244
    # Unordered input
1245
    self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
1246
    self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
1247

    
1248
    # Strings
1249
    self._test(["a", "a"], ["a"])
1250
    self._test(["a", "b"], ["a", "b"])
1251
    self._test(["a", "b", "a"], ["a", "b"])
1252

    
1253

    
1254
class TestFirstFree(unittest.TestCase):
1255
  """Test case for the FirstFree function"""
1256

    
1257
  def test(self):
1258
    """Test FirstFree"""
1259
    self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
1260
    self.failUnlessEqual(FirstFree([]), None)
1261
    self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
1262
    self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
1263
    self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
1264

    
1265

    
1266
class TestTailFile(testutils.GanetiTestCase):
1267
  """Test case for the TailFile function"""
1268

    
1269
  def testEmpty(self):
1270
    fname = self._CreateTempFile()
1271
    self.failUnlessEqual(TailFile(fname), [])
1272
    self.failUnlessEqual(TailFile(fname, lines=25), [])
1273

    
1274
  def testAllLines(self):
1275
    data = ["test %d" % i for i in range(30)]
1276
    for i in range(30):
1277
      fname = self._CreateTempFile()
1278
      fd = open(fname, "w")
1279
      fd.write("\n".join(data[:i]))
1280
      if i > 0:
1281
        fd.write("\n")
1282
      fd.close()
1283
      self.failUnlessEqual(TailFile(fname, lines=i), data[:i])
1284

    
1285
  def testPartialLines(self):
1286
    data = ["test %d" % i for i in range(30)]
1287
    fname = self._CreateTempFile()
1288
    fd = open(fname, "w")
1289
    fd.write("\n".join(data))
1290
    fd.write("\n")
1291
    fd.close()
1292
    for i in range(1, 30):
1293
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
1294

    
1295
  def testBigFile(self):
1296
    data = ["test %d" % i for i in range(30)]
1297
    fname = self._CreateTempFile()
1298
    fd = open(fname, "w")
1299
    fd.write("X" * 1048576)
1300
    fd.write("\n")
1301
    fd.write("\n".join(data))
1302
    fd.write("\n")
1303
    fd.close()
1304
    for i in range(1, 30):
1305
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
1306

    
1307

    
1308
class _BaseFileLockTest:
1309
  """Test case for the FileLock class"""
1310

    
1311
  def testSharedNonblocking(self):
1312
    self.lock.Shared(blocking=False)
1313
    self.lock.Close()
1314

    
1315
  def testExclusiveNonblocking(self):
1316
    self.lock.Exclusive(blocking=False)
1317
    self.lock.Close()
1318

    
1319
  def testUnlockNonblocking(self):
1320
    self.lock.Unlock(blocking=False)
1321
    self.lock.Close()
1322

    
1323
  def testSharedBlocking(self):
1324
    self.lock.Shared(blocking=True)
1325
    self.lock.Close()
1326

    
1327
  def testExclusiveBlocking(self):
1328
    self.lock.Exclusive(blocking=True)
1329
    self.lock.Close()
1330

    
1331
  def testUnlockBlocking(self):
1332
    self.lock.Unlock(blocking=True)
1333
    self.lock.Close()
1334

    
1335
  def testSharedExclusiveUnlock(self):
1336
    self.lock.Shared(blocking=False)
1337
    self.lock.Exclusive(blocking=False)
1338
    self.lock.Unlock(blocking=False)
1339
    self.lock.Close()
1340

    
1341
  def testExclusiveSharedUnlock(self):
1342
    self.lock.Exclusive(blocking=False)
1343
    self.lock.Shared(blocking=False)
1344
    self.lock.Unlock(blocking=False)
1345
    self.lock.Close()
1346

    
1347
  def testSimpleTimeout(self):
1348
    # These will succeed on the first attempt, hence a short timeout
1349
    self.lock.Shared(blocking=True, timeout=10.0)
1350
    self.lock.Exclusive(blocking=False, timeout=10.0)
1351
    self.lock.Unlock(blocking=True, timeout=10.0)
1352
    self.lock.Close()
1353

    
1354
  @staticmethod
1355
  def _TryLockInner(filename, shared, blocking):
1356
    lock = utils.FileLock.Open(filename)
1357

    
1358
    if shared:
1359
      fn = lock.Shared
1360
    else:
1361
      fn = lock.Exclusive
1362

    
1363
    try:
1364
      # The timeout doesn't really matter as the parent process waits for us to
1365
      # finish anyway.
1366
      fn(blocking=blocking, timeout=0.01)
1367
    except errors.LockError, err:
1368
      return False
1369

    
1370
    return True
1371

    
1372
  def _TryLock(self, *args):
1373
    return utils.RunInSeparateProcess(self._TryLockInner, self.tmpfile.name,
1374
                                      *args)
1375

    
1376
  def testTimeout(self):
1377
    for blocking in [True, False]:
1378
      self.lock.Exclusive(blocking=True)
1379
      self.failIf(self._TryLock(False, blocking))
1380
      self.failIf(self._TryLock(True, blocking))
1381

    
1382
      self.lock.Shared(blocking=True)
1383
      self.assert_(self._TryLock(True, blocking))
1384
      self.failIf(self._TryLock(False, blocking))
1385

    
1386
  def testCloseShared(self):
1387
    self.lock.Close()
1388
    self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
1389

    
1390
  def testCloseExclusive(self):
1391
    self.lock.Close()
1392
    self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
1393

    
1394
  def testCloseUnlock(self):
1395
    self.lock.Close()
1396
    self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
1397

    
1398

    
1399
class TestFileLockWithFilename(testutils.GanetiTestCase, _BaseFileLockTest):
1400
  TESTDATA = "Hello World\n" * 10
1401

    
1402
  def setUp(self):
1403
    testutils.GanetiTestCase.setUp(self)
1404

    
1405
    self.tmpfile = tempfile.NamedTemporaryFile()
1406
    utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
1407
    self.lock = utils.FileLock.Open(self.tmpfile.name)
1408

    
1409
    # Ensure "Open" didn't truncate file
1410
    self.assertFileContent(self.tmpfile.name, self.TESTDATA)
1411

    
1412
  def tearDown(self):
1413
    self.assertFileContent(self.tmpfile.name, self.TESTDATA)
1414

    
1415
    testutils.GanetiTestCase.tearDown(self)
1416

    
1417

    
1418
class TestFileLockWithFileObject(unittest.TestCase, _BaseFileLockTest):
1419
  def setUp(self):
1420
    self.tmpfile = tempfile.NamedTemporaryFile()
1421
    self.lock = utils.FileLock(open(self.tmpfile.name, "w"), self.tmpfile.name)
1422

    
1423

    
1424
class TestTimeFunctions(unittest.TestCase):
1425
  """Test case for time functions"""
1426

    
1427
  def runTest(self):
1428
    self.assertEqual(utils.SplitTime(1), (1, 0))
1429
    self.assertEqual(utils.SplitTime(1.5), (1, 500000))
1430
    self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
1431
    self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
1432
    self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
1433
    self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
1434
    self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
1435
    self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
1436

    
1437
    self.assertRaises(AssertionError, utils.SplitTime, -1)
1438

    
1439
    self.assertEqual(utils.MergeTime((1, 0)), 1.0)
1440
    self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
1441
    self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
1442

    
1443
    self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3),
1444
                     1218448917.481)
1445
    self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
1446

    
1447
    self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
1448
    self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
1449
    self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
1450
    self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
1451
    self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
1452

    
1453

    
1454
class FieldSetTestCase(unittest.TestCase):
1455
  """Test case for FieldSets"""
1456

    
1457
  def testSimpleMatch(self):
1458
    f = utils.FieldSet("a", "b", "c", "def")
1459
    self.failUnless(f.Matches("a"))
1460
    self.failIf(f.Matches("d"), "Substring matched")
1461
    self.failIf(f.Matches("defghi"), "Prefix string matched")
1462
    self.failIf(f.NonMatching(["b", "c"]))
1463
    self.failIf(f.NonMatching(["a", "b", "c", "def"]))
1464
    self.failUnless(f.NonMatching(["a", "d"]))
1465

    
1466
  def testRegexMatch(self):
1467
    f = utils.FieldSet("a", "b([0-9]+)", "c")
1468
    self.failUnless(f.Matches("b1"))
1469
    self.failUnless(f.Matches("b99"))
1470
    self.failIf(f.Matches("b/1"))
1471
    self.failIf(f.NonMatching(["b12", "c"]))
1472
    self.failUnless(f.NonMatching(["a", "1"]))
1473

    
1474
class TestForceDictType(unittest.TestCase):
1475
  """Test case for ForceDictType"""
1476

    
1477
  def setUp(self):
1478
    self.key_types = {
1479
      'a': constants.VTYPE_INT,
1480
      'b': constants.VTYPE_BOOL,
1481
      'c': constants.VTYPE_STRING,
1482
      'd': constants.VTYPE_SIZE,
1483
      }
1484

    
1485
  def _fdt(self, dict, allowed_values=None):
1486
    if allowed_values is None:
1487
      utils.ForceDictType(dict, self.key_types)
1488
    else:
1489
      utils.ForceDictType(dict, self.key_types, allowed_values=allowed_values)
1490

    
1491
    return dict
1492

    
1493
  def testSimpleDict(self):
1494
    self.assertEqual(self._fdt({}), {})
1495
    self.assertEqual(self._fdt({'a': 1}), {'a': 1})
1496
    self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
1497
    self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
1498
    self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
1499
    self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
1500
    self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
1501
    self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
1502
    self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
1503
    self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
1504
    self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
1505
    self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
1506

    
1507
  def testErrors(self):
1508
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
1509
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
1510
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
1511
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
1512

    
1513

    
1514
class TestIsNormAbsPath(unittest.TestCase):
1515
  """Testing case for IsNormAbsPath"""
1516

    
1517
  def _pathTestHelper(self, path, result):
1518
    if result:
1519
      self.assert_(utils.IsNormAbsPath(path),
1520
          "Path %s should result absolute and normalized" % path)
1521
    else:
1522
      self.assertFalse(utils.IsNormAbsPath(path),
1523
          "Path %s should not result absolute and normalized" % path)
1524

    
1525
  def testBase(self):
1526
    self._pathTestHelper('/etc', True)
1527
    self._pathTestHelper('/srv', True)
1528
    self._pathTestHelper('etc', False)
1529
    self._pathTestHelper('/etc/../root', False)
1530
    self._pathTestHelper('/etc/', False)
1531

    
1532

    
1533
class TestSafeEncode(unittest.TestCase):
1534
  """Test case for SafeEncode"""
1535

    
1536
  def testAscii(self):
1537
    for txt in [string.digits, string.letters, string.punctuation]:
1538
      self.failUnlessEqual(txt, SafeEncode(txt))
1539

    
1540
  def testDoubleEncode(self):
1541
    for i in range(255):
1542
      txt = SafeEncode(chr(i))
1543
      self.failUnlessEqual(txt, SafeEncode(txt))
1544

    
1545
  def testUnicode(self):
1546
    # 1024 is high enough to catch non-direct ASCII mappings
1547
    for i in range(1024):
1548
      txt = SafeEncode(unichr(i))
1549
      self.failUnlessEqual(txt, SafeEncode(txt))
1550

    
1551

    
1552
class TestFormatTime(unittest.TestCase):
1553
  """Testing case for FormatTime"""
1554

    
1555
  def testNone(self):
1556
    self.failUnlessEqual(FormatTime(None), "N/A")
1557

    
1558
  def testInvalid(self):
1559
    self.failUnlessEqual(FormatTime(()), "N/A")
1560

    
1561
  def testNow(self):
1562
    # tests that we accept time.time input
1563
    FormatTime(time.time())
1564
    # tests that we accept int input
1565
    FormatTime(int(time.time()))
1566

    
1567

    
1568
class RunInSeparateProcess(unittest.TestCase):
1569
  def test(self):
1570
    for exp in [True, False]:
1571
      def _child():
1572
        return exp
1573

    
1574
      self.assertEqual(exp, utils.RunInSeparateProcess(_child))
1575

    
1576
  def testArgs(self):
1577
    for arg in [0, 1, 999, "Hello World", (1, 2, 3)]:
1578
      def _child(carg1, carg2):
1579
        return carg1 == "Foo" and carg2 == arg
1580

    
1581
      self.assert_(utils.RunInSeparateProcess(_child, "Foo", arg))
1582

    
1583
  def testPid(self):
1584
    parent_pid = os.getpid()
1585

    
1586
    def _check():
1587
      return os.getpid() == parent_pid
1588

    
1589
    self.failIf(utils.RunInSeparateProcess(_check))
1590

    
1591
  def testSignal(self):
1592
    def _kill():
1593
      os.kill(os.getpid(), signal.SIGTERM)
1594

    
1595
    self.assertRaises(errors.GenericError,
1596
                      utils.RunInSeparateProcess, _kill)
1597

    
1598
  def testException(self):
1599
    def _exc():
1600
      raise errors.GenericError("This is a test")
1601

    
1602
    self.assertRaises(errors.GenericError,
1603
                      utils.RunInSeparateProcess, _exc)
1604

    
1605

    
1606
class TestFingerprintFile(unittest.TestCase):
1607
  def setUp(self):
1608
    self.tmpfile = tempfile.NamedTemporaryFile()
1609

    
1610
  def test(self):
1611
    self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1612
                     "da39a3ee5e6b4b0d3255bfef95601890afd80709")
1613

    
1614
    utils.WriteFile(self.tmpfile.name, data="Hello World\n")
1615
    self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1616
                     "648a6a6ffffdaa0badb23b8baf90b6168dd16b3a")
1617

    
1618

    
1619
class TestUnescapeAndSplit(unittest.TestCase):
1620
  """Testing case for UnescapeAndSplit"""
1621

    
1622
  def setUp(self):
1623
    # testing more that one separator for regexp safety
1624
    self._seps = [",", "+", "."]
1625

    
1626
  def testSimple(self):
1627
    a = ["a", "b", "c", "d"]
1628
    for sep in self._seps:
1629
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), a)
1630

    
1631
  def testEscape(self):
1632
    for sep in self._seps:
1633
      a = ["a", "b\\" + sep + "c", "d"]
1634
      b = ["a", "b" + sep + "c", "d"]
1635
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1636

    
1637
  def testDoubleEscape(self):
1638
    for sep in self._seps:
1639
      a = ["a", "b\\\\", "c", "d"]
1640
      b = ["a", "b\\", "c", "d"]
1641
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1642

    
1643
  def testThreeEscape(self):
1644
    for sep in self._seps:
1645
      a = ["a", "b\\\\\\" + sep + "c", "d"]
1646
      b = ["a", "b\\" + sep + "c", "d"]
1647
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1648

    
1649

    
1650
class TestGenerateSelfSignedX509Cert(unittest.TestCase):
1651
  def setUp(self):
1652
    self.tmpdir = tempfile.mkdtemp()
1653

    
1654
  def tearDown(self):
1655
    shutil.rmtree(self.tmpdir)
1656

    
1657
  def _checkRsaPrivateKey(self, key):
1658
    lines = key.splitlines()
1659
    return ("-----BEGIN RSA PRIVATE KEY-----" in lines and
1660
            "-----END RSA PRIVATE KEY-----" in lines)
1661

    
1662
  def _checkCertificate(self, cert):
1663
    lines = cert.splitlines()
1664
    return ("-----BEGIN CERTIFICATE-----" in lines and
1665
            "-----END CERTIFICATE-----" in lines)
1666

    
1667
  def test(self):
1668
    for common_name in [None, ".", "Ganeti", "node1.example.com"]:
1669
      (key_pem, cert_pem) = utils.GenerateSelfSignedX509Cert(common_name, 300)
1670
      self._checkRsaPrivateKey(key_pem)
1671
      self._checkCertificate(cert_pem)
1672

    
1673
      key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
1674
                                           key_pem)
1675
      self.assert_(key.bits() >= 1024)
1676
      self.assertEqual(key.bits(), constants.RSA_KEY_BITS)
1677
      self.assertEqual(key.type(), OpenSSL.crypto.TYPE_RSA)
1678

    
1679
      x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1680
                                             cert_pem)
1681
      self.failIf(x509.has_expired())
1682
      self.assertEqual(x509.get_issuer().CN, common_name)
1683
      self.assertEqual(x509.get_subject().CN, common_name)
1684
      self.assertEqual(x509.get_pubkey().bits(), constants.RSA_KEY_BITS)
1685

    
1686
  def testLegacy(self):
1687
    cert1_filename = os.path.join(self.tmpdir, "cert1.pem")
1688

    
1689
    utils.GenerateSelfSignedSslCert(cert1_filename, validity=1)
1690

    
1691
    cert1 = utils.ReadFile(cert1_filename)
1692

    
1693
    self.assert_(self._checkRsaPrivateKey(cert1))
1694
    self.assert_(self._checkCertificate(cert1))
1695

    
1696

    
1697
class TestPathJoin(unittest.TestCase):
1698
  """Testing case for PathJoin"""
1699

    
1700
  def testBasicItems(self):
1701
    mlist = ["/a", "b", "c"]
1702
    self.failUnlessEqual(PathJoin(*mlist), "/".join(mlist))
1703

    
1704
  def testNonAbsPrefix(self):
1705
    self.failUnlessRaises(ValueError, PathJoin, "a", "b")
1706

    
1707
  def testBackTrack(self):
1708
    self.failUnlessRaises(ValueError, PathJoin, "/a", "b/../c")
1709

    
1710
  def testMultiAbs(self):
1711
    self.failUnlessRaises(ValueError, PathJoin, "/a", "/b")
1712

    
1713

    
1714
class TestValidateServiceName(unittest.TestCase):
1715
  def testValid(self):
1716
    testnames = [
1717
      0, 1, 2, 3, 1024, 65000, 65534, 65535,
1718
      "ganeti",
1719
      "gnt-masterd",
1720
      "HELLO_WORLD_SVC",
1721
      "hello.world.1",
1722
      "0", "80", "1111", "65535",
1723
      ]
1724

    
1725
    for name in testnames:
1726
      self.assertEqual(utils.ValidateServiceName(name), name)
1727

    
1728
  def testInvalid(self):
1729
    testnames = [
1730
      -15756, -1, 65536, 133428083,
1731
      "", "Hello World!", "!", "'", "\"", "\t", "\n", "`",
1732
      "-8546", "-1", "65536",
1733
      (129 * "A"),
1734
      ]
1735

    
1736
    for name in testnames:
1737
      self.assertRaises(errors.OpPrereqError, utils.ValidateServiceName, name)
1738

    
1739

    
1740
class TestParseAsn1Generalizedtime(unittest.TestCase):
1741
  def test(self):
1742
    # UTC
1743
    self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000Z"), 0)
1744
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152Z"),
1745
                     1266860512)
1746
    self.assertEqual(utils._ParseAsn1Generalizedtime("20380119031407Z"),
1747
                     (2**31) - 1)
1748

    
1749
    # With offset
1750
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152+0000"),
1751
                     1266860512)
1752
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100223131652+0000"),
1753
                     1266931012)
1754
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100223051808-0800"),
1755
                     1266931088)
1756
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100224002135+1100"),
1757
                     1266931295)
1758
    self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000-0100"),
1759
                     3600)
1760

    
1761
    # Leap seconds are not supported by datetime.datetime
1762
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1763
                      "19841231235960+0000")
1764
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1765
                      "19920630235960+0000")
1766

    
1767
    # Errors
1768
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "")
1769
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "invalid")
1770
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1771
                      "20100222174152")
1772
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1773
                      "Mon Feb 22 17:47:02 UTC 2010")
1774
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1775
                      "2010-02-22 17:42:02")
1776

    
1777

    
1778
class TestGetX509CertValidity(testutils.GanetiTestCase):
1779
  def setUp(self):
1780
    testutils.GanetiTestCase.setUp(self)
1781

    
1782
    pyopenssl_version = distutils.version.LooseVersion(OpenSSL.__version__)
1783

    
1784
    # Test whether we have pyOpenSSL 0.7 or above
1785
    self.pyopenssl0_7 = (pyopenssl_version >= "0.7")
1786

    
1787
    if not self.pyopenssl0_7:
1788
      warnings.warn("This test requires pyOpenSSL 0.7 or above to"
1789
                    " function correctly")
1790

    
1791
  def _LoadCert(self, name):
1792
    return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1793
                                           self._ReadTestData(name))
1794

    
1795
  def test(self):
1796
    validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem"))
1797
    if self.pyopenssl0_7:
1798
      self.assertEqual(validity, (1266919967, 1267524767))
1799
    else:
1800
      self.assertEqual(validity, (None, None))
1801

    
1802

    
1803
class TestSignX509Certificate(unittest.TestCase):
1804
  KEY = "My private key!"
1805
  KEY_OTHER = "Another key"
1806

    
1807
  def test(self):
1808
    # Generate certificate valid for 5 minutes
1809
    (_, cert_pem) = utils.GenerateSelfSignedX509Cert(None, 300)
1810

    
1811
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1812
                                           cert_pem)
1813

    
1814
    # No signature at all
1815
    self.assertRaises(errors.GenericError,
1816
                      utils.LoadSignedX509Certificate, cert_pem, self.KEY)
1817

    
1818
    # Invalid input
1819
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1820
                      "", self.KEY)
1821
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1822
                      "X-Ganeti-Signature: \n", self.KEY)
1823
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1824
                      "X-Ganeti-Sign: $1234$abcdef\n", self.KEY)
1825
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1826
                      "X-Ganeti-Signature: $1234567890$abcdef\n", self.KEY)
1827
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1828
                      "X-Ganeti-Signature: $1234$abc\n\n" + cert_pem, self.KEY)
1829

    
1830
    # Invalid salt
1831
    for salt in list("-_@$,:;/\\ \t\n"):
1832
      self.assertRaises(errors.GenericError, utils.SignX509Certificate,
1833
                        cert_pem, self.KEY, "foo%sbar" % salt)
1834

    
1835
    for salt in ["HelloWorld", "salt", string.letters, string.digits,
1836
                 utils.GenerateSecret(numbytes=4),
1837
                 utils.GenerateSecret(numbytes=16),
1838
                 "{123:456}".encode("hex")]:
1839
      signed_pem = utils.SignX509Certificate(cert, self.KEY, salt)
1840

    
1841
      self._Check(cert, salt, signed_pem)
1842

    
1843
      self._Check(cert, salt, "X-Another-Header: with a value\n" + signed_pem)
1844
      self._Check(cert, salt, (10 * "Hello World!\n") + signed_pem)
1845
      self._Check(cert, salt, (signed_pem + "\n\na few more\n"
1846
                               "lines----\n------ at\nthe end!"))
1847

    
1848
  def _Check(self, cert, salt, pem):
1849
    (cert2, salt2) = utils.LoadSignedX509Certificate(pem, self.KEY)
1850
    self.assertEqual(salt, salt2)
1851
    self.assertEqual(cert.digest("sha1"), cert2.digest("sha1"))
1852

    
1853
    # Other key
1854
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1855
                      pem, self.KEY_OTHER)
1856

    
1857

    
1858
class TestMakedirs(unittest.TestCase):
1859
  def setUp(self):
1860
    self.tmpdir = tempfile.mkdtemp()
1861

    
1862
  def tearDown(self):
1863
    shutil.rmtree(self.tmpdir)
1864

    
1865
  def testNonExisting(self):
1866
    path = PathJoin(self.tmpdir, "foo")
1867
    utils.Makedirs(path)
1868
    self.assert_(os.path.isdir(path))
1869

    
1870
  def testExisting(self):
1871
    path = PathJoin(self.tmpdir, "foo")
1872
    os.mkdir(path)
1873
    utils.Makedirs(path)
1874
    self.assert_(os.path.isdir(path))
1875

    
1876
  def testRecursiveNonExisting(self):
1877
    path = PathJoin(self.tmpdir, "foo/bar/baz")
1878
    utils.Makedirs(path)
1879
    self.assert_(os.path.isdir(path))
1880

    
1881
  def testRecursiveExisting(self):
1882
    path = PathJoin(self.tmpdir, "B/moo/xyz")
1883
    self.assertFalse(os.path.exists(path))
1884
    os.mkdir(PathJoin(self.tmpdir, "B"))
1885
    utils.Makedirs(path)
1886
    self.assert_(os.path.isdir(path))
1887

    
1888

    
1889
class TestRetry(testutils.GanetiTestCase):
1890
  def setUp(self):
1891
    testutils.GanetiTestCase.setUp(self)
1892
    self.retries = 0
1893

    
1894
  @staticmethod
1895
  def _RaiseRetryAgain():
1896
    raise utils.RetryAgain()
1897

    
1898
  @staticmethod
1899
  def _RaiseRetryAgainWithArg(args):
1900
    raise utils.RetryAgain(*args)
1901

    
1902
  def _WrongNestedLoop(self):
1903
    return utils.Retry(self._RaiseRetryAgain, 0.01, 0.02)
1904

    
1905
  def _RetryAndSucceed(self, retries):
1906
    if self.retries < retries:
1907
      self.retries += 1
1908
      raise utils.RetryAgain()
1909
    else:
1910
      return True
1911

    
1912
  def testRaiseTimeout(self):
1913
    self.failUnlessRaises(utils.RetryTimeout, utils.Retry,
1914
                          self._RaiseRetryAgain, 0.01, 0.02)
1915
    self.failUnlessRaises(utils.RetryTimeout, utils.Retry,
1916
                          self._RetryAndSucceed, 0.01, 0, args=[1])
1917
    self.failUnlessEqual(self.retries, 1)
1918

    
1919
  def testComplete(self):
1920
    self.failUnlessEqual(utils.Retry(lambda: True, 0, 1), True)
1921
    self.failUnlessEqual(utils.Retry(self._RetryAndSucceed, 0, 1, args=[2]),
1922
                         True)
1923
    self.failUnlessEqual(self.retries, 2)
1924

    
1925
  def testNestedLoop(self):
1926
    try:
1927
      self.failUnlessRaises(errors.ProgrammerError, utils.Retry,
1928
                            self._WrongNestedLoop, 0, 1)
1929
    except utils.RetryTimeout:
1930
      self.fail("Didn't detect inner loop's exception")
1931

    
1932
  def testTimeoutArgument(self):
1933
    retry_arg="my_important_debugging_message"
1934
    try:
1935
      utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02, args=[[retry_arg]])
1936
    except utils.RetryTimeout, err:
1937
      self.failUnlessEqual(err.args, (retry_arg, ))
1938
    else:
1939
      self.fail("Expected timeout didn't happen")
1940

    
1941
  def testRaiseInnerWithExc(self):
1942
    retry_arg="my_important_debugging_message"
1943
    try:
1944
      try:
1945
        utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02,
1946
                    args=[[errors.GenericError(retry_arg, retry_arg)]])
1947
      except utils.RetryTimeout, err:
1948
        err.RaiseInner()
1949
      else:
1950
        self.fail("Expected timeout didn't happen")
1951
    except errors.GenericError, err:
1952
      self.failUnlessEqual(err.args, (retry_arg, retry_arg))
1953
    else:
1954
      self.fail("Expected GenericError didn't happen")
1955

    
1956
  def testRaiseInnerWithMsg(self):
1957
    retry_arg="my_important_debugging_message"
1958
    try:
1959
      try:
1960
        utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02,
1961
                    args=[[retry_arg, retry_arg]])
1962
      except utils.RetryTimeout, err:
1963
        err.RaiseInner()
1964
      else:
1965
        self.fail("Expected timeout didn't happen")
1966
    except utils.RetryTimeout, err:
1967
      self.failUnlessEqual(err.args, (retry_arg, retry_arg))
1968
    else:
1969
      self.fail("Expected RetryTimeout didn't happen")
1970

    
1971

    
1972
class TestLineSplitter(unittest.TestCase):
1973
  def test(self):
1974
    lines = []
1975
    ls = utils.LineSplitter(lines.append)
1976
    ls.write("Hello World\n")
1977
    self.assertEqual(lines, [])
1978
    ls.write("Foo\n Bar\r\n ")
1979
    ls.write("Baz")
1980
    ls.write("Moo")
1981
    self.assertEqual(lines, [])
1982
    ls.flush()
1983
    self.assertEqual(lines, ["Hello World", "Foo", " Bar"])
1984
    ls.close()
1985
    self.assertEqual(lines, ["Hello World", "Foo", " Bar", " BazMoo"])
1986

    
1987
  def _testExtra(self, line, all_lines, p1, p2):
1988
    self.assertEqual(p1, 999)
1989
    self.assertEqual(p2, "extra")
1990
    all_lines.append(line)
1991

    
1992
  def testExtraArgsNoFlush(self):
1993
    lines = []
1994
    ls = utils.LineSplitter(self._testExtra, lines, 999, "extra")
1995
    ls.write("\n\nHello World\n")
1996
    ls.write("Foo\n Bar\r\n ")
1997
    ls.write("")
1998
    ls.write("Baz")
1999
    ls.write("Moo\n\nx\n")
2000
    self.assertEqual(lines, [])
2001
    ls.close()
2002
    self.assertEqual(lines, ["", "", "Hello World", "Foo", " Bar", " BazMoo",
2003
                             "", "x"])
2004

    
2005

    
2006
class TestReadLockedPidFile(unittest.TestCase):
2007
  def setUp(self):
2008
    self.tmpdir = tempfile.mkdtemp()
2009

    
2010
  def tearDown(self):
2011
    shutil.rmtree(self.tmpdir)
2012

    
2013
  def testNonExistent(self):
2014
    path = PathJoin(self.tmpdir, "nonexist")
2015
    self.assert_(utils.ReadLockedPidFile(path) is None)
2016

    
2017
  def testUnlocked(self):
2018
    path = PathJoin(self.tmpdir, "pid")
2019
    utils.WriteFile(path, data="123")
2020
    self.assert_(utils.ReadLockedPidFile(path) is None)
2021

    
2022
  def testLocked(self):
2023
    path = PathJoin(self.tmpdir, "pid")
2024
    utils.WriteFile(path, data="123")
2025

    
2026
    fl = utils.FileLock.Open(path)
2027
    try:
2028
      fl.Exclusive(blocking=True)
2029

    
2030
      self.assertEqual(utils.ReadLockedPidFile(path), 123)
2031
    finally:
2032
      fl.Close()
2033

    
2034
    self.assert_(utils.ReadLockedPidFile(path) is None)
2035

    
2036
  def testError(self):
2037
    path = PathJoin(self.tmpdir, "foobar", "pid")
2038
    utils.WriteFile(PathJoin(self.tmpdir, "foobar"), data="")
2039
    # open(2) should return ENOTDIR
2040
    self.assertRaises(EnvironmentError, utils.ReadLockedPidFile, path)
2041

    
2042

    
2043
class TestCertVerification(testutils.GanetiTestCase):
2044
  def setUp(self):
2045
    testutils.GanetiTestCase.setUp(self)
2046

    
2047
    self.tmpdir = tempfile.mkdtemp()
2048

    
2049
  def tearDown(self):
2050
    shutil.rmtree(self.tmpdir)
2051

    
2052
  def testVerifyCertificate(self):
2053
    cert_pem = utils.ReadFile(self._TestDataFilename("cert1.pem"))
2054
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
2055
                                           cert_pem)
2056

    
2057
    # Not checking return value as this certificate is expired
2058
    utils.VerifyX509Certificate(cert, 30, 7)
2059

    
2060

    
2061
class TestVerifyCertificateInner(unittest.TestCase):
2062
  def test(self):
2063
    vci = utils._VerifyCertificateInner
2064

    
2065
    # Valid
2066
    self.assertEqual(vci(False, 1263916313, 1298476313, 1266940313, 30, 7),
2067
                     (None, None))
2068

    
2069
    # Not yet valid
2070
    (errcode, msg) = vci(False, 1266507600, 1267544400, 1266075600, 30, 7)
2071
    self.assertEqual(errcode, utils.CERT_WARNING)
2072

    
2073
    # Expiring soon
2074
    (errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 7)
2075
    self.assertEqual(errcode, utils.CERT_ERROR)
2076

    
2077
    (errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 1)
2078
    self.assertEqual(errcode, utils.CERT_WARNING)
2079

    
2080
    (errcode, msg) = vci(False, 1266507600, None, 1266939600, 30, 7)
2081
    self.assertEqual(errcode, None)
2082

    
2083
    # Expired
2084
    (errcode, msg) = vci(True, 1266507600, 1267544400, 1266939600, 30, 7)
2085
    self.assertEqual(errcode, utils.CERT_ERROR)
2086

    
2087
    (errcode, msg) = vci(True, None, 1267544400, 1266939600, 30, 7)
2088
    self.assertEqual(errcode, utils.CERT_ERROR)
2089

    
2090
    (errcode, msg) = vci(True, 1266507600, None, 1266939600, 30, 7)
2091
    self.assertEqual(errcode, utils.CERT_ERROR)
2092

    
2093
    (errcode, msg) = vci(True, None, None, 1266939600, 30, 7)
2094
    self.assertEqual(errcode, utils.CERT_ERROR)
2095

    
2096

    
2097
class TestHmacFunctions(unittest.TestCase):
2098
  # Digests can be checked with "openssl sha1 -hmac $key"
2099
  def testSha1Hmac(self):
2100
    self.assertEqual(utils.Sha1Hmac("", ""),
2101
                     "fbdb1d1b18aa6c08324b7d64b71fb76370690e1d")
2102
    self.assertEqual(utils.Sha1Hmac("3YzMxZWE", "Hello World"),
2103
                     "ef4f3bda82212ecb2f7ce868888a19092481f1fd")
2104
    self.assertEqual(utils.Sha1Hmac("TguMTA2K", ""),
2105
                     "f904c2476527c6d3e6609ab683c66fa0652cb1dc")
2106

    
2107
    longtext = 1500 * "The quick brown fox jumps over the lazy dog\n"
2108
    self.assertEqual(utils.Sha1Hmac("3YzMxZWE", longtext),
2109
                     "35901b9a3001a7cdcf8e0e9d7c2e79df2223af54")
2110

    
2111
  def testSha1HmacSalt(self):
2112
    self.assertEqual(utils.Sha1Hmac("TguMTA2K", "", salt="abc0"),
2113
                     "4999bf342470eadb11dfcd24ca5680cf9fd7cdce")
2114
    self.assertEqual(utils.Sha1Hmac("TguMTA2K", "", salt="abc9"),
2115
                     "17a4adc34d69c0d367d4ffbef96fd41d4df7a6e8")
2116
    self.assertEqual(utils.Sha1Hmac("3YzMxZWE", "Hello World", salt="xyz0"),
2117
                     "7f264f8114c9066afc9bb7636e1786d996d3cc0d")
2118

    
2119
  def testVerifySha1Hmac(self):
2120
    self.assert_(utils.VerifySha1Hmac("", "", ("fbdb1d1b18aa6c08324b"
2121
                                               "7d64b71fb76370690e1d")))
2122
    self.assert_(utils.VerifySha1Hmac("TguMTA2K", "",
2123
                                      ("f904c2476527c6d3e660"
2124
                                       "9ab683c66fa0652cb1dc")))
2125

    
2126
    digest = "ef4f3bda82212ecb2f7ce868888a19092481f1fd"
2127
    self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World", digest))
2128
    self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
2129
                                      digest.lower()))
2130
    self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
2131
                                      digest.upper()))
2132
    self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
2133
                                      digest.title()))
2134

    
2135
  def testVerifySha1HmacSalt(self):
2136
    self.assert_(utils.VerifySha1Hmac("TguMTA2K", "",
2137
                                      ("17a4adc34d69c0d367d4"
2138
                                       "ffbef96fd41d4df7a6e8"),
2139
                                      salt="abc9"))
2140
    self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
2141
                                      ("7f264f8114c9066afc9b"
2142
                                       "b7636e1786d996d3cc0d"),
2143
                                      salt="xyz0"))
2144

    
2145

    
2146
class TestIgnoreSignals(unittest.TestCase):
2147
  """Test the IgnoreSignals decorator"""
2148

    
2149
  @staticmethod
2150
  def _Raise(exception):
2151
    raise exception
2152

    
2153
  @staticmethod
2154
  def _Return(rval):
2155
    return rval
2156

    
2157
  def testIgnoreSignals(self):
2158
    sock_err_intr = socket.error(errno.EINTR, "Message")
2159
    sock_err_inval = socket.error(errno.EINVAL, "Message")
2160

    
2161
    env_err_intr = EnvironmentError(errno.EINTR, "Message")
2162
    env_err_inval = EnvironmentError(errno.EINVAL, "Message")
2163

    
2164
    self.assertRaises(socket.error, self._Raise, sock_err_intr)
2165
    self.assertRaises(socket.error, self._Raise, sock_err_inval)
2166
    self.assertRaises(EnvironmentError, self._Raise, env_err_intr)
2167
    self.assertRaises(EnvironmentError, self._Raise, env_err_inval)
2168

    
2169
    self.assertEquals(utils.IgnoreSignals(self._Raise, sock_err_intr), None)
2170
    self.assertEquals(utils.IgnoreSignals(self._Raise, env_err_intr), None)
2171
    self.assertRaises(socket.error, utils.IgnoreSignals, self._Raise,
2172
                      sock_err_inval)
2173
    self.assertRaises(EnvironmentError, utils.IgnoreSignals, self._Raise,
2174
                      env_err_inval)
2175

    
2176
    self.assertEquals(utils.IgnoreSignals(self._Return, True), True)
2177
    self.assertEquals(utils.IgnoreSignals(self._Return, 33), 33)
2178

    
2179

    
2180
class TestEnsureDirs(unittest.TestCase):
2181
  """Tests for EnsureDirs"""
2182

    
2183
  def setUp(self):
2184
    self.dir = tempfile.mkdtemp()
2185
    self.old_umask = os.umask(0777)
2186

    
2187
  def testEnsureDirs(self):
2188
    utils.EnsureDirs([
2189
        (PathJoin(self.dir, "foo"), 0777),
2190
        (PathJoin(self.dir, "bar"), 0000),
2191
        ])
2192
    self.assertEquals(os.stat(PathJoin(self.dir, "foo"))[0] & 0777, 0777)
2193
    self.assertEquals(os.stat(PathJoin(self.dir, "bar"))[0] & 0777, 0000)
2194

    
2195
  def tearDown(self):
2196
    os.rmdir(PathJoin(self.dir, "foo"))
2197
    os.rmdir(PathJoin(self.dir, "bar"))
2198
    os.rmdir(self.dir)
2199
    os.umask(self.old_umask)
2200

    
2201

    
2202
class TestFormatSeconds(unittest.TestCase):
2203
  def test(self):
2204
    self.assertEqual(utils.FormatSeconds(1), "1s")
2205
    self.assertEqual(utils.FormatSeconds(3600), "1h 0m 0s")
2206
    self.assertEqual(utils.FormatSeconds(3599), "59m 59s")
2207
    self.assertEqual(utils.FormatSeconds(7200), "2h 0m 0s")
2208
    self.assertEqual(utils.FormatSeconds(7201), "2h 0m 1s")
2209
    self.assertEqual(utils.FormatSeconds(7281), "2h 1m 21s")
2210
    self.assertEqual(utils.FormatSeconds(29119), "8h 5m 19s")
2211
    self.assertEqual(utils.FormatSeconds(19431228), "224d 21h 33m 48s")
2212
    self.assertEqual(utils.FormatSeconds(-1), "-1s")
2213
    self.assertEqual(utils.FormatSeconds(-282), "-282s")
2214
    self.assertEqual(utils.FormatSeconds(-29119), "-29119s")
2215

    
2216
  def testFloat(self):
2217
    self.assertEqual(utils.FormatSeconds(1.3), "1s")
2218
    self.assertEqual(utils.FormatSeconds(1.9), "2s")
2219
    self.assertEqual(utils.FormatSeconds(3912.12311), "1h 5m 12s")
2220
    self.assertEqual(utils.FormatSeconds(3912.8), "1h 5m 13s")
2221

    
2222

    
2223
class TestIgnoreProcessNotFound(unittest.TestCase):
2224
  @staticmethod
2225
  def _WritePid(fd):
2226
    os.write(fd, str(os.getpid()))
2227
    os.close(fd)
2228
    return True
2229

    
2230
  def test(self):
2231
    (pid_read_fd, pid_write_fd) = os.pipe()
2232

    
2233
    # Start short-lived process which writes its PID to pipe
2234
    self.assert_(utils.RunInSeparateProcess(self._WritePid, pid_write_fd))
2235
    os.close(pid_write_fd)
2236

    
2237
    # Read PID from pipe
2238
    pid = int(os.read(pid_read_fd, 1024))
2239
    os.close(pid_read_fd)
2240

    
2241
    # Try to send signal to process which exited recently
2242
    self.assertFalse(utils.IgnoreProcessNotFound(os.kill, pid, 0))
2243

    
2244

    
2245
if __name__ == '__main__':
2246
  testutils.GanetiTestProgram()