4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the utils module"""
40 import distutils.version
46 from ganeti import constants
47 from ganeti import compat
48 from ganeti import utils
49 from ganeti import errors
50 from ganeti import serializer
51 from ganeti.utils import IsProcessAlive, RunCmd, \
52 RemoveFile, MatchNameComponent, FormatUnit, \
53 ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
54 ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
55 SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \
56 TailFile, ForceDictType, SafeEncode, IsNormAbsPath, FormatTime, \
57 UnescapeAndSplit, RunParts, PathJoin, HostInfo, ReadOneLineFile
59 from ganeti.errors import LockError, UnitParseError, GenericError, \
60 ProgrammerError, OpPrereqError
63 class TestIsProcessAlive(unittest.TestCase):
64 """Testing case for IsProcessAlive"""
68 self.assert_(IsProcessAlive(mypid),
69 "can't find myself running")
71 def testNotExisting(self):
72 pid_non_existing = os.fork()
73 if pid_non_existing == 0:
75 elif pid_non_existing < 0:
76 raise SystemError("can't fork")
77 os.waitpid(pid_non_existing, 0)
78 self.assert_(not IsProcessAlive(pid_non_existing),
79 "nonexisting process detected")
82 class TestGetProcStatusPath(unittest.TestCase):
84 self.assert_("/1234/" in utils._GetProcStatusPath(1234))
85 self.assertNotEqual(utils._GetProcStatusPath(1),
86 utils._GetProcStatusPath(2))
89 class TestIsProcessHandlingSignal(unittest.TestCase):
91 self.tmpdir = tempfile.mkdtemp()
94 shutil.rmtree(self.tmpdir)
96 def testParseSigsetT(self):
97 self.assertEqual(len(utils._ParseSigsetT("0")), 0)
98 self.assertEqual(utils._ParseSigsetT("1"), set([1]))
99 self.assertEqual(utils._ParseSigsetT("1000a"), set([2, 4, 17]))
100 self.assertEqual(utils._ParseSigsetT("810002"), set([2, 17, 24, ]))
101 self.assertEqual(utils._ParseSigsetT("0000000180000202"),
102 set([2, 10, 32, 33]))
103 self.assertEqual(utils._ParseSigsetT("0000000180000002"),
105 self.assertEqual(utils._ParseSigsetT("0000000188000002"),
106 set([2, 28, 32, 33]))
107 self.assertEqual(utils._ParseSigsetT("000000004b813efb"),
108 set([1, 2, 4, 5, 6, 7, 8, 10, 11, 12, 13, 14, 17,
109 24, 25, 26, 28, 31]))
110 self.assertEqual(utils._ParseSigsetT("ffffff"), set(range(1, 25)))
112 def testGetProcStatusField(self):
113 for field in ["SigCgt", "Name", "FDSize"]:
114 for value in ["", "0", "cat", " 1234 KB"]:
115 pstatus = "\n".join([
117 "%s: %s" % (field, value),
120 result = utils._GetProcStatusField(pstatus, field)
121 self.assertEqual(result, value.strip())
124 sp = utils.PathJoin(self.tmpdir, "status")
126 utils.WriteFile(sp, data="\n".join([
128 "State: S (sleeping)",
133 "SigBlk: 0000000000010000",
134 "SigIgn: 0000000000384004",
135 "SigCgt: 000000004b813efb",
136 "CapEff: 0000000000000000",
139 self.assert_(utils.IsProcessHandlingSignal(1234, 10, status_path=sp))
141 def testNoSigCgt(self):
142 sp = utils.PathJoin(self.tmpdir, "status")
144 utils.WriteFile(sp, data="\n".join([
148 self.assertRaises(RuntimeError, utils.IsProcessHandlingSignal,
149 1234, 10, status_path=sp)
def testNoSuchFile(self):
  """Query a status path named "notexist" and expect a false result"""
  missing_path = utils.PathJoin(self.tmpdir, "notexist")

  handled = utils.IsProcessHandlingSignal(1234, 10, status_path=missing_path)
  self.assertFalse(handled)
def _TestRealProcess():
  """Check IsProcessHandlingSignal against the current process.

  SIGUSR1 is switched between the default disposition, a Python-level
  handler and SIG_IGN. After each switch the value reported by
  utils.IsProcessHandlingSignal for os.getpid() is compared with the
  expectation.

  @raise Exception: when the reported state does not match

  """
  signal.signal(signal.SIGUSR1, signal.SIG_DFL)
  if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
    raise Exception("SIGUSR1 is handled when it should not be")

  signal.signal(signal.SIGUSR1, lambda signum, frame: None)
  if not utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
    raise Exception("SIGUSR1 is not handled when it should be")

  # With SIG_IGN installed the signal must not be reported as handled, so
  # the message has to describe an unexpected "handled" state
  signal.signal(signal.SIGUSR1, signal.SIG_IGN)
  if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
    raise Exception("SIGUSR1 is handled when it should not be")

  signal.signal(signal.SIGUSR1, signal.SIG_DFL)
  if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
    raise Exception("SIGUSR1 is handled when it should not be")
def testRealProcess(self):
  """Run _TestRealProcess through utils.RunInSeparateProcess.

  The value returned by RunInSeparateProcess must be true.

  """
  # assertTrue replaces the deprecated "assert_" alias
  self.assertTrue(utils.RunInSeparateProcess(self._TestRealProcess))
180 class TestPidFileFunctions(unittest.TestCase):
181 """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
184 self.dir = tempfile.mkdtemp()
185 self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
186 utils.DaemonPidFileName = self.f_dpn
188 def testPidFileFunctions(self):
189 pid_file = self.f_dpn('test')
190 utils.WritePidFile('test')
191 self.failUnless(os.path.exists(pid_file),
192 "PID file should have been created")
193 read_pid = utils.ReadPidFile(pid_file)
194 self.failUnlessEqual(read_pid, os.getpid())
195 self.failUnless(utils.IsProcessAlive(read_pid))
196 self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
197 utils.RemovePidFile('test')
198 self.failIf(os.path.exists(pid_file),
199 "PID file should not exist anymore")
200 self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
201 "ReadPidFile should return 0 for missing pid file")
202 fh = open(pid_file, "w")
205 self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
206 "ReadPidFile should return 0 for invalid pid file")
207 utils.RemovePidFile('test')
208 self.failIf(os.path.exists(pid_file),
209 "PID file should not exist anymore")
212 pid_file = self.f_dpn('child')
213 r_fd, w_fd = os.pipe()
215 if new_pid == 0: #child
216 utils.WritePidFile('child')
221 # else we are in the parent
222 # wait until the child has written the pid file
224 read_pid = utils.ReadPidFile(pid_file)
225 self.failUnlessEqual(read_pid, new_pid)
226 self.failUnless(utils.IsProcessAlive(new_pid))
227 utils.KillProcess(new_pid, waitpid=True)
228 self.failIf(utils.IsProcessAlive(new_pid))
229 utils.RemovePidFile('child')
230 self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
233 for name in os.listdir(self.dir):
234 os.unlink(os.path.join(self.dir, name))
238 class TestRunCmd(testutils.GanetiTestCase):
239 """Testing case for the RunCmd function"""
242 testutils.GanetiTestCase.setUp(self)
243 self.magic = time.ctime() + " ganeti test"
244 self.fname = self._CreateTempFile()
247 """Test successful exit code"""
248 result = RunCmd("/bin/sh -c 'exit 0'")
249 self.assertEqual(result.exit_code, 0)
250 self.assertEqual(result.output, "")
253 """Test fail exit code"""
254 result = RunCmd("/bin/sh -c 'exit 1'")
255 self.assertEqual(result.exit_code, 1)
256 self.assertEqual(result.output, "")
def testStdout(self):
  """Check that text written to standard output is captured.

  The command is run twice: once reading "stdout" from the result and once
  with "output" pointing at a file, whose content is compared afterwards.

  """
  echo_cmd = 'echo -n "%s"' % self.magic
  shell_cmd = "/bin/sh -c '%s'" % echo_cmd

  in_memory = RunCmd(shell_cmd)
  self.assertEqual(in_memory.stdout, self.magic)

  to_file = RunCmd(shell_cmd, output=self.fname)
  self.assertEqual(to_file.output, "")
  self.assertFileContent(self.fname, self.magic)
def testStderr(self):
  """Check that text written to standard error is captured.

  The command is run twice: once reading "stderr" from the result and once
  with "output" pointing at a file, whose content is compared afterwards.

  """
  echo_cmd = 'echo -n "%s"' % self.magic
  shell_cmd = "/bin/sh -c '%s' 1>&2" % echo_cmd

  in_memory = RunCmd(shell_cmd)
  self.assertEqual(in_memory.stderr, self.magic)

  to_file = RunCmd(shell_cmd, output=self.fname)
  self.assertEqual(to_file.output, "")
  self.assertFileContent(self.fname, self.magic)
def testCombined(self):
  """Check the combined output of a command writing to both streams.

  "A<magic>" goes to standard output and "B<magic>" to standard error; the
  concatenation is expected in "output" and, on the second run, in the
  output file.

  """
  magic = self.magic
  script = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (magic, magic)
  shell_cmd = "/bin/sh -c '%s'" % script
  combined = "A" + magic + "B" + magic

  in_memory = RunCmd(shell_cmd)
  self.assertEqual(in_memory.output, combined)

  to_file = RunCmd(shell_cmd, output=self.fname)
  self.assertEqual(to_file.output, "")
  self.assertFileContent(self.fname, combined)
def testSignal(self):
  """Check the result of a command that sends signal 15 to itself"""
  self_kill = ["python", "-c", "import os; os.kill(os.getpid(), 15)"]

  outcome = RunCmd(self_kill)
  self.assertEqual(outcome.signal, 15)
  self.assertEqual(outcome.output, "")
def testListRun(self):
  """Check commands passed as argument lists.

  Each command must finish without a signal and with the listed exit code;
  for the final "echo" the standard output is compared as well.

  """
  runs = [
    (["true"], 0),
    (["/bin/sh", "-c", "exit 1"], 1),
    (["echo", "-n", self.magic], 0),
    ]

  outcome = None
  for (argv, exit_code) in runs:
    outcome = RunCmd(argv)
    self.assertEqual(outcome.signal, None)
    self.assertEqual(outcome.exit_code, exit_code)

  # "outcome" still refers to the "echo" run here
  self.assertEqual(outcome.stdout, self.magic)
def testFileEmptyOutput(self):
  """Check that a silent command leaves the output file empty"""
  outcome = RunCmd(["true"], output=self.fname)

  self.assertEqual(outcome.signal, None)
  self.assertEqual(outcome.exit_code, 0)
  self.assertFileContent(self.fname, "")
313 """Test locale environment"""
314 old_env = os.environ.copy()
316 os.environ["LANG"] = "en_US.UTF-8"
317 os.environ["LC_ALL"] = "en_US.UTF-8"
318 result = RunCmd(["locale"])
319 for line in result.output.splitlines():
320 key, value = line.split("=", 1)
321 # Ignore these variables, they're overridden by LC_ALL
322 if key == "LANG" or key == "LANGUAGE":
324 self.failIf(value and value != "C" and value != '"C"',
325 "Variable %s is set to the invalid value '%s'" % (key, value))
def testDefaultCwd(self):
  """Check that "pwd" prints "/" when no cwd argument is given"""
  # assertEqual replaces the deprecated "failUnlessEqual" alias
  reported = RunCmd(["pwd"]).stdout.strip()
  self.assertEqual(reported, "/")
334 """Test default working directory"""
335 self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
336 self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
338 self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
def testResetEnv(self):
  """Test environment reset functionality.

  With reset_env=True "env" must print nothing; when "env" is passed in
  addition, only those variables must be printed.

  """
  # assertEqual replaces the deprecated "failUnlessEqual" alias
  bare = RunCmd(["env"], reset_env=True)
  self.assertEqual(bare.stdout.strip(), "")

  custom = RunCmd(["env"], reset_env=True, env={"FOO": "bar"})
  self.assertEqual(custom.stdout.strip(), "FOO=bar")
