Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ debed9ae

History | View | Annotate | Download (62.9 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import os
30
import stat
31
import signal
32
import socket
33
import shutil
34
import re
35
import select
36
import string
37
import fcntl
38
import OpenSSL
39
import warnings
40
import distutils.version
41
import glob
42

    
43
import ganeti
44
import testutils
45
from ganeti import constants
46
from ganeti import utils
47
from ganeti import errors
48
from ganeti.utils import IsProcessAlive, RunCmd, \
49
     RemoveFile, MatchNameComponent, FormatUnit, \
50
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
51
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
52
     SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \
53
     TailFile, ForceDictType, SafeEncode, IsNormAbsPath, FormatTime, \
54
     UnescapeAndSplit, RunParts, PathJoin, HostInfo
55

    
56
from ganeti.errors import LockError, UnitParseError, GenericError, \
57
     ProgrammerError, OpPrereqError
58

    
59

    
60
class TestIsProcessAlive(unittest.TestCase):
61
  """Testing case for IsProcessAlive"""
62

    
63
  def testExists(self):
64
    mypid = os.getpid()
65
    self.assert_(IsProcessAlive(mypid),
66
                 "can't find myself running")
67

    
68
  def testNotExisting(self):
69
    pid_non_existing = os.fork()
70
    if pid_non_existing == 0:
71
      os._exit(0)
72
    elif pid_non_existing < 0:
73
      raise SystemError("can't fork")
74
    os.waitpid(pid_non_existing, 0)
75
    self.assert_(not IsProcessAlive(pid_non_existing),
76
                 "nonexisting process detected")
77

    
78

    
79
class TestPidFileFunctions(unittest.TestCase):
80
  """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
81

    
82
  def setUp(self):
83
    self.dir = tempfile.mkdtemp()
84
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
85
    utils.DaemonPidFileName = self.f_dpn
86

    
87
  def testPidFileFunctions(self):
88
    pid_file = self.f_dpn('test')
89
    utils.WritePidFile('test')
90
    self.failUnless(os.path.exists(pid_file),
91
                    "PID file should have been created")
92
    read_pid = utils.ReadPidFile(pid_file)
93
    self.failUnlessEqual(read_pid, os.getpid())
94
    self.failUnless(utils.IsProcessAlive(read_pid))
95
    self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
96
    utils.RemovePidFile('test')
97
    self.failIf(os.path.exists(pid_file),
98
                "PID file should not exist anymore")
99
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
100
                         "ReadPidFile should return 0 for missing pid file")
101
    fh = open(pid_file, "w")
102
    fh.write("blah\n")
103
    fh.close()
104
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
105
                         "ReadPidFile should return 0 for invalid pid file")
106
    utils.RemovePidFile('test')
107
    self.failIf(os.path.exists(pid_file),
108
                "PID file should not exist anymore")
109

    
110
  def testKill(self):
111
    pid_file = self.f_dpn('child')
112
    r_fd, w_fd = os.pipe()
113
    new_pid = os.fork()
114
    if new_pid == 0: #child
115
      utils.WritePidFile('child')
116
      os.write(w_fd, 'a')
117
      signal.pause()
118
      os._exit(0)
119
      return
120
    # else we are in the parent
121
    # wait until the child has written the pid file
122
    os.read(r_fd, 1)
123
    read_pid = utils.ReadPidFile(pid_file)
124
    self.failUnlessEqual(read_pid, new_pid)
125
    self.failUnless(utils.IsProcessAlive(new_pid))
126
    utils.KillProcess(new_pid, waitpid=True)
127
    self.failIf(utils.IsProcessAlive(new_pid))
128
    utils.RemovePidFile('child')
129
    self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
130

    
131
  def tearDown(self):
132
    for name in os.listdir(self.dir):
133
      os.unlink(os.path.join(self.dir, name))
134
    os.rmdir(self.dir)
135

    
136

    
137
class TestRunCmd(testutils.GanetiTestCase):
138
  """Testing case for the RunCmd function"""
139

    
140
  def setUp(self):
141
    testutils.GanetiTestCase.setUp(self)
142
    self.magic = time.ctime() + " ganeti test"
143
    self.fname = self._CreateTempFile()
144

    
145
  def testOk(self):
146
    """Test successful exit code"""
147
    result = RunCmd("/bin/sh -c 'exit 0'")
148
    self.assertEqual(result.exit_code, 0)
149
    self.assertEqual(result.output, "")
150

    
151
  def testFail(self):
152
    """Test fail exit code"""
153
    result = RunCmd("/bin/sh -c 'exit 1'")
154
    self.assertEqual(result.exit_code, 1)
155
    self.assertEqual(result.output, "")
156

    
157
  def testStdout(self):
158
    """Test standard output"""
159
    cmd = 'echo -n "%s"' % self.magic
160
    result = RunCmd("/bin/sh -c '%s'" % cmd)
161
    self.assertEqual(result.stdout, self.magic)
162
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
163
    self.assertEqual(result.output, "")
164
    self.assertFileContent(self.fname, self.magic)
165

    
166
  def testStderr(self):
167
    """Test standard error"""
168
    cmd = 'echo -n "%s"' % self.magic
169
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
170
    self.assertEqual(result.stderr, self.magic)
171
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
172
    self.assertEqual(result.output, "")
173
    self.assertFileContent(self.fname, self.magic)
174

    
175
  def testCombined(self):
176
    """Test combined output"""
177
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
178
    expected = "A" + self.magic + "B" + self.magic
179
    result = RunCmd("/bin/sh -c '%s'" % cmd)
180
    self.assertEqual(result.output, expected)
181
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
182
    self.assertEqual(result.output, "")
183
    self.assertFileContent(self.fname, expected)
184

    
185
  def testSignal(self):
186
    """Test signal"""
187
    result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
188
    self.assertEqual(result.signal, 15)
189
    self.assertEqual(result.output, "")
190

    
191
  def testListRun(self):
192
    """Test list runs"""
193
    result = RunCmd(["true"])
194
    self.assertEqual(result.signal, None)
195
    self.assertEqual(result.exit_code, 0)
196
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
197
    self.assertEqual(result.signal, None)
198
    self.assertEqual(result.exit_code, 1)
199
    result = RunCmd(["echo", "-n", self.magic])
200
    self.assertEqual(result.signal, None)
201
    self.assertEqual(result.exit_code, 0)
202
    self.assertEqual(result.stdout, self.magic)
203

    
204
  def testFileEmptyOutput(self):
205
    """Test file output"""
206
    result = RunCmd(["true"], output=self.fname)
207
    self.assertEqual(result.signal, None)
208
    self.assertEqual(result.exit_code, 0)
209
    self.assertFileContent(self.fname, "")
210

    
211
  def testLang(self):
212
    """Test locale environment"""
213
    old_env = os.environ.copy()
214
    try:
215
      os.environ["LANG"] = "en_US.UTF-8"
216
      os.environ["LC_ALL"] = "en_US.UTF-8"
217
      result = RunCmd(["locale"])
218
      for line in result.output.splitlines():
219
        key, value = line.split("=", 1)
220
        # Ignore these variables, they're overridden by LC_ALL
221
        if key == "LANG" or key == "LANGUAGE":
222
          continue
223
        self.failIf(value and value != "C" and value != '"C"',
224
            "Variable %s is set to the invalid value '%s'" % (key, value))
225
    finally:
226
      os.environ = old_env
227

    
228
  def testDefaultCwd(self):
229
    """Test default working directory"""
230
    self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
231

    
232
  def testCwd(self):
233
    """Test default working directory"""
234
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
235
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
236
    cwd = os.getcwd()
237
    self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
238

    
239
  def testResetEnv(self):
240
    """Test environment reset functionality"""
241
    self.failUnlessEqual(RunCmd(["env"], reset_env=True).stdout.strip(), "")
242
    self.failUnlessEqual(RunCmd(["env"], reset_env=True,
243
                                env={"FOO": "bar",}).stdout.strip(), "FOO=bar")
244

    
245

    
246
class TestRunParts(unittest.TestCase):
247
  """Testing case for the RunParts function"""
248

    
249
  def setUp(self):
250
    self.rundir = tempfile.mkdtemp(prefix="ganeti-test", suffix=".tmp")
251

    
252
  def tearDown(self):
253
    shutil.rmtree(self.rundir)
254

    
255
  def testEmpty(self):
256
    """Test on an empty dir"""
257
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True), [])
258

    
259
  def testSkipWrongName(self):
260
    """Test that wrong files are skipped"""
261
    fname = os.path.join(self.rundir, "00test.dot")
262
    utils.WriteFile(fname, data="")
263
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
264
    relname = os.path.basename(fname)
265
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
266
                         [(relname, constants.RUNPARTS_SKIP, None)])
267

    
268
  def testSkipNonExec(self):
269
    """Test that non executable files are skipped"""
270
    fname = os.path.join(self.rundir, "00test")
271
    utils.WriteFile(fname, data="")
272
    relname = os.path.basename(fname)
273
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
274
                         [(relname, constants.RUNPARTS_SKIP, None)])
275

    
276
  def testError(self):
277
    """Test error on a broken executable"""
278
    fname = os.path.join(self.rundir, "00test")
279
    utils.WriteFile(fname, data="")
280
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
281
    (relname, status, error) = RunParts(self.rundir, reset_env=True)[0]
282
    self.failUnlessEqual(relname, os.path.basename(fname))
283
    self.failUnlessEqual(status, constants.RUNPARTS_ERR)
284
    self.failUnless(error)
285

    
286
  def testSorted(self):
287
    """Test executions are sorted"""
288
    files = []
289
    files.append(os.path.join(self.rundir, "64test"))
290
    files.append(os.path.join(self.rundir, "00test"))
291
    files.append(os.path.join(self.rundir, "42test"))
292

    
293
    for fname in files:
294
      utils.WriteFile(fname, data="")
295

    
296
    results = RunParts(self.rundir, reset_env=True)
297

    
298
    for fname in sorted(files):
299
      self.failUnlessEqual(os.path.basename(fname), results.pop(0)[0])
300

    
301
  def testOk(self):
302
    """Test correct execution"""
303
    fname = os.path.join(self.rundir, "00test")
304
    utils.WriteFile(fname, data="#!/bin/sh\n\necho -n ciao")
305
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
306
    (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
307
    self.failUnlessEqual(relname, os.path.basename(fname))
308
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
309
    self.failUnlessEqual(runresult.stdout, "ciao")
310

    
311
  def testRunFail(self):
312
    """Test correct execution, with run failure"""
313
    fname = os.path.join(self.rundir, "00test")
314
    utils.WriteFile(fname, data="#!/bin/sh\n\nexit 1")
315
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
316
    (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
317
    self.failUnlessEqual(relname, os.path.basename(fname))
318
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
319
    self.failUnlessEqual(runresult.exit_code, 1)
320
    self.failUnless(runresult.failed)
321

    
322
  def testRunMix(self):
323
    files = []
324
    files.append(os.path.join(self.rundir, "00test"))
325
    files.append(os.path.join(self.rundir, "42test"))
326
    files.append(os.path.join(self.rundir, "64test"))
327
    files.append(os.path.join(self.rundir, "99test"))
328

    
329
    files.sort()
330

    
331
    # 1st has errors in execution
332
    utils.WriteFile(files[0], data="#!/bin/sh\n\nexit 1")
333
    os.chmod(files[0], stat.S_IREAD | stat.S_IEXEC)
334

    
335
    # 2nd is skipped
336
    utils.WriteFile(files[1], data="")
337

    
338
    # 3rd cannot execute properly
339
    utils.WriteFile(files[2], data="")
340
    os.chmod(files[2], stat.S_IREAD | stat.S_IEXEC)
341

    
342
    # 4th execs
343
    utils.WriteFile(files[3], data="#!/bin/sh\n\necho -n ciao")
344
    os.chmod(files[3], stat.S_IREAD | stat.S_IEXEC)
345

    
346
    results = RunParts(self.rundir, reset_env=True)
347

    
348
    (relname, status, runresult) = results[0]
349
    self.failUnlessEqual(relname, os.path.basename(files[0]))
350
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
351
    self.failUnlessEqual(runresult.exit_code, 1)
352
    self.failUnless(runresult.failed)
353

    
354
    (relname, status, runresult) = results[1]
355
    self.failUnlessEqual(relname, os.path.basename(files[1]))
356
    self.failUnlessEqual(status, constants.RUNPARTS_SKIP)
357
    self.failUnlessEqual(runresult, None)
358

    
359
    (relname, status, runresult) = results[2]
360
    self.failUnlessEqual(relname, os.path.basename(files[2]))
361
    self.failUnlessEqual(status, constants.RUNPARTS_ERR)
362
    self.failUnless(runresult)
363

    
364
    (relname, status, runresult) = results[3]
365
    self.failUnlessEqual(relname, os.path.basename(files[3]))
366
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
367
    self.failUnlessEqual(runresult.output, "ciao")
368
    self.failUnlessEqual(runresult.exit_code, 0)
369
    self.failUnless(not runresult.failed)
370

    
371

    
372
class TestStartDaemon(testutils.GanetiTestCase):
373
  def setUp(self):
374
    self.tmpdir = tempfile.mkdtemp(prefix="ganeti-test")
375
    self.tmpfile = os.path.join(self.tmpdir, "test")
376

    
377
  def tearDown(self):
378
    shutil.rmtree(self.tmpdir)
379

    
380
  def testShell(self):
381
    utils.StartDaemon("echo Hello World > %s" % self.tmpfile)
382
    self._wait(self.tmpfile, 60.0, "Hello World")
383

    
384
  def testShellOutput(self):
385
    utils.StartDaemon("echo Hello World", output=self.tmpfile)
386
    self._wait(self.tmpfile, 60.0, "Hello World")
387

    
388
  def testNoShellNoOutput(self):
389
    utils.StartDaemon(["pwd"])
390

    
391
  def testNoShellNoOutputTouch(self):
392
    testfile = os.path.join(self.tmpdir, "check")
393
    self.failIf(os.path.exists(testfile))
394
    utils.StartDaemon(["touch", testfile])
395
    self._wait(testfile, 60.0, "")
396

    
397
  def testNoShellOutput(self):
398
    utils.StartDaemon(["pwd"], output=self.tmpfile)
399
    self._wait(self.tmpfile, 60.0, "/")
400

    
401
  def testNoShellOutputCwd(self):
402
    utils.StartDaemon(["pwd"], output=self.tmpfile, cwd=os.getcwd())
403
    self._wait(self.tmpfile, 60.0, os.getcwd())
404

    
405
  def testShellEnv(self):
406
    utils.StartDaemon("echo \"$GNT_TEST_VAR\"", output=self.tmpfile,
407
                      env={ "GNT_TEST_VAR": "Hello World", })
408
    self._wait(self.tmpfile, 60.0, "Hello World")
409

    
410
  def testNoShellEnv(self):
411
    utils.StartDaemon(["printenv", "GNT_TEST_VAR"], output=self.tmpfile,
412
                      env={ "GNT_TEST_VAR": "Hello World", })
413
    self._wait(self.tmpfile, 60.0, "Hello World")
414

    
415
  def testOutputFd(self):
416
    fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
417
    try:
418
      utils.StartDaemon(["pwd"], output_fd=fd, cwd=os.getcwd())
419
    finally:
420
      os.close(fd)
421
    self._wait(self.tmpfile, 60.0, os.getcwd())
422

    
423
  def testPid(self):
424
    pid = utils.StartDaemon("echo $$ > %s" % self.tmpfile)
425
    self._wait(self.tmpfile, 60.0, str(pid))
426

    
427
  def testPidFile(self):
428
    pidfile = os.path.join(self.tmpdir, "pid")
429
    checkfile = os.path.join(self.tmpdir, "abort")
430

    
431
    pid = utils.StartDaemon("while sleep 5; do :; done", pidfile=pidfile,
432
                            output=self.tmpfile)
433
    try:
434
      fd = os.open(pidfile, os.O_RDONLY)
435
      try:
436
        # Check file is locked
437
        self.assertRaises(errors.LockError, utils.LockFile, fd)
438

    
439
        pidtext = os.read(fd, 100)
440
      finally:
441
        os.close(fd)
442

    
443
      self.assertEqual(int(pidtext.strip()), pid)
444

    
445
      self.assert_(utils.IsProcessAlive(pid))
446
    finally:
447
      # No matter what happens, kill daemon
448
      utils.KillProcess(pid, timeout=5.0, waitpid=False)
449
      self.failIf(utils.IsProcessAlive(pid))
450

    
451
    self.assertEqual(utils.ReadFile(self.tmpfile), "")
452

    
453
  def _wait(self, path, timeout, expected):
454
    # Due to the asynchronous nature of daemon processes, polling is necessary.
455
    # A timeout makes sure the test doesn't hang forever.
456
    def _CheckFile():
457
      if not (os.path.isfile(path) and
458
              utils.ReadFile(path).strip() == expected):
459
        raise utils.RetryAgain()
460

    
461
    try:
462
      utils.Retry(_CheckFile, (0.01, 1.5, 1.0), timeout)
463
    except utils.RetryTimeout:
464
      self.fail("Apparently the daemon didn't run in %s seconds and/or"
465
                " didn't write the correct output" % timeout)
466

    
467
  def testError(self):
468
    self.assertRaises(errors.OpExecError, utils.StartDaemon,
469
                      ["./does-NOT-EXIST/here/0123456789"])
470
    self.assertRaises(errors.OpExecError, utils.StartDaemon,
471
                      ["./does-NOT-EXIST/here/0123456789"],
472
                      output=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
473
    self.assertRaises(errors.OpExecError, utils.StartDaemon,
474
                      ["./does-NOT-EXIST/here/0123456789"],
475
                      cwd=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
476
    self.assertRaises(errors.OpExecError, utils.StartDaemon,
477
                      ["./does-NOT-EXIST/here/0123456789"],
478
                      output=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
479

    
480
    fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
481
    try:
482
      self.assertRaises(errors.ProgrammerError, utils.StartDaemon,
483
                        ["./does-NOT-EXIST/here/0123456789"],
484
                        output=self.tmpfile, output_fd=fd)
485
    finally:
486
      os.close(fd)
487

    
488

    
489
class TestSetCloseOnExecFlag(unittest.TestCase):
490
  """Tests for SetCloseOnExecFlag"""
491

    
492
  def setUp(self):
493
    self.tmpfile = tempfile.TemporaryFile()
494

    
495
  def testEnable(self):
496
    utils.SetCloseOnExecFlag(self.tmpfile.fileno(), True)
497
    self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
498
                    fcntl.FD_CLOEXEC)
499

    
500
  def testDisable(self):
501
    utils.SetCloseOnExecFlag(self.tmpfile.fileno(), False)
502
    self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
503
                fcntl.FD_CLOEXEC)
504

    
505

    
506
class TestSetNonblockFlag(unittest.TestCase):
507
  def setUp(self):
508
    self.tmpfile = tempfile.TemporaryFile()
509

    
510
  def testEnable(self):
511
    utils.SetNonblockFlag(self.tmpfile.fileno(), True)
512
    self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
513
                    os.O_NONBLOCK)
514

    
515
  def testDisable(self):
516
    utils.SetNonblockFlag(self.tmpfile.fileno(), False)
517
    self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
518
                os.O_NONBLOCK)
519

    
520

    
521
class TestRemoveFile(unittest.TestCase):
522
  """Test case for the RemoveFile function"""
523

    
524
  def setUp(self):
525
    """Create a temp dir and file for each case"""
526
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
527
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
528
    os.close(fd)
529

    
530
  def tearDown(self):
531
    if os.path.exists(self.tmpfile):
532
      os.unlink(self.tmpfile)
533
    os.rmdir(self.tmpdir)
534

    
535
  def testIgnoreDirs(self):
536
    """Test that RemoveFile() ignores directories"""
537
    self.assertEqual(None, RemoveFile(self.tmpdir))
538

    
539
  def testIgnoreNotExisting(self):
540
    """Test that RemoveFile() ignores non-existing files"""
541
    RemoveFile(self.tmpfile)
542
    RemoveFile(self.tmpfile)
543

    
544
  def testRemoveFile(self):
545
    """Test that RemoveFile does remove a file"""
546
    RemoveFile(self.tmpfile)
547
    if os.path.exists(self.tmpfile):
548
      self.fail("File '%s' not removed" % self.tmpfile)
549

    
550
  def testRemoveSymlink(self):
551
    """Test that RemoveFile does remove symlinks"""
552
    symlink = self.tmpdir + "/symlink"
553
    os.symlink("no-such-file", symlink)
554
    RemoveFile(symlink)
555
    if os.path.exists(symlink):
556
      self.fail("File '%s' not removed" % symlink)
557
    os.symlink(self.tmpfile, symlink)
558
    RemoveFile(symlink)
559
    if os.path.exists(symlink):
560
      self.fail("File '%s' not removed" % symlink)
561

    
562

    
563
class TestRename(unittest.TestCase):
564
  """Test case for RenameFile"""
565

    
566
  def setUp(self):
567
    """Create a temporary directory"""
568
    self.tmpdir = tempfile.mkdtemp()
569
    self.tmpfile = os.path.join(self.tmpdir, "test1")
570

    
571
    # Touch the file
572
    open(self.tmpfile, "w").close()
573

    
574
  def tearDown(self):
575
    """Remove temporary directory"""
576
    shutil.rmtree(self.tmpdir)
577

    
578
  def testSimpleRename1(self):
579
    """Simple rename 1"""
580
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
581
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
582

    
583
  def testSimpleRename2(self):
584
    """Simple rename 2"""
585
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
586
                     mkdir=True)
587
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
588

    
589
  def testRenameMkdir(self):
590
    """Rename with mkdir"""
591
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
592
                     mkdir=True)
593
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
594
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
595

    
596
    utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"),
597
                     os.path.join(self.tmpdir, "test/foo/bar/baz"),
598
                     mkdir=True)
599
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
600
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar")))
601
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz")))
602

    
603

    
604
class TestMatchNameComponent(unittest.TestCase):
605
  """Test case for the MatchNameComponent function"""
606

    
607
  def testEmptyList(self):
608
    """Test that there is no match against an empty list"""
609

    
610
    self.failUnlessEqual(MatchNameComponent("", []), None)
611
    self.failUnlessEqual(MatchNameComponent("test", []), None)
612

    
613
  def testSingleMatch(self):
614
    """Test that a single match is performed correctly"""
615
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
616
    for key in "test2", "test2.example", "test2.example.com":
617
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
618

    
619
  def testMultipleMatches(self):
620
    """Test that a multiple match is returned as None"""
621
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
622
    for key in "test1", "test1.example":
623
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
624

    
625
  def testFullMatch(self):
626
    """Test that a full match is returned correctly"""
627
    key1 = "test1"
628
    key2 = "test1.example"
629
    mlist = [key2, key2 + ".com"]
630
    self.failUnlessEqual(MatchNameComponent(key1, mlist), None)
631
    self.failUnlessEqual(MatchNameComponent(key2, mlist), key2)
632

    
633
  def testCaseInsensitivePartialMatch(self):
634
    """Test for the case_insensitive keyword"""
635
    mlist = ["test1.example.com", "test2.example.net"]
636
    self.assertEqual(MatchNameComponent("test2", mlist, case_sensitive=False),
637
                     "test2.example.net")
638
    self.assertEqual(MatchNameComponent("Test2", mlist, case_sensitive=False),
639
                     "test2.example.net")
640
    self.assertEqual(MatchNameComponent("teSt2", mlist, case_sensitive=False),
641
                     "test2.example.net")
642
    self.assertEqual(MatchNameComponent("TeSt2", mlist, case_sensitive=False),
643
                     "test2.example.net")
644

    
645

    
646
  def testCaseInsensitiveFullMatch(self):
647
    mlist = ["ts1.ex", "ts1.ex.org", "ts2.ex", "Ts2.ex"]
648
    # Between the two ts1 a full string match non-case insensitive should work
649
    self.assertEqual(MatchNameComponent("Ts1", mlist, case_sensitive=False),
650
                     None)
651
    self.assertEqual(MatchNameComponent("Ts1.ex", mlist, case_sensitive=False),
652
                     "ts1.ex")
653
    self.assertEqual(MatchNameComponent("ts1.ex", mlist, case_sensitive=False),
654
                     "ts1.ex")
655
    # Between the two ts2 only case differs, so only case-match works
656
    self.assertEqual(MatchNameComponent("ts2.ex", mlist, case_sensitive=False),
657
                     "ts2.ex")
658
    self.assertEqual(MatchNameComponent("Ts2.ex", mlist, case_sensitive=False),
659
                     "Ts2.ex")
660
    self.assertEqual(MatchNameComponent("TS2.ex", mlist, case_sensitive=False),
661
                     None)
662

    
663

    
664
class TestTimestampForFilename(unittest.TestCase):
665
  def test(self):
666
    self.assert_("." not in utils.TimestampForFilename())
667
    self.assert_(":" not in utils.TimestampForFilename())
668

    
669

    
670
class TestCreateBackup(testutils.GanetiTestCase):
671
  def setUp(self):
672
    testutils.GanetiTestCase.setUp(self)
673

    
674
    self.tmpdir = tempfile.mkdtemp()
675

    
676
  def tearDown(self):
677
    testutils.GanetiTestCase.tearDown(self)
678

    
679
    shutil.rmtree(self.tmpdir)
680

    
681
  def testEmpty(self):
682
    filename = utils.PathJoin(self.tmpdir, "config.data")
683
    utils.WriteFile(filename, data="")
684
    bname = utils.CreateBackup(filename)
685
    self.assertFileContent(bname, "")
686
    self.assertEqual(len(glob.glob("%s*" % filename)), 2)
687
    utils.CreateBackup(filename)
688
    self.assertEqual(len(glob.glob("%s*" % filename)), 3)
689
    utils.CreateBackup(filename)
690
    self.assertEqual(len(glob.glob("%s*" % filename)), 4)
691

    
692
    fifoname = utils.PathJoin(self.tmpdir, "fifo")
693
    os.mkfifo(fifoname)
694
    self.assertRaises(errors.ProgrammerError, utils.CreateBackup, fifoname)
695

    
696
  def testContent(self):
697
    bkpcount = 0
698
    for data in ["", "X", "Hello World!\n" * 100, "Binary data\0\x01\x02\n"]:
699
      for rep in [1, 2, 10, 127]:
700
        testdata = data * rep
701

    
702
        filename = utils.PathJoin(self.tmpdir, "test.data_")
703
        utils.WriteFile(filename, data=testdata)
704
        self.assertFileContent(filename, testdata)
705

    
706
        for _ in range(3):
707
          bname = utils.CreateBackup(filename)
708
          bkpcount += 1
709
          self.assertFileContent(bname, testdata)
710
          self.assertEqual(len(glob.glob("%s*" % filename)), 1 + bkpcount)
711

    
712

    
713
class TestFormatUnit(unittest.TestCase):
714
  """Test case for the FormatUnit function"""
715

    
716
  def testMiB(self):
717
    self.assertEqual(FormatUnit(1, 'h'), '1M')
718
    self.assertEqual(FormatUnit(100, 'h'), '100M')
719
    self.assertEqual(FormatUnit(1023, 'h'), '1023M')
720

    
721
    self.assertEqual(FormatUnit(1, 'm'), '1')
722
    self.assertEqual(FormatUnit(100, 'm'), '100')
723
    self.assertEqual(FormatUnit(1023, 'm'), '1023')
724

    
725
    self.assertEqual(FormatUnit(1024, 'm'), '1024')
726
    self.assertEqual(FormatUnit(1536, 'm'), '1536')
727
    self.assertEqual(FormatUnit(17133, 'm'), '17133')
728
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
729

    
730
  def testGiB(self):
731
    self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
732
    self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
733
    self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
734
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
735

    
736
    self.assertEqual(FormatUnit(1024, 'g'), '1.0')
737
    self.assertEqual(FormatUnit(1536, 'g'), '1.5')
738
    self.assertEqual(FormatUnit(17133, 'g'), '16.7')
739
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
740

    
741
    self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
742
    self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
743
    self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
744

    
745
  def testTiB(self):
746
    self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
747
    self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
748
    self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
749

    
750
    self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
751
    self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
752
    self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
753

    
754
class TestParseUnit(unittest.TestCase):
755
  """Test case for the ParseUnit function"""
756

    
757
  SCALES = (('', 1),
758
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
759
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
760
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
761

    
762
  def testRounding(self):
763
    self.assertEqual(ParseUnit('0'), 0)
764
    self.assertEqual(ParseUnit('1'), 4)
765
    self.assertEqual(ParseUnit('2'), 4)
766
    self.assertEqual(ParseUnit('3'), 4)
767

    
768
    self.assertEqual(ParseUnit('124'), 124)
769
    self.assertEqual(ParseUnit('125'), 128)
770
    self.assertEqual(ParseUnit('126'), 128)
771
    self.assertEqual(ParseUnit('127'), 128)
772
    self.assertEqual(ParseUnit('128'), 128)
773
    self.assertEqual(ParseUnit('129'), 132)
774
    self.assertEqual(ParseUnit('130'), 132)
775

    
776
  def testFloating(self):
777
    self.assertEqual(ParseUnit('0'), 0)
778
    self.assertEqual(ParseUnit('0.5'), 4)
779
    self.assertEqual(ParseUnit('1.75'), 4)
780
    self.assertEqual(ParseUnit('1.99'), 4)
781
    self.assertEqual(ParseUnit('2.00'), 4)
782
    self.assertEqual(ParseUnit('2.01'), 4)
783
    self.assertEqual(ParseUnit('3.99'), 4)
784
    self.assertEqual(ParseUnit('4.00'), 4)
785
    self.assertEqual(ParseUnit('4.01'), 8)
786
    self.assertEqual(ParseUnit('1.5G'), 1536)
787
    self.assertEqual(ParseUnit('1.8G'), 1844)
788
    self.assertEqual(ParseUnit('8.28T'), 8682212)
789

    
790
  def testSuffixes(self):
791
    for sep in ('', ' ', '   ', "\t", "\t "):
792
      for suffix, scale in TestParseUnit.SCALES:
793
        for func in (lambda x: x, str.lower, str.upper):
794
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
795
                           1024 * scale)
796

    
797
  def testInvalidInput(self):
798
    for sep in ('-', '_', ',', 'a'):
799
      for suffix, _ in TestParseUnit.SCALES:
800
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
801

    
802
    for suffix, _ in TestParseUnit.SCALES:
803
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
804

    
805

    
806
class TestSshKeys(testutils.GanetiTestCase):
807
  """Test case for the AddAuthorizedKey function"""
808

    
809
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
810
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
811
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
812

    
813
  def setUp(self):
814
    testutils.GanetiTestCase.setUp(self)
815
    self.tmpname = self._CreateTempFile()
816
    handle = open(self.tmpname, 'w')
817
    try:
818
      handle.write("%s\n" % TestSshKeys.KEY_A)
819
      handle.write("%s\n" % TestSshKeys.KEY_B)
820
    finally:
821
      handle.close()
822

    
823
  def testAddingNewKey(self):
824
    AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
825

    
826
    self.assertFileContent(self.tmpname,
827
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
828
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
829
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
830
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
831

    
832
  def testAddingAlmostButNotCompletelyTheSameKey(self):
833
    AddAuthorizedKey(self.tmpname,
834
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
835

    
836
    self.assertFileContent(self.tmpname,
837
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
838
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
839
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
840
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
841

    
842
  def testAddingExistingKeyWithSomeMoreSpaces(self):
843
    AddAuthorizedKey(self.tmpname,
844
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
845

    
846
    self.assertFileContent(self.tmpname,
847
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
848
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
849
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
850

    
851
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
852
    RemoveAuthorizedKey(self.tmpname,
853
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
854

    
855
    self.assertFileContent(self.tmpname,
856
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
857
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
858

    
859
  def testRemovingNonExistingKey(self):
860
    RemoveAuthorizedKey(self.tmpname,
861
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
862

    
863
    self.assertFileContent(self.tmpname,
864
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
865
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
866
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
867

    
868

    
869
class TestEtcHosts(testutils.GanetiTestCase):
870
  """Test functions modifying /etc/hosts"""
871

    
872
  def setUp(self):
873
    testutils.GanetiTestCase.setUp(self)
874
    self.tmpname = self._CreateTempFile()
875
    handle = open(self.tmpname, 'w')
876
    try:
877
      handle.write('# This is a test file for /etc/hosts\n')
878
      handle.write('127.0.0.1\tlocalhost\n')
879
      handle.write('192.168.1.1 router gw\n')
880
    finally:
881
      handle.close()
882

    
883
  def testSettingNewIp(self):
884
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
885

    
886
    self.assertFileContent(self.tmpname,
887
      "# This is a test file for /etc/hosts\n"
888
      "127.0.0.1\tlocalhost\n"
889
      "192.168.1.1 router gw\n"
890
      "1.2.3.4\tmyhost.domain.tld myhost\n")
891
    self.assertFileMode(self.tmpname, 0644)
892

    
893
  def testSettingExistingIp(self):
894
    SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
895
                     ['myhost'])
896

    
897
    self.assertFileContent(self.tmpname,
898
      "# This is a test file for /etc/hosts\n"
899
      "127.0.0.1\tlocalhost\n"
900
      "192.168.1.1\tmyhost.domain.tld myhost\n")
901
    self.assertFileMode(self.tmpname, 0644)
902

    
903
  def testSettingDuplicateName(self):
904
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
905

    
906
    self.assertFileContent(self.tmpname,
907
      "# This is a test file for /etc/hosts\n"
908
      "127.0.0.1\tlocalhost\n"
909
      "192.168.1.1 router gw\n"
910
      "1.2.3.4\tmyhost\n")
911
    self.assertFileMode(self.tmpname, 0644)
912

    
913
  def testRemovingExistingHost(self):
914
    RemoveEtcHostsEntry(self.tmpname, 'router')
915

    
916
    self.assertFileContent(self.tmpname,
917
      "# This is a test file for /etc/hosts\n"
918
      "127.0.0.1\tlocalhost\n"
919
      "192.168.1.1 gw\n")
920
    self.assertFileMode(self.tmpname, 0644)
921

    
922
  def testRemovingSingleExistingHost(self):
923
    RemoveEtcHostsEntry(self.tmpname, 'localhost')
924

    
925
    self.assertFileContent(self.tmpname,
926
      "# This is a test file for /etc/hosts\n"
927
      "192.168.1.1 router gw\n")
928
    self.assertFileMode(self.tmpname, 0644)
929

    
930
  def testRemovingNonExistingHost(self):
931
    RemoveEtcHostsEntry(self.tmpname, 'myhost')
932

    
933
    self.assertFileContent(self.tmpname,
934
      "# This is a test file for /etc/hosts\n"
935
      "127.0.0.1\tlocalhost\n"
936
      "192.168.1.1 router gw\n")
937
    self.assertFileMode(self.tmpname, 0644)
938

    
939
  def testRemovingAlias(self):
940
    RemoveEtcHostsEntry(self.tmpname, 'gw')
941

    
942
    self.assertFileContent(self.tmpname,
943
      "# This is a test file for /etc/hosts\n"
944
      "127.0.0.1\tlocalhost\n"
945
      "192.168.1.1 router\n")
946
    self.assertFileMode(self.tmpname, 0644)
947

    
948

    
949
class TestShellQuoting(unittest.TestCase):
950
  """Test case for shell quoting functions"""
951

    
952
  def testShellQuote(self):
953
    self.assertEqual(ShellQuote('abc'), "abc")
954
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
955
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
956
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
957
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
958

    
959
  def testShellQuoteArgs(self):
960
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
961
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
962
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
963

    
964

    
965
class TestTcpPing(unittest.TestCase):
966
  """Testcase for TCP version of ping - against listen(2)ing port"""
967

    
968
  def setUp(self):
969
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
970
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
971
    self.listenerport = self.listener.getsockname()[1]
972
    self.listener.listen(1)
973

    
974
  def tearDown(self):
975
    self.listener.shutdown(socket.SHUT_RDWR)
976
    del self.listener
977
    del self.listenerport
978

    
979
  def testTcpPingToLocalHostAccept(self):
980
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
981
                         self.listenerport,
982
                         timeout=10,
983
                         live_port_needed=True,
984
                         source=constants.LOCALHOST_IP_ADDRESS,
985
                         ),
986
                 "failed to connect to test listener")
987

    
988
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
989
                         self.listenerport,
990
                         timeout=10,
991
                         live_port_needed=True,
992
                         ),
993
                 "failed to connect to test listener (no source)")
994

    
995

    
996
class TestTcpPingDeaf(unittest.TestCase):
997
  """Testcase for TCP version of ping - against non listen(2)ing port"""
998

    
999
  def setUp(self):
1000
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1001
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
1002
    self.deaflistenerport = self.deaflistener.getsockname()[1]
1003

    
1004
  def tearDown(self):
1005
    del self.deaflistener
1006
    del self.deaflistenerport
1007

    
1008
  def testTcpPingToLocalHostAcceptDeaf(self):
1009
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
1010
                        self.deaflistenerport,
1011
                        timeout=constants.TCP_PING_TIMEOUT,
1012
                        live_port_needed=True,
1013
                        source=constants.LOCALHOST_IP_ADDRESS,
1014
                        ), # need successful connect(2)
1015
                "successfully connected to deaf listener")
1016

    
1017
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
1018
                        self.deaflistenerport,
1019
                        timeout=constants.TCP_PING_TIMEOUT,
1020
                        live_port_needed=True,
1021
                        ), # need successful connect(2)
1022
                "successfully connected to deaf listener (no source addr)")
1023

    
1024
  def testTcpPingToLocalHostNoAccept(self):
1025
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
1026
                         self.deaflistenerport,
1027
                         timeout=constants.TCP_PING_TIMEOUT,
1028
                         live_port_needed=False,
1029
                         source=constants.LOCALHOST_IP_ADDRESS,
1030
                         ), # ECONNREFUSED is OK
1031
                 "failed to ping alive host on deaf port")
1032

    
1033
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
1034
                         self.deaflistenerport,
1035
                         timeout=constants.TCP_PING_TIMEOUT,
1036
                         live_port_needed=False,
1037
                         ), # ECONNREFUSED is OK
1038
                 "failed to ping alive host on deaf port (no source addr)")
1039

    
1040

    
1041
class TestOwnIpAddress(unittest.TestCase):
1042
  """Testcase for OwnIpAddress"""
1043

    
1044
  def testOwnLoopback(self):
1045
    """check having the loopback ip"""
1046
    self.failUnless(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS),
1047
                    "Should own the loopback address")
1048

    
1049
  def testNowOwnAddress(self):
1050
    """check that I don't own an address"""
1051

    
1052
    # network 192.0.2.0/24 is reserved for test/documentation as per
1053
    # rfc 3330, so we *should* not have an address of this range... if
1054
    # this fails, we should extend the test to multiple addresses
1055
    DST_IP = "192.0.2.1"
1056
    self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP)
