Support passing in file object in utils.FileLock
[ganeti-local] / test / ganeti.utils_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the utils module"""
23
24 import unittest
25 import os
26 import time
27 import tempfile
28 import os.path
29 import os
30 import stat
31 import md5
32 import signal
33 import socket
34 import shutil
35 import re
36 import select
37 import string
38
39 import ganeti
40 import testutils
41 from ganeti import constants
42 from ganeti import utils
43 from ganeti import errors
44 from ganeti.utils import IsProcessAlive, RunCmd, \
45      RemoveFile, MatchNameComponent, FormatUnit, \
46      ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
47      ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
48      SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \
49      TailFile, ForceDictType, SafeEncode, IsNormAbsPath, FormatTime, \
50      UnescapeAndSplit, RunParts, PathJoin, HostInfo
51
52 from ganeti.errors import LockError, UnitParseError, GenericError, \
53      ProgrammerError, OpPrereqError
54
55
class TestIsProcessAlive(unittest.TestCase):
  """Testing case for IsProcessAlive"""

  def testExists(self):
    """The process running the tests must be reported as alive"""
    mypid = os.getpid()
    self.assert_(IsProcessAlive(mypid),
                 "can't find myself running")

  def testNotExisting(self):
    """A forked child that has exited and been reaped must not be alive"""
    # os.fork() signals failure by raising OSError; it never returns a
    # negative pid, so no explicit error branch is needed here.
    pid_non_existing = os.fork()
    if pid_non_existing == 0:
      # Child: terminate right away, bypassing the test runner
      os._exit(0)
    # Parent: reap the child so that its pid no longer names a process
    os.waitpid(pid_non_existing, 0)
    self.assert_(not IsProcessAlive(pid_non_existing),
                 "nonexisting process detected")
73
74
class TestPidFileFunctions(unittest.TestCase):
  """Tests for WritePidFile, RemovePidFile and ReadPidFile"""

  def setUp(self):
    """Redirect daemon PID files into a private temporary directory"""
    self.dir = tempfile.mkdtemp()
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
    # Remember the real function so tearDown can undo the monkeypatch and
    # other test cases do not see our replacement
    self._orig_dpn = utils.DaemonPidFileName
    utils.DaemonPidFileName = self.f_dpn

  def testPidFileFunctions(self):
    """Write, read back and remove a PID file for the current process"""
    pid_file = self.f_dpn('test')
    utils.WritePidFile('test')
    self.failUnless(os.path.exists(pid_file),
                    "PID file should have been created")
    read_pid = utils.ReadPidFile(pid_file)
    self.failUnlessEqual(read_pid, os.getpid())
    self.failUnless(utils.IsProcessAlive(read_pid))
    # A second write for the same name must be refused
    self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
    utils.RemovePidFile('test')
    self.failIf(os.path.exists(pid_file),
                "PID file should not exist anymore")
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
                         "ReadPidFile should return 0 for missing pid file")
    fh = open(pid_file, "w")
    try:
      fh.write("blah\n")
    finally:
      fh.close()
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
                         "ReadPidFile should return 0 for invalid pid file")
    utils.RemovePidFile('test')
    self.failIf(os.path.exists(pid_file),
                "PID file should not exist anymore")

  def testKill(self):
    """Kill a forked child that has written its own PID file"""
    pid_file = self.f_dpn('child')
    r_fd, w_fd = os.pipe()
    new_pid = os.fork()
    if new_pid == 0: #child
      # Whatever happens, the child must never return into the test runner
      try:
        os.close(r_fd)
        utils.WritePidFile('child')
        os.write(w_fd, 'a')
        signal.pause()
      finally:
        os._exit(0)
    # else we are in the parent
    # Close our copy of the write end, so the read below sees EOF instead of
    # blocking forever should the child die before writing
    os.close(w_fd)
    try:
      # wait until the child has written the pid file
      os.read(r_fd, 1)
    finally:
      os.close(r_fd)
    read_pid = utils.ReadPidFile(pid_file)
    self.failUnlessEqual(read_pid, new_pid)
    self.failUnless(utils.IsProcessAlive(new_pid))
    utils.KillProcess(new_pid, waitpid=True)
    self.failIf(utils.IsProcessAlive(new_pid))
    utils.RemovePidFile('child')
    self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)

  def tearDown(self):
    """Undo the monkeypatch and remove the temporary directory"""
    # Restore first, so a cleanup failure cannot leave the patch in place
    utils.DaemonPidFileName = self._orig_dpn
    for name in os.listdir(self.dir):
      os.unlink(os.path.join(self.dir, name))
    os.rmdir(self.dir)
131
132
class TestRunCmd(testutils.GanetiTestCase):
  """Testing case for the RunCmd function"""

  def setUp(self):
    """Prepare a unique marker string and a temporary output file"""
    testutils.GanetiTestCase.setUp(self)
    self.magic = time.ctime() + " ganeti test"
    self.fname = self._CreateTempFile()

  def testOk(self):
    """Test successful exit code"""
    result = RunCmd("/bin/sh -c 'exit 0'")
    self.assertEqual(result.exit_code, 0)
    self.assertEqual(result.output, "")

  def testFail(self):
    """Test fail exit code"""
    result = RunCmd("/bin/sh -c 'exit 1'")
    self.assertEqual(result.exit_code, 1)
    self.assertEqual(result.output, "")

  def testStdout(self):
    """Test standard output"""
    cmd = 'echo -n "%s"' % self.magic
    result = RunCmd("/bin/sh -c '%s'" % cmd)
    self.assertEqual(result.stdout, self.magic)
    # With an output file the text must land in the file, not in the result
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
    self.assertEqual(result.output, "")
    self.assertFileContent(self.fname, self.magic)

  def testStderr(self):
    """Test standard error"""
    cmd = 'echo -n "%s"' % self.magic
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
    self.assertEqual(result.stderr, self.magic)
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
    self.assertEqual(result.output, "")
    self.assertFileContent(self.fname, self.magic)

  def testCombined(self):
    """Test combined output"""
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
    expected = "A" + self.magic + "B" + self.magic
    result = RunCmd("/bin/sh -c '%s'" % cmd)
    self.assertEqual(result.output, expected)
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
    self.assertEqual(result.output, "")
    self.assertFileContent(self.fname, expected)

  def testSignal(self):
    """Test signal"""
    result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
    self.assertEqual(result.signal, 15)
    self.assertEqual(result.output, "")

  def testListRun(self):
    """Test list runs"""
    result = RunCmd(["true"])
    self.assertEqual(result.signal, None)
    self.assertEqual(result.exit_code, 0)
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
    self.assertEqual(result.signal, None)
    self.assertEqual(result.exit_code, 1)
    result = RunCmd(["echo", "-n", self.magic])
    self.assertEqual(result.signal, None)
    self.assertEqual(result.exit_code, 0)
    self.assertEqual(result.stdout, self.magic)

  def testFileEmptyOutput(self):
    """Test file output"""
    result = RunCmd(["true"], output=self.fname)
    self.assertEqual(result.signal, None)
    self.assertEqual(result.exit_code, 0)
    self.assertFileContent(self.fname, "")

  def testLang(self):
    """Test locale environment"""
    old_env = os.environ.copy()
    try:
      os.environ["LANG"] = "en_US.UTF-8"
      os.environ["LC_ALL"] = "en_US.UTF-8"
      result = RunCmd(["locale"])
      for line in result.output.splitlines():
        key, value = line.split("=", 1)
        # Ignore these variables, they're overridden by LC_ALL
        if key == "LANG" or key == "LANGUAGE":
          continue
        self.failIf(value and value != "C" and value != '"C"',
            "Variable %s is set to the invalid value '%s'" % (key, value))
    finally:
      # Restore in place: rebinding os.environ to the plain dict returned by
      # copy() would disconnect it from the real process environment for
      # everything that runs after this test
      os.environ.clear()
      os.environ.update(old_env)

  def testDefaultCwd(self):
    """Test default working directory"""
    self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")

  def testCwd(self):
    """Test explicitly requested working directory"""
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
    cwd = os.getcwd()
    self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)

  def testResetEnv(self):
    """Test environment reset functionality"""
    self.failUnlessEqual(RunCmd(["env"], reset_env=True).stdout.strip(), "")
238
239
class TestRunParts(unittest.TestCase):
  """Unit tests exercising RunParts against a scratch directory"""

  def setUp(self):
    """Create the scratch directory handed to RunParts"""
    self.rundir = tempfile.mkdtemp(prefix="ganeti-test", suffix=".tmp")

  def tearDown(self):
    """Remove the scratch directory and everything in it"""
    shutil.rmtree(self.rundir)

  def _InRunDir(self, name):
    """Return the path of the given name inside the scratch directory"""
    return os.path.join(self.rundir, name)

  def _WriteExecutable(self, path, content):
    """Write content to path, then set owner read and execute bits"""
    utils.WriteFile(path, data=content)
    os.chmod(path, stat.S_IREAD | stat.S_IEXEC)

  def testEmpty(self):
    """An empty directory yields an empty result list"""
    outcome = RunParts(self.rundir, reset_env=True)
    self.failUnlessEqual(outcome, [])

  def testSkipWrongName(self):
    """An executable named with a dot in it is reported as skipped"""
    path = self._InRunDir("00test.dot")
    self._WriteExecutable(path, "")
    expected = [(os.path.basename(path), constants.RUNPARTS_SKIP, None)]
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True), expected)

  def testSkipNonExec(self):
    """A file without execute permission is reported as skipped"""
    path = self._InRunDir("00test")
    utils.WriteFile(path, data="")
    expected = [(os.path.basename(path), constants.RUNPARTS_SKIP, None)]
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True), expected)

  def testError(self):
    """An empty executable is reported as an error with a message"""
    path = self._InRunDir("00test")
    self._WriteExecutable(path, "")
    first = RunParts(self.rundir, reset_env=True)[0]
    (relname, status, error) = first
    self.failUnlessEqual(relname, os.path.basename(path))
    self.failUnlessEqual(status, constants.RUNPARTS_ERR)
    self.failUnless(error)

  def testSorted(self):
    """Results come back ordered by file name"""
    paths = [self._InRunDir(name) for name in ("64test", "00test", "42test")]

    for path in paths:
      utils.WriteFile(path, data="")

    results = RunParts(self.rundir, reset_env=True)

    # Compare position by position against the sorted list of paths
    for position, path in enumerate(sorted(paths)):
      self.failUnlessEqual(os.path.basename(path), results[position][0])

  def testOk(self):
    """A working script is run and its stdout captured"""
    path = self._InRunDir("00test")
    self._WriteExecutable(path, "#!/bin/sh\n\necho -n ciao")
    first = RunParts(self.rundir, reset_env=True)[0]
    (relname, status, runresult) = first
    self.failUnlessEqual(relname, os.path.basename(path))
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
    self.failUnlessEqual(runresult.stdout, "ciao")

  def testRunFail(self):
    """A script exiting non-zero is still reported as run, but failed"""
    path = self._InRunDir("00test")
    self._WriteExecutable(path, "#!/bin/sh\n\nexit 1")
    first = RunParts(self.rundir, reset_env=True)[0]
    (relname, status, runresult) = first
    self.failUnlessEqual(relname, os.path.basename(path))
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
    self.failUnlessEqual(runresult.exit_code, 1)
    self.failUnless(runresult.failed)

  def testRunMix(self):
    """Failing, skipped, broken and working files in one directory"""
    names = ("00test", "42test", "64test", "99test")
    paths = sorted(self._InRunDir(name) for name in names)
    (failing, skipped, broken, working) = paths

    # 1st has errors in execution
    self._WriteExecutable(failing, "#!/bin/sh\n\nexit 1")

    # 2nd is skipped
    utils.WriteFile(skipped, data="")

    # 3rd cannot execute properly
    self._WriteExecutable(broken, "")

    # 4th execs
    self._WriteExecutable(working, "#!/bin/sh\n\necho -n ciao")

    results = RunParts(self.rundir, reset_env=True)

    (relname, status, runresult) = results[0]
    self.failUnlessEqual(relname, os.path.basename(failing))
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
    self.failUnlessEqual(runresult.exit_code, 1)
    self.failUnless(runresult.failed)

    (relname, status, runresult) = results[1]
    self.failUnlessEqual(relname, os.path.basename(skipped))
    self.failUnlessEqual(status, constants.RUNPARTS_SKIP)
    self.failUnlessEqual(runresult, None)

    (relname, status, runresult) = results[2]
    self.failUnlessEqual(relname, os.path.basename(broken))
    self.failUnlessEqual(status, constants.RUNPARTS_ERR)
    self.failUnless(runresult)

    (relname, status, runresult) = results[3]
    self.failUnlessEqual(relname, os.path.basename(working))
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
    self.failUnlessEqual(runresult.output, "ciao")
    self.failUnlessEqual(runresult.exit_code, 0)
    self.failUnless(not runresult.failed)
364
365
class TestRemoveFile(unittest.TestCase):
  """Test case for the RemoveFile function"""

  def setUp(self):
    """Create a temp dir and file for each case"""
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
    os.close(fd)

  def tearDown(self):
    """Remove the temp file, if still present, and the temp dir"""
    if os.path.exists(self.tmpfile):
      os.unlink(self.tmpfile)
    os.rmdir(self.tmpdir)

  def testIgnoreDirs(self):
    """Test that RemoveFile() ignores directories"""
    self.assertEqual(None, RemoveFile(self.tmpdir))

  def testIgnoreNotExisting(self):
    """Test that RemoveFile() ignores non-existing files"""
    RemoveFile(self.tmpfile)
    # The second call operates on an already removed path
    RemoveFile(self.tmpfile)

  def testRemoveFile(self):
    """Test that RemoveFile does remove a file"""
    RemoveFile(self.tmpfile)
    if os.path.exists(self.tmpfile):
      self.fail("File '%s' not removed" % self.tmpfile)

  def testRemoveSymlink(self):
    """Test that RemoveFile does remove symlinks"""
    symlink = self.tmpdir + "/symlink"
    # Dangling link: os.path.exists() follows the link and would always
    # report False here, so lexists() is needed to check the link itself
    os.symlink("no-such-file", symlink)
    RemoveFile(symlink)
    if os.path.lexists(symlink):
      self.fail("File '%s' not removed" % symlink)
    # Link pointing at an existing file
    os.symlink(self.tmpfile, symlink)
    RemoveFile(symlink)
    if os.path.lexists(symlink):
      self.fail("File '%s' not removed" % symlink)
410
411
class TestRename(unittest.TestCase):
  """Unit tests for utils.RenameFile"""

  def setUp(self):
    """Set up a scratch directory holding one empty file"""
    self.tmpdir = tempfile.mkdtemp()
    self.tmpfile = os.path.join(self.tmpdir, "test1")

    # Creating and closing the handle leaves an empty file behind
    handle = open(self.tmpfile, "w")
    handle.close()

  def tearDown(self):
    """Delete the scratch directory recursively"""
    shutil.rmtree(self.tmpdir)

  def testSimpleRename1(self):
    """Rename within the same directory, default arguments"""
    target = os.path.join(self.tmpdir, "xyz")
    utils.RenameFile(self.tmpfile, target)
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))

  def testSimpleRename2(self):
    """Rename within the same directory, with mkdir enabled"""
    target = os.path.join(self.tmpdir, "xyz")
    utils.RenameFile(self.tmpfile, target, mkdir=True)
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))

  def testRenameMkdir(self):
    """Rename into directories that do not exist yet"""
    first_target = os.path.join(self.tmpdir, "test/xyz")
    utils.RenameFile(self.tmpfile, first_target, mkdir=True)
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))

    # Move again, this time two missing directory levels deeper
    second_source = os.path.join(self.tmpdir, "test/xyz")
    second_target = os.path.join(self.tmpdir, "test/foo/bar/baz")
    utils.RenameFile(second_source, second_target, mkdir=True)
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar")))
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz")))
451
452
class TestMatchNameComponent(unittest.TestCase):
  """Unit tests for MatchNameComponent"""

  def testEmptyList(self):
    """Nothing can match when the candidate list is empty"""
    for key in ("", "test"):
      self.failUnlessEqual(MatchNameComponent(key, []), None)

  def testSingleMatch(self):
    """A key matching exactly one candidate returns that candidate"""
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
    wanted = mlist[1]
    for key in ("test2", "test2.example", "test2.example.com"):
      self.failUnlessEqual(MatchNameComponent(key, mlist), wanted)

  def testMultipleMatches(self):
    """A key matching several candidates returns None"""
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
    for key in ("test1", "test1.example"):
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)

  def testFullMatch(self):
    """A key equal to a whole candidate wins over prefix matches"""
    short_key = "test1"
    full_key = "test1.example"
    mlist = [full_key, full_key + ".com"]
    self.failUnlessEqual(MatchNameComponent(short_key, mlist), None)
    self.failUnlessEqual(MatchNameComponent(full_key, mlist), full_key)

  def testCaseInsensitivePartialMatch(self):
    """Prefix matching with case_sensitive=False ignores key case"""
    mlist = ["test1.example.com", "test2.example.net"]
    for key in ("test2", "Test2", "teSt2", "TeSt2"):
      found = MatchNameComponent(key, mlist, case_sensitive=False)
      self.assertEqual(found, "test2.example.net")

  def testCaseInsensitiveFullMatch(self):
    """Full-name matching with case_sensitive=False"""
    mlist = ["ts1.ex", "ts1.ex.org", "ts2.ex", "Ts2.ex"]
    expectations = [
      # Between the two ts1 a full string match non-case insensitive should
      # work
      ("Ts1", None),
      ("Ts1.ex", "ts1.ex"),
      ("ts1.ex", "ts1.ex"),
      # Between the two ts2 only case differs, so only case-match works
      ("ts2.ex", "ts2.ex"),
      ("Ts2.ex", "Ts2.ex"),
      ("TS2.ex", None),
      ]
    for (key, wanted) in expectations:
      found = MatchNameComponent(key, mlist, case_sensitive=False)
      self.assertEqual(found, wanted)
511
512
class TestFormatUnit(unittest.TestCase):
  """Unit tests for FormatUnit"""

  def _CheckCases(self, cases):
    """Assert FormatUnit(value, units) == expected for each triple, in order"""
    for (value, units, expected) in cases:
      self.assertEqual(FormatUnit(value, units), expected)

  def testMiB(self):
    """Values rendered without scaling, or with the 'M' suffix"""
    self._CheckCases([
      (1, 'h', '1M'),
      (100, 'h', '100M'),
      (1023, 'h', '1023M'),

      (1, 'm', '1'),
      (100, 'm', '100'),
      (1023, 'm', '1023'),

      (1024, 'm', '1024'),
      (1536, 'm', '1536'),
      (17133, 'm', '17133'),
      (1024 * 1024 - 1, 'm', '1048575'),
      ])

  def testGiB(self):
    """Values rendered divided by 1024, optionally with the 'G' suffix"""
    self._CheckCases([
      (1024, 'h', '1.0G'),
      (1536, 'h', '1.5G'),
      (17133, 'h', '16.7G'),
      (1024 * 1024 - 1, 'h', '1024.0G'),

      (1024, 'g', '1.0'),
      (1536, 'g', '1.5'),
      (17133, 'g', '16.7'),
      (1024 * 1024 - 1, 'g', '1024.0'),

      (1024 * 1024, 'g', '1024.0'),
      (5120 * 1024, 'g', '5120.0'),
      (29829 * 1024, 'g', '29829.0'),
      ])

  def testTiB(self):
    """Values rendered divided by 1024 * 1024, optionally with 'T'"""
    self._CheckCases([
      (1024 * 1024, 'h', '1.0T'),
      (5120 * 1024, 'h', '5.0T'),
      (29829 * 1024, 'h', '29.1T'),

      (1024 * 1024, 't', '1.0'),
      (5120 * 1024, 't', '5.0'),
      (29829 * 1024, 't', '29.1'),
      ])
553
class TestParseUnit(unittest.TestCase):
  """Unit tests for ParseUnit"""

  # (suffix, multiplier) pairs accepted by ParseUnit
  SCALES = (('', 1),
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))

  def _CheckCases(self, cases):
    """Assert ParseUnit(text) == expected for each pair, in order"""
    for (text, expected) in cases:
      self.assertEqual(ParseUnit(text), expected)

  def testRounding(self):
    """Integer inputs are rounded up to a multiple of four"""
    self._CheckCases([
      ('0', 0),
      ('1', 4),
      ('2', 4),
      ('3', 4),

      ('124', 124),
      ('125', 128),
      ('126', 128),
      ('127', 128),
      ('128', 128),
      ('129', 132),
      ('130', 132),
      ])

  def testFloating(self):
    """Fractional inputs, with and without a unit suffix"""
    self._CheckCases([
      ('0', 0),
      ('0.5', 4),
      ('1.75', 4),
      ('1.99', 4),
      ('2.00', 4),
      ('2.01', 4),
      ('3.99', 4),
      ('4.00', 4),
      ('4.01', 8),
      ('1.5G', 1536),
      ('1.8G', 1844),
      ('8.28T', 8682212),
      ])

  def testSuffixes(self):
    """Every suffix is accepted in any case and after any whitespace"""
    transforms = (lambda x: x, str.lower, str.upper)
    for sep in ('', ' ', '   ', "\t", "\t "):
      for suffix, scale in TestParseUnit.SCALES:
        for func in transforms:
          text = '1024' + sep + func(suffix)
          self.assertEqual(ParseUnit(text), 1024 * scale)

  def testInvalidInput(self):
    """Bad separators and comma decimals raise UnitParseError"""
    for sep in ('-', '_', ',', 'a'):
      for suffix, _ in TestParseUnit.SCALES:
        text = '1' + sep + suffix
        self.assertRaises(UnitParseError, ParseUnit, text)

    for suffix, _ in TestParseUnit.SCALES:
      text = '1,3' + suffix
      self.assertRaises(UnitParseError, ParseUnit, text)
604
605
class TestSshKeys(testutils.GanetiTestCase):
  """Unit tests for AddAuthorizedKey and RemoveAuthorizedKey"""

  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')

  def setUp(self):
    """Create a temporary key file holding KEY_A and KEY_B, one per line"""
    testutils.GanetiTestCase.setUp(self)
    self.tmpname = self._CreateTempFile()
    handle = open(self.tmpname, 'w')
    try:
      for key in (TestSshKeys.KEY_A, TestSshKeys.KEY_B):
        handle.write("%s\n" % key)
    finally:
      handle.close()

  def testAddingNewKey(self):
    """An unknown key is appended after the existing ones"""
    new_key = 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test'
    AddAuthorizedKey(self.tmpname, new_key)

    expected = (
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
    self.assertFileContent(self.tmpname, expected)

  def testAddingAlmostButNotCompletelyTheSameKey(self):
    """A key differing from KEY_A only in its comment is still appended"""
    new_key = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test'
    AddAuthorizedKey(self.tmpname, new_key)

    expected = (
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
    self.assertFileContent(self.tmpname, expected)

  def testAddingExistingKeyWithSomeMoreSpaces(self):
    """KEY_A with extra spaces between fields leaves the file unchanged"""
    spaced_key = 'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a'
    AddAuthorizedKey(self.tmpname, spaced_key)

    expected = (
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
    self.assertFileContent(self.tmpname, expected)

  def testRemovingExistingKeyWithSomeMoreSpaces(self):
    """KEY_A with extra spaces between fields is removed from the file"""
    spaced_key = 'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a'
    RemoveAuthorizedKey(self.tmpname, spaced_key)

    expected = (
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
    self.assertFileContent(self.tmpname, expected)

  def testRemovingNonExistingKey(self):
    """Removing a key that is not in the file leaves it unchanged"""
    absent_key = 'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test'
    RemoveAuthorizedKey(self.tmpname, absent_key)

    expected = (
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
    self.assertFileContent(self.tmpname, expected)
667
668
669 class TestEtcHosts(testutils.GanetiTestCase):
670   """Test functions modifying /etc/hosts"""
671
672   def setUp(self):
673     testutils.GanetiTestCase.setUp(self)
674     self.tmpname = self._CreateTempFile()
675     handle = open(self.tmpname, 'w')
676     try:
677       handle.write('# This is a test file for /etc/hosts\n')
678       handle.write('127.0.0.1\tlocalhost\n')
679       handle.write('192.168.1.1 router gw\n')
680     finally:
681       handle.close()
682
683   def testSettingNewIp(self):
684     SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
685
686     self.assertFileContent(self.tmpname,
687       "# This is a test file for /etc/hosts\n"
688       "127.0.0.1\tlocalhost\n"
689       "192.168.1.1 router gw\n"
690       "1.2.3.4\tmyhost.domain.tld myhost\n")
691     self.assertFileMode(self.tmpname, 0644)
692
693   def testSettingExistingIp(self):
694     SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
695                      ['myhost'])
696
697     self.assertFileContent(self.tmpname,
698       "# This is a test file for /etc/hosts\n"
699       "127.0.0.1\tlocalhost\n"
700       "192.168.1.1\tmyhost.domain.tld myhost\n")
701     self.assertFileMode(self.tmpname, 0644)
702
703   def testSettingDuplicateName(self):
704     SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
705
706     self.assertFileContent(self.tmpname,
707       "# This is a test file for /etc/hosts\n"
708       "127.0.0.1\tlocalhost\n"
709       "192.168.1.1 router gw\n"
710       "1.2.3.4\tmyhost\n")
711     self.assertFileMode(self.tmpname, 0644)
712
713   def testRemovingExistingHost(self):
714     RemoveEtcHostsEntry(self.tmpname, 'router')
715
716     self.assertFileContent(self.tmpname,
717       "# This is a test file for /etc/hosts\n"
718       "127.0.0.1\tlocalhost\n"
719       "192.168.1.1 gw\n")
720     self.assertFileMode(self.tmpname, 0644)
721
722   def testRemovingSingleExistingHost(self):
723     RemoveEtcHostsEntry(self.tmpname, 'localhost')
724
725     self.assertFileContent(self.tmpname,
726       "# This is a test file for /etc/hosts\n"
727       "192.168.1.1 router gw\n")
728     self.assertFileMode(self.tmpname, 0644)
729
730   def testRemovingNonExistingHost(self):
731     RemoveEtcHostsEntry(self.tmpname, 'myhost')
732
733     self.assertFileContent(self.tmpname,
734       "# This is a test file for /etc/hosts\n"
735       "127.0.0.1\tlocalhost\n"
736       "192.168.1.1 router gw\n")
737     self.assertFileMode(self.tmpname, 0644)
738
739   def testRemovingAlias(self):
740     RemoveEtcHostsEntry(self.tmpname, 'gw')
741
742     self.assertFileContent(self.tmpname,
743       "# This is a test file for /etc/hosts\n"
744       "127.0.0.1\tlocalhost\n"
745       "192.168.1.1 router\n")
746     self.assertFileMode(self.tmpname, 0644)
747
748
class TestShellQuoting(unittest.TestCase):
  """Unit tests for ShellQuote and ShellQuoteArgs"""

  def testShellQuote(self):
    """Single values: plain words stay bare, anything else gets quoted"""
    cases = [
      ('abc', "abc"),
      ('ab"c', "'ab\"c'"),
      ("a'bc", "'a'\\''bc'"),
      ("a b c", "'a b c'"),
      ("a b\\ c", "'a b\\ c'"),
      ]
    for (raw, quoted) in cases:
      self.assertEqual(ShellQuote(raw), quoted)

  def testShellQuoteArgs(self):
    """Argument lists: each item quoted on its own, joined by spaces"""
    cases = [
      (['a', 'b', 'c'], "a b c"),
      (['a', 'b"', 'c'], "a 'b\"' c"),
      (['a', 'b\'', 'c'], "a 'b'\\\''' c"),
      ]
    for (args, quoted) in cases:
      self.assertEqual(ShellQuoteArgs(args), quoted)
763
764
class TestTcpPing(unittest.TestCase):
  """Testcase for TCP version of ping - against listen(2)ing port"""

  def setUp(self):
    """Open a listening socket on a system-chosen localhost port"""
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Port 0 lets the system pick a free port; it is read back below
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
    self.listenerport = self.listener.getsockname()[1]
    self.listener.listen(1)

  def tearDown(self):
    """Shut down and explicitly close the listening socket"""
    try:
      self.listener.shutdown(socket.SHUT_RDWR)
    finally:
      # Close explicitly instead of relying on garbage collection, and do
      # so even if shutdown() fails
      self.listener.close()
    del self.listener
    del self.listenerport

  def testTcpPingToLocalHostAccept(self):
    """TcpPing succeeds against the listening port, with and without source"""
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
                         self.listenerport,
                         timeout=10,
                         live_port_needed=True,
                         source=constants.LOCALHOST_IP_ADDRESS,
                         ),
                 "failed to connect to test listener")

    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
                         self.listenerport,
                         timeout=10,
                         live_port_needed=True,
                         ),
                 "failed to connect to test listener (no source)")
794
795
class TestTcpPingDeaf(unittest.TestCase):
  """Testcase for TCP version of ping - against non listen(2)ing port"""

  def setUp(self):
    """Bind a socket to a system-chosen localhost port without listening"""
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Port 0 lets the system pick a free port; it is read back below
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
    self.deaflistenerport = self.deaflistener.getsockname()[1]

  def tearDown(self):
    """Explicitly close the bound socket"""
    # Close explicitly instead of relying on garbage collection
    self.deaflistener.close()
    del self.deaflistener
    del self.deaflistenerport

  def testTcpPingToLocalHostAcceptDeaf(self):
    """With live_port_needed=True a non-listening port must fail the ping"""
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
                        self.deaflistenerport,
                        timeout=constants.TCP_PING_TIMEOUT,
                        live_port_needed=True,
                        source=constants.LOCALHOST_IP_ADDRESS,
                        ), # need successful connect(2)
                "successfully connected to deaf listener")

    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
                        self.deaflistenerport,
                        timeout=constants.TCP_PING_TIMEOUT,
                        live_port_needed=True,
                        ), # need successful connect(2)
                "successfully connected to deaf listener (no source addr)")

  def testTcpPingToLocalHostNoAccept(self):
    """With live_port_needed=False a non-listening port still pings fine"""
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
                         self.deaflistenerport,
                         timeout=constants.TCP_PING_TIMEOUT,
                         live_port_needed=False,
                         source=constants.LOCALHOST_IP_ADDRESS,
                         ), # ECONNREFUSED is OK
                 "failed to ping alive host on deaf port")

    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
                         self.deaflistenerport,
                         timeout=constants.TCP_PING_TIMEOUT,
                         live_port_needed=False,
                         ), # ECONNREFUSED is OK
                 "failed to ping alive host on deaf port (no source addr)")
839
840
class TestOwnIpAddress(unittest.TestCase):
  """Testcase for OwnIpAddress"""

  def testOwnLoopback(self):
    """check having the loopback ip"""
    owned = OwnIpAddress(constants.LOCALHOST_IP_ADDRESS)
    self.failUnless(owned, "Should own the loopback address")

  def testNowOwnAddress(self):
    """check that I don't own an address"""

    # 192.0.2.0/24 is set aside for tests and documentation (rfc 3330), so
    # no local interface *should* carry an address from it; should this
    # ever fail, the check needs to be extended to several addresses
    foreign_ip = "192.0.2.1"
    message = "Should not own IP address %s" % foreign_ip
    self.failIf(OwnIpAddress(foreign_ip), message)
