burnin: move start_stop at the end
[ganeti-local] / test / ganeti.utils_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the utils module"""
23
24 import unittest
25 import os
26 import time
27 import tempfile
28 import os.path
29 import os
30 import md5
31 import signal
32 import socket
33 import shutil
34 import re
35 import select
36
37 import ganeti
38 import testutils
39 from ganeti import constants
40 from ganeti import utils
41 from ganeti.utils import IsProcessAlive, RunCmd, \
42      RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
43      ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
44      ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
45      SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress
46 from ganeti.errors import LockError, UnitParseError, GenericError, \
47      ProgrammerError
48
49
50 class TestIsProcessAlive(unittest.TestCase):
51   """Testing case for IsProcessAlive"""
52
53   def testExists(self):
54     mypid = os.getpid()
55     self.assert_(IsProcessAlive(mypid),
56                  "can't find myself running")
57
58   def testNotExisting(self):
59     pid_non_existing = os.fork()
60     if pid_non_existing == 0:
61       os._exit(0)
62     elif pid_non_existing < 0:
63       raise SystemError("can't fork")
64     os.waitpid(pid_non_existing, 0)
65     self.assert_(not IsProcessAlive(pid_non_existing),
66                  "nonexisting process detected")
67
68
69 class TestPidFileFunctions(unittest.TestCase):
70   """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
71
72   def setUp(self):
73     self.dir = tempfile.mkdtemp()
74     self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
75     utils.DaemonPidFileName = self.f_dpn
76
77   def testPidFileFunctions(self):
78     pid_file = self.f_dpn('test')
79     utils.WritePidFile('test')
80     self.failUnless(os.path.exists(pid_file),
81                     "PID file should have been created")
82     read_pid = utils.ReadPidFile(pid_file)
83     self.failUnlessEqual(read_pid, os.getpid())
84     self.failUnless(utils.IsProcessAlive(read_pid))
85     self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
86     utils.RemovePidFile('test')
87     self.failIf(os.path.exists(pid_file),
88                 "PID file should not exist anymore")
89     self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
90                          "ReadPidFile should return 0 for missing pid file")
91     fh = open(pid_file, "w")
92     fh.write("blah\n")
93     fh.close()
94     self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
95                          "ReadPidFile should return 0 for invalid pid file")
96     utils.RemovePidFile('test')
97     self.failIf(os.path.exists(pid_file),
98                 "PID file should not exist anymore")
99
100   def testKill(self):
101     pid_file = self.f_dpn('child')
102     r_fd, w_fd = os.pipe()
103     new_pid = os.fork()
104     if new_pid == 0: #child
105       utils.WritePidFile('child')
106       os.write(w_fd, 'a')
107       signal.pause()
108       os._exit(0)
109       return
110     # else we are in the parent
111     # wait until the child has written the pid file
112     os.read(r_fd, 1)
113     read_pid = utils.ReadPidFile(pid_file)
114     self.failUnlessEqual(read_pid, new_pid)
115     self.failUnless(utils.IsProcessAlive(new_pid))
116     utils.KillProcess(new_pid, waitpid=True)
117     self.failIf(utils.IsProcessAlive(new_pid))
118     utils.RemovePidFile('child')
119     self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
120
121   def tearDown(self):
122     for name in os.listdir(self.dir):
123       os.unlink(os.path.join(self.dir, name))
124     os.rmdir(self.dir)
125
126
127 class TestRunCmd(testutils.GanetiTestCase):
128   """Testing case for the RunCmd function"""
129
130   def setUp(self):
131     self.magic = time.ctime() + " ganeti test"
132     fh, self.fname = tempfile.mkstemp()
133     os.close(fh)
134
135   def tearDown(self):
136     if self.fname:
137       utils.RemoveFile(self.fname)
138
139   def testOk(self):
140     """Test successful exit code"""
141     result = RunCmd("/bin/sh -c 'exit 0'")
142     self.assertEqual(result.exit_code, 0)
143     self.assertEqual(result.output, "")
144
145   def testFail(self):
146     """Test fail exit code"""
147     result = RunCmd("/bin/sh -c 'exit 1'")
148     self.assertEqual(result.exit_code, 1)
149     self.assertEqual(result.output, "")
150
151   def testStdout(self):
152     """Test standard output"""
153     cmd = 'echo -n "%s"' % self.magic
154     result = RunCmd("/bin/sh -c '%s'" % cmd)
155     self.assertEqual(result.stdout, self.magic)
156     result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
157     self.assertEqual(result.output, "")
158     self.assertFileContent(self.fname, self.magic)
159
160   def testStderr(self):
161     """Test standard error"""
162     cmd = 'echo -n "%s"' % self.magic
163     result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
164     self.assertEqual(result.stderr, self.magic)
165     result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
166     self.assertEqual(result.output, "")
167     self.assertFileContent(self.fname, self.magic)
168
169   def testCombined(self):
170     """Test combined output"""
171     cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
172     expected = "A" + self.magic + "B" + self.magic
173     result = RunCmd("/bin/sh -c '%s'" % cmd)
174     self.assertEqual(result.output, expected)
175     result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
176     self.assertEqual(result.output, "")
177     self.assertFileContent(self.fname, expected)
178
179   def testSignal(self):
180     """Test signal"""
181     result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
182     self.assertEqual(result.signal, 15)
183     self.assertEqual(result.output, "")
184
185   def testListRun(self):
186     """Test list runs"""
187     result = RunCmd(["true"])
188     self.assertEqual(result.signal, None)
189     self.assertEqual(result.exit_code, 0)
190     result = RunCmd(["/bin/sh", "-c", "exit 1"])
191     self.assertEqual(result.signal, None)
192     self.assertEqual(result.exit_code, 1)
193     result = RunCmd(["echo", "-n", self.magic])
194     self.assertEqual(result.signal, None)
195     self.assertEqual(result.exit_code, 0)
196     self.assertEqual(result.stdout, self.magic)
197
198   def testFileEmptyOutput(self):
199     """Test file output"""
200     result = RunCmd(["true"], output=self.fname)
201     self.assertEqual(result.signal, None)
202     self.assertEqual(result.exit_code, 0)
203     self.assertFileContent(self.fname, "")
204
205   def testLang(self):
206     """Test locale environment"""
207     old_env = os.environ.copy()
208     try:
209       os.environ["LANG"] = "en_US.UTF-8"
210       os.environ["LC_ALL"] = "en_US.UTF-8"
211       result = RunCmd(["locale"])
212       for line in result.output.splitlines():
213         key, value = line.split("=", 1)
214         # Ignore these variables, they're overridden by LC_ALL
215         if key == "LANG" or key == "LANGUAGE":
216           continue
217         self.failIf(value and value != "C" and value != '"C"',
218             "Variable %s is set to the invalid value '%s'" % (key, value))
219     finally:
220       os.environ = old_env
221
222   def testDefaultCwd(self):
223     """Test default working directory"""
224     self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
225
226   def testCwd(self):
227     """Test default working directory"""
228     self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
229     self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
230     cwd = os.getcwd()
231     self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
232
233
234 class TestRemoveFile(unittest.TestCase):
235   """Test case for the RemoveFile function"""
236
237   def setUp(self):
238     """Create a temp dir and file for each case"""
239     self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
240     fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
241     os.close(fd)
242
243   def tearDown(self):
244     if os.path.exists(self.tmpfile):
245       os.unlink(self.tmpfile)
246     os.rmdir(self.tmpdir)
247
248
249   def testIgnoreDirs(self):
250     """Test that RemoveFile() ignores directories"""
251     self.assertEqual(None, RemoveFile(self.tmpdir))
252
253
254   def testIgnoreNotExisting(self):
255     """Test that RemoveFile() ignores non-existing files"""
256     RemoveFile(self.tmpfile)
257     RemoveFile(self.tmpfile)
258
259
260   def testRemoveFile(self):
261     """Test that RemoveFile does remove a file"""
262     RemoveFile(self.tmpfile)
263     if os.path.exists(self.tmpfile):
264       self.fail("File '%s' not removed" % self.tmpfile)
265
266
267   def testRemoveSymlink(self):
268     """Test that RemoveFile does remove symlinks"""
269     symlink = self.tmpdir + "/symlink"
270     os.symlink("no-such-file", symlink)
271     RemoveFile(symlink)
272     if os.path.exists(symlink):
273       self.fail("File '%s' not removed" % symlink)
274     os.symlink(self.tmpfile, symlink)
275     RemoveFile(symlink)
276     if os.path.exists(symlink):
277       self.fail("File '%s' not removed" % symlink)
278
279
280 class TestRename(unittest.TestCase):
281   """Test case for RenameFile"""
282
283   def setUp(self):
284     """Create a temporary directory"""
285     self.tmpdir = tempfile.mkdtemp()
286     self.tmpfile = os.path.join(self.tmpdir, "test1")
287
288     # Touch the file
289     open(self.tmpfile, "w").close()
290
291   def tearDown(self):
292     """Remove temporary directory"""
293     shutil.rmtree(self.tmpdir)
294
295   def testSimpleRename1(self):
296     """Simple rename 1"""
297     utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
298
299   def testSimpleRename2(self):
300     """Simple rename 2"""
301     utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
302                      mkdir=True)
303
304   def testRenameMkdir(self):
305     """Rename with mkdir"""
306     utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
307                      mkdir=True)
308
309
310 class TestCheckdict(unittest.TestCase):
311   """Test case for the CheckDict function"""
312
313   def testAdd(self):
314     """Test that CheckDict adds a missing key with the correct value"""
315
316     tgt = {'a':1}
317     tmpl = {'b': 2}
318     CheckDict(tgt, tmpl)
319     if 'b' not in tgt or tgt['b'] != 2:
320       self.fail("Failed to update dict")
321
322
323   def testNoUpdate(self):
324     """Test that CheckDict does not overwrite an existing key"""
325     tgt = {'a':1, 'b': 3}
326     tmpl = {'b': 2}
327     CheckDict(tgt, tmpl)
328     self.failUnlessEqual(tgt['b'], 3)
329
330
331 class TestMatchNameComponent(unittest.TestCase):
332   """Test case for the MatchNameComponent function"""
333
334   def testEmptyList(self):
335     """Test that there is no match against an empty list"""
336
337     self.failUnlessEqual(MatchNameComponent("", []), None)
338     self.failUnlessEqual(MatchNameComponent("test", []), None)
339
340   def testSingleMatch(self):
341     """Test that a single match is performed correctly"""
342     mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
343     for key in "test2", "test2.example", "test2.example.com":
344       self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
345
346   def testMultipleMatches(self):
347     """Test that a multiple match is returned as None"""
348     mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
349     for key in "test1", "test1.example":
350       self.failUnlessEqual(MatchNameComponent(key, mlist), None)
351
352
353 class TestFormatUnit(unittest.TestCase):
354   """Test case for the FormatUnit function"""
355
356   def testMiB(self):
357     self.assertEqual(FormatUnit(1, 'h'), '1M')
358     self.assertEqual(FormatUnit(100, 'h'), '100M')
359     self.assertEqual(FormatUnit(1023, 'h'), '1023M')
360
361     self.assertEqual(FormatUnit(1, 'm'), '1')
362     self.assertEqual(FormatUnit(100, 'm'), '100')
363     self.assertEqual(FormatUnit(1023, 'm'), '1023')
364
365     self.assertEqual(FormatUnit(1024, 'm'), '1024')
366     self.assertEqual(FormatUnit(1536, 'm'), '1536')
367     self.assertEqual(FormatUnit(17133, 'm'), '17133')
368     self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
369
370   def testGiB(self):
371     self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
372     self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
373     self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
374     self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
375
376     self.assertEqual(FormatUnit(1024, 'g'), '1.0')
377     self.assertEqual(FormatUnit(1536, 'g'), '1.5')
378     self.assertEqual(FormatUnit(17133, 'g'), '16.7')
379     self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
380
381     self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
382     self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
383     self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
384
385   def testTiB(self):
386     self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
387     self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
388     self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
389
390     self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
391     self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
392     self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
393
394 class TestParseUnit(unittest.TestCase):
395   """Test case for the ParseUnit function"""
396
397   SCALES = (('', 1),
398             ('M', 1), ('G', 1024), ('T', 1024 * 1024),
399             ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
400             ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
401
402   def testRounding(self):
403     self.assertEqual(ParseUnit('0'), 0)
404     self.assertEqual(ParseUnit('1'), 4)
405     self.assertEqual(ParseUnit('2'), 4)
406     self.assertEqual(ParseUnit('3'), 4)
407
408     self.assertEqual(ParseUnit('124'), 124)
409     self.assertEqual(ParseUnit('125'), 128)
410     self.assertEqual(ParseUnit('126'), 128)
411     self.assertEqual(ParseUnit('127'), 128)
412     self.assertEqual(ParseUnit('128'), 128)
413     self.assertEqual(ParseUnit('129'), 132)
414     self.assertEqual(ParseUnit('130'), 132)
415
416   def testFloating(self):
417     self.assertEqual(ParseUnit('0'), 0)
418     self.assertEqual(ParseUnit('0.5'), 4)
419     self.assertEqual(ParseUnit('1.75'), 4)
420     self.assertEqual(ParseUnit('1.99'), 4)
421     self.assertEqual(ParseUnit('2.00'), 4)
422     self.assertEqual(ParseUnit('2.01'), 4)
423     self.assertEqual(ParseUnit('3.99'), 4)
424     self.assertEqual(ParseUnit('4.00'), 4)
425     self.assertEqual(ParseUnit('4.01'), 8)
426     self.assertEqual(ParseUnit('1.5G'), 1536)
427     self.assertEqual(ParseUnit('1.8G'), 1844)
428     self.assertEqual(ParseUnit('8.28T'), 8682212)
429
430   def testSuffixes(self):
431     for sep in ('', ' ', '   ', "\t", "\t "):
432       for suffix, scale in TestParseUnit.SCALES:
433         for func in (lambda x: x, str.lower, str.upper):
434           self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
435                            1024 * scale)
436
437   def testInvalidInput(self):
438     for sep in ('-', '_', ',', 'a'):
439       for suffix, _ in TestParseUnit.SCALES:
440         self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
441
442     for suffix, _ in TestParseUnit.SCALES:
443       self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
444
445
446 class TestSshKeys(testutils.GanetiTestCase):
447   """Test case for the AddAuthorizedKey function"""
448
449   KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
450   KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
451            'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
452
453   def setUp(self):
454     (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
455     try:
456       handle = os.fdopen(fd, 'w')
457       try:
458         handle.write("%s\n" % TestSshKeys.KEY_A)
459         handle.write("%s\n" % TestSshKeys.KEY_B)
460       finally:
461         handle.close()
462     except:
463       utils.RemoveFile(self.tmpname)
464       raise
465
466   def tearDown(self):
467     utils.RemoveFile(self.tmpname)
468     del self.tmpname
469
470   def testAddingNewKey(self):
471     AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
472
473     self.assertFileContent(self.tmpname,
474       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
475       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
476       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
477       "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
478
479   def testAddingAlmostButNotCompletelyTheSameKey(self):
480     AddAuthorizedKey(self.tmpname,
481         'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
482
483     self.assertFileContent(self.tmpname,
484       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
485       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
486       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
487       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
488
489   def testAddingExistingKeyWithSomeMoreSpaces(self):
490     AddAuthorizedKey(self.tmpname,
491         'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
492
493     self.assertFileContent(self.tmpname,
494       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
495       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
496       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
497
498   def testRemovingExistingKeyWithSomeMoreSpaces(self):
499     RemoveAuthorizedKey(self.tmpname,
500         'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
501
502     self.assertFileContent(self.tmpname,
503       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
504       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
505
506   def testRemovingNonExistingKey(self):
507     RemoveAuthorizedKey(self.tmpname,
508         'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
509
510     self.assertFileContent(self.tmpname,
511       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
512       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
513       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
514
515
516 class TestEtcHosts(testutils.GanetiTestCase):
517   """Test functions modifying /etc/hosts"""
518
519   def setUp(self):
520     (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
521     try:
522       handle = os.fdopen(fd, 'w')
523       try:
524         handle.write('# This is a test file for /etc/hosts\n')
525         handle.write('127.0.0.1\tlocalhost\n')
526         handle.write('192.168.1.1 router gw\n')
527       finally:
528         handle.close()
529     except:
530       utils.RemoveFile(self.tmpname)
531       raise
532
533   def tearDown(self):
534     utils.RemoveFile(self.tmpname)
535     del self.tmpname
536
537   def testSettingNewIp(self):
538     SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
539
540     self.assertFileContent(self.tmpname,
541       "# This is a test file for /etc/hosts\n"
542       "127.0.0.1\tlocalhost\n"
543       "192.168.1.1 router gw\n"
544       "1.2.3.4\tmyhost.domain.tld myhost\n")
545
546   def testSettingExistingIp(self):
547     SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
548                      ['myhost'])
549
550     self.assertFileContent(self.tmpname,
551       "# This is a test file for /etc/hosts\n"
552       "127.0.0.1\tlocalhost\n"
553       "192.168.1.1\tmyhost.domain.tld myhost\n")
554
555   def testSettingDuplicateName(self):
556     SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
557
558     self.assertFileContent(self.tmpname,
559       "# This is a test file for /etc/hosts\n"
560       "127.0.0.1\tlocalhost\n"
561       "192.168.1.1 router gw\n"
562       "1.2.3.4\tmyhost\n")
563
564   def testRemovingExistingHost(self):
565     RemoveEtcHostsEntry(self.tmpname, 'router')
566
567     self.assertFileContent(self.tmpname,
568       "# This is a test file for /etc/hosts\n"
569       "127.0.0.1\tlocalhost\n"
570       "192.168.1.1 gw\n")
571
572   def testRemovingSingleExistingHost(self):
573     RemoveEtcHostsEntry(self.tmpname, 'localhost')
574
575     self.assertFileContent(self.tmpname,
576       "# This is a test file for /etc/hosts\n"
577       "192.168.1.1 router gw\n")
578
579   def testRemovingNonExistingHost(self):
580     RemoveEtcHostsEntry(self.tmpname, 'myhost')
581
582     self.assertFileContent(self.tmpname,
583       "# This is a test file for /etc/hosts\n"
584       "127.0.0.1\tlocalhost\n"
585       "192.168.1.1 router gw\n")
586
587   def testRemovingAlias(self):
588     RemoveEtcHostsEntry(self.tmpname, 'gw')
589
590     self.assertFileContent(self.tmpname,
591       "# This is a test file for /etc/hosts\n"
592       "127.0.0.1\tlocalhost\n"
593       "192.168.1.1 router\n")
594
595
596 class TestShellQuoting(unittest.TestCase):
597   """Test case for shell quoting functions"""
598
599   def testShellQuote(self):
600     self.assertEqual(ShellQuote('abc'), "abc")
601     self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
602     self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
603     self.assertEqual(ShellQuote("a b c"), "'a b c'")
604     self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
605
606   def testShellQuoteArgs(self):
607     self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
608     self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
609     self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
610
611
612 class TestTcpPing(unittest.TestCase):
613   """Testcase for TCP version of ping - against listen(2)ing port"""
614
615   def setUp(self):
616     self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
617     self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
618     self.listenerport = self.listener.getsockname()[1]
619     self.listener.listen(1)
620
621   def tearDown(self):
622     self.listener.shutdown(socket.SHUT_RDWR)
623     del self.listener
624     del self.listenerport
625
626   def testTcpPingToLocalHostAccept(self):
627     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
628                          self.listenerport,
629                          timeout=10,
630                          live_port_needed=True,
631                          source=constants.LOCALHOST_IP_ADDRESS,
632                          ),
633                  "failed to connect to test listener")
634
635     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
636                          self.listenerport,
637                          timeout=10,
638                          live_port_needed=True,
639                          ),
640                  "failed to connect to test listener (no source)")
641
642
643 class TestTcpPingDeaf(unittest.TestCase):
644   """Testcase for TCP version of ping - against non listen(2)ing port"""
645
646   def setUp(self):
647     self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
648     self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
649     self.deaflistenerport = self.deaflistener.getsockname()[1]
650
651   def tearDown(self):
652     del self.deaflistener
653     del self.deaflistenerport
654
655   def testTcpPingToLocalHostAcceptDeaf(self):
656     self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
657                         self.deaflistenerport,
658                         timeout=constants.TCP_PING_TIMEOUT,
659                         live_port_needed=True,
660                         source=constants.LOCALHOST_IP_ADDRESS,
661                         ), # need successful connect(2)
662                 "successfully connected to deaf listener")
663
664     self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
665                         self.deaflistenerport,
666                         timeout=constants.TCP_PING_TIMEOUT,
667                         live_port_needed=True,
668                         ), # need successful connect(2)
669                 "successfully connected to deaf listener (no source addr)")
670
671   def testTcpPingToLocalHostNoAccept(self):
672     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
673                          self.deaflistenerport,
674                          timeout=constants.TCP_PING_TIMEOUT,
675                          live_port_needed=False,
676                          source=constants.LOCALHOST_IP_ADDRESS,
677                          ), # ECONNREFUSED is OK
678                  "failed to ping alive host on deaf port")
679
680     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
681                          self.deaflistenerport,
682                          timeout=constants.TCP_PING_TIMEOUT,
683                          live_port_needed=False,
684                          ), # ECONNREFUSED is OK
685                  "failed to ping alive host on deaf port (no source addr)")
686
687
688 class TestOwnIpAddress(unittest.TestCase):
689   """Testcase for OwnIpAddress"""
690
691   def testOwnLoopback(self):
692     """check having the loopback ip"""
693     self.failUnless(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS),
694                     "Should own the loopback address")
695
696   def testNowOwnAddress(self):
697     """check that I don't own an address"""
698
699     # network 192.0.2.0/24 is reserved for test/documentation as per
700     # rfc 3330, so we *should* not have an address of this range... if
701     # this fails, we should extend the test to multiple addresses
702     DST_IP = "192.0.2.1"
703     self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP)
704
705
706 class TestListVisibleFiles(unittest.TestCase):
707   """Test case for ListVisibleFiles"""
708
709   def setUp(self):
710     self.path = tempfile.mkdtemp()
711
712   def tearDown(self):
713     shutil.rmtree(self.path)
714
715   def _test(self, files, expected):
716     # Sort a copy
717     expected = expected[:]
718     expected.sort()
719
720     for name in files:
721       f = open(os.path.join(self.path, name), 'w')
722       try:
723         f.write("Test\n")
724       finally:
725         f.close()
726
727     found = ListVisibleFiles(self.path)
728     found.sort()
729
730     self.assertEqual(found, expected)
731
732   def testAllVisible(self):
733     files = ["a", "b", "c"]
734     expected = files
735     self._test(files, expected)
736
737   def testNoneVisible(self):
738     files = [".a", ".b", ".c"]
739     expected = []
740     self._test(files, expected)
741
742   def testSomeVisible(self):
743     files = ["a", "b", ".c"]
744     expected = ["a", "b"]
745     self._test(files, expected)
746
747
748 class TestNewUUID(unittest.TestCase):
749   """Test case for NewUUID"""
750
751   _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
752                         '[a-f0-9]{4}-[a-f0-9]{12}$')
753
754   def runTest(self):
755     self.failUnless(self._re_uuid.match(utils.NewUUID()))
756
757
758 class TestUniqueSequence(unittest.TestCase):
759   """Test case for UniqueSequence"""
760
761   def _test(self, input, expected):
762     self.assertEqual(utils.UniqueSequence(input), expected)
763
764   def runTest(self):
765     # Ordered input
766     self._test([1, 2, 3], [1, 2, 3])
767     self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
768     self._test([1, 2, 2, 3], [1, 2, 3])
769     self._test([1, 2, 3, 3], [1, 2, 3])
770
771     # Unordered input
772     self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
773     self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
774
775     # Strings
776     self._test(["a", "a"], ["a"])
777     self._test(["a", "b"], ["a", "b"])
778     self._test(["a", "b", "a"], ["a", "b"])
779
780
781 class TestFirstFree(unittest.TestCase):
782   """Test case for the FirstFree function"""
783
784   def test(self):
785     """Test FirstFree"""
786     self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
787     self.failUnlessEqual(FirstFree([]), None)
788     self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
789     self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
790     self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
791
792
793 class TestFileLock(unittest.TestCase):
794   """Test case for the FileLock class"""
795
796   def setUp(self):
797     self.tmpfile = tempfile.NamedTemporaryFile()
798     self.lock = utils.FileLock(self.tmpfile.name)
799
800   def testSharedNonblocking(self):
801     self.lock.Shared(blocking=False)
802     self.lock.Close()
803
804   def testExclusiveNonblocking(self):
805     self.lock.Exclusive(blocking=False)
806     self.lock.Close()
807
808   def testUnlockNonblocking(self):
809     self.lock.Unlock(blocking=False)
810     self.lock.Close()
811
812   def testSharedBlocking(self):
813     self.lock.Shared(blocking=True)
814     self.lock.Close()
815
816   def testExclusiveBlocking(self):
817     self.lock.Exclusive(blocking=True)
818     self.lock.Close()
819
820   def testUnlockBlocking(self):
821     self.lock.Unlock(blocking=True)
822     self.lock.Close()
823
824   def testSharedExclusiveUnlock(self):
825     self.lock.Shared(blocking=False)
826     self.lock.Exclusive(blocking=False)
827     self.lock.Unlock(blocking=False)
828     self.lock.Close()
829
830   def testExclusiveSharedUnlock(self):
831     self.lock.Exclusive(blocking=False)
832     self.lock.Shared(blocking=False)
833     self.lock.Unlock(blocking=False)
834     self.lock.Close()
835
836   def testCloseShared(self):
837     self.lock.Close()
838     self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
839
840   def testCloseExclusive(self):
841     self.lock.Close()
842     self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
843
844   def testCloseUnlock(self):
845     self.lock.Close()
846     self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
847
848
849 class TestTimeFunctions(unittest.TestCase):
850   """Test case for time functions"""
851
852   def runTest(self):
853     self.assertEqual(utils.SplitTime(1), (1, 0))
854     self.assertEqual(utils.SplitTime(1.5), (1, 500000))
855     self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
856     self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
857     self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
858     self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
859     self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
860     self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
861
862     self.assertRaises(AssertionError, utils.SplitTime, -1)
863
864     self.assertEqual(utils.MergeTime((1, 0)), 1.0)
865     self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
866     self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
867
868     self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3), 1218448917.481)
869     self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
870
871     self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
872     self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
873     self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
874     self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
875     self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
876
877
878 class FieldSetTestCase(unittest.TestCase):
879   """Test case for FieldSets"""
880
881   def testSimpleMatch(self):
882     f = utils.FieldSet("a", "b", "c", "def")
883     self.failUnless(f.Matches("a"))
884     self.failIf(f.Matches("d"), "Substring matched")
885     self.failIf(f.Matches("defghi"), "Prefix string matched")
886     self.failIf(f.NonMatching(["b", "c"]))
887     self.failIf(f.NonMatching(["a", "b", "c", "def"]))
888     self.failUnless(f.NonMatching(["a", "d"]))
889
890   def testRegexMatch(self):
891     f = utils.FieldSet("a", "b([0-9]+)", "c")
892     self.failUnless(f.Matches("b1"))
893     self.failUnless(f.Matches("b99"))
894     self.failIf(f.Matches("b/1"))
895     self.failIf(f.NonMatching(["b12", "c"]))
896     self.failUnless(f.NonMatching(["a", "1"]))
897
898
899 if __name__ == '__main__':
900   unittest.main()