Factor out pipe methods, add takeOwnershipOfPipes
authorAlexey Spiridonov <lesha@fb.com>
Fri, 8 May 2015 01:30:14 +0000 (18:30 -0700)
committerPraveen Kumar Ramakrishnan <praveenr@fb.com>
Tue, 12 May 2015 00:02:41 +0000 (17:02 -0700)
Summary:
In order to create an EventBase'd Suprocess class, I'd like to be able to manage the lifetime of pipes independently of the lifetime of the process. To this effect, I factored out basic Pipe handling, and provided a function that detaches the pipe vector from the Subprocess object.

#6996492 a push-blocking test is broken in trunk

Test Plan: added a unit test, fbconfig -r folly && fbmake runtests && fbmake runtests_opt

Reviewed By: dancol@fb.com

Subscribers: yfeldblum, chalfant, dancol, wez, anarayanan, trunkagent, net-systems@, njormrod, folly-diffs@

FB internal diff: D1699969

Signature: t1:1699969:1430975299:30d291ab7fcc555edddf098b33095a5b29500e76

folly/Subprocess.cpp
folly/Subprocess.h
folly/test/SubprocessTest.cpp

index 36c824c3cfe2a773bcf1868c55ea4df952083a4d..17e997e6854537cb266611a8f453a1513c2e3aab 100644 (file)
@@ -96,6 +96,7 @@ std::string ProcessReturnCode::str() const {
                            (coreDumped() ? " (core dumped)" : ""));
   }
   CHECK(false);  // unreached
+  return "";  // silence GCC warning
 }
 
 CalledProcessError::CalledProcessError(ProcessReturnCode rc)
