Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils.io_unittest.py @ 0e5084ee

History | View | Annotate | Download (25.3 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.utils.io"""
23

    
24
import os
25
import tempfile
26
import unittest
27
import shutil
28
import glob
29
import time
30
import signal
31

    
32
from ganeti import constants
33
from ganeti import utils
34
from ganeti import compat
35
from ganeti import errors
36

    
37
import testutils
38

    
39

    
40
class TestReadFile(testutils.GanetiTestCase):
41
  def testReadAll(self):
42
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"))
43
    self.assertEqual(len(data), 814)
44

    
45
    h = compat.md5_hash()
46
    h.update(data)
47
    self.assertEqual(h.hexdigest(), "a491efb3efe56a0535f924d5f8680fd4")
48

    
49
  def testReadSize(self):
50
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"),
51
                          size=100)
52
    self.assertEqual(len(data), 100)
53

    
54
    h = compat.md5_hash()
55
    h.update(data)
56
    self.assertEqual(h.hexdigest(), "893772354e4e690b9efd073eed433ce7")
57

    
58
  def testCallback(self):
59
    def _Cb(fh):
60
      self.assertEqual(fh.tell(), 0)
61
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"), preread=_Cb)
62
    self.assertEqual(len(data), 814)
63

    
64
  def testError(self):
65
    self.assertRaises(EnvironmentError, utils.ReadFile,
66
                      "/dev/null/does-not-exist")
67

    
68

    
69
class TestReadOneLineFile(testutils.GanetiTestCase):
70
  def setUp(self):
71
    testutils.GanetiTestCase.setUp(self)
72

    
73
  def testDefault(self):
74
    data = utils.ReadOneLineFile(self._TestDataFilename("cert1.pem"))
75
    self.assertEqual(len(data), 27)
76
    self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
77

    
78
  def testNotStrict(self):
79
    data = utils.ReadOneLineFile(self._TestDataFilename("cert1.pem"),
80
                                 strict=False)
81
    self.assertEqual(len(data), 27)
82
    self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
83

    
84
  def testStrictFailure(self):
85
    self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
86
                      self._TestDataFilename("cert1.pem"), strict=True)
87

    
88
  def testLongLine(self):
89
    dummydata = (1024 * "Hello World! ")
90
    myfile = self._CreateTempFile()
91
    utils.WriteFile(myfile, data=dummydata)
92
    datastrict = utils.ReadOneLineFile(myfile, strict=True)
93
    datalax = utils.ReadOneLineFile(myfile, strict=False)
94
    self.assertEqual(dummydata, datastrict)
95
    self.assertEqual(dummydata, datalax)
96

    
97
  def testNewline(self):
98
    myfile = self._CreateTempFile()
99
    myline = "myline"
100
    for nl in ["", "\n", "\r\n"]:
101
      dummydata = "%s%s" % (myline, nl)
102
      utils.WriteFile(myfile, data=dummydata)
103
      datalax = utils.ReadOneLineFile(myfile, strict=False)
104
      self.assertEqual(myline, datalax)
105
      datastrict = utils.ReadOneLineFile(myfile, strict=True)
106
      self.assertEqual(myline, datastrict)
107

    
108
  def testWhitespaceAndMultipleLines(self):
109
    myfile = self._CreateTempFile()
110
    for nl in ["", "\n", "\r\n"]:
111
      for ws in [" ", "\t", "\t\t  \t", "\t "]:
112
        dummydata = (1024 * ("Foo bar baz %s%s" % (ws, nl)))
113
        utils.WriteFile(myfile, data=dummydata)
114
        datalax = utils.ReadOneLineFile(myfile, strict=False)
115
        if nl:
116
          self.assert_(set("\r\n") & set(dummydata))
117
          self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
118
                            myfile, strict=True)
119
          explen = len("Foo bar baz ") + len(ws)
120
          self.assertEqual(len(datalax), explen)
121
          self.assertEqual(datalax, dummydata[:explen])
122
          self.assertFalse(set("\r\n") & set(datalax))
123
        else:
124
          datastrict = utils.ReadOneLineFile(myfile, strict=True)
125
          self.assertEqual(dummydata, datastrict)
126
          self.assertEqual(dummydata, datalax)
127

    
128
  def testEmptylines(self):
129
    myfile = self._CreateTempFile()
130
    myline = "myline"
131
    for nl in ["\n", "\r\n"]:
132
      for ol in ["", "otherline"]:
133
        dummydata = "%s%s%s%s%s%s" % (nl, nl, myline, nl, ol, nl)
134
        utils.WriteFile(myfile, data=dummydata)
135
        self.assert_(set("\r\n") & set(dummydata))
136
        datalax = utils.ReadOneLineFile(myfile, strict=False)
137
        self.assertEqual(myline, datalax)
138
        if ol:
139
          self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
140
                            myfile, strict=True)
141
        else:
142
          datastrict = utils.ReadOneLineFile(myfile, strict=True)
143
          self.assertEqual(myline, datastrict)
144

    
145
  def testEmptyfile(self):
146
    myfile = self._CreateTempFile()
147
    self.assertRaises(errors.GenericError, utils.ReadOneLineFile, myfile)
148

    
149

    
150
class TestTimestampForFilename(unittest.TestCase):
151
  def test(self):
152
    self.assert_("." not in utils.TimestampForFilename())
153
    self.assert_(":" not in utils.TimestampForFilename())
154

    
155

    
156
class TestCreateBackup(testutils.GanetiTestCase):
157
  def setUp(self):
158
    testutils.GanetiTestCase.setUp(self)
159

    
160
    self.tmpdir = tempfile.mkdtemp()
161

    
162
  def tearDown(self):
163
    testutils.GanetiTestCase.tearDown(self)
164

    
165
    shutil.rmtree(self.tmpdir)
166

    
167
  def testEmpty(self):
168
    filename = utils.PathJoin(self.tmpdir, "config.data")
169
    utils.WriteFile(filename, data="")
170
    bname = utils.CreateBackup(filename)
171
    self.assertFileContent(bname, "")
172
    self.assertEqual(len(glob.glob("%s*" % filename)), 2)
173
    utils.CreateBackup(filename)
174
    self.assertEqual(len(glob.glob("%s*" % filename)), 3)
175
    utils.CreateBackup(filename)
176
    self.assertEqual(len(glob.glob("%s*" % filename)), 4)
177

    
178
    fifoname = utils.PathJoin(self.tmpdir, "fifo")
179
    os.mkfifo(fifoname)
180
    self.assertRaises(errors.ProgrammerError, utils.CreateBackup, fifoname)
181

    
182
  def testContent(self):
183
    bkpcount = 0
184
    for data in ["", "X", "Hello World!\n" * 100, "Binary data\0\x01\x02\n"]:
185
      for rep in [1, 2, 10, 127]:
186
        testdata = data * rep
187

    
188
        filename = utils.PathJoin(self.tmpdir, "test.data_")
189
        utils.WriteFile(filename, data=testdata)
190
        self.assertFileContent(filename, testdata)
191

    
192
        for _ in range(3):
193
          bname = utils.CreateBackup(filename)
194
          bkpcount += 1
195
          self.assertFileContent(bname, testdata)
196
          self.assertEqual(len(glob.glob("%s*" % filename)), 1 + bkpcount)
197

    
198

    
199
class TestListVisibleFiles(unittest.TestCase):
200
  """Test case for ListVisibleFiles"""
201

    
202
  def setUp(self):
203
    self.path = tempfile.mkdtemp()
204

    
205
  def tearDown(self):
206
    shutil.rmtree(self.path)
207

    
208
  def _CreateFiles(self, files):
209
    for name in files:
210
      utils.WriteFile(os.path.join(self.path, name), data="test")
211

    
212
  def _test(self, files, expected):
213
    self._CreateFiles(files)
214
    found = utils.ListVisibleFiles(self.path)
215
    self.assertEqual(set(found), set(expected))
216

    
217
  def testAllVisible(self):
218
    files = ["a", "b", "c"]
219
    expected = files
220
    self._test(files, expected)
221

    
222
  def testNoneVisible(self):
223
    files = [".a", ".b", ".c"]
224
    expected = []
225
    self._test(files, expected)
226

    
227
  def testSomeVisible(self):
228
    files = ["a", "b", ".c"]
229
    expected = ["a", "b"]
230
    self._test(files, expected)
231

    
232
  def testNonAbsolutePath(self):
233
    self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
234
                          "abc")
235

    
236
  def testNonNormalizedPath(self):
237
    self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
238
                          "/bin/../tmp")
239

    
240

    
241
class TestWriteFile(unittest.TestCase):
242
  def setUp(self):
243
    self.tmpdir = None
244
    self.tfile = tempfile.NamedTemporaryFile()
245
    self.did_pre = False
246
    self.did_post = False
247
    self.did_write = False
248

    
249
  def tearDown(self):
250
    if self.tmpdir:
251
      shutil.rmtree(self.tmpdir)
252

    
253
  def markPre(self, fd):
254
    self.did_pre = True
255

    
256
  def markPost(self, fd):
257
    self.did_post = True
258

    
259
  def markWrite(self, fd):
260
    self.did_write = True
261

    
262
  def testWrite(self):
263
    data = "abc"
264
    utils.WriteFile(self.tfile.name, data=data)
265
    self.assertEqual(utils.ReadFile(self.tfile.name), data)
266

    
267
  def testWriteSimpleUnicode(self):
268
    data = u"abc"
269
    utils.WriteFile(self.tfile.name, data=data)
270
    self.assertEqual(utils.ReadFile(self.tfile.name), data)
271

    
272
  def testErrors(self):
273
    self.assertRaises(errors.ProgrammerError, utils.WriteFile,
274
                      self.tfile.name, data="test", fn=lambda fd: None)
275
    self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name)
276
    self.assertRaises(errors.ProgrammerError, utils.WriteFile,
277
                      self.tfile.name, data="test", atime=0)
278

    
279
  def testPreWrite(self):
280
    utils.WriteFile(self.tfile.name, data="", prewrite=self.markPre)
281
    self.assertTrue(self.did_pre)
282
    self.assertFalse(self.did_post)
283
    self.assertFalse(self.did_write)
284

    
285
  def testPostWrite(self):
286
    utils.WriteFile(self.tfile.name, data="", postwrite=self.markPost)
287
    self.assertFalse(self.did_pre)
288
    self.assertTrue(self.did_post)
289
    self.assertFalse(self.did_write)
290

    
291
  def testWriteFunction(self):
292
    utils.WriteFile(self.tfile.name, fn=self.markWrite)
293
    self.assertFalse(self.did_pre)
294
    self.assertFalse(self.did_post)
295
    self.assertTrue(self.did_write)
296

    
297
  def testDryRun(self):
298
    orig = "abc"
299
    self.tfile.write(orig)
300
    self.tfile.flush()
301
    utils.WriteFile(self.tfile.name, data="hello", dry_run=True)
302
    self.assertEqual(utils.ReadFile(self.tfile.name), orig)
303

    
304
  def testTimes(self):
305
    f = self.tfile.name
306
    for at, mt in [(0, 0), (1000, 1000), (2000, 3000),
307
                   (int(time.time()), 5000)]:
308
      utils.WriteFile(f, data="hello", atime=at, mtime=mt)
309
      st = os.stat(f)
310
      self.assertEqual(st.st_atime, at)
311
      self.assertEqual(st.st_mtime, mt)
312

    
313
  def testNoClose(self):
314
    data = "hello"
315
    self.assertEqual(utils.WriteFile(self.tfile.name, data="abc"), None)
316
    fd = utils.WriteFile(self.tfile.name, data=data, close=False)
317
    try:
318
      os.lseek(fd, 0, 0)
319
      self.assertEqual(os.read(fd, 4096), data)
320
    finally:
321
      os.close(fd)
322

    
323
  def testNoLeftovers(self):
324
    self.tmpdir = tempfile.mkdtemp()
325
    self.assertEqual(utils.WriteFile(utils.PathJoin(self.tmpdir, "test"),
326
                                     data="abc"),
327
                     None)
328
    self.assertEqual(os.listdir(self.tmpdir), ["test"])
329

    
330
  def testFailRename(self):
331
    self.tmpdir = tempfile.mkdtemp()
332
    target = utils.PathJoin(self.tmpdir, "target")
333
    os.mkdir(target)
334
    self.assertRaises(OSError, utils.WriteFile, target, data="abc")
335
    self.assertTrue(os.path.isdir(target))
336
    self.assertEqual(os.listdir(self.tmpdir), ["target"])
337
    self.assertFalse(os.listdir(target))
338

    
339
  def testFailRenameDryRun(self):
340
    self.tmpdir = tempfile.mkdtemp()
341
    target = utils.PathJoin(self.tmpdir, "target")
342
    os.mkdir(target)
343
    self.assertEqual(utils.WriteFile(target, data="abc", dry_run=True), None)
344
    self.assertTrue(os.path.isdir(target))
345
    self.assertEqual(os.listdir(self.tmpdir), ["target"])
346
    self.assertFalse(os.listdir(target))
347

    
348
  def testBackup(self):
349
    self.tmpdir = tempfile.mkdtemp()
350
    testfile = utils.PathJoin(self.tmpdir, "test")
351

    
352
    self.assertEqual(utils.WriteFile(testfile, data="foo", backup=True), None)
353
    self.assertEqual(utils.ReadFile(testfile), "foo")
354
    self.assertEqual(os.listdir(self.tmpdir), ["test"])
355

    
356
    # Write again
357
    assert os.path.isfile(testfile)
358
    self.assertEqual(utils.WriteFile(testfile, data="bar", backup=True), None)
359
    self.assertEqual(utils.ReadFile(testfile), "bar")
360
    self.assertEqual(len(glob.glob("%s.backup*" % testfile)), 1)
361
    self.assertTrue("test" in os.listdir(self.tmpdir))
362
    self.assertEqual(len(os.listdir(self.tmpdir)), 2)
363

    
364
    # Write again as dry-run
365
    assert os.path.isfile(testfile)
366
    self.assertEqual(utils.WriteFile(testfile, data="000", backup=True,
367
                                     dry_run=True),
368
                     None)
369
    self.assertEqual(utils.ReadFile(testfile), "bar")
370
    self.assertEqual(len(glob.glob("%s.backup*" % testfile)), 1)
371
    self.assertTrue("test" in os.listdir(self.tmpdir))
372
    self.assertEqual(len(os.listdir(self.tmpdir)), 2)
373

    
374

    
375
class TestFileID(testutils.GanetiTestCase):
376
  def testEquality(self):
377
    name = self._CreateTempFile()
378
    oldi = utils.GetFileID(path=name)
379
    self.failUnless(utils.VerifyFileID(oldi, oldi))
380

    
381
  def testUpdate(self):
382
    name = self._CreateTempFile()
383
    oldi = utils.GetFileID(path=name)
384
    os.utime(name, None)
385
    fd = os.open(name, os.O_RDWR)
386
    try:
387
      newi = utils.GetFileID(fd=fd)
388
      self.failUnless(utils.VerifyFileID(oldi, newi))
389
      self.failUnless(utils.VerifyFileID(newi, oldi))
390
    finally:
391
      os.close(fd)
392

    
393
  def testWriteFile(self):
394
    name = self._CreateTempFile()
395
    oldi = utils.GetFileID(path=name)
396
    mtime = oldi[2]
397
    os.utime(name, (mtime + 10, mtime + 10))
398
    self.assertRaises(errors.LockError, utils.SafeWriteFile, name,
399
                      oldi, data="")
400
    os.utime(name, (mtime - 10, mtime - 10))
401
    utils.SafeWriteFile(name, oldi, data="")
402
    oldi = utils.GetFileID(path=name)
403
    mtime = oldi[2]
404
    os.utime(name, (mtime + 10, mtime + 10))
405
    # this doesn't raise, since we passed None
406
    utils.SafeWriteFile(name, None, data="")
407

    
408
  def testError(self):
409
    t = tempfile.NamedTemporaryFile()
410
    self.assertRaises(errors.ProgrammerError, utils.GetFileID,
411
                      path=t.name, fd=t.fileno())
412

    
413

    
414
class TestRemoveFile(unittest.TestCase):
415
  """Test case for the RemoveFile function"""
416

    
417
  def setUp(self):
418
    """Create a temp dir and file for each case"""
419
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
420
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
421
    os.close(fd)
422

    
423
  def tearDown(self):
424
    if os.path.exists(self.tmpfile):
425
      os.unlink(self.tmpfile)
426
    os.rmdir(self.tmpdir)
427

    
428
  def testIgnoreDirs(self):
429
    """Test that RemoveFile() ignores directories"""
430
    self.assertEqual(None, utils.RemoveFile(self.tmpdir))
431

    
432
  def testIgnoreNotExisting(self):
433
    """Test that RemoveFile() ignores non-existing files"""
434
    utils.RemoveFile(self.tmpfile)
435
    utils.RemoveFile(self.tmpfile)
436

    
437
  def testRemoveFile(self):
438
    """Test that RemoveFile does remove a file"""
439
    utils.RemoveFile(self.tmpfile)
440
    if os.path.exists(self.tmpfile):
441
      self.fail("File '%s' not removed" % self.tmpfile)
442

    
443
  def testRemoveSymlink(self):
444
    """Test that RemoveFile does remove symlinks"""
445
    symlink = self.tmpdir + "/symlink"
446
    os.symlink("no-such-file", symlink)
447
    utils.RemoveFile(symlink)
448
    if os.path.exists(symlink):
449
      self.fail("File '%s' not removed" % symlink)
450
    os.symlink(self.tmpfile, symlink)
451
    utils.RemoveFile(symlink)
452
    if os.path.exists(symlink):
453
      self.fail("File '%s' not removed" % symlink)
454

    
455

    
456
class TestRemoveDir(unittest.TestCase):
457
  def setUp(self):
458
    self.tmpdir = tempfile.mkdtemp()
459

    
460
  def tearDown(self):
461
    try:
462
      shutil.rmtree(self.tmpdir)
463
    except EnvironmentError:
464
      pass
465

    
466
  def testEmptyDir(self):
467
    utils.RemoveDir(self.tmpdir)
468
    self.assertFalse(os.path.isdir(self.tmpdir))
469

    
470
  def testNonEmptyDir(self):
471
    self.tmpfile = os.path.join(self.tmpdir, "test1")
472
    open(self.tmpfile, "w").close()
473
    self.assertRaises(EnvironmentError, utils.RemoveDir, self.tmpdir)
474

    
475

    
476
class TestRename(unittest.TestCase):
477
  """Test case for RenameFile"""
478

    
479
  def setUp(self):
480
    """Create a temporary directory"""
481
    self.tmpdir = tempfile.mkdtemp()
482
    self.tmpfile = os.path.join(self.tmpdir, "test1")
483

    
484
    # Touch the file
485
    open(self.tmpfile, "w").close()
486

    
487
  def tearDown(self):
488
    """Remove temporary directory"""
489
    shutil.rmtree(self.tmpdir)
490

    
491
  def testSimpleRename1(self):
492
    """Simple rename 1"""
493
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
494
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
495

    
496
  def testSimpleRename2(self):
497
    """Simple rename 2"""
498
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
499
                     mkdir=True)
500
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
501

    
502
  def testRenameMkdir(self):
503
    """Rename with mkdir"""
504
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
505
                     mkdir=True)
506
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
507
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
508

    
509
    utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"),
510
                     os.path.join(self.tmpdir, "test/foo/bar/baz"),
511
                     mkdir=True)
512
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
513
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar")))
514
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz")))
515

    
516

    
517
class TestMakedirs(unittest.TestCase):
518
  def setUp(self):
519
    self.tmpdir = tempfile.mkdtemp()
520

    
521
  def tearDown(self):
522
    shutil.rmtree(self.tmpdir)
523

    
524
  def testNonExisting(self):
525
    path = utils.PathJoin(self.tmpdir, "foo")
526
    utils.Makedirs(path)
527
    self.assert_(os.path.isdir(path))
528

    
529
  def testExisting(self):
530
    path = utils.PathJoin(self.tmpdir, "foo")
531
    os.mkdir(path)
532
    utils.Makedirs(path)
533
    self.assert_(os.path.isdir(path))
534

    
535
  def testRecursiveNonExisting(self):
536
    path = utils.PathJoin(self.tmpdir, "foo/bar/baz")
537
    utils.Makedirs(path)
538
    self.assert_(os.path.isdir(path))
539

    
540
  def testRecursiveExisting(self):
541
    path = utils.PathJoin(self.tmpdir, "B/moo/xyz")
542
    self.assertFalse(os.path.exists(path))
543
    os.mkdir(utils.PathJoin(self.tmpdir, "B"))
544
    utils.Makedirs(path)
545
    self.assert_(os.path.isdir(path))
546

    
547

    
548
class TestEnsureDirs(unittest.TestCase):
549
  """Tests for EnsureDirs"""
550

    
551
  def setUp(self):
552
    self.dir = tempfile.mkdtemp()
553
    self.old_umask = os.umask(0777)
554

    
555
  def testEnsureDirs(self):
556
    utils.EnsureDirs([
557
        (utils.PathJoin(self.dir, "foo"), 0777),
558
        (utils.PathJoin(self.dir, "bar"), 0000),
559
        ])
560
    self.assertEquals(os.stat(utils.PathJoin(self.dir, "foo"))[0] & 0777, 0777)
561
    self.assertEquals(os.stat(utils.PathJoin(self.dir, "bar"))[0] & 0777, 0000)
562

    
563
  def tearDown(self):
564
    os.rmdir(utils.PathJoin(self.dir, "foo"))
565
    os.rmdir(utils.PathJoin(self.dir, "bar"))
566
    os.rmdir(self.dir)
567
    os.umask(self.old_umask)
568

    
569

    
570
class TestIsNormAbsPath(unittest.TestCase):
571
  """Testing case for IsNormAbsPath"""
572

    
573
  def _pathTestHelper(self, path, result):
574
    if result:
575
      self.assert_(utils.IsNormAbsPath(path),
576
          "Path %s should result absolute and normalized" % path)
577
    else:
578
      self.assertFalse(utils.IsNormAbsPath(path),
579
          "Path %s should not result absolute and normalized" % path)
580

    
581
  def testBase(self):
582
    self._pathTestHelper("/etc", True)
583
    self._pathTestHelper("/srv", True)
584
    self._pathTestHelper("etc", False)
585
    self._pathTestHelper("/etc/../root", False)
586
    self._pathTestHelper("/etc/", False)
587

    
588

    
589
class TestPathJoin(unittest.TestCase):
590
  """Testing case for PathJoin"""
591

    
592
  def testBasicItems(self):
593
    mlist = ["/a", "b", "c"]
594
    self.failUnlessEqual(utils.PathJoin(*mlist), "/".join(mlist))
595

    
596
  def testNonAbsPrefix(self):
597
    self.failUnlessRaises(ValueError, utils.PathJoin, "a", "b")
598

    
599
  def testBackTrack(self):
600
    self.failUnlessRaises(ValueError, utils.PathJoin, "/a", "b/../c")
601

    
602
  def testMultiAbs(self):
603
    self.failUnlessRaises(ValueError, utils.PathJoin, "/a", "/b")
604

    
605

    
606
class TestTailFile(testutils.GanetiTestCase):
607
  """Test case for the TailFile function"""
608

    
609
  def testEmpty(self):
610
    fname = self._CreateTempFile()
611
    self.failUnlessEqual(utils.TailFile(fname), [])
612
    self.failUnlessEqual(utils.TailFile(fname, lines=25), [])
613

    
614
  def testAllLines(self):
615
    data = ["test %d" % i for i in range(30)]
616
    for i in range(30):
617
      fname = self._CreateTempFile()
618
      fd = open(fname, "w")
619
      fd.write("\n".join(data[:i]))
620
      if i > 0:
621
        fd.write("\n")
622
      fd.close()
623
      self.failUnlessEqual(utils.TailFile(fname, lines=i), data[:i])
624

    
625
  def testPartialLines(self):
626
    data = ["test %d" % i for i in range(30)]
627
    fname = self._CreateTempFile()
628
    fd = open(fname, "w")
629
    fd.write("\n".join(data))
630
    fd.write("\n")
631
    fd.close()
632
    for i in range(1, 30):
633
      self.failUnlessEqual(utils.TailFile(fname, lines=i), data[-i:])
634

    
635
  def testBigFile(self):
636
    data = ["test %d" % i for i in range(30)]
637
    fname = self._CreateTempFile()
638
    fd = open(fname, "w")
639
    fd.write("X" * 1048576)
640
    fd.write("\n")
641
    fd.write("\n".join(data))
642
    fd.write("\n")
643
    fd.close()
644
    for i in range(1, 30):
645
      self.failUnlessEqual(utils.TailFile(fname, lines=i), data[-i:])
646

    
647

    
648
class TestPidFileFunctions(unittest.TestCase):
649
  """Tests for WritePidFile and ReadPidFile"""
650

    
651
  def setUp(self):
652
    self.dir = tempfile.mkdtemp()
653
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
654

    
655
  def testPidFileFunctions(self):
656
    pid_file = self.f_dpn('test')
657
    fd = utils.WritePidFile(self.f_dpn('test'))
658
    self.failUnless(os.path.exists(pid_file),
659
                    "PID file should have been created")
660
    read_pid = utils.ReadPidFile(pid_file)
661
    self.failUnlessEqual(read_pid, os.getpid())
662
    self.failUnless(utils.IsProcessAlive(read_pid))
663
    self.failUnlessRaises(errors.LockError, utils.WritePidFile,
664
                          self.f_dpn('test'))
665
    os.close(fd)
666
    utils.RemoveFile(self.f_dpn("test"))
667
    self.failIf(os.path.exists(pid_file),
668
                "PID file should not exist anymore")
669
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
670
                         "ReadPidFile should return 0 for missing pid file")
671
    fh = open(pid_file, "w")
672
    fh.write("blah\n")
673
    fh.close()
674
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
675
                         "ReadPidFile should return 0 for invalid pid file")
676
    # but now, even with the file existing, we should be able to lock it
677
    fd = utils.WritePidFile(self.f_dpn('test'))
678
    os.close(fd)
679
    utils.RemoveFile(self.f_dpn("test"))
680
    self.failIf(os.path.exists(pid_file),
681
                "PID file should not exist anymore")
682

    
683
  def testKill(self):
684
    pid_file = self.f_dpn('child')
685
    r_fd, w_fd = os.pipe()
686
    new_pid = os.fork()
687
    if new_pid == 0: #child
688
      utils.WritePidFile(self.f_dpn('child'))
689
      os.write(w_fd, 'a')
690
      signal.pause()
691
      os._exit(0)
692
      return
693
    # else we are in the parent
694
    # wait until the child has written the pid file
695
    os.read(r_fd, 1)
696
    read_pid = utils.ReadPidFile(pid_file)
697
    self.failUnlessEqual(read_pid, new_pid)
698
    self.failUnless(utils.IsProcessAlive(new_pid))
699
    utils.KillProcess(new_pid, waitpid=True)
700
    self.failIf(utils.IsProcessAlive(new_pid))
701
    utils.RemoveFile(self.f_dpn('child'))
702
    self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0)
703

    
704
  def tearDown(self):
705
    shutil.rmtree(self.dir)
706

    
707

    
708
class TestSshKeys(testutils.GanetiTestCase):
709
  """Test case for the AddAuthorizedKey function"""
710

    
711
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
712
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="198.51.100.4" '
713
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
714

    
715
  def setUp(self):
716
    testutils.GanetiTestCase.setUp(self)
717
    self.tmpname = self._CreateTempFile()
718
    handle = open(self.tmpname, 'w')
719
    try:
720
      handle.write("%s\n" % TestSshKeys.KEY_A)
721
      handle.write("%s\n" % TestSshKeys.KEY_B)
722
    finally:
723
      handle.close()
724

    
725
  def testAddingNewKey(self):
726
    utils.AddAuthorizedKey(self.tmpname,
727
                           'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
728

    
729
    self.assertFileContent(self.tmpname,
730
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
731
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
732
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
733
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
734

    
735
  def testAddingAlmostButNotCompletelyTheSameKey(self):
736
    utils.AddAuthorizedKey(self.tmpname,
737
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
738

    
739
    self.assertFileContent(self.tmpname,
740
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
741
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
742
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
743
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
744

    
745
  def testAddingExistingKeyWithSomeMoreSpaces(self):
746
    utils.AddAuthorizedKey(self.tmpname,
747
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
748

    
749
    self.assertFileContent(self.tmpname,
750
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
751
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
752
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
753

    
754
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
755
    utils.RemoveAuthorizedKey(self.tmpname,
756
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
757

    
758
    self.assertFileContent(self.tmpname,
759
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
760
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
761

    
762
  def testRemovingNonExistingKey(self):
763
    utils.RemoveAuthorizedKey(self.tmpname,
764
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
765

    
766
    self.assertFileContent(self.tmpname,
767
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
768
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
769
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
770

    
771

    
772
class TestNewUUID(unittest.TestCase):
773
  """Test case for NewUUID"""
774

    
775
  def runTest(self):
776
    self.failUnless(utils.UUID_RE.match(utils.NewUUID()))
777

    
778

    
779
if __name__ == "__main__":
780
  testutils.GanetiTestProgram()