#include <folly/Conv.h>
#include <folly/String.h>
-#include <folly/io/IOBuf.h>
namespace folly {
namespace gen {
namespace detail {
-inline bool splitPrefix(StringPiece& in,
- StringPiece& prefix,
- StringPiece delimiter) {
- auto p = in.find(delimiter);
- if (p != std::string::npos) {
- prefix.assign(in.data(), in.data() + p);
- in.advance(p + delimiter.size());
- return true;
+/**
+ * Finds the first occurrence of delimiter in "in", advances "in" past the
+ * delimiter. Populates "prefix" with the consumed bytes, including the
+ * delimiter.
+ *
+ * Returns the number of trailing bytes of "prefix" that make up the
+ * delimiter, or 0 if the delimiter was not found.
+ *
+ * When the delimiter is not found, "in" is left untouched and "prefix" is
+ * cleared.
+ */
+inline size_t splitPrefix(StringPiece& in,
+ StringPiece& prefix,
+ char delimiter) {
+ size_t found = in.find(delimiter);
+ if (found != StringPiece::npos) {
+ // Step past the 1-byte delimiter so it is part of the consumed bytes.
+ ++found;
+ prefix.assign(in.data(), in.data() + found);
+ in.advance(found);
+ return 1;
}
prefix.clear();
- return false;
+ return 0;
}
/**
- * Split by any of the EOL terms: \r, \n, or \r\n.
+ * As above, but supports multibyte delimiters.
+ *
+ * On a match the return value is delimiter.size().
+ * NOTE(review): with an empty delimiter a match would also return 0, which
+ * callers cannot tell apart from "not found" -- confirm no caller passes one.
*/
-inline bool splitPrefix(StringPiece& in,
- StringPiece& prefix,
- MixedNewlines) {
- auto newline = "\r\n";
- auto p = in.find_first_of(newline);
- if (p != std::string::npos) {
- prefix.assign(in.data(), in.data() + p);
- in.advance(p);
- if (!in.removePrefix(newline)) {
- in.advance(1);
- }
- return true;
+inline size_t splitPrefix(StringPiece& in,
+ StringPiece& prefix,
+ StringPiece delimiter) {
+ auto found = in.find(delimiter);
+ if (found != StringPiece::npos) {
+ // Extend the match offset to cover the whole delimiter.
+ found += delimiter.size();
+ prefix.assign(in.data(), in.data() + found);
+ in.advance(found);
+ return delimiter.size();
}
prefix.clear();
- return false;
+ return 0;
}
-inline bool splitPrefix(StringPiece& in, StringPiece& prefix, char delimiter) {
- auto p = static_cast<const char*>(memchr(in.data(), delimiter, in.size()));
- if (p) {
- prefix.assign(in.data(), p);
- in.assign(p + 1, in.end());
- return true;
+/**
+ * As above, but splits by any of the EOL terms: \r, \n, or \r\n.
+ *
+ * Returns 2 when the consumed terminator is the two-byte "\r\n" sequence,
+ * 1 for a lone \r or \n, and 0 (with "prefix" cleared and "in" untouched)
+ * when "in" contains no EOL byte.
+ */
+inline size_t splitPrefix(StringPiece& in,
+ StringPiece& prefix,
+ MixedNewlines) {
+ const auto kCRLF = "\r\n";
+ const size_t kLenCRLF = 2;
+
+ auto p = in.find_first_of(kCRLF);
+ if (p != std::string::npos) {
+ const auto in_start = in.data();
+ // Keep the length unsigned: it is assigned from kLenCRLF, passed to
+ // advance(), and returned as size_t.
+ size_t delim_len = 1;
+ in.advance(p);
+ // Either remove an MS-DOS CR-LF 2-byte newline, or eat 1 byte at a time.
+ if (in.removePrefix(kCRLF)) {
+ delim_len = kLenCRLF;
+ } else {
+ in.advance(delim_len);
+ }
+ // "prefix" spans everything consumed, terminator included.
+ prefix.assign(in_start, in.data());
+ return delim_len;
}
prefix.clear();
- return false;
+ return 0;
}
+// Views a buffer of unsigned bytes as plain chars.
inline const char* ch(const unsigned char* p) {
return reinterpret_cast<const char*>(p);
}
+// Chop s into pieces of at most maxLength, feed them to cb.
+//
+// A maxLength of 0 means "no limit": all of s goes to cb in one piece.
+// Returns false as soon as cb returns false (s is not advanced past the
+// rejected piece); returns true once s has been fully consumed.
+template <class Callback>
+bool consumeFixedSizeChunks(Callback& cb, StringPiece& s, uint64_t maxLength) {
+ while (!s.empty()) {
+ auto num_to_add = s.size();
+ if (maxLength) {
+ // size_t and uint64_t are distinct types on some platforms, so name the
+ // comparison type explicitly instead of relying on deduction.
+ num_to_add = std::min<uint64_t>(num_to_add, maxLength);
+ }
+ if (!cb(StringPiece(s.begin(), num_to_add))) {
+ return false;
+ }
+ s.advance(num_to_add);
+ }
+ return true;
+}
+
+// Appends the first n chars of s to buf, hands the combined bytes to cb as a
+// single piece, and empties buf if cb accepted it.
+//
+// s is advanced past the n copied chars either way. Returns cb's verdict;
+// when that is false, buf keeps the combined bytes.
+template <class Callback>
+bool consumeBufferPlus(Callback& cb, IOBuf& buf, StringPiece& s, uint64_t n) {
+ // Make room at the tail, then move n chars from the front of s into it.
+ buf.reserve(0, n);
+ memcpy(buf.writableTail(), s.data(), n);
+ buf.append(n);
+ s.advance(n);
+
+ const auto* combined = detail::ch(buf.data());
+ if (cb(StringPiece(combined, buf.length()))) {
+ buf.clear();
+ return true;
+ }
+ return false;
+}
+
+} // namespace detail
+
+// Hands whatever is still buffered (possibly nothing) to the callback as the
+// final piece. Returns false, leaving the buffer as is, if the callback
+// rejects the piece.
+template <class Callback>
+bool StreamSplitter<Callback>::flush() {
+ CHECK(maxLength_ == 0 || buffer_.length() < maxLength_);
+ const auto* leftover = detail::ch(buffer_.data());
+ if (pieceCb_(StringPiece(leftover, buffer_.length()))) {
+ // We are ready to handle another stream now.
+ buffer_.clear();
+ return true;
+ }
+ return false;
+}
+
+// Feeds one chunk of the stream to the splitter. Each delimiter-terminated
+// piece (delimiter included) is passed to pieceCb_, cut into chunks of at
+// most maxLength_ bytes when maxLength_ is nonzero; a trailing partial piece
+// is kept in buffer_ for a later call or for flush().
+//
+// Returns false as soon as pieceCb_ returns false, true otherwise.
+template <class Callback>
+bool StreamSplitter<Callback>::operator()(StringPiece in) {
+ StringPiece prefix;
+ // NB This code assumes a 1-byte delimiter. It's not too hard to support
+ // multibyte delimiters, just remember that maxLength_ chunks can end up
+ // falling in the middle of a delimiter.
+ bool found = detail::splitPrefix(in, prefix, delimiter_);
+ if (buffer_.length() != 0) {
+ if (found) {
+ uint64_t num_to_add = prefix.size();
+ if (maxLength_) {
+ CHECK(buffer_.length() < maxLength_);
+ // Consume as much of prefix as possible without exceeding maxLength_
+ num_to_add = std::min(maxLength_ - buffer_.length(), num_to_add);
+ }
+
+ // Append part of the prefix to the buffer, and send it to the callback
+ if (!detail::consumeBufferPlus(pieceCb_, buffer_, prefix, num_to_add)) {
+ return false;
+ }
+
+ if (!detail::consumeFixedSizeChunks(pieceCb_, prefix, maxLength_)) {
+ return false;
+ }
+
+ found = detail::splitPrefix(in, prefix, delimiter_);
+ // Post-conditions:
+ // - we consumed all of buffer_ and all of the first prefix.
+ // - found, in, and prefix reflect the second delimiter_ search
+ } else if (maxLength_ && buffer_.length() + in.size() >= maxLength_) {
+ // Send all of buffer_, plus a bit of in, to the callback
+ if (!detail::consumeBufferPlus(
+ pieceCb_, buffer_, in, maxLength_ - buffer_.length())) {
+ return false;
+ }
+ // Post-conditions:
+ // - we consumed all of buffer, and the minimal # of bytes from in
+ // - found is false
+ } // Otherwise: found is false & we cannot invoke the callback this turn
+ }
+ // Post-condition: buffer_ is nonempty only if found is false **and**
+ // len(buffer + in) < maxLength_.
+
+ // Send lines to callback directly from input (no buffer)
+ while (found) { // Buffer guaranteed to be empty
+ if (!detail::consumeFixedSizeChunks(pieceCb_, prefix, maxLength_)) {
+ return false;
+ }
+ found = detail::splitPrefix(in, prefix, delimiter_);
+ }
+
+ // No more delimiters left; consume 'in' until it is shorter than maxLength_
+ if (maxLength_) {
+ while (in.size() >= maxLength_) { // Buffer is guaranteed to be empty
+ if (!pieceCb_(StringPiece(in.begin(), maxLength_))) {
+ return false;
+ }
+ in.advance(maxLength_);
+ }
+ }
+
+ if (!in.empty()) { // Buffer may be nonempty
+ // Incomplete line left, append to buffer
+ buffer_.reserve(0, in.size());
+ memcpy(buffer_.writableTail(), in.data(), in.size());
+ buffer_.append(in.size());
+ }
+ CHECK(maxLength_ == 0 || buffer_.length() < maxLength_);
+ return true;
+}
+
+namespace detail {
+
class StringResplitter : public Operator<StringResplitter> {
char delimiter_;
public:
template <class Body>
bool apply(Body&& body) const {
- std::unique_ptr<IOBuf> buffer;
-
- auto fn = [&](StringPiece in) -> bool {
- StringPiece prefix;
- bool found = splitPrefix(in, prefix, this->delimiter_);
- if (found && buffer && buffer->length() != 0) {
- // Append to end of buffer, return line
- if (!prefix.empty()) {
- buffer->reserve(0, prefix.size());
- memcpy(buffer->writableTail(), prefix.data(), prefix.size());
- buffer->append(prefix.size());
- }
- if (!body(StringPiece(ch(buffer->data()), buffer->length()))) {
- return false;
- }
- buffer->clear();
- found = splitPrefix(in, prefix, this->delimiter_);
- }
- // Buffer is empty, return lines directly from input (no buffer)
- while (found) {
- if (!body(prefix)) {
- return false;
- }
- found = splitPrefix(in, prefix, this->delimiter_);
- }
- if (!in.empty()) {
- // Incomplete line left, append to buffer
- if (!buffer) {
- // Arbitrarily assume that we have half a line and get enough
- // room for twice that.
- constexpr size_t kDefaultLineSize = 256;
- buffer = IOBuf::create(std::max(kDefaultLineSize, 2 * in.size()));
- }
- buffer->reserve(0, in.size());
- memcpy(buffer->writableTail(), in.data(), in.size());
- buffer->append(in.size());
- }
- return true;
- };
-
- // Iterate
- if (!source_.apply(std::move(fn))) {
+ // Hand buffering and delimiter search to the splitter; this callback only
+ // strips the trailing delimiter before forwarding each piece to body.
+ auto splitter =
+ streamSplitter(this->delimiter_, [this, &body](StringPiece s) {
+ // The stream ended with a delimiter; our contract is to swallow
+ // the final empty piece. Swallowing is not a stop request, so
+ // report success: returning false here would make flush(), and
+ // therefore apply(), claim that body asked to stop.
+ if (s.empty()) {
+ return true;
+ }
+ if (s.back() != this->delimiter_) {
+ return body(s);
+ }
+ s.pop_back(); // Remove the 1-character delimiter
+ return body(s);
+ });
+ if (!source_.apply(splitter)) {
return false;
}
-
- // Incomplete last line
- if (buffer && buffer->length() != 0) {
- if (!body(StringPiece(ch(buffer->data()), buffer->length()))) {
- return false;
- }
- }
- return true;
+ // Emit whatever is still buffered as the last piece.
+ return splitter.flush();
}
static constexpr bool infinite = Source::infinite;
bool apply(Body&& body) const {
StringPiece rest(source_);
StringPiece prefix;
- while (splitPrefix(rest, prefix, this->delimiter_)) {
+ while (size_t delim_len = splitPrefix(rest, prefix, this->delimiter_)) {
+ prefix.subtract(delim_len); // Remove the delimiter
if (!body(prefix)) {
return false;
}