Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ da961187

History | View | Annotate | Download (32.7 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import os
30
import md5
31
import signal
32
import socket
33
import shutil
34
import re
35
import select
36

    
37
import ganeti
38
import testutils
39
from ganeti import constants
40
from ganeti import utils
41
from ganeti import errors
42
from ganeti.utils import IsProcessAlive, RunCmd, \
43
     RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
44
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
45
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
46
     SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \
47
     TailFile, ForceDictType, IsNormAbsPath
48

    
49
from ganeti.errors import LockError, UnitParseError, GenericError, \
50
     ProgrammerError
51

    
52

    
53
class TestIsProcessAlive(unittest.TestCase):
54
  """Testing case for IsProcessAlive"""
55

    
56
  def testExists(self):
57
    mypid = os.getpid()
58
    self.assert_(IsProcessAlive(mypid),
59
                 "can't find myself running")
60

    
61
  def testNotExisting(self):
62
    pid_non_existing = os.fork()
63
    if pid_non_existing == 0:
64
      os._exit(0)
65
    elif pid_non_existing < 0:
66
      raise SystemError("can't fork")
67
    os.waitpid(pid_non_existing, 0)
68
    self.assert_(not IsProcessAlive(pid_non_existing),
69
                 "nonexisting process detected")
70

    
71

    
72
class TestPidFileFunctions(unittest.TestCase):
73
  """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
74

    
75
  def setUp(self):
76
    self.dir = tempfile.mkdtemp()
77
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
78
    utils.DaemonPidFileName = self.f_dpn
79

    
80
  def testPidFileFunctions(self):
81
    pid_file = self.f_dpn('test')
82
    utils.WritePidFile('test')
83
    self.failUnless(os.path.exists(pid_file),
84
                    "PID file should have been created")
85
    read_pid = utils.ReadPidFile(pid_file)
86
    self.failUnlessEqual(read_pid, os.getpid())
87
    self.failUnless(utils.IsProcessAlive(read_pid))
88
    self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
89
    utils.RemovePidFile('test')
90
    self.failIf(os.path.exists(pid_file),
91
                "PID file should not exist anymore")
92
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
93
                         "ReadPidFile should return 0 for missing pid file")
94
    fh = open(pid_file, "w")
95
    fh.write("blah\n")
96
    fh.close()
97
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
98
                         "ReadPidFile should return 0 for invalid pid file")
99
    utils.RemovePidFile('test')
100
    self.failIf(os.path.exists(pid_file),
101
                "PID file should not exist anymore")
102

    
103
  def testKill(self):
104
    pid_file = self.f_dpn('child')
105
    r_fd, w_fd = os.pipe()
106
    new_pid = os.fork()
107
    if new_pid == 0: #child
108
      utils.WritePidFile('child')
109
      os.write(w_fd, 'a')
110
      signal.pause()
111
      os._exit(0)
112
      return
113
    # else we are in the parent
114
    # wait until the child has written the pid file
115
    os.read(r_fd, 1)
116
    read_pid = utils.ReadPidFile(pid_file)
117
    self.failUnlessEqual(read_pid, new_pid)
118
    self.failUnless(utils.IsProcessAlive(new_pid))
119
    utils.KillProcess(new_pid, waitpid=True)
120
    self.failIf(utils.IsProcessAlive(new_pid))
121
    utils.RemovePidFile('child')
122
    self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
123

    
124
  def tearDown(self):
125
    for name in os.listdir(self.dir):
126
      os.unlink(os.path.join(self.dir, name))
127
    os.rmdir(self.dir)
128

    
129

    
130
class TestRunCmd(testutils.GanetiTestCase):
131
  """Testing case for the RunCmd function"""
132

    
133
  def setUp(self):
134
    testutils.GanetiTestCase.setUp(self)
135
    self.magic = time.ctime() + " ganeti test"
136
    self.fname = self._CreateTempFile()
137

    
138
  def testOk(self):
139
    """Test successful exit code"""
140
    result = RunCmd("/bin/sh -c 'exit 0'")
141
    self.assertEqual(result.exit_code, 0)
142
    self.assertEqual(result.output, "")
143

    
144
  def testFail(self):
145
    """Test fail exit code"""
146
    result = RunCmd("/bin/sh -c 'exit 1'")
147
    self.assertEqual(result.exit_code, 1)
148
    self.assertEqual(result.output, "")
149

    
150
  def testStdout(self):
151
    """Test standard output"""
152
    cmd = 'echo -n "%s"' % self.magic
153
    result = RunCmd("/bin/sh -c '%s'" % cmd)
154
    self.assertEqual(result.stdout, self.magic)
155
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
156
    self.assertEqual(result.output, "")
157
    self.assertFileContent(self.fname, self.magic)
158

    
159
  def testStderr(self):
160
    """Test standard error"""
161
    cmd = 'echo -n "%s"' % self.magic
162
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
163
    self.assertEqual(result.stderr, self.magic)
164
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
165
    self.assertEqual(result.output, "")
166
    self.assertFileContent(self.fname, self.magic)
167

    
168
  def testCombined(self):
169
    """Test combined output"""
170
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
171
    expected = "A" + self.magic + "B" + self.magic
172
    result = RunCmd("/bin/sh -c '%s'" % cmd)
173
    self.assertEqual(result.output, expected)
174
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
175
    self.assertEqual(result.output, "")
176
    self.assertFileContent(self.fname, expected)
177

    
178
  def testSignal(self):
179
    """Test signal"""
180
    result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
181
    self.assertEqual(result.signal, 15)
182
    self.assertEqual(result.output, "")
183

    
184
  def testListRun(self):
185
    """Test list runs"""
186
    result = RunCmd(["true"])
187
    self.assertEqual(result.signal, None)
188
    self.assertEqual(result.exit_code, 0)
189
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
190
    self.assertEqual(result.signal, None)
191
    self.assertEqual(result.exit_code, 1)
192
    result = RunCmd(["echo", "-n", self.magic])
193
    self.assertEqual(result.signal, None)
194
    self.assertEqual(result.exit_code, 0)
195
    self.assertEqual(result.stdout, self.magic)
196

    
197
  def testFileEmptyOutput(self):
198
    """Test file output"""
199
    result = RunCmd(["true"], output=self.fname)
200
    self.assertEqual(result.signal, None)
201
    self.assertEqual(result.exit_code, 0)
202
    self.assertFileContent(self.fname, "")
203

    
204
  def testLang(self):
205
    """Test locale environment"""
206
    old_env = os.environ.copy()
207
    try:
208
      os.environ["LANG"] = "en_US.UTF-8"
209
      os.environ["LC_ALL"] = "en_US.UTF-8"
210
      result = RunCmd(["locale"])
211
      for line in result.output.splitlines():
212
        key, value = line.split("=", 1)
213
        # Ignore these variables, they're overridden by LC_ALL
214
        if key == "LANG" or key == "LANGUAGE":
215
          continue
216
        self.failIf(value and value != "C" and value != '"C"',
217
            "Variable %s is set to the invalid value '%s'" % (key, value))
218
    finally:
219
      os.environ = old_env
220

    
221
  def testDefaultCwd(self):
222
    """Test default working directory"""
223
    self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
224

    
225
  def testCwd(self):
226
    """Test default working directory"""
227
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
228
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
229
    cwd = os.getcwd()
230
    self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
231

    
232

    
233
class TestRemoveFile(unittest.TestCase):
234
  """Test case for the RemoveFile function"""
235

    
236
  def setUp(self):
237
    """Create a temp dir and file for each case"""
238
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
239
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
240
    os.close(fd)
241

    
242
  def tearDown(self):
243
    if os.path.exists(self.tmpfile):
244
      os.unlink(self.tmpfile)
245
    os.rmdir(self.tmpdir)
246

    
247

    
248
  def testIgnoreDirs(self):
249
    """Test that RemoveFile() ignores directories"""
250
    self.assertEqual(None, RemoveFile(self.tmpdir))
251

    
252

    
253
  def testIgnoreNotExisting(self):
254
    """Test that RemoveFile() ignores non-existing files"""
255
    RemoveFile(self.tmpfile)
256
    RemoveFile(self.tmpfile)
257

    
258

    
259
  def testRemoveFile(self):
260
    """Test that RemoveFile does remove a file"""
261
    RemoveFile(self.tmpfile)
262
    if os.path.exists(self.tmpfile):
263
      self.fail("File '%s' not removed" % self.tmpfile)
264

    
265

    
266
  def testRemoveSymlink(self):
267
    """Test that RemoveFile does remove symlinks"""
268
    symlink = self.tmpdir + "/symlink"
269
    os.symlink("no-such-file", symlink)
270
    RemoveFile(symlink)
271
    if os.path.exists(symlink):
272
      self.fail("File '%s' not removed" % symlink)
273
    os.symlink(self.tmpfile, symlink)
274
    RemoveFile(symlink)
275
    if os.path.exists(symlink):
276
      self.fail("File '%s' not removed" % symlink)
277

    
278

    
279
class TestRename(unittest.TestCase):
280
  """Test case for RenameFile"""
281

    
282
  def setUp(self):
283
    """Create a temporary directory"""
284
    self.tmpdir = tempfile.mkdtemp()
285
    self.tmpfile = os.path.join(self.tmpdir, "test1")
286

    
287
    # Touch the file
288
    open(self.tmpfile, "w").close()
289

    
290
  def tearDown(self):
291
    """Remove temporary directory"""
292
    shutil.rmtree(self.tmpdir)
293

    
294
  def testSimpleRename1(self):
295
    """Simple rename 1"""
296
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
297

    
298
  def testSimpleRename2(self):
299
    """Simple rename 2"""
300
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
301
                     mkdir=True)
302

    
303
  def testRenameMkdir(self):
304
    """Rename with mkdir"""
305
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
306
                     mkdir=True)
307

    
308

    
309
class TestCheckdict(unittest.TestCase):
310
  """Test case for the CheckDict function"""
311

    
312
  def testAdd(self):
313
    """Test that CheckDict adds a missing key with the correct value"""
314

    
315
    tgt = {'a':1}
316
    tmpl = {'b': 2}
317
    CheckDict(tgt, tmpl)
318
    if 'b' not in tgt or tgt['b'] != 2:
319
      self.fail("Failed to update dict")
320

    
321

    
322
  def testNoUpdate(self):
323
    """Test that CheckDict does not overwrite an existing key"""
324
    tgt = {'a':1, 'b': 3}
325
    tmpl = {'b': 2}
326
    CheckDict(tgt, tmpl)
327
    self.failUnlessEqual(tgt['b'], 3)
328

    
329

    
330
class TestMatchNameComponent(unittest.TestCase):
331
  """Test case for the MatchNameComponent function"""
332

    
333
  def testEmptyList(self):
334
    """Test that there is no match against an empty list"""
335

    
336
    self.failUnlessEqual(MatchNameComponent("", []), None)
337
    self.failUnlessEqual(MatchNameComponent("test", []), None)
338

    
339
  def testSingleMatch(self):
340
    """Test that a single match is performed correctly"""
341
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
342
    for key in "test2", "test2.example", "test2.example.com":
343
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
344

    
345
  def testMultipleMatches(self):
346
    """Test that a multiple match is returned as None"""
347
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
348
    for key in "test1", "test1.example":
349
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
350

    
351

    
352
class TestFormatUnit(unittest.TestCase):
353
  """Test case for the FormatUnit function"""
354

    
355
  def testMiB(self):
356
    self.assertEqual(FormatUnit(1, 'h'), '1M')
357
    self.assertEqual(FormatUnit(100, 'h'), '100M')
358
    self.assertEqual(FormatUnit(1023, 'h'), '1023M')
359

    
360
    self.assertEqual(FormatUnit(1, 'm'), '1')
361
    self.assertEqual(FormatUnit(100, 'm'), '100')
362
    self.assertEqual(FormatUnit(1023, 'm'), '1023')
363

    
364
    self.assertEqual(FormatUnit(1024, 'm'), '1024')
365
    self.assertEqual(FormatUnit(1536, 'm'), '1536')
366
    self.assertEqual(FormatUnit(17133, 'm'), '17133')
367
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
368

    
369
  def testGiB(self):
370
    self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
371
    self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
372
    self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
373
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
374

    
375
    self.assertEqual(FormatUnit(1024, 'g'), '1.0')
376
    self.assertEqual(FormatUnit(1536, 'g'), '1.5')
377
    self.assertEqual(FormatUnit(17133, 'g'), '16.7')
378
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
379

    
380
    self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
381
    self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
382
    self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
383

    
384
  def testTiB(self):
385
    self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
386
    self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
387
    self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
388

    
389
    self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
390
    self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
391
    self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
392

    
393
class TestParseUnit(unittest.TestCase):
394
  """Test case for the ParseUnit function"""
395

    
396
  SCALES = (('', 1),
397
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
398
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
399
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
400

    
401
  def testRounding(self):
402
    self.assertEqual(ParseUnit('0'), 0)
403
    self.assertEqual(ParseUnit('1'), 4)
404
    self.assertEqual(ParseUnit('2'), 4)
405
    self.assertEqual(ParseUnit('3'), 4)
406

    
407
    self.assertEqual(ParseUnit('124'), 124)
408
    self.assertEqual(ParseUnit('125'), 128)
409
    self.assertEqual(ParseUnit('126'), 128)
410
    self.assertEqual(ParseUnit('127'), 128)
411
    self.assertEqual(ParseUnit('128'), 128)
412
    self.assertEqual(ParseUnit('129'), 132)
413
    self.assertEqual(ParseUnit('130'), 132)
414

    
415
  def testFloating(self):
416
    self.assertEqual(ParseUnit('0'), 0)
417
    self.assertEqual(ParseUnit('0.5'), 4)
418
    self.assertEqual(ParseUnit('1.75'), 4)
419
    self.assertEqual(ParseUnit('1.99'), 4)
420
    self.assertEqual(ParseUnit('2.00'), 4)
421
    self.assertEqual(ParseUnit('2.01'), 4)
422
    self.assertEqual(ParseUnit('3.99'), 4)
423
    self.assertEqual(ParseUnit('4.00'), 4)
424
    self.assertEqual(ParseUnit('4.01'), 8)
425
    self.assertEqual(ParseUnit('1.5G'), 1536)
426
    self.assertEqual(ParseUnit('1.8G'), 1844)
427
    self.assertEqual(ParseUnit('8.28T'), 8682212)
428

    
429
  def testSuffixes(self):
430
    for sep in ('', ' ', '   ', "\t", "\t "):
431
      for suffix, scale in TestParseUnit.SCALES:
432
        for func in (lambda x: x, str.lower, str.upper):
433
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
434
                           1024 * scale)
435

    
436
  def testInvalidInput(self):
437
    for sep in ('-', '_', ',', 'a'):
438
      for suffix, _ in TestParseUnit.SCALES:
439
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
440

    
441
    for suffix, _ in TestParseUnit.SCALES:
442
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
443

    
444

    
445
class TestSshKeys(testutils.GanetiTestCase):
446
  """Test case for the AddAuthorizedKey function"""
447

    
448
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
449
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
450
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
451

    
452
  def setUp(self):
453
    testutils.GanetiTestCase.setUp(self)
454
    self.tmpname = self._CreateTempFile()
455
    handle = open(self.tmpname, 'w')
456
    try:
457
      handle.write("%s\n" % TestSshKeys.KEY_A)
458
      handle.write("%s\n" % TestSshKeys.KEY_B)
459
    finally:
460
      handle.close()
461

    
462
  def testAddingNewKey(self):
463
    AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
464

    
465
    self.assertFileContent(self.tmpname,
466
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
467
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
468
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
469
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
470

    
471
  def testAddingAlmostButNotCompletelyTheSameKey(self):
472
    AddAuthorizedKey(self.tmpname,
473
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
474

    
475
    self.assertFileContent(self.tmpname,
476
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
477
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
478
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
479
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
480

    
481
  def testAddingExistingKeyWithSomeMoreSpaces(self):
482
    AddAuthorizedKey(self.tmpname,
483
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
484

    
485
    self.assertFileContent(self.tmpname,
486
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
487
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
488
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
489

    
490
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
491
    RemoveAuthorizedKey(self.tmpname,
492
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
493

    
494
    self.assertFileContent(self.tmpname,
495
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
496
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
497

    
498
  def testRemovingNonExistingKey(self):
499
    RemoveAuthorizedKey(self.tmpname,
500
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
501

    
502
    self.assertFileContent(self.tmpname,
503
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
504
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
505
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
506

    
507

    
508
class TestEtcHosts(testutils.GanetiTestCase):
509
  """Test functions modifying /etc/hosts"""
510

    
511
  def setUp(self):
512
    testutils.GanetiTestCase.setUp(self)
513
    self.tmpname = self._CreateTempFile()
514
    handle = open(self.tmpname, 'w')
515
    try:
516
      handle.write('# This is a test file for /etc/hosts\n')
517
      handle.write('127.0.0.1\tlocalhost\n')
518
      handle.write('192.168.1.1 router gw\n')
519
    finally:
520
      handle.close()
521

    
522
  def testSettingNewIp(self):
523
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
524

    
525
    self.assertFileContent(self.tmpname,
526
      "# This is a test file for /etc/hosts\n"
527
      "127.0.0.1\tlocalhost\n"
528
      "192.168.1.1 router gw\n"
529
      "1.2.3.4\tmyhost.domain.tld myhost\n")
530
    self.assertFileMode(self.tmpname, 0644)
531

    
532
  def testSettingExistingIp(self):
533
    SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
534
                     ['myhost'])
535

    
536
    self.assertFileContent(self.tmpname,
537
      "# This is a test file for /etc/hosts\n"
538
      "127.0.0.1\tlocalhost\n"
539
      "192.168.1.1\tmyhost.domain.tld myhost\n")
540
    self.assertFileMode(self.tmpname, 0644)
541

    
542
  def testSettingDuplicateName(self):
543
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
544

    
545
    self.assertFileContent(self.tmpname,
546
      "# This is a test file for /etc/hosts\n"
547
      "127.0.0.1\tlocalhost\n"
548
      "192.168.1.1 router gw\n"
549
      "1.2.3.4\tmyhost\n")
550
    self.assertFileMode(self.tmpname, 0644)
551

    
552
  def testRemovingExistingHost(self):
553
    RemoveEtcHostsEntry(self.tmpname, 'router')
554

    
555
    self.assertFileContent(self.tmpname,
556
      "# This is a test file for /etc/hosts\n"
557
      "127.0.0.1\tlocalhost\n"
558
      "192.168.1.1 gw\n")
559
    self.assertFileMode(self.tmpname, 0644)
560

    
561
  def testRemovingSingleExistingHost(self):
562
    RemoveEtcHostsEntry(self.tmpname, 'localhost')
563

    
564
    self.assertFileContent(self.tmpname,
565
      "# This is a test file for /etc/hosts\n"
566
      "192.168.1.1 router gw\n")
567
    self.assertFileMode(self.tmpname, 0644)
568

    
569
  def testRemovingNonExistingHost(self):
570
    RemoveEtcHostsEntry(self.tmpname, 'myhost')
571

    
572
    self.assertFileContent(self.tmpname,
573
      "# This is a test file for /etc/hosts\n"
574
      "127.0.0.1\tlocalhost\n"
575
      "192.168.1.1 router gw\n")
576
    self.assertFileMode(self.tmpname, 0644)
577

    
578
  def testRemovingAlias(self):
579
    RemoveEtcHostsEntry(self.tmpname, 'gw')
580

    
581
    self.assertFileContent(self.tmpname,
582
      "# This is a test file for /etc/hosts\n"
583
      "127.0.0.1\tlocalhost\n"
584
      "192.168.1.1 router\n")
585
    self.assertFileMode(self.tmpname, 0644)
586

    
587

    
588
class TestShellQuoting(unittest.TestCase):
589
  """Test case for shell quoting functions"""
590

    
591
  def testShellQuote(self):
592
    self.assertEqual(ShellQuote('abc'), "abc")
593
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
594
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
595
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
596
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
597

    
598
  def testShellQuoteArgs(self):
599
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
600
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
601
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
602

    
603

    
604
class TestTcpPing(unittest.TestCase):
605
  """Testcase for TCP version of ping - against listen(2)ing port"""
606

    
607
  def setUp(self):
608
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
609
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
610
    self.listenerport = self.listener.getsockname()[1]
611
    self.listener.listen(1)
612

    
613
  def tearDown(self):
614
    self.listener.shutdown(socket.SHUT_RDWR)
615
    del self.listener
616
    del self.listenerport
617

    
618
  def testTcpPingToLocalHostAccept(self):
619
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
620
                         self.listenerport,
621
                         timeout=10,
622
                         live_port_needed=True,
623
                         source=constants.LOCALHOST_IP_ADDRESS,
624
                         ),
625
                 "failed to connect to test listener")
626

    
627
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
628
                         self.listenerport,
629
                         timeout=10,
630
                         live_port_needed=True,
631
                         ),
632
                 "failed to connect to test listener (no source)")
633

    
634

    
635
class TestTcpPingDeaf(unittest.TestCase):
636
  """Testcase for TCP version of ping - against non listen(2)ing port"""
637

    
638
  def setUp(self):
639
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
640
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
641
    self.deaflistenerport = self.deaflistener.getsockname()[1]
642

    
643
  def tearDown(self):
644
    del self.deaflistener
645
    del self.deaflistenerport
646

    
647
  def testTcpPingToLocalHostAcceptDeaf(self):
648
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
649
                        self.deaflistenerport,
650
                        timeout=constants.TCP_PING_TIMEOUT,
651
                        live_port_needed=True,
652
                        source=constants.LOCALHOST_IP_ADDRESS,
653
                        ), # need successful connect(2)
654
                "successfully connected to deaf listener")
655

    
656
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
657
                        self.deaflistenerport,
658
                        timeout=constants.TCP_PING_TIMEOUT,
659
                        live_port_needed=True,
660
                        ), # need successful connect(2)
661
                "successfully connected to deaf listener (no source addr)")
662

    
663
  def testTcpPingToLocalHostNoAccept(self):
664
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
665
                         self.deaflistenerport,
666
                         timeout=constants.TCP_PING_TIMEOUT,
667
                         live_port_needed=False,
668
                         source=constants.LOCALHOST_IP_ADDRESS,
669
                         ), # ECONNREFUSED is OK
670
                 "failed to ping alive host on deaf port")
671

    
672
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
673
                         self.deaflistenerport,
674
                         timeout=constants.TCP_PING_TIMEOUT,
675
                         live_port_needed=False,
676
                         ), # ECONNREFUSED is OK
677
                 "failed to ping alive host on deaf port (no source addr)")
678

    
679

    
680
class TestOwnIpAddress(unittest.TestCase):
681
  """Testcase for OwnIpAddress"""
682

    
683
  def testOwnLoopback(self):
684
    """check having the loopback ip"""
685
    self.failUnless(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS),
686
                    "Should own the loopback address")
687

    
688
  def testNowOwnAddress(self):
689
    """check that I don't own an address"""
690

    
691
    # network 192.0.2.0/24 is reserved for test/documentation as per
692
    # rfc 3330, so we *should* not have an address of this range... if
693
    # this fails, we should extend the test to multiple addresses
694
    DST_IP = "192.0.2.1"
695
    self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP)