347 class TestRunParts(unittest.TestCase):
348 """Testing case for the RunParts function"""
351 self.rundir = tempfile.mkdtemp(prefix="ganeti-test", suffix=".tmp")
354 shutil.rmtree(self.rundir)
357 """Test on an empty dir"""
358 self.failUnlessEqual(RunParts(self.rundir, reset_env=True), [])
def testSkipWrongName(self):
  """Test that wrong files are skipped.

  An executable file named "00test.dot" must be reported with the status
  constants.RUNPARTS_SKIP and no run result.

  """
  fname = os.path.join(self.rundir, "00test.dot")
  utils.WriteFile(fname, data="")
  os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
  relname = os.path.basename(fname)

  # assertEqual replaces the deprecated "failUnlessEqual" alias
  self.assertEqual(RunParts(self.rundir, reset_env=True),
                   [(relname, constants.RUNPARTS_SKIP, None)])
def testSkipNonExec(self):
  """Test that non executable files are skipped.

  The file is written without changing its mode afterwards and must be
  reported with the status constants.RUNPARTS_SKIP and no run result.

  """
  fname = os.path.join(self.rundir, "00test")
  utils.WriteFile(fname, data="")
  relname = os.path.basename(fname)

  # assertEqual replaces the deprecated "failUnlessEqual" alias
  self.assertEqual(RunParts(self.rundir, reset_env=True),
                   [(relname, constants.RUNPARTS_SKIP, None)])
378 """Test error on a broken executable"""
379 fname = os.path.join(self.rundir, "00test")
380 utils.WriteFile(fname, data="")
381 os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
382 (relname, status, error) = RunParts(self.rundir, reset_env=True)[0]
383 self.failUnlessEqual(relname, os.path.basename(fname))
384 self.failUnlessEqual(status, constants.RUNPARTS_ERR)
385 self.failUnless(error)
387 def testSorted(self):
388 """Test executions are sorted"""
390 files.append(os.path.join(self.rundir, "64test"))
391 files.append(os.path.join(self.rundir, "00test"))
392 files.append(os.path.join(self.rundir, "42test"))
395 utils.WriteFile(fname, data="")
397 results = RunParts(self.rundir, reset_env=True)
399 for fname in sorted(files):
400 self.failUnlessEqual(os.path.basename(fname), results.pop(0)[0])
403 """Test correct execution"""
404 fname = os.path.join(self.rundir, "00test")
405 utils.WriteFile(fname, data="#!/bin/sh\n\necho -n ciao")
406 os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
407 (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
408 self.failUnlessEqual(relname, os.path.basename(fname))
409 self.failUnlessEqual(status, constants.RUNPARTS_RUN)
410 self.failUnlessEqual(runresult.stdout, "ciao")
def testRunFail(self):
  """Test correct execution, with run failure.

  The script exits with status 1; it must be reported as run
  (constants.RUNPARTS_RUN) with a failed result carrying exit code 1.

  """
  fname = os.path.join(self.rundir, "00test")
  utils.WriteFile(fname, data="#!/bin/sh\n\nexit 1")
  os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)

  (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]

  # assertEqual/assertTrue replace the deprecated "failUnless*" aliases
  self.assertEqual(relname, os.path.basename(fname))
  self.assertEqual(status, constants.RUNPARTS_RUN)
  self.assertEqual(runresult.exit_code, 1)
  self.assertTrue(runresult.failed)
423 def testRunMix(self):
425 files.append(os.path.join(self.rundir, "00test"))
426 files.append(os.path.join(self.rundir, "42test"))
427 files.append(os.path.join(self.rundir, "64test"))
428 files.append(os.path.join(self.rundir, "99test"))
432 # 1st has errors in execution
433 utils.WriteFile(files[0], data="#!/bin/sh\n\nexit 1")
434 os.chmod(files[0], stat.S_IREAD | stat.S_IEXEC)
437 utils.WriteFile(files[1], data="")
439 # 3rd cannot execute properly
440 utils.WriteFile(files[2], data="")
441 os.chmod(files[2], stat.S_IREAD | stat.S_IEXEC)
444 utils.WriteFile(files[3], data="#!/bin/sh\n\necho -n ciao")
445 os.chmod(files[3], stat.S_IREAD | stat.S_IEXEC)
447 results = RunParts(self.rundir, reset_env=True)
449 (relname, status, runresult) = results[0]
450 self.failUnlessEqual(relname, os.path.basename(files[0]))
451 self.failUnlessEqual(status, constants.RUNPARTS_RUN)
452 self.failUnlessEqual(runresult.exit_code, 1)
453 self.failUnless(runresult.failed)
455 (relname, status, runresult) = results[1]
456 self.failUnlessEqual(relname, os.path.basename(files[1]))
457 self.failUnlessEqual(status, constants.RUNPARTS_SKIP)
458 self.failUnlessEqual(runresult, None)
460 (relname, status, runresult) = results[2]
461 self.failUnlessEqual(relname, os.path.basename(files[2]))
462 self.failUnlessEqual(status, constants.RUNPARTS_ERR)
463 self.failUnless(runresult)
465 (relname, status, runresult) = results[3]
466 self.failUnlessEqual(relname, os.path.basename(files[3]))
467 self.failUnlessEqual(status, constants.RUNPARTS_RUN)
468 self.failUnlessEqual(runresult.output, "ciao")
469 self.failUnlessEqual(runresult.exit_code, 0)
470 self.failUnless(not runresult.failed)
473 class TestStartDaemon(testutils.GanetiTestCase):
475 self.tmpdir = tempfile.mkdtemp(prefix="ganeti-test")
476 self.tmpfile = os.path.join(self.tmpdir, "test")
479 shutil.rmtree(self.tmpdir)
482 utils.StartDaemon("echo Hello World > %s" % self.tmpfile)
483 self._wait(self.tmpfile, 60.0, "Hello World")
def testShellOutput(self):
  """Start a command given as a string, with an output file"""
  greeting = "Hello World"

  utils.StartDaemon("echo %s" % greeting, output=self.tmpfile)
  self._wait(self.tmpfile, 60.0, greeting)
def testNoShellNoOutput(self):
  """Start a command given as a list, without any output argument"""
  argv = ["pwd"]
  utils.StartDaemon(argv)
def testNoShellNoOutputTouch(self):
  """Start "touch" as a daemon and wait for the file to show up.

  The file must not exist beforehand; afterwards _wait is used to wait for
  it to exist with empty content.

  """
  testfile = os.path.join(self.tmpdir, "check")

  # assertFalse replaces the deprecated "failIf" alias
  self.assertFalse(os.path.exists(testfile))

  utils.StartDaemon(["touch", testfile])
  self._wait(testfile, 60.0, "")
def testNoShellOutput(self):
  """Start "pwd" given as a list and expect "/" in the output file"""
  argv = ["pwd"]

  utils.StartDaemon(argv, output=self.tmpfile)
  self._wait(self.tmpfile, 60.0, "/")
def testNoShellOutputCwd(self):
  """Start "pwd" with an explicit cwd and expect that directory as output"""
  utils.StartDaemon(["pwd"], cwd=os.getcwd(), output=self.tmpfile)

  expected_dir = os.getcwd()
  self._wait(self.tmpfile, 60.0, expected_dir)
def testShellEnv(self):
  """Pass an environment variable to a command given as a string"""
  environment = {
    "GNT_TEST_VAR": "Hello World",
    }

  utils.StartDaemon("echo \"$GNT_TEST_VAR\"", env=environment,
                    output=self.tmpfile)
  self._wait(self.tmpfile, 60.0, "Hello World")
def testNoShellEnv(self):
  """Pass an environment variable to a command given as a list"""
  environment = {
    "GNT_TEST_VAR": "Hello World",
    }

  utils.StartDaemon(["printenv", "GNT_TEST_VAR"], env=environment,
                    output=self.tmpfile)
  self._wait(self.tmpfile, 60.0, "Hello World")
