4 # Copyright (C) 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for testing ganeti.utils.process"""
33 from ganeti import constants
34 from ganeti import utils
35 from ganeti import errors
class TestIsProcessAlive(unittest.TestCase):
  """Testing case for IsProcessAlive"""
  # NOTE(review): the "testExists" header, the "mypid" assignment and the
  # child's exit call were missing from the reviewed text and have been
  # reconstructed from the surrounding statements; please confirm.

  def testExists(self):
    """Check that the current process is reported as alive"""
    mypid = os.getpid()
    self.assertTrue(utils.IsProcessAlive(mypid), "can't find myself running")

  def testNotExisting(self):
    """Check that a forked child which exited and was reaped is not alive"""
    # os.fork() raises OSError on failure, it never returns a negative pid,
    # hence no explicit error branch is needed here
    pid_non_existing = os.fork()
    if pid_non_existing == 0:
      # Child: leave immediately, skipping the parent's cleanup handlers
      os._exit(0)

    # Parent: reap the child before checking for its pid
    os.waitpid(pid_non_existing, 0)
    self.assertFalse(utils.IsProcessAlive(pid_non_existing),
                     "nonexisting process detected")
class TestGetProcStatusPath(unittest.TestCase):
  """Testing case for utils.process._GetProcStatusPath"""

  def test(self):
    """Check that the path contains the pid and differs between pids"""
    get_path_fn = utils.process._GetProcStatusPath
    self.assertTrue("/1234/" in get_path_fn(1234))
    self.assertNotEqual(get_path_fn(1), get_path_fn(2))
class TestIsProcessHandlingSignal(unittest.TestCase):
  """Testing case for IsProcessHandlingSignal and its parsing helpers"""
  # NOTE(review): several lines were missing from the reviewed text (method
  # headers, list openers/closers, expected sets, the final "return True" of
  # _TestRealProcess). They were reconstructed; please confirm against the
  # canonical file.

  def setUp(self):
    self.tmpdir = tempfile.mkdtemp()

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def testParseSigsetT(self):
    """Check the conversion of hexadecimal signal masks to signal sets"""
    parse_sigset_t_fn = utils.process._ParseSigsetT
    self.assertEqual(len(parse_sigset_t_fn("0")), 0)
    self.assertEqual(parse_sigset_t_fn("1"), set([1]))
    self.assertEqual(parse_sigset_t_fn("1000a"), set([2, 4, 17]))
    self.assertEqual(parse_sigset_t_fn("810002"), set([2, 17, 24, ]))
    # The expected sets below follow the rule shown by the cases above:
    # bit N (counted from zero) of the mask stands for signal N + 1
    self.assertEqual(parse_sigset_t_fn("0000000180000202"),
                     set([2, 10, 32, 33]))
    self.assertEqual(parse_sigset_t_fn("0000000180000002"),
                     set([2, 32, 33]))
    self.assertEqual(parse_sigset_t_fn("0000000188000002"),
                     set([2, 28, 32, 33]))
    self.assertEqual(parse_sigset_t_fn("000000004b813efb"),
                     set([1, 2, 4, 5, 6, 7, 8, 10, 11, 12, 13, 14, 17,
                          24, 25, 26, 28, 31]))
    self.assertEqual(parse_sigset_t_fn("ffffff"), set(range(1, 25)))

  def testGetProcStatusField(self):
    """Check that a field value is extracted with whitespace stripped"""
    for field in ["SigCgt", "Name", "FDSize"]:
      for value in ["", "0", "cat", " 1234 KB"]:
        # NOTE(review): the neighbouring status lines are placeholders, the
        # ones used originally are not visible; confirm
        pstatus = "\n".join([
          "State: S (sleeping)",
          "%s: %s" % (field, value),
          "CapEff: 0000000000000000",
          ])
        result = utils.process._GetProcStatusField(pstatus, field)
        self.assertEqual(result, value.strip())

  def test(self):
    """Check detection of a handled signal using a fake status file"""
    sp = utils.PathJoin(self.tmpdir, "status")

    # Signal 10 is part of the SigCgt mask below
    utils.WriteFile(sp, data="\n".join([
      "State: S (sleeping)",
      "SigBlk: 0000000000010000",
      "SigIgn: 0000000000384004",
      "SigCgt: 000000004b813efb",
      "CapEff: 0000000000000000",
      ]))

    self.assertTrue(utils.IsProcessHandlingSignal(1234, 10, status_path=sp))

  def testNoSigCgt(self):
    """Check that a status file without SigCgt raises RuntimeError"""
    sp = utils.PathJoin(self.tmpdir, "status")

    # NOTE(review): file content reconstructed; it only needs to lack the
    # "SigCgt" field
    utils.WriteFile(sp, data="\n".join([
      "State: S (sleeping)",
      ]))

    self.assertRaises(RuntimeError, utils.IsProcessHandlingSignal,
                      1234, 10, status_path=sp)

  def testNoSuchFile(self):
    """Check that a missing status file is reported as "not handling\""""
    sp = utils.PathJoin(self.tmpdir, "notexist")

    self.assertFalse(utils.IsProcessHandlingSignal(1234, 10, status_path=sp))

  @staticmethod
  def _TestRealProcess():
    """Checks signal handling detection against the running process.

    Meant to be run in a separate process as it changes the SIGUSR1
    disposition.

    @return: True if all checks passed
    @raise Exception: if a check failed

    """
    signal.signal(signal.SIGUSR1, signal.SIG_DFL)
    if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
      raise Exception("SIGUSR1 is handled when it should not be")

    signal.signal(signal.SIGUSR1, lambda signum, frame: None)
    if not utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
      raise Exception("SIGUSR1 is not handled when it should be")

    # An ignored signal must not be reported as handled
    signal.signal(signal.SIGUSR1, signal.SIG_IGN)
    if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
      raise Exception("SIGUSR1 is handled when it should not be")

    signal.signal(signal.SIGUSR1, signal.SIG_DFL)
    if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
      raise Exception("SIGUSR1 is handled when it should not be")

    # testRealProcess asserts on the truthiness of the result
    return True

  def testRealProcess(self):
    self.assertTrue(utils.RunInSeparateProcess(self._TestRealProcess))