1057

    
1058

    
1059
class TestListVisibleFiles(unittest.TestCase):
1060
  """Test case for ListVisibleFiles"""
1061

    
1062
  def setUp(self):
1063
    self.path = tempfile.mkdtemp()
1064

    
1065
  def tearDown(self):
1066
    shutil.rmtree(self.path)
1067

    
1068
  def _test(self, files, expected):
1069
    # Sort a copy
1070
    expected = expected[:]
1071
    expected.sort()
1072

    
1073
    for name in files:
1074
      f = open(os.path.join(self.path, name), 'w')
1075
      try:
1076
        f.write("Test\n")
1077
      finally:
1078
        f.close()
1079

    
1080
    found = ListVisibleFiles(self.path)
1081
    found.sort()
1082

    
1083
    self.assertEqual(found, expected)
1084

    
1085
  def testAllVisible(self):
1086
    files = ["a", "b", "c"]
1087
    expected = files
1088
    self._test(files, expected)
1089

    
1090
  def testNoneVisible(self):
1091
    files = [".a", ".b", ".c"]
1092
    expected = []
1093
    self._test(files, expected)
1094

    
1095
  def testSomeVisible(self):
1096
    files = ["a", "b", ".c"]
1097
    expected = ["a", "b"]
1098
    self._test(files, expected)
