Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils.io_unittest.py @ 9c2b3a70

History | View | Annotate | Download (31 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.utils.io"""
23

    
24
import os
25
import tempfile
26
import unittest
27
import shutil
28
import glob
29
import time
30
import signal
31
import stat
32
import errno
33

    
34
from ganeti import constants
35
from ganeti import utils
36
from ganeti import compat
37
from ganeti import errors
38

    
39
import testutils
40

    
41

    
42
class TestReadFile(testutils.GanetiTestCase):
43
  def testReadAll(self):
44
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"))
45
    self.assertEqual(len(data), 814)
46

    
47
    h = compat.md5_hash()
48
    h.update(data)
49
    self.assertEqual(h.hexdigest(), "a491efb3efe56a0535f924d5f8680fd4")
50

    
51
  def testReadSize(self):
52
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"),
53
                          size=100)
54
    self.assertEqual(len(data), 100)
55

    
56
    h = compat.md5_hash()
57
    h.update(data)
58
    self.assertEqual(h.hexdigest(), "893772354e4e690b9efd073eed433ce7")
59

    
60
  def testCallback(self):
61
    def _Cb(fh):
62
      self.assertEqual(fh.tell(), 0)
63
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"), preread=_Cb)
64
    self.assertEqual(len(data), 814)
65

    
66
  def testError(self):
67
    self.assertRaises(EnvironmentError, utils.ReadFile,
68
                      "/dev/null/does-not-exist")
69

    
70

    
71
class TestReadOneLineFile(testutils.GanetiTestCase):
72
  def setUp(self):
73
    testutils.GanetiTestCase.setUp(self)
74

    
75
  def testDefault(self):
76
    data = utils.ReadOneLineFile(self._TestDataFilename("cert1.pem"))
77
    self.assertEqual(len(data), 27)
78
    self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
79

    
80
  def testNotStrict(self):
81
    data = utils.ReadOneLineFile(self._TestDataFilename("cert1.pem"),
82
                                 strict=False)
83
    self.assertEqual(len(data), 27)
84
    self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
85

    
86
  def testStrictFailure(self):
87
    self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
88
                      self._TestDataFilename("cert1.pem"), strict=True)
89

    
90
  def testLongLine(self):
91
    dummydata = (1024 * "Hello World! ")
92
    myfile = self._CreateTempFile()
93
    utils.WriteFile(myfile, data=dummydata)
94
    datastrict = utils.ReadOneLineFile(myfile, strict=True)
95
    datalax = utils.ReadOneLineFile(myfile, strict=False)
96
    self.assertEqual(dummydata, datastrict)
97
    self.assertEqual(dummydata, datalax)
98

    
99
  def testNewline(self):
100
    myfile = self._CreateTempFile()
101
    myline = "myline"
102
    for nl in ["", "\n", "\r\n"]:
103
      dummydata = "%s%s" % (myline, nl)
104
      utils.WriteFile(myfile, data=dummydata)
105
      datalax = utils.ReadOneLineFile(myfile, strict=False)
106
      self.assertEqual(myline, datalax)
107
      datastrict = utils.ReadOneLineFile(myfile, strict=True)
108
      self.assertEqual(myline, datastrict)
109

    
110
  def testWhitespaceAndMultipleLines(self):
111
    myfile = self._CreateTempFile()
112
    for nl in ["", "\n", "\r\n"]:
113
      for ws in [" ", "\t", "\t\t  \t", "\t "]:
114
        dummydata = (1024 * ("Foo bar baz %s%s" % (ws, nl)))
115
        utils.WriteFile(myfile, data=dummydata)
116
        datalax = utils.ReadOneLineFile(myfile, strict=False)
117
        if nl:
118
          self.assert_(set("\r\n") & set(dummydata))
119
          self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
120
                            myfile, strict=True)
121
          explen = len("Foo bar baz ") + len(ws)
122
          self.assertEqual(len(datalax), explen)
123
          self.assertEqual(datalax, dummydata[:explen])
124
          self.assertFalse(set("\r\n") & set(datalax))
125
        else:
126
          datastrict = utils.ReadOneLineFile(myfile, strict=True)
127
          self.assertEqual(dummydata, datastrict)
128
          self.assertEqual(dummydata, datalax)
129

    
130
  def testEmptylines(self):
131
    myfile = self._CreateTempFile()
132
    myline = "myline"
133
    for nl in ["\n", "\r\n"]:
134
      for ol in ["", "otherline"]:
135
        dummydata = "%s%s%s%s%s%s" % (nl, nl, myline, nl, ol, nl)
136
        utils.WriteFile(myfile, data=dummydata)
137
        self.assert_(set("\r\n") & set(dummydata))
138
        datalax = utils.ReadOneLineFile(myfile, strict=False)
139
        self.assertEqual(myline, datalax)
140
        if ol:
141
          self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
142
                            myfile, strict=True)
143
        else:
144
          datastrict = utils.ReadOneLineFile(myfile, strict=True)
145
          self.assertEqual(myline, datastrict)
146

    
147
  def testEmptyfile(self):
148
    myfile = self._CreateTempFile()
149
    self.assertRaises(errors.GenericError, utils.ReadOneLineFile, myfile)
150

    
151

    
152
class TestTimestampForFilename(unittest.TestCase):
153
  def test(self):
154
    self.assert_("." not in utils.TimestampForFilename())
155
    self.assert_(":" not in utils.TimestampForFilename())
156

    
157

    
158
class TestCreateBackup(testutils.GanetiTestCase):
159
  def setUp(self):
160
    testutils.GanetiTestCase.setUp(self)
161

    
162
    self.tmpdir = tempfile.mkdtemp()
163

    
164
  def tearDown(self):
165
    testutils.GanetiTestCase.tearDown(self)
166

    
167
    shutil.rmtree(self.tmpdir)
168

    
169
  def testEmpty(self):
170
    filename = utils.PathJoin(self.tmpdir, "config.data")
171
    utils.WriteFile(filename, data="")
172
    bname = utils.CreateBackup(filename)
173
    self.assertFileContent(bname, "")
174
    self.assertEqual(len(glob.glob("%s*" % filename)), 2)
175
    utils.CreateBackup(filename)
176
    self.assertEqual(len(glob.glob("%s*" % filename)), 3)
177
    utils.CreateBackup(filename)
178
    self.assertEqual(len(glob.glob("%s*" % filename)), 4)
179

    
180
    fifoname = utils.PathJoin(self.tmpdir, "fifo")
181
    os.mkfifo(fifoname)
182
    self.assertRaises(errors.ProgrammerError, utils.CreateBackup, fifoname)
183

    
184
  def testContent(self):
185
    bkpcount = 0
186
    for data in ["", "X", "Hello World!\n" * 100, "Binary data\0\x01\x02\n"]:
187
      for rep in [1, 2, 10, 127]:
188
        testdata = data * rep
189

    
190
        filename = utils.PathJoin(self.tmpdir, "test.data_")
191
        utils.WriteFile(filename, data=testdata)
192
        self.assertFileContent(filename, testdata)
193

    
194
        for _ in range(3):
195
          bname = utils.CreateBackup(filename)
196
          bkpcount += 1
197
          self.assertFileContent(bname, testdata)
198
          self.assertEqual(len(glob.glob("%s*" % filename)), 1 + bkpcount)
199

    
200

    
201
class TestListVisibleFiles(unittest.TestCase):
202
  """Test case for ListVisibleFiles"""
203

    
204
  def setUp(self):
205
    self.path = tempfile.mkdtemp()
206

    
207
  def tearDown(self):
208
    shutil.rmtree(self.path)
209

    
210
  def _CreateFiles(self, files):
211
    for name in files:
212
      utils.WriteFile(os.path.join(self.path, name), data="test")
213

    
214
  def _test(self, files, expected):
215
    self._CreateFiles(files)
216
    found = utils.ListVisibleFiles(self.path)
217
    self.assertEqual(set(found), set(expected))
218

    
219
  def testAllVisible(self):
220
    files = ["a", "b", "c"]
221
    expected = files
222
    self._test(files, expected)
223

    
224
  def testNoneVisible(self):
225
    files = [".a", ".b", ".c"]
226
    expected = []
227
    self._test(files, expected)
228

    
229
  def testSomeVisible(self):
230
    files = ["a", "b", ".c"]
231
    expected = ["a", "b"]
232
    self._test(files, expected)
233

    
234
  def testNonAbsolutePath(self):
235
    self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
236
                          "abc")
237

    
238
  def testNonNormalizedPath(self):
239
    self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
240
                          "/bin/../tmp")
241

    
242

    
243
class TestWriteFile(unittest.TestCase):
244
  def setUp(self):
245
    self.tmpdir = None
246
    self.tfile = tempfile.NamedTemporaryFile()
247
    self.did_pre = False
248
    self.did_post = False
249
    self.did_write = False
250

    
251
  def tearDown(self):
252
    if self.tmpdir:
253
      shutil.rmtree(self.tmpdir)
254

    
255
  def markPre(self, fd):
256
    self.did_pre = True
257

    
258
  def markPost(self, fd):
259
    self.did_post = True
260

    
261
  def markWrite(self, fd):
262
    self.did_write = True
263

    
264
  def testWrite(self):
265
    data = "abc"
266
    utils.WriteFile(self.tfile.name, data=data)
267
    self.assertEqual(utils.ReadFile(self.tfile.name), data)
268

    
269
  def testWriteSimpleUnicode(self):
270
    data = u"abc"
271
    utils.WriteFile(self.tfile.name, data=data)
272
    self.assertEqual(utils.ReadFile(self.tfile.name), data)
273

    
274
  def testErrors(self):
275
    self.assertRaises(errors.ProgrammerError, utils.WriteFile,
276
                      self.tfile.name, data="test", fn=lambda fd: None)
277
    self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name)
278
    self.assertRaises(errors.ProgrammerError, utils.WriteFile,
279
                      self.tfile.name, data="test", atime=0)
280

    
281
  def testPreWrite(self):
282
    utils.WriteFile(self.tfile.name, data="", prewrite=self.markPre)
283
    self.assertTrue(self.did_pre)
284
    self.assertFalse(self.did_post)
285
    self.assertFalse(self.did_write)
286

    
287
  def testPostWrite(self):
288
    utils.WriteFile(self.tfile.name, data="", postwrite=self.markPost)
289
    self.assertFalse(self.did_pre)
290
    self.assertTrue(self.did_post)
291
    self.assertFalse(self.did_write)
292

    
293
  def testWriteFunction(self):
294
    utils.WriteFile(self.tfile.name, fn=self.markWrite)
295
    self.assertFalse(self.did_pre)
296
    self.assertFalse(self.did_post)
297
    self.assertTrue(self.did_write)
298

    
299
  def testDryRun(self):
300
    orig = "abc"
301
    self.tfile.write(orig)
302
    self.tfile.flush()
303
    utils.WriteFile(self.tfile.name, data="hello", dry_run=True)
304
    self.assertEqual(utils.ReadFile(self.tfile.name), orig)
305

    
306
  def testTimes(self):
307
    f = self.tfile.name
308
    for at, mt in [(0, 0), (1000, 1000), (2000, 3000),
309
                   (int(time.time()), 5000)]:
310
      utils.WriteFile(f, data="hello", atime=at, mtime=mt)
311
      st = os.stat(f)
312
      self.assertEqual(st.st_atime, at)
313
      self.assertEqual(st.st_mtime, mt)
314

    
315
  def testNoClose(self):
316
    data = "hello"
317
    self.assertEqual(utils.WriteFile(self.tfile.name, data="abc"), None)
318
    fd = utils.WriteFile(self.tfile.name, data=data, close=False)
319
    try:
320
      os.lseek(fd, 0, 0)
321
      self.assertEqual(os.read(fd, 4096), data)
322
    finally:
323
      os.close(fd)
324

    
325
  def testNoLeftovers(self):
326
    self.tmpdir = tempfile.mkdtemp()
327
    self.assertEqual(utils.WriteFile(utils.PathJoin(self.tmpdir, "test"),
328
                                     data="abc"),
329
                     None)
330
    self.assertEqual(os.listdir(self.tmpdir), ["test"])
331

    
332
  def testFailRename(self):
333
    self.tmpdir = tempfile.mkdtemp()
334
    target = utils.PathJoin(self.tmpdir, "target")
335
    os.mkdir(target)
336
    self.assertRaises(OSError, utils.WriteFile, target, data="abc")
337
    self.assertTrue(os.path.isdir(target))
338
    self.assertEqual(os.listdir(self.tmpdir), ["target"])
339
    self.assertFalse(os.listdir(target))
340

    
341
  def testFailRenameDryRun(self):
342
    self.tmpdir = tempfile.mkdtemp()
343
    target = utils.PathJoin(self.tmpdir, "target")
344
    os.mkdir(target)
345
    self.assertEqual(utils.WriteFile(target, data="abc", dry_run=True), None)
346
    self.assertTrue(os.path.isdir(target))
347
    self.assertEqual(os.listdir(self.tmpdir), ["target"])
348
    self.assertFalse(os.listdir(target))
349

    
350
  def testBackup(self):
351
    self.tmpdir = tempfile.mkdtemp()
352
    testfile = utils.PathJoin(self.tmpdir, "test")
353

    
354
    self.assertEqual(utils.WriteFile(testfile, data="foo", backup=True), None)
355
    self.assertEqual(utils.ReadFile(testfile), "foo")
356
    self.assertEqual(os.listdir(self.tmpdir), ["test"])
357

    
358
    # Write again
359
    assert os.path.isfile(testfile)
360
    self.assertEqual(utils.WriteFile(testfile, data="bar", backup=True), None)
361
    self.assertEqual(utils.ReadFile(testfile), "bar")
362
    self.assertEqual(len(glob.glob("%s.backup*" % testfile)), 1)
363
    self.assertTrue("test" in os.listdir(self.tmpdir))
364
    self.assertEqual(len(os.listdir(self.tmpdir)), 2)
365

    
366
    # Write again as dry-run
367
    assert os.path.isfile(testfile)
368
    self.assertEqual(utils.WriteFile(testfile, data="000", backup=True,
369
                                     dry_run=True),
370
                     None)
371
    self.assertEqual(utils.ReadFile(testfile), "bar")
372
    self.assertEqual(len(glob.glob("%s.backup*" % testfile)), 1)
373
    self.assertTrue("test" in os.listdir(self.tmpdir))
374
    self.assertEqual(len(os.listdir(self.tmpdir)), 2)
375

    
376

    
377
class TestFileID(testutils.GanetiTestCase):
378
  def testEquality(self):
379
    name = self._CreateTempFile()
380
    oldi = utils.GetFileID(path=name)
381
    self.failUnless(utils.VerifyFileID(oldi, oldi))
382

    
383
  def testUpdate(self):
384
    name = self._CreateTempFile()
385
    oldi = utils.GetFileID(path=name)
386
    os.utime(name, None)
387
    fd = os.open(name, os.O_RDWR)
388
    try:
389
      newi = utils.GetFileID(fd=fd)
390
      self.failUnless(utils.VerifyFileID(oldi, newi))
391
      self.failUnless(utils.VerifyFileID(newi, oldi))
392
    finally:
393
      os.close(fd)
394

    
395
  def testWriteFile(self):
396
    name = self._CreateTempFile()
397
    oldi = utils.GetFileID(path=name)
398
    mtime = oldi[2]
399
    os.utime(name, (mtime + 10, mtime + 10))
400
    self.assertRaises(errors.LockError, utils.SafeWriteFile, name,
401
                      oldi, data="")
402
    os.utime(name, (mtime - 10, mtime - 10))
403
    utils.SafeWriteFile(name, oldi, data="")
404
    oldi = utils.GetFileID(path=name)
405
    mtime = oldi[2]
406
    os.utime(name, (mtime + 10, mtime + 10))
407
    # this doesn't raise, since we passed None
408
    utils.SafeWriteFile(name, None, data="")
409

    
410
  def testError(self):
411
    t = tempfile.NamedTemporaryFile()
412
    self.assertRaises(errors.ProgrammerError, utils.GetFileID,
413
                      path=t.name, fd=t.fileno())
414

    
415

    
416
class TestRemoveFile(unittest.TestCase):
417
  """Test case for the RemoveFile function"""
418

    
419
  def setUp(self):
420
    """Create a temp dir and file for each case"""
421
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
422
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
423
    os.close(fd)
424

    
425
  def tearDown(self):
426
    if os.path.exists(self.tmpfile):
427
      os.unlink(self.tmpfile)
428
    os.rmdir(self.tmpdir)
429

    
430
  def testIgnoreDirs(self):
431
    """Test that RemoveFile() ignores directories"""
432
    self.assertEqual(None, utils.RemoveFile(self.tmpdir))
433

    
434
  def testIgnoreNotExisting(self):
435
    """Test that RemoveFile() ignores non-existing files"""
436
    utils.RemoveFile(self.tmpfile)
437
    utils.RemoveFile(self.tmpfile)
438

    
439
  def testRemoveFile(self):
440
    """Test that RemoveFile does remove a file"""
441
    utils.RemoveFile(self.tmpfile)
442
    if os.path.exists(self.tmpfile):
443
      self.fail("File '%s' not removed" % self.tmpfile)
444

    
445
  def testRemoveSymlink(self):
446
    """Test that RemoveFile does remove symlinks"""
447
    symlink = self.tmpdir + "/symlink"
448
    os.symlink("no-such-file", symlink)
449
    utils.RemoveFile(symlink)
450
    if os.path.exists(symlink):
451
      self.fail("File '%s' not removed" % symlink)
452
    os.symlink(self.tmpfile, symlink)
453
    utils.RemoveFile(symlink)
454
    if os.path.exists(symlink):
455
      self.fail("File '%s' not removed" % symlink)
456

    
457

    
458
class TestRemoveDir(unittest.TestCase):
459
  def setUp(self):
460
    self.tmpdir = tempfile.mkdtemp()
461

    
462
  def tearDown(self):
463
    try:
464
      shutil.rmtree(self.tmpdir)
465
    except EnvironmentError:
466
      pass
467

    
468
  def testEmptyDir(self):
469
    utils.RemoveDir(self.tmpdir)
470
    self.assertFalse(os.path.isdir(self.tmpdir))
471

    
472
  def testNonEmptyDir(self):
473
    self.tmpfile = os.path.join(self.tmpdir, "test1")
474
    open(self.tmpfile, "w").close()
475
    self.assertRaises(EnvironmentError, utils.RemoveDir, self.tmpdir)
476

    
477

    
478
class TestRename(unittest.TestCase):
479
  """Test case for RenameFile"""
480

    
481
  def setUp(self):
482
    """Create a temporary directory"""
483
    self.tmpdir = tempfile.mkdtemp()
484
    self.tmpfile = os.path.join(self.tmpdir, "test1")
485

    
486
    # Touch the file
487
    open(self.tmpfile, "w").close()
488

    
489
  def tearDown(self):
490
    """Remove temporary directory"""
491
    shutil.rmtree(self.tmpdir)
492

    
493
  def testSimpleRename1(self):
494
    """Simple rename 1"""
495
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
496
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
497

    
498
  def testSimpleRename2(self):
499
    """Simple rename 2"""
500
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
501
                     mkdir=True)
502
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
503

    
504
  def testRenameMkdir(self):
505
    """Rename with mkdir"""
506
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
507
                     mkdir=True)
508
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
509
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
510

    
511
    self.assertRaises(EnvironmentError, utils.RenameFile,
512
                      os.path.join(self.tmpdir, "test/xyz"),
513
                      os.path.join(self.tmpdir, "test/foo/bar/baz"),
514
                      mkdir=True)
515

    
516
    self.assertTrue(os.path.exists(os.path.join(self.tmpdir, "test/xyz")))
517
    self.assertFalse(os.path.exists(os.path.join(self.tmpdir, "test/foo/bar")))
518
    self.assertFalse(os.path.exists(os.path.join(self.tmpdir, "test/foo/bar/baz")))
519

    
520

    
521
class TestMakedirs(unittest.TestCase):
522
  def setUp(self):
523
    self.tmpdir = tempfile.mkdtemp()
524

    
525
  def tearDown(self):
526
    shutil.rmtree(self.tmpdir)
527

    
528
  def testNonExisting(self):
529
    path = utils.PathJoin(self.tmpdir, "foo")
530
    utils.Makedirs(path)
531
    self.assert_(os.path.isdir(path))
532

    
533
  def testExisting(self):
534
    path = utils.PathJoin(self.tmpdir, "foo")
535
    os.mkdir(path)
536
    utils.Makedirs(path)
537
    self.assert_(os.path.isdir(path))
538

    
539
  def testRecursiveNonExisting(self):
540
    path = utils.PathJoin(self.tmpdir, "foo/bar/baz")
541
    utils.Makedirs(path)
542
    self.assert_(os.path.isdir(path))
543

    
544
  def testRecursiveExisting(self):
545
    path = utils.PathJoin(self.tmpdir, "B/moo/xyz")
546
    self.assertFalse(os.path.exists(path))
547
    os.mkdir(utils.PathJoin(self.tmpdir, "B"))
548
    utils.Makedirs(path)
549
    self.assert_(os.path.isdir(path))
550

    
551

    
552
class TestEnsureDirs(unittest.TestCase):
553
  """Tests for EnsureDirs"""
554

    
555
  def setUp(self):
556
    self.dir = tempfile.mkdtemp()
557
    self.old_umask = os.umask(0777)
558

    
559
  def testEnsureDirs(self):
560
    utils.EnsureDirs([
561
        (utils.PathJoin(self.dir, "foo"), 0777),
562
        (utils.PathJoin(self.dir, "bar"), 0000),
563
        ])
564
    self.assertEquals(os.stat(utils.PathJoin(self.dir, "foo"))[0] & 0777, 0777)
565
    self.assertEquals(os.stat(utils.PathJoin(self.dir, "bar"))[0] & 0777, 0000)
566

    
567
  def tearDown(self):
568
    os.rmdir(utils.PathJoin(self.dir, "foo"))
569
    os.rmdir(utils.PathJoin(self.dir, "bar"))
570
    os.rmdir(self.dir)
571
    os.umask(self.old_umask)
572

    
573

    
574
class TestIsNormAbsPath(unittest.TestCase):
575
  """Testing case for IsNormAbsPath"""
576

    
577
  def _pathTestHelper(self, path, result):
578
    if result:
579
      self.assert_(utils.IsNormAbsPath(path),
580
          "Path %s should result absolute and normalized" % path)
581
    else:
582
      self.assertFalse(utils.IsNormAbsPath(path),
583
          "Path %s should not result absolute and normalized" % path)
584

    
585
  def testBase(self):
586
    self._pathTestHelper("/etc", True)
587
    self._pathTestHelper("/srv", True)
588
    self._pathTestHelper("etc", False)
589
    self._pathTestHelper("/etc/../root", False)
590
    self._pathTestHelper("/etc/", False)
591

    
592

    
593
class TestIsBelowDir(unittest.TestCase):
594
  """Testing case for IsBelowDir"""
595

    
596
  def testSamePrefix(self):
597
    self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c"))
598
    self.assertTrue(utils.IsBelowDir("/a/b/", "/a/b/e"))
599

    
600
  def testSamePrefixButDifferentDir(self):
601
    self.assertFalse(utils.IsBelowDir("/a/b", "/a/bc/d"))
602
    self.assertFalse(utils.IsBelowDir("/a/b/", "/a/bc/e"))
603

    
604
  def testSamePrefixButDirTraversal(self):
605
    self.assertFalse(utils.IsBelowDir("/a/b", "/a/b/../c"))
606
    self.assertFalse(utils.IsBelowDir("/a/b/", "/a/b/../d"))
607

    
608
  def testSamePrefixAndTraversal(self):
609
    self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c/../d"))
610
    self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c/./e"))
611
    self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/../b/./e"))
612

    
613
  def testBothAbsPath(self):
614
    self.assertRaises(ValueError, utils.IsBelowDir, "/a/b/c", "d")
615
    self.assertRaises(ValueError, utils.IsBelowDir, "a/b/c", "/d")
616
    self.assertRaises(ValueError, utils.IsBelowDir, "a/b/c", "d")
617

    
618

    
619
class TestPathJoin(unittest.TestCase):
620
  """Testing case for PathJoin"""
621

    
622
  def testBasicItems(self):
623
    mlist = ["/a", "b", "c"]
624
    self.failUnlessEqual(utils.PathJoin(*mlist), "/".join(mlist))
625

    
626
  def testNonAbsPrefix(self):
627
    self.failUnlessRaises(ValueError, utils.PathJoin, "a", "b")
628

    
629
  def testBackTrack(self):
630
    self.failUnlessRaises(ValueError, utils.PathJoin, "/a", "b/../c")
631

    
632
  def testMultiAbs(self):
633
    self.failUnlessRaises(ValueError, utils.PathJoin, "/a", "/b")
634

    
635

    
636
class TestTailFile(testutils.GanetiTestCase):
637
  """Test case for the TailFile function"""
638

    
639
  def testEmpty(self):
640
    fname = self._CreateTempFile()
641
    self.failUnlessEqual(utils.TailFile(fname), [])
642
    self.failUnlessEqual(utils.TailFile(fname, lines=25), [])
643

    
644
  def testAllLines(self):
645
    data = ["test %d" % i for i in range(30)]
646
    for i in range(30):
647
      fname = self._CreateTempFile()
648
      fd = open(fname, "w")
649
      fd.write("\n".join(data[:i]))
650
      if i > 0:
651
        fd.write("\n")
652
      fd.close()
653
      self.failUnlessEqual(utils.TailFile(fname, lines=i), data[:i])
654

    
655
  def testPartialLines(self):
656
    data = ["test %d" % i for i in range(30)]
657
    fname = self._CreateTempFile()
658
    fd = open(fname, "w")
659
    fd.write("\n".join(data))
660
    fd.write("\n")
661
    fd.close()
662
    for i in range(1, 30):
663
      self.failUnlessEqual(utils.TailFile(fname, lines=i), data[-i:])
664

    
665
  def testBigFile(self):
666
    data = ["test %d" % i for i in range(30)]
667
    fname = self._CreateTempFile()
668
    fd = open(fname, "w")
669
    fd.write("X" * 1048576)
670
    fd.write("\n")
671
    fd.write("\n".join(data))
672
    fd.write("\n")
673
    fd.close()
674
    for i in range(1, 30):
675
      self.failUnlessEqual(utils.TailFile(fname, lines=i), data[-i:])
676

    
677

    
678
class TestPidFileFunctions(unittest.TestCase):
679
  """Tests for WritePidFile and ReadPidFile"""
680

    
681
  def setUp(self):
682
    self.dir = tempfile.mkdtemp()
683
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
684

    
685
  def testPidFileFunctions(self):
686
    pid_file = self.f_dpn('test')
687
    fd = utils.WritePidFile(self.f_dpn('test'))
688
    self.failUnless(os.path.exists(pid_file),
689
                    "PID file should have been created")
690
    read_pid = utils.ReadPidFile(pid_file)
691
    self.failUnlessEqual(read_pid, os.getpid())
692
    self.failUnless(utils.IsProcessAlive(read_pid))
693
    self.failUnlessRaises(errors.LockError, utils.WritePidFile,
694
                          self.f_dpn('test'))
695
    os.close(fd)
696
    utils.RemoveFile(self.f_dpn("test"))
697
    self.failIf(os.path.exists(pid_file),
698
                "PID file should not exist anymore")
699
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
700
                         "ReadPidFile should return 0 for missing pid file")
701
    fh = open(pid_file, "w")
702
    fh.write("blah\n")
703
    fh.close()
704
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
705
                         "ReadPidFile should return 0 for invalid pid file")
706
    # but now, even with the file existing, we should be able to lock it
707
    fd = utils.WritePidFile(self.f_dpn('test'))
708
    os.close(fd)
709
    utils.RemoveFile(self.f_dpn("test"))
710
    self.failIf(os.path.exists(pid_file),
711
                "PID file should not exist anymore")
712

    
713
  def testKill(self):
714
    pid_file = self.f_dpn('child')
715
    r_fd, w_fd = os.pipe()
716
    new_pid = os.fork()
717
    if new_pid == 0: #child
718
      utils.WritePidFile(self.f_dpn('child'))
719
      os.write(w_fd, 'a')
720
      signal.pause()
721
      os._exit(0)
722
      return
723
    # else we are in the parent
724
    # wait until the child has written the pid file
725
    os.read(r_fd, 1)
726
    read_pid = utils.ReadPidFile(pid_file)
727
    self.failUnlessEqual(read_pid, new_pid)
728
    self.failUnless(utils.IsProcessAlive(new_pid))
729
    utils.KillProcess(new_pid, waitpid=True)
730
    self.failIf(utils.IsProcessAlive(new_pid))
731
    utils.RemoveFile(self.f_dpn('child'))
732
    self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0)
733

    
734
  def tearDown(self):
735
    shutil.rmtree(self.dir)
736

    
737

    
738
class TestSshKeys(testutils.GanetiTestCase):
739
  """Test case for the AddAuthorizedKey function"""
740

    
741
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
742
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="198.51.100.4" '
743
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
744

    
745
  def setUp(self):
746
    testutils.GanetiTestCase.setUp(self)
747
    self.tmpname = self._CreateTempFile()
748
    handle = open(self.tmpname, 'w')
749
    try:
750
      handle.write("%s\n" % TestSshKeys.KEY_A)
751
      handle.write("%s\n" % TestSshKeys.KEY_B)
752
    finally:
753
      handle.close()
754

    
755
  def testAddingNewKey(self):
756
    utils.AddAuthorizedKey(self.tmpname,
757
                           'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
758

    
759
    self.assertFileContent(self.tmpname,
760
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
761
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
762
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
763
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
764

    
765
  def testAddingAlmostButNotCompletelyTheSameKey(self):
766
    utils.AddAuthorizedKey(self.tmpname,
767
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
768

    
769
    self.assertFileContent(self.tmpname,
770
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
771
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
772
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
773
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
774

    
775
  def testAddingExistingKeyWithSomeMoreSpaces(self):
776
    utils.AddAuthorizedKey(self.tmpname,
777
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
778

    
779
    self.assertFileContent(self.tmpname,
780
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
781
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
782
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
783

    
784
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
785
    utils.RemoveAuthorizedKey(self.tmpname,
786
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
787

    
788
    self.assertFileContent(self.tmpname,
789
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
790
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
791

    
792
  def testRemovingNonExistingKey(self):
793
    utils.RemoveAuthorizedKey(self.tmpname,
794
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
795

    
796
    self.assertFileContent(self.tmpname,
797
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
798
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
799
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
800

    
801

    
802
class TestNewUUID(unittest.TestCase):
803
  """Test case for NewUUID"""
804

    
805
  def runTest(self):
806
    self.failUnless(utils.UUID_RE.match(utils.NewUUID()))
807

    
808

    
809
def _MockStatResult(cb, mode, uid, gid):
810
  def _fn(path):
811
    if cb:
812
      cb()
813
    return {
814
      stat.ST_MODE: mode,
815
      stat.ST_UID: uid,
816
      stat.ST_GID: gid,
817
      }
818
  return _fn
819

    
820

    
821
def _RaiseNoEntError():
822
  raise EnvironmentError(errno.ENOENT, "not found")
823

    
824

    
825
def _OtherStatRaise():
826
  raise EnvironmentError()
827

    
828

    
829
class TestPermissionEnforcements(unittest.TestCase):
830
  UID_A = 16024
831
  UID_B = 25850
832
  GID_A = 14028
833
  GID_B = 29801
834

    
835
  def setUp(self):
836
    self._chown_calls = []
837
    self._chmod_calls = []
838
    self._mkdir_calls = []
839

    
840
  def tearDown(self):
841
    self.assertRaises(IndexError, self._mkdir_calls.pop)
842
    self.assertRaises(IndexError, self._chmod_calls.pop)
843
    self.assertRaises(IndexError, self._chown_calls.pop)
844

    
845
  def _FakeMkdir(self, path):
846
    self._mkdir_calls.append(path)
847

    
848
  def _FakeChown(self, path, uid, gid):
849
    self._chown_calls.append((path, uid, gid))
850

    
851
  def _ChmodWrapper(self, cb):
852
    def _fn(path, mode):
853
      self._chmod_calls.append((path, mode))
854
      if cb:
855
        cb()
856
    return _fn
857

    
858
  def _VerifyPerm(self, path, mode, uid=-1, gid=-1):
859
    self.assertEqual(path, "/ganeti-qa-non-test")
860
    self.assertEqual(mode, 0700)
861
    self.assertEqual(uid, self.UID_A)
862
    self.assertEqual(gid, self.GID_A)
863

    
864
  def testMakeDirWithPerm(self):
865
    is_dir_stat = _MockStatResult(None, stat.S_IFDIR, 0, 0)
866
    utils.MakeDirWithPerm("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A,
867
                          _lstat_fn=is_dir_stat, _perm_fn=self._VerifyPerm)
868

    
869
  def testDirErrors(self):
870
    self.assertRaises(errors.GenericError, utils.MakeDirWithPerm,
871
                      "/ganeti-qa-non-test", 0700, 0, 0,
872
                      _lstat_fn=_MockStatResult(None, 0, 0, 0))
873
    self.assertRaises(IndexError, self._mkdir_calls.pop)
874

    
875
    other_stat_raise = _MockStatResult(_OtherStatRaise, stat.S_IFDIR, 0, 0)
876
    self.assertRaises(errors.GenericError, utils.MakeDirWithPerm,
877
                      "/ganeti-qa-non-test", 0700, 0, 0,
878
                      _lstat_fn=other_stat_raise)
879
    self.assertRaises(IndexError, self._mkdir_calls.pop)
880

    
881
    non_exist_stat = _MockStatResult(_RaiseNoEntError, stat.S_IFDIR, 0, 0)
882
    utils.MakeDirWithPerm("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A,
883
                          _lstat_fn=non_exist_stat, _mkdir_fn=self._FakeMkdir,
884
                          _perm_fn=self._VerifyPerm)
885
    self.assertEqual(self._mkdir_calls.pop(0), "/ganeti-qa-non-test")
886

    
887
  def testEnforcePermissionNoEnt(self):
888
    self.assertRaises(errors.GenericError, utils.EnforcePermission,
889
                      "/ganeti-qa-non-test", 0600,
890
                      _chmod_fn=NotImplemented, _chown_fn=NotImplemented,
891
                      _stat_fn=_MockStatResult(_RaiseNoEntError, 0, 0, 0))
892

    
893
  def testEnforcePermissionNoEntMustNotExist(self):
894
    utils.EnforcePermission("/ganeti-qa-non-test", 0600, must_exist=False,
895
                            _chmod_fn=NotImplemented,
896
                            _chown_fn=NotImplemented,
897
                            _stat_fn=_MockStatResult(_RaiseNoEntError,
898
                                                          0, 0, 0))
899

    
900
  def testEnforcePermissionOtherErrorMustNotExist(self):
901
    self.assertRaises(errors.GenericError, utils.EnforcePermission,
902
                      "/ganeti-qa-non-test", 0600, must_exist=False,
903
                      _chmod_fn=NotImplemented, _chown_fn=NotImplemented,
904
                      _stat_fn=_MockStatResult(_OtherStatRaise, 0, 0, 0))
905

    
906
  def testEnforcePermissionNoChanges(self):
907
    utils.EnforcePermission("/ganeti-qa-non-test", 0600,
908
                            _stat_fn=_MockStatResult(None, 0600, 0, 0),
909
                            _chmod_fn=self._ChmodWrapper(None),
910
                            _chown_fn=self._FakeChown)
911

    
912
  def testEnforcePermissionChangeMode(self):
913
    utils.EnforcePermission("/ganeti-qa-non-test", 0444,
914
                            _stat_fn=_MockStatResult(None, 0600, 0, 0),
915
                            _chmod_fn=self._ChmodWrapper(None),
916
                            _chown_fn=self._FakeChown)
917
    self.assertEqual(self._chmod_calls.pop(0), ("/ganeti-qa-non-test", 0444))
918

    
919
  def testEnforcePermissionSetUidGid(self):
920
    utils.EnforcePermission("/ganeti-qa-non-test", 0600,
921
                            uid=self.UID_B, gid=self.GID_B,
922
                            _stat_fn=_MockStatResult(None, 0600,
923
                                                     self.UID_A,
924
                                                     self.GID_A),
925
                            _chmod_fn=self._ChmodWrapper(None),
926
                            _chown_fn=self._FakeChown)
927
    self.assertEqual(self._chown_calls.pop(0),
928
                     ("/ganeti-qa-non-test", self.UID_B, self.GID_B))
929

    
930

    
931
if __name__ == "__main__":
932
  testutils.GanetiTestProgram()