Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ bdefe5dd

History | View | Annotate | Download (44.3 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import os
30
import stat
31
import md5
32
import signal
33
import socket
34
import shutil
35
import re
36
import select
37
import string
38

    
39
import ganeti
40
import testutils
41
from ganeti import constants
42
from ganeti import utils
43
from ganeti import errors
44
from ganeti.utils import IsProcessAlive, RunCmd, \
45
     RemoveFile, MatchNameComponent, FormatUnit, \
46
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
47
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
48
     SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \
49
     TailFile, ForceDictType, SafeEncode, IsNormAbsPath, FormatTime, \
50
     UnescapeAndSplit, RunParts, PathJoin, HostInfo
51

    
52
from ganeti.errors import LockError, UnitParseError, GenericError, \
53
     ProgrammerError, OpPrereqError
54

    
55

    
56
class TestIsProcessAlive(unittest.TestCase):
57
  """Testing case for IsProcessAlive"""
58

    
59
  def testExists(self):
60
    mypid = os.getpid()
61
    self.assert_(IsProcessAlive(mypid),
62
                 "can't find myself running")
63

    
64
  def testNotExisting(self):
65
    pid_non_existing = os.fork()
66
    if pid_non_existing == 0:
67
      os._exit(0)
68
    elif pid_non_existing < 0:
69
      raise SystemError("can't fork")
70
    os.waitpid(pid_non_existing, 0)
71
    self.assert_(not IsProcessAlive(pid_non_existing),
72
                 "nonexisting process detected")
73

    
74

    
75
class TestPidFileFunctions(unittest.TestCase):
76
  """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
77

    
78
  def setUp(self):
79
    self.dir = tempfile.mkdtemp()
80
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
81
    utils.DaemonPidFileName = self.f_dpn
82

    
83
  def testPidFileFunctions(self):
84
    pid_file = self.f_dpn('test')
85
    utils.WritePidFile('test')
86
    self.failUnless(os.path.exists(pid_file),
87
                    "PID file should have been created")
88
    read_pid = utils.ReadPidFile(pid_file)
89
    self.failUnlessEqual(read_pid, os.getpid())
90
    self.failUnless(utils.IsProcessAlive(read_pid))
91
    self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
92
    utils.RemovePidFile('test')
93
    self.failIf(os.path.exists(pid_file),
94
                "PID file should not exist anymore")
95
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
96
                         "ReadPidFile should return 0 for missing pid file")
97
    fh = open(pid_file, "w")
98
    fh.write("blah\n")
99
    fh.close()
100
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
101
                         "ReadPidFile should return 0 for invalid pid file")
102
    utils.RemovePidFile('test')
103
    self.failIf(os.path.exists(pid_file),
104
                "PID file should not exist anymore")
105

    
106
  def testKill(self):
107
    pid_file = self.f_dpn('child')
108
    r_fd, w_fd = os.pipe()
109
    new_pid = os.fork()
110
    if new_pid == 0: #child
111
      utils.WritePidFile('child')
112
      os.write(w_fd, 'a')
113
      signal.pause()
114
      os._exit(0)
115
      return
116
    # else we are in the parent
117
    # wait until the child has written the pid file
118
    os.read(r_fd, 1)
119
    read_pid = utils.ReadPidFile(pid_file)
120
    self.failUnlessEqual(read_pid, new_pid)
121
    self.failUnless(utils.IsProcessAlive(new_pid))
122
    utils.KillProcess(new_pid, waitpid=True)
123
    self.failIf(utils.IsProcessAlive(new_pid))
124
    utils.RemovePidFile('child')
125
    self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
126

    
127
  def tearDown(self):
128
    for name in os.listdir(self.dir):
129
      os.unlink(os.path.join(self.dir, name))
130
    os.rmdir(self.dir)
131

    
132

    
133
class TestRunCmd(testutils.GanetiTestCase):
134
  """Testing case for the RunCmd function"""
135

    
136
  def setUp(self):
137
    testutils.GanetiTestCase.setUp(self)
138
    self.magic = time.ctime() + " ganeti test"
139
    self.fname = self._CreateTempFile()
140

    
141
  def testOk(self):
142
    """Test successful exit code"""
143
    result = RunCmd("/bin/sh -c 'exit 0'")
144
    self.assertEqual(result.exit_code, 0)
145
    self.assertEqual(result.output, "")
146

    
147
  def testFail(self):
148
    """Test fail exit code"""
149
    result = RunCmd("/bin/sh -c 'exit 1'")
150
    self.assertEqual(result.exit_code, 1)
151
    self.assertEqual(result.output, "")
152

    
153
  def testStdout(self):
154
    """Test standard output"""
155
    cmd = 'echo -n "%s"' % self.magic
156
    result = RunCmd("/bin/sh -c '%s'" % cmd)
157
    self.assertEqual(result.stdout, self.magic)
158
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
159
    self.assertEqual(result.output, "")
160
    self.assertFileContent(self.fname, self.magic)
161

    
162
  def testStderr(self):
163
    """Test standard error"""
164
    cmd = 'echo -n "%s"' % self.magic
165
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
166
    self.assertEqual(result.stderr, self.magic)
167
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
168
    self.assertEqual(result.output, "")
169
    self.assertFileContent(self.fname, self.magic)
170

    
171
  def testCombined(self):
172
    """Test combined output"""
173
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
174
    expected = "A" + self.magic + "B" + self.magic
175
    result = RunCmd("/bin/sh -c '%s'" % cmd)
176
    self.assertEqual(result.output, expected)
177
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
178
    self.assertEqual(result.output, "")
179
    self.assertFileContent(self.fname, expected)
180

    
181
  def testSignal(self):
182
    """Test signal"""
183
    result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
184
    self.assertEqual(result.signal, 15)
185
    self.assertEqual(result.output, "")
186

    
187
  def testListRun(self):
188
    """Test list runs"""
189
    result = RunCmd(["true"])
190
    self.assertEqual(result.signal, None)
191
    self.assertEqual(result.exit_code, 0)
192
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
193
    self.assertEqual(result.signal, None)
194
    self.assertEqual(result.exit_code, 1)
195
    result = RunCmd(["echo", "-n", self.magic])
196
    self.assertEqual(result.signal, None)
197
    self.assertEqual(result.exit_code, 0)
198
    self.assertEqual(result.stdout, self.magic)
199

    
200
  def testFileEmptyOutput(self):
201
    """Test file output"""
202
    result = RunCmd(["true"], output=self.fname)
203
    self.assertEqual(result.signal, None)
204
    self.assertEqual(result.exit_code, 0)
205
    self.assertFileContent(self.fname, "")
206

    
207
  def testLang(self):
208
    """Test locale environment"""
209
    old_env = os.environ.copy()
210
    try:
211
      os.environ["LANG"] = "en_US.UTF-8"
212
      os.environ["LC_ALL"] = "en_US.UTF-8"
213
      result = RunCmd(["locale"])
214
      for line in result.output.splitlines():
215
        key, value = line.split("=", 1)
216
        # Ignore these variables, they're overridden by LC_ALL
217
        if key == "LANG" or key == "LANGUAGE":
218
          continue
219
        self.failIf(value and value != "C" and value != '"C"',
220
            "Variable %s is set to the invalid value '%s'" % (key, value))
221
    finally:
222
      os.environ = old_env
223

    
224
  def testDefaultCwd(self):
225
    """Test default working directory"""
226
    self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
227

    
228
  def testCwd(self):
229
    """Test default working directory"""
230
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
231
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
232
    cwd = os.getcwd()
233
    self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
234

    
235
  def testResetEnv(self):
236
    """Test environment reset functionality"""
237
    self.failUnlessEqual(RunCmd(["env"], reset_env=True).stdout.strip(), "")
238

    
239

    
240
class TestRunParts(unittest.TestCase):
241
  """Testing case for the RunParts function"""
242

    
243
  def setUp(self):
244
    self.rundir = tempfile.mkdtemp(prefix="ganeti-test", suffix=".tmp")
245

    
246
  def tearDown(self):
247
    shutil.rmtree(self.rundir)
248

    
249
  def testEmpty(self):
250
    """Test on an empty dir"""
251
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True), [])
252

    
253
  def testSkipWrongName(self):
254
    """Test that wrong files are skipped"""
255
    fname = os.path.join(self.rundir, "00test.dot")
256
    utils.WriteFile(fname, data="")
257
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
258
    relname = os.path.basename(fname)
259
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
260
                         [(relname, constants.RUNPARTS_SKIP, None)])
261

    
262
  def testSkipNonExec(self):
263
    """Test that non executable files are skipped"""
264
    fname = os.path.join(self.rundir, "00test")
265
    utils.WriteFile(fname, data="")
266
    relname = os.path.basename(fname)
267
    self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
268
                         [(relname, constants.RUNPARTS_SKIP, None)])
269

    
270
  def testError(self):
271
    """Test error on a broken executable"""
272
    fname = os.path.join(self.rundir, "00test")
273
    utils.WriteFile(fname, data="")
274
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
275
    (relname, status, error) = RunParts(self.rundir, reset_env=True)[0]
276
    self.failUnlessEqual(relname, os.path.basename(fname))
277
    self.failUnlessEqual(status, constants.RUNPARTS_ERR)
278
    self.failUnless(error)
279

    
280
  def testSorted(self):
281
    """Test executions are sorted"""
282
    files = []
283
    files.append(os.path.join(self.rundir, "64test"))
284
    files.append(os.path.join(self.rundir, "00test"))
285
    files.append(os.path.join(self.rundir, "42test"))
286

    
287
    for fname in files:
288
      utils.WriteFile(fname, data="")
289

    
290
    results = RunParts(self.rundir, reset_env=True)
291

    
292
    for fname in sorted(files):
293
      self.failUnlessEqual(os.path.basename(fname), results.pop(0)[0])
294

    
295
  def testOk(self):
296
    """Test correct execution"""
297
    fname = os.path.join(self.rundir, "00test")
298
    utils.WriteFile(fname, data="#!/bin/sh\n\necho -n ciao")
299
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
300
    (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
301
    self.failUnlessEqual(relname, os.path.basename(fname))
302
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
303
    self.failUnlessEqual(runresult.stdout, "ciao")
304

    
305
  def testRunFail(self):
306
    """Test correct execution, with run failure"""
307
    fname = os.path.join(self.rundir, "00test")
308
    utils.WriteFile(fname, data="#!/bin/sh\n\nexit 1")
309
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
310
    (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
311
    self.failUnlessEqual(relname, os.path.basename(fname))
312
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
313
    self.failUnlessEqual(runresult.exit_code, 1)
314
    self.failUnless(runresult.failed)
315

    
316
  def testRunMix(self):
317
    files = []
318
    files.append(os.path.join(self.rundir, "00test"))
319
    files.append(os.path.join(self.rundir, "42test"))
320
    files.append(os.path.join(self.rundir, "64test"))
321
    files.append(os.path.join(self.rundir, "99test"))
322

    
323
    files.sort()
324

    
325
    # 1st has errors in execution
326
    utils.WriteFile(files[0], data="#!/bin/sh\n\nexit 1")
327
    os.chmod(files[0], stat.S_IREAD | stat.S_IEXEC)
328

    
329
    # 2nd is skipped
330
    utils.WriteFile(files[1], data="")
331

    
332
    # 3rd cannot execute properly
333
    utils.WriteFile(files[2], data="")
334
    os.chmod(files[2], stat.S_IREAD | stat.S_IEXEC)
335

    
336
    # 4th execs
337
    utils.WriteFile(files[3], data="#!/bin/sh\n\necho -n ciao")
338
    os.chmod(files[3], stat.S_IREAD | stat.S_IEXEC)
339

    
340
    results = RunParts(self.rundir, reset_env=True)
341

    
342
    (relname, status, runresult) = results[0]
343
    self.failUnlessEqual(relname, os.path.basename(files[0]))
344
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
345
    self.failUnlessEqual(runresult.exit_code, 1)
346
    self.failUnless(runresult.failed)
347

    
348
    (relname, status, runresult) = results[1]
349
    self.failUnlessEqual(relname, os.path.basename(files[1]))
350
    self.failUnlessEqual(status, constants.RUNPARTS_SKIP)
351
    self.failUnlessEqual(runresult, None)
352

    
353
    (relname, status, runresult) = results[2]
354
    self.failUnlessEqual(relname, os.path.basename(files[2]))
355
    self.failUnlessEqual(status, constants.RUNPARTS_ERR)
356
    self.failUnless(runresult)
357

    
358
    (relname, status, runresult) = results[3]
359
    self.failUnlessEqual(relname, os.path.basename(files[3]))
360
    self.failUnlessEqual(status, constants.RUNPARTS_RUN)
361
    self.failUnlessEqual(runresult.output, "ciao")
362
    self.failUnlessEqual(runresult.exit_code, 0)
363
    self.failUnless(not runresult.failed)
364

    
365

    
366
class TestRemoveFile(unittest.TestCase):
367
  """Test case for the RemoveFile function"""
368

    
369
  def setUp(self):
370
    """Create a temp dir and file for each case"""
371
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
372
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
373
    os.close(fd)
374

    
375
  def tearDown(self):
376
    if os.path.exists(self.tmpfile):
377
      os.unlink(self.tmpfile)
378
    os.rmdir(self.tmpdir)
379

    
380

    
381
  def testIgnoreDirs(self):
382
    """Test that RemoveFile() ignores directories"""
383
    self.assertEqual(None, RemoveFile(self.tmpdir))
384

    
385

    
386
  def testIgnoreNotExisting(self):
387
    """Test that RemoveFile() ignores non-existing files"""
388
    RemoveFile(self.tmpfile)
389
    RemoveFile(self.tmpfile)
390

    
391

    
392
  def testRemoveFile(self):
393
    """Test that RemoveFile does remove a file"""
394
    RemoveFile(self.tmpfile)
395
    if os.path.exists(self.tmpfile):
396
      self.fail("File '%s' not removed" % self.tmpfile)
397

    
398

    
399
  def testRemoveSymlink(self):
400
    """Test that RemoveFile does remove symlinks"""
401
    symlink = self.tmpdir + "/symlink"
402
    os.symlink("no-such-file", symlink)
403
    RemoveFile(symlink)
404
    if os.path.exists(symlink):
405
      self.fail("File '%s' not removed" % symlink)
406
    os.symlink(self.tmpfile, symlink)
407
    RemoveFile(symlink)
408
    if os.path.exists(symlink):
409
      self.fail("File '%s' not removed" % symlink)
410

    
411

    
412
class TestRename(unittest.TestCase):
413
  """Test case for RenameFile"""
414

    
415
  def setUp(self):
416
    """Create a temporary directory"""
417
    self.tmpdir = tempfile.mkdtemp()
418
    self.tmpfile = os.path.join(self.tmpdir, "test1")
419

    
420
    # Touch the file
421
    open(self.tmpfile, "w").close()
422

    
423
  def tearDown(self):
424
    """Remove temporary directory"""
425
    shutil.rmtree(self.tmpdir)
426

    
427
  def testSimpleRename1(self):
428
    """Simple rename 1"""
429
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
430
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
431

    
432
  def testSimpleRename2(self):
433
    """Simple rename 2"""
434
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
435
                     mkdir=True)
436
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
437

    
438
  def testRenameMkdir(self):
439
    """Rename with mkdir"""
440
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
441
                     mkdir=True)
442
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
443
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
444

    
445
    utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"),
446
                     os.path.join(self.tmpdir, "test/foo/bar/baz"),
447
                     mkdir=True)
448
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
449
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar")))
450
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz")))
451

    
452

    
453
class TestMatchNameComponent(unittest.TestCase):
454
  """Test case for the MatchNameComponent function"""
455

    
456
  def testEmptyList(self):
457
    """Test that there is no match against an empty list"""
458

    
459
    self.failUnlessEqual(MatchNameComponent("", []), None)
460
    self.failUnlessEqual(MatchNameComponent("test", []), None)
461

    
462
  def testSingleMatch(self):
463
    """Test that a single match is performed correctly"""
464
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
465
    for key in "test2", "test2.example", "test2.example.com":
466
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
467

    
468
  def testMultipleMatches(self):
469
    """Test that a multiple match is returned as None"""
470
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
471
    for key in "test1", "test1.example":
472
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
473

    
474
  def testFullMatch(self):
475
    """Test that a full match is returned correctly"""
476
    key1 = "test1"
477
    key2 = "test1.example"
478
    mlist = [key2, key2 + ".com"]
479
    self.failUnlessEqual(MatchNameComponent(key1, mlist), None)
480
    self.failUnlessEqual(MatchNameComponent(key2, mlist), key2)
481

    
482
  def testCaseInsensitivePartialMatch(self):
483
    """Test for the case_insensitive keyword"""
484
    mlist = ["test1.example.com", "test2.example.net"]
485
    self.assertEqual(MatchNameComponent("test2", mlist, case_sensitive=False),
486
                     "test2.example.net")
487
    self.assertEqual(MatchNameComponent("Test2", mlist, case_sensitive=False),
488
                     "test2.example.net")
489
    self.assertEqual(MatchNameComponent("teSt2", mlist, case_sensitive=False),
490
                     "test2.example.net")
491
    self.assertEqual(MatchNameComponent("TeSt2", mlist, case_sensitive=False),
492
                     "test2.example.net")
493

    
494

    
495
  def testCaseInsensitiveFullMatch(self):
496
    mlist = ["ts1.ex", "ts1.ex.org", "ts2.ex", "Ts2.ex"]
497
    # Between the two ts1 a full string match non-case insensitive should work
498
    self.assertEqual(MatchNameComponent("Ts1", mlist, case_sensitive=False),
499
                     None)
500
    self.assertEqual(MatchNameComponent("Ts1.ex", mlist, case_sensitive=False),
501
                     "ts1.ex")
502
    self.assertEqual(MatchNameComponent("ts1.ex", mlist, case_sensitive=False),
503
                     "ts1.ex")
504
    # Between the two ts2 only case differs, so only case-match works
505
    self.assertEqual(MatchNameComponent("ts2.ex", mlist, case_sensitive=False),
506
                     "ts2.ex")
507
    self.assertEqual(MatchNameComponent("Ts2.ex", mlist, case_sensitive=False),
508
                     "Ts2.ex")
509
    self.assertEqual(MatchNameComponent("TS2.ex", mlist, case_sensitive=False),
510
                     None)
511

    
512

    
513
class TestFormatUnit(unittest.TestCase):
514
  """Test case for the FormatUnit function"""
515

    
516
  def testMiB(self):
517
    self.assertEqual(FormatUnit(1, 'h'), '1M')
518
    self.assertEqual(FormatUnit(100, 'h'), '100M')
519
    self.assertEqual(FormatUnit(1023, 'h'), '1023M')
520

    
521
    self.assertEqual(FormatUnit(1, 'm'), '1')
522
    self.assertEqual(FormatUnit(100, 'm'), '100')
523
    self.assertEqual(FormatUnit(1023, 'm'), '1023')
524

    
525
    self.assertEqual(FormatUnit(1024, 'm'), '1024')
526
    self.assertEqual(FormatUnit(1536, 'm'), '1536')
527
    self.assertEqual(FormatUnit(17133, 'm'), '17133')
528
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
529

    
530
  def testGiB(self):
531
    self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
532
    self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
533
    self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
534
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
535

    
536
    self.assertEqual(FormatUnit(1024, 'g'), '1.0')
537
    self.assertEqual(FormatUnit(1536, 'g'), '1.5')
538
    self.assertEqual(FormatUnit(17133, 'g'), '16.7')
539
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
540

    
541
    self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
542
    self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
543
    self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
544

    
545
  def testTiB(self):
546
    self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
547
    self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
548
    self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
549

    
550
    self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
551
    self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
552
    self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
553

    
554
class TestParseUnit(unittest.TestCase):
555
  """Test case for the ParseUnit function"""
556

    
557
  SCALES = (('', 1),
558
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
559
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
560
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
561

    
562
  def testRounding(self):
563
    self.assertEqual(ParseUnit('0'), 0)
564
    self.assertEqual(ParseUnit('1'), 4)
565
    self.assertEqual(ParseUnit('2'), 4)
566
    self.assertEqual(ParseUnit('3'), 4)
567

    
568
    self.assertEqual(ParseUnit('124'), 124)
569
    self.assertEqual(ParseUnit('125'), 128)
570
    self.assertEqual(ParseUnit('126'), 128)
571
    self.assertEqual(ParseUnit('127'), 128)
572
    self.assertEqual(ParseUnit('128'), 128)
573
    self.assertEqual(ParseUnit('129'), 132)
574
    self.assertEqual(ParseUnit('130'), 132)
575

    
576
  def testFloating(self):
577
    self.assertEqual(ParseUnit('0'), 0)
578
    self.assertEqual(ParseUnit('0.5'), 4)
579
    self.assertEqual(ParseUnit('1.75'), 4)
580
    self.assertEqual(ParseUnit('1.99'), 4)
581
    self.assertEqual(ParseUnit('2.00'), 4)
582
    self.assertEqual(ParseUnit('2.01'), 4)
583
    self.assertEqual(ParseUnit('3.99'), 4)
584
    self.assertEqual(ParseUnit('4.00'), 4)
585
    self.assertEqual(ParseUnit('4.01'), 8)
586
    self.assertEqual(ParseUnit('1.5G'), 1536)
587
    self.assertEqual(ParseUnit('1.8G'), 1844)
588
    self.assertEqual(ParseUnit('8.28T'), 8682212)
589

    
590
  def testSuffixes(self):
591
    for sep in ('', ' ', '   ', "\t", "\t "):
592
      for suffix, scale in TestParseUnit.SCALES:
593
        for func in (lambda x: x, str.lower, str.upper):
594
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
595
                           1024 * scale)
596

    
597
  def testInvalidInput(self):
598
    for sep in ('-', '_', ',', 'a'):
599
      for suffix, _ in TestParseUnit.SCALES:
600
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
601

    
602
    for suffix, _ in TestParseUnit.SCALES:
603
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
604

    
605

    
606
class TestSshKeys(testutils.GanetiTestCase):
607
  """Test case for the AddAuthorizedKey function"""
608

    
609
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
610
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
611
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
612

    
613
  def setUp(self):
614
    testutils.GanetiTestCase.setUp(self)
615
    self.tmpname = self._CreateTempFile()
616
    handle = open(self.tmpname, 'w')
617
    try:
618
      handle.write("%s\n" % TestSshKeys.KEY_A)
619
      handle.write("%s\n" % TestSshKeys.KEY_B)
620
    finally:
621
      handle.close()
622

    
623
  def testAddingNewKey(self):
624
    AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
625

    
626
    self.assertFileContent(self.tmpname,
627
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
628
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
629
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
630
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
631

    
632
  def testAddingAlmostButNotCompletelyTheSameKey(self):
633
    AddAuthorizedKey(self.tmpname,
634
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
635

    
636
    self.assertFileContent(self.tmpname,
637
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
638
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
639
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
640
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
641

    
642
  def testAddingExistingKeyWithSomeMoreSpaces(self):
643
    AddAuthorizedKey(self.tmpname,
644
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
645

    
646
    self.assertFileContent(self.tmpname,
647
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
648
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
649
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
650

    
651
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
652
    RemoveAuthorizedKey(self.tmpname,
653
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
654

    
655
    self.assertFileContent(self.tmpname,
656
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
657
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
658

    
659
  def testRemovingNonExistingKey(self):
660
    RemoveAuthorizedKey(self.tmpname,
661
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
662

    
663
    self.assertFileContent(self.tmpname,
664
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
665
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
666
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
667

    
668

    
669
class TestEtcHosts(testutils.GanetiTestCase):
670
  """Test functions modifying /etc/hosts"""
671

    
672
  def setUp(self):
673
    testutils.GanetiTestCase.setUp(self)
674
    self.tmpname = self._CreateTempFile()
675
    handle = open(self.tmpname, 'w')
676
    try:
677
      handle.write('# This is a test file for /etc/hosts\n')
678
      handle.write('127.0.0.1\tlocalhost\n')
679
      handle.write('192.168.1.1 router gw\n')
680
    finally:
681
      handle.close()
682

    
683
  def testSettingNewIp(self):
684
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
685

    
686
    self.assertFileContent(self.tmpname,
687
      "# This is a test file for /etc/hosts\n"
688
      "127.0.0.1\tlocalhost\n"
689
      "192.168.1.1 router gw\n"
690
      "1.2.3.4\tmyhost.domain.tld myhost\n")
691
    self.assertFileMode(self.tmpname, 0644)
692

    
693
  def testSettingExistingIp(self):
694
    SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
695
                     ['myhost'])
696

    
697
    self.assertFileContent(self.tmpname,
698
      "# This is a test file for /etc/hosts\n"
699
      "127.0.0.1\tlocalhost\n"
700
      "192.168.1.1\tmyhost.domain.tld myhost\n")
701
    self.assertFileMode(self.tmpname, 0644)
702

    
703
  def testSettingDuplicateName(self):
704
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
705

    
706
    self.assertFileContent(self.tmpname,
707
      "# This is a test file for /etc/hosts\n"
708
      "127.0.0.1\tlocalhost\n"
709
      "192.168.1.1 router gw\n"
710
      "1.2.3.4\tmyhost\n")
711
    self.assertFileMode(self.tmpname, 0644)
712

    
713
  def testRemovingExistingHost(self):
714
    RemoveEtcHostsEntry(self.tmpname, 'router')
715

    
716
    self.assertFileContent(self.tmpname,
717
      "# This is a test file for /etc/hosts\n"
718
      "127.0.0.1\tlocalhost\n"
719
      "192.168.1.1 gw\n")
720
    self.assertFileMode(self.tmpname, 0644)
721

    
722
  def testRemovingSingleExistingHost(self):
723
    RemoveEtcHostsEntry(self.tmpname, 'localhost')
724

    
725
    self.assertFileContent(self.tmpname,
726
      "# This is a test file for /etc/hosts\n"
727
      "192.168.1.1 router gw\n")
728
    self.assertFileMode(self.tmpname, 0644)
729

    
730
  def testRemovingNonExistingHost(self):
731
    RemoveEtcHostsEntry(self.tmpname, 'myhost')
732

    
733
    self.assertFileContent(self.tmpname,
734
      "# This is a test file for /etc/hosts\n"
735
      "127.0.0.1\tlocalhost\n"
736
      "192.168.1.1 router gw\n")
737
    self.assertFileMode(self.tmpname, 0644)
738

    
739
  def testRemovingAlias(self):
740
    RemoveEtcHostsEntry(self.tmpname, 'gw')
741

    
742
    self.assertFileContent(self.tmpname,
743
      "# This is a test file for /etc/hosts\n"
744
      "127.0.0.1\tlocalhost\n"
745
      "192.168.1.1 router\n")
746
    self.assertFileMode(self.tmpname, 0644)
747

    
748

    
749
class TestShellQuoting(unittest.TestCase):
750
  """Test case for shell quoting functions"""
751

    
752
  def testShellQuote(self):
753
    self.assertEqual(ShellQuote('abc'), "abc")
754
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
755
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
756
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
757
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
758

    
759
  def testShellQuoteArgs(self):
760
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
761
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
762
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
763

    
764

    
765
class TestTcpPing(unittest.TestCase):
766
  """Testcase for TCP version of ping - against listen(2)ing port"""
767

    
768
  def setUp(self):
769
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
770
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
771
    self.listenerport = self.listener.getsockname()[1]
772
    self.listener.listen(1)
773

    
774
  def tearDown(self):
775
    self.listener.shutdown(socket.SHUT_RDWR)
776
    del self.listener
777
    del self.listenerport
778

    
779
  def testTcpPingToLocalHostAccept(self):
780
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
781
                         self.listenerport,
782
                         timeout=10,
783
                         live_port_needed=True,
784
                         source=constants.LOCALHOST_IP_ADDRESS,
785
                         ),
786
                 "failed to connect to test listener")
787

    
788
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
789
                         self.listenerport,
790
                         timeout=10,
791
                         live_port_needed=True,
792
                         ),
793
                 "failed to connect to test listener (no source)")
794

    
795

    
796
class TestTcpPingDeaf(unittest.TestCase):
797
  """Testcase for TCP version of ping - against non listen(2)ing port"""
798

    
799
  def setUp(self):
800
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
801
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
802
    self.deaflistenerport = self.deaflistener.getsockname()[1]
803

    
804
  def tearDown(self):
805
    del self.deaflistener
806
    del self.deaflistenerport
807

    
808
  def testTcpPingToLocalHostAcceptDeaf(self):
809
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
810
                        self.deaflistenerport,
811
                        timeout=constants.TCP_PING_TIMEOUT,
812
                        live_port_needed=True,
813
                        source=constants.LOCALHOST_IP_ADDRESS,
814
                        ), # need successful connect(2)
815
                "successfully connected to deaf listener")
816

    
817
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
818
                        self.deaflistenerport,
819
                        timeout=constants.TCP_PING_TIMEOUT,
820
                        live_port_needed=True,
821
                        ), # need successful connect(2)
822
                "successfully connected to deaf listener (no source addr)")
823

    
824
  def testTcpPingToLocalHostNoAccept(self):
825
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
826
                         self.deaflistenerport,
827
                         timeout=constants.TCP_PING_TIMEOUT,
828
                         live_port_needed=False,
829
                         source=constants.LOCALHOST_IP_ADDRESS,
830
                         ), # ECONNREFUSED is OK
831
                 "failed to ping alive host on deaf port")
832

    
833
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
834
                         self.deaflistenerport,
835
                         timeout=constants.TCP_PING_TIMEOUT,
836
                         live_port_needed=False,
837
                         ), # ECONNREFUSED is OK
838
                 "failed to ping alive host on deaf port (no source addr)")
839

    
840

    
841
class TestOwnIpAddress(unittest.TestCase):
842
  """Testcase for OwnIpAddress"""
843

    
844
  def testOwnLoopback(self):
845
    """check having the loopback ip"""
846
    self.failUnless(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS),
847
                    "Should own the loopback address")
848

    
849
  def testNowOwnAddress(self):
850
    """check that I don't own an address"""
851

    
852
    # network 192.0.2.0/24 is reserved for test/documentation as per
853
    # rfc 3330, so we *should* not have an address of this range... if
854
    # this fails, we should extend the test to multiple addresses
855
    DST_IP = "192.0.2.1"
856
    self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP)
