Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils.io_unittest.py @ 90e234a6

History | View | Annotate | Download (22.5 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.utils.io"""
23

    
24
import os
25
import tempfile
26
import unittest
27
import shutil
28
import glob
29
import time
30
import signal
31

    
32
from ganeti import constants
33
from ganeti import utils
34
from ganeti import compat
35
from ganeti import errors
36

    
37
import testutils
38

    
39

    
40
class TestReadFile(testutils.GanetiTestCase):
41
  def testReadAll(self):
42
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"))
43
    self.assertEqual(len(data), 814)
44

    
45
    h = compat.md5_hash()
46
    h.update(data)
47
    self.assertEqual(h.hexdigest(), "a491efb3efe56a0535f924d5f8680fd4")
48

    
49
  def testReadSize(self):
50
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"),
51
                          size=100)
52
    self.assertEqual(len(data), 100)
53

    
54
    h = compat.md5_hash()
55
    h.update(data)
56
    self.assertEqual(h.hexdigest(), "893772354e4e690b9efd073eed433ce7")
57

    
58
  def testError(self):
59
    self.assertRaises(EnvironmentError, utils.ReadFile,
60
                      "/dev/null/does-not-exist")
61

    
62

    
63
class TestReadOneLineFile(testutils.GanetiTestCase):
64
  def setUp(self):
65
    testutils.GanetiTestCase.setUp(self)
66

    
67
  def testDefault(self):
68
    data = utils.ReadOneLineFile(self._TestDataFilename("cert1.pem"))
69
    self.assertEqual(len(data), 27)
70
    self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
71

    
72
  def testNotStrict(self):
73
    data = utils.ReadOneLineFile(self._TestDataFilename("cert1.pem"),
74
                                 strict=False)
75
    self.assertEqual(len(data), 27)
76
    self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
77

    
78
  def testStrictFailure(self):
79
    self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
80
                      self._TestDataFilename("cert1.pem"), strict=True)
81

    
82
  def testLongLine(self):
83
    dummydata = (1024 * "Hello World! ")
84
    myfile = self._CreateTempFile()
85
    utils.WriteFile(myfile, data=dummydata)
86
    datastrict = utils.ReadOneLineFile(myfile, strict=True)
87
    datalax = utils.ReadOneLineFile(myfile, strict=False)
88
    self.assertEqual(dummydata, datastrict)
89
    self.assertEqual(dummydata, datalax)
90

    
91
  def testNewline(self):
92
    myfile = self._CreateTempFile()
93
    myline = "myline"
94
    for nl in ["", "\n", "\r\n"]:
95
      dummydata = "%s%s" % (myline, nl)
96
      utils.WriteFile(myfile, data=dummydata)
97
      datalax = utils.ReadOneLineFile(myfile, strict=False)
98
      self.assertEqual(myline, datalax)
99
      datastrict = utils.ReadOneLineFile(myfile, strict=True)
100
      self.assertEqual(myline, datastrict)
101

    
102
  def testWhitespaceAndMultipleLines(self):
103
    myfile = self._CreateTempFile()
104
    for nl in ["", "\n", "\r\n"]:
105
      for ws in [" ", "\t", "\t\t  \t", "\t "]:
106
        dummydata = (1024 * ("Foo bar baz %s%s" % (ws, nl)))
107
        utils.WriteFile(myfile, data=dummydata)
108
        datalax = utils.ReadOneLineFile(myfile, strict=False)
109
        if nl:
110
          self.assert_(set("\r\n") & set(dummydata))
111
          self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
112
                            myfile, strict=True)
113
          explen = len("Foo bar baz ") + len(ws)
114
          self.assertEqual(len(datalax), explen)
115
          self.assertEqual(datalax, dummydata[:explen])
116
          self.assertFalse(set("\r\n") & set(datalax))
117
        else:
118
          datastrict = utils.ReadOneLineFile(myfile, strict=True)
119
          self.assertEqual(dummydata, datastrict)
120
          self.assertEqual(dummydata, datalax)
121

    
122
  def testEmptylines(self):
123
    myfile = self._CreateTempFile()
124
    myline = "myline"
125
    for nl in ["\n", "\r\n"]:
126
      for ol in ["", "otherline"]:
127
        dummydata = "%s%s%s%s%s%s" % (nl, nl, myline, nl, ol, nl)
128
        utils.WriteFile(myfile, data=dummydata)
129
        self.assert_(set("\r\n") & set(dummydata))
130
        datalax = utils.ReadOneLineFile(myfile, strict=False)
131
        self.assertEqual(myline, datalax)
132
        if ol:
133
          self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
134
                            myfile, strict=True)
135
        else:
136
          datastrict = utils.ReadOneLineFile(myfile, strict=True)
137
          self.assertEqual(myline, datastrict)
138

    
139
  def testEmptyfile(self):
140
    myfile = self._CreateTempFile()
141
    self.assertRaises(errors.GenericError, utils.ReadOneLineFile, myfile)
142

    
143

    
144
class TestTimestampForFilename(unittest.TestCase):
145
  def test(self):
146
    self.assert_("." not in utils.TimestampForFilename())
147
    self.assert_(":" not in utils.TimestampForFilename())
148

    
149

    
150
class TestCreateBackup(testutils.GanetiTestCase):
151
  def setUp(self):
152
    testutils.GanetiTestCase.setUp(self)
153

    
154
    self.tmpdir = tempfile.mkdtemp()
155

    
156
  def tearDown(self):
157
    testutils.GanetiTestCase.tearDown(self)
158

    
159
    shutil.rmtree(self.tmpdir)
160

    
161
  def testEmpty(self):
162
    filename = utils.PathJoin(self.tmpdir, "config.data")
163
    utils.WriteFile(filename, data="")
164
    bname = utils.CreateBackup(filename)
165
    self.assertFileContent(bname, "")
166
    self.assertEqual(len(glob.glob("%s*" % filename)), 2)
167
    utils.CreateBackup(filename)
168
    self.assertEqual(len(glob.glob("%s*" % filename)), 3)
169
    utils.CreateBackup(filename)
170
    self.assertEqual(len(glob.glob("%s*" % filename)), 4)
171

    
172
    fifoname = utils.PathJoin(self.tmpdir, "fifo")
173
    os.mkfifo(fifoname)
174
    self.assertRaises(errors.ProgrammerError, utils.CreateBackup, fifoname)
175

    
176
  def testContent(self):
177
    bkpcount = 0
178
    for data in ["", "X", "Hello World!\n" * 100, "Binary data\0\x01\x02\n"]:
179
      for rep in [1, 2, 10, 127]:
180
        testdata = data * rep
181

    
182
        filename = utils.PathJoin(self.tmpdir, "test.data_")
183
        utils.WriteFile(filename, data=testdata)
184
        self.assertFileContent(filename, testdata)
185

    
186
        for _ in range(3):
187
          bname = utils.CreateBackup(filename)
188
          bkpcount += 1
189
          self.assertFileContent(bname, testdata)
190
          self.assertEqual(len(glob.glob("%s*" % filename)), 1 + bkpcount)
191

    
192

    
193
class TestListVisibleFiles(unittest.TestCase):
194
  """Test case for ListVisibleFiles"""
195

    
196
  def setUp(self):
197
    self.path = tempfile.mkdtemp()
198

    
199
  def tearDown(self):
200
    shutil.rmtree(self.path)
201

    
202
  def _CreateFiles(self, files):
203
    for name in files:
204
      utils.WriteFile(os.path.join(self.path, name), data="test")
205

    
206
  def _test(self, files, expected):
207
    self._CreateFiles(files)
208
    found = utils.ListVisibleFiles(self.path)
209
    self.assertEqual(set(found), set(expected))
210

    
211
  def testAllVisible(self):
212
    files = ["a", "b", "c"]
213
    expected = files
214
    self._test(files, expected)
215

    
216
  def testNoneVisible(self):
217
    files = [".a", ".b", ".c"]
218
    expected = []
219
    self._test(files, expected)
220

    
221
  def testSomeVisible(self):
222
    files = ["a", "b", ".c"]
223
    expected = ["a", "b"]
224
    self._test(files, expected)
225

    
226
  def testNonAbsolutePath(self):
227
    self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
228
                          "abc")
229

    
230
  def testNonNormalizedPath(self):
231
    self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
232
                          "/bin/../tmp")
233

    
234

    
235
class TestWriteFile(unittest.TestCase):
236
  def setUp(self):
237
    self.tfile = tempfile.NamedTemporaryFile()
238
    self.did_pre = False
239
    self.did_post = False
240
    self.did_write = False
241

    
242
  def markPre(self, fd):
243
    self.did_pre = True
244

    
245
  def markPost(self, fd):
246
    self.did_post = True
247

    
248
  def markWrite(self, fd):
249
    self.did_write = True
250

    
251
  def testWrite(self):
252
    data = "abc"
253
    utils.WriteFile(self.tfile.name, data=data)
254
    self.assertEqual(utils.ReadFile(self.tfile.name), data)
255

    
256
  def testErrors(self):
257
    self.assertRaises(errors.ProgrammerError, utils.WriteFile,
258
                      self.tfile.name, data="test", fn=lambda fd: None)
259
    self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name)
260
    self.assertRaises(errors.ProgrammerError, utils.WriteFile,
261
                      self.tfile.name, data="test", atime=0)
262

    
263
  def testCalls(self):
264
    utils.WriteFile(self.tfile.name, fn=self.markWrite,
265
                    prewrite=self.markPre, postwrite=self.markPost)
266
    self.assertTrue(self.did_pre)
267
    self.assertTrue(self.did_post)
268
    self.assertTrue(self.did_write)
269

    
270
  def testDryRun(self):
271
    orig = "abc"
272
    self.tfile.write(orig)
273
    self.tfile.flush()
274
    utils.WriteFile(self.tfile.name, data="hello", dry_run=True)
275
    self.assertEqual(utils.ReadFile(self.tfile.name), orig)
276

    
277
  def testTimes(self):
278
    f = self.tfile.name
279
    for at, mt in [(0, 0), (1000, 1000), (2000, 3000),
280
                   (int(time.time()), 5000)]:
281
      utils.WriteFile(f, data="hello", atime=at, mtime=mt)
282
      st = os.stat(f)
283
      self.assertEqual(st.st_atime, at)
284
      self.assertEqual(st.st_mtime, mt)
285

    
286
  def testNoClose(self):
287
    data = "hello"
288
    self.assertEqual(utils.WriteFile(self.tfile.name, data="abc"), None)
289
    fd = utils.WriteFile(self.tfile.name, data=data, close=False)
290
    try:
291
      os.lseek(fd, 0, 0)
292
      self.assertEqual(os.read(fd, 4096), data)
293
    finally:
294
      os.close(fd)
295

    
296

    
297
class TestFileID(testutils.GanetiTestCase):
298
  def testEquality(self):
299
    name = self._CreateTempFile()
300
    oldi = utils.GetFileID(path=name)
301
    self.failUnless(utils.VerifyFileID(oldi, oldi))
302

    
303
  def testUpdate(self):
304
    name = self._CreateTempFile()
305
    oldi = utils.GetFileID(path=name)
306
    os.utime(name, None)
307
    fd = os.open(name, os.O_RDWR)
308
    try:
309
      newi = utils.GetFileID(fd=fd)
310
      self.failUnless(utils.VerifyFileID(oldi, newi))
311
      self.failUnless(utils.VerifyFileID(newi, oldi))
312
    finally:
313
      os.close(fd)
314

    
315
  def testWriteFile(self):
316
    name = self._CreateTempFile()
317
    oldi = utils.GetFileID(path=name)
318
    mtime = oldi[2]
319
    os.utime(name, (mtime + 10, mtime + 10))
320
    self.assertRaises(errors.LockError, utils.SafeWriteFile, name,
321
                      oldi, data="")
322
    os.utime(name, (mtime - 10, mtime - 10))
323
    utils.SafeWriteFile(name, oldi, data="")
324
    oldi = utils.GetFileID(path=name)
325
    mtime = oldi[2]
326
    os.utime(name, (mtime + 10, mtime + 10))
327
    # this doesn't raise, since we passed None
328
    utils.SafeWriteFile(name, None, data="")
329

    
330
  def testError(self):
331
    t = tempfile.NamedTemporaryFile()
332
    self.assertRaises(errors.ProgrammerError, utils.GetFileID,
333
                      path=t.name, fd=t.fileno())
334

    
335

    
336
class TestRemoveFile(unittest.TestCase):
337
  """Test case for the RemoveFile function"""
338

    
339
  def setUp(self):
340
    """Create a temp dir and file for each case"""
341
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
342
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
343
    os.close(fd)
344

    
345
  def tearDown(self):
346
    if os.path.exists(self.tmpfile):
347
      os.unlink(self.tmpfile)
348
    os.rmdir(self.tmpdir)
349

    
350
  def testIgnoreDirs(self):
351
    """Test that RemoveFile() ignores directories"""
352
    self.assertEqual(None, utils.RemoveFile(self.tmpdir))
353

    
354
  def testIgnoreNotExisting(self):
355
    """Test that RemoveFile() ignores non-existing files"""
356
    utils.RemoveFile(self.tmpfile)
357
    utils.RemoveFile(self.tmpfile)
358

    
359
  def testRemoveFile(self):
360
    """Test that RemoveFile does remove a file"""
361
    utils.RemoveFile(self.tmpfile)
362
    if os.path.exists(self.tmpfile):
363
      self.fail("File '%s' not removed" % self.tmpfile)
364

    
365
  def testRemoveSymlink(self):
366
    """Test that RemoveFile does remove symlinks"""
367
    symlink = self.tmpdir + "/symlink"
368
    os.symlink("no-such-file", symlink)
369
    utils.RemoveFile(symlink)
370
    if os.path.exists(symlink):
371
      self.fail("File '%s' not removed" % symlink)
372
    os.symlink(self.tmpfile, symlink)
373
    utils.RemoveFile(symlink)
374
    if os.path.exists(symlink):
375
      self.fail("File '%s' not removed" % symlink)
376

    
377

    
378
class TestRemoveDir(unittest.TestCase):
379
  def setUp(self):
380
    self.tmpdir = tempfile.mkdtemp()
381

    
382
  def tearDown(self):
383
    try:
384
      shutil.rmtree(self.tmpdir)
385
    except EnvironmentError:
386
      pass
387

    
388
  def testEmptyDir(self):
389
    utils.RemoveDir(self.tmpdir)
390
    self.assertFalse(os.path.isdir(self.tmpdir))
391

    
392
  def testNonEmptyDir(self):
393
    self.tmpfile = os.path.join(self.tmpdir, "test1")
394
    open(self.tmpfile, "w").close()
395
    self.assertRaises(EnvironmentError, utils.RemoveDir, self.tmpdir)
396

    
397

    
398
class TestRename(unittest.TestCase):
399
  """Test case for RenameFile"""
400

    
401
  def setUp(self):
402
    """Create a temporary directory"""
403
    self.tmpdir = tempfile.mkdtemp()
404
    self.tmpfile = os.path.join(self.tmpdir, "test1")
405

    
406
    # Touch the file
407
    open(self.tmpfile, "w").close()
408

    
409
  def tearDown(self):
410
    """Remove temporary directory"""
411
    shutil.rmtree(self.tmpdir)
412

    
413
  def testSimpleRename1(self):
414
    """Simple rename 1"""
415
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
416
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
417

    
418
  def testSimpleRename2(self):
419
    """Simple rename 2"""
420
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
421
                     mkdir=True)
422
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
423

    
424
  def testRenameMkdir(self):
425
    """Rename with mkdir"""
426
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
427
                     mkdir=True)
428
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
429
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
430

    
431
    utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"),
432
                     os.path.join(self.tmpdir, "test/foo/bar/baz"),
433
                     mkdir=True)
434
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
435
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar")))
436
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz")))
437

    
438

    
439
class TestMakedirs(unittest.TestCase):
440
  def setUp(self):
441
    self.tmpdir = tempfile.mkdtemp()
442

    
443
  def tearDown(self):
444
    shutil.rmtree(self.tmpdir)
445

    
446
  def testNonExisting(self):
447
    path = utils.PathJoin(self.tmpdir, "foo")
448
    utils.Makedirs(path)
449
    self.assert_(os.path.isdir(path))
450

    
451
  def testExisting(self):
452
    path = utils.PathJoin(self.tmpdir, "foo")
453
    os.mkdir(path)
454
    utils.Makedirs(path)
455
    self.assert_(os.path.isdir(path))
456

    
457
  def testRecursiveNonExisting(self):
458
    path = utils.PathJoin(self.tmpdir, "foo/bar/baz")
459
    utils.Makedirs(path)
460
    self.assert_(os.path.isdir(path))
461

    
462
  def testRecursiveExisting(self):
463
    path = utils.PathJoin(self.tmpdir, "B/moo/xyz")
464
    self.assertFalse(os.path.exists(path))
465
    os.mkdir(utils.PathJoin(self.tmpdir, "B"))
466
    utils.Makedirs(path)
467
    self.assert_(os.path.isdir(path))
468

    
469

    
470
class TestEnsureDirs(unittest.TestCase):
471
  """Tests for EnsureDirs"""
472

    
473
  def setUp(self):
474
    self.dir = tempfile.mkdtemp()
475
    self.old_umask = os.umask(0777)
476

    
477
  def testEnsureDirs(self):
478
    utils.EnsureDirs([
479
        (utils.PathJoin(self.dir, "foo"), 0777),
480
        (utils.PathJoin(self.dir, "bar"), 0000),
481
        ])
482
    self.assertEquals(os.stat(utils.PathJoin(self.dir, "foo"))[0] & 0777, 0777)
483
    self.assertEquals(os.stat(utils.PathJoin(self.dir, "bar"))[0] & 0777, 0000)
484

    
485
  def tearDown(self):
486
    os.rmdir(utils.PathJoin(self.dir, "foo"))
487
    os.rmdir(utils.PathJoin(self.dir, "bar"))
488
    os.rmdir(self.dir)
489
    os.umask(self.old_umask)
490

    
491

    
492
class TestIsNormAbsPath(unittest.TestCase):
493
  """Testing case for IsNormAbsPath"""
494

    
495
  def _pathTestHelper(self, path, result):
496
    if result:
497
      self.assert_(utils.IsNormAbsPath(path),
498
          "Path %s should result absolute and normalized" % path)
499
    else:
500
      self.assertFalse(utils.IsNormAbsPath(path),
501
          "Path %s should not result absolute and normalized" % path)
502

    
503
  def testBase(self):
504
    self._pathTestHelper("/etc", True)
505
    self._pathTestHelper("/srv", True)
506
    self._pathTestHelper("etc", False)
507
    self._pathTestHelper("/etc/../root", False)
508
    self._pathTestHelper("/etc/", False)
509

    
510

    
511
class TestPathJoin(unittest.TestCase):
512
  """Testing case for PathJoin"""
513

    
514
  def testBasicItems(self):
515
    mlist = ["/a", "b", "c"]
516
    self.failUnlessEqual(utils.PathJoin(*mlist), "/".join(mlist))
517

    
518
  def testNonAbsPrefix(self):
519
    self.failUnlessRaises(ValueError, utils.PathJoin, "a", "b")
520

    
521
  def testBackTrack(self):
522
    self.failUnlessRaises(ValueError, utils.PathJoin, "/a", "b/../c")
523

    
524
  def testMultiAbs(self):
525
    self.failUnlessRaises(ValueError, utils.PathJoin, "/a", "/b")
526

    
527

    
528
class TestTailFile(testutils.GanetiTestCase):
529
  """Test case for the TailFile function"""
530

    
531
  def testEmpty(self):
532
    fname = self._CreateTempFile()
533
    self.failUnlessEqual(utils.TailFile(fname), [])
534
    self.failUnlessEqual(utils.TailFile(fname, lines=25), [])
535

    
536
  def testAllLines(self):
537
    data = ["test %d" % i for i in range(30)]
538
    for i in range(30):
539
      fname = self._CreateTempFile()
540
      fd = open(fname, "w")
541
      fd.write("\n".join(data[:i]))
542
      if i > 0:
543
        fd.write("\n")
544
      fd.close()
545
      self.failUnlessEqual(utils.TailFile(fname, lines=i), data[:i])
546

    
547
  def testPartialLines(self):
548
    data = ["test %d" % i for i in range(30)]
549
    fname = self._CreateTempFile()
550
    fd = open(fname, "w")
551
    fd.write("\n".join(data))
552
    fd.write("\n")
553
    fd.close()
554
    for i in range(1, 30):
555
      self.failUnlessEqual(utils.TailFile(fname, lines=i), data[-i:])
556

    
557
  def testBigFile(self):
558
    data = ["test %d" % i for i in range(30)]
559
    fname = self._CreateTempFile()
560
    fd = open(fname, "w")
561
    fd.write("X" * 1048576)
562
    fd.write("\n")
563
    fd.write("\n".join(data))
564
    fd.write("\n")
565
    fd.close()
566
    for i in range(1, 30):
567
      self.failUnlessEqual(utils.TailFile(fname, lines=i), data[-i:])
568

    
569

    
570
class TestPidFileFunctions(unittest.TestCase):
571
  """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
572

    
573
  def setUp(self):
574
    self.dir = tempfile.mkdtemp()
575
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
576

    
577
  def testPidFileFunctions(self):
578
    pid_file = self.f_dpn('test')
579
    fd = utils.WritePidFile(self.f_dpn('test'))
580
    self.failUnless(os.path.exists(pid_file),
581
                    "PID file should have been created")
582
    read_pid = utils.ReadPidFile(pid_file)
583
    self.failUnlessEqual(read_pid, os.getpid())
584
    self.failUnless(utils.IsProcessAlive(read_pid))
585
    self.failUnlessRaises(errors.LockError, utils.WritePidFile,
586
                          self.f_dpn('test'))
587
    os.close(fd)
588
    utils.RemovePidFile(self.f_dpn("test"))
589
    self.failIf(os.path.exists(pid_file),
590
                "PID file should not exist anymore")
591
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
592
                         "ReadPidFile should return 0 for missing pid file")
