Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils.io_unittest.py @ 553cb5f7

History | View | Annotate | Download (34 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.utils.io"""
23

    
24
import os
25
import tempfile
26
import unittest
27
import shutil
28
import glob
29
import time
30
import signal
31
import stat
32
import errno
33

    
34
from ganeti import constants
35
from ganeti import utils
36
from ganeti import compat
37
from ganeti import errors
38

    
39
import testutils
40

    
41

    
42
class TestReadFile(testutils.GanetiTestCase):
43
  def testReadAll(self):
44
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"))
45
    self.assertEqual(len(data), 814)
46

    
47
    h = compat.md5_hash()
48
    h.update(data)
49
    self.assertEqual(h.hexdigest(), "a491efb3efe56a0535f924d5f8680fd4")
50

    
51
  def testReadSize(self):
52
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"),
53
                          size=100)
54
    self.assertEqual(len(data), 100)
55

    
56
    h = compat.md5_hash()
57
    h.update(data)
58
    self.assertEqual(h.hexdigest(), "893772354e4e690b9efd073eed433ce7")
59

    
60
  def testCallback(self):
61
    def _Cb(fh):
62
      self.assertEqual(fh.tell(), 0)
63
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"), preread=_Cb)
64
    self.assertEqual(len(data), 814)
65

    
66
  def testError(self):
67
    self.assertRaises(EnvironmentError, utils.ReadFile,
68
                      "/dev/null/does-not-exist")
69

    
70

    
71
class TestReadOneLineFile(testutils.GanetiTestCase):
72
  def setUp(self):
73
    testutils.GanetiTestCase.setUp(self)
74

    
75
  def testDefault(self):
76
    data = utils.ReadOneLineFile(self._TestDataFilename("cert1.pem"))
77
    self.assertEqual(len(data), 27)
78
    self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
79

    
80
  def testNotStrict(self):
81
    data = utils.ReadOneLineFile(self._TestDataFilename("cert1.pem"),
82
                                 strict=False)
83
    self.assertEqual(len(data), 27)
84
    self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
85

    
86
  def testStrictFailure(self):
87
    self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
88
                      self._TestDataFilename("cert1.pem"), strict=True)
89

    
90
  def testLongLine(self):
91
    dummydata = (1024 * "Hello World! ")
92
    myfile = self._CreateTempFile()
93
    utils.WriteFile(myfile, data=dummydata)
94
    datastrict = utils.ReadOneLineFile(myfile, strict=True)
95
    datalax = utils.ReadOneLineFile(myfile, strict=False)
96
    self.assertEqual(dummydata, datastrict)
97
    self.assertEqual(dummydata, datalax)
98

    
99
  def testNewline(self):
100
    myfile = self._CreateTempFile()
101
    myline = "myline"
102
    for nl in ["", "\n", "\r\n"]:
103
      dummydata = "%s%s" % (myline, nl)
104
      utils.WriteFile(myfile, data=dummydata)
105
      datalax = utils.ReadOneLineFile(myfile, strict=False)
106
      self.assertEqual(myline, datalax)
107
      datastrict = utils.ReadOneLineFile(myfile, strict=True)
108
      self.assertEqual(myline, datastrict)
109

    
110
  def testWhitespaceAndMultipleLines(self):
111
    myfile = self._CreateTempFile()
112
    for nl in ["", "\n", "\r\n"]:
113
      for ws in [" ", "\t", "\t\t  \t", "\t "]:
114
        dummydata = (1024 * ("Foo bar baz %s%s" % (ws, nl)))
115
        utils.WriteFile(myfile, data=dummydata)
116
        datalax = utils.ReadOneLineFile(myfile, strict=False)
117
        if nl:
118
          self.assert_(set("\r\n") & set(dummydata))
119
          self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
120
                            myfile, strict=True)
121
          explen = len("Foo bar baz ") + len(ws)
122
          self.assertEqual(len(datalax), explen)
123
          self.assertEqual(datalax, dummydata[:explen])
124
          self.assertFalse(set("\r\n") & set(datalax))
125
        else:
126
          datastrict = utils.ReadOneLineFile(myfile, strict=True)
127
          self.assertEqual(dummydata, datastrict)
128
          self.assertEqual(dummydata, datalax)
129

    
130
  def testEmptylines(self):
131
    myfile = self._CreateTempFile()
132
    myline = "myline"
133
    for nl in ["\n", "\r\n"]:
134
      for ol in ["", "otherline"]:
135
        dummydata = "%s%s%s%s%s%s" % (nl, nl, myline, nl, ol, nl)
136
        utils.WriteFile(myfile, data=dummydata)
137
        self.assert_(set("\r\n") & set(dummydata))
138
        datalax = utils.ReadOneLineFile(myfile, strict=False)
139
        self.assertEqual(myline, datalax)
140
        if ol:
141
          self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
142
                            myfile, strict=True)
143
        else:
144
          datastrict = utils.ReadOneLineFile(myfile, strict=True)
145
          self.assertEqual(myline, datastrict)
146

    
147
  def testEmptyfile(self):
148
    myfile = self._CreateTempFile()
149
    self.assertRaises(errors.GenericError, utils.ReadOneLineFile, myfile)
150

    
151

    
152
class TestTimestampForFilename(unittest.TestCase):
153
  def test(self):
154
    self.assert_("." not in utils.TimestampForFilename())
155
    self.assert_(":" not in utils.TimestampForFilename())
156

    
157

    
158
class TestCreateBackup(testutils.GanetiTestCase):
159
  def setUp(self):
160
    testutils.GanetiTestCase.setUp(self)
161

    
162
    self.tmpdir = tempfile.mkdtemp()
163

    
164
  def tearDown(self):
165
    testutils.GanetiTestCase.tearDown(self)
166

    
167
    shutil.rmtree(self.tmpdir)
168

    
169
  def testEmpty(self):
170
    filename = utils.PathJoin(self.tmpdir, "config.data")
171
    utils.WriteFile(filename, data="")
172
    bname = utils.CreateBackup(filename)
173
    self.assertFileContent(bname, "")
174
    self.assertEqual(len(glob.glob("%s*" % filename)), 2)
175
    utils.CreateBackup(filename)
176
    self.assertEqual(len(glob.glob("%s*" % filename)), 3)
177
    utils.CreateBackup(filename)
178
    self.assertEqual(len(glob.glob("%s*" % filename)), 4)
179

    
180
    fifoname = utils.PathJoin(self.tmpdir, "fifo")
181
    os.mkfifo(fifoname)
182
    self.assertRaises(errors.ProgrammerError, utils.CreateBackup, fifoname)
183

    
184
  def testContent(self):
185
    bkpcount = 0
186
    for data in ["", "X", "Hello World!\n" * 100, "Binary data\0\x01\x02\n"]:
187
      for rep in [1, 2, 10, 127]:
188
        testdata = data * rep
189

    
190
        filename = utils.PathJoin(self.tmpdir, "test.data_")
191
        utils.WriteFile(filename, data=testdata)
192
        self.assertFileContent(filename, testdata)
193

    
194
        for _ in range(3):
195
          bname = utils.CreateBackup(filename)
196
          bkpcount += 1
197
          self.assertFileContent(bname, testdata)
198
          self.assertEqual(len(glob.glob("%s*" % filename)), 1 + bkpcount)
199

    
200

    
201
class TestListVisibleFiles(unittest.TestCase):
202
  """Test case for ListVisibleFiles"""
203

    
204
  def setUp(self):
205
    self.path = tempfile.mkdtemp()
206

    
207
  def tearDown(self):
208
    shutil.rmtree(self.path)
209

    
210
  def _CreateFiles(self, files):
211
    for name in files:
212
      utils.WriteFile(os.path.join(self.path, name), data="test")
213

    
214
  def _test(self, files, expected):
215
    self._CreateFiles(files)
216
    found = utils.ListVisibleFiles(self.path)
217
    self.assertEqual(set(found), set(expected))
218

    
219
  def testAllVisible(self):
220
    files = ["a", "b", "c"]
221
    expected = files
222
    self._test(files, expected)
223

    
224
  def testNoneVisible(self):
225
    files = [".a", ".b", ".c"]
226
    expected = []
227
    self._test(files, expected)
228

    
229
  def testSomeVisible(self):
230
    files = ["a", "b", ".c"]
231
    expected = ["a", "b"]
232
    self._test(files, expected)
233

    
234
  def testNonAbsolutePath(self):
235
    self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
236
                          "abc")
237

    
238
  def testNonNormalizedPath(self):
239
    self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
240
                          "/bin/../tmp")
241

    
242
  def testMountpoint(self):
243
    lvfmp_fn = compat.partial(utils.ListVisibleFiles,
244
                              _is_mountpoint=lambda _: True)
245
    self.assertEqual(lvfmp_fn(self.path), [])
246

    
247
    # Create "lost+found" as a regular file
248
    self._CreateFiles(["foo", "bar", ".baz", "lost+found"])
249
    self.assertEqual(set(lvfmp_fn(self.path)),
250
                     set(["foo", "bar", "lost+found"]))
251

    
252
    # Replace "lost+found" with a directory
253
    laf_path = utils.PathJoin(self.path, "lost+found")
254
    utils.RemoveFile(laf_path)
255
    os.mkdir(laf_path)
256
    self.assertEqual(set(lvfmp_fn(self.path)), set(["foo", "bar"]))
257

    
258
  def testLostAndFoundNoMountpoint(self):
259
    files = ["foo", "bar", ".Hello World", "lost+found"]
260
    expected = ["foo", "bar", "lost+found"]
261
    self._test(files, expected)
262

    
263

    
264
class TestWriteFile(testutils.GanetiTestCase):
265
  def setUp(self):
266
    testutils.GanetiTestCase.setUp(self)
267
    self.tmpdir = None
268
    self.tfile = tempfile.NamedTemporaryFile()
269
    self.did_pre = False
270
    self.did_post = False
271
    self.did_write = False
272

    
273
  def tearDown(self):
274
    testutils.GanetiTestCase.tearDown(self)
275
    if self.tmpdir:
276
      shutil.rmtree(self.tmpdir)
277

    
278
  def markPre(self, fd):
279
    self.did_pre = True
280

    
281
  def markPost(self, fd):
282
    self.did_post = True
283

    
284
  def markWrite(self, fd):
285
    self.did_write = True
286

    
287
  def testWrite(self):
288
    data = "abc"
289
    utils.WriteFile(self.tfile.name, data=data)
290
    self.assertEqual(utils.ReadFile(self.tfile.name), data)
291

    
292
  def testWriteSimpleUnicode(self):
293
    data = u"abc"
294
    utils.WriteFile(self.tfile.name, data=data)
295
    self.assertEqual(utils.ReadFile(self.tfile.name), data)
296

    
297
  def testErrors(self):
298
    self.assertRaises(errors.ProgrammerError, utils.WriteFile,
299
                      self.tfile.name, data="test", fn=lambda fd: None)
300
    self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name)
301
    self.assertRaises(errors.ProgrammerError, utils.WriteFile,
302
                      self.tfile.name, data="test", atime=0)
303
    self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name,
304
                      mode=0400, keep_perms=utils.KP_ALWAYS)
305
    self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name,
306
                      uid=0, keep_perms=utils.KP_ALWAYS)
307
    self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name,
308
                      gid=0, keep_perms=utils.KP_ALWAYS)
309
    self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name,
310
                      mode=0400, uid=0, keep_perms=utils.KP_ALWAYS)
311

    
312
  def testPreWrite(self):
313
    utils.WriteFile(self.tfile.name, data="", prewrite=self.markPre)
314
    self.assertTrue(self.did_pre)
315
    self.assertFalse(self.did_post)
316
    self.assertFalse(self.did_write)
317

    
318
  def testPostWrite(self):
319
    utils.WriteFile(self.tfile.name, data="", postwrite=self.markPost)
320
    self.assertFalse(self.did_pre)
321
    self.assertTrue(self.did_post)
322
    self.assertFalse(self.did_write)
323

    
324
  def testWriteFunction(self):
325
    utils.WriteFile(self.tfile.name, fn=self.markWrite)
326
    self.assertFalse(self.did_pre)
327
    self.assertFalse(self.did_post)
328
    self.assertTrue(self.did_write)
329

    
330
  def testDryRun(self):
331
    orig = "abc"
332
    self.tfile.write(orig)
333
    self.tfile.flush()
334
    utils.WriteFile(self.tfile.name, data="hello", dry_run=True)
335
    self.assertEqual(utils.ReadFile(self.tfile.name), orig)
336

    
337
  def testTimes(self):
338
    f = self.tfile.name
339
    for at, mt in [(0, 0), (1000, 1000), (2000, 3000),
340
                   (int(time.time()), 5000)]:
341
      utils.WriteFile(f, data="hello", atime=at, mtime=mt)
342
      st = os.stat(f)
343
      self.assertEqual(st.st_atime, at)
344
      self.assertEqual(st.st_mtime, mt)
345

    
346
  def testNoClose(self):
347
    data = "hello"
348
    self.assertEqual(utils.WriteFile(self.tfile.name, data="abc"), None)
349
    fd = utils.WriteFile(self.tfile.name, data=data, close=False)
350
    try:
351
      os.lseek(fd, 0, 0)
352
      self.assertEqual(os.read(fd, 4096), data)
353
    finally:
354
      os.close(fd)
355

    
356
  def testNoLeftovers(self):
357
    self.tmpdir = tempfile.mkdtemp()
358
    self.assertEqual(utils.WriteFile(utils.PathJoin(self.tmpdir, "test"),
359
                                     data="abc"),
360
                     None)
361
    self.assertEqual(os.listdir(self.tmpdir), ["test"])
362

    
363
  def testFailRename(self):
364
    self.tmpdir = tempfile.mkdtemp()
365
    target = utils.PathJoin(self.tmpdir, "target")
366
    os.mkdir(target)
367
    self.assertRaises(OSError, utils.WriteFile, target, data="abc")
368
    self.assertTrue(os.path.isdir(target))
369
    self.assertEqual(os.listdir(self.tmpdir), ["target"])
370
    self.assertFalse(os.listdir(target))
371

    
372
  def testFailRenameDryRun(self):
373
    self.tmpdir = tempfile.mkdtemp()
374
    target = utils.PathJoin(self.tmpdir, "target")
375
    os.mkdir(target)
376
    self.assertEqual(utils.WriteFile(target, data="abc", dry_run=True), None)
377
    self.assertTrue(os.path.isdir(target))
378
    self.assertEqual(os.listdir(self.tmpdir), ["target"])
379
    self.assertFalse(os.listdir(target))
380

    
381
  def testBackup(self):
382
    self.tmpdir = tempfile.mkdtemp()
383
    testfile = utils.PathJoin(self.tmpdir, "test")
384

    
385
    self.assertEqual(utils.WriteFile(testfile, data="foo", backup=True), None)
386
    self.assertEqual(utils.ReadFile(testfile), "foo")
387
    self.assertEqual(os.listdir(self.tmpdir), ["test"])
388

    
389
    # Write again
390
    assert os.path.isfile(testfile)
391
    self.assertEqual(utils.WriteFile(testfile, data="bar", backup=True), None)
392
    self.assertEqual(utils.ReadFile(testfile), "bar")
393
    self.assertEqual(len(glob.glob("%s.backup*" % testfile)), 1)
394
    self.assertTrue("test" in os.listdir(self.tmpdir))
395
    self.assertEqual(len(os.listdir(self.tmpdir)), 2)
396

    
397
    # Write again as dry-run
398
    assert os.path.isfile(testfile)
399
    self.assertEqual(utils.WriteFile(testfile, data="000", backup=True,
400
                                     dry_run=True),
401
                     None)
402
    self.assertEqual(utils.ReadFile(testfile), "bar")
403
    self.assertEqual(len(glob.glob("%s.backup*" % testfile)), 1)
404
    self.assertTrue("test" in os.listdir(self.tmpdir))
405
    self.assertEqual(len(os.listdir(self.tmpdir)), 2)
406

    
407
  def testFileMode(self):
408
    self.tmpdir = tempfile.mkdtemp()
409
    target = utils.PathJoin(self.tmpdir, "target")
410
    self.assertRaises(OSError, utils.WriteFile, target, data="data",
411
                      keep_perms=utils.KP_ALWAYS)
412
    # All masks have only user bits set, to avoid interactions with umask
413
    utils.WriteFile(target, data="data", mode=0200)
414
    self.assertFileMode(target, 0200)
415
    utils.WriteFile(target, data="data", mode=0400,
416
                    keep_perms=utils.KP_IF_EXISTS)
417
    self.assertFileMode(target, 0200)
418
    utils.WriteFile(target, data="data", keep_perms=utils.KP_ALWAYS)
419
    self.assertFileMode(target, 0200)
420
    utils.WriteFile(target, data="data", mode=0700)
421
    self.assertFileMode(target, 0700)
422

    
423
  def testNewFileMode(self):
424
    self.tmpdir = tempfile.mkdtemp()
425
    target = utils.PathJoin(self.tmpdir, "target")
426
    utils.WriteFile(target, data="data", mode=0400,
427
                    keep_perms=utils.KP_IF_EXISTS)
428
    self.assertFileMode(target, 0400)
429

    
430
class TestFileID(testutils.GanetiTestCase):
431
  def testEquality(self):
432
    name = self._CreateTempFile()
433
    oldi = utils.GetFileID(path=name)
434
    self.failUnless(utils.VerifyFileID(oldi, oldi))
435

    
436
  def testUpdate(self):
437
    name = self._CreateTempFile()
438
    oldi = utils.GetFileID(path=name)
439
    fd = os.open(name, os.O_RDWR)
440
    try:
441
      newi = utils.GetFileID(fd=fd)
442
      self.failUnless(utils.VerifyFileID(oldi, newi))
443
      self.failUnless(utils.VerifyFileID(newi, oldi))
444
    finally:
445
      os.close(fd)
446

    
447
  def testWriteFile(self):
448
    name = self._CreateTempFile()
449
    oldi = utils.GetFileID(path=name)
450
    mtime = oldi[2]
451
    os.utime(name, (mtime + 10, mtime + 10))
452
    self.assertRaises(errors.LockError, utils.SafeWriteFile, name,
453
                      oldi, data="")
454
    os.utime(name, (mtime - 10, mtime - 10))
455
    utils.SafeWriteFile(name, oldi, data="")
456
    oldi = utils.GetFileID(path=name)
457
    mtime = oldi[2]
458
    os.utime(name, (mtime + 10, mtime + 10))
459
    # this doesn't raise, since we passed None
460
    utils.SafeWriteFile(name, None, data="")
461

    
462
  def testError(self):
463
    t = tempfile.NamedTemporaryFile()
464
    self.assertRaises(errors.ProgrammerError, utils.GetFileID,
465
                      path=t.name, fd=t.fileno())
466

    
467

    
468
class TestRemoveFile(unittest.TestCase):
469
  """Test case for the RemoveFile function"""
470

    
471
  def setUp(self):
472
    """Create a temp dir and file for each case"""
473
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
474
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
475
    os.close(fd)
476

    
477
  def tearDown(self):
478
    if os.path.exists(self.tmpfile):
479
      os.unlink(self.tmpfile)
480
    os.rmdir(self.tmpdir)
481

    
482
  def testIgnoreDirs(self):
483
    """Test that RemoveFile() ignores directories"""
484
    self.assertEqual(None, utils.RemoveFile(self.tmpdir))
485

    
486
  def testIgnoreNotExisting(self):
487
    """Test that RemoveFile() ignores non-existing files"""
488
    utils.RemoveFile(self.tmpfile)
489
    utils.RemoveFile(self.tmpfile)
490

    
491
  def testRemoveFile(self):
492
    """Test that RemoveFile does remove a file"""
493
    utils.RemoveFile(self.tmpfile)
494
    if os.path.exists(self.tmpfile):
495
      self.fail("File '%s' not removed" % self.tmpfile)
496

    
497
  def testRemoveSymlink(self):
498
    """Test that RemoveFile does remove symlinks"""
499
    symlink = self.tmpdir + "/symlink"
500
    os.symlink("no-such-file", symlink)
501
    utils.RemoveFile(symlink)
502
    if os.path.exists(symlink):
503
      self.fail("File '%s' not removed" % symlink)
504
    os.symlink(self.tmpfile, symlink)
505
    utils.RemoveFile(symlink)
506
    if os.path.exists(symlink):
507
      self.fail("File '%s' not removed" % symlink)
508

    
509

    
510
class TestRemoveDir(unittest.TestCase):
511
  def setUp(self):
512
    self.tmpdir = tempfile.mkdtemp()
513

    
514
  def tearDown(self):
515
    try:
516
      shutil.rmtree(self.tmpdir)
517
    except EnvironmentError:
518
      pass
519

    
520
  def testEmptyDir(self):
521
    utils.RemoveDir(self.tmpdir)
522
    self.assertFalse(os.path.isdir(self.tmpdir))
523

    
524
  def testNonEmptyDir(self):
525
    self.tmpfile = os.path.join(self.tmpdir, "test1")
526
    open(self.tmpfile, "w").close()
527
    self.assertRaises(EnvironmentError, utils.RemoveDir, self.tmpdir)
528

    
529

    
530
class TestRename(unittest.TestCase):
531
  """Test case for RenameFile"""
532

    
533
  def setUp(self):
534
    """Create a temporary directory"""
535
    self.tmpdir = tempfile.mkdtemp()
536
    self.tmpfile = os.path.join(self.tmpdir, "test1")
537

    
538
    # Touch the file
539
    open(self.tmpfile, "w").close()
540

    
541
  def tearDown(self):
542
    """Remove temporary directory"""
543
    shutil.rmtree(self.tmpdir)
544

    
545
  def testSimpleRename1(self):
546
    """Simple rename 1"""
547
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
548
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
549

    
550
  def testSimpleRename2(self):
551
    """Simple rename 2"""
552
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
553
                     mkdir=True)
554
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
555

    
556
  def testRenameMkdir(self):
557
    """Rename with mkdir"""
558
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
559
                     mkdir=True)
560
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
561
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
562

    
563
    self.assertRaises(EnvironmentError, utils.RenameFile,
564
                      os.path.join(self.tmpdir, "test/xyz"),
565
                      os.path.join(self.tmpdir, "test/foo/bar/baz"),
566
                      mkdir=True)
567

    
568
    self.assertTrue(os.path.exists(os.path.join(self.tmpdir, "test/xyz")))
569
    self.assertFalse(os.path.exists(os.path.join(self.tmpdir, "test/foo/bar")))
570
    self.assertFalse(os.path.exists(os.path.join(self.tmpdir,
571
                                                 "test/foo/bar/baz")))
572

    
573

    
574
class TestMakedirs(unittest.TestCase):
575
  def setUp(self):
576
    self.tmpdir = tempfile.mkdtemp()
577

    
578
  def tearDown(self):
579
    shutil.rmtree(self.tmpdir)
580

    
581
  def testNonExisting(self):
582
    path = utils.PathJoin(self.tmpdir, "foo")
583
    utils.Makedirs(path)
584
    self.assert_(os.path.isdir(path))
585

    
586
  def testExisting(self):
587
    path = utils.PathJoin(self.tmpdir, "foo")
588
    os.mkdir(path)
589
    utils.Makedirs(path)
590
    self.assert_(os.path.isdir(path))
591

    
592
  def testRecursiveNonExisting(self):
593
    path = utils.PathJoin(self.tmpdir, "foo/bar/baz")
594
    utils.Makedirs(path)
595
    self.assert_(os.path.isdir(path))
596

    
597
  def testRecursiveExisting(self):
598
    path = utils.PathJoin(self.tmpdir, "B/moo/xyz")
599
    self.assertFalse(os.path.exists(path))
600
    os.mkdir(utils.PathJoin(self.tmpdir, "B"))
601
    utils.Makedirs(path)
602
    self.assert_(os.path.isdir(path))
603

    
604

    
605
class TestEnsureDirs(unittest.TestCase):
606
  """Tests for EnsureDirs"""
607

    
608
  def setUp(self):
609
    self.dir = tempfile.mkdtemp()
610
    self.old_umask = os.umask(0777)
611

    
612
  def testEnsureDirs(self):
613
    utils.EnsureDirs([
614
        (utils.PathJoin(self.dir, "foo"), 0777),
615
        (utils.PathJoin(self.dir, "bar"), 0000),
616
        ])
617
    self.assertEquals(os.stat(utils.PathJoin(self.dir, "foo"))[0] & 0777, 0777)
618
    self.assertEquals(os.stat(utils.PathJoin(self.dir, "bar"))[0] & 0777, 0000)
619

    
620
  def tearDown(self):
621
    os.rmdir(utils.PathJoin(self.dir, "foo"))
622
    os.rmdir(utils.PathJoin(self.dir, "bar"))
623
    os.rmdir(self.dir)
624
    os.umask(self.old_umask)
625

    
626

    
627
class TestIsNormAbsPath(unittest.TestCase):
628
  """Testing case for IsNormAbsPath"""
629

    
630
  def _pathTestHelper(self, path, result):
631
    if result:
632
      self.assert_(utils.IsNormAbsPath(path),
633
          "Path %s should result absolute and normalized" % path)
634
    else:
635
      self.assertFalse(utils.IsNormAbsPath(path),
636
          "Path %s should not result absolute and normalized" % path)
637

    
638
  def testBase(self):
639
    self._pathTestHelper("/etc", True)
640
    self._pathTestHelper("/srv", True)
641
    self._pathTestHelper("etc", False)
642
    self._pathTestHelper("/etc/../root", False)
643
    self._pathTestHelper("/etc/", False)
644

    
645

    
646
class TestIsBelowDir(unittest.TestCase):
647
  """Testing case for IsBelowDir"""
648

    
649
  def testSamePrefix(self):
650
    self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c"))
651
    self.assertTrue(utils.IsBelowDir("/a/b/", "/a/b/e"))
652

    
653
  def testSamePrefixButDifferentDir(self):
654
    self.assertFalse(utils.IsBelowDir("/a/b", "/a/bc/d"))
655
    self.assertFalse(utils.IsBelowDir("/a/b/", "/a/bc/e"))
656

    
657
  def testSamePrefixButDirTraversal(self):
658
    self.assertFalse(utils.IsBelowDir("/a/b", "/a/b/../c"))
659
    self.assertFalse(utils.IsBelowDir("/a/b/", "/a/b/../d"))
660

    
661
  def testSamePrefixAndTraversal(self):
662
    self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c/../d"))
663
    self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c/./e"))
664
    self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/../b/./e"))
665

    
666
  def testBothAbsPath(self):
667
    self.assertRaises(ValueError, utils.IsBelowDir, "/a/b/c", "d")
668
    self.assertRaises(ValueError, utils.IsBelowDir, "a/b/c", "/d")
669
    self.assertRaises(ValueError, utils.IsBelowDir, "a/b/c", "d")
670

    
671

    
672
class TestPathJoin(unittest.TestCase):
673
  """Testing case for PathJoin"""
674

    
675
  def testBasicItems(self):
676
    mlist = ["/a", "b", "c"]
677
    self.failUnlessEqual(utils.PathJoin(*mlist), "/".join(mlist))
678

    
679
  def testNonAbsPrefix(self):
680
    self.failUnlessRaises(ValueError, utils.PathJoin, "a", "b")
681

    
682
  def testBackTrack(self):
683
    self.failUnlessRaises(ValueError, utils.PathJoin, "/a", "b/../c")
684

    
685
  def testMultiAbs(self):
686
    self.failUnlessRaises(ValueError, utils.PathJoin, "/a", "/b")
687

    
688

    
689
class TestTailFile(testutils.GanetiTestCase):
690
  """Test case for the TailFile function"""
691

    
692
  def testEmpty(self):
693
    fname = self._CreateTempFile()
694
    self.failUnlessEqual(utils.TailFile(fname), [])
695
    self.failUnlessEqual(utils.TailFile(fname, lines=25), [])
696

    
697
  def testAllLines(self):
698
    data = ["test %d" % i for i in range(30)]
699
    for i in range(30):
700
      fname = self._CreateTempFile()
701
      fd = open(fname, "w")
702
      fd.write("\n".join(data[:i]))
703
      if i > 0:
704
        fd.write("\n")
705
      fd.close()
706
      self.failUnlessEqual(utils.TailFile(fname, lines=i), data[:i])
707

    
708
  def testPartialLines(self):
709
    data = ["test %d" % i for i in range(30)]
710
    fname = self._CreateTempFile()
711
    fd = open(fname, "w")
712
    fd.write("\n".join(data))
713
    fd.write("\n")
714
    fd.close()
715
    for i in range(1, 30):
716
      self.failUnlessEqual(utils.TailFile(fname, lines=i), data[-i:])
717

    
718
  def testBigFile(self):
719
    data = ["test %d" % i for i in range(30)]
720
    fname = self._CreateTempFile()
721
    fd = open(fname, "w")
722
    fd.write("X" * 1048576)
723
    fd.write("\n")
724
    fd.write("\n".join(data))
725
    fd.write("\n")
726
    fd.close()
727
    for i in range(1, 30):
728
      self.failUnlessEqual(utils.TailFile(fname, lines=i), data[-i:])
729

    
730

    
731
class TestPidFileFunctions(unittest.TestCase):
732
  """Tests for WritePidFile and ReadPidFile"""
733

    
734
  def setUp(self):
735
    self.dir = tempfile.mkdtemp()
736
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
737

    
738
  def testPidFileFunctions(self):
739
    pid_file = self.f_dpn('test')
740
    fd = utils.WritePidFile(self.f_dpn('test'))
741
    self.failUnless(os.path.exists(pid_file),
742
                    "PID file should have been created")
743
    read_pid = utils.ReadPidFile(pid_file)
744
    self.failUnlessEqual(read_pid, os.getpid())
745
    self.failUnless(utils.IsProcessAlive(read_pid))
746
    self.failUnlessRaises(errors.PidFileLockError, utils.WritePidFile,
747
                          self.f_dpn('test'))
748
    os.close(fd)
749
    utils.RemoveFile(self.f_dpn("test"))
750
    self.failIf(os.path.exists(pid_file),
751
                "PID file should not exist anymore")
752
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
753
                         "ReadPidFile should return 0 for missing pid file")
754
    fh = open(pid_file, "w")
755
    fh.write("blah\n")
756
    fh.close()
757
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
758
                         "ReadPidFile should return 0 for invalid pid file")
759
    # but now, even with the file existing, we should be able to lock it
760
    fd = utils.WritePidFile(self.f_dpn('test'))
761
    os.close(fd)
762
    utils.RemoveFile(self.f_dpn("test"))
763
    self.failIf(os.path.exists(pid_file),
764
                "PID file should not exist anymore")
765

    
766
  def testKill(self):
767
    pid_file = self.f_dpn('child')
768
    r_fd, w_fd = os.pipe()
769
    new_pid = os.fork()
770
    if new_pid == 0: #child
771
      utils.WritePidFile(self.f_dpn('child'))
772
      os.write(w_fd, 'a')
773
      signal.pause()
774
      os._exit(0)
775
      return
776
    # else we are in the parent
777
    # wait until the child has written the pid file
778
    os.read(r_fd, 1)
779
    read_pid = utils.ReadPidFile(pid_file)
780
    self.failUnlessEqual(read_pid, new_pid)
781
    self.failUnless(utils.IsProcessAlive(new_pid))
782

    
783
    # Try writing to locked file
784
    try:
785
      utils.WritePidFile(pid_file)
786
    except errors.PidFileLockError, err:
787
      errmsg = str(err)
788
      self.assertTrue(errmsg.endswith(" %s" % new_pid),
789
                      msg=("Error message ('%s') didn't contain correct"
790
                           " PID (%s)" % (errmsg, new_pid)))
791
    else:
792
      self.fail("Writing to locked file didn't fail")
793

    
794
    utils.KillProcess(new_pid, waitpid=True)
795
    self.failIf(utils.IsProcessAlive(new_pid))
796
    utils.RemoveFile(self.f_dpn('child'))
797
    self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0)
798

    
799
  def testExceptionType(self):
800
    # Make sure the PID lock error is a subclass of LockError in case some code
801
    # depends on it
802
    self.assertTrue(issubclass(errors.PidFileLockError, errors.LockError))
803

    
804
  def tearDown(self):
805
    shutil.rmtree(self.dir)
806

    
807

    
808
class TestSshKeys(testutils.GanetiTestCase):
809
  """Test case for the AddAuthorizedKey function"""
810

    
811
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
812
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="198.51.100.4" '
813
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
814

    
815
  def setUp(self):
816
    testutils.GanetiTestCase.setUp(self)
817
    self.tmpname = self._CreateTempFile()
818
    handle = open(self.tmpname, 'w')
819
    try:
820
      handle.write("%s\n" % TestSshKeys.KEY_A)
821
      handle.write("%s\n" % TestSshKeys.KEY_B)
822
    finally:
823
      handle.close()
824

    
825
  def testAddingNewKey(self):
826
    utils.AddAuthorizedKey(self.tmpname,
827
                           'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
828

    
829
    self.assertFileContent(self.tmpname,
830
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
831
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
832
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
833
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
834

    
835
  def testAddingAlmostButNotCompletelyTheSameKey(self):
836
    utils.AddAuthorizedKey(self.tmpname,
837
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
838

    
839
    self.assertFileContent(self.tmpname,
840
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
841
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
842
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
843
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
844

    
845
  def testAddingExistingKeyWithSomeMoreSpaces(self):
846
    utils.AddAuthorizedKey(self.tmpname,
847
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
848

    
849
    self.assertFileContent(self.tmpname,
850
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
851
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
852
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
853

    
854
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
855
    utils.RemoveAuthorizedKey(self.tmpname,
856
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
857

    
858
    self.assertFileContent(self.tmpname,
859
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
860
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
861

    
862
  def testRemovingNonExistingKey(self):
863
    utils.RemoveAuthorizedKey(self.tmpname,
864
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
865

    
866
    self.assertFileContent(self.tmpname,
867
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
868
      'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
869
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
870

    
871

    
872
class TestNewUUID(unittest.TestCase):
873
  """Test case for NewUUID"""
874

    
875
  def runTest(self):
876
    self.failUnless(utils.UUID_RE.match(utils.NewUUID()))
877

    
878

    
879
def _MockStatResult(cb, mode, uid, gid):
880
  def _fn(path):
881
    if cb:
882
      cb()
883
    return {
884
      stat.ST_MODE: mode,
885
      stat.ST_UID: uid,
886
      stat.ST_GID: gid,
887
      }
888
  return _fn
889

    
890

    
891
def _RaiseNoEntError():
892
  raise EnvironmentError(errno.ENOENT, "not found")
893

    
894

    
895
def _OtherStatRaise():
896
  raise EnvironmentError()
897

    
898

    
899
class TestPermissionEnforcements(unittest.TestCase):
900
  UID_A = 16024
901
  UID_B = 25850
902
  GID_A = 14028
903
  GID_B = 29801
904

    
905
  def setUp(self):
906
    self._chown_calls = []
907
    self._chmod_calls = []
908
    self._mkdir_calls = []
909

    
910
  def tearDown(self):
911
    self.assertRaises(IndexError, self._mkdir_calls.pop)
912
    self.assertRaises(IndexError, self._chmod_calls.pop)
913
    self.assertRaises(IndexError, self._chown_calls.pop)
914

    
915
  def _FakeMkdir(self, path):
916
    self._mkdir_calls.append(path)
917

    
918
  def _FakeChown(self, path, uid, gid):
919
    self._chown_calls.append((path, uid, gid))
920

    
921
  def _ChmodWrapper(self, cb):
922
    def _fn(path, mode):
923
      self._chmod_calls.append((path, mode))
924
      if cb:
925
        cb()
926
    return _fn
927

    
928
  def _VerifyPerm(self, path, mode, uid=-1, gid=-1):
929
    self.assertEqual(path, "/ganeti-qa-non-test")
930
    self.assertEqual(mode, 0700)
931
    self.assertEqual(uid, self.UID_A)
932
    self.assertEqual(gid, self.GID_A)
933

    
934
  def testMakeDirWithPerm(self):
935
    is_dir_stat = _MockStatResult(None, stat.S_IFDIR, 0, 0)
936
    utils.MakeDirWithPerm("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A,
937
                          _lstat_fn=is_dir_stat, _perm_fn=self._VerifyPerm)
938

    
939
  def testDirErrors(self):
940
    self.assertRaises(errors.GenericError, utils.MakeDirWithPerm,
941
                      "/ganeti-qa-non-test", 0700, 0, 0,
942
                      _lstat_fn=_MockStatResult(None, 0, 0, 0))
943
    self.assertRaises(IndexError, self._mkdir_calls.pop)
944

    
945
    other_stat_raise = _MockStatResult(_OtherStatRaise, stat.S_IFDIR, 0, 0)
946
    self.assertRaises(errors.GenericError, utils.MakeDirWithPerm,
947
                      "/ganeti-qa-non-test", 0700, 0, 0,
948
                      _lstat_fn=other_stat_raise)
949
    self.assertRaises(IndexError, self._mkdir_calls.pop)
950

    
951
    non_exist_stat = _MockStatResult(_RaiseNoEntError, stat.S_IFDIR, 0, 0)
952
    utils.MakeDirWithPerm("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A,
953
                          _lstat_fn=non_exist_stat, _mkdir_fn=self._FakeMkdir,
954
                          _perm_fn=self._VerifyPerm)
955
    self.assertEqual(self._mkdir_calls.pop(0), "/ganeti-qa-non-test")
956

    
957
  def testEnforcePermissionNoEnt(self):
958
    self.assertRaises(errors.GenericError, utils.EnforcePermission,
959
                      "/ganeti-qa-non-test", 0600,
960
                      _chmod_fn=NotImplemented, _chown_fn=NotImplemented,
961
                      _stat_fn=_MockStatResult(_RaiseNoEntError, 0, 0, 0))
962

    
963
  def testEnforcePermissionNoEntMustNotExist(self):
964
    utils.EnforcePermission("/ganeti-qa-non-test", 0600, must_exist=False,
965
                            _chmod_fn=NotImplemented,
966
                            _chown_fn=NotImplemented,
967
                            _stat_fn=_MockStatResult(_RaiseNoEntError,
968
                                                          0, 0, 0))
969

    
970
  def testEnforcePermissionOtherErrorMustNotExist(self):
971
    self.assertRaises(errors.GenericError, utils.EnforcePermission,
972
                      "/ganeti-qa-non-test", 0600, must_exist=False,
973
                      _chmod_fn=NotImplemented, _chown_fn=NotImplemented,
974
                      _stat_fn=_MockStatResult(_OtherStatRaise, 0, 0, 0))
975

    
976
  def testEnforcePermissionNoChanges(self):
977
    utils.EnforcePermission("/ganeti-qa-non-test", 0600,
978
                            _stat_fn=_MockStatResult(None, 0600, 0, 0),
979
                            _chmod_fn=self._ChmodWrapper(None),
980
                            _chown_fn=self._FakeChown)
981

    
982
  def testEnforcePermissionChangeMode(self):
983
    utils.EnforcePermission("/ganeti-qa-non-test", 0444,
984
                            _stat_fn=_MockStatResult(None, 0600, 0, 0),
985
                            _chmod_fn=self._ChmodWrapper(None),
986
                            _chown_fn=self._FakeChown)
987
    self.assertEqual(self._chmod_calls.pop(0), ("/ganeti-qa-non-test", 0444))
988

    
989
  def testEnforcePermissionSetUidGid(self):
990
    utils.EnforcePermission("/ganeti-qa-non-test", 0600,
991
                            uid=self.UID_B, gid=self.GID_B,
992
                            _stat_fn=_MockStatResult(None, 0600,
993
                                                     self.UID_A,
994
                                                     self.GID_A),
995
                            _chmod_fn=self._ChmodWrapper(None),
996
                            _chown_fn=self._FakeChown)
997
    self.assertEqual(self._chown_calls.pop(0),
998
                     ("/ganeti-qa-non-test", self.UID_B, self.GID_B))
999

    
1000

    
1001
if __name__ == "__main__":
1002
  testutils.GanetiTestProgram()