Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ 6bb65e3a

History | View | Annotate | Download (42.5 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import os
30
import stat
31
import md5
32
import signal
33
import socket
34
import shutil
35
import re
36
import select
37
import string
38

    
39
import ganeti
40
import testutils
41
from ganeti import constants
42
from ganeti import utils
43
from ganeti import errors
44
from ganeti.utils import IsProcessAlive, RunCmd, \
45
     RemoveFile, MatchNameComponent, FormatUnit, \
46
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
47
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
48
     SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \
49
     TailFile, ForceDictType, SafeEncode, IsNormAbsPath, FormatTime, \
50
     UnescapeAndSplit, RunParts
51

    
52
from ganeti.errors import LockError, UnitParseError, GenericError, \
53
     ProgrammerError
54

    
55

    
56
class TestIsProcessAlive(unittest.TestCase):
57
  """Testing case for IsProcessAlive"""
58

    
59
  def testExists(self):
60
    mypid = os.getpid()
61
    self.assert_(IsProcessAlive(mypid),
62
                 "can't find myself running")
63

    
64
  def testNotExisting(self):
65
    pid_non_existing = os.fork()
66
    if pid_non_existing == 0:
67
      os._exit(0)
68
    elif pid_non_existing < 0:
69
      raise SystemError("can't fork")
70
    os.waitpid(pid_non_existing, 0)
71
    self.assert_(not IsProcessAlive(pid_non_existing),
72
                 "nonexisting process detected")
73

    
74

    
75
class TestPidFileFunctions(unittest.TestCase):
76
  """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
77

    
78
  def setUp(self):
79
    self.dir = tempfile.mkdtemp()
80
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
81
    utils.DaemonPidFileName = self.f_dpn
82

    
83
  def testPidFileFunctions(self):
84
    pid_file = self.f_dpn('test')
85
    utils.WritePidFile('test')
86
    self.failUnless(os.path.exists(pid_file),
87
                    "PID file should have been created")
88
    read_pid = utils.ReadPidFile(pid_file)
89
    self.failUnlessEqual(read_pid, os.getpid())
90
    self.failUnless(utils.IsProcessAlive(read_pid))
91
    self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
92
    utils.RemovePidFile('test')
93
    self.failIf(os.path.exists(pid_file),
94
                "PID file should not exist anymore")
95
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
96
                         "ReadPidFile should return 0 for missing pid file")
97
    fh = open(pid_file, "w")
98
    fh.write("blah\n")
99
    fh.close()
100
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
101
                         "ReadPidFile should return 0 for invalid pid file")
102
    utils.RemovePidFile('test')
103
    self.failIf(os.path.exists(pid_file),
104
                "PID file should not exist anymore")
105

    
106
  def testKill(self):
107
    pid_file = self.f_dpn('child')
108
    r_fd, w_fd = os.pipe()
109
    new_pid = os.fork()
110
    if new_pid == 0: #child
111
      utils.WritePidFile('child')
112
      os.write(w_fd, 'a')
113
      signal.pause()
114
      os._exit(0)
115
      return
116
    # else we are in the parent
117
    # wait until the child has written the pid file
118
    os.read(r_fd, 1)
119
    read_pid = utils.ReadPidFile(pid_file)
120
    self.failUnlessEqual(read_pid, new_pid)
121
    self.failUnless(utils.IsProcessAlive(new_pid))
122
    utils.KillProcess(new_pid, waitpid=True)
123
    self.failIf(utils.IsProcessAlive(new_pid))
124
    utils.RemovePidFile('child')
125
    self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
126

    
127
  def tearDown(self):
128
    for name in os.listdir(self.dir):
129
      os.unlink(os.path.join(self.dir, name))
130
    os.rmdir(self.dir)
131

    
132

    
133
class TestRunCmd(testutils.GanetiTestCase):
134
  """Testing case for the RunCmd function"""
135

    
136
  def setUp(self):
137
    testutils.GanetiTestCase.setUp(self)
138
    self.magic = time.ctime() + " ganeti test"
139
    self.fname = self._CreateTempFile()
140

    
141
  def testOk(self):
142
    """Test successful exit code"""
143
    result = RunCmd("/bin/sh -c 'exit 0'")
144
    self.assertEqual(result.exit_code, 0)
145
    self.assertEqual(result.output, "")
146

    
147
  def testFail(self):
148
    """Test fail exit code"""
149
    result = RunCmd("/bin/sh -c 'exit 1'")
150
    self.assertEqual(result.exit_code, 1)
151
    self.assertEqual(result.output, "")
152

    
153
  def testStdout(self):
154
    """Test standard output"""
155
    cmd = 'echo -n "%s"' % self.magic
156
    result = RunCmd("/bin/sh -c '%s'" % cmd)
157
    self.assertEqual(result.stdout, self.magic)
158
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
159
    self.assertEqual(result.output, "")
160
    self.assertFileContent(self.fname, self.magic)
161

    
162
  def testStderr(self):
163
    """Test standard error"""
164
    cmd = 'echo -n "%s"' % self.magic
165
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
166
    self.assertEqual(result.stderr, self.magic)
167
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
168
    self.assertEqual(result.output, "")
169
    self.assertFileContent(self.fname, self.magic)
170

    
171
  def testCombined(self):
172
    """Test combined output"""
173
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
174
    expected = "A" + self.magic + "B" + self.magic
175
    result = RunCmd("/bin/sh -c '%s'" % cmd)
176
    self.assertEqual(result.output, expected)
177
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
178
    self.assertEqual(result.output, "")
179
    self.assertFileContent(self.fname, expected)
180

    
181
  def testSignal(self):
182
    """Test signal"""
183
    result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
184
    self.assertEqual(result.signal, 15)
185
    self.assertEqual(result.output, "")
186

    
187
  def testListRun(self):
188
    """Test list runs"""
189
    result = RunCmd(["true"])
190
    self.assertEqual(result.signal, None)
191
    self.assertEqual(result.exit_code, 0)
192
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
193
    self.assertEqual(result.signal, None)
194
    self.assertEqual(result.exit_code, 1)
195
    result = RunCmd(["echo", "-n", self.magic])
196
    self.assertEqual(result.signal, None)
197
    self.assertEqual(result.exit_code, 0)
198
    self.assertEqual(result.stdout, self.magic)
199

    
200
  def testFileEmptyOutput(self):
201
    """Test file output"""
202
    result = RunCmd(["true"], output=self.fname)
203
    self.assertEqual(result.signal, None)
204
    self.assertEqual(result.exit_code, 0)
205
    self.assertFileContent(self.fname, "")
206

    
207
  def testLang(self):
208
    """Test locale environment"""
209
    old_env = os.environ.copy()
210
    try:
211
      os.environ["LANG"] = "en_US.UTF-8"
212
      os.environ["LC_ALL"] = "en_US.UTF-8"
213
      result = RunCmd(["locale"])
214
      for line in result.output.splitlines():
215
        key, value = line.split("=", 1)
216
        # Ignore these variables, they're overridden by LC_ALL
217
        if key == "LANG" or key == "LANGUAGE":
218
          continue
219
        self.failIf(value and value != "C" and value != '"C"',
220
            "Variable %s is set to the invalid value '%s'" % (key, value))
221
    finally:
222
      os.environ = old_env
223

    
224
  def testDefaultCwd(self):
225
    """Test default working directory"""
226
    self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
227

    
228
  def testCwd(self):
229
    """Test default working directory"""
230
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
231
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
232
    cwd = os.getcwd()
233
    self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
234

    
235
  def testResetEnv(self):
236
    """Test environment reset functionality"""
237
    self.failUnlessEqual(RunCmd(["env"], reset_env=True).stdout.strip(), "")
238

    
239

    
240
class TestRunParts(unittest.TestCase):
241
  """Testing case for the RunParts function"""
242

    
243
  def setUp(self):
244
    self.rundir = tempfile.mkdtemp(prefix="ganeti-test", suffix=".tmp")
245

    
246
  def tearDown(self):
247
    shutil.rmtree(self.rundir)
248

    
249
  def testEmpty(self):
250
    """Test on an empty dir"""
251
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True), [])
252

    
253
  def testSkipWrongName(self):
254
    """Test that wrong files are skipped"""
255
    fname = os.path.join(self.rundir, "00test.dot")
256
    utils.WriteFile(fname, data="")
257
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
258
    relname = os.path.basename(fname)
259
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
260
                         [(relname, constants.RUNPARTS_SKIP, None)])
261

    
262
  def testSkipNonExec(self):
263
    """Test that non executable files are skipped"""
264
    fname = os.path.join(self.rundir, "00test")
265
    utils.WriteFile(fname, data="")
266
    relname = os.path.basename(fname)
267
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
268
                         [(relname, constants.RUNPARTS_SKIP, None)])
269

    
270
  def testError(self):
271
    """Test error on a broken executable"""
272
    fname = os.path.join(self.rundir, "00test")
273
    utils.WriteFile(fname, data="")
274
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
275
    (relname, status, error) = RunParts(self.rundir, reset_env=True)[0]
276
    self.failUnlessEqual(relname, os.path.basename(fname))
277
    self.failUnlessEqual(status, constants.RUNPARTS_ERR)
278
    self.failUnless(error)
279

    
280
  def testSorted(self):
281
    """Test executions are sorted"""
282
    files = []
283
    files.append(os.path.join(self.rundir, "64test"))
284
    files.append(os.path.join(self.rundir, "00test"))
285
    files.append(os.path.join(self.rundir, "42test"))
286

    
287
    for fname in files:
288
      utils.WriteFile(fname, data="")
289

    
290
    results = RunParts(self.rundir, reset_env=True)
291

    
292
    for fname in sorted(files):
293
      self.failUnlessEqual(os.path.basename(fname), results.pop(0)[0])
294

    
295
  def testOk(self):
296
    """Test correct execution"""
297
    fname = os.path.join(self.rundir, "00test")
298
    utils.WriteFile(fname, data="#!/bin/sh\n\necho -n ciao")
299
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
300
    (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
301
    self.failUnlessEqual(relname, os.path.basename(fname))
302
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
303
    self.failUnlessEqual(runresult.stdout, "ciao")
304

    
305
  def testRunFail(self):
306
    """Test correct execution, with run failure"""
307
    fname = os.path.join(self.rundir, "00test")
308
    utils.WriteFile(fname, data="#!/bin/sh\n\nexit 1")
309
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
310
    (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
311
    self.failUnlessEqual(relname, os.path.basename(fname))
312
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
313
    self.failUnlessEqual(runresult.exit_code, 1)
314
    self.failUnless(runresult.failed)
315

    
316
  def testRunMix(self):
317
    files = []
318
    files.append(os.path.join(self.rundir, "00test"))
319
    files.append(os.path.join(self.rundir, "42test"))
320
    files.append(os.path.join(self.rundir, "64test"))
321
    files.append(os.path.join(self.rundir, "99test"))
322

    
323
    files.sort()
324

    
325
    # 1st has errors in execution
326
    utils.WriteFile(files[0], data="#!/bin/sh\n\nexit 1")
327
    os.chmod(files[0], stat.S_IREAD | stat.S_IEXEC)
328

    
329
    # 2nd is skipped
330
    utils.WriteFile(files[1], data="")
331

    
332
    # 3rd cannot execute properly
333
    utils.WriteFile(files[2], data="")
334
    os.chmod(files[2], stat.S_IREAD | stat.S_IEXEC)
335

    
336
    # 4th execs
337
    utils.WriteFile(files[3], data="#!/bin/sh\n\necho -n ciao")
338
    os.chmod(files[3], stat.S_IREAD | stat.S_IEXEC)
339

    
340
    results = RunParts(self.rundir, reset_env=True)
341

    
342
    (relname, status, runresult) = results[0]
343
    self.failUnlessEqual(relname, os.path.basename(files[0]))
344
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
345
    self.failUnlessEqual(runresult.exit_code, 1)
346
    self.failUnless(runresult.failed)
347

    
348
    (relname, status, runresult) = results[1]
349
    self.failUnlessEqual(relname, os.path.basename(files[1]))
350
    self.failUnlessEqual(status, constants.RUNPARTS_SKIP)
351
    self.failUnlessEqual(runresult, None)
352

    
353
    (relname, status, runresult) = results[2]
354
    self.failUnlessEqual(relname, os.path.basename(files[2]))
355
    self.failUnlessEqual(status, constants.RUNPARTS_ERR)
356
    self.failUnless(runresult)
357

    
358
    (relname, status, runresult) = results[3]
359
    self.failUnlessEqual(relname, os.path.basename(files[3]))
360
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
361
    self.failUnlessEqual(runresult.output, "ciao")
362
    self.failUnlessEqual(runresult.exit_code, 0)
363
    self.failUnless(not runresult.failed)
364

    
365

    
366
class TestRemoveFile(unittest.TestCase):
367
  """Test case for the RemoveFile function"""
368

    
369
  def setUp(self):
370
    """Create a temp dir and file for each case"""
371
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
372
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
373
    os.close(fd)
374

    
375
  def tearDown(self):
376
    if os.path.exists(self.tmpfile):
377
      os.unlink(self.tmpfile)
378
    os.rmdir(self.tmpdir)
379

    
380

    
381
  def testIgnoreDirs(self):
382
    """Test that RemoveFile() ignores directories"""
383
    self.assertEqual(None, RemoveFile(self.tmpdir))
384

    
385

    
386
  def testIgnoreNotExisting(self):
387
    """Test that RemoveFile() ignores non-existing files"""
388
    RemoveFile(self.tmpfile)
389
    RemoveFile(self.tmpfile)
390

    
391

    
392
  def testRemoveFile(self):
393
    """Test that RemoveFile does remove a file"""
394
    RemoveFile(self.tmpfile)
395
    if os.path.exists(self.tmpfile):
396
      self.fail("File '%s' not removed" % self.tmpfile)
397

    
398

    
399
  def testRemoveSymlink(self):
400
    """Test that RemoveFile does remove symlinks"""
401
    symlink = self.tmpdir + "/symlink"
402
    os.symlink("no-such-file", symlink)
403
    RemoveFile(symlink)
404
    if os.path.exists(symlink):
405
      self.fail("File '%s' not removed" % symlink)
406
    os.symlink(self.tmpfile, symlink)
407
    RemoveFile(symlink)
408
    if os.path.exists(symlink):
409
      self.fail("File '%s' not removed" % symlink)
410

    
411

    
412
class TestRename(unittest.TestCase):
413
  """Test case for RenameFile"""
414

    
415
  def setUp(self):
416
    """Create a temporary directory"""
417
    self.tmpdir = tempfile.mkdtemp()
418
    self.tmpfile = os.path.join(self.tmpdir, "test1")
419

    
420
    # Touch the file
421
    open(self.tmpfile, "w").close()
422

    
423
  def tearDown(self):
424
    """Remove temporary directory"""
425
    shutil.rmtree(self.tmpdir)
426

    
427
  def testSimpleRename1(self):
428
    """Simple rename 1"""
429
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
430
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
431

    
432
  def testSimpleRename2(self):
433
    """Simple rename 2"""
434
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
435
                     mkdir=True)
436
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
437

    
438
  def testRenameMkdir(self):
439
    """Rename with mkdir"""
440
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
441
                     mkdir=True)
442
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
443
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
444

    
445
    utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"),
446
                     os.path.join(self.tmpdir, "test/foo/bar/baz"),
447
                     mkdir=True)
448
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
449
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar")))
450
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz")))
451

    
452

    
453
class TestMatchNameComponent(unittest.TestCase):
454
  """Test case for the MatchNameComponent function"""
455

    
456
  def testEmptyList(self):
457
    """Test that there is no match against an empty list"""
458

    
459
    self.failUnlessEqual(MatchNameComponent("", []), None)
460
    self.failUnlessEqual(MatchNameComponent("test", []), None)
461

    
462
  def testSingleMatch(self):
463
    """Test that a single match is performed correctly"""
464
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
465
    for key in "test2", "test2.example", "test2.example.com":
466
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
467

    
468
  def testMultipleMatches(self):
469
    """Test that a multiple match is returned as None"""
470
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
471
    for key in "test1", "test1.example":
472
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
473

    
474
  def testFullMatch(self):
475
    """Test that a full match is returned correctly"""
476
    key1 = "test1"
477
    key2 = "test1.example"
478
    mlist = [key2, key2 + ".com"]
479
    self.failUnlessEqual(MatchNameComponent(key1, mlist), None)
480
    self.failUnlessEqual(MatchNameComponent(key2, mlist), key2)
481

    
482
  def testCaseInsensitivePartialMatch(self):
483
    """Test for the case_insensitive keyword"""
484
    mlist = ["test1.example.com", "test2.example.net"]
485
    self.assertEqual(MatchNameComponent("test2", mlist, case_sensitive=False),
486
                     "test2.example.net")
487
    self.assertEqual(MatchNameComponent("Test2", mlist, case_sensitive=False),
488
                     "test2.example.net")
489
    self.assertEqual(MatchNameComponent("teSt2", mlist, case_sensitive=False),
490
                     "test2.example.net")
491
    self.assertEqual(MatchNameComponent("TeSt2", mlist, case_sensitive=False),
492
                     "test2.example.net")
493

    
494

    
495
  def testCaseInsensitiveFullMatch(self):
496
    mlist = ["ts1.ex", "ts1.ex.org", "ts2.ex", "Ts2.ex"]
497
    # Between the two ts1 a full string match non-case insensitive should work
498
    self.assertEqual(MatchNameComponent("Ts1", mlist, case_sensitive=False),
499
                     None)
500
    self.assertEqual(MatchNameComponent("Ts1.ex", mlist, case_sensitive=False),
501
                     "ts1.ex")
502
    self.assertEqual(MatchNameComponent("ts1.ex", mlist, case_sensitive=False),
503
                     "ts1.ex")
504
    # Between the two ts2 only case differs, so only case-match works
505
    self.assertEqual(MatchNameComponent("ts2.ex", mlist, case_sensitive=False),
506
                     "ts2.ex")
507
    self.assertEqual(MatchNameComponent("Ts2.ex", mlist, case_sensitive=False),
508
                     "Ts2.ex")
509
    self.assertEqual(MatchNameComponent("TS2.ex", mlist, case_sensitive=False),
510
                     None)
511

    
512

    
513
class TestFormatUnit(unittest.TestCase):
514
  """Test case for the FormatUnit function"""
515

    
516
  def testMiB(self):
517
    self.assertEqual(FormatUnit(1, 'h'), '1M')
518
    self.assertEqual(FormatUnit(100, 'h'), '100M')
519
    self.assertEqual(FormatUnit(1023, 'h'), '1023M')
520

    
521
    self.assertEqual(FormatUnit(1, 'm'), '1')
522
    self.assertEqual(FormatUnit(100, 'm'), '100')
523
    self.assertEqual(FormatUnit(1023, 'm'), '1023')
524

    
525
    self.assertEqual(FormatUnit(1024, 'm'), '1024')
526
    self.assertEqual(FormatUnit(1536, 'm'), '1536')
527
    self.assertEqual(FormatUnit(17133, 'm'), '17133')
528
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
529

    
530
  def testGiB(self):
531
    self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
532
    self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
533
    self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
534
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
535

    
536
    self.assertEqual(FormatUnit(1024, 'g'), '1.0')
537
    self.assertEqual(FormatUnit(1536, 'g'), '1.5')
538
    self.assertEqual(FormatUnit(17133, 'g'), '16.7')
539
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
540

    
541
    self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
542
    self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
543
    self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
544

    
545
  def testTiB(self):
546
    self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
547
    self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
548
    self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
549

    
550
    self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
551
    self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
552
    self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
553

    
554
class TestParseUnit(unittest.TestCase):
555
  """Test case for the ParseUnit function"""
556

    
557
  SCALES = (('', 1),
558
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
559
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
560
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
561

    
562
  def testRounding(self):
563
    self.assertEqual(ParseUnit('0'), 0)
564
    self.assertEqual(ParseUnit('1'), 4)
565
    self.assertEqual(ParseUnit('2'), 4)
566
    self.assertEqual(ParseUnit('3'), 4)
567

    
568
    self.assertEqual(ParseUnit('124'), 124)
569
    self.assertEqual(ParseUnit('125'), 128)
570
    self.assertEqual(ParseUnit('126'), 128)
571
    self.assertEqual(ParseUnit('127'), 128)
572
    self.assertEqual(ParseUnit('128'), 128)
573
    self.assertEqual(ParseUnit('129'), 132)
574
    self.assertEqual(ParseUnit('130'), 132)
575

    
576
  def testFloating(self):
577
    self.assertEqual(ParseUnit('0'), 0)
578
    self.assertEqual(ParseUnit('0.5'), 4)
579
    self.assertEqual(ParseUnit('1.75'), 4)
580
    self.assertEqual(ParseUnit('1.99'), 4)
581
    self.assertEqual(ParseUnit('2.00'), 4)
582
    self.assertEqual(ParseUnit('2.01'), 4)
583
    self.assertEqual(ParseUnit('3.99'), 4)
584
    self.assertEqual(ParseUnit('4.00'), 4)
585
    self.assertEqual(ParseUnit('4.01'), 8)
586
    self.assertEqual(ParseUnit('1.5G'), 1536)
587
    self.assertEqual(ParseUnit('1.8G'), 1844)
588
    self.assertEqual(ParseUnit('8.28T'), 8682212)
589

    
590
  def testSuffixes(self):
591
    for sep in ('', ' ', '   ', "\t", "\t "):
592
      for suffix, scale in TestParseUnit.SCALES:
593
        for func in (lambda x: x, str.lower, str.upper):
594
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
595
                           1024 * scale)
596

    
597
  def testInvalidInput(self):
598
    for sep in ('-', '_', ',', 'a'):
599
      for suffix, _ in TestParseUnit.SCALES:
600
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
601

    
602
    for suffix, _ in TestParseUnit.SCALES:
603
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
604

    
605

    
606
class TestSshKeys(testutils.GanetiTestCase):
607
  """Test case for the AddAuthorizedKey function"""
608

    
609
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
610
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
611
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
612

    
613
  def setUp(self):
614
    testutils.GanetiTestCase.setUp(self)
615
    self.tmpname = self._CreateTempFile()
616
    handle = open(self.tmpname, 'w')
617
    try:
618
      handle.write("%s\n" % TestSshKeys.KEY_A)
619
      handle.write("%s\n" % TestSshKeys.KEY_B)
620
    finally:
621
      handle.close()
622

    
623
  def testAddingNewKey(self):
624
    AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
625

    
626
    self.assertFileContent(self.tmpname,
627
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
628
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
629
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
630
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
631

    
632
  def testAddingAlmostButNotCompletelyTheSameKey(self):
633
    AddAuthorizedKey(self.tmpname,
634
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
635

    
636
    self.assertFileContent(self.tmpname,
637
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
638
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
639
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
640
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
641

    
642
  def testAddingExistingKeyWithSomeMoreSpaces(self):
643
    AddAuthorizedKey(self.tmpname,
644
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
645

    
646
    self.assertFileContent(self.tmpname,
647
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
648
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
649
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
650

    
651
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
652
    RemoveAuthorizedKey(self.tmpname,
653
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
654

    
655
    self.assertFileContent(self.tmpname,
656
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
657
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
658

    
659
  def testRemovingNonExistingKey(self):
660
    RemoveAuthorizedKey(self.tmpname,
661
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
662

    
663
    self.assertFileContent(self.tmpname,
664
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
665
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
666
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
667

    
668

    
669
class TestEtcHosts(testutils.GanetiTestCase):
670
  """Test functions modifying /etc/hosts"""
671

    
672
  def setUp(self):
673
    testutils.GanetiTestCase.setUp(self)
674
    self.tmpname = self._CreateTempFile()
675
    handle = open(self.tmpname, 'w')
676
    try:
677
      handle.write('# This is a test file for /etc/hosts\n')
678
      handle.write('127.0.0.1\tlocalhost\n')
679
      handle.write('192.168.1.1 router gw\n')
680
    finally:
681
      handle.close()
682

    
683
  def testSettingNewIp(self):
684
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
685

    
686
    self.assertFileContent(self.tmpname,
687
      "# This is a test file for /etc/hosts\n"
688
      "127.0.0.1\tlocalhost\n"
689
      "192.168.1.1 router gw\n"
690
      "1.2.3.4\tmyhost.domain.tld myhost\n")
691
    self.assertFileMode(self.tmpname, 0644)
692

    
693
  def testSettingExistingIp(self):
694
    SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
695
                     ['myhost'])
696

    
697
    self.assertFileContent(self.tmpname,
698
      "# This is a test file for /etc/hosts\n"
699
      "127.0.0.1\tlocalhost\n"
700
      "192.168.1.1\tmyhost.domain.tld myhost\n")
701
    self.assertFileMode(self.tmpname, 0644)
702

    
703
  def testSettingDuplicateName(self):
704
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
705

    
706
    self.assertFileContent(self.tmpname,
707
      "# This is a test file for /etc/hosts\n"
708
      "127.0.0.1\tlocalhost\n"
709
      "192.168.1.1 router gw\n"
710
      "1.2.3.4\tmyhost\n")
711
    self.assertFileMode(self.tmpname, 0644)
712

    
713
  def testRemovingExistingHost(self):
714
    RemoveEtcHostsEntry(self.tmpname, 'router')
715

    
716
    self.assertFileContent(self.tmpname,
717
      "# This is a test file for /etc/hosts\n"
718
      "127.0.0.1\tlocalhost\n"
719
      "192.168.1.1 gw\n")
720
    self.assertFileMode(self.tmpname, 0644)
721

    
722
  def testRemovingSingleExistingHost(self):
723
    RemoveEtcHostsEntry(self.tmpname, 'localhost')
724

    
725
    self.assertFileContent(self.tmpname,
726
      "# This is a test file for /etc/hosts\n"
727
      "192.168.1.1 router gw\n")
728
    self.assertFileMode(self.tmpname, 0644)
729

    
730
  def testRemovingNonExistingHost(self):
731
    RemoveEtcHostsEntry(self.tmpname, 'myhost')
732

    
733
    self.assertFileContent(self.tmpname,
734
      "# This is a test file for /etc/hosts\n"
735
      "127.0.0.1\tlocalhost\n"
736
      "192.168.1.1 router gw\n")
737
    self.assertFileMode(self.tmpname, 0644)
738

    
739
  def testRemovingAlias(self):
740
    RemoveEtcHostsEntry(self.tmpname, 'gw')
741

    
742
    self.assertFileContent(self.tmpname,
743
      "# This is a test file for /etc/hosts\n"
744
      "127.0.0.1\tlocalhost\n"
745
      "192.168.1.1 router\n")
746
    self.assertFileMode(self.tmpname, 0644)
747

    
748

    
749
class TestShellQuoting(unittest.TestCase):
750
  """Test case for shell quoting functions"""
751

    
752
  def testShellQuote(self):
753
    self.assertEqual(ShellQuote('abc'), "abc")
754
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
755
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
756
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
757
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
758

    
759
  def testShellQuoteArgs(self):
760
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
761
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
762
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
763

    
764

    
765
class TestTcpPing(unittest.TestCase):
766
  """Testcase for TCP version of ping - against listen(2)ing port"""
767

    
768
  def setUp(self):
769
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
770
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
771
    self.listenerport = self.listener.getsockname()[1]
772
    self.listener.listen(1)
773

    
774
  def tearDown(self):
775
    self.listener.shutdown(socket.SHUT_RDWR)
776
    del self.listener
777
    del self.listenerport
778

    
779
  def testTcpPingToLocalHostAccept(self):
780
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
781
                         self.listenerport,
782
                         timeout=10,
783
                         live_port_needed=True,
784
                         source=constants.LOCALHOST_IP_ADDRESS,
785
                         ),
786
                 "failed to connect to test listener")
787

    
788
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
789
                         self.listenerport,
790
                         timeout=10,
791
                         live_port_needed=True,
792
                         ),
793
                 "failed to connect to test listener (no source)")
794

    
795

    
796
class TestTcpPingDeaf(unittest.TestCase):
797
  """Testcase for TCP version of ping - against non listen(2)ing port"""
798

    
799
  def setUp(self):
800
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
801
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
802
    self.deaflistenerport = self.deaflistener.getsockname()[1]
803

    
804
  def tearDown(self):
805
    del self.deaflistener
806
    del self.deaflistenerport
807

    
808
  def testTcpPingToLocalHostAcceptDeaf(self):
809
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
810
                        self.deaflistenerport,
811
                        timeout=constants.TCP_PING_TIMEOUT,
812
                        live_port_needed=True,
813
                        source=constants.LOCALHOST_IP_ADDRESS,
814
                        ), # need successful connect(2)
815
                "successfully connected to deaf listener")
816

    
817
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
818
                        self.deaflistenerport,
819
                        timeout=constants.TCP_PING_TIMEOUT,
820
                        live_port_needed=True,
821
                        ), # need successful connect(2)
822
                "successfully connected to deaf listener (no source addr)")
823

    
824
  def testTcpPingToLocalHostNoAccept(self):
825
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
826
                         self.deaflistenerport,
827
                         timeout=constants.TCP_PING_TIMEOUT,
828
                         live_port_needed=False,
829
                         source=constants.LOCALHOST_IP_ADDRESS,
830
                         ), # ECONNREFUSED is OK
831
                 "failed to ping alive host on deaf port")
832

    
833
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
834
                         self.deaflistenerport,
835
                         timeout=constants.TCP_PING_TIMEOUT,
836
                         live_port_needed=False,
837
                         ), # ECONNREFUSED is OK
838
                 "failed to ping alive host on deaf port (no source addr)")
839

    
840

    
841
class TestOwnIpAddress(unittest.TestCase):
842
  """Testcase for OwnIpAddress"""
843

    
844
  def testOwnLoopback(self):
845
    """check having the loopback ip"""
846
    self.failUnless(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS),
847
                    "Should own the loopback address")
848

    
849
  def testNowOwnAddress(self):
850
    """check that I don't own an address"""
851

    
852
    # network 192.0.2.0/24 is reserved for test/documentation as per
853
    # rfc 3330, so we *should* not have an address of this range... if
854
    # this fails, we should extend the test to multiple addresses
855
    DST_IP = "192.0.2.1"
856
    self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP)
