Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ b4478d34

History | View | Annotate | Download (46 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import os
30
import stat
31
import md5
32
import signal
33
import socket
34
import shutil
35
import re
36
import select
37
import string
38

    
39
import ganeti
40
import testutils
41
from ganeti import constants
42
from ganeti import utils
43
from ganeti import errors
44
from ganeti.utils import IsProcessAlive, RunCmd, \
45
     RemoveFile, MatchNameComponent, FormatUnit, \
46
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
47
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
48
     SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \
49
     TailFile, ForceDictType, SafeEncode, IsNormAbsPath, FormatTime, \
50
     UnescapeAndSplit, RunParts, PathJoin, HostInfo
51

    
52
from ganeti.errors import LockError, UnitParseError, GenericError, \
53
     ProgrammerError, OpPrereqError
54

    
55

    
56
class TestIsProcessAlive(unittest.TestCase):
57
  """Testing case for IsProcessAlive"""
58

    
59
  def testExists(self):
60
    mypid = os.getpid()
61
    self.assert_(IsProcessAlive(mypid),
62
                 "can't find myself running")
63

    
64
  def testNotExisting(self):
65
    pid_non_existing = os.fork()
66
    if pid_non_existing == 0:
67
      os._exit(0)
68
    elif pid_non_existing < 0:
69
      raise SystemError("can't fork")
70
    os.waitpid(pid_non_existing, 0)
71
    self.assert_(not IsProcessAlive(pid_non_existing),
72
                 "nonexisting process detected")
73

    
74

    
75
class TestPidFileFunctions(unittest.TestCase):
76
  """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
77

    
78
  def setUp(self):
79
    self.dir = tempfile.mkdtemp()
80
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
81
    utils.DaemonPidFileName = self.f_dpn
82

    
83
  def testPidFileFunctions(self):
84
    pid_file = self.f_dpn('test')
85
    utils.WritePidFile('test')
86
    self.failUnless(os.path.exists(pid_file),
87
                    "PID file should have been created")
88
    read_pid = utils.ReadPidFile(pid_file)
89
    self.failUnlessEqual(read_pid, os.getpid())
90
    self.failUnless(utils.IsProcessAlive(read_pid))
91
    self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
92
    utils.RemovePidFile('test')
93
    self.failIf(os.path.exists(pid_file),
94
                "PID file should not exist anymore")
95
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
96
                         "ReadPidFile should return 0 for missing pid file")
97
    fh = open(pid_file, "w")
98
    fh.write("blah\n")
99
    fh.close()
100
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
101
                         "ReadPidFile should return 0 for invalid pid file")
102
    utils.RemovePidFile('test')
103
    self.failIf(os.path.exists(pid_file),
104
                "PID file should not exist anymore")
105

    
106
  def testKill(self):
107
    pid_file = self.f_dpn('child')
108
    r_fd, w_fd = os.pipe()
109
    new_pid = os.fork()
110
    if new_pid == 0: #child
111
      utils.WritePidFile('child')
112
      os.write(w_fd, 'a')
113
      signal.pause()
114
      os._exit(0)
115
      return
116
    # else we are in the parent
117
    # wait until the child has written the pid file
118
    os.read(r_fd, 1)
119
    read_pid = utils.ReadPidFile(pid_file)
120
    self.failUnlessEqual(read_pid, new_pid)
121
    self.failUnless(utils.IsProcessAlive(new_pid))
122
    utils.KillProcess(new_pid, waitpid=True)
123
    self.failIf(utils.IsProcessAlive(new_pid))
124
    utils.RemovePidFile('child')
125
    self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
126

    
127
  def tearDown(self):
128
    for name in os.listdir(self.dir):
129
      os.unlink(os.path.join(self.dir, name))
130
    os.rmdir(self.dir)
131

    
132

    
133
class TestRunCmd(testutils.GanetiTestCase):
134
  """Testing case for the RunCmd function"""
135

    
136
  def setUp(self):
137
    testutils.GanetiTestCase.setUp(self)
138
    self.magic = time.ctime() + " ganeti test"
139
    self.fname = self._CreateTempFile()
140

    
141
  def testOk(self):
142
    """Test successful exit code"""
143
    result = RunCmd("/bin/sh -c 'exit 0'")
144
    self.assertEqual(result.exit_code, 0)
145
    self.assertEqual(result.output, "")
146

    
147
  def testFail(self):
148
    """Test fail exit code"""
149
    result = RunCmd("/bin/sh -c 'exit 1'")
150
    self.assertEqual(result.exit_code, 1)
151
    self.assertEqual(result.output, "")
152

    
153
  def testStdout(self):
154
    """Test standard output"""
155
    cmd = 'echo -n "%s"' % self.magic
156
    result = RunCmd("/bin/sh -c '%s'" % cmd)
157
    self.assertEqual(result.stdout, self.magic)
158
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
159
    self.assertEqual(result.output, "")
160
    self.assertFileContent(self.fname, self.magic)
161

    
162
  def testStderr(self):
163
    """Test standard error"""
164
    cmd = 'echo -n "%s"' % self.magic
165
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
166
    self.assertEqual(result.stderr, self.magic)
167
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
168
    self.assertEqual(result.output, "")
169
    self.assertFileContent(self.fname, self.magic)
170

    
171
  def testCombined(self):
172
    """Test combined output"""
173
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
174
    expected = "A" + self.magic + "B" + self.magic
175
    result = RunCmd("/bin/sh -c '%s'" % cmd)
176
    self.assertEqual(result.output, expected)
177
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
178
    self.assertEqual(result.output, "")
179
    self.assertFileContent(self.fname, expected)
180

    
181
  def testSignal(self):
182
    """Test signal"""
183
    result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
184
    self.assertEqual(result.signal, 15)
185
    self.assertEqual(result.output, "")
186

    
187
  def testListRun(self):
188
    """Test list runs"""
189
    result = RunCmd(["true"])
190
    self.assertEqual(result.signal, None)
191
    self.assertEqual(result.exit_code, 0)
192
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
193
    self.assertEqual(result.signal, None)
194
    self.assertEqual(result.exit_code, 1)
195
    result = RunCmd(["echo", "-n", self.magic])
196
    self.assertEqual(result.signal, None)
197
    self.assertEqual(result.exit_code, 0)
198
    self.assertEqual(result.stdout, self.magic)
199

    
200
  def testFileEmptyOutput(self):
201
    """Test file output"""
202
    result = RunCmd(["true"], output=self.fname)
203
    self.assertEqual(result.signal, None)
204
    self.assertEqual(result.exit_code, 0)
205
    self.assertFileContent(self.fname, "")
206

    
207
  def testLang(self):
208
    """Test locale environment"""
209
    old_env = os.environ.copy()
210
    try:
211
      os.environ["LANG"] = "en_US.UTF-8"
212
      os.environ["LC_ALL"] = "en_US.UTF-8"
213
      result = RunCmd(["locale"])
214
      for line in result.output.splitlines():
215
        key, value = line.split("=", 1)
216
        # Ignore these variables, they're overridden by LC_ALL
217
        if key == "LANG" or key == "LANGUAGE":
218
          continue
219
        self.failIf(value and value != "C" and value != '"C"',
220
            "Variable %s is set to the invalid value '%s'" % (key, value))
221
    finally:
222
      os.environ = old_env
223

    
224
  def testDefaultCwd(self):
225
    """Test default working directory"""
226
    self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
227

    
228
  def testCwd(self):
229
    """Test default working directory"""
230
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
231
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
232
    cwd = os.getcwd()
233
    self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
234

    
235
  def testResetEnv(self):
236
    """Test environment reset functionality"""
237
    self.failUnlessEqual(RunCmd(["env"], reset_env=True).stdout.strip(), "")
238

    
239

    
240
class TestRunParts(unittest.TestCase):
241
  """Testing case for the RunParts function"""
242

    
243
  def setUp(self):
244
    self.rundir = tempfile.mkdtemp(prefix="ganeti-test", suffix=".tmp")
245

    
246
  def tearDown(self):
247
    shutil.rmtree(self.rundir)
248

    
249
  def testEmpty(self):
250
    """Test on an empty dir"""
251
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True), [])
252

    
253
  def testSkipWrongName(self):
254
    """Test that wrong files are skipped"""
255
    fname = os.path.join(self.rundir, "00test.dot")
256
    utils.WriteFile(fname, data="")
257
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
258
    relname = os.path.basename(fname)
259
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
260
                         [(relname, constants.RUNPARTS_SKIP, None)])
261

    
262
  def testSkipNonExec(self):
263
    """Test that non executable files are skipped"""
264
    fname = os.path.join(self.rundir, "00test")
265
    utils.WriteFile(fname, data="")
266
    relname = os.path.basename(fname)
267
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
268
                         [(relname, constants.RUNPARTS_SKIP, None)])
269

    
270
  def testError(self):
271
    """Test error on a broken executable"""
272
    fname = os.path.join(self.rundir, "00test")
273
    utils.WriteFile(fname, data="")
274
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
275
    (relname, status, error) = RunParts(self.rundir, reset_env=True)[0]
276
    self.failUnlessEqual(relname, os.path.basename(fname))
277
    self.failUnlessEqual(status, constants.RUNPARTS_ERR)
278
    self.failUnless(error)
279

    
280
  def testSorted(self):
281
    """Test executions are sorted"""
282
    files = []
283
    files.append(os.path.join(self.rundir, "64test"))
284
    files.append(os.path.join(self.rundir, "00test"))
285
    files.append(os.path.join(self.rundir, "42test"))
286

    
287
    for fname in files:
288
      utils.WriteFile(fname, data="")
289

    
290
    results = RunParts(self.rundir, reset_env=True)
291

    
292
    for fname in sorted(files):
293
      self.failUnlessEqual(os.path.basename(fname), results.pop(0)[0])
294

    
295
  def testOk(self):
296
    """Test correct execution"""
297
    fname = os.path.join(self.rundir, "00test")
298
    utils.WriteFile(fname, data="#!/bin/sh\n\necho -n ciao")
299
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
300
    (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
301
    self.failUnlessEqual(relname, os.path.basename(fname))
302
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
303
    self.failUnlessEqual(runresult.stdout, "ciao")
304

    
305
  def testRunFail(self):
306
    """Test correct execution, with run failure"""
307
    fname = os.path.join(self.rundir, "00test")
308
    utils.WriteFile(fname, data="#!/bin/sh\n\nexit 1")
309
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
310
    (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
311
    self.failUnlessEqual(relname, os.path.basename(fname))
312
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
313
    self.failUnlessEqual(runresult.exit_code, 1)
314
    self.failUnless(runresult.failed)
315

    
316
  def testRunMix(self):
317
    files = []
318
    files.append(os.path.join(self.rundir, "00test"))
319
    files.append(os.path.join(self.rundir, "42test"))
320
    files.append(os.path.join(self.rundir, "64test"))
321
    files.append(os.path.join(self.rundir, "99test"))
322

    
323
    files.sort()
324

    
325
    # 1st has errors in execution
326
    utils.WriteFile(files[0], data="#!/bin/sh\n\nexit 1")
327
    os.chmod(files[0], stat.S_IREAD | stat.S_IEXEC)
328

    
329
    # 2nd is skipped
330
    utils.WriteFile(files[1], data="")
331

    
332
    # 3rd cannot execute properly
333
    utils.WriteFile(files[2], data="")
334
    os.chmod(files[2], stat.S_IREAD | stat.S_IEXEC)
335

    
336
    # 4th execs
337
    utils.WriteFile(files[3], data="#!/bin/sh\n\necho -n ciao")
338
    os.chmod(files[3], stat.S_IREAD | stat.S_IEXEC)
339

    
340
    results = RunParts(self.rundir, reset_env=True)
341

    
342
    (relname, status, runresult) = results[0]
343
    self.failUnlessEqual(relname, os.path.basename(files[0]))
344
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
345
    self.failUnlessEqual(runresult.exit_code, 1)
346
    self.failUnless(runresult.failed)
347

    
348
    (relname, status, runresult) = results[1]
349
    self.failUnlessEqual(relname, os.path.basename(files[1]))
350
    self.failUnlessEqual(status, constants.RUNPARTS_SKIP)
351
    self.failUnlessEqual(runresult, None)
352

    
353
    (relname, status, runresult) = results[2]
354
    self.failUnlessEqual(relname, os.path.basename(files[2]))
355
    self.failUnlessEqual(status, constants.RUNPARTS_ERR)
356
    self.failUnless(runresult)
357

    
358
    (relname, status, runresult) = results[3]
359
    self.failUnlessEqual(relname, os.path.basename(files[3]))
360
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
361
    self.failUnlessEqual(runresult.output, "ciao")
362
    self.failUnlessEqual(runresult.exit_code, 0)
363
    self.failUnless(not runresult.failed)
364

    
365

    
366
class TestRemoveFile(unittest.TestCase):
367
  """Test case for the RemoveFile function"""
368

    
369
  def setUp(self):
370
    """Create a temp dir and file for each case"""
371
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
372
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
373
    os.close(fd)
374

    
375
  def tearDown(self):
376
    if os.path.exists(self.tmpfile):
377
      os.unlink(self.tmpfile)
378
    os.rmdir(self.tmpdir)
379

    
380

    
381
  def testIgnoreDirs(self):
382
    """Test that RemoveFile() ignores directories"""
383
    self.assertEqual(None, RemoveFile(self.tmpdir))
384

    
385

    
386
  def testIgnoreNotExisting(self):
387
    """Test that RemoveFile() ignores non-existing files"""
388
    RemoveFile(self.tmpfile)
389
    RemoveFile(self.tmpfile)
390

    
391

    
392
  def testRemoveFile(self):
393
    """Test that RemoveFile does remove a file"""
394
    RemoveFile(self.tmpfile)
395
    if os.path.exists(self.tmpfile):
396
      self.fail("File '%s' not removed" % self.tmpfile)
397

    
398

    
399
  def testRemoveSymlink(self):
400
    """Test that RemoveFile does remove symlinks"""
401
    symlink = self.tmpdir + "/symlink"
402
    os.symlink("no-such-file", symlink)
403
    RemoveFile(symlink)
404
    if os.path.exists(symlink):
405
      self.fail("File '%s' not removed" % symlink)
406
    os.symlink(self.tmpfile, symlink)
407
    RemoveFile(symlink)
408
    if os.path.exists(symlink):
409
      self.fail("File '%s' not removed" % symlink)
410

    
411

    
412
class TestRename(unittest.TestCase):
413
  """Test case for RenameFile"""
414

    
415
  def setUp(self):
416
    """Create a temporary directory"""
417
    self.tmpdir = tempfile.mkdtemp()
418
    self.tmpfile = os.path.join(self.tmpdir, "test1")
419

    
420
    # Touch the file
421
    open(self.tmpfile, "w").close()
422

    
423
  def tearDown(self):
424
    """Remove temporary directory"""
425
    shutil.rmtree(self.tmpdir)
426

    
427
  def testSimpleRename1(self):
428
    """Simple rename 1"""
429
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
430
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
431

    
432
  def testSimpleRename2(self):
433
    """Simple rename 2"""
434
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
435
                     mkdir=True)
436
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
437

    
438
  def testRenameMkdir(self):
439
    """Rename with mkdir"""
440
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
441
                     mkdir=True)
442
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
443
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
444

    
445
    utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"),
446
                     os.path.join(self.tmpdir, "test/foo/bar/baz"),
447
                     mkdir=True)
448
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
449
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar")))
450
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz")))
451

    
452

    
453
class TestMatchNameComponent(unittest.TestCase):
454
  """Test case for the MatchNameComponent function"""
455

    
456
  def testEmptyList(self):
457
    """Test that there is no match against an empty list"""
458

    
459
    self.failUnlessEqual(MatchNameComponent("", []), None)
460
    self.failUnlessEqual(MatchNameComponent("test", []), None)
461

    
462
  def testSingleMatch(self):
463
    """Test that a single match is performed correctly"""
464
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
465
    for key in "test2", "test2.example", "test2.example.com":
466
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
467

    
468
  def testMultipleMatches(self):
469
    """Test that a multiple match is returned as None"""
470
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
471
    for key in "test1", "test1.example":
472
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
473

    
474
  def testFullMatch(self):
475
    """Test that a full match is returned correctly"""
476
    key1 = "test1"
477
    key2 = "test1.example"
478
    mlist = [key2, key2 + ".com"]
479
    self.failUnlessEqual(MatchNameComponent(key1, mlist), None)
480
    self.failUnlessEqual(MatchNameComponent(key2, mlist), key2)
481

    
482
  def testCaseInsensitivePartialMatch(self):
483
    """Test for the case_insensitive keyword"""
484
    mlist = ["test1.example.com", "test2.example.net"]
485
    self.assertEqual(MatchNameComponent("test2", mlist, case_sensitive=False),
486
                     "test2.example.net")
487
    self.assertEqual(MatchNameComponent("Test2", mlist, case_sensitive=False),
488
                     "test2.example.net")
489
    self.assertEqual(MatchNameComponent("teSt2", mlist, case_sensitive=False),
490
                     "test2.example.net")
491
    self.assertEqual(MatchNameComponent("TeSt2", mlist, case_sensitive=False),
492
                     "test2.example.net")
493

    
494

    
495
  def testCaseInsensitiveFullMatch(self):
496
    mlist = ["ts1.ex", "ts1.ex.org", "ts2.ex", "Ts2.ex"]
497
    # Between the two ts1 a full string match non-case insensitive should work
498
    self.assertEqual(MatchNameComponent("Ts1", mlist, case_sensitive=False),
499
                     None)
500
    self.assertEqual(MatchNameComponent("Ts1.ex", mlist, case_sensitive=False),
501
                     "ts1.ex")
502
    self.assertEqual(MatchNameComponent("ts1.ex", mlist, case_sensitive=False),
503
                     "ts1.ex")
504
    # Between the two ts2 only case differs, so only case-match works
505
    self.assertEqual(MatchNameComponent("ts2.ex", mlist, case_sensitive=False),
506
                     "ts2.ex")
507
    self.assertEqual(MatchNameComponent("Ts2.ex", mlist, case_sensitive=False),
508
                     "Ts2.ex")
509
    self.assertEqual(MatchNameComponent("TS2.ex", mlist, case_sensitive=False),
510
                     None)
511

    
512

    
513
class TestFormatUnit(unittest.TestCase):
514
  """Test case for the FormatUnit function"""
515

    
516
  def testMiB(self):
517
    self.assertEqual(FormatUnit(1, 'h'), '1M')
518
    self.assertEqual(FormatUnit(100, 'h'), '100M')
519
    self.assertEqual(FormatUnit(1023, 'h'), '1023M')
520

    
521
    self.assertEqual(FormatUnit(1, 'm'), '1')
522
    self.assertEqual(FormatUnit(100, 'm'), '100')
523
    self.assertEqual(FormatUnit(1023, 'm'), '1023')
524

    
525
    self.assertEqual(FormatUnit(1024, 'm'), '1024')
526
    self.assertEqual(FormatUnit(1536, 'm'), '1536')
527
    self.assertEqual(FormatUnit(17133, 'm'), '17133')
528
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
529

    
530
  def testGiB(self):
531
    self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
532
    self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
533
    self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
534
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
535

    
536
    self.assertEqual(FormatUnit(1024, 'g'), '1.0')
537
    self.assertEqual(FormatUnit(1536, 'g'), '1.5')
538
    self.assertEqual(FormatUnit(17133, 'g'), '16.7')
539
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
540

    
541
    self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
542
    self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
543
    self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
544

    
545
  def testTiB(self):
546
    self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
547
    self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
548
    self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
549

    
550
    self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
551
    self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
552
    self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
553

    
554
class TestParseUnit(unittest.TestCase):
555
  """Test case for the ParseUnit function"""
556

    
557
  SCALES = (('', 1),
558
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
559
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
560
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
561

    
562
  def testRounding(self):
563
    self.assertEqual(ParseUnit('0'), 0)
564
    self.assertEqual(ParseUnit('1'), 4)
565
    self.assertEqual(ParseUnit('2'), 4)
566
    self.assertEqual(ParseUnit('3'), 4)
567

    
568
    self.assertEqual(ParseUnit('124'), 124)
569
    self.assertEqual(ParseUnit('125'), 128)
570
    self.assertEqual(ParseUnit('126'), 128)
571
    self.assertEqual(ParseUnit('127'), 128)
572
    self.assertEqual(ParseUnit('128'), 128)
573
    self.assertEqual(ParseUnit('129'), 132)
574
    self.assertEqual(ParseUnit('130'), 132)
575

    
576
  def testFloating(self):
577
    self.assertEqual(ParseUnit('0'), 0)
578
    self.assertEqual(ParseUnit('0.5'), 4)
579
    self.assertEqual(ParseUnit('1.75'), 4)
580
    self.assertEqual(ParseUnit('1.99'), 4)
581
    self.assertEqual(ParseUnit('2.00'), 4)
582
    self.assertEqual(ParseUnit('2.01'), 4)
583
    self.assertEqual(ParseUnit('3.99'), 4)
584
    self.assertEqual(ParseUnit('4.00'), 4)
585
    self.assertEqual(ParseUnit('4.01'), 8)
586
    self.assertEqual(ParseUnit('1.5G'), 1536)
587
    self.assertEqual(ParseUnit('1.8G'), 1844)
588
    self.assertEqual(ParseUnit('8.28T'), 8682212)
589

    
590
  def testSuffixes(self):
591
    for sep in ('', ' ', '   ', "\t", "\t "):
592
      for suffix, scale in TestParseUnit.SCALES:
593
        for func in (lambda x: x, str.lower, str.upper):
594
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
595
                           1024 * scale)
596

    
597
  def testInvalidInput(self):
598
    for sep in ('-', '_', ',', 'a'):
599
      for suffix, _ in TestParseUnit.SCALES:
600
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
601

    
602
    for suffix, _ in TestParseUnit.SCALES:
603
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
604

    
605

    
606
class TestSshKeys(testutils.GanetiTestCase):
607
  """Test case for the AddAuthorizedKey function"""
608

    
609
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
610
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
611
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
612

    
613
  def setUp(self):
614
    testutils.GanetiTestCase.setUp(self)
615
    self.tmpname = self._CreateTempFile()
616
    handle = open(self.tmpname, 'w')
617
    try:
618
      handle.write("%s\n" % TestSshKeys.KEY_A)
619
      handle.write("%s\n" % TestSshKeys.KEY_B)
620
    finally:
621
      handle.close()
622

    
623
  def testAddingNewKey(self):
624
    AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
625

    
626
    self.assertFileContent(self.tmpname,
627
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
628
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
629
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
630
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
631

    
632
  def testAddingAlmostButNotCompletelyTheSameKey(self):
633
    AddAuthorizedKey(self.tmpname,
634
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
635

    
636
    self.assertFileContent(self.tmpname,
637
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
638
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
639
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
640
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
641

    
642
  def testAddingExistingKeyWithSomeMoreSpaces(self):
643
    AddAuthorizedKey(self.tmpname,
644
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
645

    
646
    self.assertFileContent(self.tmpname,
647
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
648
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
649
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
650

    
651
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
652
    RemoveAuthorizedKey(self.tmpname,
653
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
654

    
655
    self.assertFileContent(self.tmpname,
656
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
657
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
658

    
659
  def testRemovingNonExistingKey(self):
660
    RemoveAuthorizedKey(self.tmpname,
661
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
662

    
663
    self.assertFileContent(self.tmpname,
664
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
665
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
666
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
667

    
668

    
669
class TestEtcHosts(testutils.GanetiTestCase):
670
  """Test functions modifying /etc/hosts"""
671

    
672
  def setUp(self):
673
    testutils.GanetiTestCase.setUp(self)
674
    self.tmpname = self._CreateTempFile()
675
    handle = open(self.tmpname, 'w')
676
    try:
677
      handle.write('# This is a test file for /etc/hosts\n')
678
      handle.write('127.0.0.1\tlocalhost\n')
679
      handle.write('192.168.1.1 router gw\n')
680
    finally:
681
      handle.close()
682

    
683
  def testSettingNewIp(self):
684
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
685

    
686
    self.assertFileContent(self.tmpname,
687
      "# This is a test file for /etc/hosts\n"
688
      "127.0.0.1\tlocalhost\n"
689
      "192.168.1.1 router gw\n"
690
      "1.2.3.4\tmyhost.domain.tld myhost\n")
691
    self.assertFileMode(self.tmpname, 0644)
692

    
693
  def testSettingExistingIp(self):
694
    SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
695
                     ['myhost'])
696

    
697
    self.assertFileContent(self.tmpname,
698
      "# This is a test file for /etc/hosts\n"
699
      "127.0.0.1\tlocalhost\n"
700
      "192.168.1.1\tmyhost.domain.tld myhost\n")
701
    self.assertFileMode(self.tmpname, 0644)
702

    
703
  def testSettingDuplicateName(self):
704
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
705

    
706
    self.assertFileContent(self.tmpname,
707
      "# This is a test file for /etc/hosts\n"
708
      "127.0.0.1\tlocalhost\n"
709
      "192.168.1.1 router gw\n"
710
      "1.2.3.4\tmyhost\n")
711
    self.assertFileMode(self.tmpname, 0644)
712

    
713
  def testRemovingExistingHost(self):
714
    RemoveEtcHostsEntry(self.tmpname, 'router')
715

    
716
    self.assertFileContent(self.tmpname,
717
      "# This is a test file for /etc/hosts\n"
718
      "127.0.0.1\tlocalhost\n"
719
      "192.168.1.1 gw\n")
720
    self.assertFileMode(self.tmpname, 0644)
721

    
722
  def testRemovingSingleExistingHost(self):
723
    RemoveEtcHostsEntry(self.tmpname, 'localhost')
724

    
725
    self.assertFileContent(self.tmpname,
726
      "# This is a test file for /etc/hosts\n"
727
      "192.168.1.1 router gw\n")
728
    self.assertFileMode(self.tmpname, 0644)
729

    
730
  def testRemovingNonExistingHost(self):
731
    RemoveEtcHostsEntry(self.tmpname, 'myhost')
732

    
733
    self.assertFileContent(self.tmpname,
734
      "# This is a test file for /etc/hosts\n"
735
      "127.0.0.1\tlocalhost\n"
736
      "192.168.1.1 router gw\n")
737
    self.assertFileMode(self.tmpname, 0644)
738

    
739
  def testRemovingAlias(self):
740
    RemoveEtcHostsEntry(self.tmpname, 'gw')
741

    
742
    self.assertFileContent(self.tmpname,
743
      "# This is a test file for /etc/hosts\n"
744
      "127.0.0.1\tlocalhost\n"
745
      "192.168.1.1 router\n")
746
    self.assertFileMode(self.tmpname, 0644)
747

    
748

    
749
class TestShellQuoting(unittest.TestCase):
750
  """Test case for shell quoting functions"""
751

    
752
  def testShellQuote(self):
753
    self.assertEqual(ShellQuote('abc'), "abc")
754
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
755
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
756
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
757
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
758

    
759
  def testShellQuoteArgs(self):
760
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
761
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
762
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
763

    
764

    
765
class TestTcpPing(unittest.TestCase):
766
  """Testcase for TCP version of ping - against listen(2)ing port"""
767

    
768
  def setUp(self):
769
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
770
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
771
    self.listenerport = self.listener.getsockname()[1]
772
    self.listener.listen(1)
773

    
774
  def tearDown(self):
775
    self.listener.shutdown(socket.SHUT_RDWR)
776
    del self.listener
777
    del self.listenerport
778

    
779
  def testTcpPingToLocalHostAccept(self):
780
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
781
                         self.listenerport,
782
                         timeout=10,
783
                         live_port_needed=True,
784
                         source=constants.LOCALHOST_IP_ADDRESS,
785
                         ),
786
                 "failed to connect to test listener")
787

    
788
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
789
                         self.listenerport,
790
                         timeout=10,
791
                         live_port_needed=True,
792
                         ),
793
                 "failed to connect to test listener (no source)")
794

    
795

    
796
class TestTcpPingDeaf(unittest.TestCase):
797
  """Testcase for TCP version of ping - against non listen(2)ing port"""
798

    
799
  def setUp(self):
800
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
801
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
802
    self.deaflistenerport = self.deaflistener.getsockname()[1]
803

    
804
  def tearDown(self):
805
    del self.deaflistener
806
    del self.deaflistenerport
807

    
808
  def testTcpPingToLocalHostAcceptDeaf(self):
809
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
810
                        self.deaflistenerport,
811
                        timeout=constants.TCP_PING_TIMEOUT,
812
                        live_port_needed=True,
813
                        source=constants.LOCALHOST_IP_ADDRESS,
814
                        ), # need successful connect(2)
815
                "successfully connected to deaf listener")
816

    
817
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
818
                        self.deaflistenerport,
819
                        timeout=constants.TCP_PING_TIMEOUT,
820
                        live_port_needed=True,
821
                        ), # need successful connect(2)
822
                "successfully connected to deaf listener (no source addr)")
823

    
824
  def testTcpPingToLocalHostNoAccept(self):
825
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
826
                         self.deaflistenerport,
827
                         timeout=constants.TCP_PING_TIMEOUT,
828
                         live_port_needed=False,
829
                         source=constants.LOCALHOST_IP_ADDRESS,
830
                         ), # ECONNREFUSED is OK
831
                 "failed to ping alive host on deaf port")
832

    
833
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
834
                         self.deaflistenerport,
835
                         timeout=constants.TCP_PING_TIMEOUT,
836
                         live_port_needed=False,
837
                         ), # ECONNREFUSED is OK
838
                 "failed to ping alive host on deaf port (no source addr)")
839

    
840

    
841
class TestOwnIpAddress(unittest.TestCase):
842
  """Testcase for OwnIpAddress"""
843

    
844
  def testOwnLoopback(self):
845
    """check having the loopback ip"""
846
    self.failUnless(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS),
847
                    "Should own the loopback address")
848

    
849
  def testNowOwnAddress(self):
850
    """check that I don't own an address"""
851

    
852
    # network 192.0.2.0/24 is reserved for test/documentation as per
853
    # rfc 3330, so we *should* not have an address of this range... if
854
    # this fails, we should extend the test to multiple addresses
855
    DST_IP = "192.0.2.1"
856
    self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP)
