Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils_unittest.py @ 25231ec5

History | View | Annotate | Download (35 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the utils module"""
23

    
24
import unittest
25
import os
26
import time
27
import tempfile
28
import os.path
29
import os
30
import md5
31
import signal
32
import socket
33
import shutil
34
import re
35
import select
36
import string
37

    
38
import ganeti
39
import testutils
40
from ganeti import constants
41
from ganeti import utils
42
from ganeti import errors
43
from ganeti.utils import IsProcessAlive, RunCmd, \
44
     RemoveFile, MatchNameComponent, FormatUnit, \
45
     ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
46
     ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
47
     SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \
48
     TailFile, ForceDictType, SafeEncode, IsNormAbsPath, FormatTime
49

    
50
from ganeti.errors import LockError, UnitParseError, GenericError, \
51
     ProgrammerError
52

    
53

    
54
class TestIsProcessAlive(unittest.TestCase):
55
  """Testing case for IsProcessAlive"""
56

    
57
  def testExists(self):
58
    mypid = os.getpid()
59
    self.assert_(IsProcessAlive(mypid),
60
                 "can't find myself running")
61

    
62
  def testNotExisting(self):
63
    pid_non_existing = os.fork()
64
    if pid_non_existing == 0:
65
      os._exit(0)
66
    elif pid_non_existing < 0:
67
      raise SystemError("can't fork")
68
    os.waitpid(pid_non_existing, 0)
69
    self.assert_(not IsProcessAlive(pid_non_existing),
70
                 "nonexisting process detected")
71

    
72

    
73
class TestPidFileFunctions(unittest.TestCase):
74
  """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
75

    
76
  def setUp(self):
77
    self.dir = tempfile.mkdtemp()
78
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
79
    utils.DaemonPidFileName = self.f_dpn
80

    
81
  def testPidFileFunctions(self):
82
    pid_file = self.f_dpn('test')
83
    utils.WritePidFile('test')
84
    self.failUnless(os.path.exists(pid_file),
85
                    "PID file should have been created")
86
    read_pid = utils.ReadPidFile(pid_file)
87
    self.failUnlessEqual(read_pid, os.getpid())
88
    self.failUnless(utils.IsProcessAlive(read_pid))
89
    self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
90
    utils.RemovePidFile('test')
91
    self.failIf(os.path.exists(pid_file),
92
                "PID file should not exist anymore")
93
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
94
                         "ReadPidFile should return 0 for missing pid file")
95
    fh = open(pid_file, "w")
96
    fh.write("blah\n")
97
    fh.close()
98
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
99
                         "ReadPidFile should return 0 for invalid pid file")
100
    utils.RemovePidFile('test')
101
    self.failIf(os.path.exists(pid_file),
102
                "PID file should not exist anymore")
103

    
104
  def testKill(self):
105
    pid_file = self.f_dpn('child')
106
    r_fd, w_fd = os.pipe()
107
    new_pid = os.fork()
108
    if new_pid == 0: #child
109
      utils.WritePidFile('child')
110
      os.write(w_fd, 'a')
111
      signal.pause()
112
      os._exit(0)
113
      return
114
    # else we are in the parent
115
    # wait until the child has written the pid file
116
    os.read(r_fd, 1)
117
    read_pid = utils.ReadPidFile(pid_file)
118
    self.failUnlessEqual(read_pid, new_pid)
119
    self.failUnless(utils.IsProcessAlive(new_pid))
120
    utils.KillProcess(new_pid, waitpid=True)
121
    self.failIf(utils.IsProcessAlive(new_pid))
122
    utils.RemovePidFile('child')
123
    self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
124

    
125
  def tearDown(self):
126
    for name in os.listdir(self.dir):
127
      os.unlink(os.path.join(self.dir, name))
128
    os.rmdir(self.dir)
129

    
130

    
131
class TestRunCmd(testutils.GanetiTestCase):
132
  """Testing case for the RunCmd function"""
133

    
134
  def setUp(self):
135
    testutils.GanetiTestCase.setUp(self)
136
    self.magic = time.ctime() + " ganeti test"
137
    self.fname = self._CreateTempFile()
138

    
139
  def testOk(self):
140
    """Test successful exit code"""
141
    result = RunCmd("/bin/sh -c 'exit 0'")
142
    self.assertEqual(result.exit_code, 0)
143
    self.assertEqual(result.output, "")
144

    
145
  def testFail(self):
146
    """Test fail exit code"""
147
    result = RunCmd("/bin/sh -c 'exit 1'")
148
    self.assertEqual(result.exit_code, 1)
149
    self.assertEqual(result.output, "")
