A generic line-reading callback for communicate()
authorAlexey Spiridonov <lesha@fb.com>
Fri, 25 Apr 2014 22:21:49 +0000 (15:21 -0700)
committerChip Turner <chip@fb.com>
Fri, 25 Jul 2014 16:06:04 +0000 (09:06 -0700)
Summary: There are a couple of places where this behavior is useful, and it's not 100% trivial to implement it from scratch. Adding it to Folly to save people code & bugs.

Test Plan: unit tests

Reviewed By: tudorb@fb.com

Subscribers: tjackson, folly@lists, tudorb

FB internal diff: D1297506

folly/Subprocess.cpp
folly/Subprocess.h

index db45c9e4da7d68f7f74b1715c1a853b9bc8cd070..4cd2c7c1088b0021d2244477fd03134c2cf29e82 100644 (file)
@@ -39,7 +39,6 @@
 
 #include <folly/Conv.h>
 #include <folly/Exception.h>
-#include <folly/FileUtil.h>
 #include <folly/ScopeGuard.h>
 #include <folly/String.h>
 #include <folly/io/Cursor.h>
index 5acf6235aa0bfc585eb291f0d5f91999a2cde0d6..a0a5854f3610ba54c92f3adfa0e2a93d07774a7e 100644 (file)
@@ -70,6 +70,8 @@
 #include <boost/operators.hpp>
 #include <boost/noncopyable.hpp>
 
+#include <folly/FileUtil.h>
+#include <folly/gen/String.h>
 #include <folly/io/IOBufQueue.h>
 #include <folly/MapUtil.h>
 #include <folly/Portability.h>
@@ -372,26 +374,133 @@ class Subprocess : private boost::noncopyable {
    * identifying the stream; 0 = child's standard input, etc)
    *
    * The read and write callbacks must read from / write to pfd and return
-   * false during normal operation or true at end-of-file;
-   * communicate() will then close the pipe.  Note that pfd is
-   * nonblocking, so be prepared for read() / write() to return -1 and
-   * set errno to EAGAIN (in which case you should return false).
+   * false during normal operation.  Return true to tell communicate() to
+   * close the pipe.  For readCallback, this might send SIGPIPE to the
+   * child, or make its writes fail with EPIPE, so you should generally
+   * avoid returning true unless you've reached end-of-file.
    *
    * NOTE that you MUST consume all data passed to readCallback (or return
-   * true, which will close the pipe, possibly sending SIGPIPE to the child or
-   * making its writes fail with EPIPE), and you MUST write to a writable pipe
-   * (or return true, which will close the pipe).  To do otherwise is an
-   * error.  You must do this even for pipes you are not interested in.
+   * true to close the pipe).  Similarly, you MUST write to a writable pipe
+   * (or return true to close the pipe).  To do otherwise is an error that
+   * can result in a deadlock.  You must do this even for pipes you are not
+   * interested in.
+   *
+   * Note that pfd is nonblocking, so be prepared for read() / write() to
+   * return -1 and set errno to EAGAIN (in which case you should return
+   * false).  Use readNoInt() from FileUtil.h to handle interrupted reads
+   * for you
    *
    * Note that communicate() returns when all pipes to/from the child are
    * closed; the child might stay alive after that, so you must still wait().
    *
    * Most users won't need to use this; the simpler version of communicate
    * (which buffers data in memory) will probably work fine.
+   *
+   * See ReadLinesCallback for an easy way to consume the child's output
+   * streams line-by-line (or tokenized by another delimiter).
    */
   typedef std::function<bool(int, int)> FdCallback;
   void communicate(FdCallback readCallback, FdCallback writeCallback);
 
