Add a default timeout parameter to HHWheelTimer.
[folly.git] / folly / io / IOBufQueue.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/IOBufQueue.h>
18
19 #include <string.h>
20
21 #include <stdexcept>
22
23 using std::make_pair;
24 using std::pair;
25 using std::unique_ptr;
26
27 namespace {
28
29 using folly::IOBuf;
30
31 const size_t MIN_ALLOC_SIZE = 2000;
32 const size_t MAX_ALLOC_SIZE = 8000;
33 const size_t MAX_PACK_COPY = 4096;
34
35 /**
36  * Convenience function to append chain src to chain dst.
37  */
38 void
39 appendToChain(unique_ptr<IOBuf>& dst, unique_ptr<IOBuf>&& src, bool pack) {
40   if (dst == nullptr) {
41     dst = std::move(src);
42   } else {
43     IOBuf* tail = dst->prev();
44     if (pack) {
45       // Copy up to MAX_PACK_COPY bytes if we can free buffers; this helps
46       // reduce wastage (the tail's tailroom and the head's headroom) when
47       // joining two IOBufQueues together.
48       size_t copyRemaining = MAX_PACK_COPY;
49       uint64_t n;
50       while (src &&
51              (n = src->length()) < copyRemaining &&
52              n < tail->tailroom()) {
53         memcpy(tail->writableTail(), src->data(), n);
54         tail->append(n);
55         copyRemaining -= n;
56         src = src->pop();
57       }
58     }
59     if (src) {
60       tail->appendChain(std::move(src));
61     }
62   }
63 }
64
65 } // anonymous namespace
66
67 namespace folly {
68
69 IOBufQueue::IOBufQueue(const Options& options)
70   : options_(options),
71     chainLength_(0) {
72 }
73
74 IOBufQueue::IOBufQueue(IOBufQueue&& other) noexcept
75   : options_(other.options_),
76     chainLength_(other.chainLength_),
77     head_(std::move(other.head_)) {
78   other.chainLength_ = 0;
79 }
80
81 IOBufQueue& IOBufQueue::operator=(IOBufQueue&& other) {
82   if (&other != this) {
83     options_ = other.options_;
84     chainLength_ = other.chainLength_;
85     head_ = std::move(other.head_);
86     other.chainLength_ = 0;
87   }
88   return *this;
89 }
90
91 std::pair<void*, uint64_t>
92 IOBufQueue::headroom() {
93   if (head_) {
94     return std::make_pair(head_->writableBuffer(), head_->headroom());
95   } else {
96     return std::make_pair(nullptr, 0);
97   }
98 }
99
100 void
101 IOBufQueue::markPrepended(uint64_t n) {
102   if (n == 0) {
103     return;
104   }
105   assert(head_);
106   head_->prepend(n);
107   chainLength_ += n;
108 }
109
110 void
111 IOBufQueue::prepend(const void* buf, uint64_t n) {
112   auto p = headroom();
113   if (n > p.second) {
114     throw std::overflow_error("Not enough room to prepend");
115   }
116   memcpy(static_cast<char*>(p.first) + p.second - n, buf, n);
117   markPrepended(n);
118 }
119
120 void
121 IOBufQueue::append(unique_ptr<IOBuf>&& buf, bool pack) {
122   if (!buf) {
123     return;
124   }
125   if (options_.cacheChainLength) {
126     chainLength_ += buf->computeChainDataLength();
127   }
128   appendToChain(head_, std::move(buf), pack);
129 }
130
131 void
132 IOBufQueue::append(IOBufQueue& other, bool pack) {
133   if (!other.head_) {
134     return;
135   }
136   if (options_.cacheChainLength) {
137     if (other.options_.cacheChainLength) {
138       chainLength_ += other.chainLength_;
139     } else {
140       chainLength_ += other.head_->computeChainDataLength();
141     }
142   }
143   appendToChain(head_, std::move(other.head_), pack);
144   other.chainLength_ = 0;
145 }
146
147 void
148 IOBufQueue::append(const void* buf, size_t len) {
149   auto src = static_cast<const uint8_t*>(buf);
150   while (len != 0) {
151     if ((head_ == nullptr) || head_->prev()->isSharedOne() ||
152         (head_->prev()->tailroom() == 0)) {
153       appendToChain(head_, std::move(
154           IOBuf::create(std::max(MIN_ALLOC_SIZE,
155               std::min(len, MAX_ALLOC_SIZE)))),
156           false);
157     }
158     IOBuf* last = head_->prev();
159     uint64_t copyLen = std::min(len, (size_t)last->tailroom());
160     memcpy(last->writableTail(), src, copyLen);
161     src += copyLen;
162     last->append(copyLen);
163     chainLength_ += copyLen;
164     len -= copyLen;
165   }
166 }
167
168 void
169 IOBufQueue::wrapBuffer(const void* buf, size_t len, uint64_t blockSize) {
170   auto src = static_cast<const uint8_t*>(buf);
171   while (len != 0) {
172     size_t n = std::min(len, size_t(blockSize));
173     append(IOBuf::wrapBuffer(src, n));
174     src += n;
175     len -= n;
176   }
177 }
178
179 pair<void*,uint64_t>
180 IOBufQueue::preallocateSlow(uint64_t min, uint64_t newAllocationSize,
181                             uint64_t max) {
182   // Allocate a new buffer of the requested max size.
183   unique_ptr<IOBuf> newBuf(IOBuf::create(std::max(min, newAllocationSize)));
184   appendToChain(head_, std::move(newBuf), false);
185   IOBuf* last = head_->prev();
186   return make_pair(last->writableTail(),
187                    std::min(max, last->tailroom()));
188 }
189
190 unique_ptr<IOBuf>
191 IOBufQueue::split(size_t n) {
192   unique_ptr<IOBuf> result;
193   while (n != 0) {
194     if (head_ == nullptr) {
195       throw std::underflow_error(
196           "Attempt to remove more bytes than are present in IOBufQueue");
197     } else if (head_->length() <= n) {
198       n -= head_->length();
199       chainLength_ -= head_->length();
200       unique_ptr<IOBuf> remainder = head_->pop();
201       appendToChain(result, std::move(head_), false);
202       head_ = std::move(remainder);
203     } else {
204       unique_ptr<IOBuf> clone = head_->cloneOne();
205       clone->trimEnd(clone->length() - n);
206       appendToChain(result, std::move(clone), false);
207       head_->trimStart(n);
208       chainLength_ -= n;
209       break;
210     }
211   }
212   return std::move(result);
213 }
214
215 void IOBufQueue::trimStart(size_t amount) {
216   while (amount > 0) {
217     if (!head_) {
218       throw std::underflow_error(
219         "Attempt to trim more bytes than are present in IOBufQueue");
220     }
221     if (head_->length() > amount) {
222       head_->trimStart(amount);
223       chainLength_ -= amount;
224       break;
225     }
226     amount -= head_->length();
227     chainLength_ -= head_->length();
228     head_ = head_->pop();
229   }
230 }
231
232 void IOBufQueue::trimEnd(size_t amount) {
233   while (amount > 0) {
234     if (!head_) {
235       throw std::underflow_error(
236         "Attempt to trim more bytes than are present in IOBufQueue");
237     }
238     if (head_->prev()->length() > amount) {
239       head_->prev()->trimEnd(amount);
240       chainLength_ -= amount;
241       break;
242     }
243     amount -= head_->prev()->length();
244     chainLength_ -= head_->prev()->length();
245
246     if (head_->isChained()) {
247       head_->prev()->unlink();
248     } else {
249       head_.reset();
250     }
251   }
252 }
253
254 std::unique_ptr<folly::IOBuf> IOBufQueue::pop_front() {
255   if (!head_) {
256     return nullptr;
257   }
258   chainLength_ -= head_->length();
259   std::unique_ptr<folly::IOBuf> retBuf = std::move(head_);
260   head_ = retBuf->pop();
261   return retBuf;
262 }
263
264 void IOBufQueue::clear() {
265   if (!head_) {
266     return;
267   }
268   IOBuf* buf = head_.get();
269   do {
270     buf->clear();
271     buf = buf->next();
272   } while (buf != head_.get());
273   chainLength_ = 0;
274 }
275
276 void IOBufQueue::appendToString(std::string& out) const {
277   if (!head_) {
278     return;
279   }
280   auto len =
281     options_.cacheChainLength ? chainLength_ : head_->computeChainDataLength();
282   out.reserve(out.size() + len);
283
284   for (auto range : *head_) {
285     out.append(reinterpret_cast<const char*>(range.data()), range.size());
286   }
287 }
288
289 } // folly