RecordIO: robust record-based streaming I/O
authorTudor Bosman <tudorb@fb.com>
Thu, 25 Apr 2013 03:29:08 +0000 (20:29 -0700)
committerSara Golemon <sgolemon@fb.com>
Mon, 20 May 2013 18:01:26 +0000 (11:01 -0700)
Summary:
RecordIO provides an interface to write and read streams of variable-length
records that is resilient in the face of failure and data corruption.  If
the stream is corrupted in any way, you will lose records, but the stream
will resynchronize.

We have one implementation of RecordIO reader/writer that reads from / writes
to regular files (using mmap(), even for writes, so we can preserve state
even in case of process death -- the buffer cache will flush things to disk
eventually) and we expose enough of the guts (in a reasonably clean way)
so you can build your own on top of other backends.

Test Plan: test added

Reviewed By: mmcurtiss@fb.com

FB internal diff: D790275

folly/io/RecordIO-inl.h [new file with mode: 0644]
folly/io/RecordIO.cpp [new file with mode: 0644]
folly/io/RecordIO.h [new file with mode: 0644]
folly/io/test/RecordIOTest.cpp [new file with mode: 0644]

diff --git a/folly/io/RecordIO-inl.h b/folly/io/RecordIO-inl.h
new file mode 100644 (file)
index 0000000..32d615e
--- /dev/null
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2013 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FOLLY_IO_RECORDIO_H_
+#error This file may only be included from folly/io/RecordIO.h
+#endif
+
+#include <boost/iterator/iterator_facade.hpp>
+
+#include "folly/SpookyHashV2.h"
+
+namespace folly {
+
+class RecordIOReader::Iterator : public boost::iterator_facade<
+    RecordIOReader::Iterator,
+    const std::pair<ByteRange, off_t>,
+    boost::forward_traversal_tag> {
+  friend class boost::iterator_core_access;
+  friend class RecordIOReader;
+ private:
+  Iterator(ByteRange range, uint32_t fileId, off_t pos);
+
+  reference dereference() const { return recordAndPos_; }
+  bool equal(const Iterator& other) const { return range_ == other.range_; }
+  void increment() {
+    size_t skip = recordio_helpers::headerSize() + recordAndPos_.first.size();
+    recordAndPos_.second += skip;
+    range_.advance(skip);
+    advanceToValid();
+  }
+
+  void advanceToValid();
+  ByteRange range_;
+  uint32_t fileId_;
+  // stored as a pair so we can return by reference in dereference()
+  std::pair<ByteRange, off_t> recordAndPos_;
+};
+
+inline auto RecordIOReader::cbegin() const -> Iterator { return seek(0); }
+inline auto RecordIOReader::begin() const -> Iterator { return cbegin(); }
+inline auto RecordIOReader::cend() const -> Iterator { return seek(off_t(-1)); }
+inline auto RecordIOReader::end() const -> Iterator { return cend(); }
+inline auto RecordIOReader::seek(off_t pos) const -> Iterator {
+  return Iterator(map_.range(), fileId_, pos);
+}
+
+namespace recordio_helpers {
+
+namespace detail {
+
+struct Header {
+  // First 4 bytes of SHA1("zuck"), big-endian
+  // Any values will do, except that the sequence must not have a
+  // repeated prefix (that is, if we see kMagic, we know that the next
+  // occurrence must start at least 4 bytes later)
+  static constexpr uint32_t kMagic = 0xeac313a1;
+  uint32_t magic;
+  uint8_t  version;       // backwards incompatible version, currently 0
+  uint8_t  hashFunction;  // 0 = SpookyHashV2
+  uint16_t flags;         // reserved (must be 0)
+  uint32_t fileId;        // unique file ID
+  uint32_t dataLength;
+  uint64_t dataHash;
+  uint32_t headerHash;  // must be last
+} __attribute__((packed));
+
+static_assert(offsetof(Header, headerHash) + sizeof(Header::headerHash) ==
+              sizeof(Header), "invalid header layout");
+
+}  // namespace detail
+
+constexpr size_t headerSize() { return sizeof(detail::Header); }
+
+inline RecordInfo findRecord(ByteRange range, uint32_t fileId) {
+  return findRecord(range, range, fileId);
+}
+
+}  // namespace recordio_helpers
+
+}  // namespaces
+
diff --git a/folly/io/RecordIO.cpp b/folly/io/RecordIO.cpp
new file mode 100644 (file)
index 0000000..5eba9bc
--- /dev/null
@@ -0,0 +1,228 @@
+/*
+ * Copyright 2013 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "folly/io/RecordIO.h"
+
+#include <sys/types.h>
+#include <unistd.h>
+
+#include "folly/Exception.h"
+#include "folly/FileUtil.h"
+#include "folly/Memory.h"
+#include "folly/ScopeGuard.h"
+#include "folly/String.h"
+
+namespace folly {
+
+using namespace recordio_helpers;
+
+RecordIOWriter::RecordIOWriter(File file, uint32_t fileId)
+  : file_(std::move(file)),
+    fileId_(fileId),
+    writeLock_(file_, std::defer_lock),
+    filePos_(0) {
+  if (!writeLock_.try_lock()) {
+    throw std::runtime_error("RecordIOWriter: file locked by another process");
+  }
+
+  struct stat st;
+  checkUnixError(fstat(file_.fd(), &st), "fstat() failed");
+
+  filePos_ = st.st_size;
+}
+
+void RecordIOWriter::write(std::unique_ptr<IOBuf> buf) {
+  size_t totalLength = prependHeader(buf, fileId_);
+  if (totalLength == 0) {
+    return;  // nothing to do
+  }
+
+  // TODO(tudorb): Maybe use pwritev, but for now we're copying everything in
+  // one place.
+  buf->unshare();
+  buf->coalesce();
+  DCHECK_EQ(buf->length(), totalLength);
+
+  // We're going to write.  Reserve space for ourselves.
+  off_t pos = filePos_.fetch_add(buf->length());
+  ssize_t bytes = pwriteFull(file_.fd(), buf->data(), buf->length(), pos);
+  checkUnixError(bytes, "pwrite() failed");
+  DCHECK_EQ(bytes, buf->length());
+}
+
+RecordIOReader::RecordIOReader(File file, uint32_t fileId)
+  : map_(std::move(file)),
+    fileId_(fileId) {
+}
+
+RecordIOReader::Iterator::Iterator(ByteRange range, uint32_t fileId, off_t pos)
+  : range_(range),
+    fileId_(fileId),
+    recordAndPos_(ByteRange(), 0) {
+  if (pos >= range_.size()) {
+    recordAndPos_.second = off_t(-1);
+    range_.clear();
+  } else {
+    recordAndPos_.second = pos;
+    range_.advance(pos);
+    advanceToValid();
+  }
+}
+
+void RecordIOReader::Iterator::advanceToValid() {
+  ByteRange record = findRecord(range_, fileId_).record;
+  if (record.empty()) {
+    recordAndPos_ = std::make_pair(ByteRange(), off_t(-1));
+    range_.clear();  // at end
+  } else {
+    size_t skipped = record.begin() - range_.begin();
+    DCHECK_GE(skipped, headerSize());
+    skipped -= headerSize();
+    range_.advance(skipped);
+    recordAndPos_.first = record;
+    recordAndPos_.second += skipped;
+  }
+}
+
+namespace recordio_helpers {
+
+using namespace detail;
+
+namespace {
+
+constexpr uint32_t kHashSeed = 0xdeadbeef;  // for mcurtiss
+
+uint32_t headerHash(const Header& header) {
+  return hash::SpookyHashV2::Hash32(&header, offsetof(Header, headerHash),
+                                    kHashSeed);
+}
+
+std::pair<size_t, uint64_t> dataLengthAndHash(const IOBuf* buf) {
+  size_t len = 0;
+  hash::SpookyHashV2 hasher;
+  hasher.Init(kHashSeed, kHashSeed);
+  for (auto br : *buf) {
+    len += br.size();
+    hasher.Update(br.data(), br.size());
+  }
+  uint64_t hash1;
+  uint64_t hash2;
+  hasher.Final(&hash1, &hash2);
+  if (len + headerSize() >= std::numeric_limits<uint32_t>::max()) {
+    throw std::invalid_argument("Record length must fit in 32 bits");
+  }
+  return std::make_pair(len, hash1);
+}
+
+uint64_t dataHash(ByteRange range) {
+  return hash::SpookyHashV2::Hash64(range.data(), range.size(), kHashSeed);
+}
+
+}  // namespace
+
+size_t prependHeader(std::unique_ptr<IOBuf>& buf, uint32_t fileId) {
+  if (fileId == 0) {
+    throw std::invalid_argument("invalid file id");
+  }
+  auto lengthAndHash = dataLengthAndHash(buf.get());
+  if (lengthAndHash.first == 0) {
+    return 0;  // empty, nothing to do, no zero-length records
+  }
+
+  // Prepend to the first buffer in the chain if we have room, otherwise
+  // prepend a new buffer.
+  if (buf->headroom() >= headerSize()) {
+    buf->unshareOne();
+    buf->prepend(headerSize());
+  } else {
+    auto b = IOBuf::create(headerSize());
+    b->append(headerSize());
+    b->appendChain(std::move(buf));
+    buf = std::move(b);
+  }
+  detail::Header* header =
+    reinterpret_cast<detail::Header*>(buf->writableData());
+  memset(header, 0, sizeof(Header));
+  header->magic = detail::Header::kMagic;
+  header->fileId = fileId;
+  header->dataLength = lengthAndHash.first;
+  header->dataHash = lengthAndHash.second;
+  header->headerHash = headerHash(*header);
+
+  return lengthAndHash.first + headerSize();
+}
+
+RecordInfo validateRecord(ByteRange range, uint32_t fileId) {
+  if (range.size() <= headerSize()) {  // records may not be empty
+    return {0};
+  }
+  const Header* header = reinterpret_cast<const Header*>(range.begin());
+  range.advance(sizeof(Header));
+  if (header->magic != Header::kMagic ||
+      header->version != 0 ||
+      header->hashFunction != 0 ||
+      header->flags != 0 ||
+      (fileId != 0 && header->fileId != fileId) ||
+      header->dataLength > range.size()) {
+    return {0};
+  }
+  if (headerHash(*header) != header->headerHash) {
+    return {0};
+  }
+  range.reset(range.begin(), header->dataLength);
+  if (dataHash(range) != header->dataHash) {
+    return {0};
+  }
+  return {header->fileId, range};
+}
+
+RecordInfo findRecord(ByteRange searchRange,
+                      ByteRange wholeRange,
+                      uint32_t fileId) {
+  static const uint32_t magic = Header::kMagic;
+  static const ByteRange magicRange(reinterpret_cast<const uint8_t*>(&magic),
+                                    sizeof(magic));
+  static constexpr size_t headerTail = sizeof(Header) - sizeof(magic);
+
+  DCHECK_GE(searchRange.begin(), wholeRange.begin());
+  DCHECK_LE(searchRange.end(), wholeRange.end());
+
+  const uint8_t* start = searchRange.begin();
+  const uint8_t* end = std::min(searchRange.end(),
+                                wholeRange.end() - sizeof(Header));
+  // end-1: the last place where a Header could start
+  while (start < end) {
+    auto p = ByteRange(start, end + sizeof(magic)).find(magicRange);
+    if (p == ByteRange::npos) {
+      break;
+    }
+
+    start += p;
+    auto r = validateRecord(ByteRange(start, wholeRange.end()), fileId);
+    if (!r.record.empty()) {
+      return r;
+    }
+
+    // No repeated prefix in magic, so we can do better than start++
+    start += sizeof(magic);
+  }
+
+  return {0};
+}
+
+}  // namespace
+
+}  // namespaces
diff --git a/folly/io/RecordIO.h b/folly/io/RecordIO.h
new file mode 100644 (file)
index 0000000..14c88f2
--- /dev/null
@@ -0,0 +1,181 @@
+/*
+ * Copyright 2013 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * RecordIO: self-synchronizing stream of variable length records
+ *
+ * RecordIO gives you the ability to write a stream of variable length records
+ * and read them later even in the face of data corruption -- randomly inserted
+ * or deleted chunks of the file, or modified data.  When reading, you may lose
+ * corrupted records, but the stream will resynchronize automatically.
+ */
+#ifndef FOLLY_IO_RECORDIO_H_
+#define FOLLY_IO_RECORDIO_H_
+
+#include <atomic>
+#include <memory>
+#include <mutex>
+
+#include "folly/File.h"
+#include "folly/Range.h"
+#include "folly/MemoryMapping.h"
+#include "folly/io/IOBuf.h"
+
+namespace folly {
+
+/**
+ * Class to write a stream of RecordIO records to a file.
+ *
+ * RecordIOWriter is thread-safe
+ */
+class RecordIOWriter {
+ public:
+  /**
+   * Create a RecordIOWriter around a file; will append to the end of
+   * file if it exists.
+   *
+   * Each file must have a non-zero file id, which is embedded in all
+   * record headers.  Readers will only return records with the requested
+   * file id (or, if the reader is created with fileId=0 in the constructor,
+   * the reader will return all records).  File ids are only used to allow
+   * resynchronization if you store RecordIO records (with headers) inside
+   * other RecordIO records (for example, if a record consists of a fragment
+   * from another RecordIO file).  If you're not planning to do that,
+   * the defaults are fine.
+   */
+  explicit RecordIOWriter(File file, uint32_t fileId = 1);
+
+  /**
+   * Write a record.  We will use at most headerSize() bytes of headroom,
+   * you might want to arrange that before copying your data into it.
+   */
+  void write(std::unique_ptr<IOBuf> buf);
+
+  /**
+   * Return the position in the file where the next byte will be written.
+   * Conservative, as stuff can be written at any time from another thread.
+   */
+  off_t filePos() const { return filePos_; }
+
+ private:
+  File file_;
+  uint32_t fileId_;
+  std::unique_lock<File> writeLock_;
+  std::atomic<off_t> filePos_;
+};
+
+/**
+ * Class to read from a RecordIO file.  Will skip invalid records.
+ */
+class RecordIOReader {
+ public:
+  class Iterator;
+
+  /**
+   * RecordIOReader is iterable, returning pairs of ByteRange (record content)
+   * and position in file where the record (including header) begins.
+   * Note that the position includes the header, that is, it can be passed back
+   * to seek().
+   */
+  typedef Iterator iterator;
+  typedef Iterator const_iterator;
+  typedef std::pair<ByteRange, off_t> value_type;
+  typedef value_type& reference;
+  typedef const value_type& const_reference;
+
+  /**
+   * A record reader with a fileId of 0 will return all records.
+   * A record reader with a non-zero fileId will only return records where
+   * the fileId matches.
+   */
+  explicit RecordIOReader(File file, uint32_t fileId = 0);
+
+  Iterator cbegin() const;
+  Iterator begin() const;
+  Iterator cend() const;
+  Iterator end() const;
+
+  /**
+   * Create an iterator to the first valid record after pos.
+   */
+  Iterator seek(off_t pos) const;
+
+ private:
+  MemoryMapping map_;
+  uint32_t fileId_;
+};
+
+namespace recordio_helpers {
+
+// We're exposing the guts of the RecordIO implementation for two reasons:
+// 1. It makes unit testing easier, and
+// 2. It allows you to build different RecordIO readers / writers that use
+// different storage systems underneath (not standard files)
+
+/**
+ * Header size.
+ */
+constexpr size_t headerSize();  // defined in RecordIO-inl.h
+
+/**
+ * Write a header in the buffer.  We will prepend the header to the front
+ * of the chain.  Do not write the buffer if empty (we don't allow empty
+ * records).  Returns the total length, including header (0 if empty)
+ * (same as buf->computeChainDataLength(), but likely faster)
+ *
+ * The fileId should be unique per stream and allows you to have RecordIO
+ * headers stored inside the data (for example, have an entire RecordIO
+ * file stored as a record inside another RecordIO file).  The fileId may
+ * not be 0.
+ */
+size_t prependHeader(std::unique_ptr<IOBuf>& buf, uint32_t fileId = 1);
+
+/**
+ * Search for the first valid record that begins in searchRange (which must be
+ * a subrange of wholeRange).  Returns the record data (not the header) if
+ * found, ByteRange() otherwise.
+ *
+ * The fileId may be 0, in which case we'll return the first valid record for
+ * *any* fileId, or non-zero, in which case we'll only look for records with
+ * the requested fileId.
+ */
+struct RecordInfo {
+  uint32_t fileId;
+  ByteRange record;
+};
+RecordInfo findRecord(ByteRange searchRange,
+                      ByteRange wholeRange,
+                      uint32_t fileId);
+
+/**
+ * Search for the first valid record in range.
+ */
+RecordInfo findRecord(ByteRange range, uint32_t fileId);
+
+/**
+ * Check if there is a valid record at the beginning of range.  Returns the
+ * record data (not the header) if the record is valid, ByteRange() otherwise.
+ */
+RecordInfo validateRecord(ByteRange range, uint32_t fileId);
+
+}  // namespace recordio_helpers
+
+}  // namespaces
+
+#include "folly/io/RecordIO-inl.h"
+
+#endif /* FOLLY_IO_RECORDIO_H_ */
+
diff --git a/folly/io/test/RecordIOTest.cpp b/folly/io/test/RecordIOTest.cpp
new file mode 100644 (file)
index 0000000..9f4d269
--- /dev/null
@@ -0,0 +1,274 @@
+/*
+ * Copyright 2013 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "folly/io/RecordIO.h"
+
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <random>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "folly/Conv.h"
+#include "folly/FBString.h"
+#include "folly/Random.h"
+#include "folly/experimental/TestUtil.h"
+#include "folly/io/IOBufQueue.h"
+
+DEFINE_int32(random_seed, folly::randomNumberSeed(), "random seed");
+
+namespace folly { namespace test {
+
+namespace {
+// shortcut
+ByteRange br(StringPiece sp) { return ByteRange(sp); }
+StringPiece sp(ByteRange br) { return StringPiece(br); }
+
+template <class T>
+std::unique_ptr<IOBuf> iobufs(std::initializer_list<T> ranges) {
+  IOBufQueue queue;
+  for (auto& range : ranges) {
+    StringPiece r(range);
+    queue.append(IOBuf::wrapBuffer(r.data(), r.size()));
+  }
+  return queue.move();
+}
+
+}  // namespace
+
+TEST(RecordIOTest, Simple) {
+  TemporaryFile file;
+  {
+    RecordIOWriter writer(file.fd());
+    writer.write(iobufs({"hello ", "world"}));
+    writer.write(iobufs({"goodbye"}));
+  }
+  {
+    RecordIOReader reader(file.fd());
+    auto it = reader.begin();
+    ASSERT_FALSE(it == reader.end());
+    EXPECT_EQ("hello world", sp((it++)->first));
+    ASSERT_FALSE(it == reader.end());
+    EXPECT_EQ("goodbye", sp((it++)->first));
+    EXPECT_TRUE(it == reader.end());
+  }
+  {
+    RecordIOWriter writer(file.fd());
+    writer.write(iobufs({"meow"}));
+    writer.write(iobufs({"woof"}));
+  }
+  {
+    RecordIOReader reader(file.fd());
+    auto it = reader.begin();
+    ASSERT_FALSE(it == reader.end());
+    EXPECT_EQ("hello world", sp((it++)->first));
+    ASSERT_FALSE(it == reader.end());
+    EXPECT_EQ("goodbye", sp((it++)->first));
+    ASSERT_FALSE(it == reader.end());
+    EXPECT_EQ("meow", sp((it++)->first));
+    ASSERT_FALSE(it == reader.end());
+    EXPECT_EQ("woof", sp((it++)->first));
+    EXPECT_TRUE(it == reader.end());
+  }
+}
+
+TEST(RecordIOTest, SmallRecords) {
+  constexpr size_t kSize = 10;
+  char tmp[kSize];
+  memset(tmp, 'x', kSize);
+  TemporaryFile file;
+  {
+    RecordIOWriter writer(file.fd());
+    for (int i = 0; i < kSize; ++i) {  // record of size 0 should be ignored
+      writer.write(IOBuf::wrapBuffer(tmp, i));
+    }
+  }
+  {
+    RecordIOReader reader(file.fd());
+    auto it = reader.begin();
+    for (int i = 1; i < kSize; ++i) {
+      ASSERT_FALSE(it == reader.end());
+      EXPECT_EQ(StringPiece(tmp, i), sp((it++)->first));
+    }
+    EXPECT_TRUE(it == reader.end());
+  }
+}
+
+TEST(RecordIOTest, MultipleFileIds) {
+  TemporaryFile file;
+  {
+    RecordIOWriter writer(file.fd(), 1);
+    writer.write(iobufs({"hello"}));
+  }
+  {
+    RecordIOWriter writer(file.fd(), 2);
+    writer.write(iobufs({"world"}));
+  }
+  {
+    RecordIOWriter writer(file.fd(), 1);
+    writer.write(iobufs({"goodbye"}));
+  }
+  {
+    RecordIOReader reader(file.fd(), 0);  // return all
+    auto it = reader.begin();
+    ASSERT_FALSE(it == reader.end());
+    EXPECT_EQ("hello", sp((it++)->first));
+    ASSERT_FALSE(it == reader.end());
+    EXPECT_EQ("world", sp((it++)->first));
+    ASSERT_FALSE(it == reader.end());
+    EXPECT_EQ("goodbye", sp((it++)->first));
+    EXPECT_TRUE(it == reader.end());
+  }
+  {
+    RecordIOReader reader(file.fd(), 1);
+    auto it = reader.begin();
+    ASSERT_FALSE(it == reader.end());
+    EXPECT_EQ("hello", sp((it++)->first));
+    ASSERT_FALSE(it == reader.end());
+    EXPECT_EQ("goodbye", sp((it++)->first));
+    EXPECT_TRUE(it == reader.end());
+  }
+  {
+    RecordIOReader reader(file.fd(), 2);
+    auto it = reader.begin();
+    ASSERT_FALSE(it == reader.end());
+    EXPECT_EQ("world", sp((it++)->first));
+    EXPECT_TRUE(it == reader.end());
+  }
+  {
+    RecordIOReader reader(file.fd(), 3);
+    auto it = reader.begin();
+    EXPECT_TRUE(it == reader.end());
+  }
+}
+
+TEST(RecordIOTest, ExtraMagic) {
+  TemporaryFile file;
+  {
+    RecordIOWriter writer(file.fd());
+    writer.write(iobufs({"hello"}));
+  }
+  uint8_t buf[recordio_helpers::headerSize() + 5];
+  EXPECT_EQ(0, lseek(file.fd(), 0, SEEK_SET));
+  EXPECT_EQ(sizeof(buf), read(file.fd(), buf, sizeof(buf)));
+  // Append an extra magic
+  const uint32_t magic = recordio_helpers::detail::Header::kMagic;
+  EXPECT_EQ(sizeof(magic), write(file.fd(), &magic, sizeof(magic)));
+  // and an extra record
+  EXPECT_EQ(sizeof(buf), write(file.fd(), buf, sizeof(buf)));
+  {
+    RecordIOReader reader(file.fd());
+    auto it = reader.begin();
+    ASSERT_FALSE(it == reader.end());
+    EXPECT_EQ("hello", sp((it++)->first));
+    ASSERT_FALSE(it == reader.end());
+    EXPECT_EQ("hello", sp((it++)->first));
+    EXPECT_TRUE(it == reader.end());
+  }
+}
+
+namespace {
+void corrupt(int fd, off_t pos) {
+  uint8_t val = 0;
+  EXPECT_EQ(1, pread(fd, &val, 1, pos));
+  ++val;
+  EXPECT_EQ(1, pwrite(fd, &val, 1, pos));
+}
+}  // namespace
+
+TEST(RecordIOTest, Randomized) {
+  SCOPED_TRACE(to<std::string>("Random seed is ", FLAGS_random_seed));
+  std::mt19937 rnd(FLAGS_random_seed);
+
+  size_t recordCount =
+    std::uniform_int_distribution<uint32_t>(30, 300)(rnd);
+
+  std::uniform_int_distribution<uint32_t> recordSizeDist(1, 3 << 16);
+  std::uniform_int_distribution<uint32_t> charDist(0, 255);
+  std::uniform_int_distribution<uint32_t> junkDist(0, 1 << 20);
+  // corrupt 1/5 of all records
+  std::uniform_int_distribution<uint32_t> corruptDist(0, 4);
+
+  std::vector<std::pair<fbstring, off_t>> records;
+  std::vector<off_t> corruptPositions;
+  records.reserve(recordCount);
+  TemporaryFile file;
+
+  fbstring record;
+  // Recreate the writer multiple times so we test that we create a
+  // continuous stream
+  for (size_t i = 0; i < 3; ++i) {
+    RecordIOWriter writer(file.fd());
+    for (size_t j = 0; j < recordCount; ++j) {
+      off_t beginPos = writer.filePos();
+      record.clear();
+      size_t recordSize = recordSizeDist(rnd);
+      record.reserve(recordSize);
+      for (size_t k = 0; k < recordSize; ++k) {
+        record.push_back(charDist(rnd));
+      }
+      writer.write(iobufs({record}));
+
+      bool corrupt = (corruptDist(rnd) == 0);
+      if (corrupt) {
+        // Corrupt one random byte in the record (including header)
+        std::uniform_int_distribution<uint32_t> corruptByteDist(
+            0, recordSize + recordio_helpers::headerSize() - 1);
+        off_t corruptRel = corruptByteDist(rnd);
+        VLOG(1) << "n=" << records.size() << " bpos=" << beginPos
+                << " rsize=" << record.size()
+                << " corrupt rel=" << corruptRel
+                << " abs=" << beginPos + corruptRel;
+        corruptPositions.push_back(beginPos + corruptRel);
+      } else {
+        VLOG(2) << "n=" << records.size() << " bpos=" << beginPos
+                << " rsize=" << record.size()
+                << " good";
+        records.emplace_back(std::move(record), beginPos);
+      }
+    }
+    VLOG(1) << "n=" << records.size() << " close abs=" << writer.filePos();
+  }
+
+  for (auto& pos : corruptPositions) {
+    corrupt(file.fd(), pos);
+  }
+
+  {
+    size_t i = 0;
+    RecordIOReader reader(file.fd());
+    for (auto& r : reader) {
+      SCOPED_TRACE(i);
+      ASSERT_LT(i, records.size());
+      EXPECT_EQ(records[i].first, sp(r.first));
+      EXPECT_EQ(records[i].second, r.second);
+      ++i;
+    }
+    EXPECT_EQ(records.size(), i);
+  }
+}
+
+}}  // namespaces
+
+int main(int argc, char *argv[]) {
+  testing::InitGoogleTest(&argc, argv);
+  google::ParseCommandLineFlags(&argc, &argv, true);
+  return RUN_ALL_TESTS();
+}
+