Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ a5728081

History | View | Annotate | Download (31.8 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import os
30
import md5
31
import signal
32
import socket
33
import shutil
34
import re
35
import select
36

    
37
import ganeti
38
import testutils
39
from ganeti import constants
40
from ganeti import utils
41
from ganeti import errors
42
from ganeti.utils import IsProcessAlive, RunCmd, \
43
     RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
44
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
45
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
46
     SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \
47
     TailFile, ForceDictType
48

    
49
from ganeti.errors import LockError, UnitParseError, GenericError, \
50
     ProgrammerError
51

    
52

    
53
class TestIsProcessAlive(unittest.TestCase):
54
  """Testing case for IsProcessAlive"""
55

    
56
  def testExists(self):
57
    mypid = os.getpid()
58
    self.assert_(IsProcessAlive(mypid),
59
                 "can't find myself running")
60

    
61
  def testNotExisting(self):
62
    pid_non_existing = os.fork()
63
    if pid_non_existing == 0:
64
      os._exit(0)
65
    elif pid_non_existing < 0:
66
      raise SystemError("can't fork")
67
    os.waitpid(pid_non_existing, 0)
68
    self.assert_(not IsProcessAlive(pid_non_existing),
69
                 "nonexisting process detected")
70

    
71

    
72
class TestPidFileFunctions(unittest.TestCase):
73
  """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
74

    
75
  def setUp(self):
76
    self.dir = tempfile.mkdtemp()
77
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
78
    utils.DaemonPidFileName = self.f_dpn
79

    
80
  def testPidFileFunctions(self):
81
    pid_file = self.f_dpn('test')
82
    utils.WritePidFile('test')
83
    self.failUnless(os.path.exists(pid_file),
84
                    "PID file should have been created")
85
    read_pid = utils.ReadPidFile(pid_file)
86
    self.failUnlessEqual(read_pid, os.getpid())
87
    self.failUnless(utils.IsProcessAlive(read_pid))
88
    self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
89
    utils.RemovePidFile('test')
90
    self.failIf(os.path.exists(pid_file),
91
                "PID file should not exist anymore")
92
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
93
                         "ReadPidFile should return 0 for missing pid file")
94
    fh = open(pid_file, "w")
95
    fh.write("blah\n")
96
    fh.close()
97
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
98
                         "ReadPidFile should return 0 for invalid pid file")
99
    utils.RemovePidFile('test')
100
    self.failIf(os.path.exists(pid_file),
101
                "PID file should not exist anymore")
102

    
103
  def testKill(self):
104
    pid_file = self.f_dpn('child')
105
    r_fd, w_fd = os.pipe()
106
    new_pid = os.fork()
107
    if new_pid == 0: #child
108
      utils.WritePidFile('child')
109
      os.write(w_fd, 'a')
110
      signal.pause()
111
      os._exit(0)
112
      return
113
    # else we are in the parent
114
    # wait until the child has written the pid file
115
    os.read(r_fd, 1)
116
    read_pid = utils.ReadPidFile(pid_file)
117
    self.failUnlessEqual(read_pid, new_pid)
118
    self.failUnless(utils.IsProcessAlive(new_pid))
119
    utils.KillProcess(new_pid, waitpid=True)
120
    self.failIf(utils.IsProcessAlive(new_pid))
121
    utils.RemovePidFile('child')
122
    self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
123

    
124
  def tearDown(self):
125
    for name in os.listdir(self.dir):
126
      os.unlink(os.path.join(self.dir, name))
127
    os.rmdir(self.dir)
128

    
129

    
130
class TestRunCmd(testutils.GanetiTestCase):
131
  """Testing case for the RunCmd function"""
132

    
133
  def setUp(self):
134
    testutils.GanetiTestCase.setUp(self)
135
    self.magic = time.ctime() + " ganeti test"
136
    self.fname = self._CreateTempFile()
137

    
138
  def testOk(self):
139
    """Test successful exit code"""
140
    result = RunCmd("/bin/sh -c 'exit 0'")
141
    self.assertEqual(result.exit_code, 0)
142
    self.assertEqual(result.output, "")
143

    
144
  def testFail(self):
145
    """Test fail exit code"""
146
    result = RunCmd("/bin/sh -c 'exit 1'")
147
    self.assertEqual(result.exit_code, 1)
148
    self.assertEqual(result.output, "")
149

    
150
  def testStdout(self):
151
    """Test standard output"""
152
    cmd = 'echo -n "%s"' % self.magic
153
    result = RunCmd("/bin/sh -c '%s'" % cmd)
154
    self.assertEqual(result.stdout, self.magic)
155
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
156
    self.assertEqual(result.output, "")
157
    self.assertFileContent(self.fname, self.magic)
158

    
159
  def testStderr(self):
160
    """Test standard error"""
161
    cmd = 'echo -n "%s"' % self.magic
162
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
163
    self.assertEqual(result.stderr, self.magic)
164
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
165
    self.assertEqual(result.output, "")
166
    self.assertFileContent(self.fname, self.magic)
167

    
168
  def testCombined(self):
169
    """Test combined output"""
170
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
171
    expected = "A" + self.magic + "B" + self.magic
172
    result = RunCmd("/bin/sh -c '%s'" % cmd)
173
    self.assertEqual(result.output, expected)
174
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
175
    self.assertEqual(result.output, "")
176
    self.assertFileContent(self.fname, expected)
177

    
178
  def testSignal(self):
179
    """Test signal"""
180
    result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
181
    self.assertEqual(result.signal, 15)
182
    self.assertEqual(result.output, "")
183

    
184
  def testListRun(self):
185
    """Test list runs"""
186
    result = RunCmd(["true"])
187
    self.assertEqual(result.signal, None)
188
    self.assertEqual(result.exit_code, 0)
189
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
190
    self.assertEqual(result.signal, None)
191
    self.assertEqual(result.exit_code, 1)
192
    result = RunCmd(["echo", "-n", self.magic])
193
    self.assertEqual(result.signal, None)
194
    self.assertEqual(result.exit_code, 0)
195
    self.assertEqual(result.stdout, self.magic)
196

    
197
  def testFileEmptyOutput(self):
198
    """Test file output"""
199
    result = RunCmd(["true"], output=self.fname)
200
    self.assertEqual(result.signal, None)
201
    self.assertEqual(result.exit_code, 0)
202
    self.assertFileContent(self.fname, "")
203

    
204
  def testLang(self):
205
    """Test locale environment"""
206
    old_env = os.environ.copy()
207
    try:
208
      os.environ["LANG"] = "en_US.UTF-8"
209
      os.environ["LC_ALL"] = "en_US.UTF-8"
210
      result = RunCmd(["locale"])
211
      for line in result.output.splitlines():
212
        key, value = line.split("=", 1)
213
        # Ignore these variables, they're overridden by LC_ALL
214
        if key == "LANG" or key == "LANGUAGE":
215
          continue
216
        self.failIf(value and value != "C" and value != '"C"',
217
            "Variable %s is set to the invalid value '%s'" % (key, value))
218
    finally:
219
      os.environ = old_env
220

    
221
  def testDefaultCwd(self):
222
    """Test default working directory"""
223
    self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
224

    
225
  def testCwd(self):
226
    """Test default working directory"""
227
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
228
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
229
    cwd = os.getcwd()
230
    self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
231

    
232

    
233
class TestRemoveFile(unittest.TestCase):
234
  """Test case for the RemoveFile function"""
235

    
236
  def setUp(self):
237
    """Create a temp dir and file for each case"""
238
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
239
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
240
    os.close(fd)
241

    
242
  def tearDown(self):
243
    if os.path.exists(self.tmpfile):
244
      os.unlink(self.tmpfile)
245
    os.rmdir(self.tmpdir)
246

    
247

    
248
  def testIgnoreDirs(self):
249
    """Test that RemoveFile() ignores directories"""
250
    self.assertEqual(None, RemoveFile(self.tmpdir))
251

    
252

    
253
  def testIgnoreNotExisting(self):
254
    """Test that RemoveFile() ignores non-existing files"""
255
    RemoveFile(self.tmpfile)
256
    RemoveFile(self.tmpfile)
257

    
258

    
259
  def testRemoveFile(self):
260
    """Test that RemoveFile does remove a file"""
261
    RemoveFile(self.tmpfile)
262
    if os.path.exists(self.tmpfile):
263
      self.fail("File '%s' not removed" % self.tmpfile)
264

    
265

    
266
  def testRemoveSymlink(self):
267
    """Test that RemoveFile does remove symlinks"""
268
    symlink = self.tmpdir + "/symlink"
269
    os.symlink("no-such-file", symlink)
270
    RemoveFile(symlink)
271
    if os.path.exists(symlink):
272
      self.fail("File '%s' not removed" % symlink)
273
    os.symlink(self.tmpfile, symlink)
274
    RemoveFile(symlink)
275
    if os.path.exists(symlink):
276
      self.fail("File '%s' not removed" % symlink)
277

    
278

    
279
class TestRename(unittest.TestCase):
280
  """Test case for RenameFile"""
281

    
282
  def setUp(self):
283
    """Create a temporary directory"""
284
    self.tmpdir = tempfile.mkdtemp()
285
    self.tmpfile = os.path.join(self.tmpdir, "test1")
286

    
287
    # Touch the file
288
    open(self.tmpfile, "w").close()
289

    
290
  def tearDown(self):
291
    """Remove temporary directory"""
292
    shutil.rmtree(self.tmpdir)
293

    
294
  def testSimpleRename1(self):
295
    """Simple rename 1"""
296
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
297

    
298
  def testSimpleRename2(self):
299
    """Simple rename 2"""
300
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
301
                     mkdir=True)
302

    
303
  def testRenameMkdir(self):
304
    """Rename with mkdir"""
305
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
306
                     mkdir=True)
307

    
308

    
309
class TestCheckdict(unittest.TestCase):
310
  """Test case for the CheckDict function"""
311

    
312
  def testAdd(self):
313
    """Test that CheckDict adds a missing key with the correct value"""
314

    
315
    tgt = {'a':1}
316
    tmpl = {'b': 2}
317
    CheckDict(tgt, tmpl)
318
    if 'b' not in tgt or tgt['b'] != 2:
319
      self.fail("Failed to update dict")
320

    
321

    
322
  def testNoUpdate(self):
323
    """Test that CheckDict does not overwrite an existing key"""
324
    tgt = {'a':1, 'b': 3}
325
    tmpl = {'b': 2}
326
    CheckDict(tgt, tmpl)
327
    self.failUnlessEqual(tgt['b'], 3)
328

    
329

    
330
class TestMatchNameComponent(unittest.TestCase):
331
  """Test case for the MatchNameComponent function"""
332

    
333
  def testEmptyList(self):
334
    """Test that there is no match against an empty list"""
335

    
336
    self.failUnlessEqual(MatchNameComponent("", []), None)
337
    self.failUnlessEqual(MatchNameComponent("test", []), None)
338

    
339
  def testSingleMatch(self):
340
    """Test that a single match is performed correctly"""
341
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
342
    for key in "test2", "test2.example", "test2.example.com":
343
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
344

    
345
  def testMultipleMatches(self):
346
    """Test that a multiple match is returned as None"""
347
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
348
    for key in "test1", "test1.example":
349
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
350

    
351

    
352
class TestFormatUnit(unittest.TestCase):
353
  """Test case for the FormatUnit function"""
354

    
355
  def testMiB(self):
356
    self.assertEqual(FormatUnit(1, 'h'), '1M')
357
    self.assertEqual(FormatUnit(100, 'h'), '100M')
358
    self.assertEqual(FormatUnit(1023, 'h'), '1023M')
359

    
360
    self.assertEqual(FormatUnit(1, 'm'), '1')
361
    self.assertEqual(FormatUnit(100, 'm'), '100')
362
    self.assertEqual(FormatUnit(1023, 'm'), '1023')
363

    
364
    self.assertEqual(FormatUnit(1024, 'm'), '1024')
365
    self.assertEqual(FormatUnit(1536, 'm'), '1536')
366
    self.assertEqual(FormatUnit(17133, 'm'), '17133')
367
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
368

    
369
  def testGiB(self):
370
    self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
371
    self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
372
    self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
373
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
374

    
375
    self.assertEqual(FormatUnit(1024, 'g'), '1.0')
376
    self.assertEqual(FormatUnit(1536, 'g'), '1.5')
377
    self.assertEqual(FormatUnit(17133, 'g'), '16.7')
378
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
379

    
380
    self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
381
    self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
382
    self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
383

    
384
  def testTiB(self):
385
    self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
386
    self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
387
    self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
388

    
389
    self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
390
    self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
391
    self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
392

    
393
class TestParseUnit(unittest.TestCase):
394
  """Test case for the ParseUnit function"""
395

    
396
  SCALES = (('', 1),
397
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
398
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
399
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
400

    
401
  def testRounding(self):
402
    self.assertEqual(ParseUnit('0'), 0)
403
    self.assertEqual(ParseUnit('1'), 4)
404
    self.assertEqual(ParseUnit('2'), 4)
405
    self.assertEqual(ParseUnit('3'), 4)
406

    
407
    self.assertEqual(ParseUnit('124'), 124)
408
    self.assertEqual(ParseUnit('125'), 128)
409
    self.assertEqual(ParseUnit('126'), 128)
410
    self.assertEqual(ParseUnit('127'), 128)
411
    self.assertEqual(ParseUnit('128'), 128)
412
    self.assertEqual(ParseUnit('129'), 132)
413
    self.assertEqual(ParseUnit('130'), 132)
414

    
415
  def testFloating(self):
416
    self.assertEqual(ParseUnit('0'), 0)
417
    self.assertEqual(ParseUnit('0.5'), 4)
418
    self.assertEqual(ParseUnit('1.75'), 4)
419
    self.assertEqual(ParseUnit('1.99'), 4)
420
    self.assertEqual(ParseUnit('2.00'), 4)
421
    self.assertEqual(ParseUnit('2.01'), 4)
422
    self.assertEqual(ParseUnit('3.99'), 4)
423
    self.assertEqual(ParseUnit('4.00'), 4)
424
    self.assertEqual(ParseUnit('4.01'), 8)
425
    self.assertEqual(ParseUnit('1.5G'), 1536)
426
    self.assertEqual(ParseUnit('1.8G'), 1844)
427
    self.assertEqual(ParseUnit('8.28T'), 8682212)
428

    
429
  def testSuffixes(self):
430
    for sep in ('', ' ', '   ', "\t", "\t "):
431
      for suffix, scale in TestParseUnit.SCALES:
432
        for func in (lambda x: x, str.lower, str.upper):
433
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
434
                           1024 * scale)
435

    
436
  def testInvalidInput(self):
437
    for sep in ('-', '_', ',', 'a'):
438
      for suffix, _ in TestParseUnit.SCALES:
439
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
440

    
441
    for suffix, _ in TestParseUnit.SCALES:
442
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
443

    
444

    
445
class TestSshKeys(testutils.GanetiTestCase):
446
  """Test case for the AddAuthorizedKey function"""
447

    
448
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
449
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
450
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
451

    
452
  def setUp(self):
453
    testutils.GanetiTestCase.setUp(self)
454
    self.tmpname = self._CreateTempFile()
455
    handle = open(self.tmpname, 'w')
456
    try:
457
      handle.write("%s\n" % TestSshKeys.KEY_A)
458
      handle.write("%s\n" % TestSshKeys.KEY_B)
459
    finally:
460
      handle.close()
461

    
462
  def testAddingNewKey(self):
463
    AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
464

    
465
    self.assertFileContent(self.tmpname,
466
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
467
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
468
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
469
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
470

    
471
  def testAddingAlmostButNotCompletelyTheSameKey(self):
472
    AddAuthorizedKey(self.tmpname,
473
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
474

    
475
    self.assertFileContent(self.tmpname,
476
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
477
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
478
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
479
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
480

    
481
  def testAddingExistingKeyWithSomeMoreSpaces(self):
482
    AddAuthorizedKey(self.tmpname,
483
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
484

    
485
    self.assertFileContent(self.tmpname,
486
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
487
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
488
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
489

    
490
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
491
    RemoveAuthorizedKey(self.tmpname,
492
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
493

    
494
    self.assertFileContent(self.tmpname,
495
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
496
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
497

    
498
  def testRemovingNonExistingKey(self):
499
    RemoveAuthorizedKey(self.tmpname,
500
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
501

    
502
    self.assertFileContent(self.tmpname,
503
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
504
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
505
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
506

    
507

    
508
class TestEtcHosts(testutils.GanetiTestCase):
509
  """Test functions modifying /etc/hosts"""
510

    
511
  def setUp(self):
512
    testutils.GanetiTestCase.setUp(self)
513
    self.tmpname = self._CreateTempFile()
514
    handle = open(self.tmpname, 'w')
515
    try:
516
      handle.write('# This is a test file for /etc/hosts\n')
517
      handle.write('127.0.0.1\tlocalhost\n')
518
      handle.write('192.168.1.1 router gw\n')
519
    finally:
520
      handle.close()
521

    
522
  def testSettingNewIp(self):
523
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
524

    
525
    self.assertFileContent(self.tmpname,
526
      "# This is a test file for /etc/hosts\n"
527
      "127.0.0.1\tlocalhost\n"
528
      "192.168.1.1 router gw\n"
529
      "1.2.3.4\tmyhost.domain.tld myhost\n")
530

    
531
  def testSettingExistingIp(self):
532
    SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
533
                     ['myhost'])
534

    
535
    self.assertFileContent(self.tmpname,
536
      "# This is a test file for /etc/hosts\n"
537
      "127.0.0.1\tlocalhost\n"
538
      "192.168.1.1\tmyhost.domain.tld myhost\n")
539

    
540
  def testSettingDuplicateName(self):
541
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
542

    
543
    self.assertFileContent(self.tmpname,
544
      "# This is a test file for /etc/hosts\n"
545
      "127.0.0.1\tlocalhost\n"
546
      "192.168.1.1 router gw\n"
547
      "1.2.3.4\tmyhost\n")
548

    
549
  def testRemovingExistingHost(self):
550
    RemoveEtcHostsEntry(self.tmpname, 'router')
551

    
552
    self.assertFileContent(self.tmpname,
553
      "# This is a test file for /etc/hosts\n"
554
      "127.0.0.1\tlocalhost\n"
555
      "192.168.1.1 gw\n")
556

    
557
  def testRemovingSingleExistingHost(self):
558
    RemoveEtcHostsEntry(self.tmpname, 'localhost')
559

    
560
    self.assertFileContent(self.tmpname,
561
      "# This is a test file for /etc/hosts\n"
562
      "192.168.1.1 router gw\n")
563

    
564
  def testRemovingNonExistingHost(self):
565
    RemoveEtcHostsEntry(self.tmpname, 'myhost')
566

    
567
    self.assertFileContent(self.tmpname,
568
      "# This is a test file for /etc/hosts\n"
569
      "127.0.0.1\tlocalhost\n"
570
      "192.168.1.1 router gw\n")
571

    
572
  def testRemovingAlias(self):
573
    RemoveEtcHostsEntry(self.tmpname, 'gw')
574

    
575
    self.assertFileContent(self.tmpname,
576
      "# This is a test file for /etc/hosts\n"
577
      "127.0.0.1\tlocalhost\n"
578
      "192.168.1.1 router\n")
579

    
580

    
581
class TestShellQuoting(unittest.TestCase):
582
  """Test case for shell quoting functions"""
583

    
584
  def testShellQuote(self):
585
    self.assertEqual(ShellQuote('abc'), "abc")
586
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
587
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
588
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
589
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
590

    
591
  def testShellQuoteArgs(self):
592
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
593
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
594
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
595

    
596

    
597
class TestTcpPing(unittest.TestCase):
598
  """Testcase for TCP version of ping - against listen(2)ing port"""
599

    
600
  def setUp(self):
601
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
602
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
603
    self.listenerport = self.listener.getsockname()[1]
604
    self.listener.listen(1)
605

    
606
  def tearDown(self):
607
    self.listener.shutdown(socket.SHUT_RDWR)
608
    del self.listener
609
    del self.listenerport
610

    
611
  def testTcpPingToLocalHostAccept(self):
612
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
613
                         self.listenerport,
614
                         timeout=10,
615
                         live_port_needed=True,
616
                         source=constants.LOCALHOST_IP_ADDRESS,
617
                         ),
618
                 "failed to connect to test listener")
619

    
620
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
621
                         self.listenerport,
622
                         timeout=10,
623
                         live_port_needed=True,
624
                         ),
625
                 "failed to connect to test listener (no source)")
626

    
627

    
628
class TestTcpPingDeaf(unittest.TestCase):
629
  """Testcase for TCP version of ping - against non listen(2)ing port"""
630

    
631
  def setUp(self):
632
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
633
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
634
    self.deaflistenerport = self.deaflistener.getsockname()[1]
635

    
636
  def tearDown(self):
637
    del self.deaflistener
638
    del self.deaflistenerport
639

    
640
  def testTcpPingToLocalHostAcceptDeaf(self):
641
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
642
                        self.deaflistenerport,
643
                        timeout=constants.TCP_PING_TIMEOUT,
644
                        live_port_needed=True,
645
                        source=constants.LOCALHOST_IP_ADDRESS,
646
                        ), # need successful connect(2)
647
                "successfully connected to deaf listener")
648

    
649
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
650
                        self.deaflistenerport,
651
                        timeout=constants.TCP_PING_TIMEOUT,
652
                        live_port_needed=True,
653
                        ), # need successful connect(2)
654
                "successfully connected to deaf listener (no source addr)")
655

    
656
  def testTcpPingToLocalHostNoAccept(self):
657
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
658
                         self.deaflistenerport,
659
                         timeout=constants.TCP_PING_TIMEOUT,
660
                         live_port_needed=False,
661
                         source=constants.LOCALHOST_IP_ADDRESS,
662
                         ), # ECONNREFUSED is OK
663
                 "failed to ping alive host on deaf port")
664

    
665
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
666
                         self.deaflistenerport,
667
                         timeout=constants.TCP_PING_TIMEOUT,
668
                         live_port_needed=False,
669
                         ), # ECONNREFUSED is OK
670
                 "failed to ping alive host on deaf port (no source addr)")
671

    
672

    
673
class TestOwnIpAddress(unittest.TestCase):
674
  """Testcase for OwnIpAddress"""
675

    
676
  def testOwnLoopback(self):
677
    """check having the loopback ip"""
678
    self.failUnless(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS),
679
                    "Should own the loopback address")
680

    
681
  def testNowOwnAddress(self):
682
    """check that I don't own an address"""
683

    
684
    # network 192.0.2.0/24 is reserved for test/documentation as per
685
    # rfc 3330, so we *should* not have an address of this range... if
686
    # this fails, we should extend the test to multiple addresses
687
    DST_IP = "192.0.2.1"
688
    self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP)