1099

    
1100
  def testNonAbsolutePath(self):
1101
    self.failUnlessRaises(errors.ProgrammerError, ListVisibleFiles, "abc")
1102

    
1103
  def testNonNormalizedPath(self):
1104
    self.failUnlessRaises(errors.ProgrammerError, ListVisibleFiles,
1105
                          "/bin/../tmp")
1106

    
1107

    
1108
class TestNewUUID(unittest.TestCase):
1109
  """Test case for NewUUID"""
1110

    
1111
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
1112
                        '[a-f0-9]{4}-[a-f0-9]{12}$')
1113

    
1114
  def runTest(self):
1115
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
1116

    
1117

    
1118
class TestUniqueSequence(unittest.TestCase):
1119
  """Test case for UniqueSequence"""
1120

    
1121
  def _test(self, input, expected):
1122
    self.assertEqual(utils.UniqueSequence(input), expected)
1123

    
1124
  def runTest(self):
1125
    # Ordered input
1126
    self._test([1, 2, 3], [1, 2, 3])
1127
    self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
1128
    self._test([1, 2, 2, 3], [1, 2, 3])
1129
    self._test([1, 2, 3, 3], [1, 2, 3])
1130

    
1131
    # Unordered input
1132
    self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
1133
    self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
1134

    
1135
    # Strings