+  /**
+   * A readCallback for Subprocess::communicate() that helps you consume
+   * lines (or other delimited pieces) from your subprocess's file
+   * descriptors.  Use the readLinesCallback() helper to get template
+   * deduction.  For example:
+   *
+   *   auto read_cb = Subprocess::readLinesCallback(
+   *     [](int fd, folly::StringPiece s) {
+   *       std::cout << fd << " said: " << s;
+   *       return false;  // Keep reading from the child
+   *     }
+   *   );
+   *   subprocess.communicate(
+   *     // ReadLinesCallback contains StreamSplitter contains IOBuf, making
+   *     // it noncopyable, whereas std::function must be copyable.  So, we
+   *     // keep the callback in a local, and instead pass a reference.
+   *     std::ref(read_cb),
+   *     [](int pdf, int cfd){ return true; }  // Don't write to the child
+   *   );
+   *
+   * If a file line exceeds maxLineLength, your callback will get some
+   * initial chunks of maxLineLength with no trailing delimiters.  The final
+   * chunk of a line is delimiter-terminated iff the delimiter was present
+   * in the input.  In particular, the last line in a file always lacks a
+   * delimiter -- so if a file ends on a delimiter, the final line is empty.
+   *
+   * Like a regular communicate() callback, your fdLineCb() normally returns
+   * false.  It may return true to tell Subprocess to close the underlying
+   * file descriptor.  The child process may then receive SIGPIPE or get
+   * EPIPE errors on writes.
+   */
+  template <class Callback>
+  class ReadLinesCallback {
+   private:
+    // Binds an FD to the client-provided FD+line callback
+    struct StreamSplitterCallback {
+      StreamSplitterCallback(Callback& cb, int fd) : cb_(cb), fd_(fd) { }
+      // The return value semantics are inverted vs StreamSplitter
+      bool operator()(StringPiece s) { return !cb_(fd_, s); }
+      Callback& cb_;
+      int fd_;
+    };
+    typedef gen::StreamSplitter<StreamSplitterCallback> LineSplitter;
+   public:
+    explicit ReadLinesCallback(
+      Callback&& fdLineCb,
+      uint64_t maxLineLength = 0,  // No line length limit by default
+      char delimiter = '\n',
+      uint64_t bufSize = 1024
+    ) : fdLineCb_(std::move(fdLineCb)),
+        maxLineLength_(maxLineLength),
+        delimiter_(delimiter),
+        bufSize_(bufSize) {}
+
+    bool operator()(int pfd, int cfd) {
+      // Make a splitter for this cfd if it doesn't already exist
+      auto it = fdToSplitter_.find(cfd);
+      auto& splitter = (it != fdToSplitter_.end()) ? it->second
+        : fdToSplitter_.emplace(cfd, LineSplitter(
+            delimiter_, StreamSplitterCallback(fdLineCb_, cfd), maxLineLength_
+          )).first->second;
+      // Read as much as we can from this FD
+      char buf[bufSize_];
+      while (true) {
+        ssize_t ret = readNoInt(pfd, buf, bufSize_);
+        if (ret == -1 && errno == EAGAIN) {  // No more data for now
+          return false;
+        }
+        if (ret == 0) {  // Reached end-of-file
+          splitter.flush();  // Ignore return since the file is over anyway
+          return true;
+        }
+        if (!splitter(StringPiece(buf, ret))) {
+          return true;  // The callback told us to stop
+        }
+      }
+    }
+
+   private:
+    Callback fdLineCb_;
+    const uint64_t maxLineLength_;
+    const char delimiter_;
+    const uint64_t bufSize_;
+    // We lazily make splitters for all cfds that get used.
+    std::unordered_map<int, LineSplitter> fdToSplitter_;
+  };
+
+  // Helper to enable template deduction
+  template <class Callback>
+  static ReadLinesCallback<Callback> readLinesCallback(
+      Callback&& fdLineCb,
+      uint64_t maxLineLength = 0,  // No line length limit by default
+      char delimiter = '\n',
+      uint64_t bufSize = 1024) {
+    return ReadLinesCallback<Callback>(
+      std::move(fdLineCb), maxLineLength, delimiter, bufSize
+    );
+  }
+
   /**
    * Enable notifications (callbacks) for one pipe to/from child. By default,
    * all are enabled. Useful for "chatty" communication -- you want to disable