689

    
690

    
691
class TestListVisibleFiles(unittest.TestCase):
692
  """Test case for ListVisibleFiles"""
693

    
694
  def setUp(self):
695
    self.path = tempfile.mkdtemp()
696

    
697
  def tearDown(self):
698
    shutil.rmtree(self.path)
699

    
700
  def _test(self, files, expected):
701
    # Sort a copy
702
    expected = expected[:]
703
    expected.sort()
704

    
705
    for name in files:
706
      f = open(os.path.join(self.path, name), 'w')
707
      try:
708
        f.write("Test\n")
709
      finally:
710
        f.close()
711

    
712
    found = ListVisibleFiles(self.path)
713
    found.sort()
714

    
715
    self.assertEqual(found, expected)
716

    
717
  def testAllVisible(self):
718
    files = ["a", "b", "c"]
719
    expected = files
720
    self._test(files, expected)
721

    
722
  def testNoneVisible(self):
723
    files = [".a", ".b", ".c"]
724
    expected = []
725
    self._test(files, expected)
726

    
727
  def testSomeVisible(self):
728
    files = ["a", "b", ".c"]
729
    expected = ["a", "b"]
730
    self._test(files, expected)
731

    
732

    
733
class TestNewUUID(unittest.TestCase):
734
  """Test case for NewUUID"""