1136
    self._test(["a", "a"], ["a"])
1137
    self._test(["a", "b"], ["a", "b"])
1138
    self._test(["a", "b", "a"], ["a", "b"])
1139

    
1140

    
1141
class TestFirstFree(unittest.TestCase):
1142
  """Test case for the FirstFree function"""
1143

    
1144
  def test(self):
1145
    """Test FirstFree"""
1146
    self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
1147
    self.failUnlessEqual(FirstFree([]), None)
1148
    self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
1149
    self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
1150
    self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
1151

    
1152

    
1153
class TestTailFile(testutils.GanetiTestCase):
1154
  """Test case for the TailFile function"""
1155

    
1156
  def testEmpty(self):
1157
    fname = self._CreateTempFile()
1158
    self.failUnlessEqual(TailFile(fname), [])
1159
    self.failUnlessEqual(TailFile(fname, lines=25), [])
1160

    
1161
  def testAllLines(self):
1162
    data = ["test %d" % i for i in range(30)]
1163
    for i in range(30):
1164
      fname = self._CreateTempFile()
1165
      fd = open(fname, "w")
1166
      fd.write("\n".join(data[:i]))
1167
      if i > 0:
1168
        fd.write("\n")
1169
      fd.close()
1170
      self.failUnlessEqual(TailFile(fname, lines=i), data[:i])
