4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the utils module"""
42 from ganeti import constants
43 from ganeti import compat
44 from ganeti import utils
45 from ganeti import errors
46 from ganeti.utils import RunCmd, \
49 SetEtcHostsEntry, RemoveEtcHostsEntry
52 class TestIsProcessAlive(unittest.TestCase):
53 """Testing case for IsProcessAlive"""
57 self.assert_(utils.IsProcessAlive(mypid), "can't find myself running")
59 def testNotExisting(self):
60 pid_non_existing = os.fork()
61 if pid_non_existing == 0:
63 elif pid_non_existing < 0:
64 raise SystemError("can't fork")
65 os.waitpid(pid_non_existing, 0)
66 self.assertFalse(utils.IsProcessAlive(pid_non_existing),
67 "nonexisting process detected")
70 class TestGetProcStatusPath(unittest.TestCase):
72 self.assert_("/1234/" in utils._GetProcStatusPath(1234))
73 self.assertNotEqual(utils._GetProcStatusPath(1),
74 utils._GetProcStatusPath(2))
77 class TestIsProcessHandlingSignal(unittest.TestCase):
79 self.tmpdir = tempfile.mkdtemp()
82 shutil.rmtree(self.tmpdir)
84 def testParseSigsetT(self):
85 self.assertEqual(len(utils._ParseSigsetT("0")), 0)
86 self.assertEqual(utils._ParseSigsetT("1"), set([1]))
87 self.assertEqual(utils._ParseSigsetT("1000a"), set([2, 4, 17]))
88 self.assertEqual(utils._ParseSigsetT("810002"), set([2, 17, 24, ]))
89 self.assertEqual(utils._ParseSigsetT("0000000180000202"),
91 self.assertEqual(utils._ParseSigsetT("0000000180000002"),
93 self.assertEqual(utils._ParseSigsetT("0000000188000002"),
95 self.assertEqual(utils._ParseSigsetT("000000004b813efb"),
96 set([1, 2, 4, 5, 6, 7, 8, 10, 11, 12, 13, 14, 17,
98 self.assertEqual(utils._ParseSigsetT("ffffff"), set(range(1, 25)))
100 def testGetProcStatusField(self):
101 for field in ["SigCgt", "Name", "FDSize"]:
102 for value in ["", "0", "cat", " 1234 KB"]:
103 pstatus = "\n".join([
105 "%s: %s" % (field, value),
108 result = utils._GetProcStatusField(pstatus, field)
109 self.assertEqual(result, value.strip())
112 sp = utils.PathJoin(self.tmpdir, "status")
114 utils.WriteFile(sp, data="\n".join([
116 "State: S (sleeping)",
121 "SigBlk: 0000000000010000",
122 "SigIgn: 0000000000384004",
123 "SigCgt: 000000004b813efb",
124 "CapEff: 0000000000000000",
127 self.assert_(utils.IsProcessHandlingSignal(1234, 10, status_path=sp))
129 def testNoSigCgt(self):
130 sp = utils.PathJoin(self.tmpdir, "status")
132 utils.WriteFile(sp, data="\n".join([
136 self.assertRaises(RuntimeError, utils.IsProcessHandlingSignal,
137 1234, 10, status_path=sp)
139 def testNoSuchFile(self):
140 sp = utils.PathJoin(self.tmpdir, "notexist")
142 self.assertFalse(utils.IsProcessHandlingSignal(1234, 10, status_path=sp))
145 def _TestRealProcess():
146 signal.signal(signal.SIGUSR1, signal.SIG_DFL)
147 if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
148 raise Exception("SIGUSR1 is handled when it should not be")
150 signal.signal(signal.SIGUSR1, lambda signum, frame: None)
151 if not utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
152 raise Exception("SIGUSR1 is not handled when it should be")
154 signal.signal(signal.SIGUSR1, signal.SIG_IGN)
155 if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
156 raise Exception("SIGUSR1 is not handled when it should be")
158 signal.signal(signal.SIGUSR1, signal.SIG_DFL)
159 if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
160 raise Exception("SIGUSR1 is handled when it should not be")
164 def testRealProcess(self):
165 self.assert_(utils.RunInSeparateProcess(self._TestRealProcess))
168 class TestRunCmd(testutils.GanetiTestCase):
169 """Testing case for the RunCmd function"""
172 testutils.GanetiTestCase.setUp(self)
173 self.magic = time.ctime() + " ganeti test"
174 self.fname = self._CreateTempFile()
175 self.fifo_tmpdir = tempfile.mkdtemp()
176 self.fifo_file = os.path.join(self.fifo_tmpdir, "ganeti_test_fifo")
177 os.mkfifo(self.fifo_file)
180 shutil.rmtree(self.fifo_tmpdir)
181 testutils.GanetiTestCase.tearDown(self)
184 """Test successful exit code"""
185 result = RunCmd("/bin/sh -c 'exit 0'")
186 self.assertEqual(result.exit_code, 0)
187 self.assertEqual(result.output, "")
190 """Test fail exit code"""
191 result = RunCmd("/bin/sh -c 'exit 1'")
192 self.assertEqual(result.exit_code, 1)
193 self.assertEqual(result.output, "")
195 def testStdout(self):
196 """Test standard output"""
197 cmd = 'echo -n "%s"' % self.magic
198 result = RunCmd("/bin/sh -c '%s'" % cmd)
199 self.assertEqual(result.stdout, self.magic)
200 result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
201 self.assertEqual(result.output, "")
202 self.assertFileContent(self.fname, self.magic)
204 def testStderr(self):
205 """Test standard error"""
206 cmd = 'echo -n "%s"' % self.magic
207 result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
208 self.assertEqual(result.stderr, self.magic)
209 result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
210 self.assertEqual(result.output, "")
211 self.assertFileContent(self.fname, self.magic)
213 def testCombined(self):
214 """Test combined output"""
215 cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
216 expected = "A" + self.magic + "B" + self.magic
217 result = RunCmd("/bin/sh -c '%s'" % cmd)
218 self.assertEqual(result.output, expected)
219 result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
220 self.assertEqual(result.output, "")
221 self.assertFileContent(self.fname, expected)
223 def testSignal(self):
225 result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
226 self.assertEqual(result.signal, 15)
227 self.assertEqual(result.output, "")
229 def testTimeoutClean(self):
230 cmd = "trap 'exit 0' TERM; read < %s" % self.fifo_file
231 result = RunCmd(["/bin/sh", "-c", cmd], timeout=0.2)
232 self.assertEqual(result.exit_code, 0)
234 def testTimeoutKill(self):
235 cmd = ["/bin/sh", "-c", "trap '' TERM; read < %s" % self.fifo_file]
237 out, err, status, ta = utils._RunCmdPipe(cmd, {}, False, "/", False,
238 timeout, _linger_timeout=0.2)
239 self.assert_(status < 0)
240 self.assertEqual(-status, signal.SIGKILL)
242 def testTimeoutOutputAfterTerm(self):
243 cmd = "trap 'echo sigtermed; exit 1' TERM; read < %s" % self.fifo_file
244 result = RunCmd(["/bin/sh", "-c", cmd], timeout=0.2)
245 self.assert_(result.failed)
246 self.assertEqual(result.stdout, "sigtermed\n")
248 def testListRun(self):
250 result = RunCmd(["true"])
251 self.assertEqual(result.signal, None)
252 self.assertEqual(result.exit_code, 0)
253 result = RunCmd(["/bin/sh", "-c", "exit 1"])
254 self.assertEqual(result.signal, None)
255 self.assertEqual(result.exit_code, 1)
256 result = RunCmd(["echo", "-n", self.magic])
257 self.assertEqual(result.signal, None)
258 self.assertEqual(result.exit_code, 0)
259 self.assertEqual(result.stdout, self.magic)
261 def testFileEmptyOutput(self):
262 """Test file output"""
263 result = RunCmd(["true"], output=self.fname)
264 self.assertEqual(result.signal, None)
265 self.assertEqual(result.exit_code, 0)
266 self.assertFileContent(self.fname, "")
269 """Test locale environment"""
270 old_env = os.environ.copy()
272 os.environ["LANG"] = "en_US.UTF-8"
273 os.environ["LC_ALL"] = "en_US.UTF-8"
274 result = RunCmd(["locale"])
275 for line in result.output.splitlines():
276 key, value = line.split("=", 1)
277 # Ignore these variables, they're overridden by LC_ALL
278 if key == "LANG" or key == "LANGUAGE":
280 self.failIf(value and value != "C" and value != '"C"',
281 "Variable %s is set to the invalid value '%s'" % (key, value))
285 def testDefaultCwd(self):
286 """Test default working directory"""
287 self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
290 """Test default working directory"""
291 self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
292 self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
294 self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
296 def testResetEnv(self):
297 """Test environment reset functionality"""
298 self.failUnlessEqual(RunCmd(["env"], reset_env=True).stdout.strip(), "")
299 self.failUnlessEqual(RunCmd(["env"], reset_env=True,
300 env={"FOO": "bar",}).stdout.strip(), "FOO=bar")
302 def testNoFork(self):
303 """Test that nofork raise an error"""
304 self.assertFalse(utils._no_fork)
307 self.assertTrue(utils._no_fork)
308 self.assertRaises(errors.ProgrammerError, RunCmd, ["true"])
310 utils._no_fork = False
312 def testWrongParams(self):
313 """Test wrong parameters"""
314 self.assertRaises(errors.ProgrammerError, RunCmd, ["true"],
315 output="/dev/null", interactive=True)
318 class TestRunParts(testutils.GanetiTestCase):
319 """Testing case for the RunParts function"""
322 self.rundir = tempfile.mkdtemp(prefix="ganeti-test", suffix=".tmp")
325 shutil.rmtree(self.rundir)
328 """Test on an empty dir"""
329 self.failUnlessEqual(RunParts(self.rundir, reset_env=True), [])
331 def testSkipWrongName(self):
332 """Test that wrong files are skipped"""
333 fname = os.path.join(self.rundir, "00test.dot")
334 utils.WriteFile(fname, data="")
335 os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
336 relname = os.path.basename(fname)
337 self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
338 [(relname, constants.RUNPARTS_SKIP, None)])
340 def testSkipNonExec(self):
341 """Test that non executable files are skipped"""
342 fname = os.path.join(self.rundir, "00test")
343 utils.WriteFile(fname, data="")
344 relname = os.path.basename(fname)
345 self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
346 [(relname, constants.RUNPARTS_SKIP, None)])
349 """Test error on a broken executable"""
350 fname = os.path.join(self.rundir, "00test")
351 utils.WriteFile(fname, data="")
352 os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
353 (relname, status, error) = RunParts(self.rundir, reset_env=True)[0]
354 self.failUnlessEqual(relname, os.path.basename(fname))
355 self.failUnlessEqual(status, constants.RUNPARTS_ERR)
356 self.failUnless(error)
358 def testSorted(self):
359 """Test executions are sorted"""
361 files.append(os.path.join(self.rundir, "64test"))
362 files.append(os.path.join(self.rundir, "00test"))
363 files.append(os.path.join(self.rundir, "42test"))
366 utils.WriteFile(fname, data="")
368 results = RunParts(self.rundir, reset_env=True)
370 for fname in sorted(files):
371 self.failUnlessEqual(os.path.basename(fname), results.pop(0)[0])
374 """Test correct execution"""
375 fname = os.path.join(self.rundir, "00test")
376 utils.WriteFile(fname, data="#!/bin/sh\n\necho -n ciao")
377 os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
378 (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
379 self.failUnlessEqual(relname, os.path.basename(fname))
380 self.failUnlessEqual(status, constants.RUNPARTS_RUN)
381 self.failUnlessEqual(runresult.stdout, "ciao")
383 def testRunFail(self):
384 """Test correct execution, with run failure"""
385 fname = os.path.join(self.rundir, "00test")
386 utils.WriteFile(fname, data="#!/bin/sh\n\nexit 1")
387 os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
388 (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
389 self.failUnlessEqual(relname, os.path.basename(fname))
390 self.failUnlessEqual(status, constants.RUNPARTS_RUN)
391 self.failUnlessEqual(runresult.exit_code, 1)
392 self.failUnless(runresult.failed)
394 def testRunMix(self):
396 files.append(os.path.join(self.rundir, "00test"))
397 files.append(os.path.join(self.rundir, "42test"))
398 files.append(os.path.join(self.rundir, "64test"))
399 files.append(os.path.join(self.rundir, "99test"))
403 # 1st has errors in execution
404 utils.WriteFile(files[0], data="#!/bin/sh\n\nexit 1")
405 os.chmod(files[0], stat.S_IREAD | stat.S_IEXEC)
408 utils.WriteFile(files[1], data="")
410 # 3rd cannot execute properly
411 utils.WriteFile(files[2], data="")
412 os.chmod(files[2], stat.S_IREAD | stat.S_IEXEC)
415 utils.WriteFile(files[3], data="#!/bin/sh\n\necho -n ciao")
416 os.chmod(files[3], stat.S_IREAD | stat.S_IEXEC)
418 results = RunParts(self.rundir, reset_env=True)
420 (relname, status, runresult) = results[0]
421 self.failUnlessEqual(relname, os.path.basename(files[0]))
422 self.failUnlessEqual(status, constants.RUNPARTS_RUN)
423 self.failUnlessEqual(runresult.exit_code, 1)
424 self.failUnless(runresult.failed)
426 (relname, status, runresult) = results[1]
427 self.failUnlessEqual(relname, os.path.basename(files[1]))
428 self.failUnlessEqual(status, constants.RUNPARTS_SKIP)
429 self.failUnlessEqual(runresult, None)
431 (relname, status, runresult) = results[2]
432 self.failUnlessEqual(relname, os.path.basename(files[2]))
433 self.failUnlessEqual(status, constants.RUNPARTS_ERR)
434 self.failUnless(runresult)
436 (relname, status, runresult) = results[3]
437 self.failUnlessEqual(relname, os.path.basename(files[3]))
438 self.failUnlessEqual(status, constants.RUNPARTS_RUN)
439 self.failUnlessEqual(runresult.output, "ciao")
440 self.failUnlessEqual(runresult.exit_code, 0)
441 self.failUnless(not runresult.failed)
443 def testMissingDirectory(self):
444 nosuchdir = utils.PathJoin(self.rundir, "no/such/directory")
445 self.assertEqual(RunParts(nosuchdir), [])
448 class TestStartDaemon(testutils.GanetiTestCase):
450 self.tmpdir = tempfile.mkdtemp(prefix="ganeti-test")
451 self.tmpfile = os.path.join(self.tmpdir, "test")
454 shutil.rmtree(self.tmpdir)
457 utils.StartDaemon("echo Hello World > %s" % self.tmpfile)
458 self._wait(self.tmpfile, 60.0, "Hello World")
460 def testShellOutput(self):
461 utils.StartDaemon("echo Hello World", output=self.tmpfile)
462 self._wait(self.tmpfile, 60.0, "Hello World")
464 def testNoShellNoOutput(self):
465 utils.StartDaemon(["pwd"])
467 def testNoShellNoOutputTouch(self):
468 testfile = os.path.join(self.tmpdir, "check")
469 self.failIf(os.path.exists(testfile))
470 utils.StartDaemon(["touch", testfile])
471 self._wait(testfile, 60.0, "")
473 def testNoShellOutput(self):
474 utils.StartDaemon(["pwd"], output=self.tmpfile)
475 self._wait(self.tmpfile, 60.0, "/")
477 def testNoShellOutputCwd(self):
478 utils.StartDaemon(["pwd"], output=self.tmpfile, cwd=os.getcwd())
479 self._wait(self.tmpfile, 60.0, os.getcwd())
481 def testShellEnv(self):
482 utils.StartDaemon("echo \"$GNT_TEST_VAR\"", output=self.tmpfile,
483 env={ "GNT_TEST_VAR": "Hello World", })
484 self._wait(self.tmpfile, 60.0, "Hello World")
486 def testNoShellEnv(self):
487 utils.StartDaemon(["printenv", "GNT_TEST_VAR"], output=self.tmpfile,
488 env={ "GNT_TEST_VAR": "Hello World", })
489 self._wait(self.tmpfile, 60.0, "Hello World")
491 def testOutputFd(self):
492 fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
494 utils.StartDaemon(["pwd"], output_fd=fd, cwd=os.getcwd())
497 self._wait(self.tmpfile, 60.0, os.getcwd())
500 pid = utils.StartDaemon("echo $$ > %s" % self.tmpfile)
501 self._wait(self.tmpfile, 60.0, str(pid))
503 def testPidFile(self):
504 pidfile = os.path.join(self.tmpdir, "pid")
505 checkfile = os.path.join(self.tmpdir, "abort")
507 pid = utils.StartDaemon("while sleep 5; do :; done", pidfile=pidfile,
510 fd = os.open(pidfile, os.O_RDONLY)
512 # Check file is locked
513 self.assertRaises(errors.LockError, utils.LockFile, fd)
515 pidtext = os.read(fd, 100)
519 self.assertEqual(int(pidtext.strip()), pid)
521 self.assert_(utils.IsProcessAlive(pid))
523 # No matter what happens, kill daemon
524 utils.KillProcess(pid, timeout=5.0, waitpid=False)
525 self.failIf(utils.IsProcessAlive(pid))
527 self.assertEqual(utils.ReadFile(self.tmpfile), "")
529 def _wait(self, path, timeout, expected):
530 # Due to the asynchronous nature of daemon processes, polling is necessary.
531 # A timeout makes sure the test doesn't hang forever.
533 if not (os.path.isfile(path) and
534 utils.ReadFile(path).strip() == expected):
535 raise utils.RetryAgain()
538 utils.Retry(_CheckFile, (0.01, 1.5, 1.0), timeout)
539 except utils.RetryTimeout:
540 self.fail("Apparently the daemon didn't run in %s seconds and/or"
541 " didn't write the correct output" % timeout)
544 self.assertRaises(errors.OpExecError, utils.StartDaemon,
545 ["./does-NOT-EXIST/here/0123456789"])
546 self.assertRaises(errors.OpExecError, utils.StartDaemon,
547 ["./does-NOT-EXIST/here/0123456789"],
548 output=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
549 self.assertRaises(errors.OpExecError, utils.StartDaemon,
550 ["./does-NOT-EXIST/here/0123456789"],
551 cwd=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
552 self.assertRaises(errors.OpExecError, utils.StartDaemon,
553 ["./does-NOT-EXIST/here/0123456789"],
554 output=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
556 fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
558 self.assertRaises(errors.ProgrammerError, utils.StartDaemon,
559 ["./does-NOT-EXIST/here/0123456789"],
560 output=self.tmpfile, output_fd=fd)
565 class TestParseCpuMask(unittest.TestCase):
566 """Test case for the ParseCpuMask function."""
568 def testWellFormed(self):
569 self.assertEqual(utils.ParseCpuMask(""), [])
570 self.assertEqual(utils.ParseCpuMask("1"), [1])
571 self.assertEqual(utils.ParseCpuMask("0-2,4,5-5"), [0,1,2,4,5])
573 def testInvalidInput(self):
574 for data in ["garbage", "0,", "0-1-2", "2-1", "1-a"]:
575 self.assertRaises(errors.ParseError, utils.ParseCpuMask, data)
578 class TestEtcHosts(testutils.GanetiTestCase):
579 """Test functions modifying /etc/hosts"""
582 testutils.GanetiTestCase.setUp(self)
583 self.tmpname = self._CreateTempFile()
584 handle = open(self.tmpname, 'w')
586 handle.write('# This is a test file for /etc/hosts\n')
587 handle.write('127.0.0.1\tlocalhost\n')
588 handle.write('192.0.2.1 router gw\n')
592 def testSettingNewIp(self):
593 SetEtcHostsEntry(self.tmpname, '198.51.100.4', 'myhost.example.com',
596 self.assertFileContent(self.tmpname,
597 "# This is a test file for /etc/hosts\n"
598 "127.0.0.1\tlocalhost\n"
599 "192.0.2.1 router gw\n"
600 "198.51.100.4\tmyhost.example.com myhost\n")
601 self.assertFileMode(self.tmpname, 0644)
603 def testSettingExistingIp(self):
604 SetEtcHostsEntry(self.tmpname, '192.0.2.1', 'myhost.example.com',
607 self.assertFileContent(self.tmpname,
608 "# This is a test file for /etc/hosts\n"
609 "127.0.0.1\tlocalhost\n"
610 "192.0.2.1\tmyhost.example.com myhost\n")
611 self.assertFileMode(self.tmpname, 0644)
613 def testSettingDuplicateName(self):
614 SetEtcHostsEntry(self.tmpname, '198.51.100.4', 'myhost', ['myhost'])
616 self.assertFileContent(self.tmpname,
617 "# This is a test file for /etc/hosts\n"
618 "127.0.0.1\tlocalhost\n"
619 "192.0.2.1 router gw\n"
620 "198.51.100.4\tmyhost\n")
621 self.assertFileMode(self.tmpname, 0644)
623 def testRemovingExistingHost(self):
624 RemoveEtcHostsEntry(self.tmpname, 'router')
626 self.assertFileContent(self.tmpname,
627 "# This is a test file for /etc/hosts\n"
628 "127.0.0.1\tlocalhost\n"
630 self.assertFileMode(self.tmpname, 0644)
632 def testRemovingSingleExistingHost(self):
633 RemoveEtcHostsEntry(self.tmpname, 'localhost')
635 self.assertFileContent(self.tmpname,
636 "# This is a test file for /etc/hosts\n"
637 "192.0.2.1 router gw\n")
638 self.assertFileMode(self.tmpname, 0644)
640 def testRemovingNonExistingHost(self):
641 RemoveEtcHostsEntry(self.tmpname, 'myhost')
643 self.assertFileContent(self.tmpname,
644 "# This is a test file for /etc/hosts\n"
645 "127.0.0.1\tlocalhost\n"
646 "192.0.2.1 router gw\n")
647 self.assertFileMode(self.tmpname, 0644)
649 def testRemovingAlias(self):
650 RemoveEtcHostsEntry(self.tmpname, 'gw')
652 self.assertFileContent(self.tmpname,
653 "# This is a test file for /etc/hosts\n"
654 "127.0.0.1\tlocalhost\n"
655 "192.0.2.1 router\n")
656 self.assertFileMode(self.tmpname, 0644)
659 class TestGetMounts(unittest.TestCase):
660 """Test case for GetMounts()."""
663 "rootfs / rootfs rw 0 0\n"
664 "none /sys sysfs rw,nosuid,nodev,noexec,relatime 0 0\n"
665 "none /proc proc rw,nosuid,nodev,noexec,relatime 0 0\n")
668 self.tmpfile = tempfile.NamedTemporaryFile()
669 utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
671 def testGetMounts(self):
672 self.assertEqual(utils.GetMounts(filename=self.tmpfile.name),
674 ("rootfs", "/", "rootfs", "rw"),
675 ("none", "/sys", "sysfs", "rw,nosuid,nodev,noexec,relatime"),
676 ("none", "/proc", "proc", "rw,nosuid,nodev,noexec,relatime"),
679 class TestNewUUID(unittest.TestCase):
680 """Test case for NewUUID"""
683 self.failUnless(utils.UUID_RE.match(utils.NewUUID()))
686 class TestFirstFree(unittest.TestCase):
687 """Test case for the FirstFree function"""
691 self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
692 self.failUnlessEqual(FirstFree([]), None)
693 self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
694 self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
695 self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
698 class TestTimeFunctions(unittest.TestCase):
699 """Test case for time functions"""
702 self.assertEqual(utils.SplitTime(1), (1, 0))
703 self.assertEqual(utils.SplitTime(1.5), (1, 500000))
704 self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
705 self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
706 self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
707 self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
708 self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
709 self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
711 self.assertRaises(AssertionError, utils.SplitTime, -1)
713 self.assertEqual(utils.MergeTime((1, 0)), 1.0)
714 self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
715 self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
717 self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3),
719 self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
721 self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
722 self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
723 self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
724 self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
725 self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
728 class FieldSetTestCase(unittest.TestCase):
729 """Test case for FieldSets"""
731 def testSimpleMatch(self):
732 f = utils.FieldSet("a", "b", "c", "def")
733 self.failUnless(f.Matches("a"))
734 self.failIf(f.Matches("d"), "Substring matched")
735 self.failIf(f.Matches("defghi"), "Prefix string matched")
736 self.failIf(f.NonMatching(["b", "c"]))
737 self.failIf(f.NonMatching(["a", "b", "c", "def"]))
738 self.failUnless(f.NonMatching(["a", "d"]))
740 def testRegexMatch(self):
741 f = utils.FieldSet("a", "b([0-9]+)", "c")
742 self.failUnless(f.Matches("b1"))
743 self.failUnless(f.Matches("b99"))
744 self.failIf(f.Matches("b/1"))
745 self.failIf(f.NonMatching(["b12", "c"]))
746 self.failUnless(f.NonMatching(["a", "1"]))
748 class TestForceDictType(unittest.TestCase):
749 """Test case for ForceDictType"""
751 "a": constants.VTYPE_INT,
752 "b": constants.VTYPE_BOOL,
753 "c": constants.VTYPE_STRING,
754 "d": constants.VTYPE_SIZE,
755 "e": constants.VTYPE_MAYBE_STRING,
758 def _fdt(self, dict, allowed_values=None):
759 if allowed_values is None:
760 utils.ForceDictType(dict, self.KEY_TYPES)
762 utils.ForceDictType(dict, self.KEY_TYPES, allowed_values=allowed_values)
766 def testSimpleDict(self):
767 self.assertEqual(self._fdt({}), {})
768 self.assertEqual(self._fdt({'a': 1}), {'a': 1})
769 self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
770 self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
771 self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
772 self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
773 self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
774 self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
775 self.assertEqual(self._fdt({'b': False}), {'b': False})
776 self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
777 self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
778 self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
779 self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
780 self.assertEqual(self._fdt({"e": None, }), {"e": None, })
781 self.assertEqual(self._fdt({"e": "Hello World", }), {"e": "Hello World", })
782 self.assertEqual(self._fdt({"e": False, }), {"e": '', })
783 self.assertEqual(self._fdt({"b": "hello", }, ["hello"]), {"b": "hello"})
785 def testErrors(self):
786 self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
787 self.assertRaises(errors.TypeEnforcementError, self._fdt, {"b": "hello"})
788 self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
789 self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
790 self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
791 self.assertRaises(errors.TypeEnforcementError, self._fdt, {"e": object(), })
792 self.assertRaises(errors.TypeEnforcementError, self._fdt, {"e": [], })
793 self.assertRaises(errors.TypeEnforcementError, self._fdt, {"x": None, })
794 self.assertRaises(errors.TypeEnforcementError, self._fdt, [])
795 self.assertRaises(errors.ProgrammerError, utils.ForceDictType,
796 {"b": "hello"}, {"b": "no-such-type"})
799 class RunInSeparateProcess(unittest.TestCase):
801 for exp in [True, False]:
805 self.assertEqual(exp, utils.RunInSeparateProcess(_child))
808 for arg in [0, 1, 999, "Hello World", (1, 2, 3)]:
809 def _child(carg1, carg2):
810 return carg1 == "Foo" and carg2 == arg
812 self.assert_(utils.RunInSeparateProcess(_child, "Foo", arg))
815 parent_pid = os.getpid()
818 return os.getpid() == parent_pid
820 self.failIf(utils.RunInSeparateProcess(_check))
822 def testSignal(self):
824 os.kill(os.getpid(), signal.SIGTERM)
826 self.assertRaises(errors.GenericError,
827 utils.RunInSeparateProcess, _kill)
829 def testException(self):
831 raise errors.GenericError("This is a test")
833 self.assertRaises(errors.GenericError,
834 utils.RunInSeparateProcess, _exc)
837 class TestValidateServiceName(unittest.TestCase):
840 0, 1, 2, 3, 1024, 65000, 65534, 65535,
845 "0", "80", "1111", "65535",
848 for name in testnames:
849 self.assertEqual(utils.ValidateServiceName(name), name)
851 def testInvalid(self):
853 -15756, -1, 65536, 133428083,
854 "", "Hello World!", "!", "'", "\"", "\t", "\n", "`",
855 "-8546", "-1", "65536",
859 for name in testnames:
860 self.assertRaises(errors.OpPrereqError, utils.ValidateServiceName, name)
863 class TestReadLockedPidFile(unittest.TestCase):
865 self.tmpdir = tempfile.mkdtemp()
868 shutil.rmtree(self.tmpdir)
870 def testNonExistent(self):
871 path = utils.PathJoin(self.tmpdir, "nonexist")
872 self.assert_(utils.ReadLockedPidFile(path) is None)
874 def testUnlocked(self):
875 path = utils.PathJoin(self.tmpdir, "pid")
876 utils.WriteFile(path, data="123")
877 self.assert_(utils.ReadLockedPidFile(path) is None)
879 def testLocked(self):
880 path = utils.PathJoin(self.tmpdir, "pid")
881 utils.WriteFile(path, data="123")
883 fl = utils.FileLock.Open(path)
885 fl.Exclusive(blocking=True)
887 self.assertEqual(utils.ReadLockedPidFile(path), 123)
891 self.assert_(utils.ReadLockedPidFile(path) is None)
894 path = utils.PathJoin(self.tmpdir, "foobar", "pid")
895 utils.WriteFile(utils.PathJoin(self.tmpdir, "foobar"), data="")
896 # open(2) should return ENOTDIR
897 self.assertRaises(EnvironmentError, utils.ReadLockedPidFile, path)
900 class TestFindMatch(unittest.TestCase):
904 "bb": {"Two B": True},
905 re.compile(r"^x(foo|bar|bazX)([0-9]+)$"): (1, 2, 3),
908 self.assertEqual(utils.FindMatch(data, "aaaa"), ("Four A", []))
909 self.assertEqual(utils.FindMatch(data, "bb"), ({"Two B": True}, []))
911 for i in ["foo", "bar", "bazX"]:
912 for j in range(1, 100, 7):
913 self.assertEqual(utils.FindMatch(data, "x%s%s" % (i, j)),
914 ((1, 2, 3), [i, str(j)]))
916 def testNoMatch(self):
917 self.assert_(utils.FindMatch({}, "") is None)
918 self.assert_(utils.FindMatch({}, "foo") is None)
919 self.assert_(utils.FindMatch({}, 1234) is None)
923 re.compile("^(something)$"): "Hello World",
926 self.assert_(utils.FindMatch(data, "") is None)
927 self.assert_(utils.FindMatch(data, "Hello World") is None)
931 def __init__(self, values):
935 return self.values.pop(0)
938 class TestRunningTimeout(unittest.TestCase):
940 self.time_fn = TimeMock([0.0, 0.3, 4.6, 6.5])
942 def testRemainingFloat(self):
943 timeout = utils.RunningTimeout(5.0, True, _time_fn=self.time_fn)
944 self.assertAlmostEqual(timeout.Remaining(), 4.7)
945 self.assertAlmostEqual(timeout.Remaining(), 0.4)
946 self.assertAlmostEqual(timeout.Remaining(), -1.5)
948 def testRemaining(self):
949 self.time_fn = TimeMock([0, 2, 4, 5, 6])
950 timeout = utils.RunningTimeout(5, True, _time_fn=self.time_fn)
951 self.assertEqual(timeout.Remaining(), 3)
952 self.assertEqual(timeout.Remaining(), 1)
953 self.assertEqual(timeout.Remaining(), 0)
954 self.assertEqual(timeout.Remaining(), -1)
956 def testRemainingNonNegative(self):
957 timeout = utils.RunningTimeout(5.0, False, _time_fn=self.time_fn)
958 self.assertAlmostEqual(timeout.Remaining(), 4.7)
959 self.assertAlmostEqual(timeout.Remaining(), 0.4)
960 self.assertEqual(timeout.Remaining(), 0.0)
962 def testNegativeTimeout(self):
963 self.assertRaises(ValueError, utils.RunningTimeout, -1.0, True)
966 class TestTryConvert(unittest.TestCase):
968 for src, fn, result in [
974 self.assertEqual(utils.TryConvert(fn, src), result)
977 class TestIsValidShellParam(unittest.TestCase):
983 self.assertEqual(utils.IsValidShellParam(val), result)
986 class TestBuildShellCmd(unittest.TestCase):
988 self.assertRaises(errors.ProgrammerError, utils.BuildShellCmd,
990 self.assertEqual(utils.BuildShellCmd("ls %s", "ab"), "ls ab")
993 if __name__ == '__main__':
994 testutils.GanetiTestProgram()