Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils.io_unittest.py @ fbdac0d9

History | View | Annotate | Download (34.2 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.utils.io"""
23

    
24
import os
25
import tempfile
26
import unittest
27
import shutil
28
import glob
29
import time
30
import signal
31
import stat
32
import errno
33

    
34
from ganeti import constants
35
from ganeti import utils
36
from ganeti import compat
37
from ganeti import errors
38

    
39
import testutils
40

    
41

    
42
class TestReadFile(testutils.GanetiTestCase):
43
  def testReadAll(self):
44
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"))
45
    self.assertEqual(len(data), 814)
46

    
47
    h = compat.md5_hash()
48
    h.update(data)
49
    self.assertEqual(h.hexdigest(), "a491efb3efe56a0535f924d5f8680fd4")
50

    
51
  def testReadSize(self):
52
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"),
53
                          size=100)
54
    self.assertEqual(len(data), 100)
55

    
56
    h = compat.md5_hash()
57
    h.update(data)
58
    self.assertEqual(h.hexdigest(), "893772354e4e690b9efd073eed433ce7")
59

    
60
  def testCallback(self):
61
    def _Cb(fh):
62
      self.assertEqual(fh.tell(), 0)
63
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"), preread=_Cb)
64
    self.assertEqual(len(data), 814)
65

    
66
  def testError(self):
67
    self.assertRaises(EnvironmentError, utils.ReadFile,
68
                      "/dev/null/does-not-exist")
69

    
70

    
71
class TestReadOneLineFile(testutils.GanetiTestCase):
72
  def setUp(self):
73
    testutils.GanetiTestCase.setUp(self)
74

    
75
  def testDefault(self):
76
    data = utils.ReadOneLineFile(self._TestDataFilename("cert1.pem"))
77
    self.assertEqual(len(data), 27)
78
    self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
79

    
80
  def testNotStrict(self):
81
    data = utils.ReadOneLineFile(self._TestDataFilename("cert1.pem"),
82
                                 strict=False)
83
    self.assertEqual(len(data), 27)
84
    self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
85

    
86
  def testStrictFailure(self):
87
    self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
88
                      self._TestDataFilename("cert1.pem"), strict=True)
89

    
90
  def testLongLine(self):
91
    dummydata = (1024 * "Hello World! ")
92
    myfile = self._CreateTempFile()
93
    utils.WriteFile(myfile, data=dummydata)
94
    datastrict = utils.ReadOneLineFile(myfile, strict=True)
95
    datalax = utils.ReadOneLineFile(myfile, strict=False)
96
    self.assertEqual(dummydata, datastrict)
97
    self.assertEqual(dummydata, datalax)
98

    
99
  def testNewline(self):
100
    myfile = self._CreateTempFile()
101
    myline = "myline"
102
    for nl in ["", "\n", "\r\n"]:
103
      dummydata = "%s%s" % (myline, nl)
104
      utils.WriteFile(myfile, data=dummydata)
105
      datalax = utils.ReadOneLineFile(myfile, strict=False)
106
      self.assertEqual(myline, datalax)
107
      datastrict = utils.ReadOneLineFile(myfile, strict=True)
108
      self.assertEqual(myline, datastrict)
109

    
110
  def testWhitespaceAndMultipleLines(self):
111
    myfile = self._CreateTempFile()
112
    for nl in ["", "\n", "\r\n"]:
113
      for ws in [" ", "\t", "\t\t  \t", "\t "]:
114
        dummydata = (1024 * ("Foo bar baz %s%s" % (ws, nl)))
115
        utils.WriteFile(myfile, data=dummydata)
116
        datalax = utils.ReadOneLineFile(myfile, strict=False)
117
        if nl:
118
          self.assert_(set("\r\n") & set(dummydata))
119
          self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
120
                            myfile, strict=True)
121
          explen = len("Foo bar baz ") + len(ws)
122
          self.assertEqual(len(datalax), explen)
123
          self.assertEqual(datalax, dummydata[:explen])
124
          self.assertFalse(set("\r\n") & set(datalax))
125
        else:
126
          datastrict = utils.ReadOneLineFile(myfile, strict=True)
127
          self.assertEqual(dummydata, datastrict)
128
          self.assertEqual(dummydata, datalax)
129

    
130
  def testEmptylines(self):
131
    myfile = self._CreateTempFile()
132
    myline = "myline"
133
    for nl in ["\n", "\r\n"]:
134
      for ol in ["", "otherline"]:
135
        dummydata = "%s%s%s%s%s%s" % (nl, nl, myline, nl, ol, nl)
136
        utils.WriteFile(myfile, data=dummydata)
137
        self.assert_(set("\r\n") & set(dummydata))
138
        datalax = utils.ReadOneLineFile(myfile, strict=False)
139
        self.assertEqual(myline, datalax)
140
        if ol:
141
          self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
142
                            myfile, strict=True)
143
        else:
144
          datastrict = utils.ReadOneLineFile(myfile, strict=True)
145
          self.assertEqual(myline, datastrict)
146

    
147
  def testEmptyfile(self):
148
    myfile = self._CreateTempFile()
149
    self.assertRaises(errors.GenericError, utils.ReadOneLineFile, myfile)
150

    
151

    
152
class TestTimestampForFilename(unittest.TestCase):
153
  def test(self):
154
    self.assert_("." not in utils.TimestampForFilename())
155
    self.assert_(":" not in utils.TimestampForFilename())
156

    
157

    
158
class TestCreateBackup(testutils.GanetiTestCase):
159
  def setUp(self):
160
    testutils.GanetiTestCase.setUp(self)
161

    
162
    self.tmpdir = tempfile.mkdtemp()
163

    
164
  def tearDown(self):
165
    testutils.GanetiTestCase.tearDown(self)
166

    
167
    shutil.rmtree(self.tmpdir)
168

    
169
  def testEmpty(self):
170
    filename = utils.PathJoin(self.tmpdir, "config.data")
171
    utils.WriteFile(filename, data="")
172
    bname = utils.CreateBackup(filename)
173
    self.assertFileContent(bname, "")
174
    self.assertEqual(len(glob.glob("%s*" % filename)), 2)
175
    utils.CreateBackup(filename)
176
    self.assertEqual(len(glob.glob("%s*" % filename)), 3)
177
    utils.CreateBackup(filename)
178
    self.assertEqual(len(glob.glob("%s*" % filename)), 4)
179

    
180
    fifoname = utils.PathJoin(self.tmpdir, "fifo")
181
    os.mkfifo(fifoname)
182
    self.assertRaises(errors.ProgrammerError, utils.CreateBackup, fifoname)
183

    
184
  def testContent(self):
185
    bkpcount = 0
186
    for data in ["", "X", "Hello World!\n" * 100, "Binary data\0\x01\x02\n"]:
187
      for rep in [1, 2, 10, 127]:
188
        testdata = data * rep
189

    
190
        filename = utils.PathJoin(self.tmpdir, "test.data_")
191
        utils.WriteFile(filename, data=testdata)
192
        self.assertFileContent(filename, testdata)
193

    
194
        for _ in range(3):
195
          bname = utils.CreateBackup(filename)
196
          bkpcount += 1
197
          self.assertFileContent(bname, testdata)
198
          self.assertEqual(len(glob.glob("%s*" % filename)), 1 + bkpcount)
199

    
200

    
201
class TestListVisibleFiles(unittest.TestCase):
202
  """Test case for ListVisibleFiles"""
203

    
204
  def setUp(self):
205
    self.path = tempfile.mkdtemp()
206

    
207
  def tearDown(self):
208
    shutil.rmtree(self.path)
209

    
210
  def _CreateFiles(self, files):
211
    for name in files:
212
      utils.WriteFile(os.path.join(self.path, name), data="test")
213

    
214
  def _test(self, files, expected):
215
    self._CreateFiles(files)
216
    found = utils.ListVisibleFiles(self.path)
217
    self.assertEqual(set(found), set(expected))
218

    
219
  def testAllVisible(self):
220
    files = ["a", "b", "c"]
221
    expected = files
222
    self._test(files, expected)
223

    
224
  def testNoneVisible(self):
225
    files = [".a", ".b", ".c"]
226
    expected = []
227
    self._test(files, expected)
228

    
229
  def testSomeVisible(self):
230
    files = ["a", "b", ".c"]
231
    expected = ["a", "b"]
232
    self._test(files, expected)
233

    
234
  def testNonAbsolutePath(self):
235
    self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
236
                          "abc")
237

    
238
  def testNonNormalizedPath(self):
239
    self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
240
                          "/bin/../tmp")
241

    
242
  def testMountpoint(self):
243
    lvfmp_fn = compat.partial(utils.ListVisibleFiles,
244
                              _is_mountpoint=lambda _: True)
245
    self.assertEqual(lvfmp_fn(self.path), [])
246

    
247
    # Create "lost+found" as a regular file
248
    self._CreateFiles(["foo", "bar", ".baz", "lost+found"])
249
    self.assertEqual(set(lvfmp_fn(self.path)),
250
                     set(["foo", "bar", "lost+found"]))
251

    
252
    # Replace "lost+found" with a directory
253
    laf_path = utils.PathJoin(self.path, "lost+found")
254
    utils.RemoveFile(laf_path)
255
    os.mkdir(laf_path)
256
    self.assertEqual(set(lvfmp_fn(self.path)), set(["foo", "bar"]))
257

    
258
  def testLostAndFoundNoMountpoint(self):
259
    files = ["foo", "bar", ".Hello World", "lost+found"]
260
    expected = ["foo", "bar", "lost+found"]
261
    self._test(files, expected)
262

    
263

    
264
class TestWriteFile(testutils.GanetiTestCase):
265
  def setUp(self):
266
    testutils.GanetiTestCase.setUp(self)
267
    self.tmpdir = None
268
    self.tfile = tempfile.NamedTemporaryFile()
269
    self.did_pre = False
270
    self.did_post = False
271
    self.did_write = False
272

    
273
  def tearDown(self):
274
    testutils.GanetiTestCase.tearDown(self)
275
    if self.tmpdir:
276
      shutil.rmtree(self.tmpdir)
277

    
278
  def markPre(self, fd):
279
    self.did_pre = True
280

    
281
  def markPost(self, fd):
282
    self.did_post = True
283

    
284
  def markWrite(self, fd):
285
    self.did_write = True
286

    
287
  def testWrite(self):
288
    data = "abc"
289
    utils.WriteFile(self.tfile.name, data=data)
290
    self.assertEqual(utils.ReadFile(self.tfile.name), data)
291

    
292
  def testWriteSimpleUnicode(self):
293
    data = u"abc"
294
    utils.WriteFile(self.tfile.name, data=data)
295
    self.assertEqual(utils.ReadFile(self.tfile.name), data)
296

    
297
  def testErrors(self):
298
    self.assertRaises(errors.ProgrammerError, utils.WriteFile,
299
                      self.tfile.name, data="test", fn=lambda fd: None)
300
    self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name)
301
    self.assertRaises(errors.ProgrammerError, utils.WriteFile,
302
                      self.tfile.name, data="test", atime=0)
303
    self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name,
304
                      mode=0400, keep_perms=utils.KP_ALWAYS)
305
    self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name,
306
                      uid=0, keep_perms=utils.KP_ALWAYS)
307
    self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name,
308
                      gid=0, keep_perms=utils.KP_ALWAYS)
309
    self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name,
310
                      mode=0400, uid=0, keep_perms=utils.KP_ALWAYS)
311

    
312
  def testPreWrite(self):
313
    utils.WriteFile(self.tfile.name, data="", prewrite=self.markPre)
314
    self.assertTrue(self.did_pre)
315
    self.assertFalse(self.did_post)
316
    self.assertFalse(self.did_write)
317

    
318
  def testPostWrite(self):
319
    utils.WriteFile(self.tfile.name, data="", postwrite=self.markPost)
320
    self.assertFalse(self.did_pre)
321
    self.assertTrue(self.did_post)
322
    self.assertFalse(self.did_write)
323

    
324
  def testWriteFunction(self):
325
    utils.WriteFile(self.tfile.name, fn=self.markWrite)
326
    self.assertFalse(self.did_pre)
327
    self.assertFalse(self.did_post)
328
    self.assertTrue(self.did_write)
329

    
330
  def testDryRun(self):
331
    orig = "abc"
332
    self.tfile.write(orig)
333
    self.tfile.flush()
334
    utils.WriteFile(self.tfile.name, data="hello", dry_run=True)
335
    self.assertEqual(utils.ReadFile(self.tfile.name), orig)
336

    
337
  def testTimes(self):
338
    f = self.tfile.name
339
    for at, mt in [(0, 0), (1000, 1000), (2000, 3000),
340
                   (int(time.time()), 5000)]:
341
      utils.WriteFile(f, data="hello", atime=at, mtime=mt)
342
      st = os.stat(f)
343
      self.assertEqual(st.st_atime, at)
344
      self.assertEqual(st.st_mtime, mt)
345

    
346
  def testNoClose(self):
347
    data = "hello"
348
    self.assertEqual(utils.WriteFile(self.tfile.name, data="abc"), None)
349
    fd = utils.WriteFile(self.tfile.name, data=data, close=False)
350
    try:
351
      os.lseek(fd, 0, 0)
352
      self.assertEqual(os.read(fd, 4096), data)
353
    finally:
354
      os.close(fd)
355

    
356
  def testNoLeftovers(self):
357
    self.tmpdir = tempfile.mkdtemp()
358
    self.assertEqual(utils.WriteFile(utils.PathJoin(self.tmpdir, "test"),
359
                                     data="abc"),
360
                     None)
361
    self.assertEqual(os.listdir(self.tmpdir), ["test"])
362

    
363
  def testFailRename(self):
364
    self.tmpdir = tempfile.mkdtemp()
365
    target = utils.PathJoin(self.tmpdir, "target")
366
    os.mkdir(target)
367
    self.assertRaises(OSError, utils.WriteFile, target, data="abc")
368
    self.assertTrue(os.path.isdir(target))
369
    self.assertEqual(os.listdir(self.tmpdir), ["target"])
370
    self.assertFalse(os.listdir(target))
371

    
372
  def testFailRenameDryRun(self):
373
    self.tmpdir = tempfile.mkdtemp()
374
    target = utils.PathJoin(self.tmpdir, "target")
375
    os.mkdir(target)
376
    self.assertEqual(utils.WriteFile(target, data="abc", dry_run=True), None)
377
    self.assertTrue(os.path.isdir(target))
378
    self.assertEqual(os.listdir(self.tmpdir), ["target"])
379
    self.assertFalse(os.listdir(target))
380

    
381
  def testBackup(self):
382
    self.tmpdir = tempfile.mkdtemp()
383
    testfile = utils.PathJoin(self.tmpdir, "test")
384

    
385
    self.assertEqual(utils.WriteFile(testfile, data="foo", backup=True), None)
386
    self.assertEqual(utils.ReadFile(testfile), "foo")
387
    self.assertEqual(os.listdir(self.tmpdir), ["test"])
388

    
389
    # Write again
390
    assert os.path.isfile(testfile)
391
    self.assertEqual(utils.WriteFile(testfile, data="bar", backup=True), None)
392
    self.assertEqual(utils.ReadFile(testfile), "bar")
393
    self.assertEqual(len(glob.glob("%s.backup*" % testfile)), 1)
394
    self.assertTrue("test" in os.listdir(self.tmpdir))
395
    self.assertEqual(len(os.listdir(self.tmpdir)), 2)
396

    
397
    # Write again as dry-run
398
    assert os.path.isfile(testfile)
399
    self.assertEqual(utils.WriteFile(testfile, data="000", backup=True,
400
                                     dry_run=True),
401
                     None)
402
    self.assertEqual(utils.ReadFile(testfile), "bar")
403
    self.assertEqual(len(glob.glob("%s.backup*" % testfile)), 1)
404
    self.assertTrue("test" in os.listdir(self.tmpdir))
405
    self.assertEqual(len(os.listdir(self.tmpdir)), 2)
406

    
407
  def testFileMode(self):
408
    self.tmpdir = tempfile.mkdtemp()
409
    target = utils.PathJoin(self.tmpdir, "target")
410
    self.assertRaises(OSError, utils.WriteFile, target, data="data",
411
                      keep_perms=utils.KP_ALWAYS)
412
    # All masks have only user bits set, to avoid interactions with umask
413
    utils.WriteFile(target, data="data", mode=0200)
414
    self.assertFileMode(target, 0200)
415
    utils.WriteFile(target, data="data", mode=0400,
416
                    keep_perms=utils.KP_IF_EXISTS)
417
    self.assertFileMode(target, 0200)
418
    utils.WriteFile(target, data="data", keep_perms=utils.KP_ALWAYS)
419
    self.assertFileMode(target, 0200)
420
    utils.WriteFile(target, data="data", mode=0700)
421
    self.assertFileMode(target, 0700)
422

    
423
  def testNewFileMode(self):
424
    self.tmpdir = tempfile.mkdtemp()
425
    target = utils.PathJoin(self.tmpdir, "target")
426
    utils.WriteFile(target, data="data", mode=0400,
427
                    keep_perms=utils.KP_IF_EXISTS)
428
    self.assertFileMode(target, 0400)
429

    
430
class TestFileID(testutils.GanetiTestCase):
431
  def testEquality(self):
432
    name = self._CreateTempFile()
433
    oldi = utils.GetFileID(path=name)
434
    self.failUnless(utils.VerifyFileID(oldi, oldi))
435

    
436
  def testUpdate(self):
437
    name = self._CreateTempFile()
438
    oldi = utils.GetFileID(path=name)
439
    fd = os.open(name, os.O_RDWR)
440
    try:
441
      newi = utils.GetFileID(fd=fd)
442
      self.failUnless(utils.VerifyFileID(oldi, newi))
443
      self.failUnless(utils.VerifyFileID(newi, oldi))
444
    finally:
445
      os.close(fd)
446

    
447
  def testWriteFile(self):
448
    name = self._CreateTempFile()
449
    oldi = utils.GetFileID(path=name)
450
    mtime = oldi[2]
451
    os.utime(name, (mtime + 10, mtime + 10))
452
    self.assertRaises(errors.LockError, utils.SafeWriteFile, name,
453
                      oldi, data="")
454
    os.utime(name, (mtime - 10, mtime - 10))
455
    utils.SafeWriteFile(name, oldi, data="")
456
    oldi = utils.GetFileID(path=name)
457
    mtime = oldi[2]
458
    os.utime(name, (mtime + 10, mtime + 10))
459
    # this doesn't raise, since we passed None
460
    utils.SafeWriteFile(name, None, data="")
461

    
462
  def testError(self):
463
    t = tempfile.NamedTemporaryFile()
464
    self.assertRaises(errors.ProgrammerError, utils.GetFileID,
465
                      path=t.name, fd=t.fileno())
466

    
467

    
468
class TestRemoveFile(unittest.TestCase):
469
  """Test case for the RemoveFile function"""
470

    
471
  def setUp(self):
472
    """Create a temp dir and file for each case"""
473
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
474
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
475
    os.close(fd)
476

    
477
  def tearDown(self):
478
    if os.path.exists(self.tmpfile):
479
      os.unlink(self.tmpfile)
480
    os.rmdir(self.tmpdir)
481

    
482
  def testIgnoreDirs(self):
483
    """Test that RemoveFile() ignores directories"""
484
    self.assertEqual(None, utils.RemoveFile(self.tmpdir))
485

    
486
  def testIgnoreNotExisting(self):
487
    """Test that RemoveFile() ignores non-existing files"""
488
    utils.RemoveFile(self.tmpfile)
489
    utils.RemoveFile(self.tmpfile)
490

    
491
  def testRemoveFile(self):
492
    """Test that RemoveFile does remove a file"""
493
    utils.RemoveFile(self.tmpfile)
494
    if os.path.exists(self.tmpfile):
495
      self.fail("File '%s' not removed" % self.tmpfile)
496

    
497
  def testRemoveSymlink(self):
498
    """Test that RemoveFile does remove symlinks"""
499
    symlink = self.tmpdir + "/symlink"
500
    os.symlink("no-such-file", symlink)
501
    utils.RemoveFile(symlink)
502
    if os.path.exists(symlink):
503
      self.fail("File '%s' not removed" % symlink)
504
    os.symlink(self.tmpfile, symlink)
505
    utils.RemoveFile(symlink)
506
    if os.path.exists(symlink):
507
      self.fail("File '%s' not removed" % symlink)
508

    
509

    
510
class TestRemoveDir(unittest.TestCase):
511
  def setUp(self):
512
    self.tmpdir = tempfile.mkdtemp()
513

    
514
  def tearDown(self):
515
    try:
516
      shutil.rmtree(self.tmpdir)
517
    except EnvironmentError:
518
      pass
519

    
520
  def testEmptyDir(self):
521
    utils.RemoveDir(self.tmpdir)
522
    self.assertFalse(os.path.isdir(self.tmpdir))
523

    
524
  def testNonEmptyDir(self):
525
    self.tmpfile = os.path.join(self.tmpdir, "test1")
526
    open(self.tmpfile, "w").close()
527
    self.assertRaises(EnvironmentError, utils.RemoveDir, self.tmpdir)
528

    
529

    
530
class TestRename(unittest.TestCase):
531
  """Test case for RenameFile"""
532

    
533
  def setUp(self):
534
    """Create a temporary directory"""
535
    self.tmpdir = tempfile.mkdtemp()
536
    self.tmpfile = os.path.join(self.tmpdir, "test1")
537

    
538
    # Touch the file
539
    open(self.tmpfile, "w").close()
540

    
541
  def tearDown(self):
542
    """Remove temporary directory"""
543
    shutil.rmtree(self.tmpdir)
544

    
545
  def testSimpleRename1(self):
546
    """Simple rename 1"""
547
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
548
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
549

    
550
  def testSimpleRename2(self):
551
    """Simple rename 2"""
552
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
553
                     mkdir=True)
554
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
555

    
556
  def testRenameMkdir(self):
557
    """Rename with mkdir"""
558
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
559
                     mkdir=True)
560
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
561
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
562

    
563
    self.assertRaises(EnvironmentError, utils.RenameFile,
564
                      os.path.join(self.tmpdir, "test/xyz"),
565
                      os.path.join(self.tmpdir, "test/foo/bar/baz"),
566
                      mkdir=True)
567

    
568
    self.assertTrue(os.path.exists(os.path.join(self.tmpdir, "test/xyz")))
569
    self.assertFalse(os.path.exists(os.path.join(self.tmpdir, "test/foo/bar")))
570
    self.assertFalse(os.path.exists(os.path.join(self.tmpdir,
571
                                                 "test/foo/bar/baz")))
572

    
573

    
574
class TestMakedirs(unittest.TestCase):
575
  def setUp(self):
576
    self.tmpdir = tempfile.mkdtemp()
577

    
578
  def tearDown(self):
579
    shutil.rmtree(self.tmpdir)
580

    
581
  def testNonExisting(self):
582
    path = utils.PathJoin(self.tmpdir, "foo")
583
    utils.Makedirs(path)
584
    self.assert_(os.path.isdir(path))
585

    
586
  def testExisting(self):
587
    path = utils.PathJoin(self.tmpdir, "foo")
588
    os.mkdir(path)
589
    utils.Makedirs(path)
590
    self.assert_(os.path.isdir(path))
591

    
592
  def testRecursiveNonExisting(self):
593
    path = utils.PathJoin(self.tmpdir, "foo/bar/baz")
594
    utils.Makedirs(path)
595
    self.assert_(os.path.isdir(path))
596

    
597
  def testRecursiveExisting(self):
598
    path = utils.PathJoin(self.tmpdir, "B/moo/xyz")
599
    self.assertFalse(os.path.exists(path))
600
    os.mkdir(utils.PathJoin(self.tmpdir, "B"))
601
    utils.Makedirs(path)
602
    self.assert_(os.path.isdir(path))
603

    
604

    
605
class TestEnsureDirs(unittest.TestCase):
606
  """Tests for EnsureDirs"""
607

    
608
  def setUp(self):
609
    self.dir = tempfile.mkdtemp()
610
    self.old_umask = os.umask(0777)
611

    
612
  def testEnsureDirs(self):
613
    utils.EnsureDirs([
614
        (utils.PathJoin(self.dir, "foo"), 0777),
615
        (utils.PathJoin(self.dir, "bar"), 0000),
616
        ])
617
    self.assertEquals(os.stat(utils.PathJoin(self.dir, "foo"))[0] & 0777, 0777)
618
    self.assertEquals(os.stat(utils.PathJoin(self.dir, "bar"))[0] & 0777, 0000)
619

    
620
  def tearDown(self):
621
    os.rmdir(utils.PathJoin(self.dir, "foo"))
622
    os.rmdir(utils.PathJoin(self.dir, "bar"))
623
    os.rmdir(self.dir)
624
    os.umask(self.old_umask)
625

    
626

    
627
class TestIsNormAbsPath(unittest.TestCase):
628
  """Testing case for IsNormAbsPath"""
629

    
630
  def _pathTestHelper(self, path, result):
631
    if result:
632
      self.assert_(utils.IsNormAbsPath(path),
633
          "Path %s should result absolute and normalized" % path)
634
    else:
635
      self.assertFalse(utils.IsNormAbsPath(path),
636
          "Path %s should not result absolute and normalized" % path)
637

    
638
  def testBase(self):
639
    self._pathTestHelper("/etc", True)
640
    self._pathTestHelper("/srv", True)
641
    self._pathTestHelper("etc", False)
642
    self._pathTestHelper("/etc/../root", False)
643
    self._pathTestHelper("/etc/", False)
644

    
645

    
646
class TestIsBelowDir(unittest.TestCase):
647
  """Testing case for IsBelowDir"""
648

    
649
  def testExactlyTheSame(self):
650
    self.assertFalse(utils.IsBelowDir("/a/b", "/a/b"))
651
    self.assertFalse(utils.IsBelowDir("/a/b", "/a/b/"))
652
    self.assertFalse(utils.IsBelowDir("/a/b/", "/a/b"))
653
    self.assertFalse(utils.IsBelowDir("/a/b/", "/a/b/"))
654

    
655
  def testSamePrefix(self):
656
    self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c"))
657
    self.assertTrue(utils.IsBelowDir("/a/b/", "/a/b/e"))
658

    
659
  def testSamePrefixButDifferentDir(self):
660
    self.assertFalse(utils.IsBelowDir("/a/b", "/a/bc/d"))
661
    self.assertFalse(utils.IsBelowDir("/a/b/", "/a/bc/e"))
662

    
663
  def testSamePrefixButDirTraversal(self):
664
    self.assertFalse(utils.IsBelowDir("/a/b", "/a/b/../c"))
665
    self.assertFalse(utils.IsBelowDir("/a/b/", "/a/b/../d"))
666

    
667
  def testSamePrefixAndTraversal(self):
668
    self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c/../d"))
669
    self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c/./e"))
670
    self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/../b/./e"))
671

    
672
  def testBothAbsPath(self):
673
    self.assertRaises(ValueError, utils.IsBelowDir, "/a/b/c", "d")
674
    self.assertRaises(ValueError, utils.IsBelowDir, "a/b/c", "/d")
675
    self.assertRaises(ValueError, utils.IsBelowDir, "a/b/c", "d")
676

    
677

    
678
class TestPathJoin(unittest.TestCase):
679
  """Testing case for PathJoin"""
680

    
681
  def testBasicItems(self):
682
    mlist = ["/a", "b", "c"]
683
    self.failUnlessEqual(utils.PathJoin(*mlist), "/".join(mlist))
684

    
685
  def testNonAbsPrefix(self):
686
    self.failUnlessRaises(ValueError, utils.PathJoin, "a", "b")
687

    
688
  def testBackTrack(self):
689
    self.failUnlessRaises(ValueError, utils.PathJoin, "/a", "b/../c")
690

    
691
  def testMultiAbs(self):
692
    self.failUnlessRaises(ValueError, utils.PathJoin, "/a", "/b")
693

    
694

    
695
class TestTailFile(testutils.GanetiTestCase):
696
  """Test case for the TailFile function"""
697

    
698
  def testEmpty(self):
699
    fname = self._CreateTempFile()
700
    self.failUnlessEqual(utils.TailFile(fname), [])
701
    self.failUnlessEqual(utils.TailFile(fname, lines=25), [])
702

    
703
  def testAllLines(self):
704
    data = ["test %d" % i for i in range(30)]
705
    for i in range(30):
706
      fname = self._CreateTempFile()
707
      fd = open(fname, "w")
708
      fd.write("\n".join(data[:i]))
709
      if i > 0:
710
        fd.write("\n")
711
      fd.close()
712
      self.failUnlessEqual(utils.TailFile(fname, lines=i), data[:i])
713

    
714
  def testPartialLines(self):
715
    data = ["test %d" % i for i in range(30)]
716
    fname = self._CreateTempFile()
717
    fd = open(fname, "w")
718
    fd.write("\n".join(data))
719
    fd.write("\n")
720
    fd.close()
721
    for i in range(1, 30):
722
      self.failUnlessEqual(utils.TailFile(fname, lines=i), data[-i:])
723

    
724
  def testBigFile(self):
725
    data = ["test %d" % i for i in range(30)]
726
    fname = self._CreateTempFile()
727
    fd = open(fname, "w")
728
    fd.write("X" * 1048576)
729
    fd.write("\n")
730
    fd.write("\n".join(data))
731
    fd.write("\n")
732
    fd.close()
733
    for i in range(1, 30):
734
      self.failUnlessEqual(utils.TailFile(fname, lines=i), data[-i:])
735

    
736

    
737
class TestPidFileFunctions(unittest.TestCase):
738
  """Tests for WritePidFile and ReadPidFile"""
739

    
740
  def setUp(self):
741
    self.dir = tempfile.mkdtemp()
742
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
743

    
744
  def testPidFileFunctions(self):
745
    pid_file = self.f_dpn('test')
746
    fd = utils.WritePidFile(self.f_dpn('test'))
747
    self.failUnless(os.path.exists(pid_file),
748
                    "PID file should have been created")
749
    read_pid = utils.ReadPidFile(pid_file)
750
    self.failUnlessEqual(read_pid, os.getpid())
751
    self.failUnless(utils.IsProcessAlive(read_pid))
752
    self.failUnlessRaises(errors.PidFileLockError, utils.WritePidFile,
753
                          self.f_dpn('test'))
754
    os.close(fd)
755
    utils.RemoveFile(self.f_dpn("test"))
756
    self.failIf(os.path.exists(pid_file),
757
                "PID file should not exist anymore")
758
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
759
                         "ReadPidFile should return 0 for missing pid file")
760
    fh = open(pid_file, "w")
761
    fh.write("blah\n")
762
    fh.close()
763
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
764
                         "ReadPidFile should return 0 for invalid pid file")
765
    # but now, even with the file existing, we should be able to lock it
766
    fd = utils.WritePidFile(self.f_dpn('test'))
767
    os.close(fd)
768
    utils.RemoveFile(self.f_dpn("test"))
769
    self.failIf(os.path.exists(pid_file),
770
                "PID file should not exist anymore")
771

    
772
  def testKill(self):
773
    pid_file = self.f_dpn('child')
774
    r_fd, w_fd = os.pipe()
775
    new_pid = os.fork()
776
    if new_pid == 0: #child
777
      utils.WritePidFile(self.f_dpn('child'))
778
      os.write(w_fd, 'a')
779
      signal.pause()
780
      os._exit(0)
781
      return
782
    # else we are in the parent
783
    # wait until the child has written the pid file
784
    os.read(r_fd, 1)
785
    read_pid = utils.ReadPidFile(pid_file)
786
    self.failUnlessEqual(read_pid, new_pid)
787
    self.failUnless(utils.IsProcessAlive(new_pid))
788

    
789
    # Try writing to locked file
790
    try:
791
      utils.WritePidFile(pid_file)
792
    except errors.PidFileLockError, err:
793
      errmsg = str(err)
794
      self.assertTrue(errmsg.endswith(" %s" % new_pid),
795
                      msg=("Error message ('%s') didn't contain correct"
796
                           " PID (%s)" % (errmsg, new_pid)))
797
    else:
798
      self.fail("Writing to locked file didn't fail")
799

    
800
    utils.KillProcess(new_pid, waitpid=True)
801
    self.failIf(utils.IsProcessAlive(new_pid))
802
    utils.RemoveFile(self.f_dpn('child'))
803
    self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0)
804

    
805
  def testExceptionType(self):
806
    # Make sure the PID lock error is a subclass of LockError in case some code
807
    # depends on it
808
    self.assertTrue(issubclass(errors.PidFileLockError, errors.LockError))
809

    
810
  def tearDown(self):
811
    shutil.rmtree(self.dir)
812

    
813

    
814
class TestSshKeys(testutils.GanetiTestCase):
815
  """Test case for the AddAuthorizedKey function"""
816

    
817
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
818
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="198.51.100.4" '
819
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
820

    
821
  def setUp(self):
822
    testutils.GanetiTestCase.setUp(self)
823
    self.tmpname = self._CreateTempFile()
824
    handle = open(self.tmpname, 'w')
825
    try:
826
      handle.write("%s\n" % TestSshKeys.KEY_A)
827
      handle.write("%s\n" % TestSshKeys.KEY_B)
828
    finally:
829
      handle.close()
830

    
831
  def testAddingNewKey(self):
832
    utils.AddAuthorizedKey(self.tmpname,
833
                           'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
834

    
835
    self.assertFileContent(self.tmpname,
836
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
837
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
838
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
839
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
840

    
841
  def testAddingAlmostButNotCompletelyTheSameKey(self):
842
    utils.AddAuthorizedKey(self.tmpname,
843
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
844

    
845
    self.assertFileContent(self.tmpname,
846
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
847
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
848
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
849
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
850

    
851
  def testAddingExistingKeyWithSomeMoreSpaces(self):
852
    utils.AddAuthorizedKey(self.tmpname,
853
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
854

    
855
    self.assertFileContent(self.tmpname,
856
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
857
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
858
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
859

    
860
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
861
    utils.RemoveAuthorizedKey(self.tmpname,
862
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
863

    
864
    self.assertFileContent(self.tmpname,
865
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
866
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
867

    
868
  def testRemovingNonExistingKey(self):
869
    utils.RemoveAuthorizedKey(self.tmpname,
870
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
871

    
872
    self.assertFileContent(self.tmpname,
873
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
874
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
875
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
876

    
877

    
878
class TestNewUUID(unittest.TestCase):
879
  """Test case for NewUUID"""
880

    
881
  def runTest(self):
882
    self.failUnless(utils.UUID_RE.match(utils.NewUUID()))
883

    
884

    
885
def _MockStatResult(cb, mode, uid, gid):
886
  def _fn(path):
887
    if cb:
888
      cb()
889
    return {
890
      stat.ST_MODE: mode,
891
      stat.ST_UID: uid,
892
      stat.ST_GID: gid,
893
      }
894
  return _fn
895

    
896

    
897
def _RaiseNoEntError():
898
  raise EnvironmentError(errno.ENOENT, "not found")
899

    
900

    
901
def _OtherStatRaise():
902
  raise EnvironmentError()
903

    
904

    
905
class TestPermissionEnforcements(unittest.TestCase):
906
  UID_A = 16024
907
  UID_B = 25850
908
  GID_A = 14028
909
  GID_B = 29801
910

    
911
  def setUp(self):
912
    self._chown_calls = []
913
    self._chmod_calls = []
914
    self._mkdir_calls = []
915

    
916
  def tearDown(self):
917
    self.assertRaises(IndexError, self._mkdir_calls.pop)
918
    self.assertRaises(IndexError, self._chmod_calls.pop)
919
    self.assertRaises(IndexError, self._chown_calls.pop)
920

    
921
  def _FakeMkdir(self, path):
922
    self._mkdir_calls.append(path)
923

    
924
  def _FakeChown(self, path, uid, gid):
925
    self._chown_calls.append((path, uid, gid))
926

    
927
  def _ChmodWrapper(self, cb):
928
    def _fn(path, mode):
929
      self._chmod_calls.append((path, mode))
930
      if cb:
931
        cb()
932
    return _fn
933

    
934
  def _VerifyPerm(self, path, mode, uid=-1, gid=-1):
935
    self.assertEqual(path, "/ganeti-qa-non-test")
936
    self.assertEqual(mode, 0700)
937
    self.assertEqual(uid, self.UID_A)
938
    self.assertEqual(gid, self.GID_A)
939

    
940
  def testMakeDirWithPerm(self):
941
    is_dir_stat = _MockStatResult(None, stat.S_IFDIR, 0, 0)
942
    utils.MakeDirWithPerm("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A,
943
                          _lstat_fn=is_dir_stat, _perm_fn=self._VerifyPerm)
944

    
945
  def testDirErrors(self):
946
    self.assertRaises(errors.GenericError, utils.MakeDirWithPerm,
947
                      "/ganeti-qa-non-test", 0700, 0, 0,
948
                      _lstat_fn=_MockStatResult(None, 0, 0, 0))
949
    self.assertRaises(IndexError, self._mkdir_calls.pop)
950

    
951
    other_stat_raise = _MockStatResult(_OtherStatRaise, stat.S_IFDIR, 0, 0)
952
    self.assertRaises(errors.GenericError, utils.MakeDirWithPerm,
953
                      "/ganeti-qa-non-test", 0700, 0, 0,
954
                      _lstat_fn=other_stat_raise)
955
    self.assertRaises(IndexError, self._mkdir_calls.pop)
956

    
957
    non_exist_stat = _MockStatResult(_RaiseNoEntError, stat.S_IFDIR, 0, 0)
958
    utils.MakeDirWithPerm("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A,
959
                          _lstat_fn=non_exist_stat, _mkdir_fn=self._FakeMkdir,
960
                          _perm_fn=self._VerifyPerm)
961
    self.assertEqual(self._mkdir_calls.pop(0), "/ganeti-qa-non-test")
962

    
963
  def testEnforcePermissionNoEnt(self):
964
    self.assertRaises(errors.GenericError, utils.EnforcePermission,
965
                      "/ganeti-qa-non-test", 0600,
966
                      _chmod_fn=NotImplemented, _chown_fn=NotImplemented,
967
                      _stat_fn=_MockStatResult(_RaiseNoEntError, 0, 0, 0))
968

    
969
  def testEnforcePermissionNoEntMustNotExist(self):
970
    utils.EnforcePermission("/ganeti-qa-non-test", 0600, must_exist=False,
971
                            _chmod_fn=NotImplemented,
972
                            _chown_fn=NotImplemented,
973
                            _stat_fn=_MockStatResult(_RaiseNoEntError,
974
                                                          0, 0, 0))
975

    
976
  def testEnforcePermissionOtherErrorMustNotExist(self):
977
    self.assertRaises(errors.GenericError, utils.EnforcePermission,
978
                      "/ganeti-qa-non-test", 0600, must_exist=False,
979
                      _chmod_fn=NotImplemented, _chown_fn=NotImplemented,
980
                      _stat_fn=_MockStatResult(_OtherStatRaise, 0, 0, 0))
981

    
982
  def testEnforcePermissionNoChanges(self):
983
    utils.EnforcePermission("/ganeti-qa-non-test", 0600,
984
                            _stat_fn=_MockStatResult(None, 0600, 0, 0),
985
                            _chmod_fn=self._ChmodWrapper(None),
986
                            _chown_fn=self._FakeChown)
987

    
988
  def testEnforcePermissionChangeMode(self):
989
    utils.EnforcePermission("/ganeti-qa-non-test", 0444,
990
                            _stat_fn=_MockStatResult(None, 0600, 0, 0),
991
                            _chmod_fn=self._ChmodWrapper(None),
992
                            _chown_fn=self._FakeChown)
993
    self.assertEqual(self._chmod_calls.pop(0), ("/ganeti-qa-non-test", 0444))
994

    
995
  def testEnforcePermissionSetUidGid(self):
996
    utils.EnforcePermission("/ganeti-qa-non-test", 0600,
997
                            uid=self.UID_B, gid=self.GID_B,
998
                            _stat_fn=_MockStatResult(None, 0600,
999
                                                     self.UID_A,
1000
                                                     self.GID_A),
1001
                            _chmod_fn=self._ChmodWrapper(None),
1002
                            _chown_fn=self._FakeChown)
1003
    self.assertEqual(self._chown_calls.pop(0),
1004
                     ("/ganeti-qa-non-test", self.UID_B, self.GID_B))
1005

    
1006

    
1007
if __name__ == "__main__":
1008
  testutils.GanetiTestProgram()