1171

    
1172
  def testPartialLines(self):
1173
    data = ["test %d" % i for i in range(30)]
1174
    fname = self._CreateTempFile()
1175
    fd = open(fname, "w")
1176
    fd.write("\n".join(data))
1177
    fd.write("\n")
1178
    fd.close()
1179
    for i in range(1, 30):
1180
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
1181

    
1182
  def testBigFile(self):
1183
    data = ["test %d" % i for i in range(30)]
1184
    fname = self._CreateTempFile()
1185
    fd = open(fname, "w")
1186
    fd.write("X" * 1048576)
1187
    fd.write("\n")
1188
    fd.write("\n".join(data))
1189
    fd.write("\n")
1190
    fd.close()
1191
    for i in range(1, 30):
1192
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
1193

    
1194

    
1195
class _BaseFileLockTest:
1196
  """Test case for the FileLock class"""
1197

    
1198
  def testSharedNonblocking(self):
1199
    self.lock.Shared(blocking=False)
1200
    self.lock.Close()
1201

    
1202
  def testExclusiveNonblocking(self):
1203
    self.lock.Exclusive(blocking=False)
1204
    self.lock.Close()
1205

    
1206
  def testUnlockNonblocking(self):
1207
    self.lock.Unlock(blocking=False)
1208
    self.lock.Close()
