Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ bf4daac9

History | View | Annotate | Download (37.9 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import os
30
import md5
31
import signal
32
import socket
33
import shutil
34
import re
35
import select
36
import string
37

    
38
import ganeti
39
import testutils
40
from ganeti import constants
41
from ganeti import utils
42
from ganeti import errors
43
from ganeti.utils import IsProcessAlive, RunCmd, \
44
     RemoveFile, MatchNameComponent, FormatUnit, \
45
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
46
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
47
     SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \
48
     TailFile, ForceDictType, SafeEncode, IsNormAbsPath, FormatTime, \
49
     UnescapeAndSplit
50

    
51
from ganeti.errors import LockError, UnitParseError, GenericError, \
52
     ProgrammerError
53

    
54

    
55
class TestIsProcessAlive(unittest.TestCase):
56
  """Testing case for IsProcessAlive"""
57

    
58
  def testExists(self):
59
    mypid = os.getpid()
60
    self.assert_(IsProcessAlive(mypid),
61
                 "can't find myself running")
62

    
63
  def testNotExisting(self):
64
    pid_non_existing = os.fork()
65
    if pid_non_existing == 0:
66
      os._exit(0)
67
    elif pid_non_existing < 0:
68
      raise SystemError("can't fork")
69
    os.waitpid(pid_non_existing, 0)
70
    self.assert_(not IsProcessAlive(pid_non_existing),
71
                 "nonexisting process detected")
72

    
73

    
74
class TestPidFileFunctions(unittest.TestCase):
75
  """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
76

    
77
  def setUp(self):
78
    self.dir = tempfile.mkdtemp()
79
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
80
    utils.DaemonPidFileName = self.f_dpn
81

    
82
  def testPidFileFunctions(self):
83
    pid_file = self.f_dpn('test')
84
    utils.WritePidFile('test')
85
    self.failUnless(os.path.exists(pid_file),
86
                    "PID file should have been created")
87
    read_pid = utils.ReadPidFile(pid_file)
88
    self.failUnlessEqual(read_pid, os.getpid())
89
    self.failUnless(utils.IsProcessAlive(read_pid))
90
    self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
91
    utils.RemovePidFile('test')
92
    self.failIf(os.path.exists(pid_file),
93
                "PID file should not exist anymore")
94
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
95
                         "ReadPidFile should return 0 for missing pid file")
96
    fh = open(pid_file, "w")
97
    fh.write("blah\n")
98
    fh.close()
99
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
100
                         "ReadPidFile should return 0 for invalid pid file")
101
    utils.RemovePidFile('test')
102
    self.failIf(os.path.exists(pid_file),
103
                "PID file should not exist anymore")
104

    
105
  def testKill(self):
106
    pid_file = self.f_dpn('child')
107
    r_fd, w_fd = os.pipe()
108
    new_pid = os.fork()
109
    if new_pid == 0: #child
110
      utils.WritePidFile('child')
111
      os.write(w_fd, 'a')
112
      signal.pause()
113
      os._exit(0)
114
      return
115
    # else we are in the parent
116
    # wait until the child has written the pid file
117
    os.read(r_fd, 1)
118
    read_pid = utils.ReadPidFile(pid_file)
119
    self.failUnlessEqual(read_pid, new_pid)
120
    self.failUnless(utils.IsProcessAlive(new_pid))
121
    utils.KillProcess(new_pid, waitpid=True)
122
    self.failIf(utils.IsProcessAlive(new_pid))
123
    utils.RemovePidFile('child')
124
    self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
125

    
126
  def tearDown(self):
127
    for name in os.listdir(self.dir):
128
      os.unlink(os.path.join(self.dir, name))
129
    os.rmdir(self.dir)
130

    
131

    
132
class TestRunCmd(testutils.GanetiTestCase):
133
  """Testing case for the RunCmd function"""
134

    
135
  def setUp(self):
136
    testutils.GanetiTestCase.setUp(self)
137
    self.magic = time.ctime() + " ganeti test"
138
    self.fname = self._CreateTempFile()
139

    
140
  def testOk(self):
141
    """Test successful exit code"""
142
    result = RunCmd("/bin/sh -c 'exit 0'")
143
    self.assertEqual(result.exit_code, 0)
144
    self.assertEqual(result.output, "")
145

    
146
  def testFail(self):
147
    """Test fail exit code"""
148
    result = RunCmd("/bin/sh -c 'exit 1'")
149
    self.assertEqual(result.exit_code, 1)
150
    self.assertEqual(result.output, "")
151

    
152
  def testStdout(self):
153
    """Test standard output"""
154
    cmd = 'echo -n "%s"' % self.magic
155
    result = RunCmd("/bin/sh -c '%s'" % cmd)
156
    self.assertEqual(result.stdout, self.magic)
157
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
158
    self.assertEqual(result.output, "")
159
    self.assertFileContent(self.fname, self.magic)
160

    
161
  def testStderr(self):
162
    """Test standard error"""
163
    cmd = 'echo -n "%s"' % self.magic
164
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
165
    self.assertEqual(result.stderr, self.magic)
166
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
167
    self.assertEqual(result.output, "")
168
    self.assertFileContent(self.fname, self.magic)
169

    
170
  def testCombined(self):
171
    """Test combined output"""
172
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
173
    expected = "A" + self.magic + "B" + self.magic
174
    result = RunCmd("/bin/sh -c '%s'" % cmd)
175
    self.assertEqual(result.output, expected)
176
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
177
    self.assertEqual(result.output, "")
178
    self.assertFileContent(self.fname, expected)
179

    
180
  def testSignal(self):
181
    """Test signal"""
182
    result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
183
    self.assertEqual(result.signal, 15)
184
    self.assertEqual(result.output, "")
185

    
186
  def testListRun(self):
187
    """Test list runs"""
188
    result = RunCmd(["true"])
189
    self.assertEqual(result.signal, None)
190
    self.assertEqual(result.exit_code, 0)
191
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
192
    self.assertEqual(result.signal, None)
193
    self.assertEqual(result.exit_code, 1)
194
    result = RunCmd(["echo", "-n", self.magic])
195
    self.assertEqual(result.signal, None)
196
    self.assertEqual(result.exit_code, 0)
197
    self.assertEqual(result.stdout, self.magic)
198

    
199
  def testFileEmptyOutput(self):
200
    """Test file output"""
201
    result = RunCmd(["true"], output=self.fname)
202
    self.assertEqual(result.signal, None)
203
    self.assertEqual(result.exit_code, 0)
204
    self.assertFileContent(self.fname, "")
205

    
206
  def testLang(self):
207
    """Test locale environment"""
208
    old_env = os.environ.copy()
209
    try:
210
      os.environ["LANG"] = "en_US.UTF-8"
211
      os.environ["LC_ALL"] = "en_US.UTF-8"
212
      result = RunCmd(["locale"])
213
      for line in result.output.splitlines():
214
        key, value = line.split("=", 1)
215
        # Ignore these variables, they're overridden by LC_ALL
216
        if key == "LANG" or key == "LANGUAGE":
217
          continue
218
        self.failIf(value and value != "C" and value != '"C"',
219
            "Variable %s is set to the invalid value '%s'" % (key, value))
220
    finally:
221
      os.environ = old_env
222

    
223
  def testDefaultCwd(self):
224
    """Test default working directory"""
225
    self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
226

    
227
  def testCwd(self):
228
    """Test default working directory"""
229
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
230
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
231
    cwd = os.getcwd()
232
    self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
233

    
234
  def testResetEnv(self):
235
    """Test environment reset functionality"""
236
    self.failUnlessEqual(RunCmd(["env"], reset_env=True).stdout.strip(), "")
237

    
238

    
239
class TestRemoveFile(unittest.TestCase):
240
  """Test case for the RemoveFile function"""
241

    
242
  def setUp(self):
243
    """Create a temp dir and file for each case"""
244
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
245
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
246
    os.close(fd)
247

    
248
  def tearDown(self):
249
    if os.path.exists(self.tmpfile):
250
      os.unlink(self.tmpfile)
251
    os.rmdir(self.tmpdir)
252

    
253

    
254
  def testIgnoreDirs(self):
255
    """Test that RemoveFile() ignores directories"""
256
    self.assertEqual(None, RemoveFile(self.tmpdir))
257

    
258

    
259
  def testIgnoreNotExisting(self):
260
    """Test that RemoveFile() ignores non-existing files"""
261
    RemoveFile(self.tmpfile)
262
    RemoveFile(self.tmpfile)
263

    
264

    
265
  def testRemoveFile(self):
266
    """Test that RemoveFile does remove a file"""
267
    RemoveFile(self.tmpfile)
268
    if os.path.exists(self.tmpfile):
269
      self.fail("File '%s' not removed" % self.tmpfile)
270

    
271

    
272
  def testRemoveSymlink(self):
273
    """Test that RemoveFile does remove symlinks"""
274
    symlink = self.tmpdir + "/symlink"
275
    os.symlink("no-such-file", symlink)
276
    RemoveFile(symlink)
277
    if os.path.exists(symlink):
278
      self.fail("File '%s' not removed" % symlink)
279
    os.symlink(self.tmpfile, symlink)
280
    RemoveFile(symlink)
281
    if os.path.exists(symlink):
282
      self.fail("File '%s' not removed" % symlink)
283

    
284

    
285
class TestRename(unittest.TestCase):
286
  """Test case for RenameFile"""
287

    
288
  def setUp(self):
289
    """Create a temporary directory"""
290
    self.tmpdir = tempfile.mkdtemp()
291
    self.tmpfile = os.path.join(self.tmpdir, "test1")
292

    
293
    # Touch the file
294
    open(self.tmpfile, "w").close()
295

    
296
  def tearDown(self):
297
    """Remove temporary directory"""
298
    shutil.rmtree(self.tmpdir)
299

    
300
  def testSimpleRename1(self):
301
    """Simple rename 1"""
302
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
303
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
304

    
305
  def testSimpleRename2(self):
306
    """Simple rename 2"""
307
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
308
                     mkdir=True)
309
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
310

    
311
  def testRenameMkdir(self):
312
    """Rename with mkdir"""
313
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
314
                     mkdir=True)
315
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
316
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
317

    
318
    utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"),
319
                     os.path.join(self.tmpdir, "test/foo/bar/baz"),
320
                     mkdir=True)
321
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
322
    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar")))
323
    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz")))
324

    
325

    
326
class TestMatchNameComponent(unittest.TestCase):
327
  """Test case for the MatchNameComponent function"""
328

    
329
  def testEmptyList(self):
330
    """Test that there is no match against an empty list"""
331

    
332
    self.failUnlessEqual(MatchNameComponent("", []), None)
333
    self.failUnlessEqual(MatchNameComponent("test", []), None)
334

    
335
  def testSingleMatch(self):
336
    """Test that a single match is performed correctly"""
337
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
338
    for key in "test2", "test2.example", "test2.example.com":
339
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
340

    
341
  def testMultipleMatches(self):
342
    """Test that a multiple match is returned as None"""
343
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
344
    for key in "test1", "test1.example":
345
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
346

    
347
  def testFullMatch(self):
348
    """Test that a full match is returned correctly"""
349
    key1 = "test1"
350
    key2 = "test1.example"
351
    mlist = [key2, key2 + ".com"]
352
    self.failUnlessEqual(MatchNameComponent(key1, mlist), None)
353
    self.failUnlessEqual(MatchNameComponent(key2, mlist), key2)
354

    
355
  def testCaseInsensitivePartialMatch(self):
356
    """Test for the case_insensitive keyword"""
357
    mlist = ["test1.example.com", "test2.example.net"]
358
    self.assertEqual(MatchNameComponent("test2", mlist, case_sensitive=False),
359
                     "test2.example.net")
360
    self.assertEqual(MatchNameComponent("Test2", mlist, case_sensitive=False),
361
                     "test2.example.net")
362
    self.assertEqual(MatchNameComponent("teSt2", mlist, case_sensitive=False),
363
                     "test2.example.net")
364
    self.assertEqual(MatchNameComponent("TeSt2", mlist, case_sensitive=False),
365
                     "test2.example.net")
366

    
367

    
368
  def testCaseInsensitiveFullMatch(self):
369
    mlist = ["ts1.ex", "ts1.ex.org", "ts2.ex", "Ts2.ex"]
370
    # Between the two ts1 a full string match non-case insensitive should work
371
    self.assertEqual(MatchNameComponent("Ts1", mlist, case_sensitive=False),
372
                     None)
373
    self.assertEqual(MatchNameComponent("Ts1.ex", mlist, case_sensitive=False),
374
                     "ts1.ex")
375
    self.assertEqual(MatchNameComponent("ts1.ex", mlist, case_sensitive=False),
376
                     "ts1.ex")
377
    # Between the two ts2 only case differs, so only case-match works
378
    self.assertEqual(MatchNameComponent("ts2.ex", mlist, case_sensitive=False),
379
                     "ts2.ex")
380
    self.assertEqual(MatchNameComponent("Ts2.ex", mlist, case_sensitive=False),
381
                     "Ts2.ex")
382
    self.assertEqual(MatchNameComponent("TS2.ex", mlist, case_sensitive=False),
383
                     None)
384

    
385

    
386
class TestFormatUnit(unittest.TestCase):
387
  """Test case for the FormatUnit function"""
388

    
389
  def testMiB(self):
390
    self.assertEqual(FormatUnit(1, 'h'), '1M')
391
    self.assertEqual(FormatUnit(100, 'h'), '100M')
392
    self.assertEqual(FormatUnit(1023, 'h'), '1023M')
393

    
394
    self.assertEqual(FormatUnit(1, 'm'), '1')
395
    self.assertEqual(FormatUnit(100, 'm'), '100')
396
    self.assertEqual(FormatUnit(1023, 'm'), '1023')
397

    
398
    self.assertEqual(FormatUnit(1024, 'm'), '1024')
399
    self.assertEqual(FormatUnit(1536, 'm'), '1536')
400
    self.assertEqual(FormatUnit(17133, 'm'), '17133')
401
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
402

    
403
  def testGiB(self):
404
    self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
405
    self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
406
    self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
407
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
408

    
409
    self.assertEqual(FormatUnit(1024, 'g'), '1.0')
410
    self.assertEqual(FormatUnit(1536, 'g'), '1.5')
411
    self.assertEqual(FormatUnit(17133, 'g'), '16.7')
412
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
413

    
414
    self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
415
    self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
416
    self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
417

    
418
  def testTiB(self):
419
    self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
420
    self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
421
    self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
422

    
423
    self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
424
    self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
425
    self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
426

    
427
class TestParseUnit(unittest.TestCase):
428
  """Test case for the ParseUnit function"""
429

    
430
  SCALES = (('', 1),
431
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
432
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
433
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
434

    
435
  def testRounding(self):
436
    self.assertEqual(ParseUnit('0'), 0)
437
    self.assertEqual(ParseUnit('1'), 4)
438
    self.assertEqual(ParseUnit('2'), 4)
439
    self.assertEqual(ParseUnit('3'), 4)
440

    
441
    self.assertEqual(ParseUnit('124'), 124)
442
    self.assertEqual(ParseUnit('125'), 128)
443
    self.assertEqual(ParseUnit('126'), 128)
444
    self.assertEqual(ParseUnit('127'), 128)
445
    self.assertEqual(ParseUnit('128'), 128)
446
    self.assertEqual(ParseUnit('129'), 132)
447
    self.assertEqual(ParseUnit('130'), 132)
448

    
449
  def testFloating(self):
450
    self.assertEqual(ParseUnit('0'), 0)
451
    self.assertEqual(ParseUnit('0.5'), 4)
452
    self.assertEqual(ParseUnit('1.75'), 4)
453
    self.assertEqual(ParseUnit('1.99'), 4)
454
    self.assertEqual(ParseUnit('2.00'), 4)
455
    self.assertEqual(ParseUnit('2.01'), 4)
456
    self.assertEqual(ParseUnit('3.99'), 4)
457
    self.assertEqual(ParseUnit('4.00'), 4)
458
    self.assertEqual(ParseUnit('4.01'), 8)
459
    self.assertEqual(ParseUnit('1.5G'), 1536)
460
    self.assertEqual(ParseUnit('1.8G'), 1844)
461
    self.assertEqual(ParseUnit('8.28T'), 8682212)
462

    
463
  def testSuffixes(self):
464
    for sep in ('', ' ', '   ', "\t", "\t "):
465
      for suffix, scale in TestParseUnit.SCALES:
466
        for func in (lambda x: x, str.lower, str.upper):
467
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
468
                           1024 * scale)
469

    
470
  def testInvalidInput(self):
471
    for sep in ('-', '_', ',', 'a'):
472
      for suffix, _ in TestParseUnit.SCALES:
473
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
474

    
475
    for suffix, _ in TestParseUnit.SCALES:
476
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
477

    
478

    
479
class TestSshKeys(testutils.GanetiTestCase):
480
  """Test case for the AddAuthorizedKey function"""
481

    
482
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
483
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
484
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
485

    
486
  def setUp(self):
487
    testutils.GanetiTestCase.setUp(self)
488
    self.tmpname = self._CreateTempFile()
489
    handle = open(self.tmpname, 'w')
490
    try:
491
      handle.write("%s\n" % TestSshKeys.KEY_A)
492
      handle.write("%s\n" % TestSshKeys.KEY_B)
493
    finally:
494
      handle.close()
495

    
496
  def testAddingNewKey(self):
497
    AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
498

    
499
    self.assertFileContent(self.tmpname,
500
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
501
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
502
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
503
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
504

    
505
  def testAddingAlmostButNotCompletelyTheSameKey(self):
506
    AddAuthorizedKey(self.tmpname,
507
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
508

    
509
    self.assertFileContent(self.tmpname,
510
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
511
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
512
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
513
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
514

    
515
  def testAddingExistingKeyWithSomeMoreSpaces(self):
516
    AddAuthorizedKey(self.tmpname,
517
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
518

    
519
    self.assertFileContent(self.tmpname,
520
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
521
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
522
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
523

    
524
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
525
    RemoveAuthorizedKey(self.tmpname,
526
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
527

    
528
    self.assertFileContent(self.tmpname,
529
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
530
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
531

    
532
  def testRemovingNonExistingKey(self):
533
    RemoveAuthorizedKey(self.tmpname,
534
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
535

    
536
    self.assertFileContent(self.tmpname,
537
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
538
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
539
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
540

    
541

    
542
class TestEtcHosts(testutils.GanetiTestCase):
543
  """Test functions modifying /etc/hosts"""
544

    
545
  def setUp(self):
546
    testutils.GanetiTestCase.setUp(self)
547
    self.tmpname = self._CreateTempFile()
548
    handle = open(self.tmpname, 'w')
549
    try:
550
      handle.write('# This is a test file for /etc/hosts\n')
551
      handle.write('127.0.0.1\tlocalhost\n')
552
      handle.write('192.168.1.1 router gw\n')
553
    finally:
554
      handle.close()
555

    
556
  def testSettingNewIp(self):
557
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
558

    
559
    self.assertFileContent(self.tmpname,
560
      "# This is a test file for /etc/hosts\n"
561
      "127.0.0.1\tlocalhost\n"
562
      "192.168.1.1 router gw\n"
563
      "1.2.3.4\tmyhost.domain.tld myhost\n")
564
    self.assertFileMode(self.tmpname, 0644)
565

    
566
  def testSettingExistingIp(self):
567
    SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
568
                     ['myhost'])
569

    
570
    self.assertFileContent(self.tmpname,
571
      "# This is a test file for /etc/hosts\n"
572
      "127.0.0.1\tlocalhost\n"
573
      "192.168.1.1\tmyhost.domain.tld myhost\n")
574
    self.assertFileMode(self.tmpname, 0644)
575

    
576
  def testSettingDuplicateName(self):
577
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
578

    
579
    self.assertFileContent(self.tmpname,
580
      "# This is a test file for /etc/hosts\n"
581
      "127.0.0.1\tlocalhost\n"
582
      "192.168.1.1 router gw\n"
583
      "1.2.3.4\tmyhost\n")
584
    self.assertFileMode(self.tmpname, 0644)
585

    
586
  def testRemovingExistingHost(self):
587
    RemoveEtcHostsEntry(self.tmpname, 'router')
588

    
589
    self.assertFileContent(self.tmpname,
590
      "# This is a test file for /etc/hosts\n"
591
      "127.0.0.1\tlocalhost\n"
592
      "192.168.1.1 gw\n")
593
    self.assertFileMode(self.tmpname, 0644)
594

    
595
  def testRemovingSingleExistingHost(self):
596
    RemoveEtcHostsEntry(self.tmpname, 'localhost')
597

    
598
    self.assertFileContent(self.tmpname,
599
      "# This is a test file for /etc/hosts\n"
600
      "192.168.1.1 router gw\n")
601
    self.assertFileMode(self.tmpname, 0644)
602

    
603
  def testRemovingNonExistingHost(self):
604
    RemoveEtcHostsEntry(self.tmpname, 'myhost')
605

    
606
    self.assertFileContent(self.tmpname,
607
      "# This is a test file for /etc/hosts\n"
608
      "127.0.0.1\tlocalhost\n"
609
      "192.168.1.1 router gw\n")
610
    self.assertFileMode(self.tmpname, 0644)
611

    
612
  def testRemovingAlias(self):
613
    RemoveEtcHostsEntry(self.tmpname, 'gw')
614

    
615
    self.assertFileContent(self.tmpname,
616
      "# This is a test file for /etc/hosts\n"
617
      "127.0.0.1\tlocalhost\n"
618
      "192.168.1.1 router\n")
619
    self.assertFileMode(self.tmpname, 0644)
620

    
621

    
622
class TestShellQuoting(unittest.TestCase):
623
  """Test case for shell quoting functions"""
624

    
625
  def testShellQuote(self):
626
    self.assertEqual(ShellQuote('abc'), "abc")
627
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
628
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
629
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
630
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
631

    
632
  def testShellQuoteArgs(self):
633
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
634
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
635
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
636

    
637

    
638
class TestTcpPing(unittest.TestCase):
639
  """Testcase for TCP version of ping - against listen(2)ing port"""
640

    
641
  def setUp(self):
642
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
643
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
644
    self.listenerport = self.listener.getsockname()[1]
645
    self.listener.listen(1)
646

    
647
  def tearDown(self):
648
    self.listener.shutdown(socket.SHUT_RDWR)
649
    del self.listener
650
    del self.listenerport
651

    
652
  def testTcpPingToLocalHostAccept(self):
653
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
654
                         self.listenerport,
655
                         timeout=10,
656
                         live_port_needed=True,
657
                         source=constants.LOCALHOST_IP_ADDRESS,
658
                         ),
659
                 "failed to connect to test listener")
660

    
661
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
662
                         self.listenerport,
663
                         timeout=10,
664
                         live_port_needed=True,
665
                         ),
666
                 "failed to connect to test listener (no source)")
667

    
668

    
669
class TestTcpPingDeaf(unittest.TestCase):
670
  """Testcase for TCP version of ping - against non listen(2)ing port"""
671

    
672
  def setUp(self):
673
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
674
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
675
    self.deaflistenerport = self.deaflistener.getsockname()[1]
676

    
677
  def tearDown(self):
678
    del self.deaflistener
679
    del self.deaflistenerport
680

    
681
  def testTcpPingToLocalHostAcceptDeaf(self):
682
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
683
                        self.deaflistenerport,
684
                        timeout=constants.TCP_PING_TIMEOUT,
685
                        live_port_needed=True,
686
                        source=constants.LOCALHOST_IP_ADDRESS,
687
                        ), # need successful connect(2)
688
                "successfully connected to deaf listener")
689

    
690
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
691
                        self.deaflistenerport,
692
                        timeout=constants.TCP_PING_TIMEOUT,
693
                        live_port_needed=True,
694
                        ), # need successful connect(2)
695
                "successfully connected to deaf listener (no source addr)")
696

    
697
  def testTcpPingToLocalHostNoAccept(self):
698
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
699
                         self.deaflistenerport,
700
                         timeout=constants.TCP_PING_TIMEOUT,
701
                         live_port_needed=False,
702
                         source=constants.LOCALHOST_IP_ADDRESS,
703
                         ), # ECONNREFUSED is OK
704
                 "failed to ping alive host on deaf port")
705

    
706
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
707
                         self.deaflistenerport,
708
                         timeout=constants.TCP_PING_TIMEOUT,
709
                         live_port_needed=False,
710
                         ), # ECONNREFUSED is OK
711
                 "failed to ping alive host on deaf port (no source addr)")
712

    
713

    
714
class TestOwnIpAddress(unittest.TestCase):
715
  """Testcase for OwnIpAddress"""
716

    
717
  def testOwnLoopback(self):
718
    """check having the loopback ip"""
719
    self.failUnless(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS),
720
                    "Should own the loopback address")
721

    
722
  def testNowOwnAddress(self):
723
    """check that I don't own an address"""
724

    
725
    # network 192.0.2.0/24 is reserved for test/documentation as per
726
    # rfc 3330, so we *should* not have an address of this range... if
727
    # this fails, we should extend the test to multiple addresses
728
    DST_IP = "192.0.2.1"
729
    self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP)
