Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ 27e46076

History | View | Annotate | Download (48.6 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import os
30
import stat
31
import md5
32
import signal
33
import socket
34
import shutil
35
import re
36
import select
37
import string
38
import OpenSSL
39
import warnings
40
import distutils.version
41

    
42
import ganeti
43
import testutils
44
from ganeti import constants
45
from ganeti import utils
46
from ganeti import errors
47
from ganeti.utils import IsProcessAlive, RunCmd, \
48
     RemoveFile, MatchNameComponent, FormatUnit, \
49
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
50
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
51
     SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \
52
     TailFile, ForceDictType, SafeEncode, IsNormAbsPath, FormatTime, \
53
     UnescapeAndSplit, RunParts, PathJoin, HostInfo
54

    
55
from ganeti.errors import LockError, UnitParseError, GenericError, \
56
     ProgrammerError, OpPrereqError
57

    
58

    
59
class TestIsProcessAlive(unittest.TestCase):
60
  """Testing case for IsProcessAlive"""
61

    
62
  def testExists(self):
63
    mypid = os.getpid()
64
    self.assert_(IsProcessAlive(mypid),
65
                 "can't find myself running")
66

    
67
  def testNotExisting(self):
68
    pid_non_existing = os.fork()
69
    if pid_non_existing == 0:
70
      os._exit(0)
71
    elif pid_non_existing < 0:
72
      raise SystemError("can't fork")
73
    os.waitpid(pid_non_existing, 0)
74
    self.assert_(not IsProcessAlive(pid_non_existing),
75
                 "nonexisting process detected")
76

    
77

    
78
class TestPidFileFunctions(unittest.TestCase):
79
  """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
80

    
81
  def setUp(self):
82
    self.dir = tempfile.mkdtemp()
83
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
84
    utils.DaemonPidFileName = self.f_dpn
85

    
86
  def testPidFileFunctions(self):
87
    pid_file = self.f_dpn('test')
88
    utils.WritePidFile('test')
89
    self.failUnless(os.path.exists(pid_file),
90
                    "PID file should have been created")
91
    read_pid = utils.ReadPidFile(pid_file)
92
    self.failUnlessEqual(read_pid, os.getpid())
93
    self.failUnless(utils.IsProcessAlive(read_pid))
94
    self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
95
    utils.RemovePidFile('test')
96
    self.failIf(os.path.exists(pid_file),
97
                "PID file should not exist anymore")
98
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
99
                         "ReadPidFile should return 0 for missing pid file")
100
    fh = open(pid_file, "w")
101
    fh.write("blah\n")
102
    fh.close()
103
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
104
                         "ReadPidFile should return 0 for invalid pid file")
105
    utils.RemovePidFile('test')
106
    self.failIf(os.path.exists(pid_file),
107
                "PID file should not exist anymore")
108

    
109
  def testKill(self):
110
    pid_file = self.f_dpn('child')
111
    r_fd, w_fd = os.pipe()
112
    new_pid = os.fork()
113
    if new_pid == 0: #child
114
      utils.WritePidFile('child')
115
      os.write(w_fd, 'a')
116
      signal.pause()
117
      os._exit(0)
118
      return
119
    # else we are in the parent
120
    # wait until the child has written the pid file
121
    os.read(r_fd, 1)
122
    read_pid = utils.ReadPidFile(pid_file)
123
    self.failUnlessEqual(read_pid, new_pid)
124
    self.failUnless(utils.IsProcessAlive(new_pid))
125
    utils.KillProcess(new_pid, waitpid=True)
126
    self.failIf(utils.IsProcessAlive(new_pid))
127
    utils.RemovePidFile('child')
128
    self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
129

    
130
  def tearDown(self):
131
    for name in os.listdir(self.dir):
132
      os.unlink(os.path.join(self.dir, name))
133
    os.rmdir(self.dir)
134

    
135

    
136
class TestRunCmd(testutils.GanetiTestCase):
137
  """Testing case for the RunCmd function"""
138

    
139
  def setUp(self):
140
    testutils.GanetiTestCase.setUp(self)
141
    self.magic = time.ctime() + " ganeti test"
142
    self.fname = self._CreateTempFile()
143

    
144
  def testOk(self):
145
    """Test successful exit code"""
146
    result = RunCmd("/bin/sh -c 'exit 0'")
147
    self.assertEqual(result.exit_code, 0)
148
    self.assertEqual(result.output, "")
149

    
150
  def testFail(self):
151
    """Test fail exit code"""
152
    result = RunCmd("/bin/sh -c 'exit 1'")
153
    self.assertEqual(result.exit_code, 1)
154
    self.assertEqual(result.output, "")
155

    
156
  def testStdout(self):
157
    """Test standard output"""
158
    cmd = 'echo -n "%s"' % self.magic
159
    result = RunCmd("/bin/sh -c '%s'" % cmd)
160
    self.assertEqual(result.stdout, self.magic)
161
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
162
    self.assertEqual(result.output, "")
163
    self.assertFileContent(self.fname, self.magic)
164

    
165
  def testStderr(self):
166
    """Test standard error"""
167
    cmd = 'echo -n "%s"' % self.magic
168
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
169
    self.assertEqual(result.stderr, self.magic)
170
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
171
    self.assertEqual(result.output, "")
172
    self.assertFileContent(self.fname, self.magic)
173

    
174
  def testCombined(self):
175
    """Test combined output"""
176
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
177
    expected = "A" + self.magic + "B" + self.magic
178
    result = RunCmd("/bin/sh -c '%s'" % cmd)
179
    self.assertEqual(result.output, expected)
180
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
181
    self.assertEqual(result.output, "")
182
    self.assertFileContent(self.fname, expected)
183

    
184
  def testSignal(self):
185
    """Test signal"""
186
    result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
187
    self.assertEqual(result.signal, 15)
188
    self.assertEqual(result.output, "")
189

    
190
  def testListRun(self):
191
    """Test list runs"""
192
    result = RunCmd(["true"])
193
    self.assertEqual(result.signal, None)
194
    self.assertEqual(result.exit_code, 0)
195
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
196
    self.assertEqual(result.signal, None)
197
    self.assertEqual(result.exit_code, 1)
198
    result = RunCmd(["echo", "-n", self.magic])
199
    self.assertEqual(result.signal, None)
200
    self.assertEqual(result.exit_code, 0)
201
    self.assertEqual(result.stdout, self.magic)
202

    
203
  def testFileEmptyOutput(self):
204
    """Test file output"""
205
    result = RunCmd(["true"], output=self.fname)
206
    self.assertEqual(result.signal, None)
207
    self.assertEqual(result.exit_code, 0)
208
    self.assertFileContent(self.fname, "")
209

    
210
  def testLang(self):
211
    """Test locale environment"""
212
    old_env = os.environ.copy()
213
    try:
214
      os.environ["LANG"] = "en_US.UTF-8"
215
      os.environ["LC_ALL"] = "en_US.UTF-8"
216
      result = RunCmd(["locale"])
217
      for line in result.output.splitlines():
218
        key, value = line.split("=", 1)
219
        # Ignore these variables, they're overridden by LC_ALL
220
        if key == "LANG" or key == "LANGUAGE":
221
          continue
222
        self.failIf(value and value != "C" and value != '"C"',
223
            "Variable %s is set to the invalid value '%s'" % (key, value))
224
    finally:
225
      os.environ = old_env
226

    
227
  def testDefaultCwd(self):
228
    """Test default working directory"""
229
    self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
230

    
231
  def testCwd(self):
232
    """Test default working directory"""
233
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
234
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
235
    cwd = os.getcwd()
236
    self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
237

    
238
  def testResetEnv(self):
239
    """Test environment reset functionality"""
240
    self.failUnlessEqual(RunCmd(["env"], reset_env=True).stdout.strip(), "")
241

    
242

    
243
class TestRunParts(unittest.TestCase):
244
  """Testing case for the RunParts function"""
245

    
246
  def setUp(self):
247
    self.rundir = tempfile.mkdtemp(prefix="ganeti-test", suffix=".tmp")
248

    
249
  def tearDown(self):
250
    shutil.rmtree(self.rundir)
251

    
252
  def testEmpty(self):
253
    """Test on an empty dir"""
254
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True), [])
255

    
256
  def testSkipWrongName(self):
257
    """Test that wrong files are skipped"""
258
    fname = os.path.join(self.rundir, "00test.dot")
259
    utils.WriteFile(fname, data="")
260
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
261
    relname = os.path.basename(fname)
262
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
263
                         [(relname, constants.RUNPARTS_SKIP, None)])
264

    
265
  def testSkipNonExec(self):
266
    """Test that non executable files are skipped"""
267
    fname = os.path.join(self.rundir, "00test")
268
    utils.WriteFile(fname, data="")
269
    relname = os.path.basename(fname)
270
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
271
                         [(relname, constants.RUNPARTS_SKIP, None)])
272

    
273
  def testError(self):
274
    """Test error on a broken executable"""
275
    fname = os.path.join(self.rundir, "00test")
276
    utils.WriteFile(fname, data="")
277
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
278
    (relname, status, error) = RunParts(self.rundir, reset_env=True)[0]
279
    self.failUnlessEqual(relname, os.path.basename(fname))
280
    self.failUnlessEqual(status, constants.RUNPARTS_ERR)
281
    self.failUnless(error)
282

    
283
  def testSorted(self):
284
    """Test executions are sorted"""
285
    files = []
286
    files.append(os.path.join(self.rundir, "64test"))
287
    files.append(os.path.join(self.rundir, "00test"))
288
    files.append(os.path.join(self.rundir, "42test"))
289

    
290
    for fname in files:
291
      utils.WriteFile(fname, data="")
292

    
293
    results = RunParts(self.rundir, reset_env=True)
294

    
295
    for fname in sorted(files):
296
      self.failUnlessEqual(os.path.basename(fname), results.pop(0)[0])
297

    
298
  def testOk(self):
299
    """Test correct execution"""
300
    fname = os.path.join(self.rundir, "00test")
301
    utils.WriteFile(fname, data="#!/bin/sh\n\necho -n ciao")
302
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
303
    (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
304
    self.failUnlessEqual(relname, os.path.basename(fname))
305
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
306
    self.failUnlessEqual(runresult.stdout, "ciao")
307

    
308
  def testRunFail(self):
309
    """Test correct execution, with run failure"""
310
    fname = os.path.join(self.rundir, "00test")
311
    utils.WriteFile(fname, data="#!/bin/sh\n\nexit 1")
312
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
313
    (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
314
    self.failUnlessEqual(relname, os.path.basename(fname))
315
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
316
    self.failUnlessEqual(runresult.exit_code, 1)
317
    self.failUnless(runresult.failed)
318

    
319
  def testRunMix(self):
320
    files = []
321
    files.append(os.path.join(self.rundir, "00test"))
322
    files.append(os.path.join(self.rundir, "42test"))
323
    files.append(os.path.join(self.rundir, "64test"))
324
    files.append(os.path.join(self.rundir, "99test"))
325

    
326
    files.sort()
327

    
328
    # 1st has errors in execution
329
    utils.WriteFile(files[0], data="#!/bin/sh\n\nexit 1")
330
    os.chmod(files[0], stat.S_IREAD | stat.S_IEXEC)
331

    
332
    # 2nd is skipped
333
    utils.WriteFile(files[1], data="")
334

    
335
    # 3rd cannot execute properly
336
    utils.WriteFile(files[2], data="")
337
    os.chmod(files[2], stat.S_IREAD | stat.S_IEXEC)
338

    
339
    # 4th execs
340
    utils.WriteFile(files[3], data="#!/bin/sh\n\necho -n ciao")
341
    os.chmod(files[3], stat.S_IREAD | stat.S_IEXEC)
342

    
343
    results = RunParts(self.rundir, reset_env=True)
344

    
345
    (relname, status, runresult) = results[0]
346
    self.failUnlessEqual(relname, os.path.basename(files[0]))
347
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
348
    self.failUnlessEqual(runresult.exit_code, 1)
349
    self.failUnless(runresult.failed)
350

    
351
    (relname, status, runresult) = results[1]
352
    self.failUnlessEqual(relname, os.path.basename(files[1]))
353
    self.failUnlessEqual(status, constants.RUNPARTS_SKIP)
354
    self.failUnlessEqual(runresult, None)
355

    
356
    (relname, status, runresult) = results[2]
357
    self.failUnlessEqual(relname, os.path.basename(files[2]))
358
    self.failUnlessEqual(status, constants.RUNPARTS_ERR)
359
    self.failUnless(runresult)
360

    
361
    (relname, status, runresult) = results[3]
362
    self.failUnlessEqual(relname, os.path.basename(files[3]))
363
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
364
    self.failUnlessEqual(runresult.output, "ciao")
365
    self.failUnlessEqual(runresult.exit_code, 0)
366
    self.failUnless(not runresult.failed)
367

    
368

    
369
class TestRemoveFile(unittest.TestCase):
370
  """Test case for the RemoveFile function"""
371

    
372
  def setUp(self):
373
    """Create a temp dir and file for each case"""
374
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
375
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
376
    os.close(fd)
377

    
378
  def tearDown(self):
379
    if os.path.exists(self.tmpfile):
380
      os.unlink(self.tmpfile)
381
    os.rmdir(self.tmpdir)
382

    
383

    
384
  def testIgnoreDirs(self):
385
    """Test that RemoveFile() ignores directories"""
386
    self.assertEqual(None, RemoveFile(self.tmpdir))
387

    
388

    
389
  def testIgnoreNotExisting(self):
390
    """Test that RemoveFile() ignores non-existing files"""
391
    RemoveFile(self.tmpfile)
392
    RemoveFile(self.tmpfile)
393

    
394

    
395
  def testRemoveFile(self):
396
    """Test that RemoveFile does remove a file"""
397
    RemoveFile(self.tmpfile)
398
    if os.path.exists(self.tmpfile):
399
      self.fail("File '%s' not removed" % self.tmpfile)
400

    
401

    
402
  def testRemoveSymlink(self):
403
    """Test that RemoveFile does remove symlinks"""
404
    symlink = self.tmpdir + "/symlink"
405
    os.symlink("no-such-file", symlink)
406
    RemoveFile(symlink)
407
    if os.path.exists(symlink):
408
      self.fail("File '%s' not removed" % symlink)
409
    os.symlink(self.tmpfile, symlink)
410
    RemoveFile(symlink)
411
    if os.path.exists(symlink):
412
      self.fail("File '%s' not removed" % symlink)
413

    
414

    
415
class TestRename(unittest.TestCase):
416
  """Test case for RenameFile"""
417

    
418
  def setUp(self):
419
    """Create a temporary directory"""
420
    self.tmpdir = tempfile.mkdtemp()
421
    self.tmpfile = os.path.join(self.tmpdir, "test1")
422

    
423
    # Touch the file
424
    open(self.tmpfile, "w").close()
425

    
426
  def tearDown(self):
427
    """Remove temporary directory"""
428
    shutil.rmtree(self.tmpdir)
429

    
430
  def testSimpleRename1(self):
431
    """Simple rename 1"""
432
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
433
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
434

    
435
  def testSimpleRename2(self):
436
    """Simple rename 2"""
437
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
438
                     mkdir=True)
439
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
440

    
441
  def testRenameMkdir(self):
442
    """Rename with mkdir"""
443
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
444
                     mkdir=True)
445
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
446
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
447

    
448
    utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"),
449
                     os.path.join(self.tmpdir, "test/foo/bar/baz"),
450
                     mkdir=True)
451
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
452
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar")))
453
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz")))
454

    
455

    
456
class TestMatchNameComponent(unittest.TestCase):
457
  """Test case for the MatchNameComponent function"""
458

    
459
  def testEmptyList(self):
460
    """Test that there is no match against an empty list"""
461

    
462
    self.failUnlessEqual(MatchNameComponent("", []), None)
463
    self.failUnlessEqual(MatchNameComponent("test", []), None)
464

    
465
  def testSingleMatch(self):
466
    """Test that a single match is performed correctly"""
467
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
468
    for key in "test2", "test2.example", "test2.example.com":
469
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
470

    
471
  def testMultipleMatches(self):
472
    """Test that a multiple match is returned as None"""
473
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
474
    for key in "test1", "test1.example":
475
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
476

    
477
  def testFullMatch(self):
478
    """Test that a full match is returned correctly"""
479
    key1 = "test1"
480
    key2 = "test1.example"
481
    mlist = [key2, key2 + ".com"]
482
    self.failUnlessEqual(MatchNameComponent(key1, mlist), None)
483
    self.failUnlessEqual(MatchNameComponent(key2, mlist), key2)
484

    
485
  def testCaseInsensitivePartialMatch(self):
486
    """Test for the case_insensitive keyword"""
487
    mlist = ["test1.example.com", "test2.example.net"]
488
    self.assertEqual(MatchNameComponent("test2", mlist, case_sensitive=False),
489
                     "test2.example.net")
490
    self.assertEqual(MatchNameComponent("Test2", mlist, case_sensitive=False),
491
                     "test2.example.net")
492
    self.assertEqual(MatchNameComponent("teSt2", mlist, case_sensitive=False),
493
                     "test2.example.net")
494
    self.assertEqual(MatchNameComponent("TeSt2", mlist, case_sensitive=False),
495
                     "test2.example.net")
496

    
497

    
498
  def testCaseInsensitiveFullMatch(self):
499
    mlist = ["ts1.ex", "ts1.ex.org", "ts2.ex", "Ts2.ex"]
500
    # Between the two ts1 a full string match non-case insensitive should work
501
    self.assertEqual(MatchNameComponent("Ts1", mlist, case_sensitive=False),
502
                     None)
503
    self.assertEqual(MatchNameComponent("Ts1.ex", mlist, case_sensitive=False),
504
                     "ts1.ex")
505
    self.assertEqual(MatchNameComponent("ts1.ex", mlist, case_sensitive=False),
506
                     "ts1.ex")
507
    # Between the two ts2 only case differs, so only case-match works
508
    self.assertEqual(MatchNameComponent("ts2.ex", mlist, case_sensitive=False),
509
                     "ts2.ex")
510
    self.assertEqual(MatchNameComponent("Ts2.ex", mlist, case_sensitive=False),
511
                     "Ts2.ex")
512
    self.assertEqual(MatchNameComponent("TS2.ex", mlist, case_sensitive=False),
513
                     None)
514

    
515

    
516
class TestFormatUnit(unittest.TestCase):
517
  """Test case for the FormatUnit function"""
518

    
519
  def testMiB(self):
520
    self.assertEqual(FormatUnit(1, 'h'), '1M')
521
    self.assertEqual(FormatUnit(100, 'h'), '100M')
522
    self.assertEqual(FormatUnit(1023, 'h'), '1023M')
523

    
524
    self.assertEqual(FormatUnit(1, 'm'), '1')
525
    self.assertEqual(FormatUnit(100, 'm'), '100')
526
    self.assertEqual(FormatUnit(1023, 'm'), '1023')
527

    
528
    self.assertEqual(FormatUnit(1024, 'm'), '1024')
529
    self.assertEqual(FormatUnit(1536, 'm'), '1536')
530
    self.assertEqual(FormatUnit(17133, 'm'), '17133')
531
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
532

    
533
  def testGiB(self):
534
    self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
535
    self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
536
    self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
537
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
538

    
539
    self.assertEqual(FormatUnit(1024, 'g'), '1.0')
540
    self.assertEqual(FormatUnit(1536, 'g'), '1.5')
541
    self.assertEqual(FormatUnit(17133, 'g'), '16.7')
542
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
543

    
544
    self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
545
    self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
546
    self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
547

    
548
  def testTiB(self):
549
    self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
550
    self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
551
    self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
552

    
553
    self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
554
    self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
555
    self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
556

    
557
class TestParseUnit(unittest.TestCase):
558
  """Test case for the ParseUnit function"""
559

    
560
  SCALES = (('', 1),
561
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
562
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
563
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
564

    
565
  def testRounding(self):
566
    self.assertEqual(ParseUnit('0'), 0)
567
    self.assertEqual(ParseUnit('1'), 4)
568
    self.assertEqual(ParseUnit('2'), 4)
569
    self.assertEqual(ParseUnit('3'), 4)
570

    
571
    self.assertEqual(ParseUnit('124'), 124)
572
    self.assertEqual(ParseUnit('125'), 128)
573
    self.assertEqual(ParseUnit('126'), 128)
574
    self.assertEqual(ParseUnit('127'), 128)
575
    self.assertEqual(ParseUnit('128'), 128)
576
    self.assertEqual(ParseUnit('129'), 132)
577
    self.assertEqual(ParseUnit('130'), 132)
578

    
579
  def testFloating(self):
580
    self.assertEqual(ParseUnit('0'), 0)
581
    self.assertEqual(ParseUnit('0.5'), 4)
582
    self.assertEqual(ParseUnit('1.75'), 4)
583
    self.assertEqual(ParseUnit('1.99'), 4)
584
    self.assertEqual(ParseUnit('2.00'), 4)
585
    self.assertEqual(ParseUnit('2.01'), 4)
586
    self.assertEqual(ParseUnit('3.99'), 4)
587
    self.assertEqual(ParseUnit('4.00'), 4)
588
    self.assertEqual(ParseUnit('4.01'), 8)
589
    self.assertEqual(ParseUnit('1.5G'), 1536)
590
    self.assertEqual(ParseUnit('1.8G'), 1844)
591
    self.assertEqual(ParseUnit('8.28T'), 8682212)
592

    
593
  def testSuffixes(self):
594
    for sep in ('', ' ', '   ', "\t", "\t "):
595
      for suffix, scale in TestParseUnit.SCALES:
596
        for func in (lambda x: x, str.lower, str.upper):
597
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
598
                           1024 * scale)
599

    
600
  def testInvalidInput(self):
601
    for sep in ('-', '_', ',', 'a'):
602
      for suffix, _ in TestParseUnit.SCALES:
603
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
604

    
605
    for suffix, _ in TestParseUnit.SCALES:
606
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
607

    
608

    
609
class TestSshKeys(testutils.GanetiTestCase):
610
  """Test case for the AddAuthorizedKey function"""
611

    
612
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
613
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
614
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
615

    
616
  def setUp(self):
617
    testutils.GanetiTestCase.setUp(self)
618
    self.tmpname = self._CreateTempFile()
619
    handle = open(self.tmpname, 'w')
620
    try:
621
      handle.write("%s\n" % TestSshKeys.KEY_A)
622
      handle.write("%s\n" % TestSshKeys.KEY_B)
623
    finally:
624
      handle.close()
625

    
626
  def testAddingNewKey(self):
627
    AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
628

    
629
    self.assertFileContent(self.tmpname,
630
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
631
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
632
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
633
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
634

    
635
  def testAddingAlmostButNotCompletelyTheSameKey(self):
636
    AddAuthorizedKey(self.tmpname,
637
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
638

    
639
    self.assertFileContent(self.tmpname,
640
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
641
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
642
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
643
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
644

    
645
  def testAddingExistingKeyWithSomeMoreSpaces(self):
646
    AddAuthorizedKey(self.tmpname,
647
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
648

    
649
    self.assertFileContent(self.tmpname,
650
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
651
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
652
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
653

    
654
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
655
    RemoveAuthorizedKey(self.tmpname,
656
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
657

    
658
    self.assertFileContent(self.tmpname,
659
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
660
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
661

    
662
  def testRemovingNonExistingKey(self):
663
    RemoveAuthorizedKey(self.tmpname,
664
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
665

    
666
    self.assertFileContent(self.tmpname,
667
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
668
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
669
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
670

    
671

    
672
class TestEtcHosts(testutils.GanetiTestCase):
673
  """Test functions modifying /etc/hosts"""
674

    
675
  def setUp(self):
676
    testutils.GanetiTestCase.setUp(self)
677
    self.tmpname = self._CreateTempFile()
678
    handle = open(self.tmpname, 'w')
679
    try:
680
      handle.write('# This is a test file for /etc/hosts\n')
681
      handle.write('127.0.0.1\tlocalhost\n')
682
      handle.write('192.168.1.1 router gw\n')
683
    finally:
684
      handle.close()
685

    
686
  def testSettingNewIp(self):
687
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
688

    
689
    self.assertFileContent(self.tmpname,
690
      "# This is a test file for /etc/hosts\n"
691
      "127.0.0.1\tlocalhost\n"
692
      "192.168.1.1 router gw\n"
693
      "1.2.3.4\tmyhost.domain.tld myhost\n")
694
    self.assertFileMode(self.tmpname, 0644)
695

    
696
  def testSettingExistingIp(self):
697
    SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
698
                     ['myhost'])
699

    
700
    self.assertFileContent(self.tmpname,
701
      "# This is a test file for /etc/hosts\n"
702
      "127.0.0.1\tlocalhost\n"
703
      "192.168.1.1\tmyhost.domain.tld myhost\n")
704
    self.assertFileMode(self.tmpname, 0644)
705

    
706
  def testSettingDuplicateName(self):
707
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
708

    
709
    self.assertFileContent(self.tmpname,
710
      "# This is a test file for /etc/hosts\n"
711
      "127.0.0.1\tlocalhost\n"
712
      "192.168.1.1 router gw\n"
713
      "1.2.3.4\tmyhost\n")
714
    self.assertFileMode(self.tmpname, 0644)
715

    
716
  def testRemovingExistingHost(self):
717
    RemoveEtcHostsEntry(self.tmpname, 'router')
718

    
719
    self.assertFileContent(self.tmpname,
720
      "# This is a test file for /etc/hosts\n"
721
      "127.0.0.1\tlocalhost\n"
722
      "192.168.1.1 gw\n")
723
    self.assertFileMode(self.tmpname, 0644)
724

    
725
  def testRemovingSingleExistingHost(self):
726
    RemoveEtcHostsEntry(self.tmpname, 'localhost')
727

    
728
    self.assertFileContent(self.tmpname,
729
      "# This is a test file for /etc/hosts\n"
730
      "192.168.1.1 router gw\n")
731
    self.assertFileMode(self.tmpname, 0644)
732

    
733
  def testRemovingNonExistingHost(self):
734
    RemoveEtcHostsEntry(self.tmpname, 'myhost')
735

    
736
    self.assertFileContent(self.tmpname,
737
      "# This is a test file for /etc/hosts\n"
738
      "127.0.0.1\tlocalhost\n"
739
      "192.168.1.1 router gw\n")
740
    self.assertFileMode(self.tmpname, 0644)
741

    
742
  def testRemovingAlias(self):
743
    RemoveEtcHostsEntry(self.tmpname, 'gw')
744

    
745
    self.assertFileContent(self.tmpname,
746
      "# This is a test file for /etc/hosts\n"
747
      "127.0.0.1\tlocalhost\n"
748
      "192.168.1.1 router\n")
749
    self.assertFileMode(self.tmpname, 0644)
750

    
751

    
752
class TestShellQuoting(unittest.TestCase):
753
  """Test case for shell quoting functions"""
754

    
755
  def testShellQuote(self):
756
    self.assertEqual(ShellQuote('abc'), "abc")
757
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
758
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
759
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
760
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
761

    
762
  def testShellQuoteArgs(self):
763
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
764
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
765
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
766

    
767

    
768
class TestTcpPing(unittest.TestCase):
769
  """Testcase for TCP version of ping - against listen(2)ing port"""
770

    
771
  def setUp(self):
772
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
773
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
774
    self.listenerport = self.listener.getsockname()[1]
775
    self.listener.listen(1)
776

    
777
  def tearDown(self):
778
    self.listener.shutdown(socket.SHUT_RDWR)
779
    del self.listener
780
    del self.listenerport
781

    
782
  def testTcpPingToLocalHostAccept(self):
783
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
784
                         self.listenerport,
785
                         timeout=10,
786
                         live_port_needed=True,
787
                         source=constants.LOCALHOST_IP_ADDRESS,
788
                         ),
789
                 "failed to connect to test listener")
790

    
791
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
792
                         self.listenerport,
793
                         timeout=10,
794
                         live_port_needed=True,
795
                         ),
796
                 "failed to connect to test listener (no source)")
797

    
798

    
799
class TestTcpPingDeaf(unittest.TestCase):
800
  """Testcase for TCP version of ping - against non listen(2)ing port"""
801

    
802
  def setUp(self):
803
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
804
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
805
    self.deaflistenerport = self.deaflistener.getsockname()[1]
806

    
807
  def tearDown(self):
808
    del self.deaflistener
809
    del self.deaflistenerport
810

    
811
  def testTcpPingToLocalHostAcceptDeaf(self):
812
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
813
                        self.deaflistenerport,
814
                        timeout=constants.TCP_PING_TIMEOUT,
815
                        live_port_needed=True,
816
                        source=constants.LOCALHOST_IP_ADDRESS,
817
                        ), # need successful connect(2)
818
                "successfully connected to deaf listener")
819

    
820
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
821
                        self.deaflistenerport,
822
                        timeout=constants.TCP_PING_TIMEOUT,
823
                        live_port_needed=True,
824
                        ), # need successful connect(2)
825
                "successfully connected to deaf listener (no source addr)")
826

    
827
  def testTcpPingToLocalHostNoAccept(self):
828
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
829
                         self.deaflistenerport,
830
                         timeout=constants.TCP_PING_TIMEOUT,
831
                         live_port_needed=False,
832
                         source=constants.LOCALHOST_IP_ADDRESS,
833
                         ), # ECONNREFUSED is OK
834
                 "failed to ping alive host on deaf port")
835

    
836
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
837
                         self.deaflistenerport,
838
                         timeout=constants.TCP_PING_TIMEOUT,
839
                         live_port_needed=False,
840
                         ), # ECONNREFUSED is OK
841
                 "failed to ping alive host on deaf port (no source addr)")
842

    
843

    
844
class TestOwnIpAddress(unittest.TestCase):
845
  """Testcase for OwnIpAddress"""
846

    
847
  def testOwnLoopback(self):
848
    """check having the loopback ip"""
849
    self.failUnless(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS),
850
                    "Should own the loopback address")
851

    
852
  def testNowOwnAddress(self):
853
    """check that I don't own an address"""
854

    
855
    # network 192.0.2.0/24 is reserved for test/documentation as per
856
    # rfc 3330, so we *should* not have an address of this range... if
857
    # this fails, we should extend the test to multiple addresses
858
    DST_IP = "192.0.2.1"
859
    self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP)