1209

    
1210
  def testSharedBlocking(self):
1211
    self.lock.Shared(blocking=True)
1212
    self.lock.Close()
1213

    
1214
  def testExclusiveBlocking(self):
1215
    self.lock.Exclusive(blocking=True)
1216
    self.lock.Close()
1217

    
1218
  def testUnlockBlocking(self):
1219
    self.lock.Unlock(blocking=True)
1220
    self.lock.Close()
1221

    
1222
  def testSharedExclusiveUnlock(self):
1223
    self.lock.Shared(blocking=False)
1224
    self.lock.Exclusive(blocking=False)
1225
    self.lock.Unlock(blocking=False)
1226
    self.lock.Close()
1227

    
1228
  def testExclusiveSharedUnlock(self):
1229
    self.lock.Exclusive(blocking=False)
1230
    self.lock.Shared(blocking=False)
1231
    self.lock.Unlock(blocking=False)
1232
    self.lock.Close()
1233

    
1234
  def testSimpleTimeout(self):
1235
    # These will succeed on the first attempt, hence a short timeout
1236
    self.lock.Shared(blocking=True, timeout=10.0)
1237
    self.lock.Exclusive(blocking=False, timeout=10.0)
1238
    self.lock.Unlock(blocking=True, timeout=10.0)
1239
    self.lock.Close()
1240

    
1241
  @staticmethod
1242
  def _TryLockInner(filename, shared, blocking):
1243
    lock = utils.FileLock.Open(filename)
1244

    
1245
    if shared:
1246
      fn = lock.Shared
1247
    else:
1248
      fn = lock.Exclusive
1249

    
1250
    try:
1251
      # The timeout doesn't really matter as the parent process waits for us to
1252
      # finish anyway.
1253
      fn(blocking=blocking, timeout=0.01)
1254
    except errors.LockError, err:
1255
      return False
1256

    
1257
    return True
1258

    
1259
  def _TryLock(self, *args):
1260
    return utils.RunInSeparateProcess(self._TryLockInner, self.tmpfile.name,
1261
                                      *args)
1262

    
1263
  def testTimeout(self):
1264
    for blocking in [True, False]:
1265
      self.lock.Exclusive(blocking=True)
1266
      self.failIf(self._TryLock(False, blocking))
1267
      self.failIf(self._TryLock(True, blocking))
1268

    
1269
      self.lock.Shared(blocking=True)
1270
      self.assert_(self._TryLock(True, blocking))
1271
      self.failIf(self._TryLock(False, blocking))
1272

    
1273
  def testCloseShared(self):
1274
    self.lock.Close()
1275
    self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
1276

    
1277
  def testCloseExclusive(self):
1278
    self.lock.Close()
1279
    self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
1280

    
1281
  def testCloseUnlock(self):
1282
    self.lock.Close()
1283
    self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
1284

    
1285

    
1286
class TestFileLockWithFilename(testutils.GanetiTestCase, _BaseFileLockTest):
1287
  TESTDATA = "Hello World\n" * 10
1288

    
1289
  def setUp(self):
1290
    testutils.GanetiTestCase.setUp(self)
1291

    
1292
    self.tmpfile = tempfile.NamedTemporaryFile()
1293
    utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
1294
    self.lock = utils.FileLock.Open(self.tmpfile.name)
1295

    
1296
    # Ensure "Open" didn't truncate file
1297
    self.assertFileContent(self.tmpfile.name, self.TESTDATA)
1298

    
1299
  def tearDown(self):
1300
    self.assertFileContent(self.tmpfile.name, self.TESTDATA)
1301

    
1302
    testutils.GanetiTestCase.tearDown(self)
1303

    
1304

    
1305
class TestFileLockWithFileObject(unittest.TestCase, _BaseFileLockTest):
1306
  def setUp(self):
1307
    self.tmpfile = tempfile.NamedTemporaryFile()
1308
    self.lock = utils.FileLock(open(self.tmpfile.name, "w"), self.tmpfile.name)
1309

    
1310

    
1311
class TestTimeFunctions(unittest.TestCase):
1312
  """Test case for time functions"""
1313

    
1314
  def runTest(self):
1315
    self.assertEqual(utils.SplitTime(1), (1, 0))
1316
    self.assertEqual(utils.SplitTime(1.5), (1, 500000))
1317
    self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
1318
    self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
1319
    self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
1320
    self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
1321
    self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
1322
    self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
1323

    
1324
    self.assertRaises(AssertionError, utils.SplitTime, -1)
1325

    
1326
    self.assertEqual(utils.MergeTime((1, 0)), 1.0)
1327
    self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
1328
    self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
1329

    
1330
    self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3),
1331
                     1218448917.481)
1332
    self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
1333

    
1334
    self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
1335
    self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
1336
    self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
1337
    self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
1338
    self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
1339

    
1340

    
1341
class FieldSetTestCase(unittest.TestCase):
1342
  """Test case for FieldSets"""
1343

    
1344
  def testSimpleMatch(self):
1345
    f = utils.FieldSet("a", "b", "c", "def")
1346
    self.failUnless(f.Matches("a"))
1347
    self.failIf(f.Matches("d"), "Substring matched")
1348
    self.failIf(f.Matches("defghi"), "Prefix string matched")
1349
    self.failIf(f.NonMatching(["b", "c"]))
1350
    self.failIf(f.NonMatching(["a", "b", "c", "def"]))
1351
    self.failUnless(f.NonMatching(["a", "d"]))
1352

    
1353
  def testRegexMatch(self):
1354
    f = utils.FieldSet("a", "b([0-9]+)", "c")
1355
    self.failUnless(f.Matches("b1"))
1356
    self.failUnless(f.Matches("b99"))
1357
    self.failIf(f.Matches("b/1"))
1358
    self.failIf(f.NonMatching(["b12", "c"]))
1359
    self.failUnless(f.NonMatching(["a", "1"]))
1360

    
1361
class TestForceDictType(unittest.TestCase):
1362
  """Test case for ForceDictType"""
1363

    
1364
  def setUp(self):
1365
    self.key_types = {
1366
      'a': constants.VTYPE_INT,
1367
      'b': constants.VTYPE_BOOL,
1368
      'c': constants.VTYPE_STRING,
1369
      'd': constants.VTYPE_SIZE,
1370
      }
1371

    
1372
  def _fdt(self, dict, allowed_values=None):
1373
    if allowed_values is None:
1374
      ForceDictType(dict, self.key_types)
1375
    else:
1376
      ForceDictType(dict, self.key_types, allowed_values=allowed_values)
1377

    
1378
    return dict
1379

    
1380
  def testSimpleDict(self):
1381
    self.assertEqual(self._fdt({}), {})
1382
    self.assertEqual(self._fdt({'a': 1}), {'a': 1})
1383
    self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
1384
    self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
1385
    self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
1386
    self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
1387
    self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
1388
    self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
1389
    self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
1390
    self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
1391
    self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
1392
    self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
1393

    
1394
  def testErrors(self):
1395
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
1396
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
1397
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
1398
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
1399

    
1400

    
1401
class TestIsAbsNormPath(unittest.TestCase):
1402
  """Testing case for IsNormAbsPath"""
1403

    
1404
  def _pathTestHelper(self, path, result):
1405
    if result:
1406
      self.assert_(IsNormAbsPath(path),
1407
          "Path %s should result absolute and normalized" % path)
1408
    else:
1409
      self.assert_(not IsNormAbsPath(path),
1410
          "Path %s should not result absolute and normalized" % path)
1411

    
1412
  def testBase(self):
1413
    self._pathTestHelper('/etc', True)
1414
    self._pathTestHelper('/srv', True)
1415
    self._pathTestHelper('etc', False)
1416
    self._pathTestHelper('/etc/../root', False)
1417
    self._pathTestHelper('/etc/', False)
1418

    
1419

    
1420
class TestSafeEncode(unittest.TestCase):
1421
  """Test case for SafeEncode"""
1422

    
1423
  def testAscii(self):
1424
    for txt in [string.digits, string.letters, string.punctuation]:
1425
      self.failUnlessEqual(txt, SafeEncode(txt))
1426

    
1427
  def testDoubleEncode(self):
1428
    for i in range(255):
1429
      txt = SafeEncode(chr(i))
1430
      self.failUnlessEqual(txt, SafeEncode(txt))
1431

    
1432
  def testUnicode(self):
1433
    # 1024 is high enough to catch non-direct ASCII mappings
1434
    for i in range(1024):
1435
      txt = SafeEncode(unichr(i))
1436
      self.failUnlessEqual(txt, SafeEncode(txt))
1437

    
1438

    
1439
class TestFormatTime(unittest.TestCase):
1440
  """Testing case for FormatTime"""