857

    
858

    
859
class TestListVisibleFiles(unittest.TestCase):
860
  """Test case for ListVisibleFiles"""
861

    
862
  def setUp(self):
863
    self.path = tempfile.mkdtemp()
864

    
865
  def tearDown(self):
866
    shutil.rmtree(self.path)
867

    
868
  def _test(self, files, expected):
869
    # Sort a copy
870
    expected = expected[:]
871
    expected.sort()
872

    
873
    for name in files:
874
      f = open(os.path.join(self.path, name), 'w')
875
      try:
876
        f.write("Test\n")
877
      finally:
878
        f.close()
879

    
880
    found = ListVisibleFiles(self.path)
881
    found.sort()
882

    
883
    self.assertEqual(found, expected)
884

    
885
  def testAllVisible(self):
886
    files = ["a", "b", "c"]
887
    expected = files
888
    self._test(files, expected)
889

    
890
  def testNoneVisible(self):
891
    files = [".a", ".b", ".c"]
892
    expected = []
893
    self._test(files, expected)
894

    
895
  def testSomeVisible(self):
896
    files = ["a", "b", ".c"]
897
    expected = ["a", "b"]
898
    self._test(files, expected)
899

    
900
  def testNonAbsolutePath(self):
901
    self.failUnlessRaises(errors.ProgrammerError, ListVisibleFiles, "abc")