860

    
861

    
862
class TestListVisibleFiles(unittest.TestCase):
863
  """Test case for ListVisibleFiles"""
864

    
865
  def setUp(self):
866
    self.path = tempfile.mkdtemp()
867

    
868
  def tearDown(self):
869
    shutil.rmtree(self.path)
870

    
871
  def _test(self, files, expected):
872
    # Sort a copy
873
    expected = expected[:]
874
    expected.sort()
875

    
876
    for name in files:
877
      f = open(os.path.join(self.path, name), 'w')
878
      try:
879
        f.write("Test\n")
880
      finally:
881
        f.close()
882

    
883
    found = ListVisibleFiles(self.path)
884
    found.sort()
885

    
886
    self.assertEqual(found, expected)
887

    
888
  def testAllVisible(self):
889
    files = ["a", "b", "c"]
890
    expected = files
891
    self._test(files, expected)
892

    
893
  def testNoneVisible(self):
894
    files = [".a", ".b", ".c"]
895
    expected = []
896
    self._test(files, expected)
897

    
898
  def testSomeVisible(self):
899
    files = ["a", "b", ".c"]
900
    expected = ["a", "b"]
901
    self._test(files, expected)
902

    
903
  def testNonAbsolutePath(self):
904
    self.failUnlessRaises(errors.ProgrammerError, ListVisibleFiles, "abc")
