Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils.io_unittest.py @ 2e04d454

History | View | Annotate | Download (31 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.utils.io"""
23

    
24
import os
25
import tempfile
26
import unittest
27
import shutil
28
import glob
29
import time
30
import signal
31
import stat
32
import errno
33

    
34
from ganeti import constants
35
from ganeti import utils
36
from ganeti import compat
37
from ganeti import errors
38

    
39
import testutils
40

    
41

    
42
class TestReadFile(testutils.GanetiTestCase):
43
  def testReadAll(self):
44
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"))
45
    self.assertEqual(len(data), 814)
46

    
47
    h = compat.md5_hash()
48
    h.update(data)
49
    self.assertEqual(h.hexdigest(), "a491efb3efe56a0535f924d5f8680fd4")
50

    
51
  def testReadSize(self):
52
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"),
53
                          size=100)
54
    self.assertEqual(len(data), 100)
55

    
56
    h = compat.md5_hash()
57
    h.update(data)
58
    self.assertEqual(h.hexdigest(), "893772354e4e690b9efd073eed433ce7")
59

    
60
  def testCallback(self):
61
    def _Cb(fh):
62
      self.assertEqual(fh.tell(), 0)
63
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"), preread=_Cb)
64
    self.assertEqual(len(data), 814)
65

    
66
  def testError(self):
67
    self.assertRaises(EnvironmentError, utils.ReadFile,
68
                      "/dev/null/does-not-exist")
69

    
70

    
71
class TestReadOneLineFile(testutils.GanetiTestCase):
72
  def setUp(self):
73
    testutils.GanetiTestCase.setUp(self)
74

    
75
  def testDefault(self):
76
    data = utils.ReadOneLineFile(self._TestDataFilename("cert1.pem"))
77
    self.assertEqual(len(data), 27)
78
    self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
79

    
80
  def testNotStrict(self):
81
    data = utils.ReadOneLineFile(self._TestDataFilename("cert1.pem"),
82
                                 strict=False)
83
    self.assertEqual(len(data), 27)
84
    self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
85

    
86
  def testStrictFailure(self):
87
    self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
88
                      self._TestDataFilename("cert1.pem"), strict=True)
89

    
90
  def testLongLine(self):
91
    dummydata = (1024 * "Hello World! ")
92
    myfile = self._CreateTempFile()
93
    utils.WriteFile(myfile, data=dummydata)
94
    datastrict = utils.ReadOneLineFile(myfile, strict=True)
95
    datalax = utils.ReadOneLineFile(myfile, strict=False)
96
    self.assertEqual(dummydata, datastrict)
97
    self.assertEqual(dummydata, datalax)
98

    
99
  def testNewline(self):
100
    myfile = self._CreateTempFile()
101
    myline = "myline"
102
    for nl in ["", "\n", "\r\n"]:
103
      dummydata = "%s%s" % (myline, nl)
104
      utils.WriteFile(myfile, data=dummydata)
105
      datalax = utils.ReadOneLineFile(myfile, strict=False)
106
      self.assertEqual(myline, datalax)
107
      datastrict = utils.ReadOneLineFile(myfile, strict=True)
108
      self.assertEqual(myline, datastrict)
109

    
110
  def testWhitespaceAndMultipleLines(self):
111
    myfile = self._CreateTempFile()
112
    for nl in ["", "\n", "\r\n"]:
113
      for ws in [" ", "\t", "\t\t  \t", "\t "]:
114
        dummydata = (1024 * ("Foo bar baz %s%s" % (ws, nl)))
115
        utils.WriteFile(myfile, data=dummydata)
116
        datalax = utils.ReadOneLineFile(myfile, strict=False)
117
        if nl:
118
          self.assert_(set("\r\n") & set(dummydata))
119
          self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
120
                            myfile, strict=True)
121
          explen = len("Foo bar baz ") + len(ws)
122
          self.assertEqual(len(datalax), explen)
123
          self.assertEqual(datalax, dummydata[:explen])
124
          self.assertFalse(set("\r\n") & set(datalax))
125
        else:
126
          datastrict = utils.ReadOneLineFile(myfile, strict=True)
127
          self.assertEqual(dummydata, datastrict)
128
          self.assertEqual(dummydata, datalax)
129

    
130
  def testEmptylines(self):
131
    myfile = self._CreateTempFile()
132
    myline = "myline"
133
    for nl in ["\n", "\r\n"]:
134
      for ol in ["", "otherline"]:
135
        dummydata = "%s%s%s%s%s%s" % (nl, nl, myline, nl, ol, nl)
136
        utils.WriteFile(myfile, data=dummydata)
137
        self.assert_(set("\r\n") & set(dummydata))
138
        datalax = utils.ReadOneLineFile(myfile, strict=False)
139
        self.assertEqual(myline, datalax)
140
        if ol:
141
          self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
142
                            myfile, strict=True)
143
        else:
144
          datastrict = utils.ReadOneLineFile(myfile, strict=True)
145
          self.assertEqual(myline, datastrict)
146

    
147
  def testEmptyfile(self):
148
    myfile = self._CreateTempFile()
149
    self.assertRaises(errors.GenericError, utils.ReadOneLineFile, myfile)
150

    
151

    
152
class TestTimestampForFilename(unittest.TestCase):
153
  def test(self):
154
    self.assert_("." not in utils.TimestampForFilename())
155
    self.assert_(":" not in utils.TimestampForFilename())
156

    
157

    
158
class TestCreateBackup(testutils.GanetiTestCase):
159
  def setUp(self):
160
    testutils.GanetiTestCase.setUp(self)
161

    
162
    self.tmpdir = tempfile.mkdtemp()
163

    
164
  def tearDown(self):
165
    testutils.GanetiTestCase.tearDown(self)
166

    
167
    shutil.rmtree(self.tmpdir)
168

    
169
  def testEmpty(self):
170
    filename = utils.PathJoin(self.tmpdir, "config.data")
171
    utils.WriteFile(filename, data="")
172
    bname = utils.CreateBackup(filename)
173
    self.assertFileContent(bname, "")
174
    self.assertEqual(len(glob.glob("%s*" % filename)), 2)
175
    utils.CreateBackup(filename)
176
    self.assertEqual(len(glob.glob("%s*" % filename)), 3)
177
    utils.CreateBackup(filename)
178
    self.assertEqual(len(glob.glob("%s*" % filename)), 4)
179

    
180
    fifoname = utils.PathJoin(self.tmpdir, "fifo")
181
    os.mkfifo(fifoname)
182
    self.assertRaises(errors.ProgrammerError, utils.CreateBackup, fifoname)
183

    
184
  def testContent(self):
185
    bkpcount = 0
186
    for data in ["", "X", "Hello World!\n" * 100, "Binary data\0\x01\x02\n"]:
187
      for rep in [1, 2, 10, 127]:
188
        testdata = data * rep
189

    
190
        filename = utils.PathJoin(self.tmpdir, "test.data_")
191
        utils.WriteFile(filename, data=testdata)
192
        self.assertFileContent(filename, testdata)
193

    
194
        for _ in range(3):
195
          bname = utils.CreateBackup(filename)
196
          bkpcount += 1
197
          self.assertFileContent(bname, testdata)
198
          self.assertEqual(len(glob.glob("%s*" % filename)), 1 + bkpcount)
199

    
200

    
201
class TestListVisibleFiles(unittest.TestCase):
202
  """Test case for ListVisibleFiles"""
203

    
204
  def setUp(self):
205
    self.path = tempfile.mkdtemp()
206

    
207
  def tearDown(self):
208
    shutil.rmtree(self.path)
209

    
210
  def _CreateFiles(self, files):
211
    for name in files:
212
      utils.WriteFile(os.path.join(self.path, name), data="test")
213

    
214
  def _test(self, files, expected):
215
    self._CreateFiles(files)
216
    found = utils.ListVisibleFiles(self.path)
217
    self.assertEqual(set(found), set(expected))
218

    
219
  def testAllVisible(self):
220
    files = ["a", "b", "c"]
221
    expected = files
222
    self._test(files, expected)
223

    
224
  def testNoneVisible(self):
225
    files = [".a", ".b", ".c"]
226
    expected = []
227
    self._test(files, expected)
228

    
229
  def testSomeVisible(self):
230
    files = ["a", "b", ".c"]
231
    expected = ["a", "b"]
232
    self._test(files, expected)
233

    
234
  def testNonAbsolutePath(self):
235
    self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
236
                          "abc")
237

    
238
  def testNonNormalizedPath(self):
239
    self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
240
                          "/bin/../tmp")
241

    
242

    
243
class TestWriteFile(unittest.TestCase):
244
  def setUp(self):
245
    self.tmpdir = None
246
    self.tfile = tempfile.NamedTemporaryFile()
247
    self.did_pre = False
248
    self.did_post = False
249
    self.did_write = False
250

    
251
  def tearDown(self):
252
    if self.tmpdir:
253
      shutil.rmtree(self.tmpdir)
254

    
255
  def markPre(self, fd):
256
    self.did_pre = True
257

    
258
  def markPost(self, fd):
259
    self.did_post = True
260

    
261
  def markWrite(self, fd):
262
    self.did_write = True
263

    
264
  def testWrite(self):
265
    data = "abc"
266
    utils.WriteFile(self.tfile.name, data=data)
267
    self.assertEqual(utils.ReadFile(self.tfile.name), data)
268

    
269
  def testWriteSimpleUnicode(self):
270
    data = u"abc"
271
    utils.WriteFile(self.tfile.name, data=data)
272
    self.assertEqual(utils.ReadFile(self.tfile.name), data)
273

    
274
  def testErrors(self):
275
    self.assertRaises(errors.ProgrammerError, utils.WriteFile,
276
                      self.tfile.name, data="test", fn=lambda fd: None)
277
    self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name)
278
    self.assertRaises(errors.ProgrammerError, utils.WriteFile,
279
                      self.tfile.name, data="test", atime=0)
280

    
281
  def testPreWrite(self):
282
    utils.WriteFile(self.tfile.name, data="", prewrite=self.markPre)
283
    self.assertTrue(self.did_pre)
284
    self.assertFalse(self.did_post)
285
    self.assertFalse(self.did_write)
286

    
287
  def testPostWrite(self):
288
    utils.WriteFile(self.tfile.name, data="", postwrite=self.markPost)
289
    self.assertFalse(self.did_pre)
290
    self.assertTrue(self.did_post)
291
    self.assertFalse(self.did_write)
292

    
293
  def testWriteFunction(self):
294
    utils.WriteFile(self.tfile.name, fn=self.markWrite)
295
    self.assertFalse(self.did_pre)
296
    self.assertFalse(self.did_post)
297
    self.assertTrue(self.did_write)
298

    
299
  def testDryRun(self):
300
    orig = "abc"
301
    self.tfile.write(orig)
302
    self.tfile.flush()
303
    utils.WriteFile(self.tfile.name, data="hello", dry_run=True)
304
    self.assertEqual(utils.ReadFile(self.tfile.name), orig)
305

    
306
  def testTimes(self):
307
    f = self.tfile.name
308
    for at, mt in [(0, 0), (1000, 1000), (2000, 3000),
309
                   (int(time.time()), 5000)]:
310
      utils.WriteFile(f, data="hello", atime=at, mtime=mt)
311
      st = os.stat(f)
312
      self.assertEqual(st.st_atime, at)
313
      self.assertEqual(st.st_mtime, mt)
314

    
315
  def testNoClose(self):
316
    data = "hello"
317
    self.assertEqual(utils.WriteFile(self.tfile.name, data="abc"), None)
318
    fd = utils.WriteFile(self.tfile.name, data=data, close=False)
319
    try:
320
      os.lseek(fd, 0, 0)
321
      self.assertEqual(os.read(fd, 4096), data)
322
    finally:
323
      os.close(fd)
324

    
325
  def testNoLeftovers(self):
326
    self.tmpdir = tempfile.mkdtemp()
327
    self.assertEqual(utils.WriteFile(utils.PathJoin(self.tmpdir, "test"),
328
                                     data="abc"),
329
                     None)
330
    self.assertEqual(os.listdir(self.tmpdir), ["test"])
331

    
332
  def testFailRename(self):
333
    self.tmpdir = tempfile.mkdtemp()
334
    target = utils.PathJoin(self.tmpdir, "target")
335
    os.mkdir(target)
336
    self.assertRaises(OSError, utils.WriteFile, target, data="abc")
337
    self.assertTrue(os.path.isdir(target))
338
    self.assertEqual(os.listdir(self.tmpdir), ["target"])
339
    self.assertFalse(os.listdir(target))
340

    
341
  def testFailRenameDryRun(self):
342
    self.tmpdir = tempfile.mkdtemp()
343
    target = utils.PathJoin(self.tmpdir, "target")
344
    os.mkdir(target)
345
    self.assertEqual(utils.WriteFile(target, data="abc", dry_run=True), None)
346
    self.assertTrue(os.path.isdir(target))
347
    self.assertEqual(os.listdir(self.tmpdir), ["target"])
348
    self.assertFalse(os.listdir(target))
349

    
350
  def testBackup(self):
351
    self.tmpdir = tempfile.mkdtemp()
352
    testfile = utils.PathJoin(self.tmpdir, "test")
353

    
354
    self.assertEqual(utils.WriteFile(testfile, data="foo", backup=True), None)
355
    self.assertEqual(utils.ReadFile(testfile), "foo")
356
    self.assertEqual(os.listdir(self.tmpdir), ["test"])
357

    
358
    # Write again
359
    assert os.path.isfile(testfile)
360
    self.assertEqual(utils.WriteFile(testfile, data="bar", backup=True), None)
361
    self.assertEqual(utils.ReadFile(testfile), "bar")
362
    self.assertEqual(len(glob.glob("%s.backup*" % testfile)), 1)
363
    self.assertTrue("test" in os.listdir(self.tmpdir))
364
    self.assertEqual(len(os.listdir(self.tmpdir)), 2)
365

    
366
    # Write again as dry-run
367
    assert os.path.isfile(testfile)
368
    self.assertEqual(utils.WriteFile(testfile, data="000", backup=True,
369
                                     dry_run=True),
370
                     None)
371
    self.assertEqual(utils.ReadFile(testfile), "bar")
372
    self.assertEqual(len(glob.glob("%s.backup*" % testfile)), 1)
373
    self.assertTrue("test" in os.listdir(self.tmpdir))
374
    self.assertEqual(len(os.listdir(self.tmpdir)), 2)
375

    
376

    
377
class TestFileID(testutils.GanetiTestCase):
378
  def testEquality(self):
379
    name = self._CreateTempFile()
380
    oldi = utils.GetFileID(path=name)
381
    self.failUnless(utils.VerifyFileID(oldi, oldi))
382

    
383
  def testUpdate(self):
384
    name = self._CreateTempFile()
385
    oldi = utils.GetFileID(path=name)
386
    os.utime(name, None)
387
    fd = os.open(name, os.O_RDWR)
388
    try:
389
      newi = utils.GetFileID(fd=fd)
390
      self.failUnless(utils.VerifyFileID(oldi, newi))
391
      self.failUnless(utils.VerifyFileID(newi, oldi))
392
    finally:
393
      os.close(fd)
394

    
395
  def testWriteFile(self):
396
    name = self._CreateTempFile()
397
    oldi = utils.GetFileID(path=name)
398
    mtime = oldi[2]
399
    os.utime(name, (mtime + 10, mtime + 10))
400
    self.assertRaises(errors.LockError, utils.SafeWriteFile, name,
401
                      oldi, data="")
402
    os.utime(name, (mtime - 10, mtime - 10))
403
    utils.SafeWriteFile(name, oldi, data="")
404
    oldi = utils.GetFileID(path=name)
405
    mtime = oldi[2]
406
    os.utime(name, (mtime + 10, mtime + 10))
407
    # this doesn't raise, since we passed None
408
    utils.SafeWriteFile(name, None, data="")
409

    
410
  def testError(self):
411
    t = tempfile.NamedTemporaryFile()
412
    self.assertRaises(errors.ProgrammerError, utils.GetFileID,
413
                      path=t.name, fd=t.fileno())
414

    
415

    
416
class TestRemoveFile(unittest.TestCase):
417
  """Test case for the RemoveFile function"""
418

    
419
  def setUp(self):
420
    """Create a temp dir and file for each case"""
421
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
422
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
423
    os.close(fd)
424

    
425
  def tearDown(self):
426
    if os.path.exists(self.tmpfile):
427
      os.unlink(self.tmpfile)
428
    os.rmdir(self.tmpdir)
429

    
430
  def testIgnoreDirs(self):
431
    """Test that RemoveFile() ignores directories"""
432
    self.assertEqual(None, utils.RemoveFile(self.tmpdir))
433

    
434
  def testIgnoreNotExisting(self):
435
    """Test that RemoveFile() ignores non-existing files"""
436
    utils.RemoveFile(self.tmpfile)
437
    utils.RemoveFile(self.tmpfile)
438

    
439
  def testRemoveFile(self):
440
    """Test that RemoveFile does remove a file"""
441
    utils.RemoveFile(self.tmpfile)
442
    if os.path.exists(self.tmpfile):
443
      self.fail("File '%s' not removed" % self.tmpfile)
444

    
445
  def testRemoveSymlink(self):
446
    """Test that RemoveFile does remove symlinks"""
447
    symlink = self.tmpdir + "/symlink"
448
    os.symlink("no-such-file", symlink)
449
    utils.RemoveFile(symlink)
450
    if os.path.exists(symlink):
451
      self.fail("File '%s' not removed" % symlink)
452
    os.symlink(self.tmpfile, symlink)
453
    utils.RemoveFile(symlink)
454
    if os.path.exists(symlink):
455
      self.fail("File '%s' not removed" % symlink)
456

    
457

    
458
class TestRemoveDir(unittest.TestCase):
459
  def setUp(self):
460
    self.tmpdir = tempfile.mkdtemp()
461

    
462
  def tearDown(self):
463
    try:
464
      shutil.rmtree(self.tmpdir)
465
    except EnvironmentError:
466
      pass
467

    
468
  def testEmptyDir(self):
469
    utils.RemoveDir(self.tmpdir)
470
    self.assertFalse(os.path.isdir(self.tmpdir))
471

    
472
  def testNonEmptyDir(self):
473
    self.tmpfile = os.path.join(self.tmpdir, "test1")
474
    open(self.tmpfile, "w").close()
475
    self.assertRaises(EnvironmentError, utils.RemoveDir, self.tmpdir)
476

    
477

    
478
class TestRename(unittest.TestCase):
479
  """Test case for RenameFile"""
480

    
481
  def setUp(self):
482
    """Create a temporary directory"""
483
    self.tmpdir = tempfile.mkdtemp()
484
    self.tmpfile = os.path.join(self.tmpdir, "test1")
485

    
486
    # Touch the file
487
    open(self.tmpfile, "w").close()
488

    
489
  def tearDown(self):
490
    """Remove temporary directory"""
491
    shutil.rmtree(self.tmpdir)
492

    
493
  def testSimpleRename1(self):
494
    """Simple rename 1"""
495
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
496
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
497

    
498
  def testSimpleRename2(self):
499
    """Simple rename 2"""
500
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
501
                     mkdir=True)
502
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
503

    
504
  def testRenameMkdir(self):
505
    """Rename with mkdir"""
506
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
507
                     mkdir=True)
508
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
509
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
510

    
511
    self.assertRaises(EnvironmentError, utils.RenameFile,
512
                      os.path.join(self.tmpdir, "test/xyz"),
513
                      os.path.join(self.tmpdir, "test/foo/bar/baz"),
514
                      mkdir=True)
515

    
516
    self.assertTrue(os.path.exists(os.path.join(self.tmpdir, "test/xyz")))
517
    self.assertFalse(os.path.exists(os.path.join(self.tmpdir, "test/foo/bar")))
518
    self.assertFalse(os.path.exists(os.path.join(self.tmpdir,
519
                                                 "test/foo/bar/baz")))
520

    
521

    
522
class TestMakedirs(unittest.TestCase):
523
  def setUp(self):
524
    self.tmpdir = tempfile.mkdtemp()
525

    
526
  def tearDown(self):
527
    shutil.rmtree(self.tmpdir)
528

    
529
  def testNonExisting(self):
530
    path = utils.PathJoin(self.tmpdir, "foo")
531
    utils.Makedirs(path)
532
    self.assert_(os.path.isdir(path))
533

    
534
  def testExisting(self):
535
    path = utils.PathJoin(self.tmpdir, "foo")
536
    os.mkdir(path)
537
    utils.Makedirs(path)
538
    self.assert_(os.path.isdir(path))
539

    
540
  def testRecursiveNonExisting(self):
541
    path = utils.PathJoin(self.tmpdir, "foo/bar/baz")
542
    utils.Makedirs(path)
543
    self.assert_(os.path.isdir(path))
544

    
545
  def testRecursiveExisting(self):
546
    path = utils.PathJoin(self.tmpdir, "B/moo/xyz")
547
    self.assertFalse(os.path.exists(path))
548
    os.mkdir(utils.PathJoin(self.tmpdir, "B"))
549
    utils.Makedirs(path)
550
    self.assert_(os.path.isdir(path))
551

    
552

    
553
class TestEnsureDirs(unittest.TestCase):
554
  """Tests for EnsureDirs"""
555

    
556
  def setUp(self):
557
    self.dir = tempfile.mkdtemp()
558
    self.old_umask = os.umask(0777)
559

    
560
  def testEnsureDirs(self):
561
    utils.EnsureDirs([
562
        (utils.PathJoin(self.dir, "foo"), 0777),
563
        (utils.PathJoin(self.dir, "bar"), 0000),
564
        ])
565
    self.assertEquals(os.stat(utils.PathJoin(self.dir, "foo"))[0] & 0777, 0777)
566
    self.assertEquals(os.stat(utils.PathJoin(self.dir, "bar"))[0] & 0777, 0000)
567

    
568
  def tearDown(self):
569
    os.rmdir(utils.PathJoin(self.dir, "foo"))
570
    os.rmdir(utils.PathJoin(self.dir, "bar"))
571
    os.rmdir(self.dir)
572
    os.umask(self.old_umask)
573

    
574

    
575
class TestIsNormAbsPath(unittest.TestCase):
576
  """Testing case for IsNormAbsPath"""
577

    
578
  def _pathTestHelper(self, path, result):
579
    if result:
580
      self.assert_(utils.IsNormAbsPath(path),
581
          "Path %s should result absolute and normalized" % path)
582
    else:
583
      self.assertFalse(utils.IsNormAbsPath(path),
584
          "Path %s should not result absolute and normalized" % path)
585

    
586
  def testBase(self):
587
    self._pathTestHelper("/etc", True)
588
    self._pathTestHelper("/srv", True)
589
    self._pathTestHelper("etc", False)
590
    self._pathTestHelper("/etc/../root", False)
591
    self._pathTestHelper("/etc/", False)
592

    
593

    
594
class TestIsBelowDir(unittest.TestCase):
595
  """Testing case for IsBelowDir"""
596

    
597
  def testSamePrefix(self):
598
    self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c"))
599
    self.assertTrue(utils.IsBelowDir("/a/b/", "/a/b/e"))
600

    
601
  def testSamePrefixButDifferentDir(self):
602
    self.assertFalse(utils.IsBelowDir("/a/b", "/a/bc/d"))
603
    self.assertFalse(utils.IsBelowDir("/a/b/", "/a/bc/e"))
604

    
605
  def testSamePrefixButDirTraversal(self):
606
    self.assertFalse(utils.IsBelowDir("/a/b", "/a/b/../c"))
607
    self.assertFalse(utils.IsBelowDir("/a/b/", "/a/b/../d"))
608

    
609
  def testSamePrefixAndTraversal(self):
610
    self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c/../d"))
611
    self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c/./e"))
612
    self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/../b/./e"))
613

    
614
  def testBothAbsPath(self):
615
    self.assertRaises(ValueError, utils.IsBelowDir, "/a/b/c", "d")
616
    self.assertRaises(ValueError, utils.IsBelowDir, "a/b/c", "/d")
617
    self.assertRaises(ValueError, utils.IsBelowDir, "a/b/c", "d")
618

    
619

    
620
class TestPathJoin(unittest.TestCase):
621
  """Testing case for PathJoin"""
622

    
623
  def testBasicItems(self):
624
    mlist = ["/a", "b", "c"]
625
    self.failUnlessEqual(utils.PathJoin(*mlist), "/".join(mlist))
626

    
627
  def testNonAbsPrefix(self):
628
    self.failUnlessRaises(ValueError, utils.PathJoin, "a", "b")
629

    
630
  def testBackTrack(self):
631
    self.failUnlessRaises(ValueError, utils.PathJoin, "/a", "b/../c")
632

    
633
  def testMultiAbs(self):
634
    self.failUnlessRaises(ValueError, utils.PathJoin, "/a", "/b")
635

    
636

    
637
class TestTailFile(testutils.GanetiTestCase):
638
  """Test case for the TailFile function"""
639

    
640
  def testEmpty(self):
641
    fname = self._CreateTempFile()
642
    self.failUnlessEqual(utils.TailFile(fname), [])
643
    self.failUnlessEqual(utils.TailFile(fname, lines=25), [])
644

    
645
  def testAllLines(self):
646
    data = ["test %d" % i for i in range(30)]
647
    for i in range(30):
648
      fname = self._CreateTempFile()
649
      fd = open(fname, "w")
650
      fd.write("\n".join(data[:i]))
651
      if i > 0:
652
        fd.write("\n")
653
      fd.close()
654
      self.failUnlessEqual(utils.TailFile(fname, lines=i), data[:i])
655

    
656
  def testPartialLines(self):
657
    data = ["test %d" % i for i in range(30)]
658
    fname = self._CreateTempFile()
659
    fd = open(fname, "w")
660
    fd.write("\n".join(data))
661
    fd.write("\n")
662
    fd.close()
663
    for i in range(1, 30):
664
      self.failUnlessEqual(utils.TailFile(fname, lines=i), data[-i:])
665

    
666
  def testBigFile(self):
667
    data = ["test %d" % i for i in range(30)]
668
    fname = self._CreateTempFile()
669
    fd = open(fname, "w")
670
    fd.write("X" * 1048576)
671
    fd.write("\n")
672
    fd.write("\n".join(data))
673
    fd.write("\n")
674
    fd.close()
675
    for i in range(1, 30):
676
      self.failUnlessEqual(utils.TailFile(fname, lines=i), data[-i:])
677

    
678

    
679
class TestPidFileFunctions(unittest.TestCase):
680
  """Tests for WritePidFile and ReadPidFile"""
681

    
682
  def setUp(self):
683
    self.dir = tempfile.mkdtemp()
684
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
685

    
686
  def testPidFileFunctions(self):
687
    pid_file = self.f_dpn('test')
688
    fd = utils.WritePidFile(self.f_dpn('test'))
689
    self.failUnless(os.path.exists(pid_file),
690
                    "PID file should have been created")
691
    read_pid = utils.ReadPidFile(pid_file)
692
    self.failUnlessEqual(read_pid, os.getpid())
693
    self.failUnless(utils.IsProcessAlive(read_pid))
694
    self.failUnlessRaises(errors.LockError, utils.WritePidFile,
695
                          self.f_dpn('test'))
696
    os.close(fd)
697
    utils.RemoveFile(self.f_dpn("test"))
698
    self.failIf(os.path.exists(pid_file),
699
                "PID file should not exist anymore")
700
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
701
                         "ReadPidFile should return 0 for missing pid file")
702
    fh = open(pid_file, "w")
703
    fh.write("blah\n")
704
    fh.close()
705
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
706
                         "ReadPidFile should return 0 for invalid pid file")
707
    # but now, even with the file existing, we should be able to lock it
708
    fd = utils.WritePidFile(self.f_dpn('test'))
709
    os.close(fd)
710
    utils.RemoveFile(self.f_dpn("test"))
711
    self.failIf(os.path.exists(pid_file),
712
                "PID file should not exist anymore")
713

    
714
  def testKill(self):
715
    pid_file = self.f_dpn('child')
716
    r_fd, w_fd = os.pipe()
717
    new_pid = os.fork()
718
    if new_pid == 0: #child
719
      utils.WritePidFile(self.f_dpn('child'))
720
      os.write(w_fd, 'a')
721
      signal.pause()
722
      os._exit(0)
723
      return
724
    # else we are in the parent
725
    # wait until the child has written the pid file
726
    os.read(r_fd, 1)
727
    read_pid = utils.ReadPidFile(pid_file)
728
    self.failUnlessEqual(read_pid, new_pid)
729
    self.failUnless(utils.IsProcessAlive(new_pid))
730
    utils.KillProcess(new_pid, waitpid=True)
731
    self.failIf(utils.IsProcessAlive(new_pid))
732
    utils.RemoveFile(self.f_dpn('child'))
733
    self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0)
734

    
735
  def tearDown(self):
736
    shutil.rmtree(self.dir)
737

    
738

    
739
class TestSshKeys(testutils.GanetiTestCase):
740
  """Test case for the AddAuthorizedKey function"""
741

    
742
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
743
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="198.51.100.4" '
744
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
745

    
746
  def setUp(self):
747
    testutils.GanetiTestCase.setUp(self)
748
    self.tmpname = self._CreateTempFile()
749
    handle = open(self.tmpname, 'w')
750
    try:
751
      handle.write("%s\n" % TestSshKeys.KEY_A)
752
      handle.write("%s\n" % TestSshKeys.KEY_B)
753
    finally:
754
      handle.close()
755

    
756
  def testAddingNewKey(self):
757
    utils.AddAuthorizedKey(self.tmpname,
758
                           'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
759

    
760
    self.assertFileContent(self.tmpname,
761
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
762
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
763
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
764
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
765

    
766
  def testAddingAlmostButNotCompletelyTheSameKey(self):
767
    utils.AddAuthorizedKey(self.tmpname,
768
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
769

    
770
    self.assertFileContent(self.tmpname,
771
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
772
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
773
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
774
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
775

    
776
  def testAddingExistingKeyWithSomeMoreSpaces(self):
777
    utils.AddAuthorizedKey(self.tmpname,
778
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
779

    
780
    self.assertFileContent(self.tmpname,
781
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
782
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
783
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
784

    
785
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
786
    utils.RemoveAuthorizedKey(self.tmpname,
787
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
788

    
789
    self.assertFileContent(self.tmpname,
790
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
791
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
792

    
793
  def testRemovingNonExistingKey(self):
794
    utils.RemoveAuthorizedKey(self.tmpname,
795
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
796

    
797
    self.assertFileContent(self.tmpname,
798
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
799
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
800
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
801

    
802

    
803
class TestNewUUID(unittest.TestCase):
804
  """Test case for NewUUID"""
805

    
806
  def runTest(self):
807
    self.failUnless(utils.UUID_RE.match(utils.NewUUID()))
808

    
809

    
810
def _MockStatResult(cb, mode, uid, gid):
811
  def _fn(path):
812
    if cb:
813
      cb()
814
    return {
815
      stat.ST_MODE: mode,
816
      stat.ST_UID: uid,
817
      stat.ST_GID: gid,
818
      }
819
  return _fn
820

    
821

    
822
def _RaiseNoEntError():
823
  raise EnvironmentError(errno.ENOENT, "not found")
824

    
825

    
826
def _OtherStatRaise():
827
  raise EnvironmentError()
828

    
829

    
830
class TestPermissionEnforcements(unittest.TestCase):
831
  UID_A = 16024
832
  UID_B = 25850
833
  GID_A = 14028
834
  GID_B = 29801
835

    
836
  def setUp(self):
837
    self._chown_calls = []
838
    self._chmod_calls = []
839
    self._mkdir_calls = []
840

    
841
  def tearDown(self):
842
    self.assertRaises(IndexError, self._mkdir_calls.pop)
843
    self.assertRaises(IndexError, self._chmod_calls.pop)
844
    self.assertRaises(IndexError, self._chown_calls.pop)
845

    
846
  def _FakeMkdir(self, path):
847
    self._mkdir_calls.append(path)
848

    
849
  def _FakeChown(self, path, uid, gid):
850
    self._chown_calls.append((path, uid, gid))
851

    
852
  def _ChmodWrapper(self, cb):
853
    def _fn(path, mode):
854
      self._chmod_calls.append((path, mode))
855
      if cb:
856
        cb()
857
    return _fn
858

    
859
  def _VerifyPerm(self, path, mode, uid=-1, gid=-1):
860
    self.assertEqual(path, "/ganeti-qa-non-test")
861
    self.assertEqual(mode, 0700)
862
    self.assertEqual(uid, self.UID_A)
863
    self.assertEqual(gid, self.GID_A)
864

    
865
  def testMakeDirWithPerm(self):
866
    is_dir_stat = _MockStatResult(None, stat.S_IFDIR, 0, 0)
867
    utils.MakeDirWithPerm("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A,
868
                          _lstat_fn=is_dir_stat, _perm_fn=self._VerifyPerm)
869

    
870
  def testDirErrors(self):
871
    self.assertRaises(errors.GenericError, utils.MakeDirWithPerm,
872
                      "/ganeti-qa-non-test", 0700, 0, 0,
873
                      _lstat_fn=_MockStatResult(None, 0, 0, 0))
874
    self.assertRaises(IndexError, self._mkdir_calls.pop)
875

    
876
    other_stat_raise = _MockStatResult(_OtherStatRaise, stat.S_IFDIR, 0, 0)
877
    self.assertRaises(errors.GenericError, utils.MakeDirWithPerm,
878
                      "/ganeti-qa-non-test", 0700, 0, 0,
879
                      _lstat_fn=other_stat_raise)
880
    self.assertRaises(IndexError, self._mkdir_calls.pop)
881

    
882
    non_exist_stat = _MockStatResult(_RaiseNoEntError, stat.S_IFDIR, 0, 0)
883
    utils.MakeDirWithPerm("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A,
884
                          _lstat_fn=non_exist_stat, _mkdir_fn=self._FakeMkdir,
885
                          _perm_fn=self._VerifyPerm)
886
    self.assertEqual(self._mkdir_calls.pop(0), "/ganeti-qa-non-test")
887

    
888
  def testEnforcePermissionNoEnt(self):
889
    self.assertRaises(errors.GenericError, utils.EnforcePermission,
890
                      "/ganeti-qa-non-test", 0600,
891
                      _chmod_fn=NotImplemented, _chown_fn=NotImplemented,
892
                      _stat_fn=_MockStatResult(_RaiseNoEntError, 0, 0, 0))
893

    
894
  def testEnforcePermissionNoEntMustNotExist(self):
895
    utils.EnforcePermission("/ganeti-qa-non-test", 0600, must_exist=False,
896
                            _chmod_fn=NotImplemented,
897
                            _chown_fn=NotImplemented,
898
                            _stat_fn=_MockStatResult(_RaiseNoEntError,
899
                                                          0, 0, 0))
900

    
901
  def testEnforcePermissionOtherErrorMustNotExist(self):
902
    self.assertRaises(errors.GenericError, utils.EnforcePermission,
903
                      "/ganeti-qa-non-test", 0600, must_exist=False,
904
                      _chmod_fn=NotImplemented, _chown_fn=NotImplemented,
905
                      _stat_fn=_MockStatResult(_OtherStatRaise, 0, 0, 0))
906

    
907
  def testEnforcePermissionNoChanges(self):
908
    utils.EnforcePermission("/ganeti-qa-non-test", 0600,
909
                            _stat_fn=_MockStatResult(None, 0600, 0, 0),
910
                            _chmod_fn=self._ChmodWrapper(None),
911
                            _chown_fn=self._FakeChown)
912

    
913
  def testEnforcePermissionChangeMode(self):
914
    utils.EnforcePermission("/ganeti-qa-non-test", 0444,
915
                            _stat_fn=_MockStatResult(None, 0600, 0, 0),
916
                            _chmod_fn=self._ChmodWrapper(None),
917
                            _chown_fn=self._FakeChown)
918
    self.assertEqual(self._chmod_calls.pop(0), ("/ganeti-qa-non-test", 0444))
919

    
920
  def testEnforcePermissionSetUidGid(self):
921
    utils.EnforcePermission("/ganeti-qa-non-test", 0600,
922
                            uid=self.UID_B, gid=self.GID_B,
923
                            _stat_fn=_MockStatResult(None, 0600,
924
                                                     self.UID_A,
925
                                                     self.GID_A),
926
                            _chmod_fn=self._ChmodWrapper(None),
927
                            _chown_fn=self._FakeChown)
928
    self.assertEqual(self._chown_calls.pop(0),
929
                     ("/ganeti-qa-non-test", self.UID_B, self.GID_B))
930

    
931

    
932
if __name__ == "__main__":
933
  testutils.GanetiTestProgram()