516 def testOutputFd(self):
517 fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
519 utils.StartDaemon(["pwd"], output_fd=fd, cwd=os.getcwd())
522 self._wait(self.tmpfile, 60.0, os.getcwd())
525 pid = utils.StartDaemon("echo $$ > %s" % self.tmpfile)
526 self._wait(self.tmpfile, 60.0, str(pid))
528 def testPidFile(self):
529 pidfile = os.path.join(self.tmpdir, "pid")
530 checkfile = os.path.join(self.tmpdir, "abort")
532 pid = utils.StartDaemon("while sleep 5; do :; done", pidfile=pidfile,
535 fd = os.open(pidfile, os.O_RDONLY)
537 # Check file is locked
538 self.assertRaises(errors.LockError, utils.LockFile, fd)
540 pidtext = os.read(fd, 100)
544 self.assertEqual(int(pidtext.strip()), pid)
546 self.assert_(utils.IsProcessAlive(pid))
548 # No matter what happens, kill daemon
549 utils.KillProcess(pid, timeout=5.0, waitpid=False)
550 self.failIf(utils.IsProcessAlive(pid))
552 self.assertEqual(utils.ReadFile(self.tmpfile), "")
554 def _wait(self, path, timeout, expected):
555 # Due to the asynchronous nature of daemon processes, polling is necessary.
556 # A timeout makes sure the test doesn't hang forever.
558 if not (os.path.isfile(path) and
559 utils.ReadFile(path).strip() == expected):
560 raise utils.RetryAgain()
563 utils.Retry(_CheckFile, (0.01, 1.5, 1.0), timeout)
564 except utils.RetryTimeout:
565 self.fail("Apparently the daemon didn't run in %s seconds and/or"
566 " didn't write the correct output" % timeout)
569 self.assertRaises(errors.OpExecError, utils.StartDaemon,
570 ["./does-NOT-EXIST/here/0123456789"])
571 self.assertRaises(errors.OpExecError, utils.StartDaemon,
572 ["./does-NOT-EXIST/here/0123456789"],
573 output=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
574 self.assertRaises(errors.OpExecError, utils.StartDaemon,
575 ["./does-NOT-EXIST/here/0123456789"],
576 cwd=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
577 self.assertRaises(errors.OpExecError, utils.StartDaemon,
578 ["./does-NOT-EXIST/here/0123456789"],
579 output=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
581 fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
583 self.assertRaises(errors.ProgrammerError, utils.StartDaemon,
584 ["./does-NOT-EXIST/here/0123456789"],
585 output=self.tmpfile, output_fd=fd)
590 class TestSetCloseOnExecFlag(unittest.TestCase):
591 """Tests for SetCloseOnExecFlag"""
594 self.tmpfile = tempfile.TemporaryFile()
596 def testEnable(self):
597 utils.SetCloseOnExecFlag(self.tmpfile.fileno(), True)
598 self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
601 def testDisable(self):
602 utils.SetCloseOnExecFlag(self.tmpfile.fileno(), False)
603 self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
607 class TestSetNonblockFlag(unittest.TestCase):
609 self.tmpfile = tempfile.TemporaryFile()
611 def testEnable(self):
612 utils.SetNonblockFlag(self.tmpfile.fileno(), True)
613 self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
616 def testDisable(self):
617 utils.SetNonblockFlag(self.tmpfile.fileno(), False)
618 self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
622 class TestRemoveFile(unittest.TestCase):
623 """Test case for the RemoveFile function"""
626 """Create a temp dir and file for each case"""
627 self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
628 fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
632 if os.path.exists(self.tmpfile):
633 os.unlink(self.tmpfile)
634 os.rmdir(self.tmpdir)
def testIgnoreDirs(self):
  """Check that RemoveFile() returns None when given a directory"""
  outcome = RemoveFile(self.tmpdir)
  self.assertEqual(None, outcome)
def testIgnoreNotExisting(self):
  """Check that RemoveFile() can be called twice on the same path"""
  # The second iteration runs against a path the first call was asked to
  # remove
  for _ in range(2):
    RemoveFile(self.tmpfile)
def testRemoveFile(self):
  """Check that the file is gone after calling RemoveFile()"""
  target = self.tmpfile
  RemoveFile(target)

  still_there = os.path.exists(target)
  if still_there:
    self.fail("File '%s' not removed" % target)
651 def testRemoveSymlink(self):
652 """Test that RemoveFile does remove symlinks"""
653 symlink = self.tmpdir + "/symlink"
654 os.symlink("no-such-file", symlink)
656 if os.path.exists(symlink):
657 self.fail("File '%s' not removed" % symlink)
658 os.symlink(self.tmpfile, symlink)
660 if os.path.exists(symlink):
661 self.fail("File '%s' not removed" % symlink)
664 class TestRename(unittest.TestCase):
665 """Test case for RenameFile"""
668 """Create a temporary directory"""
669 self.tmpdir = tempfile.mkdtemp()
670 self.tmpfile = os.path.join(self.tmpdir, "test1")
673 open(self.tmpfile, "w").close()
676 """Remove temporary directory"""
677 shutil.rmtree(self.tmpdir)
def testSimpleRename1(self):
  """Rename the temporary file to "xyz" within the same directory"""
  target = os.path.join(self.tmpdir, "xyz")

  utils.RenameFile(self.tmpfile, target)

  # assertTrue replaces the deprecated "assert_" alias
  self.assertTrue(os.path.isfile(target))
684 def testSimpleRename2(self):
685 """Simple rename 2"""
686 utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
688 self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
690 def testRenameMkdir(self):
691 """Rename with mkdir"""
692 utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
694 self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
695 self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
697 utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"),
698 os.path.join(self.tmpdir, "test/foo/bar/baz"),
700 self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
701 self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar")))
702 self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz")))
705 class TestMatchNameComponent(unittest.TestCase):
706 """Test case for the MatchNameComponent function"""
def testEmptyList(self):
  """Test that there is no match against an empty list"""
  # assertEqual replaces the deprecated "failUnlessEqual" alias
  for key in ("", "test"):
    self.assertEqual(MatchNameComponent(key, []), None)
def testSingleMatch(self):
  """Test that a single match is performed correctly.

  Progressively longer prefixes of the second name must all resolve to
  that name.

  """
  mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]

  # assertEqual replaces the deprecated "failUnlessEqual" alias
  for key in ("test2", "test2.example", "test2.example.com"):
    self.assertEqual(MatchNameComponent(key, mlist), mlist[1])
def testMultipleMatches(self):
  """Test that a multiple match is returned as None"""
  mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]

  # assertEqual replaces the deprecated "failUnlessEqual" alias
  for key in ("test1", "test1.example"):
    self.assertEqual(MatchNameComponent(key, mlist), None)
726 def testFullMatch(self):
727 """Test that a full match is returned correctly"""
729 key2 = "test1.example"
730 mlist = [key2, key2 + ".com"]
731 self.failUnlessEqual(MatchNameComponent(key1, mlist), None)
732 self.failUnlessEqual(MatchNameComponent(key2, mlist), key2)
734 def testCaseInsensitivePartialMatch(self):
735 """Test for the case_insensitive keyword"""
736 mlist = ["test1.example.com", "test2.example.net"]
737 self.assertEqual(MatchNameComponent("test2", mlist, case_sensitive=False),
739 self.assertEqual(MatchNameComponent("Test2", mlist, case_sensitive=False),
741 self.assertEqual(MatchNameComponent("teSt2", mlist, case_sensitive=False),
743 self.assertEqual(MatchNameComponent("TeSt2", mlist, case_sensitive=False),
747 def testCaseInsensitiveFullMatch(self):
748 mlist = ["ts1.ex", "ts1.ex.org", "ts2.ex", "Ts2.ex"]
749 # Between the two ts1 a full string match non-case insensitive should work
750 self.assertEqual(MatchNameComponent("Ts1", mlist, case_sensitive=False),
752 self.assertEqual(MatchNameComponent("Ts1.ex", mlist, case_sensitive=False),
754 self.assertEqual(MatchNameComponent("ts1.ex", mlist, case_sensitive=False),
756 # Between the two ts2 only case differs, so only case-match works
757 self.assertEqual(MatchNameComponent("ts2.ex", mlist, case_sensitive=False),
759 self.assertEqual(MatchNameComponent("Ts2.ex", mlist, case_sensitive=False),
761 self.assertEqual(MatchNameComponent("TS2.ex", mlist, case_sensitive=False),
765 class TestReadFile(testutils.GanetiTestCase):
767 def testReadAll(self):
768 data = utils.ReadFile(self._TestDataFilename("cert1.pem"))
769 self.assertEqual(len(data), 814)
771 h = compat.md5_hash()
773 self.assertEqual(h.hexdigest(), "a491efb3efe56a0535f924d5f8680fd4")
775 def testReadSize(self):
776 data = utils.ReadFile(self._TestDataFilename("cert1.pem"),
778 self.assertEqual(len(data), 100)
780 h = compat.md5_hash()
782 self.assertEqual(h.hexdigest(), "893772354e4e690b9efd073eed433ce7")
785 self.assertRaises(EnvironmentError, utils.ReadFile,
786 "/dev/null/does-not-exist")
789 class TestReadOneLineFile(testutils.GanetiTestCase):
792 testutils.GanetiTestCase.setUp(self)
def testDefault(self):
  """Read "cert1.pem" with default arguments and check the returned line"""
  cert_path = self._TestDataFilename("cert1.pem")

  first_line = ReadOneLineFile(cert_path)
  self.assertEqual(len(first_line), 27)
  self.assertEqual(first_line, "-----BEGIN CERTIFICATE-----")
def testNotStrict(self):
  """Read "cert1.pem" with strict=False and check the returned line"""
  cert_path = self._TestDataFilename("cert1.pem")

  first_line = ReadOneLineFile(cert_path, strict=False)
  self.assertEqual(len(first_line), 27)
  self.assertEqual(first_line, "-----BEGIN CERTIFICATE-----")
def testStrictFailure(self):
  """Expect errors.GenericError when reading "cert1.pem" with strict=True"""
  cert_path = self._TestDataFilename("cert1.pem")

  self.assertRaises(errors.GenericError, ReadOneLineFile, cert_path,
                    strict=True)
def testLongLine(self):
  """Read back a long line without newline in strict and non-strict mode"""
  payload = "Hello World! " * 1024
  path = self._CreateTempFile()
  utils.WriteFile(path, data=payload)

  strict_result = ReadOneLineFile(path, strict=True)
  lax_result = ReadOneLineFile(path, strict=False)

  self.assertEqual(payload, strict_result)
  self.assertEqual(payload, lax_result)
817 def testNewline(self):
818 myfile = self._CreateTempFile()
820 for nl in ["", "\n", "\r\n"]:
821 dummydata = "%s%s" % (myline, nl)
822 utils.WriteFile(myfile, data=dummydata)
823 datalax = ReadOneLineFile(myfile, strict=False)
824 self.assertEqual(myline, datalax)
825 datastrict = ReadOneLineFile(myfile, strict=True)
826 self.assertEqual(myline, datastrict)
828 def testWhitespaceAndMultipleLines(self):
829 myfile = self._CreateTempFile()
830 for nl in ["", "\n", "\r\n"]:
831 for ws in [" ", "\t", "\t\t \t", "\t "]:
832 dummydata = (1024 * ("Foo bar baz %s%s" % (ws, nl)))
833 utils.WriteFile(myfile, data=dummydata)
834 datalax = ReadOneLineFile(myfile, strict=False)
836 self.assert_(set("\r\n") & set(dummydata))
837 self.assertRaises(errors.GenericError, ReadOneLineFile,
839 explen = len("Foo bar baz ") + len(ws)
840 self.assertEqual(len(datalax), explen)
841 self.assertEqual(datalax, dummydata[:explen])
842 self.assertFalse(set("\r\n") & set(datalax))
844 datastrict = ReadOneLineFile(myfile, strict=True)
845 self.assertEqual(dummydata, datastrict)
846 self.assertEqual(dummydata, datalax)
848 def testEmptylines(self):
849 myfile = self._CreateTempFile()
851 for nl in ["\n", "\r\n"]:
852 for ol in ["", "otherline"]:
853 dummydata = "%s%s%s%s%s%s" % (nl, nl, myline, nl, ol, nl)
854 utils.WriteFile(myfile, data=dummydata)
855 self.assert_(set("\r\n") & set(dummydata))
856 datalax = ReadOneLineFile(myfile, strict=False)
857 self.assertEqual(myline, datalax)
859 self.assertRaises(errors.GenericError, ReadOneLineFile,
862 datastrict = ReadOneLineFile(myfile, strict=True)
863 self.assertEqual(myline, datastrict)
866 class TestTimestampForFilename(unittest.TestCase):
868 self.assert_("." not in utils.TimestampForFilename())
869 self.assert_(":" not in utils.TimestampForFilename())
872 class TestCreateBackup(testutils.GanetiTestCase):
874 testutils.GanetiTestCase.setUp(self)
876 self.tmpdir = tempfile.mkdtemp()
879 testutils.GanetiTestCase.tearDown(self)
881 shutil.rmtree(self.tmpdir)
884 filename = utils.PathJoin(self.tmpdir, "config.data")
885 utils.WriteFile(filename, data="")
886 bname = utils.CreateBackup(filename)
887 self.assertFileContent(bname, "")
888 self.assertEqual(len(glob.glob("%s*" % filename)), 2)
889 utils.CreateBackup(filename)
890 self.assertEqual(len(glob.glob("%s*" % filename)), 3)
891 utils.CreateBackup(filename)
892 self.assertEqual(len(glob.glob("%s*" % filename)), 4)
894 fifoname = utils.PathJoin(self.tmpdir, "fifo")
896 self.assertRaises(errors.ProgrammerError, utils.CreateBackup, fifoname)
898 def testContent(self):
900 for data in ["", "X", "Hello World!\n" * 100, "Binary data\0\x01\x02\n"]:
901 for rep in [1, 2, 10, 127]:
902 testdata = data * rep
904 filename = utils.PathJoin(self.tmpdir, "test.data_")
905 utils.WriteFile(filename, data=testdata)
906 self.assertFileContent(filename, testdata)
909 bname = utils.CreateBackup(filename)
911 self.assertFileContent(bname, testdata)
912 self.assertEqual(len(glob.glob("%s*" % filename)), 1 + bkpcount)
915 class TestFormatUnit(unittest.TestCase):
916 """Test case for the FormatUnit function"""
919 self.assertEqual(FormatUnit(1, 'h'), '1M')
920 self.assertEqual(FormatUnit(100, 'h'), '100M')
921 self.assertEqual(FormatUnit(1023, 'h'), '1023M')
923 self.assertEqual(FormatUnit(1, 'm'), '1')
924 self.assertEqual(FormatUnit(100, 'm'), '100')
925 self.assertEqual(FormatUnit(1023, 'm'), '1023')
927 self.assertEqual(FormatUnit(1024, 'm'), '1024')
928 self.assertEqual(FormatUnit(1536, 'm'), '1536')
929 self.assertEqual(FormatUnit(17133, 'm'), '17133')
930 self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
933 self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
934 self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
935 self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
936 self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
938 self.assertEqual(FormatUnit(1024, 'g'), '1.0')
939 self.assertEqual(FormatUnit(1536, 'g'), '1.5')
940 self.assertEqual(FormatUnit(17133, 'g'), '16.7')
941 self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
943 self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
944 self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
945 self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
948 self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
949 self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
950 self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
952 self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
953 self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
954 self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
956 class TestParseUnit(unittest.TestCase):
957 """Test case for the ParseUnit function"""
960 ('M', 1), ('G', 1024), ('T', 1024 * 1024),
961 ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
962 ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
def testRounding(self):
  """Check ParseUnit results for integer input without a suffix.

  All expected values in the table are multiples of four.

  """
  expectations = [
    ('0', 0),
    ('1', 4),
    ('2', 4),
    ('3', 4),
    ('124', 124),
    ('125', 128),
    ('126', 128),
    ('127', 128),
    ('128', 128),
    ('129', 132),
    ('130', 132),
    ]

  for (text, expected) in expectations:
    self.assertEqual(ParseUnit(text), expected)
def testFloating(self):
  """Check ParseUnit results for fractional input, with and without suffix"""
  expectations = [
    ('0', 0),
    ('0.5', 4),
    ('1.75', 4),
    ('1.99', 4),
    ('2.00', 4),
    ('2.01', 4),
    ('3.99', 4),
    ('4.00', 4),
    ('4.01', 8),
    ('1.5G', 1536),
    ('1.8G', 1844),
    ('8.28T', 8682212),
    ]

  for (text, expected) in expectations:
    self.assertEqual(ParseUnit(text), expected)
992 def testSuffixes(self):
993 for sep in ('', ' ', ' ', "\t", "\t "):
994 for suffix, scale in TestParseUnit.SCALES:
995 for func in (lambda x: x, str.lower, str.upper):
996 self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
def testInvalidInput(self):
  """Check that malformed values make ParseUnit raise UnitParseError.

  Covered are invalid separators between number and suffix as well as a
  comma inside the number.

  """
  suffixes = [suffix for (suffix, _) in TestParseUnit.SCALES]

  bad_values = ['1' + sep + suffix
                for sep in ('-', '_', ',', 'a')
                for suffix in suffixes]
  bad_values.extend('1,3' + suffix for suffix in suffixes)

  for text in bad_values:
    self.assertRaises(UnitParseError, ParseUnit, text)
class TestSshKeys(testutils.GanetiTestCase):
  """Test case for the AddAuthorizedKey and RemoveAuthorizedKey functions"""

  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')

  def setUp(self):
    """Create a temporary key file containing KEY_A and KEY_B"""
    testutils.GanetiTestCase.setUp(self)
    self.tmpname = self._CreateTempFile()
    handle = open(self.tmpname, 'w')
    try:
      handle.write("%s\n" % TestSshKeys.KEY_A)
      handle.write("%s\n" % TestSshKeys.KEY_B)
    finally:
      # Close the file even if a write fails
      handle.close()

  def testAddingNewKey(self):
    """A key not yet in the file is expected to be appended"""
    AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')

    self.assertFileContent(self.tmpname,
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
      "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")

  def testAddingAlmostButNotCompletelyTheSameKey(self):
    """Same key data as KEY_A but a different comment is expected to be added"""
    AddAuthorizedKey(self.tmpname,
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')

    self.assertFileContent(self.tmpname,
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")

  def testAddingExistingKeyWithSomeMoreSpaces(self):
    """Adding a key that is already present must leave the file unchanged"""
    # NOTE(review): the test name implies extra whitespace inside the key
    # passed below; the literal as seen here has single spaces -- confirm.
    AddAuthorizedKey(self.tmpname,
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')

    self.assertFileContent(self.tmpname,
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")

  def testRemovingExistingKeyWithSomeMoreSpaces(self):
    """Removing KEY_A is expected to leave only KEY_B in the file"""
    # NOTE(review): same whitespace caveat as in the adding test -- confirm.
    RemoveAuthorizedKey(self.tmpname,
        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')

    self.assertFileContent(self.tmpname,
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")

  def testRemovingNonExistingKey(self):
    """Removing a key that is not in the file must leave it unchanged"""
    RemoveAuthorizedKey(self.tmpname,
        'ssh-dss AAAAB3Nsdfj230xxjxJjsjwjsjdjU root@test')

    self.assertFileContent(self.tmpname,
      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
      'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
1071 class TestEtcHosts(testutils.GanetiTestCase):
1072 """Test functions modifying /etc/hosts"""
1075 testutils.GanetiTestCase.setUp(self)
1076 self.tmpname = self._CreateTempFile()
1077 handle = open(self.tmpname, 'w')
1079 handle.write('# This is a test file for /etc/hosts\n')
1080 handle.write('127.0.0.1\tlocalhost\n')
1081 handle.write('192.168.1.1 router gw\n')
1085 def testSettingNewIp(self):
1086 SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
1088 self.assertFileContent(self.tmpname,
1089 "# This is a test file for /etc/hosts\n"
1090 "127.0.0.1\tlocalhost\n"
1091 "192.168.1.1 router gw\n"
1092 "1.2.3.4\tmyhost.domain.tld myhost\n")
1093 self.assertFileMode(self.tmpname, 0644)
1095 def testSettingExistingIp(self):
1096 SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
1099 self.assertFileContent(self.tmpname,
1100 "# This is a test file for /etc/hosts\n"
1101 "127.0.0.1\tlocalhost\n"
1102 "192.168.1.1\tmyhost.domain.tld myhost\n")
1103 self.assertFileMode(self.tmpname, 0644)
1105 def testSettingDuplicateName(self):
1106 SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
1108 self.assertFileContent(self.tmpname,
1109 "# This is a test file for /etc/hosts\n"
1110 "127.0.0.1\tlocalhost\n"
1111 "192.168.1.1 router gw\n"
1112 "1.2.3.4\tmyhost\n")
1113 self.assertFileMode(self.tmpname, 0644)
1115 def testRemovingExistingHost(self):
1116 RemoveEtcHostsEntry(self.tmpname, 'router')
1118 self.assertFileContent(self.tmpname,
1119 "# This is a test file for /etc/hosts\n"
1120 "127.0.0.1\tlocalhost\n"
1122 self.assertFileMode(self.tmpname, 0644)
1124 def testRemovingSingleExistingHost(self):
1125 RemoveEtcHostsEntry(self.tmpname, 'localhost')
1127 self.assertFileContent(self.tmpname,
1128 "# This is a test file for /etc/hosts\n"
1129 "192.168.1.1 router gw\n")
1130 self.assertFileMode(self.tmpname, 0644)
1132 def testRemovingNonExistingHost(self):
1133 RemoveEtcHostsEntry(self.tmpname, 'myhost')
1135 self.assertFileContent(self.tmpname,
1136 "# This is a test file for /etc/hosts\n"
1137 "127.0.0.1\tlocalhost\n"
1138 "192.168.1.1 router gw\n")
1139 self.assertFileMode(self.tmpname, 0644)
1141 def testRemovingAlias(self):
1142 RemoveEtcHostsEntry(self.tmpname, 'gw')
1144 self.assertFileContent(self.tmpname,
1145 "# This is a test file for /etc/hosts\n"
1146 "127.0.0.1\tlocalhost\n"
1147 "192.168.1.1 router\n")
1148 self.assertFileMode(self.tmpname, 0644)
class TestShellQuoting(unittest.TestCase):
  """Test case for shell quoting functions"""

  def testShellQuote(self):
    """Compare ShellQuote output against known quoted forms"""
    expectations = [
      ('abc', "abc"),
      ('ab"c', "'ab\"c'"),
      ("a'bc", "'a'\\''bc'"),
      ("a b c", "'a b c'"),
      ("a b\\ c", "'a b\\ c'"),
      ]
    for (plain, quoted) in expectations:
      self.assertEqual(ShellQuote(plain), quoted)

  def testShellQuoteArgs(self):
    """Compare ShellQuoteArgs output against known command lines"""
    expectations = [
      (['a', 'b', 'c'], "a b c"),
      (['a', 'b"', 'c'], "a 'b\"' c"),
      (['a', 'b\'', 'c'], "a 'b'\\\''' c"),
      ]
    for (argv, cmdline) in expectations:
      self.assertEqual(ShellQuoteArgs(argv), cmdline)
class TestTcpPing(unittest.TestCase):
  """Testcase for TCP version of ping - against listen(2)ing port"""

  def setUp(self):
    """Open a listening TCP socket on a loopback port picked by the OS"""
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
    self.listenerport = self.listener.getsockname()[1]
    self.listener.listen(1)

  def tearDown(self):
    """Shut down and explicitly close the listener"""
    self.listener.shutdown(socket.SHUT_RDWR)
    # Close explicitly rather than relying on garbage collection
    self.listener.close()
    del self.listener
    del self.listenerport

  def testTcpPingToLocalHostAccept(self):
    """Ping the listening port with and without a source address"""
    self.assertTrue(TcpPing(constants.LOCALHOST_IP_ADDRESS,
                            self.listenerport,
                            timeout=constants.TCP_PING_TIMEOUT,
                            live_port_needed=True,
                            source=constants.LOCALHOST_IP_ADDRESS,
                            ),
                    "failed to connect to test listener")

    self.assertTrue(TcpPing(constants.LOCALHOST_IP_ADDRESS,
                            self.listenerport,
                            timeout=constants.TCP_PING_TIMEOUT,
                            live_port_needed=True,
                            ),
                    "failed to connect to test listener (no source)")
class TestTcpPingDeaf(unittest.TestCase):
  """Testcase for TCP version of ping - against non listen(2)ing port"""

  def setUp(self):
    """Bind a TCP socket to a loopback port without calling listen()"""
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
    self.deaflistenerport = self.deaflistener.getsockname()[1]

  def tearDown(self):
    """Close the bound socket and drop the references"""
    # Close explicitly rather than relying on garbage collection
    self.deaflistener.close()
    del self.deaflistener
    del self.deaflistenerport

  def testTcpPingToLocalHostAcceptDeaf(self):
    """With live_port_needed=True the ping is expected to fail"""
    self.assertFalse(TcpPing(constants.LOCALHOST_IP_ADDRESS,
                             self.deaflistenerport,
                             timeout=constants.TCP_PING_TIMEOUT,
                             live_port_needed=True,
                             source=constants.LOCALHOST_IP_ADDRESS,
                             ), # need successful connect(2)
                     "successfully connected to deaf listener")

    self.assertFalse(TcpPing(constants.LOCALHOST_IP_ADDRESS,
                             self.deaflistenerport,
                             timeout=constants.TCP_PING_TIMEOUT,
                             live_port_needed=True,
                             ), # need successful connect(2)
                     "successfully connected to deaf listener (no source addr)")

  def testTcpPingToLocalHostNoAccept(self):
    """With live_port_needed=False the ping is expected to succeed"""
    self.assertTrue(TcpPing(constants.LOCALHOST_IP_ADDRESS,
                            self.deaflistenerport,
                            timeout=constants.TCP_PING_TIMEOUT,
                            live_port_needed=False,
                            source=constants.LOCALHOST_IP_ADDRESS,
                            ), # ECONNREFUSED is OK
                    "failed to ping alive host on deaf port")

    self.assertTrue(TcpPing(constants.LOCALHOST_IP_ADDRESS,
                            self.deaflistenerport,
                            timeout=constants.TCP_PING_TIMEOUT,
                            live_port_needed=False,
                            ), # ECONNREFUSED is OK
                    "failed to ping alive host on deaf port (no source addr)")
class TestOwnIpAddress(unittest.TestCase):
  """Testcase for OwnIpAddress"""

  def testOwnLoopback(self):
    """check having the loopback ip"""
    self.assertTrue(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS),
                    "Should own the loopback address")

  def testNowOwnAddress(self):
    """check that I don't own an address"""

    # Network 192.0.2.0/24 is reserved for test/documentation as per
    # RFC 5735, so we *should* not have an address of this range... if
    # this fails, we should extend the test to multiple addresses
    DST_IP = "192.0.2.1"
    self.assertFalse(OwnIpAddress(DST_IP),
                     "Should not own IP address %s" % DST_IP)
def _GetSocketCredentials(path):
  """Connect to a Unix socket and return remote credentials.

  @param path: filesystem path of the Unix socket to connect to
  @return: whatever L{utils.GetSocketCredentials} returns for the connected
      socket

  """
  sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
  try:
    # NOTE(review): the timeout value mirrors the listener's 10 seconds in
    # TestGetSocketCredentials -- confirm.
    sock.settimeout(10)
    sock.connect(path)
    return utils.GetSocketCredentials(sock)
  finally:
    # Always release the socket, also when connecting fails
    sock.close()
class TestGetSocketCredentials(unittest.TestCase):
  """Test case for GetSocketCredentials using a forked client"""

  def setUp(self):
    """Create a listening Unix socket inside a temporary directory"""
    self.tmpdir = tempfile.mkdtemp()
    self.sockpath = utils.PathJoin(self.tmpdir, "sock")

    self.listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    self.listener.settimeout(10)
    self.listener.bind(self.sockpath)
    self.listener.listen(1)

  def tearDown(self):
    """Close the listener and remove the temporary directory"""
    self.listener.shutdown(socket.SHUT_RDWR)
    self.listener.close()
    shutil.rmtree(self.tmpdir)

  def test(self):
    """Check that a child process sees this process' pid, uid and gid"""
    (c2pr, c2pw) = os.pipe()

    # Start child process
    child = os.fork()
    if child == 0:
      try:
        data = serializer.DumpJson(_GetSocketCredentials(self.sockpath))

        os.write(c2pw, data)
        os.close(c2pw)

        os._exit(0)
      finally:
        # Never let the child return into the test runner
        os._exit(1)

    # The parent only reads from the pipe
    os.close(c2pw)

    # Wait for one connection
    (conn, _) = self.listener.accept()
    conn.recv(1)
    conn.close()

    # Wait for result
    result = os.read(c2pr, 4096)
    os.close(c2pr)

    # Check child's exit code
    (_, status) = os.waitpid(child, 0)
    self.assertFalse(os.WIFSIGNALED(status))
    self.assertEqual(os.WEXITSTATUS(status), 0)

    # Check result
    (pid, uid, gid) = serializer.LoadJson(result)
    self.assertEqual(pid, os.getpid())
    self.assertEqual(uid, os.getuid())
    self.assertEqual(gid, os.getgid())
class TestListVisibleFiles(unittest.TestCase):
  """Test case for ListVisibleFiles"""

  def setUp(self):
    """Create the temporary directory the files are created in"""
    self.path = tempfile.mkdtemp()

  def tearDown(self):
    """Remove the temporary directory"""
    shutil.rmtree(self.path)

  def _test(self, files, expected):
    """Create C{files} and compare the listing with C{expected}.

    Both lists are compared after sorting; the caller's C{expected} list is
    not modified.

    """
    # Sort a copy
    expected = expected[:]
    expected.sort()

    for name in files:
      f = open(os.path.join(self.path, name), 'w')
      try:
        # NOTE(review): file content is arbitrary; only the names matter
        f.write("Test\n")
      finally:
        f.close()

    found = ListVisibleFiles(self.path)
    found.sort()

    self.assertEqual(found, expected)

  def testAllVisible(self):
    """All files without a leading dot are expected to be listed"""
    files = ["a", "b", "c"]
    expected = files
    self._test(files, expected)

  def testNoneVisible(self):
    """Files with a leading dot are expected to be left out"""
    files = [".a", ".b", ".c"]
    expected = []
    self._test(files, expected)

  def testSomeVisible(self):
    """A mix of dotted and undotted names"""
    files = ["a", "b", ".c"]
    expected = ["a", "b"]
    self._test(files, expected)

  def testNonAbsolutePath(self):
    """A relative path must raise ProgrammerError"""
    self.assertRaises(errors.ProgrammerError, ListVisibleFiles, "abc")

  def testNonNormalizedPath(self):
    """A non-normalized path must raise ProgrammerError"""
    # NOTE(review): the exact path literal was reconstructed -- confirm.
    self.assertRaises(errors.ProgrammerError, ListVisibleFiles,
                      "/bin/../tmp")
class TestNewUUID(unittest.TestCase):
  """Test case for NewUUID"""

  # Lower-case hexadecimal groups of 8-4-4-4-12 characters
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
                        '[a-f0-9]{4}-[a-f0-9]{12}$')

  def runTest(self):
    """Check that a generated UUID matches the expected pattern"""
    self.assertTrue(self._re_uuid.match(utils.NewUUID()))
class TestUniqueSequence(unittest.TestCase):
  """Test case for UniqueSequence"""

  def _test(self, input, expected):
    """Assert that UniqueSequence(input) equals C{expected}"""
    self.assertEqual(utils.UniqueSequence(input), expected)

  def runTest(self):
    """Run UniqueSequence on ordered, unordered and string input"""
    # Ordered input
    self._test([1, 2, 3], [1, 2, 3])
    self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
    self._test([1, 2, 2, 3], [1, 2, 3])
    self._test([1, 2, 3, 3], [1, 2, 3])

    # Unordered input
    self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
    self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])

    # Strings
    self._test(["a", "a"], ["a"])
    self._test(["a", "b"], ["a", "b"])
    self._test(["a", "b", "a"], ["a", "b"])
class TestFirstFree(unittest.TestCase):
  """Test case for the FirstFree function"""

  def test(self):
    """Test FirstFree"""
    self.assertEqual(FirstFree([0, 1, 3]), 2)
    self.assertEqual(FirstFree([]), None)
    self.assertEqual(FirstFree([3, 4, 6]), 0)
    self.assertEqual(FirstFree([3, 4, 6], base=3), 5)
    # An element below the base is expected to trip an assertion
    self.assertRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
class TestTailFile(testutils.GanetiTestCase):
  """Test case for the TailFile function"""

  def testEmpty(self):
    """An empty file is expected to yield an empty list"""
    fname = self._CreateTempFile()
    self.assertEqual(TailFile(fname), [])
    self.assertEqual(TailFile(fname, lines=25), [])

  def testAllLines(self):
    """Request exactly as many lines as the file contains"""
    data = ["test %d" % i for i in range(30)]
    for i in range(30):
      fname = self._CreateTempFile()
      fd = open(fname, "w")
      fd.write("\n".join(data[:i]))
      if i > 0:
        # Terminate the last line unless the file is empty
        fd.write("\n")
      fd.close()
      self.assertEqual(TailFile(fname, lines=i), data[:i])

  def testPartialLines(self):
    """Request fewer lines than the file contains"""
    data = ["test %d" % i for i in range(30)]
    fname = self._CreateTempFile()
    fd = open(fname, "w")
    fd.write("\n".join(data))
    fd.write("\n")
    fd.close()
    for i in range(1, 30):
      self.assertEqual(TailFile(fname, lines=i), data[-i:])

  def testBigFile(self):
    """Request the tail of a file starting with one very long line"""
    data = ["test %d" % i for i in range(30)]
    fname = self._CreateTempFile()
    fd = open(fname, "w")
    fd.write("X" * 1048576)
    fd.write("\n")
    fd.write("\n".join(data))
    fd.write("\n")
    fd.close()
    for i in range(1, 30):
      self.assertEqual(TailFile(fname, lines=i), data[-i:])
class _BaseFileLockTest:
  """Test case for the FileLock class

  Mix-in: the concrete test class must provide C{self.lock} (the lock under
  test) and C{self.tmpfile} (an object with a C{name} attribute giving the
  path of the locked file).

  """

  def testSharedNonblocking(self):
    """Take a shared lock without blocking"""
    self.lock.Shared(blocking=False)
    self.lock.Close()

  def testExclusiveNonblocking(self):
    """Take an exclusive lock without blocking"""
    self.lock.Exclusive(blocking=False)
    self.lock.Close()

  def testUnlockNonblocking(self):
    """Unlock without blocking"""
    self.lock.Unlock(blocking=False)
    self.lock.Close()

  def testSharedBlocking(self):
    """Take a shared lock in blocking mode"""
    self.lock.Shared(blocking=True)
    self.lock.Close()

  def testExclusiveBlocking(self):
    """Take an exclusive lock in blocking mode"""
    self.lock.Exclusive(blocking=True)
    self.lock.Close()

  def testUnlockBlocking(self):
    """Unlock in blocking mode"""
    self.lock.Unlock(blocking=True)
    self.lock.Close()

  def testSharedExclusiveUnlock(self):
    """Switch from shared to exclusive, then unlock"""
    self.lock.Shared(blocking=False)
    self.lock.Exclusive(blocking=False)
    self.lock.Unlock(blocking=False)
    self.lock.Close()

  def testExclusiveSharedUnlock(self):
    """Switch from exclusive to shared, then unlock"""
    self.lock.Exclusive(blocking=False)
    self.lock.Shared(blocking=False)
    self.lock.Unlock(blocking=False)
    self.lock.Close()

  def testSimpleTimeout(self):
    """Pass a timeout to operations that succeed immediately"""
    # These will succeed on the first attempt, hence a short timeout
    self.lock.Shared(blocking=True, timeout=10.0)
    self.lock.Exclusive(blocking=False, timeout=10.0)
    self.lock.Unlock(blocking=True, timeout=10.0)
    self.lock.Close()

  @staticmethod
  def _TryLockInner(filename, shared, blocking):
    """Try to lock C{filename}; meant to run in a separate process.

    @param filename: path of the file to lock
    @param shared: whether to request a shared (true) or exclusive lock
    @param blocking: passed through to the lock call
    @return: False if the attempt raised L{errors.LockError}, True otherwise

    """
    lock = utils.FileLock.Open(filename)

    if shared:
      fn = lock.Shared
    else:
      fn = lock.Exclusive

    try:
      # The timeout doesn't really matter as the parent process waits for us to
      # finish anyway.
      fn(blocking=blocking, timeout=0.01)
    except errors.LockError:
      return False

    return True

  def _TryLock(self, *args):
    """Run L{_TryLockInner} on the test file in a separate process"""
    return utils.RunInSeparateProcess(self._TryLockInner, self.tmpfile.name,
                                      *args)

  def testTimeout(self):
    """Check which lock requests from another process succeed"""
    for blocking in [True, False]:
      # While held exclusively, neither kind of lock can be taken
      self.lock.Exclusive(blocking=True)
      self.assertFalse(self._TryLock(False, blocking))
      self.assertFalse(self._TryLock(True, blocking))

      # While held shared, only another shared lock can be taken
      self.lock.Shared(blocking=True)
      self.assertTrue(self._TryLock(True, blocking))
      self.assertFalse(self._TryLock(False, blocking))

  def testCloseShared(self):
    """Requesting a shared lock after Close must fail an assertion"""
    self.lock.Close()
    self.assertRaises(AssertionError, self.lock.Shared, blocking=False)

  def testCloseExclusive(self):
    """Requesting an exclusive lock after Close must fail an assertion"""
    self.lock.Close()
    self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)

  def testCloseUnlock(self):
    """Unlocking after Close must fail an assertion"""
    self.lock.Close()
    self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
class TestFileLockWithFilename(testutils.GanetiTestCase, _BaseFileLockTest):
  """Run the FileLock tests on a lock opened by filename"""

  TESTDATA = "Hello World\n" * 10

  def setUp(self):
    """Write TESTDATA to a temporary file and open a lock on it"""
    testutils.GanetiTestCase.setUp(self)

    self.tmpfile = tempfile.NamedTemporaryFile()
    utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
    self.lock = utils.FileLock.Open(self.tmpfile.name)

    # Ensure "Open" didn't truncate file
    self.assertFileContent(self.tmpfile.name, self.TESTDATA)

  def tearDown(self):
    """Check that the locked file still holds TESTDATA"""
    self.assertFileContent(self.tmpfile.name, self.TESTDATA)

    testutils.GanetiTestCase.tearDown(self)
class TestFileLockWithFileObject(unittest.TestCase, _BaseFileLockTest):
  """Run the FileLock tests on a lock built from an open file object"""

  def setUp(self):
    """Create a temporary file and wrap a second handle to it in a lock"""
    self.tmpfile = tempfile.NamedTemporaryFile()
    self.lock = utils.FileLock(open(self.tmpfile.name, "w"), self.tmpfile.name)
class TestTimeFunctions(unittest.TestCase):
  """Test case for time functions"""

  def runTest(self):
    """Check SplitTime and MergeTime on valid and invalid values"""
    self.assertEqual(utils.SplitTime(1), (1, 0))
    self.assertEqual(utils.SplitTime(1.5), (1, 500000))
    self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
    self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
    self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
    self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
    self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
    self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))

    self.assertRaises(AssertionError, utils.SplitTime, -1)

    self.assertEqual(utils.MergeTime((1, 0)), 1.0)
    self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
    self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)

    # Rounded before comparing, as these values are not exact in binary
    self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3),
                     1218448917.481)
    self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)

    self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
    self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
    self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
    self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
    self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
class FieldSetTestCase(unittest.TestCase):
  """Test case for FieldSets"""

  def testSimpleMatch(self):
    """Check matching against literal field names"""
    f = utils.FieldSet("a", "b", "c", "def")
    self.assertTrue(f.Matches("a"))
    self.assertFalse(f.Matches("d"), "Substring matched")
    self.assertFalse(f.Matches("defghi"), "Prefix string matched")
    self.assertFalse(f.NonMatching(["b", "c"]))
    self.assertFalse(f.NonMatching(["a", "b", "c", "def"]))
    self.assertTrue(f.NonMatching(["a", "d"]))

  def testRegexMatch(self):
    """Check matching against a field given as regular expression"""
    f = utils.FieldSet("a", "b([0-9]+)", "c")
    self.assertTrue(f.Matches("b1"))
    self.assertTrue(f.Matches("b99"))
    self.assertFalse(f.Matches("b/1"))
    self.assertFalse(f.NonMatching(["b12", "c"]))
    self.assertTrue(f.NonMatching(["a", "1"]))
class TestForceDictType(unittest.TestCase):
  """Test case for ForceDictType"""

  def setUp(self):
    """Define the key-to-type mapping used by all tests"""
    self.key_types = {
      'a': constants.VTYPE_INT,
      'b': constants.VTYPE_BOOL,
      'c': constants.VTYPE_STRING,
      'd': constants.VTYPE_SIZE,
      }

  def _fdt(self, dict, allowed_values=None):
    """Run ForceDictType on C{dict} and return the same dictionary.

    C{allowed_values} is only passed on when it is not None.

    """
    if allowed_values is None:
      ForceDictType(dict, self.key_types)
    else:
      ForceDictType(dict, self.key_types, allowed_values=allowed_values)

    return dict

  def testSimpleDict(self):
    """Check the values expected after type enforcement"""
    self.assertEqual(self._fdt({}), {})
    self.assertEqual(self._fdt({'a': 1}), {'a': 1})
    self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
    self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
    self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
    self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
    self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
    self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
    self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
    self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
    self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
    self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})

  def testErrors(self):
    """Check that unconvertible values raise TypeEnforcementError"""
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
    self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
class TestIsNormAbsPath(unittest.TestCase):
  """Testing case for IsNormAbsPath"""

  def _pathTestHelper(self, path, result):
    """Assert that IsNormAbsPath(path) is true or false as per C{result}"""
    if result:
      self.assertTrue(IsNormAbsPath(path),
          "Path %s should result absolute and normalized" % path)
    else:
      self.assertTrue(not IsNormAbsPath(path),
          "Path %s should not result absolute and normalized" % path)

  def testBase(self):
    """Check absolute, relative and non-normalized paths"""
    self._pathTestHelper('/etc', True)
    self._pathTestHelper('/srv', True)
    self._pathTestHelper('etc', False)
    self._pathTestHelper('/etc/../root', False)
    self._pathTestHelper('/etc/', False)
class TestSafeEncode(unittest.TestCase):
  """Test case for SafeEncode"""

  def testAscii(self):
    """Digits, letters and punctuation must pass through unchanged"""
    for txt in [string.digits, string.letters, string.punctuation]:
      self.assertEqual(txt, SafeEncode(txt))

  def testDoubleEncode(self):
    """Encoding an already encoded byte must not change it again"""
    for i in range(255):
      txt = SafeEncode(chr(i))
      self.assertEqual(txt, SafeEncode(txt))

  def testUnicode(self):
    """Encoding an already encoded unicode character must be stable"""
    # 1024 is high enough to catch non-direct ASCII mappings
    for i in range(1024):
      txt = SafeEncode(unichr(i))
      self.assertEqual(txt, SafeEncode(txt))
class TestFormatTime(unittest.TestCase):
  """Testing case for FormatTime"""

  def testNone(self):
    """None is expected to be formatted as "N/A" """
    self.assertEqual(FormatTime(None), "N/A")

  def testInvalid(self):
    """An empty tuple is expected to be formatted as "N/A" """
    self.assertEqual(FormatTime(()), "N/A")

  def testNow(self):
    """Float and integer timestamps must be accepted without error"""
    # tests that we accept time.time input
    FormatTime(time.time())
    # tests that we accept int input
    FormatTime(int(time.time()))
class RunInSeparateProcess(unittest.TestCase):
  """Test case for utils.RunInSeparateProcess"""

  def test(self):
    """The callable's boolean result must be passed back"""
    for exp in [True, False]:
      def _child():
        return exp

      self.assertEqual(exp, utils.RunInSeparateProcess(_child))

  def testArgs(self):
    """Extra arguments must reach the callable unchanged"""
    for arg in [0, 1, 999, "Hello World", (1, 2, 3)]:
      def _child(carg1, carg2):
        return carg1 == "Foo" and carg2 == arg

      self.assertTrue(utils.RunInSeparateProcess(_child, "Foo", arg))

  def testPid(self):
    """The callable must run under a pid different from ours"""
    parent_pid = os.getpid()

    def _check():
      return os.getpid() == parent_pid

    self.assertFalse(utils.RunInSeparateProcess(_check))

  def testSignal(self):
    """A callable killed by a signal must result in GenericError"""
    def _kill():
      os.kill(os.getpid(), signal.SIGTERM)

    self.assertRaises(errors.GenericError,
                      utils.RunInSeparateProcess, _kill)

  def testException(self):
    """An exception raised by the callable must result in GenericError"""
    def _exc():
      raise errors.GenericError("This is a test")

    self.assertRaises(errors.GenericError,
                      utils.RunInSeparateProcess, _exc)
class TestFingerprintFile(unittest.TestCase):
  """Test case for utils._FingerprintFile"""

  def setUp(self):
    """Create the (initially empty) temporary file to fingerprint"""
    self.tmpfile = tempfile.NamedTemporaryFile()

  def test(self):
    """Compare fingerprints of an empty and a non-empty file"""
    # Expected value for the still empty file
    self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
                     "da39a3ee5e6b4b0d3255bfef95601890afd80709")

    utils.WriteFile(self.tmpfile.name, data="Hello World\n")
    self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
                     "648a6a6ffffdaa0badb23b8baf90b6168dd16b3a")
class TestUnescapeAndSplit(unittest.TestCase):
  """Testing case for UnescapeAndSplit"""

  def setUp(self):
    """Define the separators every test iterates over"""
    # testing more than one separator for regexp safety
    self._seps = [",", "+", "."]

  def testSimple(self):
    """Split a string without any escaping"""
    a = ["a", "b", "c", "d"]
    for sep in self._seps:
      self.assertEqual(UnescapeAndSplit(sep.join(a), sep=sep), a)

  def testEscape(self):
    """A separator preceded by one backslash must not split"""
    for sep in self._seps:
      a = ["a", "b\\" + sep + "c", "d"]
      b = ["a", "b" + sep + "c", "d"]
      self.assertEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)

  def testDoubleEscape(self):
    """Two backslashes yield one backslash and the separator still splits"""
    for sep in self._seps:
      a = ["a", "b\\\\", "c", "d"]
      b = ["a", "b\\", "c", "d"]
      self.assertEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)

  def testThreeEscape(self):
    """Three backslashes yield a backslash followed by the separator"""
    for sep in self._seps:
      a = ["a", "b\\\\\\" + sep + "c", "d"]
      b = ["a", "b\\" + sep + "c", "d"]
      self.assertEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
class TestGenerateSelfSignedX509Cert(unittest.TestCase):
  """Test case for the self-signed certificate generators"""

  def setUp(self):
    """Create a temporary directory for generated files"""
    self.tmpdir = tempfile.mkdtemp()

  def tearDown(self):
    """Remove the temporary directory"""
    shutil.rmtree(self.tmpdir)

  def _checkRsaPrivateKey(self, key):
    """Return whether C{key} contains RSA private key PEM begin/end lines"""
    lines = key.splitlines()
    return ("-----BEGIN RSA PRIVATE KEY-----" in lines and
            "-----END RSA PRIVATE KEY-----" in lines)

  def _checkCertificate(self, cert):
    """Return whether C{cert} contains certificate PEM begin/end lines"""
    lines = cert.splitlines()
    return ("-----BEGIN CERTIFICATE-----" in lines and
            "-----END CERTIFICATE-----" in lines)

  def test(self):
    """Generate certificates for several common names and inspect them"""
    for common_name in [None, ".", "Ganeti", "node1.example.com"]:
      (key_pem, cert_pem) = utils.GenerateSelfSignedX509Cert(common_name, 300)
      # The helpers only return a boolean, hence it must be asserted (the
      # result used to be silently discarded here)
      self.assertTrue(self._checkRsaPrivateKey(key_pem))
      self.assertTrue(self._checkCertificate(cert_pem))

      key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
                                           key_pem)
      self.assertTrue(key.bits() >= 1024)
      self.assertEqual(key.bits(), constants.RSA_KEY_BITS)
      self.assertEqual(key.type(), OpenSSL.crypto.TYPE_RSA)

      x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                             cert_pem)
      self.assertFalse(x509.has_expired())
      self.assertEqual(x509.get_issuer().CN, common_name)
      self.assertEqual(x509.get_subject().CN, common_name)
      self.assertEqual(x509.get_pubkey().bits(), constants.RSA_KEY_BITS)

  def testLegacy(self):
    """Check the file written by GenerateSelfSignedSslCert"""
    cert1_filename = os.path.join(self.tmpdir, "cert1.pem")

    utils.GenerateSelfSignedSslCert(cert1_filename, validity=1)

    cert1 = utils.ReadFile(cert1_filename)

    # The single file is expected to hold both key and certificate
    self.assertTrue(self._checkRsaPrivateKey(cert1))
    self.assertTrue(self._checkCertificate(cert1))
class TestPathJoin(unittest.TestCase):
  """Testing case for PathJoin"""

  def testBasicItems(self):
    """Plain components are expected to be joined with slashes"""
    mlist = ["/a", "b", "c"]
    self.assertEqual(PathJoin(*mlist), "/".join(mlist))

  def testNonAbsPrefix(self):
    """A relative first component must raise ValueError"""
    self.assertRaises(ValueError, PathJoin, "a", "b")

  def testBackTrack(self):
    """A component containing ".." must raise ValueError"""
    self.assertRaises(ValueError, PathJoin, "/a", "b/../c")

  def testMultiAbs(self):
    """An absolute non-first component must raise ValueError"""
    self.assertRaises(ValueError, PathJoin, "/a", "/b")
class TestHostInfo(unittest.TestCase):
  """Testing case for HostInfo"""

  def testUppercase(self):
    """Names are expected to be normalized to lower case"""
    data = "AbC.example.com"
    self.assertEqual(HostInfo.NormalizeName(data), data.lower())

  def testTooLongName(self):
    """An overlong name must raise OpPrereqError"""
    data = "a.b." + "c" * 255
    self.assertRaises(OpPrereqError, HostInfo.NormalizeName, data)

  def testTrailingDot(self):
    """A trailing dot is expected to be stripped"""
    # NOTE(review): sample name reconstructed -- confirm.
    data = "a.b.c"
    self.assertEqual(HostInfo.NormalizeName(data + "."), data)

  def testInvalidName(self):
    """Malformed names must raise OpPrereqError"""
    # NOTE(review): the sample list was reconstructed -- confirm the values.
    data = [
      "a b",
      "a/b",
      ".a.b",
      "a..b",
      ]
    for value in data:
      self.assertRaises(OpPrereqError, HostInfo.NormalizeName, value)

  def testValidName(self):
    """Well-formed names must be accepted without raising"""
    # NOTE(review): the sample list was reconstructed -- confirm the values.
    data = [
      "a.b",
      "a-b",
      "a_b",
      "a.b.c",
      ]
    for value in data:
      HostInfo.NormalizeName(value)
class TestParseAsn1Generalizedtime(unittest.TestCase):
  """Test case for utils._ParseAsn1Generalizedtime"""

  def test(self):
    """Parse UTC and offset timestamps and reject malformed input"""
    # UTC
    self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000Z"), 0)
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152Z"),
                     1266860512)
    self.assertEqual(utils._ParseAsn1Generalizedtime("20380119031407Z"),
                     (2**31) - 1)

    # With offset
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152+0000"),
                     1266860512)
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100223131652+0000"),
                     1266931012)
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100223051808-0800"),
                     1266931088)
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100224002135+1100"),
                     1266931295)
    self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000-0100"),
                     3600)

    # Leap seconds are not supported by datetime.datetime
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
                      "19841231235960+0000")
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
                      "19920630235960+0000")

    # Wrong format
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "")
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "invalid")
    # NOTE(review): this sample (timestamp without zone suffix) was
    # reconstructed -- confirm.
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
                      "20100222174152")
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
                      "Mon Feb 22 17:47:02 UTC 2010")
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
                      "2010-02-22 17:42:02")