857

    
858

    
859
class TestListVisibleFiles(unittest.TestCase):
860
  """Test case for ListVisibleFiles"""
861

    
862
  def setUp(self):
863
    self.path = tempfile.mkdtemp()
864

    
865
  def tearDown(self):
866
    shutil.rmtree(self.path)
867

    
868
  def _test(self, files, expected):
869
    # Sort a copy
870
    expected = expected[:]
871
    expected.sort()
872

    
873
    for name in files:
874
      f = open(os.path.join(self.path, name), 'w')
875
      try:
876
        f.write("Test\n")
877
      finally:
878
        f.close()
879

    
880
    found = ListVisibleFiles(self.path)
881
    found.sort()
882

    
883
    self.assertEqual(found, expected)
884

    
885
  def testAllVisible(self):
886
    files = ["a", "b", "c"]
887
    expected = files
888
    self._test(files, expected)
889

    
890
  def testNoneVisible(self):
891
    files = [".a", ".b", ".c"]
892
    expected = []
893
    self._test(files, expected)
894

    
895
  def testSomeVisible(self):
896
    files = ["a", "b", ".c"]
897
    expected = ["a", "b"]
898
    self._test(files, expected)
899

    
900

    
901
class TestNewUUID(unittest.TestCase):
902
  """Test case for NewUUID"""
