Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils.io_unittest.py @ 6e7f0cd9

History | View | Annotate | Download (24.9 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.utils.io"""
23

    
24
import os
25
import tempfile
26
import unittest
27
import shutil
28
import glob
29
import time
30
import signal
31

    
32
from ganeti import constants
33
from ganeti import utils
34
from ganeti import compat
35
from ganeti import errors
36

    
37
import testutils
38

    
39

    
40
class TestReadFile(testutils.GanetiTestCase):
41
  def testReadAll(self):
42
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"))
43
    self.assertEqual(len(data), 814)
44

    
45
    h = compat.md5_hash()
46
    h.update(data)
47
    self.assertEqual(h.hexdigest(), "a491efb3efe56a0535f924d5f8680fd4")
48

    
49
  def testReadSize(self):
50
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"),
51
                          size=100)
52
    self.assertEqual(len(data), 100)
53

    
54
    h = compat.md5_hash()
55
    h.update(data)
56
    self.assertEqual(h.hexdigest(), "893772354e4e690b9efd073eed433ce7")
57

    
58
  def testError(self):
59
    self.assertRaises(EnvironmentError, utils.ReadFile,
60
                      "/dev/null/does-not-exist")
61

    
62

    
63
class TestReadOneLineFile(testutils.GanetiTestCase):
64
  def setUp(self):
65
    testutils.GanetiTestCase.setUp(self)
66

    
67
  def testDefault(self):
68
    data = utils.ReadOneLineFile(self._TestDataFilename("cert1.pem"))
69
    self.assertEqual(len(data), 27)
70
    self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
71

    
72
  def testNotStrict(self):
73
    data = utils.ReadOneLineFile(self._TestDataFilename("cert1.pem"),
74
                                 strict=False)
75
    self.assertEqual(len(data), 27)
76
    self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
77

    
78
  def testStrictFailure(self):
79
    self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
80
                      self._TestDataFilename("cert1.pem"), strict=True)
81

    
82
  def testLongLine(self):
83
    dummydata = (1024 * "Hello World! ")
84
    myfile = self._CreateTempFile()
85
    utils.WriteFile(myfile, data=dummydata)
86
    datastrict = utils.ReadOneLineFile(myfile, strict=True)
87
    datalax = utils.ReadOneLineFile(myfile, strict=False)
88
    self.assertEqual(dummydata, datastrict)
89
    self.assertEqual(dummydata, datalax)
90

    
91
  def testNewline(self):
92
    myfile = self._CreateTempFile()
93
    myline = "myline"
94
    for nl in ["", "\n", "\r\n"]:
95
      dummydata = "%s%s" % (myline, nl)
96
      utils.WriteFile(myfile, data=dummydata)
97
      datalax = utils.ReadOneLineFile(myfile, strict=False)
98
      self.assertEqual(myline, datalax)
99
      datastrict = utils.ReadOneLineFile(myfile, strict=True)
100
      self.assertEqual(myline, datastrict)
101

    
102
  def testWhitespaceAndMultipleLines(self):
103
    myfile = self._CreateTempFile()
104
    for nl in ["", "\n", "\r\n"]:
105
      for ws in [" ", "\t", "\t\t  \t", "\t "]:
106
        dummydata = (1024 * ("Foo bar baz %s%s" % (ws, nl)))
107
        utils.WriteFile(myfile, data=dummydata)
108
        datalax = utils.ReadOneLineFile(myfile, strict=False)
109
        if nl:
110
          self.assert_(set("\r\n") & set(dummydata))
111
          self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
112
                            myfile, strict=True)
113
          explen = len("Foo bar baz ") + len(ws)
114
          self.assertEqual(len(datalax), explen)
115
          self.assertEqual(datalax, dummydata[:explen])
116
          self.assertFalse(set("\r\n") & set(datalax))
117
        else:
118
          datastrict = utils.ReadOneLineFile(myfile, strict=True)
119
          self.assertEqual(dummydata, datastrict)
120
          self.assertEqual(dummydata, datalax)
121

    
122
  def testEmptylines(self):
123
    myfile = self._CreateTempFile()
124
    myline = "myline"
125
    for nl in ["\n", "\r\n"]:
126
      for ol in ["", "otherline"]:
127
        dummydata = "%s%s%s%s%s%s" % (nl, nl, myline, nl, ol, nl)
128
        utils.WriteFile(myfile, data=dummydata)
129
        self.assert_(set("\r\n") & set(dummydata))
130
        datalax = utils.ReadOneLineFile(myfile, strict=False)
131
        self.assertEqual(myline, datalax)
132
        if ol:
133
          self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
134
                            myfile, strict=True)
135
        else:
136
          datastrict = utils.ReadOneLineFile(myfile, strict=True)
137
          self.assertEqual(myline, datastrict)
138

    
139
  def testEmptyfile(self):
140
    myfile = self._CreateTempFile()
141
    self.assertRaises(errors.GenericError, utils.ReadOneLineFile, myfile)
142

    
143

    
144
class TestTimestampForFilename(unittest.TestCase):
145
  def test(self):
146
    self.assert_("." not in utils.TimestampForFilename())
147
    self.assert_(":" not in utils.TimestampForFilename())
148

    
149

    
150
class TestCreateBackup(testutils.GanetiTestCase):
151
  def setUp(self):
152
    testutils.GanetiTestCase.setUp(self)
153

    
154
    self.tmpdir = tempfile.mkdtemp()
155

    
156
  def tearDown(self):
157
    testutils.GanetiTestCase.tearDown(self)
158

    
159
    shutil.rmtree(self.tmpdir)
160

    
161
  def testEmpty(self):
162
    filename = utils.PathJoin(self.tmpdir, "config.data")
163
    utils.WriteFile(filename, data="")
164
    bname = utils.CreateBackup(filename)
165
    self.assertFileContent(bname, "")
166
    self.assertEqual(len(glob.glob("%s*" % filename)), 2)
167
    utils.CreateBackup(filename)
168
    self.assertEqual(len(glob.glob("%s*" % filename)), 3)
169
    utils.CreateBackup(filename)
170
    self.assertEqual(len(glob.glob("%s*" % filename)), 4)
171

    
172
    fifoname = utils.PathJoin(self.tmpdir, "fifo")
173
    os.mkfifo(fifoname)
174
    self.assertRaises(errors.ProgrammerError, utils.CreateBackup, fifoname)
175

    
176
  def testContent(self):
177
    bkpcount = 0
178
    for data in ["", "X", "Hello World!\n" * 100, "Binary data\0\x01\x02\n"]:
179
      for rep in [1, 2, 10, 127]:
180
        testdata = data * rep
181

    
182
        filename = utils.PathJoin(self.tmpdir, "test.data_")
183
        utils.WriteFile(filename, data=testdata)
184
        self.assertFileContent(filename, testdata)
185

    
186
        for _ in range(3):
187
          bname = utils.CreateBackup(filename)
188
          bkpcount += 1
189
          self.assertFileContent(bname, testdata)
190
          self.assertEqual(len(glob.glob("%s*" % filename)), 1 + bkpcount)
191

    
192

    
193
class TestListVisibleFiles(unittest.TestCase):
194
  """Test case for ListVisibleFiles"""
195

    
196
  def setUp(self):
197
    self.path = tempfile.mkdtemp()
198

    
199
  def tearDown(self):
200
    shutil.rmtree(self.path)
201

    
202
  def _CreateFiles(self, files):
203
    for name in files:
204
      utils.WriteFile(os.path.join(self.path, name), data="test")
205

    
206
  def _test(self, files, expected):
207
    self._CreateFiles(files)
208
    found = utils.ListVisibleFiles(self.path)
209
    self.assertEqual(set(found), set(expected))
210

    
211
  def testAllVisible(self):
212
    files = ["a", "b", "c"]
213
    expected = files
214
    self._test(files, expected)
215

    
216
  def testNoneVisible(self):
217
    files = [".a", ".b", ".c"]
218
    expected = []
219
    self._test(files, expected)
220

    
221
  def testSomeVisible(self):
222
    files = ["a", "b", ".c"]
223
    expected = ["a", "b"]
224
    self._test(files, expected)
225

    
226
  def testNonAbsolutePath(self):
227
    self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
228
                          "abc")
229

    
230
  def testNonNormalizedPath(self):
231
    self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
232
                          "/bin/../tmp")
233

    
234

    
235
class TestWriteFile(unittest.TestCase):
236
  def setUp(self):
237
    self.tmpdir = None
238
    self.tfile = tempfile.NamedTemporaryFile()
239
    self.did_pre = False
240
    self.did_post = False
241
    self.did_write = False
242

    
243
  def tearDown(self):
244
    if self.tmpdir:
245
      shutil.rmtree(self.tmpdir)
246

    
247
  def markPre(self, fd):
248
    self.did_pre = True
249

    
250
  def markPost(self, fd):
251
    self.did_post = True
252

    
253
  def markWrite(self, fd):
254
    self.did_write = True
255

    
256
  def testWrite(self):
257
    data = "abc"
258
    utils.WriteFile(self.tfile.name, data=data)
259
    self.assertEqual(utils.ReadFile(self.tfile.name), data)
260

    
261
  def testErrors(self):
262
    self.assertRaises(errors.ProgrammerError, utils.WriteFile,
263
                      self.tfile.name, data="test", fn=lambda fd: None)
264
    self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name)
265
    self.assertRaises(errors.ProgrammerError, utils.WriteFile,
266
                      self.tfile.name, data="test", atime=0)
267

    
268
  def testPreWrite(self):
269
    utils.WriteFile(self.tfile.name, data="", prewrite=self.markPre)
270
    self.assertTrue(self.did_pre)
271
    self.assertFalse(self.did_post)
272
    self.assertFalse(self.did_write)
273

    
274
  def testPostWrite(self):
275
    utils.WriteFile(self.tfile.name, data="", postwrite=self.markPost)
276
    self.assertFalse(self.did_pre)
277
    self.assertTrue(self.did_post)
278
    self.assertFalse(self.did_write)
279

    
280
  def testWriteFunction(self):
281
    utils.WriteFile(self.tfile.name, fn=self.markWrite)
282
    self.assertFalse(self.did_pre)
283
    self.assertFalse(self.did_post)
284
    self.assertTrue(self.did_write)
285

    
286
  def testDryRun(self):
287
    orig = "abc"
288
    self.tfile.write(orig)
289
    self.tfile.flush()
290
    utils.WriteFile(self.tfile.name, data="hello", dry_run=True)
291
    self.assertEqual(utils.ReadFile(self.tfile.name), orig)
292

    
293
  def testTimes(self):
294
    f = self.tfile.name
295
    for at, mt in [(0, 0), (1000, 1000), (2000, 3000),
296
                   (int(time.time()), 5000)]:
297
      utils.WriteFile(f, data="hello", atime=at, mtime=mt)
298
      st = os.stat(f)
299
      self.assertEqual(st.st_atime, at)
300
      self.assertEqual(st.st_mtime, mt)
301

    
302
  def testNoClose(self):
303
    data = "hello"
304
    self.assertEqual(utils.WriteFile(self.tfile.name, data="abc"), None)
305
    fd = utils.WriteFile(self.tfile.name, data=data, close=False)
306
    try:
307
      os.lseek(fd, 0, 0)
308
      self.assertEqual(os.read(fd, 4096), data)
309
    finally:
310
      os.close(fd)
311

    
312
  def testNoLeftovers(self):
313
    self.tmpdir = tempfile.mkdtemp()
314
    self.assertEqual(utils.WriteFile(utils.PathJoin(self.tmpdir, "test"),
315
                                     data="abc"),
316
                     None)
317
    self.assertEqual(os.listdir(self.tmpdir), ["test"])
318

    
319
  def testFailRename(self):
320
    self.tmpdir = tempfile.mkdtemp()
321
    target = utils.PathJoin(self.tmpdir, "target")
322
    os.mkdir(target)
323
    self.assertRaises(OSError, utils.WriteFile, target, data="abc")
324
    self.assertTrue(os.path.isdir(target))
325
    self.assertEqual(os.listdir(self.tmpdir), ["target"])
326
    self.assertFalse(os.listdir(target))
327

    
328
  def testFailRenameDryRun(self):
329
    self.tmpdir = tempfile.mkdtemp()
330
    target = utils.PathJoin(self.tmpdir, "target")
331
    os.mkdir(target)
332
    self.assertEqual(utils.WriteFile(target, data="abc", dry_run=True), None)
333
    self.assertTrue(os.path.isdir(target))
334
    self.assertEqual(os.listdir(self.tmpdir), ["target"])
335
    self.assertFalse(os.listdir(target))
336

    
337
  def testBackup(self):
338
    self.tmpdir = tempfile.mkdtemp()
339
    testfile = utils.PathJoin(self.tmpdir, "test")
340

    
341
    self.assertEqual(utils.WriteFile(testfile, data="foo", backup=True), None)
342
    self.assertEqual(utils.ReadFile(testfile), "foo")
343
    self.assertEqual(os.listdir(self.tmpdir), ["test"])
344

    
345
    # Write again
346
    assert os.path.isfile(testfile)
347
    self.assertEqual(utils.WriteFile(testfile, data="bar", backup=True), None)
348
    self.assertEqual(utils.ReadFile(testfile), "bar")
349
    self.assertEqual(len(glob.glob("%s.backup*" % testfile)), 1)
350
    self.assertTrue("test" in os.listdir(self.tmpdir))
351
    self.assertEqual(len(os.listdir(self.tmpdir)), 2)
352

    
353
    # Write again as dry-run
354
    assert os.path.isfile(testfile)
355
    self.assertEqual(utils.WriteFile(testfile, data="000", backup=True,
356
                                     dry_run=True),
357
                     None)
358
    self.assertEqual(utils.ReadFile(testfile), "bar")
359
    self.assertEqual(len(glob.glob("%s.backup*" % testfile)), 1)
360
    self.assertTrue("test" in os.listdir(self.tmpdir))
361
    self.assertEqual(len(os.listdir(self.tmpdir)), 2)
362

    
363

    
364
class TestFileID(testutils.GanetiTestCase):
365
  def testEquality(self):
366
    name = self._CreateTempFile()
367
    oldi = utils.GetFileID(path=name)
368
    self.failUnless(utils.VerifyFileID(oldi, oldi))
369

    
370
  def testUpdate(self):
371
    name = self._CreateTempFile()
372
    oldi = utils.GetFileID(path=name)
373
    os.utime(name, None)
374
    fd = os.open(name, os.O_RDWR)
375
    try:
376
      newi = utils.GetFileID(fd=fd)
377
      self.failUnless(utils.VerifyFileID(oldi, newi))
378
      self.failUnless(utils.VerifyFileID(newi, oldi))
379
    finally:
380
      os.close(fd)
381

    
382
  def testWriteFile(self):
383
    name = self._CreateTempFile()
384
    oldi = utils.GetFileID(path=name)
385
    mtime = oldi[2]
386
    os.utime(name, (mtime + 10, mtime + 10))
387
    self.assertRaises(errors.LockError, utils.SafeWriteFile, name,
388
                      oldi, data="")
389
    os.utime(name, (mtime - 10, mtime - 10))
390
    utils.SafeWriteFile(name, oldi, data="")
391
    oldi = utils.GetFileID(path=name)
392
    mtime = oldi[2]
393
    os.utime(name, (mtime + 10, mtime + 10))
394
    # this doesn't raise, since we passed None
395
    utils.SafeWriteFile(name, None, data="")
396

    
397
  def testError(self):
398
    t = tempfile.NamedTemporaryFile()
399
    self.assertRaises(errors.ProgrammerError, utils.GetFileID,
400
                      path=t.name, fd=t.fileno())
401

    
402

    
403
class TestRemoveFile(unittest.TestCase):
404
  """Test case for the RemoveFile function"""
405

    
406
  def setUp(self):
407
    """Create a temp dir and file for each case"""
408
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
409
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
410
    os.close(fd)
411

    
412
  def tearDown(self):
413
    if os.path.exists(self.tmpfile):
414
      os.unlink(self.tmpfile)
415
    os.rmdir(self.tmpdir)
416

    
417
  def testIgnoreDirs(self):
418
    """Test that RemoveFile() ignores directories"""
419
    self.assertEqual(None, utils.RemoveFile(self.tmpdir))
420

    
421
  def testIgnoreNotExisting(self):
422
    """Test that RemoveFile() ignores non-existing files"""
423
    utils.RemoveFile(self.tmpfile)
424
    utils.RemoveFile(self.tmpfile)
425

    
426
  def testRemoveFile(self):
427
    """Test that RemoveFile does remove a file"""
428
    utils.RemoveFile(self.tmpfile)
429
    if os.path.exists(self.tmpfile):
430
      self.fail("File '%s' not removed" % self.tmpfile)
431

    
432
  def testRemoveSymlink(self):
433
    """Test that RemoveFile does remove symlinks"""
434
    symlink = self.tmpdir + "/symlink"
435
    os.symlink("no-such-file", symlink)
436
    utils.RemoveFile(symlink)
437
    if os.path.exists(symlink):
438
      self.fail("File '%s' not removed" % symlink)
439
    os.symlink(self.tmpfile, symlink)
440
    utils.RemoveFile(symlink)
441
    if os.path.exists(symlink):
442
      self.fail("File '%s' not removed" % symlink)
443

    
444

    
445
class TestRemoveDir(unittest.TestCase):
446
  def setUp(self):
447
    self.tmpdir = tempfile.mkdtemp()
448

    
449
  def tearDown(self):
450
    try:
451
      shutil.rmtree(self.tmpdir)
452
    except EnvironmentError:
453
      pass
454

    
455
  def testEmptyDir(self):
456
    utils.RemoveDir(self.tmpdir)
457
    self.assertFalse(os.path.isdir(self.tmpdir))
458

    
459
  def testNonEmptyDir(self):
460
    self.tmpfile = os.path.join(self.tmpdir, "test1")
461
    open(self.tmpfile, "w").close()
462
    self.assertRaises(EnvironmentError, utils.RemoveDir, self.tmpdir)
463

    
464

    
465
class TestRename(unittest.TestCase):
466
  """Test case for RenameFile"""
467

    
468
  def setUp(self):
469
    """Create a temporary directory"""
470
    self.tmpdir = tempfile.mkdtemp()
471
    self.tmpfile = os.path.join(self.tmpdir, "test1")
472

    
473
    # Touch the file
474
    open(self.tmpfile, "w").close()
475

    
476
  def tearDown(self):
477
    """Remove temporary directory"""
478
    shutil.rmtree(self.tmpdir)
479

    
480
  def testSimpleRename1(self):
481
    """Simple rename 1"""
482
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
483
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
484

    
485
  def testSimpleRename2(self):
486
    """Simple rename 2"""
487
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
488
                     mkdir=True)
489
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
490

    
491
  def testRenameMkdir(self):
492
    """Rename with mkdir"""
493
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
494
                     mkdir=True)
495
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
496
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
497

    
498
    utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"),
499
                     os.path.join(self.tmpdir, "test/foo/bar/baz"),
500
                     mkdir=True)
501
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
502
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar")))
503
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz")))
504

    
505

    
506
class TestMakedirs(unittest.TestCase):
507
  def setUp(self):
508
    self.tmpdir = tempfile.mkdtemp()
509

    
510
  def tearDown(self):
511
    shutil.rmtree(self.tmpdir)
512

    
513
  def testNonExisting(self):
514
    path = utils.PathJoin(self.tmpdir, "foo")
515
    utils.Makedirs(path)
516
    self.assert_(os.path.isdir(path))
517

    
518
  def testExisting(self):
519
    path = utils.PathJoin(self.tmpdir, "foo")
520
    os.mkdir(path)
521
    utils.Makedirs(path)
522
    self.assert_(os.path.isdir(path))
523

    
524
  def testRecursiveNonExisting(self):
525
    path = utils.PathJoin(self.tmpdir, "foo/bar/baz")
526
    utils.Makedirs(path)
527
    self.assert_(os.path.isdir(path))
528

    
529
  def testRecursiveExisting(self):
530
    path = utils.PathJoin(self.tmpdir, "B/moo/xyz")
531
    self.assertFalse(os.path.exists(path))
532
    os.mkdir(utils.PathJoin(self.tmpdir, "B"))
533
    utils.Makedirs(path)
534
    self.assert_(os.path.isdir(path))
535

    
536

    
537
class TestEnsureDirs(unittest.TestCase):
538
  """Tests for EnsureDirs"""
539

    
540
  def setUp(self):
541
    self.dir = tempfile.mkdtemp()
542
    self.old_umask = os.umask(0777)
543

    
544
  def testEnsureDirs(self):
545
    utils.EnsureDirs([
546
        (utils.PathJoin(self.dir, "foo"), 0777),
547
        (utils.PathJoin(self.dir, "bar"), 0000),
548
        ])
549
    self.assertEquals(os.stat(utils.PathJoin(self.dir, "foo"))[0] & 0777, 0777)
550
    self.assertEquals(os.stat(utils.PathJoin(self.dir, "bar"))[0] & 0777, 0000)
551

    
552
  def tearDown(self):
553
    os.rmdir(utils.PathJoin(self.dir, "foo"))
554
    os.rmdir(utils.PathJoin(self.dir, "bar"))
555
    os.rmdir(self.dir)
556
    os.umask(self.old_umask)
557

    
558

    
559
class TestIsNormAbsPath(unittest.TestCase):
560
  """Testing case for IsNormAbsPath"""
561

    
562
  def _pathTestHelper(self, path, result):
563
    if result:
564
      self.assert_(utils.IsNormAbsPath(path),
565
          "Path %s should result absolute and normalized" % path)
566
    else:
567
      self.assertFalse(utils.IsNormAbsPath(path),
568
          "Path %s should not result absolute and normalized" % path)
569

    
570
  def testBase(self):
571
    self._pathTestHelper("/etc", True)
572
    self._pathTestHelper("/srv", True)
573
    self._pathTestHelper("etc", False)
574
    self._pathTestHelper("/etc/../root", False)
575
    self._pathTestHelper("/etc/", False)
576

    
577

    
578
class TestPathJoin(unittest.TestCase):
579
  """Testing case for PathJoin"""
580

    
581
  def testBasicItems(self):
582
    mlist = ["/a", "b", "c"]
583
    self.failUnlessEqual(utils.PathJoin(*mlist), "/".join(mlist))
584

    
585
  def testNonAbsPrefix(self):
586
    self.failUnlessRaises(ValueError, utils.PathJoin, "a", "b")
587

    
588
  def testBackTrack(self):
589
    self.failUnlessRaises(ValueError, utils.PathJoin, "/a", "b/../c")
590

    
591
  def testMultiAbs(self):
592
    self.failUnlessRaises(ValueError, utils.PathJoin, "/a", "/b")
593

    
594

    
595
class TestTailFile(testutils.GanetiTestCase):
596
  """Test case for the TailFile function"""
597

    
598
  def testEmpty(self):
599
    fname = self._CreateTempFile()
600
    self.failUnlessEqual(utils.TailFile(fname), [])
601
    self.failUnlessEqual(utils.TailFile(fname, lines=25), [])
602

    
603
  def testAllLines(self):
604
    data = ["test %d" % i for i in range(30)]
605
    for i in range(30):
606
      fname = self._CreateTempFile()
607
      fd = open(fname, "w")
608
      fd.write("\n".join(data[:i]))
609
      if i > 0:
610
        fd.write("\n")
611
      fd.close()
612
      self.failUnlessEqual(utils.TailFile(fname, lines=i), data[:i])
613

    
614
  def testPartialLines(self):
615
    data = ["test %d" % i for i in range(30)]
616
    fname = self._CreateTempFile()
617
    fd = open(fname, "w")
618
    fd.write("\n".join(data))
619
    fd.write("\n")
620
    fd.close()
621
    for i in range(1, 30):
622
      self.failUnlessEqual(utils.TailFile(fname, lines=i), data[-i:])
623

    
624
  def testBigFile(self):
625
    data = ["test %d" % i for i in range(30)]
626
    fname = self._CreateTempFile()
627
    fd = open(fname, "w")
628
    fd.write("X" * 1048576)
629
    fd.write("\n")
630
    fd.write("\n".join(data))
631
    fd.write("\n")
632
    fd.close()
633
    for i in range(1, 30):
634
      self.failUnlessEqual(utils.TailFile(fname, lines=i), data[-i:])
635

    
636

    
637
class TestPidFileFunctions(unittest.TestCase):
638
  """Tests for WritePidFile and ReadPidFile"""
639

    
640
  def setUp(self):
641
    self.dir = tempfile.mkdtemp()
642
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
643

    
644
  def testPidFileFunctions(self):
645
    pid_file = self.f_dpn('test')
646
    fd = utils.WritePidFile(self.f_dpn('test'))
647
    self.failUnless(os.path.exists(pid_file),
648
                    "PID file should have been created")
649
    read_pid = utils.ReadPidFile(pid_file)
650
    self.failUnlessEqual(read_pid, os.getpid())
651
    self.failUnless(utils.IsProcessAlive(read_pid))
652
    self.failUnlessRaises(errors.LockError, utils.WritePidFile,
653
                          self.f_dpn('test'))
654
    os.close(fd)
655
    utils.RemoveFile(self.f_dpn("test"))
656
    self.failIf(os.path.exists(pid_file),
657
                "PID file should not exist anymore")
658
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
659
                         "ReadPidFile should return 0 for missing pid file")
660
    fh = open(pid_file, "w")
661
    fh.write("blah\n")
662
    fh.close()
663
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
664
                         "ReadPidFile should return 0 for invalid pid file")
665
    # but now, even with the file existing, we should be able to lock it
666
    fd = utils.WritePidFile(self.f_dpn('test'))
667
    os.close(fd)
668
    utils.RemoveFile(self.f_dpn("test"))
669
    self.failIf(os.path.exists(pid_file),
670
                "PID file should not exist anymore")
671

    
672
  def testKill(self):
673
    pid_file = self.f_dpn('child')
674
    r_fd, w_fd = os.pipe()
675
    new_pid = os.fork()
676
    if new_pid == 0: #child
677
      utils.WritePidFile(self.f_dpn('child'))
678
      os.write(w_fd, 'a')
679
      signal.pause()
680
      os._exit(0)
681
      return
682
    # else we are in the parent
683
    # wait until the child has written the pid file
684
    os.read(r_fd, 1)
685
    read_pid = utils.ReadPidFile(pid_file)
686
    self.failUnlessEqual(read_pid, new_pid)
687
    self.failUnless(utils.IsProcessAlive(new_pid))
688
    utils.KillProcess(new_pid, waitpid=True)
689
    self.failIf(utils.IsProcessAlive(new_pid))
690
    utils.RemoveFile(self.f_dpn('child'))
691
    self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0)
692

    
693
  def tearDown(self):
694
    shutil.rmtree(self.dir)
695

    
696

    
697
class TestSshKeys(testutils.GanetiTestCase):
698
  """Test case for the AddAuthorizedKey function"""
699

    
700
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
701
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="198.51.100.4" '
702
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
703

    
704
  def setUp(self):
705
    testutils.GanetiTestCase.setUp(self)
706
    self.tmpname = self._CreateTempFile()
707
    handle = open(self.tmpname, 'w')
708
    try:
709
      handle.write("%s\n" % TestSshKeys.KEY_A)
710
      handle.write("%s\n" % TestSshKeys.KEY_B)
711
    finally:
712
      handle.close()
713

    
714
  def testAddingNewKey(self):
715
    utils.AddAuthorizedKey(self.tmpname,
716
                           'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
717

    
718
    self.assertFileContent(self.tmpname,
719
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
720
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
721
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
722
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
723

    
724
  def testAddingAlmostButNotCompletelyTheSameKey(self):
725
    utils.AddAuthorizedKey(self.tmpname,
726
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
727

    
728
    self.assertFileContent(self.tmpname,
729
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
730
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
731
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
732
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
733

    
734
  def testAddingExistingKeyWithSomeMoreSpaces(self):
735
    utils.AddAuthorizedKey(self.tmpname,
736
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
737

    
738
    self.assertFileContent(self.tmpname,
739
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
740
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
741
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
742

    
743
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
744
    utils.RemoveAuthorizedKey(self.tmpname,
745
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
746

    
747
    self.assertFileContent(self.tmpname,
748
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
749
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
750

    
751
  def testRemovingNonExistingKey(self):
752
    utils.RemoveAuthorizedKey(self.tmpname,
753
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
754

    
755
    self.assertFileContent(self.tmpname,
756
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
757
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
758
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
759

    
760

    
761
class TestNewUUID(unittest.TestCase):
762
  """Test case for NewUUID"""
763

    
764
  def runTest(self):
765
    self.failUnless(utils.UUID_RE.match(utils.NewUUID()))
766

    
767

    
768
if __name__ == "__main__":
769
  testutils.GanetiTestProgram()