Stream operations: file access, iteration, splitting.
authorTudor Bosman <tudorb@fb.com>
Wed, 2 May 2012 23:42:35 +0000 (16:42 -0700)
committerJordan DeLong <jdelong@fb.com>
Mon, 17 Sep 2012 01:26:19 +0000 (18:26 -0700)
Summary: Intended to complement and replace strings::byLine.

Test Plan: stream_test

Reviewed By: delong.j@fb.com

FB internal diff: D463341

folly/Range.h
folly/experimental/io/Stream-inl.h [new file with mode: 0644]
folly/experimental/io/Stream.cpp [new file with mode: 0644]
folly/experimental/io/Stream.h [new file with mode: 0644]
folly/experimental/io/test/StreamTest.cpp [new file with mode: 0644]

index 65b7441d0b53055ab5fd2ffb7db4341c62496e6c..ac316bc53cb8b557ef34c7f8aec525aa8686a9ab 100644 (file)
@@ -187,7 +187,8 @@ public:
 
   // Allow implicit conversion from Range<const char*> (aka StringPiece) to
   // Range<const unsigned char*> (aka ByteRange), as they're both frequently
-  // used to represent ranges of bytes.
+  // used to represent ranges of bytes.  Allow explicit conversion in the other
+  // direction.
   template <class OtherIter, typename std::enable_if<
       (std::is_same<Iter, const unsigned char*>::value &&
        std::is_same<OtherIter, const char*>::value), int>::type = 0>
@@ -196,6 +197,14 @@ public:
       e_(reinterpret_cast<const unsigned char*>(other.end())) {
   }
 