696

    
697

    
698
class TestListVisibleFiles(unittest.TestCase):
699
  """Test case for ListVisibleFiles"""
700

    
701
  def setUp(self):
702
    self.path = tempfile.mkdtemp()
703

    
704
  def tearDown(self):
705
    shutil.rmtree(self.path)
706

    
707
  def _test(self, files, expected):
708
    # Sort a copy
709
    expected = expected[:]
710
    expected.sort()
711

    
712
    for name in files:
713
      f = open(os.path.join(self.path, name), 'w')
714
      try:
715
        f.write("Test\n")
716
      finally:
717
        f.close()
718

    
719
    found = ListVisibleFiles(self.path)
720
    found.sort()
721

    
722
    self.assertEqual(found, expected)
723

    
724
  def testAllVisible(self):
725
    files = ["a", "b", "c"]
726
    expected = files
727
    self._test(files, expected)
728

    
729
  def testNoneVisible(self):
730
    files = [".a", ".b", ".c"]
731
    expected = []
732
    self._test(files, expected)
733

    
734
  def testSomeVisible(self):
735
    files = ["a", "b", ".c"]
736
    expected = ["a", "b"]
737
    self._test(files, expected)
738

    
739

    
740
class TestNewUUID(unittest.TestCase):
741
  """Test case for NewUUID"""