903

    
904
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
905
                        '[a-f0-9]{4}-[a-f0-9]{12}$')
906

    
907
  def runTest(self):
908
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
909

    
910

    
911
class TestUniqueSequence(unittest.TestCase):
912
  """Test case for UniqueSequence"""
913

    
914
  def _test(self, input, expected):
915
    self.assertEqual(utils.UniqueSequence(input), expected)
916

    
917
  def runTest(self):
918
    # Ordered input
919
    self._test([1, 2, 3], [1, 2, 3])
920
    self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
921
    self._test([1, 2, 2, 3], [1, 2, 3])
922
    self._test([1, 2, 3, 3], [1, 2, 3])
923

    
924
    # Unordered input
925
    self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
926
    self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
927

    
928
    # Strings
929
    self._test(["a", "a"], ["a"])
930
    self._test(["a", "b"], ["a", "b"])
931
    self._test(["a", "b", "a"], ["a", "b"])
932

    
933

    
934
class TestFirstFree(unittest.TestCase):
935
  """Test case for the FirstFree function"""
936

    
937
  def test(self):
938
    """Test FirstFree"""
939
    self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
940
    self.failUnlessEqual(FirstFree([]), None)
941
    self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
942
    self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
943
    self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
944

    
945

    
946
class TestTailFile(testutils.GanetiTestCase):
947
  """Test case for the TailFile function"""
