Add a configuration verify check for duplicate IPs
[ganeti-local] / test / ganeti.utils_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the utils module"""
23
24 import unittest
25 import os
26 import time
27 import tempfile
28 import os.path
29 import os
30 import md5
31 import signal
32 import socket
33 import shutil
34 import re
35 import select
36 import string
37
38 import ganeti
39 import testutils
40 from ganeti import constants
41 from ganeti import utils
42 from ganeti import errors
43 from ganeti.utils import IsProcessAlive, RunCmd, \
44      RemoveFile, MatchNameComponent, FormatUnit, \
45      ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
46      ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
47      SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \
48      TailFile, ForceDictType, SafeEncode, IsNormAbsPath, FormatTime
49
50 from ganeti.errors import LockError, UnitParseError, GenericError, \
51      ProgrammerError
52
53
54 class TestIsProcessAlive(unittest.TestCase):
55   """Testing case for IsProcessAlive"""
56
57   def testExists(self):
58     mypid = os.getpid()
59     self.assert_(IsProcessAlive(mypid),
60                  "can't find myself running")
61
62   def testNotExisting(self):
63     pid_non_existing = os.fork()
64     if pid_non_existing == 0:
65       os._exit(0)
66     elif pid_non_existing < 0:
67       raise SystemError("can't fork")
68     os.waitpid(pid_non_existing, 0)
69     self.assert_(not IsProcessAlive(pid_non_existing),
70                  "nonexisting process detected")
71
72
73 class TestPidFileFunctions(unittest.TestCase):
74   """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
75
76   def setUp(self):
77     self.dir = tempfile.mkdtemp()
78     self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
79     utils.DaemonPidFileName = self.f_dpn
80
81   def testPidFileFunctions(self):
82     pid_file = self.f_dpn('test')
83     utils.WritePidFile('test')
84     self.failUnless(os.path.exists(pid_file),
85                     "PID file should have been created")
86     read_pid = utils.ReadPidFile(pid_file)
87     self.failUnlessEqual(read_pid, os.getpid())
88     self.failUnless(utils.IsProcessAlive(read_pid))
89     self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
90     utils.RemovePidFile('test')
91     self.failIf(os.path.exists(pid_file),
92                 "PID file should not exist anymore")
93     self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
94                          "ReadPidFile should return 0 for missing pid file")
95     fh = open(pid_file, "w")
96     fh.write("blah\n")
97     fh.close()
98     self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
99                          "ReadPidFile should return 0 for invalid pid file")
100     utils.RemovePidFile('test')
101     self.failIf(os.path.exists(pid_file),
102                 "PID file should not exist anymore")
103
104   def testKill(self):
105     pid_file = self.f_dpn('child')
106     r_fd, w_fd = os.pipe()
107     new_pid = os.fork()
108     if new_pid == 0: #child
109       utils.WritePidFile('child')
110       os.write(w_fd, 'a')
111       signal.pause()
112       os._exit(0)
113       return
114     # else we are in the parent
115     # wait until the child has written the pid file
116     os.read(r_fd, 1)
117     read_pid = utils.ReadPidFile(pid_file)
118     self.failUnlessEqual(read_pid, new_pid)
119     self.failUnless(utils.IsProcessAlive(new_pid))
120     utils.KillProcess(new_pid, waitpid=True)
121     self.failIf(utils.IsProcessAlive(new_pid))
122     utils.RemovePidFile('child')
123     self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
124
125   def tearDown(self):
126     for name in os.listdir(self.dir):
127       os.unlink(os.path.join(self.dir, name))
128     os.rmdir(self.dir)
129
130
131 class TestRunCmd(testutils.GanetiTestCase):
132   """Testing case for the RunCmd function"""
133
134   def setUp(self):
135     testutils.GanetiTestCase.setUp(self)
136     self.magic = time.ctime() + " ganeti test"
137     self.fname = self._CreateTempFile()
138
139   def testOk(self):
140     """Test successful exit code"""
141     result = RunCmd("/bin/sh -c 'exit 0'")
142     self.assertEqual(result.exit_code, 0)
143     self.assertEqual(result.output, "")
144
145   def testFail(self):
146     """Test fail exit code"""
147     result = RunCmd("/bin/sh -c 'exit 1'")
148     self.assertEqual(result.exit_code, 1)
149     self.assertEqual(result.output, "")
150
151   def testStdout(self):
152     """Test standard output"""
153     cmd = 'echo -n "%s"' % self.magic
154     result = RunCmd("/bin/sh -c '%s'" % cmd)
155     self.assertEqual(result.stdout, self.magic)
156     result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
157     self.assertEqual(result.output, "")
158     self.assertFileContent(self.fname, self.magic)
159
160   def testStderr(self):
161     """Test standard error"""
162     cmd = 'echo -n "%s"' % self.magic
163     result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
164     self.assertEqual(result.stderr, self.magic)
165     result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
166     self.assertEqual(result.output, "")
167     self.assertFileContent(self.fname, self.magic)
168
169   def testCombined(self):
170     """Test combined output"""
171     cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
172     expected = "A" + self.magic + "B" + self.magic
173     result = RunCmd("/bin/sh -c '%s'" % cmd)
174     self.assertEqual(result.output, expected)
175     result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
176     self.assertEqual(result.output, "")
177     self.assertFileContent(self.fname, expected)
178
179   def testSignal(self):
180     """Test signal"""
181     result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
182     self.assertEqual(result.signal, 15)
183     self.assertEqual(result.output, "")
184
185   def testListRun(self):
186     """Test list runs"""
187     result = RunCmd(["true"])
188     self.assertEqual(result.signal, None)
189     self.assertEqual(result.exit_code, 0)
190     result = RunCmd(["/bin/sh", "-c", "exit 1"])
191     self.assertEqual(result.signal, None)
192     self.assertEqual(result.exit_code, 1)
193     result = RunCmd(["echo", "-n", self.magic])
194     self.assertEqual(result.signal, None)
195     self.assertEqual(result.exit_code, 0)
196     self.assertEqual(result.stdout, self.magic)
197
198   def testFileEmptyOutput(self):
199     """Test file output"""
200     result = RunCmd(["true"], output=self.fname)
201     self.assertEqual(result.signal, None)
202     self.assertEqual(result.exit_code, 0)
203     self.assertFileContent(self.fname, "")
204
205   def testLang(self):
206     """Test locale environment"""
207     old_env = os.environ.copy()
208     try:
209       os.environ["LANG"] = "en_US.UTF-8"
210       os.environ["LC_ALL"] = "en_US.UTF-8"
211       result = RunCmd(["locale"])
212       for line in result.output.splitlines():
213         key, value = line.split("=", 1)
214         # Ignore these variables, they're overridden by LC_ALL
215         if key == "LANG" or key == "LANGUAGE":
216           continue
217         self.failIf(value and value != "C" and value != '"C"',
218             "Variable %s is set to the invalid value '%s'" % (key, value))
219     finally:
220       os.environ = old_env
221
222   def testDefaultCwd(self):
223     """Test default working directory"""
224     self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
225
226   def testCwd(self):
227     """Test default working directory"""
228     self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
229     self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
230     cwd = os.getcwd()
231     self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
232
233
234 class TestRemoveFile(unittest.TestCase):
235   """Test case for the RemoveFile function"""
236
237   def setUp(self):
238     """Create a temp dir and file for each case"""
239     self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
240     fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
241     os.close(fd)
242
243   def tearDown(self):
244     if os.path.exists(self.tmpfile):
245       os.unlink(self.tmpfile)
246     os.rmdir(self.tmpdir)
247
248
249   def testIgnoreDirs(self):
250     """Test that RemoveFile() ignores directories"""
251     self.assertEqual(None, RemoveFile(self.tmpdir))
252
253
254   def testIgnoreNotExisting(self):
255     """Test that RemoveFile() ignores non-existing files"""
256     RemoveFile(self.tmpfile)
257     RemoveFile(self.tmpfile)
258
259
260   def testRemoveFile(self):
261     """Test that RemoveFile does remove a file"""
262     RemoveFile(self.tmpfile)
263     if os.path.exists(self.tmpfile):
264       self.fail("File '%s' not removed" % self.tmpfile)
265
266
267   def testRemoveSymlink(self):
268     """Test that RemoveFile does remove symlinks"""
269     symlink = self.tmpdir + "/symlink"
270     os.symlink("no-such-file", symlink)
271     RemoveFile(symlink)
272     if os.path.exists(symlink):
273       self.fail("File '%s' not removed" % symlink)
274     os.symlink(self.tmpfile, symlink)
275     RemoveFile(symlink)
276     if os.path.exists(symlink):
277       self.fail("File '%s' not removed" % symlink)
278
279
280 class TestRename(unittest.TestCase):
281   """Test case for RenameFile"""
282
283   def setUp(self):
284     """Create a temporary directory"""
285     self.tmpdir = tempfile.mkdtemp()
286     self.tmpfile = os.path.join(self.tmpdir, "test1")
287
288     # Touch the file
289     open(self.tmpfile, "w").close()
290
291   def tearDown(self):
292     """Remove temporary directory"""
293     shutil.rmtree(self.tmpdir)
294
295   def testSimpleRename1(self):
296     """Simple rename 1"""
297     utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
298
299   def testSimpleRename2(self):
300     """Simple rename 2"""
301     utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
302                      mkdir=True)
303
304   def testRenameMkdir(self):
305     """Rename with mkdir"""
306     utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
307                      mkdir=True)
308
309
310 class TestMatchNameComponent(unittest.TestCase):
311   """Test case for the MatchNameComponent function"""
312
313   def testEmptyList(self):
314     """Test that there is no match against an empty list"""
315
316     self.failUnlessEqual(MatchNameComponent("", []), None)
317     self.failUnlessEqual(MatchNameComponent("test", []), None)
318
319   def testSingleMatch(self):
320     """Test that a single match is performed correctly"""
321     mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
322     for key in "test2", "test2.example", "test2.example.com":
323       self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
324
325   def testMultipleMatches(self):
326     """Test that a multiple match is returned as None"""
327     mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
328     for key in "test1", "test1.example":
329       self.failUnlessEqual(MatchNameComponent(key, mlist), None)
330
331   def testFullMatch(self):
332     """Test that a full match is returned correctly"""
333     key1 = "test1"
334     key2 = "test1.example"
335     mlist = [key2, key2 + ".com"]
336     self.failUnlessEqual(MatchNameComponent(key1, mlist), None)
337     self.failUnlessEqual(MatchNameComponent(key2, mlist), key2)
338
339   def testCaseInsensitivePartialMatch(self):
340     """Test for the case_insensitive keyword"""
341     mlist = ["test1.example.com", "test2.example.net"]
342     self.assertEqual(MatchNameComponent("test2", mlist, case_sensitive=False),
343                      "test2.example.net")
344     self.assertEqual(MatchNameComponent("Test2", mlist, case_sensitive=False),
345                      "test2.example.net")
346     self.assertEqual(MatchNameComponent("teSt2", mlist, case_sensitive=False),
347                      "test2.example.net")
348     self.assertEqual(MatchNameComponent("TeSt2", mlist, case_sensitive=False),
349                      "test2.example.net")
350
351
352   def testCaseInsensitiveFullMatch(self):
353     mlist = ["ts1.ex", "ts1.ex.org", "ts2.ex", "Ts2.ex"]
354     # Between the two ts1 a full string match non-case insensitive should work
355     self.assertEqual(MatchNameComponent("Ts1", mlist, case_sensitive=False),
356                      None)
357     self.assertEqual(MatchNameComponent("Ts1.ex", mlist, case_sensitive=False),
358                      "ts1.ex")
359     self.assertEqual(MatchNameComponent("ts1.ex", mlist, case_sensitive=False),
360                      "ts1.ex")
361     # Between the two ts2 only case differs, so only case-match works
362     self.assertEqual(MatchNameComponent("ts2.ex", mlist, case_sensitive=False),
363                      "ts2.ex")
364     self.assertEqual(MatchNameComponent("Ts2.ex", mlist, case_sensitive=False),
365                      "Ts2.ex")
366     self.assertEqual(MatchNameComponent("TS2.ex", mlist, case_sensitive=False),
367                      None)
368
369
370 class TestFormatUnit(unittest.TestCase):
371   """Test case for the FormatUnit function"""
372
373   def testMiB(self):
374     self.assertEqual(FormatUnit(1, 'h'), '1M')
375     self.assertEqual(FormatUnit(100, 'h'), '100M')
376     self.assertEqual(FormatUnit(1023, 'h'), '1023M')
377
378     self.assertEqual(FormatUnit(1, 'm'), '1')
379     self.assertEqual(FormatUnit(100, 'm'), '100')
380     self.assertEqual(FormatUnit(1023, 'm'), '1023')
381
382     self.assertEqual(FormatUnit(1024, 'm'), '1024')
383     self.assertEqual(FormatUnit(1536, 'm'), '1536')
384     self.assertEqual(FormatUnit(17133, 'm'), '17133')
385     self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
386
387   def testGiB(self):
388     self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
389     self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
390     self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
391     self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
392
393     self.assertEqual(FormatUnit(1024, 'g'), '1.0')
394     self.assertEqual(FormatUnit(1536, 'g'), '1.5')
395     self.assertEqual(FormatUnit(17133, 'g'), '16.7')
396     self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
397
398     self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
399     self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
400     self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
401
402   def testTiB(self):
403     self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
404     self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
405     self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
406
407     self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
408     self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
409     self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
410
411 class TestParseUnit(unittest.TestCase):
412   """Test case for the ParseUnit function"""
413
414   SCALES = (('', 1),
415             ('M', 1), ('G', 1024), ('T', 1024 * 1024),
416             ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
417             ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
418
419   def testRounding(self):
420     self.assertEqual(ParseUnit('0'), 0)
421     self.assertEqual(ParseUnit('1'), 4)
422     self.assertEqual(ParseUnit('2'), 4)
423     self.assertEqual(ParseUnit('3'), 4)
424
425     self.assertEqual(ParseUnit('124'), 124)
426     self.assertEqual(ParseUnit('125'), 128)
427     self.assertEqual(ParseUnit('126'), 128)
428     self.assertEqual(ParseUnit('127'), 128)
429     self.assertEqual(ParseUnit('128'), 128)
430     self.assertEqual(ParseUnit('129'), 132)
431     self.assertEqual(ParseUnit('130'), 132)
432
433   def testFloating(self):
434     self.assertEqual(ParseUnit('0'), 0)
435     self.assertEqual(ParseUnit('0.5'), 4)
436     self.assertEqual(ParseUnit('1.75'), 4)
437     self.assertEqual(ParseUnit('1.99'), 4)
438     self.assertEqual(ParseUnit('2.00'), 4)
439     self.assertEqual(ParseUnit('2.01'), 4)
440     self.assertEqual(ParseUnit('3.99'), 4)
441     self.assertEqual(ParseUnit('4.00'), 4)
442     self.assertEqual(ParseUnit('4.01'), 8)
443     self.assertEqual(ParseUnit('1.5G'), 1536)
444     self.assertEqual(ParseUnit('1.8G'), 1844)
445     self.assertEqual(ParseUnit('8.28T'), 8682212)
446
447   def testSuffixes(self):
448     for sep in ('', ' ', '   ', "\t", "\t "):
449       for suffix, scale in TestParseUnit.SCALES:
450         for func in (lambda x: x, str.lower, str.upper):
451           self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
452                            1024 * scale)
453
454   def testInvalidInput(self):
455     for sep in ('-', '_', ',', 'a'):
456       for suffix, _ in TestParseUnit.SCALES:
457         self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
458
459     for suffix, _ in TestParseUnit.SCALES:
460       self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
461
462
463 class TestSshKeys(testutils.GanetiTestCase):
464   """Test case for the AddAuthorizedKey function"""
465
466   KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
467   KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
468            'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
469
470   def setUp(self):
471     testutils.GanetiTestCase.setUp(self)
472     self.tmpname = self._CreateTempFile()
473     handle = open(self.tmpname, 'w')
474     try:
475       handle.write("%s\n" % TestSshKeys.KEY_A)
476       handle.write("%s\n" % TestSshKeys.KEY_B)
477     finally:
478       handle.close()
479
480   def testAddingNewKey(self):
481     AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
482
483     self.assertFileContent(self.tmpname,
484       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
485       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
486       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
487       "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
488
489   def testAddingAlmostButNotCompletelyTheSameKey(self):
490     AddAuthorizedKey(self.tmpname,
491         'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
492
493     self.assertFileContent(self.tmpname,
494       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
495       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
496       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
497       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
498
499   def testAddingExistingKeyWithSomeMoreSpaces(self):
500     AddAuthorizedKey(self.tmpname,
501         'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
502
503     self.assertFileContent(self.tmpname,
504       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
505       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
506       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
507
508   def testRemovingExistingKeyWithSomeMoreSpaces(self):
509     RemoveAuthorizedKey(self.tmpname,
510         'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
511
512     self.assertFileContent(self.tmpname,
513       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
514       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
515
516   def testRemovingNonExistingKey(self):
517     RemoveAuthorizedKey(self.tmpname,
518         'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
519
520     self.assertFileContent(self.tmpname,
521       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
522       'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
523       " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
524
525
526 class TestEtcHosts(testutils.GanetiTestCase):
527   """Test functions modifying /etc/hosts"""
528
529   def setUp(self):
530     testutils.GanetiTestCase.setUp(self)
531     self.tmpname = self._CreateTempFile()
532     handle = open(self.tmpname, 'w')
533     try:
534       handle.write('# This is a test file for /etc/hosts\n')
535       handle.write('127.0.0.1\tlocalhost\n')
536       handle.write('192.168.1.1 router gw\n')
537     finally:
538       handle.close()
539
540   def testSettingNewIp(self):
541     SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
542
543     self.assertFileContent(self.tmpname,
544       "# This is a test file for /etc/hosts\n"
545       "127.0.0.1\tlocalhost\n"
546       "192.168.1.1 router gw\n"
547       "1.2.3.4\tmyhost.domain.tld myhost\n")
548     self.assertFileMode(self.tmpname, 0644)
549
550   def testSettingExistingIp(self):
551     SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
552                      ['myhost'])
553
554     self.assertFileContent(self.tmpname,
555       "# This is a test file for /etc/hosts\n"
556       "127.0.0.1\tlocalhost\n"
557       "192.168.1.1\tmyhost.domain.tld myhost\n")
558     self.assertFileMode(self.tmpname, 0644)
559
560   def testSettingDuplicateName(self):
561     SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
562
563     self.assertFileContent(self.tmpname,
564       "# This is a test file for /etc/hosts\n"
565       "127.0.0.1\tlocalhost\n"
566       "192.168.1.1 router gw\n"
567       "1.2.3.4\tmyhost\n")
568     self.assertFileMode(self.tmpname, 0644)
569
570   def testRemovingExistingHost(self):
571     RemoveEtcHostsEntry(self.tmpname, 'router')
572
573     self.assertFileContent(self.tmpname,
574       "# This is a test file for /etc/hosts\n"
575       "127.0.0.1\tlocalhost\n"
576       "192.168.1.1 gw\n")
577     self.assertFileMode(self.tmpname, 0644)
578
579   def testRemovingSingleExistingHost(self):
580     RemoveEtcHostsEntry(self.tmpname, 'localhost')
581
582     self.assertFileContent(self.tmpname,
583       "# This is a test file for /etc/hosts\n"
584       "192.168.1.1 router gw\n")
585     self.assertFileMode(self.tmpname, 0644)
586
587   def testRemovingNonExistingHost(self):
588     RemoveEtcHostsEntry(self.tmpname, 'myhost')
589
590     self.assertFileContent(self.tmpname,
591       "# This is a test file for /etc/hosts\n"
592       "127.0.0.1\tlocalhost\n"
593       "192.168.1.1 router gw\n")
594     self.assertFileMode(self.tmpname, 0644)
595
596   def testRemovingAlias(self):
597     RemoveEtcHostsEntry(self.tmpname, 'gw')
598
599     self.assertFileContent(self.tmpname,
600       "# This is a test file for /etc/hosts\n"
601       "127.0.0.1\tlocalhost\n"
602       "192.168.1.1 router\n")
603     self.assertFileMode(self.tmpname, 0644)
604
605
606 class TestShellQuoting(unittest.TestCase):
607   """Test case for shell quoting functions"""
608
609   def testShellQuote(self):
610     self.assertEqual(ShellQuote('abc'), "abc")
611     self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
612     self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
613     self.assertEqual(ShellQuote("a b c"), "'a b c'")
614     self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
615
616   def testShellQuoteArgs(self):
617     self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
618     self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
619     self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
620
621
622 class TestTcpPing(unittest.TestCase):
623   """Testcase for TCP version of ping - against listen(2)ing port"""
624
625   def setUp(self):
626     self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
627     self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
628     self.listenerport = self.listener.getsockname()[1]
629     self.listener.listen(1)
630
631   def tearDown(self):
632     self.listener.shutdown(socket.SHUT_RDWR)
633     del self.listener
634     del self.listenerport
635
636   def testTcpPingToLocalHostAccept(self):
637     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
638                          self.listenerport,
639                          timeout=10,
640                          live_port_needed=True,
641                          source=constants.LOCALHOST_IP_ADDRESS,
642                          ),
643                  "failed to connect to test listener")
644
645     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
646                          self.listenerport,
647                          timeout=10,
648                          live_port_needed=True,
649                          ),
650                  "failed to connect to test listener (no source)")
651
652
653 class TestTcpPingDeaf(unittest.TestCase):
654   """Testcase for TCP version of ping - against non listen(2)ing port"""
655
656   def setUp(self):
657     self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
658     self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
659     self.deaflistenerport = self.deaflistener.getsockname()[1]
660
661   def tearDown(self):
662     del self.deaflistener
663     del self.deaflistenerport
664
665   def testTcpPingToLocalHostAcceptDeaf(self):
666     self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
667                         self.deaflistenerport,
668                         timeout=constants.TCP_PING_TIMEOUT,
669                         live_port_needed=True,
670                         source=constants.LOCALHOST_IP_ADDRESS,
671                         ), # need successful connect(2)
672                 "successfully connected to deaf listener")
673
674     self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
675                         self.deaflistenerport,
676                         timeout=constants.TCP_PING_TIMEOUT,
677                         live_port_needed=True,
678                         ), # need successful connect(2)
679                 "successfully connected to deaf listener (no source addr)")
680
681   def testTcpPingToLocalHostNoAccept(self):
682     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
683                          self.deaflistenerport,
684                          timeout=constants.TCP_PING_TIMEOUT,
685                          live_port_needed=False,
686                          source=constants.LOCALHOST_IP_ADDRESS,
687                          ), # ECONNREFUSED is OK
688                  "failed to ping alive host on deaf port")
689
690     self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
691                          self.deaflistenerport,
692                          timeout=constants.TCP_PING_TIMEOUT,
693                          live_port_needed=False,
694                          ), # ECONNREFUSED is OK
695                  "failed to ping alive host on deaf port (no source addr)")
696
697
698 class TestOwnIpAddress(unittest.TestCase):
699   """Testcase for OwnIpAddress"""
700
701   def testOwnLoopback(self):
702     """check having the loopback ip"""
703     self.failUnless(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS),
704                     "Should own the loopback address")
705
706   def testNowOwnAddress(self):
707     """check that I don't own an address"""
708
709     # network 192.0.2.0/24 is reserved for test/documentation as per
710     # rfc 3330, so we *should* not have an address of this range... if
711     # this fails, we should extend the test to multiple addresses
712     DST_IP = "192.0.2.1"
713     self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP)
714
715
716 class TestListVisibleFiles(unittest.TestCase):
717   """Test case for ListVisibleFiles"""
718
719   def setUp(self):
720     self.path = tempfile.mkdtemp()
721
722   def tearDown(self):
723     shutil.rmtree(self.path)
724
725   def _test(self, files, expected):
726     # Sort a copy
727     expected = expected[:]
728     expected.sort()
729
730     for name in files:
731       f = open(os.path.join(self.path, name), 'w')
732       try:
733         f.write("Test\n")
734       finally:
735         f.close()
736
737     found = ListVisibleFiles(self.path)
738     found.sort()
739
740     self.assertEqual(found, expected)
741
742   def testAllVisible(self):
743     files = ["a", "b", "c"]
744     expected = files
745     self._test(files, expected)
746
747   def testNoneVisible(self):
748     files = [".a", ".b", ".c"]
749     expected = []
750     self._test(files, expected)
751
752   def testSomeVisible(self):
753     files = ["a", "b", ".c"]
754     expected = ["a", "b"]
755     self._test(files, expected)
756
757
758 class TestNewUUID(unittest.TestCase):
759   """Test case for NewUUID"""
760
761   _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
762                         '[a-f0-9]{4}-[a-f0-9]{12}$')
763
764   def runTest(self):
765     self.failUnless(self._re_uuid.match(utils.NewUUID()))
766
767
768 class TestUniqueSequence(unittest.TestCase):
769   """Test case for UniqueSequence"""
770
771   def _test(self, input, expected):
772     self.assertEqual(utils.UniqueSequence(input), expected)
773
774   def runTest(self):
775     # Ordered input
776     self._test([1, 2, 3], [1, 2, 3])
777     self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
778     self._test([1, 2, 2, 3], [1, 2, 3])
779     self._test([1, 2, 3, 3], [1, 2, 3])
780
781     # Unordered input
782     self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
783     self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
784
785     # Strings
786     self._test(["a", "a"], ["a"])
787     self._test(["a", "b"], ["a", "b"])
788     self._test(["a", "b", "a"], ["a", "b"])
789
790
791 class TestFirstFree(unittest.TestCase):
792   """Test case for the FirstFree function"""
793
794   def test(self):
795     """Test FirstFree"""
796     self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
797     self.failUnlessEqual(FirstFree([]), None)
798     self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
799     self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
800     self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
801
802
803 class TestTailFile(testutils.GanetiTestCase):
804   """Test case for the TailFile function"""
805
806   def testEmpty(self):
807     fname = self._CreateTempFile()
808     self.failUnlessEqual(TailFile(fname), [])
809     self.failUnlessEqual(TailFile(fname, lines=25), [])
810
811   def testAllLines(self):
812     data = ["test %d" % i for i in range(30)]
813     for i in range(30):
814       fname = self._CreateTempFile()
815       fd = open(fname, "w")
816       fd.write("\n".join(data[:i]))
817       if i > 0:
818         fd.write("\n")
819       fd.close()
820       self.failUnlessEqual(TailFile(fname, lines=i), data[:i])
821
822   def testPartialLines(self):
823     data = ["test %d" % i for i in range(30)]
824     fname = self._CreateTempFile()
825     fd = open(fname, "w")
826     fd.write("\n".join(data))
827     fd.write("\n")
828     fd.close()
829     for i in range(1, 30):
830       self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
831
832   def testBigFile(self):
833     data = ["test %d" % i for i in range(30)]
834     fname = self._CreateTempFile()
835     fd = open(fname, "w")
836     fd.write("X" * 1048576)
837     fd.write("\n")
838     fd.write("\n".join(data))
839     fd.write("\n")
840     fd.close()
841     for i in range(1, 30):
842       self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
843
844
845 class TestFileLock(unittest.TestCase):
846   """Test case for the FileLock class"""
847
848   def setUp(self):
849     self.tmpfile = tempfile.NamedTemporaryFile()
850     self.lock = utils.FileLock(self.tmpfile.name)
851
852   def testSharedNonblocking(self):
853     self.lock.Shared(blocking=False)
854     self.lock.Close()
855
856   def testExclusiveNonblocking(self):
857     self.lock.Exclusive(blocking=False)
858     self.lock.Close()
859
860   def testUnlockNonblocking(self):
861     self.lock.Unlock(blocking=False)
862     self.lock.Close()
863
864   def testSharedBlocking(self):
865     self.lock.Shared(blocking=True)
866     self.lock.Close()
867
868   def testExclusiveBlocking(self):
869     self.lock.Exclusive(blocking=True)
870     self.lock.Close()
871
872   def testUnlockBlocking(self):
873     self.lock.Unlock(blocking=True)
874     self.lock.Close()
875
876   def testSharedExclusiveUnlock(self):
877     self.lock.Shared(blocking=False)
878     self.lock.Exclusive(blocking=False)
879     self.lock.Unlock(blocking=False)
880     self.lock.Close()
881
882   def testExclusiveSharedUnlock(self):
883     self.lock.Exclusive(blocking=False)
884     self.lock.Shared(blocking=False)
885     self.lock.Unlock(blocking=False)
886     self.lock.Close()
887
888   def testCloseShared(self):
889     self.lock.Close()
890     self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
891
892   def testCloseExclusive(self):
893     self.lock.Close()
894     self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
895
896   def testCloseUnlock(self):
897     self.lock.Close()
898     self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
899
900
901 class TestTimeFunctions(unittest.TestCase):
902   """Test case for time functions"""
903
904   def runTest(self):
905     self.assertEqual(utils.SplitTime(1), (1, 0))
906     self.assertEqual(utils.SplitTime(1.5), (1, 500000))
907     self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
908     self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
909     self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
910     self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
911     self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
912     self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
913
914     self.assertRaises(AssertionError, utils.SplitTime, -1)
915
916     self.assertEqual(utils.MergeTime((1, 0)), 1.0)
917     self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
918     self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
919
920     self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3),
921                      1218448917.481)
922     self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
923
924     self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
925     self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
926     self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
927     self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
928     self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
929
930
931 class FieldSetTestCase(unittest.TestCase):
932   """Test case for FieldSets"""
933
934   def testSimpleMatch(self):
935     f = utils.FieldSet("a", "b", "c", "def")
936     self.failUnless(f.Matches("a"))
937     self.failIf(f.Matches("d"), "Substring matched")
938     self.failIf(f.Matches("defghi"), "Prefix string matched")
939     self.failIf(f.NonMatching(["b", "c"]))
940     self.failIf(f.NonMatching(["a", "b", "c", "def"]))
941     self.failUnless(f.NonMatching(["a", "d"]))
942
943   def testRegexMatch(self):
944     f = utils.FieldSet("a", "b([0-9]+)", "c")
945     self.failUnless(f.Matches("b1"))
946     self.failUnless(f.Matches("b99"))
947     self.failIf(f.Matches("b/1"))
948     self.failIf(f.NonMatching(["b12", "c"]))
949     self.failUnless(f.NonMatching(["a", "1"]))
950
951 class TestForceDictType(unittest.TestCase):
952   """Test case for ForceDictType"""
953
954   def setUp(self):
955     self.key_types = {
956       'a': constants.VTYPE_INT,
957       'b': constants.VTYPE_BOOL,
958       'c': constants.VTYPE_STRING,
959       'd': constants.VTYPE_SIZE,
960       }
961
962   def _fdt(self, dict, allowed_values=None):
963     if allowed_values is None:
964       ForceDictType(dict, self.key_types)
965     else:
966       ForceDictType(dict, self.key_types, allowed_values=allowed_values)
967
968     return dict
969
970   def testSimpleDict(self):
971     self.assertEqual(self._fdt({}), {})
972     self.assertEqual(self._fdt({'a': 1}), {'a': 1})
973     self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
974     self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
975     self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
976     self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
977     self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
978     self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
979     self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
980     self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
981     self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
982     self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
983
984   def testErrors(self):
985     self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
986     self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
987     self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
988     self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
989
990
991 class TestIsAbsNormPath(unittest.TestCase):
992   """Testing case for IsProcessAlive"""
993
994   def _pathTestHelper(self, path, result):
995     if result:
996       self.assert_(IsNormAbsPath(path),
997           "Path %s should result absolute and normalized" % path)
998     else:
999       self.assert_(not IsNormAbsPath(path),
1000           "Path %s should not result absolute and normalized" % path)
1001
1002   def testBase(self):
1003     self._pathTestHelper('/etc', True)
1004     self._pathTestHelper('/srv', True)
1005     self._pathTestHelper('etc', False)
1006     self._pathTestHelper('/etc/../root', False)
1007     self._pathTestHelper('/etc/', False)
1008
1009
1010 class TestSafeEncode(unittest.TestCase):
1011   """Test case for SafeEncode"""
1012
1013   def testAscii(self):
1014     for txt in [string.digits, string.letters, string.punctuation]:
1015       self.failUnlessEqual(txt, SafeEncode(txt))
1016
1017   def testDoubleEncode(self):
1018     for i in range(255):
1019       txt = SafeEncode(chr(i))
1020       self.failUnlessEqual(txt, SafeEncode(txt))
1021
1022   def testUnicode(self):
1023     # 1024 is high enough to catch non-direct ASCII mappings
1024     for i in range(1024):
1025       txt = SafeEncode(unichr(i))
1026       self.failUnlessEqual(txt, SafeEncode(txt))
1027
1028
1029 class TestFormatTime(unittest.TestCase):
1030   """Testing case for FormatTime"""
1031
1032   def testNone(self):
1033     self.failUnlessEqual(FormatTime(None), "N/A")
1034
1035   def testInvalid(self):
1036     self.failUnlessEqual(FormatTime(()), "N/A")
1037
1038   def testNow(self):
1039     # tests that we accept time.time input
1040     FormatTime(time.time())
1041     # tests that we accept int input
1042     FormatTime(int(time.time()))
1043
1044
1045 if __name__ == '__main__':
1046   testutils.GanetiTestProgram()