905

    
906
  def testNonNormalizedPath(self):
907
    self.failUnlessRaises(errors.ProgrammerError, ListVisibleFiles,
908
                          "/bin/../tmp")
909

    
910

    
911
class TestNewUUID(unittest.TestCase):
912
  """Test case for NewUUID"""
913

    
914
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
915
                        '[a-f0-9]{4}-[a-f0-9]{12}$')
916

    
917
  def runTest(self):
918
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
919

    
920

    
921
class TestUniqueSequence(unittest.TestCase):
922
  """Test case for UniqueSequence"""
923

    
924
  def _test(self, input, expected):
925
    self.assertEqual(utils.UniqueSequence(input), expected)
926

    
927
  def runTest(self):
928
    # Ordered input
929
    self._test([1, 2, 3], [1, 2, 3])
930
    self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
931
    self._test([1, 2, 2, 3], [1, 2, 3])
932
    self._test([1, 2, 3, 3], [1, 2, 3])
933

    
934
    # Unordered input
935
    self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
936
    self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
937

    
938
    # Strings
939
    self._test(["a", "a"], ["a"])
940
    self._test(["a", "b"], ["a", "b"])
941
    self._test(["a", "b", "a"], ["a", "b"])
942

    
943

    
944
class TestFirstFree(unittest.TestCase):
945
  """Test case for the FirstFree function"""