948

    
949
  def testEmpty(self):
950
    fname = self._CreateTempFile()
951
    self.failUnlessEqual(TailFile(fname), [])
952
    self.failUnlessEqual(TailFile(fname, lines=25), [])
953

    
954
  def testAllLines(self):
955
    data = ["test %d" % i for i in range(30)]
956
    for i in range(30):
957
      fname = self._CreateTempFile()
958
      fd = open(fname, "w")
959
      fd.write("\n".join(data[:i]))
960
      if i > 0:
961
        fd.write("\n")
962
      fd.close()
963
      self.failUnlessEqual(TailFile(fname, lines=i), data[:i])
964

    
965
  def testPartialLines(self):
966
    data = ["test %d" % i for i in range(30)]
967
    fname = self._CreateTempFile()
968
    fd = open(fname, "w")
969
    fd.write("\n".join(data))
970
    fd.write("\n")
971
    fd.close()
972
    for i in range(1, 30):
973
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
974

    
975
  def testBigFile(self):
976
    data = ["test %d" % i for i in range(30)]
977
    fname = self._CreateTempFile()
978
    fd = open(fname, "w")
979
    fd.write("X" * 1048576)
980
    fd.write("\n")
981
    fd.write("\n".join(data))
982
    fd.write("\n")
983
    fd.close()