902

    
903
  def testNonNormalizedPath(self):
904
    self.failUnlessRaises(errors.ProgrammerError, ListVisibleFiles,
905
                          "/bin/../tmp")
906

    
907

    
908
class TestNewUUID(unittest.TestCase):
909
  """Test case for NewUUID"""
910

    
911
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
912
                        '[a-f0-9]{4}-[a-f0-9]{12}$')
913

    
914
  def runTest(self):
915
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
916

    
917

    
918
class TestUniqueSequence(unittest.TestCase):
919
  """Test case for UniqueSequence"""
920

    
921
  def _test(self, input, expected):
922
    self.assertEqual(utils.UniqueSequence(input), expected)
923

    
924
  def runTest(self):
925
    # Ordered input
926
    self._test([1, 2, 3], [1, 2, 3])
927
    self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
928
    self._test([1, 2, 2, 3], [1, 2, 3])
929
    self._test([1, 2, 3, 3], [1, 2, 3])
930

    
931
    # Unordered input
932
    self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
933
    self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
934

    
935
    # Strings
936
    self._test(["a", "a"], ["a"])
937
    self._test(["a", "b"], ["a", "b"])
938
    self._test(["a", "b", "a"], ["a", "b"])
939

    
940

    
941
class TestFirstFree(unittest.TestCase):
942
  """Test case for the FirstFree function"""