857
858
859 class TestListVisibleFiles(unittest.TestCase):
860   """Test case for ListVisibleFiles"""
861
862   def setUp(self):
863     self.path = tempfile.mkdtemp()
864
865   def tearDown(self):
866     shutil.rmtree(self.path)
867
868   def _test(self, files, expected):
869     # Sort a copy
870     expected = expected[:]
871     expected.sort()
872
873     for name in files:
874       f = open(os.path.join(self.path, name), 'w')
875       try:
876         f.write("Test\n")
877       finally:
878         f.close()
879
880     found = ListVisibleFiles(self.path)
881     found.sort()
882
883     self.assertEqual(found, expected)
884
885   def testAllVisible(self):
886     files = ["a", "b", "c"]
887     expected = files
888     self._test(files, expected)
889
890   def testNoneVisible(self):
891     files = [".a", ".b", ".c"]
892     expected = []
893     self._test(files, expected)
894
895   def testSomeVisible(self):
896     files = ["a", "b", ".c"]
897     expected = ["a", "b"]
898     self._test(files, expected)
899
900   def testNonAbsolutePath(self):
901     self.failUnlessRaises(errors.ProgrammerError, ListVisibleFiles, "abc")
902
903   def testNonNormalizedPath(self):
904     self.failUnlessRaises(errors.ProgrammerError, ListVisibleFiles,
905                           "/bin/../tmp")
906
907
908 class TestNewUUID(unittest.TestCase):
909   """Test case for NewUUID"""
910
911   _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
912                         '[a-f0-9]{4}-[a-f0-9]{12}$')
913
914   def runTest(self):
915     self.failUnless(self._re_uuid.match(utils.NewUUID()))
916
917
class TestUniqueSequence(unittest.TestCase):
  """Test case for UniqueSequence"""

  def _test(self, seq, expected):
    result = utils.UniqueSequence(seq)
    self.assertEqual(result, expected)

  def runTest(self):
    cases = [
      # Ordered input
      ([1, 2, 3], [1, 2, 3]),
      ([1, 1, 2, 2, 3, 3], [1, 2, 3]),
      ([1, 2, 2, 3], [1, 2, 3]),
      ([1, 2, 3, 3], [1, 2, 3]),

      # Unordered input
      ([1, 2, 3, 1, 2, 3], [1, 2, 3]),
      ([1, 1, 2, 3, 3, 1, 2], [1, 2, 3]),

      # Strings
      (["a", "a"], ["a"]),
      (["a", "b"], ["a", "b"]),
      (["a", "b", "a"], ["a", "b"]),
      ]

    for (seq, expected) in cases:
      self._test(seq, expected)
