4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for testing ganeti.utils.io"""
34 from ganeti import constants
35 from ganeti import utils
36 from ganeti import compat
37 from ganeti import errors
42 class TestReadFile(testutils.GanetiTestCase):
43 def testReadAll(self):
44 data = utils.ReadFile(self._TestDataFilename("cert1.pem"))
45 self.assertEqual(len(data), 814)
49 self.assertEqual(h.hexdigest(), "a491efb3efe56a0535f924d5f8680fd4")
51 def testReadSize(self):
52 data = utils.ReadFile(self._TestDataFilename("cert1.pem"),
54 self.assertEqual(len(data), 100)
58 self.assertEqual(h.hexdigest(), "893772354e4e690b9efd073eed433ce7")
60 def testCallback(self):
62 self.assertEqual(fh.tell(), 0)
63 data = utils.ReadFile(self._TestDataFilename("cert1.pem"), preread=_Cb)
64 self.assertEqual(len(data), 814)
67 self.assertRaises(EnvironmentError, utils.ReadFile,
68 "/dev/null/does-not-exist")
71 class TestReadOneLineFile(testutils.GanetiTestCase):
73 testutils.GanetiTestCase.setUp(self)
75 def testDefault(self):
76 data = utils.ReadOneLineFile(self._TestDataFilename("cert1.pem"))
77 self.assertEqual(len(data), 27)
78 self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
80 def testNotStrict(self):
81 data = utils.ReadOneLineFile(self._TestDataFilename("cert1.pem"),
83 self.assertEqual(len(data), 27)
84 self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
86 def testStrictFailure(self):
87 self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
88 self._TestDataFilename("cert1.pem"), strict=True)
90 def testLongLine(self):
91 dummydata = (1024 * "Hello World! ")
92 myfile = self._CreateTempFile()
93 utils.WriteFile(myfile, data=dummydata)
94 datastrict = utils.ReadOneLineFile(myfile, strict=True)
95 datalax = utils.ReadOneLineFile(myfile, strict=False)
96 self.assertEqual(dummydata, datastrict)
97 self.assertEqual(dummydata, datalax)
99 def testNewline(self):
100 myfile = self._CreateTempFile()
102 for nl in ["", "\n", "\r\n"]:
103 dummydata = "%s%s" % (myline, nl)
104 utils.WriteFile(myfile, data=dummydata)
105 datalax = utils.ReadOneLineFile(myfile, strict=False)
106 self.assertEqual(myline, datalax)
107 datastrict = utils.ReadOneLineFile(myfile, strict=True)
108 self.assertEqual(myline, datastrict)
110 def testWhitespaceAndMultipleLines(self):
111 myfile = self._CreateTempFile()
112 for nl in ["", "\n", "\r\n"]:
113 for ws in [" ", "\t", "\t\t \t", "\t "]:
114 dummydata = (1024 * ("Foo bar baz %s%s" % (ws, nl)))
115 utils.WriteFile(myfile, data=dummydata)
116 datalax = utils.ReadOneLineFile(myfile, strict=False)
118 self.assert_(set("\r\n") & set(dummydata))
119 self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
121 explen = len("Foo bar baz ") + len(ws)
122 self.assertEqual(len(datalax), explen)
123 self.assertEqual(datalax, dummydata[:explen])
124 self.assertFalse(set("\r\n") & set(datalax))
126 datastrict = utils.ReadOneLineFile(myfile, strict=True)
127 self.assertEqual(dummydata, datastrict)
128 self.assertEqual(dummydata, datalax)
130 def testEmptylines(self):
131 myfile = self._CreateTempFile()
133 for nl in ["\n", "\r\n"]:
134 for ol in ["", "otherline"]:
135 dummydata = "%s%s%s%s%s%s" % (nl, nl, myline, nl, ol, nl)
136 utils.WriteFile(myfile, data=dummydata)
137 self.assert_(set("\r\n") & set(dummydata))
138 datalax = utils.ReadOneLineFile(myfile, strict=False)
139 self.assertEqual(myline, datalax)
141 self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
144 datastrict = utils.ReadOneLineFile(myfile, strict=True)
145 self.assertEqual(myline, datastrict)
147 def testEmptyfile(self):
148 myfile = self._CreateTempFile()
149 self.assertRaises(errors.GenericError, utils.ReadOneLineFile, myfile)
152 class TestTimestampForFilename(unittest.TestCase):
154 self.assert_("." not in utils.TimestampForFilename())
155 self.assert_(":" not in utils.TimestampForFilename())
158 class TestCreateBackup(testutils.GanetiTestCase):
160 testutils.GanetiTestCase.setUp(self)
162 self.tmpdir = tempfile.mkdtemp()
165 testutils.GanetiTestCase.tearDown(self)
167 shutil.rmtree(self.tmpdir)
170 filename = utils.PathJoin(self.tmpdir, "config.data")
171 utils.WriteFile(filename, data="")
172 bname = utils.CreateBackup(filename)
173 self.assertFileContent(bname, "")
174 self.assertEqual(len(glob.glob("%s*" % filename)), 2)
175 utils.CreateBackup(filename)
176 self.assertEqual(len(glob.glob("%s*" % filename)), 3)
177 utils.CreateBackup(filename)
178 self.assertEqual(len(glob.glob("%s*" % filename)), 4)
180 fifoname = utils.PathJoin(self.tmpdir, "fifo")
182 self.assertRaises(errors.ProgrammerError, utils.CreateBackup, fifoname)
184 def testContent(self):
186 for data in ["", "X", "Hello World!\n" * 100, "Binary data\0\x01\x02\n"]:
187 for rep in [1, 2, 10, 127]:
188 testdata = data * rep
190 filename = utils.PathJoin(self.tmpdir, "test.data_")
191 utils.WriteFile(filename, data=testdata)
192 self.assertFileContent(filename, testdata)
195 bname = utils.CreateBackup(filename)
197 self.assertFileContent(bname, testdata)
198 self.assertEqual(len(glob.glob("%s*" % filename)), 1 + bkpcount)
201 class TestListVisibleFiles(unittest.TestCase):
202 """Test case for ListVisibleFiles"""
205 self.path = tempfile.mkdtemp()
208 shutil.rmtree(self.path)
210 def _CreateFiles(self, files):
212 utils.WriteFile(os.path.join(self.path, name), data="test")
214 def _test(self, files, expected):
215 self._CreateFiles(files)
216 found = utils.ListVisibleFiles(self.path)
217 self.assertEqual(set(found), set(expected))
219 def testAllVisible(self):
220 files = ["a", "b", "c"]
222 self._test(files, expected)
224 def testNoneVisible(self):
225 files = [".a", ".b", ".c"]
227 self._test(files, expected)
229 def testSomeVisible(self):
230 files = ["a", "b", ".c"]
231 expected = ["a", "b"]
232 self._test(files, expected)
234 def testNonAbsolutePath(self):
235 self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
238 def testNonNormalizedPath(self):
239 self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
243 class TestWriteFile(unittest.TestCase):
246 self.tfile = tempfile.NamedTemporaryFile()
248 self.did_post = False
249 self.did_write = False
253 shutil.rmtree(self.tmpdir)
255 def markPre(self, fd):
258 def markPost(self, fd):
261 def markWrite(self, fd):
262 self.did_write = True
266 utils.WriteFile(self.tfile.name, data=data)
267 self.assertEqual(utils.ReadFile(self.tfile.name), data)
269 def testWriteSimpleUnicode(self):
271 utils.WriteFile(self.tfile.name, data=data)
272 self.assertEqual(utils.ReadFile(self.tfile.name), data)
274 def testErrors(self):
275 self.assertRaises(errors.ProgrammerError, utils.WriteFile,
276 self.tfile.name, data="test", fn=lambda fd: None)
277 self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name)
278 self.assertRaises(errors.ProgrammerError, utils.WriteFile,
279 self.tfile.name, data="test", atime=0)
281 def testPreWrite(self):
282 utils.WriteFile(self.tfile.name, data="", prewrite=self.markPre)
283 self.assertTrue(self.did_pre)
284 self.assertFalse(self.did_post)
285 self.assertFalse(self.did_write)
287 def testPostWrite(self):
288 utils.WriteFile(self.tfile.name, data="", postwrite=self.markPost)
289 self.assertFalse(self.did_pre)
290 self.assertTrue(self.did_post)
291 self.assertFalse(self.did_write)
293 def testWriteFunction(self):
294 utils.WriteFile(self.tfile.name, fn=self.markWrite)
295 self.assertFalse(self.did_pre)
296 self.assertFalse(self.did_post)
297 self.assertTrue(self.did_write)
299 def testDryRun(self):
301 self.tfile.write(orig)
303 utils.WriteFile(self.tfile.name, data="hello", dry_run=True)
304 self.assertEqual(utils.ReadFile(self.tfile.name), orig)
308 for at, mt in [(0, 0), (1000, 1000), (2000, 3000),
309 (int(time.time()), 5000)]:
310 utils.WriteFile(f, data="hello", atime=at, mtime=mt)
312 self.assertEqual(st.st_atime, at)
313 self.assertEqual(st.st_mtime, mt)
315 def testNoClose(self):
317 self.assertEqual(utils.WriteFile(self.tfile.name, data="abc"), None)
318 fd = utils.WriteFile(self.tfile.name, data=data, close=False)
321 self.assertEqual(os.read(fd, 4096), data)
325 def testNoLeftovers(self):
326 self.tmpdir = tempfile.mkdtemp()
327 self.assertEqual(utils.WriteFile(utils.PathJoin(self.tmpdir, "test"),
330 self.assertEqual(os.listdir(self.tmpdir), ["test"])
332 def testFailRename(self):
333 self.tmpdir = tempfile.mkdtemp()
334 target = utils.PathJoin(self.tmpdir, "target")
336 self.assertRaises(OSError, utils.WriteFile, target, data="abc")
337 self.assertTrue(os.path.isdir(target))
338 self.assertEqual(os.listdir(self.tmpdir), ["target"])
339 self.assertFalse(os.listdir(target))
341 def testFailRenameDryRun(self):
342 self.tmpdir = tempfile.mkdtemp()
343 target = utils.PathJoin(self.tmpdir, "target")
345 self.assertEqual(utils.WriteFile(target, data="abc", dry_run=True), None)
346 self.assertTrue(os.path.isdir(target))
347 self.assertEqual(os.listdir(self.tmpdir), ["target"])
348 self.assertFalse(os.listdir(target))
350 def testBackup(self):
351 self.tmpdir = tempfile.mkdtemp()
352 testfile = utils.PathJoin(self.tmpdir, "test")
354 self.assertEqual(utils.WriteFile(testfile, data="foo", backup=True), None)
355 self.assertEqual(utils.ReadFile(testfile), "foo")
356 self.assertEqual(os.listdir(self.tmpdir), ["test"])
359 assert os.path.isfile(testfile)
360 self.assertEqual(utils.WriteFile(testfile, data="bar", backup=True), None)
361 self.assertEqual(utils.ReadFile(testfile), "bar")
362 self.assertEqual(len(glob.glob("%s.backup*" % testfile)), 1)
363 self.assertTrue("test" in os.listdir(self.tmpdir))
364 self.assertEqual(len(os.listdir(self.tmpdir)), 2)
366 # Write again as dry-run
367 assert os.path.isfile(testfile)
368 self.assertEqual(utils.WriteFile(testfile, data="000", backup=True,
371 self.assertEqual(utils.ReadFile(testfile), "bar")
372 self.assertEqual(len(glob.glob("%s.backup*" % testfile)), 1)
373 self.assertTrue("test" in os.listdir(self.tmpdir))
374 self.assertEqual(len(os.listdir(self.tmpdir)), 2)
377 class TestFileID(testutils.GanetiTestCase):
378 def testEquality(self):
379 name = self._CreateTempFile()
380 oldi = utils.GetFileID(path=name)
381 self.failUnless(utils.VerifyFileID(oldi, oldi))
383 def testUpdate(self):
384 name = self._CreateTempFile()
385 oldi = utils.GetFileID(path=name)
387 fd = os.open(name, os.O_RDWR)
389 newi = utils.GetFileID(fd=fd)
390 self.failUnless(utils.VerifyFileID(oldi, newi))
391 self.failUnless(utils.VerifyFileID(newi, oldi))
395 def testWriteFile(self):
396 name = self._CreateTempFile()
397 oldi = utils.GetFileID(path=name)
399 os.utime(name, (mtime + 10, mtime + 10))
400 self.assertRaises(errors.LockError, utils.SafeWriteFile, name,
402 os.utime(name, (mtime - 10, mtime - 10))
403 utils.SafeWriteFile(name, oldi, data="")
404 oldi = utils.GetFileID(path=name)
406 os.utime(name, (mtime + 10, mtime + 10))
407 # this doesn't raise, since we passed None
408 utils.SafeWriteFile(name, None, data="")
411 t = tempfile.NamedTemporaryFile()
412 self.assertRaises(errors.ProgrammerError, utils.GetFileID,
413 path=t.name, fd=t.fileno())
416 class TestRemoveFile(unittest.TestCase):
417 """Test case for the RemoveFile function"""
420 """Create a temp dir and file for each case"""
421 self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
422 fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
426 if os.path.exists(self.tmpfile):
427 os.unlink(self.tmpfile)
428 os.rmdir(self.tmpdir)
430 def testIgnoreDirs(self):
431 """Test that RemoveFile() ignores directories"""
432 self.assertEqual(None, utils.RemoveFile(self.tmpdir))
434 def testIgnoreNotExisting(self):
435 """Test that RemoveFile() ignores non-existing files"""
436 utils.RemoveFile(self.tmpfile)
437 utils.RemoveFile(self.tmpfile)
439 def testRemoveFile(self):
440 """Test that RemoveFile does remove a file"""
441 utils.RemoveFile(self.tmpfile)
442 if os.path.exists(self.tmpfile):
443 self.fail("File '%s' not removed" % self.tmpfile)
445 def testRemoveSymlink(self):
446 """Test that RemoveFile does remove symlinks"""
447 symlink = self.tmpdir + "/symlink"
448 os.symlink("no-such-file", symlink)
449 utils.RemoveFile(symlink)
450 if os.path.exists(symlink):
451 self.fail("File '%s' not removed" % symlink)
452 os.symlink(self.tmpfile, symlink)
453 utils.RemoveFile(symlink)
454 if os.path.exists(symlink):
455 self.fail("File '%s' not removed" % symlink)
458 class TestRemoveDir(unittest.TestCase):
460 self.tmpdir = tempfile.mkdtemp()
464 shutil.rmtree(self.tmpdir)
465 except EnvironmentError:
468 def testEmptyDir(self):
469 utils.RemoveDir(self.tmpdir)
470 self.assertFalse(os.path.isdir(self.tmpdir))
472 def testNonEmptyDir(self):
473 self.tmpfile = os.path.join(self.tmpdir, "test1")
474 open(self.tmpfile, "w").close()
475 self.assertRaises(EnvironmentError, utils.RemoveDir, self.tmpdir)
478 class TestRename(unittest.TestCase):
479 """Test case for RenameFile"""
482 """Create a temporary directory"""
483 self.tmpdir = tempfile.mkdtemp()
484 self.tmpfile = os.path.join(self.tmpdir, "test1")
487 open(self.tmpfile, "w").close()
490 """Remove temporary directory"""
491 shutil.rmtree(self.tmpdir)
493 def testSimpleRename1(self):
494 """Simple rename 1"""
495 utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
496 self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
498 def testSimpleRename2(self):
499 """Simple rename 2"""
500 utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
502 self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
504 def testRenameMkdir(self):
505 """Rename with mkdir"""
506 utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
508 self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
509 self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
511 self.assertRaises(EnvironmentError, utils.RenameFile,
512 os.path.join(self.tmpdir, "test/xyz"),
513 os.path.join(self.tmpdir, "test/foo/bar/baz"),
516 self.assertTrue(os.path.exists(os.path.join(self.tmpdir, "test/xyz")))
517 self.assertFalse(os.path.exists(os.path.join(self.tmpdir, "test/foo/bar")))
518 self.assertFalse(os.path.exists(os.path.join(self.tmpdir,
519 "test/foo/bar/baz")))
522 class TestMakedirs(unittest.TestCase):
524 self.tmpdir = tempfile.mkdtemp()
527 shutil.rmtree(self.tmpdir)
529 def testNonExisting(self):
530 path = utils.PathJoin(self.tmpdir, "foo")
532 self.assert_(os.path.isdir(path))
534 def testExisting(self):
535 path = utils.PathJoin(self.tmpdir, "foo")
538 self.assert_(os.path.isdir(path))
540 def testRecursiveNonExisting(self):
541 path = utils.PathJoin(self.tmpdir, "foo/bar/baz")
543 self.assert_(os.path.isdir(path))
545 def testRecursiveExisting(self):
546 path = utils.PathJoin(self.tmpdir, "B/moo/xyz")
547 self.assertFalse(os.path.exists(path))
548 os.mkdir(utils.PathJoin(self.tmpdir, "B"))
550 self.assert_(os.path.isdir(path))
553 class TestEnsureDirs(unittest.TestCase):
554 """Tests for EnsureDirs"""
557 self.dir = tempfile.mkdtemp()
558 self.old_umask = os.umask(0777)
560 def testEnsureDirs(self):
562 (utils.PathJoin(self.dir, "foo"), 0777),
563 (utils.PathJoin(self.dir, "bar"), 0000),
565 self.assertEquals(os.stat(utils.PathJoin(self.dir, "foo"))[0] & 0777, 0777)
566 self.assertEquals(os.stat(utils.PathJoin(self.dir, "bar"))[0] & 0777, 0000)
569 os.rmdir(utils.PathJoin(self.dir, "foo"))
570 os.rmdir(utils.PathJoin(self.dir, "bar"))
572 os.umask(self.old_umask)
575 class TestIsNormAbsPath(unittest.TestCase):
576 """Testing case for IsNormAbsPath"""
578 def _pathTestHelper(self, path, result):
580 self.assert_(utils.IsNormAbsPath(path),
581 "Path %s should result absolute and normalized" % path)
583 self.assertFalse(utils.IsNormAbsPath(path),
584 "Path %s should not result absolute and normalized" % path)
587 self._pathTestHelper("/etc", True)
588 self._pathTestHelper("/srv", True)
589 self._pathTestHelper("etc", False)
590 self._pathTestHelper("/etc/../root", False)
591 self._pathTestHelper("/etc/", False)
594 class TestIsBelowDir(unittest.TestCase):
595 """Testing case for IsBelowDir"""
597 def testSamePrefix(self):
598 self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c"))
599 self.assertTrue(utils.IsBelowDir("/a/b/", "/a/b/e"))
601 def testSamePrefixButDifferentDir(self):
602 self.assertFalse(utils.IsBelowDir("/a/b", "/a/bc/d"))
603 self.assertFalse(utils.IsBelowDir("/a/b/", "/a/bc/e"))
605 def testSamePrefixButDirTraversal(self):
606 self.assertFalse(utils.IsBelowDir("/a/b", "/a/b/../c"))
607 self.assertFalse(utils.IsBelowDir("/a/b/", "/a/b/../d"))
609 def testSamePrefixAndTraversal(self):
610 self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c/../d"))
611 self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c/./e"))
612 self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/../b/./e"))
614 def testBothAbsPath(self):
615 self.assertRaises(ValueError, utils.IsBelowDir, "/a/b/c", "d")
616 self.assertRaises(ValueError, utils.IsBelowDir, "a/b/c", "/d")
617 self.assertRaises(ValueError, utils.IsBelowDir, "a/b/c", "d")
620 class TestPathJoin(unittest.TestCase):
621 """Testing case for PathJoin"""
623 def testBasicItems(self):
624 mlist = ["/a", "b", "c"]
625 self.failUnlessEqual(utils.PathJoin(*mlist), "/".join(mlist))
627 def testNonAbsPrefix(self):
628 self.failUnlessRaises(ValueError, utils.PathJoin, "a", "b")
630 def testBackTrack(self):
631 self.failUnlessRaises(ValueError, utils.PathJoin, "/a", "b/../c")
633 def testMultiAbs(self):
634 self.failUnlessRaises(ValueError, utils.PathJoin, "/a", "/b")
637 class TestTailFile(testutils.GanetiTestCase):
638 """Test case for the TailFile function"""
641 fname = self._CreateTempFile()
642 self.failUnlessEqual(utils.TailFile(fname), [])
643 self.failUnlessEqual(utils.TailFile(fname, lines=25), [])
645 def testAllLines(self):
646 data = ["test %d" % i for i in range(30)]
648 fname = self._CreateTempFile()
649 fd = open(fname, "w")
650 fd.write("\n".join(data[:i]))
654 self.failUnlessEqual(utils.TailFile(fname, lines=i), data[:i])
656 def testPartialLines(self):
657 data = ["test %d" % i for i in range(30)]
658 fname = self._CreateTempFile()
659 fd = open(fname, "w")
660 fd.write("\n".join(data))
663 for i in range(1, 30):
664 self.failUnlessEqual(utils.TailFile(fname, lines=i), data[-i:])
666 def testBigFile(self):
667 data = ["test %d" % i for i in range(30)]
668 fname = self._CreateTempFile()
669 fd = open(fname, "w")
670 fd.write("X" * 1048576)
672 fd.write("\n".join(data))
675 for i in range(1, 30):
676 self.failUnlessEqual(utils.TailFile(fname, lines=i), data[-i:])
679 class TestPidFileFunctions(unittest.TestCase):
680 """Tests for WritePidFile and ReadPidFile"""
683 self.dir = tempfile.mkdtemp()
684 self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
686 def testPidFileFunctions(self):
687 pid_file = self.f_dpn('test')
688 fd = utils.WritePidFile(self.f_dpn('test'))
689 self.failUnless(os.path.exists(pid_file),
690 "PID file should have been created")
691 read_pid = utils.ReadPidFile(pid_file)
692 self.failUnlessEqual(read_pid, os.getpid())
693 self.failUnless(utils.IsProcessAlive(read_pid))
694 self.failUnlessRaises(errors.LockError, utils.WritePidFile,
697 utils.RemoveFile(self.f_dpn("test"))
698 self.failIf(os.path.exists(pid_file),
699 "PID file should not exist anymore")
700 self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
701 "ReadPidFile should return 0 for missing pid file")
702 fh = open(pid_file, "w")
705 self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
706 "ReadPidFile should return 0 for invalid pid file")
707 # but now, even with the file existing, we should be able to lock it
708 fd = utils.WritePidFile(self.f_dpn('test'))
710 utils.RemoveFile(self.f_dpn("test"))
711 self.failIf(os.path.exists(pid_file),
712 "PID file should not exist anymore")
715 pid_file = self.f_dpn('child')
716 r_fd, w_fd = os.pipe()
718 if new_pid == 0: #child
719 utils.WritePidFile(self.f_dpn('child'))
724 # else we are in the parent
725 # wait until the child has written the pid file
727 read_pid = utils.ReadPidFile(pid_file)
728 self.failUnlessEqual(read_pid, new_pid)
729 self.failUnless(utils.IsProcessAlive(new_pid))
730 utils.KillProcess(new_pid, waitpid=True)
731 self.failIf(utils.IsProcessAlive(new_pid))
732 utils.RemoveFile(self.f_dpn('child'))
733 self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0)
736 shutil.rmtree(self.dir)
739 class TestSshKeys(testutils.GanetiTestCase):
740 """Test case for the AddAuthorizedKey function"""
742 KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
743 KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="198.51.100.4" '
744 'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
747 testutils.GanetiTestCase.setUp(self)
748 self.tmpname = self._CreateTempFile()
749 handle = open(self.tmpname, 'w')
751 handle.write("%s\n" % TestSshKeys.KEY_A)
752 handle.write("%s\n" % TestSshKeys.KEY_B)
756 def testAddingNewKey(self):
757 utils.AddAuthorizedKey(self.tmpname,
758 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
760 self.assertFileContent(self.tmpname,
761 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
762 'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
763 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
764 "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
766 def testAddingAlmostButNotCompletelyTheSameKey(self):
767 utils.AddAuthorizedKey(self.tmpname,
768 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
770 self.assertFileContent(self.tmpname,
771 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
772 'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
773 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
774 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
776 def testAddingExistingKeyWithSomeMoreSpaces(self):
777 utils.AddAuthorizedKey(self.tmpname,
778 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
780 self.assertFileContent(self.tmpname,
781 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
782 'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
783 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
785 def testRemovingExistingKeyWithSomeMoreSpaces(self):
786 utils.RemoveAuthorizedKey(self.tmpname,
787 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
789 self.assertFileContent(self.tmpname,
790 'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
791 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
793 def testRemovingNonExistingKey(self):
794 utils.RemoveAuthorizedKey(self.tmpname,
795 'ssh-dss AAAAB3Nsdfj230xxjxJjsjwjsjdjU root@test')
797 self.assertFileContent(self.tmpname,
798 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
799 'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
800 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
803 class TestNewUUID(unittest.TestCase):
804 """Test case for NewUUID"""
807 self.failUnless(utils.UUID_RE.match(utils.NewUUID()))
810 def _MockStatResult(cb, mode, uid, gid):
822 def _RaiseNoEntError():
823 raise EnvironmentError(errno.ENOENT, "not found")
826 def _OtherStatRaise():
827 raise EnvironmentError()
830 class TestPermissionEnforcements(unittest.TestCase):
837 self._chown_calls = []
838 self._chmod_calls = []
839 self._mkdir_calls = []
842 self.assertRaises(IndexError, self._mkdir_calls.pop)
843 self.assertRaises(IndexError, self._chmod_calls.pop)
844 self.assertRaises(IndexError, self._chown_calls.pop)
846 def _FakeMkdir(self, path):
847 self._mkdir_calls.append(path)
849 def _FakeChown(self, path, uid, gid):
850 self._chown_calls.append((path, uid, gid))
852 def _ChmodWrapper(self, cb):
854 self._chmod_calls.append((path, mode))
859 def _VerifyPerm(self, path, mode, uid=-1, gid=-1):
860 self.assertEqual(path, "/ganeti-qa-non-test")
861 self.assertEqual(mode, 0700)
862 self.assertEqual(uid, self.UID_A)
863 self.assertEqual(gid, self.GID_A)
865 def testMakeDirWithPerm(self):
866 is_dir_stat = _MockStatResult(None, stat.S_IFDIR, 0, 0)
867 utils.MakeDirWithPerm("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A,
868 _lstat_fn=is_dir_stat, _perm_fn=self._VerifyPerm)
870 def testDirErrors(self):
871 self.assertRaises(errors.GenericError, utils.MakeDirWithPerm,
872 "/ganeti-qa-non-test", 0700, 0, 0,
873 _lstat_fn=_MockStatResult(None, 0, 0, 0))
874 self.assertRaises(IndexError, self._mkdir_calls.pop)
876 other_stat_raise = _MockStatResult(_OtherStatRaise, stat.S_IFDIR, 0, 0)
877 self.assertRaises(errors.GenericError, utils.MakeDirWithPerm,
878 "/ganeti-qa-non-test", 0700, 0, 0,
879 _lstat_fn=other_stat_raise)
880 self.assertRaises(IndexError, self._mkdir_calls.pop)
882 non_exist_stat = _MockStatResult(_RaiseNoEntError, stat.S_IFDIR, 0, 0)
883 utils.MakeDirWithPerm("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A,
884 _lstat_fn=non_exist_stat, _mkdir_fn=self._FakeMkdir,
885 _perm_fn=self._VerifyPerm)
886 self.assertEqual(self._mkdir_calls.pop(0), "/ganeti-qa-non-test")
888 def testEnforcePermissionNoEnt(self):
889 self.assertRaises(errors.GenericError, utils.EnforcePermission,
890 "/ganeti-qa-non-test", 0600,
891 _chmod_fn=NotImplemented, _chown_fn=NotImplemented,
892 _stat_fn=_MockStatResult(_RaiseNoEntError, 0, 0, 0))
894 def testEnforcePermissionNoEntMustNotExist(self):
895 utils.EnforcePermission("/ganeti-qa-non-test", 0600, must_exist=False,
896 _chmod_fn=NotImplemented,
897 _chown_fn=NotImplemented,
898 _stat_fn=_MockStatResult(_RaiseNoEntError,
901 def testEnforcePermissionOtherErrorMustNotExist(self):
902 self.assertRaises(errors.GenericError, utils.EnforcePermission,
903 "/ganeti-qa-non-test", 0600, must_exist=False,
904 _chmod_fn=NotImplemented, _chown_fn=NotImplemented,
905 _stat_fn=_MockStatResult(_OtherStatRaise, 0, 0, 0))
907 def testEnforcePermissionNoChanges(self):
908 utils.EnforcePermission("/ganeti-qa-non-test", 0600,
909 _stat_fn=_MockStatResult(None, 0600, 0, 0),
910 _chmod_fn=self._ChmodWrapper(None),
911 _chown_fn=self._FakeChown)
913 def testEnforcePermissionChangeMode(self):
914 utils.EnforcePermission("/ganeti-qa-non-test", 0444,
915 _stat_fn=_MockStatResult(None, 0600, 0, 0),
916 _chmod_fn=self._ChmodWrapper(None),
917 _chown_fn=self._FakeChown)
918 self.assertEqual(self._chmod_calls.pop(0), ("/ganeti-qa-non-test", 0444))
920 def testEnforcePermissionSetUidGid(self):
921 utils.EnforcePermission("/ganeti-qa-non-test", 0600,
922 uid=self.UID_B, gid=self.GID_B,
923 _stat_fn=_MockStatResult(None, 0600,
926 _chmod_fn=self._ChmodWrapper(None),
927 _chown_fn=self._FakeChown)
928 self.assertEqual(self._chown_calls.pop(0),
929 ("/ganeti-qa-non-test", self.UID_B, self.GID_B))
932 if __name__ == "__main__":
933 testutils.GanetiTestProgram()