4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the utils module"""
41 from ganeti import constants
42 from ganeti import utils
43 from ganeti import errors
44 from ganeti.utils import IsProcessAlive, RunCmd, \
45 RemoveFile, MatchNameComponent, FormatUnit, \
46 ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
47 ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
48 SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \
49 TailFile, ForceDictType, SafeEncode, IsNormAbsPath, FormatTime, \
50 UnescapeAndSplit, RunParts, PathJoin, HostInfo
52 from ganeti.errors import LockError, UnitParseError, GenericError, \
53 ProgrammerError, OpPrereqError
56 class TestIsProcessAlive(unittest.TestCase):
57 """Testing case for IsProcessAlive"""
61 self.assert_(IsProcessAlive(mypid),
62 "can't find myself running")
64 def testNotExisting(self):
65 pid_non_existing = os.fork()
66 if pid_non_existing == 0:
68 elif pid_non_existing < 0:
69 raise SystemError("can't fork")
70 os.waitpid(pid_non_existing, 0)
71 self.assert_(not IsProcessAlive(pid_non_existing),
72 "nonexisting process detected")
75 class TestPidFileFunctions(unittest.TestCase):
76 """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
79 self.dir = tempfile.mkdtemp()
80 self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
81 utils.DaemonPidFileName = self.f_dpn
83 def testPidFileFunctions(self):
84 pid_file = self.f_dpn('test')
85 utils.WritePidFile('test')
86 self.failUnless(os.path.exists(pid_file),
87 "PID file should have been created")
88 read_pid = utils.ReadPidFile(pid_file)
89 self.failUnlessEqual(read_pid, os.getpid())
90 self.failUnless(utils.IsProcessAlive(read_pid))
91 self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
92 utils.RemovePidFile('test')
93 self.failIf(os.path.exists(pid_file),
94 "PID file should not exist anymore")
95 self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
96 "ReadPidFile should return 0 for missing pid file")
97 fh = open(pid_file, "w")
100 self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
101 "ReadPidFile should return 0 for invalid pid file")
102 utils.RemovePidFile('test')
103 self.failIf(os.path.exists(pid_file),
104 "PID file should not exist anymore")
107 pid_file = self.f_dpn('child')
108 r_fd, w_fd = os.pipe()
110 if new_pid == 0: #child
111 utils.WritePidFile('child')
116 # else we are in the parent
117 # wait until the child has written the pid file
119 read_pid = utils.ReadPidFile(pid_file)
120 self.failUnlessEqual(read_pid, new_pid)
121 self.failUnless(utils.IsProcessAlive(new_pid))
122 utils.KillProcess(new_pid, waitpid=True)
123 self.failIf(utils.IsProcessAlive(new_pid))
124 utils.RemovePidFile('child')
125 self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
128 for name in os.listdir(self.dir):
129 os.unlink(os.path.join(self.dir, name))
133 class TestRunCmd(testutils.GanetiTestCase):
134 """Testing case for the RunCmd function"""
137 testutils.GanetiTestCase.setUp(self)
138 self.magic = time.ctime() + " ganeti test"
139 self.fname = self._CreateTempFile()
142 """Test successful exit code"""
143 result = RunCmd("/bin/sh -c 'exit 0'")
144 self.assertEqual(result.exit_code, 0)
145 self.assertEqual(result.output, "")
148 """Test fail exit code"""
149 result = RunCmd("/bin/sh -c 'exit 1'")
150 self.assertEqual(result.exit_code, 1)
151 self.assertEqual(result.output, "")
def testStdout(self):
    """Test that standard output is captured or redirected to a file.

    A shell command echoing self.magic is run twice: without "output" the
    text must appear in the result's stdout; with output=self.fname the
    result's output must be empty and the file must contain the text.
    """
    shell_cmd = "/bin/sh -c '%s'" % ('echo -n "%s"' % self.magic)

    captured = RunCmd(shell_cmd)
    self.assertEqual(captured.stdout, self.magic)

    redirected = RunCmd(shell_cmd, output=self.fname)
    self.assertEqual(redirected.output, "")
    self.assertFileContent(self.fname, self.magic)
def testStderr(self):
    """Test that standard error is captured or redirected to a file.

    The echoed text is sent to stderr via "1>&2". Without "output" it must
    appear in the result's stderr; with output=self.fname the result's
    output must be empty and the file must contain the text.
    """
    shell_cmd = "/bin/sh -c '%s' 1>&2" % ('echo -n "%s"' % self.magic)

    captured = RunCmd(shell_cmd)
    self.assertEqual(captured.stderr, self.magic)

    redirected = RunCmd(shell_cmd, output=self.fname)
    self.assertEqual(redirected.output, "")
    self.assertFileContent(self.fname, self.magic)
def testCombined(self):
    """Test that stdout and stderr are merged in the combined output.

    The command writes "A" + magic to stdout and then "B" + magic to
    stderr; the concatenation is expected in the result's output, or in
    the output file when output=self.fname is given.
    """
    script = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
    shell_cmd = "/bin/sh -c '%s'" % script
    expected = "".join(["A", self.magic, "B", self.magic])

    captured = RunCmd(shell_cmd)
    self.assertEqual(captured.output, expected)

    redirected = RunCmd(shell_cmd, output=self.fname)
    self.assertEqual(redirected.output, "")
    self.assertFileContent(self.fname, expected)
def testSignal(self):
    """Test that a child killed by a signal is reported as such.

    The child sends signal 15 to itself; the result must carry that
    signal number and an empty output.
    """
    child_argv = ["python", "-c", "import os; os.kill(os.getpid(), 15)"]
    outcome = RunCmd(child_argv)
    self.assertEqual(outcome.signal, 15)
    self.assertEqual(outcome.output, "")
def testListRun(self):
    """Test commands that are passed as argument lists.

    Checks the exit code of a succeeding and a failing command, and the
    captured stdout of an "echo" call; none of them may report a signal.
    """
    exit_cases = [
        (["true"], 0),
        (["/bin/sh", "-c", "exit 1"], 1),
    ]
    for argv, exit_code in exit_cases:
        outcome = RunCmd(argv)
        self.assertEqual(outcome.signal, None)
        self.assertEqual(outcome.exit_code, exit_code)

    echoed = RunCmd(["echo", "-n", self.magic])
    self.assertEqual(echoed.signal, None)
    self.assertEqual(echoed.exit_code, 0)
    self.assertEqual(echoed.stdout, self.magic)