1441

    
1442
  def testNone(self):
1443
    self.failUnlessEqual(FormatTime(None), "N/A")
1444

    
1445
  def testInvalid(self):
1446
    self.failUnlessEqual(FormatTime(()), "N/A")
1447

    
1448
  def testNow(self):
1449
    # tests that we accept time.time input
1450
    FormatTime(time.time())
1451
    # tests that we accept int input
1452
    FormatTime(int(time.time()))
1453

    
1454

    
1455
class RunInSeparateProcess(unittest.TestCase):
1456
  def test(self):
1457
    for exp in [True, False]:
1458
      def _child():
1459
        return exp
1460

    
1461
      self.assertEqual(exp, utils.RunInSeparateProcess(_child))
1462

    
1463
  def testArgs(self):
1464
    for arg in [0, 1, 999, "Hello World", (1, 2, 3)]:
1465
      def _child(carg1, carg2):
1466
        return carg1 == "Foo" and carg2 == arg
1467

    
1468
      self.assert_(utils.RunInSeparateProcess(_child, "Foo", arg))
1469

    
1470
  def testPid(self):
1471
    parent_pid = os.getpid()
1472

    
1473
    def _check():
1474
      return os.getpid() == parent_pid
1475

    
1476
    self.failIf(utils.RunInSeparateProcess(_check))
1477

    
1478
  def testSignal(self):
1479
    def _kill():
1480
      os.kill(os.getpid(), signal.SIGTERM)
1481

    
1482
    self.assertRaises(errors.GenericError,
1483
                      utils.RunInSeparateProcess, _kill)
1484

    
1485
  def testException(self):
1486
    def _exc():
1487
      raise errors.GenericError("This is a test")
1488

    
1489
    self.assertRaises(errors.GenericError,
1490
                      utils.RunInSeparateProcess, _exc)
1491

    
1492

    
1493
class TestFingerprintFile(unittest.TestCase):
1494
  def setUp(self):
1495
    self.tmpfile = tempfile.NamedTemporaryFile()
1496

    
1497
  def test(self):
1498
    self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1499
                     "da39a3ee5e6b4b0d3255bfef95601890afd80709")
1500

    
1501
    utils.WriteFile(self.tmpfile.name, data="Hello World\n")
1502
    self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1503
                     "648a6a6ffffdaa0badb23b8baf90b6168dd16b3a")
1504

    
1505

    
1506
class TestUnescapeAndSplit(unittest.TestCase):
1507
  """Testing case for UnescapeAndSplit"""
1508

    
1509
  def setUp(self):
1510
    # testing more that one separator for regexp safety
1511
    self._seps = [",", "+", "."]
1512

    
1513
  def testSimple(self):
1514
    a = ["a", "b", "c", "d"]
1515
    for sep in self._seps:
1516
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), a)
1517

    
1518
  def testEscape(self):
1519
    for sep in self._seps:
1520
      a = ["a", "b\\" + sep + "c", "d"]
1521
      b = ["a", "b" + sep + "c", "d"]
1522
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1523

    
1524
  def testDoubleEscape(self):
1525
    for sep in self._seps:
1526
      a = ["a", "b\\\\", "c", "d"]
1527
      b = ["a", "b\\", "c", "d"]
1528
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1529

    
1530
  def testThreeEscape(self):
1531
    for sep in self._seps:
1532
      a = ["a", "b\\\\\\" + sep + "c", "d"]
1533
      b = ["a", "b\\" + sep + "c", "d"]
1534
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1535

    
1536

    
1537
class TestGenerateSelfSignedX509Cert(unittest.TestCase):
1538
  def setUp(self):
1539
    self.tmpdir = tempfile.mkdtemp()
1540

    
1541
  def tearDown(self):
1542
    shutil.rmtree(self.tmpdir)
1543

    
1544
  def _checkRsaPrivateKey(self, key):
1545
    lines = key.splitlines()
1546
    return ("-----BEGIN RSA PRIVATE KEY-----" in lines and
1547
            "-----END RSA PRIVATE KEY-----" in lines)
1548

    
1549
  def _checkCertificate(self, cert):
1550
    lines = cert.splitlines()
1551
    return ("-----BEGIN CERTIFICATE-----" in lines and
1552
            "-----END CERTIFICATE-----" in lines)
1553

    
1554
  def test(self):
1555
    for common_name in [None, ".", "Ganeti", "node1.example.com"]:
1556
      (key_pem, cert_pem) = utils.GenerateSelfSignedX509Cert(common_name, 300)
1557
      self._checkRsaPrivateKey(key_pem)
1558
      self._checkCertificate(cert_pem)
1559

    
1560
      key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
1561
                                           key_pem)
1562
      self.assert_(key.bits() >= 1024)
1563
      self.assertEqual(key.bits(), constants.RSA_KEY_BITS)
1564
      self.assertEqual(key.type(), OpenSSL.crypto.TYPE_RSA)
1565

    
1566
      x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1567
                                             cert_pem)
1568
      self.failIf(x509.has_expired())
1569
      self.assertEqual(x509.get_issuer().CN, common_name)
1570
      self.assertEqual(x509.get_subject().CN, common_name)
1571
      self.assertEqual(x509.get_pubkey().bits(), constants.RSA_KEY_BITS)
1572

    
1573
  def testLegacy(self):
1574
    cert1_filename = os.path.join(self.tmpdir, "cert1.pem")
1575

    
1576
    utils.GenerateSelfSignedSslCert(cert1_filename, validity=1)
1577

    
1578
    cert1 = utils.ReadFile(cert1_filename)
1579

    
1580
    self.assert_(self._checkRsaPrivateKey(cert1))
1581
    self.assert_(self._checkCertificate(cert1))
1582

    
1583

    
1584
class TestPathJoin(unittest.TestCase):
1585
  """Testing case for PathJoin"""
1586

    
1587
  def testBasicItems(self):
1588
    mlist = ["/a", "b", "c"]
1589
    self.failUnlessEqual(PathJoin(*mlist), "/".join(mlist))
1590

    
1591
  def testNonAbsPrefix(self):
1592
    self.failUnlessRaises(ValueError, PathJoin, "a", "b")
1593

    
1594
  def testBackTrack(self):
1595
    self.failUnlessRaises(ValueError, PathJoin, "/a", "b/../c")
1596

    
1597
  def testMultiAbs(self):
1598
    self.failUnlessRaises(ValueError, PathJoin, "/a", "/b")
1599

    
1600

    
1601
class TestHostInfo(unittest.TestCase):
1602
  """Testing case for HostInfo"""
1603

    
1604
  def testUppercase(self):
1605
    data = "AbC.example.com"
1606
    self.failUnlessEqual(HostInfo.NormalizeName(data), data.lower())
1607

    
1608
  def testTooLongName(self):
1609
    data = "a.b." + "c" * 255
1610
    self.failUnlessRaises(OpPrereqError, HostInfo.NormalizeName, data)
1611

    
1612
  def testTrailingDot(self):
1613
    data = "a.b.c"
1614
    self.failUnlessEqual(HostInfo.NormalizeName(data + "."), data)
1615

    
1616
  def testInvalidName(self):
1617
    data = [
1618
      "a b",
1619
      "a/b",
1620
      ".a.b",
1621
      "a..b",
1622
      ]
1623
    for value in data:
1624
      self.failUnlessRaises(OpPrereqError, HostInfo.NormalizeName, value)
1625

    
1626
  def testValidName(self):
1627
    data = [
1628
      "a.b",
1629
      "a-b",
1630
      "a_b",
1631
      "a.b.c",
1632
      ]
1633
    for value in data:
1634
      HostInfo.NormalizeName(value)
1635

    
1636

    
1637
class TestParseAsn1Generalizedtime(unittest.TestCase):
1638
  def test(self):
1639
    # UTC
1640
    self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000Z"), 0)
1641
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152Z"),
1642
                     1266860512)
1643
    self.assertEqual(utils._ParseAsn1Generalizedtime("20380119031407Z"),
1644
                     (2**31) - 1)
1645

    
1646
    # With offset
1647
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152+0000"),
1648
                     1266860512)
1649
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100223131652+0000"),
1650
                     1266931012)
1651
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100223051808-0800"),
1652
                     1266931088)
1653
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100224002135+1100"),
1654
                     1266931295)
1655
    self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000-0100"),
1656
                     3600)
1657

    
1658
    # Leap seconds are not supported by datetime.datetime
1659
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1660
                      "19841231235960+0000")
1661
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1662
                      "19920630235960+0000")
1663

    
1664
    # Errors
1665
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "")
1666
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "invalid")
1667
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1668
                      "20100222174152")
1669
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1670
                      "Mon Feb 22 17:47:02 UTC 2010")
1671
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1672
                      "2010-02-22 17:42:02")
1673

    
1674

    
1675
class TestGetX509CertValidity(testutils.GanetiTestCase):
1676
  def setUp(self):
