4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the utils module"""
40 import distutils.version
46 from ganeti import constants
47 from ganeti import compat
48 from ganeti import utils
49 from ganeti import errors
50 from ganeti import serializer
51 from ganeti.utils import IsProcessAlive, RunCmd, \
52 RemoveFile, MatchNameComponent, FormatUnit, \
53 ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
54 ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
55 SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \
56 TailFile, ForceDictType, SafeEncode, IsNormAbsPath, FormatTime, \
57 UnescapeAndSplit, RunParts, PathJoin, HostInfo, ReadOneLineFile
59 from ganeti.errors import LockError, UnitParseError, GenericError, \
60 ProgrammerError, OpPrereqError
63 class TestIsProcessAlive(unittest.TestCase):
64 """Testing case for IsProcessAlive"""
68 self.assert_(IsProcessAlive(mypid),
69 "can't find myself running")
71 def testNotExisting(self):
72 pid_non_existing = os.fork()
73 if pid_non_existing == 0:
75 elif pid_non_existing < 0:
76 raise SystemError("can't fork")
77 os.waitpid(pid_non_existing, 0)
78 self.assertFalse(IsProcessAlive(pid_non_existing),
79 "nonexisting process detected")
82 class TestGetProcStatusPath(unittest.TestCase):
84 self.assert_("/1234/" in utils._GetProcStatusPath(1234))
85 self.assertNotEqual(utils._GetProcStatusPath(1),
86 utils._GetProcStatusPath(2))
89 class TestIsProcessHandlingSignal(unittest.TestCase):
91 self.tmpdir = tempfile.mkdtemp()
94 shutil.rmtree(self.tmpdir)
def testParseSigsetT(self):
  """Check utils._ParseSigsetT against hand-computed signal sets.

  As the cases below demonstrate, the argument is a hexadecimal mask in
  which bit N-1 (counted from the least significant bit) stands for
  signal number N.

  """
  # An all-zero mask contains no signals at all
  self.assertEqual(len(utils._ParseSigsetT("0")), 0)

  cases = [
    ("1", set([1])),
    ("1000a", set([2, 4, 17])),
    ("810002", set([2, 17, 24])),
    ("0000000180000202", set([2, 10, 32, 33])),
    # 0x180000002 has bits 1, 31 and 32 set; the expected value was absent
    # from the visible source and is derived from the neighbouring cases
    ("0000000180000002", set([2, 32, 33])),
    ("0000000188000002", set([2, 28, 32, 33])),
    ("000000004b813efb",
     set([1, 2, 4, 5, 6, 7, 8, 10, 11, 12, 13, 14, 17,
          24, 25, 26, 28, 31])),
    # 24 consecutive bits set: signals 1 through 24
    ("ffffff", set(range(1, 25))),
    ]

  for (mask, expected) in cases:
    self.assertEqual(utils._ParseSigsetT(mask), expected)
112 def testGetProcStatusField(self):
113 for field in ["SigCgt", "Name", "FDSize"]:
114 for value in ["", "0", "cat", " 1234 KB"]:
115 pstatus = "\n".join([
117 "%s: %s" % (field, value),
120 result = utils._GetProcStatusField(pstatus, field)
121 self.assertEqual(result, value.strip())
124 sp = utils.PathJoin(self.tmpdir, "status")
126 utils.WriteFile(sp, data="\n".join([
128 "State: S (sleeping)",
133 "SigBlk: 0000000000010000",
134 "SigIgn: 0000000000384004",
135 "SigCgt: 000000004b813efb",
136 "CapEff: 0000000000000000",
139 self.assert_(utils.IsProcessHandlingSignal(1234, 10, status_path=sp))
141 def testNoSigCgt(self):
142 sp = utils.PathJoin(self.tmpdir, "status")
144 utils.WriteFile(sp, data="\n".join([
148 self.assertRaises(RuntimeError, utils.IsProcessHandlingSignal,
149 1234, 10, status_path=sp)
def testNoSuchFile(self):
  """A status path named "notexist" must yield a false result."""
  missing_path = utils.PathJoin(self.tmpdir, "notexist")
  handled = utils.IsProcessHandlingSignal(1234, 10, status_path=missing_path)
  self.assertFalse(handled)
def _TestRealProcess():
  """Check IsProcessHandlingSignal against this process' own SIGUSR1 state.

  Cycles SIGUSR1 through default, handler, ignored and default again, and
  after each change verifies what utils.IsProcessHandlingSignal reports for
  the current PID. Only an installed handler may be reported as "handling".

  NOTE(review): the function takes no "self" but is referenced as
  self._TestRealProcess by testRealProcess; presumably a @staticmethod
  decorator sits on a line not visible here -- confirm.

  Raises Exception when the reported state doesn't match the disposition.

  """
  # Default disposition: nothing is caught
  signal.signal(signal.SIGUSR1, signal.SIG_DFL)
  if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
    raise Exception("SIGUSR1 is handled when it should not be")

  # A real (no-op) handler: the signal is caught
  signal.signal(signal.SIGUSR1, lambda signum, frame: None)
  if not utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
    raise Exception("SIGUSR1 is not handled when it should be")

  # Ignoring a signal is not handling it; the message used to claim the
  # opposite of what this branch detects
  signal.signal(signal.SIGUSR1, signal.SIG_IGN)
  if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
    raise Exception("SIGUSR1 is handled when it should not be")

  # Back to the default disposition
  signal.signal(signal.SIGUSR1, signal.SIG_DFL)
  if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
    raise Exception("SIGUSR1 is handled when it should not be")
def testRealProcess(self):
  """Run _TestRealProcess via utils.RunInSeparateProcess and check success."""
  # assertTrue replaces the deprecated assert_ alias (gone in Python 3.12)
  self.assertTrue(utils.RunInSeparateProcess(self._TestRealProcess))
180 class TestPidFileFunctions(unittest.TestCase):
181 """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
184 self.dir = tempfile.mkdtemp()
185 self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
186 utils.DaemonPidFileName = self.f_dpn
188 def testPidFileFunctions(self):
189 pid_file = self.f_dpn('test')
190 utils.WritePidFile('test')
191 self.failUnless(os.path.exists(pid_file),
192 "PID file should have been created")
193 read_pid = utils.ReadPidFile(pid_file)
194 self.failUnlessEqual(read_pid, os.getpid())
195 self.failUnless(utils.IsProcessAlive(read_pid))
196 self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
197 utils.RemovePidFile('test')
198 self.failIf(os.path.exists(pid_file),
199 "PID file should not exist anymore")
200 self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
201 "ReadPidFile should return 0 for missing pid file")
202 fh = open(pid_file, "w")
205 self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
206 "ReadPidFile should return 0 for invalid pid file")
207 utils.RemovePidFile('test')
208 self.failIf(os.path.exists(pid_file),
209 "PID file should not exist anymore")
212 pid_file = self.f_dpn('child')
213 r_fd, w_fd = os.pipe()
215 if new_pid == 0: #child
216 utils.WritePidFile('child')
221 # else we are in the parent
222 # wait until the child has written the pid file
224 read_pid = utils.ReadPidFile(pid_file)
225 self.failUnlessEqual(read_pid, new_pid)
226 self.failUnless(utils.IsProcessAlive(new_pid))
227 utils.KillProcess(new_pid, waitpid=True)
228 self.failIf(utils.IsProcessAlive(new_pid))
229 utils.RemovePidFile('child')
230 self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
233 for name in os.listdir(self.dir):
234 os.unlink(os.path.join(self.dir, name))
238 class TestRunCmd(testutils.GanetiTestCase):
239 """Testing case for the RunCmd function"""
242 testutils.GanetiTestCase.setUp(self)
243 self.magic = time.ctime() + " ganeti test"
244 self.fname = self._CreateTempFile()
247 """Test successful exit code"""
248 result = RunCmd("/bin/sh -c 'exit 0'")
249 self.assertEqual(result.exit_code, 0)
250 self.assertEqual(result.output, "")
253 """Test fail exit code"""
254 result = RunCmd("/bin/sh -c 'exit 1'")
255 self.assertEqual(result.exit_code, 1)
256 self.assertEqual(result.output, "")
def testStdout(self):
  """Standard output is captured, or written to the file given as output."""
  shell_cmd = "/bin/sh -c '%s'" % ('echo -n "%s"' % self.magic)

  # Without a target file the text is available on the result object
  captured = RunCmd(shell_cmd)
  self.assertEqual(captured.stdout, self.magic)

  # With a target file the result carries no output; the file has the text
  redirected = RunCmd(shell_cmd, output=self.fname)
  self.assertEqual(redirected.output, "")
  self.assertFileContent(self.fname, self.magic)
def testStderr(self):
  """Standard error is captured, or written to the file given as output."""
  echo_cmd = 'echo -n "%s"' % self.magic
  # The shell's stdout is redirected to its stderr
  shell_cmd = "/bin/sh -c '%s' 1>&2" % echo_cmd

  captured = RunCmd(shell_cmd)
  self.assertEqual(captured.stderr, self.magic)

  redirected = RunCmd(shell_cmd, output=self.fname)
  self.assertEqual(redirected.output, "")
  self.assertFileContent(self.fname, self.magic)
def testCombined(self):
  """Text written to stdout and to stderr shows up together in the output."""
  magic = self.magic
  # "A..." goes to stdout, "B..." to stderr
  script = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (magic, magic)
  shell_cmd = "/bin/sh -c '%s'" % script
  expected = "A" + magic + "B" + magic

  captured = RunCmd(shell_cmd)
  self.assertEqual(captured.output, expected)

  redirected = RunCmd(shell_cmd, output=self.fname)
  self.assertEqual(redirected.output, "")
  self.assertFileContent(self.fname, expected)
286 def testSignal(self):
288 result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
289 self.assertEqual(result.signal, 15)
290 self.assertEqual(result.output, "")
292 def testListRun(self):
294 result = RunCmd(["true"])
295 self.assertEqual(result.signal, None)
296 self.assertEqual(result.exit_code, 0)
297 result = RunCmd(["/bin/sh", "-c", "exit 1"])
298 self.assertEqual(result.signal, None)
299 self.assertEqual(result.exit_code, 1)
300 result = RunCmd(["echo", "-n", self.magic])
301 self.assertEqual(result.signal, None)
302 self.assertEqual(result.exit_code, 0)
303 self.assertEqual(result.stdout, self.magic)
def testFileEmptyOutput(self):
  """A command printing nothing leaves the output file empty."""
  outcome = RunCmd(["true"], output=self.fname)

  # Clean exit: no signal, status zero
  self.assertEqual(outcome.signal, None)
  self.assertEqual(outcome.exit_code, 0)

  self.assertFileContent(self.fname, "")