730

    
731

    
732
class TestListVisibleFiles(unittest.TestCase):
733
  """Test case for ListVisibleFiles"""
734

    
735
  def setUp(self):
736
    self.path = tempfile.mkdtemp()
737

    
738
  def tearDown(self):
739
    shutil.rmtree(self.path)
740

    
741
  def _test(self, files, expected):
742
    # Sort a copy
743
    expected = expected[:]
744
    expected.sort()
745

    
746
    for name in files:
747
      f = open(os.path.join(self.path, name), 'w')
748
      try:
749
        f.write("Test\n")
750
      finally:
751
        f.close()
752

    
753
    found = ListVisibleFiles(self.path)
754
    found.sort()
755

    
756
    self.assertEqual(found, expected)
757

    
758
  def testAllVisible(self):
759
    files = ["a", "b", "c"]
760
    expected = files
761
    self._test(files, expected)
762

    
763
  def testNoneVisible(self):
764
    files = [".a", ".b", ".c"]
765
    expected = []
766
    self._test(files, expected)
767

    
768
  def testSomeVisible(self):
769
    files = ["a", "b", ".c"]
770
    expected = ["a", "b"]
771
    self._test(files, expected)
772

    
773

    
774
class TestNewUUID(unittest.TestCase):
775
  """Test case for NewUUID"""