946

    
947
  def test(self):
948
    """Test FirstFree"""
949
    self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
950
    self.failUnlessEqual(FirstFree([]), None)
951
    self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
952
    self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
953
    self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
954

    
955

    
956
class TestTailFile(testutils.GanetiTestCase):
957
  """Test case for the TailFile function"""
958

    
959
  def testEmpty(self):
960
    fname = self._CreateTempFile()
961
    self.failUnlessEqual(TailFile(fname), [])
962
    self.failUnlessEqual(TailFile(fname, lines=25), [])
963

    
964
  def testAllLines(self):
965
    data = ["test %d" % i for i in range(30)]
966
    for i in range(30):
967
      fname = self._CreateTempFile()
968
      fd = open(fname, "w")
969
      fd.write("\n".join(data[:i]))
970
      if i > 0:
971
        fd.write("\n")
972
      fd.close()
973
      self.failUnlessEqual(TailFile(fname, lines=i), data[:i])
974

    
975
  def testPartialLines(self):
976
    data = ["test %d" % i for i in range(30)]
977
    fname = self._CreateTempFile()
978
    fd = open(fname, "w")
979
    fd.write("\n".join(data))
980
    fd.write("\n")
981
    fd.close()
982
    for i in range(1, 30):
983
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
984

    
985
  def testBigFile(self):