857

    
858

    
859
class TestListVisibleFiles(unittest.TestCase):
860
  """Test case for ListVisibleFiles"""
861

    
862
  def setUp(self):
863
    self.path = tempfile.mkdtemp()
864

    
865
  def tearDown(self):
866
    shutil.rmtree(self.path)
867

    
868
  def _test(self, files, expected):
869
    # Sort a copy
870
    expected = expected[:]
871
    expected.sort()
872

    
873
    for name in files:
874
      f = open(os.path.join(self.path, name), 'w')
875
      try:
876
        f.write("Test\n")
877
      finally:
878
        f.close()
879

    
880
    found = ListVisibleFiles(self.path)
881
    found.sort()
882

    
883
    self.assertEqual(found, expected)
884

    
885
  def testAllVisible(self):
886
    files = ["a", "b", "c"]
887
    expected = files
888
    self._test(files, expected)
889

    
890
  def testNoneVisible(self):
891
    files = [".a", ".b", ".c"]
892
    expected = []
893
    self._test(files, expected)
894

    
895
  def testSomeVisible(self):
896
    files = ["a", "b", ".c"]
897
    expected = ["a", "b"]
898
    self._test(files, expected)
899

    
900
  def testNonAbsolutePath(self):
901
    self.failUnlessRaises(errors.ProgrammerError, ListVisibleFiles, "abc")
