Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ 59525e1f

History | View | Annotate | Download (78.1 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import distutils.version
25
import errno
26
import fcntl
27
import glob
28
import os
29
import os.path
30
import re
31
import shutil
32
import signal
33
import socket
34
import stat
35
import string
36
import tempfile
37
import time
38
import unittest
39
import warnings
40
import OpenSSL
41
from cStringIO import StringIO
42

    
43
import testutils
44
from ganeti import constants
45
from ganeti import compat
46
from ganeti import utils
47
from ganeti import errors
48
from ganeti.utils import RunCmd, RemoveFile, MatchNameComponent, FormatUnit, \
49
     ParseUnit, ShellQuote, ShellQuoteArgs, ListVisibleFiles, FirstFree, \
50
     TailFile, SafeEncode, FormatTime, UnescapeAndSplit, RunParts, PathJoin, \
51
     ReadOneLineFile, SetEtcHostsEntry, RemoveEtcHostsEntry
52

    
53

    
54
class TestIsProcessAlive(unittest.TestCase):
55
  """Testing case for IsProcessAlive"""
56

    
57
  def testExists(self):
58
    mypid = os.getpid()
59
    self.assert_(utils.IsProcessAlive(mypid), "can't find myself running")
60

    
61
  def testNotExisting(self):
62
    pid_non_existing = os.fork()
63
    if pid_non_existing == 0:
64
      os._exit(0)
65
    elif pid_non_existing < 0:
66
      raise SystemError("can't fork")
67
    os.waitpid(pid_non_existing, 0)
68
    self.assertFalse(utils.IsProcessAlive(pid_non_existing),
69
                     "nonexisting process detected")
70

    
71

    
72
class TestGetProcStatusPath(unittest.TestCase):
73
  def test(self):
74
    self.assert_("/1234/" in utils._GetProcStatusPath(1234))
75
    self.assertNotEqual(utils._GetProcStatusPath(1),
76
                        utils._GetProcStatusPath(2))
77

    
78

    
79
class TestIsProcessHandlingSignal(unittest.TestCase):
80
  def setUp(self):
81
    self.tmpdir = tempfile.mkdtemp()
82

    
83
  def tearDown(self):
84
    shutil.rmtree(self.tmpdir)
85

    
86
  def testParseSigsetT(self):
87
    self.assertEqual(len(utils._ParseSigsetT("0")), 0)
88
    self.assertEqual(utils._ParseSigsetT("1"), set([1]))
89
    self.assertEqual(utils._ParseSigsetT("1000a"), set([2, 4, 17]))
90
    self.assertEqual(utils._ParseSigsetT("810002"), set([2, 17, 24, ]))
91
    self.assertEqual(utils._ParseSigsetT("0000000180000202"),
92
                     set([2, 10, 32, 33]))
93
    self.assertEqual(utils._ParseSigsetT("0000000180000002"),
94
                     set([2, 32, 33]))
95
    self.assertEqual(utils._ParseSigsetT("0000000188000002"),
96
                     set([2, 28, 32, 33]))
97
    self.assertEqual(utils._ParseSigsetT("000000004b813efb"),
98
                     set([1, 2, 4, 5, 6, 7, 8, 10, 11, 12, 13, 14, 17,
99
                          24, 25, 26, 28, 31]))
100
    self.assertEqual(utils._ParseSigsetT("ffffff"), set(range(1, 25)))
101

    
102
  def testGetProcStatusField(self):
103
    for field in ["SigCgt", "Name", "FDSize"]:
104
      for value in ["", "0", "cat", "  1234 KB"]:
105
        pstatus = "\n".join([
106
          "VmPeak: 999 kB",
107
          "%s: %s" % (field, value),
108
          "TracerPid: 0",
109
          ])
110
        result = utils._GetProcStatusField(pstatus, field)
111
        self.assertEqual(result, value.strip())
112

    
113
  def test(self):
114
    sp = PathJoin(self.tmpdir, "status")
115

    
116
    utils.WriteFile(sp, data="\n".join([
117
      "Name:   bash",
118
      "State:  S (sleeping)",
119
      "SleepAVG:       98%",
120
      "Pid:    22250",
121
      "PPid:   10858",
122
      "TracerPid:      0",
123
      "SigBlk: 0000000000010000",
124
      "SigIgn: 0000000000384004",
125
      "SigCgt: 000000004b813efb",
126
      "CapEff: 0000000000000000",
127
      ]))
128

    
129
    self.assert_(utils.IsProcessHandlingSignal(1234, 10, status_path=sp))
130

    
131
  def testNoSigCgt(self):
132
    sp = PathJoin(self.tmpdir, "status")
133

    
134
    utils.WriteFile(sp, data="\n".join([
135
      "Name:   bash",
136
      ]))
137

    
138
    self.assertRaises(RuntimeError, utils.IsProcessHandlingSignal,
139
                      1234, 10, status_path=sp)
140

    
141
  def testNoSuchFile(self):
142
    sp = PathJoin(self.tmpdir, "notexist")
143

    
144
    self.assertFalse(utils.IsProcessHandlingSignal(1234, 10, status_path=sp))
145

    
146
  @staticmethod
147
  def _TestRealProcess():
148
    signal.signal(signal.SIGUSR1, signal.SIG_DFL)
149
    if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
150
      raise Exception("SIGUSR1 is handled when it should not be")
151

    
152
    signal.signal(signal.SIGUSR1, lambda signum, frame: None)
153
    if not utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
154
      raise Exception("SIGUSR1 is not handled when it should be")
155

    
156
    signal.signal(signal.SIGUSR1, signal.SIG_IGN)
157
    if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
158
      raise Exception("SIGUSR1 is not handled when it should be")
159

    
160
    signal.signal(signal.SIGUSR1, signal.SIG_DFL)
161
    if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
162
      raise Exception("SIGUSR1 is handled when it should not be")
163

    
164
    return True
165

    
166
  def testRealProcess(self):
167
    self.assert_(utils.RunInSeparateProcess(self._TestRealProcess))
168

    
169

    
170
class TestPidFileFunctions(unittest.TestCase):
171
  """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
172

    
173
  def setUp(self):
174
    self.dir = tempfile.mkdtemp()
175
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
176
    utils.DaemonPidFileName = self.f_dpn
177

    
178
  def testPidFileFunctions(self):
179
    pid_file = self.f_dpn('test')
180
    utils.WritePidFile('test')
181
    self.failUnless(os.path.exists(pid_file),
182
                    "PID file should have been created")
183
    read_pid = utils.ReadPidFile(pid_file)
184
    self.failUnlessEqual(read_pid, os.getpid())
185
    self.failUnless(utils.IsProcessAlive(read_pid))
186
    self.failUnlessRaises(errors.GenericError, utils.WritePidFile, 'test')
187
    utils.RemovePidFile('test')
188
    self.failIf(os.path.exists(pid_file),
189
                "PID file should not exist anymore")
190
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
191
                         "ReadPidFile should return 0 for missing pid file")
192
    fh = open(pid_file, "w")
193
    fh.write("blah\n")
194
    fh.close()
195
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
196
                         "ReadPidFile should return 0 for invalid pid file")
197
    utils.RemovePidFile('test')
198
    self.failIf(os.path.exists(pid_file),
199
                "PID file should not exist anymore")
200

    
201
  def testKill(self):
202
    pid_file = self.f_dpn('child')
203
    r_fd, w_fd = os.pipe()
204
    new_pid = os.fork()
205
    if new_pid == 0: #child
206
      utils.WritePidFile('child')
207
      os.write(w_fd, 'a')
208
      signal.pause()
209
      os._exit(0)
210
      return
211
    # else we are in the parent
212
    # wait until the child has written the pid file
213
    os.read(r_fd, 1)
214
    read_pid = utils.ReadPidFile(pid_file)
215
    self.failUnlessEqual(read_pid, new_pid)
216
    self.failUnless(utils.IsProcessAlive(new_pid))
217
    utils.KillProcess(new_pid, waitpid=True)
218
    self.failIf(utils.IsProcessAlive(new_pid))
219
    utils.RemovePidFile('child')
220
    self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0)
221

    
222
  def tearDown(self):
223
    for name in os.listdir(self.dir):
224
      os.unlink(os.path.join(self.dir, name))
225
    os.rmdir(self.dir)
226

    
227

    
228
class TestRunCmd(testutils.GanetiTestCase):
229
  """Testing case for the RunCmd function"""
230

    
231
  def setUp(self):
232
    testutils.GanetiTestCase.setUp(self)
233
    self.magic = time.ctime() + " ganeti test"
234
    self.fname = self._CreateTempFile()
235

    
236
  def testOk(self):
237
    """Test successful exit code"""
238
    result = RunCmd("/bin/sh -c 'exit 0'")
239
    self.assertEqual(result.exit_code, 0)
240
    self.assertEqual(result.output, "")
241

    
242
  def testFail(self):
243
    """Test fail exit code"""
244
    result = RunCmd("/bin/sh -c 'exit 1'")
245
    self.assertEqual(result.exit_code, 1)
246
    self.assertEqual(result.output, "")
247

    
248
  def testStdout(self):
249
    """Test standard output"""
250
    cmd = 'echo -n "%s"' % self.magic
251
    result = RunCmd("/bin/sh -c '%s'" % cmd)
252
    self.assertEqual(result.stdout, self.magic)
253
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
254
    self.assertEqual(result.output, "")
255
    self.assertFileContent(self.fname, self.magic)
256

    
257
  def testStderr(self):
258
    """Test standard error"""
259
    cmd = 'echo -n "%s"' % self.magic
260
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
261
    self.assertEqual(result.stderr, self.magic)
262
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
263
    self.assertEqual(result.output, "")
264
    self.assertFileContent(self.fname, self.magic)
265

    
266
  def testCombined(self):
267
    """Test combined output"""
268
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
269
    expected = "A" + self.magic + "B" + self.magic
270
    result = RunCmd("/bin/sh -c '%s'" % cmd)
271
    self.assertEqual(result.output, expected)
272
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
273
    self.assertEqual(result.output, "")
274
    self.assertFileContent(self.fname, expected)
275

    
276
  def testSignal(self):
277
    """Test signal"""
278
    result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
279
    self.assertEqual(result.signal, 15)
280
    self.assertEqual(result.output, "")
281

    
282
  def testListRun(self):
283
    """Test list runs"""
284
    result = RunCmd(["true"])
285
    self.assertEqual(result.signal, None)
286
    self.assertEqual(result.exit_code, 0)
287
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
288
    self.assertEqual(result.signal, None)
289
    self.assertEqual(result.exit_code, 1)
290
    result = RunCmd(["echo", "-n", self.magic])
291
    self.assertEqual(result.signal, None)
292
    self.assertEqual(result.exit_code, 0)
293
    self.assertEqual(result.stdout, self.magic)
294

    
295
  def testFileEmptyOutput(self):
296
    """Test file output"""
297
    result = RunCmd(["true"], output=self.fname)
298
    self.assertEqual(result.signal, None)
299
    self.assertEqual(result.exit_code, 0)
300
    self.assertFileContent(self.fname, "")
301

    
302
  def testLang(self):
303
    """Test locale environment"""
304
    old_env = os.environ.copy()
305
    try:
306
      os.environ["LANG"] = "en_US.UTF-8"
307
      os.environ["LC_ALL"] = "en_US.UTF-8"
308
      result = RunCmd(["locale"])
309
      for line in result.output.splitlines():
310
        key, value = line.split("=", 1)
311
        # Ignore these variables, they're overridden by LC_ALL
312
        if key == "LANG" or key == "LANGUAGE":
313
          continue
314
        self.failIf(value and value != "C" and value != '"C"',
315
            "Variable %s is set to the invalid value '%s'" % (key, value))
316
    finally:
317
      os.environ = old_env
318

    
319
  def testDefaultCwd(self):
320
    """Test default working directory"""
321
    self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
322

    
323
  def testCwd(self):
324
    """Test default working directory"""
325
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
326
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
327
    cwd = os.getcwd()
328
    self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
329

    
330
  def testResetEnv(self):
331
    """Test environment reset functionality"""
332
    self.failUnlessEqual(RunCmd(["env"], reset_env=True).stdout.strip(), "")
333
    self.failUnlessEqual(RunCmd(["env"], reset_env=True,
334
                                env={"FOO": "bar",}).stdout.strip(), "FOO=bar")
335

    
336

    
337
class TestRunParts(unittest.TestCase):
338
  """Testing case for the RunParts function"""
339

    
340
  def setUp(self):
341
    self.rundir = tempfile.mkdtemp(prefix="ganeti-test", suffix=".tmp")
342

    
343
  def tearDown(self):
344
    shutil.rmtree(self.rundir)
345

    
346
  def testEmpty(self):
347
    """Test on an empty dir"""
348
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True), [])
349

    
350
  def testSkipWrongName(self):
351
    """Test that wrong files are skipped"""
352
    fname = os.path.join(self.rundir, "00test.dot")
353
    utils.WriteFile(fname, data="")
354
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
355
    relname = os.path.basename(fname)
356
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
357
                         [(relname, constants.RUNPARTS_SKIP, None)])
358

    
359
  def testSkipNonExec(self):
360
    """Test that non executable files are skipped"""
361
    fname = os.path.join(self.rundir, "00test")
362
    utils.WriteFile(fname, data="")
363
    relname = os.path.basename(fname)
364
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
365
                         [(relname, constants.RUNPARTS_SKIP, None)])
366

    
367
  def testError(self):
368
    """Test error on a broken executable"""
369
    fname = os.path.join(self.rundir, "00test")
370
    utils.WriteFile(fname, data="")
371
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
372
    (relname, status, error) = RunParts(self.rundir, reset_env=True)[0]
373
    self.failUnlessEqual(relname, os.path.basename(fname))
374
    self.failUnlessEqual(status, constants.RUNPARTS_ERR)
375
    self.failUnless(error)
376

    
377
  def testSorted(self):
378
    """Test executions are sorted"""
379
    files = []
380
    files.append(os.path.join(self.rundir, "64test"))
381
    files.append(os.path.join(self.rundir, "00test"))
382
    files.append(os.path.join(self.rundir, "42test"))
383

    
384
    for fname in files:
385
      utils.WriteFile(fname, data="")
386

    
387
    results = RunParts(self.rundir, reset_env=True)
388

    
389
    for fname in sorted(files):
390
      self.failUnlessEqual(os.path.basename(fname), results.pop(0)[0])
391

    
392
  def testOk(self):
393
    """Test correct execution"""
394
    fname = os.path.join(self.rundir, "00test")
395
    utils.WriteFile(fname, data="#!/bin/sh\n\necho -n ciao")
396
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
397
    (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
398
    self.failUnlessEqual(relname, os.path.basename(fname))
399
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
400
    self.failUnlessEqual(runresult.stdout, "ciao")
401

    
402
  def testRunFail(self):
403
    """Test correct execution, with run failure"""
404
    fname = os.path.join(self.rundir, "00test")
405
    utils.WriteFile(fname, data="#!/bin/sh\n\nexit 1")
406
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
407
    (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
408
    self.failUnlessEqual(relname, os.path.basename(fname))
409
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
410
    self.failUnlessEqual(runresult.exit_code, 1)
411
    self.failUnless(runresult.failed)
412

    
413
  def testRunMix(self):
414
    files = []
415
    files.append(os.path.join(self.rundir, "00test"))
416
    files.append(os.path.join(self.rundir, "42test"))
417
    files.append(os.path.join(self.rundir, "64test"))
418
    files.append(os.path.join(self.rundir, "99test"))
419

    
420
    files.sort()
421

    
422
    # 1st has errors in execution
423
    utils.WriteFile(files[0], data="#!/bin/sh\n\nexit 1")
424
    os.chmod(files[0], stat.S_IREAD | stat.S_IEXEC)
425

    
426
    # 2nd is skipped
427
    utils.WriteFile(files[1], data="")
428

    
429
    # 3rd cannot execute properly
430
    utils.WriteFile(files[2], data="")
431
    os.chmod(files[2], stat.S_IREAD | stat.S_IEXEC)
432

    
433
    # 4th execs
434
    utils.WriteFile(files[3], data="#!/bin/sh\n\necho -n ciao")
435
    os.chmod(files[3], stat.S_IREAD | stat.S_IEXEC)
436

    
437
    results = RunParts(self.rundir, reset_env=True)
438

    
439
    (relname, status, runresult) = results[0]
440
    self.failUnlessEqual(relname, os.path.basename(files[0]))
441
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
442
    self.failUnlessEqual(runresult.exit_code, 1)
443
    self.failUnless(runresult.failed)
444

    
445
    (relname, status, runresult) = results[1]
446
    self.failUnlessEqual(relname, os.path.basename(files[1]))
447
    self.failUnlessEqual(status, constants.RUNPARTS_SKIP)
448
    self.failUnlessEqual(runresult, None)
449

    
450
    (relname, status, runresult) = results[2]
451
    self.failUnlessEqual(relname, os.path.basename(files[2]))
452
    self.failUnlessEqual(status, constants.RUNPARTS_ERR)
453
    self.failUnless(runresult)
454

    
455
    (relname, status, runresult) = results[3]
456
    self.failUnlessEqual(relname, os.path.basename(files[3]))
457
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
458
    self.failUnlessEqual(runresult.output, "ciao")
459
    self.failUnlessEqual(runresult.exit_code, 0)
460
    self.failUnless(not runresult.failed)
461

    
462

    
463
class TestStartDaemon(testutils.GanetiTestCase):
464
  def setUp(self):
465
    self.tmpdir = tempfile.mkdtemp(prefix="ganeti-test")
466
    self.tmpfile = os.path.join(self.tmpdir, "test")
467

    
468
  def tearDown(self):
469
    shutil.rmtree(self.tmpdir)
470

    
471
  def testShell(self):
472
    utils.StartDaemon("echo Hello World > %s" % self.tmpfile)
473
    self._wait(self.tmpfile, 60.0, "Hello World")
474

    
475
  def testShellOutput(self):
476
    utils.StartDaemon("echo Hello World", output=self.tmpfile)
477
    self._wait(self.tmpfile, 60.0, "Hello World")
478

    
479
  def testNoShellNoOutput(self):
480
    utils.StartDaemon(["pwd"])
481

    
482
  def testNoShellNoOutputTouch(self):
483
    testfile = os.path.join(self.tmpdir, "check")
484
    self.failIf(os.path.exists(testfile))
485
    utils.StartDaemon(["touch", testfile])
486
    self._wait(testfile, 60.0, "")
487

    
488
  def testNoShellOutput(self):
489
    utils.StartDaemon(["pwd"], output=self.tmpfile)
490
    self._wait(self.tmpfile, 60.0, "/")
491

    
492
  def testNoShellOutputCwd(self):
493
    utils.StartDaemon(["pwd"], output=self.tmpfile, cwd=os.getcwd())
494
    self._wait(self.tmpfile, 60.0, os.getcwd())
495

    
496
  def testShellEnv(self):
497
    utils.StartDaemon("echo \"$GNT_TEST_VAR\"", output=self.tmpfile,
498
                      env={ "GNT_TEST_VAR": "Hello World", })
499
    self._wait(self.tmpfile, 60.0, "Hello World")
500

    
501
  def testNoShellEnv(self):
502
    utils.StartDaemon(["printenv", "GNT_TEST_VAR"], output=self.tmpfile,
503
                      env={ "GNT_TEST_VAR": "Hello World", })
504
    self._wait(self.tmpfile, 60.0, "Hello World")
505

    
506
  def testOutputFd(self):
507
    fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
508
    try:
509
      utils.StartDaemon(["pwd"], output_fd=fd, cwd=os.getcwd())
510
    finally:
511
      os.close(fd)
512
    self._wait(self.tmpfile, 60.0, os.getcwd())
513

    
514
  def testPid(self):
515
    pid = utils.StartDaemon("echo $$ > %s" % self.tmpfile)
516
    self._wait(self.tmpfile, 60.0, str(pid))
517

    
518
  def testPidFile(self):
519
    pidfile = os.path.join(self.tmpdir, "pid")
520
    checkfile = os.path.join(self.tmpdir, "abort")
521

    
522
    pid = utils.StartDaemon("while sleep 5; do :; done", pidfile=pidfile,
523
                            output=self.tmpfile)
524
    try:
525
      fd = os.open(pidfile, os.O_RDONLY)
526
      try:
527
        # Check file is locked
528
        self.assertRaises(errors.LockError, utils.LockFile, fd)
529

    
530
        pidtext = os.read(fd, 100)
531
      finally:
532
        os.close(fd)
533

    
534
      self.assertEqual(int(pidtext.strip()), pid)
535

    
536
      self.assert_(utils.IsProcessAlive(pid))
537
    finally:
538
      # No matter what happens, kill daemon
539
      utils.KillProcess(pid, timeout=5.0, waitpid=False)
540
      self.failIf(utils.IsProcessAlive(pid))
541

    
542
    self.assertEqual(utils.ReadFile(self.tmpfile), "")
543

    
544
  def _wait(self, path, timeout, expected):
545
    # Due to the asynchronous nature of daemon processes, polling is necessary.
546
    # A timeout makes sure the test doesn't hang forever.
547
    def _CheckFile():
548
      if not (os.path.isfile(path) and
549
              utils.ReadFile(path).strip() == expected):
550
        raise utils.RetryAgain()
551

    
552
    try:
553
      utils.Retry(_CheckFile, (0.01, 1.5, 1.0), timeout)
554
    except utils.RetryTimeout:
555
      self.fail("Apparently the daemon didn't run in %s seconds and/or"
556
                " didn't write the correct output" % timeout)
557

    
558
  def testError(self):
559
    self.assertRaises(errors.OpExecError, utils.StartDaemon,
560
                      ["./does-NOT-EXIST/here/0123456789"])
561
    self.assertRaises(errors.OpExecError, utils.StartDaemon,
562
                      ["./does-NOT-EXIST/here/0123456789"],
563
                      output=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
564
    self.assertRaises(errors.OpExecError, utils.StartDaemon,
565
                      ["./does-NOT-EXIST/here/0123456789"],
566
                      cwd=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
567
    self.assertRaises(errors.OpExecError, utils.StartDaemon,
568
                      ["./does-NOT-EXIST/here/0123456789"],
569
                      output=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
570

    
571
    fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
572
    try:
573
      self.assertRaises(errors.ProgrammerError, utils.StartDaemon,
574
                        ["./does-NOT-EXIST/here/0123456789"],
575
                        output=self.tmpfile, output_fd=fd)
576
    finally:
577
      os.close(fd)
578

    
579

    
580
class TestSetCloseOnExecFlag(unittest.TestCase):
581
  """Tests for SetCloseOnExecFlag"""
582

    
583
  def setUp(self):
584
    self.tmpfile = tempfile.TemporaryFile()
585

    
586
  def testEnable(self):
587
    utils.SetCloseOnExecFlag(self.tmpfile.fileno(), True)
588
    self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
589
                    fcntl.FD_CLOEXEC)
590

    
591
  def testDisable(self):
592
    utils.SetCloseOnExecFlag(self.tmpfile.fileno(), False)
593
    self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
594
                fcntl.FD_CLOEXEC)
595

    
596

    
597
class TestSetNonblockFlag(unittest.TestCase):
598
  def setUp(self):
599
    self.tmpfile = tempfile.TemporaryFile()
600

    
601
  def testEnable(self):
602
    utils.SetNonblockFlag(self.tmpfile.fileno(), True)
603
    self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
604
                    os.O_NONBLOCK)
605

    
606
  def testDisable(self):
607
    utils.SetNonblockFlag(self.tmpfile.fileno(), False)
608
    self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
609
                os.O_NONBLOCK)
610

    
611

    
612
class TestRemoveFile(unittest.TestCase):
613
  """Test case for the RemoveFile function"""
614

    
615
  def setUp(self):
616
    """Create a temp dir and file for each case"""
617
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
618
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
619
    os.close(fd)
620

    
621
  def tearDown(self):
622
    if os.path.exists(self.tmpfile):
623
      os.unlink(self.tmpfile)
624
    os.rmdir(self.tmpdir)
625

    
626
  def testIgnoreDirs(self):
627
    """Test that RemoveFile() ignores directories"""
628
    self.assertEqual(None, RemoveFile(self.tmpdir))
629

    
630
  def testIgnoreNotExisting(self):
631
    """Test that RemoveFile() ignores non-existing files"""
632
    RemoveFile(self.tmpfile)
633
    RemoveFile(self.tmpfile)
634

    
635
  def testRemoveFile(self):
636
    """Test that RemoveFile does remove a file"""
637
    RemoveFile(self.tmpfile)
638
    if os.path.exists(self.tmpfile):
639
      self.fail("File '%s' not removed" % self.tmpfile)
640

    
641
  def testRemoveSymlink(self):
642
    """Test that RemoveFile does remove symlinks"""
643
    symlink = self.tmpdir + "/symlink"
644
    os.symlink("no-such-file", symlink)
645
    RemoveFile(symlink)
646
    if os.path.exists(symlink):
647
      self.fail("File '%s' not removed" % symlink)
648
    os.symlink(self.tmpfile, symlink)
649
    RemoveFile(symlink)
650
    if os.path.exists(symlink):
651
      self.fail("File '%s' not removed" % symlink)
652

    
653

    
654
class TestRename(unittest.TestCase):
655
  """Test case for RenameFile"""
656

    
657
  def setUp(self):
658
    """Create a temporary directory"""
659
    self.tmpdir = tempfile.mkdtemp()
660
    self.tmpfile = os.path.join(self.tmpdir, "test1")
661

    
662
    # Touch the file
663
    open(self.tmpfile, "w").close()
664

    
665
  def tearDown(self):
666
    """Remove temporary directory"""
667
    shutil.rmtree(self.tmpdir)
668

    
669
  def testSimpleRename1(self):
670
    """Simple rename 1"""
671
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
672
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
673

    
674
  def testSimpleRename2(self):
675
    """Simple rename 2"""
676
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
677
                     mkdir=True)
678
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
679

    
680
  def testRenameMkdir(self):
681
    """Rename with mkdir"""
682
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
683
                     mkdir=True)
684
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
685
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
686

    
687
    utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"),
688
                     os.path.join(self.tmpdir, "test/foo/bar/baz"),
689
                     mkdir=True)
690
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
691
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar")))
692
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz")))
693

    
694

    
695
class TestMatchNameComponent(unittest.TestCase):
696
  """Test case for the MatchNameComponent function"""
697

    
698
  def testEmptyList(self):
699
    """Test that there is no match against an empty list"""
700

    
701
    self.failUnlessEqual(MatchNameComponent("", []), None)
702
    self.failUnlessEqual(MatchNameComponent("test", []), None)
703

    
704
  def testSingleMatch(self):
705
    """Test that a single match is performed correctly"""
706
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
707
    for key in "test2", "test2.example", "test2.example.com":
708
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
709

    
710
  def testMultipleMatches(self):
711
    """Test that a multiple match is returned as None"""
712
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
713
    for key in "test1", "test1.example":
714
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
715

    
716
  def testFullMatch(self):
717
    """Test that a full match is returned correctly"""
718
    key1 = "test1"
719
    key2 = "test1.example"
720
    mlist = [key2, key2 + ".com"]
721
    self.failUnlessEqual(MatchNameComponent(key1, mlist), None)
722
    self.failUnlessEqual(MatchNameComponent(key2, mlist), key2)
723

    
724
  def testCaseInsensitivePartialMatch(self):
725
    """Test for the case_insensitive keyword"""
726
    mlist = ["test1.example.com", "test2.example.net"]
727
    self.assertEqual(MatchNameComponent("test2", mlist, case_sensitive=False),
728
                     "test2.example.net")
729
    self.assertEqual(MatchNameComponent("Test2", mlist, case_sensitive=False),
730
                     "test2.example.net")
731
    self.assertEqual(MatchNameComponent("teSt2", mlist, case_sensitive=False),
732
                     "test2.example.net")
733
    self.assertEqual(MatchNameComponent("TeSt2", mlist, case_sensitive=False),
734
                     "test2.example.net")
735

    
736

    
737
  def testCaseInsensitiveFullMatch(self):
738
    mlist = ["ts1.ex", "ts1.ex.org", "ts2.ex", "Ts2.ex"]
739
    # Between the two ts1 a full string match non-case insensitive should work
740
    self.assertEqual(MatchNameComponent("Ts1", mlist, case_sensitive=False),
741
                     None)
742
    self.assertEqual(MatchNameComponent("Ts1.ex", mlist, case_sensitive=False),
743
                     "ts1.ex")
744
    self.assertEqual(MatchNameComponent("ts1.ex", mlist, case_sensitive=False),
745
                     "ts1.ex")
746
    # Between the two ts2 only case differs, so only case-match works
747
    self.assertEqual(MatchNameComponent("ts2.ex", mlist, case_sensitive=False),
748
                     "ts2.ex")
749
    self.assertEqual(MatchNameComponent("Ts2.ex", mlist, case_sensitive=False),
750
                     "Ts2.ex")
751
    self.assertEqual(MatchNameComponent("TS2.ex", mlist, case_sensitive=False),
752
                     None)
753

    
754

    
755
class TestReadFile(testutils.GanetiTestCase):
756

    
757
  def testReadAll(self):
758
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"))
759
    self.assertEqual(len(data), 814)
760

    
761
    h = compat.md5_hash()
762
    h.update(data)
763
    self.assertEqual(h.hexdigest(), "a491efb3efe56a0535f924d5f8680fd4")
764

    
765
  def testReadSize(self):
766
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"),
767
                          size=100)
768
    self.assertEqual(len(data), 100)
769

    
770
    h = compat.md5_hash()
771
    h.update(data)
772
    self.assertEqual(h.hexdigest(), "893772354e4e690b9efd073eed433ce7")
773

    
774
  def testError(self):
775
    self.assertRaises(EnvironmentError, utils.ReadFile,
776
                      "/dev/null/does-not-exist")
777

    
778

    
779
class TestReadOneLineFile(testutils.GanetiTestCase):
780

    
781
  def setUp(self):
782
    testutils.GanetiTestCase.setUp(self)
783

    
784
  def testDefault(self):
785
    data = ReadOneLineFile(self._TestDataFilename("cert1.pem"))
786
    self.assertEqual(len(data), 27)
787
    self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
788

    
789
  def testNotStrict(self):
790
    data = ReadOneLineFile(self._TestDataFilename("cert1.pem"), strict=False)
791
    self.assertEqual(len(data), 27)
792
    self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
793

    
794
  def testStrictFailure(self):
795
    self.assertRaises(errors.GenericError, ReadOneLineFile,
796
                      self._TestDataFilename("cert1.pem"), strict=True)
797

    
798
  def testLongLine(self):
799
    dummydata = (1024 * "Hello World! ")
800
    myfile = self._CreateTempFile()
801
    utils.WriteFile(myfile, data=dummydata)
802
    datastrict = ReadOneLineFile(myfile, strict=True)
803
    datalax = ReadOneLineFile(myfile, strict=False)
804
    self.assertEqual(dummydata, datastrict)
805
    self.assertEqual(dummydata, datalax)
806

    
807
  def testNewline(self):
808
    myfile = self._CreateTempFile()
809
    myline = "myline"
810
    for nl in ["", "\n", "\r\n"]:
811
      dummydata = "%s%s" % (myline, nl)
812
      utils.WriteFile(myfile, data=dummydata)
813
      datalax = ReadOneLineFile(myfile, strict=False)
814
      self.assertEqual(myline, datalax)
815
      datastrict = ReadOneLineFile(myfile, strict=True)
816
      self.assertEqual(myline, datastrict)
817

    
818
  def testWhitespaceAndMultipleLines(self):
819
    myfile = self._CreateTempFile()
820
    for nl in ["", "\n", "\r\n"]:
821
      for ws in [" ", "\t", "\t\t  \t", "\t "]:
822
        dummydata = (1024 * ("Foo bar baz %s%s" % (ws, nl)))
823
        utils.WriteFile(myfile, data=dummydata)
824
        datalax = ReadOneLineFile(myfile, strict=False)
825
        if nl:
826
          self.assert_(set("\r\n") & set(dummydata))
827
          self.assertRaises(errors.GenericError, ReadOneLineFile,
828
                            myfile, strict=True)
829
          explen = len("Foo bar baz ") + len(ws)
830
          self.assertEqual(len(datalax), explen)
831
          self.assertEqual(datalax, dummydata[:explen])
832
          self.assertFalse(set("\r\n") & set(datalax))
833
        else:
834
          datastrict = ReadOneLineFile(myfile, strict=True)
835
          self.assertEqual(dummydata, datastrict)
836
          self.assertEqual(dummydata, datalax)
837

    
838
  def testEmptylines(self):
839
    myfile = self._CreateTempFile()
840
    myline = "myline"
841
    for nl in ["\n", "\r\n"]:
842
      for ol in ["", "otherline"]:
843
        dummydata = "%s%s%s%s%s%s" % (nl, nl, myline, nl, ol, nl)
844
        utils.WriteFile(myfile, data=dummydata)
845
        self.assert_(set("\r\n") & set(dummydata))
846
        datalax = ReadOneLineFile(myfile, strict=False)
847
        self.assertEqual(myline, datalax)
848
        if ol:
849
          self.assertRaises(errors.GenericError, ReadOneLineFile,
850
                            myfile, strict=True)
851
        else:
852
          datastrict = ReadOneLineFile(myfile, strict=True)
853
          self.assertEqual(myline, datastrict)
854

    
855

    
856
class TestTimestampForFilename(unittest.TestCase):
857
  def test(self):
858
    self.assert_("." not in utils.TimestampForFilename())
859
    self.assert_(":" not in utils.TimestampForFilename())
860

    
861

    
862
class TestCreateBackup(testutils.GanetiTestCase):
863
  def setUp(self):
864
    testutils.GanetiTestCase.setUp(self)
865

    
866
    self.tmpdir = tempfile.mkdtemp()
867

    
868
  def tearDown(self):
869
    testutils.GanetiTestCase.tearDown(self)
870

    
871
    shutil.rmtree(self.tmpdir)
872

    
873
  def testEmpty(self):
874
    filename = PathJoin(self.tmpdir, "config.data")
875
    utils.WriteFile(filename, data="")
876
    bname = utils.CreateBackup(filename)
877
    self.assertFileContent(bname, "")
878
    self.assertEqual(len(glob.glob("%s*" % filename)), 2)
879
    utils.CreateBackup(filename)
880
    self.assertEqual(len(glob.glob("%s*" % filename)), 3)
881
    utils.CreateBackup(filename)
882
    self.assertEqual(len(glob.glob("%s*" % filename)), 4)
883

    
884
    fifoname = PathJoin(self.tmpdir, "fifo")
885
    os.mkfifo(fifoname)
886
    self.assertRaises(errors.ProgrammerError, utils.CreateBackup, fifoname)
887

    
888
  def testContent(self):
889
    bkpcount = 0
890
    for data in ["", "X", "Hello World!\n" * 100, "Binary data\0\x01\x02\n"]:
891
      for rep in [1, 2, 10, 127]:
892
        testdata = data * rep
893

    
894
        filename = PathJoin(self.tmpdir, "test.data_")
895
        utils.WriteFile(filename, data=testdata)
896
        self.assertFileContent(filename, testdata)
897

    
898
        for _ in range(3):
899
          bname = utils.CreateBackup(filename)
900
          bkpcount += 1
901
          self.assertFileContent(bname, testdata)
902
          self.assertEqual(len(glob.glob("%s*" % filename)), 1 + bkpcount)
903

    
904

    
905
class TestFormatUnit(unittest.TestCase):
906
  """Test case for the FormatUnit function"""
907

    
908
  def testMiB(self):
909
    self.assertEqual(FormatUnit(1, 'h'), '1M')
910
    self.assertEqual(FormatUnit(100, 'h'), '100M')
911
    self.assertEqual(FormatUnit(1023, 'h'), '1023M')
912

    
913
    self.assertEqual(FormatUnit(1, 'm'), '1')
914
    self.assertEqual(FormatUnit(100, 'm'), '100')
915
    self.assertEqual(FormatUnit(1023, 'm'), '1023')
916

    
917
    self.assertEqual(FormatUnit(1024, 'm'), '1024')
918
    self.assertEqual(FormatUnit(1536, 'm'), '1536')
919
    self.assertEqual(FormatUnit(17133, 'm'), '17133')
920
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
921

    
922
  def testGiB(self):
923
    self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
924
    self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
925
    self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
926
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
927

    
928
    self.assertEqual(FormatUnit(1024, 'g'), '1.0')
929
    self.assertEqual(FormatUnit(1536, 'g'), '1.5')
930
    self.assertEqual(FormatUnit(17133, 'g'), '16.7')
931
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
932

    
933
    self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
934
    self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
935
    self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
936

    
937
  def testTiB(self):
938
    self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
939
    self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
940
    self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
941

    
942
    self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
943
    self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
944
    self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
945

    
946

    
947
class TestParseUnit(unittest.TestCase):
948
  """Test case for the ParseUnit function"""
949

    
950
  SCALES = (('', 1),
951
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
952
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
953
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
954

    
955
  def testRounding(self):
956
    self.assertEqual(ParseUnit('0'), 0)
957
    self.assertEqual(ParseUnit('1'), 4)
958
    self.assertEqual(ParseUnit('2'), 4)
959
    self.assertEqual(ParseUnit('3'), 4)
960

    
961
    self.assertEqual(ParseUnit('124'), 124)
962
    self.assertEqual(ParseUnit('125'), 128)
963
    self.assertEqual(ParseUnit('126'), 128)
964
    self.assertEqual(ParseUnit('127'), 128)
965
    self.assertEqual(ParseUnit('128'), 128)
966
    self.assertEqual(ParseUnit('129'), 132)
967
    self.assertEqual(ParseUnit('130'), 132)
968

    
969
  def testFloating(self):
970
    self.assertEqual(ParseUnit('0'), 0)
971
    self.assertEqual(ParseUnit('0.5'), 4)
972
    self.assertEqual(ParseUnit('1.75'), 4)
973
    self.assertEqual(ParseUnit('1.99'), 4)
974
    self.assertEqual(ParseUnit('2.00'), 4)
975
    self.assertEqual(ParseUnit('2.01'), 4)
976
    self.assertEqual(ParseUnit('3.99'), 4)
977
    self.assertEqual(ParseUnit('4.00'), 4)
978
    self.assertEqual(ParseUnit('4.01'), 8)
979
    self.assertEqual(ParseUnit('1.5G'), 1536)
980
    self.assertEqual(ParseUnit('1.8G'), 1844)
981
    self.assertEqual(ParseUnit('8.28T'), 8682212)
982

    
983
  def testSuffixes(self):
984
    for sep in ('', ' ', '   ', "\t", "\t "):
985
      for suffix, scale in TestParseUnit.SCALES:
986
        for func in (lambda x: x, str.lower, str.upper):
987
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
988
                           1024 * scale)
989

    
990
  def testInvalidInput(self):
991
    for sep in ('-', '_', ',', 'a'):
992
      for suffix, _ in TestParseUnit.SCALES:
993
        self.assertRaises(errors.UnitParseError, ParseUnit, '1' + sep + suffix)
994

    
995
    for suffix, _ in TestParseUnit.SCALES:
996
      self.assertRaises(errors.UnitParseError, ParseUnit, '1,3' + suffix)
997

    
998

    
999
class TestParseCpuMask(unittest.TestCase):
1000
  """Test case for the ParseCpuMask function."""
1001

    
1002
  def testWellFormed(self):
1003
    self.assertEqual(utils.ParseCpuMask(""), [])
1004
    self.assertEqual(utils.ParseCpuMask("1"), [1])
1005
    self.assertEqual(utils.ParseCpuMask("0-2,4,5-5"), [0,1,2,4,5])
1006

    
1007
  def testInvalidInput(self):
1008
    self.assertRaises(errors.ParseError,
1009
                      utils.ParseCpuMask,
1010
                      "garbage")
1011
    self.assertRaises(errors.ParseError,
1012
                      utils.ParseCpuMask,
1013
                      "0,")
1014
    self.assertRaises(errors.ParseError,
1015
                      utils.ParseCpuMask,
1016
                      "0-1-2")
1017
    self.assertRaises(errors.ParseError,
1018
                      utils.ParseCpuMask,
1019
                      "2-1")
1020

    
1021
class TestSshKeys(testutils.GanetiTestCase):
1022
  """Test case for the AddAuthorizedKey function"""
1023

    
1024
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
1025
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="198.51.100.4" '
1026
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
1027

    
1028
  def setUp(self):
1029
    testutils.GanetiTestCase.setUp(self)
1030
    self.tmpname = self._CreateTempFile()
1031
    handle = open(self.tmpname, 'w')
1032
    try:
1033
      handle.write("%s\n" % TestSshKeys.KEY_A)
1034
      handle.write("%s\n" % TestSshKeys.KEY_B)
1035
    finally:
1036
      handle.close()
1037

    
1038
  def testAddingNewKey(self):
1039
    utils.AddAuthorizedKey(self.tmpname,
1040
                           'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
1041

    
1042
    self.assertFileContent(self.tmpname,
1043
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
1044
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
1045
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
1046
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
1047

    
1048
  def testAddingAlmostButNotCompletelyTheSameKey(self):
1049
    utils.AddAuthorizedKey(self.tmpname,
1050
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
1051

    
1052
    self.assertFileContent(self.tmpname,
1053
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
1054
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
1055
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
1056
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
1057

    
1058
  def testAddingExistingKeyWithSomeMoreSpaces(self):
1059
    utils.AddAuthorizedKey(self.tmpname,
1060
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
1061

    
1062
    self.assertFileContent(self.tmpname,
1063
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
1064
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
1065
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
1066

    
1067
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
1068
    utils.RemoveAuthorizedKey(self.tmpname,
1069
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
1070

    
1071
    self.assertFileContent(self.tmpname,
1072
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
1073
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
1074

    
1075
  def testRemovingNonExistingKey(self):
1076
    utils.RemoveAuthorizedKey(self.tmpname,
1077
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
1078

    
1079
    self.assertFileContent(self.tmpname,
1080
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
1081
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
1082
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
1083

    
1084

    
1085
class TestEtcHosts(testutils.GanetiTestCase):
1086
  """Test functions modifying /etc/hosts"""
1087

    
1088
  def setUp(self):
1089
    testutils.GanetiTestCase.setUp(self)
1090
    self.tmpname = self._CreateTempFile()
1091
    handle = open(self.tmpname, 'w')
1092
    try:
1093
      handle.write('# This is a test file for /etc/hosts\n')
1094
      handle.write('127.0.0.1\tlocalhost\n')
1095
      handle.write('192.0.2.1 router gw\n')
1096
    finally:
1097
      handle.close()
1098

    
1099
  def testSettingNewIp(self):
1100
    SetEtcHostsEntry(self.tmpname, '198.51.100.4', 'myhost.example.com',
1101
                     ['myhost'])
1102

    
1103
    self.assertFileContent(self.tmpname,
1104
      "# This is a test file for /etc/hosts\n"
1105
      "127.0.0.1\tlocalhost\n"
1106
      "192.0.2.1 router gw\n"
1107
      "198.51.100.4\tmyhost.example.com myhost\n")
1108
    self.assertFileMode(self.tmpname, 0644)
1109

    
1110
  def testSettingExistingIp(self):
1111
    SetEtcHostsEntry(self.tmpname, '192.0.2.1', 'myhost.example.com',
1112
                     ['myhost'])
1113

    
1114
    self.assertFileContent(self.tmpname,
1115
      "# This is a test file for /etc/hosts\n"
1116
      "127.0.0.1\tlocalhost\n"
1117
      "192.0.2.1\tmyhost.example.com myhost\n")
1118
    self.assertFileMode(self.tmpname, 0644)
1119

    
1120
  def testSettingDuplicateName(self):
1121
    SetEtcHostsEntry(self.tmpname, '198.51.100.4', 'myhost', ['myhost'])
1122

    
1123
    self.assertFileContent(self.tmpname,
1124
      "# This is a test file for /etc/hosts\n"
1125
      "127.0.0.1\tlocalhost\n"
1126
      "192.0.2.1 router gw\n"
1127
      "198.51.100.4\tmyhost\n")
1128
    self.assertFileMode(self.tmpname, 0644)
1129

    
1130
  def testRemovingExistingHost(self):
1131
    RemoveEtcHostsEntry(self.tmpname, 'router')
1132

    
1133
    self.assertFileContent(self.tmpname,
1134
      "# This is a test file for /etc/hosts\n"
1135
      "127.0.0.1\tlocalhost\n"
1136
      "192.0.2.1 gw\n")
1137
    self.assertFileMode(self.tmpname, 0644)
1138

    
1139
  def testRemovingSingleExistingHost(self):
1140
    RemoveEtcHostsEntry(self.tmpname, 'localhost')
1141

    
1142
    self.assertFileContent(self.tmpname,
1143
      "# This is a test file for /etc/hosts\n"
1144
      "192.0.2.1 router gw\n")
1145
    self.assertFileMode(self.tmpname, 0644)
1146

    
1147
  def testRemovingNonExistingHost(self):
1148
    RemoveEtcHostsEntry(self.tmpname, 'myhost')
1149

    
1150
    self.assertFileContent(self.tmpname,
1151
      "# This is a test file for /etc/hosts\n"
1152
      "127.0.0.1\tlocalhost\n"
1153
      "192.0.2.1 router gw\n")
1154
    self.assertFileMode(self.tmpname, 0644)
1155

    
1156
  def testRemovingAlias(self):
1157
    RemoveEtcHostsEntry(self.tmpname, 'gw')
1158

    
1159
    self.assertFileContent(self.tmpname,
1160
      "# This is a test file for /etc/hosts\n"
1161
      "127.0.0.1\tlocalhost\n"
1162
      "192.0.2.1 router\n")
1163
    self.assertFileMode(self.tmpname, 0644)
1164

    
1165

    
1166
class TestGetMounts(unittest.TestCase):
1167
  """Test case for GetMounts()."""
1168

    
1169
  TESTDATA = (
1170
    "rootfs /     rootfs rw 0 0\n"
1171
    "none   /sys  sysfs  rw,nosuid,nodev,noexec,relatime 0 0\n"
1172
    "none   /proc proc   rw,nosuid,nodev,noexec,relatime 0 0\n")
1173

    
1174
  def setUp(self):
1175
    self.tmpfile = tempfile.NamedTemporaryFile()
1176
    utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
1177

    
1178
  def testGetMounts(self):
1179
    self.assertEqual(utils.GetMounts(filename=self.tmpfile.name),
1180
      [
1181
        ("rootfs", "/", "rootfs", "rw"),
1182
        ("none", "/sys", "sysfs", "rw,nosuid,nodev,noexec,relatime"),
1183
        ("none", "/proc", "proc", "rw,nosuid,nodev,noexec,relatime"),
1184
      ])
1185

    
1186

    
1187
class TestShellQuoting(unittest.TestCase):
1188
  """Test case for shell quoting functions"""
1189

    
1190
  def testShellQuote(self):
1191
    self.assertEqual(ShellQuote('abc'), "abc")
1192
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
1193
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
1194
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
1195
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
1196

    
1197
  def testShellQuoteArgs(self):
1198
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
1199
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
1200
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
1201

    
1202

    
1203
class TestListVisibleFiles(unittest.TestCase):
1204
  """Test case for ListVisibleFiles"""
1205

    
1206
  def setUp(self):
1207
    self.path = tempfile.mkdtemp()
1208

    
1209
  def tearDown(self):
1210
    shutil.rmtree(self.path)
1211

    
1212
  def _CreateFiles(self, files):
1213
    for name in files:
1214
      utils.WriteFile(os.path.join(self.path, name), data="test")
1215

    
1216
  def _test(self, files, expected):
1217
    self._CreateFiles(files)
1218
    found = ListVisibleFiles(self.path)
1219
    self.assertEqual(set(found), set(expected))
1220

    
1221
  def testAllVisible(self):
1222
    files = ["a", "b", "c"]
1223
    expected = files
1224
    self._test(files, expected)
1225

    
1226
  def testNoneVisible(self):
1227
    files = [".a", ".b", ".c"]
1228
    expected = []
1229
    self._test(files, expected)
1230

    
1231
  def testSomeVisible(self):
1232
    files = ["a", "b", ".c"]
1233
    expected = ["a", "b"]
1234
    self._test(files, expected)
1235

    
1236
  def testNonAbsolutePath(self):
1237
    self.failUnlessRaises(errors.ProgrammerError, ListVisibleFiles, "abc")
1238

    
1239
  def testNonNormalizedPath(self):
1240
    self.failUnlessRaises(errors.ProgrammerError, ListVisibleFiles,
1241
                          "/bin/../tmp")
1242

    
1243

    
1244
class TestNewUUID(unittest.TestCase):
1245
  """Test case for NewUUID"""
1246

    
1247
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
1248
                        '[a-f0-9]{4}-[a-f0-9]{12}$')
1249

    
1250
  def runTest(self):
1251
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
1252

    
1253

    
1254
class TestUniqueSequence(unittest.TestCase):
1255
  """Test case for UniqueSequence"""
1256

    
1257
  def _test(self, input, expected):
1258
    self.assertEqual(utils.UniqueSequence(input), expected)
1259

    
1260
  def runTest(self):
1261
    # Ordered input
1262
    self._test([1, 2, 3], [1, 2, 3])
1263
    self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
1264
    self._test([1, 2, 2, 3], [1, 2, 3])
1265
    self._test([1, 2, 3, 3], [1, 2, 3])
1266

    
1267
    # Unordered input
1268
    self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
1269
    self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
1270

    
1271
    # Strings
1272
    self._test(["a", "a"], ["a"])
1273
    self._test(["a", "b"], ["a", "b"])
1274
    self._test(["a", "b", "a"], ["a", "b"])
1275

    
1276

    
1277
class TestFirstFree(unittest.TestCase):
1278
  """Test case for the FirstFree function"""
1279

    
1280
  def test(self):
1281
    """Test FirstFree"""
1282
    self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
1283
    self.failUnlessEqual(FirstFree([]), None)
1284
    self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
1285
    self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
1286
    self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
1287

    
1288

    
1289
class TestTailFile(testutils.GanetiTestCase):
1290
  """Test case for the TailFile function"""
1291

    
1292
  def testEmpty(self):
1293
    fname = self._CreateTempFile()
1294
    self.failUnlessEqual(TailFile(fname), [])
1295
    self.failUnlessEqual(TailFile(fname, lines=25), [])
1296

    
1297
  def testAllLines(self):
1298
    data = ["test %d" % i for i in range(30)]
1299
    for i in range(30):
1300
      fname = self._CreateTempFile()
1301
      fd = open(fname, "w")
1302
      fd.write("\n".join(data[:i]))
1303
      if i > 0:
1304
        fd.write("\n")
1305
      fd.close()
1306
      self.failUnlessEqual(TailFile(fname, lines=i), data[:i])
1307

    
1308
  def testPartialLines(self):
1309
    data = ["test %d" % i for i in range(30)]
1310
    fname = self._CreateTempFile()
1311
    fd = open(fname, "w")
1312
    fd.write("\n".join(data))
1313
    fd.write("\n")
1314
    fd.close()
1315
    for i in range(1, 30):
1316
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
1317

    
1318
  def testBigFile(self):
1319
    data = ["test %d" % i for i in range(30)]
1320
    fname = self._CreateTempFile()
1321
    fd = open(fname, "w")
1322
    fd.write("X" * 1048576)
1323
    fd.write("\n")
1324
    fd.write("\n".join(data))
1325
    fd.write("\n")
1326
    fd.close()
1327
    for i in range(1, 30):
1328
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
1329

    
1330

    
1331
class _BaseFileLockTest:
1332
  """Test case for the FileLock class"""
1333

    
1334
  def testSharedNonblocking(self):
1335
    self.lock.Shared(blocking=False)
1336
    self.lock.Close()
1337

    
1338
  def testExclusiveNonblocking(self):
1339
    self.lock.Exclusive(blocking=False)
1340
    self.lock.Close()
1341

    
1342
  def testUnlockNonblocking(self):
1343
    self.lock.Unlock(blocking=False)
1344
    self.lock.Close()
1345

    
1346
  def testSharedBlocking(self):
1347
    self.lock.Shared(blocking=True)
1348
    self.lock.Close()
1349

    
1350
  def testExclusiveBlocking(self):
1351
    self.lock.Exclusive(blocking=True)
1352
    self.lock.Close()
1353

    
1354
  def testUnlockBlocking(self):
1355
    self.lock.Unlock(blocking=True)
1356
    self.lock.Close()
1357

    
1358
  def testSharedExclusiveUnlock(self):
1359
    self.lock.Shared(blocking=False)
1360
    self.lock.Exclusive(blocking=False)
1361
    self.lock.Unlock(blocking=False)
1362
    self.lock.Close()
1363

    
1364
  def testExclusiveSharedUnlock(self):
1365
    self.lock.Exclusive(blocking=False)
1366
    self.lock.Shared(blocking=False)
1367
    self.lock.Unlock(blocking=False)
1368
    self.lock.Close()
1369

    
1370
  def testSimpleTimeout(self):
1371
    # These will succeed on the first attempt, hence a short timeout
1372
    self.lock.Shared(blocking=True, timeout=10.0)
1373
    self.lock.Exclusive(blocking=False, timeout=10.0)
1374
    self.lock.Unlock(blocking=True, timeout=10.0)
1375
    self.lock.Close()
1376

    
1377
  @staticmethod
1378
  def _TryLockInner(filename, shared, blocking):
1379
    lock = utils.FileLock.Open(filename)
1380

    
1381
    if shared:
1382
      fn = lock.Shared
1383
    else:
1384
      fn = lock.Exclusive
1385

    
1386
    try:
1387
      # The timeout doesn't really matter as the parent process waits for us to
1388
      # finish anyway.
1389
      fn(blocking=blocking, timeout=0.01)
1390
    except errors.LockError, err:
1391
      return False
1392

    
1393
    return True
1394

    
1395
  def _TryLock(self, *args):
1396
    return utils.RunInSeparateProcess(self._TryLockInner, self.tmpfile.name,
1397
                                      *args)
1398

    
1399
  def testTimeout(self):
1400
    for blocking in [True, False]:
1401
      self.lock.Exclusive(blocking=True)
1402
      self.failIf(self._TryLock(False, blocking))
1403
      self.failIf(self._TryLock(True, blocking))
1404

    
1405
      self.lock.Shared(blocking=True)
1406
      self.assert_(self._TryLock(True, blocking))
1407
      self.failIf(self._TryLock(False, blocking))
1408

    
1409
  def testCloseShared(self):
1410
    self.lock.Close()
1411
    self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
1412

    
1413
  def testCloseExclusive(self):
1414
    self.lock.Close()
1415
    self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
1416

    
1417
  def testCloseUnlock(self):
1418
    self.lock.Close()
1419
    self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
1420

    
1421

    
1422
class TestFileLockWithFilename(testutils.GanetiTestCase, _BaseFileLockTest):
1423
  TESTDATA = "Hello World\n" * 10
1424

    
1425
  def setUp(self):
1426
    testutils.GanetiTestCase.setUp(self)
1427

    
1428
    self.tmpfile = tempfile.NamedTemporaryFile()
1429
    utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
1430
    self.lock = utils.FileLock.Open(self.tmpfile.name)
1431

    
1432
    # Ensure "Open" didn't truncate file
1433
    self.assertFileContent(self.tmpfile.name, self.TESTDATA)
1434

    
1435
  def tearDown(self):
1436
    self.assertFileContent(self.tmpfile.name, self.TESTDATA)
1437

    
1438
    testutils.GanetiTestCase.tearDown(self)
1439

    
1440

    
1441
class TestFileLockWithFileObject(unittest.TestCase, _BaseFileLockTest):
1442
  def setUp(self):
1443
    self.tmpfile = tempfile.NamedTemporaryFile()
1444
    self.lock = utils.FileLock(open(self.tmpfile.name, "w"), self.tmpfile.name)
1445

    
1446

    
1447
class TestTimeFunctions(unittest.TestCase):
1448
  """Test case for time functions"""
1449

    
1450
  def runTest(self):
1451
    self.assertEqual(utils.SplitTime(1), (1, 0))
1452
    self.assertEqual(utils.SplitTime(1.5), (1, 500000))
1453
    self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
1454
    self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
1455
    self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
1456
    self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
1457
    self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
1458
    self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
1459

    
1460
    self.assertRaises(AssertionError, utils.SplitTime, -1)
1461

    
1462
    self.assertEqual(utils.MergeTime((1, 0)), 1.0)
1463
    self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
1464
    self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
1465

    
1466
    self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3),
1467
                     1218448917.481)
1468
    self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
1469

    
1470
    self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
1471
    self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
1472
    self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
1473
    self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
1474
    self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
1475

    
1476

    
1477
class FieldSetTestCase(unittest.TestCase):
1478
  """Test case for FieldSets"""
1479

    
1480
  def testSimpleMatch(self):
1481
    f = utils.FieldSet("a", "b", "c", "def")
1482
    self.failUnless(f.Matches("a"))
1483
    self.failIf(f.Matches("d"), "Substring matched")
1484
    self.failIf(f.Matches("defghi"), "Prefix string matched")
1485
    self.failIf(f.NonMatching(["b", "c"]))
1486
    self.failIf(f.NonMatching(["a", "b", "c", "def"]))
1487
    self.failUnless(f.NonMatching(["a", "d"]))
1488

    
1489
  def testRegexMatch(self):
1490
    f = utils.FieldSet("a", "b([0-9]+)", "c")
1491
    self.failUnless(f.Matches("b1"))
1492
    self.failUnless(f.Matches("b99"))
1493
    self.failIf(f.Matches("b/1"))
1494
    self.failIf(f.NonMatching(["b12", "c"]))
1495
    self.failUnless(f.NonMatching(["a", "1"]))
1496

    
1497
class TestForceDictType(unittest.TestCase):
1498
  """Test case for ForceDictType"""
1499

    
1500
  def setUp(self):
1501
    self.key_types = {
1502
      'a': constants.VTYPE_INT,
1503
      'b': constants.VTYPE_BOOL,
1504
      'c': constants.VTYPE_STRING,
1505
      'd': constants.VTYPE_SIZE,
1506
      "e": constants.VTYPE_MAYBE_STRING,
1507
      }
1508

    
1509
  def _fdt(self, dict, allowed_values=None):
1510
    if allowed_values is None:
1511
      utils.ForceDictType(dict, self.key_types)
1512
    else:
1513
      utils.ForceDictType(dict, self.key_types, allowed_values=allowed_values)
1514

    
1515
    return dict
1516

    
1517
  def testSimpleDict(self):
1518
    self.assertEqual(self._fdt({}), {})
1519
    self.assertEqual(self._fdt({'a': 1}), {'a': 1})
1520
    self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
1521
    self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
1522
    self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
1523
    self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
1524
    self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
1525
    self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
1526
    self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
1527
    self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
1528
    self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
1529
    self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
1530
    self.assertEqual(self._fdt({"e": None, }), {"e": None, })
1531
    self.assertEqual(self._fdt({"e": "Hello World", }), {"e": "Hello World", })
1532
    self.assertEqual(self._fdt({"e": False, }), {"e": '', })
1533

    
1534
  def testErrors(self):
1535
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
1536
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
1537
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
1538
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
1539
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {"e": object(), })
1540
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {"e": [], })
1541

    
1542

    
1543
class TestIsNormAbsPath(unittest.TestCase):
1544
  """Testing case for IsNormAbsPath"""
1545

    
1546
  def _pathTestHelper(self, path, result):
1547
    if result:
1548
      self.assert_(utils.IsNormAbsPath(path),
1549
          "Path %s should result absolute and normalized" % path)
1550
    else:
1551
      self.assertFalse(utils.IsNormAbsPath(path),
1552
          "Path %s should not result absolute and normalized" % path)
1553

    
1554
  def testBase(self):
1555
    self._pathTestHelper('/etc', True)
1556
    self._pathTestHelper('/srv', True)
1557
    self._pathTestHelper('etc', False)
1558
    self._pathTestHelper('/etc/../root', False)
1559
    self._pathTestHelper('/etc/', False)
1560

    
1561

    
1562
class TestSafeEncode(unittest.TestCase):
1563
  """Test case for SafeEncode"""
1564

    
1565
  def testAscii(self):
1566
    for txt in [string.digits, string.letters, string.punctuation]:
1567
      self.failUnlessEqual(txt, SafeEncode(txt))
1568

    
1569
  def testDoubleEncode(self):
1570
    for i in range(255):
1571
      txt = SafeEncode(chr(i))
1572
      self.failUnlessEqual(txt, SafeEncode(txt))
1573

    
1574
  def testUnicode(self):
1575
    # 1024 is high enough to catch non-direct ASCII mappings
1576
    for i in range(1024):
1577
      txt = SafeEncode(unichr(i))
1578
      self.failUnlessEqual(txt, SafeEncode(txt))
1579

    
1580

    
1581
class TestFormatTime(unittest.TestCase):
1582
  """Testing case for FormatTime"""
1583

    
1584
  def testNone(self):
1585
    self.failUnlessEqual(FormatTime(None), "N/A")
1586

    
1587
  def testInvalid(self):
1588
    self.failUnlessEqual(FormatTime(()), "N/A")
1589

    
1590
  def testNow(self):
1591
    # tests that we accept time.time input
1592
    FormatTime(time.time())
1593
    # tests that we accept int input
1594
    FormatTime(int(time.time()))
1595

    
1596

    
1597
class RunInSeparateProcess(unittest.TestCase):
1598
  def test(self):
1599
    for exp in [True, False]:
1600
      def _child():
1601
        return exp
1602

    
1603
      self.assertEqual(exp, utils.RunInSeparateProcess(_child))
1604

    
1605
  def testArgs(self):
1606
    for arg in [0, 1, 999, "Hello World", (1, 2, 3)]:
1607
      def _child(carg1, carg2):
1608
        return carg1 == "Foo" and carg2 == arg
1609

    
1610
      self.assert_(utils.RunInSeparateProcess(_child, "Foo", arg))
1611

    
1612
  def testPid(self):
1613
    parent_pid = os.getpid()
1614

    
1615
    def _check():
1616
      return os.getpid() == parent_pid
1617

    
1618
    self.failIf(utils.RunInSeparateProcess(_check))
1619

    
1620
  def testSignal(self):
1621
    def _kill():
1622
      os.kill(os.getpid(), signal.SIGTERM)
1623

    
1624
    self.assertRaises(errors.GenericError,
1625
                      utils.RunInSeparateProcess, _kill)
1626

    
1627
  def testException(self):
1628
    def _exc():
1629
      raise errors.GenericError("This is a test")
1630

    
1631
    self.assertRaises(errors.GenericError,
1632
                      utils.RunInSeparateProcess, _exc)
1633

    
1634

    
1635
class TestFingerprintFile(unittest.TestCase):
1636
  def setUp(self):
1637
    self.tmpfile = tempfile.NamedTemporaryFile()
1638

    
1639
  def test(self):
1640
    self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1641
                     "da39a3ee5e6b4b0d3255bfef95601890afd80709")
1642

    
1643
    utils.WriteFile(self.tmpfile.name, data="Hello World\n")
1644
    self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1645
                     "648a6a6ffffdaa0badb23b8baf90b6168dd16b3a")
1646

    
1647

    
1648
class TestUnescapeAndSplit(unittest.TestCase):
1649
  """Testing case for UnescapeAndSplit"""
1650

    
1651
  def setUp(self):
1652
    # testing more that one separator for regexp safety
1653
    self._seps = [",", "+", "."]
1654

    
1655
  def testSimple(self):
1656
    a = ["a", "b", "c", "d"]
1657
    for sep in self._seps:
1658
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), a)
1659

    
1660
  def testEscape(self):
1661
    for sep in self._seps:
1662
      a = ["a", "b\\" + sep + "c", "d"]
1663
      b = ["a", "b" + sep + "c", "d"]
1664
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1665

    
1666
  def testDoubleEscape(self):
1667
    for sep in self._seps:
1668
      a = ["a", "b\\\\", "c", "d"]
1669
      b = ["a", "b\\", "c", "d"]
1670
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1671

    
1672
  def testThreeEscape(self):
1673
    for sep in self._seps:
1674
      a = ["a", "b\\\\\\" + sep + "c", "d"]
1675
      b = ["a", "b\\" + sep + "c", "d"]
1676
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1677

    
1678

    
1679
class TestGenerateSelfSignedX509Cert(unittest.TestCase):
1680
  def setUp(self):
1681
    self.tmpdir = tempfile.mkdtemp()
1682

    
1683
  def tearDown(self):
1684
    shutil.rmtree(self.tmpdir)
1685

    
1686
  def _checkRsaPrivateKey(self, key):
1687
    lines = key.splitlines()
1688
    return ("-----BEGIN RSA PRIVATE KEY-----" in lines and
1689
            "-----END RSA PRIVATE KEY-----" in lines)
1690

    
1691
  def _checkCertificate(self, cert):
1692
    lines = cert.splitlines()
1693
    return ("-----BEGIN CERTIFICATE-----" in lines and
1694
            "-----END CERTIFICATE-----" in lines)
1695

    
1696
  def test(self):
1697
    for common_name in [None, ".", "Ganeti", "node1.example.com"]:
1698
      (key_pem, cert_pem) = utils.GenerateSelfSignedX509Cert(common_name, 300)
1699
      self._checkRsaPrivateKey(key_pem)
1700
      self._checkCertificate(cert_pem)
1701

    
1702
      key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
1703
                                           key_pem)
1704
      self.assert_(key.bits() >= 1024)
1705
      self.assertEqual(key.bits(), constants.RSA_KEY_BITS)
1706
      self.assertEqual(key.type(), OpenSSL.crypto.TYPE_RSA)
1707

    
1708
      x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1709
                                             cert_pem)
1710
      self.failIf(x509.has_expired())
1711
      self.assertEqual(x509.get_issuer().CN, common_name)
1712
      self.assertEqual(x509.get_subject().CN, common_name)
1713
      self.assertEqual(x509.get_pubkey().bits(), constants.RSA_KEY_BITS)
1714

    
1715
  def testLegacy(self):
1716
    cert1_filename = os.path.join(self.tmpdir, "cert1.pem")
1717

    
1718
    utils.GenerateSelfSignedSslCert(cert1_filename, validity=1)
1719

    
1720
    cert1 = utils.ReadFile(cert1_filename)
1721

    
1722
    self.assert_(self._checkRsaPrivateKey(cert1))
1723
    self.assert_(self._checkCertificate(cert1))
1724

    
1725

    
1726
class TestPathJoin(unittest.TestCase):
1727
  """Testing case for PathJoin"""
1728

    
1729
  def testBasicItems(self):
1730
    mlist = ["/a", "b", "c"]
1731
    self.failUnlessEqual(PathJoin(*mlist), "/".join(mlist))
1732

    
1733
  def testNonAbsPrefix(self):
1734
    self.failUnlessRaises(ValueError, PathJoin, "a", "b")
1735

    
1736
  def testBackTrack(self):
1737
    self.failUnlessRaises(ValueError, PathJoin, "/a", "b/../c")
1738

    
1739
  def testMultiAbs(self):
1740
    self.failUnlessRaises(ValueError, PathJoin, "/a", "/b")
1741

    
1742

    
1743
class TestValidateServiceName(unittest.TestCase):
1744
  def testValid(self):
1745
    testnames = [
1746
      0, 1, 2, 3, 1024, 65000, 65534, 65535,
1747
      "ganeti",
1748
      "gnt-masterd",
1749
      "HELLO_WORLD_SVC",
1750
      "hello.world.1",
1751
      "0", "80", "1111", "65535",
1752
      ]
1753

    
1754
    for name in testnames:
1755
      self.assertEqual(utils.ValidateServiceName(name), name)
1756

    
1757
  def testInvalid(self):
1758
    testnames = [
1759
      -15756, -1, 65536, 133428083,
1760
      "", "Hello World!", "!", "'", "\"", "\t", "\n", "`",
1761
      "-8546", "-1", "65536",
1762
      (129 * "A"),
1763
      ]
1764

    
1765
    for name in testnames:
1766
      self.assertRaises(errors.OpPrereqError, utils.ValidateServiceName, name)
1767

    
1768

    
1769
class TestParseAsn1Generalizedtime(unittest.TestCase):
1770
  def test(self):
1771
    # UTC
1772
    self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000Z"), 0)
1773
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152Z"),
1774
                     1266860512)
1775
    self.assertEqual(utils._ParseAsn1Generalizedtime("20380119031407Z"),
1776
                     (2**31) - 1)
1777

    
1778
    # With offset
1779
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152+0000"),
1780
                     1266860512)
1781
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100223131652+0000"),
1782
                     1266931012)
1783
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100223051808-0800"),
1784
                     1266931088)
1785
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100224002135+1100"),
1786
                     1266931295)
1787
    self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000-0100"),
1788
                     3600)
1789

    
1790
    # Leap seconds are not supported by datetime.datetime
1791
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1792
                      "19841231235960+0000")
1793
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1794
                      "19920630235960+0000")
1795

    
1796
    # Errors
1797
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "")
1798
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "invalid")
1799
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1800
                      "20100222174152")
1801
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1802
                      "Mon Feb 22 17:47:02 UTC 2010")
1803
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1804
                      "2010-02-22 17:42:02")
1805

    
1806

    
1807
class TestGetX509CertValidity(testutils.GanetiTestCase):
1808
  def setUp(self):
1809
    testutils.GanetiTestCase.setUp(self)
1810

    
1811
    pyopenssl_version = distutils.version.LooseVersion(OpenSSL.__version__)
1812

    
1813
    # Test whether we have pyOpenSSL 0.7 or above
1814
    self.pyopenssl0_7 = (pyopenssl_version >= "0.7")
1815

    
1816
    if not self.pyopenssl0_7:
1817
      warnings.warn("This test requires pyOpenSSL 0.7 or above to"
1818
                    " function correctly")
1819

    
1820
  def _LoadCert(self, name):
1821
    return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1822
                                           self._ReadTestData(name))
1823

    
1824
  def test(self):
1825
    validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem"))
1826
    if self.pyopenssl0_7:
1827
      self.assertEqual(validity, (1266919967, 1267524767))
1828
    else:
1829
      self.assertEqual(validity, (None, None))
1830

    
1831

    
1832
class TestSignX509Certificate(unittest.TestCase):
1833
  KEY = "My private key!"
1834
  KEY_OTHER = "Another key"
1835

    
1836
  def test(self):
1837
    # Generate certificate valid for 5 minutes
1838
    (_, cert_pem) = utils.GenerateSelfSignedX509Cert(None, 300)
1839

    
1840
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1841
                                           cert_pem)
1842

    
1843
    # No signature at all
1844
    self.assertRaises(errors.GenericError,
1845
                      utils.LoadSignedX509Certificate, cert_pem, self.KEY)
1846

    
1847
    # Invalid input
1848
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1849
                      "", self.KEY)
1850
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1851
                      "X-Ganeti-Signature: \n", self.KEY)
1852
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1853
                      "X-Ganeti-Sign: $1234$abcdef\n", self.KEY)
1854
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1855
                      "X-Ganeti-Signature: $1234567890$abcdef\n", self.KEY)
1856
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1857
                      "X-Ganeti-Signature: $1234$abc\n\n" + cert_pem, self.KEY)
1858

    
1859
    # Invalid salt
1860
    for salt in list("-_@$,:;/\\ \t\n"):
1861
      self.assertRaises(errors.GenericError, utils.SignX509Certificate,
1862
                        cert_pem, self.KEY, "foo%sbar" % salt)
1863

    
1864
    for salt in ["HelloWorld", "salt", string.letters, string.digits,
1865
                 utils.GenerateSecret(numbytes=4),
1866
                 utils.GenerateSecret(numbytes=16),
1867
                 "{123:456}".encode("hex")]:
1868
      signed_pem = utils.SignX509Certificate(cert, self.KEY, salt)
1869

    
1870
      self._Check(cert, salt, signed_pem)
1871

    
1872
      self._Check(cert, salt, "X-Another-Header: with a value\n" + signed_pem)
1873
      self._Check(cert, salt, (10 * "Hello World!\n") + signed_pem)
1874
      self._Check(cert, salt, (signed_pem + "\n\na few more\n"
1875
                               "lines----\n------ at\nthe end!"))
1876

    
1877
  def _Check(self, cert, salt, pem):
1878
    (cert2, salt2) = utils.LoadSignedX509Certificate(pem, self.KEY)
1879
    self.assertEqual(salt, salt2)
1880
    self.assertEqual(cert.digest("sha1"), cert2.digest("sha1"))
1881

    
1882
    # Other key
1883
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1884
                      pem, self.KEY_OTHER)
1885

    
1886

    
1887
class TestMakedirs(unittest.TestCase):
1888
  def setUp(self):
1889
    self.tmpdir = tempfile.mkdtemp()
1890

    
1891
  def tearDown(self):
1892
    shutil.rmtree(self.tmpdir)
1893

    
1894
  def testNonExisting(self):
1895
    path = PathJoin(self.tmpdir, "foo")
1896
    utils.Makedirs(path)
1897
    self.assert_(os.path.isdir(path))
1898

    
1899
  def testExisting(self):
1900
    path = PathJoin(self.tmpdir, "foo")
1901
    os.mkdir(path)
1902
    utils.Makedirs(path)
1903
    self.assert_(os.path.isdir(path))
1904

    
1905
  def testRecursiveNonExisting(self):
1906
    path = PathJoin(self.tmpdir, "foo/bar/baz")
1907
    utils.Makedirs(path)
1908
    self.assert_(os.path.isdir(path))
1909

    
1910
  def testRecursiveExisting(self):
1911
    path = PathJoin(self.tmpdir, "B/moo/xyz")
1912
    self.assertFalse(os.path.exists(path))
1913
    os.mkdir(PathJoin(self.tmpdir, "B"))
1914
    utils.Makedirs(path)
1915
    self.assert_(os.path.isdir(path))
1916

    
1917

    
1918
class TestRetry(testutils.GanetiTestCase):
1919
  def setUp(self):
1920
    testutils.GanetiTestCase.setUp(self)
1921
    self.retries = 0
1922

    
1923
  @staticmethod
1924
  def _RaiseRetryAgain():
1925
    raise utils.RetryAgain()
1926

    
1927
  @staticmethod
1928
  def _RaiseRetryAgainWithArg(args):
1929
    raise utils.RetryAgain(*args)
1930

    
1931
  def _WrongNestedLoop(self):
1932
    return utils.Retry(self._RaiseRetryAgain, 0.01, 0.02)
1933

    
1934
  def _RetryAndSucceed(self, retries):
1935
    if self.retries < retries:
1936
      self.retries += 1
1937
      raise utils.RetryAgain()
1938
    else:
1939
      return True
1940

    
1941
  def testRaiseTimeout(self):
1942
    self.failUnlessRaises(utils.RetryTimeout, utils.Retry,
1943
                          self._RaiseRetryAgain, 0.01, 0.02)
1944
    self.failUnlessRaises(utils.RetryTimeout, utils.Retry,
1945
                          self._RetryAndSucceed, 0.01, 0, args=[1])
1946
    self.failUnlessEqual(self.retries, 1)
1947

    
1948
  def testComplete(self):
1949
    self.failUnlessEqual(utils.Retry(lambda: True, 0, 1), True)
1950
    self.failUnlessEqual(utils.Retry(self._RetryAndSucceed, 0, 1, args=[2]),
1951
                         True)
1952
    self.failUnlessEqual(self.retries, 2)
1953

    
1954
  def testNestedLoop(self):
1955
    try:
1956
      self.failUnlessRaises(errors.ProgrammerError, utils.Retry,
1957
                            self._WrongNestedLoop, 0, 1)
1958
    except utils.RetryTimeout:
1959
      self.fail("Didn't detect inner loop's exception")
1960

    
1961
  def testTimeoutArgument(self):
1962
    retry_arg="my_important_debugging_message"
1963
    try:
1964
      utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02, args=[[retry_arg]])
1965
    except utils.RetryTimeout, err:
1966
      self.failUnlessEqual(err.args, (retry_arg, ))
1967
    else:
1968
      self.fail("Expected timeout didn't happen")
1969

    
1970
  def testRaiseInnerWithExc(self):
1971
    retry_arg="my_important_debugging_message"
1972
    try:
1973
      try:
1974
        utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02,
1975
                    args=[[errors.GenericError(retry_arg, retry_arg)]])
1976
      except utils.RetryTimeout, err:
1977
        err.RaiseInner()
1978
      else:
1979
        self.fail("Expected timeout didn't happen")
1980
    except errors.GenericError, err:
1981
      self.failUnlessEqual(err.args, (retry_arg, retry_arg))
1982
    else:
1983
      self.fail("Expected GenericError didn't happen")
1984

    
1985
  def testRaiseInnerWithMsg(self):
1986
    retry_arg="my_important_debugging_message"
1987
    try:
1988
      try:
1989
        utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02,
1990
                    args=[[retry_arg, retry_arg]])
1991
      except utils.RetryTimeout, err:
1992
        err.RaiseInner()
1993
      else:
1994
        self.fail("Expected timeout didn't happen")
1995
    except utils.RetryTimeout, err:
1996
      self.failUnlessEqual(err.args, (retry_arg, retry_arg))
1997
    else:
1998
      self.fail("Expected RetryTimeout didn't happen")
1999

    
2000

    
2001
class TestLineSplitter(unittest.TestCase):
2002
  def test(self):
2003
    lines = []
2004
    ls = utils.LineSplitter(lines.append)
2005
    ls.write("Hello World\n")
2006
    self.assertEqual(lines, [])
2007
    ls.write("Foo\n Bar\r\n ")
2008
    ls.write("Baz")
2009
    ls.write("Moo")
2010
    self.assertEqual(lines, [])
2011
    ls.flush()
2012
    self.assertEqual(lines, ["Hello World", "Foo", " Bar"])
2013
    ls.close()
2014
    self.assertEqual(lines, ["Hello World", "Foo", " Bar", " BazMoo"])
2015

    
2016
  def _testExtra(self, line, all_lines, p1, p2):
2017
    self.assertEqual(p1, 999)
2018
    self.assertEqual(p2, "extra")
2019
    all_lines.append(line)
2020

    
2021
  def testExtraArgsNoFlush(self):
2022
    lines = []
2023
    ls = utils.LineSplitter(self._testExtra, lines, 999, "extra")
2024
    ls.write("\n\nHello World\n")
2025
    ls.write("Foo\n Bar\r\n ")
2026
    ls.write("")
2027
    ls.write("Baz")
2028
    ls.write("Moo\n\nx\n")
2029
    self.assertEqual(lines, [])
2030
    ls.close()
2031
    self.assertEqual(lines, ["", "", "Hello World", "Foo", " Bar", " BazMoo",
2032
                             "", "x"])
2033

    
2034

    
2035
class TestReadLockedPidFile(unittest.TestCase):
2036
  def setUp(self):
2037
    self.tmpdir = tempfile.mkdtemp()
2038

    
2039
  def tearDown(self):
2040
    shutil.rmtree(self.tmpdir)
2041

    
2042
  def testNonExistent(self):
2043
    path = PathJoin(self.tmpdir, "nonexist")
2044
    self.assert_(utils.ReadLockedPidFile(path) is None)
2045

    
2046
  def testUnlocked(self):
2047
    path = PathJoin(self.tmpdir, "pid")
2048
    utils.WriteFile(path, data="123")
2049
    self.assert_(utils.ReadLockedPidFile(path) is None)
2050

    
2051
  def testLocked(self):
2052
    path = PathJoin(self.tmpdir, "pid")
2053
    utils.WriteFile(path, data="123")
2054

    
2055
    fl = utils.FileLock.Open(path)
2056
    try:
2057
      fl.Exclusive(blocking=True)
2058

    
2059
      self.assertEqual(utils.ReadLockedPidFile(path), 123)
2060
    finally:
2061
      fl.Close()
2062

    
2063
    self.assert_(utils.ReadLockedPidFile(path) is None)
2064

    
2065
  def testError(self):
2066
    path = PathJoin(self.tmpdir, "foobar", "pid")
2067
    utils.WriteFile(PathJoin(self.tmpdir, "foobar"), data="")
2068
    # open(2) should return ENOTDIR
2069
    self.assertRaises(EnvironmentError, utils.ReadLockedPidFile, path)
2070

    
2071

    
2072
class TestCertVerification(testutils.GanetiTestCase):
2073
  def setUp(self):
2074
    testutils.GanetiTestCase.setUp(self)
2075

    
2076
    self.tmpdir = tempfile.mkdtemp()
2077

    
2078
  def tearDown(self):
2079
    shutil.rmtree(self.tmpdir)
2080

    
2081
  def testVerifyCertificate(self):
2082
    cert_pem = utils.ReadFile(self._TestDataFilename("cert1.pem"))
2083
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
2084
                                           cert_pem)
2085

    
2086
    # Not checking return value as this certificate is expired
2087
    utils.VerifyX509Certificate(cert, 30, 7)
2088

    
2089

    
2090
class TestVerifyCertificateInner(unittest.TestCase):
2091
  def test(self):
2092
    vci = utils._VerifyCertificateInner
2093

    
2094
    # Valid
2095
    self.assertEqual(vci(False, 1263916313, 1298476313, 1266940313, 30, 7),
2096
                     (None, None))
2097

    
2098
    # Not yet valid
2099
    (errcode, msg) = vci(False, 1266507600, 1267544400, 1266075600, 30, 7)
2100
    self.assertEqual(errcode, utils.CERT_WARNING)
2101

    
2102
    # Expiring soon
2103
    (errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 7)
2104
    self.assertEqual(errcode, utils.CERT_ERROR)
2105

    
2106
    (errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 1)
2107
    self.assertEqual(errcode, utils.CERT_WARNING)
2108

    
2109
    (errcode, msg) = vci(False, 1266507600, None, 1266939600, 30, 7)
2110
    self.assertEqual(errcode, None)
2111

    
2112
    # Expired
2113
    (errcode, msg) = vci(True, 1266507600, 1267544400, 1266939600, 30, 7)
2114
    self.assertEqual(errcode, utils.CERT_ERROR)
2115

    
2116
    (errcode, msg) = vci(True, None, 1267544400, 1266939600, 30, 7)
2117
    self.assertEqual(errcode, utils.CERT_ERROR)
2118

    
2119
    (errcode, msg) = vci(True, 1266507600, None, 1266939600, 30, 7)
2120
    self.assertEqual(errcode, utils.CERT_ERROR)
2121

    
2122
    (errcode, msg) = vci(True, None, None, 1266939600, 30, 7)
2123
    self.assertEqual(errcode, utils.CERT_ERROR)
2124

    
2125

    
2126
class TestHmacFunctions(unittest.TestCase):
2127
  # Digests can be checked with "openssl sha1 -hmac $key"
2128
  def testSha1Hmac(self):
2129
    self.assertEqual(utils.Sha1Hmac("", ""),
2130
                     "fbdb1d1b18aa6c08324b7d64b71fb76370690e1d")
2131
    self.assertEqual(utils.Sha1Hmac("3YzMxZWE", "Hello World"),
2132
                     "ef4f3bda82212ecb2f7ce868888a19092481f1fd")
2133
    self.assertEqual(utils.Sha1Hmac("TguMTA2K", ""),
2134
                     "f904c2476527c6d3e6609ab683c66fa0652cb1dc")
2135

    
2136
    longtext = 1500 * "The quick brown fox jumps over the lazy dog\n"
2137
    self.assertEqual(utils.Sha1Hmac("3YzMxZWE", longtext),
2138
                     "35901b9a3001a7cdcf8e0e9d7c2e79df2223af54")
2139

    
2140
  def testSha1HmacSalt(self):
2141
    self.assertEqual(utils.Sha1Hmac("TguMTA2K", "", salt="abc0"),
2142
                     "4999bf342470eadb11dfcd24ca5680cf9fd7cdce")
2143
    self.assertEqual(utils.Sha1Hmac("TguMTA2K", "", salt="abc9"),
2144
                     "17a4adc34d69c0d367d4ffbef96fd41d4df7a6e8")
2145
    self.assertEqual(utils.Sha1Hmac("3YzMxZWE", "Hello World", salt="xyz0"),
2146
                     "7f264f8114c9066afc9bb7636e1786d996d3cc0d")
2147

    
2148
  def testVerifySha1Hmac(self):
2149
    self.assert_(utils.VerifySha1Hmac("", "", ("fbdb1d1b18aa6c08324b"
2150
                                               "7d64b71fb76370690e1d")))
2151
    self.assert_(utils.VerifySha1Hmac("TguMTA2K", "",
2152
                                      ("f904c2476527c6d3e660"
2153
                                       "9ab683c66fa0652cb1dc")))
2154

    
2155
    digest = "ef4f3bda82212ecb2f7ce868888a19092481f1fd"
2156
    self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World", digest))
2157
    self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
2158
                                      digest.lower()))
2159
    self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
2160
                                      digest.upper()))
2161
    self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
2162
                                      digest.title()))
2163

    
2164
  def testVerifySha1HmacSalt(self):
2165
    self.assert_(utils.VerifySha1Hmac("TguMTA2K", "",
2166
                                      ("17a4adc34d69c0d367d4"
2167
                                       "ffbef96fd41d4df7a6e8"),
2168
                                      salt="abc9"))
2169
    self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
2170
                                      ("7f264f8114c9066afc9b"
2171
                                       "b7636e1786d996d3cc0d"),
2172
                                      salt="xyz0"))
2173

    
2174

    
2175
class TestIgnoreSignals(unittest.TestCase):
2176
  """Test the IgnoreSignals decorator"""
2177

    
2178
  @staticmethod
2179
  def _Raise(exception):
2180
    raise exception
2181

    
2182
  @staticmethod
2183
  def _Return(rval):
2184
    return rval
2185

    
2186
  def testIgnoreSignals(self):
2187
    sock_err_intr = socket.error(errno.EINTR, "Message")
2188
    sock_err_inval = socket.error(errno.EINVAL, "Message")
2189

    
2190
    env_err_intr = EnvironmentError(errno.EINTR, "Message")
2191
    env_err_inval = EnvironmentError(errno.EINVAL, "Message")
2192

    
2193
    self.assertRaises(socket.error, self._Raise, sock_err_intr)
2194
    self.assertRaises(socket.error, self._Raise, sock_err_inval)
2195
    self.assertRaises(EnvironmentError, self._Raise, env_err_intr)
2196
    self.assertRaises(EnvironmentError, self._Raise, env_err_inval)
2197

    
2198
    self.assertEquals(utils.IgnoreSignals(self._Raise, sock_err_intr), None)
2199
    self.assertEquals(utils.IgnoreSignals(self._Raise, env_err_intr), None)
2200
    self.assertRaises(socket.error, utils.IgnoreSignals, self._Raise,
2201
                      sock_err_inval)
2202
    self.assertRaises(EnvironmentError, utils.IgnoreSignals, self._Raise,
2203
                      env_err_inval)
2204

    
2205
    self.assertEquals(utils.IgnoreSignals(self._Return, True), True)
2206
    self.assertEquals(utils.IgnoreSignals(self._Return, 33), 33)
2207

    
2208

    
2209
class TestEnsureDirs(unittest.TestCase):
2210
  """Tests for EnsureDirs"""
2211

    
2212
  def setUp(self):
2213
    self.dir = tempfile.mkdtemp()
2214
    self.old_umask = os.umask(0777)
2215

    
2216
  def testEnsureDirs(self):
2217
    utils.EnsureDirs([
2218
        (PathJoin(self.dir, "foo"), 0777),
2219
        (PathJoin(self.dir, "bar"), 0000),
2220
        ])
2221
    self.assertEquals(os.stat(PathJoin(self.dir, "foo"))[0] & 0777, 0777)
2222
    self.assertEquals(os.stat(PathJoin(self.dir, "bar"))[0] & 0777, 0000)
2223

    
2224
  def tearDown(self):
2225
    os.rmdir(PathJoin(self.dir, "foo"))
2226
    os.rmdir(PathJoin(self.dir, "bar"))
2227
    os.rmdir(self.dir)
2228
    os.umask(self.old_umask)
2229

    
2230

    
2231
class TestFormatSeconds(unittest.TestCase):
2232
  def test(self):
2233
    self.assertEqual(utils.FormatSeconds(1), "1s")
2234
    self.assertEqual(utils.FormatSeconds(3600), "1h 0m 0s")
2235
    self.assertEqual(utils.FormatSeconds(3599), "59m 59s")
2236
    self.assertEqual(utils.FormatSeconds(7200), "2h 0m 0s")
2237
    self.assertEqual(utils.FormatSeconds(7201), "2h 0m 1s")
2238
    self.assertEqual(utils.FormatSeconds(7281), "2h 1m 21s")
2239
    self.assertEqual(utils.FormatSeconds(29119), "8h 5m 19s")
2240
    self.assertEqual(utils.FormatSeconds(19431228), "224d 21h 33m 48s")
2241
    self.assertEqual(utils.FormatSeconds(-1), "-1s")
2242
    self.assertEqual(utils.FormatSeconds(-282), "-282s")
2243
    self.assertEqual(utils.FormatSeconds(-29119), "-29119s")
2244

    
2245
  def testFloat(self):
2246
    self.assertEqual(utils.FormatSeconds(1.3), "1s")
2247
    self.assertEqual(utils.FormatSeconds(1.9), "2s")
2248
    self.assertEqual(utils.FormatSeconds(3912.12311), "1h 5m 12s")
2249
    self.assertEqual(utils.FormatSeconds(3912.8), "1h 5m 13s")
2250

    
2251

    
2252
class TestIgnoreProcessNotFound(unittest.TestCase):
2253
  @staticmethod
2254
  def _WritePid(fd):
2255
    os.write(fd, str(os.getpid()))
2256
    os.close(fd)
2257
    return True
2258

    
2259
  def test(self):
2260
    (pid_read_fd, pid_write_fd) = os.pipe()
2261

    
2262
    # Start short-lived process which writes its PID to pipe
2263
    self.assert_(utils.RunInSeparateProcess(self._WritePid, pid_write_fd))
2264
    os.close(pid_write_fd)
2265

    
2266
    # Read PID from pipe
2267
    pid = int(os.read(pid_read_fd, 1024))
2268
    os.close(pid_read_fd)
2269

    
2270
    # Try to send signal to process which exited recently
2271
    self.assertFalse(utils.IgnoreProcessNotFound(os.kill, pid, 0))
2272

    
2273

    
2274
class TestShellWriter(unittest.TestCase):
2275
  def test(self):
2276
    buf = StringIO()
2277
    sw = utils.ShellWriter(buf)
2278
    sw.Write("#!/bin/bash")
2279
    sw.Write("if true; then")
2280
    sw.IncIndent()
2281
    try:
2282
      sw.Write("echo true")
2283

    
2284
      sw.Write("for i in 1 2 3")
2285
      sw.Write("do")
2286
      sw.IncIndent()
2287
      try:
2288
        self.assertEqual(sw._indent, 2)
2289
        sw.Write("date")
2290
      finally:
2291
        sw.DecIndent()
2292
      sw.Write("done")
2293
    finally:
2294
      sw.DecIndent()
2295
    sw.Write("echo %s", utils.ShellQuote("Hello World"))
2296
    sw.Write("exit 0")
2297

    
2298
    self.assertEqual(sw._indent, 0)
2299

    
2300
    output = buf.getvalue()
2301

    
2302
    self.assert_(output.endswith("\n"))
2303

    
2304
    lines = output.splitlines()
2305
    self.assertEqual(len(lines), 9)
2306
    self.assertEqual(lines[0], "#!/bin/bash")
2307
    self.assert_(re.match(r"^\s+date$", lines[5]))
2308
    self.assertEqual(lines[7], "echo 'Hello World'")
2309

    
2310
  def testEmpty(self):
2311
    buf = StringIO()
2312
    sw = utils.ShellWriter(buf)
2313
    sw = None
2314
    self.assertEqual(buf.getvalue(), "")
2315

    
2316

    
2317
if __name__ == '__main__':
2318
  testutils.GanetiTestProgram()