1677
    testutils.GanetiTestCase.setUp(self)
1678

    
1679
    pyopenssl_version = distutils.version.LooseVersion(OpenSSL.__version__)
1680

    
1681
    # Test whether we have pyOpenSSL 0.7 or above
1682
    self.pyopenssl0_7 = (pyopenssl_version >= "0.7")
1683

    
1684
    if not self.pyopenssl0_7:
1685
      warnings.warn("This test requires pyOpenSSL 0.7 or above to"
1686
                    " function correctly")
1687

    
1688
  def _LoadCert(self, name):
1689
    return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1690
                                           self._ReadTestData(name))
1691

    
1692
  def test(self):
1693
    validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem"))
1694
    if self.pyopenssl0_7:
1695
      self.assertEqual(validity, (1266919967, 1267524767))
1696
    else:
1697
      self.assertEqual(validity, (None, None))
1698

    
1699

    
1700
class TestSignX509Certificate(unittest.TestCase):
1701
  KEY = "My private key!"
1702
  KEY_OTHER = "Another key"
1703

    
1704
  def test(self):
1705
    # Generate certificate valid for 5 minutes
1706
    (_, cert_pem) = utils.GenerateSelfSignedX509Cert(None, 300)
1707

    
1708
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1709
                                           cert_pem)
1710

    
1711
    # No signature at all
1712
    self.assertRaises(errors.GenericError,
1713
                      utils.LoadSignedX509Certificate, cert_pem, self.KEY)
1714

    
1715
    # Invalid input
1716
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1717
                      "", self.KEY)
1718
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1719
                      "X-Ganeti-Signature: \n", self.KEY)
1720
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1721
                      "X-Ganeti-Sign: $1234$abcdef\n", self.KEY)
1722
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1723
                      "X-Ganeti-Signature: $1234567890$abcdef\n", self.KEY)
1724
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1725
                      "X-Ganeti-Signature: $1234$abc\n\n" + cert_pem, self.KEY)
1726

    
1727
    # Invalid salt
1728
    for salt in list("-_@$,:;/\\ \t\n"):
1729
      self.assertRaises(errors.GenericError, utils.SignX509Certificate,
1730
                        cert_pem, self.KEY, "foo%sbar" % salt)
1731

    
1732
    for salt in ["HelloWorld", "salt", string.letters, string.digits,
1733
                 utils.GenerateSecret(numbytes=4),
1734
                 utils.GenerateSecret(numbytes=16),
1735
                 "{123:456}".encode("hex")]:
1736
      signed_pem = utils.SignX509Certificate(cert, self.KEY, salt)
1737

    
1738
      self._Check(cert, salt, signed_pem)
1739

    
1740
      self._Check(cert, salt, "X-Another-Header: with a value\n" + signed_pem)
1741
      self._Check(cert, salt, (10 * "Hello World!\n") + signed_pem)
1742
      self._Check(cert, salt, (signed_pem + "\n\na few more\n"
1743
                               "lines----\n------ at\nthe end!"))
1744

    
1745
  def _Check(self, cert, salt, pem):
1746
    (cert2, salt2) = utils.LoadSignedX509Certificate(pem, self.KEY)
1747
    self.assertEqual(salt, salt2)
1748
    self.assertEqual(cert.digest("sha1"), cert2.digest("sha1"))
1749

    
1750
    # Other key
1751
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1752
                      pem, self.KEY_OTHER)
1753

    
1754

    
1755
class TestMakedirs(unittest.TestCase):
1756
  def setUp(self):
1757
    self.tmpdir = tempfile.mkdtemp()
1758

    
1759
  def tearDown(self):
1760
    shutil.rmtree(self.tmpdir)
1761

    
1762
  def testNonExisting(self):
1763
    path = utils.PathJoin(self.tmpdir, "foo")
1764
    utils.Makedirs(path)
1765
    self.assert_(os.path.isdir(path))
1766

    
1767
  def testExisting(self):
1768
    path = utils.PathJoin(self.tmpdir, "foo")
1769
    os.mkdir(path)
1770
    utils.Makedirs(path)
1771
    self.assert_(os.path.isdir(path))
1772

    
1773
  def testRecursiveNonExisting(self):
1774
    path = utils.PathJoin(self.tmpdir, "foo/bar/baz")
1775
    utils.Makedirs(path)
1776
    self.assert_(os.path.isdir(path))
1777

    
1778
  def testRecursiveExisting(self):
1779
    path = utils.PathJoin(self.tmpdir, "B/moo/xyz")
1780
    self.assert_(not os.path.exists(path))
1781
    os.mkdir(utils.PathJoin(self.tmpdir, "B"))
1782
    utils.Makedirs(path)
1783
    self.assert_(os.path.isdir(path))
1784

    
1785

    
1786
class TestRetry(testutils.GanetiTestCase):
1787
  @staticmethod
1788
  def _RaiseRetryAgain():
1789
    raise utils.RetryAgain()
1790

    
1791
  def _WrongNestedLoop(self):
1792
    return utils.Retry(self._RaiseRetryAgain, 0.01, 0.02)
1793

    
1794
  def testRaiseTimeout(self):
1795
    self.failUnlessRaises(utils.RetryTimeout, utils.Retry,
1796
                          self._RaiseRetryAgain, 0.01, 0.02)
1797

    
1798
  def testComplete(self):
1799
    self.failUnlessEqual(utils.Retry(lambda: True, 0, 1), True)
1800

    
1801
  def testNestedLoop(self):
1802
    try:
1803
      self.failUnlessRaises(errors.ProgrammerError, utils.Retry,
1804
                            self._WrongNestedLoop, 0, 1)
1805
    except utils.RetryTimeout:
1806
      self.fail("Didn't detect inner loop's exception")
1807

    
1808

    
1809
class TestLineSplitter(unittest.TestCase):
1810
  def test(self):
1811
    lines = []
1812
    ls = utils.LineSplitter(lines.append)
1813
    ls.write("Hello World\n")
1814
    self.assertEqual(lines, [])
1815
    ls.write("Foo\n Bar\r\n ")
1816
    ls.write("Baz")
1817
    ls.write("Moo")
1818
    self.assertEqual(lines, [])
1819
    ls.flush()
1820
    self.assertEqual(lines, ["Hello World", "Foo", " Bar"])
1821
    ls.close()
1822
    self.assertEqual(lines, ["Hello World", "Foo", " Bar", " BazMoo"])
1823

    
1824
  def _testExtra(self, line, all_lines, p1, p2):
1825
    self.assertEqual(p1, 999)
1826
    self.assertEqual(p2, "extra")
1827
    all_lines.append(line)
1828

    
1829
  def testExtraArgsNoFlush(self):
1830
    lines = []
1831
    ls = utils.LineSplitter(self._testExtra, lines, 999, "extra")
1832
    ls.write("\n\nHello World\n")
1833
    ls.write("Foo\n Bar\r\n ")
1834
    ls.write("")
1835
    ls.write("Baz")
1836
    ls.write("Moo\n\nx\n")
1837
    self.assertEqual(lines, [])
1838
    ls.close()
1839
    self.assertEqual(lines, ["", "", "Hello World", "Foo", " Bar", " BazMoo",
1840
                             "", "x"])
1841

    
1842

    
1843
class TestReadLockedPidFile(unittest.TestCase):
1844
  def setUp(self):
1845
    self.tmpdir = tempfile.mkdtemp()
1846

    
1847
  def tearDown(self):
1848
    shutil.rmtree(self.tmpdir)
1849

    
1850
  def testNonExistent(self):
1851
    path = utils.PathJoin(self.tmpdir, "nonexist")
1852
    self.assert_(utils.ReadLockedPidFile(path) is None)
1853

    
1854
  def testUnlocked(self):
1855
    path = utils.PathJoin(self.tmpdir, "pid")
1856
    utils.WriteFile(path, data="123")
1857
    self.assert_(utils.ReadLockedPidFile(path) is None)
1858

    
1859
  def testLocked(self):
1860
    path = utils.PathJoin(self.tmpdir, "pid")
1861
    utils.WriteFile(path, data="123")
1862

    
1863
    fl = utils.FileLock.Open(path)
1864
    try:
1865
      fl.Exclusive(blocking=True)
1866

    
1867
      self.assertEqual(utils.ReadLockedPidFile(path), 123)
1868
    finally:
1869
      fl.Close()
1870

    
1871
    self.assert_(utils.ReadLockedPidFile(path) is None)
1872

    
1873
  def testError(self):
1874
    path = utils.PathJoin(self.tmpdir, "foobar", "pid")
1875
    utils.WriteFile(utils.PathJoin(self.tmpdir, "foobar"), data="")
1876
    # open(2) should return ENOTDIR
1877
    self.assertRaises(EnvironmentError, utils.ReadLockedPidFile, path)
1878

    
1879

    
1880
if __name__ == '__main__':
1881
  testutils.GanetiTestProgram()