4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the utils module"""
40 import distutils.version
46 from ganeti import constants
47 from ganeti import compat
48 from ganeti import utils
49 from ganeti import errors
50 from ganeti import serializer
51 from ganeti.utils import IsProcessAlive, RunCmd, \
52 RemoveFile, MatchNameComponent, FormatUnit, \
53 ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
54 ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
55 SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \
56 TailFile, ForceDictType, SafeEncode, IsNormAbsPath, FormatTime, \
57 UnescapeAndSplit, RunParts, PathJoin, HostInfo, ReadOneLineFile
59 from ganeti.errors import LockError, UnitParseError, GenericError, \
60 ProgrammerError, OpPrereqError
63 class TestIsProcessAlive(unittest.TestCase):
64 """Testing case for IsProcessAlive"""
68 self.assert_(IsProcessAlive(mypid),
69 "can't find myself running")
71 def testNotExisting(self):
72 pid_non_existing = os.fork()
73 if pid_non_existing == 0:
75 elif pid_non_existing < 0:
76 raise SystemError("can't fork")
77 os.waitpid(pid_non_existing, 0)
78 self.assertFalse(IsProcessAlive(pid_non_existing),
79 "nonexisting process detected")
82 class TestGetProcStatusPath(unittest.TestCase):
84 self.assert_("/1234/" in utils._GetProcStatusPath(1234))
85 self.assertNotEqual(utils._GetProcStatusPath(1),
86 utils._GetProcStatusPath(2))
89 class TestIsProcessHandlingSignal(unittest.TestCase):
91 self.tmpdir = tempfile.mkdtemp()
94 shutil.rmtree(self.tmpdir)
96 def testParseSigsetT(self):
97 self.assertEqual(len(utils._ParseSigsetT("0")), 0)
98 self.assertEqual(utils._ParseSigsetT("1"), set([1]))
99 self.assertEqual(utils._ParseSigsetT("1000a"), set([2, 4, 17]))
100 self.assertEqual(utils._ParseSigsetT("810002"), set([2, 17, 24, ]))
101 self.assertEqual(utils._ParseSigsetT("0000000180000202"),
102 set([2, 10, 32, 33]))
103 self.assertEqual(utils._ParseSigsetT("0000000180000002"),
105 self.assertEqual(utils._ParseSigsetT("0000000188000002"),
106 set([2, 28, 32, 33]))
107 self.assertEqual(utils._ParseSigsetT("000000004b813efb"),
108 set([1, 2, 4, 5, 6, 7, 8, 10, 11, 12, 13, 14, 17,
109 24, 25, 26, 28, 31]))
110 self.assertEqual(utils._ParseSigsetT("ffffff"), set(range(1, 25)))
112 def testGetProcStatusField(self):
113 for field in ["SigCgt", "Name", "FDSize"]:
114 for value in ["", "0", "cat", " 1234 KB"]:
115 pstatus = "\n".join([
117 "%s: %s" % (field, value),
120 result = utils._GetProcStatusField(pstatus, field)
121 self.assertEqual(result, value.strip())
124 sp = utils.PathJoin(self.tmpdir, "status")
126 utils.WriteFile(sp, data="\n".join([
128 "State: S (sleeping)",
133 "SigBlk: 0000000000010000",
134 "SigIgn: 0000000000384004",
135 "SigCgt: 000000004b813efb",
136 "CapEff: 0000000000000000",
139 self.assert_(utils.IsProcessHandlingSignal(1234, 10, status_path=sp))
141 def testNoSigCgt(self):
142 sp = utils.PathJoin(self.tmpdir, "status")
144 utils.WriteFile(sp, data="\n".join([
148 self.assertRaises(RuntimeError, utils.IsProcessHandlingSignal,
149 1234, 10, status_path=sp)
151 def testNoSuchFile(self):
152 sp = utils.PathJoin(self.tmpdir, "notexist")
154 self.assertFalse(utils.IsProcessHandlingSignal(1234, 10, status_path=sp))
157 def _TestRealProcess():
158 signal.signal(signal.SIGUSR1, signal.SIG_DFL)
159 if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
160 raise Exception("SIGUSR1 is handled when it should not be")
162 signal.signal(signal.SIGUSR1, lambda signum, frame: None)
163 if not utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
164 raise Exception("SIGUSR1 is not handled when it should be")
166 signal.signal(signal.SIGUSR1, signal.SIG_IGN)
167 if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
168 raise Exception("SIGUSR1 is not handled when it should be")
170 signal.signal(signal.SIGUSR1, signal.SIG_DFL)
171 if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
172 raise Exception("SIGUSR1 is handled when it should not be")
176 def testRealProcess(self):
177 self.assert_(utils.RunInSeparateProcess(self._TestRealProcess))
180 class TestPidFileFunctions(unittest.TestCase):
181 """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
184 self.dir = tempfile.mkdtemp()
185 self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
186 utils.DaemonPidFileName = self.f_dpn
188 def testPidFileFunctions(self):
189 pid_file = self.f_dpn('test')
190 utils.WritePidFile('test')
191 self.failUnless(os.path.exists(pid_file),
192 "PID file should have been created")
193 read_pid = utils.ReadPidFile(pid_file)
194 self.failUnlessEqual(read_pid, os.getpid())
195 self.failUnless(utils.IsProcessAlive(read_pid))
196 self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
197 utils.RemovePidFile('test')
198 self.failIf(os.path.exists(pid_file),
199 "PID file should not exist anymore")
200 self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
201 "ReadPidFile should return 0 for missing pid file")
202 fh = open(pid_file, "w")
205 self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
206 "ReadPidFile should return 0 for invalid pid file")
207 utils.RemovePidFile('test')
208 self.failIf(os.path.exists(pid_file),
209 "PID file should not exist anymore")
212 pid_file = self.f_dpn('child')
213 r_fd, w_fd = os.pipe()
215 if new_pid == 0: #child
216 utils.WritePidFile('child')
221 # else we are in the parent
222 # wait until the child has written the pid file
224 read_pid = utils.ReadPidFile(pid_file)
225 self.failUnlessEqual(read_pid, new_pid)
226 self.failUnless(utils.IsProcessAlive(new_pid))
227 utils.KillProcess(new_pid, waitpid=True)
228 self.failIf(utils.IsProcessAlive(new_pid))
229 utils.RemovePidFile('child')
230 self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
233 for name in os.listdir(self.dir):
234 os.unlink(os.path.join(self.dir, name))
238 class TestRunCmd(testutils.GanetiTestCase):
239 """Testing case for the RunCmd function"""
242 testutils.GanetiTestCase.setUp(self)
243 self.magic = time.ctime() + " ganeti test"
244 self.fname = self._CreateTempFile()
247 """Test successful exit code"""
248 result = RunCmd("/bin/sh -c 'exit 0'")
249 self.assertEqual(result.exit_code, 0)
250 self.assertEqual(result.output, "")
253 """Test fail exit code"""
254 result = RunCmd("/bin/sh -c 'exit 1'")
255 self.assertEqual(result.exit_code, 1)
256 self.assertEqual(result.output, "")
258 def testStdout(self):
259 """Test standard output"""
260 cmd = 'echo -n "%s"' % self.magic
261 result = RunCmd("/bin/sh -c '%s'" % cmd)
262 self.assertEqual(result.stdout, self.magic)
263 result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
264 self.assertEqual(result.output, "")
265 self.assertFileContent(self.fname, self.magic)
267 def testStderr(self):
268 """Test standard error"""
269 cmd = 'echo -n "%s"' % self.magic
270 result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
271 self.assertEqual(result.stderr, self.magic)
272 result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
273 self.assertEqual(result.output, "")
274 self.assertFileContent(self.fname, self.magic)
276 def testCombined(self):
277 """Test combined output"""
278 cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
279 expected = "A" + self.magic + "B" + self.magic
280 result = RunCmd("/bin/sh -c '%s'" % cmd)
281 self.assertEqual(result.output, expected)
282 result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
283 self.assertEqual(result.output, "")
284 self.assertFileContent(self.fname, expected)
286 def testSignal(self):
288 result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
289 self.assertEqual(result.signal, 15)
290 self.assertEqual(result.output, "")
292 def testListRun(self):
294 result = RunCmd(["true"])
295 self.assertEqual(result.signal, None)
296 self.assertEqual(result.exit_code, 0)
297 result = RunCmd(["/bin/sh", "-c", "exit 1"])
298 self.assertEqual(result.signal, None)
299 self.assertEqual(result.exit_code, 1)
300 result = RunCmd(["echo", "-n", self.magic])
301 self.assertEqual(result.signal, None)
302 self.assertEqual(result.exit_code, 0)
303 self.assertEqual(result.stdout, self.magic)
305 def testFileEmptyOutput(self):
306 """Test file output"""
307 result = RunCmd(["true"], output=self.fname)
308 self.assertEqual(result.signal, None)
309 self.assertEqual(result.exit_code, 0)
310 self.assertFileContent(self.fname, "")
313 """Test locale environment"""
314 old_env = os.environ.copy()
316 os.environ["LANG"] = "en_US.UTF-8"
317 os.environ["LC_ALL"] = "en_US.UTF-8"
318 result = RunCmd(["locale"])
319 for line in result.output.splitlines():
320 key, value = line.split("=", 1)
321 # Ignore these variables, they're overridden by LC_ALL
322 if key == "LANG" or key == "LANGUAGE":
324 self.failIf(value and value != "C" and value != '"C"',
325 "Variable %s is set to the invalid value '%s'" % (key, value))
329 def testDefaultCwd(self):
330 """Test default working directory"""
331 self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
334 """Test default working directory"""
335 self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
336 self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
338 self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
340 def testResetEnv(self):
341 """Test environment reset functionality"""
342 self.failUnlessEqual(RunCmd(["env"], reset_env=True).stdout.strip(), "")
343 self.failUnlessEqual(RunCmd(["env"], reset_env=True,
344 env={"FOO": "bar",}).stdout.strip(), "FOO=bar")
347 class TestRunParts(unittest.TestCase):
348 """Testing case for the RunParts function"""
351 self.rundir = tempfile.mkdtemp(prefix="ganeti-test", suffix=".tmp")
354 shutil.rmtree(self.rundir)
357 """Test on an empty dir"""
358 self.failUnlessEqual(RunParts(self.rundir, reset_env=True), [])
360 def testSkipWrongName(self):
361 """Test that wrong files are skipped"""
362 fname = os.path.join(self.rundir, "00test.dot")
363 utils.WriteFile(fname, data="")
364 os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
365 relname = os.path.basename(fname)
366 self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
367 [(relname, constants.RUNPARTS_SKIP, None)])
369 def testSkipNonExec(self):
370 """Test that non executable files are skipped"""
371 fname = os.path.join(self.rundir, "00test")
372 utils.WriteFile(fname, data="")
373 relname = os.path.basename(fname)
374 self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
375 [(relname, constants.RUNPARTS_SKIP, None)])
378 """Test error on a broken executable"""
379 fname = os.path.join(self.rundir, "00test")
380 utils.WriteFile(fname, data="")
381 os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
382 (relname, status, error) = RunParts(self.rundir, reset_env=True)[0]
383 self.failUnlessEqual(relname, os.path.basename(fname))
384 self.failUnlessEqual(status, constants.RUNPARTS_ERR)
385 self.failUnless(error)
387 def testSorted(self):
388 """Test executions are sorted"""
390 files.append(os.path.join(self.rundir, "64test"))
391 files.append(os.path.join(self.rundir, "00test"))
392 files.append(os.path.join(self.rundir, "42test"))
395 utils.WriteFile(fname, data="")
397 results = RunParts(self.rundir, reset_env=True)
399 for fname in sorted(files):
400 self.failUnlessEqual(os.path.basename(fname), results.pop(0)[0])
403 """Test correct execution"""
404 fname = os.path.join(self.rundir, "00test")
405 utils.WriteFile(fname, data="#!/bin/sh\n\necho -n ciao")
406 os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
407 (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
408 self.failUnlessEqual(relname, os.path.basename(fname))
409 self.failUnlessEqual(status, constants.RUNPARTS_RUN)
410 self.failUnlessEqual(runresult.stdout, "ciao")
412 def testRunFail(self):
413 """Test correct execution, with run failure"""
414 fname = os.path.join(self.rundir, "00test")
415 utils.WriteFile(fname, data="#!/bin/sh\n\nexit 1")
416 os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
417 (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
418 self.failUnlessEqual(relname, os.path.basename(fname))
419 self.failUnlessEqual(status, constants.RUNPARTS_RUN)
420 self.failUnlessEqual(runresult.exit_code, 1)
421 self.failUnless(runresult.failed)
423 def testRunMix(self):
425 files.append(os.path.join(self.rundir, "00test"))
426 files.append(os.path.join(self.rundir, "42test"))
427 files.append(os.path.join(self.rundir, "64test"))
428 files.append(os.path.join(self.rundir, "99test"))
432 # 1st has errors in execution
433 utils.WriteFile(files[0], data="#!/bin/sh\n\nexit 1")
434 os.chmod(files[0], stat.S_IREAD | stat.S_IEXEC)
437 utils.WriteFile(files[1], data="")
439 # 3rd cannot execute properly
440 utils.WriteFile(files[2], data="")
441 os.chmod(files[2], stat.S_IREAD | stat.S_IEXEC)
444 utils.WriteFile(files[3], data="#!/bin/sh\n\necho -n ciao")
445 os.chmod(files[3], stat.S_IREAD | stat.S_IEXEC)
447 results = RunParts(self.rundir, reset_env=True)
449 (relname, status, runresult) = results[0]
450 self.failUnlessEqual(relname, os.path.basename(files[0]))
451 self.failUnlessEqual(status, constants.RUNPARTS_RUN)
452 self.failUnlessEqual(runresult.exit_code, 1)
453 self.failUnless(runresult.failed)
455 (relname, status, runresult) = results[1]
456 self.failUnlessEqual(relname, os.path.basename(files[1]))
457 self.failUnlessEqual(status, constants.RUNPARTS_SKIP)
458 self.failUnlessEqual(runresult, None)
460 (relname, status, runresult) = results[2]
461 self.failUnlessEqual(relname, os.path.basename(files[2]))
462 self.failUnlessEqual(status, constants.RUNPARTS_ERR)
463 self.failUnless(runresult)
465 (relname, status, runresult) = results[3]
466 self.failUnlessEqual(relname, os.path.basename(files[3]))
467 self.failUnlessEqual(status, constants.RUNPARTS_RUN)
468 self.failUnlessEqual(runresult.output, "ciao")
469 self.failUnlessEqual(runresult.exit_code, 0)
470 self.failUnless(not runresult.failed)
473 class TestStartDaemon(testutils.GanetiTestCase):
475 self.tmpdir = tempfile.mkdtemp(prefix="ganeti-test")
476 self.tmpfile = os.path.join(self.tmpdir, "test")
479 shutil.rmtree(self.tmpdir)
482 utils.StartDaemon("echo Hello World > %s" % self.tmpfile)
483 self._wait(self.tmpfile, 60.0, "Hello World")
485 def testShellOutput(self):
486 utils.StartDaemon("echo Hello World", output=self.tmpfile)
487 self._wait(self.tmpfile, 60.0, "Hello World")
489 def testNoShellNoOutput(self):
490 utils.StartDaemon(["pwd"])
492 def testNoShellNoOutputTouch(self):
493 testfile = os.path.join(self.tmpdir, "check")
494 self.failIf(os.path.exists(testfile))
495 utils.StartDaemon(["touch", testfile])
496 self._wait(testfile, 60.0, "")
498 def testNoShellOutput(self):
499 utils.StartDaemon(["pwd"], output=self.tmpfile)
500 self._wait(self.tmpfile, 60.0, "/")
502 def testNoShellOutputCwd(self):
503 utils.StartDaemon(["pwd"], output=self.tmpfile, cwd=os.getcwd())
504 self._wait(self.tmpfile, 60.0, os.getcwd())
506 def testShellEnv(self):
507 utils.StartDaemon("echo \"$GNT_TEST_VAR\"", output=self.tmpfile,
508 env={ "GNT_TEST_VAR": "Hello World", })
509 self._wait(self.tmpfile, 60.0, "Hello World")
511 def testNoShellEnv(self):
512 utils.StartDaemon(["printenv", "GNT_TEST_VAR"], output=self.tmpfile,
513 env={ "GNT_TEST_VAR": "Hello World", })
514 self._wait(self.tmpfile, 60.0, "Hello World")
516 def testOutputFd(self):
517 fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
519 utils.StartDaemon(["pwd"], output_fd=fd, cwd=os.getcwd())
522 self._wait(self.tmpfile, 60.0, os.getcwd())
525 pid = utils.StartDaemon("echo $$ > %s" % self.tmpfile)
526 self._wait(self.tmpfile, 60.0, str(pid))
528 def testPidFile(self):
529 pidfile = os.path.join(self.tmpdir, "pid")
530 checkfile = os.path.join(self.tmpdir, "abort")
532 pid = utils.StartDaemon("while sleep 5; do :; done", pidfile=pidfile,
535 fd = os.open(pidfile, os.O_RDONLY)
537 # Check file is locked
538 self.assertRaises(errors.LockError, utils.LockFile, fd)
540 pidtext = os.read(fd, 100)
544 self.assertEqual(int(pidtext.strip()), pid)
546 self.assert_(utils.IsProcessAlive(pid))
548 # No matter what happens, kill daemon
549 utils.KillProcess(pid, timeout=5.0, waitpid=False)
550 self.failIf(utils.IsProcessAlive(pid))
552 self.assertEqual(utils.ReadFile(self.tmpfile), "")
554 def _wait(self, path, timeout, expected):
555 # Due to the asynchronous nature of daemon processes, polling is necessary.
556 # A timeout makes sure the test doesn't hang forever.
558 if not (os.path.isfile(path) and
559 utils.ReadFile(path).strip() == expected):
560 raise utils.RetryAgain()
563 utils.Retry(_CheckFile, (0.01, 1.5, 1.0), timeout)
564 except utils.RetryTimeout:
565 self.fail("Apparently the daemon didn't run in %s seconds and/or"
566 " didn't write the correct output" % timeout)
569 self.assertRaises(errors.OpExecError, utils.StartDaemon,
570 ["./does-NOT-EXIST/here/0123456789"])
571 self.assertRaises(errors.OpExecError, utils.StartDaemon,
572 ["./does-NOT-EXIST/here/0123456789"],
573 output=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
574 self.assertRaises(errors.OpExecError, utils.StartDaemon,
575 ["./does-NOT-EXIST/here/0123456789"],
576 cwd=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
577 self.assertRaises(errors.OpExecError, utils.StartDaemon,
578 ["./does-NOT-EXIST/here/0123456789"],
579 output=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
581 fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
583 self.assertRaises(errors.ProgrammerError, utils.StartDaemon,
584 ["./does-NOT-EXIST/here/0123456789"],
585 output=self.tmpfile, output_fd=fd)
590 class TestSetCloseOnExecFlag(unittest.TestCase):
591 """Tests for SetCloseOnExecFlag"""
594 self.tmpfile = tempfile.TemporaryFile()
596 def testEnable(self):
597 utils.SetCloseOnExecFlag(self.tmpfile.fileno(), True)
598 self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
601 def testDisable(self):
602 utils.SetCloseOnExecFlag(self.tmpfile.fileno(), False)
603 self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
607 class TestSetNonblockFlag(unittest.TestCase):
609 self.tmpfile = tempfile.TemporaryFile()
611 def testEnable(self):
612 utils.SetNonblockFlag(self.tmpfile.fileno(), True)
613 self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
616 def testDisable(self):
617 utils.SetNonblockFlag(self.tmpfile.fileno(), False)
618 self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
622 class TestRemoveFile(unittest.TestCase):
623 """Test case for the RemoveFile function"""
626 """Create a temp dir and file for each case"""
627 self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
628 fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
632 if os.path.exists(self.tmpfile):
633 os.unlink(self.tmpfile)
634 os.rmdir(self.tmpdir)
636 def testIgnoreDirs(self):
637 """Test that RemoveFile() ignores directories"""
638 self.assertEqual(None, RemoveFile(self.tmpdir))
640 def testIgnoreNotExisting(self):
641 """Test that RemoveFile() ignores non-existing files"""
642 RemoveFile(self.tmpfile)
643 RemoveFile(self.tmpfile)
645 def testRemoveFile(self):
646 """Test that RemoveFile does remove a file"""
647 RemoveFile(self.tmpfile)
648 if os.path.exists(self.tmpfile):
649 self.fail("File '%s' not removed" % self.tmpfile)
651 def testRemoveSymlink(self):
652 """Test that RemoveFile does remove symlinks"""
653 symlink = self.tmpdir + "/symlink"
654 os.symlink("no-such-file", symlink)
656 if os.path.exists(symlink):
657 self.fail("File '%s' not removed" % symlink)
658 os.symlink(self.tmpfile, symlink)
660 if os.path.exists(symlink):
661 self.fail("File '%s' not removed" % symlink)
664 class TestRename(unittest.TestCase):
665 """Test case for RenameFile"""
668 """Create a temporary directory"""
669 self.tmpdir = tempfile.mkdtemp()
670 self.tmpfile = os.path.join(self.tmpdir, "test1")
673 open(self.tmpfile, "w").close()
676 """Remove temporary directory"""
677 shutil.rmtree(self.tmpdir)
679 def testSimpleRename1(self):
680 """Simple rename 1"""
681 utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
682 self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
684 def testSimpleRename2(self):
685 """Simple rename 2"""
686 utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
688 self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
690 def testRenameMkdir(self):
691 """Rename with mkdir"""
692 utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
694 self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
695 self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
697 utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"),
698 os.path.join(self.tmpdir, "test/foo/bar/baz"),
700 self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
701 self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar")))
702 self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz")))
705 class TestMatchNameComponent(unittest.TestCase):
706 """Test case for the MatchNameComponent function"""
708 def testEmptyList(self):
709 """Test that there is no match against an empty list"""
711 self.failUnlessEqual(MatchNameComponent("", []), None)
712 self.failUnlessEqual(MatchNameComponent("test", []), None)
714 def testSingleMatch(self):
715 """Test that a single match is performed correctly"""
716 mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
717 for key in "test2", "test2.example", "test2.example.com":
718 self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
720 def testMultipleMatches(self):
721 """Test that a multiple match is returned as None"""
722 mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
723 for key in "test1", "test1.example":
724 self.failUnlessEqual(MatchNameComponent(key, mlist), None)
726 def testFullMatch(self):
727 """Test that a full match is returned correctly"""
729 key2 = "test1.example"
730 mlist = [key2, key2 + ".com"]
731 self.failUnlessEqual(MatchNameComponent(key1, mlist), None)
732 self.failUnlessEqual(MatchNameComponent(key2, mlist), key2)
734 def testCaseInsensitivePartialMatch(self):
735 """Test for the case_insensitive keyword"""
736 mlist = ["test1.example.com", "test2.example.net"]
737 self.assertEqual(MatchNameComponent("test2", mlist, case_sensitive=False),
739 self.assertEqual(MatchNameComponent("Test2", mlist, case_sensitive=False),
741 self.assertEqual(MatchNameComponent("teSt2", mlist, case_sensitive=False),
743 self.assertEqual(MatchNameComponent("TeSt2", mlist, case_sensitive=False),
747 def testCaseInsensitiveFullMatch(self):
748 mlist = ["ts1.ex", "ts1.ex.org", "ts2.ex", "Ts2.ex"]
749 # Between the two ts1 a full string match non-case insensitive should work
750 self.assertEqual(MatchNameComponent("Ts1", mlist, case_sensitive=False),
752 self.assertEqual(MatchNameComponent("Ts1.ex", mlist, case_sensitive=False),
754 self.assertEqual(MatchNameComponent("ts1.ex", mlist, case_sensitive=False),
756 # Between the two ts2 only case differs, so only case-match works
757 self.assertEqual(MatchNameComponent("ts2.ex", mlist, case_sensitive=False),
759 self.assertEqual(MatchNameComponent("Ts2.ex", mlist, case_sensitive=False),
761 self.assertEqual(MatchNameComponent("TS2.ex", mlist, case_sensitive=False),
765 class TestReadFile(testutils.GanetiTestCase):
767 def testReadAll(self):
768 data = utils.ReadFile(self._TestDataFilename("cert1.pem"))
769 self.assertEqual(len(data), 814)
771 h = compat.md5_hash()
773 self.assertEqual(h.hexdigest(), "a491efb3efe56a0535f924d5f8680fd4")
775 def testReadSize(self):
776 data = utils.ReadFile(self._TestDataFilename("cert1.pem"),
778 self.assertEqual(len(data), 100)
780 h = compat.md5_hash()
782 self.assertEqual(h.hexdigest(), "893772354e4e690b9efd073eed433ce7")
785 self.assertRaises(EnvironmentError, utils.ReadFile,
786 "/dev/null/does-not-exist")
789 class TestReadOneLineFile(testutils.GanetiTestCase):
792 testutils.GanetiTestCase.setUp(self)
794 def testDefault(self):
795 data = ReadOneLineFile(self._TestDataFilename("cert1.pem"))
796 self.assertEqual(len(data), 27)
797 self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
799 def testNotStrict(self):
800 data = ReadOneLineFile(self._TestDataFilename("cert1.pem"), strict=False)
801 self.assertEqual(len(data), 27)
802 self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
804 def testStrictFailure(self):
805 self.assertRaises(errors.GenericError, ReadOneLineFile,
806 self._TestDataFilename("cert1.pem"), strict=True)
808 def testLongLine(self):
809 dummydata = (1024 * "Hello World! ")
810 myfile = self._CreateTempFile()
811 utils.WriteFile(myfile, data=dummydata)
812 datastrict = ReadOneLineFile(myfile, strict=True)
813 datalax = ReadOneLineFile(myfile, strict=False)
814 self.assertEqual(dummydata, datastrict)
815 self.assertEqual(dummydata, datalax)
817 def testNewline(self):
818 myfile = self._CreateTempFile()
820 for nl in ["", "\n", "\r\n"]:
821 dummydata = "%s%s" % (myline, nl)
822 utils.WriteFile(myfile, data=dummydata)
823 datalax = ReadOneLineFile(myfile, strict=False)
824 self.assertEqual(myline, datalax)
825 datastrict = ReadOneLineFile(myfile, strict=True)
826 self.assertEqual(myline, datastrict)
828 def testWhitespaceAndMultipleLines(self):
829 myfile = self._CreateTempFile()
830 for nl in ["", "\n", "\r\n"]:
831 for ws in [" ", "\t", "\t\t \t", "\t "]:
832 dummydata = (1024 * ("Foo bar baz %s%s" % (ws, nl)))
833 utils.WriteFile(myfile, data=dummydata)
834 datalax = ReadOneLineFile(myfile, strict=False)
836 self.assert_(set("\r\n") & set(dummydata))
837 self.assertRaises(errors.GenericError, ReadOneLineFile,
839 explen = len("Foo bar baz ") + len(ws)
840 self.assertEqual(len(datalax), explen)
841 self.assertEqual(datalax, dummydata[:explen])
842 self.assertFalse(set("\r\n") & set(datalax))
844 datastrict = ReadOneLineFile(myfile, strict=True)
845 self.assertEqual(dummydata, datastrict)
846 self.assertEqual(dummydata, datalax)
848 def testEmptylines(self):
849 myfile = self._CreateTempFile()
851 for nl in ["\n", "\r\n"]:
852 for ol in ["", "otherline"]:
853 dummydata = "%s%s%s%s%s%s" % (nl, nl, myline, nl, ol, nl)
854 utils.WriteFile(myfile, data=dummydata)
855 self.assert_(set("\r\n") & set(dummydata))
856 datalax = ReadOneLineFile(myfile, strict=False)
857 self.assertEqual(myline, datalax)
859 self.assertRaises(errors.GenericError, ReadOneLineFile,
862 datastrict = ReadOneLineFile(myfile, strict=True)
863 self.assertEqual(myline, datastrict)
866 class TestTimestampForFilename(unittest.TestCase):
868 self.assert_("." not in utils.TimestampForFilename())
869 self.assert_(":" not in utils.TimestampForFilename())
872 class TestCreateBackup(testutils.GanetiTestCase):
874 testutils.GanetiTestCase.setUp(self)
876 self.tmpdir = tempfile.mkdtemp()
879 testutils.GanetiTestCase.tearDown(self)
881 shutil.rmtree(self.tmpdir)
884 filename = utils.PathJoin(self.tmpdir, "config.data")
885 utils.WriteFile(filename, data="")
886 bname = utils.CreateBackup(filename)
887 self.assertFileContent(bname, "")
888 self.assertEqual(len(glob.glob("%s*" % filename)), 2)
889 utils.CreateBackup(filename)
890 self.assertEqual(len(glob.glob("%s*" % filename)), 3)
891 utils.CreateBackup(filename)
892 self.assertEqual(len(glob.glob("%s*" % filename)), 4)
894 fifoname = utils.PathJoin(self.tmpdir, "fifo")
896 self.assertRaises(errors.ProgrammerError, utils.CreateBackup, fifoname)
898 def testContent(self):
900 for data in ["", "X", "Hello World!\n" * 100, "Binary data\0\x01\x02\n"]:
901 for rep in [1, 2, 10, 127]:
902 testdata = data * rep
904 filename = utils.PathJoin(self.tmpdir, "test.data_")
905 utils.WriteFile(filename, data=testdata)
906 self.assertFileContent(filename, testdata)
909 bname = utils.CreateBackup(filename)
911 self.assertFileContent(bname, testdata)
912 self.assertEqual(len(glob.glob("%s*" % filename)), 1 + bkpcount)
915 class TestFormatUnit(unittest.TestCase):
916 """Test case for the FormatUnit function"""
919 self.assertEqual(FormatUnit(1, 'h'), '1M')
920 self.assertEqual(FormatUnit(100, 'h'), '100M')
921 self.assertEqual(FormatUnit(1023, 'h'), '1023M')
923 self.assertEqual(FormatUnit(1, 'm'), '1')
924 self.assertEqual(FormatUnit(100, 'm'), '100')
925 self.assertEqual(FormatUnit(1023, 'm'), '1023')
927 self.assertEqual(FormatUnit(1024, 'm'), '1024')
928 self.assertEqual(FormatUnit(1536, 'm'), '1536')
929 self.assertEqual(FormatUnit(17133, 'm'), '17133')
930 self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
933 self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
934 self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
935 self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
936 self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
938 self.assertEqual(FormatUnit(1024, 'g'), '1.0')
939 self.assertEqual(FormatUnit(1536, 'g'), '1.5')
940 self.assertEqual(FormatUnit(17133, 'g'), '16.7')
941 self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
943 self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
944 self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
945 self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
948 self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
949 self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
950 self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
952 self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
953 self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
954 self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
956 class TestParseUnit(unittest.TestCase):
957 """Test case for the ParseUnit function"""
960 ('M', 1), ('G', 1024), ('T', 1024 * 1024),
961 ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
962 ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
964 def testRounding(self):
965 self.assertEqual(ParseUnit('0'), 0)
966 self.assertEqual(ParseUnit('1'), 4)
967 self.assertEqual(ParseUnit('2'), 4)
968 self.assertEqual(ParseUnit('3'), 4)
970 self.assertEqual(ParseUnit('124'), 124)
971 self.assertEqual(ParseUnit('125'), 128)
972 self.assertEqual(ParseUnit('126'), 128)
973 self.assertEqual(ParseUnit('127'), 128)
974 self.assertEqual(ParseUnit('128'), 128)
975 self.assertEqual(ParseUnit('129'), 132)
976 self.assertEqual(ParseUnit('130'), 132)
978 def testFloating(self):
979 self.assertEqual(ParseUnit('0'), 0)
980 self.assertEqual(ParseUnit('0.5'), 4)
981 self.assertEqual(ParseUnit('1.75'), 4)
982 self.assertEqual(ParseUnit('1.99'), 4)
983 self.assertEqual(ParseUnit('2.00'), 4)
984 self.assertEqual(ParseUnit('2.01'), 4)
985 self.assertEqual(ParseUnit('3.99'), 4)
986 self.assertEqual(ParseUnit('4.00'), 4)
987 self.assertEqual(ParseUnit('4.01'), 8)
988 self.assertEqual(ParseUnit('1.5G'), 1536)
989 self.assertEqual(ParseUnit('1.8G'), 1844)
990 self.assertEqual(ParseUnit('8.28T'), 8682212)
992 def testSuffixes(self):
993 for sep in ('', ' ', ' ', "\t", "\t "):
994 for suffix, scale in TestParseUnit.SCALES:
995 for func in (lambda x: x, str.lower, str.upper):
996 self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
999 def testInvalidInput(self):
1000 for sep in ('-', '_', ',', 'a'):
1001 for suffix, _ in TestParseUnit.SCALES:
1002 self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
1004 for suffix, _ in TestParseUnit.SCALES:
1005 self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
1008 class TestSshKeys(testutils.GanetiTestCase):
1009 """Test case for the AddAuthorizedKey function"""
1011 KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
1012 KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
1013 'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
1016 testutils.GanetiTestCase.setUp(self)
1017 self.tmpname = self._CreateTempFile()
1018 handle = open(self.tmpname, 'w')
1020 handle.write("%s\n" % TestSshKeys.KEY_A)
1021 handle.write("%s\n" % TestSshKeys.KEY_B)
1025 def testAddingNewKey(self):
1026 AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
1028 self.assertFileContent(self.tmpname,
1029 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
1030 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
1031 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
1032 "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
1034 def testAddingAlmostButNotCompletelyTheSameKey(self):
1035 AddAuthorizedKey(self.tmpname,
1036 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
1038 self.assertFileContent(self.tmpname,
1039 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
1040 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
1041 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
1042 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
1044 def testAddingExistingKeyWithSomeMoreSpaces(self):
1045 AddAuthorizedKey(self.tmpname,
1046 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
1048 self.assertFileContent(self.tmpname,
1049 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
1050 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
1051 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
1053 def testRemovingExistingKeyWithSomeMoreSpaces(self):
1054 RemoveAuthorizedKey(self.tmpname,
1055 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
1057 self.assertFileContent(self.tmpname,
1058 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
1059 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
1061 def testRemovingNonExistingKey(self):
1062 RemoveAuthorizedKey(self.tmpname,
1063 'ssh-dss AAAAB3Nsdfj230xxjxJjsjwjsjdjU root@test')
1065 self.assertFileContent(self.tmpname,
1066 "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
1067 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
1068 " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
1071 class TestEtcHosts(testutils.GanetiTestCase):
1072 """Test functions modifying /etc/hosts"""
1075 testutils.GanetiTestCase.setUp(self)
1076 self.tmpname = self._CreateTempFile()
1077 handle = open(self.tmpname, 'w')
1079 handle.write('# This is a test file for /etc/hosts\n')
1080 handle.write('127.0.0.1\tlocalhost\n')
1081 handle.write('192.168.1.1 router gw\n')
1085 def testSettingNewIp(self):
1086 SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
1088 self.assertFileContent(self.tmpname,
1089 "# This is a test file for /etc/hosts\n"
1090 "127.0.0.1\tlocalhost\n"
1091 "192.168.1.1 router gw\n"
1092 "1.2.3.4\tmyhost.domain.tld myhost\n")
1093 self.assertFileMode(self.tmpname, 0644)
1095 def testSettingExistingIp(self):
1096 SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
1099 self.assertFileContent(self.tmpname,
1100 "# This is a test file for /etc/hosts\n"
1101 "127.0.0.1\tlocalhost\n"
1102 "192.168.1.1\tmyhost.domain.tld myhost\n")
1103 self.assertFileMode(self.tmpname, 0644)
1105 def testSettingDuplicateName(self):
1106 SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
1108 self.assertFileContent(self.tmpname,
1109 "# This is a test file for /etc/hosts\n"
1110 "127.0.0.1\tlocalhost\n"
1111 "192.168.1.1 router gw\n"
1112 "1.2.3.4\tmyhost\n")
1113 self.assertFileMode(self.tmpname, 0644)
1115 def testRemovingExistingHost(self):
1116 RemoveEtcHostsEntry(self.tmpname, 'router')
1118 self.assertFileContent(self.tmpname,
1119 "# This is a test file for /etc/hosts\n"
1120 "127.0.0.1\tlocalhost\n"
1122 self.assertFileMode(self.tmpname, 0644)
1124 def testRemovingSingleExistingHost(self):
1125 RemoveEtcHostsEntry(self.tmpname, 'localhost')
1127 self.assertFileContent(self.tmpname,
1128 "# This is a test file for /etc/hosts\n"
1129 "192.168.1.1 router gw\n")
1130 self.assertFileMode(self.tmpname, 0644)
1132 def testRemovingNonExistingHost(self):
1133 RemoveEtcHostsEntry(self.tmpname, 'myhost')
1135 self.assertFileContent(self.tmpname,
1136 "# This is a test file for /etc/hosts\n"
1137 "127.0.0.1\tlocalhost\n"
1138 "192.168.1.1 router gw\n")
1139 self.assertFileMode(self.tmpname, 0644)
1141 def testRemovingAlias(self):
1142 RemoveEtcHostsEntry(self.tmpname, 'gw')
1144 self.assertFileContent(self.tmpname,
1145 "# This is a test file for /etc/hosts\n"
1146 "127.0.0.1\tlocalhost\n"
1147 "192.168.1.1 router\n")
1148 self.assertFileMode(self.tmpname, 0644)
1151 class TestGetMounts(unittest.TestCase):
1152 """Test case for GetMounts()."""
1155 "rootfs / rootfs rw 0 0\n"
1156 "none /sys sysfs rw,nosuid,nodev,noexec,relatime 0 0\n"
1157 "none /proc proc rw,nosuid,nodev,noexec,relatime 0 0\n")
1160 self.tmpfile = tempfile.NamedTemporaryFile()
1161 utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
1163 def testGetMounts(self):
1164 self.assertEqual(utils.GetMounts(filename=self.tmpfile.name),
1166 ("rootfs", "/", "rootfs", "rw"),
1167 ("none", "/sys", "sysfs", "rw,nosuid,nodev,noexec,relatime"),
1168 ("none", "/proc", "proc", "rw,nosuid,nodev,noexec,relatime"),
1172 class TestShellQuoting(unittest.TestCase):
1173 """Test case for shell quoting functions"""
1175 def testShellQuote(self):
1176 self.assertEqual(ShellQuote('abc'), "abc")
1177 self.assertEqual(ShellQuote('ab"c'), "'ab\"c'")
1178 self.assertEqual(ShellQuote("a'bc"), "'a'\\''bc'")
1179 self.assertEqual(ShellQuote("a b c"), "'a b c'")
1180 self.assertEqual(ShellQuote("a b\\ c"), "'a b\\ c'")
1182 def testShellQuoteArgs(self):
1183 self.assertEqual(ShellQuoteArgs(['a', 'b', 'c']), "a b c")
1184 self.assertEqual(ShellQuoteArgs(['a', 'b"', 'c']), "a 'b\"' c")
1185 self.assertEqual(ShellQuoteArgs(['a', 'b\'', 'c']), "a 'b'\\\''' c")
1188 class _BaseTcpPingTest:
1189 """Base class for TcpPing tests against listen(2)ing port"""
1194 self.listener = socket.socket(self.family, socket.SOCK_STREAM)
1195 self.listener.bind((self.address, 0))
1196 self.listenerport = self.listener.getsockname()[1]
1197 self.listener.listen(1)
1200 self.listener.shutdown(socket.SHUT_RDWR)
1202 del self.listenerport
1204 def testTcpPingToLocalHostAccept(self):
1205 self.assert_(TcpPing(self.address,
1207 timeout=constants.TCP_PING_TIMEOUT,
1208 live_port_needed=True,
1209 source=self.address,
1211 "failed to connect to test listener")
1213 self.assert_(TcpPing(self.address, self.listenerport,
1214 timeout=constants.TCP_PING_TIMEOUT,
1215 live_port_needed=True),
1216 "failed to connect to test listener (no source)")
1219 class TestIP4TcpPing(unittest.TestCase, _BaseTcpPingTest):
1220 """Testcase for IPv4 TCP version of ping - against listen(2)ing port"""
1221 family = socket.AF_INET
1222 address = constants.IP4_ADDRESS_LOCALHOST
1225 unittest.TestCase.setUp(self)
1226 _BaseTcpPingTest.setUp(self)
1229 unittest.TestCase.tearDown(self)
1230 _BaseTcpPingTest.tearDown(self)
1233 class TestIP6TcpPing(unittest.TestCase, _BaseTcpPingTest):
1234 """Testcase for IPv6 TCP version of ping - against listen(2)ing port"""
1235 family = socket.AF_INET6
1236 address = constants.IP6_ADDRESS_LOCALHOST
1239 unittest.TestCase.setUp(self)
1240 _BaseTcpPingTest.setUp(self)
1243 unittest.TestCase.tearDown(self)
1244 _BaseTcpPingTest.tearDown(self)
1247 class _BaseTcpPingDeafTest:
1248 """Base class for TcpPing tests against non listen(2)ing port"""
1253 self.deaflistener = socket.socket(self.family, socket.SOCK_STREAM)
1254 self.deaflistener.bind((self.address, 0))
1255 self.deaflistenerport = self.deaflistener.getsockname()[1]
1258 del self.deaflistener
1259 del self.deaflistenerport
1261 def testTcpPingToLocalHostAcceptDeaf(self):
1262 self.assertFalse(TcpPing(self.address,
1263 self.deaflistenerport,
1264 timeout=constants.TCP_PING_TIMEOUT,
1265 live_port_needed=True,
1266 source=self.address,
1267 ), # need successful connect(2)
1268 "successfully connected to deaf listener")
1270 self.assertFalse(TcpPing(self.address,
1271 self.deaflistenerport,
1272 timeout=constants.TCP_PING_TIMEOUT,
1273 live_port_needed=True,
1274 ), # need successful connect(2)
1275 "successfully connected to deaf listener (no source)")
1277 def testTcpPingToLocalHostNoAccept(self):
1278 self.assert_(TcpPing(self.address,
1279 self.deaflistenerport,
1280 timeout=constants.TCP_PING_TIMEOUT,
1281 live_port_needed=False,
1282 source=self.address,
1283 ), # ECONNREFUSED is OK
1284 "failed to ping alive host on deaf port")
1286 self.assert_(TcpPing(self.address,
1287 self.deaflistenerport,
1288 timeout=constants.TCP_PING_TIMEOUT,
1289 live_port_needed=False,
1290 ), # ECONNREFUSED is OK
1291 "failed to ping alive host on deaf port (no source)")
1294 class TestIP4TcpPingDeaf(unittest.TestCase, _BaseTcpPingDeafTest):
1295 """Testcase for IPv4 TCP version of ping - against non listen(2)ing port"""
1296 family = socket.AF_INET
1297 address = constants.IP4_ADDRESS_LOCALHOST
1300 self.deaflistener = socket.socket(self.family, socket.SOCK_STREAM)
1301 self.deaflistener.bind((self.address, 0))
1302 self.deaflistenerport = self.deaflistener.getsockname()[1]
1305 del self.deaflistener
1306 del self.deaflistenerport
1309 class TestIP6TcpPingDeaf(unittest.TestCase, _BaseTcpPingDeafTest):
1310 """Testcase for IPv6 TCP version of ping - against non listen(2)ing port"""
1311 family = socket.AF_INET6
1312 address = constants.IP6_ADDRESS_LOCALHOST
1315 unittest.TestCase.setUp(self)
1316 _BaseTcpPingDeafTest.setUp(self)
1319 unittest.TestCase.tearDown(self)
1320 _BaseTcpPingDeafTest.tearDown(self)
1323 class TestOwnIpAddress(unittest.TestCase):
1324 """Testcase for OwnIpAddress"""
1326 def testOwnLoopback(self):
1327 """check having the loopback ip"""
1328 self.failUnless(OwnIpAddress(constants.IP4_ADDRESS_LOCALHOST),
1329 "Should own the loopback address")
1331 def testNowOwnAddress(self):
1332 """check that I don't own an address"""
1334 # Network 192.0.2.0/24 is reserved for test/documentation as per
1335 # RFC 5735, so we *should* not have an address of this range... if
1336 # this fails, we should extend the test to multiple addresses
1337 DST_IP = "192.0.2.1"
1338 self.failIf(OwnIpAddress(DST_IP), "Should not own IP address %s" % DST_IP)
1341 def _GetSocketCredentials(path):
1342 """Connect to a Unix socket and return remote credentials.
1345 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1349 return utils.GetSocketCredentials(sock)
1354 class TestGetSocketCredentials(unittest.TestCase):
1356 self.tmpdir = tempfile.mkdtemp()
1357 self.sockpath = utils.PathJoin(self.tmpdir, "sock")
1359 self.listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1360 self.listener.settimeout(10)
1361 self.listener.bind(self.sockpath)
1362 self.listener.listen(1)
1365 self.listener.shutdown(socket.SHUT_RDWR)
1366 self.listener.close()
1367 shutil.rmtree(self.tmpdir)
1370 (c2pr, c2pw) = os.pipe()
1372 # Start child process
1376 data = serializer.DumpJson(_GetSocketCredentials(self.sockpath))
1378 os.write(c2pw, data)
1387 # Wait for one connection
1388 (conn, _) = self.listener.accept()
1393 result = os.read(c2pr, 4096)
1396 # Check child's exit code
1397 (_, status) = os.waitpid(child, 0)
1398 self.assertFalse(os.WIFSIGNALED(status))
1399 self.assertEqual(os.WEXITSTATUS(status), 0)
1402 (pid, uid, gid) = serializer.LoadJson(result)
1403 self.assertEqual(pid, os.getpid())
1404 self.assertEqual(uid, os.getuid())
1405 self.assertEqual(gid, os.getgid())
1408 class TestListVisibleFiles(unittest.TestCase):
1409 """Test case for ListVisibleFiles"""
1412 self.path = tempfile.mkdtemp()
1415 shutil.rmtree(self.path)
1417 def _CreateFiles(self, files):
1419 utils.WriteFile(os.path.join(self.path, name), data="test")
1421 def _test(self, files, expected):
1422 self._CreateFiles(files)
1423 found = ListVisibleFiles(self.path)
1424 self.assertEqual(set(found), set(expected))
1426 def testAllVisible(self):
1427 files = ["a", "b", "c"]
1429 self._test(files, expected)
1431 def testNoneVisible(self):
1432 files = [".a", ".b", ".c"]
1434 self._test(files, expected)
1436 def testSomeVisible(self):
1437 files = ["a", "b", ".c"]
1438 expected = ["a", "b"]
1439 self._test(files, expected)
1441 def testNonAbsolutePath(self):
1442 self.failUnlessRaises(errors.ProgrammerError, ListVisibleFiles, "abc")
1444 def testNonNormalizedPath(self):
1445 self.failUnlessRaises(errors.ProgrammerError, ListVisibleFiles,
1449 class TestNewUUID(unittest.TestCase):
1450 """Test case for NewUUID"""
1452 _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
1453 '[a-f0-9]{4}-[a-f0-9]{12}$')
1456 self.failUnless(self._re_uuid.match(utils.NewUUID()))
1459 class TestUniqueSequence(unittest.TestCase):
1460 """Test case for UniqueSequence"""
1462 def _test(self, input, expected):
1463 self.assertEqual(utils.UniqueSequence(input), expected)
1467 self._test([1, 2, 3], [1, 2, 3])
1468 self._test([1, 1, 2, 2, 3, 3], [1, 2, 3])
1469 self._test([1, 2, 2, 3], [1, 2, 3])
1470 self._test([1, 2, 3, 3], [1, 2, 3])
1473 self._test([1, 2, 3, 1, 2, 3], [1, 2, 3])
1474 self._test([1, 1, 2, 3, 3, 1, 2], [1, 2, 3])
1477 self._test(["a", "a"], ["a"])
1478 self._test(["a", "b"], ["a", "b"])
1479 self._test(["a", "b", "a"], ["a", "b"])
1482 class TestFirstFree(unittest.TestCase):
1483 """Test case for the FirstFree function"""
1486 """Test FirstFree"""
1487 self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
1488 self.failUnlessEqual(FirstFree([]), None)
1489 self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
1490 self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
1491 self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
1494 class TestTailFile(testutils.GanetiTestCase):
1495 """Test case for the TailFile function"""
1497 def testEmpty(self):
1498 fname = self._CreateTempFile()
1499 self.failUnlessEqual(TailFile(fname), [])
1500 self.failUnlessEqual(TailFile(fname, lines=25), [])
1502 def testAllLines(self):
1503 data = ["test %d" % i for i in range(30)]
1505 fname = self._CreateTempFile()
1506 fd = open(fname, "w")
1507 fd.write("\n".join(data[:i]))
1511 self.failUnlessEqual(TailFile(fname, lines=i), data[:i])
1513 def testPartialLines(self):
1514 data = ["test %d" % i for i in range(30)]
1515 fname = self._CreateTempFile()
1516 fd = open(fname, "w")
1517 fd.write("\n".join(data))
1520 for i in range(1, 30):
1521 self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
1523 def testBigFile(self):
1524 data = ["test %d" % i for i in range(30)]
1525 fname = self._CreateTempFile()
1526 fd = open(fname, "w")
1527 fd.write("X" * 1048576)
1529 fd.write("\n".join(data))
1532 for i in range(1, 30):
1533 self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
1536 class _BaseFileLockTest:
1537 """Test case for the FileLock class"""
1539 def testSharedNonblocking(self):
1540 self.lock.Shared(blocking=False)
1543 def testExclusiveNonblocking(self):
1544 self.lock.Exclusive(blocking=False)
1547 def testUnlockNonblocking(self):
1548 self.lock.Unlock(blocking=False)
1551 def testSharedBlocking(self):
1552 self.lock.Shared(blocking=True)
1555 def testExclusiveBlocking(self):
1556 self.lock.Exclusive(blocking=True)
1559 def testUnlockBlocking(self):
1560 self.lock.Unlock(blocking=True)
1563 def testSharedExclusiveUnlock(self):
1564 self.lock.Shared(blocking=False)
1565 self.lock.Exclusive(blocking=False)
1566 self.lock.Unlock(blocking=False)
1569 def testExclusiveSharedUnlock(self):
1570 self.lock.Exclusive(blocking=False)
1571 self.lock.Shared(blocking=False)
1572 self.lock.Unlock(blocking=False)
1575 def testSimpleTimeout(self):
1576 # These will succeed on the first attempt, hence a short timeout
1577 self.lock.Shared(blocking=True, timeout=10.0)
1578 self.lock.Exclusive(blocking=False, timeout=10.0)
1579 self.lock.Unlock(blocking=True, timeout=10.0)
1583 def _TryLockInner(filename, shared, blocking):
1584 lock = utils.FileLock.Open(filename)
1592 # The timeout doesn't really matter as the parent process waits for us to
1594 fn(blocking=blocking, timeout=0.01)
1595 except errors.LockError, err:
1600 def _TryLock(self, *args):
1601 return utils.RunInSeparateProcess(self._TryLockInner, self.tmpfile.name,
1604 def testTimeout(self):
1605 for blocking in [True, False]:
1606 self.lock.Exclusive(blocking=True)
1607 self.failIf(self._TryLock(False, blocking))
1608 self.failIf(self._TryLock(True, blocking))
1610 self.lock.Shared(blocking=True)
1611 self.assert_(self._TryLock(True, blocking))
1612 self.failIf(self._TryLock(False, blocking))
1614 def testCloseShared(self):
1616 self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
1618 def testCloseExclusive(self):
1620 self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
1622 def testCloseUnlock(self):
1624 self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
1627 class TestFileLockWithFilename(testutils.GanetiTestCase, _BaseFileLockTest):
1628 TESTDATA = "Hello World\n" * 10
1631 testutils.GanetiTestCase.setUp(self)
1633 self.tmpfile = tempfile.NamedTemporaryFile()
1634 utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
1635 self.lock = utils.FileLock.Open(self.tmpfile.name)
1637 # Ensure "Open" didn't truncate file
1638 self.assertFileContent(self.tmpfile.name, self.TESTDATA)
1641 self.assertFileContent(self.tmpfile.name, self.TESTDATA)
1643 testutils.GanetiTestCase.tearDown(self)
1646 class TestFileLockWithFileObject(unittest.TestCase, _BaseFileLockTest):
1648 self.tmpfile = tempfile.NamedTemporaryFile()
1649 self.lock = utils.FileLock(open(self.tmpfile.name, "w"), self.tmpfile.name)
1652 class TestTimeFunctions(unittest.TestCase):
1653 """Test case for time functions"""
1656 self.assertEqual(utils.SplitTime(1), (1, 0))
1657 self.assertEqual(utils.SplitTime(1.5), (1, 500000))
1658 self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
1659 self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
1660 self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
1661 self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
1662 self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
1663 self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
1665 self.assertRaises(AssertionError, utils.SplitTime, -1)
1667 self.assertEqual(utils.MergeTime((1, 0)), 1.0)
1668 self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
1669 self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
1671 self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3),
1673 self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
1675 self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
1676 self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
1677 self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
1678 self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
1679 self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
1682 class FieldSetTestCase(unittest.TestCase):
1683 """Test case for FieldSets"""
1685 def testSimpleMatch(self):
1686 f = utils.FieldSet("a", "b", "c", "def")
1687 self.failUnless(f.Matches("a"))
1688 self.failIf(f.Matches("d"), "Substring matched")
1689 self.failIf(f.Matches("defghi"), "Prefix string matched")
1690 self.failIf(f.NonMatching(["b", "c"]))
1691 self.failIf(f.NonMatching(["a", "b", "c", "def"]))
1692 self.failUnless(f.NonMatching(["a", "d"]))
1694 def testRegexMatch(self):
1695 f = utils.FieldSet("a", "b([0-9]+)", "c")
1696 self.failUnless(f.Matches("b1"))
1697 self.failUnless(f.Matches("b99"))
1698 self.failIf(f.Matches("b/1"))
1699 self.failIf(f.NonMatching(["b12", "c"]))
1700 self.failUnless(f.NonMatching(["a", "1"]))
1702 class TestForceDictType(unittest.TestCase):
1703 """Test case for ForceDictType"""
1707 'a': constants.VTYPE_INT,
1708 'b': constants.VTYPE_BOOL,
1709 'c': constants.VTYPE_STRING,
1710 'd': constants.VTYPE_SIZE,
1713 def _fdt(self, dict, allowed_values=None):
1714 if allowed_values is None:
1715 ForceDictType(dict, self.key_types)
1717 ForceDictType(dict, self.key_types, allowed_values=allowed_values)
1721 def testSimpleDict(self):
1722 self.assertEqual(self._fdt({}), {})
1723 self.assertEqual(self._fdt({'a': 1}), {'a': 1})
1724 self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
1725 self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
1726 self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
1727 self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
1728 self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
1729 self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
1730 self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
1731 self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
1732 self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
1733 self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
1735 def testErrors(self):
1736 self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
1737 self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
1738 self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
1739 self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
1742 class TestIsNormAbsPath(unittest.TestCase):
1743 """Testing case for IsNormAbsPath"""
1745 def _pathTestHelper(self, path, result):
1747 self.assert_(IsNormAbsPath(path),
1748 "Path %s should result absolute and normalized" % path)
1750 self.assertFalse(IsNormAbsPath(path),
1751 "Path %s should not result absolute and normalized" % path)
1754 self._pathTestHelper('/etc', True)
1755 self._pathTestHelper('/srv', True)
1756 self._pathTestHelper('etc', False)
1757 self._pathTestHelper('/etc/../root', False)
1758 self._pathTestHelper('/etc/', False)
1761 class TestSafeEncode(unittest.TestCase):
1762 """Test case for SafeEncode"""
1764 def testAscii(self):
1765 for txt in [string.digits, string.letters, string.punctuation]:
1766 self.failUnlessEqual(txt, SafeEncode(txt))
1768 def testDoubleEncode(self):
1769 for i in range(255):
1770 txt = SafeEncode(chr(i))
1771 self.failUnlessEqual(txt, SafeEncode(txt))
1773 def testUnicode(self):
1774 # 1024 is high enough to catch non-direct ASCII mappings
1775 for i in range(1024):
1776 txt = SafeEncode(unichr(i))
1777 self.failUnlessEqual(txt, SafeEncode(txt))
1780 class TestFormatTime(unittest.TestCase):
1781 """Testing case for FormatTime"""
1784 self.failUnlessEqual(FormatTime(None), "N/A")
1786 def testInvalid(self):
1787 self.failUnlessEqual(FormatTime(()), "N/A")
1790 # tests that we accept time.time input
1791 FormatTime(time.time())
1792 # tests that we accept int input
1793 FormatTime(int(time.time()))
1796 class RunInSeparateProcess(unittest.TestCase):
1798 for exp in [True, False]:
1802 self.assertEqual(exp, utils.RunInSeparateProcess(_child))
1805 for arg in [0, 1, 999, "Hello World", (1, 2, 3)]:
1806 def _child(carg1, carg2):
1807 return carg1 == "Foo" and carg2 == arg
1809 self.assert_(utils.RunInSeparateProcess(_child, "Foo", arg))
1812 parent_pid = os.getpid()
1815 return os.getpid() == parent_pid
1817 self.failIf(utils.RunInSeparateProcess(_check))
1819 def testSignal(self):
1821 os.kill(os.getpid(), signal.SIGTERM)
1823 self.assertRaises(errors.GenericError,
1824 utils.RunInSeparateProcess, _kill)
1826 def testException(self):
1828 raise errors.GenericError("This is a test")
1830 self.assertRaises(errors.GenericError,
1831 utils.RunInSeparateProcess, _exc)
1834 class TestFingerprintFile(unittest.TestCase):
1836 self.tmpfile = tempfile.NamedTemporaryFile()
1839 self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1840 "da39a3ee5e6b4b0d3255bfef95601890afd80709")
1842 utils.WriteFile(self.tmpfile.name, data="Hello World\n")
1843 self.assertEqual(utils._FingerprintFile(self.tmpfile.name),
1844 "648a6a6ffffdaa0badb23b8baf90b6168dd16b3a")
1847 class TestUnescapeAndSplit(unittest.TestCase):
1848 """Testing case for UnescapeAndSplit"""
1851 # testing more that one separator for regexp safety
1852 self._seps = [",", "+", "."]
1854 def testSimple(self):
1855 a = ["a", "b", "c", "d"]
1856 for sep in self._seps:
1857 self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), a)
1859 def testEscape(self):
1860 for sep in self._seps:
1861 a = ["a", "b\\" + sep + "c", "d"]
1862 b = ["a", "b" + sep + "c", "d"]
1863 self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1865 def testDoubleEscape(self):
1866 for sep in self._seps:
1867 a = ["a", "b\\\\", "c", "d"]
1868 b = ["a", "b\\", "c", "d"]
1869 self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1871 def testThreeEscape(self):
1872 for sep in self._seps:
1873 a = ["a", "b\\\\\\" + sep + "c", "d"]
1874 b = ["a", "b\\" + sep + "c", "d"]
1875 self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
1878 class TestGenerateSelfSignedX509Cert(unittest.TestCase):
1880 self.tmpdir = tempfile.mkdtemp()
1883 shutil.rmtree(self.tmpdir)
1885 def _checkRsaPrivateKey(self, key):
1886 lines = key.splitlines()
1887 return ("-----BEGIN RSA PRIVATE KEY-----" in lines and
1888 "-----END RSA PRIVATE KEY-----" in lines)
1890 def _checkCertificate(self, cert):
1891 lines = cert.splitlines()
1892 return ("-----BEGIN CERTIFICATE-----" in lines and
1893 "-----END CERTIFICATE-----" in lines)
1896 for common_name in [None, ".", "Ganeti", "node1.example.com"]:
1897 (key_pem, cert_pem) = utils.GenerateSelfSignedX509Cert(common_name, 300)
1898 self._checkRsaPrivateKey(key_pem)
1899 self._checkCertificate(cert_pem)
1901 key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
1903 self.assert_(key.bits() >= 1024)
1904 self.assertEqual(key.bits(), constants.RSA_KEY_BITS)
1905 self.assertEqual(key.type(), OpenSSL.crypto.TYPE_RSA)
1907 x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1909 self.failIf(x509.has_expired())
1910 self.assertEqual(x509.get_issuer().CN, common_name)
1911 self.assertEqual(x509.get_subject().CN, common_name)
1912 self.assertEqual(x509.get_pubkey().bits(), constants.RSA_KEY_BITS)
1914 def testLegacy(self):
1915 cert1_filename = os.path.join(self.tmpdir, "cert1.pem")
1917 utils.GenerateSelfSignedSslCert(cert1_filename, validity=1)
1919 cert1 = utils.ReadFile(cert1_filename)
1921 self.assert_(self._checkRsaPrivateKey(cert1))
1922 self.assert_(self._checkCertificate(cert1))
1925 class TestPathJoin(unittest.TestCase):
1926 """Testing case for PathJoin"""
1928 def testBasicItems(self):
1929 mlist = ["/a", "b", "c"]
1930 self.failUnlessEqual(PathJoin(*mlist), "/".join(mlist))
1932 def testNonAbsPrefix(self):
1933 self.failUnlessRaises(ValueError, PathJoin, "a", "b")
1935 def testBackTrack(self):
1936 self.failUnlessRaises(ValueError, PathJoin, "/a", "b/../c")
1938 def testMultiAbs(self):
1939 self.failUnlessRaises(ValueError, PathJoin, "/a", "/b")
1942 class TestHostInfo(unittest.TestCase):
1943 """Testing case for HostInfo"""
1945 def testUppercase(self):
1946 data = "AbC.example.com"
1947 self.failUnlessEqual(HostInfo.NormalizeName(data), data.lower())
1949 def testTooLongName(self):
1950 data = "a.b." + "c" * 255
1951 self.failUnlessRaises(OpPrereqError, HostInfo.NormalizeName, data)
1953 def testTrailingDot(self):
1955 self.failUnlessEqual(HostInfo.NormalizeName(data + "."), data)
1957 def testInvalidName(self):
1965 self.failUnlessRaises(OpPrereqError, HostInfo.NormalizeName, value)
1967 def testValidName(self):
1975 HostInfo.NormalizeName(value)
1978 class TestValidateServiceName(unittest.TestCase):
1979 def testValid(self):
1981 0, 1, 2, 3, 1024, 65000, 65534, 65535,
1986 "0", "80", "1111", "65535",
1989 for name in testnames:
1990 self.assertEqual(utils.ValidateServiceName(name), name)
1992 def testInvalid(self):
1994 -15756, -1, 65536, 133428083,
1995 "", "Hello World!", "!", "'", "\"", "\t", "\n", "`",
1996 "-8546", "-1", "65536",
2000 for name in testnames:
2001 self.assertRaises(OpPrereqError, utils.ValidateServiceName, name)
2004 class TestParseAsn1Generalizedtime(unittest.TestCase):
2007 self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000Z"), 0)
2008 self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152Z"),
2010 self.assertEqual(utils._ParseAsn1Generalizedtime("20380119031407Z"),
2014 self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152+0000"),
2016 self.assertEqual(utils._ParseAsn1Generalizedtime("20100223131652+0000"),
2018 self.assertEqual(utils._ParseAsn1Generalizedtime("20100223051808-0800"),
2020 self.assertEqual(utils._ParseAsn1Generalizedtime("20100224002135+1100"),
2022 self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000-0100"),
2025 # Leap seconds are not supported by datetime.datetime
2026 self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
2027 "19841231235960+0000")
2028 self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
2029 "19920630235960+0000")
2032 self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "")
2033 self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "invalid")
2034 self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
2036 self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
2037 "Mon Feb 22 17:47:02 UTC 2010")
2038 self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
2039 "2010-02-22 17:42:02")
2042 class TestGetX509CertValidity(testutils.GanetiTestCase):
2044 testutils.GanetiTestCase.setUp(self)
2046 pyopenssl_version = distutils.version.LooseVersion(OpenSSL.__version__)
2048 # Test whether we have pyOpenSSL 0.7 or above
2049 self.pyopenssl0_7 = (pyopenssl_version >= "0.7")
2051 if not self.pyopenssl0_7:
2052 warnings.warn("This test requires pyOpenSSL 0.7 or above to"
2053 " function correctly")
2055 def _LoadCert(self, name):
2056 return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
2057 self._ReadTestData(name))
2060 validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem"))
2061 if self.pyopenssl0_7:
2062 self.assertEqual(validity, (1266919967, 1267524767))
2064 self.assertEqual(validity, (None, None))
2067 class TestSignX509Certificate(unittest.TestCase):
2068 KEY = "My private key!"
2069 KEY_OTHER = "Another key"
2072 # Generate certificate valid for 5 minutes
2073 (_, cert_pem) = utils.GenerateSelfSignedX509Cert(None, 300)
2075 cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
2078 # No signature at all
2079 self.assertRaises(errors.GenericError,
2080 utils.LoadSignedX509Certificate, cert_pem, self.KEY)
2083 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
2085 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
2086 "X-Ganeti-Signature: \n", self.KEY)
2087 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
2088 "X-Ganeti-Sign: $1234$abcdef\n", self.KEY)
2089 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
2090 "X-Ganeti-Signature: $1234567890$abcdef\n", self.KEY)
2091 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
2092 "X-Ganeti-Signature: $1234$abc\n\n" + cert_pem, self.KEY)
2095 for salt in list("-_@$,:;/\\ \t\n"):
2096 self.assertRaises(errors.GenericError, utils.SignX509Certificate,
2097 cert_pem, self.KEY, "foo%sbar" % salt)
2099 for salt in ["HelloWorld", "salt", string.letters, string.digits,
2100 utils.GenerateSecret(numbytes=4),
2101 utils.GenerateSecret(numbytes=16),
2102 "{123:456}".encode("hex")]:
2103 signed_pem = utils.SignX509Certificate(cert, self.KEY, salt)
2105 self._Check(cert, salt, signed_pem)
2107 self._Check(cert, salt, "X-Another-Header: with a value\n" + signed_pem)
2108 self._Check(cert, salt, (10 * "Hello World!\n") + signed_pem)
2109 self._Check(cert, salt, (signed_pem + "\n\na few more\n"
2110 "lines----\n------ at\nthe end!"))
2112 def _Check(self, cert, salt, pem):
2113 (cert2, salt2) = utils.LoadSignedX509Certificate(pem, self.KEY)
2114 self.assertEqual(salt, salt2)
2115 self.assertEqual(cert.digest("sha1"), cert2.digest("sha1"))
2118 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
2119 pem, self.KEY_OTHER)
2122 class TestMakedirs(unittest.TestCase):
2124 self.tmpdir = tempfile.mkdtemp()
2127 shutil.rmtree(self.tmpdir)
2129 def testNonExisting(self):
2130 path = utils.PathJoin(self.tmpdir, "foo")
2131 utils.Makedirs(path)
2132 self.assert_(os.path.isdir(path))
2134 def testExisting(self):
2135 path = utils.PathJoin(self.tmpdir, "foo")
2137 utils.Makedirs(path)
2138 self.assert_(os.path.isdir(path))
2140 def testRecursiveNonExisting(self):
2141 path = utils.PathJoin(self.tmpdir, "foo/bar/baz")
2142 utils.Makedirs(path)
2143 self.assert_(os.path.isdir(path))
2145 def testRecursiveExisting(self):
2146 path = utils.PathJoin(self.tmpdir, "B/moo/xyz")
2147 self.assertFalse(os.path.exists(path))
2148 os.mkdir(utils.PathJoin(self.tmpdir, "B"))
2149 utils.Makedirs(path)
2150 self.assert_(os.path.isdir(path))
2153 class TestRetry(testutils.GanetiTestCase):
2155 testutils.GanetiTestCase.setUp(self)
2159 def _RaiseRetryAgain():
2160 raise utils.RetryAgain()
2163 def _RaiseRetryAgainWithArg(args):
2164 raise utils.RetryAgain(*args)
2166 def _WrongNestedLoop(self):
2167 return utils.Retry(self._RaiseRetryAgain, 0.01, 0.02)
2169 def _RetryAndSucceed(self, retries):
2170 if self.retries < retries:
2172 raise utils.RetryAgain()
2176 def testRaiseTimeout(self):
2177 self.failUnlessRaises(utils.RetryTimeout, utils.Retry,
2178 self._RaiseRetryAgain, 0.01, 0.02)
2179 self.failUnlessRaises(utils.RetryTimeout, utils.Retry,
2180 self._RetryAndSucceed, 0.01, 0, args=[1])
2181 self.failUnlessEqual(self.retries, 1)
2183 def testComplete(self):
2184 self.failUnlessEqual(utils.Retry(lambda: True, 0, 1), True)
2185 self.failUnlessEqual(utils.Retry(self._RetryAndSucceed, 0, 1, args=[2]),
2187 self.failUnlessEqual(self.retries, 2)
2189 def testNestedLoop(self):
2191 self.failUnlessRaises(errors.ProgrammerError, utils.Retry,
2192 self._WrongNestedLoop, 0, 1)
2193 except utils.RetryTimeout:
2194 self.fail("Didn't detect inner loop's exception")
2196 def testTimeoutArgument(self):
2197 retry_arg="my_important_debugging_message"
2199 utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02, args=[[retry_arg]])
2200 except utils.RetryTimeout, err:
2201 self.failUnlessEqual(err.args, (retry_arg, ))
2203 self.fail("Expected timeout didn't happen")
2205 def testRaiseInnerWithExc(self):
2206 retry_arg="my_important_debugging_message"
2209 utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02,
2210 args=[[errors.GenericError(retry_arg, retry_arg)]])
2211 except utils.RetryTimeout, err:
2214 self.fail("Expected timeout didn't happen")
2215 except errors.GenericError, err:
2216 self.failUnlessEqual(err.args, (retry_arg, retry_arg))
2218 self.fail("Expected GenericError didn't happen")
2220 def testRaiseInnerWithMsg(self):
2221 retry_arg="my_important_debugging_message"
2224 utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02,
2225 args=[[retry_arg, retry_arg]])
2226 except utils.RetryTimeout, err:
2229 self.fail("Expected timeout didn't happen")
2230 except utils.RetryTimeout, err:
2231 self.failUnlessEqual(err.args, (retry_arg, retry_arg))
2233 self.fail("Expected RetryTimeout didn't happen")
2236 class TestLineSplitter(unittest.TestCase):
2239 ls = utils.LineSplitter(lines.append)
2240 ls.write("Hello World\n")
2241 self.assertEqual(lines, [])
2242 ls.write("Foo\n Bar\r\n ")
2245 self.assertEqual(lines, [])
2247 self.assertEqual(lines, ["Hello World", "Foo", " Bar"])
2249 self.assertEqual(lines, ["Hello World", "Foo", " Bar", " BazMoo"])
2251 def _testExtra(self, line, all_lines, p1, p2):
2252 self.assertEqual(p1, 999)
2253 self.assertEqual(p2, "extra")
2254 all_lines.append(line)
2256 def testExtraArgsNoFlush(self):
2258 ls = utils.LineSplitter(self._testExtra, lines, 999, "extra")
2259 ls.write("\n\nHello World\n")
2260 ls.write("Foo\n Bar\r\n ")
2263 ls.write("Moo\n\nx\n")
2264 self.assertEqual(lines, [])
2266 self.assertEqual(lines, ["", "", "Hello World", "Foo", " Bar", " BazMoo",
2270 class TestReadLockedPidFile(unittest.TestCase):
2272 self.tmpdir = tempfile.mkdtemp()
2275 shutil.rmtree(self.tmpdir)
2277 def testNonExistent(self):
2278 path = utils.PathJoin(self.tmpdir, "nonexist")
2279 self.assert_(utils.ReadLockedPidFile(path) is None)
2281 def testUnlocked(self):
2282 path = utils.PathJoin(self.tmpdir, "pid")
2283 utils.WriteFile(path, data="123")
2284 self.assert_(utils.ReadLockedPidFile(path) is None)
2286 def testLocked(self):
2287 path = utils.PathJoin(self.tmpdir, "pid")
2288 utils.WriteFile(path, data="123")
2290 fl = utils.FileLock.Open(path)
2292 fl.Exclusive(blocking=True)
2294 self.assertEqual(utils.ReadLockedPidFile(path), 123)
2298 self.assert_(utils.ReadLockedPidFile(path) is None)
2300 def testError(self):
2301 path = utils.PathJoin(self.tmpdir, "foobar", "pid")
2302 utils.WriteFile(utils.PathJoin(self.tmpdir, "foobar"), data="")
2303 # open(2) should return ENOTDIR
2304 self.assertRaises(EnvironmentError, utils.ReadLockedPidFile, path)
2307 class TestCertVerification(testutils.GanetiTestCase):
2309 testutils.GanetiTestCase.setUp(self)
2311 self.tmpdir = tempfile.mkdtemp()
2314 shutil.rmtree(self.tmpdir)
2316 def testVerifyCertificate(self):
2317 cert_pem = utils.ReadFile(self._TestDataFilename("cert1.pem"))
2318 cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
2321 # Not checking return value as this certificate is expired
2322 utils.VerifyX509Certificate(cert, 30, 7)
2325 class TestVerifyCertificateInner(unittest.TestCase):
2327 vci = utils._VerifyCertificateInner
2330 self.assertEqual(vci(False, 1263916313, 1298476313, 1266940313, 30, 7),
2334 (errcode, msg) = vci(False, 1266507600, 1267544400, 1266075600, 30, 7)
2335 self.assertEqual(errcode, utils.CERT_WARNING)
2338 (errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 7)
2339 self.assertEqual(errcode, utils.CERT_ERROR)
2341 (errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 1)
2342 self.assertEqual(errcode, utils.CERT_WARNING)
2344 (errcode, msg) = vci(False, 1266507600, None, 1266939600, 30, 7)
2345 self.assertEqual(errcode, None)
2348 (errcode, msg) = vci(True, 1266507600, 1267544400, 1266939600, 30, 7)
2349 self.assertEqual(errcode, utils.CERT_ERROR)
2351 (errcode, msg) = vci(True, None, 1267544400, 1266939600, 30, 7)
2352 self.assertEqual(errcode, utils.CERT_ERROR)
2354 (errcode, msg) = vci(True, 1266507600, None, 1266939600, 30, 7)
2355 self.assertEqual(errcode, utils.CERT_ERROR)
2357 (errcode, msg) = vci(True, None, None, 1266939600, 30, 7)
2358 self.assertEqual(errcode, utils.CERT_ERROR)
2361 class TestHmacFunctions(unittest.TestCase):
2362 # Digests can be checked with "openssl sha1 -hmac $key"
2363 def testSha1Hmac(self):
2364 self.assertEqual(utils.Sha1Hmac("", ""),
2365 "fbdb1d1b18aa6c08324b7d64b71fb76370690e1d")
2366 self.assertEqual(utils.Sha1Hmac("3YzMxZWE", "Hello World"),
2367 "ef4f3bda82212ecb2f7ce868888a19092481f1fd")
2368 self.assertEqual(utils.Sha1Hmac("TguMTA2K", ""),
2369 "f904c2476527c6d3e6609ab683c66fa0652cb1dc")
2371 longtext = 1500 * "The quick brown fox jumps over the lazy dog\n"
2372 self.assertEqual(utils.Sha1Hmac("3YzMxZWE", longtext),
2373 "35901b9a3001a7cdcf8e0e9d7c2e79df2223af54")
2375 def testSha1HmacSalt(self):
2376 self.assertEqual(utils.Sha1Hmac("TguMTA2K", "", salt="abc0"),
2377 "4999bf342470eadb11dfcd24ca5680cf9fd7cdce")
2378 self.assertEqual(utils.Sha1Hmac("TguMTA2K", "", salt="abc9"),
2379 "17a4adc34d69c0d367d4ffbef96fd41d4df7a6e8")
2380 self.assertEqual(utils.Sha1Hmac("3YzMxZWE", "Hello World", salt="xyz0"),
2381 "7f264f8114c9066afc9bb7636e1786d996d3cc0d")
2383 def testVerifySha1Hmac(self):
2384 self.assert_(utils.VerifySha1Hmac("", "", ("fbdb1d1b18aa6c08324b"
2385 "7d64b71fb76370690e1d")))
2386 self.assert_(utils.VerifySha1Hmac("TguMTA2K", "",
2387 ("f904c2476527c6d3e660"
2388 "9ab683c66fa0652cb1dc")))
2390 digest = "ef4f3bda82212ecb2f7ce868888a19092481f1fd"
2391 self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World", digest))
2392 self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
2394 self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
2396 self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
2399 def testVerifySha1HmacSalt(self):
2400 self.assert_(utils.VerifySha1Hmac("TguMTA2K", "",
2401 ("17a4adc34d69c0d367d4"
2402 "ffbef96fd41d4df7a6e8"),
2404 self.assert_(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
2405 ("7f264f8114c9066afc9b"
2406 "b7636e1786d996d3cc0d"),
2410 class TestIgnoreSignals(unittest.TestCase):
2411 """Test the IgnoreSignals decorator"""
2414 def _Raise(exception):
2421 def testIgnoreSignals(self):
2422 sock_err_intr = socket.error(errno.EINTR, "Message")
2423 sock_err_inval = socket.error(errno.EINVAL, "Message")
2425 env_err_intr = EnvironmentError(errno.EINTR, "Message")
2426 env_err_inval = EnvironmentError(errno.EINVAL, "Message")
2428 self.assertRaises(socket.error, self._Raise, sock_err_intr)
2429 self.assertRaises(socket.error, self._Raise, sock_err_inval)
2430 self.assertRaises(EnvironmentError, self._Raise, env_err_intr)
2431 self.assertRaises(EnvironmentError, self._Raise, env_err_inval)
2433 self.assertEquals(utils.IgnoreSignals(self._Raise, sock_err_intr), None)
2434 self.assertEquals(utils.IgnoreSignals(self._Raise, env_err_intr), None)
2435 self.assertRaises(socket.error, utils.IgnoreSignals, self._Raise,
2437 self.assertRaises(EnvironmentError, utils.IgnoreSignals, self._Raise,
2440 self.assertEquals(utils.IgnoreSignals(self._Return, True), True)
2441 self.assertEquals(utils.IgnoreSignals(self._Return, 33), 33)
2444 class TestEnsureDirs(unittest.TestCase):
2445 """Tests for EnsureDirs"""
2448 self.dir = tempfile.mkdtemp()
2449 self.old_umask = os.umask(0777)
2451 def testEnsureDirs(self):
2453 (utils.PathJoin(self.dir, "foo"), 0777),
2454 (utils.PathJoin(self.dir, "bar"), 0000),
2456 self.assertEquals(os.stat(utils.PathJoin(self.dir, "foo"))[0] & 0777, 0777)
2457 self.assertEquals(os.stat(utils.PathJoin(self.dir, "bar"))[0] & 0777, 0000)
2460 os.rmdir(utils.PathJoin(self.dir, "foo"))
2461 os.rmdir(utils.PathJoin(self.dir, "bar"))
2463 os.umask(self.old_umask)
2466 class TestFormatSeconds(unittest.TestCase):
2468 self.assertEqual(utils.FormatSeconds(1), "1s")
2469 self.assertEqual(utils.FormatSeconds(3600), "1h 0m 0s")
2470 self.assertEqual(utils.FormatSeconds(3599), "59m 59s")
2471 self.assertEqual(utils.FormatSeconds(7200), "2h 0m 0s")
2472 self.assertEqual(utils.FormatSeconds(7201), "2h 0m 1s")
2473 self.assertEqual(utils.FormatSeconds(7281), "2h 1m 21s")
2474 self.assertEqual(utils.FormatSeconds(29119), "8h 5m 19s")
2475 self.assertEqual(utils.FormatSeconds(19431228), "224d 21h 33m 48s")
2476 self.assertEqual(utils.FormatSeconds(-1), "-1s")
2477 self.assertEqual(utils.FormatSeconds(-282), "-282s")
2478 self.assertEqual(utils.FormatSeconds(-29119), "-29119s")
2480 def testFloat(self):
2481 self.assertEqual(utils.FormatSeconds(1.3), "1s")
2482 self.assertEqual(utils.FormatSeconds(1.9), "2s")
2483 self.assertEqual(utils.FormatSeconds(3912.12311), "1h 5m 12s")
2484 self.assertEqual(utils.FormatSeconds(3912.8), "1h 5m 13s")
2487 class RunIgnoreProcessNotFound(unittest.TestCase):
2490 os.write(fd, str(os.getpid()))
2495 (pid_read_fd, pid_write_fd) = os.pipe()
2497 # Start short-lived process which writes its PID to pipe
2498 self.assert_(utils.RunInSeparateProcess(self._WritePid, pid_write_fd))
2499 os.close(pid_write_fd)
2501 # Read PID from pipe
2502 pid = int(os.read(pid_read_fd, 1024))
2503 os.close(pid_read_fd)
2505 # Try to send signal to process which exited recently
2506 self.assertFalse(utils.IgnoreProcessNotFound(os.kill, pid, 0))
2509 class TestIsValidIP4(unittest.TestCase):
2511 self.assert_(utils.IsValidIP4("127.0.0.1"))
2512 self.assert_(utils.IsValidIP4("0.0.0.0"))
2513 self.assert_(utils.IsValidIP4("255.255.255.255"))
2514 self.assertFalse(utils.IsValidIP4("0"))
2515 self.assertFalse(utils.IsValidIP4("1"))
2516 self.assertFalse(utils.IsValidIP4("1.1.1"))
2517 self.assertFalse(utils.IsValidIP4("255.255.255.256"))
2518 self.assertFalse(utils.IsValidIP4("::1"))
2521 class TestIsValidIP6(unittest.TestCase):
2523 self.assert_(utils.IsValidIP6("::"))
2524 self.assert_(utils.IsValidIP6("::1"))
2525 self.assert_(utils.IsValidIP6("1" + (":1" * 7)))
2526 self.assert_(utils.IsValidIP6("ffff" + (":ffff" * 7)))
2527 self.assertFalse(utils.IsValidIP6("0"))
2528 self.assertFalse(utils.IsValidIP6(":1"))
2529 self.assertFalse(utils.IsValidIP6("f" + (":f" * 6)))
2530 self.assertFalse(utils.IsValidIP6("fffg" + (":ffff" * 7)))
2531 self.assertFalse(utils.IsValidIP6("fffff" + (":ffff" * 7)))
2532 self.assertFalse(utils.IsValidIP6("1" + (":1" * 8)))
2533 self.assertFalse(utils.IsValidIP6("127.0.0.1"))
2536 class TestIsValidIP(unittest.TestCase):
2538 self.assert_(utils.IsValidIP("0.0.0.0"))
2539 self.assert_(utils.IsValidIP("127.0.0.1"))
2540 self.assert_(utils.IsValidIP("::"))
2541 self.assert_(utils.IsValidIP("::1"))
2542 self.assertFalse(utils.IsValidIP("0"))
2543 self.assertFalse(utils.IsValidIP("1.1.1.256"))
2544 self.assertFalse(utils.IsValidIP("a:g::1"))
2547 class TestGetAddressFamily(unittest.TestCase):
2549 self.assertEqual(utils.GetAddressFamily("127.0.0.1"), socket.AF_INET)
2550 self.assertEqual(utils.GetAddressFamily("10.2.0.127"), socket.AF_INET)
2551 self.assertEqual(utils.GetAddressFamily("::1"), socket.AF_INET6)
2552 self.assertEqual(utils.GetAddressFamily("fe80::a00:27ff:fe08:5048"),
2554 self.assertRaises(errors.GenericError, utils.GetAddressFamily, "0")
2557 if __name__ == '__main__':
2558 testutils.GanetiTestProgram()