939
940
class TestFirstFree(unittest.TestCase):
  """Test case for the FirstFree function"""

  def test(self):
    """Test FirstFree"""
    # (sequence, keyword arguments, expected result)
    cases = [
      ([0, 1, 3], {}, 2),
      ([], {}, None),
      ([3, 4, 6], {}, 0),
      ([3, 4, 6], {"base": 3}, 5),
      ]
    for (seq, kwargs, expected) in cases:
      self.failUnlessEqual(FirstFree(seq, **kwargs), expected)

    # An element below the given base is rejected through an assertion
    self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
951
952
class TestTailFile(testutils.GanetiTestCase):
  """Test case for the TailFile function"""

  def _CreateFileWithContent(self, content):
    """Create a temporary file holding C{content} and return its path.

    The file handle is closed even if writing fails.

    """
    fname = self._CreateTempFile()
    fd = open(fname, "w")
    try:
      fd.write(content)
    finally:
      fd.close()
    return fname

  def testEmpty(self):
    fname = self._CreateTempFile()
    self.failUnlessEqual(TailFile(fname), [])
    self.failUnlessEqual(TailFile(fname, lines=25), [])

  def testAllLines(self):
    data = ["test %d" % i for i in range(30)]
    for i in range(30):
      # Every line is newline-terminated; for i == 0 the file stays empty
      content = "".join(["%s\n" % line for line in data[:i]])
      fname = self._CreateFileWithContent(content)
      self.failUnlessEqual(TailFile(fname, lines=i), data[:i])

  def testPartialLines(self):
    data = ["test %d" % i for i in range(30)]
    fname = self._CreateFileWithContent("\n".join(data) + "\n")
    for i in range(1, 30):
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])

  def testBigFile(self):
    data = ["test %d" % i for i in range(30)]
    # One very long (1 MiB) first line followed by the regular short lines
    content = "X" * 1048576 + "\n" + "\n".join(data) + "\n"
    fname = self._CreateFileWithContent(content)
    for i in range(1, 30):
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
993
994
class _BaseFileLockTest:
  """Test case for the FileLock class

  Mixin with the tests shared by all FileLock test cases. The concrete test
  class has to provide C{self.lock} (the lock object under test) and
  C{self.tmpfile} (an object whose C{name} attribute is the path of the
  locked file), e.g. in its C{setUp}.

  """

  def testSharedNonblocking(self):
    self.lock.Shared(blocking=False)
    self.lock.Close()

  def testExclusiveNonblocking(self):
    self.lock.Exclusive(blocking=False)
    self.lock.Close()

  def testUnlockNonblocking(self):
    self.lock.Unlock(blocking=False)
    self.lock.Close()

  def testSharedBlocking(self):
    self.lock.Shared(blocking=True)
    self.lock.Close()

  def testExclusiveBlocking(self):
    self.lock.Exclusive(blocking=True)
    self.lock.Close()

  def testUnlockBlocking(self):
    self.lock.Unlock(blocking=True)
    self.lock.Close()

  def testSharedExclusiveUnlock(self):
    # Switch from shared to exclusive mode on the same lock object
    self.lock.Shared(blocking=False)
    self.lock.Exclusive(blocking=False)
    self.lock.Unlock(blocking=False)
    self.lock.Close()

  def testExclusiveSharedUnlock(self):
    # Switch from exclusive to shared mode on the same lock object
    self.lock.Exclusive(blocking=False)
    self.lock.Shared(blocking=False)
    self.lock.Unlock(blocking=False)
    self.lock.Close()

  def testSimpleTimeout(self):
    # These will succeed on the first attempt, hence a short timeout
    self.lock.Shared(blocking=True, timeout=10.0)
    self.lock.Exclusive(blocking=False, timeout=10.0)
    self.lock.Unlock(blocking=True, timeout=10.0)
    self.lock.Close()

  @staticmethod
  def _TryLockInner(filename, shared, blocking):
    """Try to lock C{filename}; meant to be run in a separate process.

    @param filename: path of the file to lock
    @param shared: whether to request a shared instead of an exclusive lock
    @param blocking: passed on to the lock function
    @return: True if the lock was acquired, False if LockError was raised

    """
    lock = utils.FileLock.Open(filename)

    if shared:
      fn = lock.Shared
    else:
      fn = lock.Exclusive

    try:
      # The timeout doesn't really matter as the parent process waits for us to
      # finish anyway.
      fn(blocking=blocking, timeout=0.01)
    except errors.LockError:
      # The exception object itself is of no interest, only the outcome
      return False

    return True

  def _TryLock(self, *args):
    """Run L{_TryLockInner} on the test file in a separate process.

    """
    return utils.RunInSeparateProcess(self._TryLockInner, self.tmpfile.name,
                                      *args)

  def testTimeout(self):
    for blocking in [True, False]:
      # While we hold the lock exclusively, another process can get neither
      # an exclusive nor a shared lock
      self.lock.Exclusive(blocking=True)
      self.failIf(self._TryLock(False, blocking))
      self.failIf(self._TryLock(True, blocking))

      # While we hold it shared, another process can share it, but can not
      # lock it exclusively
      self.lock.Shared(blocking=True)
      self.assert_(self._TryLock(True, blocking))
      self.failIf(self._TryLock(False, blocking))

  def testCloseShared(self):
    # Using a closed lock must trip an assertion
    self.lock.Close()
    self.assertRaises(AssertionError, self.lock.Shared, blocking=False)

  def testCloseExclusive(self):
    self.lock.Close()
    self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)

  def testCloseUnlock(self):
    self.lock.Close()
    self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