class TestGetX509CertValidity(testutils.GanetiTestCase):
  """Test case for utils.GetX509CertValidity"""

  def setUp(self):
    """Detect whether the installed pyOpenSSL is version 0.7 or newer"""
    testutils.GanetiTestCase.setUp(self)

    pyopenssl_version = distutils.version.LooseVersion(OpenSSL.__version__)

    # Test whether we have pyOpenSSL 0.7 or above
    self.pyopenssl0_7 = (pyopenssl_version >= "0.7")

    if not self.pyopenssl0_7:
      warnings.warn("This test requires pyOpenSSL 0.7 or above to"
                    " function correctly")

  def _LoadCert(self, name):
    """Load the PEM certificate stored as test data file C{name}"""
    return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           self._ReadTestData(name))

  def test(self):
    """Check the validity period reported for cert1.pem"""
    validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem"))
    if self.pyopenssl0_7:
      self.assertEqual(validity, (1266919967, 1267524767))
    else:
      # Older pyOpenSSL: no validity information is expected
      self.assertEqual(validity, (None, None))
class TestSignX509Certificate(unittest.TestCase):
  """Test case for SignX509Certificate and LoadSignedX509Certificate"""

  KEY = "My private key!"
  KEY_OTHER = "Another key"

  def test(self):
    """Sign a certificate with various salts and load it back"""
    # Generate certificate valid for 5 minutes
    (_, cert_pem) = utils.GenerateSelfSignedX509Cert(None, 300)

    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           cert_pem)

    # No signature at all
    self.assertRaises(errors.GenericError,
                      utils.LoadSignedX509Certificate, cert_pem, self.KEY)

    # Invalid input
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
                      "", self.KEY)
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
                      "X-Ganeti-Signature: \n", self.KEY)
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
                      "X-Ganeti-Sign: $1234$abcdef\n", self.KEY)
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
                      "X-Ganeti-Signature: $1234567890$abcdef\n", self.KEY)
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
                      "X-Ganeti-Signature: $1234$abc\n\n" + cert_pem, self.KEY)

    # Invalid salt
    for salt in list("-_@$,:;/\\ \t\n"):
      self.assertRaises(errors.GenericError, utils.SignX509Certificate,
                        cert_pem, self.KEY, "foo%sbar" % salt)

    for salt in ["HelloWorld", "salt", string.letters, string.digits,
                 utils.GenerateSecret(numbytes=4),
                 utils.GenerateSecret(numbytes=16),
                 "{123:456}".encode("hex")]:
      signed_pem = utils.SignX509Certificate(cert, self.KEY, salt)

      self._Check(cert, salt, signed_pem)

      # Text before and after the signed certificate must be tolerated
      self._Check(cert, salt, "X-Another-Header: with a value\n" + signed_pem)
      self._Check(cert, salt, (10 * "Hello World!\n") + signed_pem)
      self._Check(cert, salt, (signed_pem + "\n\na few more\n"
                               "lines----\n------ at\nthe end!"))

  def _Check(self, cert, salt, pem):
    """Load C{pem} with KEY and compare it with C{cert} and C{salt}.

    Also checks that loading the same data with KEY_OTHER fails.

    """
    (cert2, salt2) = utils.LoadSignedX509Certificate(pem, self.KEY)
    self.assertEqual(salt, salt2)
    self.assertEqual(cert.digest("sha1"), cert2.digest("sha1"))

    # Other key
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
                      pem, self.KEY_OTHER)