150

    
151
  def testStdout(self):
152
    """Test standard output"""
153
    cmd = 'echo -n "%s"' % self.magic
154
    result = RunCmd("/bin/sh -c '%s'" % cmd)
155
    self.assertEqual(result.stdout, self.magic)
156
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
157
    self.assertEqual(result.output, "")
158
    self.assertFileContent(self.fname, self.magic)
159

    
160
  def testStderr(self):
161
    """Test standard error"""
162
    cmd = 'echo -n "%s"' % self.magic
163
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
164
    self.assertEqual(result.stderr, self.magic)
165
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
166
    self.assertEqual(result.output, "")
167
    self.assertFileContent(self.fname, self.magic)
168

    
169
  def testCombined(self):
170
    """Test combined output"""
171
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
172
    expected = "A" + self.magic + "B" + self.magic
173
    result = RunCmd("/bin/sh -c '%s'" % cmd)
174
    self.assertEqual(result.output, expected)
175
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
176
    self.assertEqual(result.output, "")
177
    self.assertFileContent(self.fname, expected)
178

    
179
  def testSignal(self):
180
    """Test signal"""
181
    result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
182
    self.assertEqual(result.signal, 15)
183
    self.assertEqual(result.output, "")
184

    
185
  def testListRun(self):
186
    """Test list runs"""
187
    result = RunCmd(["true"])
188
    self.assertEqual(result.signal, None)
189
    self.assertEqual(result.exit_code, 0)
190
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
191
    self.assertEqual(result.signal, None)
192
    self.assertEqual(result.exit_code, 1)
193
    result = RunCmd(["echo", "-n", self.magic])
194
    self.assertEqual(result.signal, None)
195
    self.assertEqual(result.exit_code, 0)
196
    self.assertEqual(result.stdout, self.magic)
197

    
198
  def testFileEmptyOutput(self):
199
    """Test file output"""
200
    result = RunCmd(["true"], output=self.fname)
201
    self.assertEqual(result.signal, None)
202
    self.assertEqual(result.exit_code, 0)
203
    self.assertFileContent(self.fname, "")
204

    
205
  def testLang(self):
206
    """Test locale environment"""
207
    old_env = os.environ.copy()
208
    try:
209
      os.environ["LANG"] = "en_US.UTF-8"
210
      os.environ["LC_ALL"] = "en_US.UTF-8"
211
      result = RunCmd(["locale"])
212
      for line in result.output.splitlines():
213
        key, value = line.split("=", 1)
214
        # Ignore these variables, they're overridden by LC_ALL
215
        if key == "LANG" or key == "LANGUAGE":
216
          continue
217
        self.failIf(value and value != "C" and value != '"C"',
218
            "Variable %s is set to the invalid value '%s'" % (key, value))
219
    finally:
220
      os.environ = old_env
221

    
222
  def testDefaultCwd(self):
223
    """Test default working directory"""
224
    self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
225

    
226
  def testCwd(self):
227
    """Test default working directory"""
228
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
229
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
230
    cwd = os.getcwd()
231
    self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
232

    
233

    
234
class TestRemoveFile(unittest.TestCase):
235
  """Test case for the RemoveFile function"""
236

    
237
  def setUp(self):
238
    """Create a temp dir and file for each case"""
239
    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
240
    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
241
    os.close(fd)
242

    
243
  def tearDown(self):
244
    if os.path.exists(self.tmpfile):
245
      os.unlink(self.tmpfile)
246
    os.rmdir(self.tmpdir)
247

    
248

    
249
  def testIgnoreDirs(self):
250
    """Test that RemoveFile() ignores directories"""
251
    self.assertEqual(None, RemoveFile(self.tmpdir))
252

    
253

    
254
  def testIgnoreNotExisting(self):
255
    """Test that RemoveFile() ignores non-existing files"""
256
    RemoveFile(self.tmpfile)
257
    RemoveFile(self.tmpfile)
258

    
259

    
260
  def testRemoveFile(self):
261
    """Test that RemoveFile does remove a file"""
262
    RemoveFile(self.tmpfile)
263
    if os.path.exists(self.tmpfile):
264
      self.fail("File '%s' not removed" % self.tmpfile)
265

    
266

    
267
  def testRemoveSymlink(self):
268
    """Test that RemoveFile does remove symlinks"""
269
    symlink = self.tmpdir + "/symlink"
270
    os.symlink("no-such-file", symlink)
271
    RemoveFile(symlink)
272
    if os.path.exists(symlink):
273
      self.fail("File '%s' not removed" % symlink)
274
    os.symlink(self.tmpfile, symlink)
275
    RemoveFile(symlink)
