bash_completion: Enable extglob while parsing file
[ganeti-local] / test / ganeti.utils.io_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for testing ganeti.utils.io"""
23
24 import os
25 import tempfile
26 import unittest
27 import shutil
28 import glob
29 import time
30 import signal
31 import stat
32 import errno
33
34 from ganeti import constants
35 from ganeti import utils
36 from ganeti import compat
37 from ganeti import errors
38
39 import testutils
40
41
class TestReadFile(testutils.GanetiTestCase):
  """Tests for utils.ReadFile using the "cert1.pem" test data file"""

  def _HexDigest(self, content):
    # Hexadecimal MD5 digest of the given content
    hasher = compat.md5_hash()
    hasher.update(content)
    return hasher.hexdigest()

  def testReadAll(self):
    # Without a size limit the complete file must be returned
    content = utils.ReadFile(self._TestDataFilename("cert1.pem"))
    self.assertEqual(len(content), 814)
    self.assertEqual(self._HexDigest(content),
                     "a491efb3efe56a0535f924d5f8680fd4")

  def testReadSize(self):
    # Only the first "size" bytes are returned
    cert_path = self._TestDataFilename("cert1.pem")
    content = utils.ReadFile(cert_path, size=100)
    self.assertEqual(len(content), 100)
    self.assertEqual(self._HexDigest(content),
                     "893772354e4e690b9efd073eed433ce7")

  def testCallback(self):
    # The preread callback must see the file positioned at its start
    def _CheckAtStart(fh):
      self.assertEqual(fh.tell(), 0)

    cert_path = self._TestDataFilename("cert1.pem")
    content = utils.ReadFile(cert_path, preread=_CheckAtStart)
    self.assertEqual(len(content), 814)

  def testError(self):
    # A path below a non-directory can never be opened
    missing = "/dev/null/does-not-exist"
    self.assertRaises(EnvironmentError, utils.ReadFile, missing)
69
70
class TestReadOneLineFile(testutils.GanetiTestCase):
  """Tests for utils.ReadOneLineFile in strict and non-strict mode"""

  # First line of the "cert1.pem" test data file
  _CERT_HEADER = "-----BEGIN CERTIFICATE-----"

  def testDefault(self):
    line = utils.ReadOneLineFile(self._TestDataFilename("cert1.pem"))
    self.assertEqual(len(line), 27)
    self.assertEqual(line, self._CERT_HEADER)

  def testNotStrict(self):
    cert_path = self._TestDataFilename("cert1.pem")
    line = utils.ReadOneLineFile(cert_path, strict=False)
    self.assertEqual(len(line), 27)
    self.assertEqual(line, self._CERT_HEADER)

  def testStrictFailure(self):
    # The certificate has more than one line, which strict mode rejects
    cert_path = self._TestDataFilename("cert1.pem")
    self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
                      cert_path, strict=True)

  def testLongLine(self):
    content = "Hello World! " * 1024
    path = self._CreateTempFile()
    utils.WriteFile(path, data=content)
    strict_result = utils.ReadOneLineFile(path, strict=True)
    lax_result = utils.ReadOneLineFile(path, strict=False)
    self.assertEqual(content, strict_result)
    self.assertEqual(content, lax_result)

  def testNewline(self):
    path = self._CreateTempFile()
    expected = "myline"
    # Any trailing line terminator must be stripped in both modes
    for terminator in ["", "\n", "\r\n"]:
      utils.WriteFile(path, data=(expected + terminator))
      lax_result = utils.ReadOneLineFile(path, strict=False)
      self.assertEqual(expected, lax_result)
      strict_result = utils.ReadOneLineFile(path, strict=True)
      self.assertEqual(expected, strict_result)

  def testWhitespaceAndMultipleLines(self):
    path = self._CreateTempFile()
    for terminator in ["", "\n", "\r\n"]:
      for blanks in [" ", "\t", "\t\t  \t", "\t "]:
        content = ("Foo bar baz %s%s" % (blanks, terminator)) * 1024
        utils.WriteFile(path, data=content)
        lax_result = utils.ReadOneLineFile(path, strict=False)

        if not terminator:
          # Without terminators everything is one (long) line
          strict_result = utils.ReadOneLineFile(path, strict=True)
          self.assertEqual(content, strict_result)
          self.assertEqual(content, lax_result)
          continue

        # Multiple lines: strict mode fails, lax mode returns the first one
        self.assertTrue(set("\r\n") & set(content))
        self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
                          path, strict=True)
        first_len = len("Foo bar baz ") + len(blanks)
        self.assertEqual(len(lax_result), first_len)
        self.assertEqual(lax_result, content[:first_len])
        self.assertFalse(set("\r\n") & set(lax_result))

  def testEmptylines(self):
    path = self._CreateTempFile()
    expected = "myline"
    for terminator in ["\n", "\r\n"]:
      for trailer in ["", "otherline"]:
        # Two leading empty lines, the wanted line, then an optional extra one
        content = "".join([terminator, terminator, expected, terminator,
                           trailer, terminator])
        utils.WriteFile(path, data=content)
        self.assertTrue(set("\r\n") & set(content))
        lax_result = utils.ReadOneLineFile(path, strict=False)
        self.assertEqual(expected, lax_result)

        if not trailer:
          strict_result = utils.ReadOneLineFile(path, strict=True)
          self.assertEqual(expected, strict_result)
        else:
          self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
                            path, strict=True)

  def testEmptyfile(self):
    empty_path = self._CreateTempFile()
    self.assertRaises(errors.GenericError, utils.ReadOneLineFile, empty_path)
150
151
class TestTimestampForFilename(unittest.TestCase):
  """Tests for utils.TimestampForFilename"""

  def test(self):
    # Neither dots nor colons may show up in the generated timestamp
    for forbidden in (".", ":"):
      self.assertFalse(forbidden in utils.TimestampForFilename())
156
157
class TestCreateBackup(testutils.GanetiTestCase):
  """Tests for utils.CreateBackup"""

  def setUp(self):
    testutils.GanetiTestCase.setUp(self)
    # Scratch directory holding the files being backed up
    self.tmpdir = tempfile.mkdtemp()

  def tearDown(self):
    testutils.GanetiTestCase.tearDown(self)
    shutil.rmtree(self.tmpdir)

  def _CountMatching(self, prefix):
    # Number of directory entries whose path starts with the given prefix
    return len(glob.glob("%s*" % prefix))

  def testEmpty(self):
    source = utils.PathJoin(self.tmpdir, "config.data")
    utils.WriteFile(source, data="")

    first_backup = utils.CreateBackup(source)
    self.assertFileContent(first_backup, "")
    self.assertEqual(self._CountMatching(source), 2)

    # Every further backup adds exactly one more file
    for expected_count in (3, 4):
      utils.CreateBackup(source)
      self.assertEqual(self._CountMatching(source), expected_count)

    # Only regular files can be backed up
    fifo_path = utils.PathJoin(self.tmpdir, "fifo")
    os.mkfifo(fifo_path)
    self.assertRaises(errors.ProgrammerError, utils.CreateBackup, fifo_path)

  def testContent(self):
    samples = ["", "X", "Hello World!\n" * 100, "Binary data\0\x01\x02\n"]
    backups_made = 0
    for sample in samples:
      for factor in [1, 2, 10, 127]:
        payload = sample * factor

        source = utils.PathJoin(self.tmpdir, "test.data_")
        utils.WriteFile(source, data=payload)
        self.assertFileContent(source, payload)

        for _ in range(3):
          backup_path = utils.CreateBackup(source)
          backups_made += 1
          self.assertFileContent(backup_path, payload)
          # The source file itself plus all backups created so far
          self.assertEqual(self._CountMatching(source), backups_made + 1)
199
200
class TestListVisibleFiles(unittest.TestCase):
  """Test case for ListVisibleFiles"""

  def setUp(self):
    self.path = tempfile.mkdtemp()

  def tearDown(self):
    shutil.rmtree(self.path)

  def _CreateFiles(self, files):
    # Create each named file, with dummy content, inside the test directory
    for basename in files:
      full_path = os.path.join(self.path, basename)
      utils.WriteFile(full_path, data="test")

  def _test(self, files, expected):
    # Create "files" and check that exactly "expected" is reported (unordered)
    self._CreateFiles(files)
    visible = utils.ListVisibleFiles(self.path)
    self.assertEqual(set(visible), set(expected))

  def testAllVisible(self):
    names = ["a", "b", "c"]
    self._test(names, names)

  def testNoneVisible(self):
    self._test([".a", ".b", ".c"], [])

  def testSomeVisible(self):
    self._test(["a", "b", ".c"], ["a", "b"])

  def testNonAbsolutePath(self):
    self.assertRaises(errors.ProgrammerError, utils.ListVisibleFiles,
                      "abc")

  def testNonNormalizedPath(self):
    self.assertRaises(errors.ProgrammerError, utils.ListVisibleFiles,
                      "/bin/../tmp")

  def testMountpoint(self):
    # Pretend that every directory is a mountpoint
    list_fn = compat.partial(utils.ListVisibleFiles,
                             _is_mountpoint=lambda _: True)
    self.assertEqual(list_fn(self.path), [])

    # Create "lost+found" as a regular file
    self._CreateFiles(["foo", "bar", ".baz", "lost+found"])
    self.assertEqual(set(list_fn(self.path)),
                     set(["foo", "bar", "lost+found"]))

    # Replace "lost+found" with a directory
    lost_found = utils.PathJoin(self.path, "lost+found")
    utils.RemoveFile(lost_found)
    os.mkdir(lost_found)
    self.assertEqual(set(list_fn(self.path)), set(["foo", "bar"]))

  def testLostAndFoundNoMountpoint(self):
    self._test(["foo", "bar", ".Hello World", "lost+found"],
               ["foo", "bar", "lost+found"])
262
263
264 class TestWriteFile(testutils.GanetiTestCase):
265   def setUp(self):
266     testutils.GanetiTestCase.setUp(self)
267     self.tmpdir = None
268     self.tfile = tempfile.NamedTemporaryFile()
269     self.did_pre = False
270     self.did_post = False
271     self.did_write = False
272
273   def tearDown(self):
274     testutils.GanetiTestCase.tearDown(self)
275     if self.tmpdir:
276       shutil.rmtree(self.tmpdir)
277
278   def markPre(self, fd):
279     self.did_pre = True
280
281   def markPost(self, fd):
282     self.did_post = True
283
284   def markWrite(self, fd):
285     self.did_write = True
286
287   def testWrite(self):
288     data = "abc"
289     utils.WriteFile(self.tfile.name, data=data)
290     self.assertEqual(utils.ReadFile(self.tfile.name), data)
291
292   def testWriteSimpleUnicode(self):
293     data = u"abc"
294     utils.WriteFile(self.tfile.name, data=data)
295     self.assertEqual(utils.ReadFile(self.tfile.name), data)
296
297   def testErrors(self):
298     self.assertRaises(errors.ProgrammerError, utils.WriteFile,
299                       self.tfile.name, data="test", fn=lambda fd: None)
300     self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name)
301     self.assertRaises(errors.ProgrammerError, utils.WriteFile,
302                       self.tfile.name, data="test", atime=0)
303     self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name,
304                       mode=0400, keep_perms=utils.KP_ALWAYS)
305     self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name,
306                       uid=0, keep_perms=utils.KP_ALWAYS)
307     self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name,
308                       gid=0, keep_perms=utils.KP_ALWAYS)
309     self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name,
310                       mode=0400, uid=0, keep_perms=utils.KP_ALWAYS)
311
312   def testPreWrite(self):
313     utils.WriteFile(self.tfile.name, data="", prewrite=self.markPre)
314     self.assertTrue(self.did_pre)
315     self.assertFalse(self.did_post)
316     self.assertFalse(self.did_write)
317
318   def testPostWrite(self):
319     utils.WriteFile(self.tfile.name, data="", postwrite=self.markPost)
320     self.assertFalse(self.did_pre)
321     self.assertTrue(self.did_post)
322     self.assertFalse(self.did_write)
323
324   def testWriteFunction(self):
325     utils.WriteFile(self.tfile.name, fn=self.markWrite)
326     self.assertFalse(self.did_pre)
327     self.assertFalse(self.did_post)
328     self.assertTrue(self.did_write)
329
330   def testDryRun(self):
331     orig = "abc"
332     self.tfile.write(orig)
333     self.tfile.flush()
334     utils.WriteFile(self.tfile.name, data="hello", dry_run=True)
335     self.assertEqual(utils.ReadFile(self.tfile.name), orig)
336
337   def testTimes(self):
338     f = self.tfile.name
339     for at, mt in [(0, 0), (1000, 1000), (2000, 3000),
340                    (int(time.time()), 5000)]:
341       utils.WriteFile(f, data="hello", atime=at, mtime=mt)
342       st = os.stat(f)
343       self.assertEqual(st.st_atime, at)
344       self.assertEqual(st.st_mtime, mt)
345
346   def testNoClose(self):
347     data = "hello"
348     self.assertEqual(utils.WriteFile(self.tfile.name, data="abc"), None)
349     fd = utils.WriteFile(self.tfile.name, data=data, close=False)
350     try:
351       os.lseek(fd, 0, 0)
352       self.assertEqual(os.read(fd, 4096), data)
353     finally:
354       os.close(fd)
355
356   def testNoLeftovers(self):
357     self.tmpdir = tempfile.mkdtemp()
358     self.assertEqual(utils.WriteFile(utils.PathJoin(self.tmpdir, "test"),
359                                      data="abc"),
360                      None)
361     self.assertEqual(os.listdir(self.tmpdir), ["test"])
362
363   def testFailRename(self):
364     self.tmpdir = tempfile.mkdtemp()
365     target = utils.PathJoin(self.tmpdir, "target")
366     os.mkdir(target)
367     self.assertRaises(OSError, utils.WriteFile, target, data="abc")
368     self.assertTrue(os.path.isdir(target))
369     self.assertEqual(os.listdir(self.tmpdir), ["target"])
370     self.assertFalse(os.listdir(target))
371
372   def testFailRenameDryRun(self):
373     self.tmpdir = tempfile.mkdtemp()
374     target = utils.PathJoin(self.tmpdir, "target")
375     os.mkdir(target)
376     self.assertEqual(utils.WriteFile(target, data="abc", dry_run=True), None)
377     self.assertTrue(os.path.isdir(target))
378     self.assertEqual(os.listdir(self.tmpdir), ["target"])
379     self.assertFalse(os.listdir(target))
380
381   def testBackup(self):
382     self.tmpdir = tempfile.mkdtemp()
383     testfile = utils.PathJoin(self.tmpdir, "test")
384
385     self.assertEqual(utils.WriteFile(testfile, data="foo", backup=True), None)
386     self.assertEqual(utils.ReadFile(testfile), "foo")
387     self.assertEqual(os.listdir(self.tmpdir), ["test"])
388
389     # Write again
390     assert os.path.isfile(testfile)
391     self.assertEqual(utils.WriteFile(testfile, data="bar", backup=True), None)
392     self.assertEqual(utils.ReadFile(testfile), "bar")
393     self.assertEqual(len(glob.glob("%s.backup*" % testfile)), 1)
394     self.assertTrue("test" in os.listdir(self.tmpdir))
395     self.assertEqual(len(os.listdir(self.tmpdir)), 2)
396
397     # Write again as dry-run
398     assert os.path.isfile(testfile)
399     self.assertEqual(utils.WriteFile(testfile, data="000", backup=True,
400                                      dry_run=True),
401                      None)
402     self.assertEqual(utils.ReadFile(testfile), "bar")
403     self.assertEqual(len(glob.glob("%s.backup*" % testfile)), 1)
404     self.assertTrue("test" in os.listdir(self.tmpdir))
405     self.assertEqual(len(os.listdir(self.tmpdir)), 2)
406
407   def testFileMode(self):
408     self.tmpdir = tempfile.mkdtemp()
409     target = utils.PathJoin(self.tmpdir, "target")
410     self.assertRaises(OSError, utils.WriteFile, target, data="data",
411                       keep_perms=utils.KP_ALWAYS)
412     # All masks have only user bits set, to avoid interactions with umask
413     utils.WriteFile(target, data="data", mode=0200)
414     self.assertFileMode(target, 0200)
415     utils.WriteFile(target, data="data", mode=0400,
416                     keep_perms=utils.KP_IF_EXISTS)
417     self.assertFileMode(target, 0200)
418     utils.WriteFile(target, data="data", keep_perms=utils.KP_ALWAYS)
419     self.assertFileMode(target, 0200)
420     utils.WriteFile(target, data="data", mode=0700)
421     self.assertFileMode(target, 0700)
422
423   def testNewFileMode(self):
424     self.tmpdir = tempfile.mkdtemp()
425     target = utils.PathJoin(self.tmpdir, "target")
426     utils.WriteFile(target, data="data", mode=0400,
427                     keep_perms=utils.KP_IF_EXISTS)
428     self.assertFileMode(target, 0400)
429
class TestFileID(testutils.GanetiTestCase):
  """Tests for utils.GetFileID, utils.VerifyFileID and utils.SafeWriteFile"""

  def testEquality(self):
    path = self._CreateTempFile()
    file_id = utils.GetFileID(path=path)
    self.assertTrue(utils.VerifyFileID(file_id, file_id))

  def testUpdate(self):
    path = self._CreateTempFile()
    id_by_path = utils.GetFileID(path=path)
    handle = os.open(path, os.O_RDWR)
    try:
      # The ID obtained through a descriptor must match the path-based one
      id_by_fd = utils.GetFileID(fd=handle)
      self.assertTrue(utils.VerifyFileID(id_by_path, id_by_fd))
      self.assertTrue(utils.VerifyFileID(id_by_fd, id_by_path))
    finally:
      os.close(handle)

  def testWriteFile(self):
    path = self._CreateTempFile()
    file_id = utils.GetFileID(path=path)
    stamp = file_id[2]

    # Moving the timestamp forward makes the write fail
    newer = stamp + 10
    os.utime(path, (newer, newer))
    self.assertRaises(errors.LockError, utils.SafeWriteFile, path,
                      file_id, data="")

    # Moving it backwards lets the write go through
    older = stamp - 10
    os.utime(path, (older, older))
    utils.SafeWriteFile(path, file_id, data="")

    file_id = utils.GetFileID(path=path)
    newer = file_id[2] + 10
    os.utime(path, (newer, newer))
    # this doesn't raise, since we passed None
    utils.SafeWriteFile(path, None, data="")

  def testError(self):
    # Passing both a path and a file descriptor is a programming error
    tmp = tempfile.NamedTemporaryFile()
    self.assertRaises(errors.ProgrammerError, utils.GetFileID,
                      path=tmp.name, fd=tmp.fileno())
466
467
class TestRemoveFile(unittest.TestCase):
  """Test case for the RemoveFile function"""

  def setUp(self):
    """Create a temp dir and file for each case"""
    self.tmpdir = tempfile.mkdtemp(suffix='', prefix='ganeti-unittest-')
    (handle, self.tmpfile) = tempfile.mkstemp(suffix='', prefix='',
                                              dir=self.tmpdir)
    os.close(handle)

  def tearDown(self):
    if os.path.exists(self.tmpfile):
      os.unlink(self.tmpfile)
    os.rmdir(self.tmpdir)

  def _FailIfPresent(self, path):
    # Fail the running test when the path still exists
    if os.path.exists(path):
      self.fail("File '%s' not removed" % path)

  def testIgnoreDirs(self):
    """Test that RemoveFile() ignores directories"""
    result = utils.RemoveFile(self.tmpdir)
    self.assertEqual(None, result)

  def testIgnoreNotExisting(self):
    """Test that RemoveFile() ignores non-existing files"""
    # The second call finds the file already gone
    for _ in range(2):
      utils.RemoveFile(self.tmpfile)

  def testRemoveFile(self):
    """Test that RemoveFile does remove a file"""
    utils.RemoveFile(self.tmpfile)
    self._FailIfPresent(self.tmpfile)

  def testRemoveSymlink(self):
    """Test that RemoveFile does remove symlinks"""
    link_path = self.tmpdir + "/symlink"
    # First a dangling link, then one pointing at an existing file
    for link_target in ("no-such-file", self.tmpfile):
      os.symlink(link_target, link_path)
      utils.RemoveFile(link_path)
      self._FailIfPresent(link_path)
508
509
class TestRemoveDir(unittest.TestCase):
  """Tests for utils.RemoveDir"""

  def setUp(self):
    self.tmpdir = tempfile.mkdtemp()

  def tearDown(self):
    # The directory may already have been removed by the test itself
    try:
      shutil.rmtree(self.tmpdir)
    except EnvironmentError:
      pass

  def testEmptyDir(self):
    utils.RemoveDir(self.tmpdir)
    still_there = os.path.isdir(self.tmpdir)
    self.assertFalse(still_there)

  def testNonEmptyDir(self):
    # A directory holding a file must not be removed
    self.tmpfile = os.path.join(self.tmpdir, "test1")
    touched = open(self.tmpfile, "w")
    touched.close()
    self.assertRaises(EnvironmentError, utils.RemoveDir, self.tmpdir)
528
529
class TestRename(unittest.TestCase):
  """Test case for RenameFile"""

  def setUp(self):
    """Create a temporary directory"""
    self.tmpdir = tempfile.mkdtemp()
    self.tmpfile = os.path.join(self.tmpdir, "test1")

    # Touch the file
    touched = open(self.tmpfile, "w")
    touched.close()

  def tearDown(self):
    """Remove temporary directory"""
    shutil.rmtree(self.tmpdir)

  def _InTmpDir(self, relpath):
    # Path of "relpath" below the temporary directory
    return os.path.join(self.tmpdir, relpath)

  def testSimpleRename1(self):
    """Simple rename 1"""
    destination = self._InTmpDir("xyz")
    utils.RenameFile(self.tmpfile, destination)
    self.assertTrue(os.path.isfile(self._InTmpDir("xyz")))

  def testSimpleRename2(self):
    """Simple rename 2"""
    destination = self._InTmpDir("xyz")
    utils.RenameFile(self.tmpfile, destination, mkdir=True)
    self.assertTrue(os.path.isfile(self._InTmpDir("xyz")))

  def testRenameMkdir(self):
    """Rename with mkdir"""
    utils.RenameFile(self.tmpfile, self._InTmpDir("test/xyz"), mkdir=True)
    self.assertTrue(os.path.isdir(self._InTmpDir("test")))
    self.assertTrue(os.path.isfile(self._InTmpDir("test/xyz")))

    # More than one missing directory level is expected to fail
    self.assertRaises(EnvironmentError, utils.RenameFile,
                      self._InTmpDir("test/xyz"),
                      self._InTmpDir("test/foo/bar/baz"),
                      mkdir=True)

    # The source stays in place and no deeper directories are left behind
    self.assertTrue(os.path.exists(self._InTmpDir("test/xyz")))
    for leftover in ("test/foo/bar", "test/foo/bar/baz"):
      self.assertFalse(os.path.exists(self._InTmpDir(leftover)))
572
573
class TestMakedirs(unittest.TestCase):
  """Tests for utils.Makedirs"""

  def setUp(self):
    self.tmpdir = tempfile.mkdtemp()

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def testNonExisting(self):
    target = utils.PathJoin(self.tmpdir, "foo")
    utils.Makedirs(target)
    self.assertTrue(os.path.isdir(target))

  def testExisting(self):
    # An already existing directory is not an error
    target = utils.PathJoin(self.tmpdir, "foo")
    os.mkdir(target)
    utils.Makedirs(target)
    self.assertTrue(os.path.isdir(target))

  def testRecursiveNonExisting(self):
    target = utils.PathJoin(self.tmpdir, "foo/bar/baz")
    utils.Makedirs(target)
    self.assertTrue(os.path.isdir(target))

  def testRecursiveExisting(self):
    # Only the first path component exists beforehand
    target = utils.PathJoin(self.tmpdir, "B/moo/xyz")
    self.assertFalse(os.path.exists(target))
    os.mkdir(utils.PathJoin(self.tmpdir, "B"))
    utils.Makedirs(target)
    self.assertTrue(os.path.isdir(target))
603
604
605 class TestEnsureDirs(unittest.TestCase):
606   """Tests for EnsureDirs"""
607
608   def setUp(self):
609     self.dir = tempfile.mkdtemp()
610     self.old_umask = os.umask(0777)
611
612   def testEnsureDirs(self):
613     utils.EnsureDirs([
614         (utils.PathJoin(self.dir, "foo"), 0777),
615         (utils.PathJoin(self.dir, "bar"), 0000),
616         ])
617     self.assertEquals(os.stat(utils.PathJoin(self.dir, "foo"))[0] & 0777, 0777)
618     self.assertEquals(os.stat(utils.PathJoin(self.dir, "bar"))[0] & 0777, 0000)
619
620   def tearDown(self):
621     os.rmdir(utils.PathJoin(self.dir, "foo"))
622     os.rmdir(utils.PathJoin(self.dir, "bar"))
623     os.rmdir(self.dir)
624     os.umask(self.old_umask)
625
626
class TestIsNormAbsPath(unittest.TestCase):
  """Testing case for IsNormAbsPath"""

  def _pathTestHelper(self, path, result):
    # Check that IsNormAbsPath(path) has the truth value given by "result"
    outcome = utils.IsNormAbsPath(path)
    if not result:
      self.assertFalse(outcome,
          "Path %s should not result absolute and normalized" % path)
    else:
      self.assertTrue(outcome,
          "Path %s should result absolute and normalized" % path)

  def testBase(self):
    cases = [
      ("/etc", True),
      ("/srv", True),
      ("etc", False),
      ("/etc/../root", False),
      ("/etc/", False),
      ]
    for (path, expected) in cases:
      self._pathTestHelper(path, expected)
644
645
class TestIsBelowDir(unittest.TestCase):
  """Testing case for IsBelowDir"""

  def _CheckAll(self, check_fn, pairs):
    # Apply the assertion "check_fn" to IsBelowDir for each (root, path) pair
    for (root, path) in pairs:
      check_fn(utils.IsBelowDir(root, path))

  def testSamePrefix(self):
    self._CheckAll(self.assertTrue, [
      ("/a/b", "/a/b/c"),
      ("/a/b/", "/a/b/e"),
      ])

  def testSamePrefixButDifferentDir(self):
    self._CheckAll(self.assertFalse, [
      ("/a/b", "/a/bc/d"),
      ("/a/b/", "/a/bc/e"),
      ])

  def testSamePrefixButDirTraversal(self):
    self._CheckAll(self.assertFalse, [
      ("/a/b", "/a/b/../c"),
      ("/a/b/", "/a/b/../d"),
      ])

  def testSamePrefixAndTraversal(self):
    self._CheckAll(self.assertTrue, [
      ("/a/b", "/a/b/c/../d"),
      ("/a/b", "/a/b/c/./e"),
      ("/a/b", "/a/b/../b/./e"),
      ])

  def testBothAbsPath(self):
    # Both arguments have to be absolute paths
    for (root, path) in [("/a/b/c", "d"), ("a/b/c", "/d"), ("a/b/c", "d")]:
      self.assertRaises(ValueError, utils.IsBelowDir, root, path)
670
671
class TestPathJoin(unittest.TestCase):
  """Testing case for PathJoin"""

  def testBasicItems(self):
    parts = ["/a", "b", "c"]
    joined = utils.PathJoin(*parts)
    self.assertEqual(joined, "/".join(parts))

  def testNonAbsPrefix(self):
    # The first component has to be absolute
    self.assertRaises(ValueError, utils.PathJoin, "a", "b")

  def testBackTrack(self):
    # Components walking back up the tree are refused
    self.assertRaises(ValueError, utils.PathJoin, "/a", "b/../c")

  def testMultiAbs(self):
    # A second absolute component is refused
    self.assertRaises(ValueError, utils.PathJoin, "/a", "/b")
687
688
class TestTailFile(testutils.GanetiTestCase):
  """Test case for the TailFile function"""

  def _WriteChunks(self, chunks):
    # Write the chunks to a new temporary file and return its name
    fname = self._CreateTempFile()
    out = open(fname, "w")
    for chunk in chunks:
      out.write(chunk)
    out.close()
    return fname

  def testEmpty(self):
    fname = self._CreateTempFile()
    self.assertEqual(utils.TailFile(fname), [])
    self.assertEqual(utils.TailFile(fname, lines=25), [])

  def testAllLines(self):
    lines = ["test %d" % idx for idx in range(30)]
    for count in range(30):
      chunks = ["\n".join(lines[:count])]
      # An empty file gets no trailing newline
      if count > 0:
        chunks.append("\n")
      fname = self._WriteChunks(chunks)
      self.assertEqual(utils.TailFile(fname, lines=count), lines[:count])

  def testPartialLines(self):
    lines = ["test %d" % idx for idx in range(30)]
    fname = self._WriteChunks(["\n".join(lines), "\n"])
    for count in range(1, 30):
      self.assertEqual(utils.TailFile(fname, lines=count), lines[-count:])

  def testBigFile(self):
    lines = ["test %d" % idx for idx in range(30)]
    # One very long line precedes the lines being tailed
    fname = self._WriteChunks(["X" * 1048576, "\n", "\n".join(lines), "\n"])
    for count in range(1, 30):
      self.assertEqual(utils.TailFile(fname, lines=count), lines[-count:])
729
730
731 class TestPidFileFunctions(unittest.TestCase):
732   """Tests for WritePidFile and ReadPidFile"""
733
734   def setUp(self):
735     self.dir = tempfile.mkdtemp()
736     self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
737
738   def testPidFileFunctions(self):
739     pid_file = self.f_dpn('test')
740     fd = utils.WritePidFile(self.f_dpn('test'))
741     self.failUnless(os.path.exists(pid_file),
742                     "PID file should have been created")
743     read_pid = utils.ReadPidFile(pid_file)
744     self.failUnlessEqual(read_pid, os.getpid())
745     self.failUnless(utils.IsProcessAlive(read_pid))
746     self.failUnlessRaises(errors.PidFileLockError, utils.WritePidFile,
747                           self.f_dpn('test'))
748     os.close(fd)
749     utils.RemoveFile(self.f_dpn("test"))
750     self.failIf(os.path.exists(pid_file),
751                 "PID file should not exist anymore")
752     self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
753                          "ReadPidFile should return 0 for missing pid file")
754     fh = open(pid_file, "w")
755     fh.write("blah\n")
756     fh.close()
757     self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
758                          "ReadPidFile should return 0 for invalid pid file")
759     # but now, even with the file existing, we should be able to lock it
760     fd = utils.WritePidFile(self.f_dpn('test'))
761     os.close(fd)
762     utils.RemoveFile(self.f_dpn("test"))
763     self.failIf(os.path.exists(pid_file),
764                 "PID file should not exist anymore")
765
766   def testKill(self):
767     pid_file = self.f_dpn('child')
768     r_fd, w_fd = os.pipe()
769     new_pid = os.fork()
770     if new_pid == 0: #child
771       utils.WritePidFile(self.f_dpn('child'))
772       os.write(w_fd, 'a')
773       signal.pause()
774       os._exit(0)
775       return
776     # else we are in the parent
777     # wait until the child has written the pid file
778     os.read(r_fd, 1)
779     read_pid = utils.ReadPidFile(pid_file)
780     self.failUnlessEqual(read_pid, new_pid)
781     self.failUnless(utils.IsProcessAlive(new_pid))
782
783     # Try writing to locked file
784     try:
785       utils.WritePidFile(pid_file)
786     except errors.PidFileLockError, err:
787       errmsg = str(err)
788       self.assertTrue(errmsg.endswith(" %s" % new_pid),
789                       msg=("Error message ('%s') didn't contain correct"
790                            " PID (%s)" % (errmsg, new_pid)))
791     else:
792       self.fail("Writing to locked file didn't fail")
793
794     utils.KillProcess(new_pid, waitpid=True)
795     self.failIf(utils.IsProcessAlive(new_pid))
796     utils.RemoveFile(self.f_dpn('child'))
797     self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0)
798
799   def testExceptionType(self):
800     # Make sure the PID lock error is a subclass of LockError in case some code
801     # depends on it
802     self.assertTrue(issubclass(errors.PidFileLockError, errors.LockError))
803
804   def tearDown(self):
805     shutil.rmtree(self.dir)
806
807
class TestSshKeys(testutils.GanetiTestCase):
  """Test case for the AddAuthorizedKey function"""

  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="198.51.100.4" '
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')

  def setUp(self):
    testutils.GanetiTestCase.setUp(self)
    # Start every test with a key file holding KEY_A and KEY_B
    self.tmpname = self._CreateTempFile()
    keyfile = open(self.tmpname, 'w')
    try:
      for key in (TestSshKeys.KEY_A, TestSshKeys.KEY_B):
        keyfile.write("%s\n" % key)
    finally:
      keyfile.close()

  def _AssertKeys(self, keys):
    # The key file must consist of exactly these keys, one per line
    expected = "".join(["%s\n" % key for key in keys])
    self.assertFileContent(self.tmpname, expected)

  def testAddingNewKey(self):
    new_key = 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test'
    utils.AddAuthorizedKey(self.tmpname, new_key)

    self._AssertKeys([self.KEY_A, self.KEY_B, new_key])

  def testAddingAlmostButNotCompletelyTheSameKey(self):
    # Same key data as KEY_A, but a different comment
    similar_key = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test'
    utils.AddAuthorizedKey(self.tmpname, similar_key)

    self._AssertKeys([self.KEY_A, self.KEY_B, similar_key])

  def testAddingExistingKeyWithSomeMoreSpaces(self):
    # KEY_A with extra whitespace between fields must not be added again
    utils.AddAuthorizedKey(self.tmpname,
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')

    self._AssertKeys([self.KEY_A, self.KEY_B])

  def testRemovingExistingKeyWithSomeMoreSpaces(self):
    # KEY_A with extra whitespace between fields still gets removed
    utils.RemoveAuthorizedKey(self.tmpname,
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')

    self._AssertKeys([self.KEY_B])

  def testRemovingNonExistingKey(self):
    utils.RemoveAuthorizedKey(self.tmpname,
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')

    self._AssertKeys([self.KEY_A, self.KEY_B])
870
871
class TestNewUUID(unittest.TestCase):
  """Test case for NewUUID"""

  def runTest(self):
    """Check that a newly generated UUID matches C{utils.UUID_RE}."""
    # assertTrue replaces the deprecated failUnless alias
    self.assertTrue(utils.UUID_RE.match(utils.NewUUID()))
877
878
def _MockStatResult(cb, mode, uid, gid):
  """Build a replacement for a stat function returning fixed values.

  @param cb: callable invoked without arguments each time the returned
    function is called (e.g. to raise an exception); not called if false
  @param mode: value stored under C{stat.ST_MODE}
  @param uid: value stored under C{stat.ST_UID}
  @param gid: value stored under C{stat.ST_GID}
  @return: function accepting a path (which is ignored) and returning a
    dictionary indexed by the three C{stat.ST_*} constants above

  """
  def _fn(path):
    if cb:
      cb()
    return {
      stat.ST_MODE: mode,
      stat.ST_UID: uid,
      stat.ST_GID: gid,
      }
  return _fn
889
890
def _RaiseNoEntError():
  """Raise an C{EnvironmentError} carrying C{errno.ENOENT}.

  """
  raise EnvironmentError(errno.ENOENT, "not found")
893
894
def _OtherStatRaise():
  """Raise an C{EnvironmentError} without any error code.

  """
  raise EnvironmentError()
897
898
class TestPermissionEnforcements(unittest.TestCase):
  """Tests for C{utils.MakeDirWithPerm} and C{utils.EnforcePermission}.

  The tests hand fake stat, mkdir, chmod and chown functions to the code
  under test. The fakes defined in this class record their calls in lists;
  each test has to consume the recorded calls, which L{tearDown} verifies.

  """
  # Arbitrary user and group IDs used to tell "current" from "wanted" owners
  UID_A = 16024
  UID_B = 25850
  GID_A = 14028
  GID_B = 29801

  def setUp(self):
    """Start every test with empty call records."""
    self._chown_calls = []
    self._chmod_calls = []
    self._mkdir_calls = []

  def tearDown(self):
    """Verify that no recorded fake call was left unchecked by the test."""
    self.assertEqual(self._mkdir_calls, [])
    self.assertEqual(self._chmod_calls, [])
    self.assertEqual(self._chown_calls, [])

  def _FakeMkdir(self, path):
    """Record a mkdir call instead of creating a directory."""
    self._mkdir_calls.append(path)

  def _FakeChown(self, path, uid, gid):
    """Record a chown call instead of changing ownership."""
    self._chown_calls.append((path, uid, gid))

  def _ChmodWrapper(self, cb):
    """Build a fake chmod function recording its calls.

    @param cb: callable invoked without arguments after a call was recorded;
      not called if false
    @return: function accepting C{(path, mode)}

    """
    def _fn(path, mode):
      self._chmod_calls.append((path, mode))
      if cb:
        cb()
    return _fn

  def _VerifyPerm(self, path, mode, uid=-1, gid=-1):
    """Fake permission function asserting the expected arguments."""
    self.assertEqual(path, "/ganeti-qa-non-test")
    self.assertEqual(mode, 0o700)
    self.assertEqual(uid, self.UID_A)
    self.assertEqual(gid, self.GID_A)

  def testMakeDirWithPerm(self):
    """An existing directory gets its permissions passed to C{_perm_fn}."""
    is_dir_stat = _MockStatResult(None, stat.S_IFDIR, 0, 0)
    utils.MakeDirWithPerm("/ganeti-qa-non-test", 0o700, self.UID_A, self.GID_A,
                          _lstat_fn=is_dir_stat, _perm_fn=self._VerifyPerm)

  def testDirErrors(self):
    """Check error handling and directory creation of MakeDirWithPerm."""
    # Stat result whose mode does not have the directory bit set
    self.assertRaises(errors.GenericError, utils.MakeDirWithPerm,
                      "/ganeti-qa-non-test", 0o700, 0, 0,
                      _lstat_fn=_MockStatResult(None, 0, 0, 0))
    self.assertEqual(self._mkdir_calls, [])

    # Stat function failing with an error other than ENOENT
    other_stat_raise = _MockStatResult(_OtherStatRaise, stat.S_IFDIR, 0, 0)
    self.assertRaises(errors.GenericError, utils.MakeDirWithPerm,
                      "/ganeti-qa-non-test", 0o700, 0, 0,
                      _lstat_fn=other_stat_raise)
    self.assertEqual(self._mkdir_calls, [])

    # Stat function failing with ENOENT; the mkdir function must be called
    non_exist_stat = _MockStatResult(_RaiseNoEntError, stat.S_IFDIR, 0, 0)
    utils.MakeDirWithPerm("/ganeti-qa-non-test", 0o700, self.UID_A, self.GID_A,
                          _lstat_fn=non_exist_stat, _mkdir_fn=self._FakeMkdir,
                          _perm_fn=self._VerifyPerm)
    self.assertEqual(self._mkdir_calls.pop(0), "/ganeti-qa-non-test")

  def testEnforcePermissionNoEnt(self):
    """ENOENT from the stat function is an error by default."""
    # NotImplemented is not callable, hence any attempt to chmod/chown fails
    self.assertRaises(errors.GenericError, utils.EnforcePermission,
                      "/ganeti-qa-non-test", 0o600,
                      _chmod_fn=NotImplemented, _chown_fn=NotImplemented,
                      _stat_fn=_MockStatResult(_RaiseNoEntError, 0, 0, 0))

  def testEnforcePermissionNoEntMustNotExist(self):
    """ENOENT from the stat function is accepted with C{must_exist=False}."""
    utils.EnforcePermission("/ganeti-qa-non-test", 0o600, must_exist=False,
                            _chmod_fn=NotImplemented,
                            _chown_fn=NotImplemented,
                            _stat_fn=_MockStatResult(_RaiseNoEntError,
                                                     0, 0, 0))

  def testEnforcePermissionOtherErrorMustNotExist(self):
    """Other stat errors are reported even with C{must_exist=False}."""
    self.assertRaises(errors.GenericError, utils.EnforcePermission,
                      "/ganeti-qa-non-test", 0o600, must_exist=False,
                      _chmod_fn=NotImplemented, _chown_fn=NotImplemented,
                      _stat_fn=_MockStatResult(_OtherStatRaise, 0, 0, 0))

  def testEnforcePermissionNoChanges(self):
    """Matching mode and no owner request: no chmod or chown call."""
    # tearDown verifies that neither fake recorded a call
    utils.EnforcePermission("/ganeti-qa-non-test", 0o600,
                            _stat_fn=_MockStatResult(None, 0o600, 0, 0),
                            _chmod_fn=self._ChmodWrapper(None),
                            _chown_fn=self._FakeChown)

  def testEnforcePermissionChangeMode(self):
    """A differing mode leads to exactly one chmod call."""
    utils.EnforcePermission("/ganeti-qa-non-test", 0o444,
                            _stat_fn=_MockStatResult(None, 0o600, 0, 0),
                            _chmod_fn=self._ChmodWrapper(None),
                            _chown_fn=self._FakeChown)
    self.assertEqual(self._chmod_calls.pop(0), ("/ganeti-qa-non-test", 0o444))

  def testEnforcePermissionSetUidGid(self):
    """Differing owner and group lead to exactly one chown call."""
    utils.EnforcePermission("/ganeti-qa-non-test", 0o600,
                            uid=self.UID_B, gid=self.GID_B,
                            _stat_fn=_MockStatResult(None, 0o600,
                                                     self.UID_A,
                                                     self.GID_A),
                            _chmod_fn=self._ChmodWrapper(None),
                            _chown_fn=self._FakeChown)
    self.assertEqual(self._chown_calls.pop(0),
                     ("/ganeti-qa-non-test", self.UID_B, self.GID_B))
999
1000
# Start the test program provided by testutils when executed as a script
if __name__ == "__main__":
  testutils.GanetiTestProgram()