902

    
903
  def testNonNormalizedPath(self):
904
    self.failUnlessRaises(errors.ProgrammerError, ListVisibleFiles,
905
                          "/bin/../tmp")
906

    
907

    
908
class TestNewUUID(unittest.TestCase):
909
  """Test case for NewUUID"""
910

    
911
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
912
                        '[a-f0-9]{4}-[a-f0-9]{12}$')
913

    
914
  def runTest(self):
915
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
916

    
917

    
918
class TestUniqueSequence(unittest.TestCase):
919
  """Test case for UniqueSequence"""
920

    
921
  def _test(self, input, expected):
922
    self.assertEqual(utils.UniqueSequence(input), expected)
923

    
924
  def runTest(self):
925
    # Ordered input
926
    self._test([1, 2, 3], [1, 2, 3])
927
    self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
928
    self._test([1, 2, 2, 3], [1, 2, 3])
929
    self._test([1, 2, 3, 3], [1, 2, 3])
930

    
931
    # Unordered input
932
    self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
933
    self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
934

    
935
    # Strings
936
    self._test(["a", "a"], ["a"])
937
    self._test(["a", "b"], ["a", "b"])
938
    self._test(["a", "b", "a"], ["a", "b"])
939

    
940

    
941
class TestFirstFree(unittest.TestCase):
942
  """Test case for the FirstFree function"""
