Merge branch 'devel-2.5'
[ganeti-local] / test / ganeti.utils.io_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for testing ganeti.utils.io"""
23
24 import os
25 import tempfile
26 import unittest
27 import shutil
28 import glob
29 import time
30 import signal
31 import stat
32 import errno
33
34 from ganeti import constants
35 from ganeti import utils
36 from ganeti import compat
37 from ganeti import errors
38
39 import testutils
40
41
42 class TestReadFile(testutils.GanetiTestCase):
43   def testReadAll(self):
44     data = utils.ReadFile(self._TestDataFilename("cert1.pem"))
45     self.assertEqual(len(data), 814)
46
47     h = compat.md5_hash()
48     h.update(data)
49     self.assertEqual(h.hexdigest(), "a491efb3efe56a0535f924d5f8680fd4")
50
51   def testReadSize(self):
52     data = utils.ReadFile(self._TestDataFilename("cert1.pem"),
53                           size=100)
54     self.assertEqual(len(data), 100)
55
56     h = compat.md5_hash()
57     h.update(data)
58     self.assertEqual(h.hexdigest(), "893772354e4e690b9efd073eed433ce7")
59
60   def testCallback(self):
61     def _Cb(fh):
62       self.assertEqual(fh.tell(), 0)
63     data = utils.ReadFile(self._TestDataFilename("cert1.pem"), preread=_Cb)
64     self.assertEqual(len(data), 814)
65
66   def testError(self):
67     self.assertRaises(EnvironmentError, utils.ReadFile,
68                       "/dev/null/does-not-exist")
69
70
71 class TestReadOneLineFile(testutils.GanetiTestCase):
72   def setUp(self):
73     testutils.GanetiTestCase.setUp(self)
74
75   def testDefault(self):
76     data = utils.ReadOneLineFile(self._TestDataFilename("cert1.pem"))
77     self.assertEqual(len(data), 27)
78     self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
79
80   def testNotStrict(self):
81     data = utils.ReadOneLineFile(self._TestDataFilename("cert1.pem"),
82                                  strict=False)
83     self.assertEqual(len(data), 27)
84     self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
85
86   def testStrictFailure(self):
87     self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
88                       self._TestDataFilename("cert1.pem"), strict=True)
89
90   def testLongLine(self):
91     dummydata = (1024 * "Hello World! ")
92     myfile = self._CreateTempFile()
93     utils.WriteFile(myfile, data=dummydata)
94     datastrict = utils.ReadOneLineFile(myfile, strict=True)
95     datalax = utils.ReadOneLineFile(myfile, strict=False)
96     self.assertEqual(dummydata, datastrict)
97     self.assertEqual(dummydata, datalax)
98
99   def testNewline(self):
100     myfile = self._CreateTempFile()
101     myline = "myline"
102     for nl in ["", "\n", "\r\n"]:
103       dummydata = "%s%s" % (myline, nl)
104       utils.WriteFile(myfile, data=dummydata)
105       datalax = utils.ReadOneLineFile(myfile, strict=False)
106       self.assertEqual(myline, datalax)
107       datastrict = utils.ReadOneLineFile(myfile, strict=True)
108       self.assertEqual(myline, datastrict)
109
110   def testWhitespaceAndMultipleLines(self):
111     myfile = self._CreateTempFile()
112     for nl in ["", "\n", "\r\n"]:
113       for ws in [" ", "\t", "\t\t  \t", "\t "]:
114         dummydata = (1024 * ("Foo bar baz %s%s" % (ws, nl)))
115         utils.WriteFile(myfile, data=dummydata)
116         datalax = utils.ReadOneLineFile(myfile, strict=False)
117         if nl:
118           self.assert_(set("\r\n") & set(dummydata))
119           self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
120                             myfile, strict=True)
121           explen = len("Foo bar baz ") + len(ws)
122           self.assertEqual(len(datalax), explen)
123           self.assertEqual(datalax, dummydata[:explen])
124           self.assertFalse(set("\r\n") & set(datalax))
125         else:
126           datastrict = utils.ReadOneLineFile(myfile, strict=True)
127           self.assertEqual(dummydata, datastrict)
128           self.assertEqual(dummydata, datalax)
129
130   def testEmptylines(self):
131     myfile = self._CreateTempFile()
132     myline = "myline"
133     for nl in ["\n", "\r\n"]:
134       for ol in ["", "otherline"]:
135         dummydata = "%s%s%s%s%s%s" % (nl, nl, myline, nl, ol, nl)
136         utils.WriteFile(myfile, data=dummydata)
137         self.assert_(set("\r\n") & set(dummydata))
138         datalax = utils.ReadOneLineFile(myfile, strict=False)
139         self.assertEqual(myline, datalax)
140         if ol:
141           self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
142                             myfile, strict=True)
143         else:
144           datastrict = utils.ReadOneLineFile(myfile, strict=True)
145           self.assertEqual(myline, datastrict)
146
147   def testEmptyfile(self):
148     myfile = self._CreateTempFile()
149     self.assertRaises(errors.GenericError, utils.ReadOneLineFile, myfile)
150
151
152 class TestTimestampForFilename(unittest.TestCase):
153   def test(self):
154     self.assert_("." not in utils.TimestampForFilename())
155     self.assert_(":" not in utils.TimestampForFilename())
156
157
158 class TestCreateBackup(testutils.GanetiTestCase):
159   def setUp(self):
160     testutils.GanetiTestCase.setUp(self)
161
162     self.tmpdir = tempfile.mkdtemp()
163
164   def tearDown(self):
165     testutils.GanetiTestCase.tearDown(self)
166
167     shutil.rmtree(self.tmpdir)
168
169   def testEmpty(self):
170     filename = utils.PathJoin(self.tmpdir, "config.data")
171     utils.WriteFile(filename, data="")
172     bname = utils.CreateBackup(filename)
173     self.assertFileContent(bname, "")
174     self.assertEqual(len(glob.glob("%s*" % filename)), 2)
175     utils.CreateBackup(filename)
176     self.assertEqual(len(glob.glob("%s*" % filename)), 3)
177     utils.CreateBackup(filename)
178     self.assertEqual(len(glob.glob("%s*" % filename)), 4)
179
180     fifoname = utils.PathJoin(self.tmpdir, "fifo")
181     os.mkfifo(fifoname)
182     self.assertRaises(errors.ProgrammerError, utils.CreateBackup, fifoname)
183
184   def testContent(self):
185     bkpcount = 0
186     for data in ["", "X", "Hello World!\n" * 100, "Binary data\0\x01\x02\n"]:
187       for rep in [1, 2, 10, 127]:
188         testdata = data * rep
189
190         filename = utils.PathJoin(self.tmpdir, "test.data_")
191         utils.WriteFile(filename, data=testdata)
192         self.assertFileContent(filename, testdata)
193
194         for _ in range(3):
195           bname = utils.CreateBackup(filename)
196           bkpcount += 1
197           self.assertFileContent(bname, testdata)
198           self.assertEqual(len(glob.glob("%s*" % filename)), 1 + bkpcount)
199
200
201 class TestListVisibleFiles(unittest.TestCase):
202   """Test case for ListVisibleFiles"""
203
204   def setUp(self):
205     self.path = tempfile.mkdtemp()
206
207   def tearDown(self):
208     shutil.rmtree(self.path)
209
210   def _CreateFiles(self, files):
211     for name in files:
212       utils.WriteFile(os.path.join(self.path, name), data="test")
213
214   def _test(self, files, expected):
215     self._CreateFiles(files)
216     found = utils.ListVisibleFiles(self.path)
217     self.assertEqual(set(found), set(expected))
218
219   def testAllVisible(self):
220     files = ["a", "b", "c"]
221     expected = files
222     self._test(files, expected)
223
224   def testNoneVisible(self):
225     files = [".a", ".b", ".c"]
226     expected = []
227     self._test(files, expected)
228
229   def testSomeVisible(self):
230     files = ["a", "b", ".c"]
231     expected = ["a", "b"]
232     self._test(files, expected)
233
234   def testNonAbsolutePath(self):
235     self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
236                           "abc")
237
238   def testNonNormalizedPath(self):
239     self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
240                           "/bin/../tmp")
241
242
243 class TestWriteFile(unittest.TestCase):
244   def setUp(self):
245     self.tmpdir = None
246     self.tfile = tempfile.NamedTemporaryFile()
247     self.did_pre = False
248     self.did_post = False
249     self.did_write = False
250
251   def tearDown(self):
252     if self.tmpdir:
253       shutil.rmtree(self.tmpdir)
254
255   def markPre(self, fd):
256     self.did_pre = True
257
258   def markPost(self, fd):
259     self.did_post = True
260
261   def markWrite(self, fd):
262     self.did_write = True
263
264   def testWrite(self):
265     data = "abc"
266     utils.WriteFile(self.tfile.name, data=data)
267     self.assertEqual(utils.ReadFile(self.tfile.name), data)
268
269   def testWriteSimpleUnicode(self):
270     data = u"abc"
271     utils.WriteFile(self.tfile.name, data=data)
272     self.assertEqual(utils.ReadFile(self.tfile.name), data)
273
274   def testErrors(self):
275     self.assertRaises(errors.ProgrammerError, utils.WriteFile,
276                       self.tfile.name, data="test", fn=lambda fd: None)
277     self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name)
278     self.assertRaises(errors.ProgrammerError, utils.WriteFile,
279                       self.tfile.name, data="test", atime=0)
280
281   def testPreWrite(self):
282     utils.WriteFile(self.tfile.name, data="", prewrite=self.markPre)
283     self.assertTrue(self.did_pre)
284     self.assertFalse(self.did_post)
285     self.assertFalse(self.did_write)
286
287   def testPostWrite(self):
288     utils.WriteFile(self.tfile.name, data="", postwrite=self.markPost)
289     self.assertFalse(self.did_pre)
290     self.assertTrue(self.did_post)
291     self.assertFalse(self.did_write)
292
293   def testWriteFunction(self):
294     utils.WriteFile(self.tfile.name, fn=self.markWrite)
295     self.assertFalse(self.did_pre)
296     self.assertFalse(self.did_post)
297     self.assertTrue(self.did_write)
298
299   def testDryRun(self):
300     orig = "abc"
301     self.tfile.write(orig)
302     self.tfile.flush()
303     utils.WriteFile(self.tfile.name, data="hello", dry_run=True)
304     self.assertEqual(utils.ReadFile(self.tfile.name), orig)
305
306   def testTimes(self):
307     f = self.tfile.name
308     for at, mt in [(0, 0), (1000, 1000), (2000, 3000),
309                    (int(time.time()), 5000)]:
310       utils.WriteFile(f, data="hello", atime=at, mtime=mt)
311       st = os.stat(f)
312       self.assertEqual(st.st_atime, at)
313       self.assertEqual(st.st_mtime, mt)
314
315   def testNoClose(self):
316     data = "hello"
317     self.assertEqual(utils.WriteFile(self.tfile.name, data="abc"), None)
318     fd = utils.WriteFile(self.tfile.name, data=data, close=False)
319     try:
320       os.lseek(fd, 0, 0)
321       self.assertEqual(os.read(fd, 4096), data)
322     finally:
323       os.close(fd)
324
325   def testNoLeftovers(self):
326     self.tmpdir = tempfile.mkdtemp()
327     self.assertEqual(utils.WriteFile(utils.PathJoin(self.tmpdir, "test"),
328                                      data="abc"),
329                      None)
330     self.assertEqual(os.listdir(self.tmpdir), ["test"])
331
332   def testFailRename(self):
333     self.tmpdir = tempfile.mkdtemp()
334     target = utils.PathJoin(self.tmpdir, "target")
335     os.mkdir(target)
336     self.assertRaises(OSError, utils.WriteFile, target, data="abc")
337     self.assertTrue(os.path.isdir(target))
338     self.assertEqual(os.listdir(self.tmpdir), ["target"])
339     self.assertFalse(os.listdir(target))
340
341   def testFailRenameDryRun(self):
342     self.tmpdir = tempfile.mkdtemp()
343     target = utils.PathJoin(self.tmpdir, "target")
344     os.mkdir(target)
345     self.assertEqual(utils.WriteFile(target, data="abc", dry_run=True), None)
346     self.assertTrue(os.path.isdir(target))
347     self.assertEqual(os.listdir(self.tmpdir), ["target"])
348     self.assertFalse(os.listdir(target))
349
350   def testBackup(self):
351     self.tmpdir = tempfile.mkdtemp()
352     testfile = utils.PathJoin(self.tmpdir, "test")
353
354     self.assertEqual(utils.WriteFile(testfile, data="foo", backup=True), None)
355     self.assertEqual(utils.ReadFile(testfile), "foo")
356     self.assertEqual(os.listdir(self.tmpdir), ["test"])
357
358     # Write again
359     assert os.path.isfile(testfile)
360     self.assertEqual(utils.WriteFile(testfile, data="bar", backup=True), None)
361     self.assertEqual(utils.ReadFile(testfile), "bar")
362     self.assertEqual(len(glob.glob("%s.backup*" % testfile)), 1)
363     self.assertTrue("test" in os.listdir(self.tmpdir))
364     self.assertEqual(len(os.listdir(self.tmpdir)), 2)
365
366     # Write again as dry-run
367     assert os.path.isfile(testfile)
368     self.assertEqual(utils.WriteFile(testfile, data="000", backup=True,
369                                      dry_run=True),
370                      None)
371     self.assertEqual(utils.ReadFile(testfile), "bar")
372     self.assertEqual(len(glob.glob("%s.backup*" % testfile)), 1)
373     self.assertTrue("test" in os.listdir(self.tmpdir))
374     self.assertEqual(len(os.listdir(self.tmpdir)), 2)
375
376
377 class TestFileID(testutils.GanetiTestCase):
378   def testEquality(self):
379     name = self._CreateTempFile()
380     oldi = utils.GetFileID(path=name)
381     self.failUnless(utils.VerifyFileID(oldi, oldi))
382
383   def testUpdate(self):
384     name = self._CreateTempFile()
385     oldi = utils.GetFileID(path=name)
386     os.utime(name, None)
387     fd = os.open(name, os.O_RDWR)
388     try:
389       newi = utils.GetFileID(fd=fd)
390       self.failUnless(utils.VerifyFileID(oldi, newi))
391       self.failUnless(utils.VerifyFileID(newi, oldi))
392     finally:
393       os.close(fd)
394
395   def testWriteFile(self):
396     name = self._CreateTempFile()
397     oldi = utils.GetFileID(path=name)
398     mtime = oldi[2]
399     os.utime(name, (mtime + 10, mtime + 10))
400     self.assertRaises(errors.LockError, utils.SafeWriteFile, name,
401                       oldi, data="")
402     os.utime(name, (mtime - 10, mtime - 10))
403     utils.SafeWriteFile(name, oldi, data="")
404     oldi = utils.GetFileID(path=name)
405     mtime = oldi[2]
406     os.utime(name, (mtime + 10, mtime + 10))
407     # this doesn't raise, since we passed None
408     utils.SafeWriteFile(name, None, data="")
409
410   def testError(self):
411     t = tempfile.NamedTemporaryFile()
412     self.assertRaises(errors.ProgrammerError, utils.GetFileID,
413                       path=t.name, fd=t.fileno())
414
415
416 class TestRemoveFile(unittest.TestCase):
417   """Test case for the RemoveFile function"""
418
419   def setUp(self):
420     """Create a temp dir and file for each case"""
421     self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
422     fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
423     os.close(fd)
424
425   def tearDown(self):
426     if os.path.exists(self.tmpfile):
427       os.unlink(self.tmpfile)
428     os.rmdir(self.tmpdir)
429
430   def testIgnoreDirs(self):
431     """Test that RemoveFile() ignores directories"""
432     self.assertEqual(None, utils.RemoveFile(self.tmpdir))
433
434   def testIgnoreNotExisting(self):
435     """Test that RemoveFile() ignores non-existing files"""
436     utils.RemoveFile(self.tmpfile)
437     utils.RemoveFile(self.tmpfile)
438
439   def testRemoveFile(self):
440     """Test that RemoveFile does remove a file"""
441     utils.RemoveFile(self.tmpfile)
442     if os.path.exists(self.tmpfile):
443       self.fail("File '%s' not removed" % self.tmpfile)
444
445   def testRemoveSymlink(self):
446     """Test that RemoveFile does remove symlinks"""
447     symlink = self.tmpdir + "/symlink"
448     os.symlink("no-such-file", symlink)
449     utils.RemoveFile(symlink)
450     if os.path.exists(symlink):
451       self.fail("File '%s' not removed" % symlink)
452     os.symlink(self.tmpfile, symlink)
453     utils.RemoveFile(symlink)
454     if os.path.exists(symlink):
455       self.fail("File '%s' not removed" % symlink)
456
457
458 class TestRemoveDir(unittest.TestCase):
459   def setUp(self):
460     self.tmpdir = tempfile.mkdtemp()
461
462   def tearDown(self):
463     try:
464       shutil.rmtree(self.tmpdir)
465     except EnvironmentError:
466       pass
467
468   def testEmptyDir(self):
469     utils.RemoveDir(self.tmpdir)
470     self.assertFalse(os.path.isdir(self.tmpdir))
471
472   def testNonEmptyDir(self):
473     self.tmpfile = os.path.join(self.tmpdir, "test1")
474     open(self.tmpfile, "w").close()
475     self.assertRaises(EnvironmentError, utils.RemoveDir, self.tmpdir)
476
477
478 class TestRename(unittest.TestCase):
479   """Test case for RenameFile"""
480
481   def setUp(self):
482     """Create a temporary directory"""
483     self.tmpdir = tempfile.mkdtemp()
484     self.tmpfile = os.path.join(self.tmpdir, "test1")
485
486     # Touch the file
487     open(self.tmpfile, "w").close()
488
489   def tearDown(self):
490     """Remove temporary directory"""
491     shutil.rmtree(self.tmpdir)
492
493   def testSimpleRename1(self):
494     """Simple rename 1"""
495     utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
496     self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
497
498   def testSimpleRename2(self):
499     """Simple rename 2"""
500     utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
501                      mkdir=True)
502     self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
503
504   def testRenameMkdir(self):
505     """Rename with mkdir"""
506     utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
507                      mkdir=True)
508     self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
509     self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
510
511     self.assertRaises(EnvironmentError, utils.RenameFile,
512                       os.path.join(self.tmpdir, "test/xyz"),
513                       os.path.join(self.tmpdir, "test/foo/bar/baz"),
514                       mkdir=True)
515
516     self.assertTrue(os.path.exists(os.path.join(self.tmpdir, "test/xyz")))
517     self.assertFalse(os.path.exists(os.path.join(self.tmpdir, "test/foo/bar")))
518     self.assertFalse(os.path.exists(os.path.join(self.tmpdir,
519                                                  "test/foo/bar/baz")))
520
521
522 class TestMakedirs(unittest.TestCase):
523   def setUp(self):
524     self.tmpdir = tempfile.mkdtemp()
525
526   def tearDown(self):
527     shutil.rmtree(self.tmpdir)
528
529   def testNonExisting(self):
530     path = utils.PathJoin(self.tmpdir, "foo")
531     utils.Makedirs(path)
532     self.assert_(os.path.isdir(path))
533
534   def testExisting(self):
535     path = utils.PathJoin(self.tmpdir, "foo")
536     os.mkdir(path)
537     utils.Makedirs(path)
538     self.assert_(os.path.isdir(path))
539
540   def testRecursiveNonExisting(self):
541     path = utils.PathJoin(self.tmpdir, "foo/bar/baz")
542     utils.Makedirs(path)
543     self.assert_(os.path.isdir(path))
544
545   def testRecursiveExisting(self):
546     path = utils.PathJoin(self.tmpdir, "B/moo/xyz")
547     self.assertFalse(os.path.exists(path))
548     os.mkdir(utils.PathJoin(self.tmpdir, "B"))
549     utils.Makedirs(path)
550     self.assert_(os.path.isdir(path))
551
552
553 class TestEnsureDirs(unittest.TestCase):
554   """Tests for EnsureDirs"""
555
556   def setUp(self):
557     self.dir = tempfile.mkdtemp()
558     self.old_umask = os.umask(0777)
559
560   def testEnsureDirs(self):
561     utils.EnsureDirs([
562         (utils.PathJoin(self.dir, "foo"), 0777),
563         (utils.PathJoin(self.dir, "bar"), 0000),
564         ])
565     self.assertEquals(os.stat(utils.PathJoin(self.dir, "foo"))[0] & 0777, 0777)
566     self.assertEquals(os.stat(utils.PathJoin(self.dir, "bar"))[0] & 0777, 0000)
567
568   def tearDown(self):
569     os.rmdir(utils.PathJoin(self.dir, "foo"))
570     os.rmdir(utils.PathJoin(self.dir, "bar"))
571     os.rmdir(self.dir)
572     os.umask(self.old_umask)
573
574
575 class TestIsNormAbsPath(unittest.TestCase):
576   """Testing case for IsNormAbsPath"""
577
578   def _pathTestHelper(self, path, result):
579     if result:
580       self.assert_(utils.IsNormAbsPath(path),
581           "Path %s should result absolute and normalized" % path)
582     else:
583       self.assertFalse(utils.IsNormAbsPath(path),
584           "Path %s should not result absolute and normalized" % path)
585
586   def testBase(self):
587     self._pathTestHelper("/etc", True)
588     self._pathTestHelper("/srv", True)
589     self._pathTestHelper("etc", False)
590     self._pathTestHelper("/etc/../root", False)
591     self._pathTestHelper("/etc/", False)
592
593
594 class TestIsBelowDir(unittest.TestCase):
595   """Testing case for IsBelowDir"""
596
597   def testSamePrefix(self):
598     self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c"))
599     self.assertTrue(utils.IsBelowDir("/a/b/", "/a/b/e"))
600
601   def testSamePrefixButDifferentDir(self):
602     self.assertFalse(utils.IsBelowDir("/a/b", "/a/bc/d"))
603     self.assertFalse(utils.IsBelowDir("/a/b/", "/a/bc/e"))
604
605   def testSamePrefixButDirTraversal(self):
606     self.assertFalse(utils.IsBelowDir("/a/b", "/a/b/../c"))
607     self.assertFalse(utils.IsBelowDir("/a/b/", "/a/b/../d"))
608
609   def testSamePrefixAndTraversal(self):
610     self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c/../d"))
611     self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c/./e"))
612     self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/../b/./e"))
613
614   def testBothAbsPath(self):
615     self.assertRaises(ValueError, utils.IsBelowDir, "/a/b/c", "d")
616     self.assertRaises(ValueError, utils.IsBelowDir, "a/b/c", "/d")
617     self.assertRaises(ValueError, utils.IsBelowDir, "a/b/c", "d")
618
619
620 class TestPathJoin(unittest.TestCase):
621   """Testing case for PathJoin"""
622
623   def testBasicItems(self):
624     mlist = ["/a", "b", "c"]
625     self.failUnlessEqual(utils.PathJoin(*mlist), "/".join(mlist))
626
627   def testNonAbsPrefix(self):
628     self.failUnlessRaises(ValueError, utils.PathJoin, "a", "b")
629
630   def testBackTrack(self):
631     self.failUnlessRaises(ValueError, utils.PathJoin, "/a", "b/../c")
632
633   def testMultiAbs(self):
634     self.failUnlessRaises(ValueError, utils.PathJoin, "/a", "/b")
635
636
637 class TestTailFile(testutils.GanetiTestCase):
638   """Test case for the TailFile function"""
639
640   def testEmpty(self):
641     fname = self._CreateTempFile()
642     self.failUnlessEqual(utils.TailFile(fname), [])
643     self.failUnlessEqual(utils.TailFile(fname, lines=25), [])
644
645   def testAllLines(self):
646     data = ["test %d" % i for i in range(30)]
647     for i in range(30):
648       fname = self._CreateTempFile()
649       fd = open(fname, "w")
650       fd.write("\n".join(data[:i]))
651       if i > 0:
652         fd.write("\n")
653       fd.close()
654       self.failUnlessEqual(utils.TailFile(fname, lines=i), data[:i])
655
656   def testPartialLines(self):
657     data = ["test %d" % i for i in range(30)]
658     fname = self._CreateTempFile()
659     fd = open(fname, "w")
660     fd.write("\n".join(data))
661     fd.write("\n")
662     fd.close()
663     for i in range(1, 30):
664       self.failUnlessEqual(utils.TailFile(fname, lines=i), data[-i:])
665
666   def testBigFile(self):
667     data = ["test %d" % i for i in range(30)]
668     fname = self._CreateTempFile()
669     fd = open(fname, "w")
670     fd.write("X" * 1048576)
671     fd.write("\n")
672     fd.write("\n".join(data))
673     fd.write("\n")
674     fd.close()
675     for i in range(1, 30):
676       self.failUnlessEqual(utils.TailFile(fname, lines=i), data[-i:])
677
678
679 class TestPidFileFunctions(unittest.TestCase):
680   """Tests for WritePidFile and ReadPidFile"""
681
682   def setUp(self):
683     self.dir = tempfile.mkdtemp()
684     self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
685
686   def testPidFileFunctions(self):
687     pid_file = self.f_dpn('test')
688     fd = utils.WritePidFile(self.f_dpn('test'))
689     self.failUnless(os.path.exists(pid_file),
690                     "PID file should have been created")
691     read_pid = utils.ReadPidFile(pid_file)
692     self.failUnlessEqual(read_pid, os.getpid())
693     self.failUnless(utils.IsProcessAlive(read_pid))
694     self.failUnlessRaises(errors.LockError, utils.WritePidFile,
695                           self.f_dpn('test'))
696     os.close(fd)
697     utils.RemoveFile(self.f_dpn("test"))
698     self.failIf(os.path.exists(pid_file),
699                 "PID file should not exist anymore")
700     self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
701                          "ReadPidFile should return 0 for missing pid file")
702     fh = open(pid_file, "w")
703     fh.write("blah\n")
704     fh.close()
705     self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
706                          "ReadPidFile should return 0 for invalid pid file")
707     # but now, even with the file existing, we should be able to lock it
708     fd = utils.WritePidFile(self.f_dpn('test'))
709     os.close(fd)
710     utils.RemoveFile(self.f_dpn("test"))
711     self.failIf(os.path.exists(pid_file),
712                 "PID file should not exist anymore")
713
714   def testKill(self):
715     pid_file = self.f_dpn('child')
716     r_fd, w_fd = os.pipe()
717     new_pid = os.fork()
718     if new_pid == 0: #child
719       utils.WritePidFile(self.f_dpn('child'))
720       os.write(w_fd, 'a')
721       signal.pause()
722       os._exit(0)
723       return
724     # else we are in the parent
725     # wait until the child has written the pid file
726     os.read(r_fd, 1)
727     read_pid = utils.ReadPidFile(pid_file)
728     self.failUnlessEqual(read_pid, new_pid)
729     self.failUnless(utils.IsProcessAlive(new_pid))
730     utils.KillProcess(new_pid, waitpid=True)
731     self.failIf(utils.IsProcessAlive(new_pid))
732     utils.RemoveFile(self.f_dpn('child'))
733     self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0)
734
735   def tearDown(self):
736     shutil.rmtree(self.dir)
737
738
739 class TestSshKeys(testutils.GanetiTestCase):
740   """Test case for the AddAuthorizedKey function"""
741
742   KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
743   KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="198.51.100.4" '
744            'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
745
746   def setUp(self):
747     testutils.GanetiTestCase.setUp(self)
748     self.tmpname = self._CreateTempFile()
749     handle = open(self.tmpname, 'w')
750     try:
751       handle.write("%s\n" % TestSshKeys.KEY_A)
752       handle.write("%s\n" % TestSshKeys.KEY_B)
753     finally:
754       handle.close()
755
756   def testAddingNewKey(self):
757     utils.AddAuthorizedKey(self.tmpname,
758                            'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
759
760     self.assertFileContent(self.tmpname,
761       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
762       'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
763       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
764       "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
765
766   def testAddingAlmostButNotCompletelyTheSameKey(self):
767     utils.AddAuthorizedKey(self.tmpname,
768         'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
769
770     self.assertFileContent(self.tmpname,
771       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
772       'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
773       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
774       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
775
776   def testAddingExistingKeyWithSomeMoreSpaces(self):
777     utils.AddAuthorizedKey(self.tmpname,
778         'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
779
780     self.assertFileContent(self.tmpname,
781       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
782       'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
783       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
784
785   def testRemovingExistingKeyWithSomeMoreSpaces(self):
786     utils.RemoveAuthorizedKey(self.tmpname,
787         'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
788
789     self.assertFileContent(self.tmpname,
790       'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
791       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
792
793   def testRemovingNonExistingKey(self):
794     utils.RemoveAuthorizedKey(self.tmpname,
795         'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
796
797     self.assertFileContent(self.tmpname,
798       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
799       'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
800       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
801
802
803 class TestNewUUID(unittest.TestCase):
804   """Test case for NewUUID"""
805
806   def runTest(self):
807     self.failUnless(utils.UUID_RE.match(utils.NewUUID()))
808
809
810 def _MockStatResult(cb, mode, uid, gid):
811   def _fn(path):
812     if cb:
813       cb()
814     return {
815       stat.ST_MODE: mode,
816       stat.ST_UID: uid,
817       stat.ST_GID: gid,
818       }
819   return _fn
820
821
822 def _RaiseNoEntError():
823   raise EnvironmentError(errno.ENOENT, "not found")
824
825
826 def _OtherStatRaise():
827   raise EnvironmentError()
828
829
830 class TestPermissionEnforcements(unittest.TestCase):
831   UID_A = 16024
832   UID_B = 25850
833   GID_A = 14028
834   GID_B = 29801
835
836   def setUp(self):
837     self._chown_calls = []
838     self._chmod_calls = []
839     self._mkdir_calls = []
840
841   def tearDown(self):
842     self.assertRaises(IndexError, self._mkdir_calls.pop)
843     self.assertRaises(IndexError, self._chmod_calls.pop)
844     self.assertRaises(IndexError, self._chown_calls.pop)
845
846   def _FakeMkdir(self, path):
847     self._mkdir_calls.append(path)
848
849   def _FakeChown(self, path, uid, gid):
850     self._chown_calls.append((path, uid, gid))
851
852   def _ChmodWrapper(self, cb):
853     def _fn(path, mode):
854       self._chmod_calls.append((path, mode))
855       if cb:
856         cb()
857     return _fn
858
859   def _VerifyPerm(self, path, mode, uid=-1, gid=-1):
860     self.assertEqual(path, "/ganeti-qa-non-test")
861     self.assertEqual(mode, 0700)
862     self.assertEqual(uid, self.UID_A)
863     self.assertEqual(gid, self.GID_A)
864
865   def testMakeDirWithPerm(self):
866     is_dir_stat = _MockStatResult(None, stat.S_IFDIR, 0, 0)
867     utils.MakeDirWithPerm("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A,
868                           _lstat_fn=is_dir_stat, _perm_fn=self._VerifyPerm)
869
870   def testDirErrors(self):
871     self.assertRaises(errors.GenericError, utils.MakeDirWithPerm,
872                       "/ganeti-qa-non-test", 0700, 0, 0,
873                       _lstat_fn=_MockStatResult(None, 0, 0, 0))
874     self.assertRaises(IndexError, self._mkdir_calls.pop)
875
876     other_stat_raise = _MockStatResult(_OtherStatRaise, stat.S_IFDIR, 0, 0)
877     self.assertRaises(errors.GenericError, utils.MakeDirWithPerm,
878                       "/ganeti-qa-non-test", 0700, 0, 0,
879                       _lstat_fn=other_stat_raise)
880     self.assertRaises(IndexError, self._mkdir_calls.pop)
881
882     non_exist_stat = _MockStatResult(_RaiseNoEntError, stat.S_IFDIR, 0, 0)
883     utils.MakeDirWithPerm("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A,
884                           _lstat_fn=non_exist_stat, _mkdir_fn=self._FakeMkdir,
885                           _perm_fn=self._VerifyPerm)
886     self.assertEqual(self._mkdir_calls.pop(0), "/ganeti-qa-non-test")
887
888   def testEnforcePermissionNoEnt(self):
889     self.assertRaises(errors.GenericError, utils.EnforcePermission,
890                       "/ganeti-qa-non-test", 0600,
891                       _chmod_fn=NotImplemented, _chown_fn=NotImplemented,
892                       _stat_fn=_MockStatResult(_RaiseNoEntError, 0, 0, 0))
893
894   def testEnforcePermissionNoEntMustNotExist(self):
895     utils.EnforcePermission("/ganeti-qa-non-test", 0600, must_exist=False,
896                             _chmod_fn=NotImplemented,
897                             _chown_fn=NotImplemented,
898                             _stat_fn=_MockStatResult(_RaiseNoEntError,
899                                                           0, 0, 0))
900
901   def testEnforcePermissionOtherErrorMustNotExist(self):
902     self.assertRaises(errors.GenericError, utils.EnforcePermission,
903                       "/ganeti-qa-non-test", 0600, must_exist=False,
904                       _chmod_fn=NotImplemented, _chown_fn=NotImplemented,
905                       _stat_fn=_MockStatResult(_OtherStatRaise, 0, 0, 0))
906
907   def testEnforcePermissionNoChanges(self):
908     utils.EnforcePermission("/ganeti-qa-non-test", 0600,
909                             _stat_fn=_MockStatResult(None, 0600, 0, 0),
910                             _chmod_fn=self._ChmodWrapper(None),
911                             _chown_fn=self._FakeChown)
912
913   def testEnforcePermissionChangeMode(self):
914     utils.EnforcePermission("/ganeti-qa-non-test", 0444,
915                             _stat_fn=_MockStatResult(None, 0600, 0, 0),
916                             _chmod_fn=self._ChmodWrapper(None),
917                             _chown_fn=self._FakeChown)
918     self.assertEqual(self._chmod_calls.pop(0), ("/ganeti-qa-non-test", 0444))
919
920   def testEnforcePermissionSetUidGid(self):
921     utils.EnforcePermission("/ganeti-qa-non-test", 0600,
922                             uid=self.UID_B, gid=self.GID_B,
923                             _stat_fn=_MockStatResult(None, 0600,
924                                                      self.UID_A,
925                                                      self.GID_A),
926                             _chmod_fn=self._ChmodWrapper(None),
927                             _chown_fn=self._FakeChown)
928     self.assertEqual(self._chown_calls.pop(0),
929                      ("/ganeti-qa-non-test", self.UID_B, self.GID_B))
930
931
932 if __name__ == "__main__":
933   testutils.GanetiTestProgram()