742

    
743
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
744
                        '[a-f0-9]{4}-[a-f0-9]{12}$')
745

    
746
  def runTest(self):
747
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
748

    
749

    
750
class TestUniqueSequence(unittest.TestCase):
751
  """Test case for UniqueSequence"""
752

    
753
  def _test(self, input, expected):
754
    self.assertEqual(utils.UniqueSequence(input), expected)
755

    
756
  def runTest(self):
757
    # Ordered input
758
    self._test([1, 2, 3], [1, 2, 3])
759
    self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
760
    self._test([1, 2, 2, 3], [1, 2, 3])
761
    self._test([1, 2, 3, 3], [1, 2, 3])
762

    
763
    # Unordered input
764
    self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
765
    self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
766

    
767
    # Strings
768
    self._test(["a", "a"], ["a"])
769
    self._test(["a", "b"], ["a", "b"])
770
    self._test(["a", "b", "a"], ["a", "b"])
771

    
772

    
773
class TestFirstFree(unittest.TestCase):
774
  """Test case for the FirstFree function"""
775

    
776
  def test(self):
777
    """Test FirstFree"""
778
    self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
779
    self.failUnlessEqual(FirstFree([]), None)
780
    self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
781
    self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
782
    self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
783

    
784

    
785
class TestTailFile(testutils.GanetiTestCase):
786
  """Test case for the TailFile function"""