313 """Test locale environment"""
314 old_env = os.environ.copy()
316 os.environ["LANG"] = "en_US.UTF-8"
317 os.environ["LC_ALL"] = "en_US.UTF-8"
318 result = RunCmd(["locale"])
319 for line in result.output.splitlines():
320 key, value = line.split("=", 1)
321 # Ignore these variables, they're overridden by LC_ALL
322 if key == "LANG" or key == "LANGUAGE":
324 self.failIf(value and value != "C" and value != '"C"',
325 "Variable %s is set to the invalid value '%s'" % (key, value))
def testDefaultCwd(self):
  """Without an explicit cwd, "pwd" run through RunCmd must print "/"."""
  # assertEqual replaces the deprecated failUnlessEqual alias
  self.assertEqual(RunCmd(["pwd"]).stdout.strip(), "/")
334 """Test default working directory"""
335 self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
336 self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
338 self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
def testResetEnv(self):
  """Test environment reset functionality.

  With reset_env=True "env" must print nothing, or exactly the variables
  passed through the env parameter.

  """
  # assertEqual replaces the deprecated failUnlessEqual alias
  empty_env = RunCmd(["env"], reset_env=True)
  self.assertEqual(empty_env.stdout.strip(), "")

  custom_env = RunCmd(["env"], reset_env=True, env={"FOO": "bar",})
  self.assertEqual(custom_env.stdout.strip(), "FOO=bar")
347 class TestRunParts(unittest.TestCase):
348 """Testing case for the RunParts function"""
351 self.rundir = tempfile.mkdtemp(prefix="ganeti-test", suffix=".tmp")
354 shutil.rmtree(self.rundir)
357 """Test on an empty dir"""
358 self.failUnlessEqual(RunParts(self.rundir, reset_env=True), [])
def testSkipWrongName(self):
  """Test that wrong files are skipped.

  An executable file named "00test.dot" must be reported with status
  constants.RUNPARTS_SKIP and no result.

  """
  fname = os.path.join(self.rundir, "00test.dot")
  utils.WriteFile(fname, data="")
  os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
  relname = os.path.basename(fname)

  # assertEqual replaces the deprecated failUnlessEqual alias
  self.assertEqual(RunParts(self.rundir, reset_env=True),
                   [(relname, constants.RUNPARTS_SKIP, None)])
def testSkipNonExec(self):
  """Test that non executable files are skipped.

  The file is written but never chmod'ed, and must be reported with status
  constants.RUNPARTS_SKIP and no result.

  """
  fname = os.path.join(self.rundir, "00test")
  utils.WriteFile(fname, data="")
  relname = os.path.basename(fname)

  # assertEqual replaces the deprecated failUnlessEqual alias
  self.assertEqual(RunParts(self.rundir, reset_env=True),
                   [(relname, constants.RUNPARTS_SKIP, None)])
378 """Test error on a broken executable"""
379 fname = os.path.join(self.rundir, "00test")
380 utils.WriteFile(fname, data="")
381 os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
382 (relname, status, error) = RunParts(self.rundir, reset_env=True)[0]
383 self.failUnlessEqual(relname, os.path.basename(fname))
384 self.failUnlessEqual(status, constants.RUNPARTS_ERR)
385 self.failUnless(error)
387 def testSorted(self):
388 """Test executions are sorted"""
390 files.append(os.path.join(self.rundir, "64test"))
391 files.append(os.path.join(self.rundir, "00test"))
392 files.append(os.path.join(self.rundir, "42test"))
395 utils.WriteFile(fname, data="")
397 results = RunParts(self.rundir, reset_env=True)
399 for fname in sorted(files):
400 self.failUnlessEqual(os.path.basename(fname), results.pop(0)[0])
403 """Test correct execution"""
404 fname = os.path.join(self.rundir, "00test")
405 utils.WriteFile(fname, data="#!/bin/sh\n\necho -n ciao")
406 os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
407 (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
408 self.failUnlessEqual(relname, os.path.basename(fname))
409 self.failUnlessEqual(status, constants.RUNPARTS_RUN)
410 self.failUnlessEqual(runresult.stdout, "ciao")
def testRunFail(self):
  """Test correct execution, with run failure.

  A script exiting with status 1 still counts as run (RUNPARTS_RUN); the
  failure is visible on the run result only.

  """
  fname = os.path.join(self.rundir, "00test")
  utils.WriteFile(fname, data="#!/bin/sh\n\nexit 1")
  os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)

  (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]

  # assertEqual/assertTrue replace the deprecated failUnless* aliases
  self.assertEqual(relname, os.path.basename(fname))
  self.assertEqual(status, constants.RUNPARTS_RUN)
  self.assertEqual(runresult.exit_code, 1)
  self.assertTrue(runresult.failed)
423 def testRunMix(self):
425 files.append(os.path.join(self.rundir, "00test"))
426 files.append(os.path.join(self.rundir, "42test"))
427 files.append(os.path.join(self.rundir, "64test"))
428 files.append(os.path.join(self.rundir, "99test"))
432 # 1st has errors in execution
433 utils.WriteFile(files[0], data="#!/bin/sh\n\nexit 1")
434 os.chmod(files[0], stat.S_IREAD | stat.S_IEXEC)
437 utils.WriteFile(files[1], data="")
439 # 3rd cannot execute properly
440 utils.WriteFile(files[2], data="")
441 os.chmod(files[2], stat.S_IREAD | stat.S_IEXEC)
444 utils.WriteFile(files[3], data="#!/bin/sh\n\necho -n ciao")
445 os.chmod(files[3], stat.S_IREAD | stat.S_IEXEC)
447 results = RunParts(self.rundir, reset_env=True)
449 (relname, status, runresult) = results[0]
450 self.failUnlessEqual(relname, os.path.basename(files[0]))
451 self.failUnlessEqual(status, constants.RUNPARTS_RUN)
452 self.failUnlessEqual(runresult.exit_code, 1)
453 self.failUnless(runresult.failed)
455 (relname, status, runresult) = results[1]
456 self.failUnlessEqual(relname, os.path.basename(files[1]))
457 self.failUnlessEqual(status, constants.RUNPARTS_SKIP)
458 self.failUnlessEqual(runresult, None)
460 (relname, status, runresult) = results[2]
461 self.failUnlessEqual(relname, os.path.basename(files[2]))
462 self.failUnlessEqual(status, constants.RUNPARTS_ERR)
463 self.failUnless(runresult)
465 (relname, status, runresult) = results[3]
466 self.failUnlessEqual(relname, os.path.basename(files[3]))
467 self.failUnlessEqual(status, constants.RUNPARTS_RUN)
468 self.failUnlessEqual(runresult.output, "ciao")
469 self.failUnlessEqual(runresult.exit_code, 0)
470 self.failUnless(not runresult.failed)
473 class TestStartDaemon(testutils.GanetiTestCase):
475 self.tmpdir = tempfile.mkdtemp(prefix="ganeti-test")
476 self.tmpfile = os.path.join(self.tmpdir, "test")
479 shutil.rmtree(self.tmpdir)
482 utils.StartDaemon("echo Hello World > %s" % self.tmpfile)
483 self._wait(self.tmpfile, 60.0, "Hello World")
def testShellOutput(self):
  """A command given as a string writes to the file passed as output."""
  target = self.tmpfile
  utils.StartDaemon("echo Hello World", output=target)
  # Poll for up to 60 seconds until the file holds the expected text
  self._wait(target, 60.0, "Hello World")
def testNoShellNoOutput(self):
  """Starting a command given as a list, without output, must not raise."""
  argv = ["pwd"]
  utils.StartDaemon(argv)
def testNoShellNoOutputTouch(self):
  """A daemonized "touch" must create the (empty) file it is given."""
  testfile = os.path.join(self.tmpdir, "check")

  # assertFalse replaces the deprecated failIf alias
  self.assertFalse(os.path.exists(testfile))

  utils.StartDaemon(["touch", testfile])
  self._wait(testfile, 60.0, "")
def testNoShellOutput(self):
  """With no cwd given, "pwd" is expected to write "/" to the output file."""
  target = self.tmpfile
  utils.StartDaemon(["pwd"], output=target)
  self._wait(target, 60.0, "/")
def testNoShellOutputCwd(self):
  """With cwd given, "pwd" is expected to report that directory."""
  here = os.getcwd()
  utils.StartDaemon(["pwd"], output=self.tmpfile, cwd=here)
  self._wait(self.tmpfile, 60.0, here)
def testShellEnv(self):
  """A variable passed through env is visible to a shell command string."""
  environment = {
    "GNT_TEST_VAR": "Hello World",
    }
  utils.StartDaemon("echo \"$GNT_TEST_VAR\"", output=self.tmpfile,
                    env=environment)
  self._wait(self.tmpfile, 60.0, "Hello World")
def testNoShellEnv(self):
  """A variable passed through env is visible to a command given as list."""
  environment = {
    "GNT_TEST_VAR": "Hello World",
    }
  utils.StartDaemon(["printenv", "GNT_TEST_VAR"], output=self.tmpfile,
                    env=environment)
  self._wait(self.tmpfile, 60.0, "Hello World")