776

    
777
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
778
                        '[a-f0-9]{4}-[a-f0-9]{12}$')
779

    
780
  def runTest(self):
781
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
782

    
783

    
784
class TestUniqueSequence(unittest.TestCase):
785
  """Test case for UniqueSequence"""
786

    
787
  def _test(self, input, expected):
788
    self.assertEqual(utils.UniqueSequence(input), expected)
789

    
790
  def runTest(self):
791
    # Ordered input
792
    self._test([1, 2, 3], [1, 2, 3])
793
    self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
794
    self._test([1, 2, 2, 3], [1, 2, 3])
795
    self._test([1, 2, 3, 3], [1, 2, 3])
796

    
797
    # Unordered input
798
    self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
799
    self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
800

    
801
    # Strings
802
    self._test(["a", "a"], ["a"])
803
    self._test(["a", "b"], ["a", "b"])
804
    self._test(["a", "b", "a"], ["a", "b"])
805

    
806

    
807
class TestFirstFree(unittest.TestCase):
808
  """Test case for the FirstFree function"""
809

    
810
  def test(self):
811
    """Test FirstFree"""
812
    self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
813
    self.failUnlessEqual(FirstFree([]), None)
814
    self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
815
    self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
816
    self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
817

    
818

    
819
class TestTailFile(testutils.GanetiTestCase):
820
  """Test case for the TailFile function"""