276
    if os.path.exists(symlink):
277
      self.fail("File '%s' not removed" % symlink)
278

    
279

    
280
class TestRename(unittest.TestCase):
281
  """Test case for RenameFile"""
282

    
283
  def setUp(self):
284
    """Create a temporary directory"""
285
    self.tmpdir = tempfile.mkdtemp()
286
    self.tmpfile = os.path.join(self.tmpdir, "test1")
287

    
288
    # Touch the file
289
    open(self.tmpfile, "w").close()
290

    
291
  def tearDown(self):
292
    """Remove temporary directory"""
293
    shutil.rmtree(self.tmpdir)
294

    
295
  def testSimpleRename1(self):
296
    """Simple rename 1"""
297
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
298

    
299
  def testSimpleRename2(self):
300
    """Simple rename 2"""
301
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
302
                     mkdir=True)
303

    
304
  def testRenameMkdir(self):
305
    """Rename with mkdir"""
306
    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
307
                     mkdir=True)
308

    
309

    
310
class TestMatchNameComponent(unittest.TestCase):
311
  """Test case for the MatchNameComponent function"""
312

    
313
  def testEmptyList(self):
314
    """Test that there is no match against an empty list"""
315

    
316
    self.failUnlessEqual(MatchNameComponent("", []), None)
317
    self.failUnlessEqual(MatchNameComponent("test", []), None)
318

    
319
  def testSingleMatch(self):
320
    """Test that a single match is performed correctly"""
321
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
322
    for key in "test2", "test2.example", "test2.example.com":
323
      self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
324

    
325
  def testMultipleMatches(self):
326
    """Test that a multiple match is returned as None"""
327
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
328
    for key in "test1", "test1.example":
329
      self.failUnlessEqual(MatchNameComponent(key, mlist), None)
330

    
331
  def testFullMatch(self):
332
    """Test that a full match is returned correctly"""
333
    key1 = "test1"
334
    key2 = "test1.example"
335
    mlist = [key2, key2 + ".com"]
336
    self.failUnlessEqual(MatchNameComponent(key1, mlist), None)
337
    self.failUnlessEqual(MatchNameComponent(key2, mlist), key2)
338

    
339
  def testCaseInsensitivePartialMatch(self):
340
    """Test for the case_insensitive keyword"""
341
    mlist = ["test1.example.com", "test2.example.net"]
342
    self.assertEqual(MatchNameComponent("test2", mlist, case_sensitive=False),
343
                     "test2.example.net")
344
    self.assertEqual(MatchNameComponent("Test2", mlist, case_sensitive=False),
345
                     "test2.example.net")
346
    self.assertEqual(MatchNameComponent("teSt2", mlist, case_sensitive=False),
347
                     "test2.example.net")
348
    self.assertEqual(MatchNameComponent("TeSt2", mlist, case_sensitive=False),
349
                     "test2.example.net")
350

    
351

    
352
  def testCaseInsensitiveFullMatch(self):
353
    mlist = ["ts1.ex", "ts1.ex.org", "ts2.ex", "Ts2.ex"]
354
    # Between the two ts1 a full string match non-case insensitive should work
355
    self.assertEqual(MatchNameComponent("Ts1", mlist, case_sensitive=False),
356
                     None)
357
    self.assertEqual(MatchNameComponent("Ts1.ex", mlist, case_sensitive=False),
358
                     "ts1.ex")
359
    self.assertEqual(MatchNameComponent("ts1.ex", mlist, case_sensitive=False),
360
                     "ts1.ex")
361
    # Between the two ts2 only case differs, so only case-match works
362
    self.assertEqual(MatchNameComponent("ts2.ex", mlist, case_sensitive=False),
363
                     "ts2.ex")
364
    self.assertEqual(MatchNameComponent("Ts2.ex", mlist, case_sensitive=False),
365
                     "Ts2.ex")
366
    self.assertEqual(MatchNameComponent("TS2.ex", mlist, case_sensitive=False),
367
                     None)
368

    
369

    
370
class TestFormatUnit(unittest.TestCase):
371
  """Test case for the FormatUnit function"""
372

    
373
  def testMiB(self):
374
    self.assertEqual(FormatUnit(1, 'h'), '1M')
375
    self.assertEqual(FormatUnit(100, 'h'), '100M')
376
    self.assertEqual(FormatUnit(1023, 'h'), '1023M')
377

    
378
    self.assertEqual(FormatUnit(1, 'm'), '1')
379
    self.assertEqual(FormatUnit(100, 'm'), '100')
380
    self.assertEqual(FormatUnit(1023, 'm'), '1023')