787

    
788
  def testEmpty(self):
789
    fname = self._CreateTempFile()
790
    self.failUnlessEqual(TailFile(fname), [])
791
    self.failUnlessEqual(TailFile(fname, lines=25), [])
792

    
793
  def testAllLines(self):
794
    data = ["test %d" % i for i in range(30)]
795
    for i in range(30):
796
      fname = self._CreateTempFile()
797
      fd = open(fname, "w")
798
      fd.write("\n".join(data[:i]))
799
      if i > 0:
800
        fd.write("\n")
801
      fd.close()
802
      self.failUnlessEqual(TailFile(fname, lines=i), data[:i])
803

    
804
  def testPartialLines(self):
805
    data = ["test %d" % i for i in range(30)]
806
    fname = self._CreateTempFile()
807
    fd = open(fname, "w")
808
    fd.write("\n".join(data))
809
    fd.write("\n")
810
    fd.close()
811
    for i in range(1, 30):
812
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
813

    
814
  def testBigFile(self):
815
    data = ["test %d" % i for i in range(30)]
816
    fname = self._CreateTempFile()
817
    fd = open(fname, "w")
818
    fd.write("X" * 1048576)
819
    fd.write("\n")
820
    fd.write("\n".join(data))
821
    fd.write("\n")
822
    fd.close()
