4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for testing ganeti.utils.io"""
32 from ganeti import constants
33 from ganeti import utils
34 from ganeti import compat
35 from ganeti import errors
# Tests for utils.ReadFile against the "cert1.pem" test data file.
40 class TestReadFile(testutils.GanetiTestCase):
# An unrestricted read is expected to yield 814 characters with a known digest.
41 def testReadAll(self):
42 data = utils.ReadFile(self._TestDataFilename("cert1.pem"))
43 self.assertEqual(len(data), 814)
# NOTE(review): "h" is not assigned in the lines visible here; the digest
# object is presumably created and fed with "data" first -- confirm.
47 self.assertEqual(h.hexdigest(), "a491efb3efe56a0535f924d5f8680fd4")
# Expects a 100-character result (presumably a size-limited read -- confirm).
49 def testReadSize(self):
50 data = utils.ReadFile(self._TestDataFilename("cert1.pem"),
# NOTE(review): the remaining arguments of the call above are not visible.
52 self.assertEqual(len(data), 100)
# NOTE(review): as in testReadAll, "h" has no visible assignment.
56 self.assertEqual(h.hexdigest(), "893772354e4e690b9efd073eed433ce7")
# Passes a "preread" callback to ReadFile and still expects all 814
# characters back.
58 def testCallback(self):
# NOTE(review): neither "_Cb" nor "fh" is defined in the visible lines; the
# tell() check presumably lives inside the callback -- confirm.
60 self.assertEqual(fh.tell(), 0)
61 data = utils.ReadFile(self._TestDataFilename("cert1.pem"), preread=_Cb)
62 self.assertEqual(len(data), 814)
# Reading below /dev/null must raise EnvironmentError.
# NOTE(review): no method header is visible for this assertion.
65 self.assertRaises(EnvironmentError, utils.ReadFile,
66 "/dev/null/does-not-exist")
# Tests for utils.ReadOneLineFile in strict and non-strict mode.
69 class TestReadOneLineFile(testutils.GanetiTestCase):
# NOTE(review): this base-class setUp call has no visible method header.
71 testutils.GanetiTestCase.setUp(self)
# With default arguments cert1.pem must yield the 27-character string
# "-----BEGIN CERTIFICATE-----".
73 def testDefault(self):
74 data = utils.ReadOneLineFile(self._TestDataFilename("cert1.pem"))
75 self.assertEqual(len(data), 27)
76 self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
# Same expected result as testDefault.
78 def testNotStrict(self):
79 data = utils.ReadOneLineFile(self._TestDataFilename("cert1.pem"),
# NOTE(review): the remaining arguments of the call above are not visible
# (presumably strict=False, going by the test name -- confirm).
81 self.assertEqual(len(data), 27)
82 self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
# With strict=True the same file must raise errors.GenericError.
84 def testStrictFailure(self):
85 self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
86 self._TestDataFilename("cert1.pem"), strict=True)
# Data without any line terminator (1024 repetitions of "Hello World! ")
# must come back unchanged in both strict and non-strict mode.
88 def testLongLine(self):
89 dummydata = (1024 * "Hello World! ")
90 myfile = self._CreateTempFile()
91 utils.WriteFile(myfile, data=dummydata)
92 datastrict = utils.ReadOneLineFile(myfile, strict=True)
93 datalax = utils.ReadOneLineFile(myfile, strict=False)
94 self.assertEqual(dummydata, datastrict)
95 self.assertEqual(dummydata, datalax)
# For each terminator ("", "\n", "\r\n") both modes must return the line
# without the terminator.
97 def testNewline(self):
98 myfile = self._CreateTempFile()
# NOTE(review): "myline" is not assigned in the visible lines.
100 for nl in ["", "\n", "\r\n"]:
101 dummydata = "%s%s" % (myline, nl)
102 utils.WriteFile(myfile, data=dummydata)
103 datalax = utils.ReadOneLineFile(myfile, strict=False)
104 self.assertEqual(myline, datalax)
105 datastrict = utils.ReadOneLineFile(myfile, strict=True)
106 self.assertEqual(myline, datastrict)
# Writes 1024 copies of "Foo bar baz " + whitespace + terminator for every
# combination of terminator and whitespace, then checks both modes.
108 def testWhitespaceAndMultipleLines(self):
109 myfile = self._CreateTempFile()
110 for nl in ["", "\n", "\r\n"]:
111 for ws in [" ", "\t", "\t\t \t", "\t "]:
112 dummydata = (1024 * ("Foo bar baz %s%s" % (ws, nl)))
113 utils.WriteFile(myfile, data=dummydata)
114 datalax = utils.ReadOneLineFile(myfile, strict=False)
# NOTE(review): the assertions below look like two alternative branches (data
# with and without a line terminator) whose if/else lines are not visible;
# the assertRaises call is also missing its remaining arguments.
116 self.assert_(set("\r\n") & set(dummydata))
117 self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
119 explen = len("Foo bar baz ") + len(ws)
120 self.assertEqual(len(datalax), explen)
121 self.assertEqual(datalax, dummydata[:explen])
122 self.assertFalse(set("\r\n") & set(datalax))
124 datastrict = utils.ReadOneLineFile(myfile, strict=True)
125 self.assertEqual(dummydata, datastrict)
126 self.assertEqual(dummydata, datalax)
# Leading empty lines: non-strict mode must still return "myline".
128 def testEmptylines(self):
129 myfile = self._CreateTempFile()
# NOTE(review): "myline" is not assigned in the visible lines.
131 for nl in ["\n", "\r\n"]:
132 for ol in ["", "otherline"]:
133 dummydata = "%s%s%s%s%s%s" % (nl, nl, myline, nl, ol, nl)
134 utils.WriteFile(myfile, data=dummydata)
135 self.assert_(set("\r\n") & set(dummydata))
136 datalax = utils.ReadOneLineFile(myfile, strict=False)
137 self.assertEqual(myline, datalax)
# NOTE(review): the call below is missing its remaining arguments, and the
# strict-mode failure/success checks presumably sit in separate branches
# depending on "ol" -- confirm.
139 self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
142 datastrict = utils.ReadOneLineFile(myfile, strict=True)
143 self.assertEqual(myline, datastrict)
# A freshly created temporary file (presumably empty, per the test name)
# must raise errors.GenericError.
145 def testEmptyfile(self):
146 myfile = self._CreateTempFile()
147 self.assertRaises(errors.GenericError, utils.ReadOneLineFile, myfile)
# Tests for utils.TimestampForFilename.
150 class TestTimestampForFilename(unittest.TestCase):
# The generated timestamp must contain neither "." nor ":".
# NOTE(review): no method header is visible for these assertions.
152 self.assert_("." not in utils.TimestampForFilename())
153 self.assert_(":" not in utils.TimestampForFilename())
# Tests for utils.CreateBackup, run inside a private temporary directory.
156 class TestCreateBackup(testutils.GanetiTestCase):
# NOTE(review): the method headers for the fixture code and for the first
# test below are not visible in this view.
# Fixture setup: base-class setUp plus a fresh temporary directory.
158 testutils.GanetiTestCase.setUp(self)
160 self.tmpdir = tempfile.mkdtemp()
# Fixture teardown: base-class tearDown, then remove the temporary directory.
163 testutils.GanetiTestCase.tearDown(self)
165 shutil.rmtree(self.tmpdir)
# Each CreateBackup call on the same empty file is expected to add exactly
# one more path matching "<filename>*" (2, then 3, then 4 matches).
168 filename = utils.PathJoin(self.tmpdir, "config.data")
169 utils.WriteFile(filename, data="")
170 bname = utils.CreateBackup(filename)
171 self.assertFileContent(bname, "")
172 self.assertEqual(len(glob.glob("%s*" % filename)), 2)
173 utils.CreateBackup(filename)
174 self.assertEqual(len(glob.glob("%s*" % filename)), 3)
175 utils.CreateBackup(filename)
176 self.assertEqual(len(glob.glob("%s*" % filename)), 4)
# Backing up the "fifo" path must raise errors.ProgrammerError.
# NOTE(review): nothing visible here actually creates the FIFO.
178 fifoname = utils.PathJoin(self.tmpdir, "fifo")
180 self.assertRaises(errors.ProgrammerError, utils.CreateBackup, fifoname)
# A backup must hold exactly the content of the file it was made from, for
# text and binary payloads of various sizes.
182 def testContent(self):
# NOTE(review): "bkpcount" is not assigned in the visible lines.
184 for data in ["", "X", "Hello World!\n" * 100, "Binary data\0\x01\x02\n"]:
185 for rep in [1, 2, 10, 127]:
186 testdata = data * rep
188 filename = utils.PathJoin(self.tmpdir, "test.data_")
189 utils.WriteFile(filename, data=testdata)
190 self.assertFileContent(filename, testdata)
193 bname = utils.CreateBackup(filename)
195 self.assertFileContent(bname, testdata)
196 self.assertEqual(len(glob.glob("%s*" % filename)), 1 + bkpcount)
199 class TestListVisibleFiles(unittest.TestCase):
200 """Test case for ListVisibleFiles"""
# Fixture: a temporary directory stored in self.path and removed afterwards.
# NOTE(review): the method headers for these two statements are not visible.
203 self.path = tempfile.mkdtemp()
206 shutil.rmtree(self.path)
# Helper writing a file with the content "test" into self.path.
# NOTE(review): "name" is not bound in the visible lines (presumably a loop
# over "files" -- confirm).
208 def _CreateFiles(self, files):
210 utils.WriteFile(os.path.join(self.path, name), data="test")
# Helper: create "files", list the directory and compare the result with
# "expected" as sets, so ordering does not matter.
212 def _test(self, files, expected):
213 self._CreateFiles(files)
214 found = utils.ListVisibleFiles(self.path)
215 self.assertEqual(set(found), set(expected))
217 def testAllVisible(self):
218 files = ["a", "b", "c"]
# NOTE(review): "expected" is not assigned in the visible lines.
220 self._test(files, expected)
222 def testNoneVisible(self):
223 files = [".a", ".b", ".c"]
# NOTE(review): "expected" is not assigned in the visible lines.
225 self._test(files, expected)
# Of "a", "b" and ".c" only the names without a leading dot are expected.
227 def testSomeVisible(self):
228 files = ["a", "b", ".c"]
229 expected = ["a", "b"]
230 self._test(files, expected)
# Both tests below expect errors.ProgrammerError.
# NOTE(review): the path arguments of both calls are not visible.
232 def testNonAbsolutePath(self):
233 self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
236 def testNonNormalizedPath(self):
237 self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
# Tests for utils.WriteFile: plain writes, hooks, dry-run, timestamps,
# open file descriptors, failure cleanup and backups.
241 class TestWriteFile(unittest.TestCase):
# Fixture state: a named temporary file and flags recording which hook ran.
# NOTE(review): the fixture method headers and the initialisation of
# "self.tmpdir" and "self.did_pre" are not visible in this view.
244 self.tfile = tempfile.NamedTemporaryFile()
246 self.did_post = False
247 self.did_write = False
251 shutil.rmtree(self.tmpdir)
# Hook callbacks handed to WriteFile by the tests further down.
# NOTE(review): the bodies of markPre and markPost are not visible; they
# presumably set self.did_pre / self.did_post -- confirm.
253 def markPre(self, fd):
256 def markPost(self, fd):
259 def markWrite(self, fd):
260 self.did_write = True
# Written data must be read back unchanged.
# NOTE(review): the method header and the assignment of "data" are not visible.
264 utils.WriteFile(self.tfile.name, data=data)
265 self.assertEqual(utils.ReadFile(self.tfile.name), data)
267 def testWriteSimpleUnicode(self):
# NOTE(review): the assignment of "data" is not visible.
269 utils.WriteFile(self.tfile.name, data=data)
270 self.assertEqual(utils.ReadFile(self.tfile.name), data)
# WriteFile must raise errors.ProgrammerError when both "data" and "fn" are
# given, when neither is given, and when "atime" is given without "mtime".
272 def testErrors(self):
273 self.assertRaises(errors.ProgrammerError, utils.WriteFile,
274 self.tfile.name, data="test", fn=lambda fd: None)
275 self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name)
276 self.assertRaises(errors.ProgrammerError, utils.WriteFile,
277 self.tfile.name, data="test", atime=0)
# Only the prewrite flag may be set after a write with a prewrite hook.
279 def testPreWrite(self):
280 utils.WriteFile(self.tfile.name, data="", prewrite=self.markPre)
281 self.assertTrue(self.did_pre)
282 self.assertFalse(self.did_post)
283 self.assertFalse(self.did_write)
# Only the postwrite flag may be set after a write with a postwrite hook.
285 def testPostWrite(self):
286 utils.WriteFile(self.tfile.name, data="", postwrite=self.markPost)
287 self.assertFalse(self.did_pre)
288 self.assertTrue(self.did_post)
289 self.assertFalse(self.did_write)
# Only the write flag may be set when the content comes from "fn".
291 def testWriteFunction(self):
292 utils.WriteFile(self.tfile.name, fn=self.markWrite)
293 self.assertFalse(self.did_pre)
294 self.assertFalse(self.did_post)
295 self.assertTrue(self.did_write)
# With dry_run=True the existing file content must stay untouched.
297 def testDryRun(self):
# NOTE(review): "orig" is not assigned in the visible lines.
299 self.tfile.write(orig)
301 utils.WriteFile(self.tfile.name, data="hello", dry_run=True)
302 self.assertEqual(utils.ReadFile(self.tfile.name), orig)
# The requested atime/mtime pairs must show up in the file's stat result.
# NOTE(review): the method header and the assignments of "f" and "st" are
# not visible.
306 for at, mt in [(0, 0), (1000, 1000), (2000, 3000),
307 (int(time.time()), 5000)]:
308 utils.WriteFile(f, data="hello", atime=at, mtime=mt)
310 self.assertEqual(st.st_atime, at)
311 self.assertEqual(st.st_mtime, mt)
# A plain call returns None; with close=False the return value is used as a
# file descriptor from which the written data can be read.
313 def testNoClose(self):
# NOTE(review): the assignment of "data" and any repositioning/closing of
# "fd" are not visible.
315 self.assertEqual(utils.WriteFile(self.tfile.name, data="abc"), None)
316 fd = utils.WriteFile(self.tfile.name, data=data, close=False)
319 self.assertEqual(os.read(fd, 4096), data)
# After a write the directory must contain only the file "test".
323 def testNoLeftovers(self):
324 self.tmpdir = tempfile.mkdtemp()
325 self.assertEqual(utils.WriteFile(utils.PathJoin(self.tmpdir, "test"),
# NOTE(review): the rest of the assertEqual call above is not visible.
328 self.assertEqual(os.listdir(self.tmpdir), ["test"])
# WriteFile must raise OSError, leaving "target" as an empty directory and
# nothing else in tmpdir.
330 def testFailRename(self):
331 self.tmpdir = tempfile.mkdtemp()
332 target = utils.PathJoin(self.tmpdir, "target")
# NOTE(review): the creation of the "target" directory is not visible.
334 self.assertRaises(OSError, utils.WriteFile, target, data="abc")
335 self.assertTrue(os.path.isdir(target))
336 self.assertEqual(os.listdir(self.tmpdir), ["target"])
337 self.assertFalse(os.listdir(target))
# Same layout with dry_run=True: the call returns None instead of raising.
339 def testFailRenameDryRun(self):
340 self.tmpdir = tempfile.mkdtemp()
341 target = utils.PathJoin(self.tmpdir, "target")
# NOTE(review): the creation of the "target" directory is not visible.
343 self.assertEqual(utils.WriteFile(target, data="abc", dry_run=True), None)
344 self.assertTrue(os.path.isdir(target))
345 self.assertEqual(os.listdir(self.tmpdir), ["target"])
346 self.assertFalse(os.listdir(target))
# backup=True: the first write leaves only "test"; overwriting an existing
# file adds exactly one "test.backup*" entry; a dry-run changes nothing.
348 def testBackup(self):
349 self.tmpdir = tempfile.mkdtemp()
350 testfile = utils.PathJoin(self.tmpdir, "test")
352 self.assertEqual(utils.WriteFile(testfile, data="foo", backup=True), None)
353 self.assertEqual(utils.ReadFile(testfile), "foo")
354 self.assertEqual(os.listdir(self.tmpdir), ["test"])
357 assert os.path.isfile(testfile)
358 self.assertEqual(utils.WriteFile(testfile, data="bar", backup=True), None)
359 self.assertEqual(utils.ReadFile(testfile), "bar")
360 self.assertEqual(len(glob.glob("%s.backup*" % testfile)), 1)
361 self.assertTrue("test" in os.listdir(self.tmpdir))
362 self.assertEqual(len(os.listdir(self.tmpdir)), 2)
364 # Write again as dry-run
365 assert os.path.isfile(testfile)
366 self.assertEqual(utils.WriteFile(testfile, data="000", backup=True,
# NOTE(review): the rest of the assertEqual call above is not visible.
369 self.assertEqual(utils.ReadFile(testfile), "bar")
370 self.assertEqual(len(glob.glob("%s.backup*" % testfile)), 1)
371 self.assertTrue("test" in os.listdir(self.tmpdir))
372 self.assertEqual(len(os.listdir(self.tmpdir)), 2)
# Tests for utils.GetFileID, utils.VerifyFileID and utils.SafeWriteFile.
375 class TestFileID(testutils.GanetiTestCase):
# A file ID must verify against itself.
376 def testEquality(self):
377 name = self._CreateTempFile()
378 oldi = utils.GetFileID(path=name)
379 self.failUnless(utils.VerifyFileID(oldi, oldi))
# Compares the ID taken by path with the one taken through an open file
# descriptor; verification is expected to succeed in both directions.
381 def testUpdate(self):
382 name = self._CreateTempFile()
383 oldi = utils.GetFileID(path=name)
# NOTE(review): closing of "fd" is not visible in this view.
385 fd = os.open(name, os.O_RDWR)
387 newi = utils.GetFileID(fd=fd)
388 self.failUnless(utils.VerifyFileID(oldi, newi))
389 self.failUnless(utils.VerifyFileID(newi, oldi))
# SafeWriteFile with a recorded file ID, after the file's timestamps were
# moved forwards or backwards.
393 def testWriteFile(self):
394 name = self._CreateTempFile()
395 oldi = utils.GetFileID(path=name)
# NOTE(review): "mtime" is not assigned in the visible lines, and the
# assertRaises call below is missing its remaining arguments.
397 os.utime(name, (mtime + 10, mtime + 10))
398 self.assertRaises(errors.LockError, utils.SafeWriteFile, name,
400 os.utime(name, (mtime - 10, mtime - 10))
401 utils.SafeWriteFile(name, oldi, data="")
402 oldi = utils.GetFileID(path=name)
404 os.utime(name, (mtime + 10, mtime + 10))
405 # this doesn't raise, since we passed None
406 utils.SafeWriteFile(name, None, data="")
# Passing both "path" and "fd" to GetFileID must raise ProgrammerError.
# NOTE(review): no method header is visible for this check.
409 t = tempfile.NamedTemporaryFile()
410 self.assertRaises(errors.ProgrammerError, utils.GetFileID,
411 path=t.name, fd=t.fileno())
414 class TestRemoveFile(unittest.TestCase):
415 """Test case for the RemoveFile function"""
# NOTE(review): the fixture method headers are not visible; the docstring and
# two assignments below read as setUp, the following three lines as tearDown.
418 """Create a temp dir and file for each case"""
419 self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
# NOTE(review): "fd" is not used (or closed) in the visible lines.
420 fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
# Cleanup: drop the temp file if a test left it behind, then the directory.
424 if os.path.exists(self.tmpfile):
425 os.unlink(self.tmpfile)
426 os.rmdir(self.tmpdir)
428 def testIgnoreDirs(self):
429 """Test that RemoveFile() ignores directories"""
# The call is expected to return None when given a directory.
430 self.assertEqual(None, utils.RemoveFile(self.tmpdir))
432 def testIgnoreNotExisting(self):
433 """Test that RemoveFile() ignores non-existing files"""
# The second call runs after the file is already gone and must not raise.
434 utils.RemoveFile(self.tmpfile)
435 utils.RemoveFile(self.tmpfile)
437 def testRemoveFile(self):
438 """Test that RemoveFile does remove a file"""
439 utils.RemoveFile(self.tmpfile)
440 if os.path.exists(self.tmpfile):
441 self.fail("File '%s' not removed" % self.tmpfile)
443 def testRemoveSymlink(self):
444 """Test that RemoveFile does remove symlinks"""
445 symlink = self.tmpdir + "/symlink"
# First a link to the relative target "no-such-file" ...
446 os.symlink("no-such-file", symlink)
447 utils.RemoveFile(symlink)
448 if os.path.exists(symlink):
449 self.fail("File '%s' not removed" % symlink)
# ... then a link pointing at the temporary file.
450 os.symlink(self.tmpfile, symlink)
451 utils.RemoveFile(symlink)
452 if os.path.exists(symlink):
453 self.fail("File '%s' not removed" % symlink)
# Tests for utils.RemoveDir.
456 class TestRemoveDir(unittest.TestCase):
# Fixture: a temporary directory, removed again after each test.
# NOTE(review): the method headers, the "try:" matching the "except" below
# and the except body are not visible in this view.
458 self.tmpdir = tempfile.mkdtemp()
462 shutil.rmtree(self.tmpdir)
463 except EnvironmentError:
# An empty directory must be gone after RemoveDir.
466 def testEmptyDir(self):
467 utils.RemoveDir(self.tmpdir)
468 self.assertFalse(os.path.isdir(self.tmpdir))
# A directory that still contains a file must raise EnvironmentError.
470 def testNonEmptyDir(self):
471 self.tmpfile = os.path.join(self.tmpdir, "test1")
472 open(self.tmpfile, "w").close()
473 self.assertRaises(EnvironmentError, utils.RemoveDir, self.tmpdir)
476 class TestRename(unittest.TestCase):
477 """Test case for RenameFile"""
# NOTE(review): the fixture method headers are not visible; the docstrings
# below suggest setUp (create dir plus empty file "test1") and tearDown.
480 """Create a temporary directory"""
481 self.tmpdir = tempfile.mkdtemp()
482 self.tmpfile = os.path.join(self.tmpdir, "test1")
# Create the (empty) file that the tests rename.
485 open(self.tmpfile, "w").close()
488 """Remove temporary directory"""
489 shutil.rmtree(self.tmpdir)
491 def testSimpleRename1(self):
492 """Simple rename 1"""
493 utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
494 self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
496 def testSimpleRename2(self):
497 """Simple rename 2"""
498 utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
# NOTE(review): the remaining arguments of the call above are not visible.
500 self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
502 def testRenameMkdir(self):
503 """Rename with mkdir"""
# Target inside the not yet existing directory "test".
504 utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
# NOTE(review): the remaining arguments of the call above are not visible.
506 self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
507 self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
# Second rename into a deeper path ("test/foo/bar/baz").
509 utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"),
510 os.path.join(self.tmpdir, "test/foo/bar/baz"),
# NOTE(review): the remaining arguments of the call above are not visible.
512 self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
513 self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar")))
514 self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz")))
# Directory-creation tests; each one finally asserts that "path" is a
# directory.
# NOTE(review): the call under test (presumably utils.Makedirs, going by the
# class name) is not visible in any of the methods, nor are the fixture
# method headers -- confirm against the full file.
517 class TestMakedirs(unittest.TestCase):
519 self.tmpdir = tempfile.mkdtemp()
522 shutil.rmtree(self.tmpdir)
524 def testNonExisting(self):
525 path = utils.PathJoin(self.tmpdir, "foo")
527 self.assert_(os.path.isdir(path))
529 def testExisting(self):
530 path = utils.PathJoin(self.tmpdir, "foo")
533 self.assert_(os.path.isdir(path))
535 def testRecursiveNonExisting(self):
536 path = utils.PathJoin(self.tmpdir, "foo/bar/baz")
538 self.assert_(os.path.isdir(path))
# Only the top-level component "B" is created up front.
540 def testRecursiveExisting(self):
541 path = utils.PathJoin(self.tmpdir, "B/moo/xyz")
542 self.assertFalse(os.path.exists(path))
543 os.mkdir(utils.PathJoin(self.tmpdir, "B"))
545 self.assert_(os.path.isdir(path))
548 class TestEnsureDirs(unittest.TestCase):
549 """Tests for EnsureDirs"""
# Fixture: temporary directory; the process umask is set to 0777 and the
# previous value remembered so it can be restored at the end.
# NOTE(review): the fixture method headers are not visible.
552 self.dir = tempfile.mkdtemp()
553 self.old_umask = os.umask(0777)
555 def testEnsureDirs(self):
# NOTE(review): the call that receives these (path, mode) tuples is not
# visible (presumably utils.EnsureDirs -- confirm).
557 (utils.PathJoin(self.dir, "foo"), 0777),
558 (utils.PathJoin(self.dir, "bar"), 0000),
# The permission bits of both directories must match the requested modes.
560 self.assertEquals(os.stat(utils.PathJoin(self.dir, "foo"))[0] & 0777, 0777)
561 self.assertEquals(os.stat(utils.PathJoin(self.dir, "bar"))[0] & 0777, 0000)
# Cleanup: remove both directories and restore the saved umask.
564 os.rmdir(utils.PathJoin(self.dir, "foo"))
565 os.rmdir(utils.PathJoin(self.dir, "bar"))
567 os.umask(self.old_umask)
570 class TestIsNormAbsPath(unittest.TestCase):
571 """Testing case for IsNormAbsPath"""
# Helper asserting that utils.IsNormAbsPath(path) is true or false.
# NOTE(review): the branch lines selecting between the two assertions are not
# visible (presumably an if/else on "result" -- confirm).
573 def _pathTestHelper(self, path, result):
575 self.assert_(utils.IsNormAbsPath(path),
576 "Path %s should result absolute and normalized" % path)
578 self.assertFalse(utils.IsNormAbsPath(path),
579 "Path %s should not result absolute and normalized" % path)
# Expected results: "/etc" and "/srv" pass; a relative path, a path with ".."
# and a path with a trailing slash do not.
# NOTE(review): no method header is visible for these calls.
582 self._pathTestHelper("/etc", True)
583 self._pathTestHelper("/srv", True)
584 self._pathTestHelper("etc", False)
585 self._pathTestHelper("/etc/../root", False)
586 self._pathTestHelper("/etc/", False)
class TestIsBelowDir(unittest.TestCase):
  """Testing case for IsBelowDir"""

  def testSamePrefix(self):
    # Direct children count as "below", with or without a trailing slash on
    # the directory argument
    for (basedir, candidate) in [("/a/b", "/a/b/c"), ("/a/b/", "/a/b/e")]:
      self.assertTrue(utils.IsBelowDir(basedir, candidate))

  def testSamePrefixButDifferentDir(self):
    # A sibling whose name merely starts with the directory name is not below
    for (basedir, candidate) in [("/a/b", "/a/bc/d"), ("/a/b/", "/a/bc/e")]:
      self.assertFalse(utils.IsBelowDir(basedir, candidate))

  def testSamePrefixButDirTraversal(self):
    # ".." components leading out of the directory must be rejected
    for (basedir, candidate) in [("/a/b", "/a/b/../c"), ("/a/b/", "/a/b/../d")]:
      self.assertFalse(utils.IsBelowDir(basedir, candidate))

  def testSamePrefixAndTraversal(self):
    # "." and ".." components that still end up inside the directory are fine
    basedir = "/a/b"
    for candidate in ["/a/b/c/../d", "/a/b/c/./e", "/a/b/../b/./e"]:
      self.assertTrue(utils.IsBelowDir(basedir, candidate))

  def testBothAbsPath(self):
    # A relative path in either position (or both) must raise ValueError
    relative_cases = [
      ("/a/b/c", "d"),
      ("a/b/c", "/d"),
      ("a/b/c", "d"),
      ]
    for (basedir, candidate) in relative_cases:
      self.assertRaises(ValueError, utils.IsBelowDir, basedir, candidate)
class TestPathJoin(unittest.TestCase):
  """Testing case for PathJoin"""

  # The assertions use assertEqual/assertRaises rather than the deprecated
  # failUnless* aliases, matching the other test cases in this module.

  def testBasicItems(self):
    # An absolute prefix followed by plain components joins with "/"
    mlist = ["/a", "b", "c"]
    self.assertEqual(utils.PathJoin(*mlist), "/".join(mlist))

  def testNonAbsPrefix(self):
    # A relative first component must raise ValueError
    self.assertRaises(ValueError, utils.PathJoin, "a", "b")

  def testBackTrack(self):
    # A component containing ".." must raise ValueError
    self.assertRaises(ValueError, utils.PathJoin, "/a", "b/../c")

  def testMultiAbs(self):
    # A second absolute component must raise ValueError
    self.assertRaises(ValueError, utils.PathJoin, "/a", "/b")
632 class TestTailFile(testutils.GanetiTestCase):
633 """Test case for the TailFile function"""
# A freshly created temp file must yield an empty list, with or without an
# explicit "lines" value.
# NOTE(review): no method header is visible for this check.
636 fname = self._CreateTempFile()
637 self.failUnlessEqual(utils.TailFile(fname), [])
638 self.failUnlessEqual(utils.TailFile(fname, lines=25), [])
# A file holding the first i test lines must be returned completely when
# exactly i lines are requested.
640 def testAllLines(self):
641 data = ["test %d" % i for i in range(30)]
# NOTE(review): "i" is not bound in the visible lines (presumably a loop),
# and closing of "fd" is not visible either.
643 fname = self._CreateTempFile()
644 fd = open(fname, "w")
645 fd.write("\n".join(data[:i]))
649 self.failUnlessEqual(utils.TailFile(fname, lines=i), data[:i])
# Asking for fewer lines than the file holds must return the last i lines.
651 def testPartialLines(self):
652 data = ["test %d" % i for i in range(30)]
653 fname = self._CreateTempFile()
654 fd = open(fname, "w")
655 fd.write("\n".join(data))
# NOTE(review): closing of "fd" before reading is not visible.
658 for i in range(1, 30):
659 self.failUnlessEqual(utils.TailFile(fname, lines=i), data[-i:])
# Same check with 1048576 "X" characters written ahead of the test lines.
661 def testBigFile(self):
662 data = ["test %d" % i for i in range(30)]
663 fname = self._CreateTempFile()
664 fd = open(fname, "w")
665 fd.write("X" * 1048576)
# NOTE(review): whatever separates the "X" run from the first test line, and
# the closing of "fd", are not visible.
667 fd.write("\n".join(data))
670 for i in range(1, 30):
671 self.failUnlessEqual(utils.TailFile(fname, lines=i), data[-i:])
674 class TestPidFileFunctions(unittest.TestCase):
675 """Tests for WritePidFile and ReadPidFile"""
# Fixture: temporary directory plus a helper mapping a name to
# "<dir>/<name>.pid".
# NOTE(review): the fixture method header is not visible.
678 self.dir = tempfile.mkdtemp()
679 self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
# Write the PID file for this process, read it back, then check behaviour for
# a missing and for an invalid PID file.
681 def testPidFileFunctions(self):
682 pid_file = self.f_dpn('test')
683 fd = utils.WritePidFile(self.f_dpn('test'))
684 self.failUnless(os.path.exists(pid_file),
685 "PID file should have been created")
686 read_pid = utils.ReadPidFile(pid_file)
687 self.failUnlessEqual(read_pid, os.getpid())
688 self.failUnless(utils.IsProcessAlive(read_pid))
# A second WritePidFile attempt is expected to raise errors.LockError.
# NOTE(review): the remaining arguments of the call below are not visible.
689 self.failUnlessRaises(errors.LockError, utils.WritePidFile,
692 utils.RemoveFile(self.f_dpn("test"))
693 self.failIf(os.path.exists(pid_file),
694 "PID file should not exist anymore")
695 self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
696 "ReadPidFile should return 0 for missing pid file")
# NOTE(review): what is written through "fh" (and its close) is not visible.
697 fh = open(pid_file, "w")
700 self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
701 "ReadPidFile should return 0 for invalid pid file")
702 # but now, even with the file existing, we should be able to lock it
703 fd = utils.WritePidFile(self.f_dpn('test'))
705 utils.RemoveFile(self.f_dpn("test"))
706 self.failIf(os.path.exists(pid_file),
707 "PID file should not exist anymore")
# A child process writes its own PID file; the parent reads it, checks the
# child is alive, kills it and checks it is gone.
# NOTE(review): the method header, the assignment of "new_pid" (presumably a
# fork), the rest of the child branch and the parent's wait on the pipe are
# not visible in this view.
710 pid_file = self.f_dpn('child')
711 r_fd, w_fd = os.pipe()
713 if new_pid == 0: #child
714 utils.WritePidFile(self.f_dpn('child'))
719 # else we are in the parent
720 # wait until the child has written the pid file
722 read_pid = utils.ReadPidFile(pid_file)
723 self.failUnlessEqual(read_pid, new_pid)
724 self.failUnless(utils.IsProcessAlive(new_pid))
725 utils.KillProcess(new_pid, waitpid=True)
726 self.failIf(utils.IsProcessAlive(new_pid))
727 utils.RemoveFile(self.f_dpn('child'))
# KillProcess must refuse PID 0 with errors.ProgrammerError.
728 self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0)
# Cleanup of the temporary directory (method header not visible).
731 shutil.rmtree(self.dir)
734 class TestSshKeys(testutils.GanetiTestCase):
735 """Test case for the AddAuthorizedKey function"""
# The two entries every test starts from: a bare key and a key carrying
# command=/from= options.
737 KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
738 KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="198.51.100.4" '
739 'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
# Fixture: a temporary file holding KEY_A and KEY_B, one per line.
# NOTE(review): the method header and the closing of "handle" are not visible.
742 testutils.GanetiTestCase.setUp(self)
743 self.tmpname = self._CreateTempFile()
744 handle = open(self.tmpname, 'w')
746 handle.write("%s\n" % TestSshKeys.KEY_A)
747 handle.write("%s\n" % TestSshKeys.KEY_B)
# A key not yet in the file is appended as a new last line.
751 def testAddingNewKey(self):
752 utils.AddAuthorizedKey(self.tmpname,
753 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
755 self.assertFileContent(self.tmpname,
756 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
757 'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
758 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
759 "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
# Same key material as KEY_A but another trailing comment: still appended.
761 def testAddingAlmostButNotCompletelyTheSameKey(self):
762 utils.AddAuthorizedKey(self.tmpname,
763 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
765 self.assertFileContent(self.tmpname,
766 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
767 'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
768 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
769 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
# Adding a key that is already present must leave the file unchanged.
# NOTE(review): in this view the key below is character-for-character equal
# to KEY_A although the method name mentions extra spaces -- the whitespace
# may have been collapsed; confirm against the full file.
771 def testAddingExistingKeyWithSomeMoreSpaces(self):
772 utils.AddAuthorizedKey(self.tmpname,
773 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
775 self.assertFileContent(self.tmpname,
776 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
777 'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
778 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
# Removing the first key must leave only the KEY_B line (same whitespace
# caveat as in the previous test).
780 def testRemovingExistingKeyWithSomeMoreSpaces(self):
781 utils.RemoveAuthorizedKey(self.tmpname,
782 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
784 self.assertFileContent(self.tmpname,
785 'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
786 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
# Removing a key that is not in the file must leave the file unchanged.
788 def testRemovingNonExistingKey(self):
789 utils.RemoveAuthorizedKey(self.tmpname,
790 'ssh-dss AAAAB3Nsdfj230xxjxJjsjwjsjdjU root@test')
792 self.assertFileContent(self.tmpname,
793 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
794 'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
795 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
798 class TestNewUUID(unittest.TestCase):
799 """Test case for NewUUID"""
# A freshly generated UUID must match utils.UUID_RE.
# NOTE(review): no method header is visible for this assertion.
802 self.failUnless(utils.UUID_RE.match(utils.NewUUID()))
# Script entry point: when run directly (not imported), hand control to
# testutils.GanetiTestProgram.
805 if __name__ == "__main__":
806 testutils.GanetiTestProgram()