381

    
382
    self.assertEqual(FormatUnit(1024, 'm'), '1024')
383
    self.assertEqual(FormatUnit(1536, 'm'), '1536')
384
    self.assertEqual(FormatUnit(17133, 'm'), '17133')
385
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
386

    
387
  def testGiB(self):
388
    self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
389
    self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
390
    self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
391
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
392

    
393
    self.assertEqual(FormatUnit(1024, 'g'), '1.0')
394
    self.assertEqual(FormatUnit(1536, 'g'), '1.5')
395
    self.assertEqual(FormatUnit(17133, 'g'), '16.7')
396
    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
397

    
398
    self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
399
    self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
400
    self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
401

    
402
  def testTiB(self):
403
    self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
404
    self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
405
    self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
406

    
407
    self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
408
    self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
409
    self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
410

    
411
class TestParseUnit(unittest.TestCase):
412
  """Test case for the ParseUnit function"""
413

    
414
  SCALES = (('', 1),
415
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
416
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
417
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
418

    
419
  def testRounding(self):
420
    self.assertEqual(ParseUnit('0'), 0)
421
    self.assertEqual(ParseUnit('1'), 4)
422
    self.assertEqual(ParseUnit('2'), 4)
423
    self.assertEqual(ParseUnit('3'), 4)
424

    
425
    self.assertEqual(ParseUnit('124'), 124)
426
    self.assertEqual(ParseUnit('125'), 128)
427
    self.assertEqual(ParseUnit('126'), 128)
428
    self.assertEqual(ParseUnit('127'), 128)
429
    self.assertEqual(ParseUnit('128'), 128)
430
    self.assertEqual(ParseUnit('129'), 132)
431
    self.assertEqual(ParseUnit('130'), 132)
432

    
433
  def testFloating(self):
434
    self.assertEqual(ParseUnit('0'), 0)
435
    self.assertEqual(ParseUnit('0.5'), 4)
436
    self.assertEqual(ParseUnit('1.75'), 4)
437
    self.assertEqual(ParseUnit('1.99'), 4)
438
    self.assertEqual(ParseUnit('2.00'), 4)
439
    self.assertEqual(ParseUnit('2.01'), 4)
440
    self.assertEqual(ParseUnit('3.99'), 4)
441
    self.assertEqual(ParseUnit('4.00'), 4)
442
    self.assertEqual(ParseUnit('4.01'), 8)
443
    self.assertEqual(ParseUnit('1.5G'), 1536)
444
    self.assertEqual(ParseUnit('1.8G'), 1844)
445
    self.assertEqual(ParseUnit('8.28T'), 8682212)
446

    
447
  def testSuffixes(self):
448
    for sep in ('', ' ', '   ', "\t", "\t "):
449
      for suffix, scale in TestParseUnit.SCALES:
450
        for func in (lambda x: x, str.lower, str.upper):
451
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
452
                           1024 * scale)
453

    
454
  def testInvalidInput(self):
455
    for sep in ('-', '_', ',', 'a'):
456
      for suffix, _ in TestParseUnit.SCALES:
457
        self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
458

    
459
    for suffix, _ in TestParseUnit.SCALES:
460
      self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
461

    
462

    
463
class TestSshKeys(testutils.GanetiTestCase):
464
  """Test case for the AddAuthorizedKey function"""
465

    
466
  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
467
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
468
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
469

    
470
  def setUp(self):
471
    testutils.GanetiTestCase.setUp(self)
472
    self.tmpname = self._CreateTempFile()
473
    handle = open(self.tmpname, 'w')
474
    try:
475
      handle.write("%s\n" % TestSshKeys.KEY_A)
476
      handle.write("%s\n" % TestSshKeys.KEY_B)
477
    finally:
478
      handle.close()
479

    
480
  def testAddingNewKey(self):
481
    AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
