Convert iallocator to the new _ComputeDiskSize
[ganeti-local] / test / ganeti.utils_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the utils module"""
23
24 import unittest
25 import os
26 import time
27 import tempfile
28 import os.path
29 import os
30 import md5
31 import signal
32 import socket
33 import shutil
34 import re
35 import select
36
37 import ganeti
38 import testutils
39 from ganeti import constants
40 from ganeti import utils
41 from ganeti.utils import IsProcessAlive, RunCmd, \
42      RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
43      ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
44      ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
45      SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress
46 from ganeti.errors import LockError, UnitParseError, GenericError, \
47      ProgrammerError
48
49
50 class TestIsProcessAlive(unittest.TestCase):
51   """Testing case for IsProcessAlive"""
52
53   def testExists(self):
54     mypid = os.getpid()
55     self.assert_(IsProcessAlive(mypid),
56                  "can't find myself running")
57
58   def testNotExisting(self):
59     pid_non_existing = os.fork()
60     if pid_non_existing == 0:
61       os._exit(0)
62     elif pid_non_existing < 0:
63       raise SystemError("can't fork")
64     os.waitpid(pid_non_existing, 0)
65     self.assert_(not IsProcessAlive(pid_non_existing),
66                  "nonexisting process detected")
67
68
69 class TestPidFileFunctions(unittest.TestCase):
70   """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
71
72   def setUp(self):
73     self.dir = tempfile.mkdtemp()
74     self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
75     utils.DaemonPidFileName = self.f_dpn
76
77   def testPidFileFunctions(self):
78     pid_file = self.f_dpn('test')
79     utils.WritePidFile('test')
80     self.failUnless(os.path.exists(pid_file),
81                     "PID file should have been created")
82     read_pid = utils.ReadPidFile(pid_file)
83     self.failUnlessEqual(read_pid, os.getpid())
84     self.failUnless(utils.IsProcessAlive(read_pid))
85     self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
86     utils.RemovePidFile('test')
87     self.failIf(os.path.exists(pid_file),
88                 "PID file should not exist anymore")
89     self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
90                          "ReadPidFile should return 0 for missing pid file")
91     fh = open(pid_file, "w")
92     fh.write("blah\n")
93     fh.close()
94     self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
95                          "ReadPidFile should return 0 for invalid pid file")
96     utils.RemovePidFile('test')
97     self.failIf(os.path.exists(pid_file),
98                 "PID file should not exist anymore")
99
100   def testKill(self):
101     pid_file = self.f_dpn('child')
102     r_fd, w_fd = os.pipe()
103     new_pid = os.fork()
104     if new_pid == 0: #child
105       utils.WritePidFile('child')
106       os.write(w_fd, 'a')
107       signal.pause()
108       os._exit(0)
109       return
110     # else we are in the parent
111     # wait until the child has written the pid file
112     os.read(r_fd, 1)
113     read_pid = utils.ReadPidFile(pid_file)
114     self.failUnlessEqual(read_pid, new_pid)
115     self.failUnless(utils.IsProcessAlive(new_pid))
116     utils.KillProcess(new_pid, waitpid=True)
117     self.failIf(utils.IsProcessAlive(new_pid))
118     utils.RemovePidFile('child')
119     self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
120
121   def tearDown(self):
122     for name in os.listdir(self.dir):
123       os.unlink(os.path.join(self.dir, name))
124     os.rmdir(self.dir)
125
126
127 class TestRunCmd(testutils.GanetiTestCase):
128   """Testing case for the RunCmd function"""
129
130   def setUp(self):
131     self.magic = time.ctime() + " ganeti test"
132     fh, self.fname = tempfile.mkstemp()
133     os.close(fh)
134
135   def tearDown(self):
136     if self.fname:
137       utils.RemoveFile(self.fname)
138
139   def testOk(self):
140     """Test successful exit code"""
141     result = RunCmd("/bin/sh -c 'exit 0'")
142     self.assertEqual(result.exit_code, 0)
143     self.assertEqual(result.output, "")
144
145   def testFail(self):
146     """Test fail exit code"""
147     result = RunCmd("/bin/sh -c 'exit 1'")
148     self.assertEqual(result.exit_code, 1)
149     self.assertEqual(result.output, "")
150
151   def testStdout(self):
152     """Test standard output"""
153     cmd = 'echo -n "%s"' % self.magic
154     result = RunCmd("/bin/sh -c '%s'" % cmd)
155     self.assertEqual(result.stdout, self.magic)
156     result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
157     self.assertEqual(result.output, "")
158     self.assertFileContent(self.fname, self.magic)
159
160   def testStderr(self):
161     """Test standard error"""
162     cmd = 'echo -n "%s"' % self.magic
163     result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
164     self.assertEqual(result.stderr, self.magic)
165     result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
166     self.assertEqual(result.output, "")
167     self.assertFileContent(self.fname, self.magic)
168
169   def testCombined(self):
170     """Test combined output"""
171     cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
172     expected = "A" + self.magic + "B" + self.magic
173     result = RunCmd("/bin/sh -c '%s'" % cmd)
174     self.assertEqual(result.output, expected)
175     result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
176     self.assertEqual(result.output, "")
177     self.assertFileContent(self.fname, expected)
178
179   def testSignal(self):
180     """Test signal"""
181     result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
182     self.assertEqual(result.signal, 15)
183     self.assertEqual(result.output, "")
184
185   def testListRun(self):
186     """Test list runs"""
187     result = RunCmd(["true"])
188     self.assertEqual(result.signal, None)
189     self.assertEqual(result.exit_code, 0)
190     result = RunCmd(["/bin/sh", "-c", "exit 1"])
191     self.assertEqual(result.signal, None)
192     self.assertEqual(result.exit_code, 1)
193     result = RunCmd(["echo", "-n", self.magic])
194     self.assertEqual(result.signal, None)
195     self.assertEqual(result.exit_code, 0)
196     self.assertEqual(result.stdout, self.magic)
197
198   def testFileEmptyOutput(self):
199     """Test file output"""
200     result = RunCmd(["true"], output=self.fname)
201     self.assertEqual(result.signal, None)
202     self.assertEqual(result.exit_code, 0)
203     self.assertFileContent(self.fname, "")
204
205   def testLang(self):
206     """Test locale environment"""
207     old_env = os.environ.copy()
208     try:
209       os.environ["LANG"] = "en_US.UTF-8"
210       os.environ["LC_ALL"] = "en_US.UTF-8"
211       result = RunCmd(["locale"])
212       for line in result.output.splitlines():
213         key, value = line.split("=", 1)
214         # Ignore these variables, they're overridden by LC_ALL
215         if key == "LANG" or key == "LANGUAGE":
216           continue
217         self.failIf(value and value != "C" and value != '"C"',
218             "Variable %s is set to the invalid value '%s'" % (key, value))
219     finally:
220       os.environ = old_env
221
222   def testDefaultCwd(self):
223     """Test default working directory"""
224     self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
225
226   def testCwd(self):
227     """Test default working directory"""
228     self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
229     self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
230     cwd = os.getcwd()
231     self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
232
233
234 class TestRemoveFile(unittest.TestCase):
235   """Test case for the RemoveFile function"""
236
237   def setUp(self):
238     """Create a temp dir and file for each case"""
239     self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
240     fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
241     os.close(fd)
242
243   def tearDown(self):
244     if os.path.exists(self.tmpfile):
245       os.unlink(self.tmpfile)
246     os.rmdir(self.tmpdir)
247
248
249   def testIgnoreDirs(self):
250     """Test that RemoveFile() ignores directories"""
251     self.assertEqual(None, RemoveFile(self.tmpdir))
252
253
254   def testIgnoreNotExisting(self):
255     """Test that RemoveFile() ignores non-existing files"""
256     RemoveFile(self.tmpfile)
257     RemoveFile(self.tmpfile)
258
259
260   def testRemoveFile(self):
261     """Test that RemoveFile does remove a file"""
262     RemoveFile(self.tmpfile)
263     if os.path.exists(self.tmpfile):
264       self.fail("File '%s' not removed" % self.tmpfile)
265
266
267   def testRemoveSymlink(self):
268     """Test that RemoveFile does remove symlinks"""
269     symlink = self.tmpdir + "/symlink"
270     os.symlink("no-such-file", symlink)
271     RemoveFile(symlink)
272     if os.path.exists(symlink):
273       self.fail("File '%s' not removed" % symlink)
274     os.symlink(self.tmpfile, symlink)
275     RemoveFile(symlink)
276     if os.path.exists(symlink):
277       self.fail("File '%s' not removed" % symlink)
278
279
280 class TestCheckdict(unittest.TestCase):
281   """Test case for the CheckDict function"""
282
283   def testAdd(self):
284     """Test that CheckDict adds a missing key with the correct value"""
285
286     tgt = {'a':1}
287     tmpl = {'b': 2}
288     CheckDict(tgt, tmpl)
289     if 'b' not in tgt or tgt['b'] != 2:
290       self.fail("Failed to update dict")
291
292
293   def testNoUpdate(self):
294     """Test that CheckDict does not overwrite an existing key"""
295     tgt = {'a':1, 'b': 3}
296     tmpl = {'b': 2}
297     CheckDict(tgt, tmpl)
298     self.failUnlessEqual(tgt['b'], 3)
299
300
301 class TestMatchNameComponent(unittest.TestCase):
302   """Test case for the MatchNameComponent function"""
303
304   def testEmptyList(self):
305     """Test that there is no match against an empty list"""
306
307     self.failUnlessEqual(MatchNameComponent("", []), None)
308     self.failUnlessEqual(MatchNameComponent("test", []), None)
309
310   def testSingleMatch(self):
311     """Test that a single match is performed correctly"""
312     mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
313     for key in "test2", "test2.example", "test2.example.com":
314       self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
315
316   def testMultipleMatches(self):
317     """Test that a multiple match is returned as None"""
318     mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
319     for key in "test1", "test1.example":
320       self.failUnlessEqual(MatchNameComponent(key, mlist), None)
321
322
323 class TestFormatUnit(unittest.TestCase):
324   """Test case for the FormatUnit function"""
325
326   def testMiB(self):
327     self.assertEqual(FormatUnit(1, 'h'), '1M')
328     self.assertEqual(FormatUnit(100, 'h'), '100M')
329     self.assertEqual(FormatUnit(1023, 'h'), '1023M')
330
331     self.assertEqual(FormatUnit(1, 'm'), '1')
332     self.assertEqual(FormatUnit(100, 'm'), '100')
333     self.assertEqual(FormatUnit(1023, 'm'), '1023')
334
335     self.assertEqual(FormatUnit(1024, 'm'), '1024')
336     self.assertEqual(FormatUnit(1536, 'm'), '1536')
337     self.assertEqual(FormatUnit(17133, 'm'), '17133')
338     self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
339
340   def testGiB(self):
341     self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
342     self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
343     self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
344     self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
345
346     self.assertEqual(FormatUnit(1024, 'g'), '1.0')
347     self.assertEqual(FormatUnit(1536, 'g'), '1.5')
348     self.assertEqual(FormatUnit(17133, 'g'), '16.7')
349     self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
350
351     self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
352     self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
353     self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
354
355   def testTiB(self):
356     self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
357     self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
358     self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
359
360     self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
361     self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
362     self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
363
364 class TestParseUnit(unittest.TestCase):
365   """Test case for the ParseUnit function"""
366
367   SCALES = (('', 1),
368             ('M', 1), ('G', 1024), ('T', 1024 * 1024),
369             ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
370             ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
371
372   def testRounding(self):
373     self.assertEqual(ParseUnit('0'), 0)
374     self.assertEqual(ParseUnit('1'), 4)
375     self.assertEqual(ParseUnit('2'), 4)
376     self.assertEqual(ParseUnit('3'), 4)
377
378     self.assertEqual(ParseUnit('124'), 124)
379     self.assertEqual(ParseUnit('125'), 128)
380     self.assertEqual(ParseUnit('126'), 128)
381     self.assertEqual(ParseUnit('127'), 128)
382     self.assertEqual(ParseUnit('128'), 128)
383     self.assertEqual(ParseUnit('129'), 132)
384     self.assertEqual(ParseUnit('130'), 132)
385
386   def testFloating(self):
387     self.assertEqual(ParseUnit('0'), 0)
388     self.assertEqual(ParseUnit('0.5'), 4)
389     self.assertEqual(ParseUnit('1.75'), 4)
390     self.assertEqual(ParseUnit('1.99'), 4)
391     self.assertEqual(ParseUnit('2.00'), 4)
392     self.assertEqual(ParseUnit('2.01'), 4)
393     self.assertEqual(ParseUnit('3.99'), 4)
394     self.assertEqual(ParseUnit('4.00'), 4)
395     self.assertEqual(ParseUnit('4.01'), 8)
396     self.assertEqual(ParseUnit('1.5G'), 1536)
397     self.assertEqual(ParseUnit('1.8G'), 1844)
398     self.assertEqual(ParseUnit('8.28T'), 8682212)
399
400   def testSuffixes(self):
401     for sep in ('', ' ', '   ', "\t", "\t "):
402       for suffix, scale in TestParseUnit.SCALES:
403         for func in (lambda x: x, str.lower, str.upper):
404           self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
405                            1024 * scale)
406
407   def testInvalidInput(self):
408     for sep in ('-', '_', ',', 'a'):
409       for suffix, _ in TestParseUnit.SCALES:
410         self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
411
412     for suffix, _ in TestParseUnit.SCALES:
413       self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
414
415
416 class TestSshKeys(testutils.GanetiTestCase):
417   """Test case for the AddAuthorizedKey function"""
418
419   KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
420   KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
421            'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
422
423   def setUp(self):
424     (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
425     try:
426       handle = os.fdopen(fd, 'w')
427       try:
428         handle.write("%s\n" % TestSshKeys.KEY_A)
429         handle.write("%s\n" % TestSshKeys.KEY_B)
430       finally:
431         handle.close()
432     except:
433       utils.RemoveFile(self.tmpname)
434       raise
435
436   def tearDown(self):
437     utils.RemoveFile(self.tmpname)
438     del self.tmpname
439
440   def testAddingNewKey(self):
441     AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
442
443     self.assertFileContent(self.tmpname,
444       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
445       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
446       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
447       "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
448
449   def testAddingAlmostButNotCompletelyTheSameKey(self):
450     AddAuthorizedKey(self.tmpname,
451         'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
452
453     self.assertFileContent(self.tmpname,
454       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
455       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
456       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
457       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
458
459   def testAddingExistingKeyWithSomeMoreSpaces(self):
460     AddAuthorizedKey(self.tmpname,
461         'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
462
463     self.assertFileContent(self.tmpname,
464       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
465       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
466       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
467
468   def testRemovingExistingKeyWithSomeMoreSpaces(self):
469     RemoveAuthorizedKey(self.tmpname,
470         'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
471
472     self.assertFileContent(self.tmpname,
473       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
474       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
475
476   def testRemovingNonExistingKey(self):
477     RemoveAuthorizedKey(self.tmpname,
478         'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
479
480     self.assertFileContent(self.tmpname,
481       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
482       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
483       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
484
485
486 class TestEtcHosts(testutils.GanetiTestCase):
487   """Test functions modifying /etc/hosts"""
488
489   def setUp(self):
490     (fd, self.tmpname) = tempfile.mkstemp(prefix='ganeti-test')
491     try:
492       handle = os.fdopen(fd, 'w')
493       try:
494         handle.write('# This is a test file for /etc/hosts\n')
495         handle.write('127.0.0.1\tlocalhost\n')
496         handle.write('192.168.1.1 router gw\n')
497       finally:
498         handle.close()
499     except:
500       utils.RemoveFile(self.tmpname)
501       raise
502
503   def tearDown(self):
504     utils.RemoveFile(self.tmpname)
505     del self.tmpname
506
507   def testSettingNewIp(self):
508     SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
509
510     self.assertFileContent(self.tmpname,
511       "# This is a test file for /etc/hosts\n"
512       "127.0.0.1\tlocalhost\n"
513       "192.168.1.1 router gw\n"
514       "1.2.3.4\tmyhost.domain.tld myhost\n")
515
516   def testSettingExistingIp(self):
517     SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
518                      ['myhost'])
519
520     self.assertFileContent(self.tmpname,
521       "# This is a test file for /etc/hosts\n"
522       "127.0.0.1\tlocalhost\n"
523       "192.168.1.1\tmyhost.domain.tld myhost\n")
524
525   def testSettingDuplicateName(self):
526     SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
527
528     self.assertFileContent(self.tmpname,
529       "# This is a test file for /etc/hosts\n"
530       "127.0.0.1\tlocalhost\n"
531       "192.168.1.1 router gw\n"
532       "1.2.3.4\tmyhost\n")
533
534   def testRemovingExistingHost(self):
535     RemoveEtcHostsEntry(self.tmpname, 'router')
536
537     self.assertFileContent(self.tmpname,
538       "# This is a test file for /etc/hosts\n"
539       "127.0.0.1\tlocalhost\n"
540       "192.168.1.1 gw\n")
541
542   def testRemovingSingleExistingHost(self):
543     RemoveEtcHostsEntry(self.tmpname, 'localhost')
544
545     self.assertFileContent(self.tmpname,
546       "# This is a test file for /etc/hosts\n"
547       "192.168.1.1 router gw\n")
548
549   def testRemovingNonExistingHost(self):
550     RemoveEtcHostsEntry(self.tmpname, 'myhost')
551
552     self.assertFileContent(self.tmpname,
553       "# This is a test file for /etc/hosts\n"
554       "127.0.0.1\tlocalhost\n"
555       "192.168.1.1 router gw\n")
556
557   def testRemovingAlias(self):
558     RemoveEtcHostsEntry(self.tmpname, 'gw')
559
560     self.assertFileContent(self.tmpname,
561       "# This is a test file for /etc/hosts\n"
562       "127.0.0.1\tlocalhost\n"
563       "192.168.1.1 router\n")
564
565
566 class TestShellQuoting(unittest.TestCase):
567   """Test case for shell quoting functions"""
568
569   def testShellQuote(self):
570     self.assertEqual(ShellQuote('abc'), "abc")
571     self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
572     self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
573     self.assertEqual(ShellQuote("a b c"), "'a b c'")
574     self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
575
576   def testShellQuoteArgs(self):
577     self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
578     self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
579     self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
580
581
582 class TestTcpPing(unittest.TestCase):
583   """Testcase for TCP version of ping - against listen(2)ing port"""
584
585   def setUp(self):
586     self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
587     self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
588     self.listenerport = self.listener.getsockname()[1]
589     self.listener.listen(1)
590
591   def tearDown(self):
592     self.listener.shutdown(socket.SHUT_RDWR)
593     del self.listener
594     del self.listenerport
595
596   def testTcpPingToLocalHostAccept(self):
597     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
598                          self.listenerport,
599                          timeout=10,
600                          live_port_needed=True,
601                          source=constants.LOCALHOST_IP_ADDRESS,
602                          ),
603                  "failed to connect to test listener")
604
605     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
606                          self.listenerport,
607                          timeout=10,
608                          live_port_needed=True,
609                          ),
610                  "failed to connect to test listener (no source)")
611
612
613 class TestTcpPingDeaf(unittest.TestCase):
614   """Testcase for TCP version of ping - against non listen(2)ing port"""
615
616   def setUp(self):
617     self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
618     self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
619     self.deaflistenerport = self.deaflistener.getsockname()[1]
620
621   def tearDown(self):
622     del self.deaflistener
623     del self.deaflistenerport
624
625   def testTcpPingToLocalHostAcceptDeaf(self):
626     self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
627                         self.deaflistenerport,
628                         timeout=constants.TCP_PING_TIMEOUT,
629                         live_port_needed=True,
630                         source=constants.LOCALHOST_IP_ADDRESS,
631                         ), # need successful connect(2)
632                 "successfully connected to deaf listener")
633
634     self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
635                         self.deaflistenerport,
636                         timeout=constants.TCP_PING_TIMEOUT,
637                         live_port_needed=True,
638                         ), # need successful connect(2)
639                 "successfully connected to deaf listener (no source addr)")
640
641   def testTcpPingToLocalHostNoAccept(self):
642     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
643                          self.deaflistenerport,
644                          timeout=constants.TCP_PING_TIMEOUT,
645                          live_port_needed=False,
646                          source=constants.LOCALHOST_IP_ADDRESS,
647                          ), # ECONNREFUSED is OK
648                  "failed to ping alive host on deaf port")
649
650     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
651                          self.deaflistenerport,
652                          timeout=constants.TCP_PING_TIMEOUT,
653                          live_port_needed=False,
654                          ), # ECONNREFUSED is OK
655                  "failed to ping alive host on deaf port (no source addr)")
656
657
658 class TestOwnIpAddress(unittest.TestCase):
659   """Testcase for OwnIpAddress"""
660
661   def testOwnLoopback(self):
662     """check having the loopback ip"""
663     self.failUnless(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS),
664                     "Should own the loopback address")
665
666   def testNowOwnAddress(self):
667     """check that I don't own an address"""
668
669     # network 192.0.2.0/24 is reserved for test/documentation as per
670     # rfc 3330, so we *should* not have an address of this range... if
671     # this fails, we should extend the test to multiple addresses
672     DST_IP = "192.0.2.1"
673     self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP)
674
675
676 class TestListVisibleFiles(unittest.TestCase):
677   """Test case for ListVisibleFiles"""
678
679   def setUp(self):
680     self.path = tempfile.mkdtemp()
681
682   def tearDown(self):
683     shutil.rmtree(self.path)
684
685   def _test(self, files, expected):
686     # Sort a copy
687     expected = expected[:]
688     expected.sort()
689
690     for name in files:
691       f = open(os.path.join(self.path, name), 'w')
692       try:
693         f.write("Test\n")
694       finally:
695         f.close()
696
697     found = ListVisibleFiles(self.path)
698     found.sort()
699
700     self.assertEqual(found, expected)
701
702   def testAllVisible(self):
703     files = ["a", "b", "c"]
704     expected = files
705     self._test(files, expected)
706
707   def testNoneVisible(self):
708     files = [".a", ".b", ".c"]
709     expected = []
710     self._test(files, expected)
711
712   def testSomeVisible(self):
713     files = ["a", "b", ".c"]
714     expected = ["a", "b"]
715     self._test(files, expected)
716
717
718 class TestNewUUID(unittest.TestCase):
719   """Test case for NewUUID"""
720
721   _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
722                         '[a-f0-9]{4}-[a-f0-9]{12}$')
723
724   def runTest(self):
725     self.failUnless(self._re_uuid.match(utils.NewUUID()))
726
727
728 class TestUniqueSequence(unittest.TestCase):
729   """Test case for UniqueSequence"""
730
731   def _test(self, input, expected):
732     self.assertEqual(utils.UniqueSequence(input), expected)
733
734   def runTest(self):
735     # Ordered input
736     self._test([1, 2, 3], [1, 2, 3])
737     self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
738     self._test([1, 2, 2, 3], [1, 2, 3])
739     self._test([1, 2, 3, 3], [1, 2, 3])
740
741     # Unordered input
742     self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
743     self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
744
745     # Strings
746     self._test(["a", "a"], ["a"])
747     self._test(["a", "b"], ["a", "b"])
748     self._test(["a", "b", "a"], ["a", "b"])
749
750
751 class TestFirstFree(unittest.TestCase):
752   """Test case for the FirstFree function"""
753
754   def test(self):
755     """Test FirstFree"""
756     self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
757     self.failUnlessEqual(FirstFree([]), None)
758     self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
759     self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
760     self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
761
762
763 class TestFileLock(unittest.TestCase):
764   """Test case for the FileLock class"""
765
766   def setUp(self):
767     self.tmpfile = tempfile.NamedTemporaryFile()
768     self.lock = utils.FileLock(self.tmpfile.name)
769
770   def testSharedNonblocking(self):
771     self.lock.Shared(blocking=False)
772     self.lock.Close()
773
774   def testExclusiveNonblocking(self):
775     self.lock.Exclusive(blocking=False)
776     self.lock.Close()
777
778   def testUnlockNonblocking(self):
779     self.lock.Unlock(blocking=False)
780     self.lock.Close()
781
782   def testSharedBlocking(self):
783     self.lock.Shared(blocking=True)
784     self.lock.Close()
785
786   def testExclusiveBlocking(self):
787     self.lock.Exclusive(blocking=True)
788     self.lock.Close()
789
790   def testUnlockBlocking(self):
791     self.lock.Unlock(blocking=True)
792     self.lock.Close()
793
794   def testSharedExclusiveUnlock(self):
795     self.lock.Shared(blocking=False)
796     self.lock.Exclusive(blocking=False)
797     self.lock.Unlock(blocking=False)
798     self.lock.Close()
799
800   def testExclusiveSharedUnlock(self):
801     self.lock.Exclusive(blocking=False)
802     self.lock.Shared(blocking=False)
803     self.lock.Unlock(blocking=False)
804     self.lock.Close()
805
806   def testCloseShared(self):
807     self.lock.Close()
808     self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
809
810   def testCloseExclusive(self):
811     self.lock.Close()
812     self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
813
814   def testCloseUnlock(self):
815     self.lock.Close()
816     self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
817
818
819 class TestTimeFunctions(unittest.TestCase):
820   """Test case for time functions"""
821
822   def runTest(self):
823     self.assertEqual(utils.SplitTime(1), (1, 0))
824     self.assertEqual(utils.SplitTime(1.5), (1, 500000))
825     self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
826     self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
827     self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
828     self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
829     self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
830     self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
831
832     self.assertRaises(AssertionError, utils.SplitTime, -1)
833
834     self.assertEqual(utils.MergeTime((1, 0)), 1.0)
835     self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
836     self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
837
838     self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3), 1218448917.481)
839     self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
840
841     self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
842     self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
843     self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
844     self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
845     self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
846
847
848 class FieldSetTestCase(unittest.TestCase):
849   """Test case for FieldSets"""
850
851   def testSimpleMatch(self):
852     f = utils.FieldSet("a", "b", "c", "def")
853     self.failUnless(f.Matches("a"))
854     self.failIf(f.Matches("d"), "Substring matched")
855     self.failIf(f.Matches("defghi"), "Prefix string matched")
856     self.failIf(f.NonMatching(["b", "c"]))
857     self.failIf(f.NonMatching(["a", "b", "c", "def"]))
858     self.failUnless(f.NonMatching(["a", "d"]))
859
860   def testRegexMatch(self):
861     f = utils.FieldSet("a", "b([0-9]+)", "c")
862     self.failUnless(f.Matches("b1"))
863     self.failUnless(f.Matches("b99"))
864     self.failIf(f.Matches("b/1"))
865     self.failIf(f.NonMatching(["b12", "c"]))
866     self.failUnless(f.NonMatching(["a", "1"]))
867
868
869 if __name__ == '__main__':
870   unittest.main()