Add utils.IsNormAbsPath function
[ganeti-local] / test / ganeti.utils_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the utils module"""
23
24 import unittest
25 import os
26 import time
27 import tempfile
28 import os.path
29 import os
30 import md5
31 import signal
32 import socket
33 import shutil
34 import re
35 import select
36
37 import ganeti
38 import testutils
39 from ganeti import constants
40 from ganeti import utils
41 from ganeti import errors
42 from ganeti.utils import IsProcessAlive, RunCmd, \
43      RemoveFile, CheckDict, MatchNameComponent, FormatUnit, \
44      ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
45      ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
46      SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \
47      TailFile, ForceDictType, IsNormAbsPath
48
49 from ganeti.errors import LockError, UnitParseError, GenericError, \
50      ProgrammerError
51
52
53 class TestIsProcessAlive(unittest.TestCase):
54   """Testing case for IsProcessAlive"""
55
56   def testExists(self):
57     mypid = os.getpid()
58     self.assert_(IsProcessAlive(mypid),
59                  "can't find myself running")
60
61   def testNotExisting(self):
62     pid_non_existing = os.fork()
63     if pid_non_existing == 0:
64       os._exit(0)
65     elif pid_non_existing < 0:
66       raise SystemError("can't fork")
67     os.waitpid(pid_non_existing, 0)
68     self.assert_(not IsProcessAlive(pid_non_existing),
69                  "nonexisting process detected")
70
71
72 class TestPidFileFunctions(unittest.TestCase):
73   """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
74
75   def setUp(self):
76     self.dir = tempfile.mkdtemp()
77     self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
78     utils.DaemonPidFileName = self.f_dpn
79
80   def testPidFileFunctions(self):
81     pid_file = self.f_dpn('test')
82     utils.WritePidFile('test')
83     self.failUnless(os.path.exists(pid_file),
84                     "PID file should have been created")
85     read_pid = utils.ReadPidFile(pid_file)
86     self.failUnlessEqual(read_pid, os.getpid())
87     self.failUnless(utils.IsProcessAlive(read_pid))
88     self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
89     utils.RemovePidFile('test')
90     self.failIf(os.path.exists(pid_file),
91                 "PID file should not exist anymore")
92     self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
93                          "ReadPidFile should return 0 for missing pid file")
94     fh = open(pid_file, "w")
95     fh.write("blah\n")
96     fh.close()
97     self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
98                          "ReadPidFile should return 0 for invalid pid file")
99     utils.RemovePidFile('test')
100     self.failIf(os.path.exists(pid_file),
101                 "PID file should not exist anymore")
102
103   def testKill(self):
104     pid_file = self.f_dpn('child')
105     r_fd, w_fd = os.pipe()
106     new_pid = os.fork()
107     if new_pid == 0: #child
108       utils.WritePidFile('child')
109       os.write(w_fd, 'a')
110       signal.pause()
111       os._exit(0)
112       return
113     # else we are in the parent
114     # wait until the child has written the pid file
115     os.read(r_fd, 1)
116     read_pid = utils.ReadPidFile(pid_file)
117     self.failUnlessEqual(read_pid, new_pid)
118     self.failUnless(utils.IsProcessAlive(new_pid))
119     utils.KillProcess(new_pid, waitpid=True)
120     self.failIf(utils.IsProcessAlive(new_pid))
121     utils.RemovePidFile('child')
122     self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
123
124   def tearDown(self):
125     for name in os.listdir(self.dir):
126       os.unlink(os.path.join(self.dir, name))
127     os.rmdir(self.dir)
128
129
130 class TestRunCmd(testutils.GanetiTestCase):
131   """Testing case for the RunCmd function"""
132
133   def setUp(self):
134     testutils.GanetiTestCase.setUp(self)
135     self.magic = time.ctime() + " ganeti test"
136     self.fname = self._CreateTempFile()
137
138   def testOk(self):
139     """Test successful exit code"""
140     result = RunCmd("/bin/sh -c 'exit 0'")
141     self.assertEqual(result.exit_code, 0)
142     self.assertEqual(result.output, "")
143
144   def testFail(self):
145     """Test fail exit code"""
146     result = RunCmd("/bin/sh -c 'exit 1'")
147     self.assertEqual(result.exit_code, 1)
148     self.assertEqual(result.output, "")
149
150   def testStdout(self):
151     """Test standard output"""
152     cmd = 'echo -n "%s"' % self.magic
153     result = RunCmd("/bin/sh -c '%s'" % cmd)
154     self.assertEqual(result.stdout, self.magic)
155     result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
156     self.assertEqual(result.output, "")
157     self.assertFileContent(self.fname, self.magic)
158
159   def testStderr(self):
160     """Test standard error"""
161     cmd = 'echo -n "%s"' % self.magic
162     result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
163     self.assertEqual(result.stderr, self.magic)
164     result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
165     self.assertEqual(result.output, "")
166     self.assertFileContent(self.fname, self.magic)
167
168   def testCombined(self):
169     """Test combined output"""
170     cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
171     expected = "A" + self.magic + "B" + self.magic
172     result = RunCmd("/bin/sh -c '%s'" % cmd)
173     self.assertEqual(result.output, expected)
174     result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
175     self.assertEqual(result.output, "")
176     self.assertFileContent(self.fname, expected)
177
178   def testSignal(self):
179     """Test signal"""
180     result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
181     self.assertEqual(result.signal, 15)
182     self.assertEqual(result.output, "")
183
184   def testListRun(self):
185     """Test list runs"""
186     result = RunCmd(["true"])
187     self.assertEqual(result.signal, None)
188     self.assertEqual(result.exit_code, 0)
189     result = RunCmd(["/bin/sh", "-c", "exit 1"])
190     self.assertEqual(result.signal, None)
191     self.assertEqual(result.exit_code, 1)
192     result = RunCmd(["echo", "-n", self.magic])
193     self.assertEqual(result.signal, None)
194     self.assertEqual(result.exit_code, 0)
195     self.assertEqual(result.stdout, self.magic)
196
197   def testFileEmptyOutput(self):
198     """Test file output"""
199     result = RunCmd(["true"], output=self.fname)
200     self.assertEqual(result.signal, None)
201     self.assertEqual(result.exit_code, 0)
202     self.assertFileContent(self.fname, "")
203
204   def testLang(self):
205     """Test locale environment"""
206     old_env = os.environ.copy()
207     try:
208       os.environ["LANG"] = "en_US.UTF-8"
209       os.environ["LC_ALL"] = "en_US.UTF-8"
210       result = RunCmd(["locale"])
211       for line in result.output.splitlines():
212         key, value = line.split("=", 1)
213         # Ignore these variables, they're overridden by LC_ALL
214         if key == "LANG" or key == "LANGUAGE":
215           continue
216         self.failIf(value and value != "C" and value != '"C"',
217             "Variable %s is set to the invalid value '%s'" % (key, value))
218     finally:
219       os.environ = old_env
220
221   def testDefaultCwd(self):
222     """Test default working directory"""
223     self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
224
225   def testCwd(self):
226     """Test default working directory"""
227     self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
228     self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
229     cwd = os.getcwd()
230     self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
231
232
233 class TestRemoveFile(unittest.TestCase):
234   """Test case for the RemoveFile function"""
235
236   def setUp(self):
237     """Create a temp dir and file for each case"""
238     self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
239     fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
240     os.close(fd)
241
242   def tearDown(self):
243     if os.path.exists(self.tmpfile):
244       os.unlink(self.tmpfile)
245     os.rmdir(self.tmpdir)
246
247
248   def testIgnoreDirs(self):
249     """Test that RemoveFile() ignores directories"""
250     self.assertEqual(None, RemoveFile(self.tmpdir))
251
252
253   def testIgnoreNotExisting(self):
254     """Test that RemoveFile() ignores non-existing files"""
255     RemoveFile(self.tmpfile)
256     RemoveFile(self.tmpfile)
257
258
259   def testRemoveFile(self):
260     """Test that RemoveFile does remove a file"""
261     RemoveFile(self.tmpfile)
262     if os.path.exists(self.tmpfile):
263       self.fail("File '%s' not removed" % self.tmpfile)
264
265
266   def testRemoveSymlink(self):
267     """Test that RemoveFile does remove symlinks"""
268     symlink = self.tmpdir + "/symlink"
269     os.symlink("no-such-file", symlink)
270     RemoveFile(symlink)
271     if os.path.exists(symlink):
272       self.fail("File '%s' not removed" % symlink)
273     os.symlink(self.tmpfile, symlink)
274     RemoveFile(symlink)
275     if os.path.exists(symlink):
276       self.fail("File '%s' not removed" % symlink)
277
278
279 class TestRename(unittest.TestCase):
280   """Test case for RenameFile"""
281
282   def setUp(self):
283     """Create a temporary directory"""
284     self.tmpdir = tempfile.mkdtemp()
285     self.tmpfile = os.path.join(self.tmpdir, "test1")
286
287     # Touch the file
288     open(self.tmpfile, "w").close()
289
290   def tearDown(self):
291     """Remove temporary directory"""
292     shutil.rmtree(self.tmpdir)
293
294   def testSimpleRename1(self):
295     """Simple rename 1"""
296     utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
297
298   def testSimpleRename2(self):
299     """Simple rename 2"""
300     utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
301                      mkdir=True)
302
303   def testRenameMkdir(self):
304     """Rename with mkdir"""
305     utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
306                      mkdir=True)
307
308
309 class TestCheckdict(unittest.TestCase):
310   """Test case for the CheckDict function"""
311
312   def testAdd(self):
313     """Test that CheckDict adds a missing key with the correct value"""
314
315     tgt = {'a':1}
316     tmpl = {'b': 2}
317     CheckDict(tgt, tmpl)
318     if 'b' not in tgt or tgt['b'] != 2:
319       self.fail("Failed to update dict")
320
321
322   def testNoUpdate(self):
323     """Test that CheckDict does not overwrite an existing key"""
324     tgt = {'a':1, 'b': 3}
325     tmpl = {'b': 2}
326     CheckDict(tgt, tmpl)
327     self.failUnlessEqual(tgt['b'], 3)
328
329
330 class TestMatchNameComponent(unittest.TestCase):
331   """Test case for the MatchNameComponent function"""
332
333   def testEmptyList(self):
334     """Test that there is no match against an empty list"""
335
336     self.failUnlessEqual(MatchNameComponent("", []), None)
337     self.failUnlessEqual(MatchNameComponent("test", []), None)
338
339   def testSingleMatch(self):
340     """Test that a single match is performed correctly"""
341     mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
342     for key in "test2", "test2.example", "test2.example.com":
343       self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
344
345   def testMultipleMatches(self):
346     """Test that a multiple match is returned as None"""
347     mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
348     for key in "test1", "test1.example":
349       self.failUnlessEqual(MatchNameComponent(key, mlist), None)
350
351
352 class TestFormatUnit(unittest.TestCase):
353   """Test case for the FormatUnit function"""
354
355   def testMiB(self):
356     self.assertEqual(FormatUnit(1, 'h'), '1M')
357     self.assertEqual(FormatUnit(100, 'h'), '100M')
358     self.assertEqual(FormatUnit(1023, 'h'), '1023M')
359
360     self.assertEqual(FormatUnit(1, 'm'), '1')
361     self.assertEqual(FormatUnit(100, 'm'), '100')
362     self.assertEqual(FormatUnit(1023, 'm'), '1023')
363
364     self.assertEqual(FormatUnit(1024, 'm'), '1024')
365     self.assertEqual(FormatUnit(1536, 'm'), '1536')
366     self.assertEqual(FormatUnit(17133, 'm'), '17133')
367     self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
368
369   def testGiB(self):
370     self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
371     self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
372     self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
373     self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
374
375     self.assertEqual(FormatUnit(1024, 'g'), '1.0')
376     self.assertEqual(FormatUnit(1536, 'g'), '1.5')
377     self.assertEqual(FormatUnit(17133, 'g'), '16.7')
378     self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
379
380     self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
381     self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
382     self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
383
384   def testTiB(self):
385     self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
386     self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
387     self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
388
389     self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
390     self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
391     self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
392
393 class TestParseUnit(unittest.TestCase):
394   """Test case for the ParseUnit function"""
395
396   SCALES = (('', 1),
397             ('M', 1), ('G', 1024), ('T', 1024 * 1024),
398             ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
399             ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
400
401   def testRounding(self):
402     self.assertEqual(ParseUnit('0'), 0)
403     self.assertEqual(ParseUnit('1'), 4)
404     self.assertEqual(ParseUnit('2'), 4)
405     self.assertEqual(ParseUnit('3'), 4)
406
407     self.assertEqual(ParseUnit('124'), 124)
408     self.assertEqual(ParseUnit('125'), 128)
409     self.assertEqual(ParseUnit('126'), 128)
410     self.assertEqual(ParseUnit('127'), 128)
411     self.assertEqual(ParseUnit('128'), 128)
412     self.assertEqual(ParseUnit('129'), 132)
413     self.assertEqual(ParseUnit('130'), 132)
414
415   def testFloating(self):
416     self.assertEqual(ParseUnit('0'), 0)
417     self.assertEqual(ParseUnit('0.5'), 4)
418     self.assertEqual(ParseUnit('1.75'), 4)
419     self.assertEqual(ParseUnit('1.99'), 4)
420     self.assertEqual(ParseUnit('2.00'), 4)
421     self.assertEqual(ParseUnit('2.01'), 4)
422     self.assertEqual(ParseUnit('3.99'), 4)
423     self.assertEqual(ParseUnit('4.00'), 4)
424     self.assertEqual(ParseUnit('4.01'), 8)
425     self.assertEqual(ParseUnit('1.5G'), 1536)
426     self.assertEqual(ParseUnit('1.8G'), 1844)
427     self.assertEqual(ParseUnit('8.28T'), 8682212)
428
429   def testSuffixes(self):
430     for sep in ('', ' ', '   ', "\t", "\t "):
431       for suffix, scale in TestParseUnit.SCALES:
432         for func in (lambda x: x, str.lower, str.upper):
433           self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
434                            1024 * scale)
435
436   def testInvalidInput(self):
437     for sep in ('-', '_', ',', 'a'):
438       for suffix, _ in TestParseUnit.SCALES:
439         self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
440
441     for suffix, _ in TestParseUnit.SCALES:
442       self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
443
444
445 class TestSshKeys(testutils.GanetiTestCase):
446   """Test case for the AddAuthorizedKey function"""
447
448   KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
449   KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
450            'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
451
452   def setUp(self):
453     testutils.GanetiTestCase.setUp(self)
454     self.tmpname = self._CreateTempFile()
455     handle = open(self.tmpname, 'w')
456     try:
457       handle.write("%s\n" % TestSshKeys.KEY_A)
458       handle.write("%s\n" % TestSshKeys.KEY_B)
459     finally:
460       handle.close()
461
462   def testAddingNewKey(self):
463     AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
464
465     self.assertFileContent(self.tmpname,
466       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
467       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
468       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
469       "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
470
471   def testAddingAlmostButNotCompletelyTheSameKey(self):
472     AddAuthorizedKey(self.tmpname,
473         'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
474
475     self.assertFileContent(self.tmpname,
476       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
477       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
478       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
479       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
480
481   def testAddingExistingKeyWithSomeMoreSpaces(self):
482     AddAuthorizedKey(self.tmpname,
483         'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
484
485     self.assertFileContent(self.tmpname,
486       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
487       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
488       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
489
490   def testRemovingExistingKeyWithSomeMoreSpaces(self):
491     RemoveAuthorizedKey(self.tmpname,
492         'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
493
494     self.assertFileContent(self.tmpname,
495       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
496       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
497
498   def testRemovingNonExistingKey(self):
499     RemoveAuthorizedKey(self.tmpname,
500         'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
501
502     self.assertFileContent(self.tmpname,
503       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
504       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
505       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
506
507
508 class TestEtcHosts(testutils.GanetiTestCase):
509   """Test functions modifying /etc/hosts"""
510
511   def setUp(self):
512     testutils.GanetiTestCase.setUp(self)
513     self.tmpname = self._CreateTempFile()
514     handle = open(self.tmpname, 'w')
515     try:
516       handle.write('# This is a test file for /etc/hosts\n')
517       handle.write('127.0.0.1\tlocalhost\n')
518       handle.write('192.168.1.1 router gw\n')
519     finally:
520       handle.close()
521
522   def testSettingNewIp(self):
523     SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
524
525     self.assertFileContent(self.tmpname,
526       "# This is a test file for /etc/hosts\n"
527       "127.0.0.1\tlocalhost\n"
528       "192.168.1.1 router gw\n"
529       "1.2.3.4\tmyhost.domain.tld myhost\n")
530     self.assertFileMode(self.tmpname, 0644)
531
532   def testSettingExistingIp(self):
533     SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
534                      ['myhost'])
535
536     self.assertFileContent(self.tmpname,
537       "# This is a test file for /etc/hosts\n"
538       "127.0.0.1\tlocalhost\n"
539       "192.168.1.1\tmyhost.domain.tld myhost\n")
540     self.assertFileMode(self.tmpname, 0644)
541
542   def testSettingDuplicateName(self):
543     SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
544
545     self.assertFileContent(self.tmpname,
546       "# This is a test file for /etc/hosts\n"
547       "127.0.0.1\tlocalhost\n"
548       "192.168.1.1 router gw\n"
549       "1.2.3.4\tmyhost\n")
550     self.assertFileMode(self.tmpname, 0644)
551
552   def testRemovingExistingHost(self):
553     RemoveEtcHostsEntry(self.tmpname, 'router')
554
555     self.assertFileContent(self.tmpname,
556       "# This is a test file for /etc/hosts\n"
557       "127.0.0.1\tlocalhost\n"
558       "192.168.1.1 gw\n")
559     self.assertFileMode(self.tmpname, 0644)
560
561   def testRemovingSingleExistingHost(self):
562     RemoveEtcHostsEntry(self.tmpname, 'localhost')
563
564     self.assertFileContent(self.tmpname,
565       "# This is a test file for /etc/hosts\n"
566       "192.168.1.1 router gw\n")
567     self.assertFileMode(self.tmpname, 0644)
568
569   def testRemovingNonExistingHost(self):
570     RemoveEtcHostsEntry(self.tmpname, 'myhost')
571
572     self.assertFileContent(self.tmpname,
573       "# This is a test file for /etc/hosts\n"
574       "127.0.0.1\tlocalhost\n"
575       "192.168.1.1 router gw\n")
576     self.assertFileMode(self.tmpname, 0644)
577
578   def testRemovingAlias(self):
579     RemoveEtcHostsEntry(self.tmpname, 'gw')
580
581     self.assertFileContent(self.tmpname,
582       "# This is a test file for /etc/hosts\n"
583       "127.0.0.1\tlocalhost\n"
584       "192.168.1.1 router\n")
585     self.assertFileMode(self.tmpname, 0644)
586
587
588 class TestShellQuoting(unittest.TestCase):
589   """Test case for shell quoting functions"""
590
591   def testShellQuote(self):
592     self.assertEqual(ShellQuote('abc'), "abc")
593     self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
594     self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
595     self.assertEqual(ShellQuote("a b c"), "'a b c'")
596     self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
597
598   def testShellQuoteArgs(self):
599     self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
600     self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
601     self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
602
603
604 class TestTcpPing(unittest.TestCase):
605   """Testcase for TCP version of ping - against listen(2)ing port"""
606
607   def setUp(self):
608     self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
609     self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
610     self.listenerport = self.listener.getsockname()[1]
611     self.listener.listen(1)
612
613   def tearDown(self):
614     self.listener.shutdown(socket.SHUT_RDWR)
615     del self.listener
616     del self.listenerport
617
618   def testTcpPingToLocalHostAccept(self):
619     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
620                          self.listenerport,
621                          timeout=10,
622                          live_port_needed=True,
623                          source=constants.LOCALHOST_IP_ADDRESS,
624                          ),
625                  "failed to connect to test listener")
626
627     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
628                          self.listenerport,
629                          timeout=10,
630                          live_port_needed=True,
631                          ),
632                  "failed to connect to test listener (no source)")
633
634
635 class TestTcpPingDeaf(unittest.TestCase):
636   """Testcase for TCP version of ping - against non listen(2)ing port"""
637
638   def setUp(self):
639     self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
640     self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
641     self.deaflistenerport = self.deaflistener.getsockname()[1]
642
643   def tearDown(self):
644     del self.deaflistener
645     del self.deaflistenerport
646
647   def testTcpPingToLocalHostAcceptDeaf(self):
648     self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
649                         self.deaflistenerport,
650                         timeout=constants.TCP_PING_TIMEOUT,
651                         live_port_needed=True,
652                         source=constants.LOCALHOST_IP_ADDRESS,
653                         ), # need successful connect(2)
654                 "successfully connected to deaf listener")
655
656     self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
657                         self.deaflistenerport,
658                         timeout=constants.TCP_PING_TIMEOUT,
659                         live_port_needed=True,
660                         ), # need successful connect(2)
661                 "successfully connected to deaf listener (no source addr)")
662
663   def testTcpPingToLocalHostNoAccept(self):
664     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
665                          self.deaflistenerport,
666                          timeout=constants.TCP_PING_TIMEOUT,
667                          live_port_needed=False,
668                          source=constants.LOCALHOST_IP_ADDRESS,
669                          ), # ECONNREFUSED is OK
670                  "failed to ping alive host on deaf port")
671
672     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
673                          self.deaflistenerport,
674                          timeout=constants.TCP_PING_TIMEOUT,
675                          live_port_needed=False,
676                          ), # ECONNREFUSED is OK
677                  "failed to ping alive host on deaf port (no source addr)")
678
679
680 class TestOwnIpAddress(unittest.TestCase):
681   """Testcase for OwnIpAddress"""
682
683   def testOwnLoopback(self):
684     """check having the loopback ip"""
685     self.failUnless(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS),
686                     "Should own the loopback address")
687
688   def testNowOwnAddress(self):
689     """check that I don't own an address"""
690
691     # network 192.0.2.0/24 is reserved for test/documentation as per
692     # rfc 3330, so we *should* not have an address of this range... if
693     # this fails, we should extend the test to multiple addresses
694     DST_IP = "192.0.2.1"
695     self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP)
696
697
698 class TestListVisibleFiles(unittest.TestCase):
699   """Test case for ListVisibleFiles"""
700
701   def setUp(self):
702     self.path = tempfile.mkdtemp()
703
704   def tearDown(self):
705     shutil.rmtree(self.path)
706
707   def _test(self, files, expected):
708     # Sort a copy
709     expected = expected[:]
710     expected.sort()
711
712     for name in files:
713       f = open(os.path.join(self.path, name), 'w')
714       try:
715         f.write("Test\n")
716       finally:
717         f.close()
718
719     found = ListVisibleFiles(self.path)
720     found.sort()
721
722     self.assertEqual(found, expected)
723
724   def testAllVisible(self):
725     files = ["a", "b", "c"]
726     expected = files
727     self._test(files, expected)
728
729   def testNoneVisible(self):
730     files = [".a", ".b", ".c"]
731     expected = []
732     self._test(files, expected)
733
734   def testSomeVisible(self):
735     files = ["a", "b", ".c"]
736     expected = ["a", "b"]
737     self._test(files, expected)
738
739
740 class TestNewUUID(unittest.TestCase):
741   """Test case for NewUUID"""
742
743   _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
744                         '[a-f0-9]{4}-[a-f0-9]{12}$')
745
746   def runTest(self):
747     self.failUnless(self._re_uuid.match(utils.NewUUID()))
748
749
750 class TestUniqueSequence(unittest.TestCase):
751   """Test case for UniqueSequence"""
752
753   def _test(self, input, expected):
754     self.assertEqual(utils.UniqueSequence(input), expected)
755
756   def runTest(self):
757     # Ordered input
758     self._test([1, 2, 3], [1, 2, 3])
759     self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
760     self._test([1, 2, 2, 3], [1, 2, 3])
761     self._test([1, 2, 3, 3], [1, 2, 3])
762
763     # Unordered input
764     self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
765     self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
766
767     # Strings
768     self._test(["a", "a"], ["a"])
769     self._test(["a", "b"], ["a", "b"])
770     self._test(["a", "b", "a"], ["a", "b"])
771
772
773 class TestFirstFree(unittest.TestCase):
774   """Test case for the FirstFree function"""
775
776   def test(self):
777     """Test FirstFree"""
778     self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
779     self.failUnlessEqual(FirstFree([]), None)
780     self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
781     self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
782     self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
783
784
785 class TestTailFile(testutils.GanetiTestCase):
786   """Test case for the TailFile function"""
787
788   def testEmpty(self):
789     fname = self._CreateTempFile()
790     self.failUnlessEqual(TailFile(fname), [])
791     self.failUnlessEqual(TailFile(fname, lines=25), [])
792
793   def testAllLines(self):
794     data = ["test %d" % i for i in range(30)]
795     for i in range(30):
796       fname = self._CreateTempFile()
797       fd = open(fname, "w")
798       fd.write("\n".join(data[:i]))
799       if i > 0:
800         fd.write("\n")
801       fd.close()
802       self.failUnlessEqual(TailFile(fname, lines=i), data[:i])
803
804   def testPartialLines(self):
805     data = ["test %d" % i for i in range(30)]
806     fname = self._CreateTempFile()
807     fd = open(fname, "w")
808     fd.write("\n".join(data))
809     fd.write("\n")
810     fd.close()
811     for i in range(1, 30):
812       self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
813
814   def testBigFile(self):
815     data = ["test %d" % i for i in range(30)]
816     fname = self._CreateTempFile()
817     fd = open(fname, "w")
818     fd.write("X" * 1048576)
819     fd.write("\n")
820     fd.write("\n".join(data))
821     fd.write("\n")
822     fd.close()
823     for i in range(1, 30):
824       self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
825
826
827 class TestFileLock(unittest.TestCase):
828   """Test case for the FileLock class"""
829
830   def setUp(self):
831     self.tmpfile = tempfile.NamedTemporaryFile()
832     self.lock = utils.FileLock(self.tmpfile.name)
833
834   def testSharedNonblocking(self):
835     self.lock.Shared(blocking=False)
836     self.lock.Close()
837
838   def testExclusiveNonblocking(self):
839     self.lock.Exclusive(blocking=False)
840     self.lock.Close()
841
842   def testUnlockNonblocking(self):
843     self.lock.Unlock(blocking=False)
844     self.lock.Close()
845
846   def testSharedBlocking(self):
847     self.lock.Shared(blocking=True)
848     self.lock.Close()
849
850   def testExclusiveBlocking(self):
851     self.lock.Exclusive(blocking=True)
852     self.lock.Close()
853
854   def testUnlockBlocking(self):
855     self.lock.Unlock(blocking=True)
856     self.lock.Close()
857
858   def testSharedExclusiveUnlock(self):
859     self.lock.Shared(blocking=False)
860     self.lock.Exclusive(blocking=False)
861     self.lock.Unlock(blocking=False)
862     self.lock.Close()
863
864   def testExclusiveSharedUnlock(self):
865     self.lock.Exclusive(blocking=False)
866     self.lock.Shared(blocking=False)
867     self.lock.Unlock(blocking=False)
868     self.lock.Close()
869
870   def testCloseShared(self):
871     self.lock.Close()
872     self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
873
874   def testCloseExclusive(self):
875     self.lock.Close()
876     self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
877
878   def testCloseUnlock(self):
879     self.lock.Close()
880     self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
881
882
883 class TestTimeFunctions(unittest.TestCase):
884   """Test case for time functions"""
885
886   def runTest(self):
887     self.assertEqual(utils.SplitTime(1), (1, 0))
888     self.assertEqual(utils.SplitTime(1.5), (1, 500000))
889     self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
890     self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
891     self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
892     self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
893     self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
894     self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
895
896     self.assertRaises(AssertionError, utils.SplitTime, -1)
897
898     self.assertEqual(utils.MergeTime((1, 0)), 1.0)
899     self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
900     self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
901
902     self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3), 1218448917.481)
903     self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
904
905     self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
906     self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
907     self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
908     self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
909     self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
910
911
912 class FieldSetTestCase(unittest.TestCase):
913   """Test case for FieldSets"""
914
915   def testSimpleMatch(self):
916     f = utils.FieldSet("a", "b", "c", "def")
917     self.failUnless(f.Matches("a"))
918     self.failIf(f.Matches("d"), "Substring matched")
919     self.failIf(f.Matches("defghi"), "Prefix string matched")
920     self.failIf(f.NonMatching(["b", "c"]))
921     self.failIf(f.NonMatching(["a", "b", "c", "def"]))
922     self.failUnless(f.NonMatching(["a", "d"]))
923
924   def testRegexMatch(self):
925     f = utils.FieldSet("a", "b([0-9]+)", "c")
926     self.failUnless(f.Matches("b1"))
927     self.failUnless(f.Matches("b99"))
928     self.failIf(f.Matches("b/1"))
929     self.failIf(f.NonMatching(["b12", "c"]))
930     self.failUnless(f.NonMatching(["a", "1"]))
931
932 class TestForceDictType(unittest.TestCase):
933   """Test case for ForceDictType"""
934
935   def setUp(self):
936     self.key_types = {
937       'a': constants.VTYPE_INT,
938       'b': constants.VTYPE_BOOL,
939       'c': constants.VTYPE_STRING,
940       'd': constants.VTYPE_SIZE,
941       }
942
943   def _fdt(self, dict, allowed_values=None):
944     if allowed_values is None:
945       ForceDictType(dict, self.key_types)
946     else:
947       ForceDictType(dict, self.key_types, allowed_values=allowed_values)
948
949     return dict
950
951   def testSimpleDict(self):
952     self.assertEqual(self._fdt({}), {})
953     self.assertEqual(self._fdt({'a': 1}), {'a': 1})
954     self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
955     self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
956     self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
957     self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
958     self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
959     self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
960     self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
961     self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
962     self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
963     self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
964
965   def testErrors(self):
966     self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
967     self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
968     self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
969     self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
970
971
972 class TestIsAbsNormPath(unittest.TestCase):
973   """Testing case for IsProcessAlive"""
974
975   def _pathTestHelper(self, path, result):
976     if result:
977       self.assert_(IsNormAbsPath(path),
978           "Path %s should be absolute and normal" % path)
979     else:
980       self.assert_(not IsNormAbsPath(path),
981           "Path %s should not be absolute and normal" % path)
982
983   def testBase(self):
984     self._pathTestHelper('/etc', True)
985     self._pathTestHelper('/srv', True)
986     self._pathTestHelper('etc', False)
987     self._pathTestHelper('/etc/../root', False)
988     self._pathTestHelper('/etc/', False)
989
990 if __name__ == '__main__':
991   unittest.main()