4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the utils module"""
40 from ganeti import constants
41 from ganeti import utils
42 from ganeti import errors
43 from ganeti.utils import IsProcessAlive, RunCmd, \
44 RemoveFile, MatchNameComponent, FormatUnit, \
45 ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
46 ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
47 SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \
48 TailFile, ForceDictType, SafeEncode, IsNormAbsPath, FormatTime, \
51 from ganeti.errors import LockError, UnitParseError, GenericError, \
55 class TestIsProcessAlive(unittest.TestCase):
56 """Testing case for IsProcessAlive"""
60 self.assert_(IsProcessAlive(mypid),
61 "can't find myself running")
63 def testNotExisting(self):
64 pid_non_existing = os.fork()
65 if pid_non_existing == 0:
67 elif pid_non_existing < 0:
68 raise SystemError("can't fork")
69 os.waitpid(pid_non_existing, 0)
70 self.assert_(not IsProcessAlive(pid_non_existing),
71 "nonexisting process detected")
74 class TestPidFileFunctions(unittest.TestCase):
75 """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
78 self.dir = tempfile.mkdtemp()
79 self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
80 utils.DaemonPidFileName = self.f_dpn
82 def testPidFileFunctions(self):
83 pid_file = self.f_dpn('test')
84 utils.WritePidFile('test')
85 self.failUnless(os.path.exists(pid_file),
86 "PID file should have been created")
87 read_pid = utils.ReadPidFile(pid_file)
88 self.failUnlessEqual(read_pid, os.getpid())
89 self.failUnless(utils.IsProcessAlive(read_pid))
90 self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
91 utils.RemovePidFile('test')
92 self.failIf(os.path.exists(pid_file),
93 "PID file should not exist anymore")
94 self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
95 "ReadPidFile should return 0 for missing pid file")
96 fh = open(pid_file, "w")
99 self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
100 "ReadPidFile should return 0 for invalid pid file")
101 utils.RemovePidFile('test')
102 self.failIf(os.path.exists(pid_file),
103 "PID file should not exist anymore")
106 pid_file = self.f_dpn('child')
107 r_fd, w_fd = os.pipe()
109 if new_pid == 0: #child
110 utils.WritePidFile('child')
115 # else we are in the parent
116 # wait until the child has written the pid file
118 read_pid = utils.ReadPidFile(pid_file)
119 self.failUnlessEqual(read_pid, new_pid)
120 self.failUnless(utils.IsProcessAlive(new_pid))
121 utils.KillProcess(new_pid, waitpid=True)
122 self.failIf(utils.IsProcessAlive(new_pid))
123 utils.RemovePidFile('child')
124 self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
127 for name in os.listdir(self.dir):
128 os.unlink(os.path.join(self.dir, name))
132 class TestRunCmd(testutils.GanetiTestCase):
133 """Testing case for the RunCmd function"""
136 testutils.GanetiTestCase.setUp(self)
137 self.magic = time.ctime() + " ganeti test"
138 self.fname = self._CreateTempFile()
141 """Test successful exit code"""
142 result = RunCmd("/bin/sh -c 'exit 0'")
143 self.assertEqual(result.exit_code, 0)
144 self.assertEqual(result.output, "")
147 """Test fail exit code"""
148 result = RunCmd("/bin/sh -c 'exit 1'")
149 self.assertEqual(result.exit_code, 1)
150 self.assertEqual(result.output, "")
152 def testStdout(self):
153 """Test standard output"""
154 cmd = 'echo -n "%s"' % self.magic
155 result = RunCmd("/bin/sh -c '%s'" % cmd)
156 self.assertEqual(result.stdout, self.magic)
157 result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
158 self.assertEqual(result.output, "")
159 self.assertFileContent(self.fname, self.magic)
161 def testStderr(self):
162 """Test standard error"""
163 cmd = 'echo -n "%s"' % self.magic
164 result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
165 self.assertEqual(result.stderr, self.magic)
166 result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
167 self.assertEqual(result.output, "")
168 self.assertFileContent(self.fname, self.magic)
170 def testCombined(self):
171 """Test combined output"""
172 cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
173 expected = "A" + self.magic + "B" + self.magic
174 result = RunCmd("/bin/sh -c '%s'" % cmd)
175 self.assertEqual(result.output, expected)
176 result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
177 self.assertEqual(result.output, "")
178 self.assertFileContent(self.fname, expected)
180 def testSignal(self):
182 result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
183 self.assertEqual(result.signal, 15)
184 self.assertEqual(result.output, "")
186 def testListRun(self):
188 result = RunCmd(["true"])
189 self.assertEqual(result.signal, None)
190 self.assertEqual(result.exit_code, 0)
191 result = RunCmd(["/bin/sh", "-c", "exit 1"])
192 self.assertEqual(result.signal, None)
193 self.assertEqual(result.exit_code, 1)
194 result = RunCmd(["echo", "-n", self.magic])
195 self.assertEqual(result.signal, None)
196 self.assertEqual(result.exit_code, 0)
197 self.assertEqual(result.stdout, self.magic)
199 def testFileEmptyOutput(self):
200 """Test file output"""
201 result = RunCmd(["true"], output=self.fname)
202 self.assertEqual(result.signal, None)
203 self.assertEqual(result.exit_code, 0)
204 self.assertFileContent(self.fname, "")
207 """Test locale environment"""
208 old_env = os.environ.copy()
210 os.environ["LANG"] = "en_US.UTF-8"
211 os.environ["LC_ALL"] = "en_US.UTF-8"
212 result = RunCmd(["locale"])
213 for line in result.output.splitlines():
214 key, value = line.split("=", 1)
215 # Ignore these variables, they're overridden by LC_ALL
216 if key == "LANG" or key == "LANGUAGE":
218 self.failIf(value and value != "C" and value != '"C"',
219 "Variable %s is set to the invalid value '%s'" % (key, value))
223 def testDefaultCwd(self):
224 """Test default working directory"""
225 self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
228 """Test default working directory"""
229 self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
230 self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
232 self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
235 class TestRemoveFile(unittest.TestCase):
236 """Test case for the RemoveFile function"""
239 """Create a temp dir and file for each case"""
240 self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
241 fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
245 if os.path.exists(self.tmpfile):
246 os.unlink(self.tmpfile)
247 os.rmdir(self.tmpdir)
250 def testIgnoreDirs(self):
251 """Test that RemoveFile() ignores directories"""
252 self.assertEqual(None, RemoveFile(self.tmpdir))
255 def testIgnoreNotExisting(self):
256 """Test that RemoveFile() ignores non-existing files"""
257 RemoveFile(self.tmpfile)
258 RemoveFile(self.tmpfile)
261 def testRemoveFile(self):
262 """Test that RemoveFile does remove a file"""
263 RemoveFile(self.tmpfile)
264 if os.path.exists(self.tmpfile):
265 self.fail("File '%s' not removed" % self.tmpfile)
268 def testRemoveSymlink(self):
269 """Test that RemoveFile does remove symlinks"""
270 symlink = self.tmpdir + "/symlink"
271 os.symlink("no-such-file", symlink)
273 if os.path.exists(symlink):
274 self.fail("File '%s' not removed" % symlink)
275 os.symlink(self.tmpfile, symlink)
277 if os.path.exists(symlink):
278 self.fail("File '%s' not removed" % symlink)
281 class TestRename(unittest.TestCase):
282 """Test case for RenameFile"""
285 """Create a temporary directory"""
286 self.tmpdir = tempfile.mkdtemp()
287 self.tmpfile = os.path.join(self.tmpdir, "test1")
290 open(self.tmpfile, "w").close()
293 """Remove temporary directory"""
294 shutil.rmtree(self.tmpdir)
296 def testSimpleRename1(self):
297 """Simple rename 1"""
298 utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
299 self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
301 def testSimpleRename2(self):
302 """Simple rename 2"""
303 utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
305 self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
307 def testRenameMkdir(self):
308 """Rename with mkdir"""
309 utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
311 self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
312 self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
314 utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"),
315 os.path.join(self.tmpdir, "test/foo/bar/baz"),
317 self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
318 self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar")))
319 self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz")))
322 class TestMatchNameComponent(unittest.TestCase):
323 """Test case for the MatchNameComponent function"""
325 def testEmptyList(self):
326 """Test that there is no match against an empty list"""
328 self.failUnlessEqual(MatchNameComponent("", []), None)
329 self.failUnlessEqual(MatchNameComponent("test", []), None)
331 def testSingleMatch(self):
332 """Test that a single match is performed correctly"""
333 mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
334 for key in "test2", "test2.example", "test2.example.com":
335 self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
337 def testMultipleMatches(self):
338 """Test that a multiple match is returned as None"""
339 mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
340 for key in "test1", "test1.example":
341 self.failUnlessEqual(MatchNameComponent(key, mlist), None)
343 def testFullMatch(self):
344 """Test that a full match is returned correctly"""
346 key2 = "test1.example"
347 mlist = [key2, key2 + ".com"]
348 self.failUnlessEqual(MatchNameComponent(key1, mlist), None)
349 self.failUnlessEqual(MatchNameComponent(key2, mlist), key2)
351 def testCaseInsensitivePartialMatch(self):
352 """Test for the case_insensitive keyword"""
353 mlist = ["test1.example.com", "test2.example.net"]
354 self.assertEqual(MatchNameComponent("test2", mlist, case_sensitive=False),
356 self.assertEqual(MatchNameComponent("Test2", mlist, case_sensitive=False),
358 self.assertEqual(MatchNameComponent("teSt2", mlist, case_sensitive=False),
360 self.assertEqual(MatchNameComponent("TeSt2", mlist, case_sensitive=False),
364 def testCaseInsensitiveFullMatch(self):
365 mlist = ["ts1.ex", "ts1.ex.org", "ts2.ex", "Ts2.ex"]
366 # Between the two ts1 a full string match non-case insensitive should work
367 self.assertEqual(MatchNameComponent("Ts1", mlist, case_sensitive=False),
369 self.assertEqual(MatchNameComponent("Ts1.ex", mlist, case_sensitive=False),
371 self.assertEqual(MatchNameComponent("ts1.ex", mlist, case_sensitive=False),
373 # Between the two ts2 only case differs, so only case-match works
374 self.assertEqual(MatchNameComponent("ts2.ex", mlist, case_sensitive=False),
376 self.assertEqual(MatchNameComponent("Ts2.ex", mlist, case_sensitive=False),
378 self.assertEqual(MatchNameComponent("TS2.ex", mlist, case_sensitive=False),
382 class TestFormatUnit(unittest.TestCase):
383 """Test case for the FormatUnit function"""
386 self.assertEqual(FormatUnit(1, 'h'), '1M')
387 self.assertEqual(FormatUnit(100, 'h'), '100M')
388 self.assertEqual(FormatUnit(1023, 'h'), '1023M')
390 self.assertEqual(FormatUnit(1, 'm'), '1')
391 self.assertEqual(FormatUnit(100, 'm'), '100')
392 self.assertEqual(FormatUnit(1023, 'm'), '1023')
394 self.assertEqual(FormatUnit(1024, 'm'), '1024')
395 self.assertEqual(FormatUnit(1536, 'm'), '1536')
396 self.assertEqual(FormatUnit(17133, 'm'), '17133')
397 self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
400 self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
401 self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
402 self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
403 self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
405 self.assertEqual(FormatUnit(1024, 'g'), '1.0')
406 self.assertEqual(FormatUnit(1536, 'g'), '1.5')
407 self.assertEqual(FormatUnit(17133, 'g'), '16.7')
408 self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
410 self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
411 self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
412 self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
415 self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
416 self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
417 self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
419 self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
420 self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
421 self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
423 class TestParseUnit(unittest.TestCase):
424 """Test case for the ParseUnit function"""
427 ('M', 1), ('G', 1024), ('T', 1024 * 1024),
428 ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
429 ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
431 def testRounding(self):
432 self.assertEqual(ParseUnit('0'), 0)
433 self.assertEqual(ParseUnit('1'), 4)
434 self.assertEqual(ParseUnit('2'), 4)
435 self.assertEqual(ParseUnit('3'), 4)
437 self.assertEqual(ParseUnit('124'), 124)
438 self.assertEqual(ParseUnit('125'), 128)
439 self.assertEqual(ParseUnit('126'), 128)
440 self.assertEqual(ParseUnit('127'), 128)
441 self.assertEqual(ParseUnit('128'), 128)
442 self.assertEqual(ParseUnit('129'), 132)
443 self.assertEqual(ParseUnit('130'), 132)
445 def testFloating(self):
446 self.assertEqual(ParseUnit('0'), 0)
447 self.assertEqual(ParseUnit('0.5'), 4)
448 self.assertEqual(ParseUnit('1.75'), 4)
449 self.assertEqual(ParseUnit('1.99'), 4)
450 self.assertEqual(ParseUnit('2.00'), 4)
451 self.assertEqual(ParseUnit('2.01'), 4)
452 self.assertEqual(ParseUnit('3.99'), 4)
453 self.assertEqual(ParseUnit('4.00'), 4)
454 self.assertEqual(ParseUnit('4.01'), 8)
455 self.assertEqual(ParseUnit('1.5G'), 1536)
456 self.assertEqual(ParseUnit('1.8G'), 1844)
457 self.assertEqual(ParseUnit('8.28T'), 8682212)
459 def testSuffixes(self):
460 for sep in ('', ' ', ' ', "\t", "\t "):
461 for suffix, scale in TestParseUnit.SCALES:
462 for func in (lambda x: x, str.lower, str.upper):
463 self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
466 def testInvalidInput(self):
467 for sep in ('-', '_', ',', 'a'):
468 for suffix, _ in TestParseUnit.SCALES:
469 self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
471 for suffix, _ in TestParseUnit.SCALES:
472 self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
475 class TestSshKeys(testutils.GanetiTestCase):
476 """Test case for the AddAuthorizedKey function"""
478 KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
479 KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
480 'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
483 testutils.GanetiTestCase.setUp(self)
484 self.tmpname = self._CreateTempFile()
485 handle = open(self.tmpname, 'w')
487 handle.write("%s\n" % TestSshKeys.KEY_A)
488 handle.write("%s\n" % TestSshKeys.KEY_B)
492 def testAddingNewKey(self):
493 AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
495 self.assertFileContent(self.tmpname,
496 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
497 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
498 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
499 "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
501 def testAddingAlmostButNotCompletelyTheSameKey(self):
502 AddAuthorizedKey(self.tmpname,
503 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
505 self.assertFileContent(self.tmpname,
506 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
507 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
508 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
509 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
511 def testAddingExistingKeyWithSomeMoreSpaces(self):
512 AddAuthorizedKey(self.tmpname,
513 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
515 self.assertFileContent(self.tmpname,
516 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
517 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
518 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
520 def testRemovingExistingKeyWithSomeMoreSpaces(self):
521 RemoveAuthorizedKey(self.tmpname,
522 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
524 self.assertFileContent(self.tmpname,
525 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
526 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
528 def testRemovingNonExistingKey(self):
529 RemoveAuthorizedKey(self.tmpname,
530 'ssh-dss AAAAB3Nsdfj230xxjxJjsjwjsjdjU root@test')
532 self.assertFileContent(self.tmpname,
533 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
534 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
535 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
538 class TestEtcHosts(testutils.GanetiTestCase):
539 """Test functions modifying /etc/hosts"""
542 testutils.GanetiTestCase.setUp(self)
543 self.tmpname = self._CreateTempFile()
544 handle = open(self.tmpname, 'w')
546 handle.write('# This is a test file for /etc/hosts\n')
547 handle.write('127.0.0.1\tlocalhost\n')
548 handle.write('192.168.1.1 router gw\n')
552 def testSettingNewIp(self):
553 SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
555 self.assertFileContent(self.tmpname,
556 "# This is a test file for /etc/hosts\n"
557 "127.0.0.1\tlocalhost\n"
558 "192.168.1.1 router gw\n"
559 "1.2.3.4\tmyhost.domain.tld myhost\n")
560 self.assertFileMode(self.tmpname, 0644)
562 def testSettingExistingIp(self):
563 SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
566 self.assertFileContent(self.tmpname,
567 "# This is a test file for /etc/hosts\n"
568 "127.0.0.1\tlocalhost\n"
569 "192.168.1.1\tmyhost.domain.tld myhost\n")
570 self.assertFileMode(self.tmpname, 0644)
572 def testSettingDuplicateName(self):
573 SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
575 self.assertFileContent(self.tmpname,
576 "# This is a test file for /etc/hosts\n"
577 "127.0.0.1\tlocalhost\n"
578 "192.168.1.1 router gw\n"
580 self.assertFileMode(self.tmpname, 0644)
582 def testRemovingExistingHost(self):
583 RemoveEtcHostsEntry(self.tmpname, 'router')
585 self.assertFileContent(self.tmpname,
586 "# This is a test file for /etc/hosts\n"
587 "127.0.0.1\tlocalhost\n"
589 self.assertFileMode(self.tmpname, 0644)
591 def testRemovingSingleExistingHost(self):
592 RemoveEtcHostsEntry(self.tmpname, 'localhost')
594 self.assertFileContent(self.tmpname,
595 "# This is a test file for /etc/hosts\n"
596 "192.168.1.1 router gw\n")
597 self.assertFileMode(self.tmpname, 0644)
599 def testRemovingNonExistingHost(self):
600 RemoveEtcHostsEntry(self.tmpname, 'myhost')
602 self.assertFileContent(self.tmpname,
603 "# This is a test file for /etc/hosts\n"
604 "127.0.0.1\tlocalhost\n"
605 "192.168.1.1 router gw\n")
606 self.assertFileMode(self.tmpname, 0644)
608 def testRemovingAlias(self):
609 RemoveEtcHostsEntry(self.tmpname, 'gw')
611 self.assertFileContent(self.tmpname,
612 "# This is a test file for /etc/hosts\n"
613 "127.0.0.1\tlocalhost\n"
614 "192.168.1.1 router\n")
615 self.assertFileMode(self.tmpname, 0644)
618 class TestShellQuoting(unittest.TestCase):
619 """Test case for shell quoting functions"""
621 def testShellQuote(self):
622 self.assertEqual(ShellQuote('abc'), "abc")
623 self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
624 self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
625 self.assertEqual(ShellQuote("a b c"), "'a b c'")
626 self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
628 def testShellQuoteArgs(self):
629 self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
630 self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
631 self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
634 class TestTcpPing(unittest.TestCase):
635 """Testcase for TCP version of ping - against listen(2)ing port"""
638 self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
639 self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
640 self.listenerport = self.listener.getsockname()[1]
641 self.listener.listen(1)
644 self.listener.shutdown(socket.SHUT_RDWR)
646 del self.listenerport
648 def testTcpPingToLocalHostAccept(self):
649 self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
652 live_port_needed=True,
653 source=constants.LOCALHOST_IP_ADDRESS,
655 "failed to connect to test listener")
657 self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
660 live_port_needed=True,
662 "failed to connect to test listener (no source)")
665 class TestTcpPingDeaf(unittest.TestCase):
666 """Testcase for TCP version of ping - against non listen(2)ing port"""
669 self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
670 self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
671 self.deaflistenerport = self.deaflistener.getsockname()[1]
674 del self.deaflistener
675 del self.deaflistenerport
677 def testTcpPingToLocalHostAcceptDeaf(self):
678 self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
679 self.deaflistenerport,
680 timeout=constants.TCP_PING_TIMEOUT,
681 live_port_needed=True,
682 source=constants.LOCALHOST_IP_ADDRESS,
683 ), # need successful connect(2)
684 "successfully connected to deaf listener")
686 self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
687 self.deaflistenerport,
688 timeout=constants.TCP_PING_TIMEOUT,
689 live_port_needed=True,
690 ), # need successful connect(2)
691 "successfully connected to deaf listener (no source addr)")
693 def testTcpPingToLocalHostNoAccept(self):
694 self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
695 self.deaflistenerport,
696 timeout=constants.TCP_PING_TIMEOUT,
697 live_port_needed=False,
698 source=constants.LOCALHOST_IP_ADDRESS,
699 ), # ECONNREFUSED is OK
700 "failed to ping alive host on deaf port")
702 self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
703 self.deaflistenerport,
704 timeout=constants.TCP_PING_TIMEOUT,
705 live_port_needed=False,
706 ), # ECONNREFUSED is OK
707 "failed to ping alive host on deaf port (no source addr)")
710 class TestOwnIpAddress(unittest.TestCase):
711 """Testcase for OwnIpAddress"""
713 def testOwnLoopback(self):
714 """check having the loopback ip"""
715 self.failUnless(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS),
716 "Should own the loopback address")
718 def testNowOwnAddress(self):
719 """check that I don't own an address"""
721 # network 192.0.2.0/24 is reserved for test/documentation as per
722 # rfc 3330, so we *should* not have an address of this range... if
723 # this fails, we should extend the test to multiple addresses
725 self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP)
728 class TestListVisibleFiles(unittest.TestCase):
729 """Test case for ListVisibleFiles"""
732 self.path = tempfile.mkdtemp()
735 shutil.rmtree(self.path)
737 def _test(self, files, expected):
739 expected = expected[:]
743 f = open(os.path.join(self.path, name), 'w')
749 found = ListVisibleFiles(self.path)
752 self.assertEqual(found, expected)
754 def testAllVisible(self):
755 files = ["a", "b", "c"]
757 self._test(files, expected)
759 def testNoneVisible(self):
760 files = [".a", ".b", ".c"]
762 self._test(files, expected)
764 def testSomeVisible(self):
765 files = ["a", "b", ".c"]
766 expected = ["a", "b"]
767 self._test(files, expected)
770 class TestNewUUID(unittest.TestCase):
771 """Test case for NewUUID"""
773 _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
774 '[a-f0-9]{4}-[a-f0-9]{12}$')
777 self.failUnless(self._re_uuid.match(utils.NewUUID()))
780 class TestUniqueSequence(unittest.TestCase):
781 """Test case for UniqueSequence"""
783 def _test(self, input, expected):
784 self.assertEqual(utils.UniqueSequence(input), expected)
788 self._test([1, 2, 3], [1, 2, 3])
789 self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
790 self._test([1, 2, 2, 3], [1, 2, 3])
791 self._test([1, 2, 3, 3], [1, 2, 3])
794 self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
795 self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
798 self._test(["a", "a"], ["a"])
799 self._test(["a", "b"], ["a", "b"])
800 self._test(["a", "b", "a"], ["a", "b"])
803 class TestFirstFree(unittest.TestCase):
804 """Test case for the FirstFree function"""
808 self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
809 self.failUnlessEqual(FirstFree([]), None)
810 self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
811 self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
812 self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
815 class TestTailFile(testutils.GanetiTestCase):
816 """Test case for the TailFile function"""
819 fname = self._CreateTempFile()
820 self.failUnlessEqual(TailFile(fname), [])
821 self.failUnlessEqual(TailFile(fname, lines=25), [])
823 def testAllLines(self):
824 data = ["test %d" % i for i in range(30)]
826 fname = self._CreateTempFile()
827 fd = open(fname, "w")
828 fd.write("\n".join(data[:i]))
832 self.failUnlessEqual(TailFile(fname, lines=i), data[:i])
834 def testPartialLines(self):
835 data = ["test %d" % i for i in range(30)]
836 fname = self._CreateTempFile()
837 fd = open(fname, "w")
838 fd.write("\n".join(data))
841 for i in range(1, 30):
842 self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
844 def testBigFile(self):
845 data = ["test %d" % i for i in range(30)]
846 fname = self._CreateTempFile()
847 fd = open(fname, "w")
848 fd.write("X" * 1048576)
850 fd.write("\n".join(data))
853 for i in range(1, 30):
854 self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
857 class TestFileLock(unittest.TestCase):
858 """Test case for the FileLock class"""
861 self.tmpfile = tempfile.NamedTemporaryFile()
862 self.lock = utils.FileLock(self.tmpfile.name)
864 def testSharedNonblocking(self):
865 self.lock.Shared(blocking=False)
868 def testExclusiveNonblocking(self):
869 self.lock.Exclusive(blocking=False)
872 def testUnlockNonblocking(self):
873 self.lock.Unlock(blocking=False)
876 def testSharedBlocking(self):
877 self.lock.Shared(blocking=True)
880 def testExclusiveBlocking(self):
881 self.lock.Exclusive(blocking=True)
884 def testUnlockBlocking(self):
885 self.lock.Unlock(blocking=True)
888 def testSharedExclusiveUnlock(self):
889 self.lock.Shared(blocking=False)
890 self.lock.Exclusive(blocking=False)
891 self.lock.Unlock(blocking=False)
894 def testExclusiveSharedUnlock(self):
895 self.lock.Exclusive(blocking=False)
896 self.lock.Shared(blocking=False)
897 self.lock.Unlock(blocking=False)
900 def testCloseShared(self):
902 self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
904 def testCloseExclusive(self):
906 self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
908 def testCloseUnlock(self):
910 self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
913 class TestTimeFunctions(unittest.TestCase):
914 """Test case for time functions"""
917 self.assertEqual(utils.SplitTime(1), (1, 0))
918 self.assertEqual(utils.SplitTime(1.5), (1, 500000))
919 self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
920 self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
921 self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
922 self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
923 self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
924 self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
926 self.assertRaises(AssertionError, utils.SplitTime, -1)
928 self.assertEqual(utils.MergeTime((1, 0)), 1.0)
929 self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
930 self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
932 self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3),
934 self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
936 self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
937 self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
938 self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
939 self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
940 self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
943 class FieldSetTestCase(unittest.TestCase):
944 """Test case for FieldSets"""
946 def testSimpleMatch(self):
947 f = utils.FieldSet("a", "b", "c", "def")
948 self.failUnless(f.Matches("a"))
949 self.failIf(f.Matches("d"), "Substring matched")
950 self.failIf(f.Matches("defghi"), "Prefix string matched")
951 self.failIf(f.NonMatching(["b", "c"]))
952 self.failIf(f.NonMatching(["a", "b", "c", "def"]))
953 self.failUnless(f.NonMatching(["a", "d"]))
955 def testRegexMatch(self):
956 f = utils.FieldSet("a", "b([0-9]+)", "c")
957 self.failUnless(f.Matches("b1"))
958 self.failUnless(f.Matches("b99"))
959 self.failIf(f.Matches("b/1"))
960 self.failIf(f.NonMatching(["b12", "c"]))
961 self.failUnless(f.NonMatching(["a", "1"]))
963 class TestForceDictType(unittest.TestCase):
964 """Test case for ForceDictType"""
968 'a': constants.VTYPE_INT,
969 'b': constants.VTYPE_BOOL,
970 'c': constants.VTYPE_STRING,
971 'd': constants.VTYPE_SIZE,
974 def _fdt(self, dict, allowed_values=None):
975 if allowed_values is None:
976 ForceDictType(dict, self.key_types)
978 ForceDictType(dict, self.key_types, allowed_values=allowed_values)
982 def testSimpleDict(self):
983 self.assertEqual(self._fdt({}), {})
984 self.assertEqual(self._fdt({'a': 1}), {'a': 1})
985 self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
986 self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
987 self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
988 self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
989 self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
990 self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
991 self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
992 self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
993 self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
994 self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
996 def testErrors(self):
997 self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
998 self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
999 self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
1000 self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
1003 class TestIsAbsNormPath(unittest.TestCase):
1004 """Testing case for IsProcessAlive"""
1006 def _pathTestHelper(self, path, result):
1008 self.assert_(IsNormAbsPath(path),
1009 "Path %s should result absolute and normalized" % path)
1011 self.assert_(not IsNormAbsPath(path),
1012 "Path %s should not result absolute and normalized" % path)
1015 self._pathTestHelper('/etc', True)
1016 self._pathTestHelper('/srv', True)
1017 self._pathTestHelper('etc', False)
1018 self._pathTestHelper('/etc/../root', False)
1019 self._pathTestHelper('/etc/', False)
1022 class TestSafeEncode(unittest.TestCase):
1023 """Test case for SafeEncode"""
1025 def testAscii(self):
1026 for txt in [string.digits, string.letters, string.punctuation]:
1027 self.failUnlessEqual(txt, SafeEncode(txt))
1029 def testDoubleEncode(self):
1030 for i in range(255):
1031 txt = SafeEncode(chr(i))
1032 self.failUnlessEqual(txt, SafeEncode(txt))
1034 def testUnicode(self):
1035 # 1024 is high enough to catch non-direct ASCII mappings
1036 for i in range(1024):
1037 txt = SafeEncode(unichr(i))
1038 self.failUnlessEqual(txt, SafeEncode(txt))
1041 class TestFormatTime(unittest.TestCase):
1042 """Testing case for FormatTime"""
1045 self.failUnlessEqual(FormatTime(None), "N/A")
1047 def testInvalid(self):
1048 self.failUnlessEqual(FormatTime(()), "N/A")
1051 # tests that we accept time.time input
1052 FormatTime(time.time())
1053 # tests that we accept int input
1054 FormatTime(int(time.time()))
1057 class TestFingerprintFile(unittest.TestCase):
1059 self.tmpfile = tempfile.NamedTemporaryFile()
1062 self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1063 "da39a3ee5e6b4b0d3255bfef95601890afd80709")
1065 utils.WriteFile(self.tmpfile.name, data="Hello World\n")
1066 self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1067 "648a6a6ffffdaa0badb23b8baf90b6168dd16b3a")
1070 class TestUnescapeAndSplit(unittest.TestCase):
1071 """Testing case for UnescapeAndSplit"""
1074 # testing more that one separator for regexp safety
1075 self._seps = [",", "+", "."]
1077 def testSimple(self):
1078 a = ["a", "b", "c", "d"]
1079 for sep in self._seps:
1080 self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), a)
1082 def testEscape(self):
1083 for sep in self._seps:
1084 a = ["a", "b\\" + sep + "c", "d"]
1085 b = ["a", "b" + sep + "c", "d"]
1086 self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1088 def testDoubleEscape(self):
1089 for sep in self._seps:
1090 a = ["a", "b\\\\", "c", "d"]
1091 b = ["a", "b\\", "c", "d"]
1092 self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1094 def testThreeEscape(self):
1095 for sep in self._seps:
1096 a = ["a", "b\\\\\\" + sep + "c", "d"]
1097 b = ["a", "b\\" + sep + "c", "d"]
1098 self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1101 class TestGenerateSelfSignedSslCert(unittest.TestCase):
1103 self.tmpdir = tempfile.mkdtemp()
1106 shutil.rmtree(self.tmpdir)
1108 def _checkPrivateRsaKey(self, key):
1109 lines = key.splitlines()
1110 self.assert_("-----BEGIN RSA PRIVATE KEY-----" in lines)
1111 self.assert_("-----END RSA PRIVATE KEY-----" in lines)
1113 def _checkRsaCertificate(self, cert):
1114 lines = cert.splitlines()
1115 self.assert_("-----BEGIN CERTIFICATE-----" in lines)
1116 self.assert_("-----END CERTIFICATE-----" in lines)
1118 def testSingleFile(self):
1119 cert1_filename = os.path.join(self.tmpdir, "cert1.pem")
1121 utils.GenerateSelfSignedSslCert(cert1_filename, validity=1)
1123 cert1 = utils.ReadFile(cert1_filename)
1125 self._checkPrivateRsaKey(cert1)
1126 self._checkRsaCertificate(cert1)
1129 if __name__ == '__main__':
1130 testutils.GanetiTestProgram()