From: Adam Simpkins Date: Thu, 16 May 2013 02:03:40 +0000 (-0700) Subject: rework the Subprocess::communicate() API X-Git-Tag: v0.22.0~949 X-Git-Url: http://plrg.eecs.uci.edu/git/?p=folly.git;a=commitdiff_plain;h=76ade9184fe7702e0e6547e0bd3553f810304b59 rework the Subprocess::communicate() API Summary: This eliminates the CommunicateFlags argument to communicate(). It now always reads from both stdout and stderr if they were set up a pipes. If a non-empty input buffer was supplied, it always writes that to stdin. This mimics the communicate() behavior of python's subprocess.py module. This also makes it impossible to have buffering deadlocks by forgetting to call communicate() with readStderr(). Test Plan: Ran the existing subprocess tests, and also added a more complicated duplex test that requires communication on stdin, stdout, and stderr all at the same time. Also grepped for all existing users of Subprocess::communicate(), and made sure they will work correctly with the new behavior. Reviewed By: tudorb@fb.com FB internal diff: D814405 --- diff --git a/folly/Subprocess.cpp b/folly/Subprocess.cpp index 11026c29..fb4c19a9 100644 --- a/folly/Subprocess.cpp +++ b/folly/Subprocess.cpp @@ -617,12 +617,11 @@ bool discardRead(int fd) { } // namespace std::pair Subprocess::communicate( - const CommunicateFlags& flags, - StringPiece data) { - IOBufQueue dataQueue; - dataQueue.wrapBuffer(data.data(), data.size()); + StringPiece input) { + IOBufQueue inputQueue; + inputQueue.wrapBuffer(input.data(), input.size()); - auto outQueues = communicateIOBuf(flags, std::move(dataQueue)); + auto outQueues = communicateIOBuf(std::move(inputQueue)); auto outBufs = std::make_pair(outQueues.first.move(), outQueues.second.move()); std::pair out; @@ -640,14 +639,21 @@ std::pair Subprocess::communicate( } std::pair Subprocess::communicateIOBuf( - const CommunicateFlags& flags, - IOBufQueue data) { + IOBufQueue input) { + // If the user supplied a non-empty input buffer, make sure + // that stdin is a pipe so we can write the data. + if (!input.empty()) { + // findByChildFd() will throw std::invalid_argument if no pipe for + // STDIN_FILENO exists + findByChildFd(STDIN_FILENO); + } + std::pair out; auto readCallback = [&] (int pfd, int cfd) -> bool { - if (cfd == 1 && flags.readStdout_) { + if (cfd == STDOUT_FILENO) { return handleRead(pfd, out.first); - } else if (cfd == 2 && flags.readStderr_) { + } else if (cfd == STDERR_FILENO) { return handleRead(pfd, out.second); } else { // Don't close the file descriptor, the child might not like SIGPIPE, @@ -657,11 +663,11 @@ std::pair Subprocess::communicateIOBuf( }; auto writeCallback = [&] (int pfd, int cfd) -> bool { - if (cfd == 0 && flags.writeStdin_) { - return handleWrite(pfd, data); + if (cfd == STDIN_FILENO) { + return handleWrite(pfd, input); } else { // If we don't want to write to this fd, just close it. - return false; + return true; } }; diff --git a/folly/Subprocess.h b/folly/Subprocess.h index 58c9c9c8..94cc4089 100644 --- a/folly/Subprocess.h +++ b/folly/Subprocess.h @@ -224,19 +224,23 @@ class Subprocess : private boost::noncopyable { /** * Shortcut to change the action for standard input. */ - Options& stdin(int action) { return fd(0, action); } + Options& stdin(int action) { return fd(STDIN_FILENO, action); } /** * Shortcut to change the action for standard output. */ - Options& stdout(int action) { return fd(1, action); } + Options& stdout(int action) { return fd(STDOUT_FILENO, action); } /** * Shortcut to change the action for standard error. * Note that stderr(1) will redirect the standard error to the same * file descriptor as standard output; the equivalent of bash's "2>&1" */ - Options& stderr(int action) { return fd(2, action); } + Options& stderr(int action) { return fd(STDERR_FILENO, action); } + + Options& pipeStdin() { return fd(STDIN_FILENO, PIPE_IN); } + Options& pipeStdout() { return fd(STDOUT_FILENO, PIPE_OUT); } + Options& pipeStderr() { return fd(STDERR_FILENO, PIPE_OUT); } /** * Close all other fds (other than standard input, output, error, @@ -310,19 +314,19 @@ class Subprocess : private boost::noncopyable { const std::vector* env = nullptr); /** - * Append all data, close the stdin (to-child) fd, and read all data, - * except that this is done in a safe manner to prevent deadlocking. + * Communicate with the child until all pipes to/from the child are closed. * - * If writeStdin() is given in flags, the process must have been opened with - * stdinFd=PIPE. + * The input buffer is written to the process' stdin pipe, and data is read + * from the stdout and stderr pipes. Non-blocking I/O is performed on all + * pipes simultaneously to avoid deadlocks. * - * If readStdout() is given in flags, the first returned value will be the - * value read from the child's stdout; the child must have been opened with - * stdoutFd=PIPE. + * The stdin pipe will be closed after the full input buffer has been written. + * An error will be thrown if a non-empty input buffer is supplied but stdin + * was not configured as a pipe. * - * If readStderr() is given in flags, the second returned value will be the - * value read from the child's stderr; the child must have been opened with - * stderrFd=PIPE. + * Returns a pair of buffers containing the data read from stdout and stderr. + * If stdout or stderr is not a pipe, an empty IOBuf queue will be returned + * for the respective buffer. * * Note that communicate() returns when all pipes to/from the child are * closed; the child might stay alive after that, so you must still wait(). @@ -331,39 +335,11 @@ class Subprocess : private boost::noncopyable { * that it won't try to allocate all data at once). communicate * uses strings for simplicity. */ - class CommunicateFlags : private boost::orable { - friend class Subprocess; - public: - CommunicateFlags() - : writeStdin_(false), readStdout_(false), readStderr_(false) { } - CommunicateFlags& writeStdin() { writeStdin_ = true; return *this; } - CommunicateFlags& readStdout() { readStdout_ = true; return *this; } - CommunicateFlags& readStderr() { readStderr_ = true; return *this; } - - CommunicateFlags& operator|=(const CommunicateFlags& other); - private: - bool writeStdin_; - bool readStdout_; - bool readStderr_; - }; - - static CommunicateFlags writeStdin() { - return CommunicateFlags().writeStdin(); - } - static CommunicateFlags readStdout() { - return CommunicateFlags().readStdout(); - } - static CommunicateFlags readStderr() { - return CommunicateFlags().readStderr(); - } - std::pair communicateIOBuf( - const CommunicateFlags& flags = readStdout(), - IOBufQueue data = IOBufQueue()); + IOBufQueue input = IOBufQueue()); std::pair communicate( - const CommunicateFlags& flags = readStdout(), - StringPiece data = StringPiece()); + StringPiece input = StringPiece()); /** * Communicate with the child until all pipes to/from the child are closed. @@ -546,15 +522,6 @@ inline Subprocess::Options& Subprocess::Options::operator|=( return *this; } -inline Subprocess::CommunicateFlags& Subprocess::CommunicateFlags::operator|=( - const Subprocess::CommunicateFlags& other) { - if (this == &other) return *this; - writeStdin_ |= other.writeStdin_; - readStdout_ |= other.readStdout_; - readStderr_ |= other.readStderr_; - return *this; -} - } // namespace folly #endif /* FOLLY_SUBPROCESS_H_ */ diff --git a/folly/io/IOBufQueue.h b/folly/io/IOBufQueue.h index 2a63fbf9..71966f49 100644 --- a/folly/io/IOBufQueue.h +++ b/folly/io/IOBufQueue.h @@ -222,6 +222,13 @@ class IOBufQueue { return chainLength_; } + /** + * Returns true iff the IOBuf chain length is 0. + */ + bool empty() const { + return !head_ || head_->empty(); + } + const Options& options() const { return options_; } diff --git a/folly/test/SubprocessTest.cpp b/folly/test/SubprocessTest.cpp index 112e9aae..769eec57 100644 --- a/folly/test/SubprocessTest.cpp +++ b/folly/test/SubprocessTest.cpp @@ -26,6 +26,7 @@ #include "folly/Exception.h" #include "folly/Format.h" +#include "folly/String.h" #include "folly/experimental/Gen.h" #include "folly/experimental/FileGen.h" #include "folly/experimental/StringGen.h" @@ -126,8 +127,7 @@ TEST(SimpleSubprocessTest, FdLeakTest) { checkFdLeak([] { Subprocess proc("echo foo; echo bar >&2", Subprocess::pipeStdout() | Subprocess::pipeStderr()); - auto p = proc.communicate(Subprocess::readStdout() | - Subprocess::readStderr()); + auto p = proc.communicate(); EXPECT_EQ("foo\n", p.first); EXPECT_EQ("bar\n", p.second); proc.waitChecked(); @@ -209,8 +209,7 @@ TEST(CommunicateSubprocessTest, BigWrite) { } Subprocess proc("wc -l", Subprocess::pipeStdin() | Subprocess::pipeStdout()); - auto p = proc.communicate(Subprocess::writeStdin() | Subprocess::readStdout(), - data); + auto p = proc.communicate(data); EXPECT_EQ(folly::format("{}\n", numLines).str(), p.first); proc.waitChecked(); } @@ -223,9 +222,73 @@ TEST(CommunicateSubprocessTest, Duplex) { Subprocess proc("tr a-z A-Z", Subprocess::pipeStdin() | Subprocess::pipeStdout()); - auto p = proc.communicate(Subprocess::writeStdin() | Subprocess::readStdout(), - line); + auto p = proc.communicate(line); EXPECT_EQ(bytes, p.first.size()); EXPECT_EQ(std::string::npos, p.first.find_first_not_of('X')); proc.waitChecked(); } + +TEST(CommunicateSubprocessTest, Duplex2) { + checkFdLeak([] { + // Pipe 200,000 lines through sed + const size_t numCopies = 100000; + auto iobuf = IOBuf::copyBuffer("this is a test\nanother line\n"); + IOBufQueue input; + for (int n = 0; n < numCopies; ++n) { + input.append(iobuf->clone()); + } + + std::vector cmd({ + "sed", "-u", + "-e", "s/a test/a successful test/", + "-e", "/^another line/w/dev/stderr", + }); + auto options = Subprocess::pipeStdin().pipeStdout().pipeStderr().usePath(); + Subprocess proc(cmd, options); + auto out = proc.communicateIOBuf(std::move(input)); + proc.waitChecked(); + + // Convert stdout and stderr to strings so we can call split() on them. + fbstring stdoutStr; + if (out.first.front()) { + stdoutStr = out.first.move()->moveToFbString(); + } + fbstring stderrStr; + if (out.second.front()) { + stderrStr = out.second.move()->moveToFbString(); + } + + // stdout should be a copy of stdin, with "a test" replaced by + // "a successful test" + std::vector stdoutLines; + split('\n', stdoutStr, stdoutLines); + EXPECT_EQ(numCopies * 2 + 1, stdoutLines.size()); + // Strip off the trailing empty line + if (!stdoutLines.empty()) { + EXPECT_EQ("", stdoutLines.back()); + stdoutLines.pop_back(); + } + size_t linenum = 0; + for (const auto& line : stdoutLines) { + if ((linenum & 1) == 0) { + EXPECT_EQ("this is a successful test", line); + } else { + EXPECT_EQ("another line", line); + } + ++linenum; + } + + // stderr should only contain the lines containing "another line" + std::vector stderrLines; + split('\n', stderrStr, stderrLines); + EXPECT_EQ(numCopies + 1, stderrLines.size()); + // Strip off the trailing empty line + if (!stderrLines.empty()) { + EXPECT_EQ("", stderrLines.back()); + stderrLines.pop_back(); + } + for (const auto& line : stderrLines) { + EXPECT_EQ("another line", line); + } + }); +}