class TestMakedirs(unittest.TestCase):
  """Test case for utils.Makedirs"""

  def setUp(self):
    """Create the temporary directory all paths are placed under"""
    self.tmpdir = tempfile.mkdtemp()

  def tearDown(self):
    """Remove the temporary directory"""
    shutil.rmtree(self.tmpdir)

  def testNonExisting(self):
    """Create a single directory that does not exist yet"""
    path = utils.PathJoin(self.tmpdir, "foo")
    utils.Makedirs(path)
    self.assertTrue(os.path.isdir(path))

  def testExisting(self):
    """Calling Makedirs on an existing directory must not fail"""
    path = utils.PathJoin(self.tmpdir, "foo")
    os.mkdir(path)
    utils.Makedirs(path)
    self.assertTrue(os.path.isdir(path))

  def testRecursiveNonExisting(self):
    """Create several nested directories at once"""
    path = utils.PathJoin(self.tmpdir, "foo/bar/baz")
    utils.Makedirs(path)
    self.assertTrue(os.path.isdir(path))

  def testRecursiveExisting(self):
    """Create nested directories below an already existing parent"""
    path = utils.PathJoin(self.tmpdir, "B/moo/xyz")
    self.assertTrue(not os.path.exists(path))
    os.mkdir(utils.PathJoin(self.tmpdir, "B"))
    utils.Makedirs(path)
    self.assertTrue(os.path.isdir(path))