821

    
822
  def testEmpty(self):
823
    fname = self._CreateTempFile()
824
    self.failUnlessEqual(TailFile(fname), [])
825
    self.failUnlessEqual(TailFile(fname, lines=25), [])
826

    
827
  def testAllLines(self):
828
    data = ["test %d" % i for i in range(30)]
829
    for i in range(30):
830
      fname = self._CreateTempFile()
831
      fd = open(fname, "w")
832
      fd.write("\n".join(data[:i]))
833
      if i > 0:
834
        fd.write("\n")
835
      fd.close()
836
      self.failUnlessEqual(TailFile(fname, lines=i), data[:i])
837

    
838
  def testPartialLines(self):
839
    data = ["test %d" % i for i in range(30)]
840
    fname = self._CreateTempFile()
841
    fd = open(fname, "w")
842
    fd.write("\n".join(data))
843
    fd.write("\n")
844
    fd.close()
845
    for i in range(1, 30):
846
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
847

    
848
  def testBigFile(self):
849
    data = ["test %d" % i for i in range(30)]
850
    fname = self._CreateTempFile()
851
    fd = open(fname, "w")
852
    fd.write("X" * 1048576)
853
    fd.write("\n")
854
    fd.write("\n".join(data))
855
    fd.write("\n")
856
    fd.close()
