Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils.io_unittest.py @ fd6eaa09

History | View | Annotate | Download (32.6 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.utils.io"""
23

    
24
import os
25
import tempfile
26
import unittest
27
import shutil
28
import glob
29
import time
30
import signal
31
import stat
32
import errno
33

    
34
from ganeti import constants
35
from ganeti import utils
36
from ganeti import compat
37
from ganeti import errors
38

    
39
import testutils
40

    
41

    
42
class TestReadFile(testutils.GanetiTestCase):
43
  def testReadAll(self):
44
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"))
45
    self.assertEqual(len(data), 814)
46

    
47
    h = compat.md5_hash()
48
    h.update(data)
49
    self.assertEqual(h.hexdigest(), "a491efb3efe56a0535f924d5f8680fd4")
50

    
51
  def testReadSize(self):
52
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"),
53
                          size=100)
54
    self.assertEqual(len(data), 100)
55

    
56
    h = compat.md5_hash()
57
    h.update(data)
58
    self.assertEqual(h.hexdigest(), "893772354e4e690b9efd073eed433ce7")
59

    
60
  def testCallback(self):
61
    def _Cb(fh):
62
      self.assertEqual(fh.tell(), 0)
63
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"), preread=_Cb)
64
    self.assertEqual(len(data), 814)
65

    
66
  def testError(self):
67
    self.assertRaises(EnvironmentError, utils.ReadFile,
68
                      "/dev/null/does-not-exist")
69

    
70

    
71
class TestReadOneLineFile(testutils.GanetiTestCase):
72
  def setUp(self):
73
    testutils.GanetiTestCase.setUp(self)
74

    
75
  def testDefault(self):
76
    data = utils.ReadOneLineFile(self._TestDataFilename("cert1.pem"))
77
    self.assertEqual(len(data), 27)
78
    self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
79

    
80
  def testNotStrict(self):
81
    data = utils.ReadOneLineFile(self._TestDataFilename("cert1.pem"),
82
                                 strict=False)
83
    self.assertEqual(len(data), 27)
84
    self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
85

    
86
  def testStrictFailure(self):
87
    self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
88
                      self._TestDataFilename("cert1.pem"), strict=True)
89

    
90
  def testLongLine(self):
91
    dummydata = (1024 * "Hello World! ")
92
    myfile = self._CreateTempFile()
93
    utils.WriteFile(myfile, data=dummydata)
94
    datastrict = utils.ReadOneLineFile(myfile, strict=True)
95
    datalax = utils.ReadOneLineFile(myfile, strict=False)
96
    self.assertEqual(dummydata, datastrict)
97
    self.assertEqual(dummydata, datalax)
98

    
99
  def testNewline(self):
100
    myfile = self._CreateTempFile()
101
    myline = "myline"
102
    for nl in ["", "\n", "\r\n"]:
103
      dummydata = "%s%s" % (myline, nl)
104
      utils.WriteFile(myfile, data=dummydata)
105
      datalax = utils.ReadOneLineFile(myfile, strict=False)
106
      self.assertEqual(myline, datalax)
107
      datastrict = utils.ReadOneLineFile(myfile, strict=True)
108
      self.assertEqual(myline, datastrict)
109

    
110
  def testWhitespaceAndMultipleLines(self):
111
    myfile = self._CreateTempFile()
112
    for nl in ["", "\n", "\r\n"]:
113
      for ws in [" ", "\t", "\t\t  \t", "\t "]:
114
        dummydata = (1024 * ("Foo bar baz %s%s" % (ws, nl)))
115
        utils.WriteFile(myfile, data=dummydata)
116
        datalax = utils.ReadOneLineFile(myfile, strict=False)
117
        if nl:
118
          self.assert_(set("\r\n") & set(dummydata))
119
          self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
120
                            myfile, strict=True)
121
          explen = len("Foo bar baz ") + len(ws)
122
          self.assertEqual(len(datalax), explen)
123
          self.assertEqual(datalax, dummydata[:explen])
124
          self.assertFalse(set("\r\n") & set(datalax))
125
        else:
126
          datastrict = utils.ReadOneLineFile(myfile, strict=True)
127
          self.assertEqual(dummydata, datastrict)
128
          self.assertEqual(dummydata, datalax)
129

    
130
  def testEmptylines(self):
131
    myfile = self._CreateTempFile()
132
    myline = "myline"
133
    for nl in ["\n", "\r\n"]:
134
      for ol in ["", "otherline"]:
135
        dummydata = "%s%s%s%s%s%s" % (nl, nl, myline, nl, ol, nl)
136
        utils.WriteFile(myfile, data=dummydata)
137
        self.assert_(set("\r\n") & set(dummydata))
138
        datalax = utils.ReadOneLineFile(myfile, strict=False)
139
        self.assertEqual(myline, datalax)
140
        if ol:
141
          self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
142
                            myfile, strict=True)
143
        else:
144
          datastrict = utils.ReadOneLineFile(myfile, strict=True)
145
          self.assertEqual(myline, datastrict)
146

    
147
  def testEmptyfile(self):
148
    myfile = self._CreateTempFile()
149
    self.assertRaises(errors.GenericError, utils.ReadOneLineFile, myfile)
150

    
151

    
152
class TestTimestampForFilename(unittest.TestCase):
153
  def test(self):
154
    self.assert_("." not in utils.TimestampForFilename())
155
    self.assert_(":" not in utils.TimestampForFilename())
156

    
157

    
158
class TestCreateBackup(testutils.GanetiTestCase):
159
  def setUp(self):
160
    testutils.GanetiTestCase.setUp(self)
161

    
162
    self.tmpdir = tempfile.mkdtemp()
163

    
164
  def tearDown(self):
165
    testutils.GanetiTestCase.tearDown(self)
166

    
167
    shutil.rmtree(self.tmpdir)
168

    
169
  def testEmpty(self):
170
    filename = utils.PathJoin(self.tmpdir, "config.data")
171
    utils.WriteFile(filename, data="")
172
    bname = utils.CreateBackup(filename)
173
    self.assertFileContent(bname, "")
174
    self.assertEqual(len(glob.glob("%s*" % filename)), 2)
175
    utils.CreateBackup(filename)
176
    self.assertEqual(len(glob.glob("%s*" % filename)), 3)
177
    utils.CreateBackup(filename)
178
    self.assertEqual(len(glob.glob("%s*" % filename)), 4)
179

    
180
    fifoname = utils.PathJoin(self.tmpdir, "fifo")
181
    os.mkfifo(fifoname)
182
    self.assertRaises(errors.ProgrammerError, utils.CreateBackup, fifoname)
183

    
184
  def testContent(self):
185
    bkpcount = 0
186
    for data in ["", "X", "Hello World!\n" * 100, "Binary data\0\x01\x02\n"]:
187
      for rep in [1, 2, 10, 127]:
188
        testdata = data * rep
189

    
190
        filename = utils.PathJoin(self.tmpdir, "test.data_")
191
        utils.WriteFile(filename, data=testdata)
192
        self.assertFileContent(filename, testdata)
193

    
194
        for _ in range(3):
195
          bname = utils.CreateBackup(filename)
196
          bkpcount += 1
197
          self.assertFileContent(bname, testdata)
198
          self.assertEqual(len(glob.glob("%s*" % filename)), 1 + bkpcount)
199

    
200

    
201
class TestListVisibleFiles(unittest.TestCase):
202
  """Test case for ListVisibleFiles"""
203

    
204
  def setUp(self):
205
    self.path = tempfile.mkdtemp()
206

    
207
  def tearDown(self):
208
    shutil.rmtree(self.path)
209

    
210
  def _CreateFiles(self, files):
211
    for name in files:
212
      utils.WriteFile(os.path.join(self.path, name), data="test")
213

    
214
  def _test(self, files, expected):
215
    self._CreateFiles(files)
216
    found = utils.ListVisibleFiles(self.path)
217
    self.assertEqual(set(found), set(expected))
218

    
219
  def testAllVisible(self):
220
    files = ["a", "b", "c"]
221
    expected = files
222
    self._test(files, expected)
223

    
224
  def testNoneVisible(self):
225
    files = [".a", ".b", ".c"]
226
    expected = []
227
    self._test(files, expected)
228

    
229
  def testSomeVisible(self):
230
    files = ["a", "b", ".c"]
231
    expected = ["a", "b"]
232
    self._test(files, expected)
233

    
234
  def testNonAbsolutePath(self):
235
    self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
236
                          "abc")
237

    
238
  def testNonNormalizedPath(self):
239
    self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
240
                          "/bin/../tmp")
241

    
242

    
243
class TestWriteFile(testutils.GanetiTestCase):
244
  def setUp(self):
245
    testutils.GanetiTestCase.setUp(self)
246
    self.tmpdir = None
247
    self.tfile = tempfile.NamedTemporaryFile()
248
    self.did_pre = False
249
    self.did_post = False
250
    self.did_write = False
251

    
252
  def tearDown(self):
253
    testutils.GanetiTestCase.tearDown(self)
254
    if self.tmpdir:
255
      shutil.rmtree(self.tmpdir)
256

    
257
  def markPre(self, fd):
258
    self.did_pre = True
259

    
260
  def markPost(self, fd):
261
    self.did_post = True
262

    
263
  def markWrite(self, fd):
264
    self.did_write = True
265

    
266
  def testWrite(self):
267
    data = "abc"
268
    utils.WriteFile(self.tfile.name, data=data)
269
    self.assertEqual(utils.ReadFile(self.tfile.name), data)
270

    
271
  def testWriteSimpleUnicode(self):
272
    data = u"abc"
273
    utils.WriteFile(self.tfile.name, data=data)
274
    self.assertEqual(utils.ReadFile(self.tfile.name), data)
275

    
276
  def testErrors(self):
277
    self.assertRaises(errors.ProgrammerError, utils.WriteFile,
278
                      self.tfile.name, data="test", fn=lambda fd: None)
279
    self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name)
280
    self.assertRaises(errors.ProgrammerError, utils.WriteFile,
281
                      self.tfile.name, data="test", atime=0)
282
    self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name,
283
                      mode=0400, keep_perms=utils.KP_ALWAYS)
284
    self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name,
285
                      uid=0, keep_perms=utils.KP_ALWAYS)
286
    self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name,
287
                      gid=0, keep_perms=utils.KP_ALWAYS)
288
    self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name,
289
                      mode=0400, uid=0, keep_perms=utils.KP_ALWAYS)
290

    
291
  def testPreWrite(self):
292
    utils.WriteFile(self.tfile.name, data="", prewrite=self.markPre)
293
    self.assertTrue(self.did_pre)
294
    self.assertFalse(self.did_post)
295
    self.assertFalse(self.did_write)
296

    
297
  def testPostWrite(self):
298
    utils.WriteFile(self.tfile.name, data="", postwrite=self.markPost)
299
    self.assertFalse(self.did_pre)
300
    self.assertTrue(self.did_post)
301
    self.assertFalse(self.did_write)
302

    
303
  def testWriteFunction(self):
304
    utils.WriteFile(self.tfile.name, fn=self.markWrite)
305
    self.assertFalse(self.did_pre)
306
    self.assertFalse(self.did_post)
307
    self.assertTrue(self.did_write)
308

    
309
  def testDryRun(self):
310
    orig = "abc"
311
    self.tfile.write(orig)
312
    self.tfile.flush()
313
    utils.WriteFile(self.tfile.name, data="hello", dry_run=True)
314
    self.assertEqual(utils.ReadFile(self.tfile.name), orig)
315

    
316
  def testTimes(self):
317
    f = self.tfile.name
318
    for at, mt in [(0, 0), (1000, 1000), (2000, 3000),
319
                   (int(time.time()), 5000)]:
320
      utils.WriteFile(f, data="hello", atime=at, mtime=mt)
321
      st = os.stat(f)
322
      self.assertEqual(st.st_atime, at)
323
      self.assertEqual(st.st_mtime, mt)
324

    
325
  def testNoClose(self):
326
    data = "hello"
327
    self.assertEqual(utils.WriteFile(self.tfile.name, data="abc"), None)
328
    fd = utils.WriteFile(self.tfile.name, data=data, close=False)
329
    try:
330
      os.lseek(fd, 0, 0)
331
      self.assertEqual(os.read(fd, 4096), data)
332
    finally:
333
      os.close(fd)
334

    
335
  def testNoLeftovers(self):
336
    self.tmpdir = tempfile.mkdtemp()
337
    self.assertEqual(utils.WriteFile(utils.PathJoin(self.tmpdir, "test"),
338
                                     data="abc"),
339
                     None)
340
    self.assertEqual(os.listdir(self.tmpdir), ["test"])
341

    
342
  def testFailRename(self):
343
    self.tmpdir = tempfile.mkdtemp()
344
    target = utils.PathJoin(self.tmpdir, "target")
345
    os.mkdir(target)
346
    self.assertRaises(OSError, utils.WriteFile, target, data="abc")
347
    self.assertTrue(os.path.isdir(target))
348
    self.assertEqual(os.listdir(self.tmpdir), ["target"])
349
    self.assertFalse(os.listdir(target))
350

    
351
  def testFailRenameDryRun(self):
352
    self.tmpdir = tempfile.mkdtemp()
353
    target = utils.PathJoin(self.tmpdir, "target")
354
    os.mkdir(target)
355
    self.assertEqual(utils.WriteFile(target, data="abc", dry_run=True), None)
356
    self.assertTrue(os.path.isdir(target))
357
    self.assertEqual(os.listdir(self.tmpdir), ["target"])
358
    self.assertFalse(os.listdir(target))
359

    
360
  def testBackup(self):
361
    self.tmpdir = tempfile.mkdtemp()
362
    testfile = utils.PathJoin(self.tmpdir, "test")
363

    
364
    self.assertEqual(utils.WriteFile(testfile, data="foo", backup=True), None)
365
    self.assertEqual(utils.ReadFile(testfile), "foo")
366
    self.assertEqual(os.listdir(self.tmpdir), ["test"])
367

    
368
    # Write again
369
    assert os.path.isfile(testfile)
370
    self.assertEqual(utils.WriteFile(testfile, data="bar", backup=True), None)
371
    self.assertEqual(utils.ReadFile(testfile), "bar")
372
    self.assertEqual(len(glob.glob("%s.backup*" % testfile)), 1)
373
    self.assertTrue("test" in os.listdir(self.tmpdir))
374
    self.assertEqual(len(os.listdir(self.tmpdir)), 2)
375

    
376
    # Write again as dry-run
377
    assert os.path.isfile(testfile)
378
    self.assertEqual(utils.WriteFile(testfile, data="000", backup=True,
379
                                     dry_run=True),
380
                     None)
381
    self.assertEqual(utils.ReadFile(testfile), "bar")
382
    self.assertEqual(len(glob.glob("%s.backup*" % testfile)), 1)
383
    self.assertTrue("test" in os.listdir(self.tmpdir))
384
    self.assertEqual(len(os.listdir(self.tmpdir)), 2)
385

    
386
  def testFileMode(self):
387
    self.tmpdir = tempfile.mkdtemp()
388
    target = utils.PathJoin(self.tmpdir, "target")
389
    self.assertRaises(OSError, utils.WriteFile, target, data="data",
390
                      keep_perms=utils.KP_ALWAYS)
391
    # All masks have only user bits set, to avoid interactions with umask
392
    utils.WriteFile(target, data="data", mode=0200)
393
    self.assertFileMode(target, 0200)
394
    utils.WriteFile(target, data="data", mode=0400,
395
                    keep_perms=utils.KP_IF_EXISTS)
396
    self.assertFileMode(target, 0200)
397
    utils.WriteFile(target, data="data", keep_perms=utils.KP_ALWAYS)
398
    self.assertFileMode(target, 0200)
399
    utils.WriteFile(target, data="data", mode=0700)
400
    self.assertFileMode(target, 0700)
401

    
402
  def testNewFileMode(self):
403
    self.tmpdir = tempfile.mkdtemp()
404
    target = utils.PathJoin(self.tmpdir, "target")
405
    utils.WriteFile(target, data="data", mode=0400,
406
                    keep_perms=utils.KP_IF_EXISTS)
407
    self.assertFileMode(target, 0400)
408

    
409
class TestFileID(testutils.GanetiTestCase):
410
  def testEquality(self):
411
    name = self._CreateTempFile()
412
    oldi = utils.GetFileID(path=name)
413
    self.failUnless(utils.VerifyFileID(oldi, oldi))
414

    
415
  def testUpdate(self):
416
    name = self._CreateTempFile()
417
    oldi = utils.GetFileID(path=name)
418
    os.utime(name, None)
419
    fd = os.open(name, os.O_RDWR)
420
    try:
421
      newi = utils.GetFileID(fd=fd)
422
      self.failUnless(utils.VerifyFileID(oldi, newi))
423
      self.failUnless(utils.VerifyFileID(newi, oldi))
424
    finally:
425
      os.close(fd)
426

    
427
  def testWriteFile(self):
428
    name = self._CreateTempFile()
429
    oldi = utils.GetFileID(path=name)
430
    mtime = oldi[2]
431
    os.utime(name, (mtime + 10, mtime + 10))
432
    self.assertRaises(errors.LockError, utils.SafeWriteFile, name,
433
                      oldi, data="")
434
    os.utime(name, (mtime - 10, mtime - 10))
435
    utils.SafeWriteFile(name, oldi, data="")
436
    oldi = utils.GetFileID(path=name)
437
    mtime = oldi[2]
438
    os.utime(name, (mtime + 10, mtime + 10))
439
    # this doesn't raise, since we passed None
440
    utils.SafeWriteFile(name, None, data="")
441

    
442
  def testError(self):
443
    t = tempfile.NamedTemporaryFile()
444
    self.assertRaises(errors.ProgrammerError, utils.GetFileID,
445
                      path=t.name, fd=t.fileno())
446

    
447

    
448
class TestRemoveFile(unittest.TestCase):
449
  """Test case for the RemoveFile function"""
450

    
451
  def setUp(self):
452
    """Create a temp dir and file for each case"""
453
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
454
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
455
    os.close(fd)
456

    
457
  def tearDown(self):
458
    if os.path.exists(self.tmpfile):
459
      os.unlink(self.tmpfile)
460
    os.rmdir(self.tmpdir)
461

    
462
  def testIgnoreDirs(self):
463
    """Test that RemoveFile() ignores directories"""
464
    self.assertEqual(None, utils.RemoveFile(self.tmpdir))
465

    
466
  def testIgnoreNotExisting(self):
467
    """Test that RemoveFile() ignores non-existing files"""
468
    utils.RemoveFile(self.tmpfile)
469
    utils.RemoveFile(self.tmpfile)
470

    
471
  def testRemoveFile(self):
472
    """Test that RemoveFile does remove a file"""
473
    utils.RemoveFile(self.tmpfile)
474
    if os.path.exists(self.tmpfile):
475
      self.fail("File '%s' not removed" % self.tmpfile)
476

    
477
  def testRemoveSymlink(self):
478
    """Test that RemoveFile does remove symlinks"""
479
    symlink = self.tmpdir + "/symlink"
480
    os.symlink("no-such-file", symlink)
481
    utils.RemoveFile(symlink)
482
    if os.path.exists(symlink):
483
      self.fail("File '%s' not removed" % symlink)
484
    os.symlink(self.tmpfile, symlink)
485
    utils.RemoveFile(symlink)
486
    if os.path.exists(symlink):
487
      self.fail("File '%s' not removed" % symlink)
488

    
489

    
490
class TestRemoveDir(unittest.TestCase):
491
  def setUp(self):
492
    self.tmpdir = tempfile.mkdtemp()
493

    
494
  def tearDown(self):
495
    try:
496
      shutil.rmtree(self.tmpdir)
497
    except EnvironmentError:
498
      pass
499

    
500
  def testEmptyDir(self):
501
    utils.RemoveDir(self.tmpdir)
502
    self.assertFalse(os.path.isdir(self.tmpdir))
503

    
504
  def testNonEmptyDir(self):
505
    self.tmpfile = os.path.join(self.tmpdir, "test1")
506
    open(self.tmpfile, "w").close()
507
    self.assertRaises(EnvironmentError, utils.RemoveDir, self.tmpdir)
508

    
509

    
510
class TestRename(unittest.TestCase):
511
  """Test case for RenameFile"""
512

    
513
  def setUp(self):
514
    """Create a temporary directory"""
515
    self.tmpdir = tempfile.mkdtemp()
516
    self.tmpfile = os.path.join(self.tmpdir, "test1")
517

    
518
    # Touch the file
519
    open(self.tmpfile, "w").close()
520

    
521
  def tearDown(self):
522
    """Remove temporary directory"""
523
    shutil.rmtree(self.tmpdir)
524

    
525
  def testSimpleRename1(self):
526
    """Simple rename 1"""
527
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
528
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
529

    
530
  def testSimpleRename2(self):
531
    """Simple rename 2"""
532
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
533
                     mkdir=True)
534
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
535

    
536
  def testRenameMkdir(self):
537
    """Rename with mkdir"""
538
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
539
                     mkdir=True)
540
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
541
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
542

    
543
    self.assertRaises(EnvironmentError, utils.RenameFile,
544
                      os.path.join(self.tmpdir, "test/xyz"),
545
                      os.path.join(self.tmpdir, "test/foo/bar/baz"),
546
                      mkdir=True)
547

    
548
    self.assertTrue(os.path.exists(os.path.join(self.tmpdir, "test/xyz")))
549
    self.assertFalse(os.path.exists(os.path.join(self.tmpdir, "test/foo/bar")))
550
    self.assertFalse(os.path.exists(os.path.join(self.tmpdir,
551
                                                 "test/foo/bar/baz")))
552

    
553

    
554
class TestMakedirs(unittest.TestCase):
555
  def setUp(self):
556
    self.tmpdir = tempfile.mkdtemp()
557

    
558
  def tearDown(self):
559
    shutil.rmtree(self.tmpdir)
560

    
561
  def testNonExisting(self):
562
    path = utils.PathJoin(self.tmpdir, "foo")
563
    utils.Makedirs(path)
564
    self.assert_(os.path.isdir(path))
565

    
566
  def testExisting(self):
567
    path = utils.PathJoin(self.tmpdir, "foo")
568
    os.mkdir(path)
569
    utils.Makedirs(path)
570
    self.assert_(os.path.isdir(path))
571

    
572
  def testRecursiveNonExisting(self):
573
    path = utils.PathJoin(self.tmpdir, "foo/bar/baz")
574
    utils.Makedirs(path)
575
    self.assert_(os.path.isdir(path))
576

    
577
  def testRecursiveExisting(self):
578
    path = utils.PathJoin(self.tmpdir, "B/moo/xyz")
579
    self.assertFalse(os.path.exists(path))
580
    os.mkdir(utils.PathJoin(self.tmpdir, "B"))
581
    utils.Makedirs(path)
582
    self.assert_(os.path.isdir(path))
583

    
584

    
585
class TestEnsureDirs(unittest.TestCase):
586
  """Tests for EnsureDirs"""
587

    
588
  def setUp(self):
589
    self.dir = tempfile.mkdtemp()
590
    self.old_umask = os.umask(0777)
591

    
592
  def testEnsureDirs(self):
593
    utils.EnsureDirs([
594
        (utils.PathJoin(self.dir, "foo"), 0777),
595
        (utils.PathJoin(self.dir, "bar"), 0000),
596
        ])
597
    self.assertEquals(os.stat(utils.PathJoin(self.dir, "foo"))[0] & 0777, 0777)
598
    self.assertEquals(os.stat(utils.PathJoin(self.dir, "bar"))[0] & 0777, 0000)
599

    
600
  def tearDown(self):
601
    os.rmdir(utils.PathJoin(self.dir, "foo"))
602
    os.rmdir(utils.PathJoin(self.dir, "bar"))
603
    os.rmdir(self.dir)
604
    os.umask(self.old_umask)
605

    
606

    
607
class TestIsNormAbsPath(unittest.TestCase):
608
  """Testing case for IsNormAbsPath"""
609

    
610
  def _pathTestHelper(self, path, result):
611
    if result:
612
      self.assert_(utils.IsNormAbsPath(path),
613
          "Path %s should result absolute and normalized" % path)
614
    else:
615
      self.assertFalse(utils.IsNormAbsPath(path),
616
          "Path %s should not result absolute and normalized" % path)
617

    
618
  def testBase(self):
619
    self._pathTestHelper("/etc", True)
620
    self._pathTestHelper("/srv", True)
621
    self._pathTestHelper("etc", False)
622
    self._pathTestHelper("/etc/../root", False)
623
    self._pathTestHelper("/etc/", False)
624

    
625

    
626
class TestIsBelowDir(unittest.TestCase):
627
  """Testing case for IsBelowDir"""
628

    
629
  def testSamePrefix(self):
630
    self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c"))
631
    self.assertTrue(utils.IsBelowDir("/a/b/", "/a/b/e"))
632

    
633
  def testSamePrefixButDifferentDir(self):
634
    self.assertFalse(utils.IsBelowDir("/a/b", "/a/bc/d"))
635
    self.assertFalse(utils.IsBelowDir("/a/b/", "/a/bc/e"))
636

    
637
  def testSamePrefixButDirTraversal(self):
638
    self.assertFalse(utils.IsBelowDir("/a/b", "/a/b/../c"))
639
    self.assertFalse(utils.IsBelowDir("/a/b/", "/a/b/../d"))
640

    
641
  def testSamePrefixAndTraversal(self):
642
    self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c/../d"))
643
    self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c/./e"))
644
    self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/../b/./e"))
645

    
646
  def testBothAbsPath(self):
647
    self.assertRaises(ValueError, utils.IsBelowDir, "/a/b/c", "d")
648
    self.assertRaises(ValueError, utils.IsBelowDir, "a/b/c", "/d")
649
    self.assertRaises(ValueError, utils.IsBelowDir, "a/b/c", "d")
650

    
651

    
652
class TestPathJoin(unittest.TestCase):
653
  """Testing case for PathJoin"""
654

    
655
  def testBasicItems(self):
656
    mlist = ["/a", "b", "c"]
657
    self.failUnlessEqual(utils.PathJoin(*mlist), "/".join(mlist))
658

    
659
  def testNonAbsPrefix(self):
660
    self.failUnlessRaises(ValueError, utils.PathJoin, "a", "b")
661

    
662
  def testBackTrack(self):
663
    self.failUnlessRaises(ValueError, utils.PathJoin, "/a", "b/../c")
664

    
665
  def testMultiAbs(self):
666
    self.failUnlessRaises(ValueError, utils.PathJoin, "/a", "/b")
667

    
668

    
669
class TestTailFile(testutils.GanetiTestCase):
670
  """Test case for the TailFile function"""
671

    
672
  def testEmpty(self):
673
    fname = self._CreateTempFile()
674
    self.failUnlessEqual(utils.TailFile(fname), [])
675
    self.failUnlessEqual(utils.TailFile(fname, lines=25), [])
676

    
677
  def testAllLines(self):
678
    data = ["test %d" % i for i in range(30)]
679
    for i in range(30):
680
      fname = self._CreateTempFile()
681
      fd = open(fname, "w")
682
      fd.write("\n".join(data[:i]))
683
      if i > 0:
684
        fd.write("\n")
685
      fd.close()
686
      self.failUnlessEqual(utils.TailFile(fname, lines=i), data[:i])
687

    
688
  def testPartialLines(self):
689
    data = ["test %d" % i for i in range(30)]
690
    fname = self._CreateTempFile()
691
    fd = open(fname, "w")
692
    fd.write("\n".join(data))
693
    fd.write("\n")
694
    fd.close()
695
    for i in range(1, 30):
696
      self.failUnlessEqual(utils.TailFile(fname, lines=i), data[-i:])
697

    
698
  def testBigFile(self):
699
    data = ["test %d" % i for i in range(30)]
700
    fname = self._CreateTempFile()
701
    fd = open(fname, "w")
702
    fd.write("X" * 1048576)
703
    fd.write("\n")
704
    fd.write("\n".join(data))
705
    fd.write("\n")
706
    fd.close()
707
    for i in range(1, 30):
708
      self.failUnlessEqual(utils.TailFile(fname, lines=i), data[-i:])
709

    
710

    
711
class TestPidFileFunctions(unittest.TestCase):
712
  """Tests for WritePidFile and ReadPidFile"""
713

    
714
  def setUp(self):
715
    self.dir = tempfile.mkdtemp()
716
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
717

    
718
  def testPidFileFunctions(self):
719
    pid_file = self.f_dpn('test')
720
    fd = utils.WritePidFile(self.f_dpn('test'))
721
    self.failUnless(os.path.exists(pid_file),
722
                    "PID file should have been created")
723
    read_pid = utils.ReadPidFile(pid_file)
724
    self.failUnlessEqual(read_pid, os.getpid())
725
    self.failUnless(utils.IsProcessAlive(read_pid))
726
    self.failUnlessRaises(errors.LockError, utils.WritePidFile,
727
                          self.f_dpn('test'))
728
    os.close(fd)
729
    utils.RemoveFile(self.f_dpn("test"))
730
    self.failIf(os.path.exists(pid_file),
731
                "PID file should not exist anymore")
732
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
733
                         "ReadPidFile should return 0 for missing pid file")
734
    fh = open(pid_file, "w")
735
    fh.write("blah\n")
736
    fh.close()
737
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
738
                         "ReadPidFile should return 0 for invalid pid file")
739
    # but now, even with the file existing, we should be able to lock it
740
    fd = utils.WritePidFile(self.f_dpn('test'))
741
    os.close(fd)
742
    utils.RemoveFile(self.f_dpn("test"))
743
    self.failIf(os.path.exists(pid_file),
744
                "PID file should not exist anymore")
745

    
746
  def testKill(self):
747
    pid_file = self.f_dpn('child')
748
    r_fd, w_fd = os.pipe()
749
    new_pid = os.fork()
750
    if new_pid == 0: #child
751
      utils.WritePidFile(self.f_dpn('child'))
752
      os.write(w_fd, 'a')
753
      signal.pause()
754
      os._exit(0)
755
      return
756
    # else we are in the parent
757
    # wait until the child has written the pid file
758
    os.read(r_fd, 1)
759
    read_pid = utils.ReadPidFile(pid_file)
760
    self.failUnlessEqual(read_pid, new_pid)
761
    self.failUnless(utils.IsProcessAlive(new_pid))
762
    utils.KillProcess(new_pid, waitpid=True)
763
    self.failIf(utils.IsProcessAlive(new_pid))
764
    utils.RemoveFile(self.f_dpn('child'))
765
    self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0)
766

    
767
  def tearDown(self):
768
    shutil.rmtree(self.dir)
769

    
770

    
771
class TestSshKeys(testutils.GanetiTestCase):
772
  """Test case for the AddAuthorizedKey function"""
773

    
774
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
775
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="198.51.100.4" '
776
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
777

    
778
  def setUp(self):
779
    testutils.GanetiTestCase.setUp(self)
780
    self.tmpname = self._CreateTempFile()
781
    handle = open(self.tmpname, 'w')
782
    try:
783
      handle.write("%s\n" % TestSshKeys.KEY_A)
784
      handle.write("%s\n" % TestSshKeys.KEY_B)
785
    finally:
786
      handle.close()
787

    
788
  def testAddingNewKey(self):
789
    utils.AddAuthorizedKey(self.tmpname,
790
                           'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
791

    
792
    self.assertFileContent(self.tmpname,
793
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
794
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
795
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
796
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
797

    
798
  def testAddingAlmostButNotCompletelyTheSameKey(self):
799
    utils.AddAuthorizedKey(self.tmpname,
800
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
801

    
802
    self.assertFileContent(self.tmpname,
803
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
804
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
805
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
806
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
807

    
808
  def testAddingExistingKeyWithSomeMoreSpaces(self):
809
    utils.AddAuthorizedKey(self.tmpname,
810
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
811

    
812
    self.assertFileContent(self.tmpname,
813
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
814
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
815
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
816

    
817
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
818
    utils.RemoveAuthorizedKey(self.tmpname,
819
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
820

    
821
    self.assertFileContent(self.tmpname,
822
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
823
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
824

    
825
  def testRemovingNonExistingKey(self):
826
    utils.RemoveAuthorizedKey(self.tmpname,
827
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
828

    
829
    self.assertFileContent(self.tmpname,
830
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
831
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
832
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
833

    
834

    
835
class TestNewUUID(unittest.TestCase):
836
  """Test case for NewUUID"""
837

    
838
  def runTest(self):
839
    self.failUnless(utils.UUID_RE.match(utils.NewUUID()))
840

    
841

    
842
def _MockStatResult(cb, mode, uid, gid):
843
  def _fn(path):
844
    if cb:
845
      cb()
846
    return {
847
      stat.ST_MODE: mode,
848
      stat.ST_UID: uid,
849
      stat.ST_GID: gid,
850
      }
851
  return _fn
852

    
853

    
854
def _RaiseNoEntError():
855
  raise EnvironmentError(errno.ENOENT, "not found")
856

    
857

    
858
def _OtherStatRaise():
859
  raise EnvironmentError()
860

    
861

    
862
class TestPermissionEnforcements(unittest.TestCase):
863
  UID_A = 16024
864
  UID_B = 25850
865
  GID_A = 14028
866
  GID_B = 29801
867

    
868
  def setUp(self):
869
    self._chown_calls = []
870
    self._chmod_calls = []
871
    self._mkdir_calls = []
872

    
873
  def tearDown(self):
874
    self.assertRaises(IndexError, self._mkdir_calls.pop)
875
    self.assertRaises(IndexError, self._chmod_calls.pop)
876
    self.assertRaises(IndexError, self._chown_calls.pop)
877

    
878
  def _FakeMkdir(self, path):
879
    self._mkdir_calls.append(path)
880

    
881
  def _FakeChown(self, path, uid, gid):
882
    self._chown_calls.append((path, uid, gid))
883

    
884
  def _ChmodWrapper(self, cb):
885
    def _fn(path, mode):
886
      self._chmod_calls.append((path, mode))
887
      if cb:
888
        cb()
889
    return _fn
890

    
891
  def _VerifyPerm(self, path, mode, uid=-1, gid=-1):
892
    self.assertEqual(path, "/ganeti-qa-non-test")
893
    self.assertEqual(mode, 0700)
894
    self.assertEqual(uid, self.UID_A)
895
    self.assertEqual(gid, self.GID_A)
896

    
897
  def testMakeDirWithPerm(self):
898
    is_dir_stat = _MockStatResult(None, stat.S_IFDIR, 0, 0)
899
    utils.MakeDirWithPerm("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A,
900
                          _lstat_fn=is_dir_stat, _perm_fn=self._VerifyPerm)
901

    
902
  def testDirErrors(self):
903
    self.assertRaises(errors.GenericError, utils.MakeDirWithPerm,
904
                      "/ganeti-qa-non-test", 0700, 0, 0,
905
                      _lstat_fn=_MockStatResult(None, 0, 0, 0))
906
    self.assertRaises(IndexError, self._mkdir_calls.pop)
907

    
908
    other_stat_raise = _MockStatResult(_OtherStatRaise, stat.S_IFDIR, 0, 0)
909
    self.assertRaises(errors.GenericError, utils.MakeDirWithPerm,
910
                      "/ganeti-qa-non-test", 0700, 0, 0,
911
                      _lstat_fn=other_stat_raise)
912
    self.assertRaises(IndexError, self._mkdir_calls.pop)
913

    
914
    non_exist_stat = _MockStatResult(_RaiseNoEntError, stat.S_IFDIR, 0, 0)
915
    utils.MakeDirWithPerm("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A,
916
                          _lstat_fn=non_exist_stat, _mkdir_fn=self._FakeMkdir,
917
                          _perm_fn=self._VerifyPerm)
918
    self.assertEqual(self._mkdir_calls.pop(0), "/ganeti-qa-non-test")
919

    
920
  def testEnforcePermissionNoEnt(self):
921
    self.assertRaises(errors.GenericError, utils.EnforcePermission,
922
                      "/ganeti-qa-non-test", 0600,
923
                      _chmod_fn=NotImplemented, _chown_fn=NotImplemented,
924
                      _stat_fn=_MockStatResult(_RaiseNoEntError, 0, 0, 0))
925

    
926
  def testEnforcePermissionNoEntMustNotExist(self):
927
    utils.EnforcePermission("/ganeti-qa-non-test", 0600, must_exist=False,
928
                            _chmod_fn=NotImplemented,
929
                            _chown_fn=NotImplemented,
930
                            _stat_fn=_MockStatResult(_RaiseNoEntError,
931
                                                          0, 0, 0))
932

    
933
  def testEnforcePermissionOtherErrorMustNotExist(self):
934
    self.assertRaises(errors.GenericError, utils.EnforcePermission,
935
                      "/ganeti-qa-non-test", 0600, must_exist=False,
936
                      _chmod_fn=NotImplemented, _chown_fn=NotImplemented,
937
                      _stat_fn=_MockStatResult(_OtherStatRaise, 0, 0, 0))
938

    
939
  def testEnforcePermissionNoChanges(self):
940
    utils.EnforcePermission("/ganeti-qa-non-test", 0600,
941
                            _stat_fn=_MockStatResult(None, 0600, 0, 0),
942
                            _chmod_fn=self._ChmodWrapper(None),
943
                            _chown_fn=self._FakeChown)
944

    
945
  def testEnforcePermissionChangeMode(self):
946
    utils.EnforcePermission("/ganeti-qa-non-test", 0444,
947
                            _stat_fn=_MockStatResult(None, 0600, 0, 0),
948
                            _chmod_fn=self._ChmodWrapper(None),
949
                            _chown_fn=self._FakeChown)
950
    self.assertEqual(self._chmod_calls.pop(0), ("/ganeti-qa-non-test", 0444))
951

    
952
  def testEnforcePermissionSetUidGid(self):
953
    utils.EnforcePermission("/ganeti-qa-non-test", 0600,
954
                            uid=self.UID_B, gid=self.GID_B,
955
                            _stat_fn=_MockStatResult(None, 0600,
956
                                                     self.UID_A,
957
                                                     self.GID_A),
958
                            _chmod_fn=self._ChmodWrapper(None),
959
                            _chown_fn=self._FakeChown)
960
    self.assertEqual(self._chown_calls.pop(0),
961
                     ("/ganeti-qa-non-test", self.UID_B, self.GID_B))
962

    
963

    
964
if __name__ == "__main__":
965
  testutils.GanetiTestProgram()