482

    
483
    self.assertFileContent(self.tmpname,
484
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
485
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
486
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
487
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
488

    
489
  def testAddingAlmostButNotCompletelyTheSameKey(self):
490
    AddAuthorizedKey(self.tmpname,
491
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
492

    
493
    self.assertFileContent(self.tmpname,
494
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
495
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
496
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
497
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
498

    
499
  def testAddingExistingKeyWithSomeMoreSpaces(self):
500
    AddAuthorizedKey(self.tmpname,
501
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
502

    
503
    self.assertFileContent(self.tmpname,
504
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
505
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
506
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
507

    
508
  def testRemovingExistingKeyWithSomeMoreSpaces(self):
509
    RemoveAuthorizedKey(self.tmpname,
510
        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
511

    
512
    self.assertFileContent(self.tmpname,
513
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
514
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
515

    
516
  def testRemovingNonExistingKey(self):
517
    RemoveAuthorizedKey(self.tmpname,
518
        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
519

    
520
    self.assertFileContent(self.tmpname,
521
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
522
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
523
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
524

    
525

    
526
class TestEtcHosts(testutils.GanetiTestCase):
527
  """Test functions modifying /etc/hosts"""
528

    
529
  def setUp(self):
530
    testutils.GanetiTestCase.setUp(self)
531
    self.tmpname = self._CreateTempFile()
532
    handle = open(self.tmpname, 'w')
533
    try:
534
      handle.write('# This is a test file for /etc/hosts\n')
535
      handle.write('127.0.0.1\tlocalhost\n')
536
      handle.write('192.168.1.1 router gw\n')
537
    finally:
538
      handle.close()
539

    
540
  def testSettingNewIp(self):
541
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
542

    
543
    self.assertFileContent(self.tmpname,
544
      "# This is a test file for /etc/hosts\n"
545
      "127.0.0.1\tlocalhost\n"
546
      "192.168.1.1 router gw\n"
547
      "1.2.3.4\tmyhost.domain.tld myhost\n")
548
    self.assertFileMode(self.tmpname, 0644)
549

    
550
  def testSettingExistingIp(self):
551
    SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
552
                     ['myhost'])
553

    
554
    self.assertFileContent(self.tmpname,
555
      "# This is a test file for /etc/hosts\n"
556
      "127.0.0.1\tlocalhost\n"
557
      "192.168.1.1\tmyhost.domain.tld myhost\n")
558
    self.assertFileMode(self.tmpname, 0644)
559

    
560
  def testSettingDuplicateName(self):
561
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
562

    
563
    self.assertFileContent(self.tmpname,
564
      "# This is a test file for /etc/hosts\n"
565
      "127.0.0.1\tlocalhost\n"
566
      "192.168.1.1 router gw\n"
567
      "1.2.3.4\tmyhost\n")
568
    self.assertFileMode(self.tmpname, 0644)
569

    
570
  def testRemovingExistingHost(self):
571
    RemoveEtcHostsEntry(self.tmpname, 'router')
572

    
573
    self.assertFileContent(self.tmpname,
574
      "# This is a test file for /etc/hosts\n"
575
      "127.0.0.1\tlocalhost\n"
576
      "192.168.1.1 gw\n")
577
    self.assertFileMode(self.tmpname, 0644)
578

    
579
  def testRemovingSingleExistingHost(self):
580
    RemoveEtcHostsEntry(self.tmpname, 'localhost')
581

    
582
    self.assertFileContent(self.tmpname,
583
      "# This is a test file for /etc/hosts\n"
584
      "192.168.1.1 router gw\n")
585
    self.assertFileMode(self.tmpname, 0644)
586

    
587
  def testRemovingNonExistingHost(self):
588
    RemoveEtcHostsEntry(self.tmpname, 'myhost')
589

    
590
    self.assertFileContent(self.tmpname,
591
      "# This is a test file for /etc/hosts\n"
592
      "127.0.0.1\tlocalhost\n"
593
      "192.168.1.1 router gw\n")
594
    self.assertFileMode(self.tmpname, 0644)
595

    
596
  def testRemovingAlias(self):
597
    RemoveEtcHostsEntry(self.tmpname, 'gw')
598

    
599
    self.assertFileContent(self.tmpname,
600
      "# This is a test file for /etc/hosts\n"
601
      "127.0.0.1\tlocalhost\n"
602
      "192.168.1.1 router\n")
603
    self.assertFileMode(self.tmpname, 0644)
604

    
605

    
606
class TestShellQuoting(unittest.TestCase):
607
  """Test case for shell quoting functions"""
608

    
609
  def testShellQuote(self):
610
    self.assertEqual(ShellQuote('abc'), "abc")
611
    self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
612
    self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
613
    self.assertEqual(ShellQuote("a b c"), "'a b c'")
614
    self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
615

    
616
  def testShellQuoteArgs(self):
617
    self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
618
    self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
619
    self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
620

    
621

    
622
class TestTcpPing(unittest.TestCase):
623
  """Testcase for TCP version of ping - against listen(2)ing port"""
624

    
625
  def setUp(self):
626
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
627
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
628
    self.listenerport = self.listener.getsockname()[1]
629
    self.listener.listen(1)
630

    
631
  def tearDown(self):
632
    self.listener.shutdown(socket.SHUT_RDWR)
633
    del self.listener
634
    del self.listenerport
635

    
636
  def testTcpPingToLocalHostAccept(self):
637
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
638
                         self.listenerport,
639
                         timeout=10,
640
                         live_port_needed=True,
641
                         source=constants.LOCALHOST_IP_ADDRESS,
642
                         ),
643
                 "failed to connect to test listener")
644

    
645
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
646
                         self.listenerport,
647
                         timeout=10,
648
                         live_port_needed=True,
649
                         ),
650
                 "failed to connect to test listener (no source)")
651

    
652

    
653
class TestTcpPingDeaf(unittest.TestCase):
654
  """Testcase for TCP version of ping - against non listen(2)ing port"""
655

    
656
  def setUp(self):
657
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
658
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
659
    self.deaflistenerport = self.deaflistener.getsockname()[1]
660

    
661
  def tearDown(self):
662
    del self.deaflistener
663
    del self.deaflistenerport
664

    
665
  def testTcpPingToLocalHostAcceptDeaf(self):
666
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
667
                        self.deaflistenerport,
668
                        timeout=constants.TCP_PING_TIMEOUT,
669
                        live_port_needed=True,
670
                        source=constants.LOCALHOST_IP_ADDRESS,
671
                        ), # need successful connect(2)
672
                "successfully connected to deaf listener")
673

    
674
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
675
                        self.deaflistenerport,
676
                        timeout=constants.TCP_PING_TIMEOUT,
677
                        live_port_needed=True,
678
                        ), # need successful connect(2)
679
                "successfully connected to deaf listener (no source addr)")
