then-with-Executor
[folly.git] / folly / Subprocess.h
index 5c53a319073b8a1747f9806d5dd915aa2fece643..1c0cd0759625fc5b5f0867045b0bce684b7f8bf5 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2013 Facebook, Inc.
+ * Copyright 2015 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -37,7 +37,7 @@
  *    // read from proc.stdout()
  *    proc.wait();
  *
- * A thread-safe version of popen() (type="w", to write from the child):
+ * A thread-safe version of popen() (type="w", to write to the child):
  *    Subprocess proc(cmd, Subprocess::pipeStdin());
  *    // write to proc.stdin()
  *    proc.wait();
 
 #include <sys/types.h>
 #include <signal.h>
+#if __APPLE__
+#include <sys/wait.h>
+#else
 #include <wait.h>
+#endif
 
 #include <exception>
 #include <vector>
 #include <boost/operators.hpp>
 #include <boost/noncopyable.hpp>
 
-#include "folly/io/IOBufQueue.h"
-#include "folly/MapUtil.h"
-#include "folly/Portability.h"
-#include "folly/Range.h"
+#include <folly/FileUtil.h>
+#include <folly/gen/String.h>
+#include <folly/io/IOBufQueue.h>
+#include <folly/MapUtil.h>
+#include <folly/Portability.h>
+#include <folly/Range.h>
 
 namespace folly {
 
@@ -144,10 +150,15 @@ class ProcessReturnCode {
   int rawStatus_;
 };
 
+/**
+ * Base exception thrown by the Subprocess methods.
+ */
+class SubprocessError : public std::exception {};
+
 /**
  * Exception thrown by *Checked methods of Subprocess.
  */
-class CalledProcessError : public std::exception {
+class CalledProcessError : public SubprocessError {
  public:
   explicit CalledProcessError(ProcessReturnCode rc);
   ~CalledProcessError() throw() { }
@@ -158,6 +169,21 @@ class CalledProcessError : public std::exception {
   std::string what_;
 };
 
+/**
+ * Exception thrown if the subprocess cannot be started.
+ */
+class SubprocessSpawnError : public SubprocessError {
+ public:
+  SubprocessSpawnError(const char* executable, int errCode, int errnoValue);
+  ~SubprocessSpawnError() throw() {}
+  const char* what() const throw() FOLLY_OVERRIDE { return what_.c_str(); }
+  int errnoValue() const { return errnoValue_; }
+
+ private:
+  int errnoValue_;
+  std::string what_;
+};
+
 /**
  * Subprocess.
  */
@@ -179,11 +205,7 @@ class Subprocess : private boost::noncopyable {
   class Options : private boost::orable<Options> {
     friend class Subprocess;
    public:
-    Options()
-      : closeOtherFds_(false),
-        usePath_(false),
-        parentDeathSignal_(0) {
-    }
+    Options() {}  // E.g. https://gcc.gnu.org/bugzilla/show_bug.cgi?id=58328
 
     /**
      * Change action for file descriptor fd.
@@ -204,19 +226,23 @@ class Subprocess : private boost::noncopyable {
     /**
      * Shortcut to change the action for standard input.
      */
-    Options& stdin(int action) { return fd(0, action); }
+    Options& stdin(int action) { return fd(STDIN_FILENO, action); }
 
     /**
      * Shortcut to change the action for standard output.
      */
-    Options& stdout(int action) { return fd(1, action); }
+    Options& stdout(int action) { return fd(STDOUT_FILENO, action); }
 
     /**
      * Shortcut to change the action for standard error.
      * Note that stderr(1) will redirect the standard error to the same
      * file descriptor as standard output; the equivalent of bash's "2>&1"
      */
-    Options& stderr(int action) { return fd(2, action); }
+    Options& stderr(int action) { return fd(STDERR_FILENO, action); }
+
+    Options& pipeStdin() { return fd(STDIN_FILENO, PIPE_IN); }
+    Options& pipeStdout() { return fd(STDOUT_FILENO, PIPE_OUT); }
+    Options& pipeStderr() { return fd(STDERR_FILENO, PIPE_OUT); }
 
     /**
      * Close all other fds (other than standard input, output, error,
@@ -237,6 +263,12 @@ class Subprocess : private boost::noncopyable {
      */
     Options& usePath() { usePath_ = true; return *this; }
 
+    /**
+     * Change the child's working directory, after the vfork.
+     */
+    Options& chdir(const std::string& dir) { childDir_ = dir; return *this; }
+
+#if __linux__
     /**
      * Child will receive a signal when the parent exits.
      */
@@ -244,6 +276,17 @@ class Subprocess : private boost::noncopyable {
       parentDeathSignal_ = sig;
       return *this;
     }
+#endif
+
+    /**
+     * Child will be made a process group leader when it starts. Upside: one
+     * can reliably all its kill non-daemonizing descendants.  Downside: the
+     * child will not receive Ctrl-C etc during interactive use.
+     */
+    Options& processGroupLeader() {
+      processGroupLeader_ = true;
+      return *this;
+    }
 
     /**
      * Helpful way to combine Options.
@@ -253,9 +296,13 @@ class Subprocess : private boost::noncopyable {
    private:
     typedef boost::container::flat_map<int, int> FdMap;
     FdMap fdActions_;
-    bool closeOtherFds_;
-    bool usePath_;
-    int parentDeathSignal_;
+    bool closeOtherFds_{false};
+    bool usePath_{false};
+    std::string childDir_;  // "" keeps the parent's working directory
+#if __linux__
+    int parentDeathSignal_{0};
+#endif
+    bool processGroupLeader_{false};
   };
 
   static Options pipeStdin() { return Options().stdin(PIPE); }
@@ -290,60 +337,35 @@ class Subprocess : private boost::noncopyable {
       const std::vector<std::string>* env = nullptr);
 
   /**
-   * Append all data, close the stdin (to-child) fd, and read all data,
-   * except that this is done in a safe manner to prevent deadlocking.
+   * Communicate with the child until all pipes to/from the child are closed.
    *
-   * If writeStdin() is given in flags, the process must have been opened with
-   * stdinFd=PIPE.
+   * The input buffer is written to the process' stdin pipe, and data is read
+   * from the stdout and stderr pipes.  Non-blocking I/O is performed on all
+   * pipes simultaneously to avoid deadlocks.
    *
-   * If readStdout() is given in flags, the first returned value will be the
-   * value read from the child's stdout; the child must have been opened with
-   * stdoutFd=PIPE.
+   * The stdin pipe will be closed after the full input buffer has been written.
+   * An error will be thrown if a non-empty input buffer is supplied but stdin
+   * was not configured as a pipe.
    *
-   * If readStderr() is given in flags, the second returned value will be the
-   * value read from the child's stderr; the child must have been opened with
-   * stderrFd=PIPE.
+   * Returns a pair of buffers containing the data read from stdout and stderr.
+   * If stdout or stderr is not a pipe, an empty IOBuf queue will be returned
+   * for the respective buffer.
    *
-   * Note that communicate() returns when all pipes to/from the child are
-   * closed; the child might stay alive after that, so you must still wait().
+   * Note that communicate() and communicateIOBuf() both return when all
+   * pipes to/from the child are closed; the child might stay alive after
+   * that, so you must still wait().
    *
-   * communicateIOBuf uses IOBufQueue for buffering (which has the advantage
-   * that it won't try to allocate all data at once).  communicate
-   * uses strings for simplicity.
+   * communicateIOBuf() uses IOBufQueue for buffering (which has the
+   * advantage that it won't try to allocate all data at once), but it does
+   * store the subprocess's entire output in memory before returning.
+   *
+   * communicate() uses strings for simplicity.
    */
-  class CommunicateFlags : private boost::orable<CommunicateFlags> {
-    friend class Subprocess;
-   public:
-    CommunicateFlags()
-      : writeStdin_(false), readStdout_(false), readStderr_(false) { }
-    CommunicateFlags& writeStdin() { writeStdin_ = true; return *this; }
-    CommunicateFlags& readStdout() { readStdout_ = true; return *this; }
-    CommunicateFlags& readStderr() { readStderr_ = true; return *this; }
-
-    CommunicateFlags& operator|=(const CommunicateFlags& other);
-   private:
-    bool writeStdin_;
-    bool readStdout_;
-    bool readStderr_;
-  };
-
-  static CommunicateFlags writeStdin() {
-    return CommunicateFlags().writeStdin();
-  }
-  static CommunicateFlags readStdout() {
-    return CommunicateFlags().readStdout();
-  }
-  static CommunicateFlags readStderr() {
-    return CommunicateFlags().readStderr();
-  }
-
   std::pair<IOBufQueue, IOBufQueue> communicateIOBuf(
-      const CommunicateFlags& flags = readStdout(),
-      IOBufQueue data = IOBufQueue());
+      IOBufQueue input = IOBufQueue());
 
   std::pair<std::string, std::string> communicate(
-      const CommunicateFlags& flags = readStdout(),
-      StringPiece data = StringPiece());
+      StringPiece input = StringPiece());
 
   /**
    * Communicate with the child until all pipes to/from the child are closed.
@@ -360,26 +382,145 @@ class Subprocess : private boost::noncopyable {
    * identifying the stream; 0 = child's standard input, etc)
    *
    * The read and write callbacks must read from / write to pfd and return
-   * false during normal operation or true at end-of-file;
-   * communicate() will then close the pipe.  Note that pfd is
-   * nonblocking, so be prepared for read() / write() to return -1 and
-   * set errno to EAGAIN (in which case you should return false).
+   * false during normal operation.  Return true to tell communicate() to
+   * close the pipe.  For readCallback, this might send SIGPIPE to the
+   * child, or make its writes fail with EPIPE, so you should generally
+   * avoid returning true unless you've reached end-of-file.
    *
    * NOTE that you MUST consume all data passed to readCallback (or return
-   * true, which will close the pipe, possibly sending SIGPIPE to the child or
-   * making its writes fail with EPIPE), and you MUST write to a writable pipe
-   * (or return true, which will close the pipe).  To do otherwise is an
-   * error.  You must do this even for pipes you are not interested in.
+   * true to close the pipe).  Similarly, you MUST write to a writable pipe
+   * (or return true to close the pipe).  To do otherwise is an error that
+   * can result in a deadlock.  You must do this even for pipes you are not
+   * interested in.
+   *
+   * Note that pfd is nonblocking, so be prepared for read() / write() to
+   * return -1 and set errno to EAGAIN (in which case you should return
+   * false).  Use readNoInt() from FileUtil.h to handle interrupted reads
+   * for you
    *
    * Note that communicate() returns when all pipes to/from the child are
    * closed; the child might stay alive after that, so you must still wait().
    *
    * Most users won't need to use this; the simpler version of communicate
    * (which buffers data in memory) will probably work fine.
+   *
+   * See ReadLinesCallback for an easy way to consume the child's output
+   * streams line-by-line (or tokenized by another delimiter).
    */
   typedef std::function<bool(int, int)> FdCallback;
   void communicate(FdCallback readCallback, FdCallback writeCallback);
 
+  /**
+   * A readCallback for Subprocess::communicate() that helps you consume
+   * lines (or other delimited pieces) from your subprocess's file
+   * descriptors.  Use the readLinesCallback() helper to get template
+   * deduction.  For example:
+   *
+   *   auto read_cb = Subprocess::readLinesCallback(
+   *     [](int fd, folly::StringPiece s) {
+   *       std::cout << fd << " said: " << s;
+   *       return false;  // Keep reading from the child
+   *     }
+   *   );
+   *   subprocess.communicate(
+   *     // ReadLinesCallback contains StreamSplitter contains IOBuf, making
+   *     // it noncopyable, whereas std::function must be copyable.  So, we
+   *     // keep the callback in a local, and instead pass a reference.
+   *     std::ref(read_cb),
+   *     [](int pdf, int cfd){ return true; }  // Don't write to the child
+   *   );
+   *
+   * If a file line exceeds maxLineLength, your callback will get some
+   * initial chunks of maxLineLength with no trailing delimiters.  The final
+   * chunk of a line is delimiter-terminated iff the delimiter was present
+   * in the input.  In particular, the last line in a file always lacks a
+   * delimiter -- so if a file ends on a delimiter, the final line is empty.
+   *
+   * Like a regular communicate() callback, your fdLineCb() normally returns
+   * false.  It may return true to tell Subprocess to close the underlying
+   * file descriptor.  The child process may then receive SIGPIPE or get
+   * EPIPE errors on writes.
+   */
+  template <class Callback>
+  class ReadLinesCallback {
+   private:
+    // Binds an FD to the client-provided FD+line callback
+    struct StreamSplitterCallback {
+      StreamSplitterCallback(Callback& cb, int fd) : cb_(cb), fd_(fd) { }
+      // The return value semantics are inverted vs StreamSplitter
+      bool operator()(StringPiece s) { return !cb_(fd_, s); }
+      Callback& cb_;
+      int fd_;
+    };
+    typedef gen::StreamSplitter<StreamSplitterCallback> LineSplitter;
+   public:
+    explicit ReadLinesCallback(
+      Callback&& fdLineCb,
+      uint64_t maxLineLength = 0,  // No line length limit by default
+      char delimiter = '\n',
+      uint64_t bufSize = 1024
+    ) : fdLineCb_(std::move(fdLineCb)),
+        maxLineLength_(maxLineLength),
+        delimiter_(delimiter),
+        bufSize_(bufSize) {}
+
+    bool operator()(int pfd, int cfd) {
+      // Make a splitter for this cfd if it doesn't already exist
+      auto it = fdToSplitter_.find(cfd);
+      auto& splitter = (it != fdToSplitter_.end()) ? it->second
+        : fdToSplitter_.emplace(cfd, LineSplitter(
+            delimiter_, StreamSplitterCallback(fdLineCb_, cfd), maxLineLength_
+          )).first->second;
+      // Read as much as we can from this FD
+      char buf[bufSize_];
+      while (true) {
+        ssize_t ret = readNoInt(pfd, buf, bufSize_);
+        if (ret == -1 && errno == EAGAIN) {  // No more data for now
+          return false;
+        }
+        if (ret == 0) {  // Reached end-of-file
+          splitter.flush();  // Ignore return since the file is over anyway
+          return true;
+        }
+        if (!splitter(StringPiece(buf, ret))) {
+          return true;  // The callback told us to stop
+        }
+      }
+    }
+
+   private:
+    Callback fdLineCb_;
+    const uint64_t maxLineLength_;
+    const char delimiter_;
+    const uint64_t bufSize_;
+    // We lazily make splitters for all cfds that get used.
+    std::unordered_map<int, LineSplitter> fdToSplitter_;
+  };
+
+  // Helper to enable template deduction
+  template <class Callback>
+  static ReadLinesCallback<Callback> readLinesCallback(
+      Callback&& fdLineCb,
+      uint64_t maxLineLength = 0,  // No line length limit by default
+      char delimiter = '\n',
+      uint64_t bufSize = 1024) {
+    return ReadLinesCallback<Callback>(
+      std::move(fdLineCb), maxLineLength, delimiter, bufSize
+    );
+  }
+
+  /**
+   * Enable notifications (callbacks) for one pipe to/from child. By default,
+   * all are enabled. Useful for "chatty" communication -- you want to disable
+   * write callbacks until you receive the expected message.
+   */
+  void enableNotifications(int childFd, bool enabled);
+
+  /**
+   * Are notifications for one pipe to/from child enabled?
+   */
+  bool notificationsEnabled(int childFd) const;
+
   /**
    * Return the child's pid, or -1 if the child wasn't successfully spawned
    * or has already been wait()ed upon.
@@ -457,16 +598,36 @@ class Subprocess : private boost::noncopyable {
   static const int RV_RUNNING = ProcessReturnCode::RV_RUNNING;
   static const int RV_NOT_STARTED = ProcessReturnCode::RV_NOT_STARTED;
 
+  // spawn() sets up a pipe to read errors from the child,
+  // then calls spawnInternal() to do the bulk of the work.  Once
+  // spawnInternal() returns it reads the error pipe to see if the child
+  // encountered any errors.
   void spawn(
       std::unique_ptr<const char*[]> argv,
       const char* executable,
       const Options& options,
       const std::vector<std::string>* env);
+  void spawnInternal(
+      std::unique_ptr<const char*[]> argv,
+      const char* executable,
+      Options& options,
+      const std::vector<std::string>* env,
+      int errFd);
 
-  // Action to run in child.
+  // Actions to run in child.
   // Note that this runs after vfork(), so tread lightly.
-  void runChild(const char* executable, char** argv, char** env,
-                const Options& options) const;
+  // Returns 0 on success, or an errno value on failure.
+  int prepareChild(const Options& options,
+                   const sigset_t* sigmask,
+                   const char* childDir) const;
+  int runChild(const char* executable, char** argv, char** env,
+               const Options& options) const;
+
+  /**
+   * Read from the error pipe, and throw SubprocessSpawnError if the child
+   * failed before calling exec().
+   */
+  void readChildErrorPipe(int pfd, const char* executable);
 
   /**
    * Close all file descriptors.
@@ -483,9 +644,11 @@ class Subprocess : private boost::noncopyable {
   // so we're happy with a vector here, even if it means linear erase.
   // sorted by childFd
   struct PipeInfo : private boost::totally_ordered<PipeInfo> {
-    int parentFd;
-    int childFd;
-    int direction;  // one of PIPE_IN / PIPE_OUT
+    int parentFd = -1;
+    int childFd = -1;
+    int direction = PIPE_IN;  // one of PIPE_IN / PIPE_OUT
+    bool enabled = true;
+
     bool operator<(const PipeInfo& other) const {
       return childFd < other.childFd;
     }
@@ -505,19 +668,10 @@ inline Subprocess::Options& Subprocess::Options::operator|=(
   }
   closeOtherFds_ |= other.closeOtherFds_;
   usePath_ |= other.usePath_;
-  return *this;
-}
-
-inline Subprocess::CommunicateFlags& Subprocess::CommunicateFlags::operator|=(
-    const Subprocess::CommunicateFlags& other) {
-  if (this == &other) return *this;
-  writeStdin_ |= other.writeStdin_;
-  readStdout_ |= other.readStdout_;
-  readStderr_ |= other.readStderr_;
+  processGroupLeader_ |= other.processGroupLeader_;
   return *this;
 }
 
 }  // namespace folly
 
 #endif /* FOLLY_SUBPROCESS_H_ */
-