1084
1085
class TestFileLockWithFilename(testutils.GanetiTestCase, _BaseFileLockTest):
  """Runs the shared FileLock tests on a lock opened by file name"""

  TESTDATA = "Hello World\n" * 10

  def setUp(self):
    testutils.GanetiTestCase.setUp(self)

    self.tmpfile = tempfile.NamedTemporaryFile()
    path = self.tmpfile.name

    # Fill the file before opening the lock on it
    utils.WriteFile(path, data=self.TESTDATA)
    self.lock = utils.FileLock.Open(path)

    # Ensure "Open" didn't truncate file
    self.assertFileContent(path, self.TESTDATA)

  def tearDown(self):
    # None of the lock operations may have modified the file content
    self.assertFileContent(self.tmpfile.name, self.TESTDATA)

    testutils.GanetiTestCase.tearDown(self)
1103
1104
class TestFileLockWithFileObject(unittest.TestCase, _BaseFileLockTest):
  """Runs the shared FileLock tests on a lock built from a file object"""

  def setUp(self):
    self.tmpfile = tempfile.NamedTemporaryFile()
    path = self.tmpfile.name

    # Hand an already opened file object (plus its name) to the constructor
    fd = open(path, "w")
    self.lock = utils.FileLock(fd, path)
1109
1110
class TestTimeFunctions(unittest.TestCase):
  """Test case for time functions"""

  def runTest(self):
    # SplitTime: number of seconds -> (seconds, microseconds) tuple
    split_cases = [
      (1, (1, 0)),
      (1.5, (1, 500000)),
      (1218448917.4809151, (1218448917, 480915)),
      (123.48012, (123, 480120)),
      (123.9996, (123, 999600)),
      (123.9995, (123, 999500)),
      (123.9994, (123, 999400)),
      (123.999999999, (123, 999999)),
      ]
    for (value, expected) in split_cases:
      self.assertEqual(utils.SplitTime(value), expected)

    self.assertRaises(AssertionError, utils.SplitTime, -1)

    # MergeTime: (seconds, microseconds) tuple -> float, compared exactly
    merge_cases = [
      ((1, 0), 1.0),
      ((1, 500000), 1.5),
      ((1218448917, 500000), 1218448917.5),
      ]
    for (parts, expected) in merge_cases:
      self.assertEqual(utils.MergeTime(parts), expected)

    # These results are only compared after rounding to three decimals
    rounded_cases = [
      ((1218448917, 481000), 1218448917.481),
      ((1, 801000), 1.801),
      ]
    for (parts, expected) in rounded_cases:
      self.assertEqual(round(utils.MergeTime(parts), 3), expected)

    # Negative or out-of-range components trip an assertion
    invalid = [(0, -1), (0, 1000000), (0, 9999999), (-1, 0), (-9999, 0)]
    for parts in invalid:
      self.assertRaises(AssertionError, utils.MergeTime, parts)