# Test case for utils.Retry and the utils.RetryAgain / utils.RetryTimeout
# exceptions.
# NOTE(review): this listing is lossy -- indentation is stripped and the
# embedded line numbers have gaps, so statements such as the setUp header,
# decorators and "try:"/"else:" openers are presumably missing; confirm
# against the upstream file before relying on it.
2055 class TestRetry(testutils.GanetiTestCase):
# Chains to the base-class setUp (the enclosing "def" line is not visible).
2057 testutils.GanetiTestCase.setUp(self)
# Helper taking no arguments that always raises utils.RetryAgain.
2061 def _RaiseRetryAgain():
2062 raise utils.RetryAgain()
# Helper raising utils.RetryAgain built from the unpacked "args" sequence.
2065 def _RaiseRetryAgainWithArg(args):
2066 raise utils.RetryAgain(*args)
# Starts a utils.Retry call of its own; used by testNestedLoop as the
# function retried by an outer utils.Retry call.
2068 def _WrongNestedLoop(self):
2069 return utils.Retry(self._RaiseRetryAgain, 0.01, 0.02)
# Raises utils.RetryAgain while self.retries is below "retries".
# NOTE(review): the counter update and the success path are not visible in
# this listing (numbered lines 2073 and 2075-2076 are absent) -- confirm.
2071 def _RetryAndSucceed(self, retries):
2072 if self.retries < retries:
2074 raise utils.RetryAgain()
# Both helpers must make utils.Retry raise utils.RetryTimeout; afterwards
# self.retries is expected to equal 1.
2078 def testRaiseTimeout(self):
2079 self.failUnlessRaises(utils.RetryTimeout, utils.Retry,
2080 self._RaiseRetryAgain, 0.01, 0.02)
2081 self.failUnlessRaises(utils.RetryTimeout, utils.Retry,
2082 self._RetryAndSucceed, 0.01, 0, args=[1])
2083 self.failUnlessEqual(self.retries, 1)
# A function returning True makes utils.Retry return True.
# NOTE(review): the second failUnlessEqual call is cut off (its expected
# value, numbered line 2088, is absent).
2085 def testComplete(self):
2086 self.failUnlessEqual(utils.Retry(lambda: True, 0, 1), True)
2087 self.failUnlessEqual(utils.Retry(self._RetryAndSucceed, 0, 1, args=[2]),
2089 self.failUnlessEqual(self.retries, 2)
# Retrying _WrongNestedLoop (which itself calls utils.Retry) must raise
# errors.ProgrammerError; a RetryTimeout escaping instead is a failure.
2091 def testNestedLoop(self):
2093 self.failUnlessRaises(errors.ProgrammerError, utils.Retry,
2094 self._WrongNestedLoop, 0, 1)
2095 except utils.RetryTimeout:
2096 self.fail("Didn't detect inner loop's exception")
# The argument given to RetryAgain must show up as err.args of the
# RetryTimeout that utils.Retry raises.
2098 def testTimeoutArgument(self):
2099 retry_arg="my_important_debugging_message"
2101 utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02, args=[[retry_arg]])
2102 except utils.RetryTimeout, err:
2103 self.failUnlessEqual(err.args, (retry_arg, ))
2105 self.fail("Expected timeout didn't happen")
# RetryAgain carries a GenericError instance; the outer handler expects a
# GenericError whose args are (retry_arg, retry_arg).
# NOTE(review): the body of the inner RetryTimeout handler is not visible.
2107 def testRaiseInnerWithExc(self):
2108 retry_arg="my_important_debugging_message"
2111 utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02,
2112 args=[[errors.GenericError(retry_arg, retry_arg)]])
2113 except utils.RetryTimeout, err:
2116 self.fail("Expected timeout didn't happen")
2117 except errors.GenericError, err:
2118 self.failUnlessEqual(err.args, (retry_arg, retry_arg))
2120 self.fail("Expected GenericError didn't happen")
# Same shape as above, but RetryAgain carries two plain strings and the
# outer handler expects a RetryTimeout with those two args.
# NOTE(review): the body of the inner RetryTimeout handler is not visible.
2122 def testRaiseInnerWithMsg(self):
2123 retry_arg="my_important_debugging_message"
2126 utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02,
2127 args=[[retry_arg, retry_arg]])
2128 except utils.RetryTimeout, err:
2131 self.fail("Expected timeout didn't happen")
2132 except utils.RetryTimeout, err:
2133 self.failUnlessEqual(err.args, (retry_arg, retry_arg))
2135 self.fail("Expected RetryTimeout didn't happen")
# Tests for utils.LineSplitter, which is constructed with a callback (plus
# optional extra arguments) and fed text through write().
# NOTE(review): this listing is lossy -- indentation is stripped and the
# embedded line numbers have gaps; the first test method's header, the
# creation of "lines" and the calls between some assertions are not
# visible. Confirm against the upstream file.
2138 class TestLineSplitter(unittest.TestCase):
2141 ls = utils.LineSplitter(lines.append)
2142 ls.write("Hello World\n")
# Nothing has reached the callback yet although a full line was written.
2143 self.assertEqual(lines, [])
2144 ls.write("Foo\n Bar\r\n ")
2147 self.assertEqual(lines, [])
# Completed lines arrive without their "\n" / "\r\n" terminators.
# Presumably a flush precedes this check (the call is not visible).
2149 self.assertEqual(lines, ["Hello World", "Foo", " Bar"])
# The trailing partial line is delivered last; presumably after the
# splitter is closed (the call is not visible).
2151 self.assertEqual(lines, ["Hello World", "Foo", " Bar", " BazMoo"])
# Callback used by testExtraArgsNoFlush: verifies the two extra arguments
# (999 and "extra") and appends the received line to all_lines.
2153 def _testExtra(self, line, all_lines, p1, p2):
2154 self.assertEqual(p1, 999)
2155 self.assertEqual(p2, "extra")
2156 all_lines.append(line)
# Extra constructor arguments must be passed on to the callback after the
# line itself.
# NOTE(review): the final assertion is cut off (its continuation line is
# absent from this listing).
2158 def testExtraArgsNoFlush(self):
2160 ls = utils.LineSplitter(self._testExtra, lines, 999, "extra")
2161 ls.write("\n\nHello World\n")
2162 ls.write("Foo\n Bar\r\n ")
2165 ls.write("Moo\n\nx\n")
2166 self.assertEqual(lines, [])
2168 self.assertEqual(lines, ["", "", "Hello World", "Foo", " Bar", " BazMoo",
# Tests for utils.ReadLockedPidFile, run inside a scratch directory.
# NOTE(review): this listing is lossy -- indentation is stripped and the
# embedded line numbers have gaps; the fixture method headers and some
# lines of testLocked are not visible. Confirm against the upstream file.
2172 class TestReadLockedPidFile(unittest.TestCase):
# Scratch directory for the PID files (enclosing method header not visible).
2174 self.tmpdir = tempfile.mkdtemp()
# Removes the scratch directory again (enclosing method header not visible).
2177 shutil.rmtree(self.tmpdir)
# A path that does not exist yields None.
2179 def testNonExistent(self):
2180 path = utils.PathJoin(self.tmpdir, "nonexist")
2181 self.assert_(utils.ReadLockedPidFile(path) is None)
# A PID file written without anyone holding a lock on it yields None.
2183 def testUnlocked(self):
2184 path = utils.PathJoin(self.tmpdir, "pid")
2185 utils.WriteFile(path, data="123")
2186 self.assert_(utils.ReadLockedPidFile(path) is None)
# With an exclusive utils.FileLock taken on the file, the stored PID (123)
# is returned as an integer; the last check expects None again.
# NOTE(review): presumably the lock is released before the last check, but
# that part is not visible in this listing -- confirm.
2188 def testLocked(self):
2189 path = utils.PathJoin(self.tmpdir, "pid")
2190 utils.WriteFile(path, data="123")
2192 fl = utils.FileLock.Open(path)
2194 fl.Exclusive(blocking=True)
2196 self.assertEqual(utils.ReadLockedPidFile(path), 123)
2200 self.assert_(utils.ReadLockedPidFile(path) is None)
# "foobar" is created as a regular file, so a path below it cannot be
# opened; the call must raise EnvironmentError.
2202 def testError(self):
2203 path = utils.PathJoin(self.tmpdir, "foobar", "pid")
2204 utils.WriteFile(utils.PathJoin(self.tmpdir, "foobar"), data="")
2205 # open(2) should return ENOTDIR
2206 self.assertRaises(EnvironmentError, utils.ReadLockedPidFile, path)
# Smoke test for utils.VerifyX509Certificate using the "cert1.pem" test
# data file.
# NOTE(review): this listing is lossy -- indentation is stripped and the
# embedded line numbers have gaps; the fixture method headers and the end
# of the load_certificate call are not visible. Confirm upstream.
2209 class TestCertVerification(testutils.GanetiTestCase):
# Chains to the base-class setUp, then creates a scratch directory.
2211 testutils.GanetiTestCase.setUp(self)
2213 self.tmpdir = tempfile.mkdtemp()
# Removes the scratch directory (enclosing method header not visible).
2216 shutil.rmtree(self.tmpdir)
# Reads the PEM text, parses it with pyOpenSSL and runs the verification
# with the arguments 30 and 7; only the absence of an exception is checked.
2218 def testVerifyCertificate(self):
2219 cert_pem = utils.ReadFile(self._TestDataFilename("cert1.pem"))
2220 cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
2223 # Not checking return value as this certificate is expired
2224 utils.VerifyX509Certificate(cert, 30, 7)
# Table-style checks of utils._VerifyCertificateInner: each call passes a
# boolean, three integer-or-None values and two small integers, and the
# first element of the returned pair is compared with an expected code.
# NOTE(review): the meaning of the individual arguments is not visible
# here (the large integers look like Unix timestamps, the trailing two like
# warning/error thresholds -- confirm against utils). The listing is also
# lossy: the test method header and the expected value of the first
# assertion are absent.
2227 class TestVerifyCertificateInner(unittest.TestCase):
# Short alias to keep the calls below on one line.
2229 vci = utils._VerifyCertificateInner
2232 self.assertEqual(vci(False, 1263916313, 1298476313, 1266940313, 30, 7),
2236 (errcode, msg) = vci(False, 1266507600, 1267544400, 1266075600, 30, 7)
2237 self.assertEqual(errcode, utils.CERT_WARNING)
# The next two calls differ only in the last argument (7 vs. 1), which
# changes the expected code from CERT_ERROR to CERT_WARNING.
2240 (errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 7)
2241 self.assertEqual(errcode, utils.CERT_ERROR)
2243 (errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 1)
2244 self.assertEqual(errcode, utils.CERT_WARNING)
# With None as the third argument no error code is expected.
2246 (errcode, msg) = vci(False, 1266507600, None, 1266939600, 30, 7)
2247 self.assertEqual(errcode, None)
# With True as the first argument every combination below expects
# CERT_ERROR, whether or not the second/third arguments are None.
2250 (errcode, msg) = vci(True, 1266507600, 1267544400, 1266939600, 30, 7)
2251 self.assertEqual(errcode, utils.CERT_ERROR)
2253 (errcode, msg) = vci(True, None, 1267544400, 1266939600, 30, 7)
2254 self.assertEqual(errcode, utils.CERT_ERROR)
2256 (errcode, msg) = vci(True, 1266507600, None, 1266939600, 30, 7)
2257 self.assertEqual(errcode, utils.CERT_ERROR)
2259 (errcode, msg) = vci(True, None, None, 1266939600, 30, 7)
2260 self.assertEqual(errcode, utils.CERT_ERROR)
# Known-answer tests for utils.Sha1Hmac and utils.VerifySha1Hmac.
# NOTE(review): this listing is lossy -- indentation is stripped and the
# embedded line numbers have gaps; several VerifySha1Hmac calls in the two
# verification tests are cut off before their last argument. Confirm
# against the upstream file.
2263 class TestHmacFunctions(unittest.TestCase):
2264 # Digests can be checked with "openssl sha1 -hmac $key"
# Fixed inputs (including empty strings and a long repeated text) must
# produce the listed hexadecimal digests. The first argument is presumably
# the key and the second the text -- confirm against utils.Sha1Hmac.
2265 def testSha1Hmac(self):
2266 self.assertEqual(utils.Sha1Hmac("", ""),
2267 "fbdb1d1b18aa6c08324b7d64b71fb76370690e1d")
2268 self.assertEqual(utils.Sha1Hmac("3YzMxZWE", "Hello World"),
2269 "ef4f3bda82212ecb2f7ce868888a19092481f1fd")
2270 self.assertEqual(utils.Sha1Hmac("TguMTA2K", ""),
2271 "f904c2476527c6d3e6609ab683c66fa0652cb1dc")
2273 longtext = 1500 * "The quick brown fox jumps over the lazy dog\n"
2274 self.assertEqual(utils.Sha1Hmac("3YzMxZWE", longtext),
2275 "35901b9a3001a7cdcf8e0e9d7c2e79df2223af54")
# Same inputs with the "salt" keyword; different salts give different
# digests for otherwise identical arguments.
2277 def testSha1HmacSalt(self):
2278 self.assertEqual(utils.Sha1Hmac("TguMTA2K", "", salt="abc0"),
2279 "4999bf342470eadb11dfcd24ca5680cf9fd7cdce")
2280 self.assertEqual(utils.Sha1Hmac("TguMTA2K", "", salt="abc9"),
2281 "17a4adc34d69c0d367d4ffbef96fd41d4df7a6e8")
2282 self.assertEqual(utils.Sha1Hmac("3YzMxZWE", "Hello World", salt="xyz0"),
2283 "7f264f8114c9066afc9bb7636e1786d996d3cc0d")
# utils.VerifySha1Hmac must accept the digests from testSha1Hmac.
# NOTE(review): the last three calls are cut off; the digest variants
# they pass are not visible in this listing.
2285 def testVerifySha1Hmac(self):
2286 self.assert_(utils.VerifySha1Hmac("", "", ("fbdb1d1b18aa6c08324b"
2287 "7d64b71fb76370690e1d")))
2288 self.assert_(utils.VerifySha1Hmac("TguMTA2K", "",
2289 ("f904c2476527c6d3e660"
2290 "9ab683c66fa0652cb1dc")))
2292 digest = "ef4f3bda82212ecb2f7ce868888a19092481f1fd"
2293 self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World", digest))
2294 self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
2296 self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
2298 self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
# Verification of the salted digests from testSha1HmacSalt.
# NOTE(review): both calls are cut off after the digest; the remaining
# argument (presumably the salt) is not visible -- confirm.
2301 def testVerifySha1HmacSalt(self):
2302 self.assert_(utils.VerifySha1Hmac("TguMTA2K", "",
2303 ("17a4adc34d69c0d367d4"
2304 "ffbef96fd41d4df7a6e8"),
2306 self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
2307 ("7f264f8114c9066afc9b"
2308 "b7636e1786d996d3cc0d"),
# NOTE(review): this listing is lossy -- indentation is stripped and the
# embedded line numbers have gaps; the body of _Raise, the definition of
# _Return and the last argument of two assertRaises calls are not visible.
# Confirm against the upstream file.
2312 class TestIgnoreSignals(unittest.TestCase):
2313 """Test the IgnoreSignals decorator"""
# Helper taking an exception object; per the assertions below it raises
# the exception it is given (body not visible here).
2316 def _Raise(exception):
2323 def testIgnoreSignals(self):
# One EINTR and one EINVAL instance for each of the two exception types.
2324 sock_err_intr = socket.error(errno.EINTR, "Message")
2325 sock_err_inval = socket.error(errno.EINVAL, "Message")
2327 env_err_intr = EnvironmentError(errno.EINTR, "Message")
2328 env_err_inval = EnvironmentError(errno.EINVAL, "Message")
# Sanity check: called directly, _Raise propagates all four errors.
2330 self.assertRaises(socket.error, self._Raise, sock_err_intr)
2331 self.assertRaises(socket.error, self._Raise, sock_err_inval)
2332 self.assertRaises(EnvironmentError, self._Raise, env_err_intr)
2333 self.assertRaises(EnvironmentError, self._Raise, env_err_inval)
# Through utils.IgnoreSignals the EINTR variants are swallowed and None is
# returned.
2335 self.assertEquals(utils.IgnoreSignals(self._Raise, sock_err_intr), None)
2336 self.assertEquals(utils.IgnoreSignals(self._Raise, env_err_intr), None)
# These two calls must still raise; presumably they pass the EINVAL
# variants (the argument lines are not visible) -- confirm.
2337 self.assertRaises(socket.error, utils.IgnoreSignals, self._Raise,
2339 self.assertRaises(EnvironmentError, utils.IgnoreSignals, self._Raise,
# Return values of the wrapped function are passed through unchanged.
2342 self.assertEquals(utils.IgnoreSignals(self._Return, True), True)
2343 self.assertEquals(utils.IgnoreSignals(self._Return, 33), 33)
# NOTE(review): this listing is lossy -- indentation is stripped and the
# embedded line numbers have gaps; the fixture method headers and the
# opening/closing lines of the call that receives the (path, mode) pairs
# are not visible. Confirm against the upstream file.
2346 class TestEnsureDirs(unittest.TestCase):
2347 """Tests for EnsureDirs"""
# Scratch directory; the process umask is set to 0777 (Python 2 octal
# literal) and the previous value is remembered for restoring later.
2350 self.dir = tempfile.mkdtemp()
2351 self.old_umask = os.umask(0777)
2353 def testEnsureDirs(self):
# (path, mode) pairs: "foo" with mode 0777 and "bar" with mode 0000.
2355 (utils.PathJoin(self.dir, "foo"), 0777),
2356 (utils.PathJoin(self.dir, "bar"), 0000),
# The permission bits reported by os.stat must equal the requested modes
# exactly, even though the umask set above is 0777.
2358 self.assertEquals(os.stat(utils.PathJoin(self.dir, "foo"))[0] & 0777, 0777)
2359 self.assertEquals(os.stat(utils.PathJoin(self.dir, "bar"))[0] & 0777, 0000)
# Cleanup: remove both directories and restore the saved umask.
2362 os.rmdir(utils.PathJoin(self.dir, "foo"))
2363 os.rmdir(utils.PathJoin(self.dir, "bar"))
2365 os.umask(self.old_umask)
# Tests for utils.FormatSeconds.
# NOTE(review): this listing is lossy -- indentation is stripped and the
# header of the first test method is absent (embedded line number 2369 is
# skipped). Confirm against the upstream file.
2368 class TestFormatSeconds(unittest.TestCase):
# Integer inputs: the result uses d/h/m/s units, larger units that would be
# zero are left out ("59m 59s") while smaller zero units are kept
# ("1h 0m 0s"), and negative values are rendered as plain seconds.
2370 self.assertEqual(utils.FormatSeconds(1), "1s")
2371 self.assertEqual(utils.FormatSeconds(3600), "1h 0m 0s")
2372 self.assertEqual(utils.FormatSeconds(3599), "59m 59s")
2373 self.assertEqual(utils.FormatSeconds(7200), "2h 0m 0s")
2374 self.assertEqual(utils.FormatSeconds(7201), "2h 0m 1s")
2375 self.assertEqual(utils.FormatSeconds(7281), "2h 1m 21s")
2376 self.assertEqual(utils.FormatSeconds(29119), "8h 5m 19s")
2377 self.assertEqual(utils.FormatSeconds(19431228), "224d 21h 33m 48s")
2378 self.assertEqual(utils.FormatSeconds(-1), "-1s")
2379 self.assertEqual(utils.FormatSeconds(-282), "-282s")
2380 self.assertEqual(utils.FormatSeconds(-29119), "-29119s")
# Fractional inputs are rounded to the nearest whole second before
# formatting (1.3 -> "1s", 1.9 -> "2s").
2382 def testFloat(self):
2383 self.assertEqual(utils.FormatSeconds(1.3), "1s")
2384 self.assertEqual(utils.FormatSeconds(1.9), "2s")
2385 self.assertEqual(utils.FormatSeconds(3912.12311), "1h 5m 12s")
2386 self.assertEqual(utils.FormatSeconds(3912.8), "1h 5m 13s")
# Script entry point: hand control to the project's test program wrapper
# when this file is executed directly rather than imported.
if __name__ == "__main__":
    testutils.GanetiTestProgram()