class _PostforkProcessReadyHelper:
  """A helper to use with _postfork_fn in RunCmd.

  It makes sure a process has reached a certain state by waiting for the
  read end of a pipe to become readable.

  @ivar timeout: The time in seconds L{Ready} waits before giving up
  @ivar read_fd: The fd number L{Ready} waits on
  @ivar write_fd: The fd number to write to

  """
  def __init__(self, timeout):
    """Initialize the helper.

    @param timeout: The time in seconds to wait before giving up

    """
    self.timeout = timeout
    (self.read_fd, self.write_fd) = os.pipe()

  def Ready(self, pid):
    """Waits until the process is ready.

    @param pid: The pid of the process
    @raise AssertionError: If nothing became readable within the timeout

    """
    (read_ready, _, _) = select.select([self.read_fd], [], [], self.timeout)

    if not read_ready:
      # We hit the timeout
      raise AssertionError("Timeout %d reached while waiting for process %d"
                           " to become ready" % (self.timeout, pid))

  def Cleanup(self):
    """Cleans up the helper.

    Closes both ends of the pipe.

    """
    os.close(self.read_fd)
    os.close(self.write_fd)
class TestRunCmd(testutils.GanetiTestCase):
  """Testing case for the RunCmd function"""
  # NOTE(review): a number of lines were missing from the reviewed text
  # (method headers, "try"/"finally" lines, a few continuation lines and
  # expected values). They were reconstructed from the surrounding code;
  # spots which could not be derived are flagged individually below.

  def setUp(self):
    testutils.GanetiTestCase.setUp(self)
    self.magic = time.ctime() + " ganeti test"
    self.fname = self._CreateTempFile()
    self.fifo_tmpdir = tempfile.mkdtemp()
    self.fifo_file = os.path.join(self.fifo_tmpdir, "ganeti_test_fifo")
    os.mkfifo(self.fifo_file)

    # If the process is not ready after 20 seconds we have bigger issues
    self.proc_ready_helper = _PostforkProcessReadyHelper(20)

  def tearDown(self):
    self.proc_ready_helper.Cleanup()
    shutil.rmtree(self.fifo_tmpdir)
    testutils.GanetiTestCase.tearDown(self)

  def testOk(self):
    """Test successful exit code"""
    result = utils.RunCmd("/bin/sh -c 'exit 0'")
    self.assertEqual(result.exit_code, 0)
    self.assertEqual(result.output, "")

  def testFail(self):
    """Test fail exit code"""
    result = utils.RunCmd("/bin/sh -c 'exit 1'")
    self.assertEqual(result.exit_code, 1)
    self.assertEqual(result.output, "")

  def testStdout(self):
    """Test standard output"""
    cmd = 'echo -n "%s"' % self.magic
    result = utils.RunCmd("/bin/sh -c '%s'" % cmd)
    self.assertEqual(result.stdout, self.magic)
    result = utils.RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
    self.assertEqual(result.output, "")
    self.assertFileContent(self.fname, self.magic)

  def testStderr(self):
    """Test standard error"""
    cmd = 'echo -n "%s"' % self.magic
    result = utils.RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
    self.assertEqual(result.stderr, self.magic)
    result = utils.RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
    self.assertEqual(result.output, "")
    self.assertFileContent(self.fname, self.magic)

  def testCombined(self):
    """Test combined output"""
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
    expected = "A" + self.magic + "B" + self.magic
    result = utils.RunCmd("/bin/sh -c '%s'" % cmd)
    self.assertEqual(result.output, expected)
    result = utils.RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
    self.assertEqual(result.output, "")
    self.assertFileContent(self.fname, expected)

  def testSignal(self):
    """Test a child terminated by a signal"""
    result = utils.RunCmd(["python", "-c",
                           "import os; os.kill(os.getpid(), 15)"])
    self.assertEqual(result.signal, 15)
    self.assertEqual(result.output, "")

  def testTimeoutClean(self):
    """Test a timeout on a child which exits cleanly on SIGTERM"""
    cmd = ("trap 'exit 0' TERM; echo >&%d; read < %s" %
           (self.proc_ready_helper.write_fd, self.fifo_file))
    result = utils.RunCmd(["/bin/sh", "-c", cmd], timeout=0.2,
                          noclose_fds=[self.proc_ready_helper.write_fd],
                          _postfork_fn=self.proc_ready_helper.Ready)
    self.assertEqual(result.exit_code, 0)

  def testTimeoutKill(self):
    """Test a timeout on a child ignoring SIGTERM; SIGKILL is expected"""
    cmd = ["/bin/sh", "-c", "trap '' TERM; echo >&%d; read < %s" %
           (self.proc_ready_helper.write_fd, self.fifo_file)]
    timeout = 0.2
    # NOTE(review): the arguments between the noclose_fds list and
    # "_postfork_fn" were not visible; "None" and "_linger_timeout" are a
    # best guess, confirm against the signature of _RunCmdPipe
    (_, _, status, _) = \
      utils.process._RunCmdPipe(cmd, {}, False, "/", False,
                                timeout, [self.proc_ready_helper.write_fd],
                                None,
                                _linger_timeout=0.2,
                                _postfork_fn=self.proc_ready_helper.Ready)
    self.assertTrue(status < 0)
    self.assertEqual(-status, signal.SIGKILL)

  def testTimeoutOutputAfterTerm(self):
    """Test that output written by the SIGTERM handler is captured"""
    cmd = ("trap 'echo sigtermed; exit 1' TERM; echo >&%d; read < %s" %
           (self.proc_ready_helper.write_fd, self.fifo_file))
    result = utils.RunCmd(["/bin/sh", "-c", cmd], timeout=0.2,
                          noclose_fds=[self.proc_ready_helper.write_fd],
                          _postfork_fn=self.proc_ready_helper.Ready)
    self.assertTrue(result.failed)
    self.assertEqual(result.stdout, "sigtermed\n")

  def testListRun(self):
    """Test commands given as argument lists"""
    result = utils.RunCmd(["true"])
    self.assertEqual(result.signal, None)
    self.assertEqual(result.exit_code, 0)
    result = utils.RunCmd(["/bin/sh", "-c", "exit 1"])
    self.assertEqual(result.signal, None)
    self.assertEqual(result.exit_code, 1)
    result = utils.RunCmd(["echo", "-n", self.magic])
    self.assertEqual(result.signal, None)
    self.assertEqual(result.exit_code, 0)
    self.assertEqual(result.stdout, self.magic)

  def testFileEmptyOutput(self):
    """Test file output"""
    result = utils.RunCmd(["true"], output=self.fname)
    self.assertEqual(result.signal, None)
    self.assertEqual(result.exit_code, 0)
    self.assertFileContent(self.fname, "")

  def testLang(self):
    """Test locale environment"""
    old_env = os.environ.copy()
    try:
      os.environ["LANG"] = "en_US.UTF-8"
      os.environ["LC_ALL"] = "en_US.UTF-8"
      result = utils.RunCmd(["locale"])
      for line in result.output.splitlines():
        key, value = line.split("=", 1)
        # Ignore these variables, they're overridden by LC_ALL
        if key == "LANG" or key == "LANGUAGE":
          continue
        self.assertFalse(value and value != "C" and value != '"C"',
            "Variable %s is set to the invalid value '%s'" % (key, value))
    finally:
      # Restore in place so that os.environ stays the special mapping
      # object which also updates the process environment
      os.environ.clear()
      os.environ.update(old_env)

  def testDefaultCwd(self):
    """Test default working directory"""
    self.assertEqual(utils.RunCmd(["pwd"]).stdout.strip(), "/")

  def testCwd(self):
    """Test explicitly given working directory"""
    self.assertEqual(utils.RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
    self.assertEqual(utils.RunCmd(["pwd"], cwd="/tmp").stdout.strip(),
                     "/tmp")
    cwd = os.getcwd()
    self.assertEqual(utils.RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)

  def testResetEnv(self):
    """Test environment reset functionality"""
    self.assertEqual(utils.RunCmd(["env"], reset_env=True).stdout.strip(),
                     "")
    self.assertEqual(utils.RunCmd(["env"], reset_env=True,
                                  env={"FOO": "bar",}).stdout.strip(),
                     "FOO=bar")

  def testNoFork(self):
    """Test that nofork raise an error"""
    self.assertFalse(utils.process._no_fork)
    # NOTE(review): the call disabling fork was not visible; confirm that
    # utils.DisableFork is the intended function
    utils.DisableFork()
    try:
      self.assertTrue(utils.process._no_fork)
      self.assertRaises(errors.ProgrammerError, utils.RunCmd, ["true"])
    finally:
      utils.process._no_fork = False
    self.assertFalse(utils.process._no_fork)

  def testWrongParams(self):
    """Test wrong parameters"""
    self.assertRaises(errors.ProgrammerError, utils.RunCmd, ["true"],
                      output="/dev/null", interactive=True)

  def testNocloseFds(self):
    """Test selective fd retention (noclose_fds)"""
    temp = open(self.fname, "r+")
    try:
      temp.write("test")
      temp.seek(0)
      cmd = "read -u %d; echo $REPLY" % temp.fileno()
      # Without noclose_fds the child can't read from the descriptor
      result = utils.RunCmd(["/bin/bash", "-c", cmd])
      self.assertEqual(result.stdout.strip(), "")
      temp.seek(0)
      result = utils.RunCmd(["/bin/bash", "-c", cmd],
                            noclose_fds=[temp.fileno()])
      self.assertEqual(result.stdout.strip(), "test")
    finally:
      temp.close()

  def testNoInputRead(self):
    """Test that a command reading stdin gets no input by default"""
    testfile = self._TestDataFilename("cert1.pem")

    result = utils.RunCmd(["cat"], timeout=10.0)
    self.assertFalse(result.failed)
    self.assertEqual(result.stderr, "")
    self.assertEqual(result.stdout, "")

  def testInputFileHandle(self):
    """Test passing a file object as input"""
    testfile = self._TestDataFilename("cert1.pem")

    fh = open(testfile, "r")
    try:
      result = utils.RunCmd(["cat"], input_fd=fh)
    finally:
      # Don't leak the file object
      fh.close()

    self.assertFalse(result.failed)
    self.assertEqual(result.stdout, utils.ReadFile(testfile))
    self.assertEqual(result.stderr, "")

  def testInputNumericFileDescriptor(self):
    """Test passing a numeric file descriptor as input"""
    testfile = self._TestDataFilename("cert2.pem")

    fh = open(testfile, "r")
    try:
      result = utils.RunCmd(["cat"], input_fd=fh.fileno())
    finally:
      fh.close()

    self.assertFalse(result.failed)
    self.assertEqual(result.stdout, utils.ReadFile(testfile))
    self.assertEqual(result.stderr, "")

  def testInputWithCloseFds(self):
    """Test input in combination with a retained file descriptor"""
    testfile = self._TestDataFilename("cert1.pem")

    temp = open(self.fname, "r+")
    try:
      temp.write("test283523367")
      temp.seek(0)

      input_fh = open(testfile, "r")
      try:
        result = utils.RunCmd(["/bin/bash", "-c",
                               ("cat && read -u %s; echo $REPLY" %
                                temp.fileno())],
                              input_fd=input_fh,
                              noclose_fds=[temp.fileno()])
      finally:
        # Don't leak the file object
        input_fh.close()

      self.assertFalse(result.failed)
      self.assertEqual(result.stdout.strip(),
                       utils.ReadFile(testfile) + "test283523367")
      self.assertEqual(result.stderr, "")
    finally:
      temp.close()

  def testOutputAndInteractive(self):
    """Test that output and interactive can't be combined"""
    self.assertRaises(errors.ProgrammerError, utils.RunCmd,
                      [], output=self.fname, interactive=True)

  def testOutputAndInput(self):
    """Test that output and input_fd can't be combined"""
    fh = open(self.fname)
    try:
      self.assertRaises(errors.ProgrammerError, utils.RunCmd,
                        [], output=self.fname, input_fd=fh)
    finally:
      # Don't leak the file object
      fh.close()
class TestRunParts(testutils.GanetiTestCase):
  """Testing case for the RunParts function"""
  # NOTE(review): method headers, the "files" list setup and a few loop
  # lines were missing from the reviewed text and have been reconstructed
  # from the surrounding code; please confirm.

  def setUp(self):
    self.rundir = tempfile.mkdtemp(prefix="ganeti-test", suffix=".tmp")

  def tearDown(self):
    shutil.rmtree(self.rundir)

  def _MakeExecutable(self, fname):
    """Makes a file readable and executable for its owner.

    @param fname: Path of the file to change

    """
    os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)

  def testEmpty(self):
    """Test on an empty dir"""
    self.assertEqual(utils.RunParts(self.rundir, reset_env=True), [])

  def testSkipWrongName(self):
    """Test that wrong files are skipped"""
    fname = os.path.join(self.rundir, "00test.dot")
    utils.WriteFile(fname, data="")
    self._MakeExecutable(fname)
    relname = os.path.basename(fname)
    self.assertEqual(utils.RunParts(self.rundir, reset_env=True),
                     [(relname, constants.RUNPARTS_SKIP, None)])

  def testSkipNonExec(self):
    """Test that non executable files are skipped"""
    fname = os.path.join(self.rundir, "00test")
    utils.WriteFile(fname, data="")
    relname = os.path.basename(fname)
    self.assertEqual(utils.RunParts(self.rundir, reset_env=True),
                     [(relname, constants.RUNPARTS_SKIP, None)])

  def testError(self):
    """Test error on a broken executable"""
    fname = os.path.join(self.rundir, "00test")
    utils.WriteFile(fname, data="")
    self._MakeExecutable(fname)
    (relname, status, error) = utils.RunParts(self.rundir, reset_env=True)[0]
    self.assertEqual(relname, os.path.basename(fname))
    self.assertEqual(status, constants.RUNPARTS_ERR)
    self.assertTrue(error)

  def testSorted(self):
    """Test executions are sorted"""
    # Deliberately created out of order
    files = [os.path.join(self.rundir, name)
             for name in ["64test", "00test", "42test"]]

    for fname in files:
      utils.WriteFile(fname, data="")

    results = utils.RunParts(self.rundir, reset_env=True)

    for fname in sorted(files):
      self.assertEqual(os.path.basename(fname), results.pop(0)[0])

  def testOk(self):
    """Test correct execution"""
    fname = os.path.join(self.rundir, "00test")
    utils.WriteFile(fname, data="#!/bin/sh\n\necho -n ciao")
    self._MakeExecutable(fname)
    (relname, status, runresult) = \
      utils.RunParts(self.rundir, reset_env=True)[0]
    self.assertEqual(relname, os.path.basename(fname))
    self.assertEqual(status, constants.RUNPARTS_RUN)
    self.assertEqual(runresult.stdout, "ciao")

  def testRunFail(self):
    """Test correct execution, with run failure"""
    fname = os.path.join(self.rundir, "00test")
    utils.WriteFile(fname, data="#!/bin/sh\n\nexit 1")
    self._MakeExecutable(fname)
    (relname, status, runresult) = \
      utils.RunParts(self.rundir, reset_env=True)[0]
    self.assertEqual(relname, os.path.basename(fname))
    self.assertEqual(status, constants.RUNPARTS_RUN)
    self.assertEqual(runresult.exit_code, 1)
    self.assertTrue(runresult.failed)

  def testRunMix(self):
    """Test a directory mixing failing, skipped, broken and good files"""
    files = [os.path.join(self.rundir, name)
             for name in ["00test", "42test", "64test", "99test"]]

    # 1st has errors in execution
    utils.WriteFile(files[0], data="#!/bin/sh\n\nexit 1")
    self._MakeExecutable(files[0])

    # 2nd is skipped (not executable)
    utils.WriteFile(files[1], data="")

    # 3rd cannot execute properly
    utils.WriteFile(files[2], data="")
    self._MakeExecutable(files[2])

    # 4th executes successfully
    utils.WriteFile(files[3], data="#!/bin/sh\n\necho -n ciao")
    self._MakeExecutable(files[3])

    results = utils.RunParts(self.rundir, reset_env=True)

    (relname, status, runresult) = results[0]
    self.assertEqual(relname, os.path.basename(files[0]))
    self.assertEqual(status, constants.RUNPARTS_RUN)
    self.assertEqual(runresult.exit_code, 1)
    self.assertTrue(runresult.failed)

    (relname, status, runresult) = results[1]
    self.assertEqual(relname, os.path.basename(files[1]))
    self.assertEqual(status, constants.RUNPARTS_SKIP)
    self.assertEqual(runresult, None)

    (relname, status, runresult) = results[2]
    self.assertEqual(relname, os.path.basename(files[2]))
    self.assertEqual(status, constants.RUNPARTS_ERR)
    self.assertTrue(runresult)

    (relname, status, runresult) = results[3]
    self.assertEqual(relname, os.path.basename(files[3]))
    self.assertEqual(status, constants.RUNPARTS_RUN)
    self.assertEqual(runresult.output, "ciao")
    self.assertEqual(runresult.exit_code, 0)
    self.assertFalse(runresult.failed)

  def testMissingDirectory(self):
    """Test that a missing directory gives an empty result"""
    nosuchdir = utils.PathJoin(self.rundir, "no/such/directory")
    self.assertEqual(utils.RunParts(nosuchdir), [])