857
    for i in range(1, 30):
858
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
859

    
860

    
861
class TestFileLock(unittest.TestCase):
862
  """Test case for the FileLock class"""
863

    
864
  def setUp(self):
865
    self.tmpfile = tempfile.NamedTemporaryFile()
866
    self.lock = utils.FileLock(self.tmpfile.name)
867

    
868
  def testSharedNonblocking(self):
869
    self.lock.Shared(blocking=False)
870
    self.lock.Close()
871

    
872
  def testExclusiveNonblocking(self):
873
    self.lock.Exclusive(blocking=False)
874
    self.lock.Close()
875

    
876
  def testUnlockNonblocking(self):
877
    self.lock.Unlock(blocking=False)
878
    self.lock.Close()
879

    
880
  def testSharedBlocking(self):
881
    self.lock.Shared(blocking=True)
882
    self.lock.Close()
883

    
884
  def testExclusiveBlocking(self):
885
    self.lock.Exclusive(blocking=True)
886
    self.lock.Close()
887

    
888
  def testUnlockBlocking(self):
889
    self.lock.Unlock(blocking=True)
890
    self.lock.Close()
891

    
892
  def testSharedExclusiveUnlock(self):
893
    self.lock.Shared(blocking=False)
894
    self.lock.Exclusive(blocking=False)
