--- /dev/null
+/*
+ * Copyright 2013 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FOLLY_IO_RECORDIO_H_
+#error This file may only be included from folly/io/RecordIO.h
+#endif
+
+#include <boost/iterator/iterator_facade.hpp>
+
+#include "folly/SpookyHashV2.h"
+
+namespace folly {
+
+class RecordIOReader::Iterator : public boost::iterator_facade<
+ RecordIOReader::Iterator,
+ const std::pair<ByteRange, off_t>,
+ boost::forward_traversal_tag> {
+ friend class boost::iterator_core_access;
+ friend class RecordIOReader;
+ private:
+ Iterator(ByteRange range, uint32_t fileId, off_t pos);
+
+ reference dereference() const { return recordAndPos_; }
+ bool equal(const Iterator& other) const { return range_ == other.range_; }
+ void increment() {
+ size_t skip = recordio_helpers::headerSize() + recordAndPos_.first.size();
+ recordAndPos_.second += skip;
+ range_.advance(skip);
+ advanceToValid();
+ }
+
+ void advanceToValid();
+ ByteRange range_;
+ uint32_t fileId_;
+ // stored as a pair so we can return by reference in dereference()
+ std::pair<ByteRange, off_t> recordAndPos_;
+};
+
+inline auto RecordIOReader::cbegin() const -> Iterator { return seek(0); }
+inline auto RecordIOReader::begin() const -> Iterator { return cbegin(); }
+inline auto RecordIOReader::cend() const -> Iterator { return seek(off_t(-1)); }
+inline auto RecordIOReader::end() const -> Iterator { return cend(); }
+inline auto RecordIOReader::seek(off_t pos) const -> Iterator {
+ return Iterator(map_.range(), fileId_, pos);
+}
+
+namespace recordio_helpers {
+
+namespace detail {
+
+struct Header {
+ // First 4 bytes of SHA1("zuck"), big-endian
+ // Any values will do, except that the sequence must not have a
+ // repeated prefix (that is, if we see kMagic, we know that the next
+ // occurrence must start at least 4 bytes later)
+ static constexpr uint32_t kMagic = 0xeac313a1;
+ uint32_t magic;
+ uint8_t version; // backwards incompatible version, currently 0
+ uint8_t hashFunction; // 0 = SpookyHashV2
+ uint16_t flags; // reserved (must be 0)
+ uint32_t fileId; // unique file ID
+ uint32_t dataLength;
+ uint64_t dataHash;
+ uint32_t headerHash; // must be last
+} __attribute__((packed));
+
+static_assert(offsetof(Header, headerHash) + sizeof(Header::headerHash) ==
+ sizeof(Header), "invalid header layout");
+
+} // namespace detail
+
+constexpr size_t headerSize() { return sizeof(detail::Header); }
+
+inline RecordInfo findRecord(ByteRange range, uint32_t fileId) {
+ return findRecord(range, range, fileId);
+}
+
+} // namespace recordio_helpers
+
+} // namespaces
+
--- /dev/null
+/*
+ * Copyright 2013 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "folly/io/RecordIO.h"
+
+#include <sys/types.h>
+#include <unistd.h>
+
+#include "folly/Exception.h"
+#include "folly/FileUtil.h"
+#include "folly/Memory.h"
+#include "folly/ScopeGuard.h"
+#include "folly/String.h"
+
+namespace folly {
+
+using namespace recordio_helpers;
+
+RecordIOWriter::RecordIOWriter(File file, uint32_t fileId)
+ : file_(std::move(file)),
+ fileId_(fileId),
+ writeLock_(file_, std::defer_lock),
+ filePos_(0) {
+ if (!writeLock_.try_lock()) {
+ throw std::runtime_error("RecordIOWriter: file locked by another process");
+ }
+
+ struct stat st;
+ checkUnixError(fstat(file_.fd(), &st), "fstat() failed");
+
+ filePos_ = st.st_size;
+}
+
+void RecordIOWriter::write(std::unique_ptr<IOBuf> buf) {
+ size_t totalLength = prependHeader(buf, fileId_);
+ if (totalLength == 0) {
+ return; // nothing to do
+ }
+
+ // TODO(tudorb): Maybe use pwritev, but for now we're copying everything in
+ // one place.
+ buf->unshare();
+ buf->coalesce();
+ DCHECK_EQ(buf->length(), totalLength);
+
+ // We're going to write. Reserve space for ourselves.
+ off_t pos = filePos_.fetch_add(buf->length());
+ ssize_t bytes = pwriteFull(file_.fd(), buf->data(), buf->length(), pos);
+ checkUnixError(bytes, "pwrite() failed");
+ DCHECK_EQ(bytes, buf->length());
+}
+
+RecordIOReader::RecordIOReader(File file, uint32_t fileId)
+ : map_(std::move(file)),
+ fileId_(fileId) {
+}
+
+RecordIOReader::Iterator::Iterator(ByteRange range, uint32_t fileId, off_t pos)
+ : range_(range),
+ fileId_(fileId),
+ recordAndPos_(ByteRange(), 0) {
+ if (pos >= range_.size()) {
+ recordAndPos_.second = off_t(-1);
+ range_.clear();
+ } else {
+ recordAndPos_.second = pos;
+ range_.advance(pos);
+ advanceToValid();
+ }
+}
+
+void RecordIOReader::Iterator::advanceToValid() {
+ ByteRange record = findRecord(range_, fileId_).record;
+ if (record.empty()) {
+ recordAndPos_ = std::make_pair(ByteRange(), off_t(-1));
+ range_.clear(); // at end
+ } else {
+ size_t skipped = record.begin() - range_.begin();
+ DCHECK_GE(skipped, headerSize());
+ skipped -= headerSize();
+ range_.advance(skipped);
+ recordAndPos_.first = record;
+ recordAndPos_.second += skipped;
+ }
+}
+
+namespace recordio_helpers {
+
+using namespace detail;
+
+namespace {
+
+constexpr uint32_t kHashSeed = 0xdeadbeef; // for mcurtiss
+
+uint32_t headerHash(const Header& header) {
+ return hash::SpookyHashV2::Hash32(&header, offsetof(Header, headerHash),
+ kHashSeed);
+}
+
+std::pair<size_t, uint64_t> dataLengthAndHash(const IOBuf* buf) {
+ size_t len = 0;
+ hash::SpookyHashV2 hasher;
+ hasher.Init(kHashSeed, kHashSeed);
+ for (auto br : *buf) {
+ len += br.size();
+ hasher.Update(br.data(), br.size());
+ }
+ uint64_t hash1;
+ uint64_t hash2;
+ hasher.Final(&hash1, &hash2);
+ if (len + headerSize() >= std::numeric_limits<uint32_t>::max()) {
+ throw std::invalid_argument("Record length must fit in 32 bits");
+ }
+ return std::make_pair(len, hash1);
+}
+
+uint64_t dataHash(ByteRange range) {
+ return hash::SpookyHashV2::Hash64(range.data(), range.size(), kHashSeed);
+}
+
+} // namespace
+
+size_t prependHeader(std::unique_ptr<IOBuf>& buf, uint32_t fileId) {
+ if (fileId == 0) {
+ throw std::invalid_argument("invalid file id");
+ }
+ auto lengthAndHash = dataLengthAndHash(buf.get());
+ if (lengthAndHash.first == 0) {
+ return 0; // empty, nothing to do, no zero-length records
+ }
+
+ // Prepend to the first buffer in the chain if we have room, otherwise
+ // prepend a new buffer.
+ if (buf->headroom() >= headerSize()) {
+ buf->unshareOne();
+ buf->prepend(headerSize());
+ } else {
+ auto b = IOBuf::create(headerSize());
+ b->append(headerSize());
+ b->appendChain(std::move(buf));
+ buf = std::move(b);
+ }
+ detail::Header* header =
+ reinterpret_cast<detail::Header*>(buf->writableData());
+ memset(header, 0, sizeof(Header));
+ header->magic = detail::Header::kMagic;
+ header->fileId = fileId;
+ header->dataLength = lengthAndHash.first;
+ header->dataHash = lengthAndHash.second;
+ header->headerHash = headerHash(*header);
+
+ return lengthAndHash.first + headerSize();
+}
+
+RecordInfo validateRecord(ByteRange range, uint32_t fileId) {
+ if (range.size() <= headerSize()) { // records may not be empty
+ return {0};
+ }
+ const Header* header = reinterpret_cast<const Header*>(range.begin());
+ range.advance(sizeof(Header));
+ if (header->magic != Header::kMagic ||
+ header->version != 0 ||
+ header->hashFunction != 0 ||
+ header->flags != 0 ||
+ (fileId != 0 && header->fileId != fileId) ||
+ header->dataLength > range.size()) {
+ return {0};
+ }
+ if (headerHash(*header) != header->headerHash) {
+ return {0};
+ }
+ range.reset(range.begin(), header->dataLength);
+ if (dataHash(range) != header->dataHash) {
+ return {0};
+ }
+ return {header->fileId, range};
+}
+
+RecordInfo findRecord(ByteRange searchRange,
+ ByteRange wholeRange,
+ uint32_t fileId) {
+ static const uint32_t magic = Header::kMagic;
+ static const ByteRange magicRange(reinterpret_cast<const uint8_t*>(&magic),
+ sizeof(magic));
+ static constexpr size_t headerTail = sizeof(Header) - sizeof(magic);
+
+ DCHECK_GE(searchRange.begin(), wholeRange.begin());
+ DCHECK_LE(searchRange.end(), wholeRange.end());
+
+ const uint8_t* start = searchRange.begin();
+ const uint8_t* end = std::min(searchRange.end(),
+ wholeRange.end() - sizeof(Header));
+ // end-1: the last place where a Header could start
+ while (start < end) {
+ auto p = ByteRange(start, end + sizeof(magic)).find(magicRange);
+ if (p == ByteRange::npos) {
+ break;
+ }
+
+ start += p;
+ auto r = validateRecord(ByteRange(start, wholeRange.end()), fileId);
+ if (!r.record.empty()) {
+ return r;
+ }
+
+ // No repeated prefix in magic, so we can do better than start++
+ start += sizeof(magic);
+ }
+
+ return {0};
+}
+
+} // namespace
+
+} // namespaces
--- /dev/null
+/*
+ * Copyright 2013 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * RecordIO: self-synchronizing stream of variable length records
+ *
+ * RecordIO gives you the ability to write a stream of variable length records
+ * and read them later even in the face of data corruption -- randomly inserted
+ * or deleted chunks of the file, or modified data. When reading, you may lose
+ * corrupted records, but the stream will resynchronize automatically.
+ */
+#ifndef FOLLY_IO_RECORDIO_H_
+#define FOLLY_IO_RECORDIO_H_
+
+#include <atomic>
+#include <memory>
+#include <mutex>
+
+#include "folly/File.h"
+#include "folly/Range.h"
+#include "folly/MemoryMapping.h"
+#include "folly/io/IOBuf.h"
+
+namespace folly {
+
+/**
+ * Class to write a stream of RecordIO records to a file.
+ *
+ * RecordIOWriter is thread-safe
+ */
+class RecordIOWriter {
+ public:
+ /**
+ * Create a RecordIOWriter around a file; will append to the end of
+ * file if it exists.
+ *
+ * Each file must have a non-zero file id, which is embedded in all
+ * record headers. Readers will only return records with the requested
+ * file id (or, if the reader is created with fileId=0 in the constructor,
+ * the reader will return all records). File ids are only used to allow
+ * resynchronization if you store RecordIO records (with headers) inside
+ * other RecordIO records (for example, if a record consists of a fragment
+ * from another RecordIO file). If you're not planning to do that,
+ * the defaults are fine.
+ */
+ explicit RecordIOWriter(File file, uint32_t fileId = 1);
+
+ /**
+ * Write a record. We will use at most headerSize() bytes of headroom,
+ * you might want to arrange that before copying your data into it.
+ */
+ void write(std::unique_ptr<IOBuf> buf);
+
+ /**
+ * Return the position in the file where the next byte will be written.
+ * Conservative, as stuff can be written at any time from another thread.
+ */
+ off_t filePos() const { return filePos_; }
+
+ private:
+ File file_;
+ uint32_t fileId_;
+ std::unique_lock<File> writeLock_;
+ std::atomic<off_t> filePos_;
+};
+
+/**
+ * Class to read from a RecordIO file. Will skip invalid records.
+ */
+class RecordIOReader {
+ public:
+ class Iterator;
+
+ /**
+ * RecordIOReader is iterable, returning pairs of ByteRange (record content)
+ * and position in file where the record (including header) begins.
+ * Note that the position includes the header, that is, it can be passed back
+ * to seek().
+ */
+ typedef Iterator iterator;
+ typedef Iterator const_iterator;
+ typedef std::pair<ByteRange, off_t> value_type;
+ typedef value_type& reference;
+ typedef const value_type& const_reference;
+
+ /**
+ * A record reader with a fileId of 0 will return all records.
+ * A record reader with a non-zero fileId will only return records where
+ * the fileId matches.
+ */
+ explicit RecordIOReader(File file, uint32_t fileId = 0);
+
+ Iterator cbegin() const;
+ Iterator begin() const;
+ Iterator cend() const;
+ Iterator end() const;
+
+ /**
+ * Create an iterator to the first valid record after pos.
+ */
+ Iterator seek(off_t pos) const;
+
+ private:
+ MemoryMapping map_;
+ uint32_t fileId_;
+};
+
+namespace recordio_helpers {
+
+// We're exposing the guts of the RecordIO implementation for two reasons:
+// 1. It makes unit testing easier, and
+// 2. It allows you to build different RecordIO readers / writers that use
+// different storage systems underneath (not standard files)
+
+/**
+ * Header size.
+ */
+constexpr size_t headerSize(); // defined in RecordIO-inl.h
+
+/**
+ * Write a header in the buffer. We will prepend the header to the front
+ * of the chain. Do not write the buffer if empty (we don't allow empty
+ * records). Returns the total length, including header (0 if empty)
+ * (same as buf->computeChainDataLength(), but likely faster)
+ *
+ * The fileId should be unique per stream and allows you to have RecordIO
+ * headers stored inside the data (for example, have an entire RecordIO
+ * file stored as a record inside another RecordIO file). The fileId may
+ * not be 0.
+ */
+size_t prependHeader(std::unique_ptr<IOBuf>& buf, uint32_t fileId = 1);
+
+/**
+ * Search for the first valid record that begins in searchRange (which must be
+ * a subrange of wholeRange). Returns the record data (not the header) if
+ * found, ByteRange() otherwise.
+ *
+ * The fileId may be 0, in which case we'll return the first valid record for
+ * *any* fileId, or non-zero, in which case we'll only look for records with
+ * the requested fileId.
+ */
+struct RecordInfo {
+ uint32_t fileId;
+ ByteRange record;
+};
+RecordInfo findRecord(ByteRange searchRange,
+ ByteRange wholeRange,
+ uint32_t fileId);
+
+/**
+ * Search for the first valid record in range.
+ */
+RecordInfo findRecord(ByteRange range, uint32_t fileId);
+
+/**
+ * Check if there is a valid record at the beginning of range. Returns the
+ * record data (not the header) if the record is valid, ByteRange() otherwise.
+ */
+RecordInfo validateRecord(ByteRange range, uint32_t fileId);
+
+} // namespace recordio_helpers
+
+} // namespaces
+
+#include "folly/io/RecordIO-inl.h"
+
+#endif /* FOLLY_IO_RECORDIO_H_ */
+
--- /dev/null
+/*
+ * Copyright 2013 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "folly/io/RecordIO.h"
+
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <random>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "folly/Conv.h"
+#include "folly/FBString.h"
+#include "folly/Random.h"
+#include "folly/experimental/TestUtil.h"
+#include "folly/io/IOBufQueue.h"
+
+DEFINE_int32(random_seed, folly::randomNumberSeed(), "random seed");
+
+namespace folly { namespace test {
+
+namespace {
+// shortcut
+ByteRange br(StringPiece sp) { return ByteRange(sp); }
+StringPiece sp(ByteRange br) { return StringPiece(br); }
+
+template <class T>
+std::unique_ptr<IOBuf> iobufs(std::initializer_list<T> ranges) {
+ IOBufQueue queue;
+ for (auto& range : ranges) {
+ StringPiece r(range);
+ queue.append(IOBuf::wrapBuffer(r.data(), r.size()));
+ }
+ return queue.move();
+}
+
+} // namespace
+
+TEST(RecordIOTest, Simple) {
+ TemporaryFile file;
+ {
+ RecordIOWriter writer(file.fd());
+ writer.write(iobufs({"hello ", "world"}));
+ writer.write(iobufs({"goodbye"}));
+ }
+ {
+ RecordIOReader reader(file.fd());
+ auto it = reader.begin();
+ ASSERT_FALSE(it == reader.end());
+ EXPECT_EQ("hello world", sp((it++)->first));
+ ASSERT_FALSE(it == reader.end());
+ EXPECT_EQ("goodbye", sp((it++)->first));
+ EXPECT_TRUE(it == reader.end());
+ }
+ {
+ RecordIOWriter writer(file.fd());
+ writer.write(iobufs({"meow"}));
+ writer.write(iobufs({"woof"}));
+ }
+ {
+ RecordIOReader reader(file.fd());
+ auto it = reader.begin();
+ ASSERT_FALSE(it == reader.end());
+ EXPECT_EQ("hello world", sp((it++)->first));
+ ASSERT_FALSE(it == reader.end());
+ EXPECT_EQ("goodbye", sp((it++)->first));
+ ASSERT_FALSE(it == reader.end());
+ EXPECT_EQ("meow", sp((it++)->first));
+ ASSERT_FALSE(it == reader.end());
+ EXPECT_EQ("woof", sp((it++)->first));
+ EXPECT_TRUE(it == reader.end());
+ }
+}
+
+TEST(RecordIOTest, SmallRecords) {
+ constexpr size_t kSize = 10;
+ char tmp[kSize];
+ memset(tmp, 'x', kSize);
+ TemporaryFile file;
+ {
+ RecordIOWriter writer(file.fd());
+ for (int i = 0; i < kSize; ++i) { // record of size 0 should be ignored
+ writer.write(IOBuf::wrapBuffer(tmp, i));
+ }
+ }
+ {
+ RecordIOReader reader(file.fd());
+ auto it = reader.begin();
+ for (int i = 1; i < kSize; ++i) {
+ ASSERT_FALSE(it == reader.end());
+ EXPECT_EQ(StringPiece(tmp, i), sp((it++)->first));
+ }
+ EXPECT_TRUE(it == reader.end());
+ }
+}
+
+TEST(RecordIOTest, MultipleFileIds) {
+ TemporaryFile file;
+ {
+ RecordIOWriter writer(file.fd(), 1);
+ writer.write(iobufs({"hello"}));
+ }
+ {
+ RecordIOWriter writer(file.fd(), 2);
+ writer.write(iobufs({"world"}));
+ }
+ {
+ RecordIOWriter writer(file.fd(), 1);
+ writer.write(iobufs({"goodbye"}));
+ }
+ {
+ RecordIOReader reader(file.fd(), 0); // return all
+ auto it = reader.begin();
+ ASSERT_FALSE(it == reader.end());
+ EXPECT_EQ("hello", sp((it++)->first));
+ ASSERT_FALSE(it == reader.end());
+ EXPECT_EQ("world", sp((it++)->first));
+ ASSERT_FALSE(it == reader.end());
+ EXPECT_EQ("goodbye", sp((it++)->first));
+ EXPECT_TRUE(it == reader.end());
+ }
+ {
+ RecordIOReader reader(file.fd(), 1);
+ auto it = reader.begin();
+ ASSERT_FALSE(it == reader.end());
+ EXPECT_EQ("hello", sp((it++)->first));
+ ASSERT_FALSE(it == reader.end());
+ EXPECT_EQ("goodbye", sp((it++)->first));
+ EXPECT_TRUE(it == reader.end());
+ }
+ {
+ RecordIOReader reader(file.fd(), 2);
+ auto it = reader.begin();
+ ASSERT_FALSE(it == reader.end());
+ EXPECT_EQ("world", sp((it++)->first));
+ EXPECT_TRUE(it == reader.end());
+ }
+ {
+ RecordIOReader reader(file.fd(), 3);
+ auto it = reader.begin();
+ EXPECT_TRUE(it == reader.end());
+ }
+}
+
+TEST(RecordIOTest, ExtraMagic) {
+ TemporaryFile file;
+ {
+ RecordIOWriter writer(file.fd());
+ writer.write(iobufs({"hello"}));
+ }
+ uint8_t buf[recordio_helpers::headerSize() + 5];
+ EXPECT_EQ(0, lseek(file.fd(), 0, SEEK_SET));
+ EXPECT_EQ(sizeof(buf), read(file.fd(), buf, sizeof(buf)));
+ // Append an extra magic
+ const uint32_t magic = recordio_helpers::detail::Header::kMagic;
+ EXPECT_EQ(sizeof(magic), write(file.fd(), &magic, sizeof(magic)));
+ // and an extra record
+ EXPECT_EQ(sizeof(buf), write(file.fd(), buf, sizeof(buf)));
+ {
+ RecordIOReader reader(file.fd());
+ auto it = reader.begin();
+ ASSERT_FALSE(it == reader.end());
+ EXPECT_EQ("hello", sp((it++)->first));
+ ASSERT_FALSE(it == reader.end());
+ EXPECT_EQ("hello", sp((it++)->first));
+ EXPECT_TRUE(it == reader.end());
+ }
+}
+
+namespace {
+void corrupt(int fd, off_t pos) {
+ uint8_t val = 0;
+ EXPECT_EQ(1, pread(fd, &val, 1, pos));
+ ++val;
+ EXPECT_EQ(1, pwrite(fd, &val, 1, pos));
+}
+} // namespace
+
+TEST(RecordIOTest, Randomized) {
+ SCOPED_TRACE(to<std::string>("Random seed is ", FLAGS_random_seed));
+ std::mt19937 rnd(FLAGS_random_seed);
+
+ size_t recordCount =
+ std::uniform_int_distribution<uint32_t>(30, 300)(rnd);
+
+ std::uniform_int_distribution<uint32_t> recordSizeDist(1, 3 << 16);
+ std::uniform_int_distribution<uint32_t> charDist(0, 255);
+ std::uniform_int_distribution<uint32_t> junkDist(0, 1 << 20);
+ // corrupt 1/5 of all records
+ std::uniform_int_distribution<uint32_t> corruptDist(0, 4);
+
+ std::vector<std::pair<fbstring, off_t>> records;
+ std::vector<off_t> corruptPositions;
+ records.reserve(recordCount);
+ TemporaryFile file;
+
+ fbstring record;
+ // Recreate the writer multiple times so we test that we create a
+ // continuous stream
+ for (size_t i = 0; i < 3; ++i) {
+ RecordIOWriter writer(file.fd());
+ for (size_t j = 0; j < recordCount; ++j) {
+ off_t beginPos = writer.filePos();
+ record.clear();
+ size_t recordSize = recordSizeDist(rnd);
+ record.reserve(recordSize);
+ for (size_t k = 0; k < recordSize; ++k) {
+ record.push_back(charDist(rnd));
+ }
+ writer.write(iobufs({record}));
+
+ bool corrupt = (corruptDist(rnd) == 0);
+ if (corrupt) {
+ // Corrupt one random byte in the record (including header)
+ std::uniform_int_distribution<uint32_t> corruptByteDist(
+ 0, recordSize + recordio_helpers::headerSize() - 1);
+ off_t corruptRel = corruptByteDist(rnd);
+ VLOG(1) << "n=" << records.size() << " bpos=" << beginPos
+ << " rsize=" << record.size()
+ << " corrupt rel=" << corruptRel
+ << " abs=" << beginPos + corruptRel;
+ corruptPositions.push_back(beginPos + corruptRel);
+ } else {
+ VLOG(2) << "n=" << records.size() << " bpos=" << beginPos
+ << " rsize=" << record.size()
+ << " good";
+ records.emplace_back(std::move(record), beginPos);
+ }
+ }
+ VLOG(1) << "n=" << records.size() << " close abs=" << writer.filePos();
+ }
+
+ for (auto& pos : corruptPositions) {
+ corrupt(file.fd(), pos);
+ }
+
+ {
+ size_t i = 0;
+ RecordIOReader reader(file.fd());
+ for (auto& r : reader) {
+ SCOPED_TRACE(i);
+ ASSERT_LT(i, records.size());
+ EXPECT_EQ(records[i].first, sp(r.first));
+ EXPECT_EQ(records[i].second, r.second);
+ ++i;
+ }
+ EXPECT_EQ(records.size(), i);
+ }
+}
+
+}} // namespaces
+
+int main(int argc, char *argv[]) {
+ testing::InitGoogleTest(&argc, argv);
+ google::ParseCommandLineFlags(&argc, &argv, true);
+ return RUN_ALL_TESTS();
+}
+