943

    
944
  def test(self):
945
    """Test FirstFree"""
946
    self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
947
    self.failUnlessEqual(FirstFree([]), None)
948
    self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
949
    self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
950
    self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
951

    
952

    
953
class TestTailFile(testutils.GanetiTestCase):
954
  """Test case for the TailFile function"""
955

    
956
  def testEmpty(self):
957
    fname = self._CreateTempFile()
958
    self.failUnlessEqual(TailFile(fname), [])
959
    self.failUnlessEqual(TailFile(fname, lines=25), [])
960

    
961
  def testAllLines(self):
962
    data = ["test %d" % i for i in range(30)]
963
    for i in range(30):
964
      fname = self._CreateTempFile()
965
      fd = open(fname, "w")
966
      fd.write("\n".join(data[:i]))
967
      if i > 0:
968
        fd.write("\n")
969
      fd.close()
970
      self.failUnlessEqual(TailFile(fname, lines=i), data[:i])
971

    
972
  def testPartialLines(self):
973
    data = ["test %d" % i for i in range(30)]
974
    fname = self._CreateTempFile()
975
    fd = open(fname, "w")
976
    fd.write("\n".join(data))
977
    fd.write("\n")
978
    fd.close()
979
    for i in range(1, 30):
980
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
981

    
982
  def testBigFile(self):
