Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils.io_unittest.py @ 0b71067a

History | View | Annotate | Download (25.1 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.utils.io"""
23

    
24
import os
25
import tempfile
26
import unittest
27
import shutil
28
import glob
29
import time
30
import signal
31

    
32
from ganeti import constants
33
from ganeti import utils
34
from ganeti import compat
35
from ganeti import errors
36

    
37
import testutils
38

    
39

    
40
class TestReadFile(testutils.GanetiTestCase):
41
  def testReadAll(self):
42
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"))
43
    self.assertEqual(len(data), 814)
44

    
45
    h = compat.md5_hash()
46
    h.update(data)
47
    self.assertEqual(h.hexdigest(), "a491efb3efe56a0535f924d5f8680fd4")
48

    
49
  def testReadSize(self):
50
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"),
51
                          size=100)
52
    self.assertEqual(len(data), 100)
53

    
54
    h = compat.md5_hash()
55
    h.update(data)
56
    self.assertEqual(h.hexdigest(), "893772354e4e690b9efd073eed433ce7")
57

    
58
  def testError(self):
59
    self.assertRaises(EnvironmentError, utils.ReadFile,
60
                      "/dev/null/does-not-exist")
61

    
62

    
63
class TestReadOneLineFile(testutils.GanetiTestCase):
64
  def setUp(self):
65
    testutils.GanetiTestCase.setUp(self)
66

    
67
  def testDefault(self):
68
    data = utils.ReadOneLineFile(self._TestDataFilename("cert1.pem"))
69
    self.assertEqual(len(data), 27)
70
    self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
71

    
72
  def testNotStrict(self):
73
    data = utils.ReadOneLineFile(self._TestDataFilename("cert1.pem"),
74
                                 strict=False)
75
    self.assertEqual(len(data), 27)
76
    self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
77

    
78
  def testStrictFailure(self):
79
    self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
80
                      self._TestDataFilename("cert1.pem"), strict=True)
81

    
82
  def testLongLine(self):
83
    dummydata = (1024 * "Hello World! ")
84
    myfile = self._CreateTempFile()
85
    utils.WriteFile(myfile, data=dummydata)
86
    datastrict = utils.ReadOneLineFile(myfile, strict=True)
87
    datalax = utils.ReadOneLineFile(myfile, strict=False)
88
    self.assertEqual(dummydata, datastrict)
89
    self.assertEqual(dummydata, datalax)
90

    
91
  def testNewline(self):
92
    myfile = self._CreateTempFile()
93
    myline = "myline"
94
    for nl in ["", "\n", "\r\n"]:
95
      dummydata = "%s%s" % (myline, nl)
96
      utils.WriteFile(myfile, data=dummydata)
97
      datalax = utils.ReadOneLineFile(myfile, strict=False)
98
      self.assertEqual(myline, datalax)
99
      datastrict = utils.ReadOneLineFile(myfile, strict=True)
100
      self.assertEqual(myline, datastrict)
101

    
102
  def testWhitespaceAndMultipleLines(self):
103
    myfile = self._CreateTempFile()
104
    for nl in ["", "\n", "\r\n"]:
105
      for ws in [" ", "\t", "\t\t  \t", "\t "]:
106
        dummydata = (1024 * ("Foo bar baz %s%s" % (ws, nl)))
107
        utils.WriteFile(myfile, data=dummydata)
108
        datalax = utils.ReadOneLineFile(myfile, strict=False)
109
        if nl:
110
          self.assert_(set("\r\n") & set(dummydata))
111
          self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
112
                            myfile, strict=True)
113
          explen = len("Foo bar baz ") + len(ws)
114
          self.assertEqual(len(datalax), explen)
115
          self.assertEqual(datalax, dummydata[:explen])
116
          self.assertFalse(set("\r\n") & set(datalax))
117
        else:
118
          datastrict = utils.ReadOneLineFile(myfile, strict=True)
119
          self.assertEqual(dummydata, datastrict)
120
          self.assertEqual(dummydata, datalax)
121

    
122
  def testEmptylines(self):
123
    myfile = self._CreateTempFile()
124
    myline = "myline"
125
    for nl in ["\n", "\r\n"]:
126
      for ol in ["", "otherline"]:
127
        dummydata = "%s%s%s%s%s%s" % (nl, nl, myline, nl, ol, nl)
128
        utils.WriteFile(myfile, data=dummydata)
129
        self.assert_(set("\r\n") & set(dummydata))
130
        datalax = utils.ReadOneLineFile(myfile, strict=False)
131
        self.assertEqual(myline, datalax)
132
        if ol:
133
          self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
134
                            myfile, strict=True)
135
        else:
136
          datastrict = utils.ReadOneLineFile(myfile, strict=True)
137
          self.assertEqual(myline, datastrict)
138

    
139
  def testEmptyfile(self):
140
    myfile = self._CreateTempFile()
141
    self.assertRaises(errors.GenericError, utils.ReadOneLineFile, myfile)
142

    
143

    
144
class TestTimestampForFilename(unittest.TestCase):
145
  def test(self):
146
    self.assert_("." not in utils.TimestampForFilename())
147
    self.assert_(":" not in utils.TimestampForFilename())
148

    
149

    
150
class TestCreateBackup(testutils.GanetiTestCase):
151
  def setUp(self):
152
    testutils.GanetiTestCase.setUp(self)
153

    
154
    self.tmpdir = tempfile.mkdtemp()
155

    
156
  def tearDown(self):
157
    testutils.GanetiTestCase.tearDown(self)
158

    
159
    shutil.rmtree(self.tmpdir)
160

    
161
  def testEmpty(self):
162
    filename = utils.PathJoin(self.tmpdir, "config.data")
163
    utils.WriteFile(filename, data="")
164
    bname = utils.CreateBackup(filename)
165
    self.assertFileContent(bname, "")
166
    self.assertEqual(len(glob.glob("%s*" % filename)), 2)
167
    utils.CreateBackup(filename)
168
    self.assertEqual(len(glob.glob("%s*" % filename)), 3)
169
    utils.CreateBackup(filename)
170
    self.assertEqual(len(glob.glob("%s*" % filename)), 4)
171

    
172
    fifoname = utils.PathJoin(self.tmpdir, "fifo")
173
    os.mkfifo(fifoname)
174
    self.assertRaises(errors.ProgrammerError, utils.CreateBackup, fifoname)
175

    
176
  def testContent(self):
177
    bkpcount = 0
178
    for data in ["", "X", "Hello World!\n" * 100, "Binary data\0\x01\x02\n"]:
179
      for rep in [1, 2, 10, 127]:
180
        testdata = data * rep
181

    
182
        filename = utils.PathJoin(self.tmpdir, "test.data_")
183
        utils.WriteFile(filename, data=testdata)
184
        self.assertFileContent(filename, testdata)
185

    
186
        for _ in range(3):
187
          bname = utils.CreateBackup(filename)
188
          bkpcount += 1
189
          self.assertFileContent(bname, testdata)
190
          self.assertEqual(len(glob.glob("%s*" % filename)), 1 + bkpcount)
191

    
192

    
193
class TestListVisibleFiles(unittest.TestCase):
194
  """Test case for ListVisibleFiles"""
195

    
196
  def setUp(self):
197
    self.path = tempfile.mkdtemp()
198

    
199
  def tearDown(self):
200
    shutil.rmtree(self.path)
201

    
202
  def _CreateFiles(self, files):
203
    for name in files:
204
      utils.WriteFile(os.path.join(self.path, name), data="test")
205

    
206
  def _test(self, files, expected):
207
    self._CreateFiles(files)
208
    found = utils.ListVisibleFiles(self.path)
209
    self.assertEqual(set(found), set(expected))
210

    
211
  def testAllVisible(self):
212
    files = ["a", "b", "c"]
213
    expected = files
214
    self._test(files, expected)
215

    
216
  def testNoneVisible(self):
217
    files = [".a", ".b", ".c"]
218
    expected = []
219
    self._test(files, expected)
220

    
221
  def testSomeVisible(self):
222
    files = ["a", "b", ".c"]
223
    expected = ["a", "b"]
224
    self._test(files, expected)
225

    
226
  def testNonAbsolutePath(self):
227
    self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
228
                          "abc")
229

    
230
  def testNonNormalizedPath(self):
231
    self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
232
                          "/bin/../tmp")
233

    
234

    
235
class TestWriteFile(unittest.TestCase):
236
  def setUp(self):
237
    self.tmpdir = None
238
    self.tfile = tempfile.NamedTemporaryFile()
239
    self.did_pre = False
240
    self.did_post = False
241
    self.did_write = False
242

    
243
  def tearDown(self):
244
    if self.tmpdir:
245
      shutil.rmtree(self.tmpdir)
246

    
247
  def markPre(self, fd):
248
    self.did_pre = True
249

    
250
  def markPost(self, fd):
251
    self.did_post = True
252

    
253
  def markWrite(self, fd):
254
    self.did_write = True
255

    
256
  def testWrite(self):
257
    data = "abc"
258
    utils.WriteFile(self.tfile.name, data=data)
259
    self.assertEqual(utils.ReadFile(self.tfile.name), data)
260

    
261
  def testWriteSimpleUnicode(self):
262
    data = u"abc"
263
    utils.WriteFile(self.tfile.name, data=data)
264
    self.assertEqual(utils.ReadFile(self.tfile.name), data)
265

    
266
  def testErrors(self):
267
    self.assertRaises(errors.ProgrammerError, utils.WriteFile,
268
                      self.tfile.name, data="test", fn=lambda fd: None)
269
    self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name)
270
    self.assertRaises(errors.ProgrammerError, utils.WriteFile,
271
                      self.tfile.name, data="test", atime=0)
272

    
273
  def testPreWrite(self):
274
    utils.WriteFile(self.tfile.name, data="", prewrite=self.markPre)
275
    self.assertTrue(self.did_pre)
276
    self.assertFalse(self.did_post)
277
    self.assertFalse(self.did_write)
278

    
279
  def testPostWrite(self):
280
    utils.WriteFile(self.tfile.name, data="", postwrite=self.markPost)
281
    self.assertFalse(self.did_pre)
282
    self.assertTrue(self.did_post)
283
    self.assertFalse(self.did_write)
284

    
285
  def testWriteFunction(self):
286
    utils.WriteFile(self.tfile.name, fn=self.markWrite)
287
    self.assertFalse(self.did_pre)
288
    self.assertFalse(self.did_post)
289
    self.assertTrue(self.did_write)
290

    
291
  def testDryRun(self):
292
    orig = "abc"
293
    self.tfile.write(orig)
294
    self.tfile.flush()
295
    utils.WriteFile(self.tfile.name, data="hello", dry_run=True)
296
    self.assertEqual(utils.ReadFile(self.tfile.name), orig)
297

    
298
  def testTimes(self):
299
    f = self.tfile.name
300
    for at, mt in [(0, 0), (1000, 1000), (2000, 3000),
301
                   (int(time.time()), 5000)]:
302
      utils.WriteFile(f, data="hello", atime=at, mtime=mt)
303
      st = os.stat(f)
304
      self.assertEqual(st.st_atime, at)
305
      self.assertEqual(st.st_mtime, mt)
306

    
307
  def testNoClose(self):
308
    data = "hello"
309
    self.assertEqual(utils.WriteFile(self.tfile.name, data="abc"), None)
310
    fd = utils.WriteFile(self.tfile.name, data=data, close=False)
311
    try:
312
      os.lseek(fd, 0, 0)
313
      self.assertEqual(os.read(fd, 4096), data)
314
    finally:
315
      os.close(fd)
316

    
317
  def testNoLeftovers(self):
318
    self.tmpdir = tempfile.mkdtemp()
319
    self.assertEqual(utils.WriteFile(utils.PathJoin(self.tmpdir, "test"),
320
                                     data="abc"),
321
                     None)
322
    self.assertEqual(os.listdir(self.tmpdir), ["test"])
323

    
324
  def testFailRename(self):
325
    self.tmpdir = tempfile.mkdtemp()
326
    target = utils.PathJoin(self.tmpdir, "target")
327
    os.mkdir(target)
328
    self.assertRaises(OSError, utils.WriteFile, target, data="abc")
329
    self.assertTrue(os.path.isdir(target))
330
    self.assertEqual(os.listdir(self.tmpdir), ["target"])
331
    self.assertFalse(os.listdir(target))
332

    
333
  def testFailRenameDryRun(self):
334
    self.tmpdir = tempfile.mkdtemp()
335
    target = utils.PathJoin(self.tmpdir, "target")
336
    os.mkdir(target)
337
    self.assertEqual(utils.WriteFile(target, data="abc", dry_run=True), None)
338
    self.assertTrue(os.path.isdir(target))
339
    self.assertEqual(os.listdir(self.tmpdir), ["target"])
340
    self.assertFalse(os.listdir(target))
341

    
342
  def testBackup(self):
343
    self.tmpdir = tempfile.mkdtemp()
344
    testfile = utils.PathJoin(self.tmpdir, "test")
345

    
346
    self.assertEqual(utils.WriteFile(testfile, data="foo", backup=True), None)
347
    self.assertEqual(utils.ReadFile(testfile), "foo")
348
    self.assertEqual(os.listdir(self.tmpdir), ["test"])
349

    
350
    # Write again
351
    assert os.path.isfile(testfile)
352
    self.assertEqual(utils.WriteFile(testfile, data="bar", backup=True), None)
353
    self.assertEqual(utils.ReadFile(testfile), "bar")
354
    self.assertEqual(len(glob.glob("%s.backup*" % testfile)), 1)
355
    self.assertTrue("test" in os.listdir(self.tmpdir))
356
    self.assertEqual(len(os.listdir(self.tmpdir)), 2)
357

    
358
    # Write again as dry-run
359
    assert os.path.isfile(testfile)
360
    self.assertEqual(utils.WriteFile(testfile, data="000", backup=True,
361
                                     dry_run=True),
362
                     None)
363
    self.assertEqual(utils.ReadFile(testfile), "bar")
364
    self.assertEqual(len(glob.glob("%s.backup*" % testfile)), 1)
365
    self.assertTrue("test" in os.listdir(self.tmpdir))
366
    self.assertEqual(len(os.listdir(self.tmpdir)), 2)
367

    
368

    
369
class TestFileID(testutils.GanetiTestCase):
370
  def testEquality(self):
371
    name = self._CreateTempFile()
372
    oldi = utils.GetFileID(path=name)
373
    self.failUnless(utils.VerifyFileID(oldi, oldi))
374

    
375
  def testUpdate(self):
376
    name = self._CreateTempFile()
377
    oldi = utils.GetFileID(path=name)
378
    os.utime(name, None)
379
    fd = os.open(name, os.O_RDWR)
380
    try:
381
      newi = utils.GetFileID(fd=fd)
382
      self.failUnless(utils.VerifyFileID(oldi, newi))
383
      self.failUnless(utils.VerifyFileID(newi, oldi))
384
    finally:
385
      os.close(fd)
386

    
387
  def testWriteFile(self):
388
    name = self._CreateTempFile()
389
    oldi = utils.GetFileID(path=name)
390
    mtime = oldi[2]
391
    os.utime(name, (mtime + 10, mtime + 10))
392
    self.assertRaises(errors.LockError, utils.SafeWriteFile, name,
393
                      oldi, data="")
394
    os.utime(name, (mtime - 10, mtime - 10))
395
    utils.SafeWriteFile(name, oldi, data="")
396
    oldi = utils.GetFileID(path=name)
397
    mtime = oldi[2]
398
    os.utime(name, (mtime + 10, mtime + 10))
399
    # this doesn't raise, since we passed None
400
    utils.SafeWriteFile(name, None, data="")
401

    
402
  def testError(self):
403
    t = tempfile.NamedTemporaryFile()
404
    self.assertRaises(errors.ProgrammerError, utils.GetFileID,
405
                      path=t.name, fd=t.fileno())
406

    
407

    
408
class TestRemoveFile(unittest.TestCase):
409
  """Test case for the RemoveFile function"""
410

    
411
  def setUp(self):
412
    """Create a temp dir and file for each case"""
413
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
414
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
415
    os.close(fd)
416

    
417
  def tearDown(self):
418
    if os.path.exists(self.tmpfile):
419
      os.unlink(self.tmpfile)
420
    os.rmdir(self.tmpdir)
421

    
422
  def testIgnoreDirs(self):
423
    """Test that RemoveFile() ignores directories"""
424
    self.assertEqual(None, utils.RemoveFile(self.tmpdir))
425

    
426
  def testIgnoreNotExisting(self):
427
    """Test that RemoveFile() ignores non-existing files"""
428
    utils.RemoveFile(self.tmpfile)
429
    utils.RemoveFile(self.tmpfile)
430

    
431
  def testRemoveFile(self):
432
    """Test that RemoveFile does remove a file"""
433
    utils.RemoveFile(self.tmpfile)
434
    if os.path.exists(self.tmpfile):
435
      self.fail("File '%s' not removed" % self.tmpfile)
436

    
437
  def testRemoveSymlink(self):
438
    """Test that RemoveFile does remove symlinks"""
439
    symlink = self.tmpdir + "/symlink"
440
    os.symlink("no-such-file", symlink)
441
    utils.RemoveFile(symlink)
442
    if os.path.exists(symlink):
443
      self.fail("File '%s' not removed" % symlink)
444
    os.symlink(self.tmpfile, symlink)
445
    utils.RemoveFile(symlink)
446
    if os.path.exists(symlink):
447
      self.fail("File '%s' not removed" % symlink)
448

    
449

    
450
class TestRemoveDir(unittest.TestCase):
451
  def setUp(self):
452
    self.tmpdir = tempfile.mkdtemp()
453

    
454
  def tearDown(self):
455
    try:
456
      shutil.rmtree(self.tmpdir)
457
    except EnvironmentError:
458
      pass
459

    
460
  def testEmptyDir(self):
461
    utils.RemoveDir(self.tmpdir)
462
    self.assertFalse(os.path.isdir(self.tmpdir))
463

    
464
  def testNonEmptyDir(self):
465
    self.tmpfile = os.path.join(self.tmpdir, "test1")
466
    open(self.tmpfile, "w").close()
467
    self.assertRaises(EnvironmentError, utils.RemoveDir, self.tmpdir)
468

    
469

    
470
class TestRename(unittest.TestCase):
471
  """Test case for RenameFile"""
472

    
473
  def setUp(self):
474
    """Create a temporary directory"""
475
    self.tmpdir = tempfile.mkdtemp()
476
    self.tmpfile = os.path.join(self.tmpdir, "test1")
477

    
478
    # Touch the file
479
    open(self.tmpfile, "w").close()
480

    
481
  def tearDown(self):
482
    """Remove temporary directory"""
483
    shutil.rmtree(self.tmpdir)
484

    
485
  def testSimpleRename1(self):
486
    """Simple rename 1"""
487
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
488
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
489

    
490
  def testSimpleRename2(self):
491
    """Simple rename 2"""
492
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
493
                     mkdir=True)
494
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
495

    
496
  def testRenameMkdir(self):
497
    """Rename with mkdir"""
498
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
499
                     mkdir=True)
500
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
501
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
502

    
503
    utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"),
504
                     os.path.join(self.tmpdir, "test/foo/bar/baz"),
505
                     mkdir=True)
506
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
507
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar")))
508
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz")))
509

    
510

    
511
class TestMakedirs(unittest.TestCase):
512
  def setUp(self):
513
    self.tmpdir = tempfile.mkdtemp()
514

    
515
  def tearDown(self):
516
    shutil.rmtree(self.tmpdir)
517

    
518
  def testNonExisting(self):
519
    path = utils.PathJoin(self.tmpdir, "foo")
520
    utils.Makedirs(path)
521
    self.assert_(os.path.isdir(path))
522

    
523
  def testExisting(self):
524
    path = utils.PathJoin(self.tmpdir, "foo")
525
    os.mkdir(path)
526
    utils.Makedirs(path)
527
    self.assert_(os.path.isdir(path))
528

    
529
  def testRecursiveNonExisting(self):
530
    path = utils.PathJoin(self.tmpdir, "foo/bar/baz")
531
    utils.Makedirs(path)
532
    self.assert_(os.path.isdir(path))
533

    
534
  def testRecursiveExisting(self):
535
    path = utils.PathJoin(self.tmpdir, "B/moo/xyz")
536
    self.assertFalse(os.path.exists(path))
537
    os.mkdir(utils.PathJoin(self.tmpdir, "B"))
538
    utils.Makedirs(path)
539
    self.assert_(os.path.isdir(path))
540

    
541

    
542
class TestEnsureDirs(unittest.TestCase):
543
  """Tests for EnsureDirs"""
544

    
545
  def setUp(self):
546
    self.dir = tempfile.mkdtemp()
547
    self.old_umask = os.umask(0777)
548

    
549
  def testEnsureDirs(self):
550
    utils.EnsureDirs([
551
        (utils.PathJoin(self.dir, "foo"), 0777),
552
        (utils.PathJoin(self.dir, "bar"), 0000),
553
        ])
554
    self.assertEquals(os.stat(utils.PathJoin(self.dir, "foo"))[0] & 0777, 0777)
555
    self.assertEquals(os.stat(utils.PathJoin(self.dir, "bar"))[0] & 0777, 0000)
556

    
557
  def tearDown(self):
558
    os.rmdir(utils.PathJoin(self.dir, "foo"))
559
    os.rmdir(utils.PathJoin(self.dir, "bar"))
560
    os.rmdir(self.dir)
561
    os.umask(self.old_umask)
562

    
563

    
564
class TestIsNormAbsPath(unittest.TestCase):
565
  """Testing case for IsNormAbsPath"""
566

    
567
  def _pathTestHelper(self, path, result):
568
    if result:
569
      self.assert_(utils.IsNormAbsPath(path),
570
          "Path %s should result absolute and normalized" % path)
571
    else:
572
      self.assertFalse(utils.IsNormAbsPath(path),
573
          "Path %s should not result absolute and normalized" % path)
574

    
575
  def testBase(self):
576
    self._pathTestHelper("/etc", True)
577
    self._pathTestHelper("/srv", True)
578
    self._pathTestHelper("etc", False)
579
    self._pathTestHelper("/etc/../root", False)
580
    self._pathTestHelper("/etc/", False)
581

    
582

    
583
class TestPathJoin(unittest.TestCase):
584
  """Testing case for PathJoin"""
585

    
586
  def testBasicItems(self):
587
    mlist = ["/a", "b", "c"]
588
    self.failUnlessEqual(utils.PathJoin(*mlist), "/".join(mlist))
589

    
590
  def testNonAbsPrefix(self):
591
    self.failUnlessRaises(ValueError, utils.PathJoin, "a", "b")
592

    
593
  def testBackTrack(self):
594
    self.failUnlessRaises(ValueError, utils.PathJoin, "/a", "b/../c")
595

    
596
  def testMultiAbs(self):
597
    self.failUnlessRaises(ValueError, utils.PathJoin, "/a", "/b")
598

    
599

    
600
class TestTailFile(testutils.GanetiTestCase):
601
  """Test case for the TailFile function"""
602

    
603
  def testEmpty(self):
604
    fname = self._CreateTempFile()
605
    self.failUnlessEqual(utils.TailFile(fname), [])
606
    self.failUnlessEqual(utils.TailFile(fname, lines=25), [])
607

    
608
  def testAllLines(self):
609
    data = ["test %d" % i for i in range(30)]
610
    for i in range(30):
611
      fname = self._CreateTempFile()
612
      fd = open(fname, "w")
613
      fd.write("\n".join(data[:i]))
614
      if i > 0:
615
        fd.write("\n")
616
      fd.close()
617
      self.failUnlessEqual(utils.TailFile(fname, lines=i), data[:i])
618

    
619
  def testPartialLines(self):
620
    data = ["test %d" % i for i in range(30)]
621
    fname = self._CreateTempFile()
622
    fd = open(fname, "w")
623
    fd.write("\n".join(data))
624
    fd.write("\n")
625
    fd.close()
626
    for i in range(1, 30):
627
      self.failUnlessEqual(utils.TailFile(fname, lines=i), data[-i:])
628

    
629
  def testBigFile(self):
630
    data = ["test %d" % i for i in range(30)]
631
    fname = self._CreateTempFile()
632
    fd = open(fname, "w")
633
    fd.write("X" * 1048576)
634
    fd.write("\n")
635
    fd.write("\n".join(data))
636
    fd.write("\n")
637
    fd.close()
638
    for i in range(1, 30):
639
      self.failUnlessEqual(utils.TailFile(fname, lines=i), data[-i:])
640

    
641

    
642
class TestPidFileFunctions(unittest.TestCase):
643
  """Tests for WritePidFile and ReadPidFile"""
644

    
645
  def setUp(self):
646
    self.dir = tempfile.mkdtemp()
647
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
648

    
649
  def testPidFileFunctions(self):
650
    pid_file = self.f_dpn('test')
651
    fd = utils.WritePidFile(self.f_dpn('test'))
652
    self.failUnless(os.path.exists(pid_file),
653
                    "PID file should have been created")
654
    read_pid = utils.ReadPidFile(pid_file)
655
    self.failUnlessEqual(read_pid, os.getpid())
656
    self.failUnless(utils.IsProcessAlive(read_pid))
657
    self.failUnlessRaises(errors.LockError, utils.WritePidFile,
658
                          self.f_dpn('test'))
659
    os.close(fd)
660
    utils.RemoveFile(self.f_dpn("test"))
661
    self.failIf(os.path.exists(pid_file),
662
                "PID file should not exist anymore")
663
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
664
                         "ReadPidFile should return 0 for missing pid file")
665
    fh = open(pid_file, "w")
666
    fh.write("blah\n")
667
    fh.close()
668
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
669
                         "ReadPidFile should return 0 for invalid pid file")
670
    # but now, even with the file existing, we should be able to lock it
671
    fd = utils.WritePidFile(self.f_dpn('test'))
672
    os.close(fd)
673
    utils.RemoveFile(self.f_dpn("test"))
674
    self.failIf(os.path.exists(pid_file),
675
                "PID file should not exist anymore")
676

    
677
  def testKill(self):
678
    pid_file = self.f_dpn('child')
679
    r_fd, w_fd = os.pipe()
680
    new_pid = os.fork()
681
    if new_pid == 0: #child
682
      utils.WritePidFile(self.f_dpn('child'))
683
      os.write(w_fd, 'a')
684
      signal.pause()
685
      os._exit(0)
686
      return
687
    # else we are in the parent
688
    # wait until the child has written the pid file
689
    os.read(r_fd, 1)
690
    read_pid = utils.ReadPidFile(pid_file)
691
    self.failUnlessEqual(read_pid, new_pid)
692
    self.failUnless(utils.IsProcessAlive(new_pid))
693
    utils.KillProcess(new_pid, waitpid=True)
694
    self.failIf(utils.IsProcessAlive(new_pid))
695
    utils.RemoveFile(self.f_dpn('child'))
696
    self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0)
697

    
698
  def tearDown(self):
699
    shutil.rmtree(self.dir)
700

    
701

    
702
class TestSshKeys(testutils.GanetiTestCase):
703
  """Test case for the AddAuthorizedKey function"""
704

    
705
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
706
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="198.51.100.4" '
707
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
708

    
709
  def setUp(self):
710
    testutils.GanetiTestCase.setUp(self)
711
    self.tmpname = self._CreateTempFile()
712
    handle = open(self.tmpname, 'w')
713
    try:
714
      handle.write("%s\n" % TestSshKeys.KEY_A)
715
      handle.write("%s\n" % TestSshKeys.KEY_B)
716
    finally:
717
      handle.close()
718

    
719
  def testAddingNewKey(self):
720
    utils.AddAuthorizedKey(self.tmpname,
721
                           'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
722

    
723
    self.assertFileContent(self.tmpname,
724
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
725
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
726
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
727
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
728

    
729
  def testAddingAlmostButNotCompletelyTheSameKey(self):
730
    utils.AddAuthorizedKey(self.tmpname,
731
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
732

    
733
    self.assertFileContent(self.tmpname,
734
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
735
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
736
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
737
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
738

    
739
  def testAddingExistingKeyWithSomeMoreSpaces(self):
740
    utils.AddAuthorizedKey(self.tmpname,
741
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
742

    
743
    self.assertFileContent(self.tmpname,
744
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
745
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
746
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
747

    
748
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
749
    utils.RemoveAuthorizedKey(self.tmpname,
750
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
751

    
752
    self.assertFileContent(self.tmpname,
753
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
754
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
755

    
756
  def testRemovingNonExistingKey(self):
757
    utils.RemoveAuthorizedKey(self.tmpname,
758
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
759

    
760
    self.assertFileContent(self.tmpname,
761
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
762
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
763
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
764

    
765

    
766
class TestNewUUID(unittest.TestCase):
767
  """Test case for NewUUID"""
768

    
769
  def runTest(self):
770
    self.failUnless(utils.UUID_RE.match(utils.NewUUID()))
771

    
772

    
773
if __name__ == "__main__":
774
  testutils.GanetiTestProgram()