823
    for i in range(1, 30):
824
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
825

    
826

    
827
class TestFileLock(unittest.TestCase):
828
  """Test case for the FileLock class"""
829

    
830
  def setUp(self):
831
    self.tmpfile = tempfile.NamedTemporaryFile()
832
    self.lock = utils.FileLock(self.tmpfile.name)
833

    
834
  def testSharedNonblocking(self):
835
    self.lock.Shared(blocking=False)
836
    self.lock.Close()
837

    
838
  def testExclusiveNonblocking(self):
839
    self.lock.Exclusive(blocking=False)
840
    self.lock.Close()
841

    
842
  def testUnlockNonblocking(self):
843
    self.lock.Unlock(blocking=False)
844
    self.lock.Close()
845

    
846
  def testSharedBlocking(self):
847
    self.lock.Shared(blocking=True)
848
    self.lock.Close()
849

    
850
  def testExclusiveBlocking(self):
851
    self.lock.Exclusive(blocking=True)
852
    self.lock.Close()
853

    
854
  def testUnlockBlocking(self):
855
    self.lock.Unlock(blocking=True)
856
    self.lock.Close()
857

    
858
  def testSharedExclusiveUnlock(self):
859
    self.lock.Shared(blocking=False)
860
    self.lock.Exclusive(blocking=False)