735

    
736
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
737
                        '[a-f0-9]{4}-[a-f0-9]{12}$')
738

    
739
  def runTest(self):
740
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
741

    
742

    
743
class TestUniqueSequence(unittest.TestCase):
744
  """Test case for UniqueSequence"""
745

    
746
  def _test(self, input, expected):
747
    self.assertEqual(utils.UniqueSequence(input), expected)
748

    
749
  def runTest(self):
750
    # Ordered input
751
    self._test([1, 2, 3], [1, 2, 3])
752
    self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
753
    self._test([1, 2, 2, 3], [1, 2, 3])
754
    self._test([1, 2, 3, 3], [1, 2, 3])
755

    
756
    # Unordered input
757
    self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
758
    self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
759

    
760
    # Strings
761
    self._test(["a", "a"], ["a"])
762
    self._test(["a", "b"], ["a", "b"])
763
    self._test(["a", "b", "a"], ["a", "b"])
764

    
765

    
766
class TestFirstFree(unittest.TestCase):
767
  """Test case for the FirstFree function"""
768

    
769
  def test(self):
770
    """Test FirstFree"""
771
    self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
772
    self.failUnlessEqual(FirstFree([]), None)
773
    self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
774
    self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
775
    self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
776

    
777

    
778
class TestTailFile(testutils.GanetiTestCase):
779
  """Test case for the TailFile function"""