680

    
681
  def testTcpPingToLocalHostNoAccept(self):
682
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
683
                         self.deaflistenerport,
684
                         timeout=constants.TCP_PING_TIMEOUT,
685
                         live_port_needed=False,
686
                         source=constants.LOCALHOST_IP_ADDRESS,
687
                         ), # ECONNREFUSED is OK
688
                 "failed to ping alive host on deaf port")
689

    
690
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
691
                         self.deaflistenerport,
692
                         timeout=constants.TCP_PING_TIMEOUT,
693
                         live_port_needed=False,
694
                         ), # ECONNREFUSED is OK
695
                 "failed to ping alive host on deaf port (no source addr)")
696

    
697

    
698
class TestOwnIpAddress(unittest.TestCase):
699
  """Testcase for OwnIpAddress"""
700

    
701
  def testOwnLoopback(self):
702
    """check having the loopback ip"""
703
    self.failUnless(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS),
704
                    "Should own the loopback address")
705

    
706
  def testNowOwnAddress(self):
707
    """check that I don't own an address"""
708

    
709
    # network 192.0.2.0/24 is reserved for test/documentation as per
710
    # rfc 3330, so we *should* not have an address of this range... if
711
    # this fails, we should extend the test to multiple addresses
712
    DST_IP = "192.0.2.1"
713
    self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP)
714

    
715

    
716
class TestListVisibleFiles(unittest.TestCase):
717
  """Test case for ListVisibleFiles"""
718

    
719
  def setUp(self):
720
    self.path = tempfile.mkdtemp()
721

    
722
  def tearDown(self):
723
    shutil.rmtree(self.path)
724

    
725
  def _test(self, files, expected):
726
    # Sort a copy
727
    expected = expected[:]
728
    expected.sort()
729

    
730
    for name in files:
731
      f = open(os.path.join(self.path, name), 'w')
732
      try:
733
        f.write("Test\n")
734
      finally:
735
        f.close()
736

    
737
    found = ListVisibleFiles(self.path)
738
    found.sort()
739

    
740
    self.assertEqual(found, expected)
741

    
742
  def testAllVisible(self):
743
    files = ["a", "b", "c"]
744
    expected = files
745
    self._test(files, expected)
746

    
747
  def testNoneVisible(self):
748
    files = [".a", ".b", ".c"]
749
    expected = []
750
    self._test(files, expected)
751

    
752
  def testSomeVisible(self):
753
    files = ["a", "b", ".c"]
754
    expected = ["a", "b"]
755
    self._test(files, expected)
756

    
757

    
758
class TestNewUUID(unittest.TestCase):
759
  """Test case for NewUUID"""
760

    
761
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
762
                        '[a-f0-9]{4}-[a-f0-9]{12}$')
763

    
764
  def runTest(self):
765
    self.failUnless(self._re_uuid.match(utils.NewUUID()))
766

    
767

    
768
class TestUniqueSequence(unittest.TestCase):
769
  """Test case for UniqueSequence"""
770

    
771
  def _test(self, input, expected):
772
    self.assertEqual(utils.UniqueSequence(input), expected)
773

    
774
  def runTest(self):
775
    # Ordered input
776
    self._test([1, 2, 3], [1, 2, 3])
