Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils.io_unittest.py @ b81b3c96

History | View | Annotate | Download (30.9 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.utils.io"""
23

    
24
import os
25
import tempfile
26
import unittest
27
import shutil
28
import glob
29
import time
30
import signal
31
import stat
32
import errno
33

    
34
from ganeti import constants
35
from ganeti import utils
36
from ganeti import compat
37
from ganeti import errors
38

    
39
import testutils
40

    
41

    
42
class TestReadFile(testutils.GanetiTestCase):
43
  def testReadAll(self):
44
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"))
45
    self.assertEqual(len(data), 814)
46

    
47
    h = compat.md5_hash()
48
    h.update(data)
49
    self.assertEqual(h.hexdigest(), "a491efb3efe56a0535f924d5f8680fd4")
50

    
51
  def testReadSize(self):
52
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"),
53
                          size=100)
54
    self.assertEqual(len(data), 100)
55

    
56
    h = compat.md5_hash()
57
    h.update(data)
58
    self.assertEqual(h.hexdigest(), "893772354e4e690b9efd073eed433ce7")
59

    
60
  def testCallback(self):
61
    def _Cb(fh):
62
      self.assertEqual(fh.tell(), 0)
63
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"), preread=_Cb)
64
    self.assertEqual(len(data), 814)
65

    
66
  def testError(self):
67
    self.assertRaises(EnvironmentError, utils.ReadFile,
68
                      "/dev/null/does-not-exist")
69

    
70

    
71
class TestReadOneLineFile(testutils.GanetiTestCase):
72
  def setUp(self):
73
    testutils.GanetiTestCase.setUp(self)
74

    
75
  def testDefault(self):
76
    data = utils.ReadOneLineFile(self._TestDataFilename("cert1.pem"))
77
    self.assertEqual(len(data), 27)
78
    self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
79

    
80
  def testNotStrict(self):
81
    data = utils.ReadOneLineFile(self._TestDataFilename("cert1.pem"),
82
                                 strict=False)
83
    self.assertEqual(len(data), 27)
84
    self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
85

    
86
  def testStrictFailure(self):
87
    self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
88
                      self._TestDataFilename("cert1.pem"), strict=True)
89

    
90
  def testLongLine(self):
91
    dummydata = (1024 * "Hello World! ")
92
    myfile = self._CreateTempFile()
93
    utils.WriteFile(myfile, data=dummydata)
94
    datastrict = utils.ReadOneLineFile(myfile, strict=True)
95
    datalax = utils.ReadOneLineFile(myfile, strict=False)
96
    self.assertEqual(dummydata, datastrict)
97
    self.assertEqual(dummydata, datalax)
98

    
99
  def testNewline(self):
100
    myfile = self._CreateTempFile()
101
    myline = "myline"
102
    for nl in ["", "\n", "\r\n"]:
103
      dummydata = "%s%s" % (myline, nl)
104
      utils.WriteFile(myfile, data=dummydata)
105
      datalax = utils.ReadOneLineFile(myfile, strict=False)
106
      self.assertEqual(myline, datalax)
107
      datastrict = utils.ReadOneLineFile(myfile, strict=True)
108
      self.assertEqual(myline, datastrict)
109

    
110
  def testWhitespaceAndMultipleLines(self):
111
    myfile = self._CreateTempFile()
112
    for nl in ["", "\n", "\r\n"]:
113
      for ws in [" ", "\t", "\t\t  \t", "\t "]:
114
        dummydata = (1024 * ("Foo bar baz %s%s" % (ws, nl)))
115
        utils.WriteFile(myfile, data=dummydata)
116
        datalax = utils.ReadOneLineFile(myfile, strict=False)
117
        if nl:
118
          self.assert_(set("\r\n") & set(dummydata))
119
          self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
120
                            myfile, strict=True)
121
          explen = len("Foo bar baz ") + len(ws)
122
          self.assertEqual(len(datalax), explen)
123
          self.assertEqual(datalax, dummydata[:explen])
124
          self.assertFalse(set("\r\n") & set(datalax))
125
        else:
126
          datastrict = utils.ReadOneLineFile(myfile, strict=True)
127
          self.assertEqual(dummydata, datastrict)
128
          self.assertEqual(dummydata, datalax)
129

    
130
  def testEmptylines(self):
131
    myfile = self._CreateTempFile()
132
    myline = "myline"
133
    for nl in ["\n", "\r\n"]:
134
      for ol in ["", "otherline"]:
135
        dummydata = "%s%s%s%s%s%s" % (nl, nl, myline, nl, ol, nl)
136
        utils.WriteFile(myfile, data=dummydata)
137
        self.assert_(set("\r\n") & set(dummydata))
138
        datalax = utils.ReadOneLineFile(myfile, strict=False)
139
        self.assertEqual(myline, datalax)
140
        if ol:
141
          self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
142
                            myfile, strict=True)
143
        else:
144
          datastrict = utils.ReadOneLineFile(myfile, strict=True)
145
          self.assertEqual(myline, datastrict)
146

    
147
  def testEmptyfile(self):
148
    myfile = self._CreateTempFile()
149
    self.assertRaises(errors.GenericError, utils.ReadOneLineFile, myfile)
150

    
151

    
152
class TestTimestampForFilename(unittest.TestCase):
153
  def test(self):
154
    self.assert_("." not in utils.TimestampForFilename())
155
    self.assert_(":" not in utils.TimestampForFilename())
156

    
157

    
158
class TestCreateBackup(testutils.GanetiTestCase):
159
  def setUp(self):
160
    testutils.GanetiTestCase.setUp(self)
161

    
162
    self.tmpdir = tempfile.mkdtemp()
163

    
164
  def tearDown(self):
165
    testutils.GanetiTestCase.tearDown(self)
166

    
167
    shutil.rmtree(self.tmpdir)
168

    
169
  def testEmpty(self):
170
    filename = utils.PathJoin(self.tmpdir, "config.data")
171
    utils.WriteFile(filename, data="")
172
    bname = utils.CreateBackup(filename)
173
    self.assertFileContent(bname, "")
174
    self.assertEqual(len(glob.glob("%s*" % filename)), 2)
175
    utils.CreateBackup(filename)
176
    self.assertEqual(len(glob.glob("%s*" % filename)), 3)
177
    utils.CreateBackup(filename)
178
    self.assertEqual(len(glob.glob("%s*" % filename)), 4)
179

    
180
    fifoname = utils.PathJoin(self.tmpdir, "fifo")
181
    os.mkfifo(fifoname)
182
    self.assertRaises(errors.ProgrammerError, utils.CreateBackup, fifoname)
183

    
184
  def testContent(self):
185
    bkpcount = 0
186
    for data in ["", "X", "Hello World!\n" * 100, "Binary data\0\x01\x02\n"]:
187
      for rep in [1, 2, 10, 127]:
188
        testdata = data * rep
189

    
190
        filename = utils.PathJoin(self.tmpdir, "test.data_")
191
        utils.WriteFile(filename, data=testdata)
192
        self.assertFileContent(filename, testdata)
193

    
194
        for _ in range(3):
195
          bname = utils.CreateBackup(filename)
196
          bkpcount += 1
197
          self.assertFileContent(bname, testdata)
198
          self.assertEqual(len(glob.glob("%s*" % filename)), 1 + bkpcount)
199

    
200

    
201
class TestListVisibleFiles(unittest.TestCase):
202
  """Test case for ListVisibleFiles"""
203

    
204
  def setUp(self):
205
    self.path = tempfile.mkdtemp()
206

    
207
  def tearDown(self):
208
    shutil.rmtree(self.path)
209

    
210
  def _CreateFiles(self, files):
211
    for name in files:
212
      utils.WriteFile(os.path.join(self.path, name), data="test")
213

    
214
  def _test(self, files, expected):
215
    self._CreateFiles(files)
216
    found = utils.ListVisibleFiles(self.path)
217
    self.assertEqual(set(found), set(expected))
218

    
219
  def testAllVisible(self):
220
    files = ["a", "b", "c"]
221
    expected = files
222
    self._test(files, expected)
223

    
224
  def testNoneVisible(self):
225
    files = [".a", ".b", ".c"]
226
    expected = []
227
    self._test(files, expected)
228

    
229
  def testSomeVisible(self):
230
    files = ["a", "b", ".c"]
231
    expected = ["a", "b"]
232
    self._test(files, expected)
233

    
234
  def testNonAbsolutePath(self):
235
    self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
236
                          "abc")
237

    
238
  def testNonNormalizedPath(self):
239
    self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
240
                          "/bin/../tmp")
241

    
242

    
243
class TestWriteFile(unittest.TestCase):
244
  def setUp(self):
245
    self.tmpdir = None
246
    self.tfile = tempfile.NamedTemporaryFile()
247
    self.did_pre = False
248
    self.did_post = False
249
    self.did_write = False
250

    
251
  def tearDown(self):
252
    if self.tmpdir:
253
      shutil.rmtree(self.tmpdir)
254

    
255
  def markPre(self, fd):
256
    self.did_pre = True
257

    
258
  def markPost(self, fd):
259
    self.did_post = True
260

    
261
  def markWrite(self, fd):
262
    self.did_write = True
263

    
264
  def testWrite(self):
265
    data = "abc"
266
    utils.WriteFile(self.tfile.name, data=data)
267
    self.assertEqual(utils.ReadFile(self.tfile.name), data)
268

    
269
  def testWriteSimpleUnicode(self):
270
    data = u"abc"
271
    utils.WriteFile(self.tfile.name, data=data)
272
    self.assertEqual(utils.ReadFile(self.tfile.name), data)
273

    
274
  def testErrors(self):
275
    self.assertRaises(errors.ProgrammerError, utils.WriteFile,
276
                      self.tfile.name, data="test", fn=lambda fd: None)
277
    self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name)
278
    self.assertRaises(errors.ProgrammerError, utils.WriteFile,
279
                      self.tfile.name, data="test", atime=0)
280

    
281
  def testPreWrite(self):
282
    utils.WriteFile(self.tfile.name, data="", prewrite=self.markPre)
283
    self.assertTrue(self.did_pre)
284
    self.assertFalse(self.did_post)
285
    self.assertFalse(self.did_write)
286

    
287
  def testPostWrite(self):
288
    utils.WriteFile(self.tfile.name, data="", postwrite=self.markPost)
289
    self.assertFalse(self.did_pre)
290
    self.assertTrue(self.did_post)
291
    self.assertFalse(self.did_write)
292

    
293
  def testWriteFunction(self):
294
    utils.WriteFile(self.tfile.name, fn=self.markWrite)
295
    self.assertFalse(self.did_pre)
296
    self.assertFalse(self.did_post)
297
    self.assertTrue(self.did_write)
298

    
299
  def testDryRun(self):
300
    orig = "abc"
301
    self.tfile.write(orig)
302
    self.tfile.flush()
303
    utils.WriteFile(self.tfile.name, data="hello", dry_run=True)
304
    self.assertEqual(utils.ReadFile(self.tfile.name), orig)
305

    
306
  def testTimes(self):
307
    f = self.tfile.name
308
    for at, mt in [(0, 0), (1000, 1000), (2000, 3000),
309
                   (int(time.time()), 5000)]:
310
      utils.WriteFile(f, data="hello", atime=at, mtime=mt)
311
      st = os.stat(f)
312
      self.assertEqual(st.st_atime, at)
313
      self.assertEqual(st.st_mtime, mt)
314

    
315
  def testNoClose(self):
316
    data = "hello"
317
    self.assertEqual(utils.WriteFile(self.tfile.name, data="abc"), None)
318
    fd = utils.WriteFile(self.tfile.name, data=data, close=False)
319
    try:
320
      os.lseek(fd, 0, 0)
321
      self.assertEqual(os.read(fd, 4096), data)
322
    finally:
323
      os.close(fd)
324

    
325
  def testNoLeftovers(self):
326
    self.tmpdir = tempfile.mkdtemp()
327
    self.assertEqual(utils.WriteFile(utils.PathJoin(self.tmpdir, "test"),
328
                                     data="abc"),
329
                     None)
330
    self.assertEqual(os.listdir(self.tmpdir), ["test"])
331

    
332
  def testFailRename(self):
333
    self.tmpdir = tempfile.mkdtemp()
334
    target = utils.PathJoin(self.tmpdir, "target")
335
    os.mkdir(target)
336
    self.assertRaises(OSError, utils.WriteFile, target, data="abc")
337
    self.assertTrue(os.path.isdir(target))
338
    self.assertEqual(os.listdir(self.tmpdir), ["target"])
339
    self.assertFalse(os.listdir(target))
340

    
341
  def testFailRenameDryRun(self):
342
    self.tmpdir = tempfile.mkdtemp()
343
    target = utils.PathJoin(self.tmpdir, "target")
344
    os.mkdir(target)
345
    self.assertEqual(utils.WriteFile(target, data="abc", dry_run=True), None)
346
    self.assertTrue(os.path.isdir(target))
347
    self.assertEqual(os.listdir(self.tmpdir), ["target"])
348
    self.assertFalse(os.listdir(target))
349

    
350
  def testBackup(self):
351
    self.tmpdir = tempfile.mkdtemp()
352
    testfile = utils.PathJoin(self.tmpdir, "test")
353

    
354
    self.assertEqual(utils.WriteFile(testfile, data="foo", backup=True), None)
355
    self.assertEqual(utils.ReadFile(testfile), "foo")
356
    self.assertEqual(os.listdir(self.tmpdir), ["test"])
357

    
358
    # Write again
359
    assert os.path.isfile(testfile)
360
    self.assertEqual(utils.WriteFile(testfile, data="bar", backup=True), None)
361
    self.assertEqual(utils.ReadFile(testfile), "bar")
362
    self.assertEqual(len(glob.glob("%s.backup*" % testfile)), 1)
363
    self.assertTrue("test" in os.listdir(self.tmpdir))
364
    self.assertEqual(len(os.listdir(self.tmpdir)), 2)
365

    
366
    # Write again as dry-run
367
    assert os.path.isfile(testfile)
368
    self.assertEqual(utils.WriteFile(testfile, data="000", backup=True,
369
                                     dry_run=True),
370
                     None)
371
    self.assertEqual(utils.ReadFile(testfile), "bar")
372
    self.assertEqual(len(glob.glob("%s.backup*" % testfile)), 1)
373
    self.assertTrue("test" in os.listdir(self.tmpdir))
374
    self.assertEqual(len(os.listdir(self.tmpdir)), 2)
375

    
376

    
377
class TestFileID(testutils.GanetiTestCase):
378
  def testEquality(self):
379
    name = self._CreateTempFile()
380
    oldi = utils.GetFileID(path=name)
381
    self.failUnless(utils.VerifyFileID(oldi, oldi))
382

    
383
  def testUpdate(self):
384
    name = self._CreateTempFile()
385
    oldi = utils.GetFileID(path=name)
386
    os.utime(name, None)
387
    fd = os.open(name, os.O_RDWR)
388
    try:
389
      newi = utils.GetFileID(fd=fd)
390
      self.failUnless(utils.VerifyFileID(oldi, newi))
391
      self.failUnless(utils.VerifyFileID(newi, oldi))
392
    finally:
393
      os.close(fd)
394

    
395
  def testWriteFile(self):
396
    name = self._CreateTempFile()
397
    oldi = utils.GetFileID(path=name)
398
    mtime = oldi[2]
399
    os.utime(name, (mtime + 10, mtime + 10))
400
    self.assertRaises(errors.LockError, utils.SafeWriteFile, name,
401
                      oldi, data="")
402
    os.utime(name, (mtime - 10, mtime - 10))
403
    utils.SafeWriteFile(name, oldi, data="")
404
    oldi = utils.GetFileID(path=name)
405
    mtime = oldi[2]
406
    os.utime(name, (mtime + 10, mtime + 10))
407
    # this doesn't raise, since we passed None
408
    utils.SafeWriteFile(name, None, data="")
409

    
410
  def testError(self):
411
    t = tempfile.NamedTemporaryFile()
412
    self.assertRaises(errors.ProgrammerError, utils.GetFileID,
413
                      path=t.name, fd=t.fileno())
414

    
415

    
416
class TestRemoveFile(unittest.TestCase):
417
  """Test case for the RemoveFile function"""
418

    
419
  def setUp(self):
420
    """Create a temp dir and file for each case"""
421
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
422
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
423
    os.close(fd)
424

    
425
  def tearDown(self):
426
    if os.path.exists(self.tmpfile):
427
      os.unlink(self.tmpfile)
428
    os.rmdir(self.tmpdir)
429

    
430
  def testIgnoreDirs(self):
431
    """Test that RemoveFile() ignores directories"""
432
    self.assertEqual(None, utils.RemoveFile(self.tmpdir))
433

    
434
  def testIgnoreNotExisting(self):
435
    """Test that RemoveFile() ignores non-existing files"""
436
    utils.RemoveFile(self.tmpfile)
437
    utils.RemoveFile(self.tmpfile)
438

    
439
  def testRemoveFile(self):
440
    """Test that RemoveFile does remove a file"""
441
    utils.RemoveFile(self.tmpfile)
442
    if os.path.exists(self.tmpfile):
443
      self.fail("File '%s' not removed" % self.tmpfile)
444

    
445
  def testRemoveSymlink(self):
446
    """Test that RemoveFile does remove symlinks"""
447
    symlink = self.tmpdir + "/symlink"
448
    os.symlink("no-such-file", symlink)
449
    utils.RemoveFile(symlink)
450
    if os.path.exists(symlink):
451
      self.fail("File '%s' not removed" % symlink)
452
    os.symlink(self.tmpfile, symlink)
453
    utils.RemoveFile(symlink)
454
    if os.path.exists(symlink):
455
      self.fail("File '%s' not removed" % symlink)
456

    
457

    
458
class TestRemoveDir(unittest.TestCase):
459
  def setUp(self):
460
    self.tmpdir = tempfile.mkdtemp()
461

    
462
  def tearDown(self):
463
    try:
464
      shutil.rmtree(self.tmpdir)
465
    except EnvironmentError:
466
      pass
467

    
468
  def testEmptyDir(self):
469
    utils.RemoveDir(self.tmpdir)
470
    self.assertFalse(os.path.isdir(self.tmpdir))
471

    
472
  def testNonEmptyDir(self):
473
    self.tmpfile = os.path.join(self.tmpdir, "test1")
474
    open(self.tmpfile, "w").close()
475
    self.assertRaises(EnvironmentError, utils.RemoveDir, self.tmpdir)
476

    
477

    
478
class TestRename(unittest.TestCase):
479
  """Test case for RenameFile"""
480

    
481
  def setUp(self):
482
    """Create a temporary directory"""
483
    self.tmpdir = tempfile.mkdtemp()
484
    self.tmpfile = os.path.join(self.tmpdir, "test1")
485

    
486
    # Touch the file
487
    open(self.tmpfile, "w").close()
488

    
489
  def tearDown(self):
490
    """Remove temporary directory"""
491
    shutil.rmtree(self.tmpdir)
492

    
493
  def testSimpleRename1(self):
494
    """Simple rename 1"""
495
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
496
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
497

    
498
  def testSimpleRename2(self):
499
    """Simple rename 2"""
500
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
501
                     mkdir=True)
502
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
503

    
504
  def testRenameMkdir(self):
505
    """Rename with mkdir"""
506
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
507
                     mkdir=True)
508
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
509
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
510

    
511
    utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"),
512
                     os.path.join(self.tmpdir, "test/foo/bar/baz"),
513
                     mkdir=True)
514
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
515
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar")))
516
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz")))
517

    
518

    
519
class TestMakedirs(unittest.TestCase):
520
  def setUp(self):
521
    self.tmpdir = tempfile.mkdtemp()
522

    
523
  def tearDown(self):
524
    shutil.rmtree(self.tmpdir)
525

    
526
  def testNonExisting(self):
527
    path = utils.PathJoin(self.tmpdir, "foo")
528
    utils.Makedirs(path)
529
    self.assert_(os.path.isdir(path))
530

    
531
  def testExisting(self):
532
    path = utils.PathJoin(self.tmpdir, "foo")
533
    os.mkdir(path)
534
    utils.Makedirs(path)
535
    self.assert_(os.path.isdir(path))
536

    
537
  def testRecursiveNonExisting(self):
538
    path = utils.PathJoin(self.tmpdir, "foo/bar/baz")
539
    utils.Makedirs(path)
540
    self.assert_(os.path.isdir(path))
541

    
542
  def testRecursiveExisting(self):
543
    path = utils.PathJoin(self.tmpdir, "B/moo/xyz")
544
    self.assertFalse(os.path.exists(path))
545
    os.mkdir(utils.PathJoin(self.tmpdir, "B"))
546
    utils.Makedirs(path)
547
    self.assert_(os.path.isdir(path))
548

    
549

    
550
class TestEnsureDirs(unittest.TestCase):
551
  """Tests for EnsureDirs"""
552

    
553
  def setUp(self):
554
    self.dir = tempfile.mkdtemp()
555
    self.old_umask = os.umask(0777)
556

    
557
  def testEnsureDirs(self):
558
    utils.EnsureDirs([
559
        (utils.PathJoin(self.dir, "foo"), 0777),
560
        (utils.PathJoin(self.dir, "bar"), 0000),
561
        ])
562
    self.assertEquals(os.stat(utils.PathJoin(self.dir, "foo"))[0] & 0777, 0777)
563
    self.assertEquals(os.stat(utils.PathJoin(self.dir, "bar"))[0] & 0777, 0000)
564

    
565
  def tearDown(self):
566
    os.rmdir(utils.PathJoin(self.dir, "foo"))
567
    os.rmdir(utils.PathJoin(self.dir, "bar"))
568
    os.rmdir(self.dir)
569
    os.umask(self.old_umask)
570

    
571

    
572
class TestIsNormAbsPath(unittest.TestCase):
573
  """Testing case for IsNormAbsPath"""
574

    
575
  def _pathTestHelper(self, path, result):
576
    if result:
577
      self.assert_(utils.IsNormAbsPath(path),
578
          "Path %s should result absolute and normalized" % path)
579
    else:
580
      self.assertFalse(utils.IsNormAbsPath(path),
581
          "Path %s should not result absolute and normalized" % path)
582

    
583
  def testBase(self):
584
    self._pathTestHelper("/etc", True)
585
    self._pathTestHelper("/srv", True)
586
    self._pathTestHelper("etc", False)
587
    self._pathTestHelper("/etc/../root", False)
588
    self._pathTestHelper("/etc/", False)
589

    
590

    
591
class TestIsBelowDir(unittest.TestCase):
592
  """Testing case for IsBelowDir"""
593

    
594
  def testSamePrefix(self):
595
    self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c"))
596
    self.assertTrue(utils.IsBelowDir("/a/b/", "/a/b/e"))
597

    
598
  def testSamePrefixButDifferentDir(self):
599
    self.assertFalse(utils.IsBelowDir("/a/b", "/a/bc/d"))
600
    self.assertFalse(utils.IsBelowDir("/a/b/", "/a/bc/e"))
601

    
602
  def testSamePrefixButDirTraversal(self):
603
    self.assertFalse(utils.IsBelowDir("/a/b", "/a/b/../c"))
604
    self.assertFalse(utils.IsBelowDir("/a/b/", "/a/b/../d"))
605

    
606
  def testSamePrefixAndTraversal(self):
607
    self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c/../d"))
608
    self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c/./e"))
609
    self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/../b/./e"))
610

    
611
  def testBothAbsPath(self):
612
    self.assertRaises(ValueError, utils.IsBelowDir, "/a/b/c", "d")
613
    self.assertRaises(ValueError, utils.IsBelowDir, "a/b/c", "/d")
614
    self.assertRaises(ValueError, utils.IsBelowDir, "a/b/c", "d")
615

    
616

    
617
class TestPathJoin(unittest.TestCase):
618
  """Testing case for PathJoin"""
619

    
620
  def testBasicItems(self):
621
    mlist = ["/a", "b", "c"]
622
    self.failUnlessEqual(utils.PathJoin(*mlist), "/".join(mlist))
623

    
624
  def testNonAbsPrefix(self):
625
    self.failUnlessRaises(ValueError, utils.PathJoin, "a", "b")
626

    
627
  def testBackTrack(self):
628
    self.failUnlessRaises(ValueError, utils.PathJoin, "/a", "b/../c")
629

    
630
  def testMultiAbs(self):
631
    self.failUnlessRaises(ValueError, utils.PathJoin, "/a", "/b")
632

    
633

    
634
class TestTailFile(testutils.GanetiTestCase):
635
  """Test case for the TailFile function"""
636

    
637
  def testEmpty(self):
638
    fname = self._CreateTempFile()
639
    self.failUnlessEqual(utils.TailFile(fname), [])
640
    self.failUnlessEqual(utils.TailFile(fname, lines=25), [])
641

    
642
  def testAllLines(self):
643
    data = ["test %d" % i for i in range(30)]
644
    for i in range(30):
645
      fname = self._CreateTempFile()
646
      fd = open(fname, "w")
647
      fd.write("\n".join(data[:i]))
648
      if i > 0:
649
        fd.write("\n")
650
      fd.close()
651
      self.failUnlessEqual(utils.TailFile(fname, lines=i), data[:i])
652

    
653
  def testPartialLines(self):
654
    data = ["test %d" % i for i in range(30)]
655
    fname = self._CreateTempFile()
656
    fd = open(fname, "w")
657
    fd.write("\n".join(data))
658
    fd.write("\n")
659
    fd.close()
660
    for i in range(1, 30):
661
      self.failUnlessEqual(utils.TailFile(fname, lines=i), data[-i:])
662

    
663
  def testBigFile(self):
664
    data = ["test %d" % i for i in range(30)]
665
    fname = self._CreateTempFile()
666
    fd = open(fname, "w")
667
    fd.write("X" * 1048576)
668
    fd.write("\n")
669
    fd.write("\n".join(data))
670
    fd.write("\n")
671
    fd.close()
672
    for i in range(1, 30):
673
      self.failUnlessEqual(utils.TailFile(fname, lines=i), data[-i:])
674

    
675

    
676
class TestPidFileFunctions(unittest.TestCase):
677
  """Tests for WritePidFile and ReadPidFile"""
678

    
679
  def setUp(self):
680
    self.dir = tempfile.mkdtemp()
681
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
682

    
683
  def testPidFileFunctions(self):
684
    pid_file = self.f_dpn('test')
685
    fd = utils.WritePidFile(self.f_dpn('test'))
686
    self.failUnless(os.path.exists(pid_file),
687
                    "PID file should have been created")
688
    read_pid = utils.ReadPidFile(pid_file)
689
    self.failUnlessEqual(read_pid, os.getpid())
690
    self.failUnless(utils.IsProcessAlive(read_pid))
691
    self.failUnlessRaises(errors.LockError, utils.WritePidFile,
692
                          self.f_dpn('test'))
693
    os.close(fd)
694
    utils.RemoveFile(self.f_dpn("test"))
695
    self.failIf(os.path.exists(pid_file),
696
                "PID file should not exist anymore")
697
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
698
                         "ReadPidFile should return 0 for missing pid file")
699
    fh = open(pid_file, "w")
700
    fh.write("blah\n")
701
    fh.close()
702
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
703
                         "ReadPidFile should return 0 for invalid pid file")
704
    # but now, even with the file existing, we should be able to lock it
705
    fd = utils.WritePidFile(self.f_dpn('test'))
706
    os.close(fd)
707
    utils.RemoveFile(self.f_dpn("test"))
708
    self.failIf(os.path.exists(pid_file),
709
                "PID file should not exist anymore")
710

    
711
  def testKill(self):
712
    pid_file = self.f_dpn('child')
713
    r_fd, w_fd = os.pipe()
714
    new_pid = os.fork()
715
    if new_pid == 0: #child
716
      utils.WritePidFile(self.f_dpn('child'))
717
      os.write(w_fd, 'a')
718
      signal.pause()
719
      os._exit(0)
720
      return
721
    # else we are in the parent
722
    # wait until the child has written the pid file
723
    os.read(r_fd, 1)
724
    read_pid = utils.ReadPidFile(pid_file)
725
    self.failUnlessEqual(read_pid, new_pid)
726
    self.failUnless(utils.IsProcessAlive(new_pid))
727
    utils.KillProcess(new_pid, waitpid=True)
728
    self.failIf(utils.IsProcessAlive(new_pid))
729
    utils.RemoveFile(self.f_dpn('child'))
730
    self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0)
731

    
732
  def tearDown(self):
733
    shutil.rmtree(self.dir)
734

    
735

    
736
class TestSshKeys(testutils.GanetiTestCase):
737
  """Test case for the AddAuthorizedKey function"""
738

    
739
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
740
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="198.51.100.4" '
741
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
742

    
743
  def setUp(self):
744
    testutils.GanetiTestCase.setUp(self)
745
    self.tmpname = self._CreateTempFile()
746
    handle = open(self.tmpname, 'w')
747
    try:
748
      handle.write("%s\n" % TestSshKeys.KEY_A)
749
      handle.write("%s\n" % TestSshKeys.KEY_B)
750
    finally:
751
      handle.close()
752

    
753
  def testAddingNewKey(self):
754
    utils.AddAuthorizedKey(self.tmpname,
755
                           'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
756

    
757
    self.assertFileContent(self.tmpname,
758
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
759
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
760
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
761
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
762

    
763
  def testAddingAlmostButNotCompletelyTheSameKey(self):
764
    utils.AddAuthorizedKey(self.tmpname,
765
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
766

    
767
    self.assertFileContent(self.tmpname,
768
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
769
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
770
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
771
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
772

    
773
  def testAddingExistingKeyWithSomeMoreSpaces(self):
774
    utils.AddAuthorizedKey(self.tmpname,
775
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
776

    
777
    self.assertFileContent(self.tmpname,
778
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
779
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
780
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
781

    
782
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
783
    utils.RemoveAuthorizedKey(self.tmpname,
784
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
785

    
786
    self.assertFileContent(self.tmpname,
787
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
788
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
789

    
790
  def testRemovingNonExistingKey(self):
791
    utils.RemoveAuthorizedKey(self.tmpname,
792
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
793

    
794
    self.assertFileContent(self.tmpname,
795
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
796
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
797
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
798

    
799

    
800
class TestNewUUID(unittest.TestCase):
801
  """Test case for NewUUID"""
802

    
803
  def runTest(self):
804
    self.failUnless(utils.UUID_RE.match(utils.NewUUID()))
805

    
806

    
807
def _MockStatResult(cb, mode, uid, gid):
808
  def _fn(path):
809
    if cb:
810
      cb()
811
    return {
812
      stat.ST_MODE: mode,
813
      stat.ST_UID: uid,
814
      stat.ST_GID: gid,
815
      }
816
  return _fn
817

    
818

    
819
def _RaiseNoEntError():
820
  raise EnvironmentError(errno.ENOENT, "not found")
821

    
822

    
823
def _OtherStatRaise():
824
  raise EnvironmentError()
825

    
826

    
827
class TestPermissionEnforcements(unittest.TestCase):
828
  UID_A = 16024
829
  UID_B = 25850
830
  GID_A = 14028
831
  GID_B = 29801
832

    
833
  def setUp(self):
834
    self._chown_calls = []
835
    self._chmod_calls = []
836
    self._mkdir_calls = []
837

    
838
  def tearDown(self):
839
    self.assertRaises(IndexError, self._mkdir_calls.pop)
840
    self.assertRaises(IndexError, self._chmod_calls.pop)
841
    self.assertRaises(IndexError, self._chown_calls.pop)
842

    
843
  def _FakeMkdir(self, path):
844
    self._mkdir_calls.append(path)
845

    
846
  def _FakeChown(self, path, uid, gid):
847
    self._chown_calls.append((path, uid, gid))
848

    
849
  def _ChmodWrapper(self, cb):
850
    def _fn(path, mode):
851
      self._chmod_calls.append((path, mode))
852
      if cb:
853
        cb()
854
    return _fn
855

    
856
  def _VerifyPerm(self, path, mode, uid=-1, gid=-1):
857
    self.assertEqual(path, "/ganeti-qa-non-test")
858
    self.assertEqual(mode, 0700)
859
    self.assertEqual(uid, self.UID_A)
860
    self.assertEqual(gid, self.GID_A)
861

    
862
  def testMakeDirWithPerm(self):
863
    is_dir_stat = _MockStatResult(None, stat.S_IFDIR, 0, 0)
864
    utils.MakeDirWithPerm("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A,
865
                          _lstat_fn=is_dir_stat, _perm_fn=self._VerifyPerm)
866

    
867
  def testDirErrors(self):
868
    self.assertRaises(errors.GenericError, utils.MakeDirWithPerm,
869
                      "/ganeti-qa-non-test", 0700, 0, 0,
870
                      _lstat_fn=_MockStatResult(None, 0, 0, 0))
871
    self.assertRaises(IndexError, self._mkdir_calls.pop)
872

    
873
    other_stat_raise = _MockStatResult(_OtherStatRaise, stat.S_IFDIR, 0, 0)
874
    self.assertRaises(errors.GenericError, utils.MakeDirWithPerm,
875
                      "/ganeti-qa-non-test", 0700, 0, 0,
876
                      _lstat_fn=other_stat_raise)
877
    self.assertRaises(IndexError, self._mkdir_calls.pop)
878

    
879
    non_exist_stat = _MockStatResult(_RaiseNoEntError, stat.S_IFDIR, 0, 0)
880
    utils.MakeDirWithPerm("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A,
881
                          _lstat_fn=non_exist_stat, _mkdir_fn=self._FakeMkdir,
882
                          _perm_fn=self._VerifyPerm)
883
    self.assertEqual(self._mkdir_calls.pop(0), "/ganeti-qa-non-test")
884

    
885
  def testEnforcePermissionNoEnt(self):
886
    self.assertRaises(errors.GenericError, utils.EnforcePermission,
887
                      "/ganeti-qa-non-test", 0600,
888
                      _chmod_fn=NotImplemented, _chown_fn=NotImplemented,
889
                      _stat_fn=_MockStatResult(_RaiseNoEntError, 0, 0, 0))
890

    
891
  def testEnforcePermissionNoEntMustNotExist(self):
892
    utils.EnforcePermission("/ganeti-qa-non-test", 0600, must_exist=False,
893
                            _chmod_fn=NotImplemented,
894
                            _chown_fn=NotImplemented,
895
                            _stat_fn=_MockStatResult(_RaiseNoEntError,
896
                                                          0, 0, 0))
897

    
898
  def testEnforcePermissionOtherErrorMustNotExist(self):
899
    self.assertRaises(errors.GenericError, utils.EnforcePermission,
900
                      "/ganeti-qa-non-test", 0600, must_exist=False,
901
                      _chmod_fn=NotImplemented, _chown_fn=NotImplemented,
902
                      _stat_fn=_MockStatResult(_OtherStatRaise, 0, 0, 0))
903

    
904
  def testEnforcePermissionNoChanges(self):
905
    utils.EnforcePermission("/ganeti-qa-non-test", 0600,
906
                            _stat_fn=_MockStatResult(None, 0600, 0, 0),
907
                            _chmod_fn=self._ChmodWrapper(None),
908
                            _chown_fn=self._FakeChown)
909

    
910
  def testEnforcePermissionChangeMode(self):
911
    utils.EnforcePermission("/ganeti-qa-non-test", 0444,
912
                            _stat_fn=_MockStatResult(None, 0600, 0, 0),
913
                            _chmod_fn=self._ChmodWrapper(None),
914
                            _chown_fn=self._FakeChown)
915
    self.assertEqual(self._chmod_calls.pop(0), ("/ganeti-qa-non-test", 0444))
916

    
917
  def testEnforcePermissionSetUidGid(self):
918
    utils.EnforcePermission("/ganeti-qa-non-test", 0600,
919
                            uid=self.UID_B, gid=self.GID_B,
920
                            _stat_fn=_MockStatResult(None, 0600,
921
                                                     self.UID_A,
922
                                                     self.GID_A),
923
                            _chmod_fn=self._ChmodWrapper(None),
924
                            _chown_fn=self._FakeChown)
925
    self.assertEqual(self._chown_calls.pop(0),
926
                     ("/ganeti-qa-non-test", self.UID_B, self.GID_B))
927

    
928

    
929
if __name__ == "__main__":
930
  testutils.GanetiTestProgram()