/*
 * Copyright 2013-present Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <folly/io/IOBufQueue.h>

#include <string.h>

#include <stdexcept>

using std::make_pair;
using std::pair;
using std::unique_ptr;
// Buffers allocated internally by append(const void*, size_t) are clamped to
// [MIN_ALLOC_SIZE, MAX_ALLOC_SIZE] bytes.
const size_t MIN_ALLOC_SIZE = 2000;
const size_t MAX_ALLOC_SIZE = 8000;
// Maximum total number of bytes copied (rather than chained) when packing
// one chain onto another in appendToChain().
const size_t MAX_PACK_COPY = 4096;
36 * Convenience function to append chain src to chain dst.
39 appendToChain(unique_ptr<IOBuf>& dst, unique_ptr<IOBuf>&& src, bool pack) {
43 IOBuf* tail = dst->prev();
45 // Copy up to MAX_PACK_COPY bytes if we can free buffers; this helps
46 // reduce wastage (the tail's tailroom and the head's headroom) when
47 // joining two IOBufQueues together.
48 size_t copyRemaining = MAX_PACK_COPY;
51 (n = src->length()) < copyRemaining &&
52 n < tail->tailroom()) {
53 memcpy(tail->writableTail(), src->data(), n);
60 tail->appendChain(std::move(src));
69 IOBufQueue::IOBufQueue(const Options& options)
70 : options_(options), cachePtr_(&localCache_) {
71 localCache_.attached = true;
74 IOBufQueue::~IOBufQueue() {
75 clearWritableRangeCache();
78 IOBufQueue::IOBufQueue(IOBufQueue&& other) noexcept
79 : options_(other.options_), cachePtr_(&localCache_) {
80 other.clearWritableRangeCache();
81 head_ = std::move(other.head_);
82 chainLength_ = other.chainLength_;
84 tailStart_ = other.tailStart_;
85 localCache_.cachedRange = other.localCache_.cachedRange;
86 localCache_.attached = true;
88 other.chainLength_ = 0;
89 other.tailStart_ = nullptr;
90 other.localCache_.cachedRange = {nullptr, nullptr};
93 IOBufQueue& IOBufQueue::operator=(IOBufQueue&& other) {
95 other.clearWritableRangeCache();
96 clearWritableRangeCache();
98 options_ = other.options_;
99 head_ = std::move(other.head_);
100 chainLength_ = other.chainLength_;
102 tailStart_ = other.tailStart_;
103 localCache_.cachedRange = other.localCache_.cachedRange;
104 localCache_.attached = true;
106 other.chainLength_ = 0;
107 other.tailStart_ = nullptr;
108 other.localCache_.cachedRange = {nullptr, nullptr};
113 std::pair<void*, uint64_t>
114 IOBufQueue::headroom() {
115 // Note, headroom is independent from the tail, so we don't need to flush the
118 return std::make_pair(head_->writableBuffer(), head_->headroom());
120 return std::make_pair(nullptr, 0);
125 IOBufQueue::markPrepended(uint64_t n) {
129 // Note, headroom is independent from the tail, so we don't need to flush the
137 IOBufQueue::prepend(const void* buf, uint64_t n) {
138 // We're not touching the tail, so we don't need to flush the cache.
139 auto hroom = head_->headroom();
140 if (!head_ || hroom < n) {
141 throw std::overflow_error("Not enough room to prepend");
143 memcpy(head_->writableBuffer() + hroom - n, buf, n);
149 IOBufQueue::append(unique_ptr<IOBuf>&& buf, bool pack) {
153 auto guard = updateGuard();
154 if (options_.cacheChainLength) {
155 chainLength_ += buf->computeChainDataLength();
157 appendToChain(head_, std::move(buf), pack);
161 IOBufQueue::append(IOBufQueue& other, bool pack) {
165 // We're going to chain other, thus we need to grab both guards.
166 auto otherGuard = other.updateGuard();
167 auto guard = updateGuard();
168 if (options_.cacheChainLength) {
169 if (other.options_.cacheChainLength) {
170 chainLength_ += other.chainLength_;
172 chainLength_ += other.head_->computeChainDataLength();
175 appendToChain(head_, std::move(other.head_), pack);
176 other.chainLength_ = 0;
180 IOBufQueue::append(const void* buf, size_t len) {
181 auto guard = updateGuard();
182 auto src = static_cast<const uint8_t*>(buf);
184 if ((head_ == nullptr) || head_->prev()->isSharedOne() ||
185 (head_->prev()->tailroom() == 0)) {
187 IOBuf::create(std::max(MIN_ALLOC_SIZE,
188 std::min(len, MAX_ALLOC_SIZE))),
191 IOBuf* last = head_->prev();
192 uint64_t copyLen = std::min(len, (size_t)last->tailroom());
193 memcpy(last->writableTail(), src, copyLen);
195 last->append(copyLen);
196 chainLength_ += copyLen;
202 IOBufQueue::wrapBuffer(const void* buf, size_t len, uint64_t blockSize) {
203 auto src = static_cast<const uint8_t*>(buf);
205 size_t n = std::min(len, size_t(blockSize));
206 append(IOBuf::wrapBuffer(src, n));
213 IOBufQueue::preallocateSlow(uint64_t min, uint64_t newAllocationSize,
215 // Avoid grabbing update guard, since we're manually setting the cache ptrs.
217 // Allocate a new buffer of the requested max size.
218 unique_ptr<IOBuf> newBuf(IOBuf::create(std::max(min, newAllocationSize)));
220 tailStart_ = newBuf->writableTail();
221 cachePtr_->cachedRange = std::pair<uint8_t*, uint8_t*>(
222 tailStart_, tailStart_ + newBuf->tailroom());
223 appendToChain(head_, std::move(newBuf), false);
224 return make_pair(writableTail(), std::min<uint64_t>(max, tailroom()));
227 unique_ptr<IOBuf> IOBufQueue::split(size_t n, bool throwOnUnderflow) {
228 auto guard = updateGuard();
229 unique_ptr<IOBuf> result;
231 if (head_ == nullptr) {
232 if (throwOnUnderflow) {
233 throw std::underflow_error(
234 "Attempt to remove more bytes than are present in IOBufQueue");
238 } else if (head_->length() <= n) {
239 n -= head_->length();
240 chainLength_ -= head_->length();
241 unique_ptr<IOBuf> remainder = head_->pop();
242 appendToChain(result, std::move(head_), false);
243 head_ = std::move(remainder);
245 unique_ptr<IOBuf> clone = head_->cloneOne();
246 clone->trimEnd(clone->length() - n);
247 appendToChain(result, std::move(clone), false);
253 if (UNLIKELY(result == nullptr)) {
254 return IOBuf::create(0);
259 void IOBufQueue::trimStart(size_t amount) {
260 auto trimmed = trimStartAtMost(amount);
261 if (trimmed != amount) {
262 throw std::underflow_error(
263 "Attempt to trim more bytes than are present in IOBufQueue");
267 size_t IOBufQueue::trimStartAtMost(size_t amount) {
268 auto guard = updateGuard();
269 auto original = amount;
274 if (head_->length() > amount) {
275 head_->trimStart(amount);
276 chainLength_ -= amount;
280 amount -= head_->length();
281 chainLength_ -= head_->length();
282 head_ = head_->pop();
284 return original - amount;
287 void IOBufQueue::trimEnd(size_t amount) {
288 auto trimmed = trimEndAtMost(amount);
289 if (trimmed != amount) {
290 throw std::underflow_error(
291 "Attempt to trim more bytes than are present in IOBufQueue");
295 size_t IOBufQueue::trimEndAtMost(size_t amount) {
296 auto guard = updateGuard();
297 auto original = amount;
302 if (head_->prev()->length() > amount) {
303 head_->prev()->trimEnd(amount);
304 chainLength_ -= amount;
308 amount -= head_->prev()->length();
309 chainLength_ -= head_->prev()->length();
311 if (head_->isChained()) {
312 head_->prev()->unlink();
317 return original - amount;
320 std::unique_ptr<folly::IOBuf> IOBufQueue::pop_front() {
321 auto guard = updateGuard();
325 chainLength_ -= head_->length();
326 std::unique_ptr<folly::IOBuf> retBuf = std::move(head_);
327 head_ = retBuf->pop();
331 void IOBufQueue::clear() {
335 auto guard = updateGuard();
336 IOBuf* buf = head_.get();
340 } while (buf != head_.get());
344 void IOBufQueue::appendToString(std::string& out) const {
348 auto len = options_.cacheChainLength
349 ? chainLength_ + (cachePtr_->cachedRange.first - tailStart_)
350 : head_->computeChainDataLength() +
351 (cachePtr_->cachedRange.first - tailStart_);
352 out.reserve(out.size() + len);
354 for (auto range : *head_) {
355 out.append(reinterpret_cast<const char*>(range.data()), range.size());
358 if (tailStart_ != cachePtr_->cachedRange.first) {
360 reinterpret_cast<const char*>(tailStart_),
361 cachePtr_->cachedRange.first - tailStart_);
365 void IOBufQueue::gather(uint64_t maxLength) {
366 auto guard = updateGuard();
367 if (head_ != nullptr) {
368 head_->gather(maxLength);