943

    
944
  def test(self):
945
    """Test FirstFree"""
946
    self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
947
    self.failUnlessEqual(FirstFree([]), None)
948
    self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
949
    self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
950
    self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
951

    
952

    
953
class TestTailFile(testutils.GanetiTestCase):
954
  """Test case for the TailFile function"""
955

    
956
  def testEmpty(self):
957
    fname = self._CreateTempFile()
958
    self.failUnlessEqual(TailFile(fname), [])
959
    self.failUnlessEqual(TailFile(fname, lines=25), [])
960

    
961
  def testAllLines(self):
962
    data = ["test %d" % i for i in range(30)]
963
    for i in range(30):
964
      fname = self._CreateTempFile()
965
      fd = open(fname, "w")
966
      fd.write("\n".join(data[:i]))
967
      if i > 0:
968
        fd.write("\n")
969
      fd.close()
970
      self.failUnlessEqual(TailFile(fname, lines=i), data[:i])
971

    
972
  def testPartialLines(self):
973
    data = ["test %d" % i for i in range(30)]
974
    fname = self._CreateTempFile()
975
    fd = open(fname, "w")
976
    fd.write("\n".join(data))
977
    fd.write("\n")
978
    fd.close()
979
    for i in range(1, 30):
980
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
981

    
982
  def testBigFile(self):