class TestStartDaemon(testutils.GanetiTestCase):
  """Testing case for the StartDaemon function"""
  # NOTE(review): method headers, "try"/"finally" lines and the
  # continuation of the StartDaemon call in testPidFile were missing from
  # the reviewed text and have been reconstructed; please confirm.

  def setUp(self):
    self.tmpdir = tempfile.mkdtemp(prefix="ganeti-test")
    self.tmpfile = os.path.join(self.tmpdir, "test")

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def testShell(self):
    utils.StartDaemon("echo Hello World > %s" % self.tmpfile)
    self._wait(self.tmpfile, 60.0, "Hello World")

  def testShellOutput(self):
    utils.StartDaemon("echo Hello World", output=self.tmpfile)
    self._wait(self.tmpfile, 60.0, "Hello World")

  def testNoShellNoOutput(self):
    utils.StartDaemon(["pwd"])

  def testNoShellNoOutputTouch(self):
    testfile = os.path.join(self.tmpdir, "check")
    self.assertFalse(os.path.exists(testfile))
    utils.StartDaemon(["touch", testfile])
    self._wait(testfile, 60.0, "")

  def testNoShellOutput(self):
    utils.StartDaemon(["pwd"], output=self.tmpfile)
    self._wait(self.tmpfile, 60.0, "/")

  def testNoShellOutputCwd(self):
    utils.StartDaemon(["pwd"], output=self.tmpfile, cwd=os.getcwd())
    self._wait(self.tmpfile, 60.0, os.getcwd())

  def testShellEnv(self):
    utils.StartDaemon("echo \"$GNT_TEST_VAR\"", output=self.tmpfile,
                      env={ "GNT_TEST_VAR": "Hello World", })
    self._wait(self.tmpfile, 60.0, "Hello World")

  def testNoShellEnv(self):
    utils.StartDaemon(["printenv", "GNT_TEST_VAR"], output=self.tmpfile,
                      env={ "GNT_TEST_VAR": "Hello World", })
    self._wait(self.tmpfile, 60.0, "Hello World")

  def testOutputFd(self):
    fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
    try:
      utils.StartDaemon(["pwd"], output_fd=fd, cwd=os.getcwd())
    finally:
      os.close(fd)
    self._wait(self.tmpfile, 60.0, os.getcwd())

  def testPid(self):
    pid = utils.StartDaemon("echo $$ > %s" % self.tmpfile)
    self._wait(self.tmpfile, 60.0, str(pid))

  def testPidFile(self):
    pidfile = os.path.join(self.tmpdir, "pid")

    pid = utils.StartDaemon("while sleep 5; do :; done", pidfile=pidfile,
                            output=self.tmpfile)
    try:
      fd = os.open(pidfile, os.O_RDONLY)
      try:
        # Check file is locked
        self.assertRaises(errors.LockError, utils.LockFile, fd)

        pidtext = os.read(fd, 100)
      finally:
        os.close(fd)

      self.assertEqual(int(pidtext.strip()), pid)

      self.assertTrue(utils.IsProcessAlive(pid))
    finally:
      # No matter what happens, kill daemon
      utils.KillProcess(pid, timeout=5.0, waitpid=False)
      self.assertFalse(utils.IsProcessAlive(pid))

    self.assertEqual(utils.ReadFile(self.tmpfile), "")

  def _wait(self, path, timeout, expected):
    """Polls a file until it has the expected content.

    The test is failed if the content doesn't show up in time.

    @param path: File to check
    @param timeout: Time in seconds to wait before giving up
    @param expected: Expected file content, compared after stripping
      whitespace from what was read

    """
    # Due to the asynchronous nature of daemon processes, polling is necessary.
    # A timeout makes sure the test doesn't hang forever.
    def _CheckFile():
      if not (os.path.isfile(path) and
              utils.ReadFile(path).strip() == expected):
        raise utils.RetryAgain()

    try:
      utils.Retry(_CheckFile, (0.01, 1.5, 1.0), timeout)
    except utils.RetryTimeout:
      self.fail("Apparently the daemon didn't run in %s seconds and/or"
                " didn't write the correct output" % timeout)

  def testError(self):
    missing_cmd = ["./does-NOT-EXIST/here/0123456789"]
    missing_path = os.path.join(self.tmpdir, "DIR/NOT/EXIST")

    self.assertRaises(errors.OpExecError, utils.StartDaemon,
                      missing_cmd)
    self.assertRaises(errors.OpExecError, utils.StartDaemon,
                      missing_cmd, output=missing_path)
    self.assertRaises(errors.OpExecError, utils.StartDaemon,
                      missing_cmd, cwd=missing_path)

    fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
    try:
      # "output" and "output_fd" are mutually exclusive
      self.assertRaises(errors.ProgrammerError, utils.StartDaemon,
                        missing_cmd, output=self.tmpfile, output_fd=fd)
    finally:
      os.close(fd)