984
    for i in range(1, 30):
985
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
986

    
987

    
988
class TestFileLock(unittest.TestCase):
989
  """Test case for the FileLock class"""
990

    
991
  def setUp(self):
992
    self.tmpfile = tempfile.NamedTemporaryFile()
993
    self.lock = utils.FileLock(self.tmpfile.name)
994

    
995
  def testSharedNonblocking(self):
996
    self.lock.Shared(blocking=False)
997
    self.lock.Close()
998

    
999
  def testExclusiveNonblocking(self):
1000
    self.lock.Exclusive(blocking=False)
1001
    self.lock.Close()
1002

    
1003
  def testUnlockNonblocking(self):
1004
    self.lock.Unlock(blocking=False)
1005
    self.lock.Close()
1006

    
1007
  def testSharedBlocking(self):
1008
    self.lock.Shared(blocking=True)
1009
    self.lock.Close()
1010

    
1011
  def testExclusiveBlocking(self):
1012
    self.lock.Exclusive(blocking=True)
1013
    self.lock.Close()
1014

    
1015
  def testUnlockBlocking(self):
1016
    self.lock.Unlock(blocking=True)
1017
    self.lock.Close()
1018

    
1019
  def testSharedExclusiveUnlock(self):
1020
    self.lock.Shared(blocking=False)