983
    data = ["test %d" % i for i in range(30)]
984
    fname = self._CreateTempFile()
985
    fd = open(fname, "w")
986
    fd.write("X" * 1048576)
987
    fd.write("\n")
988
    fd.write("\n".join(data))
989
    fd.write("\n")
990
    fd.close()
991
    for i in range(1, 30):
992
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
993

    
994

    
995
class _BaseFileLockTest:
996
  """Test case for the FileLock class"""
997

    
998
  def testSharedNonblocking(self):
999
    self.lock.Shared(blocking=False)
1000
    self.lock.Close()
1001

    
1002
  def testExclusiveNonblocking(self):
1003
    self.lock.Exclusive(blocking=False)
1004
    self.lock.Close()
1005

    
1006
  def testUnlockNonblocking(self):
1007
    self.lock.Unlock(blocking=False)
1008
    self.lock.Close()
1009

    
1010
  def testSharedBlocking(self):
1011
    self.lock.Shared(blocking=True)
1012
    self.lock.Close()
1013

    
1014
  def testExclusiveBlocking(self):
1015
    self.lock.Exclusive(blocking=True)
1016
    self.lock.Close()
1017

    
1018
  def testUnlockBlocking(self):
1019
    self.lock.Unlock(blocking=True)
1020
    self.lock.Close()
