2 * Copyright 2013 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include "folly/io/IOBufQueue.h"
25 using std::unique_ptr;
// Bounds on the size handed to IOBuf::create() when
// IOBufQueue::append(const void*, size_t) has to allocate a fresh buffer:
// the requested length is clamped into [MIN_ALLOC_SIZE, MAX_ALLOC_SIZE].
31 const size_t MIN_ALLOC_SIZE = 2000;
32 const size_t MAX_ALLOC_SIZE = 8000; // Must fit within a uint32_t
35 * Convenience function to append chain src to chain dst.
// dst is taken by reference so the helper can update the caller's chain
// pointer; src is an rvalue reference because ownership of it is given away.
38 appendToChain(unique_ptr<IOBuf>& dst, unique_ptr<IOBuf>&& src) {
// Dereferences dst, so this statement is only valid for a non-null dst.
// Ownership of src is handed to appendChain() on dst->prev(); presumably
// prev() of the chain head is the chain's last buffer, which would place src
// at the very end -- confirm against the IOBuf chain documentation.
42 dst->prev()->appendChain(std::move(src));
46 } // anonymous namespace
// Constructor taking the queue options. Presumably stores them in options_
// (the member the other methods consult) -- TODO confirm against the
// initializer list.
50 IOBufQueue::IOBufQueue(const Options& options)
// Move constructor: copies other's options, copies its cached chain length,
// and takes ownership of its buffer chain (head_).
55 IOBufQueue::IOBufQueue(IOBufQueue&& other)
56 : options_(other.options_),
57 chainLength_(other.chainLength_),
58 head_(std::move(other.head_)) {
// other.head_ has been moved from, so reset other's cached length to keep
// the two fields consistent with each other.
59 other.chainLength_ = 0;
// Move assignment: performs the same transfer as the move constructor --
// options are copied, the cached chain length and the buffer chain are taken
// from other.
62 IOBufQueue& IOBufQueue::operator=(IOBufQueue&& other) {
64 options_ = other.options_;
65 chainLength_ = other.chainLength_;
// Move-assigning the unique_ptr releases any chain this queue held before.
66 head_ = std::move(other.head_);
// Keep other's cached length in step with its moved-from head_.
67 other.chainLength_ = 0;
// Returns a (pointer, byte count) pair built from the head buffer:
// head_->writableBuffer() paired with head_->headroom(). The alternative
// result is (nullptr, 0).
72 std::pair<void*, uint32_t>
73 IOBufQueue::headroom() {
75 return std::make_pair(head_->writableBuffer(), head_->headroom());
// Fallback result; presumably returned when the queue has no head buffer --
// TODO confirm the guarding condition.
77 return std::make_pair(nullptr, 0);
// Records that n bytes were placed in front of the queue's data (name-based
// reading; presumably used after writing into the region from headroom()).
82 IOBufQueue::markPrepended(uint32_t n) {
// The cached total length is only maintained when the queue was configured
// with cacheChainLength; presumably chainLength_ grows by n here.
88 if (options_.cacheChainLength) {
// Copies n bytes from buf into the queue's headroom.
// Throws std::overflow_error when there is not enough room (per the message;
// presumably when n exceeds the available headroom).
94 IOBufQueue::prepend(const void* buf, uint32_t n) {
97 throw std::overflow_error("Not enough room to prepend");
// p is presumably the (pointer, size) pair from headroom(). The destination
// is computed as "end of that region minus n", so the copied bytes end
// exactly where the region ends.
99 memcpy(static_cast<char*>(p.first) + p.second - n, buf, n);
// Appends the chain buf to the queue's chain, taking ownership of it.
104 IOBufQueue::append(unique_ptr<IOBuf>&& buf) {
// The length has to be read here, before buf is moved away below.
108 if (options_.cacheChainLength) {
109 chainLength_ += buf->computeChainDataLength();
111 appendToChain(head_, std::move(buf));
// Moves other's whole buffer chain onto this queue's chain and resets
// other's cached length to 0.
115 IOBufQueue::append(IOBufQueue& other) {
119 if (options_.cacheChainLength) {
// Reuse other's cached length when it maintains one ...
120 if (other.options_.cacheChainLength) {
121 chainLength_ += other.chainLength_;
// ... otherwise ask other's chain for its data length. This must happen
// before other.head_ is moved below.
123 chainLength_ += other.head_->computeChainDataLength();
126 appendToChain(head_, std::move(other.head_));
127 other.chainLength_ = 0;
// Copies bytes from buf into the queue, writing into the tailroom of the
// last buffer and allocating a new buffer when that is not possible.
131 IOBufQueue::append(const void* buf, size_t len) {
132 auto src = static_cast<const uint8_t*>(buf);
// A new buffer is needed when the queue has no chain yet, or when the last
// buffer is shared or has no tailroom left. Its size is len clamped into
// [MIN_ALLOC_SIZE, MAX_ALLOC_SIZE].
// NOTE(review): NULL could be nullptr, and the std::move around
// IOBuf::create(...) is redundant because the call already yields a
// temporary.
134 if ((head_ == NULL) || head_->prev()->isSharedOne() ||
135 (head_->prev()->tailroom() == 0)) {
136 appendToChain(head_, std::move(
137 IOBuf::create(std::max(MIN_ALLOC_SIZE,
138 std::min(len, MAX_ALLOC_SIZE)))));
140 IOBuf* last = head_->prev();
// Copy only as much as fits into the last buffer's tailroom; copyLen can be
// smaller than len. NOTE(review): confirm the remainder is handled by the
// surrounding code. The C-style cast could be a static_cast.
141 uint32_t copyLen = std::min(len, (size_t)last->tailroom());
142 memcpy(last->writableTail(), src, copyLen);
// Presumably extends the buffer's data length over the bytes just written.
144 last->append(copyLen);
145 if (options_.cacheChainLength) {
146 chainLength_ += copyLen;
// Appends the memory at buf to the queue via IOBuf::wrapBuffer(), in pieces
// of at most blockSize bytes. Presumably the bytes are referenced rather
// than copied -- confirm against IOBuf::wrapBuffer.
153 IOBufQueue::wrapBuffer(const void* buf, size_t len, uint32_t blockSize) {
154 auto src = static_cast<const uint8_t*>(buf);
// Size of the next piece: the remaining length, capped at blockSize.
156 size_t n = std::min(len, size_t(blockSize));
157 append(IOBuf::wrapBuffer(src, n));
// Returns a (writable pointer, usable byte count) pair at the tail of the
// queue. The byte count is capped at max on both return paths.
164 IOBufQueue::preallocate(uint32_t min, uint32_t newAllocationSize,
167 // If there's enough space left over at the end of the queue, use that.
168 IOBuf* last = head_->prev();
// The existing tail is only considered when the last buffer is not shared.
169 if (!last->isSharedOne()) {
170 uint32_t avail = last->tailroom();
172 return make_pair(last->writableTail(), std::min(max, avail));
176 // Allocate a new buffer of the requested max size.
// NOTE(review): the size actually requested is
// std::max(min, newAllocationSize), not `max`; the comment above looks
// inaccurate.
177 unique_ptr<IOBuf> newBuf(IOBuf::create(std::max(min, newAllocationSize)));
178 appendToChain(head_, std::move(newBuf));
// This is the buffer that was just appended, so its tailroom is fresh space.
179 IOBuf* last = head_->prev();
180 return make_pair(last->writableTail(),
181 std::min(max, last->tailroom()));
// Calls append(n) on the last buffer of the chain; presumably this commits
// n bytes the caller wrote into the region returned by preallocate().
// Dereferences head_, so the queue must already hold a chain.
185 IOBufQueue::postallocate(uint32_t n) {
186 head_->prev()->append(n);
// Presumably the cached length grows by n when length caching is enabled.
187 if (options_.cacheChainLength) {
// Removes bytes from the front of the queue and returns them as a new chain;
// n is the number of bytes still to be removed.
// Throws std::underflow_error when more bytes are requested than the queue
// holds (per the message).
193 IOBufQueue::split(size_t n) {
194 unique_ptr<IOBuf> result;
197 throw std::underflow_error(
198 "Attempt to remove more bytes than are present in IOBufQueue");
// Case 1: the head buffer is needed in full, so move it into result as-is.
199 } else if (head_->length() <= n) {
200 n -= head_->length();
201 if (options_.cacheChainLength) {
202 chainLength_ -= head_->length();
// pop() is called on the head and its result becomes the new head_;
// presumably it detaches the head and returns the rest of the chain.
204 unique_ptr<IOBuf> remainder = head_->pop();
205 appendToChain(result, std::move(head_));
206 head_ = std::move(remainder);
// Case 2: only the first n bytes of the head are needed. A clone of the head
// buffer is trimmed down to n bytes and added to result.
208 unique_ptr<IOBuf> clone = head_->cloneOne();
209 clone->trimEnd(clone->length() - n);
210 appendToChain(result, std::move(clone));
212 if (options_.cacheChainLength) {
// NOTE(review): `return result;` would suffice; std::move on a local
// prevents copy elision.
218 return std::move(result);
// Drops `amount` bytes from the front of the queue.
// Throws std::underflow_error when more bytes are requested than the queue
// holds (per the message).
221 void IOBufQueue::trimStart(size_t amount) {
224 throw std::underflow_error(
225 "Attempt to trim more bytes than are present in IOBufQueue");
// The head buffer holds more than is left to trim: trim it in place.
227 if (head_->length() > amount) {
228 head_->trimStart(amount);
229 if (options_.cacheChainLength) {
230 chainLength_ -= amount;
// Otherwise the whole head buffer goes: subtract its length and replace
// head_ with the result of pop().
234 amount -= head_->length();
235 if (options_.cacheChainLength) {
236 chainLength_ -= head_->length();
238 head_ = head_->pop();
242 void IOBufQueue::trimEnd(size_t amount) {
245 throw std::underflow_error(
246 "Attempt to trim more bytes than are present in IOBufQueue");
248 if (head_->prev()->length() > amount) {
249 head_->prev()->trimEnd(amount);
250 if (options_.cacheChainLength) {
251 chainLength_ -= amount;
255 amount -= head_->prev()->length();
256 if (options_.cacheChainLength) {
257 chainLength_ -= head_->prev()->length();
259 unique_ptr<IOBuf> b = head_->prev()->unlink();
261 // Null queue if we unlinked the head.
262 if (b.get() == head_.get()) {