1139
1140
class FieldSetTestCase(unittest.TestCase):
  """Test case for FieldSets"""

  def testSimpleMatch(self):
    fields = utils.FieldSet("a", "b", "c", "def")
    self.failUnless(fields.Matches("a"))
    self.failIf(fields.Matches("d"), "Substring matched")
    self.failIf(fields.Matches("defghi"), "Prefix string matched")

    # Lists made up of known fields only have nothing non-matching
    for names in (["b", "c"], ["a", "b", "c", "def"]):
      self.failIf(fields.NonMatching(names))
    self.failUnless(fields.NonMatching(["a", "d"]))

  def testRegexMatch(self):
    fields = utils.FieldSet("a", "b([0-9]+)", "c")
    for name in ("b1", "b99"):
      self.failUnless(fields.Matches(name))
    self.failIf(fields.Matches("b/1"))
    self.failIf(fields.NonMatching(["b12", "c"]))
    self.failUnless(fields.NonMatching(["a", "1"]))
1160
class TestForceDictType(unittest.TestCase):
  """Test case for ForceDictType"""

  def setUp(self):
    # Expected value type for each key used by the tests
    self.key_types = {
      'a': constants.VTYPE_INT,
      'b': constants.VTYPE_BOOL,
      'c': constants.VTYPE_STRING,
      'd': constants.VTYPE_SIZE,
      }

  def _fdt(self, data, allowed_values=None):
    # Only pass "allowed_values" on when the caller supplied one
    kwargs = {}
    if allowed_values is not None:
      kwargs["allowed_values"] = allowed_values
    ForceDictType(data, self.key_types, **kwargs)

    # The dictionary is converted in place; hand it back for comparison
    return data

  def testSimpleDict(self):
    cases = [
      ({}, {}),
      ({'a': 1}, {'a': 1}),
      ({'a': '1'}, {'a': 1}),
      ({'a': 1, 'b': 1}, {'a':1, 'b': True}),
      ({'b': 1, 'c': 'foo'}, {'b': True, 'c': 'foo'}),
      ({'b': 1, 'c': False}, {'b': True, 'c': ''}),
      ({'b': 'false'}, {'b': False}),
      ({'b': 'False'}, {'b': False}),
      ({'b': 'true'}, {'b': True}),
      ({'b': 'True'}, {'b': True}),
      ({'d': '4'}, {'d': 4}),
      ({'d': '4M'}, {'d': 4}),
      ]
    for (source, expected) in cases:
      self.assertEqual(self._fdt(source), expected)

  def testErrors(self):
    invalid = [
      {'a': 'astring'},
      {'c': True},
      {'d': 'astring'},
      {'d': '4 L'},
      ]
    for source in invalid:
      self.assertRaises(errors.TypeEnforcementError, self._fdt, source)