1021
    self.lock.Exclusive(blocking=False)
1022
    self.lock.Unlock(blocking=False)
1023
    self.lock.Close()
1024

    
1025
  def testExclusiveSharedUnlock(self):
1026
    self.lock.Exclusive(blocking=False)
1027
    self.lock.Shared(blocking=False)
1028
    self.lock.Unlock(blocking=False)
1029
    self.lock.Close()
1030

    
1031
  def testCloseShared(self):
1032
    self.lock.Close()
1033
    self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
1034

    
1035
  def testCloseExclusive(self):
1036
    self.lock.Close()
1037
    self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
1038

    
1039
  def testCloseUnlock(self):
1040
    self.lock.Close()
1041
    self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
1042

    
1043

    
1044
class TestTimeFunctions(unittest.TestCase):
1045
  """Test case for time functions"""
1046

    
1047
  def runTest(self):
1048
    self.assertEqual(utils.SplitTime(1), (1, 0))
1049
    self.assertEqual(utils.SplitTime(1.5), (1, 500000))
1050
    self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
1051
    self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
1052
    self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
1053
    self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
1054
    self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
1055
    self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
1056

    
1057
    self.assertRaises(AssertionError, utils.SplitTime, -1)
1058

    
1059
    self.assertEqual(utils.MergeTime((1, 0)), 1.0)
