Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils.io_unittest.py @ 8dc76d54

History | View | Annotate | Download (26.3 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.utils.io"""
23

    
24
import os
25
import tempfile
26
import unittest
27
import shutil
28
import glob
29
import time
30
import signal
31

    
32
from ganeti import constants
33
from ganeti import utils
34
from ganeti import compat
35
from ganeti import errors
36

    
37
import testutils
38

    
39

    
40
class TestReadFile(testutils.GanetiTestCase):
41
  def testReadAll(self):
42
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"))
43
    self.assertEqual(len(data), 814)
44

    
45
    h = compat.md5_hash()
46
    h.update(data)
47
    self.assertEqual(h.hexdigest(), "a491efb3efe56a0535f924d5f8680fd4")
48

    
49
  def testReadSize(self):
50
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"),
51
                          size=100)
52
    self.assertEqual(len(data), 100)
53

    
54
    h = compat.md5_hash()
55
    h.update(data)
56
    self.assertEqual(h.hexdigest(), "893772354e4e690b9efd073eed433ce7")
57

    
58
  def testCallback(self):
59
    def _Cb(fh):
60
      self.assertEqual(fh.tell(), 0)
61
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"), preread=_Cb)
62
    self.assertEqual(len(data), 814)
63

    
64
  def testError(self):
65
    self.assertRaises(EnvironmentError, utils.ReadFile,
66
                      "/dev/null/does-not-exist")
67

    
68

    
69
class TestReadOneLineFile(testutils.GanetiTestCase):
70
  def setUp(self):
71
    testutils.GanetiTestCase.setUp(self)
72

    
73
  def testDefault(self):
74
    data = utils.ReadOneLineFile(self._TestDataFilename("cert1.pem"))
75
    self.assertEqual(len(data), 27)
76
    self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
77

    
78
  def testNotStrict(self):
79
    data = utils.ReadOneLineFile(self._TestDataFilename("cert1.pem"),
80
                                 strict=False)
81
    self.assertEqual(len(data), 27)
82
    self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
83

    
84
  def testStrictFailure(self):
85
    self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
86
                      self._TestDataFilename("cert1.pem"), strict=True)
87

    
88
  def testLongLine(self):
89
    dummydata = (1024 * "Hello World! ")
90
    myfile = self._CreateTempFile()
91
    utils.WriteFile(myfile, data=dummydata)
92
    datastrict = utils.ReadOneLineFile(myfile, strict=True)
93
    datalax = utils.ReadOneLineFile(myfile, strict=False)
94
    self.assertEqual(dummydata, datastrict)
95
    self.assertEqual(dummydata, datalax)
96

    
97
  def testNewline(self):
98
    myfile = self._CreateTempFile()
99
    myline = "myline"
100
    for nl in ["", "\n", "\r\n"]:
101
      dummydata = "%s%s" % (myline, nl)
102
      utils.WriteFile(myfile, data=dummydata)
103
      datalax = utils.ReadOneLineFile(myfile, strict=False)
104
      self.assertEqual(myline, datalax)
105
      datastrict = utils.ReadOneLineFile(myfile, strict=True)
106
      self.assertEqual(myline, datastrict)
107

    
108
  def testWhitespaceAndMultipleLines(self):
109
    myfile = self._CreateTempFile()
110
    for nl in ["", "\n", "\r\n"]:
111
      for ws in [" ", "\t", "\t\t  \t", "\t "]:
112
        dummydata = (1024 * ("Foo bar baz %s%s" % (ws, nl)))
113
        utils.WriteFile(myfile, data=dummydata)
114
        datalax = utils.ReadOneLineFile(myfile, strict=False)
115
        if nl:
116
          self.assert_(set("\r\n") & set(dummydata))
117
          self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
118
                            myfile, strict=True)
119
          explen = len("Foo bar baz ") + len(ws)
120
          self.assertEqual(len(datalax), explen)
121
          self.assertEqual(datalax, dummydata[:explen])
122
          self.assertFalse(set("\r\n") & set(datalax))
123
        else:
124
          datastrict = utils.ReadOneLineFile(myfile, strict=True)
125
          self.assertEqual(dummydata, datastrict)
126
          self.assertEqual(dummydata, datalax)
127

    
128
  def testEmptylines(self):
129
    myfile = self._CreateTempFile()
130
    myline = "myline"
131
    for nl in ["\n", "\r\n"]:
132
      for ol in ["", "otherline"]:
133
        dummydata = "%s%s%s%s%s%s" % (nl, nl, myline, nl, ol, nl)
134
        utils.WriteFile(myfile, data=dummydata)
135
        self.assert_(set("\r\n") & set(dummydata))
136
        datalax = utils.ReadOneLineFile(myfile, strict=False)
137
        self.assertEqual(myline, datalax)
138
        if ol:
139
          self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
140
                            myfile, strict=True)
141
        else:
142
          datastrict = utils.ReadOneLineFile(myfile, strict=True)
143
          self.assertEqual(myline, datastrict)
144

    
145
  def testEmptyfile(self):
146
    myfile = self._CreateTempFile()
147
    self.assertRaises(errors.GenericError, utils.ReadOneLineFile, myfile)
148

    
149

    
150
class TestTimestampForFilename(unittest.TestCase):
151
  def test(self):
152
    self.assert_("." not in utils.TimestampForFilename())
153
    self.assert_(":" not in utils.TimestampForFilename())
154

    
155

    
156
class TestCreateBackup(testutils.GanetiTestCase):
157
  def setUp(self):
158
    testutils.GanetiTestCase.setUp(self)
159

    
160
    self.tmpdir = tempfile.mkdtemp()
161

    
162
  def tearDown(self):
163
    testutils.GanetiTestCase.tearDown(self)
164

    
165
    shutil.rmtree(self.tmpdir)
166

    
167
  def testEmpty(self):
168
    filename = utils.PathJoin(self.tmpdir, "config.data")
169
    utils.WriteFile(filename, data="")
170
    bname = utils.CreateBackup(filename)
171
    self.assertFileContent(bname, "")
172
    self.assertEqual(len(glob.glob("%s*" % filename)), 2)
173
    utils.CreateBackup(filename)
174
    self.assertEqual(len(glob.glob("%s*" % filename)), 3)
175
    utils.CreateBackup(filename)
176
    self.assertEqual(len(glob.glob("%s*" % filename)), 4)
177

    
178
    fifoname = utils.PathJoin(self.tmpdir, "fifo")
179
    os.mkfifo(fifoname)
180
    self.assertRaises(errors.ProgrammerError, utils.CreateBackup, fifoname)
181

    
182
  def testContent(self):
183
    bkpcount = 0
184
    for data in ["", "X", "Hello World!\n" * 100, "Binary data\0\x01\x02\n"]:
185
      for rep in [1, 2, 10, 127]:
186
        testdata = data * rep
187

    
188
        filename = utils.PathJoin(self.tmpdir, "test.data_")
189
        utils.WriteFile(filename, data=testdata)
190
        self.assertFileContent(filename, testdata)
191

    
192
        for _ in range(3):
193
          bname = utils.CreateBackup(filename)
194
          bkpcount += 1
195
          self.assertFileContent(bname, testdata)
196
          self.assertEqual(len(glob.glob("%s*" % filename)), 1 + bkpcount)
197

    
198

    
199
class TestListVisibleFiles(unittest.TestCase):
200
  """Test case for ListVisibleFiles"""
201

    
202
  def setUp(self):
203
    self.path = tempfile.mkdtemp()
204

    
205
  def tearDown(self):
206
    shutil.rmtree(self.path)
207

    
208
  def _CreateFiles(self, files):
209
    for name in files:
210
      utils.WriteFile(os.path.join(self.path, name), data="test")
211

    
212
  def _test(self, files, expected):
213
    self._CreateFiles(files)
214
    found = utils.ListVisibleFiles(self.path)
215
    self.assertEqual(set(found), set(expected))
216

    
217
  def testAllVisible(self):
218
    files = ["a", "b", "c"]
219
    expected = files
220
    self._test(files, expected)
221

    
222
  def testNoneVisible(self):
223
    files = [".a", ".b", ".c"]
224
    expected = []
225
    self._test(files, expected)
226

    
227
  def testSomeVisible(self):
228
    files = ["a", "b", ".c"]
229
    expected = ["a", "b"]
230
    self._test(files, expected)
231

    
232
  def testNonAbsolutePath(self):
233
    self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
234
                          "abc")
235

    
236
  def testNonNormalizedPath(self):
237
    self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
238
                          "/bin/../tmp")
239

    
240

    
241
class TestWriteFile(unittest.TestCase):
242
  def setUp(self):
243
    self.tmpdir = None
244
    self.tfile = tempfile.NamedTemporaryFile()
245
    self.did_pre = False
246
    self.did_post = False
247
    self.did_write = False
248

    
249
  def tearDown(self):
250
    if self.tmpdir:
251
      shutil.rmtree(self.tmpdir)
252

    
253
  def markPre(self, fd):
254
    self.did_pre = True
255

    
256
  def markPost(self, fd):
257
    self.did_post = True
258

    
259
  def markWrite(self, fd):
260
    self.did_write = True
261

    
262
  def testWrite(self):
263
    data = "abc"
264
    utils.WriteFile(self.tfile.name, data=data)
265
    self.assertEqual(utils.ReadFile(self.tfile.name), data)
266

    
267
  def testWriteSimpleUnicode(self):
268
    data = u"abc"
269
    utils.WriteFile(self.tfile.name, data=data)
270
    self.assertEqual(utils.ReadFile(self.tfile.name), data)
271

    
272
  def testErrors(self):
273
    self.assertRaises(errors.ProgrammerError, utils.WriteFile,
274
                      self.tfile.name, data="test", fn=lambda fd: None)
275
    self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name)
276
    self.assertRaises(errors.ProgrammerError, utils.WriteFile,
277
                      self.tfile.name, data="test", atime=0)
278

    
279
  def testPreWrite(self):
280
    utils.WriteFile(self.tfile.name, data="", prewrite=self.markPre)
281
    self.assertTrue(self.did_pre)
282
    self.assertFalse(self.did_post)
283
    self.assertFalse(self.did_write)
284

    
285
  def testPostWrite(self):
286
    utils.WriteFile(self.tfile.name, data="", postwrite=self.markPost)
287
    self.assertFalse(self.did_pre)
288
    self.assertTrue(self.did_post)
289
    self.assertFalse(self.did_write)
290

    
291
  def testWriteFunction(self):
292
    utils.WriteFile(self.tfile.name, fn=self.markWrite)
293
    self.assertFalse(self.did_pre)
294
    self.assertFalse(self.did_post)
295
    self.assertTrue(self.did_write)
296

    
297
  def testDryRun(self):
298
    orig = "abc"
299
    self.tfile.write(orig)
300
    self.tfile.flush()
301
    utils.WriteFile(self.tfile.name, data="hello", dry_run=True)
302
    self.assertEqual(utils.ReadFile(self.tfile.name), orig)
303

    
304
  def testTimes(self):
305
    f = self.tfile.name
306
    for at, mt in [(0, 0), (1000, 1000), (2000, 3000),
307
                   (int(time.time()), 5000)]:
308
      utils.WriteFile(f, data="hello", atime=at, mtime=mt)
309
      st = os.stat(f)
310
      self.assertEqual(st.st_atime, at)
311
      self.assertEqual(st.st_mtime, mt)
312

    
313
  def testNoClose(self):
314
    data = "hello"
315
    self.assertEqual(utils.WriteFile(self.tfile.name, data="abc"), None)
316
    fd = utils.WriteFile(self.tfile.name, data=data, close=False)
317
    try:
318
      os.lseek(fd, 0, 0)
319
      self.assertEqual(os.read(fd, 4096), data)
320
    finally:
321
      os.close(fd)
322

    
323
  def testNoLeftovers(self):
324
    self.tmpdir = tempfile.mkdtemp()
325
    self.assertEqual(utils.WriteFile(utils.PathJoin(self.tmpdir, "test"),
326
                                     data="abc"),
327
                     None)
328
    self.assertEqual(os.listdir(self.tmpdir), ["test"])
329

    
330
  def testFailRename(self):
331
    self.tmpdir = tempfile.mkdtemp()
332
    target = utils.PathJoin(self.tmpdir, "target")
333
    os.mkdir(target)
334
    self.assertRaises(OSError, utils.WriteFile, target, data="abc")
335
    self.assertTrue(os.path.isdir(target))
336
    self.assertEqual(os.listdir(self.tmpdir), ["target"])
337
    self.assertFalse(os.listdir(target))
338

    
339
  def testFailRenameDryRun(self):
340
    self.tmpdir = tempfile.mkdtemp()
341
    target = utils.PathJoin(self.tmpdir, "target")
342
    os.mkdir(target)
343
    self.assertEqual(utils.WriteFile(target, data="abc", dry_run=True), None)
344
    self.assertTrue(os.path.isdir(target))
345
    self.assertEqual(os.listdir(self.tmpdir), ["target"])
346
    self.assertFalse(os.listdir(target))
347

    
348
  def testBackup(self):
349
    self.tmpdir = tempfile.mkdtemp()
350
    testfile = utils.PathJoin(self.tmpdir, "test")
351

    
352
    self.assertEqual(utils.WriteFile(testfile, data="foo", backup=True), None)
353
    self.assertEqual(utils.ReadFile(testfile), "foo")
354
    self.assertEqual(os.listdir(self.tmpdir), ["test"])
355

    
356
    # Write again
357
    assert os.path.isfile(testfile)
358
    self.assertEqual(utils.WriteFile(testfile, data="bar", backup=True), None)
359
    self.assertEqual(utils.ReadFile(testfile), "bar")
360
    self.assertEqual(len(glob.glob("%s.backup*" % testfile)), 1)
361
    self.assertTrue("test" in os.listdir(self.tmpdir))
362
    self.assertEqual(len(os.listdir(self.tmpdir)), 2)
363

    
364
    # Write again as dry-run
365
    assert os.path.isfile(testfile)
366
    self.assertEqual(utils.WriteFile(testfile, data="000", backup=True,
367
                                     dry_run=True),
368
                     None)
369
    self.assertEqual(utils.ReadFile(testfile), "bar")
370
    self.assertEqual(len(glob.glob("%s.backup*" % testfile)), 1)
371
    self.assertTrue("test" in os.listdir(self.tmpdir))
372
    self.assertEqual(len(os.listdir(self.tmpdir)), 2)
373

    
374

    
375
class TestFileID(testutils.GanetiTestCase):
376
  def testEquality(self):
377
    name = self._CreateTempFile()
378
    oldi = utils.GetFileID(path=name)
379
    self.failUnless(utils.VerifyFileID(oldi, oldi))
380

    
381
  def testUpdate(self):
382
    name = self._CreateTempFile()
383
    oldi = utils.GetFileID(path=name)
384
    os.utime(name, None)
385
    fd = os.open(name, os.O_RDWR)
386
    try:
387
      newi = utils.GetFileID(fd=fd)
388
      self.failUnless(utils.VerifyFileID(oldi, newi))
389
      self.failUnless(utils.VerifyFileID(newi, oldi))
390
    finally:
391
      os.close(fd)
392

    
393
  def testWriteFile(self):
394
    name = self._CreateTempFile()
395
    oldi = utils.GetFileID(path=name)
396
    mtime = oldi[2]
397
    os.utime(name, (mtime + 10, mtime + 10))
398
    self.assertRaises(errors.LockError, utils.SafeWriteFile, name,
399
                      oldi, data="")
400
    os.utime(name, (mtime - 10, mtime - 10))
401
    utils.SafeWriteFile(name, oldi, data="")
402
    oldi = utils.GetFileID(path=name)
403
    mtime = oldi[2]
404
    os.utime(name, (mtime + 10, mtime + 10))
405
    # this doesn't raise, since we passed None
406
    utils.SafeWriteFile(name, None, data="")
407

    
408
  def testError(self):
409
    t = tempfile.NamedTemporaryFile()
410
    self.assertRaises(errors.ProgrammerError, utils.GetFileID,
411
                      path=t.name, fd=t.fileno())
412

    
413

    
414
class TestRemoveFile(unittest.TestCase):
415
  """Test case for the RemoveFile function"""
416

    
417
  def setUp(self):
418
    """Create a temp dir and file for each case"""
419
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
420
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
421
    os.close(fd)
422

    
423
  def tearDown(self):
424
    if os.path.exists(self.tmpfile):
425
      os.unlink(self.tmpfile)
426
    os.rmdir(self.tmpdir)
427

    
428
  def testIgnoreDirs(self):
429
    """Test that RemoveFile() ignores directories"""
430
    self.assertEqual(None, utils.RemoveFile(self.tmpdir))
431

    
432
  def testIgnoreNotExisting(self):
433
    """Test that RemoveFile() ignores non-existing files"""
434
    utils.RemoveFile(self.tmpfile)
435
    utils.RemoveFile(self.tmpfile)
436

    
437
  def testRemoveFile(self):
438
    """Test that RemoveFile does remove a file"""
439
    utils.RemoveFile(self.tmpfile)
440
    if os.path.exists(self.tmpfile):
441
      self.fail("File '%s' not removed" % self.tmpfile)
442

    
443
  def testRemoveSymlink(self):
444
    """Test that RemoveFile does remove symlinks"""
445
    symlink = self.tmpdir + "/symlink"
446
    os.symlink("no-such-file", symlink)
447
    utils.RemoveFile(symlink)
448
    if os.path.exists(symlink):
449
      self.fail("File '%s' not removed" % symlink)
450
    os.symlink(self.tmpfile, symlink)
451
    utils.RemoveFile(symlink)
452
    if os.path.exists(symlink):
453
      self.fail("File '%s' not removed" % symlink)
454

    
455

    
456
class TestRemoveDir(unittest.TestCase):
457
  def setUp(self):
458
    self.tmpdir = tempfile.mkdtemp()
459

    
460
  def tearDown(self):
461
    try:
462
      shutil.rmtree(self.tmpdir)
463
    except EnvironmentError:
464
      pass
465

    
466
  def testEmptyDir(self):
467
    utils.RemoveDir(self.tmpdir)
468
    self.assertFalse(os.path.isdir(self.tmpdir))
469

    
470
  def testNonEmptyDir(self):
471
    self.tmpfile = os.path.join(self.tmpdir, "test1")
472
    open(self.tmpfile, "w").close()
473
    self.assertRaises(EnvironmentError, utils.RemoveDir, self.tmpdir)
474

    
475

    
476
class TestRename(unittest.TestCase):
477
  """Test case for RenameFile"""
478

    
479
  def setUp(self):
480
    """Create a temporary directory"""
481
    self.tmpdir = tempfile.mkdtemp()
482
    self.tmpfile = os.path.join(self.tmpdir, "test1")
483

    
484
    # Touch the file
485
    open(self.tmpfile, "w").close()
486

    
487
  def tearDown(self):
488
    """Remove temporary directory"""
489
    shutil.rmtree(self.tmpdir)
490

    
491
  def testSimpleRename1(self):
492
    """Simple rename 1"""
493
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
494
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
495

    
496
  def testSimpleRename2(self):
497
    """Simple rename 2"""
498
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
499
                     mkdir=True)
500
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
501

    
502
  def testRenameMkdir(self):
503
    """Rename with mkdir"""
504
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
505
                     mkdir=True)
506
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
507
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
508

    
509
    utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"),
510
                     os.path.join(self.tmpdir, "test/foo/bar/baz"),
511
                     mkdir=True)
512
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
513
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar")))
514
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz")))
515

    
516

    
517
class TestMakedirs(unittest.TestCase):
518
  def setUp(self):
519
    self.tmpdir = tempfile.mkdtemp()
520

    
521
  def tearDown(self):
522
    shutil.rmtree(self.tmpdir)
523

    
524
  def testNonExisting(self):
525
    path = utils.PathJoin(self.tmpdir, "foo")
526
    utils.Makedirs(path)
527
    self.assert_(os.path.isdir(path))
528

    
529
  def testExisting(self):
530
    path = utils.PathJoin(self.tmpdir, "foo")
531
    os.mkdir(path)
532
    utils.Makedirs(path)
533
    self.assert_(os.path.isdir(path))
534

    
535
  def testRecursiveNonExisting(self):
536
    path = utils.PathJoin(self.tmpdir, "foo/bar/baz")
537
    utils.Makedirs(path)
538
    self.assert_(os.path.isdir(path))
539

    
540
  def testRecursiveExisting(self):
541
    path = utils.PathJoin(self.tmpdir, "B/moo/xyz")
542
    self.assertFalse(os.path.exists(path))
543
    os.mkdir(utils.PathJoin(self.tmpdir, "B"))
544
    utils.Makedirs(path)
545
    self.assert_(os.path.isdir(path))
546

    
547

    
548
class TestEnsureDirs(unittest.TestCase):
549
  """Tests for EnsureDirs"""
550

    
551
  def setUp(self):
552
    self.dir = tempfile.mkdtemp()
553
    self.old_umask = os.umask(0777)
554

    
555
  def testEnsureDirs(self):
556
    utils.EnsureDirs([
557
        (utils.PathJoin(self.dir, "foo"), 0777),
558
        (utils.PathJoin(self.dir, "bar"), 0000),
559
        ])
560
    self.assertEquals(os.stat(utils.PathJoin(self.dir, "foo"))[0] & 0777, 0777)
561
    self.assertEquals(os.stat(utils.PathJoin(self.dir, "bar"))[0] & 0777, 0000)
562

    
563
  def tearDown(self):
564
    os.rmdir(utils.PathJoin(self.dir, "foo"))
565
    os.rmdir(utils.PathJoin(self.dir, "bar"))
566
    os.rmdir(self.dir)
567
    os.umask(self.old_umask)
568

    
569

    
570
class TestIsNormAbsPath(unittest.TestCase):
571
  """Testing case for IsNormAbsPath"""
572

    
573
  def _pathTestHelper(self, path, result):
574
    if result:
575
      self.assert_(utils.IsNormAbsPath(path),
576
          "Path %s should result absolute and normalized" % path)
577
    else:
578
      self.assertFalse(utils.IsNormAbsPath(path),
579
          "Path %s should not result absolute and normalized" % path)
580

    
581
  def testBase(self):
582
    self._pathTestHelper("/etc", True)
583
    self._pathTestHelper("/srv", True)
584
    self._pathTestHelper("etc", False)
585
    self._pathTestHelper("/etc/../root", False)
586
    self._pathTestHelper("/etc/", False)
587

    
588

    
589
class TestIsBelowDir(unittest.TestCase):
590
  """Testing case for IsBelowDir"""
591

    
592
  def testSamePrefix(self):
593
    self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c"))
594
    self.assertTrue(utils.IsBelowDir("/a/b/", "/a/b/e"))
595

    
596
  def testSamePrefixButDifferentDir(self):
597
    self.assertFalse(utils.IsBelowDir("/a/b", "/a/bc/d"))
598
    self.assertFalse(utils.IsBelowDir("/a/b/", "/a/bc/e"))
599

    
600
  def testSamePrefixButDirTraversal(self):
601
    self.assertFalse(utils.IsBelowDir("/a/b", "/a/b/../c"))
602
    self.assertFalse(utils.IsBelowDir("/a/b/", "/a/b/../d"))
603

    
604
  def testSamePrefixAndTraversal(self):
605
    self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c/../d"))
606
    self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c/./e"))
607
    self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/../b/./e"))
608

    
609
  def testBothAbsPath(self):
610
    self.assertRaises(ValueError, utils.IsBelowDir, "/a/b/c", "d")
611
    self.assertRaises(ValueError, utils.IsBelowDir, "a/b/c", "/d")
612
    self.assertRaises(ValueError, utils.IsBelowDir, "a/b/c", "d")
613

    
614

    
615
class TestPathJoin(unittest.TestCase):
616
  """Testing case for PathJoin"""
617

    
618
  def testBasicItems(self):
619
    mlist = ["/a", "b", "c"]
620
    self.failUnlessEqual(utils.PathJoin(*mlist), "/".join(mlist))
621

    
622
  def testNonAbsPrefix(self):
623
    self.failUnlessRaises(ValueError, utils.PathJoin, "a", "b")
624

    
625
  def testBackTrack(self):
626
    self.failUnlessRaises(ValueError, utils.PathJoin, "/a", "b/../c")
627

    
628
  def testMultiAbs(self):
629
    self.failUnlessRaises(ValueError, utils.PathJoin, "/a", "/b")
630

    
631

    
632
class TestTailFile(testutils.GanetiTestCase):
633
  """Test case for the TailFile function"""
634

    
635
  def testEmpty(self):
636
    fname = self._CreateTempFile()
637
    self.failUnlessEqual(utils.TailFile(fname), [])
638
    self.failUnlessEqual(utils.TailFile(fname, lines=25), [])
639

    
640
  def testAllLines(self):
641
    data = ["test %d" % i for i in range(30)]
642
    for i in range(30):
643
      fname = self._CreateTempFile()
644
      fd = open(fname, "w")
645
      fd.write("\n".join(data[:i]))
646
      if i > 0:
647
        fd.write("\n")
648
      fd.close()
649
      self.failUnlessEqual(utils.TailFile(fname, lines=i), data[:i])
650

    
651
  def testPartialLines(self):
652
    data = ["test %d" % i for i in range(30)]
653
    fname = self._CreateTempFile()
654
    fd = open(fname, "w")
655
    fd.write("\n".join(data))
656
    fd.write("\n")
657
    fd.close()
658
    for i in range(1, 30):
659
      self.failUnlessEqual(utils.TailFile(fname, lines=i), data[-i:])
660

    
661
  def testBigFile(self):
662
    data = ["test %d" % i for i in range(30)]
663
    fname = self._CreateTempFile()
664
    fd = open(fname, "w")
665
    fd.write("X" * 1048576)
666
    fd.write("\n")
667
    fd.write("\n".join(data))
668
    fd.write("\n")
669
    fd.close()
670
    for i in range(1, 30):
671
      self.failUnlessEqual(utils.TailFile(fname, lines=i), data[-i:])
672

    
673

    
674
class TestPidFileFunctions(unittest.TestCase):
675
  """Tests for WritePidFile and ReadPidFile"""
676

    
677
  def setUp(self):
678
    self.dir = tempfile.mkdtemp()
679
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
680

    
681
  def testPidFileFunctions(self):
682
    pid_file = self.f_dpn('test')
683
    fd = utils.WritePidFile(self.f_dpn('test'))
684
    self.failUnless(os.path.exists(pid_file),
685
                    "PID file should have been created")
686
    read_pid = utils.ReadPidFile(pid_file)
687
    self.failUnlessEqual(read_pid, os.getpid())
688
    self.failUnless(utils.IsProcessAlive(read_pid))
689
    self.failUnlessRaises(errors.LockError, utils.WritePidFile,
690
                          self.f_dpn('test'))
691
    os.close(fd)
692
    utils.RemoveFile(self.f_dpn("test"))
693
    self.failIf(os.path.exists(pid_file),
694
                "PID file should not exist anymore")
695
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
696
                         "ReadPidFile should return 0 for missing pid file")
697
    fh = open(pid_file, "w")
698
    fh.write("blah\n")
699
    fh.close()
700
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
701
                         "ReadPidFile should return 0 for invalid pid file")
702
    # but now, even with the file existing, we should be able to lock it
703
    fd = utils.WritePidFile(self.f_dpn('test'))
704
    os.close(fd)
705
    utils.RemoveFile(self.f_dpn("test"))
706
    self.failIf(os.path.exists(pid_file),
707
                "PID file should not exist anymore")
708

    
709
  def testKill(self):
710
    pid_file = self.f_dpn('child')
711
    r_fd, w_fd = os.pipe()
712
    new_pid = os.fork()
713
    if new_pid == 0: #child
714
      utils.WritePidFile(self.f_dpn('child'))
715
      os.write(w_fd, 'a')
716
      signal.pause()
717
      os._exit(0)
718
      return
719
    # else we are in the parent
720
    # wait until the child has written the pid file
721
    os.read(r_fd, 1)
722
    read_pid = utils.ReadPidFile(pid_file)
723
    self.failUnlessEqual(read_pid, new_pid)
724
    self.failUnless(utils.IsProcessAlive(new_pid))
725
    utils.KillProcess(new_pid, waitpid=True)
726
    self.failIf(utils.IsProcessAlive(new_pid))
727
    utils.RemoveFile(self.f_dpn('child'))
728
    self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0)
729

    
730
  def tearDown(self):
731
    shutil.rmtree(self.dir)
732

    
733

    
734
class TestSshKeys(testutils.GanetiTestCase):
735
  """Test case for the AddAuthorizedKey function"""
736

    
737
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
738
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="198.51.100.4" '
739
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
740

    
741
  def setUp(self):
742
    testutils.GanetiTestCase.setUp(self)
743
    self.tmpname = self._CreateTempFile()
744
    handle = open(self.tmpname, 'w')
745
    try:
746
      handle.write("%s\n" % TestSshKeys.KEY_A)
747
      handle.write("%s\n" % TestSshKeys.KEY_B)
748
    finally:
749
      handle.close()
750

    
751
  def testAddingNewKey(self):
752
    utils.AddAuthorizedKey(self.tmpname,
753
                           'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
754

    
755
    self.assertFileContent(self.tmpname,
756
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
757
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
758
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
759
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
760

    
761
  def testAddingAlmostButNotCompletelyTheSameKey(self):
762
    utils.AddAuthorizedKey(self.tmpname,
763
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
764

    
765
    self.assertFileContent(self.tmpname,
766
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
767
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
768
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
769
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
770

    
771
  def testAddingExistingKeyWithSomeMoreSpaces(self):
772
    utils.AddAuthorizedKey(self.tmpname,
773
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
774

    
775
    self.assertFileContent(self.tmpname,
776
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
777
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
778
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
779

    
780
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
781
    utils.RemoveAuthorizedKey(self.tmpname,
782
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
783

    
784
    self.assertFileContent(self.tmpname,
785
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
786
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
787

    
788
  def testRemovingNonExistingKey(self):
789
    utils.RemoveAuthorizedKey(self.tmpname,
790
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
791

    
792
    self.assertFileContent(self.tmpname,
793
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
794
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
795
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
796

    
797

    
798
class TestNewUUID(unittest.TestCase):
799
  """Test case for NewUUID"""
800

    
801
  def runTest(self):
802
    self.failUnless(utils.UUID_RE.match(utils.NewUUID()))
803

    
804

    
805
if __name__ == "__main__":
806
  testutils.GanetiTestProgram()