986
    data = ["test %d" % i for i in range(30)]
987
    fname = self._CreateTempFile()
988
    fd = open(fname, "w")
989
    fd.write("X" * 1048576)
990
    fd.write("\n")
991
    fd.write("\n".join(data))
992
    fd.write("\n")
993
    fd.close()
994
    for i in range(1, 30):
995
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
996

    
997

    
998
class _BaseFileLockTest:
999
  """Test case for the FileLock class"""
1000

    
1001
  def testSharedNonblocking(self):
1002
    self.lock.Shared(blocking=False)
1003
    self.lock.Close()
1004

    
1005
  def testExclusiveNonblocking(self):
1006
    self.lock.Exclusive(blocking=False)
1007
    self.lock.Close()
1008

    
1009
  def testUnlockNonblocking(self):
1010
    self.lock.Unlock(blocking=False)
1011
    self.lock.Close()
1012

    
1013
  def testSharedBlocking(self):
1014
    self.lock.Shared(blocking=True)
1015
    self.lock.Close()
1016

    
1017
  def testExclusiveBlocking(self):
1018
    self.lock.Exclusive(blocking=True)
1019
    self.lock.Close()
1020

    
1021
  def testUnlockBlocking(self):
1022
    self.lock.Unlock(blocking=True)
1023
    self.lock.Close()