1021

    
1022
  def testSharedExclusiveUnlock(self):
1023
    self.lock.Shared(blocking=False)
1024
    self.lock.Exclusive(blocking=False)
1025
    self.lock.Unlock(blocking=False)
1026
    self.lock.Close()
1027

    
1028
  def testExclusiveSharedUnlock(self):
1029
    self.lock.Exclusive(blocking=False)
1030
    self.lock.Shared(blocking=False)
1031
    self.lock.Unlock(blocking=False)
1032
    self.lock.Close()
1033

    
1034
  def testSimpleTimeout(self):
1035
    # These will succeed on the first attempt, hence a short timeout
1036
    self.lock.Shared(blocking=True, timeout=10.0)
1037
    self.lock.Exclusive(blocking=False, timeout=10.0)
1038
    self.lock.Unlock(blocking=True, timeout=10.0)
1039
    self.lock.Close()
1040

    
1041
  @staticmethod
1042
  def _TryLockInner(filename, shared, blocking):
1043
    lock = utils.FileLock.Open(filename)
1044

    
1045
    if shared:
1046
      fn = lock.Shared
1047
    else:
1048
      fn = lock.Exclusive
1049

    
1050
    try:
1051
      # The timeout doesn't really matter as the parent process waits for us to
1052
      # finish anyway.
1053
      fn(blocking=blocking, timeout=0.01)