1199
1200
class TestIsAbsNormPath(unittest.TestCase):
  """Testing case for IsNormAbsPath"""

  def _pathTestHelper(self, path, result):
    # "result" is the outcome IsNormAbsPath is expected to report for "path"
    is_norm_abs = IsNormAbsPath(path)
    if result:
      msg = "Path %s should result absolute and normalized" % path
      self.assert_(is_norm_abs, msg)
    else:
      msg = "Path %s should not result absolute and normalized" % path
      self.assert_(not is_norm_abs, msg)

  def testBase(self):
    cases = [
      ('/etc', True),
      ('/srv', True),
      ('etc', False),
      ('/etc/../root', False),
      ('/etc/', False),
      ]
    for (path, result) in cases:
      self._pathTestHelper(path, result)
1218
1219
class TestSafeEncode(unittest.TestCase):
  """Test case for SafeEncode"""

  def testAscii(self):
    # Digits, letters and punctuation must pass through unchanged
    for txt in [string.digits, string.letters, string.punctuation]:
      self.failUnlessEqual(txt, SafeEncode(txt))

  def testDoubleEncode(self):
    # Encoding an already encoded string must not change it again; use
    # range(256) so that every byte value, including 0xff, is covered
    for i in range(256):
      txt = SafeEncode(chr(i))
      self.failUnlessEqual(txt, SafeEncode(txt))

  def testUnicode(self):
    # 1024 is high enough to catch non-direct ASCII mappings
    for i in range(1024):
      txt = SafeEncode(unichr(i))
      self.failUnlessEqual(txt, SafeEncode(txt))
