4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for testing ganeti.utils.io"""
34 from ganeti import constants
35 from ganeti import utils
36 from ganeti import compat
37 from ganeti import errors
42 class TestReadFile(testutils.GanetiTestCase):
43 def testReadAll(self):
44 data = utils.ReadFile(self._TestDataFilename("cert1.pem"))
45 self.assertEqual(len(data), 814)
49 self.assertEqual(h.hexdigest(), "a491efb3efe56a0535f924d5f8680fd4")
51 def testReadSize(self):
52 data = utils.ReadFile(self._TestDataFilename("cert1.pem"),
54 self.assertEqual(len(data), 100)
58 self.assertEqual(h.hexdigest(), "893772354e4e690b9efd073eed433ce7")
60 def testCallback(self):
62 self.assertEqual(fh.tell(), 0)
63 data = utils.ReadFile(self._TestDataFilename("cert1.pem"), preread=_Cb)
64 self.assertEqual(len(data), 814)
67 self.assertRaises(EnvironmentError, utils.ReadFile,
68 "/dev/null/does-not-exist")
71 class TestReadOneLineFile(testutils.GanetiTestCase):
73 testutils.GanetiTestCase.setUp(self)
75 def testDefault(self):
76 data = utils.ReadOneLineFile(self._TestDataFilename("cert1.pem"))
77 self.assertEqual(len(data), 27)
78 self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
80 def testNotStrict(self):
81 data = utils.ReadOneLineFile(self._TestDataFilename("cert1.pem"),
83 self.assertEqual(len(data), 27)
84 self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
86 def testStrictFailure(self):
87 self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
88 self._TestDataFilename("cert1.pem"), strict=True)
90 def testLongLine(self):
91 dummydata = (1024 * "Hello World! ")
92 myfile = self._CreateTempFile()
93 utils.WriteFile(myfile, data=dummydata)
94 datastrict = utils.ReadOneLineFile(myfile, strict=True)
95 datalax = utils.ReadOneLineFile(myfile, strict=False)
96 self.assertEqual(dummydata, datastrict)
97 self.assertEqual(dummydata, datalax)
99 def testNewline(self):
100 myfile = self._CreateTempFile()
102 for nl in ["", "\n", "\r\n"]:
103 dummydata = "%s%s" % (myline, nl)
104 utils.WriteFile(myfile, data=dummydata)
105 datalax = utils.ReadOneLineFile(myfile, strict=False)
106 self.assertEqual(myline, datalax)
107 datastrict = utils.ReadOneLineFile(myfile, strict=True)
108 self.assertEqual(myline, datastrict)
110 def testWhitespaceAndMultipleLines(self):
111 myfile = self._CreateTempFile()
112 for nl in ["", "\n", "\r\n"]:
113 for ws in [" ", "\t", "\t\t \t", "\t "]:
114 dummydata = (1024 * ("Foo bar baz %s%s" % (ws, nl)))
115 utils.WriteFile(myfile, data=dummydata)
116 datalax = utils.ReadOneLineFile(myfile, strict=False)
118 self.assert_(set("\r\n") & set(dummydata))
119 self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
121 explen = len("Foo bar baz ") + len(ws)
122 self.assertEqual(len(datalax), explen)
123 self.assertEqual(datalax, dummydata[:explen])
124 self.assertFalse(set("\r\n") & set(datalax))
126 datastrict = utils.ReadOneLineFile(myfile, strict=True)
127 self.assertEqual(dummydata, datastrict)
128 self.assertEqual(dummydata, datalax)
130 def testEmptylines(self):
131 myfile = self._CreateTempFile()
133 for nl in ["\n", "\r\n"]:
134 for ol in ["", "otherline"]:
135 dummydata = "%s%s%s%s%s%s" % (nl, nl, myline, nl, ol, nl)
136 utils.WriteFile(myfile, data=dummydata)
137 self.assert_(set("\r\n") & set(dummydata))
138 datalax = utils.ReadOneLineFile(myfile, strict=False)
139 self.assertEqual(myline, datalax)
141 self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
144 datastrict = utils.ReadOneLineFile(myfile, strict=True)
145 self.assertEqual(myline, datastrict)
147 def testEmptyfile(self):
148 myfile = self._CreateTempFile()
149 self.assertRaises(errors.GenericError, utils.ReadOneLineFile, myfile)
152 class TestTimestampForFilename(unittest.TestCase):
154 self.assert_("." not in utils.TimestampForFilename())
155 self.assert_(":" not in utils.TimestampForFilename())
158 class TestCreateBackup(testutils.GanetiTestCase):
160 testutils.GanetiTestCase.setUp(self)
162 self.tmpdir = tempfile.mkdtemp()
165 testutils.GanetiTestCase.tearDown(self)
167 shutil.rmtree(self.tmpdir)
170 filename = utils.PathJoin(self.tmpdir, "config.data")
171 utils.WriteFile(filename, data="")
172 bname = utils.CreateBackup(filename)
173 self.assertFileContent(bname, "")
174 self.assertEqual(len(glob.glob("%s*" % filename)), 2)
175 utils.CreateBackup(filename)
176 self.assertEqual(len(glob.glob("%s*" % filename)), 3)
177 utils.CreateBackup(filename)
178 self.assertEqual(len(glob.glob("%s*" % filename)), 4)
180 fifoname = utils.PathJoin(self.tmpdir, "fifo")
182 self.assertRaises(errors.ProgrammerError, utils.CreateBackup, fifoname)
184 def testContent(self):
186 for data in ["", "X", "Hello World!\n" * 100, "Binary data\0\x01\x02\n"]:
187 for rep in [1, 2, 10, 127]:
188 testdata = data * rep
190 filename = utils.PathJoin(self.tmpdir, "test.data_")
191 utils.WriteFile(filename, data=testdata)
192 self.assertFileContent(filename, testdata)
195 bname = utils.CreateBackup(filename)
197 self.assertFileContent(bname, testdata)
198 self.assertEqual(len(glob.glob("%s*" % filename)), 1 + bkpcount)
201 class TestListVisibleFiles(unittest.TestCase):
202 """Test case for ListVisibleFiles"""
205 self.path = tempfile.mkdtemp()
208 shutil.rmtree(self.path)
210 def _CreateFiles(self, files):
212 utils.WriteFile(os.path.join(self.path, name), data="test")
214 def _test(self, files, expected):
215 self._CreateFiles(files)
216 found = utils.ListVisibleFiles(self.path)
217 self.assertEqual(set(found), set(expected))
219 def testAllVisible(self):
220 files = ["a", "b", "c"]
222 self._test(files, expected)
224 def testNoneVisible(self):
225 files = [".a", ".b", ".c"]
227 self._test(files, expected)
229 def testSomeVisible(self):
230 files = ["a", "b", ".c"]
231 expected = ["a", "b"]
232 self._test(files, expected)
234 def testNonAbsolutePath(self):
235 self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
238 def testNonNormalizedPath(self):
239 self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
242 def testMountpoint(self):
243 lvfmp_fn = compat.partial(utils.ListVisibleFiles,
244 _is_mountpoint=lambda _: True)
245 self.assertEqual(lvfmp_fn(self.path), [])
247 # Create "lost+found" as a regular file
248 self._CreateFiles(["foo", "bar", ".baz", "lost+found"])
249 self.assertEqual(set(lvfmp_fn(self.path)),
250 set(["foo", "bar", "lost+found"]))
252 # Replace "lost+found" with a directory
253 laf_path = utils.PathJoin(self.path, "lost+found")
254 utils.RemoveFile(laf_path)
256 self.assertEqual(set(lvfmp_fn(self.path)), set(["foo", "bar"]))
258 def testLostAndFoundNoMountpoint(self):
259 files = ["foo", "bar", ".Hello World", "lost+found"]
260 expected = ["foo", "bar", "lost+found"]
261 self._test(files, expected)
264 class TestWriteFile(testutils.GanetiTestCase):
266 testutils.GanetiTestCase.setUp(self)
268 self.tfile = tempfile.NamedTemporaryFile()
270 self.did_post = False
271 self.did_write = False
274 testutils.GanetiTestCase.tearDown(self)
276 shutil.rmtree(self.tmpdir)
278 def markPre(self, fd):
281 def markPost(self, fd):
284 def markWrite(self, fd):
285 self.did_write = True
289 utils.WriteFile(self.tfile.name, data=data)
290 self.assertEqual(utils.ReadFile(self.tfile.name), data)
292 def testWriteSimpleUnicode(self):
294 utils.WriteFile(self.tfile.name, data=data)
295 self.assertEqual(utils.ReadFile(self.tfile.name), data)
297 def testErrors(self):
298 self.assertRaises(errors.ProgrammerError, utils.WriteFile,
299 self.tfile.name, data="test", fn=lambda fd: None)
300 self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name)
301 self.assertRaises(errors.ProgrammerError, utils.WriteFile,
302 self.tfile.name, data="test", atime=0)
303 self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name,
304 mode=0400, keep_perms=utils.KP_ALWAYS)
305 self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name,
306 uid=0, keep_perms=utils.KP_ALWAYS)
307 self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name,
308 gid=0, keep_perms=utils.KP_ALWAYS)
309 self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name,
310 mode=0400, uid=0, keep_perms=utils.KP_ALWAYS)
312 def testPreWrite(self):
313 utils.WriteFile(self.tfile.name, data="", prewrite=self.markPre)
314 self.assertTrue(self.did_pre)
315 self.assertFalse(self.did_post)
316 self.assertFalse(self.did_write)
318 def testPostWrite(self):
319 utils.WriteFile(self.tfile.name, data="", postwrite=self.markPost)
320 self.assertFalse(self.did_pre)
321 self.assertTrue(self.did_post)
322 self.assertFalse(self.did_write)
324 def testWriteFunction(self):
325 utils.WriteFile(self.tfile.name, fn=self.markWrite)
326 self.assertFalse(self.did_pre)
327 self.assertFalse(self.did_post)
328 self.assertTrue(self.did_write)
330 def testDryRun(self):
332 self.tfile.write(orig)
334 utils.WriteFile(self.tfile.name, data="hello", dry_run=True)
335 self.assertEqual(utils.ReadFile(self.tfile.name), orig)
339 for at, mt in [(0, 0), (1000, 1000), (2000, 3000),
340 (int(time.time()), 5000)]:
341 utils.WriteFile(f, data="hello", atime=at, mtime=mt)
343 self.assertEqual(st.st_atime, at)
344 self.assertEqual(st.st_mtime, mt)
346 def testNoClose(self):
348 self.assertEqual(utils.WriteFile(self.tfile.name, data="abc"), None)
349 fd = utils.WriteFile(self.tfile.name, data=data, close=False)
352 self.assertEqual(os.read(fd, 4096), data)
356 def testNoLeftovers(self):
357 self.tmpdir = tempfile.mkdtemp()
358 self.assertEqual(utils.WriteFile(utils.PathJoin(self.tmpdir, "test"),
361 self.assertEqual(os.listdir(self.tmpdir), ["test"])
363 def testFailRename(self):
364 self.tmpdir = tempfile.mkdtemp()
365 target = utils.PathJoin(self.tmpdir, "target")
367 self.assertRaises(OSError, utils.WriteFile, target, data="abc")
368 self.assertTrue(os.path.isdir(target))
369 self.assertEqual(os.listdir(self.tmpdir), ["target"])
370 self.assertFalse(os.listdir(target))
372 def testFailRenameDryRun(self):
373 self.tmpdir = tempfile.mkdtemp()
374 target = utils.PathJoin(self.tmpdir, "target")
376 self.assertEqual(utils.WriteFile(target, data="abc", dry_run=True), None)
377 self.assertTrue(os.path.isdir(target))
378 self.assertEqual(os.listdir(self.tmpdir), ["target"])
379 self.assertFalse(os.listdir(target))
381 def testBackup(self):
382 self.tmpdir = tempfile.mkdtemp()
383 testfile = utils.PathJoin(self.tmpdir, "test")
385 self.assertEqual(utils.WriteFile(testfile, data="foo", backup=True), None)
386 self.assertEqual(utils.ReadFile(testfile), "foo")
387 self.assertEqual(os.listdir(self.tmpdir), ["test"])
390 assert os.path.isfile(testfile)
391 self.assertEqual(utils.WriteFile(testfile, data="bar", backup=True), None)
392 self.assertEqual(utils.ReadFile(testfile), "bar")
393 self.assertEqual(len(glob.glob("%s.backup*" % testfile)), 1)
394 self.assertTrue("test" in os.listdir(self.tmpdir))
395 self.assertEqual(len(os.listdir(self.tmpdir)), 2)
397 # Write again as dry-run
398 assert os.path.isfile(testfile)
399 self.assertEqual(utils.WriteFile(testfile, data="000", backup=True,
402 self.assertEqual(utils.ReadFile(testfile), "bar")
403 self.assertEqual(len(glob.glob("%s.backup*" % testfile)), 1)
404 self.assertTrue("test" in os.listdir(self.tmpdir))
405 self.assertEqual(len(os.listdir(self.tmpdir)), 2)
407 def testFileMode(self):
408 self.tmpdir = tempfile.mkdtemp()
409 target = utils.PathJoin(self.tmpdir, "target")
410 self.assertRaises(OSError, utils.WriteFile, target, data="data",
411 keep_perms=utils.KP_ALWAYS)
412 # All masks have only user bits set, to avoid interactions with umask
413 utils.WriteFile(target, data="data", mode=0200)
414 self.assertFileMode(target, 0200)
415 utils.WriteFile(target, data="data", mode=0400,
416 keep_perms=utils.KP_IF_EXISTS)
417 self.assertFileMode(target, 0200)
418 utils.WriteFile(target, data="data", keep_perms=utils.KP_ALWAYS)
419 self.assertFileMode(target, 0200)
420 utils.WriteFile(target, data="data", mode=0700)
421 self.assertFileMode(target, 0700)
423 def testNewFileMode(self):
424 self.tmpdir = tempfile.mkdtemp()
425 target = utils.PathJoin(self.tmpdir, "target")
426 utils.WriteFile(target, data="data", mode=0400,
427 keep_perms=utils.KP_IF_EXISTS)
428 self.assertFileMode(target, 0400)
430 class TestFileID(testutils.GanetiTestCase):
431 def testEquality(self):
432 name = self._CreateTempFile()
433 oldi = utils.GetFileID(path=name)
434 self.failUnless(utils.VerifyFileID(oldi, oldi))
436 def testUpdate(self):
437 name = self._CreateTempFile()
438 oldi = utils.GetFileID(path=name)
439 fd = os.open(name, os.O_RDWR)
441 newi = utils.GetFileID(fd=fd)
442 self.failUnless(utils.VerifyFileID(oldi, newi))
443 self.failUnless(utils.VerifyFileID(newi, oldi))
447 def testWriteFile(self):
448 name = self._CreateTempFile()
449 oldi = utils.GetFileID(path=name)
451 os.utime(name, (mtime + 10, mtime + 10))
452 self.assertRaises(errors.LockError, utils.SafeWriteFile, name,
454 os.utime(name, (mtime - 10, mtime - 10))
455 utils.SafeWriteFile(name, oldi, data="")
456 oldi = utils.GetFileID(path=name)
458 os.utime(name, (mtime + 10, mtime + 10))
459 # this doesn't raise, since we passed None
460 utils.SafeWriteFile(name, None, data="")
463 t = tempfile.NamedTemporaryFile()
464 self.assertRaises(errors.ProgrammerError, utils.GetFileID,
465 path=t.name, fd=t.fileno())
468 class TestRemoveFile(unittest.TestCase):
469 """Test case for the RemoveFile function"""
472 """Create a temp dir and file for each case"""
473 self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
474 fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
478 if os.path.exists(self.tmpfile):
479 os.unlink(self.tmpfile)
480 os.rmdir(self.tmpdir)
482 def testIgnoreDirs(self):
483 """Test that RemoveFile() ignores directories"""
484 self.assertEqual(None, utils.RemoveFile(self.tmpdir))
486 def testIgnoreNotExisting(self):
487 """Test that RemoveFile() ignores non-existing files"""
488 utils.RemoveFile(self.tmpfile)
489 utils.RemoveFile(self.tmpfile)
491 def testRemoveFile(self):
492 """Test that RemoveFile does remove a file"""
493 utils.RemoveFile(self.tmpfile)
494 if os.path.exists(self.tmpfile):
495 self.fail("File '%s' not removed" % self.tmpfile)
497 def testRemoveSymlink(self):
498 """Test that RemoveFile does remove symlinks"""
499 symlink = self.tmpdir + "/symlink"
500 os.symlink("no-such-file", symlink)
501 utils.RemoveFile(symlink)
502 if os.path.exists(symlink):
503 self.fail("File '%s' not removed" % symlink)
504 os.symlink(self.tmpfile, symlink)
505 utils.RemoveFile(symlink)
506 if os.path.exists(symlink):
507 self.fail("File '%s' not removed" % symlink)
510 class TestRemoveDir(unittest.TestCase):
512 self.tmpdir = tempfile.mkdtemp()
516 shutil.rmtree(self.tmpdir)
517 except EnvironmentError:
520 def testEmptyDir(self):
521 utils.RemoveDir(self.tmpdir)
522 self.assertFalse(os.path.isdir(self.tmpdir))
524 def testNonEmptyDir(self):
525 self.tmpfile = os.path.join(self.tmpdir, "test1")
526 open(self.tmpfile, "w").close()
527 self.assertRaises(EnvironmentError, utils.RemoveDir, self.tmpdir)
530 class TestRename(unittest.TestCase):
531 """Test case for RenameFile"""
534 """Create a temporary directory"""
535 self.tmpdir = tempfile.mkdtemp()
536 self.tmpfile = os.path.join(self.tmpdir, "test1")
539 open(self.tmpfile, "w").close()
542 """Remove temporary directory"""
543 shutil.rmtree(self.tmpdir)
545 def testSimpleRename1(self):
546 """Simple rename 1"""
547 utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
548 self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
550 def testSimpleRename2(self):
551 """Simple rename 2"""
552 utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
554 self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
556 def testRenameMkdir(self):
557 """Rename with mkdir"""
558 utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
560 self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
561 self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
563 self.assertRaises(EnvironmentError, utils.RenameFile,
564 os.path.join(self.tmpdir, "test/xyz"),
565 os.path.join(self.tmpdir, "test/foo/bar/baz"),
568 self.assertTrue(os.path.exists(os.path.join(self.tmpdir, "test/xyz")))
569 self.assertFalse(os.path.exists(os.path.join(self.tmpdir, "test/foo/bar")))
570 self.assertFalse(os.path.exists(os.path.join(self.tmpdir,
571 "test/foo/bar/baz")))
574 class TestMakedirs(unittest.TestCase):
576 self.tmpdir = tempfile.mkdtemp()
579 shutil.rmtree(self.tmpdir)
581 def testNonExisting(self):
582 path = utils.PathJoin(self.tmpdir, "foo")
584 self.assert_(os.path.isdir(path))
586 def testExisting(self):
587 path = utils.PathJoin(self.tmpdir, "foo")
590 self.assert_(os.path.isdir(path))
592 def testRecursiveNonExisting(self):
593 path = utils.PathJoin(self.tmpdir, "foo/bar/baz")
595 self.assert_(os.path.isdir(path))
597 def testRecursiveExisting(self):
598 path = utils.PathJoin(self.tmpdir, "B/moo/xyz")
599 self.assertFalse(os.path.exists(path))
600 os.mkdir(utils.PathJoin(self.tmpdir, "B"))
602 self.assert_(os.path.isdir(path))
605 class TestEnsureDirs(unittest.TestCase):
606 """Tests for EnsureDirs"""
609 self.dir = tempfile.mkdtemp()
610 self.old_umask = os.umask(0777)
612 def testEnsureDirs(self):
614 (utils.PathJoin(self.dir, "foo"), 0777),
615 (utils.PathJoin(self.dir, "bar"), 0000),
617 self.assertEquals(os.stat(utils.PathJoin(self.dir, "foo"))[0] & 0777, 0777)
618 self.assertEquals(os.stat(utils.PathJoin(self.dir, "bar"))[0] & 0777, 0000)
621 os.rmdir(utils.PathJoin(self.dir, "foo"))
622 os.rmdir(utils.PathJoin(self.dir, "bar"))
624 os.umask(self.old_umask)
627 class TestIsNormAbsPath(unittest.TestCase):
628 """Testing case for IsNormAbsPath"""
630 def _pathTestHelper(self, path, result):
632 self.assert_(utils.IsNormAbsPath(path),
633 "Path %s should result absolute and normalized" % path)
635 self.assertFalse(utils.IsNormAbsPath(path),
636 "Path %s should not result absolute and normalized" % path)
639 self._pathTestHelper("/etc", True)
640 self._pathTestHelper("/srv", True)
641 self._pathTestHelper("etc", False)
642 self._pathTestHelper("/etc/../root", False)
643 self._pathTestHelper("/etc/", False)
646 class TestIsBelowDir(unittest.TestCase):
647 """Testing case for IsBelowDir"""
649 def testExactlyTheSame(self):
650 self.assertFalse(utils.IsBelowDir("/a/b", "/a/b"))
651 self.assertFalse(utils.IsBelowDir("/a/b", "/a/b/"))
652 self.assertFalse(utils.IsBelowDir("/a/b/", "/a/b"))
653 self.assertFalse(utils.IsBelowDir("/a/b/", "/a/b/"))
655 def testSamePrefix(self):
656 self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c"))
657 self.assertTrue(utils.IsBelowDir("/a/b/", "/a/b/e"))
659 def testSamePrefixButDifferentDir(self):
660 self.assertFalse(utils.IsBelowDir("/a/b", "/a/bc/d"))
661 self.assertFalse(utils.IsBelowDir("/a/b/", "/a/bc/e"))
663 def testSamePrefixButDirTraversal(self):
664 self.assertFalse(utils.IsBelowDir("/a/b", "/a/b/../c"))
665 self.assertFalse(utils.IsBelowDir("/a/b/", "/a/b/../d"))
667 def testSamePrefixAndTraversal(self):
668 self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c/../d"))
669 self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c/./e"))
670 self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/../b/./e"))
672 def testBothAbsPath(self):
673 self.assertRaises(ValueError, utils.IsBelowDir, "/a/b/c", "d")
674 self.assertRaises(ValueError, utils.IsBelowDir, "a/b/c", "/d")
675 self.assertRaises(ValueError, utils.IsBelowDir, "a/b/c", "d")
678 class TestPathJoin(unittest.TestCase):
679 """Testing case for PathJoin"""
681 def testBasicItems(self):
682 mlist = ["/a", "b", "c"]
683 self.failUnlessEqual(utils.PathJoin(*mlist), "/".join(mlist))
685 def testNonAbsPrefix(self):
686 self.failUnlessRaises(ValueError, utils.PathJoin, "a", "b")
688 def testBackTrack(self):
689 self.failUnlessRaises(ValueError, utils.PathJoin, "/a", "b/../c")
691 def testMultiAbs(self):
692 self.failUnlessRaises(ValueError, utils.PathJoin, "/a", "/b")
695 class TestTailFile(testutils.GanetiTestCase):
696 """Test case for the TailFile function"""
699 fname = self._CreateTempFile()
700 self.failUnlessEqual(utils.TailFile(fname), [])
701 self.failUnlessEqual(utils.TailFile(fname, lines=25), [])
703 def testAllLines(self):
704 data = ["test %d" % i for i in range(30)]
706 fname = self._CreateTempFile()
707 fd = open(fname, "w")
708 fd.write("\n".join(data[:i]))
712 self.failUnlessEqual(utils.TailFile(fname, lines=i), data[:i])
714 def testPartialLines(self):
715 data = ["test %d" % i for i in range(30)]
716 fname = self._CreateTempFile()
717 fd = open(fname, "w")
718 fd.write("\n".join(data))
721 for i in range(1, 30):
722 self.failUnlessEqual(utils.TailFile(fname, lines=i), data[-i:])
724 def testBigFile(self):
725 data = ["test %d" % i for i in range(30)]
726 fname = self._CreateTempFile()
727 fd = open(fname, "w")
728 fd.write("X" * 1048576)
730 fd.write("\n".join(data))
733 for i in range(1, 30):
734 self.failUnlessEqual(utils.TailFile(fname, lines=i), data[-i:])
737 class TestPidFileFunctions(unittest.TestCase):
738 """Tests for WritePidFile and ReadPidFile"""
741 self.dir = tempfile.mkdtemp()
742 self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
744 def testPidFileFunctions(self):
745 pid_file = self.f_dpn('test')
746 fd = utils.WritePidFile(self.f_dpn('test'))
747 self.failUnless(os.path.exists(pid_file),
748 "PID file should have been created")
749 read_pid = utils.ReadPidFile(pid_file)
750 self.failUnlessEqual(read_pid, os.getpid())
751 self.failUnless(utils.IsProcessAlive(read_pid))
752 self.failUnlessRaises(errors.PidFileLockError, utils.WritePidFile,
755 utils.RemoveFile(self.f_dpn("test"))
756 self.failIf(os.path.exists(pid_file),
757 "PID file should not exist anymore")
758 self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
759 "ReadPidFile should return 0 for missing pid file")
760 fh = open(pid_file, "w")
763 self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
764 "ReadPidFile should return 0 for invalid pid file")
765 # but now, even with the file existing, we should be able to lock it
766 fd = utils.WritePidFile(self.f_dpn('test'))
768 utils.RemoveFile(self.f_dpn("test"))
769 self.failIf(os.path.exists(pid_file),
770 "PID file should not exist anymore")
773 pid_file = self.f_dpn('child')
774 r_fd, w_fd = os.pipe()
776 if new_pid == 0: #child
777 utils.WritePidFile(self.f_dpn('child'))
782 # else we are in the parent
783 # wait until the child has written the pid file
785 read_pid = utils.ReadPidFile(pid_file)
786 self.failUnlessEqual(read_pid, new_pid)
787 self.failUnless(utils.IsProcessAlive(new_pid))
789 # Try writing to locked file
791 utils.WritePidFile(pid_file)
792 except errors.PidFileLockError, err:
794 self.assertTrue(errmsg.endswith(" %s" % new_pid),
795 msg=("Error message ('%s') didn't contain correct"
796 " PID (%s)" % (errmsg, new_pid)))
798 self.fail("Writing to locked file didn't fail")
800 utils.KillProcess(new_pid, waitpid=True)
801 self.failIf(utils.IsProcessAlive(new_pid))
802 utils.RemoveFile(self.f_dpn('child'))
803 self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0)
805 def testExceptionType(self):
806 # Make sure the PID lock error is a subclass of LockError in case some code
808 self.assertTrue(issubclass(errors.PidFileLockError, errors.LockError))
811 shutil.rmtree(self.dir)
814 class TestSshKeys(testutils.GanetiTestCase):
815 """Test case for the AddAuthorizedKey function"""
817 KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
818 KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="198.51.100.4" '
819 'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
822 testutils.GanetiTestCase.setUp(self)
823 self.tmpname = self._CreateTempFile()
824 handle = open(self.tmpname, 'w')
826 handle.write("%s\n" % TestSshKeys.KEY_A)
827 handle.write("%s\n" % TestSshKeys.KEY_B)
831 def testAddingNewKey(self):
832 utils.AddAuthorizedKey(self.tmpname,
833 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
835 self.assertFileContent(self.tmpname,
836 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
837 'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
838 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
839 "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
841 def testAddingAlmostButNotCompletelyTheSameKey(self):
842 utils.AddAuthorizedKey(self.tmpname,
843 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
845 self.assertFileContent(self.tmpname,
846 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
847 'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
848 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
849 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
851 def testAddingExistingKeyWithSomeMoreSpaces(self):
852 utils.AddAuthorizedKey(self.tmpname,
853 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
855 self.assertFileContent(self.tmpname,
856 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
857 'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
858 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
860 def testRemovingExistingKeyWithSomeMoreSpaces(self):
861 utils.RemoveAuthorizedKey(self.tmpname,
862 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
864 self.assertFileContent(self.tmpname,
865 'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
866 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
868 def testRemovingNonExistingKey(self):
869 utils.RemoveAuthorizedKey(self.tmpname,
870 'ssh-dss AAAAB3Nsdfj230xxjxJjsjwjsjdjU root@test')
872 self.assertFileContent(self.tmpname,
873 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
874 'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
875 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
878 class TestNewUUID(unittest.TestCase):
879 """Test case for NewUUID"""
882 self.failUnless(utils.UUID_RE.match(utils.NewUUID()))
885 def _MockStatResult(cb, mode, uid, gid):
897 def _RaiseNoEntError():
898 raise EnvironmentError(errno.ENOENT, "not found")
901 def _OtherStatRaise():
902 raise EnvironmentError()
905 class TestPermissionEnforcements(unittest.TestCase):
912 self._chown_calls = []
913 self._chmod_calls = []
914 self._mkdir_calls = []
917 self.assertRaises(IndexError, self._mkdir_calls.pop)
918 self.assertRaises(IndexError, self._chmod_calls.pop)
919 self.assertRaises(IndexError, self._chown_calls.pop)
921 def _FakeMkdir(self, path):
922 self._mkdir_calls.append(path)
924 def _FakeChown(self, path, uid, gid):
925 self._chown_calls.append((path, uid, gid))
927 def _ChmodWrapper(self, cb):
929 self._chmod_calls.append((path, mode))
934 def _VerifyPerm(self, path, mode, uid=-1, gid=-1):
935 self.assertEqual(path, "/ganeti-qa-non-test")
936 self.assertEqual(mode, 0700)
937 self.assertEqual(uid, self.UID_A)
938 self.assertEqual(gid, self.GID_A)
940 def testMakeDirWithPerm(self):
941 is_dir_stat = _MockStatResult(None, stat.S_IFDIR, 0, 0)
942 utils.MakeDirWithPerm("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A,
943 _lstat_fn=is_dir_stat, _perm_fn=self._VerifyPerm)
945 def testDirErrors(self):
946 self.assertRaises(errors.GenericError, utils.MakeDirWithPerm,
947 "/ganeti-qa-non-test", 0700, 0, 0,
948 _lstat_fn=_MockStatResult(None, 0, 0, 0))
949 self.assertRaises(IndexError, self._mkdir_calls.pop)
951 other_stat_raise = _MockStatResult(_OtherStatRaise, stat.S_IFDIR, 0, 0)
952 self.assertRaises(errors.GenericError, utils.MakeDirWithPerm,
953 "/ganeti-qa-non-test", 0700, 0, 0,
954 _lstat_fn=other_stat_raise)
955 self.assertRaises(IndexError, self._mkdir_calls.pop)
957 non_exist_stat = _MockStatResult(_RaiseNoEntError, stat.S_IFDIR, 0, 0)
958 utils.MakeDirWithPerm("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A,
959 _lstat_fn=non_exist_stat, _mkdir_fn=self._FakeMkdir,
960 _perm_fn=self._VerifyPerm)
961 self.assertEqual(self._mkdir_calls.pop(0), "/ganeti-qa-non-test")
963 def testEnforcePermissionNoEnt(self):
964 self.assertRaises(errors.GenericError, utils.EnforcePermission,
965 "/ganeti-qa-non-test", 0600,
966 _chmod_fn=NotImplemented, _chown_fn=NotImplemented,
967 _stat_fn=_MockStatResult(_RaiseNoEntError, 0, 0, 0))
969 def testEnforcePermissionNoEntMustNotExist(self):
970 utils.EnforcePermission("/ganeti-qa-non-test", 0600, must_exist=False,
971 _chmod_fn=NotImplemented,
972 _chown_fn=NotImplemented,
973 _stat_fn=_MockStatResult(_RaiseNoEntError,
976 def testEnforcePermissionOtherErrorMustNotExist(self):
977 self.assertRaises(errors.GenericError, utils.EnforcePermission,
978 "/ganeti-qa-non-test", 0600, must_exist=False,
979 _chmod_fn=NotImplemented, _chown_fn=NotImplemented,
980 _stat_fn=_MockStatResult(_OtherStatRaise, 0, 0, 0))
982 def testEnforcePermissionNoChanges(self):
983 utils.EnforcePermission("/ganeti-qa-non-test", 0600,
984 _stat_fn=_MockStatResult(None, 0600, 0, 0),
985 _chmod_fn=self._ChmodWrapper(None),
986 _chown_fn=self._FakeChown)
988 def testEnforcePermissionChangeMode(self):
989 utils.EnforcePermission("/ganeti-qa-non-test", 0444,
990 _stat_fn=_MockStatResult(None, 0600, 0, 0),
991 _chmod_fn=self._ChmodWrapper(None),
992 _chown_fn=self._FakeChown)
993 self.assertEqual(self._chmod_calls.pop(0), ("/ganeti-qa-non-test", 0444))
995 def testEnforcePermissionSetUidGid(self):
996 utils.EnforcePermission("/ganeti-qa-non-test", 0600,
997 uid=self.UID_B, gid=self.GID_B,
998 _stat_fn=_MockStatResult(None, 0600,
1001 _chmod_fn=self._ChmodWrapper(None),
1002 _chown_fn=self._FakeChown)
1003 self.assertEqual(self._chown_calls.pop(0),
1004 ("/ganeti-qa-non-test", self.UID_B, self.GID_B))
1007 if __name__ == "__main__":
1008 testutils.GanetiTestProgram()