861
    self.lock.Unlock(blocking=False)
862
    self.lock.Close()
863

    
864
  def testExclusiveSharedUnlock(self):
865
    self.lock.Exclusive(blocking=False)
866
    self.lock.Shared(blocking=False)
867
    self.lock.Unlock(blocking=False)
868
    self.lock.Close()
869

    
870
  def testCloseShared(self):
871
    self.lock.Close()
872
    self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
873

    
874
  def testCloseExclusive(self):
875
    self.lock.Close()
876
    self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
877

    
878
  def testCloseUnlock(self):
879
    self.lock.Close()
880
    self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
881

    
882

    
883
class TestTimeFunctions(unittest.TestCase):
884
  """Test case for time functions"""
885

    
886
  def runTest(self):
887
    self.assertEqual(utils.SplitTime(1), (1, 0))
888
    self.assertEqual(utils.SplitTime(1.5), (1, 500000))
889
    self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
890
    self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
891
    self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
892
    self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
893
    self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
894
    self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
895

    
896
    self.assertRaises(AssertionError, utils.SplitTime, -1)
897

    
898
    self.assertEqual(utils.MergeTime((1, 0)), 1.0)
899
    self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
900
    self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
901

    
902
    self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3), 1218448917.481)
903
    self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
904

    
905
    self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