983
    data = ["test %d" % i for i in range(30)]
984
    fname = self._CreateTempFile()
985
    fd = open(fname, "w")
986
    fd.write("X" * 1048576)
987
    fd.write("\n")
988
    fd.write("\n".join(data))
989
    fd.write("\n")
990
    fd.close()
991
    for i in range(1, 30):
992
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
993

    
994

    
995
class TestFileLock(unittest.TestCase):
996
  """Test case for the FileLock class"""
997

    
998
  def setUp(self):
999
    self.tmpfile = tempfile.NamedTemporaryFile()
1000
    self.lock = utils.FileLock(self.tmpfile.name)
1001

    
1002
  def testSharedNonblocking(self):
1003
    self.lock.Shared(blocking=False)
1004
    self.lock.Close()
1005

    
1006
  def testExclusiveNonblocking(self):
1007
    self.lock.Exclusive(blocking=False)
1008
    self.lock.Close()
1009

    
1010
  def testUnlockNonblocking(self):
1011
    self.lock.Unlock(blocking=False)
1012
    self.lock.Close()
1013

    
1014
  def testSharedBlocking(self):
1015
    self.lock.Shared(blocking=True)
1016
    self.lock.Close()
1017

    
1018
  def testExclusiveBlocking(self):
1019
    self.lock.Exclusive(blocking=True)
