Return if we handle any error messages to avoid unnecessarily calling recv/send
[folly.git] / folly / io / IOBufQueue.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/IOBufQueue.h>
18
19 #include <string.h>
20
21 #include <stdexcept>
22
23 using std::make_pair;
24 using std::pair;
25 using std::unique_ptr;
26
27 namespace {
28
29 using folly::IOBuf;
30
31 const size_t MIN_ALLOC_SIZE = 2000;
32 const size_t MAX_ALLOC_SIZE = 8000;
33 const size_t MAX_PACK_COPY = 4096;
34
35 /**
36  * Convenience function to append chain src to chain dst.
37  */
38 void
39 appendToChain(unique_ptr<IOBuf>& dst, unique_ptr<IOBuf>&& src, bool pack) {
40   if (dst == nullptr) {
41     dst = std::move(src);
42   } else {
43     IOBuf* tail = dst->prev();
44     if (pack) {
45       // Copy up to MAX_PACK_COPY bytes if we can free buffers; this helps
46       // reduce wastage (the tail's tailroom and the head's headroom) when
47       // joining two IOBufQueues together.
48       size_t copyRemaining = MAX_PACK_COPY;
49       uint64_t n;
50       while (src &&
51              (n = src->length()) < copyRemaining &&
52              n < tail->tailroom()) {
53         memcpy(tail->writableTail(), src->data(), n);
54         tail->append(n);
55         copyRemaining -= n;
56         src = src->pop();
57       }
58     }
59     if (src) {
60       tail->appendChain(std::move(src));
61     }
62   }
63 }
64
65 } // namespace
66
67 namespace folly {
68
69 IOBufQueue::IOBufQueue(const Options& options)
70     : options_(options), cachePtr_(&localCache_) {
71   localCache_.attached = true;
72 }
73
74 IOBufQueue::~IOBufQueue() {
75   clearWritableRangeCache();
76 }
77
78 IOBufQueue::IOBufQueue(IOBufQueue&& other) noexcept
79     : options_(other.options_), cachePtr_(&localCache_) {
80   other.clearWritableRangeCache();
81   head_ = std::move(other.head_);
82   chainLength_ = other.chainLength_;
83
84   tailStart_ = other.tailStart_;
85   localCache_.cachedRange = other.localCache_.cachedRange;
86   localCache_.attached = true;
87
88   other.chainLength_ = 0;
89   other.tailStart_ = nullptr;
90   other.localCache_.cachedRange = {nullptr, nullptr};
91 }
92
93 IOBufQueue& IOBufQueue::operator=(IOBufQueue&& other) {
94   if (&other != this) {
95     other.clearWritableRangeCache();
96     clearWritableRangeCache();
97
98     options_ = other.options_;
99     head_ = std::move(other.head_);
100     chainLength_ = other.chainLength_;
101
102     tailStart_ = other.tailStart_;
103     localCache_.cachedRange = other.localCache_.cachedRange;
104     localCache_.attached = true;
105
106     other.chainLength_ = 0;
107     other.tailStart_ = nullptr;
108     other.localCache_.cachedRange = {nullptr, nullptr};
109   }
110   return *this;
111 }
112
113 std::pair<void*, uint64_t>
114 IOBufQueue::headroom() {
115   // Note, headroom is independent from the tail, so we don't need to flush the
116   // cache.
117   if (head_) {
118     return std::make_pair(head_->writableBuffer(), head_->headroom());
119   } else {
120     return std::make_pair(nullptr, 0);
121   }
122 }
123
124 void
125 IOBufQueue::markPrepended(uint64_t n) {
126   if (n == 0) {
127     return;
128   }
129   // Note, headroom is independent from the tail, so we don't need to flush the
130   // cache.
131   assert(head_);
132   head_->prepend(n);
133   chainLength_ += n;
134 }
135
136 void
137 IOBufQueue::prepend(const void* buf, uint64_t n) {
138   // We're not touching the tail, so we don't need to flush the cache.
139   auto hroom = head_->headroom();
140   if (!head_ || hroom < n) {
141     throw std::overflow_error("Not enough room to prepend");
142   }
143   memcpy(head_->writableBuffer() + hroom - n, buf, n);
144   head_->prepend(n);
145   chainLength_ += n;
146 }
147
148 void
149 IOBufQueue::append(unique_ptr<IOBuf>&& buf, bool pack) {
150   if (!buf) {
151     return;
152   }
153   auto guard = updateGuard();
154   if (options_.cacheChainLength) {
155     chainLength_ += buf->computeChainDataLength();
156   }
157   appendToChain(head_, std::move(buf), pack);
158 }
159
160 void
161 IOBufQueue::append(IOBufQueue& other, bool pack) {
162   if (!other.head_) {
163     return;
164   }
165   // We're going to chain other, thus we need to grab both guards.
166   auto otherGuard = other.updateGuard();
167   auto guard = updateGuard();
168   if (options_.cacheChainLength) {
169     if (other.options_.cacheChainLength) {
170       chainLength_ += other.chainLength_;
171     } else {
172       chainLength_ += other.head_->computeChainDataLength();
173     }
174   }
175   appendToChain(head_, std::move(other.head_), pack);
176   other.chainLength_ = 0;
177 }
178
179 void
180 IOBufQueue::append(const void* buf, size_t len) {
181   auto guard = updateGuard();
182   auto src = static_cast<const uint8_t*>(buf);
183   while (len != 0) {
184     if ((head_ == nullptr) || head_->prev()->isSharedOne() ||
185         (head_->prev()->tailroom() == 0)) {
186       appendToChain(head_,
187           IOBuf::create(std::max(MIN_ALLOC_SIZE,
188               std::min(len, MAX_ALLOC_SIZE))),
189           false);
190     }
191     IOBuf* last = head_->prev();
192     uint64_t copyLen = std::min(len, (size_t)last->tailroom());
193     memcpy(last->writableTail(), src, copyLen);
194     src += copyLen;
195     last->append(copyLen);
196     chainLength_ += copyLen;
197     len -= copyLen;
198   }
199 }
200
201 void
202 IOBufQueue::wrapBuffer(const void* buf, size_t len, uint64_t blockSize) {
203   auto src = static_cast<const uint8_t*>(buf);
204   while (len != 0) {
205     size_t n = std::min(len, size_t(blockSize));
206     append(IOBuf::wrapBuffer(src, n));
207     src += n;
208     len -= n;
209   }
210 }
211
212 pair<void*,uint64_t>
213 IOBufQueue::preallocateSlow(uint64_t min, uint64_t newAllocationSize,
214                             uint64_t max) {
215   // Avoid grabbing update guard, since we're manually setting the cache ptrs.
216   flushCache();
217   // Allocate a new buffer of the requested max size.
218   unique_ptr<IOBuf> newBuf(IOBuf::create(std::max(min, newAllocationSize)));
219
220   tailStart_ = newBuf->writableTail();
221   cachePtr_->cachedRange = std::pair<uint8_t*, uint8_t*>(
222       tailStart_, tailStart_ + newBuf->tailroom());
223   appendToChain(head_, std::move(newBuf), false);
224   return make_pair(writableTail(), std::min<uint64_t>(max, tailroom()));
225 }
226
227 unique_ptr<IOBuf> IOBufQueue::split(size_t n, bool throwOnUnderflow) {
228   auto guard = updateGuard();
229   unique_ptr<IOBuf> result;
230   while (n != 0) {
231     if (head_ == nullptr) {
232       if (throwOnUnderflow) {
233         throw std::underflow_error(
234             "Attempt to remove more bytes than are present in IOBufQueue");
235       } else {
236         break;
237       }
238     } else if (head_->length() <= n) {
239       n -= head_->length();
240       chainLength_ -= head_->length();
241       unique_ptr<IOBuf> remainder = head_->pop();
242       appendToChain(result, std::move(head_), false);
243       head_ = std::move(remainder);
244     } else {
245       unique_ptr<IOBuf> clone = head_->cloneOne();
246       clone->trimEnd(clone->length() - n);
247       appendToChain(result, std::move(clone), false);
248       head_->trimStart(n);
249       chainLength_ -= n;
250       break;
251     }
252   }
253   if (UNLIKELY(result == nullptr)) {
254     return IOBuf::create(0);
255   }
256   return result;
257 }
258
259 void IOBufQueue::trimStart(size_t amount) {
260   auto trimmed = trimStartAtMost(amount);
261   if (trimmed != amount) {
262     throw std::underflow_error(
263         "Attempt to trim more bytes than are present in IOBufQueue");
264   }
265 }
266
267 size_t IOBufQueue::trimStartAtMost(size_t amount) {
268   auto guard = updateGuard();
269   auto original = amount;
270   while (amount > 0) {
271     if (!head_) {
272       break;
273     }
274     if (head_->length() > amount) {
275       head_->trimStart(amount);
276       chainLength_ -= amount;
277       amount = 0;
278       break;
279     }
280     amount -= head_->length();
281     chainLength_ -= head_->length();
282     head_ = head_->pop();
283   }
284   return original - amount;
285 }
286
287 void IOBufQueue::trimEnd(size_t amount) {
288   auto trimmed = trimEndAtMost(amount);
289   if (trimmed != amount) {
290     throw std::underflow_error(
291         "Attempt to trim more bytes than are present in IOBufQueue");
292   }
293 }
294
295 size_t IOBufQueue::trimEndAtMost(size_t amount) {
296   auto guard = updateGuard();
297   auto original = amount;
298   while (amount > 0) {
299     if (!head_) {
300       break;
301     }
302     if (head_->prev()->length() > amount) {
303       head_->prev()->trimEnd(amount);
304       chainLength_ -= amount;
305       amount = 0;
306       break;
307     }
308     amount -= head_->prev()->length();
309     chainLength_ -= head_->prev()->length();
310
311     if (head_->isChained()) {
312       head_->prev()->unlink();
313     } else {
314       head_.reset();
315     }
316   }
317   return original - amount;
318 }
319
320 std::unique_ptr<folly::IOBuf> IOBufQueue::pop_front() {
321   auto guard = updateGuard();
322   if (!head_) {
323     return nullptr;
324   }
325   chainLength_ -= head_->length();
326   std::unique_ptr<folly::IOBuf> retBuf = std::move(head_);
327   head_ = retBuf->pop();
328   return retBuf;
329 }
330
331 void IOBufQueue::clear() {
332   if (!head_) {
333     return;
334   }
335   auto guard = updateGuard();
336   IOBuf* buf = head_.get();
337   do {
338     buf->clear();
339     buf = buf->next();
340   } while (buf != head_.get());
341   chainLength_ = 0;
342 }
343
344 void IOBufQueue::appendToString(std::string& out) const {
345   if (!head_) {
346     return;
347   }
348   auto len = options_.cacheChainLength
349       ? chainLength_ + (cachePtr_->cachedRange.first - tailStart_)
350       : head_->computeChainDataLength() +
351           (cachePtr_->cachedRange.first - tailStart_);
352   out.reserve(out.size() + len);
353
354   for (auto range : *head_) {
355     out.append(reinterpret_cast<const char*>(range.data()), range.size());
356   }
357
358   if (tailStart_ != cachePtr_->cachedRange.first) {
359     out.append(
360         reinterpret_cast<const char*>(tailStart_),
361         cachePtr_->cachedRange.first - tailStart_);
362   }
363 }
364
365 void IOBufQueue::gather(uint64_t maxLength) {
366   auto guard = updateGuard();
367   if (head_ != nullptr) {
368     head_->gather(maxLength);
369   }
370 }
371
372 } // namespace folly