777
    self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
778
    self._test([1, 2, 2, 3], [1, 2, 3])
779
    self._test([1, 2, 3, 3], [1, 2, 3])
780

    
781
    # Unordered input
782
    self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
783
    self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
784

    
785
    # Strings
786
    self._test(["a", "a"], ["a"])
787
    self._test(["a", "b"], ["a", "b"])
788
    self._test(["a", "b", "a"], ["a", "b"])
789

    
790

    
791
class TestFirstFree(unittest.TestCase):
792
  """Test case for the FirstFree function"""
793

    
794
  def test(self):
795
    """Test FirstFree"""
796
    self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
797
    self.failUnlessEqual(FirstFree([]), None)
798
    self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
799
    self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
800
    self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
801

    
802

    
803
class TestTailFile(testutils.GanetiTestCase):
804
  """Test case for the TailFile function"""
805

    
806
  def testEmpty(self):
807
    fname = self._CreateTempFile()
808
    self.failUnlessEqual(TailFile(fname), [])
809
    self.failUnlessEqual(TailFile(fname, lines=25), [])
810

    
811
  def testAllLines(self):
812
    data = ["test %d" % i for i in range(30)]
813
    for i in range(30):
814
      fname = self._CreateTempFile()
815
      fd = open(fname, "w")
816
      fd.write("\n".join(data[:i]))
817
      if i > 0:
818
        fd.write("\n")
819
      fd.close()
820
      self.failUnlessEqual(TailFile(fname, lines=i), data[:i])
821

    
822
  def testPartialLines(self):
823
    data = ["test %d" % i for i in range(30)]
824
    fname = self._CreateTempFile()
825
    fd = open(fname, "w")
826
    fd.write("\n".join(data))
827
    fd.write("\n")
828
    fd.close()
829
    for i in range(1, 30):
830
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
831

    
832
  def testBigFile(self):
833
    data = ["test %d" % i for i in range(30)]
834
    fname = self._CreateTempFile()
835
    fd = open(fname, "w")
836
    fd.write("X" * 1048576)
837
    fd.write("\n")
838
    fd.write("\n".join(data))
839
    fd.write("\n")
840
    fd.close()
841
    for i in range(1, 30):
842
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
843

    
844

    
845
class TestFileLock(unittest.TestCase):
846
  """Test case for the FileLock class"""
847

    
848
  def setUp(self):
849
    self.tmpfile = tempfile.NamedTemporaryFile()
850
    self.lock = utils.FileLock(self.tmpfile.name)
851

    
852
  def testSharedNonblocking(self):
853
    self.lock.Shared(blocking=False)
854
    self.lock.Close()
855

    
856
  def testExclusiveNonblocking(self):
857
    self.lock.Exclusive(blocking=False)
858
    self.lock.Close()
859

    
860
  def testUnlockNonblocking(self):
861
    self.lock.Unlock(blocking=False)
862
    self.lock.Close()
863

    
864
  def testSharedBlocking(self):
865
    self.lock.Shared(blocking=True)
866
    self.lock.Close()
867

    
868
  def testExclusiveBlocking(self):
869
    self.lock.Exclusive(blocking=True)
870
    self.lock.Close()
871

    
872
  def testUnlockBlocking(self):
873
    self.lock.Unlock(blocking=True)
874
    self.lock.Close()
875

    
876
  def testSharedExclusiveUnlock(self):
877
    self.lock.Shared(blocking=False)
878
    self.lock.Exclusive(blocking=False)
879
    self.lock.Unlock(blocking=False)
880
    self.lock.Close()
881

    
882
  def testExclusiveSharedUnlock(self):
883
    self.lock.Exclusive(blocking=False)
884
    self.lock.Shared(blocking=False)
885
    self.lock.Unlock(blocking=False)
886
    self.lock.Close()
887

    
888
  def testCloseShared(self):
889
    self.lock.Close()
890
    self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
891

    
892
  def testCloseExclusive(self):
893
    self.lock.Close()
894
    self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
895

    
896
  def testCloseUnlock(self):
897
    self.lock.Close()
898
    self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
899

    
900

    
901
class TestTimeFunctions(unittest.TestCase):
902
  """Test case for time functions"""
903

    
904
  def runTest(self):
905
    self.assertEqual(utils.SplitTime(1), (1, 0))
906
    self.assertEqual(utils.SplitTime(1.5), (1, 500000))
907
    self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
908
    self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
909
    self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
910
    self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
911
    self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
912
    self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
913

    
914
    self.assertRaises(AssertionError, utils.SplitTime, -1)