906
    self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
907
    self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
908
    self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
909
    self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
910

    
911

    
912
class FieldSetTestCase(unittest.TestCase):
913
  """Test case for FieldSets"""
914

    
915
  def testSimpleMatch(self):
916
    f = utils.FieldSet("a", "b", "c", "def")
917
    self.failUnless(f.Matches("a"))
918
    self.failIf(f.Matches("d"), "Substring matched")
919
    self.failIf(f.Matches("defghi"), "Prefix string matched")
920
    self.failIf(f.NonMatching(["b", "c"]))
921
    self.failIf(f.NonMatching(["a", "b", "c", "def"]))
922
    self.failUnless(f.NonMatching(["a", "d"]))
923

    
924
  def testRegexMatch(self):
925
    f = utils.FieldSet("a", "b([0-9]+)", "c")
926
    self.failUnless(f.Matches("b1"))
927
    self.failUnless(f.Matches("b99"))
928
    self.failIf(f.Matches("b/1"))
929
    self.failIf(f.NonMatching(["b12", "c"]))
930
    self.failUnless(f.NonMatching(["a", "1"]))
931

    
932
class TestForceDictType(unittest.TestCase):
933
  """Test case for ForceDictType"""
934

    
935
  def setUp(self):
936
    self.key_types = {
937
      'a': constants.VTYPE_INT,
938
      'b': constants.VTYPE_BOOL,
939
      'c': constants.VTYPE_STRING,
940
      'd': constants.VTYPE_SIZE,
941
      }
