def RunCmd(cmd, env=None, output=None, cwd="/", reset_env=False,
interactive=False, timeout=None, noclose_fds=None,
- _postfork_fn=None):
+ input_fd=None, _postfork_fn=None):
"""Execute a (shell) command.
The command should not read from its standard input, as it will be
@type noclose_fds: list
@param noclose_fds: list of additional (fd >=3) file descriptors to leave
open for the child process
+ @type input_fd: C{file}-like object or numeric file descriptor
+ @param input_fd: File descriptor for process' standard input
@param _postfork_fn: Callback run after fork but before timeout (unittest)
@rtype: L{RunResult}
@return: RunResult instance
raise errors.ProgrammerError("Parameters 'output' and 'interactive' can"
" not be provided at the same time")
+ if not (output is None or input_fd is None):
+ # The current logic in "_RunCmdFile", which is used when output is defined,
+ # does not support input files (not hard to implement, though)
+ raise errors.ProgrammerError("Parameters 'output' and 'input_fd' can"
+ " not be used at the same time")
+
if isinstance(cmd, basestring):
strcmd = cmd
shell = True
if output is None:
out, err, status, timeout_action = _RunCmdPipe(cmd, cmd_env, shell, cwd,
interactive, timeout,
- noclose_fds,
+ noclose_fds, input_fd,
_postfork_fn=_postfork_fn)
else:
assert _postfork_fn is None, \
"_postfork_fn not supported if output provided"
+ assert input_fd is None
timeout_action = _TIMEOUT_NONE
status = _RunCmdFile(cmd, cmd_env, shell, output, cwd, noclose_fds)
out = err = ""
def _RunCmdPipe(cmd, env, via_shell, cwd, interactive, timeout, noclose_fds,
+ input_fd,
_linger_timeout=constants.CHILD_LINGER_TIMEOUT,
_postfork_fn=None):
"""Run a command and return its output.
@type noclose_fds: list
@param noclose_fds: list of additional (fd >=3) file descriptors to leave
open for the child process
+ @type input_fd: C{file}-like object or numeric file descriptor
+ @param input_fd: File descriptor for process' standard input
@param _postfork_fn: Function run after fork but before timeout (unittest)
@rtype: tuple
@return: (out, err, status)
"""
poller = select.poll()
- stderr = subprocess.PIPE
- stdout = subprocess.PIPE
- stdin = subprocess.PIPE
-
if interactive:
- stderr = stdout = stdin = None
+ stderr = None
+ stdout = None
+ else:
+ stderr = subprocess.PIPE
+ stdout = subprocess.PIPE
+
+ # Compare against None instead of relying on truthiness: a numeric file
+ # descriptor of 0 is a valid "input_fd" value but evaluates to False, which
+ # would silently fall through to the branches below.
+ if input_fd is not None:
+   stdin = input_fd
+ elif interactive:
+   # Interactive commands inherit the caller's standard input
+   stdin = None
+ else:
+   # No input given: use a pipe, which is closed right after the fork
+   stdin = subprocess.PIPE
if noclose_fds:
preexec_fn = lambda: CloseFDs(noclose_fds)
timeout_action = _TIMEOUT_NONE
+ # subprocess: "If the stdin argument is PIPE, this attribute is a file object
+ # that provides input to the child process. Otherwise, it is None."
+ assert (stdin == subprocess.PIPE) ^ (child.stdin is None), \
+ "subprocess' stdin did not behave as documented"
+
if not interactive:
- child.stdin.close()
+ if child.stdin is not None:
+ child.stdin.close()
poller.register(child.stdout, select.POLLIN)
poller.register(child.stderr, select.POLLIN)
fdmap = {
(out, err, status, ta) = \
utils.process._RunCmdPipe(cmd, {}, False, "/", False,
timeout, [self.proc_ready_helper.write_fd],
+ None,
_linger_timeout=0.2,
_postfork_fn=self.proc_ready_helper.Ready)
self.assert_(status < 0)
finally:
temp.close()
+ def testNoInputRead(self):
+   """Run a command reading standard input without providing any input.
+
+   "cat" without arguments copies its standard input to standard output. With
+   no "input_fd" given it is expected to finish successfully and to produce
+   no output at all, well within the generous timeout.
+
+   """
+   result = utils.RunCmd(["cat"], timeout=10.0)
+   self.assertFalse(result.failed)
+   self.assertEqual(result.stderr, "")
+   self.assertEqual(result.stdout, "")
+
+ def testInputFileHandle(self):
+   """Pass a file object as "input_fd" and expect "cat" to echo its contents.
+
+   """
+   testfile = self._TestDataFilename("cert1.pem")
+
+   # Keep a reference to the file object so it can be closed explicitly
+   # instead of leaking the descriptor until garbage collection
+   fh = open(testfile, "r")
+   try:
+     result = utils.RunCmd(["cat"], input_fd=fh)
+   finally:
+     fh.close()
+
+   self.assertFalse(result.failed)
+   self.assertEqual(result.stdout, utils.ReadFile(testfile))
+   self.assertEqual(result.stderr, "")
+
+ def testInputNumericFileDescriptor(self):
+   """Pass a numeric descriptor as "input_fd" and expect "cat" to echo it.
+
+   """
+   path = self._TestDataFilename("cert2.pem")
+   source = open(path, "r")
+   try:
+     # Hand over the raw descriptor number rather than the file object
+     fd = source.fileno()
+     result = utils.RunCmd(["cat"], input_fd=fd)
+   finally:
+     source.close()
+
+   self.assertFalse(result.failed)
+   expected = utils.ReadFile(path)
+   self.assertEqual(result.stdout, expected)
+   self.assertEqual(result.stderr, "")
+
+ def testInputWithCloseFds(self):
+   """Combine "input_fd" with a descriptor listed in "noclose_fds".
+
+   The shell first copies its standard input ("cat") and then reads one line
+   from the additional descriptor, so the expected output is the contents of
+   the input file followed by the text written to the temporary file.
+
+   """
+   testfile = self._TestDataFilename("cert1.pem")
+
+   temp = open(self.fname, "r+")
+   try:
+     temp.write("test283523367")
+     temp.seek(0)
+
+     # Keep a reference to the input file so it can be closed explicitly
+     # instead of leaking the descriptor until garbage collection
+     input_file = open(testfile, "r")
+     try:
+       result = utils.RunCmd(["/bin/bash", "-c",
+                              ("cat && read -u %s; echo $REPLY" %
+                               temp.fileno())],
+                             input_fd=input_file,
+                             noclose_fds=[temp.fileno()])
+     finally:
+       input_file.close()
+
+     self.assertFalse(result.failed)
+     self.assertEqual(result.stdout.strip(),
+                      utils.ReadFile(testfile) + "test283523367")
+     self.assertEqual(result.stderr, "")
+   finally:
+     temp.close()
+
+ def testOutputAndInteractive(self):
+   """Check that "output" and "interactive" can not be combined.
+
+   """
+   conflicting = {
+     "output": self.fname,
+     "interactive": True,
+     }
+   self.assertRaises(errors.ProgrammerError, utils.RunCmd, [], **conflicting)
+
+ def testOutputAndInput(self):
+   """Check that "output" and "input_fd" can not be combined.
+
+   """
+   # Keep a reference to the file object so it is closed even though the
+   # call under test is expected to raise
+   fh = open(self.fname)
+   try:
+     self.assertRaises(errors.ProgrammerError, utils.RunCmd,
+                       [], output=self.fname, input_fd=fh)
+   finally:
+     fh.close()
+
class TestRunParts(testutils.GanetiTestCase):
"""Testing case for the RunParts function"""