+  template <class OtherIter, typename std::enable_if<
+      (std::is_same<Iter, const char*>::value &&
+       std::is_same<OtherIter, const unsigned char*>::value), int>::type = 0>
+  explicit Range(const Range<OtherIter>& other)
+    : b_(reinterpret_cast<const char*>(other.begin())),
+      e_(reinterpret_cast<const char*>(other.end())) {
+  }
+
   void clear() {
     b_ = Iter();
     e_ = Iter();
diff --git a/folly/experimental/io/Stream-inl.h b/folly/experimental/io/Stream-inl.h
new file mode 100644 (file)
index 0000000..d4a7f3f
--- /dev/null
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2012 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FOLLY_IO_STREAM_H_
+#error This file may only be included from Stream.h
+#endif
+
+#include <string.h>
+
+#include <glog/logging.h>
+
+namespace folly {
+
+template <class Stream>
+InputByteStreamSplitter<Stream>::InputByteStreamSplitter(
+    char delimiter, Stream stream)
+  : done_(false),
+    delimiter_(delimiter),
+    stream_(std::move(stream)) {
+}
+
+template <class Stream>
+bool InputByteStreamSplitter<Stream>::operator()(ByteRange& chunk) {
+  DCHECK_EQ(buffer_->length(), 0);
+  chunk.clear();
+  if (rest_.empty()) {
+    if (done_) {
+      return false;
+    } else if (!stream_(rest_)) {
+      done_ = true;
+      return false;
+    }
+  }
+
+  auto p = static_cast<const unsigned char*>(memchr(rest_.data(), delimiter_,
+                                                    rest_.size()));
+  if (p) {
+    chunk.assign(rest_.data(), p);
+    rest_.assign(p + 1, rest_.end());
+    return true;
+  }
+
+  // Incomplete line read, copy to buffer
+  if (!buffer_) {
+    static const size_t kDefaultLineSize = 256;
+    // Arbitrarily assume that we have half of a line in rest_, and
+    // get enough room for twice that.
+    buffer_ = IOBuf::create(std::max(kDefaultLineSize, 2 * rest_.size()));
+  } else {
+    buffer_->reserve(0, rest_.size());
+  }
+  memcpy(buffer_->writableTail(), rest_.data(), rest_.size());
+  buffer_->append(rest_.size());
+
+  while (stream_(rest_)) {
+    auto p = static_cast<const unsigned char*>(
+        memchr(rest_.data(), delimiter_, rest_.size()));
+    if (p) {
+      // Copy everything up to the delimiter and return it
+      size_t n = p - rest_.data();
+      buffer_->reserve(0, n);
+      memcpy(buffer_->writableTail(), rest_.data(), n);
+      buffer_->append(n);
+      chunk.reset(buffer_->data(), buffer_->length());
+      buffer_->trimStart(buffer_->length());
+      rest_.assign(p + 1, rest_.end());
+      return true;
+    }
+
+    // Nope, copy the entire chunk that we read
+    buffer_->reserve(0, rest_.size());
+    memcpy(buffer_->writableTail(), rest_.data(), rest_.size());
+    buffer_->append(rest_.size());
+  }
+
+  // Incomplete last line
+  done_ = true;
+  rest_.clear();
+  chunk.reset(buffer_->data(), buffer_->length());
+  buffer_->trimStart(buffer_->length());
+  return true;
+}
+
+}  // namespace folly
+
diff --git a/folly/experimental/io/Stream.cpp b/folly/experimental/io/Stream.cpp
new file mode 100644 (file)
index 0000000..2f519dc
--- /dev/null
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2012 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "folly/experimental/io/Stream.h"
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#include <stdexcept>
+#include <system_error>
+
+#include "folly/String.h"
+
+namespace folly {
+
+FileInputByteStream::FileInputByteStream(int fd, bool ownsFd, size_t bufferSize)
+  : fd_(fd),
+    ownsFd_(ownsFd),
+    buffer_(IOBuf::create(bufferSize)) {
+}
+
+FileInputByteStream::FileInputByteStream(int fd, bool ownsFd,
+                                 std::unique_ptr<IOBuf>&& buffer)
+  : fd_(fd),
+    ownsFd_(ownsFd),
+    buffer_(std::move(buffer)) {
+  buffer_->clear();
+}
+
+bool FileInputByteStream::operator()(ByteRange& chunk) {
+  ssize_t n = ::read(fd_, buffer_->writableTail(), buffer_->capacity());
+  if (n == -1) {
+    throw std::system_error(errno, std::system_category(), "read failed");
+  }
+  chunk.reset(buffer_->tail(), n);
+  return (n != 0);
+}
+
+FileInputByteStream::FileInputByteStream(FileInputByteStream&& other)
+  : fd_(other.fd_),
+    ownsFd_(other.ownsFd_),
+    buffer_(std::move(other.buffer_)) {
+  other.fd_ = -1;
+  other.ownsFd_ = false;
+}
+
+FileInputByteStream& FileInputByteStream::operator=(
+    FileInputByteStream&& other) {
+  if (&other != this) {
+    closeNoThrow();
+    fd_ = other.fd_;
+    ownsFd_ = other.ownsFd_;
+    buffer_ = std::move(other.buffer_);
+    other.fd_ = -1;
+    other.ownsFd_ = false;
+  }
+  return *this;
+}
+
+FileInputByteStream::~FileInputByteStream() {
+  closeNoThrow();
+}
+
+void FileInputByteStream::closeNoThrow() {
+  if (!ownsFd_) {
+    return;
+  }
+  ownsFd_ = false;
+  if (::close(fd_) == -1) {
+    PLOG(ERROR) << "close failed";
+  }
+}
+
+InputByteStreamSplitter<FileInputByteStream> byLine(
+    const char* fileName, char delim) {
+  int fd = ::open(fileName, O_RDONLY);
+  if (fd == -1) {
+    throw std::system_error(errno, std::system_category(), "open failed");
+  }
+  return makeInputByteStreamSplitter(delim, FileInputByteStream(fd, true));
+}
+
+}  // namespace folly
+
diff --git a/folly/experimental/io/Stream.h b/folly/experimental/io/Stream.h
new file mode 100644 (file)
index 0000000..15b58aa
--- /dev/null
@@ -0,0 +1,226 @@
+/*
+ * Copyright 2012 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FOLLY_IO_STREAM_H_
+#define FOLLY_IO_STREAM_H_
+
+#include <boost/iterator/iterator_facade.hpp>
+#include <glog/logging.h>
+
+#include "folly/Range.h"
+#include "folly/FBString.h"
+#include "folly/experimental/io/IOBuf.h"
+
+namespace folly {
+
+/**
+ * An InputByteStream is a functional object with the following signature:
+ *
+ *   bool operator()(ByteRange& data);
+ *
+ * Input byte streams must be movable.
+ *
+ * The stream returns false at EOF; otherwise, it returns true and sets data to
+ * the next chunk of data from the stream.  The memory that data points to must
+ * remain valid until the next call to the stream.  In case of error, the
+ * stream throws an exception.
+ *
+ * The meaning of a "chunk" is left up to the stream implementation.  Some
+ * streams return chunks limited to the size of an internal buffer.  Other
+ * streams return the entire input as one (potentially huge) ByteRange.
+ * Others assign meaning to chunks: StreamSplitter returns "lines" -- sequences
+ * of bytes between delimiters.  This ambiguity is intentional; resolving it
+ * would significantly increase the complexity of the code.
+ *
+ * An OutputByteStream is an object with the following signature:
+ *
+ *   void operator()(ByteRange data);
+ *   void close();
+ *
+ * Output byte streams must be movable.
+ *
+ * The stream appends a chunk of data to the stream when calling operator().
+ * close() closes the stream, allowing us to detect any errors before
+ * destroying the stream object (to avoid throwing exceptions from the
+ * destructor).  The destructor must close the stream if close() was not
+ * explicitly called, and abort the program if closing the stream caused
+ * an error.
+ *
+ * Just like with input byte streams, the meaning of a "chunk" is left up
+ * to the stream implementation.  Some streams will just append all chunks
+ * as given; others might assign meaning to chunks and (for example) append
+ * delimiters between chunks.
+ */
+
+template <class Stream> class InputByteStreamIterator;
+
+/**
+ * Convenient base class template to derive all streams from; provides begin()
+ * and end() for iterator access.  This class makes use of the curriously
+ * recurring template pattern; your stream class S may derive from
+ * InputByteStreamBase<S>.
+ *
+ * Deriving from InputByteStreamBase<S> is not required, but is convenient.
+ */
+template <class Derived>
+class InputByteStreamBase {
+ public:
+  InputByteStreamIterator<Derived> begin() {
+    return InputByteStreamIterator<Derived>(static_cast<Derived&>(*this));
+  }
+
+  InputByteStreamIterator<Derived> end() {
+    return InputByteStreamIterator<Derived>();
+  }
+
+  InputByteStreamBase() { }
+  InputByteStreamBase(InputByteStreamBase&&) = default;
+  InputByteStreamBase& operator=(InputByteStreamBase&&) = default;
+
+ private:
+  InputByteStreamBase(const InputByteStreamBase&) = delete;
+  InputByteStreamBase& operator=(const InputByteStreamBase&) = delete;
+};
+
+/**
+ * Stream iterator
+ */
+template <class Stream>
+class InputByteStreamIterator
+  : public boost::iterator_facade<
+      InputByteStreamIterator<Stream>,
+      const ByteRange,
+      boost::single_pass_traversal_tag> {
+ public:
+  InputByteStreamIterator() : stream_(nullptr) { }
+
+  explicit InputByteStreamIterator(Stream& stream) : stream_(&stream) {
+    increment();
+  }
+
+ private:
+  friend class boost::iterator_core_access;
+
+  void increment() {
+    DCHECK(stream_);
+    if (stream_ && !(*stream_)(chunk_)) {
+      stream_ = nullptr;
+    }
+  }
+
+  // This is a single pass iterator, so all we care about is that
+  // equal forms an equivalence class on the subset of iterators that it's
+  // defined on.  In our case, only identical (same object) iterators and
+  // past-the-end iterators compare equal.  (so that it != end() works)
+  bool equal(const InputByteStreamIterator& other) const {
+    return (this == &other) || (!stream_ && !other.stream_);
+  }
+
+  const ByteRange& dereference() const {
+    DCHECK(stream_);
+    return chunk_;
+  }
+
+  Stream* stream_;
+  ByteRange chunk_;
+};
+
+/**
+ * Stream that read()s from a file.
+ */
+class FileInputByteStream : public InputByteStreamBase<FileInputByteStream> {
+ public:
+  static const size_t kDefaultBufferSize = 4096;
+  explicit FileInputByteStream(int fd,
+                               bool ownsFd = false,
+                               size_t bufferSize = kDefaultBufferSize);
+  FileInputByteStream(int fd, bool ownsFd, std::unique_ptr<IOBuf>&& buffer);
+  FileInputByteStream(FileInputByteStream&& other);
+  FileInputByteStream& operator=(FileInputByteStream&& other);
+  ~FileInputByteStream();
+  bool operator()(ByteRange& chunk);
+
+ private:
+  void closeNoThrow();
+
+  int fd_;
+  bool ownsFd_;
+  std::unique_ptr<IOBuf> buffer_;
+};
+
+/**
+ * Split a stream on a delimiter.  Returns "lines" between delimiters;
+ * the delimiters are not included in the returned string.
+ *
+ * Note that the InputByteStreamSplitter acts as a stream itself, and you can
+ * iterate over it.
+ */
+template <class Stream>
+class InputByteStreamSplitter
+  : public InputByteStreamBase<InputByteStreamSplitter<Stream>> {
+ public:
+  InputByteStreamSplitter(char delimiter, Stream stream);
+  bool operator()(ByteRange& chunk);
+
+  InputByteStreamSplitter(InputByteStreamSplitter&&) = default;
+  InputByteStreamSplitter& operator=(InputByteStreamSplitter&&) = default;
+
+ private:
+  InputByteStreamSplitter(const InputByteStreamSplitter&) = delete;
+  InputByteStreamSplitter& operator=(const InputByteStreamSplitter&) = delete;
+
+  bool done_;
+  char delimiter_;
+  Stream stream_;
+  std::unique_ptr<IOBuf> buffer_;
+  ByteRange rest_;
+};
+
+/**
+ * Shortcut to create a stream splitter around a stream and deduce
+ * the type of the template argument.
+ */
+template <class Stream>
+InputByteStreamSplitter<Stream> makeInputByteStreamSplitter(
+    char delimiter, Stream stream) {
+  return InputByteStreamSplitter<Stream>(delimiter, std::move(stream));
+}
+
+/**
+ * Create a stream that splits a file into chunks (default: lines, with
+ * '\n' as the delimiter)
+ */
+InputByteStreamSplitter<FileInputByteStream> byLine(
+    const char* fileName, char delim='\n');
+
+// overload for std::string
+inline InputByteStreamSplitter<FileInputByteStream> byLine(
+    const std::string& fileName, char delim='\n') {
+  return byLine(fileName.c_str(), delim);
+}
+
+// overload for fbstring
+inline InputByteStreamSplitter<FileInputByteStream> byLine(
+    const fbstring& fileName, char delim='\n') {
+  return byLine(fileName.c_str(), delim);
+}
+
+}  // namespace folly
+
+#include "folly/experimental/io/Stream-inl.h"
+
+#endif /* FOLLY_IO_STREAM_H_ */
+
diff --git a/folly/experimental/io/test/StreamTest.cpp b/folly/experimental/io/test/StreamTest.cpp
new file mode 100644 (file)
index 0000000..f0d3603
--- /dev/null
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2012 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "folly/experimental/io/Stream.h"
+
+#include <vector>
+#include <string>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "folly/Benchmark.h"
+#include "folly/experimental/TestUtil.h"
+
+using namespace folly;
+
+namespace {
+
+std::vector<std::string> streamSplit(const std::string& str, char delimiter,
+                                     size_t maxChunkSize = (size_t)-1) {
+  size_t pos = 0;
+  auto cb = [&] (ByteRange& sp) mutable -> bool {
+    if (pos == str.size()) return false;
+    size_t n = std::min(str.size() - pos, maxChunkSize);
+    sp.reset(reinterpret_cast<const unsigned char*>(&(str[pos])), n);
+    pos += n;
+    return true;
+  };
+
+  std::vector<std::string> result;
+  for (auto line : makeInputByteStreamSplitter(delimiter, cb)) {
+    result.push_back(StringPiece(line).str());
+  }
+
+  return result;
+}
+
+}  // namespace
+
+TEST(InputByteStreamSplitter, Empty) {
+  {
+    auto pieces = streamSplit("", ',');
+    EXPECT_EQ(0, pieces.size());
+  }
+
+  // The last delimiter is eaten, just like std::getline
+  {
+    auto pieces = streamSplit(",", ',');
+    EXPECT_EQ(1, pieces.size());
+    EXPECT_EQ("", pieces[0]);
+  }
+
+  {
+    auto pieces = streamSplit(",,", ',');
+    EXPECT_EQ(2, pieces.size());
+    EXPECT_EQ("", pieces[0]);
+    EXPECT_EQ("", pieces[1]);
+  }
+}
+
+TEST(InputByteStreamSplitter, Simple) {
+  std::string str = "hello,, world, goodbye, meow";
+
+  for (size_t chunkSize = 1; chunkSize <= str.size(); ++chunkSize) {
+    auto pieces = streamSplit(str, ',', chunkSize);
+    EXPECT_EQ(5, pieces.size());
+    EXPECT_EQ("hello", pieces[0]);
+    EXPECT_EQ("", pieces[1]);
+    EXPECT_EQ(" world", pieces[2]);
+    EXPECT_EQ(" goodbye", pieces[3]);
+    EXPECT_EQ(" meow", pieces[4]);
+  }
+}
+
+TEST(ByLine, Simple) {
+  test::TemporaryFile file("ByLine");
+  static const std::string lines(
+      "Hello world\n"
+      "This is the second line\n"
+      "\n"
+      "\n"
+      "a few empty lines above\n"
+      "incomplete last line");
+  EXPECT_EQ(lines.size(), write(file.fd(), lines.data(), lines.size()));
+
+  auto expected = streamSplit(lines, '\n');
+  std::vector<std::string> found;
+  for (auto& line : byLine(file.path())) {
+    found.push_back(StringPiece(line).str());
+  }
+
+  EXPECT_TRUE(expected == found);
+}
+
+int main(int argc, char *argv[]) {
+  testing::InitGoogleTest(&argc, argv);
+  google::ParseCommandLineFlags(&argc, &argv, true);
+  auto ret = RUN_ALL_TESTS();
+  if (!ret) {
+    folly::runBenchmarksOnFlag();
+  }
+  return ret;
+}
+