1060
    self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
1061
    self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
1062

    
1063
    self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3),
1064
                     1218448917.481)
1065
    self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
1066

    
1067
    self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
1068
    self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
1069
    self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
1070
    self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
1071
    self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
1072

    
1073

    
1074
class FieldSetTestCase(unittest.TestCase):
1075
  """Test case for FieldSets"""
1076

    
1077
  def testSimpleMatch(self):
1078
    f = utils.FieldSet("a", "b", "c", "def")
1079
    self.failUnless(f.Matches("a"))
1080
    self.failIf(f.Matches("d"), "Substring matched")
1081
    self.failIf(f.Matches("defghi"), "Prefix string matched")
1082
    self.failIf(f.NonMatching(["b", "c"]))
1083
    self.failIf(f.NonMatching(["a", "b", "c", "def"]))
1084
    self.failUnless(f.NonMatching(["a", "d"]))
1085

    
1086
  def testRegexMatch(self):
1087
    f = utils.FieldSet("a", "b([0-9]+)", "c")
1088
    self.failUnless(f.Matches("b1"))
1089
    self.failUnless(f.Matches("b99"))
1090
    self.failIf(f.Matches("b/1"))
1091
    self.failIf(f.NonMatching(["b12", "c"]))
1092
    self.failUnless(f.NonMatching(["a", "1"]))
1093

    
1094
class TestForceDictType(unittest.TestCase):
1095
  """Test case for ForceDictType"""
1096

    
1097
  def setUp(self):
1098
    self.key_types = {
1099
      'a': constants.VTYPE_INT,
1100
      'b': constants.VTYPE_BOOL,
1101
      'c': constants.VTYPE_STRING,
1102
      'd': constants.VTYPE_SIZE,
1103
      }
1104

    
1105
  def _fdt(self, dict, allowed_values=None):