780

    
781
  def testEmpty(self):
782
    fname = self._CreateTempFile()
783
    self.failUnlessEqual(TailFile(fname), [])
784
    self.failUnlessEqual(TailFile(fname, lines=25), [])
785

    
786
  def testAllLines(self):
787
    data = ["test %d" % i for i in range(30)]
788
    for i in range(30):
789
      fname = self._CreateTempFile()
790
      fd = open(fname, "w")
791
      fd.write("\n".join(data[:i]))
792
      if i > 0:
793
        fd.write("\n")
794
      fd.close()
795
      self.failUnlessEqual(TailFile(fname, lines=i), data[:i])
796

    
797
  def testPartialLines(self):
798
    data = ["test %d" % i for i in range(30)]
799
    fname = self._CreateTempFile()
800
    fd = open(fname, "w")
801
    fd.write("\n".join(data))
802
    fd.write("\n")
803
    fd.close()
804
    for i in range(1, 30):
805
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
806

    
807
  def testBigFile(self):
808
    data = ["test %d" % i for i in range(30)]
809
    fname = self._CreateTempFile()
810
    fd = open(fname, "w")
811
    fd.write("X" * 1048576)
812
    fd.write("\n")
813
    fd.write("\n".join(data))
814
    fd.write("\n")
815
    fd.close()
