From: Alexey Spiridonov Date: Thu, 8 May 2014 00:43:51 +0000 (-0700) Subject: Factor out string stream re-splitting as StreamSplitter X-Git-Tag: v0.22.0~444 X-Git-Url: http://plrg.eecs.uci.edu/git/?a=commitdiff_plain;h=dbcb08a7704f91f03e21db723990592f0df7f5a2;p=folly.git Factor out string stream re-splitting as StreamSplitter Summary: This way I can reuse it in Subprocess. It also makes it easy to make a bunch of other convenient tokenization routines (e.g. delimiter-preserving folly::gen tokenizers, file tokenizers, etc, etc). Test Plan: fbconfig folly/gen/test && fbmake runtests Reviewed By: tjackson@fb.com Subscribers: vkatich, tjackson FB internal diff: D1317973 --- diff --git a/folly/gen/String-inl.h b/folly/gen/String-inl.h index 3f7b7c11..5a522ce5 100644 --- a/folly/gen/String-inl.h +++ b/folly/gen/String-inl.h @@ -20,60 +20,197 @@ #include #include -#include namespace folly { namespace gen { namespace detail { -inline bool splitPrefix(StringPiece& in, - StringPiece& prefix, - StringPiece delimiter) { - auto p = in.find(delimiter); - if (p != std::string::npos) { - prefix.assign(in.data(), in.data() + p); - in.advance(p + delimiter.size()); - return true; +/** + * Finds the first occurrence of delimiter in "in", advances "in" past the + * delimiter. Populates "prefix" with the consumed bytes, including the + * delimiter. + * + * Returns the number of trailing bytes of "prefix" that make up the + * delimiter, or 0 if the delimiter was not found. + */ +inline size_t splitPrefix(StringPiece& in, + StringPiece& prefix, + char delimiter) { + size_t found = in.find(delimiter); + if (found != StringPiece::npos) { + ++found; + prefix.assign(in.data(), in.data() + found); + in.advance(found); + return 1; } prefix.clear(); - return false; + return 0; } /** - * Split by any of the EOL terms: \r, \n, or \r\n. + * As above, but supports multibyte delimiters. */ -inline bool splitPrefix(StringPiece& in, - StringPiece& prefix, - MixedNewlines) { - auto newline = "\r\n"; - auto p = in.find_first_of(newline); - if (p != std::string::npos) { - prefix.assign(in.data(), in.data() + p); - in.advance(p); - if (!in.removePrefix(newline)) { - in.advance(1); - } - return true; +inline size_t splitPrefix(StringPiece& in, + StringPiece& prefix, + StringPiece delimiter) { + auto found = in.find(delimiter); + if (found != StringPiece::npos) { + found += delimiter.size(); + prefix.assign(in.data(), in.data() + found); + in.advance(found); + return delimiter.size(); } prefix.clear(); - return false; + return 0; } -inline bool splitPrefix(StringPiece& in, StringPiece& prefix, char delimiter) { - auto p = static_cast(memchr(in.data(), delimiter, in.size())); - if (p) { - prefix.assign(in.data(), p); - in.assign(p + 1, in.end()); - return true; +/** + * As above, but splits by any of the EOL terms: \r, \n, or \r\n. + */ +inline size_t splitPrefix(StringPiece& in, + StringPiece& prefix, + MixedNewlines) { + const auto kCRLF = "\r\n"; + const size_t kLenCRLF = 2; + + auto p = in.find_first_of(kCRLF); + if (p != std::string::npos) { + const auto in_start = in.data(); + auto delim_len = 1; + in.advance(p); + // Either remove an MS-DOS CR-LF 2-byte newline, or eat 1 byte at a time. + if (in.removePrefix(kCRLF)) { + delim_len = kLenCRLF; + } else { + in.advance(delim_len); + } + prefix.assign(in_start, in.data()); + return delim_len; } prefix.clear(); - return false; + return 0; } inline const char* ch(const unsigned char* p) { return reinterpret_cast(p); } +// Chop s into pieces of at most maxLength, feed them to cb +template +bool consumeFixedSizeChunks(Callback& cb, StringPiece& s, uint64_t maxLength) { + while (!s.empty()) { + auto num_to_add = s.size(); + if (maxLength) { + num_to_add = std::min(num_to_add, maxLength); + } + if (!cb(StringPiece(s.begin(), num_to_add))) { + return false; + } + s.advance(num_to_add); + } + return true; +} + +// Consumes all of buffer, plus n chars from s. +template +bool consumeBufferPlus(Callback& cb, IOBuf& buf, StringPiece& s, uint64_t n) { + buf.reserve(0, n); + memcpy(buf.writableTail(), s.data(), n); + buf.append(n); + s.advance(n); + if (!cb(StringPiece(detail::ch(buf.data()), buf.length()))) { + return false; + } + buf.clear(); + return true; +} + +} // namespace detail + +template +bool StreamSplitter::flush() { + CHECK(maxLength_ == 0 || buffer_.length() < maxLength_); + if (!pieceCb_(StringPiece(detail::ch(buffer_.data()), buffer_.length()))) { + return false; + } + // We are ready to handle another stream now. + buffer_.clear(); + return true; +} + +template +bool StreamSplitter::operator()(StringPiece in) { + StringPiece prefix; + // NB This code assumes a 1-byte delimiter. It's not too hard to support + // multibyte delimiters, just remember that maxLength_ chunks can end up + // falling in the middle of a delimiter. + bool found = detail::splitPrefix(in, prefix, delimiter_); + if (buffer_.length() != 0) { + if (found) { + uint64_t num_to_add = prefix.size(); + if (maxLength_) { + CHECK(buffer_.length() < maxLength_); + // Consume as much of prefix as possible without exceeding maxLength_ + num_to_add = std::min(maxLength_ - buffer_.length(), num_to_add); + } + + // Append part of the prefix to the buffer, and send it to the callback + if (!detail::consumeBufferPlus(pieceCb_, buffer_, prefix, num_to_add)) { + return false; + } + + if (!detail::consumeFixedSizeChunks(pieceCb_, prefix, maxLength_)) { + return false; + } + + found = detail::splitPrefix(in, prefix, delimiter_); + // Post-conditions: + // - we consumed all of buffer_ and all of the first prefix. + // - found, in, and prefix reflect the second delimiter_ search + } else if (maxLength_ && buffer_.length() + in.size() >= maxLength_) { + // Send all of buffer_, plus a bit of in, to the callback + if (!detail::consumeBufferPlus( + pieceCb_, buffer_, in, maxLength_ - buffer_.length())) { + return false; + } + // Post-conditions: + // - we consumed all of buffer, and the minimal # of bytes from in + // - found is false + } // Otherwise: found is false & we cannot invoke the callback this turn + } + // Post-condition: buffer_ is nonempty only if found is false **and** + // len(buffer + in) < maxLength_. + + // Send lines to callback directly from input (no buffer) + while (found) { // Buffer guaranteed to be empty + if (!detail::consumeFixedSizeChunks(pieceCb_, prefix, maxLength_)) { + return false; + } + found = detail::splitPrefix(in, prefix, delimiter_); + } + + // No more delimiters left; consume 'in' until it is shorter than maxLength_ + if (maxLength_) { + while (in.size() >= maxLength_) { // Buffer is guaranteed to be empty + if (!pieceCb_(StringPiece(in.begin(), maxLength_))) { + return false; + } + in.advance(maxLength_); + } + } + + if (!in.empty()) { // Buffer may be nonempty + // Incomplete line left, append to buffer + buffer_.reserve(0, in.size()); + memcpy(buffer_.writableTail(), in.data(), in.size()); + buffer_.append(in.size()); + } + CHECK(maxLength_ == 0 || buffer_.length() < maxLength_); + return true; +} + +namespace detail { + class StringResplitter : public Operator { char delimiter_; public: @@ -89,58 +226,23 @@ class StringResplitter : public Operator { template bool apply(Body&& body) const { - std::unique_ptr buffer; - - auto fn = [&](StringPiece in) -> bool { - StringPiece prefix; - bool found = splitPrefix(in, prefix, this->delimiter_); - if (found && buffer && buffer->length() != 0) { - // Append to end of buffer, return line - if (!prefix.empty()) { - buffer->reserve(0, prefix.size()); - memcpy(buffer->writableTail(), prefix.data(), prefix.size()); - buffer->append(prefix.size()); - } - if (!body(StringPiece(ch(buffer->data()), buffer->length()))) { - return false; - } - buffer->clear(); - found = splitPrefix(in, prefix, this->delimiter_); - } - // Buffer is empty, return lines directly from input (no buffer) - while (found) { - if (!body(prefix)) { - return false; - } - found = splitPrefix(in, prefix, this->delimiter_); - } - if (!in.empty()) { - // Incomplete line left, append to buffer - if (!buffer) { - // Arbitrarily assume that we have half a line and get enough - // room for twice that. - constexpr size_t kDefaultLineSize = 256; - buffer = IOBuf::create(std::max(kDefaultLineSize, 2 * in.size())); - } - buffer->reserve(0, in.size()); - memcpy(buffer->writableTail(), in.data(), in.size()); - buffer->append(in.size()); - } - return true; - }; - - // Iterate - if (!source_.apply(std::move(fn))) { + auto splitter = + streamSplitter(this->delimiter_, [this, &body](StringPiece s) { + // The stream ended with a delimiter; our contract is to swallow + // the final empty piece. + if (s.empty()) { + return false; + } + if (s.back() != this->delimiter_) { + return body(s); + } + s.pop_back(); // Remove the 1-character delimiter + return body(s); + }); + if (!source_.apply(splitter)) { return false; } - - // Incomplete last line - if (buffer && buffer->length() != 0) { - if (!body(StringPiece(ch(buffer->data()), buffer->length()))) { - return false; - } - } - return true; + return splitter.flush(); } static constexpr bool infinite = Source::infinite; @@ -176,7 +278,8 @@ class SplitStringSource bool apply(Body&& body) const { StringPiece rest(source_); StringPiece prefix; - while (splitPrefix(rest, prefix, this->delimiter_)) { + while (size_t delim_len = splitPrefix(rest, prefix, this->delimiter_)) { + prefix.subtract(delim_len); // Remove the delimiter if (!body(prefix)) { return false; } diff --git a/folly/gen/String.h b/folly/gen/String.h index b6d56fb4..af304f0e 100644 --- a/folly/gen/String.h +++ b/folly/gen/String.h @@ -19,6 +19,7 @@ #include #include +#include namespace folly { namespace gen { @@ -48,6 +49,8 @@ class SplitTo; * * resplit() behaves as if the input strings were concatenated into one long * string and then split. + * + * Equivalently, you can use StreamSplitter outside of a folly::gen setting. */ // make this a template so we don't require StringResplitter to be complete // until use @@ -168,6 +171,77 @@ eachToPair(StringPiece delim) { to(delim))); } +/** + * Outputs exactly the same bytes as the input stream, in different chunks. + * A chunk boundary occurs after each delimiter, or, if maxLength is + * non-zero, after maxLength bytes, whichever comes first. Your callback + * can return false to stop consuming the stream at any time. + * + * The splitter buffers the last incomplete chunk, so you must call flush() + * to consume the piece of the stream after the final delimiter. This piece + * may be empty. After a flush(), the splitter can be re-used for a new + * stream. + * + * operator() and flush() return false iff your callback returns false. The + * internal buffer is not flushed, so reusing such a splitter will have + * indeterminate results. Same goes if your callback throws. Feel free to + * fix these corner cases if needed. + * + * Tips: + * - Create via streamSplitter() to take advantage of template deduction. + * - If your callback needs an end-of-stream signal, test for "no + * trailing delimiter **and** shorter than maxLength". + * - You can fine-tune the initial capacity of the internal IOBuf. + */ +template +class StreamSplitter { + + public: + StreamSplitter(char delimiter, + Callback&& pieceCb, + uint64_t maxLength = 0, + uint64_t initialCapacity = 0) + : buffer_(IOBuf::CREATE, initialCapacity), + delimiter_(delimiter), + maxLength_(maxLength), + pieceCb_(std::move(pieceCb)) {} + + /** + * Consume any incomplete last line (may be empty). Do this before + * destroying the StreamSplitter, or you will fail to consume part of the + * input. + * + * After flush() you may proceed to consume the next stream via (). + * + * Returns false if the callback wants no more data, true otherwise. + * A return value of false means that this splitter must no longer be used. + */ + bool flush(); + + /** + * Consume another piece of the input stream. + * + * Returns false only if your callback refuses to consume more data by + * returning false (true otherwise). A return value of false means that + * this splitter must no longer be used. + */ + bool operator()(StringPiece in); + + private: + // Holds the current "incomplete" chunk so that chunks can span calls to () + IOBuf buffer_; + char delimiter_; + uint64_t maxLength_; // The callback never gets more chars than this + Callback pieceCb_; +}; + +template // Helper to enable template deduction +StreamSplitter streamSplitter(char delimiter, + Callback&& pieceCb, + uint64_t capacity = 0) { + return StreamSplitter(delimiter, std::move(pieceCb), capacity); +} + } // namespace gen } // namespace folly diff --git a/folly/gen/test/StringTest.cpp b/folly/gen/test/StringTest.cpp index 96d61a00..d6bb7122 100644 --- a/folly/gen/test/StringTest.cpp +++ b/folly/gen/test/StringTest.cpp @@ -104,13 +104,15 @@ TEST(StringGen, Split) { TEST(StringGen, SplitByNewLine) { auto collect = eachTo() | as(); { - auto pieces = lines("hello\n\n world\r\n goodbye\r meow") | collect; - EXPECT_EQ(5, pieces.size()); + auto pieces = lines("hello\n\n world\r\n goodbye\r me\n\row") | collect; + EXPECT_EQ(7, pieces.size()); EXPECT_EQ("hello", pieces[0]); EXPECT_EQ("", pieces[1]); EXPECT_EQ(" world", pieces[2]); EXPECT_EQ(" goodbye", pieces[3]); - EXPECT_EQ(" meow", pieces[4]); + EXPECT_EQ(" me", pieces[4]); + EXPECT_EQ("", pieces[5]); + EXPECT_EQ("ow", pieces[6]); } } @@ -258,6 +260,50 @@ TEST(StringGen, Resplit) { } } +void checkResplitMaxLength(vector ins, + char delim, + uint64_t maxLength, + vector outs) { + vector pieces; + auto splitter = streamSplitter(delim, [&pieces](StringPiece s) { + pieces.push_back(string(s.begin(), s.end())); + return true; + }, maxLength); + for (const auto& in : ins) { + splitter(in); + } + splitter.flush(); + + EXPECT_EQ(outs.size(), pieces.size()); + for (int i = 0; i < outs.size(); ++i) { + EXPECT_EQ(outs[i], pieces[i]); + } + + // Also check the concatenated input against the same output + if (ins.size() > 1) { + checkResplitMaxLength({folly::join("", ins)}, delim, maxLength, outs); + } +} + +TEST(StringGen, ResplitMaxLength) { + checkResplitMaxLength( + {"hel", "lo,", ", world", ", goodbye, m", "ew"}, ',', 5, + {"hello", ",", ",", " worl", "d,", " good", "bye,", " mew"} + ); + // " meow" cannot be "end of stream", since it's maxLength long + checkResplitMaxLength( + {"hel", "lo,", ", world", ", goodbye, m", "eow"}, ',', 5, + {"hello", ",", ",", " worl", "d,", " good", "bye,", " meow", ""} + ); + checkResplitMaxLength( + {"||", "", "", "", "|a|b", "cdefghijklmn", "|opqrst", + "uvwx|y|||", "z", "0123456789", "|", ""}, '|', 2, + {"|", "|", "|", "a|", "bc", "de", "fg", "hi", "jk", "lm", "n|", "op", "qr", + "st", "uv", "wx", "|", "y|", "|", "|", "z0", "12", "34", "56", "78", "9|", + ""} + ); +} + template void runUnsplitSuite(F fn) { fn("hello, world");