1020
    self.lock.Close()
1021

    
1022
  def testUnlockBlocking(self):
1023
    self.lock.Unlock(blocking=True)
1024
    self.lock.Close()
1025

    
1026
  def testSharedExclusiveUnlock(self):
1027
    self.lock.Shared(blocking=False)
1028
    self.lock.Exclusive(blocking=False)
1029
    self.lock.Unlock(blocking=False)
1030
    self.lock.Close()
1031

    
1032
  def testExclusiveSharedUnlock(self):
1033
    self.lock.Exclusive(blocking=False)
1034
    self.lock.Shared(blocking=False)
1035
    self.lock.Unlock(blocking=False)
1036
    self.lock.Close()
1037

    
1038
  def testCloseShared(self):
1039
    self.lock.Close()
1040
    self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
1041

    
1042
  def testCloseExclusive(self):
1043
    self.lock.Close()
1044
    self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
1045

    
1046
  def testCloseUnlock(self):
1047
    self.lock.Close()
1048
    self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
1049

    
1050

    
1051
class TestTimeFunctions(unittest.TestCase):
1052
  """Test case for time functions"""
1053

    
1054
  def runTest(self):
1055
    self.assertEqual(utils.SplitTime(1), (1, 0))
1056
    self.assertEqual(utils.SplitTime(1.5), (1, 500000))