895
    self.lock.Unlock(blocking=False)
896
    self.lock.Close()
897

    
898
  def testExclusiveSharedUnlock(self):
899
    self.lock.Exclusive(blocking=False)
900
    self.lock.Shared(blocking=False)
901
    self.lock.Unlock(blocking=False)
902
    self.lock.Close()
903

    
904
  def testCloseShared(self):
905
    self.lock.Close()
906
    self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
907

    
908
  def testCloseExclusive(self):
909
    self.lock.Close()
910
    self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
911

    
912
  def testCloseUnlock(self):
913
    self.lock.Close()
914
    self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
915

    
916

    
917
class TestTimeFunctions(unittest.TestCase):
918
  """Test case for time functions"""
919

    
920
  def runTest(self):
921
    self.assertEqual(utils.SplitTime(1), (1, 0))
922
    self.assertEqual(utils.SplitTime(1.5), (1, 500000))
923
    self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
924
    self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
925
    self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
926
    self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
927
    self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
928
    self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
929

    
930
    self.assertRaises(AssertionError, utils.SplitTime, -1)
931

    
932
    self.assertEqual(utils.MergeTime((1, 0)), 1.0)
933
    self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
934
    self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
935

    
936
    self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3),
937
                     1218448917.481)
938
    self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
939

    
940
    self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