def testFileEmptyOutput(self):
    """Test file output for a command that prints nothing.

    "true" must exit with code 0 and no signal, and the output file must
    end up empty.
    """
    outcome = RunCmd(["true"], output=self.fname)
    self.assertEqual(outcome.signal, None)
    self.assertEqual(outcome.exit_code, 0)
    self.assertFileContent(self.fname, "")
208 """Test locale environment"""
209 old_env = os.environ.copy()
211 os.environ["LANG"] = "en_US.UTF-8"
212 os.environ["LC_ALL"] = "en_US.UTF-8"
213 result = RunCmd(["locale"])
214 for line in result.output.splitlines():
215 key, value = line.split("=", 1)
216 # Ignore these variables, they're overridden by LC_ALL
217 if key == "LANG" or key == "LANGUAGE":
219 self.failIf(value and value != "C" and value != '"C"',
220 "Variable %s is set to the invalid value '%s'" % (key, value))
def testDefaultCwd(self):
    """Test that "pwd" reports "/" when no working directory is passed."""
    self.assertEqual(RunCmd(["pwd"]).stdout.strip(), "/")
229 """Test default working directory"""
230 self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
231 self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
233 self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
def testResetEnv(self):
    """Test environment reset functionality.

    With reset_env=True the stripped standard output of "env" is expected
    to be the empty string.
    """
    self.assertEqual(RunCmd(["env"], reset_env=True).stdout.strip(), "")
240 class TestRunParts(unittest.TestCase):
241 """Testing case for the RunParts function"""
244 self.rundir = tempfile.mkdtemp(prefix="ganeti-test", suffix=".tmp")
247 shutil.rmtree(self.rundir)
250 """Test on an empty dir"""
251 self.failUnlessEqual(RunParts(self.rundir, reset_env=True), [])
def testSkipWrongName(self):
    """Test that wrongly named files are skipped.

    "00test.dot" is made readable and executable, yet RunParts is expected
    to report it with RUNPARTS_SKIP and no run result.
    """
    fname = os.path.join(self.rundir, "00test.dot")
    utils.WriteFile(fname, data="")
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
    relname = os.path.basename(fname)
    self.assertEqual(RunParts(self.rundir, reset_env=True),
                     [(relname, constants.RUNPARTS_SKIP, None)])
def testSkipNonExec(self):
    """Test that non executable files are skipped.

    The file is written without any chmod call adding an execute bit, and
    RunParts is expected to report it with RUNPARTS_SKIP and no run result.
    """
    fname = os.path.join(self.rundir, "00test")
    utils.WriteFile(fname, data="")
    relname = os.path.basename(fname)
    self.assertEqual(RunParts(self.rundir, reset_env=True),
                     [(relname, constants.RUNPARTS_SKIP, None)])