1057
    self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
1058
    self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
1059
    self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
1060
    self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
1061
    self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
1062
    self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
1063

    
1064
    self.assertRaises(AssertionError, utils.SplitTime, -1)
1065

    
1066
    self.assertEqual(utils.MergeTime((1, 0)), 1.0)
1067
    self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
1068
    self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
1069

    
1070
    self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3),
1071
                     1218448917.481)
1072
    self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
1073

    
1074
    self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
1075
    self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
1076
    self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
1077
    self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
1078
    self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
1079

    
1080

    
1081
class FieldSetTestCase(unittest.TestCase):
1082
  """Test case for FieldSets"""
1083

    
1084
  def testSimpleMatch(self):
1085
    f = utils.FieldSet("a", "b", "c", "def")
1086
    self.failUnless(f.Matches("a"))
1087
    self.failIf(f.Matches("d"), "Substring matched")
1088
    self.failIf(f.Matches("defghi"), "Prefix string matched")
1089
    self.failIf(f.NonMatching(["b", "c"]))
1090
    self.failIf(f.NonMatching(["a", "b", "c", "def"]))
1091
    self.failUnless(f.NonMatching(["a", "d"]))
1092

    
1093
  def testRegexMatch(self):
1094
    f = utils.FieldSet("a", "b([0-9]+)", "c")
1095
    self.failUnless(f.Matches("b1"))
1096
    self.failUnless(f.Matches("b99"))
1097
    self.failIf(f.Matches("b/1"))
1098
    self.failIf(f.NonMatching(["b12", "c"]))
1099
    self.failUnless(f.NonMatching(["a", "1"]))
1100

    
1101
class TestForceDictType(unittest.TestCase):
1102
  """Test case for ForceDictType"""
1103

    
1104
  def setUp(self):
1105
    self.key_types = {
1106
      'a': constants.VTYPE_INT,
1107
      'b': constants.VTYPE_BOOL,
1108
      'c': constants.VTYPE_STRING,
1109
      'd': constants.VTYPE_SIZE,
1110
      }
1111

    
1112
  def _fdt(self, dict, allowed_values=None):
1113
    if allowed_values is None:
1114
      ForceDictType(dict, self.key_types)
1115
    else:
1116
      ForceDictType(dict, self.key_types, allowed_values=allowed_values)
1117

    
1118
    return dict
1119

    
1120
  def testSimpleDict(self):
1121
    self.assertEqual(self._fdt({}), {})
1122
    self.assertEqual(self._fdt({'a': 1}), {'a': 1})
1123
    self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
1124
    self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
1125
    self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
1126
    self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
1127
    self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
1128
    self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
1129
    self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
1130
    self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
1131
    self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
1132
    self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
1133

    
1134
  def testErrors(self):
1135
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
1136
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
1137
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
1138
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
1139

    
1140

    
1141
class TestIsAbsNormPath(unittest.TestCase):
1142
  """Testing case for IsProcessAlive"""
1143

    
1144
  def _pathTestHelper(self, path, result):
1145
    if result:
1146
      self.assert_(IsNormAbsPath(path),
1147
          "Path %s should result absolute and normalized" % path)
1148
    else:
1149
      self.assert_(not IsNormAbsPath(path),
1150
          "Path %s should not result absolute and normalized" % path)
1151

    
1152
  def testBase(self):
1153
    self._pathTestHelper('/etc', True)
1154
    self._pathTestHelper('/srv', True)
1155
    self._pathTestHelper('etc', False)
1156
    self._pathTestHelper('/etc/../root', False)
1157
    self._pathTestHelper('/etc/', False)
1158

    
1159

    
1160
class TestSafeEncode(unittest.TestCase):
1161
  """Test case for SafeEncode"""
1162

    
1163
  def testAscii(self):
1164
    for txt in [string.digits, string.letters, string.punctuation]:
1165
      self.failUnlessEqual(txt, SafeEncode(txt))
1166

    
1167
  def testDoubleEncode(self):
1168
    for i in range(255):
1169
      txt = SafeEncode(chr(i))
1170
      self.failUnlessEqual(txt, SafeEncode(txt))