816
    for i in range(1, 30):
817
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
818

    
819

    
820
class TestFileLock(unittest.TestCase):
821
  """Test case for the FileLock class"""
822

    
823
  def setUp(self):
824
    self.tmpfile = tempfile.NamedTemporaryFile()
825
    self.lock = utils.FileLock(self.tmpfile.name)
826

    
827
  def testSharedNonblocking(self):
828
    self.lock.Shared(blocking=False)
829
    self.lock.Close()
830

    
831
  def testExclusiveNonblocking(self):
832
    self.lock.Exclusive(blocking=False)
833
    self.lock.Close()
834

    
835
  def testUnlockNonblocking(self):
836
    self.lock.Unlock(blocking=False)
837
    self.lock.Close()
838

    
839
  def testSharedBlocking(self):
840
    self.lock.Shared(blocking=True)
841
    self.lock.Close()
842

    
843
  def testExclusiveBlocking(self):
844
    self.lock.Exclusive(blocking=True)
845
    self.lock.Close()
846

    
847
  def testUnlockBlocking(self):
848
    self.lock.Unlock(blocking=True)
849
    self.lock.Close()
850

    
851
  def testSharedExclusiveUnlock(self):
852
    self.lock.Shared(blocking=False)
853
    self.lock.Exclusive(blocking=False)
