4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the utils module"""
40 from ganeti import constants
41 from ganeti import utils
42 from ganeti import errors
43 from ganeti.utils import IsProcessAlive, RunCmd, \
44 RemoveFile, MatchNameComponent, FormatUnit, \
45 ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
46 ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
47 SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \
48 TailFile, ForceDictType, SafeEncode, IsNormAbsPath, FormatTime
50 from ganeti.errors import LockError, UnitParseError, GenericError, \
54 class TestIsProcessAlive(unittest.TestCase):
55 """Testing case for IsProcessAlive"""
59 self.assert_(IsProcessAlive(mypid),
60 "can't find myself running")
62 def testNotExisting(self):
63 pid_non_existing = os.fork()
64 if pid_non_existing == 0:
66 elif pid_non_existing < 0:
67 raise SystemError("can't fork")
68 os.waitpid(pid_non_existing, 0)
69 self.assert_(not IsProcessAlive(pid_non_existing),
70 "nonexisting process detected")
73 class TestPidFileFunctions(unittest.TestCase):
74 """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
77 self.dir = tempfile.mkdtemp()
78 self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
79 utils.DaemonPidFileName = self.f_dpn
81 def testPidFileFunctions(self):
82 pid_file = self.f_dpn('test')
83 utils.WritePidFile('test')
84 self.failUnless(os.path.exists(pid_file),
85 "PID file should have been created")
86 read_pid = utils.ReadPidFile(pid_file)
87 self.failUnlessEqual(read_pid, os.getpid())
88 self.failUnless(utils.IsProcessAlive(read_pid))
89 self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
90 utils.RemovePidFile('test')
91 self.failIf(os.path.exists(pid_file),
92 "PID file should not exist anymore")
93 self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
94 "ReadPidFile should return 0 for missing pid file")
95 fh = open(pid_file, "w")
98 self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
99 "ReadPidFile should return 0 for invalid pid file")
100 utils.RemovePidFile('test')
101 self.failIf(os.path.exists(pid_file),
102 "PID file should not exist anymore")
105 pid_file = self.f_dpn('child')
106 r_fd, w_fd = os.pipe()
108 if new_pid == 0: #child
109 utils.WritePidFile('child')
114 # else we are in the parent
115 # wait until the child has written the pid file
117 read_pid = utils.ReadPidFile(pid_file)
118 self.failUnlessEqual(read_pid, new_pid)
119 self.failUnless(utils.IsProcessAlive(new_pid))
120 utils.KillProcess(new_pid, waitpid=True)
121 self.failIf(utils.IsProcessAlive(new_pid))
122 utils.RemovePidFile('child')
123 self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
126 for name in os.listdir(self.dir):
127 os.unlink(os.path.join(self.dir, name))
131 class TestRunCmd(testutils.GanetiTestCase):
132 """Testing case for the RunCmd function"""
135 testutils.GanetiTestCase.setUp(self)
136 self.magic = time.ctime() + " ganeti test"
137 self.fname = self._CreateTempFile()
140 """Test successful exit code"""
141 result = RunCmd("/bin/sh -c 'exit 0'")
142 self.assertEqual(result.exit_code, 0)
143 self.assertEqual(result.output, "")
146 """Test fail exit code"""
147 result = RunCmd("/bin/sh -c 'exit 1'")
148 self.assertEqual(result.exit_code, 1)
149 self.assertEqual(result.output, "")
151 def testStdout(self):
152 """Test standard output"""
153 cmd = 'echo -n "%s"' % self.magic
154 result = RunCmd("/bin/sh -c '%s'" % cmd)
155 self.assertEqual(result.stdout, self.magic)
156 result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
157 self.assertEqual(result.output, "")
158 self.assertFileContent(self.fname, self.magic)
160 def testStderr(self):
161 """Test standard error"""
162 cmd = 'echo -n "%s"' % self.magic
163 result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
164 self.assertEqual(result.stderr, self.magic)
165 result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
166 self.assertEqual(result.output, "")
167 self.assertFileContent(self.fname, self.magic)
169 def testCombined(self):
170 """Test combined output"""
171 cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
172 expected = "A" + self.magic + "B" + self.magic
173 result = RunCmd("/bin/sh -c '%s'" % cmd)
174 self.assertEqual(result.output, expected)
175 result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
176 self.assertEqual(result.output, "")
177 self.assertFileContent(self.fname, expected)
179 def testSignal(self):
181 result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
182 self.assertEqual(result.signal, 15)
183 self.assertEqual(result.output, "")
185 def testListRun(self):
187 result = RunCmd(["true"])
188 self.assertEqual(result.signal, None)
189 self.assertEqual(result.exit_code, 0)
190 result = RunCmd(["/bin/sh", "-c", "exit 1"])
191 self.assertEqual(result.signal, None)
192 self.assertEqual(result.exit_code, 1)
193 result = RunCmd(["echo", "-n", self.magic])
194 self.assertEqual(result.signal, None)
195 self.assertEqual(result.exit_code, 0)
196 self.assertEqual(result.stdout, self.magic)
198 def testFileEmptyOutput(self):
199 """Test file output"""
200 result = RunCmd(["true"], output=self.fname)
201 self.assertEqual(result.signal, None)
202 self.assertEqual(result.exit_code, 0)
203 self.assertFileContent(self.fname, "")
206 """Test locale environment"""
207 old_env = os.environ.copy()
209 os.environ["LANG"] = "en_US.UTF-8"
210 os.environ["LC_ALL"] = "en_US.UTF-8"
211 result = RunCmd(["locale"])
212 for line in result.output.splitlines():
213 key, value = line.split("=", 1)
214 # Ignore these variables, they're overridden by LC_ALL
215 if key == "LANG" or key == "LANGUAGE":
217 self.failIf(value and value != "C" and value != '"C"',
218 "Variable %s is set to the invalid value '%s'" % (key, value))
222 def testDefaultCwd(self):
223 """Test default working directory"""
224 self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
227 """Test default working directory"""
228 self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
229 self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
231 self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
234 class TestRemoveFile(unittest.TestCase):
235 """Test case for the RemoveFile function"""
238 """Create a temp dir and file for each case"""
239 self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
240 fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
244 if os.path.exists(self.tmpfile):
245 os.unlink(self.tmpfile)
246 os.rmdir(self.tmpdir)
249 def testIgnoreDirs(self):
250 """Test that RemoveFile() ignores directories"""
251 self.assertEqual(None, RemoveFile(self.tmpdir))
254 def testIgnoreNotExisting(self):
255 """Test that RemoveFile() ignores non-existing files"""
256 RemoveFile(self.tmpfile)
257 RemoveFile(self.tmpfile)
260 def testRemoveFile(self):
261 """Test that RemoveFile does remove a file"""
262 RemoveFile(self.tmpfile)
263 if os.path.exists(self.tmpfile):
264 self.fail("File '%s' not removed" % self.tmpfile)
267 def testRemoveSymlink(self):
268 """Test that RemoveFile does remove symlinks"""
269 symlink = self.tmpdir + "/symlink"
270 os.symlink("no-such-file", symlink)
272 if os.path.exists(symlink):
273 self.fail("File '%s' not removed" % symlink)
274 os.symlink(self.tmpfile, symlink)
276 if os.path.exists(symlink):
277 self.fail("File '%s' not removed" % symlink)
280 class TestRename(unittest.TestCase):
281 """Test case for RenameFile"""
284 """Create a temporary directory"""
285 self.tmpdir = tempfile.mkdtemp()
286 self.tmpfile = os.path.join(self.tmpdir, "test1")
289 open(self.tmpfile, "w").close()
292 """Remove temporary directory"""
293 shutil.rmtree(self.tmpdir)
295 def testSimpleRename1(self):
296 """Simple rename 1"""
297 utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
299 def testSimpleRename2(self):
300 """Simple rename 2"""
301 utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
304 def testRenameMkdir(self):
305 """Rename with mkdir"""
306 utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
310 class TestMatchNameComponent(unittest.TestCase):
311 """Test case for the MatchNameComponent function"""
313 def testEmptyList(self):
314 """Test that there is no match against an empty list"""
316 self.failUnlessEqual(MatchNameComponent("", []), None)
317 self.failUnlessEqual(MatchNameComponent("test", []), None)
319 def testSingleMatch(self):
320 """Test that a single match is performed correctly"""
321 mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
322 for key in "test2", "test2.example", "test2.example.com":
323 self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
325 def testMultipleMatches(self):
326 """Test that a multiple match is returned as None"""
327 mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
328 for key in "test1", "test1.example":
329 self.failUnlessEqual(MatchNameComponent(key, mlist), None)
331 def testFullMatch(self):
332 """Test that a full match is returned correctly"""
334 key2 = "test1.example"
335 mlist = [key2, key2 + ".com"]
336 self.failUnlessEqual(MatchNameComponent(key1, mlist), None)
337 self.failUnlessEqual(MatchNameComponent(key2, mlist), key2)
339 def testCaseInsensitivePartialMatch(self):
340 """Test for the case_insensitive keyword"""
341 mlist = ["test1.example.com", "test2.example.net"]
342 self.assertEqual(MatchNameComponent("test2", mlist, case_sensitive=False),
344 self.assertEqual(MatchNameComponent("Test2", mlist, case_sensitive=False),
346 self.assertEqual(MatchNameComponent("teSt2", mlist, case_sensitive=False),
348 self.assertEqual(MatchNameComponent("TeSt2", mlist, case_sensitive=False),
352 def testCaseInsensitiveFullMatch(self):
353 mlist = ["ts1.ex", "ts1.ex.org", "ts2.ex", "Ts2.ex"]
354 # Between the two ts1 a full string match non-case insensitive should work
355 self.assertEqual(MatchNameComponent("Ts1", mlist, case_sensitive=False),
357 self.assertEqual(MatchNameComponent("Ts1.ex", mlist, case_sensitive=False),
359 self.assertEqual(MatchNameComponent("ts1.ex", mlist, case_sensitive=False),
361 # Between the two ts2 only case differs, so only case-match works
362 self.assertEqual(MatchNameComponent("ts2.ex", mlist, case_sensitive=False),
364 self.assertEqual(MatchNameComponent("Ts2.ex", mlist, case_sensitive=False),
366 self.assertEqual(MatchNameComponent("TS2.ex", mlist, case_sensitive=False),
370 class TestFormatUnit(unittest.TestCase):
371 """Test case for the FormatUnit function"""
374 self.assertEqual(FormatUnit(1, 'h'), '1M')
375 self.assertEqual(FormatUnit(100, 'h'), '100M')
376 self.assertEqual(FormatUnit(1023, 'h'), '1023M')
378 self.assertEqual(FormatUnit(1, 'm'), '1')
379 self.assertEqual(FormatUnit(100, 'm'), '100')
380 self.assertEqual(FormatUnit(1023, 'm'), '1023')
382 self.assertEqual(FormatUnit(1024, 'm'), '1024')
383 self.assertEqual(FormatUnit(1536, 'm'), '1536')
384 self.assertEqual(FormatUnit(17133, 'm'), '17133')
385 self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
388 self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
389 self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
390 self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
391 self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
393 self.assertEqual(FormatUnit(1024, 'g'), '1.0')
394 self.assertEqual(FormatUnit(1536, 'g'), '1.5')
395 self.assertEqual(FormatUnit(17133, 'g'), '16.7')
396 self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
398 self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
399 self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
400 self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
403 self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
404 self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
405 self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
407 self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
408 self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
409 self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
411 class TestParseUnit(unittest.TestCase):
412 """Test case for the ParseUnit function"""
415 ('M', 1), ('G', 1024), ('T', 1024 * 1024),
416 ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
417 ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
419 def testRounding(self):
420 self.assertEqual(ParseUnit('0'), 0)
421 self.assertEqual(ParseUnit('1'), 4)
422 self.assertEqual(ParseUnit('2'), 4)
423 self.assertEqual(ParseUnit('3'), 4)
425 self.assertEqual(ParseUnit('124'), 124)
426 self.assertEqual(ParseUnit('125'), 128)
427 self.assertEqual(ParseUnit('126'), 128)
428 self.assertEqual(ParseUnit('127'), 128)
429 self.assertEqual(ParseUnit('128'), 128)
430 self.assertEqual(ParseUnit('129'), 132)
431 self.assertEqual(ParseUnit('130'), 132)
433 def testFloating(self):
434 self.assertEqual(ParseUnit('0'), 0)
435 self.assertEqual(ParseUnit('0.5'), 4)
436 self.assertEqual(ParseUnit('1.75'), 4)
437 self.assertEqual(ParseUnit('1.99'), 4)
438 self.assertEqual(ParseUnit('2.00'), 4)
439 self.assertEqual(ParseUnit('2.01'), 4)
440 self.assertEqual(ParseUnit('3.99'), 4)
441 self.assertEqual(ParseUnit('4.00'), 4)
442 self.assertEqual(ParseUnit('4.01'), 8)
443 self.assertEqual(ParseUnit('1.5G'), 1536)
444 self.assertEqual(ParseUnit('1.8G'), 1844)
445 self.assertEqual(ParseUnit('8.28T'), 8682212)
447 def testSuffixes(self):
448 for sep in ('', ' ', ' ', "\t", "\t "):
449 for suffix, scale in TestParseUnit.SCALES:
450 for func in (lambda x: x, str.lower, str.upper):
451 self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
454 def testInvalidInput(self):
455 for sep in ('-', '_', ',', 'a'):
456 for suffix, _ in TestParseUnit.SCALES:
457 self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
459 for suffix, _ in TestParseUnit.SCALES:
460 self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
463 class TestSshKeys(testutils.GanetiTestCase):
464 """Test case for the AddAuthorizedKey function"""
466 KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
467 KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
468 'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
471 testutils.GanetiTestCase.setUp(self)
472 self.tmpname = self._CreateTempFile()
473 handle = open(self.tmpname, 'w')
475 handle.write("%s\n" % TestSshKeys.KEY_A)
476 handle.write("%s\n" % TestSshKeys.KEY_B)
480 def testAddingNewKey(self):
481 AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
483 self.assertFileContent(self.tmpname,
484 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
485 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
486 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
487 "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
489 def testAddingAlmostButNotCompletelyTheSameKey(self):
490 AddAuthorizedKey(self.tmpname,
491 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
493 self.assertFileContent(self.tmpname,
494 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
495 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
496 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
497 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
499 def testAddingExistingKeyWithSomeMoreSpaces(self):
500 AddAuthorizedKey(self.tmpname,
501 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
503 self.assertFileContent(self.tmpname,
504 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
505 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
506 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
508 def testRemovingExistingKeyWithSomeMoreSpaces(self):
509 RemoveAuthorizedKey(self.tmpname,
510 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
512 self.assertFileContent(self.tmpname,
513 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
514 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
516 def testRemovingNonExistingKey(self):
517 RemoveAuthorizedKey(self.tmpname,
518 'ssh-dss AAAAB3Nsdfj230xxjxJjsjwjsjdjU root@test')
520 self.assertFileContent(self.tmpname,
521 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
522 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
523 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
526 class TestEtcHosts(testutils.GanetiTestCase):
527 """Test functions modifying /etc/hosts"""
530 testutils.GanetiTestCase.setUp(self)
531 self.tmpname = self._CreateTempFile()
532 handle = open(self.tmpname, 'w')
534 handle.write('# This is a test file for /etc/hosts\n')
535 handle.write('127.0.0.1\tlocalhost\n')
536 handle.write('192.168.1.1 router gw\n')
540 def testSettingNewIp(self):
541 SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
543 self.assertFileContent(self.tmpname,
544 "# This is a test file for /etc/hosts\n"
545 "127.0.0.1\tlocalhost\n"
546 "192.168.1.1 router gw\n"
547 "1.2.3.4\tmyhost.domain.tld myhost\n")
548 self.assertFileMode(self.tmpname, 0644)
550 def testSettingExistingIp(self):
551 SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
554 self.assertFileContent(self.tmpname,
555 "# This is a test file for /etc/hosts\n"
556 "127.0.0.1\tlocalhost\n"
557 "192.168.1.1\tmyhost.domain.tld myhost\n")
558 self.assertFileMode(self.tmpname, 0644)
560 def testSettingDuplicateName(self):
561 SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
563 self.assertFileContent(self.tmpname,
564 "# This is a test file for /etc/hosts\n"
565 "127.0.0.1\tlocalhost\n"
566 "192.168.1.1 router gw\n"
568 self.assertFileMode(self.tmpname, 0644)
570 def testRemovingExistingHost(self):
571 RemoveEtcHostsEntry(self.tmpname, 'router')
573 self.assertFileContent(self.tmpname,
574 "# This is a test file for /etc/hosts\n"
575 "127.0.0.1\tlocalhost\n"
577 self.assertFileMode(self.tmpname, 0644)
579 def testRemovingSingleExistingHost(self):
580 RemoveEtcHostsEntry(self.tmpname, 'localhost')
582 self.assertFileContent(self.tmpname,
583 "# This is a test file for /etc/hosts\n"
584 "192.168.1.1 router gw\n")
585 self.assertFileMode(self.tmpname, 0644)
587 def testRemovingNonExistingHost(self):
588 RemoveEtcHostsEntry(self.tmpname, 'myhost')
590 self.assertFileContent(self.tmpname,
591 "# This is a test file for /etc/hosts\n"
592 "127.0.0.1\tlocalhost\n"
593 "192.168.1.1 router gw\n")
594 self.assertFileMode(self.tmpname, 0644)
596 def testRemovingAlias(self):
597 RemoveEtcHostsEntry(self.tmpname, 'gw')
599 self.assertFileContent(self.tmpname,
600 "# This is a test file for /etc/hosts\n"
601 "127.0.0.1\tlocalhost\n"
602 "192.168.1.1 router\n")
603 self.assertFileMode(self.tmpname, 0644)
606 class TestShellQuoting(unittest.TestCase):
607 """Test case for shell quoting functions"""
609 def testShellQuote(self):
610 self.assertEqual(ShellQuote('abc'), "abc")
611 self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
612 self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
613 self.assertEqual(ShellQuote("a b c"), "'a b c'")
614 self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
616 def testShellQuoteArgs(self):
617 self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
618 self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
619 self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
622 class TestTcpPing(unittest.TestCase):
623 """Testcase for TCP version of ping - against listen(2)ing port"""
626 self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
627 self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
628 self.listenerport = self.listener.getsockname()[1]
629 self.listener.listen(1)
632 self.listener.shutdown(socket.SHUT_RDWR)
634 del self.listenerport
636 def testTcpPingToLocalHostAccept(self):
637 self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
640 live_port_needed=True,
641 source=constants.LOCALHOST_IP_ADDRESS,
643 "failed to connect to test listener")
645 self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
648 live_port_needed=True,
650 "failed to connect to test listener (no source)")
653 class TestTcpPingDeaf(unittest.TestCase):
654 """Testcase for TCP version of ping - against non listen(2)ing port"""
657 self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
658 self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
659 self.deaflistenerport = self.deaflistener.getsockname()[1]
662 del self.deaflistener
663 del self.deaflistenerport
665 def testTcpPingToLocalHostAcceptDeaf(self):
666 self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
667 self.deaflistenerport,
668 timeout=constants.TCP_PING_TIMEOUT,
669 live_port_needed=True,
670 source=constants.LOCALHOST_IP_ADDRESS,
671 ), # need successful connect(2)
672 "successfully connected to deaf listener")
674 self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
675 self.deaflistenerport,
676 timeout=constants.TCP_PING_TIMEOUT,
677 live_port_needed=True,
678 ), # need successful connect(2)
679 "successfully connected to deaf listener (no source addr)")
681 def testTcpPingToLocalHostNoAccept(self):
682 self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
683 self.deaflistenerport,
684 timeout=constants.TCP_PING_TIMEOUT,
685 live_port_needed=False,
686 source=constants.LOCALHOST_IP_ADDRESS,
687 ), # ECONNREFUSED is OK
688 "failed to ping alive host on deaf port")
690 self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
691 self.deaflistenerport,
692 timeout=constants.TCP_PING_TIMEOUT,
693 live_port_needed=False,
694 ), # ECONNREFUSED is OK
695 "failed to ping alive host on deaf port (no source addr)")
698 class TestOwnIpAddress(unittest.TestCase):
699 """Testcase for OwnIpAddress"""
701 def testOwnLoopback(self):
702 """check having the loopback ip"""
703 self.failUnless(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS),
704 "Should own the loopback address")
706 def testNowOwnAddress(self):
707 """check that I don't own an address"""
709 # network 192.0.2.0/24 is reserved for test/documentation as per
710 # rfc 3330, so we *should* not have an address of this range... if
711 # this fails, we should extend the test to multiple addresses
713 self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP)
716 class TestListVisibleFiles(unittest.TestCase):
717 """Test case for ListVisibleFiles"""
720 self.path = tempfile.mkdtemp()
723 shutil.rmtree(self.path)
725 def _test(self, files, expected):
727 expected = expected[:]
731 f = open(os.path.join(self.path, name), 'w')
737 found = ListVisibleFiles(self.path)
740 self.assertEqual(found, expected)
742 def testAllVisible(self):
743 files = ["a", "b", "c"]
745 self._test(files, expected)
747 def testNoneVisible(self):
748 files = [".a", ".b", ".c"]
750 self._test(files, expected)
752 def testSomeVisible(self):
753 files = ["a", "b", ".c"]
754 expected = ["a", "b"]
755 self._test(files, expected)
758 class TestNewUUID(unittest.TestCase):
759 """Test case for NewUUID"""
761 _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
762 '[a-f0-9]{4}-[a-f0-9]{12}$')
765 self.failUnless(self._re_uuid.match(utils.NewUUID()))
768 class TestUniqueSequence(unittest.TestCase):
769 """Test case for UniqueSequence"""
771 def _test(self, input, expected):
772 self.assertEqual(utils.UniqueSequence(input), expected)
776 self._test([1, 2, 3], [1, 2, 3])
777 self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
778 self._test([1, 2, 2, 3], [1, 2, 3])
779 self._test([1, 2, 3, 3], [1, 2, 3])
782 self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
783 self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
786 self._test(["a", "a"], ["a"])
787 self._test(["a", "b"], ["a", "b"])
788 self._test(["a", "b", "a"], ["a", "b"])
791 class TestFirstFree(unittest.TestCase):
792 """Test case for the FirstFree function"""
796 self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
797 self.failUnlessEqual(FirstFree([]), None)
798 self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
799 self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
800 self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
803 class TestTailFile(testutils.GanetiTestCase):
804 """Test case for the TailFile function"""
807 fname = self._CreateTempFile()
808 self.failUnlessEqual(TailFile(fname), [])
809 self.failUnlessEqual(TailFile(fname, lines=25), [])
811 def testAllLines(self):
812 data = ["test %d" % i for i in range(30)]
814 fname = self._CreateTempFile()
815 fd = open(fname, "w")
816 fd.write("\n".join(data[:i]))
820 self.failUnlessEqual(TailFile(fname, lines=i), data[:i])
822 def testPartialLines(self):
823 data = ["test %d" % i for i in range(30)]
824 fname = self._CreateTempFile()
825 fd = open(fname, "w")
826 fd.write("\n".join(data))
829 for i in range(1, 30):
830 self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
832 def testBigFile(self):
833 data = ["test %d" % i for i in range(30)]
834 fname = self._CreateTempFile()
835 fd = open(fname, "w")
836 fd.write("X" * 1048576)
838 fd.write("\n".join(data))
841 for i in range(1, 30):
842 self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
845 class TestFileLock(unittest.TestCase):
846 """Test case for the FileLock class"""
849 self.tmpfile = tempfile.NamedTemporaryFile()
850 self.lock = utils.FileLock(self.tmpfile.name)
852 def testSharedNonblocking(self):
853 self.lock.Shared(blocking=False)
856 def testExclusiveNonblocking(self):
857 self.lock.Exclusive(blocking=False)
860 def testUnlockNonblocking(self):
861 self.lock.Unlock(blocking=False)
864 def testSharedBlocking(self):
865 self.lock.Shared(blocking=True)
868 def testExclusiveBlocking(self):
869 self.lock.Exclusive(blocking=True)
872 def testUnlockBlocking(self):
873 self.lock.Unlock(blocking=True)
876 def testSharedExclusiveUnlock(self):
877 self.lock.Shared(blocking=False)
878 self.lock.Exclusive(blocking=False)
879 self.lock.Unlock(blocking=False)
882 def testExclusiveSharedUnlock(self):
883 self.lock.Exclusive(blocking=False)
884 self.lock.Shared(blocking=False)
885 self.lock.Unlock(blocking=False)
888 def testCloseShared(self):
890 self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
892 def testCloseExclusive(self):
894 self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
896 def testCloseUnlock(self):
898 self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
901 class TestTimeFunctions(unittest.TestCase):
902 """Test case for time functions"""
905 self.assertEqual(utils.SplitTime(1), (1, 0))
906 self.assertEqual(utils.SplitTime(1.5), (1, 500000))
907 self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
908 self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
909 self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
910 self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
911 self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
912 self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
914 self.assertRaises(AssertionError, utils.SplitTime, -1)
916 self.assertEqual(utils.MergeTime((1, 0)), 1.0)
917 self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
918 self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
920 self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3),
922 self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
924 self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
925 self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
926 self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
927 self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
928 self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
931 class FieldSetTestCase(unittest.TestCase):
932 """Test case for FieldSets"""
934 def testSimpleMatch(self):
935 f = utils.FieldSet("a", "b", "c", "def")
936 self.failUnless(f.Matches("a"))
937 self.failIf(f.Matches("d"), "Substring matched")
938 self.failIf(f.Matches("defghi"), "Prefix string matched")
939 self.failIf(f.NonMatching(["b", "c"]))
940 self.failIf(f.NonMatching(["a", "b", "c", "def"]))
941 self.failUnless(f.NonMatching(["a", "d"]))
943 def testRegexMatch(self):
944 f = utils.FieldSet("a", "b([0-9]+)", "c")
945 self.failUnless(f.Matches("b1"))
946 self.failUnless(f.Matches("b99"))
947 self.failIf(f.Matches("b/1"))
948 self.failIf(f.NonMatching(["b12", "c"]))
949 self.failUnless(f.NonMatching(["a", "1"]))
951 class TestForceDictType(unittest.TestCase):
952 """Test case for ForceDictType"""
956 'a': constants.VTYPE_INT,
957 'b': constants.VTYPE_BOOL,
958 'c': constants.VTYPE_STRING,
959 'd': constants.VTYPE_SIZE,
962 def _fdt(self, dict, allowed_values=None):
963 if allowed_values is None:
964 ForceDictType(dict, self.key_types)
966 ForceDictType(dict, self.key_types, allowed_values=allowed_values)
970 def testSimpleDict(self):
971 self.assertEqual(self._fdt({}), {})
972 self.assertEqual(self._fdt({'a': 1}), {'a': 1})
973 self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
974 self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
975 self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
976 self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
977 self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
978 self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
979 self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
980 self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
981 self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
982 self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
984 def testErrors(self):
985 self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
986 self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
987 self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
988 self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
991 class TestIsAbsNormPath(unittest.TestCase):
992 """Testing case for IsProcessAlive"""
994 def _pathTestHelper(self, path, result):
996 self.assert_(IsNormAbsPath(path),
997 "Path %s should result absolute and normalized" % path)
999 self.assert_(not IsNormAbsPath(path),
1000 "Path %s should not result absolute and normalized" % path)
1003 self._pathTestHelper('/etc', True)
1004 self._pathTestHelper('/srv', True)
1005 self._pathTestHelper('etc', False)
1006 self._pathTestHelper('/etc/../root', False)
1007 self._pathTestHelper('/etc/', False)
1010 class TestSafeEncode(unittest.TestCase):
1011 """Test case for SafeEncode"""
1013 def testAscii(self):
1014 for txt in [string.digits, string.letters, string.punctuation]:
1015 self.failUnlessEqual(txt, SafeEncode(txt))
1017 def testDoubleEncode(self):
1018 for i in range(255):
1019 txt = SafeEncode(chr(i))
1020 self.failUnlessEqual(txt, SafeEncode(txt))
1022 def testUnicode(self):
1023 # 1024 is high enough to catch non-direct ASCII mappings
1024 for i in range(1024):
1025 txt = SafeEncode(unichr(i))
1026 self.failUnlessEqual(txt, SafeEncode(txt))
1029 class TestFormatTime(unittest.TestCase):
1030 """Testing case for FormatTime"""
1033 self.failUnlessEqual(FormatTime(None), "N/A")
1035 def testInvalid(self):
1036 self.failUnlessEqual(FormatTime(()), "N/A")
1039 # tests that we accept time.time input
1040 FormatTime(time.time())
1041 # tests that we accept int input
1042 FormatTime(int(time.time()))
1045 if __name__ == '__main__':
1046 testutils.GanetiTestProgram()