271 """Test error on a broken executable"""
272 fname = os.path.join(self.rundir, "00test")
273 utils.WriteFile(fname, data="")
274 os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
275 (relname, status, error) = RunParts(self.rundir, reset_env=True)[0]
276 self.failUnlessEqual(relname, os.path.basename(fname))
277 self.failUnlessEqual(status, constants.RUNPARTS_ERR)
278 self.failUnless(error)
280 def testSorted(self):
281 """Test executions are sorted"""
283 files.append(os.path.join(self.rundir, "64test"))
284 files.append(os.path.join(self.rundir, "00test"))
285 files.append(os.path.join(self.rundir, "42test"))
288 utils.WriteFile(fname, data="")
290 results = RunParts(self.rundir, reset_env=True)
292 for fname in sorted(files):
293 self.failUnlessEqual(os.path.basename(fname), results.pop(0)[0])
296 """Test correct execution"""
297 fname = os.path.join(self.rundir, "00test")
298 utils.WriteFile(fname, data="#!/bin/sh\n\necho -n ciao")
299 os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
300 (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
301 self.failUnlessEqual(relname, os.path.basename(fname))
302 self.failUnlessEqual(status, constants.RUNPARTS_RUN)
303 self.failUnlessEqual(runresult.stdout, "ciao")
def testRunFail(self):
    """Test correct execution, with run failure.

    The script exits with code 1: RunParts must still report it as run
    (RUNPARTS_RUN), and the run result must carry exit code 1 and be
    flagged as failed.
    """
    fname = os.path.join(self.rundir, "00test")
    utils.WriteFile(fname, data="#!/bin/sh\n\nexit 1")
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
    (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
    self.assertEqual(relname, os.path.basename(fname))
    self.assertEqual(status, constants.RUNPARTS_RUN)
    self.assertEqual(runresult.exit_code, 1)
    self.assertTrue(runresult.failed)
316 def testRunMix(self):
318 files.append(os.path.join(self.rundir, "00test"))
319 files.append(os.path.join(self.rundir, "42test"))
320 files.append(os.path.join(self.rundir, "64test"))
321 files.append(os.path.join(self.rundir, "99test"))
325 # 1st has errors in execution
326 utils.WriteFile(files[0], data="#!/bin/sh\n\nexit 1")
327 os.chmod(files[0], stat.S_IREAD | stat.S_IEXEC)
330 utils.WriteFile(files[1], data="")
332 # 3rd cannot execute properly
333 utils.WriteFile(files[2], data="")
334 os.chmod(files[2], stat.S_IREAD | stat.S_IEXEC)
337 utils.WriteFile(files[3], data="#!/bin/sh\n\necho -n ciao")
338 os.chmod(files[3], stat.S_IREAD | stat.S_IEXEC)
340 results = RunParts(self.rundir, reset_env=True)
342 (relname, status, runresult) = results[0]
343 self.failUnlessEqual(relname, os.path.basename(files[0]))
344 self.failUnlessEqual(status, constants.RUNPARTS_RUN)
345 self.failUnlessEqual(runresult.exit_code, 1)
346 self.failUnless(runresult.failed)
348 (relname, status, runresult) = results[1]
349 self.failUnlessEqual(relname, os.path.basename(files[1]))
350 self.failUnlessEqual(status, constants.RUNPARTS_SKIP)
351 self.failUnlessEqual(runresult, None)
353 (relname, status, runresult) = results[2]
354 self.failUnlessEqual(relname, os.path.basename(files[2]))
355 self.failUnlessEqual(status, constants.RUNPARTS_ERR)
356 self.failUnless(runresult)
358 (relname, status, runresult) = results[3]
359 self.failUnlessEqual(relname, os.path.basename(files[3]))
360 self.failUnlessEqual(status, constants.RUNPARTS_RUN)
361 self.failUnlessEqual(runresult.output, "ciao")
362 self.failUnlessEqual(runresult.exit_code, 0)
363 self.failUnless(not runresult.failed)
366 class TestRemoveFile(unittest.TestCase):
367 """Test case for the RemoveFile function"""
370 """Create a temp dir and file for each case"""
371 self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
372 fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
376 if os.path.exists(self.tmpfile):
377 os.unlink(self.tmpfile)
378 os.rmdir(self.tmpdir)
def testIgnoreDirs(self):
    """Test that RemoveFile() returns None when handed a directory."""
    outcome = RemoveFile(self.tmpdir)
    self.assertEqual(None, outcome)
def testIgnoreNotExisting(self):
    """Test that RemoveFile() ignores non-existing files.

    RemoveFile is called twice on the same path; neither call may raise.
    """
    for _ in range(2):
        RemoveFile(self.tmpfile)
def testRemoveFile(self):
    """Test that RemoveFile() really deletes the temporary file."""
    RemoveFile(self.tmpfile)
    still_present = os.path.exists(self.tmpfile)
    if still_present:
        self.fail("File '%s' not removed" % self.tmpfile)
399 def testRemoveSymlink(self):
400 """Test that RemoveFile does remove symlinks"""
401 symlink = self.tmpdir + "/symlink"
402 os.symlink("no-such-file", symlink)
404 if os.path.exists(symlink):
405 self.fail("File '%s' not removed" % symlink)
406 os.symlink(self.tmpfile, symlink)
408 if os.path.exists(symlink):
409 self.fail("File '%s' not removed" % symlink)
412 class TestRename(unittest.TestCase):
413 """Test case for RenameFile"""
416 """Create a temporary directory"""
417 self.tmpdir = tempfile.mkdtemp()
418 self.tmpfile = os.path.join(self.tmpdir, "test1")
421 open(self.tmpfile, "w").close()
424 """Remove temporary directory"""
425 shutil.rmtree(self.tmpdir)
def testSimpleRename1(self):
    """Simple rename 1: move the test file to "xyz" in the same directory."""
    target = os.path.join(self.tmpdir, "xyz")
    utils.RenameFile(self.tmpfile, target)
    self.assertTrue(os.path.isfile(target))
432 def testSimpleRename2(self):
433 """Simple rename 2"""
434 utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
436 self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
438 def testRenameMkdir(self):
439 """Rename with mkdir"""
440 utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
442 self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
443 self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
445 utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"),
446 os.path.join(self.tmpdir, "test/foo/bar/baz"),
448 self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
449 self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar")))
450 self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz")))
453 class TestMatchNameComponent(unittest.TestCase):
454 """Test case for the MatchNameComponent function"""
def testEmptyList(self):
    """Test that there is no match against an empty list"""
    self.assertEqual(MatchNameComponent("", []), None)
    self.assertEqual(MatchNameComponent("test", []), None)
def testSingleMatch(self):
    """Test that a single match is performed correctly.

    The short name, a partial name and the full name of the second entry
    must all resolve to that entry.
    """
    mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
    for key in "test2", "test2.example", "test2.example.com":
        self.assertEqual(MatchNameComponent(key, mlist), mlist[1])
def testMultipleMatches(self):
    """Test that a multiple match is returned as None.

    Each key is a prefix of all three entries, so no single name can be
    picked.
    """
    mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
    for key in "test1", "test1.example":
        self.assertEqual(MatchNameComponent(key, mlist), None)
474 def testFullMatch(self):
475 """Test that a full match is returned correctly"""
477 key2 = "test1.example"
478 mlist = [key2, key2 + ".com"]
479 self.failUnlessEqual(MatchNameComponent(key1, mlist), None)
480 self.failUnlessEqual(MatchNameComponent(key2, mlist), key2)
482 def testCaseInsensitivePartialMatch(self):
483 """Test for the case_insensitive keyword"""
484 mlist = ["test1.example.com", "test2.example.net"]
485 self.assertEqual(MatchNameComponent("test2", mlist, case_sensitive=False),
487 self.assertEqual(MatchNameComponent("Test2", mlist, case_sensitive=False),
489 self.assertEqual(MatchNameComponent("teSt2", mlist, case_sensitive=False),
491 self.assertEqual(MatchNameComponent("TeSt2", mlist, case_sensitive=False),
495 def testCaseInsensitiveFullMatch(self):
496 mlist = ["ts1.ex", "ts1.ex.org", "ts2.ex", "Ts2.ex"]
497 # Between the two ts1 a full string match non-case insensitive should work
498 self.assertEqual(MatchNameComponent("Ts1", mlist, case_sensitive=False),
500 self.assertEqual(MatchNameComponent("Ts1.ex", mlist, case_sensitive=False),
502 self.assertEqual(MatchNameComponent("ts1.ex", mlist, case_sensitive=False),
504 # Between the two ts2 only case differs, so only case-match works
505 self.assertEqual(MatchNameComponent("ts2.ex", mlist, case_sensitive=False),
507 self.assertEqual(MatchNameComponent("Ts2.ex", mlist, case_sensitive=False),
509 self.assertEqual(MatchNameComponent("TS2.ex", mlist, case_sensitive=False),
513 class TestFormatUnit(unittest.TestCase):
514 """Test case for the FormatUnit function"""
517 self.assertEqual(FormatUnit(1, 'h'), '1M')
518 self.assertEqual(FormatUnit(100, 'h'), '100M')
519 self.assertEqual(FormatUnit(1023, 'h'), '1023M')
521 self.assertEqual(FormatUnit(1, 'm'), '1')
522 self.assertEqual(FormatUnit(100, 'm'), '100')
523 self.assertEqual(FormatUnit(1023, 'm'), '1023')
525 self.assertEqual(FormatUnit(1024, 'm'), '1024')
526 self.assertEqual(FormatUnit(1536, 'm'), '1536')
527 self.assertEqual(FormatUnit(17133, 'm'), '17133')
528 self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
531 self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
532 self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
533 self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
534 self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
536 self.assertEqual(FormatUnit(1024, 'g'), '1.0')
537 self.assertEqual(FormatUnit(1536, 'g'), '1.5')
538 self.assertEqual(FormatUnit(17133, 'g'), '16.7')
539 self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
541 self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
542 self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
543 self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
546 self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
547 self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
548 self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
550 self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
551 self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
552 self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
554 class TestParseUnit(unittest.TestCase):
555 """Test case for the ParseUnit function"""
558 ('M', 1), ('G', 1024), ('T', 1024 * 1024),
559 ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
560 ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
def testRounding(self):
    """Check ParseUnit results for plain integer strings.

    In every case listed the expected value is the input rounded up to
    the next multiple of four.
    """
    cases = [
        ('0', 0),
        ('1', 4),
        ('2', 4),
        ('3', 4),
        ('124', 124),
        ('125', 128),
        ('126', 128),
        ('127', 128),
        ('128', 128),
        ('129', 132),
        ('130', 132),
    ]
    for text, expected in cases:
        self.assertEqual(ParseUnit(text), expected)
def testFloating(self):
    """Check ParseUnit results for fractional values, with and without unit."""
    cases = [
        ('0', 0),
        ('0.5', 4),
        ('1.75', 4),
        ('1.99', 4),
        ('2.00', 4),
        ('2.01', 4),
        ('3.99', 4),
        ('4.00', 4),
        ('4.01', 8),
        ('1.5G', 1536),
        ('1.8G', 1844),
        ('8.28T', 8682212),
    ]
    for text, expected in cases:
        self.assertEqual(ParseUnit(text), expected)
590 def testSuffixes(self):
591 for sep in ('', ' ', ' ', "\t", "\t "):
592 for suffix, scale in TestParseUnit.SCALES:
593 for func in (lambda x: x, str.lower, str.upper):
594 self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
597 def testInvalidInput(self):
598 for sep in ('-', '_', ',', 'a'):
599 for suffix, _ in TestParseUnit.SCALES:
600 self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
602 for suffix, _ in TestParseUnit.SCALES:
603 self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
606 class TestSshKeys(testutils.GanetiTestCase):
607 """Test case for the AddAuthorizedKey function"""
609 KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
610 KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
611 'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
614 testutils.GanetiTestCase.setUp(self)
615 self.tmpname = self._CreateTempFile()
616 handle = open(self.tmpname, 'w')
618 handle.write("%s\n" % TestSshKeys.KEY_A)
619 handle.write("%s\n" % TestSshKeys.KEY_B)
def testAddingNewKey(self):
    """Adding an unknown key must append it after the two initial keys."""
    new_key = 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test'
    AddAuthorizedKey(self.tmpname, new_key)

    expected = (
        "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
        'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
        " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
        "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
    self.assertFileContent(self.tmpname, expected)
def testAddingAlmostButNotCompletelyTheSameKey(self):
    """A key differing from KEY_A only in its trailing comment is appended."""
    near_duplicate = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test'
    AddAuthorizedKey(self.tmpname, near_duplicate)

    expected = (
        "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
        'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
        " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
        "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
    self.assertFileContent(self.tmpname, expected)
def testAddingExistingKeyWithSomeMoreSpaces(self):
    """Adding a key that is already present must leave the file unchanged.

    NOTE(review): the test name refers to extra spaces in the key, but the
    literal as visible here is single-spaced -- confirm the intended
    spacing.
    """
    existing_key = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
    AddAuthorizedKey(self.tmpname, existing_key)

    expected = (
        "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
        'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
        " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
    self.assertFileContent(self.tmpname, expected)
def testRemovingExistingKeyWithSomeMoreSpaces(self):
    """Removing the first key must leave only the second key in the file.

    NOTE(review): the test name refers to extra spaces in the key, but the
    literal as visible here is single-spaced -- confirm the intended
    spacing.
    """
    existing_key = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
    RemoveAuthorizedKey(self.tmpname, existing_key)

    expected = (
        'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
        " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
    self.assertFileContent(self.tmpname, expected)
def testRemovingNonExistingKey(self):
    """Removing a key that is not in the file must leave it unchanged."""
    unknown_key = 'ssh-dss AAAAB3Nsdfj230xxjxJjsjwjsjdjU root@test'
    RemoveAuthorizedKey(self.tmpname, unknown_key)

    expected = (
        "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
        'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
        " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
    self.assertFileContent(self.tmpname, expected)
669 class TestEtcHosts(testutils.GanetiTestCase):
670 """Test functions modifying /etc/hosts"""
673 testutils.GanetiTestCase.setUp(self)
674 self.tmpname = self._CreateTempFile()
675 handle = open(self.tmpname, 'w')
677 handle.write('# This is a test file for /etc/hosts\n')
678 handle.write('127.0.0.1\tlocalhost\n')
679 handle.write('192.168.1.1 router gw\n')
683 def testSettingNewIp(self):
684 SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
686 self.assertFileContent(self.tmpname,
687 "# This is a test file for /etc/hosts\n"
688 "127.0.0.1\tlocalhost\n"
689 "192.168.1.1 router gw\n"
690 "1.2.3.4\tmyhost.domain.tld myhost\n")
691 self.assertFileMode(self.tmpname, 0644)
693 def testSettingExistingIp(self):
694 SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
697 self.assertFileContent(self.tmpname,
698 "# This is a test file for /etc/hosts\n"
699 "127.0.0.1\tlocalhost\n"
700 "192.168.1.1\tmyhost.domain.tld myhost\n")
701 self.assertFileMode(self.tmpname, 0644)
703 def testSettingDuplicateName(self):
704 SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
706 self.assertFileContent(self.tmpname,
707 "# This is a test file for /etc/hosts\n"
708 "127.0.0.1\tlocalhost\n"
709 "192.168.1.1 router gw\n"
711 self.assertFileMode(self.tmpname, 0644)
713 def testRemovingExistingHost(self):
714 RemoveEtcHostsEntry(self.tmpname, 'router')
716 self.assertFileContent(self.tmpname,
717 "# This is a test file for /etc/hosts\n"
718 "127.0.0.1\tlocalhost\n"
720 self.assertFileMode(self.tmpname, 0644)
722 def testRemovingSingleExistingHost(self):
723 RemoveEtcHostsEntry(self.tmpname, 'localhost')
725 self.assertFileContent(self.tmpname,
726 "# This is a test file for /etc/hosts\n"
727 "192.168.1.1 router gw\n")
728 self.assertFileMode(self.tmpname, 0644)
730 def testRemovingNonExistingHost(self):
731 RemoveEtcHostsEntry(self.tmpname, 'myhost')
733 self.assertFileContent(self.tmpname,
734 "# This is a test file for /etc/hosts\n"
735 "127.0.0.1\tlocalhost\n"
736 "192.168.1.1 router gw\n")
737 self.assertFileMode(self.tmpname, 0644)
739 def testRemovingAlias(self):
740 RemoveEtcHostsEntry(self.tmpname, 'gw')
742 self.assertFileContent(self.tmpname,
743 "# This is a test file for /etc/hosts\n"
744 "127.0.0.1\tlocalhost\n"
745 "192.168.1.1 router\n")
746 self.assertFileMode(self.tmpname, 0644)
class TestShellQuoting(unittest.TestCase):
    """Test case for the ShellQuote and ShellQuoteArgs functions"""

    def testShellQuote(self):
        """Quote single strings and compare with the expected result."""
        cases = [
            ('abc', "abc"),
            ('ab"c', "'ab\"c'"),
            ("a'bc", "'a'\\''bc'"),
            ("a b c", "'a b c'"),
            ("a b\\ c", "'a b\\ c'"),
        ]
        for raw, quoted in cases:
            self.assertEqual(ShellQuote(raw), quoted)

    def testShellQuoteArgs(self):
        """Quote argument lists and compare with the expected command line."""
        cases = [
            (['a', 'b', 'c'], "a b c"),
            (['a', 'b"', 'c'], "a 'b\"' c"),
            (['a', 'b\'', 'c'], "a 'b'\\\''' c"),
        ]
        for argv, quoted in cases:
            self.assertEqual(ShellQuoteArgs(argv), quoted)
765 class TestTcpPing(unittest.TestCase):
766 """Testcase for TCP version of ping - against listen(2)ing port"""
769 self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
770 self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
771 self.listenerport = self.listener.getsockname()[1]
772 self.listener.listen(1)
775 self.listener.shutdown(socket.SHUT_RDWR)
777 del self.listenerport
779 def testTcpPingToLocalHostAccept(self):
780 self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
783 live_port_needed=True,
784 source=constants.LOCALHOST_IP_ADDRESS,
786 "failed to connect to test listener")
788 self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
791 live_port_needed=True,
793 "failed to connect to test listener (no source)")
796 class TestTcpPingDeaf(unittest.TestCase):
797 """Testcase for TCP version of ping - against non listen(2)ing port"""
800 self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
801 self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
802 self.deaflistenerport = self.deaflistener.getsockname()[1]
805 del self.deaflistener
806 del self.deaflistenerport
def testTcpPingToLocalHostAcceptDeaf(self):
    """Test that a live-port ping fails against the non-listening port.

    The check is done twice, with and without an explicit source address;
    with live_port_needed=True TcpPing must report failure both times.
    """
    self.assertFalse(TcpPing(constants.LOCALHOST_IP_ADDRESS,
                             self.deaflistenerport,
                             timeout=constants.TCP_PING_TIMEOUT,
                             live_port_needed=True,
                             source=constants.LOCALHOST_IP_ADDRESS,
                             ),  # need successful connect(2)
                     "successfully connected to deaf listener")

    self.assertFalse(TcpPing(constants.LOCALHOST_IP_ADDRESS,
                             self.deaflistenerport,
                             timeout=constants.TCP_PING_TIMEOUT,
                             live_port_needed=True,
                             ),  # need successful connect(2)
                     "successfully connected to deaf listener (no source addr)")
def testTcpPingToLocalHostNoAccept(self):
    """Test that a host-only ping succeeds against the non-listening port.

    The check is done twice, with and without an explicit source address;
    with live_port_needed=False TcpPing must report success both times.
    """
    self.assertTrue(TcpPing(constants.LOCALHOST_IP_ADDRESS,
                            self.deaflistenerport,
                            timeout=constants.TCP_PING_TIMEOUT,
                            live_port_needed=False,
                            source=constants.LOCALHOST_IP_ADDRESS,
                            ),  # ECONNREFUSED is OK
                    "failed to ping alive host on deaf port")

    self.assertTrue(TcpPing(constants.LOCALHOST_IP_ADDRESS,
                            self.deaflistenerport,
                            timeout=constants.TCP_PING_TIMEOUT,
                            live_port_needed=False,
                            ),  # ECONNREFUSED is OK
                    "failed to ping alive host on deaf port (no source addr)")
841 class TestOwnIpAddress(unittest.TestCase):
842 """Testcase for OwnIpAddress"""
def testOwnLoopback(self):
    """Check that the loopback IP address is reported as owned."""
    self.assertTrue(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS),
                    "Should own the loopback address")
849 def testNowOwnAddress(self):
850 """check that I don't own an address"""
852 # network 192.0.2.0/24 is reserved for test/documentation as per
853 # rfc 3330, so we *should* not have an address of this range... if
854 # this fails, we should extend the test to multiple addresses
856 self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP)
859 class TestListVisibleFiles(unittest.TestCase):
860 """Test case for ListVisibleFiles"""
863 self.path = tempfile.mkdtemp()
866 shutil.rmtree(self.path)
868 def _test(self, files, expected):
870 expected = expected[:]
874 f = open(os.path.join(self.path, name), 'w')
880 found = ListVisibleFiles(self.path)
883 self.assertEqual(found, expected)
885 def testAllVisible(self):
886 files = ["a", "b", "c"]
888 self._test(files, expected)
890 def testNoneVisible(self):
891 files = [".a", ".b", ".c"]
893 self._test(files, expected)
def testSomeVisible(self):
    """Run the shared check with two plain names and one dot-prefixed name.

    Only "a" and "b" are expected back; ".c" must not be listed.
    """
    self._test(["a", "b", ".c"], ["a", "b"])
def testNonAbsolutePath(self):
    """Check that the relative path "abc" raises ProgrammerError."""
    self.assertRaises(errors.ProgrammerError, ListVisibleFiles, "abc")
903 def testNonNormalizedPath(self):
904 self.failUnlessRaises(errors.ProgrammerError, ListVisibleFiles,
908 class TestNewUUID(unittest.TestCase):
909 """Test case for NewUUID"""
911 _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
912 '[a-f0-9]{4}-[a-f0-9]{12}$')
915 self.failUnless(self._re_uuid.match(utils.NewUUID()))
918 class TestUniqueSequence(unittest.TestCase):
919 """Test case for UniqueSequence"""
def _test(self, input, expected):
    """Assert that utils.UniqueSequence(input) equals expected."""
    actual = utils.UniqueSequence(input)
    self.assertEqual(actual, expected)
926 self._test([1, 2, 3], [1, 2, 3])
927 self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
928 self._test([1, 2, 2, 3], [1, 2, 3])
929 self._test([1, 2, 3, 3], [1, 2, 3])
932 self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
933 self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
936 self._test(["a", "a"], ["a"])
937 self._test(["a", "b"], ["a", "b"])
938 self._test(["a", "b", "a"], ["a", "b"])
941 class TestFirstFree(unittest.TestCase):
942 """Test case for the FirstFree function"""
946 self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
947 self.failUnlessEqual(FirstFree([]), None)
948 self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
949 self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
950 self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
953 class TestTailFile(testutils.GanetiTestCase):
954 """Test case for the TailFile function"""
957 fname = self._CreateTempFile()
958 self.failUnlessEqual(TailFile(fname), [])
959 self.failUnlessEqual(TailFile(fname, lines=25), [])
961 def testAllLines(self):
962 data = ["test %d" % i for i in range(30)]
964 fname = self._CreateTempFile()
965 fd = open(fname, "w")
966 fd.write("\n".join(data[:i]))
970 self.failUnlessEqual(TailFile(fname, lines=i), data[:i])
972 def testPartialLines(self):
973 data = ["test %d" % i for i in range(30)]
974 fname = self._CreateTempFile()
975 fd = open(fname, "w")
976 fd.write("\n".join(data))
979 for i in range(1, 30):
980 self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
982 def testBigFile(self):
983 data = ["test %d" % i for i in range(30)]
984 fname = self._CreateTempFile()
985 fd = open(fname, "w")
986 fd.write("X" * 1048576)
988 fd.write("\n".join(data))
991 for i in range(1, 30):
992 self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
995 class _BaseFileLockTest:
996 """Test case for the FileLock class"""
# Mix-in holding the lock tests. It does not derive from a TestCase itself;
# the concrete classes further down combine it with one and set "self.lock"
# and "self.tmpfile", which every method here relies on.
# NOTE(review): the numbering of this listing has gaps after most lock calls;
# whatever follows them (presumably releasing/closing the lock -- confirm) is
# not visible here.
#
# Each of the next six tests performs a single Shared/Exclusive/Unlock call,
# once per value of the "blocking" flag.
998 def testSharedNonblocking(self):
999 self.lock.Shared(blocking=False)
1002 def testExclusiveNonblocking(self):
1003 self.lock.Exclusive(blocking=False)
1006 def testUnlockNonblocking(self):
1007 self.lock.Unlock(blocking=False)
1010 def testSharedBlocking(self):
1011 self.lock.Shared(blocking=True)
1014 def testExclusiveBlocking(self):
1015 self.lock.Exclusive(blocking=True)
1018 def testUnlockBlocking(self):
1019 self.lock.Unlock(blocking=True)
# Mode changes on the same lock object: shared -> exclusive -> unlocked.
1022 def testSharedExclusiveUnlock(self):
1023 self.lock.Shared(blocking=False)
1024 self.lock.Exclusive(blocking=False)
1025 self.lock.Unlock(blocking=False)
# Mode changes on the same lock object: exclusive -> shared -> unlocked.
1028 def testExclusiveSharedUnlock(self):
1029 self.lock.Exclusive(blocking=False)
1030 self.lock.Shared(blocking=False)
1031 self.lock.Unlock(blocking=False)
# Passing a timeout together with either value of "blocking" must be accepted.
1034 def testSimpleTimeout(self):
1035 # These will succeed on the first attempt, hence a short timeout
1036 self.lock.Shared(blocking=True, timeout=10.0)
1037 self.lock.Exclusive(blocking=False, timeout=10.0)
1038 self.lock.Unlock(blocking=True, timeout=10.0)
# Helper run in a child process: opens its own FileLock on "filename" and
# tries to take it with a 0.01 second timeout, handling errors.LockError.
# NOTE(review): no "self" parameter, so presumably decorated as a static
# method; the binding of "fn" and the returned values are on lines not
# visible here -- confirm in the complete file.
1042 def _TryLockInner(filename, shared, blocking):
1043 lock = utils.FileLock.Open(filename)
1051 # The timeout doesn't really matter as the parent process waits for us to
1053 fn(blocking=blocking, timeout=0.01)
1054 except errors.LockError, err:
# Runs _TryLockInner through utils.RunInSeparateProcess against this test's
# temporary file; the remainder of the call is not visible here (presumably
# it forwards *args as "shared" and "blocking" -- confirm).
1059 def _TryLock(self, *args):
1060 return utils.RunInSeparateProcess(self._TryLockInner, self.tmpfile.name,
# While this process holds the lock exclusively, no attempt from a separate
# process may succeed; while it holds the lock shared, only the attempt made
# with a True first argument (presumably "shared") succeeds.
1063 def testTimeout(self):
1064 for blocking in [True, False]:
1065 self.lock.Exclusive(blocking=True)
1066 self.failIf(self._TryLock(False, blocking))
1067 self.failIf(self._TryLock(True, blocking))
1069 self.lock.Shared(blocking=True)
1070 self.assert_(self._TryLock(True, blocking))
1071 self.failIf(self._TryLock(False, blocking))
# The three tests below expect AssertionError from each lock operation; the
# statement preceding each check is not visible here (presumably the lock is
# closed first, as the test names suggest -- confirm).
1073 def testCloseShared(self):
1075 self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
1077 def testCloseExclusive(self):
1079 self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
1081 def testCloseUnlock(self):
1083 self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
1086 class TestFileLockWithFilename(testutils.GanetiTestCase, _BaseFileLockTest):
# Runs the _BaseFileLockTest cases against a lock obtained from
# utils.FileLock.Open() on a file name.
# NOTE(review): the method headers are not visible in this listing; the
# statements below presumably belong to setUp and tearDown -- confirm.
1087 TESTDATA = "Hello World\n" * 10
1090 testutils.GanetiTestCase.setUp(self)
# Pre-fill the temporary file so that truncation by FileLock.Open() would be
# noticed by the content check that follows.
1092 self.tmpfile = tempfile.NamedTemporaryFile()
1093 utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
1094 self.lock = utils.FileLock.Open(self.tmpfile.name)
1096 # Ensure "Open" didn't truncate file
1097 self.assertFileContent(self.tmpfile.name, self.TESTDATA)
# The file content is verified a second time before the base class tearDown
# is invoked.
1100 self.assertFileContent(self.tmpfile.name, self.TESTDATA)
1102 testutils.GanetiTestCase.tearDown(self)
1105 class TestFileLockWithFileObject(unittest.TestCase, _BaseFileLockTest):
# Runs the _BaseFileLockTest cases against a FileLock built directly from an
# already opened file object plus the file's name.
# NOTE(review): the enclosing method header is not visible in this listing
# (presumably setUp -- confirm).
1107 self.tmpfile = tempfile.NamedTemporaryFile()
1108 self.lock = utils.FileLock(open(self.tmpfile.name, "w"), self.tmpfile.name)
1111 class TestTimeFunctions(unittest.TestCase):
1112 """Test case for time functions"""
# NOTE(review): the method headers and one call continuation are not visible
# in this listing; the groups below are described by what they assert.
#
# utils.SplitTime: a float timestamp is expected to come back as a
# (seconds, microseconds) tuple. The expected values show the remainder below
# one microsecond being dropped rather than rounded up
# (123.999999999 -> (123, 999999)).
1115 self.assertEqual(utils.SplitTime(1), (1, 0))
1116 self.assertEqual(utils.SplitTime(1.5), (1, 500000))
1117 self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
1118 self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
1119 self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
1120 self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
1121 self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
1122 self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
# A negative timestamp is expected to trip an assertion inside SplitTime.
1124 self.assertRaises(AssertionError, utils.SplitTime, -1)
# utils.MergeTime: the reverse direction, (seconds, microseconds) -> float.
1126 self.assertEqual(utils.MergeTime((1, 0)), 1.0)
1127 self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
1128 self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
# These results are rounded to three decimals before comparing (the second
# argument of the first comparison is on a line not visible here).
1130 self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3),
1132 self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
# Microseconds that are negative or >= 1000000, and negative seconds, are
# expected to trip an assertion inside MergeTime.
1134 self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
1135 self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
1136 self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
1137 self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
1138 self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
class FieldSetTestCase(unittest.TestCase):
  """Test case for FieldSets"""

  def testSimpleMatch(self):
    # Plain names must match as whole strings only: neither a substring ("d")
    # nor a longer string starting with a field name ("defghi") may match.
    fields = utils.FieldSet("a", "b", "c", "def")
    self.assertTrue(fields.Matches("a"))
    self.assertFalse(fields.Matches("d"), "Substring matched")
    self.assertFalse(fields.Matches("defghi"), "Prefix string matched")
    # NonMatching() must be falsy when every given name is a known field and
    # truthy as soon as one of them ("d") is not.
    self.assertFalse(fields.NonMatching(["b", "c"]))
    self.assertFalse(fields.NonMatching(["a", "b", "c", "def"]))
    self.assertTrue(fields.NonMatching(["a", "d"]))

  def testRegexMatch(self):
    # Field definitions may be regular expressions: "b([0-9]+)" accepts "b"
    # followed by digits only, so "b/1" must be rejected.
    fields = utils.FieldSet("a", "b([0-9]+)", "c")
    self.assertTrue(fields.Matches("b1"))
    self.assertTrue(fields.Matches("b99"))
    self.assertFalse(fields.Matches("b/1"))
    self.assertFalse(fields.NonMatching(["b12", "c"]))
    self.assertTrue(fields.NonMatching(["a", "1"]))
1161 class TestForceDictType(unittest.TestCase):
1162 """Test case for ForceDictType"""
# NOTE(review): the numbering of this listing has gaps; the statement opening
# and closing the mapping below, the "else" branch and the return of _fdt are
# not visible here.
#
# Expected value type per key; the mapping is read as self.key_types by _fdt.
1166 'a': constants.VTYPE_INT,
1167 'b': constants.VTYPE_BOOL,
1168 'c': constants.VTYPE_STRING,
1169 'd': constants.VTYPE_SIZE,
# Helper applying ForceDictType to "dict" with self.key_types; allowed_values
# is only passed on when the caller supplied it. The tests compare the
# helper's result with a dictionary, so it presumably returns "dict" after the
# conversion -- confirm in the complete file.
1172 def _fdt(self, dict, allowed_values=None):
1173 if allowed_values is None:
1174 ForceDictType(dict, self.key_types)
1176 ForceDictType(dict, self.key_types, allowed_values=allowed_values)
# Accepted inputs and their expected conversions: numeric strings become
# integers, 1 and "true"/"True" become True, "false"/"False" become False,
# False for a string key becomes the empty string, and the size "4M" is
# expected to become 4.
1180 def testSimpleDict(self):
1181 self.assertEqual(self._fdt({}), {})
1182 self.assertEqual(self._fdt({'a': 1}), {'a': 1})
1183 self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
1184 self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
1185 self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
1186 self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
1187 self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
1188 self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
1189 self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
1190 self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
1191 self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
1192 self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
# Values that cannot be converted to the declared type must raise
# errors.TypeEnforcementError.
1194 def testErrors(self):
1195 self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
1196 self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
1197 self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
1198 self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
1201 class TestIsAbsNormPath(unittest.TestCase):
1202 """Testing case for IsNormAbsPath"""
# NOTE(review): the numbering of this listing has gaps; the branch statements
# of the helper and the header of the method holding the calls at the end are
# not visible here.
#
# Helper asserting that IsNormAbsPath(path) is true or false; which of the two
# assertions runs presumably depends on "result" -- confirm.
1204 def _pathTestHelper(self, path, result):
1206 self.assert_(IsNormAbsPath(path),
1207 "Path %s should result absolute and normalized" % path)
1209 self.assert_(not IsNormAbsPath(path),
1210 "Path %s should not result absolute and normalized" % path)
# Expected verdicts: plain absolute paths pass; a relative path, a path with a
# ".." component and a path with a trailing slash do not.
1213 self._pathTestHelper('/etc', True)
1214 self._pathTestHelper('/srv', True)
1215 self._pathTestHelper('etc', False)
1216 self._pathTestHelper('/etc/../root', False)
1217 self._pathTestHelper('/etc/', False)
class TestSafeEncode(unittest.TestCase):
  """Test case for SafeEncode"""

  def testAscii(self):
    # Digits, letters and punctuation must come back unchanged.
    for txt in [string.digits, string.letters, string.punctuation]:
      self.assertEqual(txt, SafeEncode(txt))

  def testDoubleEncode(self):
    # Encoding an already encoded value must not change it again.
    # NOTE(review): range(255) stops at chr(254), so chr(255) is never
    # exercised -- confirm whether range(256) was intended.
    for i in range(255):
      txt = SafeEncode(chr(i))
      self.assertEqual(txt, SafeEncode(txt))

  def testUnicode(self):
    # Same stability check as testDoubleEncode, for unicode input.
    # 1024 is high enough to catch non-direct ASCII mappings
    for i in range(1024):
      txt = SafeEncode(unichr(i))
      self.assertEqual(txt, SafeEncode(txt))
1239 class TestFormatTime(unittest.TestCase):
1240 """Testing case for FormatTime"""
# NOTE(review): two method headers are not visible in this listing.
#
# None is expected to be rendered as "N/A".
1243 self.failUnlessEqual(FormatTime(None), "N/A")
# A value that is not a timestamp (an empty tuple) is also expected to be
# rendered as "N/A".
1245 def testInvalid(self):
1246 self.failUnlessEqual(FormatTime(()), "N/A")
# Only checks that float and integer timestamps are accepted; the returned
# text is not inspected.
1249 # tests that we accept time.time input
1250 FormatTime(time.time())
1251 # tests that we accept int input
1252 FormatTime(int(time.time()))
1255 class RunInSeparateProcess(unittest.TestCase):
# Tests for utils.RunInSeparateProcess. The function under test is always
# reached through the "utils" module, so sharing its name with this class
# does not shadow it.
# NOTE(review): the numbering of this listing has gaps; several method headers
# and the headers/bodies of the nested child functions are not visible here.
#
# The value produced by the child function is expected to be handed back to
# the caller, for both True and False.
1257 for exp in [True, False]:
1261 self.assertEqual(exp, utils.RunInSeparateProcess(_child))
# Extra positional arguments must reach the child function unchanged.
1264 for arg in [0, 1, 999, "Hello World", (1, 2, 3)]:
1265 def _child(carg1, carg2):
1266 return carg1 == "Foo" and carg2 == arg
1268 self.assert_(utils.RunInSeparateProcess(_child, "Foo", arg))
# The function must run under a process ID different from the caller's.
1271 parent_pid = os.getpid()
1274 return os.getpid() == parent_pid
1276 self.failIf(utils.RunInSeparateProcess(_check))
# A child that sends SIGTERM to its own process must surface in the caller as
# errors.GenericError.
1278 def testSignal(self):
1280 os.kill(os.getpid(), signal.SIGTERM)
1282 self.assertRaises(errors.GenericError,
1283 utils.RunInSeparateProcess, _kill)
# A child raising errors.GenericError must likewise surface in the caller as
# errors.GenericError.
1285 def testException(self):
1287 raise errors.GenericError("This is a test")
1289 self.assertRaises(errors.GenericError,
1290 utils.RunInSeparateProcess, _exc)
1293 class TestFingerprintFile(unittest.TestCase):
# Tests for utils._FingerprintFile, comparing its result with fixed
# hexadecimal digests.
# NOTE(review): the method headers are not visible in this listing.
1295 self.tmpfile = tempfile.NamedTemporaryFile()
# Fingerprint of the temporary file before anything is written to it; the
# expected value is the SHA-1 digest of empty input.
1298 self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1299 "da39a3ee5e6b4b0d3255bfef95601890afd80709")
# Fingerprint after "Hello World\n" has been written to the same file.
1301 utils.WriteFile(self.tmpfile.name, data="Hello World\n")
1302 self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1303 "648a6a6ffffdaa0badb23b8baf90b6168dd16b3a")
1306 class TestUnescapeAndSplit(unittest.TestCase):
1307 """Testing case for UnescapeAndSplit"""
# NOTE(review): the header of the method assigning self._seps is not visible
# in this listing (presumably setUp -- confirm).
1310 # testing more than one separator for regexp safety
1311 self._seps = [",", "+", "."]
# Without any escaping, splitting must be the exact reverse of join().
1313 def testSimple(self):
1314 a = ["a", "b", "c", "d"]
1315 for sep in self._seps:
1316 self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), a)
# A separator preceded by one backslash is kept as part of the element and the
# backslash is removed.
1318 def testEscape(self):
1319 for sep in self._seps:
1320 a = ["a", "b\\" + sep + "c", "d"]
1321 b = ["a", "b" + sep + "c", "d"]
1322 self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
# Two backslashes before a separator stand for one literal backslash, so the
# separator still splits.
1324 def testDoubleEscape(self):
1325 for sep in self._seps:
1326 a = ["a", "b\\\\", "c", "d"]
1327 b = ["a", "b\\", "c", "d"]
1328 self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
# Three backslashes before a separator: one literal backslash followed by an
# escaped separator, so no split happens at that point.
1330 def testThreeEscape(self):
1331 for sep in self._seps:
1332 a = ["a", "b\\\\\\" + sep + "c", "d"]
1333 b = ["a", "b\\" + sep + "c", "d"]
1334 self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
class TestPathJoin(unittest.TestCase):
  """Testing case for PathJoin"""

  def testBasicItems(self):
    # An absolute first component followed by plain names is simply joined
    # with "/".
    parts = ["/a", "b", "c"]
    self.assertEqual(PathJoin(*parts), "/".join(parts))

  def testNonAbsPrefix(self):
    # A relative first component must be rejected.
    self.assertRaises(ValueError, PathJoin, "a", "b")

  def testBackTrack(self):
    # A ".." inside a later component must be rejected.
    self.assertRaises(ValueError, PathJoin, "/a", "b/../c")

  def testMultiAbs(self):
    # An absolute path given as a later component must be rejected.
    self.assertRaises(ValueError, PathJoin, "/a", "/b")
1354 class TestHostInfo(unittest.TestCase):
1355 """Testing case for HostInfo"""
# NOTE(review): the numbering of this listing has gaps; the assignments of
# "data" in testTrailingDot and of "value" in the last two tests (presumably
# loops over sample names -- confirm) are not visible here.
#
# Upper-case letters are expected to be folded to lower case.
1357 def testUppercase(self):
1358 data = "AbC.example.com"
1359 self.failUnlessEqual(HostInfo.NormalizeName(data), data.lower())
# A name containing a 255-character label must be refused with OpPrereqError.
1361 def testTooLongName(self):
1362 data = "a.b." + "c" * 255
1363 self.failUnlessRaises(OpPrereqError, HostInfo.NormalizeName, data)
# A single trailing dot is expected to be removed by the normalization.
1365 def testTrailingDot(self):
1367 self.failUnlessEqual(HostInfo.NormalizeName(data + "."), data)
# Every sample value must be refused with OpPrereqError.
1369 def testInvalidName(self):
1377 self.failUnlessRaises(OpPrereqError, HostInfo.NormalizeName, value)
# Every sample value must be accepted, i.e. NormalizeName must not raise.
1379 def testValidName(self):
1387 HostInfo.NormalizeName(value)
# Hand control to the project's test program wrapper when this file is
# executed directly rather than imported.
if __name__ == "__main__":
  testutils.GanetiTestProgram()