941
    self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
942
    self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
943
    self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
944
    self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
945

    
946

    
947
class FieldSetTestCase(unittest.TestCase):
948
  """Test case for FieldSets"""
949

    
950
  def testSimpleMatch(self):
951
    f = utils.FieldSet("a", "b", "c", "def")
952
    self.failUnless(f.Matches("a"))
953
    self.failIf(f.Matches("d"), "Substring matched")
954
    self.failIf(f.Matches("defghi"), "Prefix string matched")
955
    self.failIf(f.NonMatching(["b", "c"]))
956
    self.failIf(f.NonMatching(["a", "b", "c", "def"]))
957
    self.failUnless(f.NonMatching(["a", "d"]))
958

    
959
  def testRegexMatch(self):
960
    f = utils.FieldSet("a", "b([0-9]+)", "c")
961
    self.failUnless(f.Matches("b1"))
962
    self.failUnless(f.Matches("b99"))
963
    self.failIf(f.Matches("b/1"))
964
    self.failIf(f.NonMatching(["b12", "c"]))
965
    self.failUnless(f.NonMatching(["a", "1"]))
966

    
967
class TestForceDictType(unittest.TestCase):
968
  """Test case for ForceDictType"""
969

    
970
  def setUp(self):
971
    self.key_types = {
972
      'a': constants.VTYPE_INT,
973
      'b': constants.VTYPE_BOOL,
974
      'c': constants.VTYPE_STRING,
975
      'd': constants.VTYPE_SIZE,
976
      }
977

    
978
  def _fdt(self, dict, allowed_values=None):
979
    if allowed_values is None:
980
      ForceDictType(dict, self.key_types)
981
    else:
982
      ForceDictType(dict, self.key_types, allowed_values=allowed_values)
983

    
984
    return dict
985

    
986
  def testSimpleDict(self):
987
    self.assertEqual(self._fdt({}), {})
988
    self.assertEqual(self._fdt({'a': 1}), {'a': 1})
989
    self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
990
    self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
991
    self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
992
    self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
993
    self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
994
    self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
995
    self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
996
    self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
997
    self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
998
    self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
999

    
1000
  def testErrors(self):