1024

    
1025
  def testSharedExclusiveUnlock(self):
1026
    self.lock.Shared(blocking=False)
1027
    self.lock.Exclusive(blocking=False)
1028
    self.lock.Unlock(blocking=False)
1029
    self.lock.Close()
1030

    
1031
  def testExclusiveSharedUnlock(self):
1032
    self.lock.Exclusive(blocking=False)
1033
    self.lock.Shared(blocking=False)
1034
    self.lock.Unlock(blocking=False)
1035
    self.lock.Close()
1036

    
1037
  def testSimpleTimeout(self):
1038
    # These will succeed on the first attempt, hence a short timeout
1039
    self.lock.Shared(blocking=True, timeout=10.0)
1040
    self.lock.Exclusive(blocking=False, timeout=10.0)
1041
    self.lock.Unlock(blocking=True, timeout=10.0)
1042
    self.lock.Close()
1043

    
1044
  @staticmethod
1045
  def _TryLockInner(filename, shared, blocking):
1046
    lock = utils.FileLock.Open(filename)
1047

    
1048
    if shared:
1049
      fn = lock.Shared
1050
    else:
1051
      fn = lock.Exclusive
1052

    
1053
    try:
1054
      # The timeout doesn't really matter as the parent process waits for us to
1055
      # finish anyway.
1056
      fn(blocking=blocking, timeout=0.01)
1057
    except errors.LockError, err:
1058
      return False
1059

    
1060
    return True
1061

    
1062
  def _TryLock(self, *args):
1063
    return utils.RunInSeparateProcess(self._TryLockInner, self.tmpfile.name,
1064
                                      *args)
1065

    
1066
  def testTimeout(self):
1067
    for blocking in [True, False]:
1068
      self.lock.Exclusive(blocking=True)
1069
      self.failIf(self._TryLock(False, blocking))
1070
      self.failIf(self._TryLock(True, blocking))
1071

    
1072
      self.lock.Shared(blocking=True)
1073
      self.assert_(self._TryLock(True, blocking))
1074
      self.failIf(self._TryLock(False, blocking))
1075

    
1076
  def testCloseShared(self):
1077
    self.lock.Close()
1078
    self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
1079

    
1080
  def testCloseExclusive(self):
1081
    self.lock.Close()
1082
    self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
1083

    
1084
  def testCloseUnlock(self):
1085
    self.lock.Close()
1086
    self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
1087

    
1088

    
1089
class TestFileLockWithFilename(testutils.GanetiTestCase, _BaseFileLockTest):
1090
  TESTDATA = "Hello World\n" * 10
1091

    
1092
  def setUp(self):
1093
    testutils.GanetiTestCase.setUp(self)
1094

    
1095
    self.tmpfile = tempfile.NamedTemporaryFile()
1096
    utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
1097
    self.lock = utils.FileLock.Open(self.tmpfile.name)
1098

    
1099
    # Ensure "Open" didn't truncate file