1054
    except errors.LockError, err:
1055
      return False
1056

    
1057
    return True
1058

    
1059
  def _TryLock(self, *args):
1060
    return utils.RunInSeparateProcess(self._TryLockInner, self.tmpfile.name,
1061
                                      *args)
1062

    
1063
  def testTimeout(self):
1064
    for blocking in [True, False]:
1065
      self.lock.Exclusive(blocking=True)
1066
      self.failIf(self._TryLock(False, blocking))
1067
      self.failIf(self._TryLock(True, blocking))
1068

    
1069
      self.lock.Shared(blocking=True)
1070
      self.assert_(self._TryLock(True, blocking))
1071
      self.failIf(self._TryLock(False, blocking))
1072

    
1073
  def testCloseShared(self):
1074
    self.lock.Close()
1075
    self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
1076

    
1077
  def testCloseExclusive(self):
1078
    self.lock.Close()
1079
    self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
1080

    
1081
  def testCloseUnlock(self):
1082
    self.lock.Close()
1083
    self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
1084

    
1085

    
1086
class TestFileLockWithFilename(testutils.GanetiTestCase, _BaseFileLockTest):
1087
  TESTDATA = "Hello World\n" * 10
1088

    
1089
  def setUp(self):
1090
    testutils.GanetiTestCase.setUp(self)
1091

    
1092
    self.tmpfile = tempfile.NamedTemporaryFile()
1093
    utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
1094
    self.lock = utils.FileLock.Open(self.tmpfile.name)
1095

    
1096
    # Ensure "Open" didn't truncate file
1097
    self.assertFileContent(self.tmpfile.name, self.TESTDATA)
1098

    
1099
  def tearDown(self):
1100
    self.assertFileContent(self.tmpfile.name, self.TESTDATA)
1101

    
1102
    testutils.GanetiTestCase.tearDown(self)
1103

    
1104

    
1105
class TestFileLockWithFileObject(unittest.TestCase, _BaseFileLockTest):
1106
  def setUp(self):
1107
    self.tmpfile = tempfile.NamedTemporaryFile()
1108
    self.lock = utils.FileLock(open(self.tmpfile.name, "w"), self.tmpfile.name)
1109

    
1110

    
1111
class TestTimeFunctions(unittest.TestCase):
1112
  """Test case for time functions"""
1113

    
1114
  def runTest(self):
1115
    self.assertEqual(utils.SplitTime(1), (1, 0))
1116
    self.assertEqual(utils.SplitTime(1.5), (1, 500000))
1117
    self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
1118
    self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
1119
    self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
1120
    self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
1121
    self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
1122
    self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
1123

    
1124
    self.assertRaises(AssertionError, utils.SplitTime, -1)
1125

    
1126
    self.assertEqual(utils.MergeTime((1, 0)), 1.0)
1127
    self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
1128
    self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
1129

    
1130
    self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3),
1131
                     1218448917.481)
1132
    self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
1133

    
1134
    self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
1135
    self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
1136
    self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
1137
    self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
1138
    self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
1139

    
1140

    
1141
class FieldSetTestCase(unittest.TestCase):
1142
  """Test case for FieldSets"""
1143

    
1144
  def testSimpleMatch(self):
1145
    f = utils.FieldSet("a", "b", "c", "def")
1146
    self.failUnless(f.Matches("a"))
1147
    self.failIf(f.Matches("d"), "Substring matched")
1148
    self.failIf(f.Matches("defghi"), "Prefix string matched")
1149
    self.failIf(f.NonMatching(["b", "c"]))
1150
    self.failIf(f.NonMatching(["a", "b", "c", "def"]))
1151
    self.failUnless(f.NonMatching(["a", "d"]))
1152

    
1153
  def testRegexMatch(self):
1154
    f = utils.FieldSet("a", "b([0-9]+)", "c")
1155
    self.failUnless(f.Matches("b1"))
1156
    self.failUnless(f.Matches("b99"))
1157
    self.failIf(f.Matches("b/1"))
1158
    self.failIf(f.NonMatching(["b12", "c"]))
1159
    self.failUnless(f.NonMatching(["a", "1"]))
1160

    
1161
class TestForceDictType(unittest.TestCase):
1162
  """Test case for ForceDictType"""
1163

    
1164
  def setUp(self):
1165
    self.key_types = {
1166
      'a': constants.VTYPE_INT,
1167
      'b': constants.VTYPE_BOOL,
1168
      'c': constants.VTYPE_STRING,
1169
      'd': constants.VTYPE_SIZE,
1170
      }
1171

    
1172
  def _fdt(self, dict, allowed_values=None):
1173
    if allowed_values is None:
1174
      ForceDictType(dict, self.key_types)
1175
    else:
1176
      ForceDictType(dict, self.key_types, allowed_values=allowed_values)
1177

    
1178
    return dict
1179

    
1180
  def testSimpleDict(self):
1181
    self.assertEqual(self._fdt({}), {})
1182
    self.assertEqual(self._fdt({'a': 1}), {'a': 1})
1183
    self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
1184
    self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
1185
    self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
1186
    self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
1187
    self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
1188
    self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
1189
    self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
1190
    self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
1191
    self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
1192
    self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
1193

    
1194
  def testErrors(self):
1195
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
1196
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
1197
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
1198
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
1199

    
1200

    
1201
class TestIsAbsNormPath(unittest.TestCase):
1202
  """Testing case for IsProcessAlive"""
1203

    
1204
  def _pathTestHelper(self, path, result):
1205
    if result:
1206
      self.assert_(IsNormAbsPath(path),
1207
          "Path %s should result absolute and normalized" % path)
1208
    else:
1209
      self.assert_(not IsNormAbsPath(path),
1210
          "Path %s should not result absolute and normalized" % path)
1211

    
1212
  def testBase(self):
1213
    self._pathTestHelper('/etc', True)
1214
    self._pathTestHelper('/srv', True)
1215
    self._pathTestHelper('etc', False)
1216
    self._pathTestHelper('/etc/../root', False)
1217
    self._pathTestHelper('/etc/', False)
1218

    
1219

    
1220
class TestSafeEncode(unittest.TestCase):
1221
  """Test case for SafeEncode"""
1222

    
1223
  def testAscii(self):
1224
    for txt in [string.digits, string.letters, string.punctuation]:
