Factor out string stream re-splitting as StreamSplitter
authorAlexey Spiridonov <lesha@fb.com>
Thu, 8 May 2014 00:43:51 +0000 (17:43 -0700)
committerChip Turner <chip@fb.com>
Fri, 25 Jul 2014 16:06:01 +0000 (09:06 -0700)
Summary: This way I can reuse it in Subprocess. It also makes it easy to make a bunch of other convenient tokenization routines (e.g. delimiter-preserving folly::gen tokenizers, file tokenizers, etc, etc).

Test Plan: fbconfig folly/gen/test && fbmake runtests

Reviewed By: tjackson@fb.com

Subscribers: vkatich, tjackson

FB internal diff: D1317973

folly/gen/String-inl.h
folly/gen/String.h
folly/gen/test/StringTest.cpp

index 3f7b7c11b18942c72806a92be36f5c31e69b7b71..5a522ce5e4867dcd6c97a7cd3a2c7fc0ac6742bf 100644 (file)
 
 #include <folly/Conv.h>
 #include <folly/String.h>
-#include <folly/io/IOBuf.h>
 
 namespace folly {
 namespace gen {
 namespace detail {
 
-inline bool splitPrefix(StringPiece& in,
-                        StringPiece& prefix,
-                        StringPiece delimiter) {
-  auto p = in.find(delimiter);
-  if (p != std::string::npos) {
-    prefix.assign(in.data(), in.data() + p);
-    in.advance(p + delimiter.size());
-    return true;
+/**
+ * Finds the first occurrence of delimiter in "in", advances "in" past the
+ * delimiter.  Populates "prefix" with the consumed bytes, including the
+ * delimiter.
+ *
+ * Returns the number of trailing bytes of "prefix" that make up the
+ * delimiter, or 0 if the delimiter was not found.
+ */
+inline size_t splitPrefix(StringPiece& in,
+                          StringPiece& prefix,
+                          char delimiter) {
+  size_t found = in.find(delimiter);
+  if (found != StringPiece::npos) {
+    ++found;
+    prefix.assign(in.data(), in.data() + found);
+    in.advance(found);
+    return 1;
   }
   prefix.clear();
-  return false;
+  return 0;
 }
 
 /**
- * Split by any of the EOL terms: \r, \n, or \r\n.
+ * As above, but supports multibyte delimiters.
  */
-inline bool splitPrefix(StringPiece& in,
-                        StringPiece& prefix,
-                        MixedNewlines) {
-  auto newline = "\r\n";
-  auto p = in.find_first_of(newline);
-  if (p != std::string::npos) {
-    prefix.assign(in.data(), in.data() + p);
-    in.advance(p);
-    if (!in.removePrefix(newline)) {
-      in.advance(1);
-    }
-    return true;
+inline size_t splitPrefix(StringPiece& in,
+                          StringPiece& prefix,
+                          StringPiece delimiter) {
+  auto found = in.find(delimiter);
+  if (found != StringPiece::npos) {
+    found += delimiter.size();
+    prefix.assign(in.data(), in.data() + found);
+    in.advance(found);
+    return delimiter.size();
   }
   prefix.clear();
-  return false;
+  return 0;
 }
 
-inline bool splitPrefix(StringPiece& in, StringPiece& prefix, char delimiter) {
-  auto p = static_cast<const char*>(memchr(in.data(), delimiter, in.size()));
-  if (p) {
-    prefix.assign(in.data(), p);
-    in.assign(p + 1, in.end());
-    return true;
+/**
+ * As above, but splits by any of the EOL terms: \r, \n, or \r\n.
+ */
+inline size_t splitPrefix(StringPiece& in,
+                         StringPiece& prefix,
+                         MixedNewlines) {
+  const auto kCRLF = "\r\n";
+  const size_t kLenCRLF = 2;
+
+  auto p = in.find_first_of(kCRLF);
+  if (p != std::string::npos) {
+    const auto in_start = in.data();
+    auto delim_len = 1;
+    in.advance(p);
+    // Either remove an MS-DOS CR-LF 2-byte newline, or eat 1 byte at a time.
+    if (in.removePrefix(kCRLF)) {
+      delim_len = kLenCRLF;
+    } else {
+      in.advance(delim_len);
+    }
+    prefix.assign(in_start, in.data());
+    return delim_len;
   }
   prefix.clear();
-  return false;
+  return 0;
 }
 
 inline const char* ch(const unsigned char* p) {
   return reinterpret_cast<const char*>(p);
 }
 
+// Chop s into pieces of at most maxLength, feed them to cb
+template <class Callback>
+bool consumeFixedSizeChunks(Callback& cb, StringPiece& s, uint64_t maxLength) {
+  while (!s.empty()) {
+    auto num_to_add = s.size();
+    if (maxLength) {
+      num_to_add = std::min(num_to_add, maxLength);
+    }
+    if (!cb(StringPiece(s.begin(), num_to_add))) {
+      return false;
+    }
+    s.advance(num_to_add);
+  }
+  return true;
+}
+
+// Consumes all of buffer, plus n chars from s.
+template <class Callback>
+bool consumeBufferPlus(Callback& cb, IOBuf& buf, StringPiece& s, uint64_t n) {
+  buf.reserve(0, n);
+  memcpy(buf.writableTail(), s.data(), n);
+  buf.append(n);
+  s.advance(n);
+  if (!cb(StringPiece(detail::ch(buf.data()), buf.length()))) {
+    return false;
+  }
+  buf.clear();
+  return true;
+}
+
+} // namespace detail
+
+template <class Callback>
+bool StreamSplitter<Callback>::flush() {
+  CHECK(maxLength_ == 0 || buffer_.length() < maxLength_);
+  if (!pieceCb_(StringPiece(detail::ch(buffer_.data()), buffer_.length()))) {
+    return false;
+  }
+  // We are ready to handle another stream now.
+  buffer_.clear();
+  return true;
+}
+
+template <class Callback>
+bool StreamSplitter<Callback>::operator()(StringPiece in) {
+  StringPiece prefix;
+  // NB This code assumes a 1-byte delimiter. It's not too hard to support
+  // multibyte delimiters, just remember that maxLength_ chunks can end up
+  // falling in the middle of a delimiter.
+  bool found = detail::splitPrefix(in, prefix, delimiter_);
+  if (buffer_.length() != 0) {
+    if (found) {
+      uint64_t num_to_add = prefix.size();
+      if (maxLength_) {
+        CHECK(buffer_.length() < maxLength_);
+        // Consume as much of prefix as possible without exceeding maxLength_
+        num_to_add = std::min(maxLength_ - buffer_.length(), num_to_add);
+      }
+
+      // Append part of the prefix to the buffer, and send it to the callback
+      if (!detail::consumeBufferPlus(pieceCb_, buffer_, prefix, num_to_add)) {
+        return false;
+      }
+
+      if (!detail::consumeFixedSizeChunks(pieceCb_, prefix, maxLength_)) {
+        return false;
+      }
+
+      found = detail::splitPrefix(in, prefix, delimiter_);
+      // Post-conditions:
+      //  - we consumed all of buffer_ and all of the first prefix.
+      //  - found, in, and prefix reflect the second delimiter_ search
+    } else if (maxLength_ && buffer_.length() + in.size() >= maxLength_) {
+      // Send all of buffer_, plus a bit of in, to the callback
+      if (!detail::consumeBufferPlus(
+               pieceCb_, buffer_, in, maxLength_ - buffer_.length())) {
+        return false;
+      }
+      // Post-conditions:
+      //  - we consumed all of buffer, and the minimal # of bytes from in
+      //  - found is false
+    } // Otherwise: found is false & we cannot invoke the callback this turn
+  }
+  // Post-condition: buffer_ is nonempty only if found is false **and**
+  // len(buffer + in) < maxLength_.
+
+  // Send lines to callback directly from input (no buffer)
+  while (found) {  // Buffer guaranteed to be empty
+    if (!detail::consumeFixedSizeChunks(pieceCb_, prefix, maxLength_)) {
+      return false;
+    }
+    found = detail::splitPrefix(in, prefix, delimiter_);
+  }
+
+  // No more delimiters left; consume 'in' until it is shorter than maxLength_
+  if (maxLength_) {
+    while (in.size() >= maxLength_) {  // Buffer is guaranteed to be empty
+      if (!pieceCb_(StringPiece(in.begin(), maxLength_))) {
+        return false;
+      }
+      in.advance(maxLength_);
+    }
+  }
+
+  if (!in.empty()) {  // Buffer may be nonempty
+    // Incomplete line left, append to buffer
+    buffer_.reserve(0, in.size());
+    memcpy(buffer_.writableTail(), in.data(), in.size());
+    buffer_.append(in.size());
+  }
+  CHECK(maxLength_ == 0 || buffer_.length() < maxLength_);
+  return true;
+}
+
+namespace detail {
+
 class StringResplitter : public Operator<StringResplitter> {
   char delimiter_;
  public:
@@ -89,58 +226,23 @@ class StringResplitter : public Operator<StringResplitter> {
 
     template <class Body>
     bool apply(Body&& body) const {
-      std::unique_ptr<IOBuf> buffer;
-
-      auto fn = [&](StringPiece in) -> bool {
-        StringPiece prefix;
-        bool found = splitPrefix(in, prefix, this->delimiter_);
-        if (found && buffer && buffer->length() != 0) {
-          // Append to end of buffer, return line
-          if (!prefix.empty()) {
-            buffer->reserve(0, prefix.size());
-            memcpy(buffer->writableTail(), prefix.data(), prefix.size());
-            buffer->append(prefix.size());
-          }
-          if (!body(StringPiece(ch(buffer->data()), buffer->length()))) {
-            return false;
-          }
-          buffer->clear();
-          found = splitPrefix(in, prefix, this->delimiter_);
-        }
-        // Buffer is empty, return lines directly from input (no buffer)
-        while (found) {
-          if (!body(prefix)) {
-            return false;
-          }
-          found = splitPrefix(in, prefix, this->delimiter_);
-        }
-        if (!in.empty()) {
-          // Incomplete line left, append to buffer
-          if (!buffer) {
-            // Arbitrarily assume that we have half a line and get enough
-            // room for twice that.
-            constexpr size_t kDefaultLineSize = 256;
-            buffer = IOBuf::create(std::max(kDefaultLineSize, 2 * in.size()));
-          }
-          buffer->reserve(0, in.size());
-          memcpy(buffer->writableTail(), in.data(), in.size());
-          buffer->append(in.size());
-        }
-        return true;
-      };
-
-      // Iterate
-      if (!source_.apply(std::move(fn))) {
+      auto splitter =
+          streamSplitter(this->delimiter_, [this, &body](StringPiece s) {
+            // The stream ended with a delimiter; our contract is to swallow
+            // the final empty piece.
+            if (s.empty()) {
+              return false;
+            }
+            if (s.back() != this->delimiter_) {
+              return body(s);
+            }
+            s.pop_back();  // Remove the 1-character delimiter
+            return body(s);
+          });
+      if (!source_.apply(splitter)) {
         return false;
       }
-
-      // Incomplete last line
-      if (buffer && buffer->length() != 0) {
-        if (!body(StringPiece(ch(buffer->data()), buffer->length()))) {
-          return false;
-        }
-      }
-      return true;
+      return splitter.flush();
     }
 
     static constexpr bool infinite = Source::infinite;
@@ -176,7 +278,8 @@ class SplitStringSource
   bool apply(Body&& body) const {
     StringPiece rest(source_);
     StringPiece prefix;
-    while (splitPrefix(rest, prefix, this->delimiter_)) {
+    while (size_t delim_len = splitPrefix(rest, prefix, this->delimiter_)) {
+      prefix.subtract(delim_len);  // Remove the delimiter
       if (!body(prefix)) {
         return false;
       }
index b6d56fb4fab315ff26105a3da325d59c1ae4106c..af304f0e6b879ab1e080be095bb21bcbc736f1d2 100644 (file)
@@ -19,6 +19,7 @@
 
 #include <folly/Range.h>
 #include <folly/gen/Base.h>
+#include <folly/io/IOBuf.h>
 
 namespace folly {
 namespace gen {
@@ -48,6 +49,8 @@ class SplitTo;
  *
  * resplit() behaves as if the input strings were concatenated into one long
  * string and then split.
+ *
+ * Equivalently, you can use StreamSplitter outside of a folly::gen setting.
  */
 // make this a template so we don't require StringResplitter to be complete
 // until use
@@ -168,6 +171,77 @@ eachToPair(StringPiece delim) {
       to<fbstring>(delim)));
 }
 
+/**
+ * Outputs exactly the same bytes as the input stream, in different chunks.
+ * A chunk boundary occurs after each delimiter, or, if maxLength is
+ * non-zero, after maxLength bytes, whichever comes first.  Your callback
+ * can return false to stop consuming the stream at any time.
+ *
+ * The splitter buffers the last incomplete chunk, so you must call flush()
+ * to consume the piece of the stream after the final delimiter.  This piece
+ * may be empty.  After a flush(), the splitter can be re-used for a new
+ * stream.
+ *
+ * operator() and flush() return false iff your callback returns false. The
+ * internal buffer is not flushed, so reusing such a splitter will have
+ * indeterminate results.  Same goes if your callback throws.  Feel free to
+ * fix these corner cases if needed.
+ *
+ * Tips:
+ *  - Create via streamSplitter() to take advantage of template deduction.
+ *  - If your callback needs an end-of-stream signal, test for "no
+ *    trailing delimiter **and** shorter than maxLength".
+ *  - You can fine-tune the initial capacity of the internal IOBuf.
+ */
+template <class Callback>
+class StreamSplitter {
+
+ public:
+  StreamSplitter(char delimiter,
+                 Callback&& pieceCb,
+                 uint64_t maxLength = 0,
+                 uint64_t initialCapacity = 0)
+      : buffer_(IOBuf::CREATE, initialCapacity),
+        delimiter_(delimiter),
+        maxLength_(maxLength),
+        pieceCb_(std::move(pieceCb)) {}
+
+  /**
+   * Consume any incomplete last line (may be empty). Do this before
+   * destroying the StreamSplitter, or you will fail to consume part of the
+   * input.
+   *
+   * After flush() you may proceed to consume the next stream via ().
+   *
+   * Returns false if the callback wants no more data, true otherwise.
+   * A return value of false means that this splitter must no longer be used.
+   */
+  bool flush();
+
+  /**
+   * Consume another piece of the input stream.
+   *
+   * Returns false only if your callback refuses to consume more data by
+   * returning false (true otherwise).  A return value of false means that
+   * this splitter must no longer be used.
+   */
+  bool operator()(StringPiece in);
+
+ private:
+  // Holds the current "incomplete" chunk so that chunks can span calls to ()
+  IOBuf buffer_;
+  char delimiter_;
+  uint64_t maxLength_;  // The callback never gets more chars than this
+  Callback pieceCb_;
+};
+
+template <class Callback>  // Helper to enable template deduction
+StreamSplitter<Callback> streamSplitter(char delimiter,
+                                        Callback&& pieceCb,
+                                        uint64_t capacity = 0) {
+  return StreamSplitter<Callback>(delimiter, std::move(pieceCb), capacity);
+}
+
 }  // namespace gen
 }  // namespace folly
 
index 96d61a0086082c7d6da53517af413f9f81ea6979..d6bb7122e6aec85fd463d9ad54ab3b2b013e7d07 100644 (file)
@@ -104,13 +104,15 @@ TEST(StringGen, Split) {
 TEST(StringGen, SplitByNewLine) {
   auto collect = eachTo<std::string>() | as<vector>();
   {
-    auto pieces = lines("hello\n\n world\r\n goodbye\r meow") | collect;
-    EXPECT_EQ(5, pieces.size());
+    auto pieces = lines("hello\n\n world\r\n goodbye\r me\n\row") | collect;
+    EXPECT_EQ(7, pieces.size());
     EXPECT_EQ("hello", pieces[0]);
     EXPECT_EQ("", pieces[1]);
     EXPECT_EQ(" world", pieces[2]);
     EXPECT_EQ(" goodbye", pieces[3]);
-    EXPECT_EQ(" meow", pieces[4]);
+    EXPECT_EQ(" me", pieces[4]);
+    EXPECT_EQ("", pieces[5]);
+    EXPECT_EQ("ow", pieces[6]);
   }
 }
 
@@ -258,6 +260,50 @@ TEST(StringGen, Resplit) {
   }
 }
 
+void checkResplitMaxLength(vector<string> ins,
+                           char delim,
+                           uint64_t maxLength,
+                           vector<string> outs) {
+  vector<std::string> pieces;
+  auto splitter = streamSplitter(delim, [&pieces](StringPiece s) {
+    pieces.push_back(string(s.begin(), s.end()));
+    return true;
+  }, maxLength);
+  for (const auto& in : ins) {
+    splitter(in);
+  }
+  splitter.flush();
+
+  EXPECT_EQ(outs.size(), pieces.size());
+  for (int i = 0; i < outs.size(); ++i) {
+    EXPECT_EQ(outs[i], pieces[i]);
+  }
+
+  // Also check the concatenated input against the same output
+  if (ins.size() > 1) {
+    checkResplitMaxLength({folly::join("", ins)}, delim, maxLength, outs);
+  }
+}
+
+TEST(StringGen, ResplitMaxLength) {
+  checkResplitMaxLength(
+    {"hel", "lo,", ", world", ", goodbye, m", "ew"}, ',', 5,
+    {"hello", ",", ",", " worl", "d,", " good", "bye,", " mew"}
+  );
+  // " meow" cannot be "end of stream", since it's maxLength long
+  checkResplitMaxLength(
+    {"hel", "lo,", ", world", ", goodbye, m", "eow"}, ',', 5,
+    {"hello", ",", ",", " worl", "d,", " good", "bye,", " meow", ""}
+  );
+  checkResplitMaxLength(
+    {"||", "", "", "", "|a|b", "cdefghijklmn", "|opqrst",
+     "uvwx|y|||", "z", "0123456789", "|", ""}, '|', 2,
+    {"|", "|", "|", "a|", "bc", "de", "fg", "hi", "jk", "lm", "n|", "op", "qr",
+     "st", "uv", "wx", "|", "y|", "|", "|", "z0", "12", "34", "56", "78", "9|",
+     ""}
+  );
+}
+
 template<typename F>
 void runUnsplitSuite(F fn) {
   fn("hello, world");