ListVisibleFiles: require normalized path names
[ganeti-local] / test / ganeti.utils_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the utils module"""
23
24 import unittest
25 import os
26 import time
27 import tempfile
28 import os.path
29 import os
30 import stat
31 import md5
32 import signal
33 import socket
34 import shutil
35 import re
36 import select
37 import string
38
39 import ganeti
40 import testutils
41 from ganeti import constants
42 from ganeti import utils
43 from ganeti import errors
44 from ganeti.utils import IsProcessAlive, RunCmd, \
45      RemoveFile, MatchNameComponent, FormatUnit, \
46      ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
47      ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
48      SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \
49      TailFile, ForceDictType, SafeEncode, IsNormAbsPath, FormatTime, \
50      UnescapeAndSplit, RunParts, PathJoin
51
52 from ganeti.errors import LockError, UnitParseError, GenericError, \
53      ProgrammerError
54
55
56 class TestIsProcessAlive(unittest.TestCase):
57   """Testing case for IsProcessAlive"""
58
59   def testExists(self):
60     mypid = os.getpid()
61     self.assert_(IsProcessAlive(mypid),
62                  "can't find myself running")
63
64   def testNotExisting(self):
65     pid_non_existing = os.fork()
66     if pid_non_existing == 0:
67       os._exit(0)
68     elif pid_non_existing < 0:
69       raise SystemError("can't fork")
70     os.waitpid(pid_non_existing, 0)
71     self.assert_(not IsProcessAlive(pid_non_existing),
72                  "nonexisting process detected")
73
74
75 class TestPidFileFunctions(unittest.TestCase):
76   """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
77
78   def setUp(self):
79     self.dir = tempfile.mkdtemp()
80     self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
81     utils.DaemonPidFileName = self.f_dpn
82
83   def testPidFileFunctions(self):
84     pid_file = self.f_dpn('test')
85     utils.WritePidFile('test')
86     self.failUnless(os.path.exists(pid_file),
87                     "PID file should have been created")
88     read_pid = utils.ReadPidFile(pid_file)
89     self.failUnlessEqual(read_pid, os.getpid())
90     self.failUnless(utils.IsProcessAlive(read_pid))
91     self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
92     utils.RemovePidFile('test')
93     self.failIf(os.path.exists(pid_file),
94                 "PID file should not exist anymore")
95     self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
96                          "ReadPidFile should return 0 for missing pid file")
97     fh = open(pid_file, "w")
98     fh.write("blah\n")
99     fh.close()
100     self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
101                          "ReadPidFile should return 0 for invalid pid file")
102     utils.RemovePidFile('test')
103     self.failIf(os.path.exists(pid_file),
104                 "PID file should not exist anymore")
105
106   def testKill(self):
107     pid_file = self.f_dpn('child')
108     r_fd, w_fd = os.pipe()
109     new_pid = os.fork()
110     if new_pid == 0: #child
111       utils.WritePidFile('child')
112       os.write(w_fd, 'a')
113       signal.pause()
114       os._exit(0)
115       return
116     # else we are in the parent
117     # wait until the child has written the pid file
118     os.read(r_fd, 1)
119     read_pid = utils.ReadPidFile(pid_file)
120     self.failUnlessEqual(read_pid, new_pid)
121     self.failUnless(utils.IsProcessAlive(new_pid))
122     utils.KillProcess(new_pid, waitpid=True)
123     self.failIf(utils.IsProcessAlive(new_pid))
124     utils.RemovePidFile('child')
125     self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
126
127   def tearDown(self):
128     for name in os.listdir(self.dir):
129       os.unlink(os.path.join(self.dir, name))
130     os.rmdir(self.dir)
131
132
133 class TestRunCmd(testutils.GanetiTestCase):
134   """Testing case for the RunCmd function"""
135
136   def setUp(self):
137     testutils.GanetiTestCase.setUp(self)
138     self.magic = time.ctime() + " ganeti test"
139     self.fname = self._CreateTempFile()
140
141   def testOk(self):
142     """Test successful exit code"""
143     result = RunCmd("/bin/sh -c 'exit 0'")
144     self.assertEqual(result.exit_code, 0)
145     self.assertEqual(result.output, "")
146
147   def testFail(self):
148     """Test fail exit code"""
149     result = RunCmd("/bin/sh -c 'exit 1'")
150     self.assertEqual(result.exit_code, 1)
151     self.assertEqual(result.output, "")
152
153   def testStdout(self):
154     """Test standard output"""
155     cmd = 'echo -n "%s"' % self.magic
156     result = RunCmd("/bin/sh -c '%s'" % cmd)
157     self.assertEqual(result.stdout, self.magic)
158     result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
159     self.assertEqual(result.output, "")
160     self.assertFileContent(self.fname, self.magic)
161
162   def testStderr(self):
163     """Test standard error"""
164     cmd = 'echo -n "%s"' % self.magic
165     result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
166     self.assertEqual(result.stderr, self.magic)
167     result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
168     self.assertEqual(result.output, "")
169     self.assertFileContent(self.fname, self.magic)
170
171   def testCombined(self):
172     """Test combined output"""
173     cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
174     expected = "A" + self.magic + "B" + self.magic
175     result = RunCmd("/bin/sh -c '%s'" % cmd)
176     self.assertEqual(result.output, expected)
177     result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
178     self.assertEqual(result.output, "")
179     self.assertFileContent(self.fname, expected)
180
181   def testSignal(self):
182     """Test signal"""
183     result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
184     self.assertEqual(result.signal, 15)
185     self.assertEqual(result.output, "")
186
187   def testListRun(self):
188     """Test list runs"""
189     result = RunCmd(["true"])
190     self.assertEqual(result.signal, None)
191     self.assertEqual(result.exit_code, 0)
192     result = RunCmd(["/bin/sh", "-c", "exit 1"])
193     self.assertEqual(result.signal, None)
194     self.assertEqual(result.exit_code, 1)
195     result = RunCmd(["echo", "-n", self.magic])
196     self.assertEqual(result.signal, None)
197     self.assertEqual(result.exit_code, 0)
198     self.assertEqual(result.stdout, self.magic)
199
200   def testFileEmptyOutput(self):
201     """Test file output"""
202     result = RunCmd(["true"], output=self.fname)
203     self.assertEqual(result.signal, None)
204     self.assertEqual(result.exit_code, 0)
205     self.assertFileContent(self.fname, "")
206
207   def testLang(self):
208     """Test locale environment"""
209     old_env = os.environ.copy()
210     try:
211       os.environ["LANG"] = "en_US.UTF-8"
212       os.environ["LC_ALL"] = "en_US.UTF-8"
213       result = RunCmd(["locale"])
214       for line in result.output.splitlines():
215         key, value = line.split("=", 1)
216         # Ignore these variables, they're overridden by LC_ALL
217         if key == "LANG" or key == "LANGUAGE":
218           continue
219         self.failIf(value and value != "C" and value != '"C"',
220             "Variable %s is set to the invalid value '%s'" % (key, value))
221     finally:
222       os.environ = old_env
223
224   def testDefaultCwd(self):
225     """Test default working directory"""
226     self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
227
228   def testCwd(self):
229     """Test default working directory"""
230     self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
231     self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
232     cwd = os.getcwd()
233     self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
234
235   def testResetEnv(self):
236     """Test environment reset functionality"""
237     self.failUnlessEqual(RunCmd(["env"], reset_env=True).stdout.strip(), "")
238
239
240 class TestRunParts(unittest.TestCase):
241   """Testing case for the RunParts function"""
242
243   def setUp(self):
244     self.rundir = tempfile.mkdtemp(prefix="ganeti-test", suffix=".tmp")
245
246   def tearDown(self):
247     shutil.rmtree(self.rundir)
248
249   def testEmpty(self):
250     """Test on an empty dir"""
251     self.failUnlessEqual(RunParts(self.rundir, reset_env=True), [])
252
253   def testSkipWrongName(self):
254     """Test that wrong files are skipped"""
255     fname = os.path.join(self.rundir, "00test.dot")
256     utils.WriteFile(fname, data="")
257     os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
258     relname = os.path.basename(fname)
259     self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
260                          [(relname, constants.RUNPARTS_SKIP, None)])
261
262   def testSkipNonExec(self):
263     """Test that non executable files are skipped"""
264     fname = os.path.join(self.rundir, "00test")
265     utils.WriteFile(fname, data="")
266     relname = os.path.basename(fname)
267     self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
268                          [(relname, constants.RUNPARTS_SKIP, None)])
269
270   def testError(self):
271     """Test error on a broken executable"""
272     fname = os.path.join(self.rundir, "00test")
273     utils.WriteFile(fname, data="")
274     os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
275     (relname, status, error) = RunParts(self.rundir, reset_env=True)[0]
276     self.failUnlessEqual(relname, os.path.basename(fname))
277     self.failUnlessEqual(status, constants.RUNPARTS_ERR)
278     self.failUnless(error)
279
280   def testSorted(self):
281     """Test executions are sorted"""
282     files = []
283     files.append(os.path.join(self.rundir, "64test"))
284     files.append(os.path.join(self.rundir, "00test"))
285     files.append(os.path.join(self.rundir, "42test"))
286
287     for fname in files:
288       utils.WriteFile(fname, data="")
289
290     results = RunParts(self.rundir, reset_env=True)
291
292     for fname in sorted(files):
293       self.failUnlessEqual(os.path.basename(fname), results.pop(0)[0])
294
295   def testOk(self):
296     """Test correct execution"""
297     fname = os.path.join(self.rundir, "00test")
298     utils.WriteFile(fname, data="#!/bin/sh\n\necho -n ciao")
299     os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
300     (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
301     self.failUnlessEqual(relname, os.path.basename(fname))
302     self.failUnlessEqual(status, constants.RUNPARTS_RUN)
303     self.failUnlessEqual(runresult.stdout, "ciao")
304
305   def testRunFail(self):
306     """Test correct execution, with run failure"""
307     fname = os.path.join(self.rundir, "00test")
308     utils.WriteFile(fname, data="#!/bin/sh\n\nexit 1")
309     os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
310     (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
311     self.failUnlessEqual(relname, os.path.basename(fname))
312     self.failUnlessEqual(status, constants.RUNPARTS_RUN)
313     self.failUnlessEqual(runresult.exit_code, 1)
314     self.failUnless(runresult.failed)
315
316   def testRunMix(self):
317     files = []
318     files.append(os.path.join(self.rundir, "00test"))
319     files.append(os.path.join(self.rundir, "42test"))
320     files.append(os.path.join(self.rundir, "64test"))
321     files.append(os.path.join(self.rundir, "99test"))
322
323     files.sort()
324
325     # 1st has errors in execution
326     utils.WriteFile(files[0], data="#!/bin/sh\n\nexit 1")
327     os.chmod(files[0], stat.S_IREAD | stat.S_IEXEC)
328
329     # 2nd is skipped
330     utils.WriteFile(files[1], data="")
331
332     # 3rd cannot execute properly
333     utils.WriteFile(files[2], data="")
334     os.chmod(files[2], stat.S_IREAD | stat.S_IEXEC)
335
336     # 4th execs
337     utils.WriteFile(files[3], data="#!/bin/sh\n\necho -n ciao")
338     os.chmod(files[3], stat.S_IREAD | stat.S_IEXEC)
339
340     results = RunParts(self.rundir, reset_env=True)
341
342     (relname, status, runresult) = results[0]
343     self.failUnlessEqual(relname, os.path.basename(files[0]))
344     self.failUnlessEqual(status, constants.RUNPARTS_RUN)
345     self.failUnlessEqual(runresult.exit_code, 1)
346     self.failUnless(runresult.failed)
347
348     (relname, status, runresult) = results[1]
349     self.failUnlessEqual(relname, os.path.basename(files[1]))
350     self.failUnlessEqual(status, constants.RUNPARTS_SKIP)
351     self.failUnlessEqual(runresult, None)
352
353     (relname, status, runresult) = results[2]
354     self.failUnlessEqual(relname, os.path.basename(files[2]))
355     self.failUnlessEqual(status, constants.RUNPARTS_ERR)
356     self.failUnless(runresult)
357
358     (relname, status, runresult) = results[3]
359     self.failUnlessEqual(relname, os.path.basename(files[3]))
360     self.failUnlessEqual(status, constants.RUNPARTS_RUN)
361     self.failUnlessEqual(runresult.output, "ciao")
362     self.failUnlessEqual(runresult.exit_code, 0)
363     self.failUnless(not runresult.failed)
364
365
366 class TestRemoveFile(unittest.TestCase):
367   """Test case for the RemoveFile function"""
368
369   def setUp(self):
370     """Create a temp dir and file for each case"""
371     self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
372     fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
373     os.close(fd)
374
375   def tearDown(self):
376     if os.path.exists(self.tmpfile):
377       os.unlink(self.tmpfile)
378     os.rmdir(self.tmpdir)
379
380
381   def testIgnoreDirs(self):
382     """Test that RemoveFile() ignores directories"""
383     self.assertEqual(None, RemoveFile(self.tmpdir))
384
385
386   def testIgnoreNotExisting(self):
387     """Test that RemoveFile() ignores non-existing files"""
388     RemoveFile(self.tmpfile)
389     RemoveFile(self.tmpfile)
390
391
392   def testRemoveFile(self):
393     """Test that RemoveFile does remove a file"""
394     RemoveFile(self.tmpfile)
395     if os.path.exists(self.tmpfile):
396       self.fail("File '%s' not removed" % self.tmpfile)
397
398
399   def testRemoveSymlink(self):
400     """Test that RemoveFile does remove symlinks"""
401     symlink = self.tmpdir + "/symlink"
402     os.symlink("no-such-file", symlink)
403     RemoveFile(symlink)
404     if os.path.exists(symlink):
405       self.fail("File '%s' not removed" % symlink)
406     os.symlink(self.tmpfile, symlink)
407     RemoveFile(symlink)
408     if os.path.exists(symlink):
409       self.fail("File '%s' not removed" % symlink)
410
411
412 class TestRename(unittest.TestCase):
413   """Test case for RenameFile"""
414
415   def setUp(self):
416     """Create a temporary directory"""
417     self.tmpdir = tempfile.mkdtemp()
418     self.tmpfile = os.path.join(self.tmpdir, "test1")
419
420     # Touch the file
421     open(self.tmpfile, "w").close()
422
423   def tearDown(self):
424     """Remove temporary directory"""
425     shutil.rmtree(self.tmpdir)
426
427   def testSimpleRename1(self):
428     """Simple rename 1"""
429     utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
430     self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
431
432   def testSimpleRename2(self):
433     """Simple rename 2"""
434     utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
435                      mkdir=True)
436     self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
437
438   def testRenameMkdir(self):
439     """Rename with mkdir"""
440     utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
441                      mkdir=True)
442     self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
443     self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
444
445     utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"),
446                      os.path.join(self.tmpdir, "test/foo/bar/baz"),
447                      mkdir=True)
448     self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
449     self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar")))
450     self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz")))
451
452
453 class TestMatchNameComponent(unittest.TestCase):
454   """Test case for the MatchNameComponent function"""
455
456   def testEmptyList(self):
457     """Test that there is no match against an empty list"""
458
459     self.failUnlessEqual(MatchNameComponent("", []), None)
460     self.failUnlessEqual(MatchNameComponent("test", []), None)
461
462   def testSingleMatch(self):
463     """Test that a single match is performed correctly"""
464     mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
465     for key in "test2", "test2.example", "test2.example.com":
466       self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
467
468   def testMultipleMatches(self):
469     """Test that a multiple match is returned as None"""
470     mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
471     for key in "test1", "test1.example":
472       self.failUnlessEqual(MatchNameComponent(key, mlist), None)
473
474   def testFullMatch(self):
475     """Test that a full match is returned correctly"""
476     key1 = "test1"
477     key2 = "test1.example"
478     mlist = [key2, key2 + ".com"]
479     self.failUnlessEqual(MatchNameComponent(key1, mlist), None)
480     self.failUnlessEqual(MatchNameComponent(key2, mlist), key2)
481
482   def testCaseInsensitivePartialMatch(self):
483     """Test for the case_insensitive keyword"""
484     mlist = ["test1.example.com", "test2.example.net"]
485     self.assertEqual(MatchNameComponent("test2", mlist, case_sensitive=False),
486                      "test2.example.net")
487     self.assertEqual(MatchNameComponent("Test2", mlist, case_sensitive=False),
488                      "test2.example.net")
489     self.assertEqual(MatchNameComponent("teSt2", mlist, case_sensitive=False),
490                      "test2.example.net")
491     self.assertEqual(MatchNameComponent("TeSt2", mlist, case_sensitive=False),
492                      "test2.example.net")
493
494
495   def testCaseInsensitiveFullMatch(self):
496     mlist = ["ts1.ex", "ts1.ex.org", "ts2.ex", "Ts2.ex"]
497     # Between the two ts1 a full string match non-case insensitive should work
498     self.assertEqual(MatchNameComponent("Ts1", mlist, case_sensitive=False),
499                      None)
500     self.assertEqual(MatchNameComponent("Ts1.ex", mlist, case_sensitive=False),
501                      "ts1.ex")
502     self.assertEqual(MatchNameComponent("ts1.ex", mlist, case_sensitive=False),
503                      "ts1.ex")
504     # Between the two ts2 only case differs, so only case-match works
505     self.assertEqual(MatchNameComponent("ts2.ex", mlist, case_sensitive=False),
506                      "ts2.ex")
507     self.assertEqual(MatchNameComponent("Ts2.ex", mlist, case_sensitive=False),
508                      "Ts2.ex")
509     self.assertEqual(MatchNameComponent("TS2.ex", mlist, case_sensitive=False),
510                      None)
511
512
513 class TestFormatUnit(unittest.TestCase):
514   """Test case for the FormatUnit function"""
515
516   def testMiB(self):
517     self.assertEqual(FormatUnit(1, 'h'), '1M')
518     self.assertEqual(FormatUnit(100, 'h'), '100M')
519     self.assertEqual(FormatUnit(1023, 'h'), '1023M')
520
521     self.assertEqual(FormatUnit(1, 'm'), '1')
522     self.assertEqual(FormatUnit(100, 'm'), '100')
523     self.assertEqual(FormatUnit(1023, 'm'), '1023')
524
525     self.assertEqual(FormatUnit(1024, 'm'), '1024')
526     self.assertEqual(FormatUnit(1536, 'm'), '1536')
527     self.assertEqual(FormatUnit(17133, 'm'), '17133')
528     self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
529
530   def testGiB(self):
531     self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
532     self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
533     self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
534     self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
535
536     self.assertEqual(FormatUnit(1024, 'g'), '1.0')
537     self.assertEqual(FormatUnit(1536, 'g'), '1.5')
538     self.assertEqual(FormatUnit(17133, 'g'), '16.7')
539     self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
540
541     self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
542     self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
543     self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
544
545   def testTiB(self):
546     self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
547     self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
548     self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
549
550     self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
551     self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
552     self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
553
554 class TestParseUnit(unittest.TestCase):
555   """Test case for the ParseUnit function"""
556
557   SCALES = (('', 1),
558             ('M', 1), ('G', 1024), ('T', 1024 * 1024),
559             ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
560             ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
561
562   def testRounding(self):
563     self.assertEqual(ParseUnit('0'), 0)
564     self.assertEqual(ParseUnit('1'), 4)
565     self.assertEqual(ParseUnit('2'), 4)
566     self.assertEqual(ParseUnit('3'), 4)
567
568     self.assertEqual(ParseUnit('124'), 124)
569     self.assertEqual(ParseUnit('125'), 128)
570     self.assertEqual(ParseUnit('126'), 128)
571     self.assertEqual(ParseUnit('127'), 128)
572     self.assertEqual(ParseUnit('128'), 128)
573     self.assertEqual(ParseUnit('129'), 132)
574     self.assertEqual(ParseUnit('130'), 132)
575
576   def testFloating(self):
577     self.assertEqual(ParseUnit('0'), 0)
578     self.assertEqual(ParseUnit('0.5'), 4)
579     self.assertEqual(ParseUnit('1.75'), 4)
580     self.assertEqual(ParseUnit('1.99'), 4)
581     self.assertEqual(ParseUnit('2.00'), 4)
582     self.assertEqual(ParseUnit('2.01'), 4)
583     self.assertEqual(ParseUnit('3.99'), 4)
584     self.assertEqual(ParseUnit('4.00'), 4)
585     self.assertEqual(ParseUnit('4.01'), 8)
586     self.assertEqual(ParseUnit('1.5G'), 1536)
587     self.assertEqual(ParseUnit('1.8G'), 1844)
588     self.assertEqual(ParseUnit('8.28T'), 8682212)
589
590   def testSuffixes(self):
591     for sep in ('', ' ', '   ', "\t", "\t "):
592       for suffix, scale in TestParseUnit.SCALES:
593         for func in (lambda x: x, str.lower, str.upper):
594           self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
595                            1024 * scale)
596
597   def testInvalidInput(self):
598     for sep in ('-', '_', ',', 'a'):
599       for suffix, _ in TestParseUnit.SCALES:
600         self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
601
602     for suffix, _ in TestParseUnit.SCALES:
603       self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
604
605
606 class TestSshKeys(testutils.GanetiTestCase):
607   """Test case for the AddAuthorizedKey function"""
608
609   KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
610   KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
611            'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
612
613   def setUp(self):
614     testutils.GanetiTestCase.setUp(self)
615     self.tmpname = self._CreateTempFile()
616     handle = open(self.tmpname, 'w')
617     try:
618       handle.write("%s\n" % TestSshKeys.KEY_A)
619       handle.write("%s\n" % TestSshKeys.KEY_B)
620     finally:
621       handle.close()
622
623   def testAddingNewKey(self):
624     AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
625
626     self.assertFileContent(self.tmpname,
627       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
628       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
629       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
630       "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
631
632   def testAddingAlmostButNotCompletelyTheSameKey(self):
633     AddAuthorizedKey(self.tmpname,
634         'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
635
636     self.assertFileContent(self.tmpname,
637       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
638       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
639       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
640       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
641
642   def testAddingExistingKeyWithSomeMoreSpaces(self):
643     AddAuthorizedKey(self.tmpname,
644         'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
645
646     self.assertFileContent(self.tmpname,
647       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
648       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
649       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
650
651   def testRemovingExistingKeyWithSomeMoreSpaces(self):
652     RemoveAuthorizedKey(self.tmpname,
653         'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
654
655     self.assertFileContent(self.tmpname,
656       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
657       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
658
659   def testRemovingNonExistingKey(self):
660     RemoveAuthorizedKey(self.tmpname,
661         'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
662
663     self.assertFileContent(self.tmpname,
664       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
665       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
666       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
667
668
669 class TestEtcHosts(testutils.GanetiTestCase):
670   """Test functions modifying /etc/hosts"""
671
672   def setUp(self):
673     testutils.GanetiTestCase.setUp(self)
674     self.tmpname = self._CreateTempFile()
675     handle = open(self.tmpname, 'w')
676     try:
677       handle.write('# This is a test file for /etc/hosts\n')
678       handle.write('127.0.0.1\tlocalhost\n')
679       handle.write('192.168.1.1 router gw\n')
680     finally:
681       handle.close()
682
683   def testSettingNewIp(self):
684     SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
685
686     self.assertFileContent(self.tmpname,
687       "# This is a test file for /etc/hosts\n"
688       "127.0.0.1\tlocalhost\n"
689       "192.168.1.1 router gw\n"
690       "1.2.3.4\tmyhost.domain.tld myhost\n")
691     self.assertFileMode(self.tmpname, 0644)
692
693   def testSettingExistingIp(self):
694     SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
695                      ['myhost'])
696
697     self.assertFileContent(self.tmpname,
698       "# This is a test file for /etc/hosts\n"
699       "127.0.0.1\tlocalhost\n"
700       "192.168.1.1\tmyhost.domain.tld myhost\n")
701     self.assertFileMode(self.tmpname, 0644)
702
703   def testSettingDuplicateName(self):
704     SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
705
706     self.assertFileContent(self.tmpname,
707       "# This is a test file for /etc/hosts\n"
708       "127.0.0.1\tlocalhost\n"
709       "192.168.1.1 router gw\n"
710       "1.2.3.4\tmyhost\n")
711     self.assertFileMode(self.tmpname, 0644)
712
713   def testRemovingExistingHost(self):
714     RemoveEtcHostsEntry(self.tmpname, 'router')
715
716     self.assertFileContent(self.tmpname,
717       "# This is a test file for /etc/hosts\n"
718       "127.0.0.1\tlocalhost\n"
719       "192.168.1.1 gw\n")
720     self.assertFileMode(self.tmpname, 0644)
721
722   def testRemovingSingleExistingHost(self):
723     RemoveEtcHostsEntry(self.tmpname, 'localhost')
724
725     self.assertFileContent(self.tmpname,
726       "# This is a test file for /etc/hosts\n"
727       "192.168.1.1 router gw\n")
728     self.assertFileMode(self.tmpname, 0644)
729
730   def testRemovingNonExistingHost(self):
731     RemoveEtcHostsEntry(self.tmpname, 'myhost')
732
733     self.assertFileContent(self.tmpname,
734       "# This is a test file for /etc/hosts\n"
735       "127.0.0.1\tlocalhost\n"
736       "192.168.1.1 router gw\n")
737     self.assertFileMode(self.tmpname, 0644)
738
739   def testRemovingAlias(self):
740     RemoveEtcHostsEntry(self.tmpname, 'gw')
741
742     self.assertFileContent(self.tmpname,
743       "# This is a test file for /etc/hosts\n"
744       "127.0.0.1\tlocalhost\n"
745       "192.168.1.1 router\n")
746     self.assertFileMode(self.tmpname, 0644)
747
748
749 class TestShellQuoting(unittest.TestCase):
750   """Test case for shell quoting functions"""
751
752   def testShellQuote(self):
753     self.assertEqual(ShellQuote('abc'), "abc")
754     self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
755     self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
756     self.assertEqual(ShellQuote("a b c"), "'a b c'")
757     self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
758
759   def testShellQuoteArgs(self):
760     self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
761     self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
762     self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
763
764
765 class TestTcpPing(unittest.TestCase):
766   """Testcase for TCP version of ping - against listen(2)ing port"""
767
768   def setUp(self):
769     self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
770     self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
771     self.listenerport = self.listener.getsockname()[1]
772     self.listener.listen(1)
773
774   def tearDown(self):
775     self.listener.shutdown(socket.SHUT_RDWR)
776     del self.listener
777     del self.listenerport
778
779   def testTcpPingToLocalHostAccept(self):
780     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
781                          self.listenerport,
782                          timeout=10,
783                          live_port_needed=True,
784                          source=constants.LOCALHOST_IP_ADDRESS,
785                          ),
786                  "failed to connect to test listener")
787
788     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
789                          self.listenerport,
790                          timeout=10,
791                          live_port_needed=True,
792                          ),
793                  "failed to connect to test listener (no source)")
794
795
796 class TestTcpPingDeaf(unittest.TestCase):
797   """Testcase for TCP version of ping - against non listen(2)ing port"""
798
799   def setUp(self):
800     self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
801     self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
802     self.deaflistenerport = self.deaflistener.getsockname()[1]
803
804   def tearDown(self):
805     del self.deaflistener
806     del self.deaflistenerport
807
808   def testTcpPingToLocalHostAcceptDeaf(self):
809     self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
810                         self.deaflistenerport,
811                         timeout=constants.TCP_PING_TIMEOUT,
812                         live_port_needed=True,
813                         source=constants.LOCALHOST_IP_ADDRESS,
814                         ), # need successful connect(2)
815                 "successfully connected to deaf listener")
816
817     self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
818                         self.deaflistenerport,
819                         timeout=constants.TCP_PING_TIMEOUT,
820                         live_port_needed=True,
821                         ), # need successful connect(2)
822                 "successfully connected to deaf listener (no source addr)")
823
824   def testTcpPingToLocalHostNoAccept(self):
825     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
826                          self.deaflistenerport,
827                          timeout=constants.TCP_PING_TIMEOUT,
828                          live_port_needed=False,
829                          source=constants.LOCALHOST_IP_ADDRESS,
830                          ), # ECONNREFUSED is OK
831                  "failed to ping alive host on deaf port")
832
833     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
834                          self.deaflistenerport,
835                          timeout=constants.TCP_PING_TIMEOUT,
836                          live_port_needed=False,
837                          ), # ECONNREFUSED is OK
838                  "failed to ping alive host on deaf port (no source addr)")
839
840
841 class TestOwnIpAddress(unittest.TestCase):
842   """Testcase for OwnIpAddress"""
843
844   def testOwnLoopback(self):
845     """check having the loopback ip"""
846     self.failUnless(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS),
847                     "Should own the loopback address")
848
849   def testNowOwnAddress(self):
850     """check that I don't own an address"""
851
852     # network 192.0.2.0/24 is reserved for test/documentation as per
853     # rfc 3330, so we *should* not have an address of this range... if
854     # this fails, we should extend the test to multiple addresses
855     DST_IP = "192.0.2.1"
856     self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP)
857
858
859 class TestListVisibleFiles(unittest.TestCase):
860   """Test case for ListVisibleFiles"""
861
862   def setUp(self):
863     self.path = tempfile.mkdtemp()
864
865   def tearDown(self):
866     shutil.rmtree(self.path)
867
868   def _test(self, files, expected):
869     # Sort a copy
870     expected = expected[:]
871     expected.sort()
872
873     for name in files:
874       f = open(os.path.join(self.path, name), 'w')
875       try:
876         f.write("Test\n")
877       finally:
878         f.close()
879
880     found = ListVisibleFiles(self.path)
881     found.sort()
882
883     self.assertEqual(found, expected)
884
885   def testAllVisible(self):
886     files = ["a", "b", "c"]
887     expected = files
888     self._test(files, expected)
889
890   def testNoneVisible(self):
891     files = [".a", ".b", ".c"]
892     expected = []
893     self._test(files, expected)
894
895   def testSomeVisible(self):
896     files = ["a", "b", ".c"]
897     expected = ["a", "b"]
898     self._test(files, expected)
899
900   def testNonAbsolutePath(self):
901     self.failUnlessRaises(errors.ProgrammerError, ListVisibleFiles, "abc")
902
903   def testNonNormalizedPath(self):
904     self.failUnlessRaises(errors.ProgrammerError, ListVisibleFiles,
905                           "/bin/../tmp")
906
907
908 class TestNewUUID(unittest.TestCase):
909   """Test case for NewUUID"""
910
911   _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
912                         '[a-f0-9]{4}-[a-f0-9]{12}$')
913
914   def runTest(self):
915     self.failUnless(self._re_uuid.match(utils.NewUUID()))
916
917
918 class TestUniqueSequence(unittest.TestCase):
919   """Test case for UniqueSequence"""
920
921   def _test(self, input, expected):
922     self.assertEqual(utils.UniqueSequence(input), expected)
923
924   def runTest(self):
925     # Ordered input
926     self._test([1, 2, 3], [1, 2, 3])
927     self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
928     self._test([1, 2, 2, 3], [1, 2, 3])
929     self._test([1, 2, 3, 3], [1, 2, 3])
930
931     # Unordered input
932     self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
933     self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
934
935     # Strings
936     self._test(["a", "a"], ["a"])
937     self._test(["a", "b"], ["a", "b"])
938     self._test(["a", "b", "a"], ["a", "b"])
939
940
941 class TestFirstFree(unittest.TestCase):
942   """Test case for the FirstFree function"""
943
944   def test(self):
945     """Test FirstFree"""
946     self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
947     self.failUnlessEqual(FirstFree([]), None)
948     self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
949     self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
950     self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
951
952
953 class TestTailFile(testutils.GanetiTestCase):
954   """Test case for the TailFile function"""
955
956   def testEmpty(self):
957     fname = self._CreateTempFile()
958     self.failUnlessEqual(TailFile(fname), [])
959     self.failUnlessEqual(TailFile(fname, lines=25), [])
960
961   def testAllLines(self):
962     data = ["test %d" % i for i in range(30)]
963     for i in range(30):
964       fname = self._CreateTempFile()
965       fd = open(fname, "w")
966       fd.write("\n".join(data[:i]))
967       if i > 0:
968         fd.write("\n")
969       fd.close()
970       self.failUnlessEqual(TailFile(fname, lines=i), data[:i])
971
972   def testPartialLines(self):
973     data = ["test %d" % i for i in range(30)]
974     fname = self._CreateTempFile()
975     fd = open(fname, "w")
976     fd.write("\n".join(data))
977     fd.write("\n")
978     fd.close()
979     for i in range(1, 30):
980       self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
981
982   def testBigFile(self):
983     data = ["test %d" % i for i in range(30)]
984     fname = self._CreateTempFile()
985     fd = open(fname, "w")
986     fd.write("X" * 1048576)
987     fd.write("\n")
988     fd.write("\n".join(data))
989     fd.write("\n")
990     fd.close()
991     for i in range(1, 30):
992       self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
993
994
995 class TestFileLock(unittest.TestCase):
996   """Test case for the FileLock class"""
997
998   def setUp(self):
999     self.tmpfile = tempfile.NamedTemporaryFile()
1000     self.lock = utils.FileLock(self.tmpfile.name)
1001
1002   def testSharedNonblocking(self):
1003     self.lock.Shared(blocking=False)
1004     self.lock.Close()
1005
1006   def testExclusiveNonblocking(self):
1007     self.lock.Exclusive(blocking=False)
1008     self.lock.Close()
1009
1010   def testUnlockNonblocking(self):
1011     self.lock.Unlock(blocking=False)
1012     self.lock.Close()
1013
1014   def testSharedBlocking(self):
1015     self.lock.Shared(blocking=True)
1016     self.lock.Close()
1017
1018   def testExclusiveBlocking(self):
1019     self.lock.Exclusive(blocking=True)
1020     self.lock.Close()
1021
1022   def testUnlockBlocking(self):
1023     self.lock.Unlock(blocking=True)
1024     self.lock.Close()
1025
1026   def testSharedExclusiveUnlock(self):
1027     self.lock.Shared(blocking=False)
1028     self.lock.Exclusive(blocking=False)
1029     self.lock.Unlock(blocking=False)
1030     self.lock.Close()
1031
1032   def testExclusiveSharedUnlock(self):
1033     self.lock.Exclusive(blocking=False)
1034     self.lock.Shared(blocking=False)
1035     self.lock.Unlock(blocking=False)
1036     self.lock.Close()
1037
1038   def testCloseShared(self):
1039     self.lock.Close()
1040     self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
1041
1042   def testCloseExclusive(self):
1043     self.lock.Close()
1044     self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
1045
1046   def testCloseUnlock(self):
1047     self.lock.Close()
1048     self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
1049
1050
1051 class TestTimeFunctions(unittest.TestCase):
1052   """Test case for time functions"""
1053
1054   def runTest(self):
1055     self.assertEqual(utils.SplitTime(1), (1, 0))
1056     self.assertEqual(utils.SplitTime(1.5), (1, 500000))
1057     self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
1058     self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
1059     self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
1060     self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
1061     self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
1062     self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
1063
1064     self.assertRaises(AssertionError, utils.SplitTime, -1)
1065
1066     self.assertEqual(utils.MergeTime((1, 0)), 1.0)
1067     self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
1068     self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
1069
1070     self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3),
1071                      1218448917.481)
1072     self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
1073
1074     self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
1075     self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
1076     self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
1077     self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
1078     self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
1079
1080
1081 class FieldSetTestCase(unittest.TestCase):
1082   """Test case for FieldSets"""
1083
1084   def testSimpleMatch(self):
1085     f = utils.FieldSet("a", "b", "c", "def")
1086     self.failUnless(f.Matches("a"))
1087     self.failIf(f.Matches("d"), "Substring matched")
1088     self.failIf(f.Matches("defghi"), "Prefix string matched")
1089     self.failIf(f.NonMatching(["b", "c"]))
1090     self.failIf(f.NonMatching(["a", "b", "c", "def"]))
1091     self.failUnless(f.NonMatching(["a", "d"]))
1092
1093   def testRegexMatch(self):
1094     f = utils.FieldSet("a", "b([0-9]+)", "c")
1095     self.failUnless(f.Matches("b1"))
1096     self.failUnless(f.Matches("b99"))
1097     self.failIf(f.Matches("b/1"))
1098     self.failIf(f.NonMatching(["b12", "c"]))
1099     self.failUnless(f.NonMatching(["a", "1"]))
1100
1101 class TestForceDictType(unittest.TestCase):
1102   """Test case for ForceDictType"""
1103
1104   def setUp(self):
1105     self.key_types = {
1106       'a': constants.VTYPE_INT,
1107       'b': constants.VTYPE_BOOL,
1108       'c': constants.VTYPE_STRING,
1109       'd': constants.VTYPE_SIZE,
1110       }
1111
1112   def _fdt(self, dict, allowed_values=None):
1113     if allowed_values is None:
1114       ForceDictType(dict, self.key_types)
1115     else:
1116       ForceDictType(dict, self.key_types, allowed_values=allowed_values)
1117
1118     return dict
1119
1120   def testSimpleDict(self):
1121     self.assertEqual(self._fdt({}), {})
1122     self.assertEqual(self._fdt({'a': 1}), {'a': 1})
1123     self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
1124     self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
1125     self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
1126     self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
1127     self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
1128     self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
1129     self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
1130     self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
1131     self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
1132     self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
1133
1134   def testErrors(self):
1135     self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
1136     self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
1137     self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
1138     self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
1139
1140
1141 class TestIsAbsNormPath(unittest.TestCase):
1142   """Testing case for IsProcessAlive"""
1143
1144   def _pathTestHelper(self, path, result):
1145     if result:
1146       self.assert_(IsNormAbsPath(path),
1147           "Path %s should result absolute and normalized" % path)
1148     else:
1149       self.assert_(not IsNormAbsPath(path),
1150           "Path %s should not result absolute and normalized" % path)
1151
1152   def testBase(self):
1153     self._pathTestHelper('/etc', True)
1154     self._pathTestHelper('/srv', True)
1155     self._pathTestHelper('etc', False)
1156     self._pathTestHelper('/etc/../root', False)
1157     self._pathTestHelper('/etc/', False)
1158
1159
1160 class TestSafeEncode(unittest.TestCase):
1161   """Test case for SafeEncode"""
1162
1163   def testAscii(self):
1164     for txt in [string.digits, string.letters, string.punctuation]:
1165       self.failUnlessEqual(txt, SafeEncode(txt))
1166
1167   def testDoubleEncode(self):
1168     for i in range(255):
1169       txt = SafeEncode(chr(i))
1170       self.failUnlessEqual(txt, SafeEncode(txt))
1171
1172   def testUnicode(self):
1173     # 1024 is high enough to catch non-direct ASCII mappings
1174     for i in range(1024):
1175       txt = SafeEncode(unichr(i))
1176       self.failUnlessEqual(txt, SafeEncode(txt))
1177
1178
1179 class TestFormatTime(unittest.TestCase):
1180   """Testing case for FormatTime"""
1181
1182   def testNone(self):
1183     self.failUnlessEqual(FormatTime(None), "N/A")
1184
1185   def testInvalid(self):
1186     self.failUnlessEqual(FormatTime(()), "N/A")
1187
1188   def testNow(self):
1189     # tests that we accept time.time input
1190     FormatTime(time.time())
1191     # tests that we accept int input
1192     FormatTime(int(time.time()))
1193
1194
1195 class RunInSeparateProcess(unittest.TestCase):
1196   def test(self):
1197     for exp in [True, False]:
1198       def _child():
1199         return exp
1200
1201       self.assertEqual(exp, utils.RunInSeparateProcess(_child))
1202
1203   def testPid(self):
1204     parent_pid = os.getpid()
1205
1206     def _check():
1207       return os.getpid() == parent_pid
1208
1209     self.failIf(utils.RunInSeparateProcess(_check))
1210
1211   def testSignal(self):
1212     def _kill():
1213       os.kill(os.getpid(), signal.SIGTERM)
1214
1215     self.assertRaises(errors.GenericError,
1216                       utils.RunInSeparateProcess, _kill)
1217
1218   def testException(self):
1219     def _exc():
1220       raise errors.GenericError("This is a test")
1221
1222     self.assertRaises(errors.GenericError,
1223                       utils.RunInSeparateProcess, _exc)
1224
1225
1226 class TestFingerprintFile(unittest.TestCase):
1227   def setUp(self):
1228     self.tmpfile = tempfile.NamedTemporaryFile()
1229
1230   def test(self):
1231     self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1232                      "da39a3ee5e6b4b0d3255bfef95601890afd80709")
1233
1234     utils.WriteFile(self.tmpfile.name, data="Hello World\n")
1235     self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1236                      "648a6a6ffffdaa0badb23b8baf90b6168dd16b3a")
1237
1238
1239 class TestUnescapeAndSplit(unittest.TestCase):
1240   """Testing case for UnescapeAndSplit"""
1241
1242   def setUp(self):
1243     # testing more that one separator for regexp safety
1244     self._seps = [",", "+", "."]
1245
1246   def testSimple(self):
1247     a = ["a", "b", "c", "d"]
1248     for sep in self._seps:
1249       self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), a)
1250
1251   def testEscape(self):
1252     for sep in self._seps:
1253       a = ["a", "b\\" + sep + "c", "d"]
1254       b = ["a", "b" + sep + "c", "d"]
1255       self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1256
1257   def testDoubleEscape(self):
1258     for sep in self._seps:
1259       a = ["a", "b\\\\", "c", "d"]
1260       b = ["a", "b\\", "c", "d"]
1261       self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1262
1263   def testThreeEscape(self):
1264     for sep in self._seps:
1265       a = ["a", "b\\\\\\" + sep + "c", "d"]
1266       b = ["a", "b\\" + sep + "c", "d"]
1267       self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1268
1269
1270 class TestPathJoin(unittest.TestCase):
1271   """Testing case for PathJoin"""
1272
1273   def testBasicItems(self):
1274     mlist = ["/a", "b", "c"]
1275     self.failUnlessEqual(PathJoin(*mlist), "/".join(mlist))
1276
1277   def testNonAbsPrefix(self):
1278     self.failUnlessRaises(ValueError, PathJoin, "a", "b")
1279
1280   def testBackTrack(self):
1281     self.failUnlessRaises(ValueError, PathJoin, "/a", "b/../c")
1282
1283   def testMultiAbs(self):
1284     self.failUnlessRaises(ValueError, PathJoin, "/a", "/b")
1285
1286
1287 if __name__ == '__main__':
1288   testutils.GanetiTestProgram()