516 def testOutputFd(self):
517 fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
519 utils.StartDaemon(["pwd"], output_fd=fd, cwd=os.getcwd())
522 self._wait(self.tmpfile, 60.0, os.getcwd())
525 pid = utils.StartDaemon("echo $$ > %s" % self.tmpfile)
526 self._wait(self.tmpfile, 60.0, str(pid))
528 def testPidFile(self):
529 pidfile = os.path.join(self.tmpdir, "pid")
530 checkfile = os.path.join(self.tmpdir, "abort")
532 pid = utils.StartDaemon("while sleep 5; do :; done", pidfile=pidfile,
535 fd = os.open(pidfile, os.O_RDONLY)
537 # Check file is locked
538 self.assertRaises(errors.LockError, utils.LockFile, fd)
540 pidtext = os.read(fd, 100)
544 self.assertEqual(int(pidtext.strip()), pid)
546 self.assert_(utils.IsProcessAlive(pid))
548 # No matter what happens, kill daemon
549 utils.KillProcess(pid, timeout=5.0, waitpid=False)
550 self.failIf(utils.IsProcessAlive(pid))
552 self.assertEqual(utils.ReadFile(self.tmpfile), "")
554 def _wait(self, path, timeout, expected):
555 # Due to the asynchronous nature of daemon processes, polling is necessary.
556 # A timeout makes sure the test doesn't hang forever.
558 if not (os.path.isfile(path) and
559 utils.ReadFile(path).strip() == expected):
560 raise utils.RetryAgain()
563 utils.Retry(_CheckFile, (0.01, 1.5, 1.0), timeout)
564 except utils.RetryTimeout:
565 self.fail("Apparently the daemon didn't run in %s seconds and/or"
566 " didn't write the correct output" % timeout)
569 self.assertRaises(errors.OpExecError, utils.StartDaemon,
570 ["./does-NOT-EXIST/here/0123456789"])
571 self.assertRaises(errors.OpExecError, utils.StartDaemon,
572 ["./does-NOT-EXIST/here/0123456789"],
573 output=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
574 self.assertRaises(errors.OpExecError, utils.StartDaemon,
575 ["./does-NOT-EXIST/here/0123456789"],
576 cwd=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
577 self.assertRaises(errors.OpExecError, utils.StartDaemon,
578 ["./does-NOT-EXIST/here/0123456789"],
579 output=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
581 fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
583 self.assertRaises(errors.ProgrammerError, utils.StartDaemon,
584 ["./does-NOT-EXIST/here/0123456789"],
585 output=self.tmpfile, output_fd=fd)
590 class TestSetCloseOnExecFlag(unittest.TestCase):
591 """Tests for SetCloseOnExecFlag"""
594 self.tmpfile = tempfile.TemporaryFile()
596 def testEnable(self):
597 utils.SetCloseOnExecFlag(self.tmpfile.fileno(), True)
598 self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
601 def testDisable(self):
602 utils.SetCloseOnExecFlag(self.tmpfile.fileno(), False)
603 self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
607 class TestSetNonblockFlag(unittest.TestCase):
609 self.tmpfile = tempfile.TemporaryFile()
611 def testEnable(self):
612 utils.SetNonblockFlag(self.tmpfile.fileno(), True)
613 self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
616 def testDisable(self):
617 utils.SetNonblockFlag(self.tmpfile.fileno(), False)
618 self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
622 class TestRemoveFile(unittest.TestCase):
623 """Test case for the RemoveFile function"""
626 """Create a temp dir and file for each case"""
627 self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
628 fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
632 if os.path.exists(self.tmpfile):
633 os.unlink(self.tmpfile)
634 os.rmdir(self.tmpdir)
def testIgnoreDirs(self):
  """RemoveFile() called on a directory must simply return None."""
  outcome = RemoveFile(self.tmpdir)
  self.assertEqual(None, outcome)
def testIgnoreNotExisting(self):
  """Calling RemoveFile() twice on the same path must not raise."""
  # The first call removes the file, the second one finds nothing to remove
  for _ in range(2):
    RemoveFile(self.tmpfile)
def testRemoveFile(self):
  """After RemoveFile() the temporary file must be gone."""
  RemoveFile(self.tmpfile)

  if not os.path.exists(self.tmpfile):
    return

  self.fail("File '%s' not removed" % self.tmpfile)
651 def testRemoveSymlink(self):
652 """Test that RemoveFile does remove symlinks"""
653 symlink = self.tmpdir + "/symlink"
654 os.symlink("no-such-file", symlink)
656 if os.path.exists(symlink):
657 self.fail("File '%s' not removed" % symlink)
658 os.symlink(self.tmpfile, symlink)
660 if os.path.exists(symlink):
661 self.fail("File '%s' not removed" % symlink)
664 class TestRename(unittest.TestCase):
665 """Test case for RenameFile"""
668 """Create a temporary directory"""
669 self.tmpdir = tempfile.mkdtemp()
670 self.tmpfile = os.path.join(self.tmpdir, "test1")
673 open(self.tmpfile, "w").close()
676 """Remove temporary directory"""
677 shutil.rmtree(self.tmpdir)
def testSimpleRename1(self):
  """Simple rename 1: the file must exist under its new name afterwards."""
  target = os.path.join(self.tmpdir, "xyz")
  utils.RenameFile(self.tmpfile, target)

  # assertTrue replaces the deprecated assert_ alias
  self.assertTrue(os.path.isfile(target))
684 def testSimpleRename2(self):
685 """Simple rename 2"""
686 utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
688 self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
690 def testRenameMkdir(self):
691 """Rename with mkdir"""
692 utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
694 self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
695 self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
697 utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"),
698 os.path.join(self.tmpdir, "test/foo/bar/baz"),
700 self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
701 self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar")))
702 self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz")))
705 class TestMatchNameComponent(unittest.TestCase):
706 """Test case for the MatchNameComponent function"""
def testEmptyList(self):
  """Test that there is no match against an empty list"""
  # assertEqual replaces the deprecated failUnlessEqual alias
  for key in ("", "test"):
    self.assertEqual(MatchNameComponent(key, []), None)
def testSingleMatch(self):
  """Test that a single match is performed correctly.

  The short name, a partial name and the full name must all resolve to the
  one matching list entry.

  """
  mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
  expected = mlist[1]

  # assertEqual replaces the deprecated failUnlessEqual alias
  for key in ("test2", "test2.example", "test2.example.com"):
    self.assertEqual(MatchNameComponent(key, mlist), expected)
def testMultipleMatches(self):
  """Test that a multiple match is returned as None"""
  mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]

  # Both keys are prefixes of every entry in the list.
  # assertEqual replaces the deprecated failUnlessEqual alias.
  for key in ("test1", "test1.example"):
    self.assertEqual(MatchNameComponent(key, mlist), None)
726 def testFullMatch(self):
727 """Test that a full match is returned correctly"""
729 key2 = "test1.example"
730 mlist = [key2, key2 + ".com"]
731 self.failUnlessEqual(MatchNameComponent(key1, mlist), None)
732 self.failUnlessEqual(MatchNameComponent(key2, mlist), key2)
734 def testCaseInsensitivePartialMatch(self):
735 """Test for the case_insensitive keyword"""
736 mlist = ["test1.example.com", "test2.example.net"]
737 self.assertEqual(MatchNameComponent("test2", mlist, case_sensitive=False),
739 self.assertEqual(MatchNameComponent("Test2", mlist, case_sensitive=False),
741 self.assertEqual(MatchNameComponent("teSt2", mlist, case_sensitive=False),
743 self.assertEqual(MatchNameComponent("TeSt2", mlist, case_sensitive=False),
747 def testCaseInsensitiveFullMatch(self):
748 mlist = ["ts1.ex", "ts1.ex.org", "ts2.ex", "Ts2.ex"]
749 # Between the two ts1 a full string match non-case insensitive should work
750 self.assertEqual(MatchNameComponent("Ts1", mlist, case_sensitive=False),
752 self.assertEqual(MatchNameComponent("Ts1.ex", mlist, case_sensitive=False),
754 self.assertEqual(MatchNameComponent("ts1.ex", mlist, case_sensitive=False),
756 # Between the two ts2 only case differs, so only case-match works
757 self.assertEqual(MatchNameComponent("ts2.ex", mlist, case_sensitive=False),
759 self.assertEqual(MatchNameComponent("Ts2.ex", mlist, case_sensitive=False),
761 self.assertEqual(MatchNameComponent("TS2.ex", mlist, case_sensitive=False),
765 class TestReadFile(testutils.GanetiTestCase):
767 def testReadAll(self):
768 data = utils.ReadFile(self._TestDataFilename("cert1.pem"))
769 self.assertEqual(len(data), 814)
771 h = compat.md5_hash()
773 self.assertEqual(h.hexdigest(), "a491efb3efe56a0535f924d5f8680fd4")
775 def testReadSize(self):
776 data = utils.ReadFile(self._TestDataFilename("cert1.pem"),
778 self.assertEqual(len(data), 100)
780 h = compat.md5_hash()
782 self.assertEqual(h.hexdigest(), "893772354e4e690b9efd073eed433ce7")
785 self.assertRaises(EnvironmentError, utils.ReadFile,
786 "/dev/null/does-not-exist")
789 class TestReadOneLineFile(testutils.GanetiTestCase):
792 testutils.GanetiTestCase.setUp(self)
def testDefault(self):
  """With default arguments the first line of cert1.pem is returned."""
  first_line = ReadOneLineFile(self._TestDataFilename("cert1.pem"))

  self.assertEqual(len(first_line), 27)
  self.assertEqual(first_line, "-----BEGIN CERTIFICATE-----")
def testNotStrict(self):
  """With strict=False the first line of cert1.pem is returned."""
  cert_path = self._TestDataFilename("cert1.pem")
  first_line = ReadOneLineFile(cert_path, strict=False)

  self.assertEqual(len(first_line), 27)
  self.assertEqual(first_line, "-----BEGIN CERTIFICATE-----")
def testStrictFailure(self):
  """With strict=True reading cert1.pem must raise errors.GenericError."""
  cert_path = self._TestDataFilename("cert1.pem")
  self.assertRaises(errors.GenericError, ReadOneLineFile,
                    cert_path, strict=True)
def testLongLine(self):
  """A long line without newline is returned unchanged in both modes."""
  payload = "Hello World! " * 1024
  path = self._CreateTempFile()
  utils.WriteFile(path, data=payload)

  strict_result = ReadOneLineFile(path, strict=True)
  lax_result = ReadOneLineFile(path, strict=False)

  self.assertEqual(payload, strict_result)
  self.assertEqual(payload, lax_result)
817 def testNewline(self):
818 myfile = self._CreateTempFile()
820 for nl in ["", "\n", "\r\n"]:
821 dummydata = "%s%s" % (myline, nl)
822 utils.WriteFile(myfile, data=dummydata)
823 datalax = ReadOneLineFile(myfile, strict=False)
824 self.assertEqual(myline, datalax)
825 datastrict = ReadOneLineFile(myfile, strict=True)
826 self.assertEqual(myline, datastrict)
828 def testWhitespaceAndMultipleLines(self):
829 myfile = self._CreateTempFile()
830 for nl in ["", "\n", "\r\n"]:
831 for ws in [" ", "\t", "\t\t \t", "\t "]:
832 dummydata = (1024 * ("Foo bar baz %s%s" % (ws, nl)))
833 utils.WriteFile(myfile, data=dummydata)
834 datalax = ReadOneLineFile(myfile, strict=False)
836 self.assert_(set("\r\n") & set(dummydata))
837 self.assertRaises(errors.GenericError, ReadOneLineFile,
839 explen = len("Foo bar baz ") + len(ws)
840 self.assertEqual(len(datalax), explen)
841 self.assertEqual(datalax, dummydata[:explen])
842 self.assertFalse(set("\r\n") & set(datalax))
844 datastrict = ReadOneLineFile(myfile, strict=True)
845 self.assertEqual(dummydata, datastrict)
846 self.assertEqual(dummydata, datalax)
848 def testEmptylines(self):
849 myfile = self._CreateTempFile()
851 for nl in ["\n", "\r\n"]:
852 for ol in ["", "otherline"]:
853 dummydata = "%s%s%s%s%s%s" % (nl, nl, myline, nl, ol, nl)
854 utils.WriteFile(myfile, data=dummydata)
855 self.assert_(set("\r\n") & set(dummydata))
856 datalax = ReadOneLineFile(myfile, strict=False)
857 self.assertEqual(myline, datalax)
859 self.assertRaises(errors.GenericError, ReadOneLineFile,
862 datastrict = ReadOneLineFile(myfile, strict=True)
863 self.assertEqual(myline, datastrict)
866 class TestTimestampForFilename(unittest.TestCase):
868 self.assert_("." not in utils.TimestampForFilename())
869 self.assert_(":" not in utils.TimestampForFilename())
872 class TestCreateBackup(testutils.GanetiTestCase):
874 testutils.GanetiTestCase.setUp(self)
876 self.tmpdir = tempfile.mkdtemp()
879 testutils.GanetiTestCase.tearDown(self)
881 shutil.rmtree(self.tmpdir)
884 filename = utils.PathJoin(self.tmpdir, "config.data")
885 utils.WriteFile(filename, data="")
886 bname = utils.CreateBackup(filename)
887 self.assertFileContent(bname, "")
888 self.assertEqual(len(glob.glob("%s*" % filename)), 2)
889 utils.CreateBackup(filename)
890 self.assertEqual(len(glob.glob("%s*" % filename)), 3)
891 utils.CreateBackup(filename)
892 self.assertEqual(len(glob.glob("%s*" % filename)), 4)
894 fifoname = utils.PathJoin(self.tmpdir, "fifo")
896 self.assertRaises(errors.ProgrammerError, utils.CreateBackup, fifoname)
898 def testContent(self):
900 for data in ["", "X", "Hello World!\n" * 100, "Binary data\0\x01\x02\n"]:
901 for rep in [1, 2, 10, 127]:
902 testdata = data * rep
904 filename = utils.PathJoin(self.tmpdir, "test.data_")
905 utils.WriteFile(filename, data=testdata)
906 self.assertFileContent(filename, testdata)
909 bname = utils.CreateBackup(filename)
911 self.assertFileContent(bname, testdata)
912 self.assertEqual(len(glob.glob("%s*" % filename)), 1 + bkpcount)
915 class TestFormatUnit(unittest.TestCase):
916 """Test case for the FormatUnit function"""
919 self.assertEqual(FormatUnit(1, 'h'), '1M')
920 self.assertEqual(FormatUnit(100, 'h'), '100M')
921 self.assertEqual(FormatUnit(1023, 'h'), '1023M')
923 self.assertEqual(FormatUnit(1, 'm'), '1')
924 self.assertEqual(FormatUnit(100, 'm'), '100')
925 self.assertEqual(FormatUnit(1023, 'm'), '1023')
927 self.assertEqual(FormatUnit(1024, 'm'), '1024')
928 self.assertEqual(FormatUnit(1536, 'm'), '1536')
929 self.assertEqual(FormatUnit(17133, 'm'), '17133')
930 self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
933 self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
934 self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
935 self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
936 self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
938 self.assertEqual(FormatUnit(1024, 'g'), '1.0')
939 self.assertEqual(FormatUnit(1536, 'g'), '1.5')
940 self.assertEqual(FormatUnit(17133, 'g'), '16.7')
941 self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
943 self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
944 self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
945 self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
948 self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
949 self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
950 self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
952 self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
953 self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
954 self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
956 class TestParseUnit(unittest.TestCase):
957 """Test case for the ParseUnit function"""
960 ('M', 1), ('G', 1024), ('T', 1024 * 1024),
961 ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
962 ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
def testRounding(self):
  """Unit-less integers: each expected result is the next multiple of 4."""
  expectations = [
    ('0', 0),
    ('1', 4),
    ('2', 4),
    ('3', 4),
    ('124', 124),
    ('125', 128),
    ('126', 128),
    ('127', 128),
    ('128', 128),
    ('129', 132),
    ('130', 132),
    ]

  for (text, expected) in expectations:
    self.assertEqual(ParseUnit(text), expected)
def testFloating(self):
  """Fractional inputs, with and without a unit suffix."""
  expectations = [
    ('0', 0),
    ('0.5', 4),
    ('1.75', 4),
    ('1.99', 4),
    ('2.00', 4),
    ('2.01', 4),
    ('3.99', 4),
    ('4.00', 4),
    ('4.01', 8),
    # Suffixed values, using the G and T suffixes
    ('1.5G', 1536),
    ('1.8G', 1844),
    ('8.28T', 8682212),
    ]

  for (text, expected) in expectations:
    self.assertEqual(ParseUnit(text), expected)
992 def testSuffixes(self):
993 for sep in ('', ' ', ' ', "\t", "\t "):
994 for suffix, scale in TestParseUnit.SCALES:
995 for func in (lambda x: x, str.lower, str.upper):
996 self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
def testInvalidInput(self):
  """Malformed unit strings must raise UnitParseError."""
  suffixes = [suffix for (suffix, _) in TestParseUnit.SCALES]

  # Invalid separator between number and suffix first ...
  bad_inputs = ['1' + sep + suffix
                for sep in ('-', '_', ',', 'a')
                for suffix in suffixes]
  # ... then a comma used as decimal separator
  bad_inputs.extend(['1,3' + suffix for suffix in suffixes])

  for text in bad_inputs:
    self.assertRaises(UnitParseError, ParseUnit, text)
1008 class TestSshKeys(testutils.GanetiTestCase):
1009 """Test case for the AddAuthorizedKey function"""
1011 KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
1012 KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
1013 'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
1016 testutils.GanetiTestCase.setUp(self)
1017 self.tmpname = self._CreateTempFile()
1018 handle = open(self.tmpname, 'w')
1020 handle.write("%s\n" % TestSshKeys.KEY_A)
1021 handle.write("%s\n" % TestSshKeys.KEY_B)
1025 def testAddingNewKey(self):
1026 AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
1028 self.assertFileContent(self.tmpname,
1029 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
1030 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
1031 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
1032 "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
1034 def testAddingAlmostButNotCompletelyTheSameKey(self):
1035 AddAuthorizedKey(self.tmpname,
1036 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
1038 self.assertFileContent(self.tmpname,
1039 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
1040 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
1041 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
1042 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
1044 def testAddingExistingKeyWithSomeMoreSpaces(self):
1045 AddAuthorizedKey(self.tmpname,
1046 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
1048 self.assertFileContent(self.tmpname,
1049 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
1050 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
1051 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
1053 def testRemovingExistingKeyWithSomeMoreSpaces(self):
1054 RemoveAuthorizedKey(self.tmpname,
1055 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
1057 self.assertFileContent(self.tmpname,
1058 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
1059 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
1061 def testRemovingNonExistingKey(self):
1062 RemoveAuthorizedKey(self.tmpname,
1063 'ssh-dss AAAAB3Nsdfj230xxjxJjsjwjsjdjU root@test')
1065 self.assertFileContent(self.tmpname,
1066 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
1067 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
1068 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
1071 class TestEtcHosts(testutils.GanetiTestCase):
1072 """Test functions modifying /etc/hosts"""
1075 testutils.GanetiTestCase.setUp(self)
1076 self.tmpname = self._CreateTempFile()
1077 handle = open(self.tmpname, 'w')
1079 handle.write('# This is a test file for /etc/hosts\n')
1080 handle.write('127.0.0.1\tlocalhost\n')
1081 handle.write('192.168.1.1 router gw\n')
1085 def testSettingNewIp(self):
1086 SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
1088 self.assertFileContent(self.tmpname,
1089 "# This is a test file for /etc/hosts\n"
1090 "127.0.0.1\tlocalhost\n"
1091 "192.168.1.1 router gw\n"
1092 "1.2.3.4\tmyhost.domain.tld myhost\n")
1093 self.assertFileMode(self.tmpname, 0644)
1095 def testSettingExistingIp(self):
1096 SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
1099 self.assertFileContent(self.tmpname,
1100 "# This is a test file for /etc/hosts\n"
1101 "127.0.0.1\tlocalhost\n"
1102 "192.168.1.1\tmyhost.domain.tld myhost\n")
1103 self.assertFileMode(self.tmpname, 0644)
1105 def testSettingDuplicateName(self):
1106 SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
1108 self.assertFileContent(self.tmpname,
1109 "# This is a test file for /etc/hosts\n"
1110 "127.0.0.1\tlocalhost\n"
1111 "192.168.1.1 router gw\n"
1112 "1.2.3.4\tmyhost\n")
1113 self.assertFileMode(self.tmpname, 0644)
1115 def testRemovingExistingHost(self):
1116 RemoveEtcHostsEntry(self.tmpname, 'router')
1118 self.assertFileContent(self.tmpname,
1119 "# This is a test file for /etc/hosts\n"
1120 "127.0.0.1\tlocalhost\n"
1122 self.assertFileMode(self.tmpname, 0644)
1124 def testRemovingSingleExistingHost(self):
1125 RemoveEtcHostsEntry(self.tmpname, 'localhost')
1127 self.assertFileContent(self.tmpname,
1128 "# This is a test file for /etc/hosts\n"
1129 "192.168.1.1 router gw\n")
1130 self.assertFileMode(self.tmpname, 0644)
1132 def testRemovingNonExistingHost(self):
1133 RemoveEtcHostsEntry(self.tmpname, 'myhost')
1135 self.assertFileContent(self.tmpname,
1136 "# This is a test file for /etc/hosts\n"
1137 "127.0.0.1\tlocalhost\n"
1138 "192.168.1.1 router gw\n")
1139 self.assertFileMode(self.tmpname, 0644)
1141 def testRemovingAlias(self):
1142 RemoveEtcHostsEntry(self.tmpname, 'gw')
1144 self.assertFileContent(self.tmpname,
1145 "# This is a test file for /etc/hosts\n"
1146 "127.0.0.1\tlocalhost\n"
1147 "192.168.1.1 router\n")
1148 self.assertFileMode(self.tmpname, 0644)
class TestShellQuoting(unittest.TestCase):
  """Checks for the ShellQuote and ShellQuoteArgs helpers"""

  def testShellQuote(self):
    """Quote single strings and compare against the expected shell form.

    Each entry pairs the raw input with the exact text ShellQuote must
    return for it.

    """
    cases = [
      ('abc', "abc"),
      ('ab"c', "'ab\"c'"),
      ("a'bc", "'a'\\''bc'"),
      ("a b c", "'a b c'"),
      ("a b\\ c", "'a b\\ c'"),
      ]

    for (raw, quoted) in cases:
      self.assertEqual(ShellQuote(raw), quoted)

  def testShellQuoteArgs(self):
    """Quote argument lists and compare against the joined command line.

    Each entry pairs an argument list with the exact string ShellQuoteArgs
    must return for it.

    """
    cases = [
      (['a', 'b', 'c'], "a b c"),
      (['a', 'b"', 'c'], "a 'b\"' c"),
      (['a', 'b\'', 'c'], "a 'b'\\\''' c"),
      ]

    for (argv, cmdline) in cases:
      self.assertEqual(ShellQuoteArgs(argv), cmdline)
1167 class TestTcpPing(unittest.TestCase):
1168 """Testcase for TCP version of ping - against listen(2)ing port"""
1171 self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1172 self.listener.bind((constants.IP4_ADDRESS_LOCALHOST, 0))
1173 self.listenerport = self.listener.getsockname()[1]
1174 self.listener.listen(1)
1177 self.listener.shutdown(socket.SHUT_RDWR)
1179 del self.listenerport
1181 def testTcpPingToLocalHostAccept(self):
1182 self.assert_(TcpPing(constants.IP4_ADDRESS_LOCALHOST,
1185 live_port_needed=True,
1186 source=constants.IP4_ADDRESS_LOCALHOST,
1188 "failed to connect to test listener")
1190 self.assert_(TcpPing(constants.IP4_ADDRESS_LOCALHOST,
1193 live_port_needed=True,
1195 "failed to connect to test listener (no source)")
1198 class TestTcpPingDeaf(unittest.TestCase):
1199 """Testcase for TCP version of ping - against non listen(2)ing port"""
1202 self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1203 self.deaflistener.bind((constants.IP4_ADDRESS_LOCALHOST, 0))
1204 self.deaflistenerport = self.deaflistener.getsockname()[1]
1207 del self.deaflistener
1208 del self.deaflistenerport
1210 def testTcpPingToLocalHostAcceptDeaf(self):
1211 self.failIf(TcpPing(constants.IP4_ADDRESS_LOCALHOST,
1212 self.deaflistenerport,
1213 timeout=constants.TCP_PING_TIMEOUT,
1214 live_port_needed=True,
1215 source=constants.IP4_ADDRESS_LOCALHOST,
1216 ), # need successful connect(2)
1217 "successfully connected to deaf listener")
1219 self.failIf(TcpPing(constants.IP4_ADDRESS_LOCALHOST,
1220 self.deaflistenerport,
1221 timeout=constants.TCP_PING_TIMEOUT,
1222 live_port_needed=True,
1223 ), # need successful connect(2)
1224 "successfully connected to deaf listener (no source addr)")
1226 def testTcpPingToLocalHostNoAccept(self):
1227 self.assert_(TcpPing(constants.IP4_ADDRESS_LOCALHOST,
1228 self.deaflistenerport,
1229 timeout=constants.TCP_PING_TIMEOUT,
1230 live_port_needed=False,
1231 source=constants.IP4_ADDRESS_LOCALHOST,
1232 ), # ECONNREFUSED is OK
1233 "failed to ping alive host on deaf port")
1235 self.assert_(TcpPing(constants.IP4_ADDRESS_LOCALHOST,
1236 self.deaflistenerport,
1237 timeout=constants.TCP_PING_TIMEOUT,
1238 live_port_needed=False,
1239 ), # ECONNREFUSED is OK
1240 "failed to ping alive host on deaf port (no source addr)")
class TestOwnIpAddress(unittest.TestCase):
  """Testcase for OwnIpAddress"""

  def testOwnLoopback(self):
    """Check that the IPv4 loopback address is reported as owned"""
    # assertTrue replaces the deprecated failUnless alias
    self.assertTrue(OwnIpAddress(constants.IP4_ADDRESS_LOCALHOST),
                    "Should own the loopback address")

  def testNowOwnAddress(self):
    """Check that an address from a reserved test network is not owned"""

    # Network 192.0.2.0/24 is reserved for test/documentation as per
    # RFC 5735, so we *should* not have an address of this range... if
    # this fails, we should extend the test to multiple addresses
    DST_IP = "192.0.2.1"
    # assertFalse replaces the deprecated failIf alias
    self.assertFalse(OwnIpAddress(DST_IP),
                     "Should not own IP address %s" % DST_IP)
1261 def _GetSocketCredentials(path):
1262 """Connect to a Unix socket and return remote credentials.
1265 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1269 return utils.GetSocketCredentials(sock)
1274 class TestGetSocketCredentials(unittest.TestCase):
1276 self.tmpdir = tempfile.mkdtemp()
1277 self.sockpath = utils.PathJoin(self.tmpdir, "sock")
1279 self.listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1280 self.listener.settimeout(10)
1281 self.listener.bind(self.sockpath)
1282 self.listener.listen(1)
1285 self.listener.shutdown(socket.SHUT_RDWR)
1286 self.listener.close()
1287 shutil.rmtree(self.tmpdir)
1290 (c2pr, c2pw) = os.pipe()
1292 # Start child process
1296 data = serializer.DumpJson(_GetSocketCredentials(self.sockpath))
1298 os.write(c2pw, data)
1307 # Wait for one connection
1308 (conn, _) = self.listener.accept()
1313 result = os.read(c2pr, 4096)
1316 # Check child's exit code
1317 (_, status) = os.waitpid(child, 0)
1318 self.assertFalse(os.WIFSIGNALED(status))
1319 self.assertEqual(os.WEXITSTATUS(status), 0)
1322 (pid, uid, gid) = serializer.LoadJson(result)
1323 self.assertEqual(pid, os.getpid())
1324 self.assertEqual(uid, os.getuid())
1325 self.assertEqual(gid, os.getgid())
1328 class TestListVisibleFiles(unittest.TestCase):
1329 """Test case for ListVisibleFiles"""
1332 self.path = tempfile.mkdtemp()
1335 shutil.rmtree(self.path)
1337 def _CreateFiles(self, files):
1339 utils.WriteFile(os.path.join(self.path, name), data="test")
1341 def _test(self, files, expected):
1342 self._CreateFiles(files)
1343 found = ListVisibleFiles(self.path)
1344 self.assertEqual(set(found), set(expected))
1346 def testAllVisible(self):
1347 files = ["a", "b", "c"]
1349 self._test(files, expected)
1351 def testNoneVisible(self):
1352 files = [".a", ".b", ".c"]
1354 self._test(files, expected)
1356 def testSomeVisible(self):
1357 files = ["a", "b", ".c"]
1358 expected = ["a", "b"]
1359 self._test(files, expected)
1361 def testNonAbsolutePath(self):
1362 self.failUnlessRaises(errors.ProgrammerError, ListVisibleFiles, "abc")
1364 def testNonNormalizedPath(self):
1365 self.failUnlessRaises(errors.ProgrammerError, ListVisibleFiles,
1369 class TestNewUUID(unittest.TestCase):
1370 """Test case for NewUUID"""
1372 _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
1373 '[a-f0-9]{4}-[a-f0-9]{12}$')
1376 self.failUnless(self._re_uuid.match(utils.NewUUID()))
1379 class TestUniqueSequence(unittest.TestCase):
1380 """Test case for UniqueSequence"""
1382 def _test(self, input, expected):
1383 self.assertEqual(utils.UniqueSequence(input), expected)
1387 self._test([1, 2, 3], [1, 2, 3])
1388 self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
1389 self._test([1, 2, 2, 3], [1, 2, 3])
1390 self._test([1, 2, 3, 3], [1, 2, 3])
1393 self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
1394 self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
1397 self._test(["a", "a"], ["a"])
1398 self._test(["a", "b"], ["a", "b"])
1399 self._test(["a", "b", "a"], ["a", "b"])
1402 class TestFirstFree(unittest.TestCase):
1403 """Test case for the FirstFree function"""
1406 """Test FirstFree"""
1407 self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
1408 self.failUnlessEqual(FirstFree([]), None)
1409 self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
1410 self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
1411 self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
1414 class TestTailFile(testutils.GanetiTestCase):
1415 """Test case for the TailFile function"""
1417 def testEmpty(self):
1418 fname = self._CreateTempFile()
1419 self.failUnlessEqual(TailFile(fname), [])
1420 self.failUnlessEqual(TailFile(fname, lines=25), [])
1422 def testAllLines(self):
1423 data = ["test %d" % i for i in range(30)]
1425 fname = self._CreateTempFile()
1426 fd = open(fname, "w")
1427 fd.write("\n".join(data[:i]))
1431 self.failUnlessEqual(TailFile(fname, lines=i), data[:i])
1433 def testPartialLines(self):
1434 data = ["test %d" % i for i in range(30)]
1435 fname = self._CreateTempFile()
1436 fd = open(fname, "w")
1437 fd.write("\n".join(data))
1440 for i in range(1, 30):
1441 self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
1443 def testBigFile(self):
1444 data = ["test %d" % i for i in range(30)]
1445 fname = self._CreateTempFile()
1446 fd = open(fname, "w")
1447 fd.write("X" * 1048576)
1449 fd.write("\n".join(data))
1452 for i in range(1, 30):
1453 self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
1456 class _BaseFileLockTest:
1457 """Test case for the FileLock class"""
1459 def testSharedNonblocking(self):
1460 self.lock.Shared(blocking=False)
1463 def testExclusiveNonblocking(self):
1464 self.lock.Exclusive(blocking=False)
1467 def testUnlockNonblocking(self):
1468 self.lock.Unlock(blocking=False)
1471 def testSharedBlocking(self):
1472 self.lock.Shared(blocking=True)
1475 def testExclusiveBlocking(self):
1476 self.lock.Exclusive(blocking=True)
1479 def testUnlockBlocking(self):
1480 self.lock.Unlock(blocking=True)
1483 def testSharedExclusiveUnlock(self):
1484 self.lock.Shared(blocking=False)
1485 self.lock.Exclusive(blocking=False)
1486 self.lock.Unlock(blocking=False)
1489 def testExclusiveSharedUnlock(self):
1490 self.lock.Exclusive(blocking=False)
1491 self.lock.Shared(blocking=False)
1492 self.lock.Unlock(blocking=False)
1495 def testSimpleTimeout(self):
1496 # These will succeed on the first attempt, hence a short timeout
1497 self.lock.Shared(blocking=True, timeout=10.0)
1498 self.lock.Exclusive(blocking=False, timeout=10.0)
1499 self.lock.Unlock(blocking=True, timeout=10.0)
1503 def _TryLockInner(filename, shared, blocking):
1504 lock = utils.FileLock.Open(filename)
1512 # The timeout doesn't really matter as the parent process waits for us to
1514 fn(blocking=blocking, timeout=0.01)
1515 except errors.LockError, err:
1520 def _TryLock(self, *args):
1521 return utils.RunInSeparateProcess(self._TryLockInner, self.tmpfile.name,
1524 def testTimeout(self):
1525 for blocking in [True, False]:
1526 self.lock.Exclusive(blocking=True)
1527 self.failIf(self._TryLock(False, blocking))
1528 self.failIf(self._TryLock(True, blocking))
1530 self.lock.Shared(blocking=True)
1531 self.assert_(self._TryLock(True, blocking))
1532 self.failIf(self._TryLock(False, blocking))
1534 def testCloseShared(self):
1536 self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
1538 def testCloseExclusive(self):
1540 self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
1542 def testCloseUnlock(self):
1544 self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
1547 class TestFileLockWithFilename(testutils.GanetiTestCase, _BaseFileLockTest):
1548 TESTDATA = "Hello World\n" * 10
1551 testutils.GanetiTestCase.setUp(self)
1553 self.tmpfile = tempfile.NamedTemporaryFile()
1554 utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
1555 self.lock = utils.FileLock.Open(self.tmpfile.name)
1557 # Ensure "Open" didn't truncate file
1558 self.assertFileContent(self.tmpfile.name, self.TESTDATA)
1561 self.assertFileContent(self.tmpfile.name, self.TESTDATA)
1563 testutils.GanetiTestCase.tearDown(self)
1566 class TestFileLockWithFileObject(unittest.TestCase, _BaseFileLockTest):
1568 self.tmpfile = tempfile.NamedTemporaryFile()
1569 self.lock = utils.FileLock(open(self.tmpfile.name, "w"), self.tmpfile.name)
1572 class TestTimeFunctions(unittest.TestCase):
1573 """Test case for time functions"""
1576 self.assertEqual(utils.SplitTime(1), (1, 0))
1577 self.assertEqual(utils.SplitTime(1.5), (1, 500000))
1578 self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
1579 self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
1580 self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
1581 self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
1582 self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
1583 self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
1585 self.assertRaises(AssertionError, utils.SplitTime, -1)
1587 self.assertEqual(utils.MergeTime((1, 0)), 1.0)
1588 self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
1589 self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
1591 self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3),
1593 self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
1595 self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
1596 self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
1597 self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
1598 self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
1599 self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
class FieldSetTestCase(unittest.TestCase):
  """Test case for FieldSets"""

  def testSimpleMatch(self):
    """Plain field names must match as whole names only.

    Uses assertTrue/assertFalse instead of the deprecated
    failUnless/failIf aliases.

    """
    f = utils.FieldSet("a", "b", "c", "def")
    self.assertTrue(f.Matches("a"))
    # Neither a substring nor an extension of "def" may match
    self.assertFalse(f.Matches("d"), "Substring matched")
    self.assertFalse(f.Matches("defghi"), "Prefix string matched")
    # NonMatching is expected to be false-y when every name is known
    self.assertFalse(f.NonMatching(["b", "c"]))
    self.assertFalse(f.NonMatching(["a", "b", "c", "def"]))
    self.assertTrue(f.NonMatching(["a", "d"]))

  def testRegexMatch(self):
    """Field definitions containing a regular expression are honoured"""
    f = utils.FieldSet("a", "b([0-9]+)", "c")
    self.assertTrue(f.Matches("b1"))
    self.assertTrue(f.Matches("b99"))
    # "/" is not a digit, so the pattern must reject it
    self.assertFalse(f.Matches("b/1"))
    self.assertFalse(f.NonMatching(["b12", "c"]))
    self.assertTrue(f.NonMatching(["a", "1"]))
1622 class TestForceDictType(unittest.TestCase):
1623 """Test case for ForceDictType"""
1627 'a': constants.VTYPE_INT,
1628 'b': constants.VTYPE_BOOL,
1629 'c': constants.VTYPE_STRING,
1630 'd': constants.VTYPE_SIZE,
1633 def _fdt(self, dict, allowed_values=None):
1634 if allowed_values is None:
1635 ForceDictType(dict, self.key_types)
1637 ForceDictType(dict, self.key_types, allowed_values=allowed_values)
1641 def testSimpleDict(self):
1642 self.assertEqual(self._fdt({}), {})
1643 self.assertEqual(self._fdt({'a': 1}), {'a': 1})
1644 self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
1645 self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
1646 self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
1647 self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
1648 self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
1649 self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
1650 self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
1651 self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
1652 self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
1653 self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
1655 def testErrors(self):
1656 self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
1657 self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
1658 self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
1659 self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
1662 class TestIsNormAbsPath(unittest.TestCase):
1663 """Testing case for IsNormAbsPath"""
1665 def _pathTestHelper(self, path, result):
1667 self.assert_(IsNormAbsPath(path),
1668 "Path %s should result absolute and normalized" % path)
1670 self.assertFalse(IsNormAbsPath(path),
1671 "Path %s should not result absolute and normalized" % path)
1674 self._pathTestHelper('/etc', True)
1675 self._pathTestHelper('/srv', True)
1676 self._pathTestHelper('etc', False)
1677 self._pathTestHelper('/etc/../root', False)
1678 self._pathTestHelper('/etc/', False)
class TestSafeEncode(unittest.TestCase):
  """Test case for SafeEncode"""

  def testAscii(self):
    """Digits, letters and punctuation must pass through unchanged"""
    for txt in [string.digits, string.letters, string.punctuation]:
      self.assertEqual(txt, SafeEncode(txt))

  def testDoubleEncode(self):
    """Encoding an already encoded single-byte string must be a no-op"""
    # NOTE(review): range(255) stops at 254, so byte 0xff is never
    # exercised; confirm whether range(256) was intended
    for i in range(255):
      txt = SafeEncode(chr(i))
      self.assertEqual(txt, SafeEncode(txt))

  def testUnicode(self):
    """Encoding an already encoded unicode character must be a no-op"""
    # 1024 is high enough to catch non-direct ASCII mappings
    for i in range(1024):
      txt = SafeEncode(unichr(i))
      self.assertEqual(txt, SafeEncode(txt))
1700 class TestFormatTime(unittest.TestCase):
1701 """Testing case for FormatTime"""
1704 self.failUnlessEqual(FormatTime(None), "N/A")
1706 def testInvalid(self):
1707 self.failUnlessEqual(FormatTime(()), "N/A")
1710 # tests that we accept time.time input
1711 FormatTime(time.time())
1712 # tests that we accept int input
1713 FormatTime(int(time.time()))
1716 class RunInSeparateProcess(unittest.TestCase):
1718 for exp in [True, False]:
1722 self.assertEqual(exp, utils.RunInSeparateProcess(_child))
1725 for arg in [0, 1, 999, "Hello World", (1, 2, 3)]:
1726 def _child(carg1, carg2):
1727 return carg1 == "Foo" and carg2 == arg
1729 self.assert_(utils.RunInSeparateProcess(_child, "Foo", arg))
1732 parent_pid = os.getpid()
1735 return os.getpid() == parent_pid
1737 self.failIf(utils.RunInSeparateProcess(_check))
1739 def testSignal(self):
1741 os.kill(os.getpid(), signal.SIGTERM)
1743 self.assertRaises(errors.GenericError,
1744 utils.RunInSeparateProcess, _kill)
1746 def testException(self):
1748 raise errors.GenericError("This is a test")
1750 self.assertRaises(errors.GenericError,
1751 utils.RunInSeparateProcess, _exc)
1754 class TestFingerprintFile(unittest.TestCase):
1756 self.tmpfile = tempfile.NamedTemporaryFile()
1759 self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1760 "da39a3ee5e6b4b0d3255bfef95601890afd80709")
1762 utils.WriteFile(self.tmpfile.name, data="Hello World\n")
1763 self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1764 "648a6a6ffffdaa0badb23b8baf90b6168dd16b3a")
1767 class TestUnescapeAndSplit(unittest.TestCase):
1768 """Testing case for UnescapeAndSplit"""
    # testing more than one separator for regexp safety
1772 self._seps = [",", "+", "."]
1774 def testSimple(self):
1775 a = ["a", "b", "c", "d"]
1776 for sep in self._seps:
1777 self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), a)
1779 def testEscape(self):
1780 for sep in self._seps:
1781 a = ["a", "b\\" + sep + "c", "d"]
1782 b = ["a", "b" + sep + "c", "d"]
1783 self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1785 def testDoubleEscape(self):
1786 for sep in self._seps:
1787 a = ["a", "b\\\\", "c", "d"]
1788 b = ["a", "b\\", "c", "d"]
1789 self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1791 def testThreeEscape(self):
1792 for sep in self._seps:
1793 a = ["a", "b\\\\\\" + sep + "c", "d"]
1794 b = ["a", "b\\" + sep + "c", "d"]
1795 self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1798 class TestGenerateSelfSignedX509Cert(unittest.TestCase):
1800 self.tmpdir = tempfile.mkdtemp()
1803 shutil.rmtree(self.tmpdir)
1805 def _checkRsaPrivateKey(self, key):
1806 lines = key.splitlines()
1807 return ("-----BEGIN RSA PRIVATE KEY-----" in lines and
1808 "-----END RSA PRIVATE KEY-----" in lines)
1810 def _checkCertificate(self, cert):
1811 lines = cert.splitlines()
1812 return ("-----BEGIN CERTIFICATE-----" in lines and
1813 "-----END CERTIFICATE-----" in lines)
1816 for common_name in [None, ".", "Ganeti", "node1.example.com"]:
1817 (key_pem, cert_pem) = utils.GenerateSelfSignedX509Cert(common_name, 300)
1818 self._checkRsaPrivateKey(key_pem)
1819 self._checkCertificate(cert_pem)
1821 key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
1823 self.assert_(key.bits() >= 1024)
1824 self.assertEqual(key.bits(), constants.RSA_KEY_BITS)
1825 self.assertEqual(key.type(), OpenSSL.crypto.TYPE_RSA)
1827 x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1829 self.failIf(x509.has_expired())
1830 self.assertEqual(x509.get_issuer().CN, common_name)
1831 self.assertEqual(x509.get_subject().CN, common_name)
1832 self.assertEqual(x509.get_pubkey().bits(), constants.RSA_KEY_BITS)
1834 def testLegacy(self):
1835 cert1_filename = os.path.join(self.tmpdir, "cert1.pem")
1837 utils.GenerateSelfSignedSslCert(cert1_filename, validity=1)
1839 cert1 = utils.ReadFile(cert1_filename)
1841 self.assert_(self._checkRsaPrivateKey(cert1))
1842 self.assert_(self._checkCertificate(cert1))
class TestPathJoin(unittest.TestCase):
  """Testing case for PathJoin"""

  def testBasicItems(self):
    """An absolute prefix plus plain components joins with slashes"""
    mlist = ["/a", "b", "c"]
    self.assertEqual(PathJoin(*mlist), "/".join(mlist))

  def testNonAbsPrefix(self):
    """A relative first component must be rejected with ValueError"""
    self.assertRaises(ValueError, PathJoin, "a", "b")

  def testBackTrack(self):
    """A component containing ".." must be rejected with ValueError"""
    self.assertRaises(ValueError, PathJoin, "/a", "b/../c")

  def testMultiAbs(self):
    """A second absolute component must be rejected with ValueError"""
    self.assertRaises(ValueError, PathJoin, "/a", "/b")
1862 class TestHostInfo(unittest.TestCase):
1863 """Testing case for HostInfo"""
1865 def testUppercase(self):
1866 data = "AbC.example.com"
1867 self.failUnlessEqual(HostInfo.NormalizeName(data), data.lower())
1869 def testTooLongName(self):
1870 data = "a.b." + "c" * 255
1871 self.failUnlessRaises(OpPrereqError, HostInfo.NormalizeName, data)
1873 def testTrailingDot(self):
1875 self.failUnlessEqual(HostInfo.NormalizeName(data + "."), data)
1877 def testInvalidName(self):
1885 self.failUnlessRaises(OpPrereqError, HostInfo.NormalizeName, value)
1887 def testValidName(self):
1895 HostInfo.NormalizeName(value)
1898 class TestValidateServiceName(unittest.TestCase):
1899 def testValid(self):
1901 0, 1, 2, 3, 1024, 65000, 65534, 65535,
1906 "0", "80", "1111", "65535",
1909 for name in testnames:
1910 self.assertEqual(utils.ValidateServiceName(name), name)
1912 def testInvalid(self):
1914 -15756, -1, 65536, 133428083,
1915 "", "Hello World!", "!", "'", "\"", "\t", "\n", "`",
1916 "-8546", "-1", "65536",
1920 for name in testnames:
1921 self.assertRaises(OpPrereqError, utils.ValidateServiceName, name)
1924 class TestParseAsn1Generalizedtime(unittest.TestCase):
1927 self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000Z"), 0)
1928 self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152Z"),
1930 self.assertEqual(utils._ParseAsn1Generalizedtime("20380119031407Z"),
1934 self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152+0000"),
1936 self.assertEqual(utils._ParseAsn1Generalizedtime("20100223131652+0000"),
1938 self.assertEqual(utils._ParseAsn1Generalizedtime("20100223051808-0800"),
1940 self.assertEqual(utils._ParseAsn1Generalizedtime("20100224002135+1100"),
1942 self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000-0100"),
1945 # Leap seconds are not supported by datetime.datetime
1946 self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1947 "19841231235960+0000")
1948 self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1949 "19920630235960+0000")
1952 self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "")
1953 self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "invalid")
1954 self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1956 self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1957 "Mon Feb 22 17:47:02 UTC 2010")
1958 self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
1959 "2010-02-22 17:42:02")
1962 class TestGetX509CertValidity(testutils.GanetiTestCase):
1964 testutils.GanetiTestCase.setUp(self)
1966 pyopenssl_version = distutils.version.LooseVersion(OpenSSL.__version__)
1968 # Test whether we have pyOpenSSL 0.7 or above
1969 self.pyopenssl0_7 = (pyopenssl_version >= "0.7")
1971 if not self.pyopenssl0_7:
1972 warnings.warn("This test requires pyOpenSSL 0.7 or above to"
1973 " function correctly")
1975 def _LoadCert(self, name):
1976 return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1977 self._ReadTestData(name))
1980 validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem"))
1981 if self.pyopenssl0_7:
1982 self.assertEqual(validity, (1266919967, 1267524767))
1984 self.assertEqual(validity, (None, None))
1987 class TestSignX509Certificate(unittest.TestCase):
1988 KEY = "My private key!"
1989 KEY_OTHER = "Another key"
1992 # Generate certificate valid for 5 minutes
1993 (_, cert_pem) = utils.GenerateSelfSignedX509Cert(None, 300)
1995 cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1998 # No signature at all
1999 self.assertRaises(errors.GenericError,
2000 utils.LoadSignedX509Certificate, cert_pem, self.KEY)
2003 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
2005 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
2006 "X-Ganeti-Signature: \n", self.KEY)
2007 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
2008 "X-Ganeti-Sign: $1234$abcdef\n", self.KEY)
2009 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
2010 "X-Ganeti-Signature: $1234567890$abcdef\n", self.KEY)
2011 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
2012 "X-Ganeti-Signature: $1234$abc\n\n" + cert_pem, self.KEY)
2015 for salt in list("-_@$,:;/\\ \t\n"):
2016 self.assertRaises(errors.GenericError, utils.SignX509Certificate,
2017 cert_pem, self.KEY, "foo%sbar" % salt)
2019 for salt in ["HelloWorld", "salt", string.letters, string.digits,
2020 utils.GenerateSecret(numbytes=4),
2021 utils.GenerateSecret(numbytes=16),
2022 "{123:456}".encode("hex")]:
2023 signed_pem = utils.SignX509Certificate(cert, self.KEY, salt)
2025 self._Check(cert, salt, signed_pem)
2027 self._Check(cert, salt, "X-Another-Header: with a value\n" + signed_pem)
2028 self._Check(cert, salt, (10 * "Hello World!\n") + signed_pem)
2029 self._Check(cert, salt, (signed_pem + "\n\na few more\n"
2030 "lines----\n------ at\nthe end!"))
2032 def _Check(self, cert, salt, pem):
2033 (cert2, salt2) = utils.LoadSignedX509Certificate(pem, self.KEY)
2034 self.assertEqual(salt, salt2)
2035 self.assertEqual(cert.digest("sha1"), cert2.digest("sha1"))
2038 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
2039 pem, self.KEY_OTHER)
2042 class TestMakedirs(unittest.TestCase):
2044 self.tmpdir = tempfile.mkdtemp()
2047 shutil.rmtree(self.tmpdir)
2049 def testNonExisting(self):
2050 path = utils.PathJoin(self.tmpdir, "foo")
2051 utils.Makedirs(path)
2052 self.assert_(os.path.isdir(path))
2054 def testExisting(self):
2055 path = utils.PathJoin(self.tmpdir, "foo")
2057 utils.Makedirs(path)
2058 self.assert_(os.path.isdir(path))
2060 def testRecursiveNonExisting(self):
2061 path = utils.PathJoin(self.tmpdir, "foo/bar/baz")
2062 utils.Makedirs(path)
2063 self.assert_(os.path.isdir(path))
2065 def testRecursiveExisting(self):
2066 path = utils.PathJoin(self.tmpdir, "B/moo/xyz")
2067 self.assertFalse(os.path.exists(path))
2068 os.mkdir(utils.PathJoin(self.tmpdir, "B"))
2069 utils.Makedirs(path)
2070 self.assert_(os.path.isdir(path))
2073 class TestRetry(testutils.GanetiTestCase):
2075 testutils.GanetiTestCase.setUp(self)
2079 def _RaiseRetryAgain():
2080 raise utils.RetryAgain()
2083 def _RaiseRetryAgainWithArg(args):
2084 raise utils.RetryAgain(*args)
2086 def _WrongNestedLoop(self):
2087 return utils.Retry(self._RaiseRetryAgain, 0.01, 0.02)
2089 def _RetryAndSucceed(self, retries):
2090 if self.retries < retries:
2092 raise utils.RetryAgain()
2096 def testRaiseTimeout(self):
2097 self.failUnlessRaises(utils.RetryTimeout, utils.Retry,
2098 self._RaiseRetryAgain, 0.01, 0.02)
2099 self.failUnlessRaises(utils.RetryTimeout, utils.Retry,
2100 self._RetryAndSucceed, 0.01, 0, args=[1])
2101 self.failUnlessEqual(self.retries, 1)
2103 def testComplete(self):
2104 self.failUnlessEqual(utils.Retry(lambda: True, 0, 1), True)
2105 self.failUnlessEqual(utils.Retry(self._RetryAndSucceed, 0, 1, args=[2]),
2107 self.failUnlessEqual(self.retries, 2)
2109 def testNestedLoop(self):
2111 self.failUnlessRaises(errors.ProgrammerError, utils.Retry,
2112 self._WrongNestedLoop, 0, 1)
2113 except utils.RetryTimeout:
2114 self.fail("Didn't detect inner loop's exception")
2116 def testTimeoutArgument(self):
2117 retry_arg="my_important_debugging_message"
2119 utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02, args=[[retry_arg]])
2120 except utils.RetryTimeout, err:
2121 self.failUnlessEqual(err.args, (retry_arg, ))
2123 self.fail("Expected timeout didn't happen")
2125 def testRaiseInnerWithExc(self):
2126 retry_arg="my_important_debugging_message"
2129 utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02,
2130 args=[[errors.GenericError(retry_arg, retry_arg)]])
2131 except utils.RetryTimeout, err:
2134 self.fail("Expected timeout didn't happen")
2135 except errors.GenericError, err:
2136 self.failUnlessEqual(err.args, (retry_arg, retry_arg))
2138 self.fail("Expected GenericError didn't happen")
2140 def testRaiseInnerWithMsg(self):
2141 retry_arg="my_important_debugging_message"
2144 utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02,
2145 args=[[retry_arg, retry_arg]])
2146 except utils.RetryTimeout, err:
2149 self.fail("Expected timeout didn't happen")
2150 except utils.RetryTimeout, err:
2151 self.failUnlessEqual(err.args, (retry_arg, retry_arg))
2153 self.fail("Expected RetryTimeout didn't happen")
class TestLineSplitter(unittest.TestCase):
  """Tests for utils.LineSplitter"""

  def test(self):
    lines = []
    ls = utils.LineSplitter(lines.append)
    ls.write("Hello World\n")
    # Nothing is handed to the callback before flush/close
    self.assertEqual(lines, [])
    ls.write("Foo\n Bar\r\n ")
    ls.write("Baz")
    ls.write("Moo")
    self.assertEqual(lines, [])
    # flush delivers complete lines only; " BazMoo" has no newline yet
    ls.flush()
    self.assertEqual(lines, ["Hello World", "Foo", " Bar"])
    # close also delivers the unterminated remainder
    ls.close()
    self.assertEqual(lines, ["Hello World", "Foo", " Bar", " BazMoo"])

  def _testExtra(self, line, all_lines, p1, p2):
    """Line callback checking the extra arguments given to LineSplitter."""
    self.assertEqual(p1, 999)
    self.assertEqual(p2, "extra")
    all_lines.append(line)

  def testExtraArgsNoFlush(self):
    lines = []
    ls = utils.LineSplitter(self._testExtra, lines, 999, "extra")
    ls.write("\n\nHello World\n")
    ls.write("Foo\n Bar\r\n ")
    # NOTE(review): the two writes below were missing from the reviewed text;
    # together they must contribute "Baz" for the " BazMoo" expectation
    ls.write("")
    ls.write("Baz")
    ls.write("Moo\n\nx\n")
    self.assertEqual(lines, [])
    ls.close()
    self.assertEqual(lines, ["", "", "Hello World", "Foo", " Bar", " BazMoo",
                             "", "x"])
class TestReadLockedPidFile(unittest.TestCase):
  """Tests for utils.ReadLockedPidFile"""

  def setUp(self):
    self.tmpdir = tempfile.mkdtemp()

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def testNonExistent(self):
    path = utils.PathJoin(self.tmpdir, "nonexist")
    self.assertTrue(utils.ReadLockedPidFile(path) is None)

  def testUnlocked(self):
    # A PID file nobody holds a lock on yields None
    path = utils.PathJoin(self.tmpdir, "pid")
    utils.WriteFile(path, data="123")
    self.assertTrue(utils.ReadLockedPidFile(path) is None)

  def testLocked(self):
    path = utils.PathJoin(self.tmpdir, "pid")
    utils.WriteFile(path, data="123")

    fl = utils.FileLock.Open(path)
    try:
      fl.Exclusive(blocking=True)

      # While the lock is held the PID stored in the file is returned
      self.assertEqual(utils.ReadLockedPidFile(path), 123)
    finally:
      # NOTE(review): the release call was missing from the reviewed text;
      # FileLock.Close() is assumed -- confirm against utils.FileLock
      fl.Close()

    self.assertTrue(utils.ReadLockedPidFile(path) is None)

  def testError(self):
    # "foobar" is a regular file, so it cannot be used as a directory
    path = utils.PathJoin(self.tmpdir, "foobar", "pid")
    utils.WriteFile(utils.PathJoin(self.tmpdir, "foobar"), data="")
    # open(2) should return ENOTDIR
    self.assertRaises(EnvironmentError, utils.ReadLockedPidFile, path)
class TestCertVerification(testutils.GanetiTestCase):
  """Smoke test for utils.VerifyX509Certificate"""

  def setUp(self):
    testutils.GanetiTestCase.setUp(self)

    self.tmpdir = tempfile.mkdtemp()

  def tearDown(self):
    # setUp chains to the base class, so do the same on the way out; the
    # base class cleanup runs even if removing the directory fails
    try:
      shutil.rmtree(self.tmpdir)
    finally:
      testutils.GanetiTestCase.tearDown(self)

  def testVerifyCertificate(self):
    cert_pem = utils.ReadFile(self._TestDataFilename("cert1.pem"))
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           cert_pem)

    # Not checking return value as this certificate is expired
    utils.VerifyX509Certificate(cert, 30, 7)
class TestVerifyCertificateInner(unittest.TestCase):
  """Tests for utils._VerifyCertificateInner using fixed timestamps"""

  def test(self):
    # NOTE(review): the arguments are presumably (expired, not_before,
    # not_after, now, warn_days, error_days) -- verify against the
    # definition in utils
    vci = utils._VerifyCertificateInner

    # Nothing to report.
    # NOTE(review): the expected value was missing from the reviewed text;
    # (None, None) is assumed
    self.assertEqual(vci(False, 1263916313, 1298476313, 1266940313, 30, 7),
                     (None, None))

    # Fourth timestamp lies before the second one
    (errcode, _) = vci(False, 1266507600, 1267544400, 1266075600, 30, 7)
    self.assertEqual(errcode, utils.CERT_WARNING)

    # Third timestamp is exactly seven days after the fourth one; the result
    # depends on the last argument (7 vs. 1)
    (errcode, _) = vci(False, 1266507600, 1267544400, 1266939600, 30, 7)
    self.assertEqual(errcode, utils.CERT_ERROR)

    (errcode, _) = vci(False, 1266507600, 1267544400, 1266939600, 30, 1)
    self.assertEqual(errcode, utils.CERT_WARNING)

    # No third timestamp at all
    (errcode, _) = vci(False, 1266507600, None, 1266939600, 30, 7)
    self.assertEqual(errcode, None)

    # First argument set: always an error, whichever timestamps are known
    (errcode, _) = vci(True, 1266507600, 1267544400, 1266939600, 30, 7)
    self.assertEqual(errcode, utils.CERT_ERROR)

    (errcode, _) = vci(True, None, 1267544400, 1266939600, 30, 7)
    self.assertEqual(errcode, utils.CERT_ERROR)

    (errcode, _) = vci(True, 1266507600, None, 1266939600, 30, 7)
    self.assertEqual(errcode, utils.CERT_ERROR)

    (errcode, _) = vci(True, None, None, 1266939600, 30, 7)
    self.assertEqual(errcode, utils.CERT_ERROR)
class TestHmacFunctions(unittest.TestCase):
  """Tests for utils.Sha1Hmac and utils.VerifySha1Hmac"""

  # Digests can be checked with "openssl sha1 -hmac $key"
  def testSha1Hmac(self):
    self.assertEqual(utils.Sha1Hmac("", ""),
                     "fbdb1d1b18aa6c08324b7d64b71fb76370690e1d")
    self.assertEqual(utils.Sha1Hmac("3YzMxZWE", "Hello World"),
                     "ef4f3bda82212ecb2f7ce868888a19092481f1fd")
    self.assertEqual(utils.Sha1Hmac("TguMTA2K", ""),
                     "f904c2476527c6d3e6609ab683c66fa0652cb1dc")

    longtext = 1500 * "The quick brown fox jumps over the lazy dog\n"
    self.assertEqual(utils.Sha1Hmac("3YzMxZWE", longtext),
                     "35901b9a3001a7cdcf8e0e9d7c2e79df2223af54")

  def testSha1HmacSalt(self):
    self.assertEqual(utils.Sha1Hmac("TguMTA2K", "", salt="abc0"),
                     "4999bf342470eadb11dfcd24ca5680cf9fd7cdce")
    self.assertEqual(utils.Sha1Hmac("TguMTA2K", "", salt="abc9"),
                     "17a4adc34d69c0d367d4ffbef96fd41d4df7a6e8")
    self.assertEqual(utils.Sha1Hmac("3YzMxZWE", "Hello World", salt="xyz0"),
                     "7f264f8114c9066afc9bb7636e1786d996d3cc0d")

  def testVerifySha1Hmac(self):
    self.assertTrue(utils.VerifySha1Hmac("", "", ("fbdb1d1b18aa6c08324b"
                                                  "7d64b71fb76370690e1d")))
    self.assertTrue(utils.VerifySha1Hmac("TguMTA2K", "",
                                         ("f904c2476527c6d3e660"
                                          "9ab683c66fa0652cb1dc")))

    digest = "ef4f3bda82212ecb2f7ce868888a19092481f1fd"
    self.assertTrue(utils.VerifySha1Hmac("3YzMxZWE", "Hello World", digest))
    # NOTE(review): the third argument of the next three calls was missing
    # from the reviewed text; case variants of the digest are assumed
    self.assertTrue(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
                                         digest.lower()))
    self.assertTrue(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
                                         digest.upper()))
    self.assertTrue(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
                                         digest.title()))

  def testVerifySha1HmacSalt(self):
    # Key, text, digest and salt match the cases in testSha1HmacSalt
    self.assertTrue(utils.VerifySha1Hmac("TguMTA2K", "",
                                         ("17a4adc34d69c0d367d4"
                                          "ffbef96fd41d4df7a6e8"),
                                         salt="abc9"))
    self.assertTrue(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
                                         ("7f264f8114c9066afc9b"
                                          "b7636e1786d996d3cc0d"),
                                         salt="xyz0"))
class TestIgnoreSignals(unittest.TestCase):
  """Tests for the utils.IgnoreSignals call wrapper"""

  @staticmethod
  def _Raise(exception):
    """Raises the given exception instance."""
    raise exception

  @staticmethod
  def _Return(rval):
    """Returns its argument unchanged."""
    return rval

  def testIgnoreSignals(self):
    sock_err_intr = socket.error(errno.EINTR, "Message")
    sock_err_inval = socket.error(errno.EINVAL, "Message")

    env_err_intr = EnvironmentError(errno.EINTR, "Message")
    env_err_inval = EnvironmentError(errno.EINVAL, "Message")

    # Sanity check: without the wrapper every error propagates
    self.assertRaises(socket.error, self._Raise, sock_err_intr)
    self.assertRaises(socket.error, self._Raise, sock_err_inval)
    self.assertRaises(EnvironmentError, self._Raise, env_err_intr)
    self.assertRaises(EnvironmentError, self._Raise, env_err_inval)

    # EINTR is swallowed and None returned; other errno values still raise
    self.assertEqual(utils.IgnoreSignals(self._Raise, sock_err_intr), None)
    self.assertEqual(utils.IgnoreSignals(self._Raise, env_err_intr), None)
    self.assertRaises(socket.error, utils.IgnoreSignals, self._Raise,
                      sock_err_inval)
    self.assertRaises(EnvironmentError, utils.IgnoreSignals, self._Raise,
                      env_err_inval)

    # Return values of the wrapped function are passed through
    self.assertEqual(utils.IgnoreSignals(self._Return, True), True)
    self.assertEqual(utils.IgnoreSignals(self._Return, 33), 33)
2364 class TestEnsureDirs(unittest.TestCase):
2365 """Tests for EnsureDirs"""
2368 self.dir = tempfile.mkdtemp()
2369 self.old_umask = os.umask(0777)
2371 def testEnsureDirs(self):
2373 (utils.PathJoin(self.dir, "foo"), 0777),
2374 (utils.PathJoin(self.dir, "bar"), 0000),
2376 self.assertEquals(os.stat(utils.PathJoin(self.dir, "foo"))[0] & 0777, 0777)
2377 self.assertEquals(os.stat(utils.PathJoin(self.dir, "bar"))[0] & 0777, 0000)
2380 os.rmdir(utils.PathJoin(self.dir, "foo"))
2381 os.rmdir(utils.PathJoin(self.dir, "bar"))
2383 os.umask(self.old_umask)
class TestFormatSeconds(unittest.TestCase):
  """Tests for utils.FormatSeconds"""

  def _CheckAll(self, cases):
    """Asserts FormatSeconds(value) == expected for each (value, expected)."""
    for (secs, expected) in cases:
      self.assertEqual(utils.FormatSeconds(secs), expected)

  def test(self):
    self._CheckAll([
      (1, "1s"),
      (3600, "1h 0m 0s"),
      (3599, "59m 59s"),
      (7200, "2h 0m 0s"),
      (7201, "2h 0m 1s"),
      (7281, "2h 1m 21s"),
      (29119, "8h 5m 19s"),
      (19431228, "224d 21h 33m 48s"),
      # Negative values are not broken down into units
      (-1, "-1s"),
      (-282, "-282s"),
      (-29119, "-29119s"),
      ])

  def testFloat(self):
    # Fractional seconds are rounded to the nearest second
    self._CheckAll([
      (1.3, "1s"),
      (1.9, "2s"),
      (3912.12311, "1h 5m 12s"),
      (3912.8, "1h 5m 13s"),
      ])
class RunIgnoreProcessNotFound(unittest.TestCase):
  """Test for utils.IgnoreProcessNotFound on a process which has exited"""

  @staticmethod
  def _WritePid(fd):
    """Writes the PID of the calling process to C{fd}."""
    os.write(fd, str(os.getpid()))
    # NOTE(review): the rest of this helper was missing from the reviewed
    # text; closing the descriptor and returning True (so the assertion on
    # RunInSeparateProcess holds) is assumed
    os.close(fd)
    return True

  def test(self):
    (pid_read_fd, pid_write_fd) = os.pipe()

    # Start short-lived process which writes its PID to pipe
    self.assertTrue(utils.RunInSeparateProcess(self._WritePid, pid_write_fd))
    os.close(pid_write_fd)

    # Read PID from pipe
    pid = int(os.read(pid_read_fd, 1024))
    os.close(pid_read_fd)

    # Try to send signal to process which exited recently
    self.assertFalse(utils.IgnoreProcessNotFound(os.kill, pid, 0))
# When executed directly (not imported), hand control over to
# testutils.GanetiTestProgram
if __name__ == '__main__':
  testutils.GanetiTestProgram()