/*
 * Copyright 2017 Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <limits>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>

#include <glog/logging.h>

#include <folly/Likely.h>
#include <folly/Portability.h>
#include <folly/ScopeGuard.h>
#include <folly/io/IOBuf.h>

namespace folly {

/**
 * An IOBufQueue encapsulates a chain of IOBufs and provides
 * convenience functions to append data to the back of the chain
 * and remove data from the front.
 *
 * You may also prepend data into the headroom of the first buffer in the
 * chain, if any.
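 *
 * A minimal usage sketch (illustrative only; the data and sizes here are
 * made up for the example):
 *
 *   folly::IOBufQueue queue{folly::IOBufQueue::cacheChainLength()};
 *
 *   // Copy data onto the back of the chain.
 *   queue.append("hello ", 6);
 *   queue.append("world", 5);
 *
 *   // Take the first 6 bytes as a separate IOBuf chain ("hello ").
 *   std::unique_ptr<folly::IOBuf> head = queue.split(6);
 *
 *   // Take ownership of whatever is left ("world").
 *   std::unique_ptr<folly::IOBuf> rest = queue.move();
 */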
class IOBufQueue {
 private:
  /**
   * This guard should be taken by any method that intends to make changes
   * to data_ (e.g. appending to it).
   *
   * It flushes the writable tail cache and refills it on destruction.
   */
  auto updateGuard() {
    flushCache();
    return folly::makeGuard([this] { updateWritableTailCache(); });
  }

  struct WritableRangeCacheData {
    std::pair<uint8_t*, uint8_t*> cachedRange;
    bool attached{false};

    WritableRangeCacheData() = default;

    WritableRangeCacheData(WritableRangeCacheData&& other)
        : cachedRange(other.cachedRange), attached(other.attached) {
      other.cachedRange = {};
      other.attached = false;
    }
    WritableRangeCacheData& operator=(WritableRangeCacheData&& other) {
      cachedRange = other.cachedRange;
      attached = other.attached;

      other.cachedRange = {};
      other.attached = false;

      return *this;
    }

    WritableRangeCacheData(const WritableRangeCacheData&) = delete;
    WritableRangeCacheData& operator=(const WritableRangeCacheData&) = delete;
  };

 public:
  struct Options {
    Options() : cacheChainLength(false) {}
    bool cacheChainLength;
  };

  /**
   * Commonly used Options, currently the only possible value other than
   * the default.
   */
  static Options cacheChainLength() {
    Options options;
    options.cacheChainLength = true;
    return options;
  }

  /**
   * WritableRangeCache represents a cache of the current writable tail and
   * provides a cheap and simple interface for appending to it that avoids
   * paying the cost of a preallocate/postallocate pair (i.e. indirections and
   * checks).
   *
   * The cache is flushed on destruction/copy/move and on non-const accesses to
   * the underlying IOBufQueue.
   *
   * Note: there can be only one active cache for a given IOBufQueue, i.e. when
   *       you fill a cache object it automatically invalidates any other
   *       cache.
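   *
   * A minimal sketch of how a cache might be used (illustrative only; the
   * sizes and byte values are made up for the example):
   *
   *   folly::IOBufQueue queue;
   *   queue.preallocate(16, 64);          // ensure there is a writable tail
   *
   *   folly::IOBufQueue::WritableRangeCache cache(&queue);
   *   if (cache.length() >= 2) {
   *     cache.writableData()[0] = 0x01;   // write directly into the tail
   *     cache.writableData()[1] = 0x02;
   *     cache.append(2);                  // mark the two bytes as occupied
   *   }
   */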
  class WritableRangeCache {
   public:
    explicit WritableRangeCache(folly::IOBufQueue* q = nullptr) : queue_(q) {
      if (queue_) {
        fillCache();
      }
    }

    /**
     * Move constructor/assignment can move the cached range, but must update
     * the reference in IOBufQueue.
     */
    WritableRangeCache(WritableRangeCache&& other)
        : data_(std::move(other.data_)), queue_(other.queue_) {
      if (data_.attached) {
        queue_->updateCacheRef(data_);
      }
    }
    WritableRangeCache& operator=(WritableRangeCache&& other) {
      if (data_.attached) {
        queue_->clearWritableRangeCache();
      }

      data_ = std::move(other.data_);
      queue_ = other.queue_;

      if (data_.attached) {
        queue_->updateCacheRef(data_);
      }

      return *this;
    }

    /**
     * Copy constructor/assignment cannot copy the cached range.
     */
    WritableRangeCache(const WritableRangeCache& other)
        : queue_(other.queue_) {}
    WritableRangeCache& operator=(const WritableRangeCache& other) {
      if (data_.attached) {
        queue_->clearWritableRangeCache();
      }

      queue_ = other.queue_;

      return *this;
    }

    ~WritableRangeCache() {
      if (data_.attached) {
        queue_->clearWritableRangeCache();
      }
    }

    /**
     * Reset the underlying IOBufQueue; flushes the current cache if present.
     */
    void reset(IOBufQueue* q) {
      if (data_.attached) {
        queue_->clearWritableRangeCache();
      }

      queue_ = q;

      if (queue_) {
        fillCache();
      }
    }

    /**
     * Get a pointer to the underlying IOBufQueue object.
     */
    IOBufQueue* queue() {
      return queue_;
    }

    /**
     * Return a pointer to the start of the cached writable tail.
     *
     * Note: doesn't populate the cache.
     */
    uint8_t* writableData() {
      dcheckIntegrity();
      return data_.cachedRange.first;
    }

    /**
     * Return the length of the cached writable tail.
     *
     * Note: doesn't populate the cache.
     */
    size_t length() {
      dcheckIntegrity();
      return data_.cachedRange.second - data_.cachedRange.first;
    }

    /**
     * Mark n bytes as occupied (e.g. postallocate).
     */
    void append(size_t n) {
      dcheckIntegrity();
      // This can happen only if somebody is misusing the interface,
      // e.g. calling append after touching the IOBufQueue or without
      // checking length() first.
      if (LIKELY(data_.cachedRange.first != nullptr)) {
        DCHECK_LE(n, length());
        data_.cachedRange.first += n;
      } else {
        appendSlow(n);
      }
    }

    /**
     * Same as append(n), but avoids checking if there is a cache.
     * The caller must guarantee that the cache is set (e.g. the caller just
     * called fillCache or checked that it's not empty).
     */
    void appendUnsafe(size_t n) {
      data_.cachedRange.first += n;
    }

    /**
     * Fill the cache of the writable tail from the underlying IOBufQueue.
     */
    void fillCache() {
      queue_->fillWritableRangeCache(data_);
    }

   private:
    WritableRangeCacheData data_;
    IOBufQueue* queue_;

    FOLLY_NOINLINE void appendSlow(size_t n) {
      queue_->postallocate(n);
    }

    void dcheckIntegrity() {
      // Tail start should never exceed tail end.
      DCHECK_LE(data_.cachedRange.first, data_.cachedRange.second);
      DCHECK(
          data_.cachedRange.first != nullptr ||
          data_.cachedRange.second == nullptr);

      // Cached range should always be empty if the cache is not attached.
      DCHECK(
          data_.attached ||
          (data_.cachedRange.first == nullptr &&
           data_.cachedRange.second == nullptr));

      // We cannot be in the attached state if the queue_ is not set.
      DCHECK(queue_ != nullptr || !data_.attached);

      // If we're attached and the cache is not empty, then it should coincide
      // with the tail buffer.
      DCHECK(
          !data_.attached || data_.cachedRange.first == nullptr ||
          (queue_->head_ != nullptr &&
           data_.cachedRange.first >= queue_->head_->prev()->writableTail() &&
           data_.cachedRange.second ==
               queue_->head_->prev()->writableTail() +
                   queue_->head_->prev()->tailroom()));
    }
  };

  explicit IOBufQueue(const Options& options = Options());

  /**
   * Return a space to prepend bytes and the amount of headroom available.
   */
  std::pair<void*, uint64_t> headroom();

  /**
   * Indicate that n bytes from the headroom have been used.
   */
  void markPrepended(uint64_t n);

  /**
   * Prepend an existing range; throws std::overflow_error if not enough
   * room.
   */
  void prepend(const void* buf, uint64_t n);
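  /*
   * For example, to stamp a 4-byte header in front of data that has already
   * been appended (a sketch; assumes `queue` is an IOBufQueue whose first
   * buffer was created with at least 4 bytes of headroom):
   *
   *   uint32_t header = 0xfacef00d;
   *   queue.prepend(&header, sizeof(header)); // throws std::overflow_error
   *                                           // if there is not enough room
   */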

  /**
   * Add a buffer or buffer chain to the end of this queue. The
   * queue takes ownership of buf.
   *
   * If pack is true, we try to reduce wastage at the end of this queue
   * by copying some data from the first buffers in the buf chain (and
   * releasing the buffers), if possible. If pack is false, we leave
   * the chain topology unchanged.
   */
  void append(std::unique_ptr<folly::IOBuf>&& buf, bool pack = false);

  /**
   * Add a queue to the end of this queue. The queue takes ownership of
   * all buffers from the other queue.
   */
  void append(IOBufQueue& other, bool pack = false);
  void append(IOBufQueue&& other, bool pack = false) {
    append(other, pack); // call lvalue reference overload, above
  }

  /**
   * Copy len bytes, starting at buf, to the end of this queue.
   * The caller retains ownership of the source data.
   */
  void append(const void* buf, size_t len);

  /**
   * Copy a string to the end of this queue.
   * The caller retains ownership of the source data.
   */
  void append(StringPiece sp) {
    append(sp.data(), sp.size());
  }
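  /*
   * For example (a sketch; `makeResponseChain()` is a hypothetical function
   * returning a std::unique_ptr<folly::IOBuf> chain, and `queue` is an
   * existing IOBufQueue):
   *
   *   // Transfer an existing chain, packing its small leading buffers into
   *   // this queue's tailroom to reduce wasted space.
   *   queue.append(makeResponseChain(), true); // pack = true
   *
   *   // Move everything from another IOBufQueue, keeping its topology.
   *   folly::IOBufQueue other;
   *   other.append("tail", 4);
   *   queue.append(std::move(other));
   */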

  /**
   * Append a chain of IOBuf objects that point to consecutive regions
   * within buf.
   *
   * Just like IOBuf::wrapBuffer, this should only be used when the caller
   * knows ahead of time and can ensure that all IOBuf objects that will point
   * to this buffer will be destroyed before the buffer itself is destroyed;
   * all other caveats from wrapBuffer also apply.
   *
   * Every buffer except for the last will wrap exactly blockSize bytes.
   * Importantly, this method may be used to wrap buffers larger than 4GB.
   */
  void wrapBuffer(
      const void* buf,
      size_t len,
      uint64_t blockSize = (1U << 31)); // default block size: 2GB
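  /*
   * For example, to expose an existing caller-owned buffer through the queue
   * without copying it (a sketch; `payload` must outlive every IOBuf that
   * wraps it):
   *
   *   std::vector<char> payload(1 << 20);
   *   folly::IOBufQueue queue;
   *   queue.wrapBuffer(payload.data(), payload.size()); // zero-copy append
   */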

  /**
   * Obtain a writable block of contiguous bytes at the end of this
   * queue, allocating more space if necessary. The amount of space
   * reserved will be at least min. If min contiguous space is not
   * available at the end of the queue, an IOBuf with size newAllocationSize
   * is appended to the chain and returned. The actual available space
   * may be larger than newAllocationSize, but will be truncated to max,
   * if specified.
   *
   * If the caller subsequently writes anything into the returned space,
   * it must call the postallocate() method.
   *
   * @return The starting address of the block and the length in bytes.
   *
   * @note The point of the preallocate()/postallocate() mechanism is
   *       to support I/O APIs such as Thrift's TAsyncSocket::ReadCallback
   *       that request a buffer from the application and then, in a later
   *       callback, tell the application how much of the buffer they've
   *       filled with data.
   */
  std::pair<void*, uint64_t> preallocate(
      uint64_t min,
      uint64_t newAllocationSize,
      uint64_t max = std::numeric_limits<uint64_t>::max()) {
    dcheckCacheIntegrity();

    if (LIKELY(writableTail() != nullptr && tailroom() >= min)) {
      return std::make_pair(
          writableTail(), std::min<uint64_t>(max, tailroom()));
    }

    return preallocateSlow(min, newAllocationSize, max);
  }

  /**
   * Tell the queue that the caller has written data into the first n
   * bytes provided by the previous preallocate() call.
   *
   * @note n should be less than or equal to the size returned by
   *       preallocate(). If n is zero, the caller may skip the call
   *       to postallocate(). If n is nonzero, the caller must not
   *       invoke any other non-const methods on this IOBufQueue between
   *       the call to preallocate and the call to postallocate().
   */
  void postallocate(uint64_t n) {
    dcheckCacheIntegrity();
    DCHECK_LE(cachePtr_->cachedRange.first + n, cachePtr_->cachedRange.second);
    cachePtr_->cachedRange.first += n;
  }
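  /*
   * A typical read loop using this pair looks roughly like the following
   * (a sketch; `readFromSocket` is a hypothetical function that fills the
   * given buffer and returns the number of bytes written):
   *
   *   auto block = queue.preallocate(2048, 4096); // >= 2KB contiguous space
   *   size_t bytesRead = readFromSocket(block.first, block.second);
   *   queue.postallocate(bytesRead);              // commit what was written
   */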

  /**
   * Obtain a writable block of n contiguous bytes, allocating more space
   * if necessary, and mark it as used. The caller can fill it later.
   */
  void* allocate(uint64_t n) {
    void* p = preallocate(n, n).first;
    postallocate(n);
    return p;
  }

  void* writableTail() const {
    dcheckCacheIntegrity();
    return cachePtr_->cachedRange.first;
  }

  size_t tailroom() const {
    dcheckCacheIntegrity();
    return cachePtr_->cachedRange.second - cachePtr_->cachedRange.first;
  }

  /**
   * Split off the first n bytes of the queue into a separate IOBuf chain,
   * and transfer ownership of the new chain to the caller. The IOBufQueue
   * retains ownership of everything after the split point.
   *
   * @warning If the split point lies in the middle of some IOBuf within
   *          the chain, this function may, as an implementation detail,
   *          clone that IOBuf.
   *
   * @throws std::underflow_error if n exceeds the number of bytes
   *         in the queue.
   */
  std::unique_ptr<folly::IOBuf> split(size_t n) {
    return split(n, true);
  }

  /**
   * Similar to split, but will return the entire queue instead of throwing
   * if n exceeds the number of bytes in the queue.
   */
  std::unique_ptr<folly::IOBuf> splitAtMost(size_t n) {
    return split(n, false);
  }

  /**
   * Similar to IOBuf::trimStart, but works on the whole queue. Will
   * pop off buffers that have been completely trimmed.
   */
  void trimStart(size_t amount);

  /**
   * Similar to trimStart, but will trim at most amount bytes and return
   * the number of bytes trimmed.
   */
  size_t trimStartAtMost(size_t amount);

  /**
   * Similar to IOBuf::trimEnd, but works on the whole queue. Will
   * pop off buffers that have been completely trimmed.
   */
  void trimEnd(size_t amount);

  /**
   * Similar to trimEnd, but will trim at most amount bytes and return
   * the number of bytes trimmed.
   */
  size_t trimEndAtMost(size_t amount);

  /**
   * Transfer ownership of the queue's entire IOBuf chain to the caller.
   */
  std::unique_ptr<folly::IOBuf> move() {
    auto guard = updateGuard();
    std::unique_ptr<folly::IOBuf> res = std::move(head_);
    chainLength_ = 0;
    return res;
  }

  /**
   * Access the front IOBuf.
   *
   * Note: caller will see the current state of the chain, but may not see
   *       future updates immediately, due to the presence of a tail cache.
   * Note: the caller may potentially clone the chain, thus marking all buffers
   *       as shared. We may still continue writing to the tail of the last
   *       IOBuf without checking if it's shared, but this is fine, since the
   *       cloned IOBufs won't reference that data.
   */
  const folly::IOBuf* front() const {
    flushCache();
    return head_.get();
  }

  /**
   * Returns the first IOBuf in the chain and removes it from the chain.
   *
   * @return first IOBuf in the chain or nullptr if none.
   */
  std::unique_ptr<folly::IOBuf> pop_front();

  /**
   * Total chain length; only valid if cacheChainLength was specified in the
   * constructor.
   */
  size_t chainLength() const {
    if (UNLIKELY(!options_.cacheChainLength)) {
      throw std::invalid_argument("IOBufQueue: chain length not cached");
    }
    dcheckCacheIntegrity();
    return chainLength_ + (cachePtr_->cachedRange.first - tailStart_);
  }

  /**
   * Returns true iff the IOBuf chain length is 0.
   */
  bool empty() const {
    dcheckCacheIntegrity();
    return !head_ ||
        (head_->empty() && cachePtr_->cachedRange.first == tailStart_);
  }

  const Options& options() const {
    return options_;
  }

  /**
   * Clear the queue. Note that this does not release the buffers; it
   * just sets their length to zero, which is useful if you want to reuse
   * the same queue without reallocating.
   */
  void clear();

  /**
   * Append the queue to a std::string. Non-destructive.
   */
  void appendToString(std::string& out) const;

  /**
   * Calls IOBuf::gather() on the head of the queue, if it exists.
   */
  void gather(uint64_t maxLength);

  IOBufQueue(IOBufQueue&&) noexcept;
  IOBufQueue& operator=(IOBufQueue&&);

 private:
  std::unique_ptr<folly::IOBuf> split(size_t n, bool throwOnUnderflow);

  static const size_t kChainLengthNotCached = (size_t)-1;
  IOBufQueue(const IOBufQueue&) = delete;
  IOBufQueue& operator=(const IOBufQueue&) = delete;

  Options options_;

  // NOTE that chainLength_ is still updated even if !options_.cacheChainLength
  // because doing it unchecked in postallocate() is faster (no (mis)predicted
  // branch).
  mutable size_t chainLength_{0};

  /**
   * Everything that has been appended but not yet discarded or moved out.
   *
   * Note: anything that needs to operate on the tail should either call
   *       flushCache() or grab updateGuard() (it will flush the cache itself).
   */
  std::unique_ptr<folly::IOBuf> head_;

  mutable uint8_t* tailStart_{nullptr};
  WritableRangeCacheData* cachePtr_{nullptr};
  WritableRangeCacheData localCache_;

  void dcheckCacheIntegrity() const {
    // Tail start should never exceed tail end.
    DCHECK_LE(tailStart_, cachePtr_->cachedRange.first);
    DCHECK_LE(cachePtr_->cachedRange.first, cachePtr_->cachedRange.second);
    DCHECK(
        cachePtr_->cachedRange.first != nullptr ||
        cachePtr_->cachedRange.second == nullptr);

    // There is always an attached cache instance.
    DCHECK(cachePtr_->attached);

    // Either the cache is empty or it coincides with the tail.
    DCHECK(
        cachePtr_->cachedRange.first == nullptr ||
        (head_ != nullptr && tailStart_ == head_->prev()->writableTail() &&
         tailStart_ <= cachePtr_->cachedRange.first &&
         cachePtr_->cachedRange.first >= head_->prev()->writableTail() &&
         cachePtr_->cachedRange.second ==
             head_->prev()->writableTail() + head_->prev()->tailroom()));
  }

  /**
   * Populate dest with the writable tail range cache.
   */
  void fillWritableRangeCache(WritableRangeCacheData& dest) {
    dcheckCacheIntegrity();
    if (cachePtr_ != &dest) {
      dest = std::move(*cachePtr_);
      cachePtr_ = &dest;
    }
  }

  /**
   * Clear the current writable tail cache and reset it to localCache_.
   */
  void clearWritableRangeCache() {
    flushCache();

    if (cachePtr_ != &localCache_) {
      localCache_ = std::move(*cachePtr_);
      cachePtr_ = &localCache_;
    }

    DCHECK(cachePtr_ == &localCache_ && localCache_.attached);
  }

  /**
   * Commit any pending changes to the tail of the queue.
   */
  void flushCache() const {
    dcheckCacheIntegrity();

    if (tailStart_ != cachePtr_->cachedRange.first) {
      auto buf = head_->prev();
      DCHECK_EQ(
          buf->writableTail() + buf->tailroom(),
          cachePtr_->cachedRange.second);
      auto len = cachePtr_->cachedRange.first - tailStart_;
      buf->append(len);
      chainLength_ += len;
      tailStart_ += len;
    }
  }

  // For WritableRangeCache move assignment/construction.
  void updateCacheRef(WritableRangeCacheData& newRef) {
    cachePtr_ = &newRef;
  }

  /**
   * Update the cached writable tail range. Called by updateGuard().
   */
  void updateWritableTailCache() {
    if (LIKELY(head_ != nullptr)) {
      IOBuf* buf = head_->prev();
      if (LIKELY(!buf->isSharedOne())) {
        tailStart_ = buf->writableTail();
        cachePtr_->cachedRange = std::pair<uint8_t*, uint8_t*>(
            tailStart_, tailStart_ + buf->tailroom());
        return;
      }
    }
    tailStart_ = nullptr;
    cachePtr_->cachedRange = std::pair<uint8_t*, uint8_t*>();
  }

  std::pair<void*, uint64_t> preallocateSlow(
      uint64_t min, uint64_t newAllocationSize, uint64_t max);
};

} // namespace folly