1225
      self.failUnlessEqual(txt, SafeEncode(txt))
1226

    
1227
  def testDoubleEncode(self):
1228
    for i in range(255):
1229
      txt = SafeEncode(chr(i))
1230
      self.failUnlessEqual(txt, SafeEncode(txt))
1231

    
1232
  def testUnicode(self):
1233
    # 1024 is high enough to catch non-direct ASCII mappings
1234
    for i in range(1024):
1235
      txt = SafeEncode(unichr(i))
1236
      self.failUnlessEqual(txt, SafeEncode(txt))
1237

    
1238

    
1239
class TestFormatTime(unittest.TestCase):
1240
  """Testing case for FormatTime"""
1241

    
1242
  def testNone(self):
1243
    self.failUnlessEqual(FormatTime(None), "N/A")
1244

    
1245
  def testInvalid(self):
1246
    self.failUnlessEqual(FormatTime(()), "N/A")
1247

    
1248
  def testNow(self):
1249
    # tests that we accept time.time input
1250
    FormatTime(time.time())
1251
    # tests that we accept int input
1252
    FormatTime(int(time.time()))
1253

    
1254

    
1255
class RunInSeparateProcess(unittest.TestCase):
1256
  def test(self):
1257
    for exp in [True, False]:
1258
      def _child():
1259
        return exp
1260

    
1261
      self.assertEqual(exp, utils.RunInSeparateProcess(_child))
1262

    
1263
  def testArgs(self):
1264
    for arg in [0, 1, 999, "Hello World", (1, 2, 3)]:
1265
      def _child(carg1, carg2):
1266
        return carg1 == "Foo" and carg2 == arg
1267

    
1268
      self.assert_(utils.RunInSeparateProcess(_child, "Foo", arg))
1269

    
1270
  def testPid(self):
1271
    parent_pid = os.getpid()
1272

    
1273
    def _check():
1274
      return os.getpid() == parent_pid
1275

    
1276
    self.failIf(utils.RunInSeparateProcess(_check))
1277

    
1278
  def testSignal(self):
1279
    def _kill():
1280
      os.kill(os.getpid(), signal.SIGTERM)
1281

    
1282
    self.assertRaises(errors.GenericError,
1283
                      utils.RunInSeparateProcess, _kill)
1284

    
1285
  def testException(self):
1286
    def _exc():
1287
      raise errors.GenericError("This is a test")
1288

    
1289
    self.assertRaises(errors.GenericError,
1290
                      utils.RunInSeparateProcess, _exc)
1291

    
1292

    
1293
class TestFingerprintFile(unittest.TestCase):
1294
  def setUp(self):
1295
    self.tmpfile = tempfile.NamedTemporaryFile()
1296

    
1297
  def test(self):
1298
    self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1299
                     "da39a3ee5e6b4b0d3255bfef95601890afd80709")
1300

    
1301
    utils.WriteFile(self.tmpfile.name, data="Hello World\n")
1302
    self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1303
                     "648a6a6ffffdaa0badb23b8baf90b6168dd16b3a")
1304

    
1305

    
1306
class TestUnescapeAndSplit(unittest.TestCase):
1307
  """Testing case for UnescapeAndSplit"""
1308

    
1309
  def setUp(self):
1310
    # testing more that one separator for regexp safety
1311
    self._seps = [",", "+", "."]
1312

    
1313
  def testSimple(self):
1314
    a = ["a", "b", "c", "d"]
1315
    for sep in self._seps:
1316
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), a)
1317

    
1318
  def testEscape(self):
1319
    for sep in self._seps:
1320
      a = ["a", "b\\" + sep + "c", "d"]
1321
      b = ["a", "b" + sep + "c", "d"]
1322
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1323

    
1324
  def testDoubleEscape(self):
1325
    for sep in self._seps:
1326
      a = ["a", "b\\\\", "c", "d"]
1327
      b = ["a", "b\\", "c", "d"]
1328
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1329

    
1330
  def testThreeEscape(self):
1331
    for sep in self._seps:
1332
      a = ["a", "b\\\\\\" + sep + "c", "d"]
1333
      b = ["a", "b\\" + sep + "c", "d"]
1334
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1335

    
1336

    
1337
class TestPathJoin(unittest.TestCase):
1338
  """Testing case for PathJoin"""
1339

    
1340
  def testBasicItems(self):
1341
    mlist = ["/a", "b", "c"]
1342
    self.failUnlessEqual(PathJoin(*mlist), "/".join(mlist))
1343

    
1344
  def testNonAbsPrefix(self):
1345
    self.failUnlessRaises(ValueError, PathJoin, "a", "b")
1346

    
1347
  def testBackTrack(self):
1348
    self.failUnlessRaises(ValueError, PathJoin, "/a", "b/../c")
1349

    
1350
  def testMultiAbs(self):
1351
    self.failUnlessRaises(ValueError, PathJoin, "/a", "/b")
1352

    
1353

    
1354
class TestHostInfo(unittest.TestCase):
1355
  """Testing case for HostInfo"""
1356

    
1357
  def testUppercase(self):
1358
    data = "AbC.example.com"
1359
    self.failUnlessEqual(HostInfo.NormalizeName(data), data.lower())
1360

    
1361
  def testTooLongName(self):
1362
    data = "a.b." + "c" * 255
1363
    self.failUnlessRaises(OpPrereqError, HostInfo.NormalizeName, data)
1364

    
1365
  def testTrailingDot(self):
1366
    data = "a.b.c"
1367
    self.failUnlessEqual(HostInfo.NormalizeName(data + "."), data)
1368

    
1369
  def testInvalidName(self):
1370
    data = [
1371
      "a b",
1372
      "a/b",
1373
      ".a.b",
1374
      "a..b",
1375
      ]
1376
    for value in data:
1377
      self.failUnlessRaises(OpPrereqError, HostInfo.NormalizeName, value)
1378

    
1379
  def testValidName(self):
1380
    data = [
1381
      "a.b",
1382
      "a-b",
1383
      "a_b",
1384
      "a.b.c",
1385
      ]
1386
    for value in data:
1387
      HostInfo.NormalizeName(value)
1388

    
1389

    
1390

    
1391
if __name__ == '__main__':
1392
  testutils.GanetiTestProgram()