854
    self.lock.Unlock(blocking=False)
855
    self.lock.Close()
856

    
857
  def testExclusiveSharedUnlock(self):
858
    self.lock.Exclusive(blocking=False)
859
    self.lock.Shared(blocking=False)
860
    self.lock.Unlock(blocking=False)
861
    self.lock.Close()
862

    
863
  def testCloseShared(self):
864
    self.lock.Close()
865
    self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
866

    
867
  def testCloseExclusive(self):
868
    self.lock.Close()
869
    self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
870

    
871
  def testCloseUnlock(self):
872
    self.lock.Close()
873
    self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
874

    
875

    
876
class TestTimeFunctions(unittest.TestCase):
877
  """Test case for time functions"""
878

    
879
  def runTest(self):
880
    self.assertEqual(utils.SplitTime(1), (1, 0))
881
    self.assertEqual(utils.SplitTime(1.5), (1, 500000))
882
    self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
883
    self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
884
    self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
885
    self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
886
    self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
887
    self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
888

    
889
    self.assertRaises(AssertionError, utils.SplitTime, -1)
890

    
891
    self.assertEqual(utils.MergeTime((1, 0)), 1.0)
892
    self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
893
    self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
894

    
895
    self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3), 1218448917.481)
896
    self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
897

    
898
    self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