@@ -189,13 +190,9 @@ Subprocess::Subprocess(
 Subprocess::~Subprocess() {
   CHECK_NE(returnCode_.state(), ProcessReturnCode::RUNNING)
     << "Subprocess destroyed without reaping child";
-  closeAll();
 }
 
 namespace {
-void closeChecked(int fd) {
-  checkUnixError(::close(fd), "close");
-}
 
 struct ChildErrorInfo {
   int errCode;
@@ -214,16 +211,9 @@ void childError(int errFd, int errCode, int errnoValue) {
 
 }  // namespace
 
-void Subprocess::closeAll() {
-  for (auto& p : pipes_) {
-    closeChecked(p.parentFd);
-  }
-  pipes_.clear();
-}
-
 void Subprocess::setAllNonBlocking() {
   for (auto& p : pipes_) {
-    int fd = p.parentFd;
+    int fd = p.pipe.fd();
     int flags = ::fcntl(fd, F_GETFL);
     checkUnixError(flags, "fcntl");
     int r = ::fcntl(fd, F_SETFL, flags | O_NONBLOCK);
@@ -244,12 +234,8 @@ void Subprocess::spawn(
   // Make a copy, we'll mutate options
   Options options(optionsIn);
 
-  // On error, close all of the pipes_
-  auto pipesGuard = makeGuard([&] {
-    for (auto& p : this->pipes_) {
-      CHECK_ERR(::close(p.parentFd));
-    }
-  });
+  // On error, close all pipes_ (ignoring errors, but that seems fine here).
+  auto pipesGuard = makeGuard([this] { pipes_.clear(); });
 
   // Create a pipe to use to receive error information from the child,
   // in case it fails before calling exec()
@@ -325,6 +311,9 @@ void Subprocess::spawnInternal(
       // doesn't need to reset the flag on its end, as we always dup2() the fd,
       // and dup2() fds don't share the close-on-exec flag.
 #if FOLLY_HAVE_PIPE2
+      // If possible, set close-on-exec atomically. Otherwise, a concurrent
+      // Subprocess invocation can fork() between "pipe" and "fnctl",
+      // causing FDs to leak.
       r = ::pipe2(fds, O_CLOEXEC);
       checkUnixError(r, "pipe2");
 #else
@@ -335,21 +324,21 @@ void Subprocess::spawnInternal(
       r = fcntl(fds[1], F_SETFD, FD_CLOEXEC);
       checkUnixError(r, "set FD_CLOEXEC");
 #endif
-      PipeInfo pinfo;
-      pinfo.direction = p.second;
+      pipes_.emplace_back();
+      Pipe& pipe = pipes_.back();
+      pipe.direction = p.second;
       int cfd;
       if (p.second == PIPE_IN) {
         // Child gets reading end
-        pinfo.parentFd = fds[1];
+        pipe.pipe = folly::File(fds[1], /*owns_fd=*/ true);
         cfd = fds[0];
       } else {
-        pinfo.parentFd = fds[0];
+        pipe.pipe = folly::File(fds[0], /*owns_fd=*/ true);
         cfd = fds[1];
       }
       p.second = cfd;  // ensure it gets dup2()ed
-      pinfo.childFd = p.first;
+      pipe.childFd = p.first;
       childFds.push_back(cfd);
-      pipes_.push_back(pinfo);
     }
   }
 
@@ -543,6 +532,8 @@ ProcessReturnCode Subprocess::poll() {
   pid_t found = ::waitpid(pid_, &status, WNOHANG);
   checkUnixError(found, "waitpid");
   if (found != 0) {
+    // Though the child process had quit, this call does not close the pipes
+    // since its descendants may still be using them.
     returnCode_ = ProcessReturnCode(status);
     pid_ = -1;
   }
@@ -566,6 +557,8 @@ ProcessReturnCode Subprocess::wait() {
     found = ::waitpid(pid_, &status, 0);
   } while (found == -1 && errno == EINTR);
   checkUnixError(found, "waitpid");
+  // Though the child process had quit, this call does not close the pipes
+  // since its descendants may still be using them.
   DCHECK_EQ(found, pid_);
   returnCode_ = ProcessReturnCode(status);
   pid_ = -1;
@@ -709,12 +702,14 @@ std::pair<IOBufQueue, IOBufQueue> Subprocess::communicateIOBuf(
 
 void Subprocess::communicate(FdCallback readCallback,
                              FdCallback writeCallback) {
+  // This serves to prevent wait() followed by communicate(), but if you
+  // legitimately need that, send a patch to delete this line.
   returnCode_.enforce(ProcessReturnCode::RUNNING);
   setAllNonBlocking();
 
   std::vector<pollfd> fds;
   fds.reserve(pipes_.size());
-  std::vector<int> toClose;
+  std::vector<size_t> toClose;  // indexes into pipes_
   toClose.reserve(pipes_.size());
 
   while (!pipes_.empty()) {
@@ -723,7 +718,7 @@ void Subprocess::communicate(FdCallback readCallback,
 
     for (auto& p : pipes_) {
       pollfd pfd;
-      pfd.fd = p.parentFd;
+      pfd.fd = p.pipe.fd();
       // Yes, backwards, PIPE_IN / PIPE_OUT are defined from the
       // child's point of view.
       if (!p.enabled) {
@@ -746,13 +741,14 @@ void Subprocess::communicate(FdCallback readCallback,
 
     for (size_t i = 0; i < pipes_.size(); ++i) {
       auto& p = pipes_[i];
-      DCHECK_EQ(fds[i].fd, p.parentFd);
+      auto parentFd = p.pipe.fd();
+      DCHECK_EQ(fds[i].fd, parentFd);
       short events = fds[i].revents;
 
       bool closed = false;
       if (events & POLLOUT) {
         DCHECK(!(events & POLLIN));
-        if (writeCallback(p.parentFd, p.childFd)) {
+        if (writeCallback(parentFd, p.childFd)) {
           toClose.push_back(i);
           closed = true;
         }
@@ -762,7 +758,7 @@ void Subprocess::communicate(FdCallback readCallback,
       // on) end of file
       if (events & (POLLIN | POLLHUP)) {
         DCHECK(!(events & POLLOUT));
-        if (readCallback(p.parentFd, p.childFd)) {
+        if (readCallback(parentFd, p.childFd)) {
           toClose.push_back(i);
           closed = true;
         }
@@ -777,7 +773,7 @@ void Subprocess::communicate(FdCallback readCallback,
     // Close the fds in reverse order so the indexes hold after erase()
     for (int idx : boost::adaptors::reverse(toClose)) {
       auto pos = pipes_.begin() + idx;
-      closeChecked(pos->parentFd);
+      pos->pipe.close();  // Throws on error
       pipes_.erase(pos);
     }
   }
@@ -791,10 +787,10 @@ bool Subprocess::notificationsEnabled(int childFd) const {
   return pipes_[findByChildFd(childFd)].enabled;
 }
 
-int Subprocess::findByChildFd(int childFd) const {
+size_t Subprocess::findByChildFd(int childFd) const {
   auto pos = std::lower_bound(
       pipes_.begin(), pipes_.end(), childFd,
-      [] (const PipeInfo& info, int fd) { return info.childFd < fd; });
+      [] (const Pipe& pipe, int fd) { return pipe.childFd < fd; });
   if (pos == pipes_.end() || pos->childFd != childFd) {
     throw std::invalid_argument(folly::to<std::string>(
         "child fd not found ", childFd));
@@ -804,10 +800,19 @@ int Subprocess::findByChildFd(int childFd) const {
 
 void Subprocess::closeParentFd(int childFd) {
   int idx = findByChildFd(childFd);
-  closeChecked(pipes_[idx].parentFd);
+  pipes_[idx].pipe.close();  // May throw
   pipes_.erase(pipes_.begin() + idx);
 }
 
+std::vector<Subprocess::ChildPipe> Subprocess::takeOwnershipOfPipes() {
+  std::vector<Subprocess::ChildPipe> pipes;
+  for (auto& p : pipes_) {
+    pipes.emplace_back(ChildPipe{p.childFd, std::move(p.pipe)});
+  }
+  pipes_.clear();
+  return pipes;
+}
+
 namespace {
 
 class Initializer {
index 1c0cd0759625fc5b5f0867045b0bce684b7f8bf5..cd6afa13c784dc7574fbf802d848dbf47dc0b131 100644 (file)
  * output, and standard error to/from child descriptors in the parent,
  * or to create communication pipes between the child and the parent.
  *
- * The simplest example is a thread-safe version of the system() library
+ * The simplest example is a thread-safe [1] version of the system() library
  * function:
  *    Subprocess(cmd).wait();
  * which executes the command using the default shell and waits for it
  * to complete, returning the exit status.
  *
- * A thread-safe version of popen() (type="r", to read from the child):
+ * A thread-safe [1] version of popen() (type="r", to read from the child):
  *    Subprocess proc(cmd, Subprocess::pipeStdout());
  *    // read from proc.stdout()
  *    proc.wait();
  *
- * A thread-safe version of popen() (type="w", to write to the child):
+ * A thread-safe [1] version of popen() (type="w", to write to the child):
  *    Subprocess proc(cmd, Subprocess::pipeStdin());
  *    // write to proc.stdin()
  *    proc.wait();
  *
- * If you want to redirect both stdin and stdout to pipes, you can, but
- * note that you're subject to a variety of deadlocks.  You'll want to use
- * nonblocking I/O; look at the implementation of communicate() for an example.
+ * If you want to redirect both stdin and stdout to pipes, you can, but note
+ * that you're subject to a variety of deadlocks.  You'll want to use
+ * nonblocking I/O, like the callback version of communicate().
  *
- * communicate() is a way to communicate to a child via its standard input,
- * standard output, and standard error.  It buffers everything in memory,
- * so it's not great for large amounts of data (or long-running processes),
- * but it insulates you from the deadlocks mentioned above.
+ * The string or IOBuf-based variants of communicate() are the simplest way
+ * to communicate with a child via its standard input, standard output, and
+ * standard error.  They buffer everything in memory, so they are not great
+ * for large amounts of data (or long-running processes), but they are much
+ * simpler than the callback version.
+ *
+ * == A note on thread-safety ==
+ *
+ * [1] "thread-safe" refers ONLY to the fact that Subprocess is very careful
+ * to fork in a way that does not cause grief in multithreaded programs.
+ *
+ * Caveat: If your system does not have the atomic pipe2 system call, it is
+ * not safe to concurrently call Subprocess from different threads.
+ * Therefore, it is best to have a single thread be responsible for spawning
+ * subprocesses.
+ *
+ * A particular instances of Subprocess is emphatically **not** thread-safe.
+ * If you need to simultaneously communicate via the pipes, and interact
+ * with the Subprocess state, your best bet is to:
+ *  - takeOwnershipOfPipes() to separate the pipe I/O from the subprocess.
+ *  - Only interact with the Subprocess from one thread at a time.
+ *
+ * The current implementation of communicate() cannot be safely interrupted.
+ * To do so correctly, one would need to use EventFD, or open a dedicated
+ * pipe to be messaged from a different thread -- in particular, kill() will
+ * not do, since a descendant may keep the pipes open indefinitely.
+ *
+ * So, once you call communicate(), you must wait for it to return, and not
+ * touch the pipes from other threads.  closeParentFd() is emphatically
+ * unsafe to call concurrently, and even sendSignal() is not a good idea.
+ * You can perhaps give the Subprocess's PID to a different thread before
+ * starting communicate(), and use that PID to send a signal without
+ * accessing the Subprocess object.  In that case, you will need a mutex
+ * that ensures you don't wait() before you sent said signal.  In a
+ * nutshell, don't do this.
+ *
+ * In fact, signals are inherently concurrency-unsafe on Unix: if you signal
+ * a PID, while another thread is in waitpid(), the signal may fire either
+ * before or after the process is reaped.  This means that your signal can,
+ * in pathological circumstances, be delivered to the wrong process (ouch!).
+ * To avoid this, you should only use non-blocking waits (i.e. poll()), and
+ * make sure to serialize your signals (i.e. kill()) with the waits --
+ * either wait & signal from the same thread, or use a mutex.
  */
 #ifndef FOLLY_SUBPROCESS_H_
 #define FOLLY_SUBPROCESS_H_
 #include <boost/operators.hpp>
 #include <boost/noncopyable.hpp>
 
+#include <folly/File.h>
 #include <folly/FileUtil.h>
 #include <folly/gen/String.h>
 #include <folly/io/IOBufQueue.h>
@@ -336,6 +376,67 @@ class Subprocess : private boost::noncopyable {
       const Options& options = Options(),
       const std::vector<std::string>* env = nullptr);
 
+  ////
+  //// The methods below only manipulate the process state, and do not
+  //// affect its communication pipes.
+  ////
+
+  /**
+   * Return the child's pid, or -1 if the child wasn't successfully spawned
+   * or has already been wait()ed upon.
+   */
+  pid_t pid() const;
+
+  /**
+   * Return the child's status (as per wait()) if the process has already
+   * been waited on, -1 if the process is still running, or -2 if the
+   * process hasn't been successfully started.  NOTE that this does not call
+   * waitpid() or Subprocess::poll(), but simply returns the status stored
+   * in the Subprocess object.
+   */
+  ProcessReturnCode returnCode() const { return returnCode_; }
+
+  /**
+   * Poll the child's status and return it, return -1 if the process
+   * is still running.  NOTE that it is illegal to call poll again after
+   * poll indicated that the process has terminated, or to call poll on a
+   * process that hasn't been successfully started (the constructor threw an
+   * exception).
+   */
+  ProcessReturnCode poll();
+
+  /**
+   * Poll the child's status.  If the process is still running, return false.
+   * Otherwise, return true if the process exited with status 0 (success),
+   * or throw CalledProcessError if the process exited with a non-zero status.
+   */
+  bool pollChecked();
+
+  /**
+   * Wait for the process to terminate and return its status.
+   * Similarly to poll, it is illegal to call wait after the process
+   * has already been reaped or if the process has not successfully started.
+   */
+  ProcessReturnCode wait();
+
+  /**
+   * Wait for the process to terminate, throw if unsuccessful.
+   */
+  void waitChecked();
+
+  /**
+   * Send a signal to the child.  Shortcuts for the commonly used Unix
+   * signals are below.
+   */
+  void sendSignal(int signal);
+  void terminate() { sendSignal(SIGTERM); }
+  void kill() { sendSignal(SIGKILL); }
+
+  ////
+  //// The methods below only affect the process's communication pipes, but
+  //// not its return code or state (they do not poll() or wait()).
+  ////
+
   /**
    * Communicate with the child until all pipes to/from the child are closed.
    *
@@ -370,6 +471,8 @@ class Subprocess : private boost::noncopyable {
   /**
    * Communicate with the child until all pipes to/from the child are closed.
    *
+   * == Semantics ==
+   *
    * readCallback(pfd, cfd) will be called whenever there's data available
    * on any pipe *from* the child (PIPE_OUT).  pfd is the file descriptor
    * in the parent (that you use to read from); cfd is the file descriptor
@@ -387,25 +490,50 @@ class Subprocess : private boost::noncopyable {
    * child, or make its writes fail with EPIPE, so you should generally
    * avoid returning true unless you've reached end-of-file.
    *
-   * NOTE that you MUST consume all data passed to readCallback (or return
-   * true to close the pipe).  Similarly, you MUST write to a writable pipe
-   * (or return true to close the pipe).  To do otherwise is an error that
-   * can result in a deadlock.  You must do this even for pipes you are not
+   * communicate() returns when all pipes to/from the child are closed; the
+   * child might stay alive after that, so you must still wait().
+   * Conversely, the child may quit long before its pipes are closed, since
+   * its descendants can keep them alive forever.
+   *
+   * Most users won't need to use this callback version; the simpler version
+   * of communicate (which buffers data in memory) will probably work fine.
+   *
+   * == Things you must get correct ==
+   *
+   * 1) You MUST consume all data passed to readCallback (or return true to
+   * close the pipe).  Similarly, you MUST write to a writable pipe (or
+   * return true to close the pipe).  To do otherwise is an error that can
+   * result in a deadlock.  You must do this even for pipes you are not
    * interested in.
    *
-   * Note that pfd is nonblocking, so be prepared for read() / write() to
-   * return -1 and set errno to EAGAIN (in which case you should return
-   * false).  Use readNoInt() from FileUtil.h to handle interrupted reads
-   * for you
+   * 2) pfd is nonblocking, so be prepared for read() / write() to return -1
+   * and set errno to EAGAIN (in which case you should return false).  Use
+   * readNoInt() from FileUtil.h to handle interrupted reads for you.
    *
-   * Note that communicate() returns when all pipes to/from the child are
-   * closed; the child might stay alive after that, so you must still wait().
+   * 3) Your callbacks MUST NOT call any of the Subprocess methods that
+   * manipulate the pipe FDs.  Check the docblocks, but, for example,
+   * neither closeParentFd (return true instead) nor takeOwnershipOfPipes
+   * are safe.  Stick to reading/writing from pfd, as appropriate.
    *
-   * Most users won't need to use this; the simpler version of communicate
-   * (which buffers data in memory) will probably work fine.
+   * == Good to know ==
    *
-   * See ReadLinesCallback for an easy way to consume the child's output
+   * 1) See ReadLinesCallback for an easy way to consume the child's output
    * streams line-by-line (or tokenized by another delimiter).
+   *
+   * 2) "Wait until the descendants close the pipes" is usually the behavior
+   * you want, since the descendants may have something to say even if the
+   * immediate child is dead.  If you need to be able to force-close all
+   * parent FDs, communicate() will NOT work for you.  Do it your own way by
+   * using takeOwnershipOfPipes().
+   *
+   * Why not? You can return "true" from your callbacks to sever active
+   * pipes, but inactive ones can remain open indefinitely.  It is
+   * impossible to safely close inactive pipes while another thread is
+   * blocked in communicate().  This is BY DESIGN.  Racing communicate()'s
+   * read/write callbacks can result in wrong I/O and data corruption.  This
+   * class would need internal synchronization and timeouts, a poor and
+   * expensive implementation choice, in order to make closeParentFd()
+   * thread-safe.
    */
   typedef std::function<bool(int, int)> FdCallback;
   void communicate(FdCallback readCallback, FdCallback writeCallback);
@@ -510,61 +638,39 @@ class Subprocess : private boost::noncopyable {
   }
 
   /**
-   * Enable notifications (callbacks) for one pipe to/from child. By default,
-   * all are enabled. Useful for "chatty" communication -- you want to disable
-   * write callbacks until you receive the expected message.
+   * communicate() callbacks can use this to temporarily enable/disable
+   * notifications (callbacks) for a pipe to/from the child.  By default,
+   * all are enabled.  Useful for "chatty" communication -- you want to
+   * disable write callbacks until you receive the expected message.
+   *
+   * Disabling a pipe does not free you from the requirement to consume all
+   * incoming data.  Failing to do so will easily create deadlock bugs.
+   *
+   * Throws if the childFd is not known.
    */
   void enableNotifications(int childFd, bool enabled);
 
   /**
-   * Are notifications for one pipe to/from child enabled?
+   * Are notifications for one pipe to/from child enabled?  Throws if the
+   * childFd is not known.
    */
   bool notificationsEnabled(int childFd) const;
 
-  /**
-   * Return the child's pid, or -1 if the child wasn't successfully spawned
-   * or has already been wait()ed upon.
-   */
-  pid_t pid() const;
-
-  /**
-   * Return the child's status (as per wait()) if the process has already
-   * been waited on, -1 if the process is still running, or -2 if the process
-   * hasn't been successfully started.  NOTE that this does not poll, but
-   * returns the status stored in the Subprocess object.
-   */
-  ProcessReturnCode returnCode() const { return returnCode_; }
-
-  /**
-   * Poll the child's status and return it, return -1 if the process
-   * is still running.  NOTE that it is illegal to call poll again after
-   * poll indicated that the process has terminated, or to call poll on a
-   * process that hasn't been successfully started (the constructor threw an
-   * exception).
-   */
-  ProcessReturnCode poll();
-
-  /**
-   * Poll the child's status.  If the process is still running, return false.
-   * Otherwise, return true if the process exited with status 0 (success),
-   * or throw CalledProcessError if the process exited with a non-zero status.
-   */
-  bool pollChecked();
-
-  /**
-   * Wait for the process to terminate and return its status.
-   * Similarly to poll, it is illegal to call wait after the process
-   * has already been reaped or if the process has not successfully started.
-   */
-  ProcessReturnCode wait();
+  ////
+  //// The following methods are meant for the cases when communicate() is
+  //// not suitable.  You should not need them when you call communicate(),
+  //// and, in fact, it is INHERENTLY UNSAFE to use closeParentFd() or
+  //// takeOwnershipOfPipes() from a communicate() callback.
+  ////
 
   /**
-   * Wait for the process to terminate, throw if unsuccessful.
+   * Close the parent file descriptor given a file descriptor in the child.
+   * DO NOT USE from communicate() callbacks; make them return true instead.
    */
-  void waitChecked();
+  void closeParentFd(int childFd);
 
   /**
-   * Set all pipes from / to child non-blocking.  communicate() does
+   * Set all pipes from / to child to be non-blocking.  communicate() does
    * this for you.
    */
   void setAllNonBlocking();
@@ -572,27 +678,33 @@ class Subprocess : private boost::noncopyable {
   /**
    * Get parent file descriptor corresponding to the given file descriptor
    * in the child.  Throws if childFd isn't a pipe (PIPE_IN / PIPE_OUT).
-   * Do not close() the return file descriptor; use closeParentFd, below.
+   * Do not close() the returned file descriptor; use closeParentFd, above.
    */
   int parentFd(int childFd) const {
-    return pipes_[findByChildFd(childFd)].parentFd;
+    return pipes_[findByChildFd(childFd)].pipe.fd();
   }
   int stdin() const { return parentFd(0); }
   int stdout() const { return parentFd(1); }
   int stderr() const { return parentFd(2); }
 
   /**
-   * Close the parent file descriptor given a file descriptor in the child.
-   */
-  void closeParentFd(int childFd);
-
-  /**
-   * Send a signal to the child.  Shortcuts for the commonly used Unix
-   * signals are below.
+   * The child's pipes are logically separate from the process metadata
+   * (they may even be kept alive by the child's descendants).  This call
+   * lets you manage the pipes' lifetime separetely from the lifetime of the
+   * child process.
+   *
+   * After this call, the Subprocess instance will have no knowledge of
+   * these pipes, and the caller assumes responsibility for managing their
+   * lifetimes.  Pro-tip: prefer to explicitly close() the pipes, since
+   * folly::File would otherwise silently suppress I/O errors.
+   *
+   * No, you may NOT call this from a communicate() callback.
    */
-  void sendSignal(int signal);
-  void terminate() { sendSignal(SIGTERM); }
-  void kill() { sendSignal(SIGKILL); }
+  struct ChildPipe {
+    int childFd;
+    folly::File pipe;  // Owns the parent FD
+  };
+  std::vector<ChildPipe> takeOwnershipOfPipes();
 
  private:
   static const int RV_RUNNING = ProcessReturnCode::RV_RUNNING;
@@ -629,34 +741,40 @@ class Subprocess : private boost::noncopyable {
    */
   void readChildErrorPipe(int pfd, const char* executable);
 
-  /**
-   * Close all file descriptors.
-   */
-  void closeAll();
+  // Returns an index into pipes_. Throws std::invalid_argument if not found.
+  size_t findByChildFd(const int childFd) const;
 
-  // return index in pipes_
-  int findByChildFd(int childFd) const;
 
   pid_t pid_;
   ProcessReturnCode returnCode_;
 
-  // The number of pipes between parent and child is assumed to be small,
-  // so we're happy with a vector here, even if it means linear erase.
-  // sorted by childFd
-  struct PipeInfo : private boost::totally_ordered<PipeInfo> {
-    int parentFd = -1;
-    int childFd = -1;
-    int direction = PIPE_IN;  // one of PIPE_IN / PIPE_OUT
-    bool enabled = true;
-
-    bool operator<(const PipeInfo& other) const {
+  /**
+   * Represents a pipe between this process, and the child process (or its
+   * descendant).  To interact with these pipes, you can use communicate(),
+   * or use parentFd() and related methods, or separate them from the
+   * Subprocess instance entirely via takeOwnershipOfPipes().
+   */
+  struct Pipe : private boost::totally_ordered<Pipe> {
+    folly::File pipe; // Our end of the pipe, wrapped in a File to auto-close.
+    int childFd = -1; // Identifies the pipe: what FD is this in the child?
+    int direction = PIPE_IN; // one of PIPE_IN / PIPE_OUT
+    bool enabled = true; // Are notifications enabled in communicate()?
+
+    bool operator<(const Pipe& other) const {
       return childFd < other.childFd;
     }
-    bool operator==(const PipeInfo& other) const {
+    bool operator==(const Pipe& other) const {
       return childFd == other.childFd;
     }
   };
-  std::vector<PipeInfo> pipes_;
+
+  // Populated at process start according to fdActions, empty after
+  // takeOwnershipOfPipes().  Sorted by childFd.  Can only have elements
+  // erased, but not inserted, after being populated.
+  //
+  // The number of pipes between parent and child is assumed to be small,
+  // so we're happy with a vector here, even if it means linear erase.
+  std::vector<Pipe> pipes_;
 };
 
 inline Subprocess::Options& Subprocess::Options::operator|=(
index 3ce96c118f45549862cdfd14d398e6376763f8b7..0137bed2aef86189439a3cbf8781dbc40ab7beb0 100644 (file)
@@ -444,3 +444,21 @@ TEST(CommunicateSubprocessTest, Chatty) {
     EXPECT_EQ(0, proc.wait().exitStatus());
   });
 }
+
+TEST(CommunicateSubprocessTest, TakeOwnershipOfPipes) {
+  std::vector<Subprocess::ChildPipe> pipes;
+  {
+    Subprocess proc(
+      "echo $'oh\\nmy\\ncat' | wc -l &", Subprocess::pipeStdout()
+    );
+    pipes = proc.takeOwnershipOfPipes();
+    proc.waitChecked();
+  }
+  EXPECT_EQ(1, pipes.size());
+  EXPECT_EQ(1, pipes[0].childFd);
+
+  char buf[10];
+  EXPECT_EQ(2, readFull(pipes[0].pipe.fd(), buf, 10));
+  buf[2] = 0;
+  EXPECT_EQ("3\n", std::string(buf));
+}