4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for testing ganeti.utils.io"""
34 from ganeti import constants
35 from ganeti import utils
36 from ganeti import compat
37 from ganeti import errors
42 class TestReadFile(testutils.GanetiTestCase):
43 def testReadAll(self):
44 data = utils.ReadFile(self._TestDataFilename("cert1.pem"))
45 self.assertEqual(len(data), 814)
49 self.assertEqual(h.hexdigest(), "a491efb3efe56a0535f924d5f8680fd4")
51 def testReadSize(self):
52 data = utils.ReadFile(self._TestDataFilename("cert1.pem"),
54 self.assertEqual(len(data), 100)
58 self.assertEqual(h.hexdigest(), "893772354e4e690b9efd073eed433ce7")
60 def testCallback(self):
62 self.assertEqual(fh.tell(), 0)
63 data = utils.ReadFile(self._TestDataFilename("cert1.pem"), preread=_Cb)
64 self.assertEqual(len(data), 814)
67 self.assertRaises(EnvironmentError, utils.ReadFile,
68 "/dev/null/does-not-exist")
71 class TestReadOneLineFile(testutils.GanetiTestCase):
73 testutils.GanetiTestCase.setUp(self)
75 def testDefault(self):
76 data = utils.ReadOneLineFile(self._TestDataFilename("cert1.pem"))
77 self.assertEqual(len(data), 27)
78 self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
80 def testNotStrict(self):
81 data = utils.ReadOneLineFile(self._TestDataFilename("cert1.pem"),
83 self.assertEqual(len(data), 27)
84 self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
86 def testStrictFailure(self):
87 self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
88 self._TestDataFilename("cert1.pem"), strict=True)
90 def testLongLine(self):
91 dummydata = (1024 * "Hello World! ")
92 myfile = self._CreateTempFile()
93 utils.WriteFile(myfile, data=dummydata)
94 datastrict = utils.ReadOneLineFile(myfile, strict=True)
95 datalax = utils.ReadOneLineFile(myfile, strict=False)
96 self.assertEqual(dummydata, datastrict)
97 self.assertEqual(dummydata, datalax)
99 def testNewline(self):
100 myfile = self._CreateTempFile()
102 for nl in ["", "\n", "\r\n"]:
103 dummydata = "%s%s" % (myline, nl)
104 utils.WriteFile(myfile, data=dummydata)
105 datalax = utils.ReadOneLineFile(myfile, strict=False)
106 self.assertEqual(myline, datalax)
107 datastrict = utils.ReadOneLineFile(myfile, strict=True)
108 self.assertEqual(myline, datastrict)
110 def testWhitespaceAndMultipleLines(self):
111 myfile = self._CreateTempFile()
112 for nl in ["", "\n", "\r\n"]:
113 for ws in [" ", "\t", "\t\t \t", "\t "]:
114 dummydata = (1024 * ("Foo bar baz %s%s" % (ws, nl)))
115 utils.WriteFile(myfile, data=dummydata)
116 datalax = utils.ReadOneLineFile(myfile, strict=False)
118 self.assert_(set("\r\n") & set(dummydata))
119 self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
121 explen = len("Foo bar baz ") + len(ws)
122 self.assertEqual(len(datalax), explen)
123 self.assertEqual(datalax, dummydata[:explen])
124 self.assertFalse(set("\r\n") & set(datalax))
126 datastrict = utils.ReadOneLineFile(myfile, strict=True)
127 self.assertEqual(dummydata, datastrict)
128 self.assertEqual(dummydata, datalax)
130 def testEmptylines(self):
131 myfile = self._CreateTempFile()
133 for nl in ["\n", "\r\n"]:
134 for ol in ["", "otherline"]:
135 dummydata = "%s%s%s%s%s%s" % (nl, nl, myline, nl, ol, nl)
136 utils.WriteFile(myfile, data=dummydata)
137 self.assert_(set("\r\n") & set(dummydata))
138 datalax = utils.ReadOneLineFile(myfile, strict=False)
139 self.assertEqual(myline, datalax)
141 self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
144 datastrict = utils.ReadOneLineFile(myfile, strict=True)
145 self.assertEqual(myline, datastrict)
147 def testEmptyfile(self):
148 myfile = self._CreateTempFile()
149 self.assertRaises(errors.GenericError, utils.ReadOneLineFile, myfile)
152 class TestTimestampForFilename(unittest.TestCase):
154 self.assert_("." not in utils.TimestampForFilename())
155 self.assert_(":" not in utils.TimestampForFilename())
158 class TestCreateBackup(testutils.GanetiTestCase):
160 testutils.GanetiTestCase.setUp(self)
162 self.tmpdir = tempfile.mkdtemp()
165 testutils.GanetiTestCase.tearDown(self)
167 shutil.rmtree(self.tmpdir)
170 filename = utils.PathJoin(self.tmpdir, "config.data")
171 utils.WriteFile(filename, data="")
172 bname = utils.CreateBackup(filename)
173 self.assertFileContent(bname, "")
174 self.assertEqual(len(glob.glob("%s*" % filename)), 2)
175 utils.CreateBackup(filename)
176 self.assertEqual(len(glob.glob("%s*" % filename)), 3)
177 utils.CreateBackup(filename)
178 self.assertEqual(len(glob.glob("%s*" % filename)), 4)
180 fifoname = utils.PathJoin(self.tmpdir, "fifo")
182 self.assertRaises(errors.ProgrammerError, utils.CreateBackup, fifoname)
184 def testContent(self):
186 for data in ["", "X", "Hello World!\n" * 100, "Binary data\0\x01\x02\n"]:
187 for rep in [1, 2, 10, 127]:
188 testdata = data * rep
190 filename = utils.PathJoin(self.tmpdir, "test.data_")
191 utils.WriteFile(filename, data=testdata)
192 self.assertFileContent(filename, testdata)
195 bname = utils.CreateBackup(filename)
197 self.assertFileContent(bname, testdata)
198 self.assertEqual(len(glob.glob("%s*" % filename)), 1 + bkpcount)
201 class TestListVisibleFiles(unittest.TestCase):
202 """Test case for ListVisibleFiles"""
205 self.path = tempfile.mkdtemp()
208 shutil.rmtree(self.path)
210 def _CreateFiles(self, files):
212 utils.WriteFile(os.path.join(self.path, name), data="test")
214 def _test(self, files, expected):
215 self._CreateFiles(files)
216 found = utils.ListVisibleFiles(self.path)
217 self.assertEqual(set(found), set(expected))
219 def testAllVisible(self):
220 files = ["a", "b", "c"]
222 self._test(files, expected)
224 def testNoneVisible(self):
225 files = [".a", ".b", ".c"]
227 self._test(files, expected)
229 def testSomeVisible(self):
230 files = ["a", "b", ".c"]
231 expected = ["a", "b"]
232 self._test(files, expected)
234 def testNonAbsolutePath(self):
235 self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
238 def testNonNormalizedPath(self):
239 self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
242 def testMountpoint(self):
243 lvfmp_fn = compat.partial(utils.ListVisibleFiles,
244 _is_mountpoint=lambda _: True)
245 self.assertEqual(lvfmp_fn(self.path), [])
247 # Create "lost+found" as a regular file
248 self._CreateFiles(["foo", "bar", ".baz", "lost+found"])
249 self.assertEqual(set(lvfmp_fn(self.path)),
250 set(["foo", "bar", "lost+found"]))
252 # Replace "lost+found" with a directory
253 laf_path = utils.PathJoin(self.path, "lost+found")
254 utils.RemoveFile(laf_path)
256 self.assertEqual(set(lvfmp_fn(self.path)), set(["foo", "bar"]))
258 def testLostAndFoundNoMountpoint(self):
259 files = ["foo", "bar", ".Hello World", "lost+found"]
260 expected = ["foo", "bar", "lost+found"]
261 self._test(files, expected)
264 class TestWriteFile(testutils.GanetiTestCase):
266 testutils.GanetiTestCase.setUp(self)
268 self.tfile = tempfile.NamedTemporaryFile()
270 self.did_post = False
271 self.did_write = False
274 testutils.GanetiTestCase.tearDown(self)
276 shutil.rmtree(self.tmpdir)
278 def markPre(self, fd):
281 def markPost(self, fd):
284 def markWrite(self, fd):
285 self.did_write = True
289 utils.WriteFile(self.tfile.name, data=data)
290 self.assertEqual(utils.ReadFile(self.tfile.name), data)
292 def testWriteSimpleUnicode(self):
294 utils.WriteFile(self.tfile.name, data=data)
295 self.assertEqual(utils.ReadFile(self.tfile.name), data)
297 def testErrors(self):
298 self.assertRaises(errors.ProgrammerError, utils.WriteFile,
299 self.tfile.name, data="test", fn=lambda fd: None)
300 self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name)
301 self.assertRaises(errors.ProgrammerError, utils.WriteFile,
302 self.tfile.name, data="test", atime=0)
303 self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name,
304 mode=0400, keep_perms=utils.KP_ALWAYS)
305 self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name,
306 uid=0, keep_perms=utils.KP_ALWAYS)
307 self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name,
308 gid=0, keep_perms=utils.KP_ALWAYS)
309 self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name,
310 mode=0400, uid=0, keep_perms=utils.KP_ALWAYS)
312 def testPreWrite(self):
313 utils.WriteFile(self.tfile.name, data="", prewrite=self.markPre)
314 self.assertTrue(self.did_pre)
315 self.assertFalse(self.did_post)
316 self.assertFalse(self.did_write)
318 def testPostWrite(self):
319 utils.WriteFile(self.tfile.name, data="", postwrite=self.markPost)
320 self.assertFalse(self.did_pre)
321 self.assertTrue(self.did_post)
322 self.assertFalse(self.did_write)
324 def testWriteFunction(self):
325 utils.WriteFile(self.tfile.name, fn=self.markWrite)
326 self.assertFalse(self.did_pre)
327 self.assertFalse(self.did_post)
328 self.assertTrue(self.did_write)
330 def testDryRun(self):
332 self.tfile.write(orig)
334 utils.WriteFile(self.tfile.name, data="hello", dry_run=True)
335 self.assertEqual(utils.ReadFile(self.tfile.name), orig)
339 for at, mt in [(0, 0), (1000, 1000), (2000, 3000),
340 (int(time.time()), 5000)]:
341 utils.WriteFile(f, data="hello", atime=at, mtime=mt)
343 self.assertEqual(st.st_atime, at)
344 self.assertEqual(st.st_mtime, mt)
346 def testNoClose(self):
348 self.assertEqual(utils.WriteFile(self.tfile.name, data="abc"), None)
349 fd = utils.WriteFile(self.tfile.name, data=data, close=False)
352 self.assertEqual(os.read(fd, 4096), data)
356 def testNoLeftovers(self):
357 self.tmpdir = tempfile.mkdtemp()
358 self.assertEqual(utils.WriteFile(utils.PathJoin(self.tmpdir, "test"),
361 self.assertEqual(os.listdir(self.tmpdir), ["test"])
363 def testFailRename(self):
364 self.tmpdir = tempfile.mkdtemp()
365 target = utils.PathJoin(self.tmpdir, "target")
367 self.assertRaises(OSError, utils.WriteFile, target, data="abc")
368 self.assertTrue(os.path.isdir(target))
369 self.assertEqual(os.listdir(self.tmpdir), ["target"])
370 self.assertFalse(os.listdir(target))
372 def testFailRenameDryRun(self):
373 self.tmpdir = tempfile.mkdtemp()
374 target = utils.PathJoin(self.tmpdir, "target")
376 self.assertEqual(utils.WriteFile(target, data="abc", dry_run=True), None)
377 self.assertTrue(os.path.isdir(target))
378 self.assertEqual(os.listdir(self.tmpdir), ["target"])
379 self.assertFalse(os.listdir(target))
381 def testBackup(self):
382 self.tmpdir = tempfile.mkdtemp()
383 testfile = utils.PathJoin(self.tmpdir, "test")
385 self.assertEqual(utils.WriteFile(testfile, data="foo", backup=True), None)
386 self.assertEqual(utils.ReadFile(testfile), "foo")
387 self.assertEqual(os.listdir(self.tmpdir), ["test"])
390 assert os.path.isfile(testfile)
391 self.assertEqual(utils.WriteFile(testfile, data="bar", backup=True), None)
392 self.assertEqual(utils.ReadFile(testfile), "bar")
393 self.assertEqual(len(glob.glob("%s.backup*" % testfile)), 1)
394 self.assertTrue("test" in os.listdir(self.tmpdir))
395 self.assertEqual(len(os.listdir(self.tmpdir)), 2)
397 # Write again as dry-run
398 assert os.path.isfile(testfile)
399 self.assertEqual(utils.WriteFile(testfile, data="000", backup=True,
402 self.assertEqual(utils.ReadFile(testfile), "bar")
403 self.assertEqual(len(glob.glob("%s.backup*" % testfile)), 1)
404 self.assertTrue("test" in os.listdir(self.tmpdir))
405 self.assertEqual(len(os.listdir(self.tmpdir)), 2)
407 def testFileMode(self):
408 self.tmpdir = tempfile.mkdtemp()
409 target = utils.PathJoin(self.tmpdir, "target")
410 self.assertRaises(OSError, utils.WriteFile, target, data="data",
411 keep_perms=utils.KP_ALWAYS)
412 # All masks have only user bits set, to avoid interactions with umask
413 utils.WriteFile(target, data="data", mode=0200)
414 self.assertFileMode(target, 0200)
415 utils.WriteFile(target, data="data", mode=0400,
416 keep_perms=utils.KP_IF_EXISTS)
417 self.assertFileMode(target, 0200)
418 utils.WriteFile(target, data="data", keep_perms=utils.KP_ALWAYS)
419 self.assertFileMode(target, 0200)
420 utils.WriteFile(target, data="data", mode=0700)
421 self.assertFileMode(target, 0700)
423 def testNewFileMode(self):
424 self.tmpdir = tempfile.mkdtemp()
425 target = utils.PathJoin(self.tmpdir, "target")
426 utils.WriteFile(target, data="data", mode=0400,
427 keep_perms=utils.KP_IF_EXISTS)
428 self.assertFileMode(target, 0400)
430 class TestFileID(testutils.GanetiTestCase):
431 def testEquality(self):
432 name = self._CreateTempFile()
433 oldi = utils.GetFileID(path=name)
434 self.failUnless(utils.VerifyFileID(oldi, oldi))
436 def testUpdate(self):
437 name = self._CreateTempFile()
438 oldi = utils.GetFileID(path=name)
439 fd = os.open(name, os.O_RDWR)
441 newi = utils.GetFileID(fd=fd)
442 self.failUnless(utils.VerifyFileID(oldi, newi))
443 self.failUnless(utils.VerifyFileID(newi, oldi))
447 def testWriteFile(self):
448 name = self._CreateTempFile()
449 oldi = utils.GetFileID(path=name)
451 os.utime(name, (mtime + 10, mtime + 10))
452 self.assertRaises(errors.LockError, utils.SafeWriteFile, name,
454 os.utime(name, (mtime - 10, mtime - 10))
455 utils.SafeWriteFile(name, oldi, data="")
456 oldi = utils.GetFileID(path=name)
458 os.utime(name, (mtime + 10, mtime + 10))
459 # this doesn't raise, since we passed None
460 utils.SafeWriteFile(name, None, data="")
463 t = tempfile.NamedTemporaryFile()
464 self.assertRaises(errors.ProgrammerError, utils.GetFileID,
465 path=t.name, fd=t.fileno())
468 class TestRemoveFile(unittest.TestCase):
469 """Test case for the RemoveFile function"""
472 """Create a temp dir and file for each case"""
473 self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
474 fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
478 if os.path.exists(self.tmpfile):
479 os.unlink(self.tmpfile)
480 os.rmdir(self.tmpdir)
482 def testIgnoreDirs(self):
483 """Test that RemoveFile() ignores directories"""
484 self.assertEqual(None, utils.RemoveFile(self.tmpdir))
486 def testIgnoreNotExisting(self):
487 """Test that RemoveFile() ignores non-existing files"""
488 utils.RemoveFile(self.tmpfile)
489 utils.RemoveFile(self.tmpfile)
491 def testRemoveFile(self):
492 """Test that RemoveFile does remove a file"""
493 utils.RemoveFile(self.tmpfile)
494 if os.path.exists(self.tmpfile):
495 self.fail("File '%s' not removed" % self.tmpfile)
497 def testRemoveSymlink(self):
498 """Test that RemoveFile does remove symlinks"""
499 symlink = self.tmpdir + "/symlink"
500 os.symlink("no-such-file", symlink)
501 utils.RemoveFile(symlink)
502 if os.path.exists(symlink):
503 self.fail("File '%s' not removed" % symlink)
504 os.symlink(self.tmpfile, symlink)
505 utils.RemoveFile(symlink)
506 if os.path.exists(symlink):
507 self.fail("File '%s' not removed" % symlink)
510 class TestRemoveDir(unittest.TestCase):
512 self.tmpdir = tempfile.mkdtemp()
516 shutil.rmtree(self.tmpdir)
517 except EnvironmentError:
520 def testEmptyDir(self):
521 utils.RemoveDir(self.tmpdir)
522 self.assertFalse(os.path.isdir(self.tmpdir))
524 def testNonEmptyDir(self):
525 self.tmpfile = os.path.join(self.tmpdir, "test1")
526 open(self.tmpfile, "w").close()
527 self.assertRaises(EnvironmentError, utils.RemoveDir, self.tmpdir)
530 class TestRename(unittest.TestCase):
531 """Test case for RenameFile"""
534 """Create a temporary directory"""
535 self.tmpdir = tempfile.mkdtemp()
536 self.tmpfile = os.path.join(self.tmpdir, "test1")
539 open(self.tmpfile, "w").close()
542 """Remove temporary directory"""
543 shutil.rmtree(self.tmpdir)
545 def testSimpleRename1(self):
546 """Simple rename 1"""
547 utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
548 self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
550 def testSimpleRename2(self):
551 """Simple rename 2"""
552 utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
554 self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
556 def testRenameMkdir(self):
557 """Rename with mkdir"""
558 utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
560 self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
561 self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
563 self.assertRaises(EnvironmentError, utils.RenameFile,
564 os.path.join(self.tmpdir, "test/xyz"),
565 os.path.join(self.tmpdir, "test/foo/bar/baz"),
568 self.assertTrue(os.path.exists(os.path.join(self.tmpdir, "test/xyz")))
569 self.assertFalse(os.path.exists(os.path.join(self.tmpdir, "test/foo/bar")))
570 self.assertFalse(os.path.exists(os.path.join(self.tmpdir,
571 "test/foo/bar/baz")))
574 class TestMakedirs(unittest.TestCase):
576 self.tmpdir = tempfile.mkdtemp()
579 shutil.rmtree(self.tmpdir)
581 def testNonExisting(self):
582 path = utils.PathJoin(self.tmpdir, "foo")
584 self.assert_(os.path.isdir(path))
586 def testExisting(self):
587 path = utils.PathJoin(self.tmpdir, "foo")
590 self.assert_(os.path.isdir(path))
592 def testRecursiveNonExisting(self):
593 path = utils.PathJoin(self.tmpdir, "foo/bar/baz")
595 self.assert_(os.path.isdir(path))
597 def testRecursiveExisting(self):
598 path = utils.PathJoin(self.tmpdir, "B/moo/xyz")
599 self.assertFalse(os.path.exists(path))
600 os.mkdir(utils.PathJoin(self.tmpdir, "B"))
602 self.assert_(os.path.isdir(path))
605 class TestEnsureDirs(unittest.TestCase):
606 """Tests for EnsureDirs"""
609 self.dir = tempfile.mkdtemp()
610 self.old_umask = os.umask(0777)
612 def testEnsureDirs(self):
614 (utils.PathJoin(self.dir, "foo"), 0777),
615 (utils.PathJoin(self.dir, "bar"), 0000),
617 self.assertEquals(os.stat(utils.PathJoin(self.dir, "foo"))[0] & 0777, 0777)
618 self.assertEquals(os.stat(utils.PathJoin(self.dir, "bar"))[0] & 0777, 0000)
621 os.rmdir(utils.PathJoin(self.dir, "foo"))
622 os.rmdir(utils.PathJoin(self.dir, "bar"))
624 os.umask(self.old_umask)
627 class TestIsNormAbsPath(unittest.TestCase):
628 """Testing case for IsNormAbsPath"""
630 def _pathTestHelper(self, path, result):
632 self.assert_(utils.IsNormAbsPath(path),
633 "Path %s should result absolute and normalized" % path)
635 self.assertFalse(utils.IsNormAbsPath(path),
636 "Path %s should not result absolute and normalized" % path)
639 self._pathTestHelper("/etc", True)
640 self._pathTestHelper("/srv", True)
641 self._pathTestHelper("etc", False)
642 self._pathTestHelper("/etc/../root", False)
643 self._pathTestHelper("/etc/", False)
646 class TestIsBelowDir(unittest.TestCase):
647 """Testing case for IsBelowDir"""
649 def testSamePrefix(self):
650 self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c"))
651 self.assertTrue(utils.IsBelowDir("/a/b/", "/a/b/e"))
653 def testSamePrefixButDifferentDir(self):
654 self.assertFalse(utils.IsBelowDir("/a/b", "/a/bc/d"))
655 self.assertFalse(utils.IsBelowDir("/a/b/", "/a/bc/e"))
657 def testSamePrefixButDirTraversal(self):
658 self.assertFalse(utils.IsBelowDir("/a/b", "/a/b/../c"))
659 self.assertFalse(utils.IsBelowDir("/a/b/", "/a/b/../d"))
661 def testSamePrefixAndTraversal(self):
662 self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c/../d"))
663 self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c/./e"))
664 self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/../b/./e"))
666 def testBothAbsPath(self):
667 self.assertRaises(ValueError, utils.IsBelowDir, "/a/b/c", "d")
668 self.assertRaises(ValueError, utils.IsBelowDir, "a/b/c", "/d")
669 self.assertRaises(ValueError, utils.IsBelowDir, "a/b/c", "d")
672 class TestPathJoin(unittest.TestCase):
673 """Testing case for PathJoin"""
675 def testBasicItems(self):
676 mlist = ["/a", "b", "c"]
677 self.failUnlessEqual(utils.PathJoin(*mlist), "/".join(mlist))
679 def testNonAbsPrefix(self):
680 self.failUnlessRaises(ValueError, utils.PathJoin, "a", "b")
682 def testBackTrack(self):
683 self.failUnlessRaises(ValueError, utils.PathJoin, "/a", "b/../c")
685 def testMultiAbs(self):
686 self.failUnlessRaises(ValueError, utils.PathJoin, "/a", "/b")
689 class TestTailFile(testutils.GanetiTestCase):
690 """Test case for the TailFile function"""
693 fname = self._CreateTempFile()
694 self.failUnlessEqual(utils.TailFile(fname), [])
695 self.failUnlessEqual(utils.TailFile(fname, lines=25), [])
697 def testAllLines(self):
698 data = ["test %d" % i for i in range(30)]
700 fname = self._CreateTempFile()
701 fd = open(fname, "w")
702 fd.write("\n".join(data[:i]))
706 self.failUnlessEqual(utils.TailFile(fname, lines=i), data[:i])
708 def testPartialLines(self):
709 data = ["test %d" % i for i in range(30)]
710 fname = self._CreateTempFile()
711 fd = open(fname, "w")
712 fd.write("\n".join(data))
715 for i in range(1, 30):
716 self.failUnlessEqual(utils.TailFile(fname, lines=i), data[-i:])
718 def testBigFile(self):
719 data = ["test %d" % i for i in range(30)]
720 fname = self._CreateTempFile()
721 fd = open(fname, "w")
722 fd.write("X" * 1048576)
724 fd.write("\n".join(data))
727 for i in range(1, 30):
728 self.failUnlessEqual(utils.TailFile(fname, lines=i), data[-i:])
731 class TestPidFileFunctions(unittest.TestCase):
732 """Tests for WritePidFile and ReadPidFile"""
735 self.dir = tempfile.mkdtemp()
736 self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
738 def testPidFileFunctions(self):
739 pid_file = self.f_dpn('test')
740 fd = utils.WritePidFile(self.f_dpn('test'))
741 self.failUnless(os.path.exists(pid_file),
742 "PID file should have been created")
743 read_pid = utils.ReadPidFile(pid_file)
744 self.failUnlessEqual(read_pid, os.getpid())
745 self.failUnless(utils.IsProcessAlive(read_pid))
746 self.failUnlessRaises(errors.PidFileLockError, utils.WritePidFile,
749 utils.RemoveFile(self.f_dpn("test"))
750 self.failIf(os.path.exists(pid_file),
751 "PID file should not exist anymore")
752 self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
753 "ReadPidFile should return 0 for missing pid file")
754 fh = open(pid_file, "w")
757 self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
758 "ReadPidFile should return 0 for invalid pid file")
759 # but now, even with the file existing, we should be able to lock it
760 fd = utils.WritePidFile(self.f_dpn('test'))
762 utils.RemoveFile(self.f_dpn("test"))
763 self.failIf(os.path.exists(pid_file),
764 "PID file should not exist anymore")
767 pid_file = self.f_dpn('child')
768 r_fd, w_fd = os.pipe()
770 if new_pid == 0: #child
771 utils.WritePidFile(self.f_dpn('child'))
776 # else we are in the parent
777 # wait until the child has written the pid file
779 read_pid = utils.ReadPidFile(pid_file)
780 self.failUnlessEqual(read_pid, new_pid)
781 self.failUnless(utils.IsProcessAlive(new_pid))
783 # Try writing to locked file
785 utils.WritePidFile(pid_file)
786 except errors.PidFileLockError, err:
788 self.assertTrue(errmsg.endswith(" %s" % new_pid),
789 msg=("Error message ('%s') didn't contain correct"
790 " PID (%s)" % (errmsg, new_pid)))
792 self.fail("Writing to locked file didn't fail")
794 utils.KillProcess(new_pid, waitpid=True)
795 self.failIf(utils.IsProcessAlive(new_pid))
796 utils.RemoveFile(self.f_dpn('child'))
797 self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0)
799 def testExceptionType(self):
800 # Make sure the PID lock error is a subclass of LockError in case some code
802 self.assertTrue(issubclass(errors.PidFileLockError, errors.LockError))
805 shutil.rmtree(self.dir)
808 class TestSshKeys(testutils.GanetiTestCase):
809 """Test case for the AddAuthorizedKey function"""
811 KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
812 KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="198.51.100.4" '
813 'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
816 testutils.GanetiTestCase.setUp(self)
817 self.tmpname = self._CreateTempFile()
818 handle = open(self.tmpname, 'w')
820 handle.write("%s\n" % TestSshKeys.KEY_A)
821 handle.write("%s\n" % TestSshKeys.KEY_B)
825 def testAddingNewKey(self):
826 utils.AddAuthorizedKey(self.tmpname,
827 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
829 self.assertFileContent(self.tmpname,
830 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
831 'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
832 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
833 "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
835 def testAddingAlmostButNotCompletelyTheSameKey(self):
836 utils.AddAuthorizedKey(self.tmpname,
837 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
839 self.assertFileContent(self.tmpname,
840 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
841 'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
842 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
843 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
845 def testAddingExistingKeyWithSomeMoreSpaces(self):
846 utils.AddAuthorizedKey(self.tmpname,
847 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
849 self.assertFileContent(self.tmpname,
850 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
851 'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
852 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
854 def testRemovingExistingKeyWithSomeMoreSpaces(self):
855 utils.RemoveAuthorizedKey(self.tmpname,
856 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
858 self.assertFileContent(self.tmpname,
859 'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
860 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
862 def testRemovingNonExistingKey(self):
863 utils.RemoveAuthorizedKey(self.tmpname,
864 'ssh-dss AAAAB3Nsdfj230xxjxJjsjwjsjdjU root@test')
866 self.assertFileContent(self.tmpname,
867 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
868 'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
869 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
872 class TestNewUUID(unittest.TestCase):
873 """Test case for NewUUID"""
876 self.failUnless(utils.UUID_RE.match(utils.NewUUID()))
879 def _MockStatResult(cb, mode, uid, gid):
891 def _RaiseNoEntError():
892 raise EnvironmentError(errno.ENOENT, "not found")
895 def _OtherStatRaise():
896 raise EnvironmentError()
899 class TestPermissionEnforcements(unittest.TestCase):
906 self._chown_calls = []
907 self._chmod_calls = []
908 self._mkdir_calls = []
911 self.assertRaises(IndexError, self._mkdir_calls.pop)
912 self.assertRaises(IndexError, self._chmod_calls.pop)
913 self.assertRaises(IndexError, self._chown_calls.pop)
915 def _FakeMkdir(self, path):
916 self._mkdir_calls.append(path)
918 def _FakeChown(self, path, uid, gid):
919 self._chown_calls.append((path, uid, gid))
921 def _ChmodWrapper(self, cb):
923 self._chmod_calls.append((path, mode))
928 def _VerifyPerm(self, path, mode, uid=-1, gid=-1):
929 self.assertEqual(path, "/ganeti-qa-non-test")
930 self.assertEqual(mode, 0700)
931 self.assertEqual(uid, self.UID_A)
932 self.assertEqual(gid, self.GID_A)
934 def testMakeDirWithPerm(self):
935 is_dir_stat = _MockStatResult(None, stat.S_IFDIR, 0, 0)
936 utils.MakeDirWithPerm("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A,
937 _lstat_fn=is_dir_stat, _perm_fn=self._VerifyPerm)
939 def testDirErrors(self):
940 self.assertRaises(errors.GenericError, utils.MakeDirWithPerm,
941 "/ganeti-qa-non-test", 0700, 0, 0,
942 _lstat_fn=_MockStatResult(None, 0, 0, 0))
943 self.assertRaises(IndexError, self._mkdir_calls.pop)
945 other_stat_raise = _MockStatResult(_OtherStatRaise, stat.S_IFDIR, 0, 0)
946 self.assertRaises(errors.GenericError, utils.MakeDirWithPerm,
947 "/ganeti-qa-non-test", 0700, 0, 0,
948 _lstat_fn=other_stat_raise)
949 self.assertRaises(IndexError, self._mkdir_calls.pop)
951 non_exist_stat = _MockStatResult(_RaiseNoEntError, stat.S_IFDIR, 0, 0)
952 utils.MakeDirWithPerm("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A,
953 _lstat_fn=non_exist_stat, _mkdir_fn=self._FakeMkdir,
954 _perm_fn=self._VerifyPerm)
955 self.assertEqual(self._mkdir_calls.pop(0), "/ganeti-qa-non-test")
957 def testEnforcePermissionNoEnt(self):
958 self.assertRaises(errors.GenericError, utils.EnforcePermission,
959 "/ganeti-qa-non-test", 0600,
960 _chmod_fn=NotImplemented, _chown_fn=NotImplemented,
961 _stat_fn=_MockStatResult(_RaiseNoEntError, 0, 0, 0))
963 def testEnforcePermissionNoEntMustNotExist(self):
964 utils.EnforcePermission("/ganeti-qa-non-test", 0600, must_exist=False,
965 _chmod_fn=NotImplemented,
966 _chown_fn=NotImplemented,
967 _stat_fn=_MockStatResult(_RaiseNoEntError,
970 def testEnforcePermissionOtherErrorMustNotExist(self):
971 self.assertRaises(errors.GenericError, utils.EnforcePermission,
972 "/ganeti-qa-non-test", 0600, must_exist=False,
973 _chmod_fn=NotImplemented, _chown_fn=NotImplemented,
974 _stat_fn=_MockStatResult(_OtherStatRaise, 0, 0, 0))
976 def testEnforcePermissionNoChanges(self):
977 utils.EnforcePermission("/ganeti-qa-non-test", 0600,
978 _stat_fn=_MockStatResult(None, 0600, 0, 0),
979 _chmod_fn=self._ChmodWrapper(None),
980 _chown_fn=self._FakeChown)
982 def testEnforcePermissionChangeMode(self):
983 utils.EnforcePermission("/ganeti-qa-non-test", 0444,
984 _stat_fn=_MockStatResult(None, 0600, 0, 0),
985 _chmod_fn=self._ChmodWrapper(None),
986 _chown_fn=self._FakeChown)
987 self.assertEqual(self._chmod_calls.pop(0), ("/ganeti-qa-non-test", 0444))
989 def testEnforcePermissionSetUidGid(self):
990 utils.EnforcePermission("/ganeti-qa-non-test", 0600,
991 uid=self.UID_B, gid=self.GID_B,
992 _stat_fn=_MockStatResult(None, 0600,
995 _chmod_fn=self._ChmodWrapper(None),
996 _chown_fn=self._FakeChown)
997 self.assertEqual(self._chown_calls.pop(0),
998 ("/ganeti-qa-non-test", self.UID_B, self.GID_B))
1001 if __name__ == "__main__":
1002 testutils.GanetiTestProgram()