1106
    if allowed_values is None:
1107
      ForceDictType(dict, self.key_types)
1108
    else:
1109
      ForceDictType(dict, self.key_types, allowed_values=allowed_values)
1110

    
1111
    return dict
1112

    
1113
  def testSimpleDict(self):
1114
    self.assertEqual(self._fdt({}), {})
1115
    self.assertEqual(self._fdt({'a': 1}), {'a': 1})
1116
    self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
1117
    self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
1118
    self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
1119
    self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
1120
    self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
1121
    self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
1122
    self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
1123
    self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
1124
    self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
1125
    self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
1126

    
1127
  def testErrors(self):
1128
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
1129
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
1130
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
1131
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
1132

    
1133

    
1134
class TestIsAbsNormPath(unittest.TestCase):
1135
  """Testing case for IsProcessAlive"""
1136

    
1137
  def _pathTestHelper(self, path, result):
1138
    if result:
1139
      self.assert_(IsNormAbsPath(path),
1140
          "Path %s should result absolute and normalized" % path)
1141
    else:
1142
      self.assert_(not IsNormAbsPath(path),
1143
          "Path %s should not result absolute and normalized" % path)
1144

    
1145
  def testBase(self):
1146
    self._pathTestHelper('/etc', True)
1147
    self._pathTestHelper('/srv', True)
1148
    self._pathTestHelper('etc', False)
1149
    self._pathTestHelper('/etc/../root', False)
1150
    self._pathTestHelper('/etc/', False)
1151

    
1152

    
1153
class TestSafeEncode(unittest.TestCase):
1154
  """Test case for SafeEncode"""
1155

    
1156
  def testAscii(self):
1157
    for txt in [string.digits, string.letters, string.punctuation]:
1158
      self.failUnlessEqual(txt, SafeEncode(txt))
1159

    
1160
  def testDoubleEncode(self):
1161
    for i in range(255):
1162
      txt = SafeEncode(chr(i))
1163
      self.failUnlessEqual(txt, SafeEncode(txt))
1164

    
1165
  def testUnicode(self):
1166
    # 1024 is high enough to catch non-direct ASCII mappings
1167
    for i in range(1024):
1168
      txt = SafeEncode(unichr(i))
1169
      self.failUnlessEqual(txt, SafeEncode(txt))
1170

    
1171

    
1172
class TestFormatTime(unittest.TestCase):
1173
  """Testing case for FormatTime"""
1174

    
1175
  def testNone(self):
1176
    self.failUnlessEqual(FormatTime(None), "N/A")
1177

    
1178
  def testInvalid(self):
1179
    self.failUnlessEqual(FormatTime(()), "N/A")
1180

    
1181
  def testNow(self):
1182
    # tests that we accept time.time input
1183
    FormatTime(time.time())
1184
    # tests that we accept int input
1185
    FormatTime(int(time.time()))
1186

    
1187

    
1188
class RunInSeparateProcess(unittest.TestCase):
1189
  def test(self):
1190
    for exp in [True, False]:
1191
      def _child():
1192
        return exp
1193

    
1194
      self.assertEqual(exp, utils.RunInSeparateProcess(_child))
1195

    
1196
  def testPid(self):
1197
    parent_pid = os.getpid()
1198

    
1199
    def _check():
1200
      return os.getpid() == parent_pid
1201

    
1202
    self.failIf(utils.RunInSeparateProcess(_check))
1203

    
1204
  def testSignal(self):
1205
    def _kill():
1206
      os.kill(os.getpid(), signal.SIGTERM)
1207

    
1208
    self.assertRaises(errors.GenericError,
1209
                      utils.RunInSeparateProcess, _kill)
1210

    
1211
  def testException(self):
1212
    def _exc():
1213
      raise errors.GenericError("This is a test")
1214

    
1215
    self.assertRaises(errors.GenericError,
1216
                      utils.RunInSeparateProcess, _exc)
1217

    
1218

    
1219
class TestFingerprintFile(unittest.TestCase):
1220
  def setUp(self):
1221
    self.tmpfile = tempfile.NamedTemporaryFile()
1222

    
1223
  def test(self):
1224
    self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1225
                     "da39a3ee5e6b4b0d3255bfef95601890afd80709")
1226

    
1227
    utils.WriteFile(self.tmpfile.name, data="Hello World\n")
1228
    self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1229
                     "648a6a6ffffdaa0badb23b8baf90b6168dd16b3a")
1230

    
1231

    
1232
class TestUnescapeAndSplit(unittest.TestCase):
1233
  """Testing case for UnescapeAndSplit"""
1234

    
1235
  def setUp(self):
1236
    # testing more that one separator for regexp safety
1237
    self._seps = [",", "+", "."]
1238

    
1239
  def testSimple(self):
1240
    a = ["a", "b", "c", "d"]
1241
    for sep in self._seps:
1242
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), a)
1243

    
1244
  def testEscape(self):
1245
    for sep in self._seps:
1246
      a = ["a", "b\\" + sep + "c", "d"]
1247
      b = ["a", "b" + sep + "c", "d"]
1248
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1249

    
1250
  def testDoubleEscape(self):
1251
    for sep in self._seps:
1252
      a = ["a", "b\\\\", "c", "d"]
1253
      b = ["a", "b\\", "c", "d"]
1254
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1255

    
1256
  def testThreeEscape(self):
1257
    for sep in self._seps:
1258
      a = ["a", "b\\\\\\" + sep + "c", "d"]
1259
      b = ["a", "b\\" + sep + "c", "d"]
1260
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1261

    
1262

    
1263
if __name__ == '__main__':
1264
  testutils.GanetiTestProgram()