1237
1238
class TestFormatTime(unittest.TestCase):
  """Testing case for FormatTime"""

  def testNone(self):
    formatted = FormatTime(None)
    self.failUnlessEqual(formatted, "N/A")

  def testInvalid(self):
    formatted = FormatTime(())
    self.failUnlessEqual(formatted, "N/A")

  def testNow(self):
    # Both the float returned by time.time and its integer value must be
    # accepted; only the absence of an exception is checked here
    for convert in (float, int):
      FormatTime(convert(time.time()))
1253
1254
class RunInSeparateProcess(unittest.TestCase):
  """Test case for utils.RunInSeparateProcess"""

  def test(self):
    # The value returned by the function must come back to the caller
    for expected in [True, False]:
      child_fn = lambda: expected

      self.assertEqual(expected, utils.RunInSeparateProcess(child_fn))

  def testArgs(self):
    # Additional arguments are handed on to the function
    for arg in [0, 1, 999, "Hello World", (1, 2, 3)]:
      def _compare(first, second):
        return first == "Foo" and second == arg

      self.assert_(utils.RunInSeparateProcess(_compare, "Foo", arg))

  def testPid(self):
    parent_pid = os.getpid()

    # The function must observe a PID different from ours
    same_pid = lambda: os.getpid() == parent_pid

    self.failIf(utils.RunInSeparateProcess(same_pid))

  def testSignal(self):
    # The function kills the process it is running in
    def _terminate():
      os.kill(os.getpid(), signal.SIGTERM)

    self.assertRaises(errors.GenericError,
                      utils.RunInSeparateProcess, _terminate)

  def testException(self):
    def _fail():
      raise errors.GenericError("This is a test")

    self.assertRaises(errors.GenericError,
                      utils.RunInSeparateProcess, _fail)