1001
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
1002
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
1003
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
1004
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
1005

    
1006

    
1007
class TestIsAbsNormPath(unittest.TestCase):
1008
  """Testing case for IsProcessAlive"""
1009

    
1010
  def _pathTestHelper(self, path, result):
1011
    if result:
1012
      self.assert_(IsNormAbsPath(path),
1013
          "Path %s should result absolute and normalized" % path)
1014
    else:
1015
      self.assert_(not IsNormAbsPath(path),
1016
          "Path %s should not result absolute and normalized" % path)
1017

    
1018
  def testBase(self):
1019
    self._pathTestHelper('/etc', True)
1020
    self._pathTestHelper('/srv', True)
1021
    self._pathTestHelper('etc', False)
1022
    self._pathTestHelper('/etc/../root', False)
1023
    self._pathTestHelper('/etc/', False)
1024

    
1025

    
1026
class TestSafeEncode(unittest.TestCase):
1027
  """Test case for SafeEncode"""
1028

    
1029
  def testAscii(self):
1030
    for txt in [string.digits, string.letters, string.punctuation]:
1031
      self.failUnlessEqual(txt, SafeEncode(txt))
1032

    
1033
  def testDoubleEncode(self):
1034
    for i in range(255):
1035
      txt = SafeEncode(chr(i))
1036
      self.failUnlessEqual(txt, SafeEncode(txt))
1037

    
1038
  def testUnicode(self):
1039
    # 1024 is high enough to catch non-direct ASCII mappings
1040
    for i in range(1024):
1041
      txt = SafeEncode(unichr(i))
1042
      self.failUnlessEqual(txt, SafeEncode(txt))
1043

    
1044

    
1045
class TestFormatTime(unittest.TestCase):
1046
  """Testing case for FormatTime"""
1047

    
1048
  def testNone(self):
1049
    self.failUnlessEqual(FormatTime(None), "N/A")
1050

    
1051
  def testInvalid(self):
1052
    self.failUnlessEqual(FormatTime(()), "N/A")
1053

    
1054
  def testNow(self):
1055
    # tests that we accept time.time input
1056
    FormatTime(time.time())
1057
    # tests that we accept int input
1058
    FormatTime(int(time.time()))
1059

    
1060

    
1061
class RunInSeparateProcess(unittest.TestCase):
1062
  def test(self):
1063
    for exp in [True, False]:
1064
      def _child():
1065
        return exp
1066

    
1067
      self.assertEqual(exp, utils.RunInSeparateProcess(_child))
1068

    
1069
  def testPid(self):
1070
    parent_pid = os.getpid()
1071

    
1072
    def _check():
1073
      return os.getpid() == parent_pid
1074

    
1075
    self.failIf(utils.RunInSeparateProcess(_check))
1076

    
1077
  def testSignal(self):
1078
    def _kill():
1079
      os.kill(os.getpid(), signal.SIGTERM)
1080

    
1081
    self.assertRaises(errors.GenericError,
1082
                      utils.RunInSeparateProcess, _kill)
1083

    
1084
  def testException(self):
1085
    def _exc():
1086
      raise errors.GenericError("This is a test")
1087

    
1088
    self.assertRaises(errors.GenericError,
1089
                      utils.RunInSeparateProcess, _exc)
1090

    
1091

    
1092
class TestFingerprintFile(unittest.TestCase):
1093
  def setUp(self):
1094
    self.tmpfile = tempfile.NamedTemporaryFile()
1095

    
1096
  def test(self):
1097
    self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1098
                     "da39a3ee5e6b4b0d3255bfef95601890afd80709")
1099

    
1100
    utils.WriteFile(self.tmpfile.name, data="Hello World\n")
1101
    self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1102
                     "648a6a6ffffdaa0badb23b8baf90b6168dd16b3a")
1103

    
1104

    
1105
class TestUnescapeAndSplit(unittest.TestCase):
1106
  """Testing case for UnescapeAndSplit"""
1107

    
1108
  def setUp(self):
1109
    # testing more that one separator for regexp safety
1110
    self._seps = [",", "+", "."]
1111

    
1112
  def testSimple(self):
1113
    a = ["a", "b", "c", "d"]
1114
    for sep in self._seps:
1115
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), a)
1116

    
1117
  def testEscape(self):
1118
    for sep in self._seps:
1119
      a = ["a", "b\\" + sep + "c", "d"]
1120
      b = ["a", "b" + sep + "c", "d"]
1121
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1122

    
1123
  def testDoubleEscape(self):
1124
    for sep in self._seps:
1125
      a = ["a", "b\\\\", "c", "d"]
1126
      b = ["a", "b\\", "c", "d"]
1127
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1128

    
1129
  def testThreeEscape(self):
1130
    for sep in self._seps:
1131
      a = ["a", "b\\\\\\" + sep + "c", "d"]
1132
      b = ["a", "b\\" + sep + "c", "d"]
1133
      self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1134

    
1135

    
1136
if __name__ == '__main__':
1137
  testutils.GanetiTestProgram()