Merge branch 'devel-2.5'
[ganeti-local] / test / ganeti.utils.io_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for testing ganeti.utils.io"""
23
24 import os
25 import tempfile
26 import unittest
27 import shutil
28 import glob
29 import time
30 import signal
31
32 from ganeti import constants
33 from ganeti import utils
34 from ganeti import compat
35 from ganeti import errors
36
37 import testutils
38
39
40 class TestReadFile(testutils.GanetiTestCase):
41   def testReadAll(self):
42     data = utils.ReadFile(self._TestDataFilename("cert1.pem"))
43     self.assertEqual(len(data), 814)
44
45     h = compat.md5_hash()
46     h.update(data)
47     self.assertEqual(h.hexdigest(), "a491efb3efe56a0535f924d5f8680fd4")
48
49   def testReadSize(self):
50     data = utils.ReadFile(self._TestDataFilename("cert1.pem"),
51                           size=100)
52     self.assertEqual(len(data), 100)
53
54     h = compat.md5_hash()
55     h.update(data)
56     self.assertEqual(h.hexdigest(), "893772354e4e690b9efd073eed433ce7")
57
58   def testCallback(self):
59     def _Cb(fh):
60       self.assertEqual(fh.tell(), 0)
61     data = utils.ReadFile(self._TestDataFilename("cert1.pem"), preread=_Cb)
62     self.assertEqual(len(data), 814)
63
64   def testError(self):
65     self.assertRaises(EnvironmentError, utils.ReadFile,
66                       "/dev/null/does-not-exist")
67
68
69 class TestReadOneLineFile(testutils.GanetiTestCase):
70   def setUp(self):
71     testutils.GanetiTestCase.setUp(self)
72
73   def testDefault(self):
74     data = utils.ReadOneLineFile(self._TestDataFilename("cert1.pem"))
75     self.assertEqual(len(data), 27)
76     self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
77
78   def testNotStrict(self):
79     data = utils.ReadOneLineFile(self._TestDataFilename("cert1.pem"),
80                                  strict=False)
81     self.assertEqual(len(data), 27)
82     self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
83
84   def testStrictFailure(self):
85     self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
86                       self._TestDataFilename("cert1.pem"), strict=True)
87
88   def testLongLine(self):
89     dummydata = (1024 * "Hello World! ")
90     myfile = self._CreateTempFile()
91     utils.WriteFile(myfile, data=dummydata)
92     datastrict = utils.ReadOneLineFile(myfile, strict=True)
93     datalax = utils.ReadOneLineFile(myfile, strict=False)
94     self.assertEqual(dummydata, datastrict)
95     self.assertEqual(dummydata, datalax)
96
97   def testNewline(self):
98     myfile = self._CreateTempFile()
99     myline = "myline"
100     for nl in ["", "\n", "\r\n"]:
101       dummydata = "%s%s" % (myline, nl)
102       utils.WriteFile(myfile, data=dummydata)
103       datalax = utils.ReadOneLineFile(myfile, strict=False)
104       self.assertEqual(myline, datalax)
105       datastrict = utils.ReadOneLineFile(myfile, strict=True)
106       self.assertEqual(myline, datastrict)
107
108   def testWhitespaceAndMultipleLines(self):
109     myfile = self._CreateTempFile()
110     for nl in ["", "\n", "\r\n"]:
111       for ws in [" ", "\t", "\t\t  \t", "\t "]:
112         dummydata = (1024 * ("Foo bar baz %s%s" % (ws, nl)))
113         utils.WriteFile(myfile, data=dummydata)
114         datalax = utils.ReadOneLineFile(myfile, strict=False)
115         if nl:
116           self.assert_(set("\r\n") & set(dummydata))
117           self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
118                             myfile, strict=True)
119           explen = len("Foo bar baz ") + len(ws)
120           self.assertEqual(len(datalax), explen)
121           self.assertEqual(datalax, dummydata[:explen])
122           self.assertFalse(set("\r\n") & set(datalax))
123         else:
124           datastrict = utils.ReadOneLineFile(myfile, strict=True)
125           self.assertEqual(dummydata, datastrict)
126           self.assertEqual(dummydata, datalax)
127
128   def testEmptylines(self):
129     myfile = self._CreateTempFile()
130     myline = "myline"
131     for nl in ["\n", "\r\n"]:
132       for ol in ["", "otherline"]:
133         dummydata = "%s%s%s%s%s%s" % (nl, nl, myline, nl, ol, nl)
134         utils.WriteFile(myfile, data=dummydata)
135         self.assert_(set("\r\n") & set(dummydata))
136         datalax = utils.ReadOneLineFile(myfile, strict=False)
137         self.assertEqual(myline, datalax)
138         if ol:
139           self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
140                             myfile, strict=True)
141         else:
142           datastrict = utils.ReadOneLineFile(myfile, strict=True)
143           self.assertEqual(myline, datastrict)
144
145   def testEmptyfile(self):
146     myfile = self._CreateTempFile()
147     self.assertRaises(errors.GenericError, utils.ReadOneLineFile, myfile)
148
149
150 class TestTimestampForFilename(unittest.TestCase):
151   def test(self):
152     self.assert_("." not in utils.TimestampForFilename())
153     self.assert_(":" not in utils.TimestampForFilename())
154
155
156 class TestCreateBackup(testutils.GanetiTestCase):
157   def setUp(self):
158     testutils.GanetiTestCase.setUp(self)
159
160     self.tmpdir = tempfile.mkdtemp()
161
162   def tearDown(self):
163     testutils.GanetiTestCase.tearDown(self)
164
165     shutil.rmtree(self.tmpdir)
166
167   def testEmpty(self):
168     filename = utils.PathJoin(self.tmpdir, "config.data")
169     utils.WriteFile(filename, data="")
170     bname = utils.CreateBackup(filename)
171     self.assertFileContent(bname, "")
172     self.assertEqual(len(glob.glob("%s*" % filename)), 2)
173     utils.CreateBackup(filename)
174     self.assertEqual(len(glob.glob("%s*" % filename)), 3)
175     utils.CreateBackup(filename)
176     self.assertEqual(len(glob.glob("%s*" % filename)), 4)
177
178     fifoname = utils.PathJoin(self.tmpdir, "fifo")
179     os.mkfifo(fifoname)
180     self.assertRaises(errors.ProgrammerError, utils.CreateBackup, fifoname)
181
182   def testContent(self):
183     bkpcount = 0
184     for data in ["", "X", "Hello World!\n" * 100, "Binary data\0\x01\x02\n"]:
185       for rep in [1, 2, 10, 127]:
186         testdata = data * rep
187
188         filename = utils.PathJoin(self.tmpdir, "test.data_")
189         utils.WriteFile(filename, data=testdata)
190         self.assertFileContent(filename, testdata)
191
192         for _ in range(3):
193           bname = utils.CreateBackup(filename)
194           bkpcount += 1
195           self.assertFileContent(bname, testdata)
196           self.assertEqual(len(glob.glob("%s*" % filename)), 1 + bkpcount)
197
198
199 class TestListVisibleFiles(unittest.TestCase):
200   """Test case for ListVisibleFiles"""
201
202   def setUp(self):
203     self.path = tempfile.mkdtemp()
204
205   def tearDown(self):
206     shutil.rmtree(self.path)
207
208   def _CreateFiles(self, files):
209     for name in files:
210       utils.WriteFile(os.path.join(self.path, name), data="test")
211
212   def _test(self, files, expected):
213     self._CreateFiles(files)
214     found = utils.ListVisibleFiles(self.path)
215     self.assertEqual(set(found), set(expected))
216
217   def testAllVisible(self):
218     files = ["a", "b", "c"]
219     expected = files
220     self._test(files, expected)
221
222   def testNoneVisible(self):
223     files = [".a", ".b", ".c"]
224     expected = []
225     self._test(files, expected)
226
227   def testSomeVisible(self):
228     files = ["a", "b", ".c"]
229     expected = ["a", "b"]
230     self._test(files, expected)
231
232   def testNonAbsolutePath(self):
233     self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
234                           "abc")
235
236   def testNonNormalizedPath(self):
237     self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
238                           "/bin/../tmp")
239
240
241 class TestWriteFile(unittest.TestCase):
242   def setUp(self):
243     self.tmpdir = None
244     self.tfile = tempfile.NamedTemporaryFile()
245     self.did_pre = False
246     self.did_post = False
247     self.did_write = False
248
249   def tearDown(self):
250     if self.tmpdir:
251       shutil.rmtree(self.tmpdir)
252
253   def markPre(self, fd):
254     self.did_pre = True
255
256   def markPost(self, fd):
257     self.did_post = True
258
259   def markWrite(self, fd):
260     self.did_write = True
261
262   def testWrite(self):
263     data = "abc"
264     utils.WriteFile(self.tfile.name, data=data)
265     self.assertEqual(utils.ReadFile(self.tfile.name), data)
266
267   def testWriteSimpleUnicode(self):
268     data = u"abc"
269     utils.WriteFile(self.tfile.name, data=data)
270     self.assertEqual(utils.ReadFile(self.tfile.name), data)
271
272   def testErrors(self):
273     self.assertRaises(errors.ProgrammerError, utils.WriteFile,
274                       self.tfile.name, data="test", fn=lambda fd: None)
275     self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name)
276     self.assertRaises(errors.ProgrammerError, utils.WriteFile,
277                       self.tfile.name, data="test", atime=0)
278
279   def testPreWrite(self):
280     utils.WriteFile(self.tfile.name, data="", prewrite=self.markPre)
281     self.assertTrue(self.did_pre)
282     self.assertFalse(self.did_post)
283     self.assertFalse(self.did_write)
284
285   def testPostWrite(self):
286     utils.WriteFile(self.tfile.name, data="", postwrite=self.markPost)
287     self.assertFalse(self.did_pre)
288     self.assertTrue(self.did_post)
289     self.assertFalse(self.did_write)
290
291   def testWriteFunction(self):
292     utils.WriteFile(self.tfile.name, fn=self.markWrite)
293     self.assertFalse(self.did_pre)
294     self.assertFalse(self.did_post)
295     self.assertTrue(self.did_write)
296
297   def testDryRun(self):
298     orig = "abc"
299     self.tfile.write(orig)
300     self.tfile.flush()
301     utils.WriteFile(self.tfile.name, data="hello", dry_run=True)
302     self.assertEqual(utils.ReadFile(self.tfile.name), orig)
303
304   def testTimes(self):
305     f = self.tfile.name
306     for at, mt in [(0, 0), (1000, 1000), (2000, 3000),
307                    (int(time.time()), 5000)]:
308       utils.WriteFile(f, data="hello", atime=at, mtime=mt)
309       st = os.stat(f)
310       self.assertEqual(st.st_atime, at)
311       self.assertEqual(st.st_mtime, mt)
312
313   def testNoClose(self):
314     data = "hello"
315     self.assertEqual(utils.WriteFile(self.tfile.name, data="abc"), None)
316     fd = utils.WriteFile(self.tfile.name, data=data, close=False)
317     try:
318       os.lseek(fd, 0, 0)
319       self.assertEqual(os.read(fd, 4096), data)
320     finally:
321       os.close(fd)
322
323   def testNoLeftovers(self):
324     self.tmpdir = tempfile.mkdtemp()
325     self.assertEqual(utils.WriteFile(utils.PathJoin(self.tmpdir, "test"),
326                                      data="abc"),
327                      None)
328     self.assertEqual(os.listdir(self.tmpdir), ["test"])
329
330   def testFailRename(self):
331     self.tmpdir = tempfile.mkdtemp()
332     target = utils.PathJoin(self.tmpdir, "target")
333     os.mkdir(target)
334     self.assertRaises(OSError, utils.WriteFile, target, data="abc")
335     self.assertTrue(os.path.isdir(target))
336     self.assertEqual(os.listdir(self.tmpdir), ["target"])
337     self.assertFalse(os.listdir(target))
338
339   def testFailRenameDryRun(self):
340     self.tmpdir = tempfile.mkdtemp()
341     target = utils.PathJoin(self.tmpdir, "target")
342     os.mkdir(target)
343     self.assertEqual(utils.WriteFile(target, data="abc", dry_run=True), None)
344     self.assertTrue(os.path.isdir(target))
345     self.assertEqual(os.listdir(self.tmpdir), ["target"])
346     self.assertFalse(os.listdir(target))
347
348   def testBackup(self):
349     self.tmpdir = tempfile.mkdtemp()
350     testfile = utils.PathJoin(self.tmpdir, "test")
351
352     self.assertEqual(utils.WriteFile(testfile, data="foo", backup=True), None)
353     self.assertEqual(utils.ReadFile(testfile), "foo")
354     self.assertEqual(os.listdir(self.tmpdir), ["test"])
355
356     # Write again
357     assert os.path.isfile(testfile)
358     self.assertEqual(utils.WriteFile(testfile, data="bar", backup=True), None)
359     self.assertEqual(utils.ReadFile(testfile), "bar")
360     self.assertEqual(len(glob.glob("%s.backup*" % testfile)), 1)
361     self.assertTrue("test" in os.listdir(self.tmpdir))
362     self.assertEqual(len(os.listdir(self.tmpdir)), 2)
363
364     # Write again as dry-run
365     assert os.path.isfile(testfile)
366     self.assertEqual(utils.WriteFile(testfile, data="000", backup=True,
367                                      dry_run=True),
368                      None)
369     self.assertEqual(utils.ReadFile(testfile), "bar")
370     self.assertEqual(len(glob.glob("%s.backup*" % testfile)), 1)
371     self.assertTrue("test" in os.listdir(self.tmpdir))
372     self.assertEqual(len(os.listdir(self.tmpdir)), 2)
373
374
375 class TestFileID(testutils.GanetiTestCase):
376   def testEquality(self):
377     name = self._CreateTempFile()
378     oldi = utils.GetFileID(path=name)
379     self.failUnless(utils.VerifyFileID(oldi, oldi))
380
381   def testUpdate(self):
382     name = self._CreateTempFile()
383     oldi = utils.GetFileID(path=name)
384     os.utime(name, None)
385     fd = os.open(name, os.O_RDWR)
386     try:
387       newi = utils.GetFileID(fd=fd)
388       self.failUnless(utils.VerifyFileID(oldi, newi))
389       self.failUnless(utils.VerifyFileID(newi, oldi))
390     finally:
391       os.close(fd)
392
393   def testWriteFile(self):
394     name = self._CreateTempFile()
395     oldi = utils.GetFileID(path=name)
396     mtime = oldi[2]
397     os.utime(name, (mtime + 10, mtime + 10))
398     self.assertRaises(errors.LockError, utils.SafeWriteFile, name,
399                       oldi, data="")
400     os.utime(name, (mtime - 10, mtime - 10))
401     utils.SafeWriteFile(name, oldi, data="")
402     oldi = utils.GetFileID(path=name)
403     mtime = oldi[2]
404     os.utime(name, (mtime + 10, mtime + 10))
405     # this doesn't raise, since we passed None
406     utils.SafeWriteFile(name, None, data="")
407
408   def testError(self):
409     t = tempfile.NamedTemporaryFile()
410     self.assertRaises(errors.ProgrammerError, utils.GetFileID,
411                       path=t.name, fd=t.fileno())
412
413
414 class TestRemoveFile(unittest.TestCase):
415   """Test case for the RemoveFile function"""
416
417   def setUp(self):
418     """Create a temp dir and file for each case"""
419     self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
420     fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
421     os.close(fd)
422
423   def tearDown(self):
424     if os.path.exists(self.tmpfile):
425       os.unlink(self.tmpfile)
426     os.rmdir(self.tmpdir)
427
428   def testIgnoreDirs(self):
429     """Test that RemoveFile() ignores directories"""
430     self.assertEqual(None, utils.RemoveFile(self.tmpdir))
431
432   def testIgnoreNotExisting(self):
433     """Test that RemoveFile() ignores non-existing files"""
434     utils.RemoveFile(self.tmpfile)
435     utils.RemoveFile(self.tmpfile)
436
437   def testRemoveFile(self):
438     """Test that RemoveFile does remove a file"""
439     utils.RemoveFile(self.tmpfile)
440     if os.path.exists(self.tmpfile):
441       self.fail("File '%s' not removed" % self.tmpfile)
442
443   def testRemoveSymlink(self):
444     """Test that RemoveFile does remove symlinks"""
445     symlink = self.tmpdir + "/symlink"
446     os.symlink("no-such-file", symlink)
447     utils.RemoveFile(symlink)
448     if os.path.exists(symlink):
449       self.fail("File '%s' not removed" % symlink)
450     os.symlink(self.tmpfile, symlink)
451     utils.RemoveFile(symlink)
452     if os.path.exists(symlink):
453       self.fail("File '%s' not removed" % symlink)
454
455
456 class TestRemoveDir(unittest.TestCase):
457   def setUp(self):
458     self.tmpdir = tempfile.mkdtemp()
459
460   def tearDown(self):
461     try:
462       shutil.rmtree(self.tmpdir)
463     except EnvironmentError:
464       pass
465
466   def testEmptyDir(self):
467     utils.RemoveDir(self.tmpdir)
468     self.assertFalse(os.path.isdir(self.tmpdir))
469
470   def testNonEmptyDir(self):
471     self.tmpfile = os.path.join(self.tmpdir, "test1")
472     open(self.tmpfile, "w").close()
473     self.assertRaises(EnvironmentError, utils.RemoveDir, self.tmpdir)
474
475
476 class TestRename(unittest.TestCase):
477   """Test case for RenameFile"""
478
479   def setUp(self):
480     """Create a temporary directory"""
481     self.tmpdir = tempfile.mkdtemp()
482     self.tmpfile = os.path.join(self.tmpdir, "test1")
483
484     # Touch the file
485     open(self.tmpfile, "w").close()
486
487   def tearDown(self):
488     """Remove temporary directory"""
489     shutil.rmtree(self.tmpdir)
490
491   def testSimpleRename1(self):
492     """Simple rename 1"""
493     utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
494     self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
495
496   def testSimpleRename2(self):
497     """Simple rename 2"""
498     utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
499                      mkdir=True)
500     self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
501
502   def testRenameMkdir(self):
503     """Rename with mkdir"""
504     utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
505                      mkdir=True)
506     self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
507     self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
508
509     utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"),
510                      os.path.join(self.tmpdir, "test/foo/bar/baz"),
511                      mkdir=True)
512     self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
513     self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar")))
514     self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz")))
515
516
517 class TestMakedirs(unittest.TestCase):
518   def setUp(self):
519     self.tmpdir = tempfile.mkdtemp()
520
521   def tearDown(self):
522     shutil.rmtree(self.tmpdir)
523
524   def testNonExisting(self):
525     path = utils.PathJoin(self.tmpdir, "foo")
526     utils.Makedirs(path)
527     self.assert_(os.path.isdir(path))
528
529   def testExisting(self):
530     path = utils.PathJoin(self.tmpdir, "foo")
531     os.mkdir(path)
532     utils.Makedirs(path)
533     self.assert_(os.path.isdir(path))
534
535   def testRecursiveNonExisting(self):
536     path = utils.PathJoin(self.tmpdir, "foo/bar/baz")
537     utils.Makedirs(path)
538     self.assert_(os.path.isdir(path))
539
540   def testRecursiveExisting(self):
541     path = utils.PathJoin(self.tmpdir, "B/moo/xyz")
542     self.assertFalse(os.path.exists(path))
543     os.mkdir(utils.PathJoin(self.tmpdir, "B"))
544     utils.Makedirs(path)
545     self.assert_(os.path.isdir(path))
546
547
548 class TestEnsureDirs(unittest.TestCase):
549   """Tests for EnsureDirs"""
550
551   def setUp(self):
552     self.dir = tempfile.mkdtemp()
553     self.old_umask = os.umask(0777)
554
555   def testEnsureDirs(self):
556     utils.EnsureDirs([
557         (utils.PathJoin(self.dir, "foo"), 0777),
558         (utils.PathJoin(self.dir, "bar"), 0000),
559         ])
560     self.assertEquals(os.stat(utils.PathJoin(self.dir, "foo"))[0] & 0777, 0777)
561     self.assertEquals(os.stat(utils.PathJoin(self.dir, "bar"))[0] & 0777, 0000)
562
563   def tearDown(self):
564     os.rmdir(utils.PathJoin(self.dir, "foo"))
565     os.rmdir(utils.PathJoin(self.dir, "bar"))
566     os.rmdir(self.dir)
567     os.umask(self.old_umask)
568
569
570 class TestIsNormAbsPath(unittest.TestCase):
571   """Testing case for IsNormAbsPath"""
572
573   def _pathTestHelper(self, path, result):
574     if result:
575       self.assert_(utils.IsNormAbsPath(path),
576           "Path %s should result absolute and normalized" % path)
577     else:
578       self.assertFalse(utils.IsNormAbsPath(path),
579           "Path %s should not result absolute and normalized" % path)
580
581   def testBase(self):
582     self._pathTestHelper("/etc", True)
583     self._pathTestHelper("/srv", True)
584     self._pathTestHelper("etc", False)
585     self._pathTestHelper("/etc/../root", False)
586     self._pathTestHelper("/etc/", False)
587
588
589 class TestIsBelowDir(unittest.TestCase):
590   """Testing case for IsBelowDir"""
591
592   def testSamePrefix(self):
593     self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c"))
594     self.assertTrue(utils.IsBelowDir("/a/b/", "/a/b/e"))
595
596   def testSamePrefixButDifferentDir(self):
597     self.assertFalse(utils.IsBelowDir("/a/b", "/a/bc/d"))
598     self.assertFalse(utils.IsBelowDir("/a/b/", "/a/bc/e"))
599
600   def testSamePrefixButDirTraversal(self):
601     self.assertFalse(utils.IsBelowDir("/a/b", "/a/b/../c"))
602     self.assertFalse(utils.IsBelowDir("/a/b/", "/a/b/../d"))
603
604   def testSamePrefixAndTraversal(self):
605     self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c/../d"))
606     self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c/./e"))
607     self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/../b/./e"))
608
609   def testBothAbsPath(self):
610     self.assertRaises(ValueError, utils.IsBelowDir, "/a/b/c", "d")
611     self.assertRaises(ValueError, utils.IsBelowDir, "a/b/c", "/d")
612     self.assertRaises(ValueError, utils.IsBelowDir, "a/b/c", "d")
613
614
615 class TestPathJoin(unittest.TestCase):
616   """Testing case for PathJoin"""
617
618   def testBasicItems(self):
619     mlist = ["/a", "b", "c"]
620     self.failUnlessEqual(utils.PathJoin(*mlist), "/".join(mlist))
621
622   def testNonAbsPrefix(self):
623     self.failUnlessRaises(ValueError, utils.PathJoin, "a", "b")
624
625   def testBackTrack(self):
626     self.failUnlessRaises(ValueError, utils.PathJoin, "/a", "b/../c")
627
628   def testMultiAbs(self):
629     self.failUnlessRaises(ValueError, utils.PathJoin, "/a", "/b")
630
631
632 class TestTailFile(testutils.GanetiTestCase):
633   """Test case for the TailFile function"""
634
635   def testEmpty(self):
636     fname = self._CreateTempFile()
637     self.failUnlessEqual(utils.TailFile(fname), [])
638     self.failUnlessEqual(utils.TailFile(fname, lines=25), [])
639
640   def testAllLines(self):
641     data = ["test %d" % i for i in range(30)]
642     for i in range(30):
643       fname = self._CreateTempFile()
644       fd = open(fname, "w")
645       fd.write("\n".join(data[:i]))
646       if i > 0:
647         fd.write("\n")
648       fd.close()
649       self.failUnlessEqual(utils.TailFile(fname, lines=i), data[:i])
650
651   def testPartialLines(self):
652     data = ["test %d" % i for i in range(30)]
653     fname = self._CreateTempFile()
654     fd = open(fname, "w")
655     fd.write("\n".join(data))
656     fd.write("\n")
657     fd.close()
658     for i in range(1, 30):
659       self.failUnlessEqual(utils.TailFile(fname, lines=i), data[-i:])
660
661   def testBigFile(self):
662     data = ["test %d" % i for i in range(30)]
663     fname = self._CreateTempFile()
664     fd = open(fname, "w")
665     fd.write("X" * 1048576)
666     fd.write("\n")
667     fd.write("\n".join(data))
668     fd.write("\n")
669     fd.close()
670     for i in range(1, 30):
671       self.failUnlessEqual(utils.TailFile(fname, lines=i), data[-i:])
672
673
674 class TestPidFileFunctions(unittest.TestCase):
675   """Tests for WritePidFile and ReadPidFile"""
676
677   def setUp(self):
678     self.dir = tempfile.mkdtemp()
679     self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
680
681   def testPidFileFunctions(self):
682     pid_file = self.f_dpn('test')
683     fd = utils.WritePidFile(self.f_dpn('test'))
684     self.failUnless(os.path.exists(pid_file),
685                     "PID file should have been created")
686     read_pid = utils.ReadPidFile(pid_file)
687     self.failUnlessEqual(read_pid, os.getpid())
688     self.failUnless(utils.IsProcessAlive(read_pid))
689     self.failUnlessRaises(errors.LockError, utils.WritePidFile,
690                           self.f_dpn('test'))
691     os.close(fd)
692     utils.RemoveFile(self.f_dpn("test"))
693     self.failIf(os.path.exists(pid_file),
694                 "PID file should not exist anymore")
695     self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
696                          "ReadPidFile should return 0 for missing pid file")
697     fh = open(pid_file, "w")
698     fh.write("blah\n")
699     fh.close()
700     self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
701                          "ReadPidFile should return 0 for invalid pid file")
702     # but now, even with the file existing, we should be able to lock it
703     fd = utils.WritePidFile(self.f_dpn('test'))
704     os.close(fd)
705     utils.RemoveFile(self.f_dpn("test"))
706     self.failIf(os.path.exists(pid_file),
707                 "PID file should not exist anymore")
708
709   def testKill(self):
710     pid_file = self.f_dpn('child')
711     r_fd, w_fd = os.pipe()
712     new_pid = os.fork()
713     if new_pid == 0: #child
714       utils.WritePidFile(self.f_dpn('child'))
715       os.write(w_fd, 'a')
716       signal.pause()
717       os._exit(0)
718       return
719     # else we are in the parent
720     # wait until the child has written the pid file
721     os.read(r_fd, 1)
722     read_pid = utils.ReadPidFile(pid_file)
723     self.failUnlessEqual(read_pid, new_pid)
724     self.failUnless(utils.IsProcessAlive(new_pid))
725     utils.KillProcess(new_pid, waitpid=True)
726     self.failIf(utils.IsProcessAlive(new_pid))
727     utils.RemoveFile(self.f_dpn('child'))
728     self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0)
729
730   def tearDown(self):
731     shutil.rmtree(self.dir)
732
733
734 class TestSshKeys(testutils.GanetiTestCase):
735   """Test case for the AddAuthorizedKey function"""
736
737   KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
738   KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="198.51.100.4" '
739            'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
740
741   def setUp(self):
742     testutils.GanetiTestCase.setUp(self)
743     self.tmpname = self._CreateTempFile()
744     handle = open(self.tmpname, 'w')
745     try:
746       handle.write("%s\n" % TestSshKeys.KEY_A)
747       handle.write("%s\n" % TestSshKeys.KEY_B)
748     finally:
749       handle.close()
750
751   def testAddingNewKey(self):
752     utils.AddAuthorizedKey(self.tmpname,
753                            'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
754
755     self.assertFileContent(self.tmpname,
756       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
757       'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
758       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
759       "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
760
761   def testAddingAlmostButNotCompletelyTheSameKey(self):
762     utils.AddAuthorizedKey(self.tmpname,
763         'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
764
765     self.assertFileContent(self.tmpname,
766       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
767       'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
768       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
769       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
770
771   def testAddingExistingKeyWithSomeMoreSpaces(self):
772     utils.AddAuthorizedKey(self.tmpname,
773         'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
774
775     self.assertFileContent(self.tmpname,
776       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
777       'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
778       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
779
780   def testRemovingExistingKeyWithSomeMoreSpaces(self):
781     utils.RemoveAuthorizedKey(self.tmpname,
782         'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
783
784     self.assertFileContent(self.tmpname,
785       'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
786       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
787
788   def testRemovingNonExistingKey(self):
789     utils.RemoveAuthorizedKey(self.tmpname,
790         'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
791
792     self.assertFileContent(self.tmpname,
793       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
794       'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
795       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
796
797
798 class TestNewUUID(unittest.TestCase):
799   """Test case for NewUUID"""
800
801   def runTest(self):
802     self.failUnless(utils.UUID_RE.match(utils.NewUUID()))
803
804
805 if __name__ == "__main__":
806   testutils.GanetiTestProgram()