1100
    self.assertFileContent(self.tmpfile.name, self.TESTDATA)
1101

    
1102
  def tearDown(self):
1103
    self.assertFileContent(self.tmpfile.name, self.TESTDATA)
1104

    
1105
    testutils.GanetiTestCase.tearDown(self)
1106

    
1107

    
1108
class TestFileLockWithFileObject(unittest.TestCase, _BaseFileLockTest):
1109
  def setUp(self):
1110
    self.tmpfile = tempfile.NamedTemporaryFile()
1111
    self.lock = utils.FileLock(open(self.tmpfile.name, "w"), self.tmpfile.name)
1112

    
1113

    
1114
class TestTimeFunctions(unittest.TestCase):
1115
  """Test case for time functions"""
1116

    
1117
  def runTest(self):
1118
    self.assertEqual(utils.SplitTime(1), (1, 0))
1119
    self.assertEqual(utils.SplitTime(1.5), (1, 500000))
1120
    self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
1121
    self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
1122
    self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
1123
    self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
1124
    self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
1125
    self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
1126

    
1127
    self.assertRaises(AssertionError, utils.SplitTime, -1)
1128

    
1129
    self.assertEqual(utils.MergeTime((1, 0)), 1.0)
1130
    self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
1131
    self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
1132

    
1133
    self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3),
1134
                     1218448917.481)
1135
    self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
1136

    
1137
    self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
1138
    self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
1139
    self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
1140
    self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
1141
    self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
1142

    
1143

    
1144
class FieldSetTestCase(unittest.TestCase):
1145
  """Test case for FieldSets"""
1146

    
1147
  def testSimpleMatch(self):
1148
    f = utils.FieldSet("a", "b", "c", "def")
1149
    self.failUnless(f.Matches("a"))
1150
    self.failIf(f.Matches("d"), "Substring matched")
1151
    self.failIf(f.Matches("defghi"), "Prefix string matched")
1152
    self.failIf(f.NonMatching(["b", "c"]))
1153
    self.failIf(f.NonMatching(["a", "b", "c", "def"]))
1154
    self.failUnless(f.NonMatching(["a", "d"]))
1155

    
1156
  def testRegexMatch(self):
1157
    f = utils.FieldSet("a", "b([0-9]+)", "c")
1158
    self.failUnless(f.Matches("b1"))
1159
    self.failUnless(f.Matches("b99"))
1160
    self.failIf(f.Matches("b/1"))
1161
    self.failIf(f.NonMatching(["b12", "c"]))
1162
    self.failUnless(f.NonMatching(["a", "1"]))
1163

    
1164
class TestForceDictType(unittest.TestCase):
1165
  """Test case for ForceDictType"""
1166

    
1167
  def setUp(self):
1168
    self.key_types = {
1169
      'a': constants.VTYPE_INT,
1170
      'b': constants.VTYPE_BOOL,
1171
      'c': constants.VTYPE_STRING,
1172
      'd': constants.VTYPE_SIZE,
1173
      }
1174

    
1175
  def _fdt(self, dict, allowed_values=None):
1176
    if allowed_values is None:
1177
      ForceDictType(dict, self.key_types)
1178
    else:
1179
      ForceDictType(dict, self.key_types, allowed_values=allowed_values)
1180

    
1181
    return dict
1182

    
1183
  def testSimpleDict(self):
1184
    self.assertEqual(self._fdt({}), {})
1185
    self.assertEqual(self._fdt({'a': 1}), {'a': 1})
1186
    self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
1187
    self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
1188
    self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
1189
    self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
1190
    self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
1191
    self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
1192
    self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
1193
    self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
1194
    self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
1195
    self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
1196

    
1197
  def testErrors(self):
1198
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
1199
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
1200
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
1201
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
1202

    
1203

    
1204
class TestIsAbsNormPath(unittest.TestCase):
1205
  """Testing case for IsProcessAlive"""
1206

    
1207
  def _pathTestHelper(self, path, result):
1208
    if result:
1209
      self.assert_(IsNormAbsPath(path),
1210
          "Path %s should result absolute and normalized" % path)
1211
    else:
1212
      self.assert_(not IsNormAbsPath(path),
1213
          "Path %s should not result absolute and normalized" % path)
1214

    
1215
  def testBase(self):
1216
    self._pathTestHelper('/etc', True)
1217
    self._pathTestHelper('/srv', True)
1218
    self._pathTestHelper('etc', False)
1219
    self._pathTestHelper('/etc/../root', False)
1220
    self._pathTestHelper('/etc/', False)
1221

    
1222

    
1223
class TestSafeEncode(unittest.TestCase):
1224
  """Test case for SafeEncode"""
1225

    
1226
  def testAscii(self):
1227
    for txt in [string.digits, string.letters, string.punctuation]:
1228
      self.failUnlessEqual(txt, SafeEncode(txt))
1229

    
1230
  def testDoubleEncode(self):
1231
    for i in range(255):
1232
      txt = SafeEncode(chr(i))
1233
      self.failUnlessEqual(txt, SafeEncode(txt))
1234

    
1235
  def testUnicode(self):
1236
    # 1024 is high enough to catch non-direct ASCII mappings
1237
    for i in range(1024):
1238
      txt = SafeEncode(unichr(i))
1239
      self.failUnlessEqual(txt, SafeEncode(txt))
1240

    
1241

    
1242
class TestFormatTime(unittest.TestCase):
1243
  """Testing case for FormatTime"""
1244

    
1245
  def testNone(self):
1246
    self.failUnlessEqual(FormatTime(None), "N/A")
1247

    
1248
  def testInvalid(self):
1249
    self.failUnlessEqual(FormatTime(()), "N/A")
1250

    
1251
  def testNow(self):
1252
    # tests that we accept time.time input
1253
    FormatTime(time.time())
1254
    # tests that we accept int input
1255
    FormatTime(int(time.time()))
1256

    
1257

    
1258
class RunInSeparateProcess(unittest.TestCase):
1259
  def test(self):
1260
    for exp in [True, False]:
1261
      def _child():
1262
        return exp
1263

    
1264
      self.assertEqual(exp, utils.RunInSeparateProcess(_child))
1265

    
1266
  def testArgs(self):
1267
    for arg in [0, 1, 999, "Hello World", (1, 2, 3)]:
1268
      def _child(carg1, carg2):
1269
        return carg1 == "Foo" and carg2 == arg
1270

    
1271
      self.assert_(utils.RunInSeparateProcess(_child, "Foo", arg))
1272

    
1273
  def testPid(self):
1274
    parent_pid = os.getpid()