942

    
943
  def _fdt(self, dict, allowed_values=None):
944
    if allowed_values is None:
945
      ForceDictType(dict, self.key_types)
946
    else:
947
      ForceDictType(dict, self.key_types, allowed_values=allowed_values)
948

    
949
    return dict
950

    
951
  def testSimpleDict(self):
952
    self.assertEqual(self._fdt({}), {})
953
    self.assertEqual(self._fdt({'a': 1}), {'a': 1})
954
    self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
955
    self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
956
    self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
957
    self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
958
    self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
959
    self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
960
    self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
961
    self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
962
    self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
963
    self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
964

    
965
  def testErrors(self):
966
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
967
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
968
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
969
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
970

    
971

    
972
class TestIsAbsNormPath(unittest.TestCase):
973
  """Testing case for IsProcessAlive"""
974

    
975
  def _pathTestHelper(self, path, result):
976
    if result:
977
      self.assert_(IsNormAbsPath(path),
978
          "Path %s should be absolute and normal" % path)
979
    else:
980
      self.assert_(not IsNormAbsPath(path),
981
          "Path %s should not be absolute and normal" % path)
982

    
983
  def testBase(self):
984
    self._pathTestHelper('/etc', True)
985
    self._pathTestHelper('/srv', True)
986
    self._pathTestHelper('etc', False)
987
    self._pathTestHelper('/etc/../root', False)
988
    self._pathTestHelper('/etc/', False)
989

    
990
if __name__ == '__main__':
991
  unittest.main()