9dcf3c917227df2fa3f1993f3df8b0ac38bfd679
[folly.git] / folly / io / RecordIO.cpp
1 /*
2  * Copyright 2013 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "folly/io/RecordIO.h"
18
19 #include <sys/types.h>
20 #include <unistd.h>
21
22 #include "folly/Exception.h"
23 #include "folly/FileUtil.h"
24 #include "folly/Memory.h"
25 #include "folly/Portability.h"
26 #include "folly/ScopeGuard.h"
27 #include "folly/String.h"
28
29 namespace folly {
30
31 using namespace recordio_helpers;
32
33 RecordIOWriter::RecordIOWriter(File file, uint32_t fileId)
34   : file_(std::move(file)),
35     fileId_(fileId),
36     writeLock_(file_, std::defer_lock),
37     filePos_(0) {
38   if (!writeLock_.try_lock()) {
39     throw std::runtime_error("RecordIOWriter: file locked by another process");
40   }
41
42   struct stat st;
43   checkUnixError(fstat(file_.fd(), &st), "fstat() failed");
44
45   filePos_ = st.st_size;
46 }
47
48 void RecordIOWriter::write(std::unique_ptr<IOBuf> buf) {
49   size_t totalLength = prependHeader(buf, fileId_);
50   if (totalLength == 0) {
51     return;  // nothing to do
52   }
53
54   DCHECK_EQ(buf->computeChainDataLength(), totalLength);
55
56   // We're going to write.  Reserve space for ourselves.
57   off_t pos = filePos_.fetch_add(totalLength);
58
59 #if FOLLY_HAVE_PWRITEV
60   auto iov = buf->getIov();
61   ssize_t bytes = pwritevFull(file_.fd(), iov.data(), iov.size(), pos);
62 #else
63   buf->unshare();
64   buf->coalesce();
65   ssize_t bytes = pwriteFull(file_.fd(), buf->data(), buf->length(), pos);
66 #endif
67
68   checkUnixError(bytes, "pwrite() failed");
69   DCHECK_EQ(bytes, totalLength);
70 }
71
72 RecordIOReader::RecordIOReader(File file, uint32_t fileId)
73   : map_(std::move(file)),
74     fileId_(fileId) {
75 }
76
77 RecordIOReader::Iterator::Iterator(ByteRange range, uint32_t fileId, off_t pos)
78   : range_(range),
79     fileId_(fileId),
80     recordAndPos_(ByteRange(), 0) {
81   if (pos >= range_.size()) {
82     recordAndPos_.second = off_t(-1);
83     range_.clear();
84   } else {
85     recordAndPos_.second = pos;
86     range_.advance(pos);
87     advanceToValid();
88   }
89 }
90
91 void RecordIOReader::Iterator::advanceToValid() {
92   ByteRange record = findRecord(range_, fileId_).record;
93   if (record.empty()) {
94     recordAndPos_ = std::make_pair(ByteRange(), off_t(-1));
95     range_.clear();  // at end
96   } else {
97     size_t skipped = record.begin() - range_.begin();
98     DCHECK_GE(skipped, headerSize());
99     skipped -= headerSize();
100     range_.advance(skipped);
101     recordAndPos_.first = record;
102     recordAndPos_.second += skipped;
103   }
104 }
105
106 namespace recordio_helpers {
107
108 using namespace detail;
109
110 namespace {
111
112 constexpr uint32_t kHashSeed = 0xdeadbeef;  // for mcurtiss
113
114 uint32_t headerHash(const Header& header) {
115   return hash::SpookyHashV2::Hash32(&header, offsetof(Header, headerHash),
116                                     kHashSeed);
117 }
118
119 std::pair<size_t, uint64_t> dataLengthAndHash(const IOBuf* buf) {
120   size_t len = 0;
121   hash::SpookyHashV2 hasher;
122   hasher.Init(kHashSeed, kHashSeed);
123   for (auto br : *buf) {
124     len += br.size();
125     hasher.Update(br.data(), br.size());
126   }
127   uint64_t hash1;
128   uint64_t hash2;
129   hasher.Final(&hash1, &hash2);
130   if (len + headerSize() >= std::numeric_limits<uint32_t>::max()) {
131     throw std::invalid_argument("Record length must fit in 32 bits");
132   }
133   return std::make_pair(len, hash1);
134 }
135
136 uint64_t dataHash(ByteRange range) {
137   return hash::SpookyHashV2::Hash64(range.data(), range.size(), kHashSeed);
138 }
139
140 }  // namespace
141
142 size_t prependHeader(std::unique_ptr<IOBuf>& buf, uint32_t fileId) {
143   if (fileId == 0) {
144     throw std::invalid_argument("invalid file id");
145   }
146   auto lengthAndHash = dataLengthAndHash(buf.get());
147   if (lengthAndHash.first == 0) {
148     return 0;  // empty, nothing to do, no zero-length records
149   }
150
151   // Prepend to the first buffer in the chain if we have room, otherwise
152   // prepend a new buffer.
153   if (buf->headroom() >= headerSize()) {
154     buf->unshareOne();
155     buf->prepend(headerSize());
156   } else {
157     auto b = IOBuf::create(headerSize());
158     b->append(headerSize());
159     b->appendChain(std::move(buf));
160     buf = std::move(b);
161   }
162   detail::Header* header =
163     reinterpret_cast<detail::Header*>(buf->writableData());
164   memset(header, 0, sizeof(Header));
165   header->magic = detail::Header::kMagic;
166   header->fileId = fileId;
167   header->dataLength = lengthAndHash.first;
168   header->dataHash = lengthAndHash.second;
169   header->headerHash = headerHash(*header);
170
171   return lengthAndHash.first + headerSize();
172 }
173
174 RecordInfo validateRecord(ByteRange range, uint32_t fileId) {
175   if (range.size() <= headerSize()) {  // records may not be empty
176     return {0};
177   }
178   const Header* header = reinterpret_cast<const Header*>(range.begin());
179   range.advance(sizeof(Header));
180   if (header->magic != Header::kMagic ||
181       header->version != 0 ||
182       header->hashFunction != 0 ||
183       header->flags != 0 ||
184       (fileId != 0 && header->fileId != fileId) ||
185       header->dataLength > range.size()) {
186     return {0};
187   }
188   if (headerHash(*header) != header->headerHash) {
189     return {0};
190   }
191   range.reset(range.begin(), header->dataLength);
192   if (dataHash(range) != header->dataHash) {
193     return {0};
194   }
195   return {header->fileId, range};
196 }
197
198 RecordInfo findRecord(ByteRange searchRange,
199                       ByteRange wholeRange,
200                       uint32_t fileId) {
201   static const uint32_t magic = Header::kMagic;
202   static const ByteRange magicRange(reinterpret_cast<const uint8_t*>(&magic),
203                                     sizeof(magic));
204
205   DCHECK_GE(searchRange.begin(), wholeRange.begin());
206   DCHECK_LE(searchRange.end(), wholeRange.end());
207
208   const uint8_t* start = searchRange.begin();
209   const uint8_t* end = std::min(searchRange.end(),
210                                 wholeRange.end() - sizeof(Header));
211   // end-1: the last place where a Header could start
212   while (start < end) {
213     auto p = ByteRange(start, end + sizeof(magic)).find(magicRange);
214     if (p == ByteRange::npos) {
215       break;
216     }
217
218     start += p;
219     auto r = validateRecord(ByteRange(start, wholeRange.end()), fileId);
220     if (!r.record.empty()) {
221       return r;
222     }
223
224     // No repeated prefix in magic, so we can do better than start++
225     start += sizeof(magic);
226   }
227
228   return {0};
229 }
230
231 }  // namespace
232
233 }  // namespaces