593
    fh = open(pid_file, "w")
594
    fh.write("blah\n")
595
    fh.close()
596
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
597
                         "ReadPidFile should return 0 for invalid pid file")
598
    # but now, even with the file existing, we should be able to lock it
599
    fd = utils.WritePidFile(self.f_dpn('test'))
600
    os.close(fd)
601
    utils.RemovePidFile(self.f_dpn("test"))
602
    self.failIf(os.path.exists(pid_file),
603
                "PID file should not exist anymore")
604

    
605
  def testKill(self):
606
    pid_file = self.f_dpn('child')
607
    r_fd, w_fd = os.pipe()
608
    new_pid = os.fork()
609
    if new_pid == 0: #child
610
      utils.WritePidFile(self.f_dpn('child'))
611
      os.write(w_fd, 'a')
612
      signal.pause()
613
      os._exit(0)
614
      return
615
    # else we are in the parent
616
    # wait until the child has written the pid file
617
    os.read(r_fd, 1)
618
    read_pid = utils.ReadPidFile(pid_file)
619
    self.failUnlessEqual(read_pid, new_pid)
620
    self.failUnless(utils.IsProcessAlive(new_pid))
621
    utils.KillProcess(new_pid, waitpid=True)
622
    self.failIf(utils.IsProcessAlive(new_pid))