1275

    
1276
    def _check():
1277
      return os.getpid() == parent_pid
1278

    
1279
    self.failIf(utils.RunInSeparateProcess(_check))
1280

    
1281
  def testSignal(self):
1282
    def _kill():
1283
      os.kill(os.getpid(), signal.SIGTERM)
1284

    
1285
    self.assertRaises(errors.GenericError,
1286
                      utils.RunInSeparateProcess, _kill)
1287

    
1288
  def testException(self):
1289
    def _exc():
1290
      raise errors.GenericError("This is a test")
1291

    
1292
    self.assertRaises(errors.GenericError,
1293
                      utils.RunInSeparateProcess, _exc)
1294

    
1295

    
1296
class TestFingerprintFile(unittest.TestCase):
1297
  def setUp(self):
1298
    self.tmpfile = tempfile.NamedTemporaryFile()
1299

    
1300
  def test(self):
1301
    self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1302
                     "da39a3ee5e6b4b0d3255bfef95601890afd80709")
1303

    
1304
    utils.WriteFile(self.tmpfile.name, data="Hello World\n")
1305
    self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1306
                     "648a6a6ffffdaa0badb23b8baf90b6168dd16b3a")
1307

    
1308

    
1309
class TestUnescapeAndSplit(unittest.TestCase):
1310
  """Testing case for UnescapeAndSplit"""
1311

    
1312
  def setUp(self):
1313
    # testing more that one separator for regexp safety
1314
    self._seps = [",", "+", "."]
1315

    
1316
  def testSimple(self):
1317
    a = ["a", "b", "c", "d"]
1318
    for sep in self._seps:
1319
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), a)
1320

    
1321
  def testEscape(self):
1322
    for sep in self._seps:
1323
      a = ["a", "b\\" + sep + "c", "d"]
1324
      b = ["a", "b" + sep + "c", "d"]
1325
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1326

    
1327
  def testDoubleEscape(self):
1328
    for sep in self._seps:
1329
      a = ["a", "b\\\\", "c", "d"]
1330
      b = ["a", "b\\", "c", "d"]
1331
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1332

    
1333
  def testThreeEscape(self):
1334
    for sep in self._seps:
1335
      a = ["a", "b\\\\\\" + sep + "c", "d"]
1336
      b = ["a", "b\\" + sep + "c", "d"]
1337
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1338

    
1339

    
1340
class TestPathJoin(unittest.TestCase):
1341
  """Testing case for PathJoin"""
1342

    
1343
  def testBasicItems(self):
1344
    mlist = ["/a", "b", "c"]
1345
    self.failUnlessEqual(PathJoin(*mlist), "/".join(mlist))
1346

    
1347
  def testNonAbsPrefix(self):
1348
    self.failUnlessRaises(ValueError, PathJoin, "a", "b")
1349

    
1350
  def testBackTrack(self):
1351
    self.failUnlessRaises(ValueError, PathJoin, "/a", "b/../c")
1352

    
1353
  def testMultiAbs(self):
1354
    self.failUnlessRaises(ValueError, PathJoin, "/a", "/b")
1355

    
1356

    
1357
class TestHostInfo(unittest.TestCase):
1358
  """Testing case for HostInfo"""
1359

    
1360
  def testUppercase(self):
1361
    data = "AbC.example.com"
1362
    self.failUnlessEqual(HostInfo.NormalizeName(data), data.lower())
1363

    
1364
  def testTooLongName(self):
1365
    data = "a.b." + "c" * 255
1366
    self.failUnlessRaises(OpPrereqError, HostInfo.NormalizeName, data)
1367

    
1368
  def testTrailingDot(self):
1369
    data = "a.b.c"
1370
    self.failUnlessEqual(HostInfo.NormalizeName(data + "."), data)
1371

    
1372
  def testInvalidName(self):
1373
    data = [
1374
      "a b",
1375
      "a/b",
1376
      ".a.b",
1377
      "a..b",
1378
      ]
1379
    for value in data:
1380
      self.failUnlessRaises(OpPrereqError, HostInfo.NormalizeName, value)
1381

    
1382
  def testValidName(self):
1383
    data = [
1384
      "a.b",
1385
      "a-b",
1386
      "a_b",
1387
      "a.b.c",
1388
      ]
1389
    for value in data:
1390
      HostInfo.NormalizeName(value)
1391

    
1392

    
1393
class TestParseAsn1Generalizedtime(unittest.TestCase):
1394
  def test(self):
1395
    # UTC
1396
    self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000Z"), 0)
1397
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152Z"),
1398
                     1266860512)
1399
    self.assertEqual(utils._ParseAsn1Generalizedtime("20380119031407Z"),
1400
                     (2**31) - 1)
1401

    
1402
    # With offset
1403
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152+0000"),
1404
                     1266860512)
1405
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100223131652+0000"),
1406
                     1266931012)
1407
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100223051808-0800"),
1408
                     1266931088)
1409
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100224002135+1100"),
1410
                     1266931295)
1411
    self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000-0100"),
1412
                     3600)
1413

    
1414
    # Leap seconds are not supported by datetime.datetime
1415
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1416
                      "19841231235960+0000")
1417
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1418
                      "19920630235960+0000")
1419

    
1420
    # Errors
1421
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "")
1422
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "invalid")
1423
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1424
                      "20100222174152")
1425
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1426
                      "Mon Feb 22 17:47:02 UTC 2010")
1427
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1428
                      "2010-02-22 17:42:02")
1429

    
1430

    
1431
class TestGetX509CertValidity(testutils.GanetiTestCase):
1432
  def setUp(self):
1433
    testutils.GanetiTestCase.setUp(self)
1434

    
1435
    pyopenssl_version = distutils.version.LooseVersion(OpenSSL.__version__)
1436

    
1437
    # Test whether we have pyOpenSSL 0.7 or above
1438
    self.pyopenssl0_7 = (pyopenssl_version >= "0.7")
1439

    
1440
    if not self.pyopenssl0_7:
1441
      warnings.warn("This test requires pyOpenSSL 0.7 or above to"
1442
                    " function correctly")
1443

    
1444
  def _LoadCert(self, name):
1445
    return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1446
                                           self._ReadTestData(name))
1447

    
1448
  def test(self):
1449
    validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem"))
1450
    if self.pyopenssl0_7:
1451
      self.assertEqual(validity, (1266919967, 1267524767))
1452
    else:
1453
      self.assertEqual(validity, (None, None))
1454

    
1455

    
1456
if __name__ == '__main__':
1457
  testutils.GanetiTestProgram()