899
    self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
900
    self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
901
    self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
902
    self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
903

    
904

    
905
class FieldSetTestCase(unittest.TestCase):
906
  """Test case for FieldSets"""
907

    
908
  def testSimpleMatch(self):
909
    f = utils.FieldSet("a", "b", "c", "def")
910
    self.failUnless(f.Matches("a"))
911
    self.failIf(f.Matches("d"), "Substring matched")
912
    self.failIf(f.Matches("defghi"), "Prefix string matched")
913
    self.failIf(f.NonMatching(["b", "c"]))
914
    self.failIf(f.NonMatching(["a", "b", "c", "def"]))
915
    self.failUnless(f.NonMatching(["a", "d"]))
916

    
917
  def testRegexMatch(self):
918
    f = utils.FieldSet("a", "b([0-9]+)", "c")
919
    self.failUnless(f.Matches("b1"))
920
    self.failUnless(f.Matches("b99"))
921
    self.failIf(f.Matches("b/1"))
922
    self.failIf(f.NonMatching(["b12", "c"]))
923
    self.failUnless(f.NonMatching(["a", "1"]))
924

    
925
class TestForceDictType(unittest.TestCase):
926
  """Test case for ForceDictType"""
927

    
928
  def setUp(self):
929
    self.key_types = {
930
      'a': constants.VTYPE_INT,
931
      'b': constants.VTYPE_BOOL,
932
      'c': constants.VTYPE_STRING,
933
      'd': constants.VTYPE_SIZE,
934
      }
935

    
936
  def _fdt(self, dict, allowed_values=None):
937
    if allowed_values is None:
938
      ForceDictType(dict, self.key_types)
939
    else:
940
      ForceDictType(dict, self.key_types, allowed_values=allowed_values)
941

    
942
    return dict
943

    
944
  def testSimpleDict(self):
945
    self.assertEqual(self._fdt({}), {})
946
    self.assertEqual(self._fdt({'a': 1}), {'a': 1})
947
    self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
948
    self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
949
    self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
950
    self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
951
    self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
952
    self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
953
    self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
954
    self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
955
    self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
956
    self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
957

    
958
  def testErrors(self):
959
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
960
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
961
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
962
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
963

    
964

    
965
if __name__ == '__main__':
966
  unittest.main()