class RunInSeparateProcess(unittest.TestCase):
  """Testing case for the RunInSeparateProcess function"""
  # NOTE(review): the method headers and the headers of the nested
  # functions were missing from the reviewed text and have been
  # reconstructed from the names used in the visible calls; please confirm.

  def test(self):
    """Check that the child's boolean result is passed back"""
    for exp in [True, False]:
      def _child():
        return exp

      self.assertEqual(exp, utils.RunInSeparateProcess(_child))

  def testArgs(self):
    """Check that positional arguments reach the child function"""
    for arg in [0, 1, 999, "Hello World", (1, 2, 3)]:
      def _child(carg1, carg2):
        return carg1 == "Foo" and carg2 == arg

      self.assertTrue(utils.RunInSeparateProcess(_child, "Foo", arg))

  def testPid(self):
    """Check that the function doesn't run in the calling process"""
    parent_pid = os.getpid()

    def _check():
      return os.getpid() == parent_pid

    self.assertFalse(utils.RunInSeparateProcess(_check))

  def testSignal(self):
    """Check that a child killed by a signal raises GenericError"""
    def _kill():
      os.kill(os.getpid(), signal.SIGTERM)

    self.assertRaises(errors.GenericError,
                      utils.RunInSeparateProcess, _kill)

  def testException(self):
    """Check that an exception in the child raises GenericError"""
    def _exc():
      raise errors.GenericError("This is a test")

    self.assertRaises(errors.GenericError,
                      utils.RunInSeparateProcess, _exc)
# Entry point used when this module is executed directly as a script
if __name__ == "__main__":
  testutils.GanetiTestProgram()