915

    
916
    self.assertEqual(utils.MergeTime((1, 0)), 1.0)
917
    self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
918
    self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
919

    
920
    self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3),
921
                     1218448917.481)
922
    self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
923

    
924
    self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
925
    self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
926
    self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
927
    self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
928
    self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
929

    
930

    
931
class FieldSetTestCase(unittest.TestCase):
932
  """Test case for FieldSets"""
933

    
934
  def testSimpleMatch(self):
935
    f = utils.FieldSet("a", "b", "c", "def")
936
    self.failUnless(f.Matches("a"))
937
    self.failIf(f.Matches("d"), "Substring matched")
938
    self.failIf(f.Matches("defghi"), "Prefix string matched")
939
    self.failIf(f.NonMatching(["b", "c"]))
940
    self.failIf(f.NonMatching(["a", "b", "c", "def"]))
941
    self.failUnless(f.NonMatching(["a", "d"]))
942

    
943
  def testRegexMatch(self):
944
    f = utils.FieldSet("a", "b([0-9]+)", "c")
945
    self.failUnless(f.Matches("b1"))
946
    self.failUnless(f.Matches("b99"))
947
    self.failIf(f.Matches("b/1"))
948
    self.failIf(f.NonMatching(["b12", "c"]))
949
    self.failUnless(f.NonMatching(["a", "1"]))
950

    
951
class TestForceDictType(unittest.TestCase):
952
  """Test case for ForceDictType"""
953

    
954
  def setUp(self):
955
    self.key_types = {
956
      'a': constants.VTYPE_INT,
957
      'b': constants.VTYPE_BOOL,
958
      'c': constants.VTYPE_STRING,
959
      'd': constants.VTYPE_SIZE,
960
      }
961

    
962
  def _fdt(self, dict, allowed_values=None):
963
    if allowed_values is None:
964
      ForceDictType(dict, self.key_types)
965
    else:
966
      ForceDictType(dict, self.key_types, allowed_values=allowed_values)
967

    
968
    return dict
969

    
970
  def testSimpleDict(self):
971
    self.assertEqual(self._fdt({}), {})
972
    self.assertEqual(self._fdt({'a': 1}), {'a': 1})
973
    self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
974
    self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
975
    self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
976
    self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
977
    self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
978
    self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
979
    self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
980
    self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
981
    self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
982
    self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
983

    
984
  def testErrors(self):
985
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
986
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
987
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
988
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
989

    
990

    
991
class TestIsAbsNormPath(unittest.TestCase):
992
  """Testing case for IsProcessAlive"""
993

    
994
  def _pathTestHelper(self, path, result):
995
    if result:
996
      self.assert_(IsNormAbsPath(path),
997
          "Path %s should result absolute and normalized" % path)
998
    else:
999
      self.assert_(not IsNormAbsPath(path),
1000
          "Path %s should not result absolute and normalized" % path)
1001

    
1002
  def testBase(self):
1003
    self._pathTestHelper('/etc', True)
1004
    self._pathTestHelper('/srv', True)
1005
    self._pathTestHelper('etc', False)
1006
    self._pathTestHelper('/etc/../root', False)
1007
    self._pathTestHelper('/etc/', False)
1008

    
1009

    
1010
class TestSafeEncode(unittest.TestCase):
1011
  """Test case for SafeEncode"""
1012

    
1013
  def testAscii(self):
1014
    for txt in [string.digits, string.letters, string.punctuation]:
1015
      self.failUnlessEqual(txt, SafeEncode(txt))
1016

    
1017
  def testDoubleEncode(self):
1018
    for i in range(255):
1019
      txt = SafeEncode(chr(i))
1020
      self.failUnlessEqual(txt, SafeEncode(txt))
1021

    
1022
  def testUnicode(self):
1023
    # 1024 is high enough to catch non-direct ASCII mappings
1024
    for i in range(1024):
1025
      txt = SafeEncode(unichr(i))
1026
      self.failUnlessEqual(txt, SafeEncode(txt))
1027

    
1028

    
1029
class TestFormatTime(unittest.TestCase):
1030
  """Testing case for FormatTime"""
1031

    
1032
  def testNone(self):
1033
    self.failUnlessEqual(FormatTime(None), "N/A")
1034

    
1035
  def testInvalid(self):
1036
    self.failUnlessEqual(FormatTime(()), "N/A")
1037

    
1038
  def testNow(self):
1039
    # tests that we accept time.time input
1040
    FormatTime(time.time())
1041
    # tests that we accept int input
1042
    FormatTime(int(time.time()))
1043

    
1044

    
1045
if __name__ == '__main__':
1046
  testutils.GanetiTestProgram()