623
    utils.RemovePidFile(self.f_dpn('child'))
624
    self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0)
625

    
626
  def tearDown(self):
627
    shutil.rmtree(self.dir)
628

    
629

    
630
class TestSshKeys(testutils.GanetiTestCase):
631
  """Test case for the AddAuthorizedKey function"""
632

    
633
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
634
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="198.51.100.4" '
635
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
636

    
637
  def setUp(self):
638
    testutils.GanetiTestCase.setUp(self)
639
    self.tmpname = self._CreateTempFile()
640
    handle = open(self.tmpname, 'w')
641
    try:
642
      handle.write("%s\n" % TestSshKeys.KEY_A)
643
      handle.write("%s\n" % TestSshKeys.KEY_B)
644
    finally:
645
      handle.close()
646

    
647
  def testAddingNewKey(self):
648
    utils.AddAuthorizedKey(self.tmpname,
649
                           'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
650

    
651
    self.assertFileContent(self.tmpname,
652
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
653
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
654
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
655
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
656

    
657
  def testAddingAlmostButNotCompletelyTheSameKey(self):
658
    utils.AddAuthorizedKey(self.tmpname,
659
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
660

    
661
    self.assertFileContent(self.tmpname,
662
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
663
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
664
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
665
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
666

    
667
  def testAddingExistingKeyWithSomeMoreSpaces(self):
668
    utils.AddAuthorizedKey(self.tmpname,
669
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
670

    
671
    self.assertFileContent(self.tmpname,
672
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
673
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
674
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
675

    
676
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
677
    utils.RemoveAuthorizedKey(self.tmpname,
678
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
679

    
680
    self.assertFileContent(self.tmpname,
681
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
682
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
683

    
684
  def testRemovingNonExistingKey(self):
685
    utils.RemoveAuthorizedKey(self.tmpname,
686
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
687

    
688
    self.assertFileContent(self.tmpname,
689
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
690
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
691
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
692

    
693

    
694
class TestNewUUID(unittest.TestCase):
695
  """Test case for NewUUID"""
696

    
697
  def runTest(self):
698
    self.failUnless(utils.UUID_RE.match(utils.NewUUID()))
699

    
700

    
701
if __name__ == "__main__":
702
  testutils.GanetiTestProgram()