1171

    
1172
  def testUnicode(self):
1173
    # 1024 is high enough to catch non-direct ASCII mappings
1174
    for i in range(1024):
1175
      txt = SafeEncode(unichr(i))
1176
      self.failUnlessEqual(txt, SafeEncode(txt))
1177

    
1178

    
1179
class TestFormatTime(unittest.TestCase):
1180
  """Testing case for FormatTime"""
1181

    
1182
  def testNone(self):
1183
    self.failUnlessEqual(FormatTime(None), "N/A")
1184

    
1185
  def testInvalid(self):
1186
    self.failUnlessEqual(FormatTime(()), "N/A")
1187

    
1188
  def testNow(self):
1189
    # tests that we accept time.time input
1190
    FormatTime(time.time())
1191
    # tests that we accept int input
1192
    FormatTime(int(time.time()))
1193

    
1194

    
1195
class RunInSeparateProcess(unittest.TestCase):
1196
  def test(self):
1197
    for exp in [True, False]:
1198
      def _child():
1199
        return exp
1200

    
1201
      self.assertEqual(exp, utils.RunInSeparateProcess(_child))
1202

    
1203
  def testArgs(self):
1204
    for arg in [0, 1, 999, "Hello World", (1, 2, 3)]:
1205
      def _child(carg1, carg2):
1206
        return carg1 == "Foo" and carg2 == arg
1207

    
1208
      self.assert_(utils.RunInSeparateProcess(_child, "Foo", arg))
1209

    
1210
  def testPid(self):
1211
    parent_pid = os.getpid()
1212

    
1213
    def _check():
1214
      return os.getpid() == parent_pid
1215

    
1216
    self.failIf(utils.RunInSeparateProcess(_check))
1217

    
1218
  def testSignal(self):
1219
    def _kill():
1220
      os.kill(os.getpid(), signal.SIGTERM)
1221

    
1222
    self.assertRaises(errors.GenericError,
1223
                      utils.RunInSeparateProcess, _kill)
1224

    
1225
  def testException(self):
1226
    def _exc():
1227
      raise errors.GenericError("This is a test")
1228

    
1229
    self.assertRaises(errors.GenericError,
1230
                      utils.RunInSeparateProcess, _exc)
1231

    
1232

    
1233
class TestFingerprintFile(unittest.TestCase):
1234
  def setUp(self):
1235
    self.tmpfile = tempfile.NamedTemporaryFile()
1236

    
1237
  def test(self):
1238
    self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1239
                     "da39a3ee5e6b4b0d3255bfef95601890afd80709")
1240

    
1241
    utils.WriteFile(self.tmpfile.name, data="Hello World\n")
1242
    self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1243
                     "648a6a6ffffdaa0badb23b8baf90b6168dd16b3a")
1244

    
1245

    
1246
class TestUnescapeAndSplit(unittest.TestCase):
1247
  """Testing case for UnescapeAndSplit"""
1248

    
1249
  def setUp(self):
1250
    # testing more that one separator for regexp safety
1251
    self._seps = [",", "+", "."]
1252

    
1253
  def testSimple(self):
1254
    a = ["a", "b", "c", "d"]
1255
    for sep in self._seps:
1256
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), a)
1257

    
1258
  def testEscape(self):
1259
    for sep in self._seps:
1260
      a = ["a", "b\\" + sep + "c", "d"]
1261
      b = ["a", "b" + sep + "c", "d"]
1262
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1263

    
1264
  def testDoubleEscape(self):
1265
    for sep in self._seps:
1266
      a = ["a", "b\\\\", "c", "d"]
1267
      b = ["a", "b\\", "c", "d"]
1268
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1269

    
1270
  def testThreeEscape(self):
1271
    for sep in self._seps:
1272
      a = ["a", "b\\\\\\" + sep + "c", "d"]
1273
      b = ["a", "b\\" + sep + "c", "d"]
1274
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1275

    
1276

    
1277
class TestPathJoin(unittest.TestCase):
1278
  """Testing case for PathJoin"""
1279

    
1280
  def testBasicItems(self):
1281
    mlist = ["/a", "b", "c"]
1282
    self.failUnlessEqual(PathJoin(*mlist), "/".join(mlist))
1283

    
1284
  def testNonAbsPrefix(self):
1285
    self.failUnlessRaises(ValueError, PathJoin, "a", "b")
1286

    
1287
  def testBackTrack(self):
1288
    self.failUnlessRaises(ValueError, PathJoin, "/a", "b/../c")
1289

    
1290
  def testMultiAbs(self):
1291
    self.failUnlessRaises(ValueError, PathJoin, "/a", "/b")
1292

    
1293

    
1294
class TestHostInfo(unittest.TestCase):
1295
  """Testing case for HostInfo"""
1296

    
1297
  def testUppercase(self):
1298
    data = "AbC.example.com"
1299
    self.failUnlessEqual(HostInfo.NormalizeName(data), data.lower())
1300

    
1301
  def testTooLongName(self):
1302
    data = "a.b." + "c" * 255
1303
    self.failUnlessRaises(OpPrereqError, HostInfo.NormalizeName, data)
1304

    
1305
  def testTrailingDot(self):
1306
    data = "a.b.c"
1307
    self.failUnlessEqual(HostInfo.NormalizeName(data + "."), data)
1308

    
1309
  def testInvalidName(self):
1310
    data = [
1311
      "a b",
1312
      "a/b",
1313
      ".a.b",
1314
      "a..b",
1315
      ]
1316
    for value in data:
1317
      self.failUnlessRaises(OpPrereqError, HostInfo.NormalizeName, value)
1318

    
1319
  def testValidName(self):
1320
    data = [
1321
      "a.b",
1322
      "a-b",
1323
      "a_b",
1324
      "a.b.c",
1325
      ]
1326
    for value in data:
1327
      HostInfo.NormalizeName(value)
1328

    
1329

    
1330

    
1331
if __name__ == '__main__':
1332
  testutils.GanetiTestProgram()