1291
1292
1293 class TestFingerprintFile(unittest.TestCase):
1294   def setUp(self):
1295     self.tmpfile = tempfile.NamedTemporaryFile()
1296
1297   def test(self):
1298     self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1299                      "da39a3ee5e6b4b0d3255bfef95601890afd80709")
1300
1301     utils.WriteFile(self.tmpfile.name, data="Hello World\n")
1302     self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1303                      "648a6a6ffffdaa0badb23b8baf90b6168dd16b3a")
1304
1305
1306 class TestUnescapeAndSplit(unittest.TestCase):
1307   """Testing case for UnescapeAndSplit"""
1308
1309   def setUp(self):
1310     # testing more that one separator for regexp safety
1311     self._seps = [",", "+", "."]
1312
1313   def testSimple(self):
1314     a = ["a", "b", "c", "d"]
1315     for sep in self._seps:
1316       self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), a)
1317
1318   def testEscape(self):
1319     for sep in self._seps:
1320       a = ["a", "b\\" + sep + "c", "d"]
1321       b = ["a", "b" + sep + "c", "d"]
1322       self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1323
1324   def testDoubleEscape(self):
1325     for sep in self._seps:
1326       a = ["a", "b\\\\", "c", "d"]
1327       b = ["a", "b\\", "c", "d"]
1328       self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1329
1330   def testThreeEscape(self):
1331     for sep in self._seps:
1332       a = ["a", "b\\\\\\" + sep + "c", "d"]
1333       b = ["a", "b\\" + sep + "c", "d"]
1334       self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1335
1336
class TestPathJoin(unittest.TestCase):
  """Testing case for PathJoin"""

  def testBasicItems(self):
    parts = ["/a", "b", "c"]
    expected = "/".join(parts)
    self.failUnlessEqual(PathJoin(*parts), expected)

  def testNonAbsPrefix(self):
    # The first component is not an absolute path
    self.failUnlessRaises(ValueError, PathJoin, "a", "b")

  def testBackTrack(self):
    # A ".." component is rejected
    self.failUnlessRaises(ValueError, PathJoin, "/a", "b/../c")

  def testMultiAbs(self):
    # A second absolute component is rejected
    self.failUnlessRaises(ValueError, PathJoin, "/a", "/b")
1352
1353
class TestHostInfo(unittest.TestCase):
  """Testing case for HostInfo"""

  def testUppercase(self):
    name = "AbC.example.com"
    self.failUnlessEqual(HostInfo.NormalizeName(name), name.lower())

  def testTooLongName(self):
    name = "a.b." + "c" * 255
    self.failUnlessRaises(OpPrereqError, HostInfo.NormalizeName, name)

  def testTrailingDot(self):
    # A single trailing dot is dropped
    name = "a.b.c"
    self.failUnlessEqual(HostInfo.NormalizeName(name + "."), name)

  def testInvalidName(self):
    for name in ["a b", "a/b", ".a.b", "a..b"]:
      self.failUnlessRaises(OpPrereqError, HostInfo.NormalizeName, name)

  def testValidName(self):
    # Only checks that none of these names raises an exception
    for name in ["a.b", "a-b", "a_b", "a.b.c"]:
      HostInfo.NormalizeName(name)
1388
1389
1390
# Run the tests through the project's test program when executed as a script
if __name__ == "__main__":
  testutils.GanetiTestProgram()