Fix copyright lines
[folly.git] / folly / io / IOBufQueue.h
1 /*
2  * Copyright 2013-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/ScopeGuard.h>
20 #include <folly/io/IOBuf.h>
21
22 #include <stdexcept>
23 #include <string>
24
25 namespace folly {
26
27 /**
28  * An IOBufQueue encapsulates a chain of IOBufs and provides
29  * convenience functions to append data to the back of the chain
30  * and remove data from the front.
31  *
32  * You may also prepend data into the headroom of the first buffer in the
33  * chain, if any.
34  */
35 class IOBufQueue {
36  private:
37   /**
38    * This guard should be taken by any method that intends to do any changes
39    * to in data_ (e.g. appending to it).
40    *
41    * It flushes the writable tail cache and refills it on destruction.
42    */
43   auto updateGuard() {
44     flushCache();
45     return folly::makeGuard([this] { updateWritableTailCache(); });
46   }
47
48   struct WritableRangeCacheData {
49     std::pair<uint8_t*, uint8_t*> cachedRange;
50     bool attached{false};
51
52     WritableRangeCacheData() = default;
53
54     WritableRangeCacheData(WritableRangeCacheData&& other)
55         : cachedRange(other.cachedRange), attached(other.attached) {
56       other.cachedRange = {};
57       other.attached = false;
58     }
59     WritableRangeCacheData& operator=(WritableRangeCacheData&& other) {
60       cachedRange = other.cachedRange;
61       attached = other.attached;
62
63       other.cachedRange = {};
64       other.attached = false;
65
66       return *this;
67     }
68
69     WritableRangeCacheData(const WritableRangeCacheData&) = delete;
70     WritableRangeCacheData& operator=(const WritableRangeCacheData&) = delete;
71   };
72
73  public:
74   struct Options {
75     Options() : cacheChainLength(false) {}
76     bool cacheChainLength;
77   };
78
79   /**
80    * Commonly used Options, currently the only possible value other than
81    * the default.
82    */
83   static Options cacheChainLength() {
84     Options options;
85     options.cacheChainLength = true;
86     return options;
87   }
88
89   /**
90    * WritableRangeCache represents a cache of current writable tail and provides
91    * cheap and simple interface to append to it that avoids paying the cost of
92    * preallocate/postallocate pair (i.e. indirections and checks).
93    *
94    * The cache is flushed on destruction/copy/move and on non-const accesses to
95    * the underlying IOBufQueue.
96    *
97    * Note: there can be only one active cache for a given IOBufQueue, i.e. when
98    *       you fill a cache object it automatically invalidates other
99    *       cache (if any).
100    */
101   class WritableRangeCache {
102    public:
103     explicit WritableRangeCache(folly::IOBufQueue* q = nullptr) : queue_(q) {
104       if (queue_) {
105         fillCache();
106       }
107     }
108
109     /**
110      * Move constructor/assignment can move the cached range, but must update
111      * the reference in IOBufQueue.
112      */
113     WritableRangeCache(WritableRangeCache&& other)
114         : data_(std::move(other.data_)), queue_(other.queue_) {
115       if (data_.attached) {
116         queue_->updateCacheRef(data_);
117       }
118     }
119     WritableRangeCache& operator=(WritableRangeCache&& other) {
120       if (data_.attached) {
121         queue_->clearWritableRangeCache();
122       }
123
124       data_ = std::move(other.data_);
125       queue_ = other.queue_;
126
127       if (data_.attached) {
128         queue_->updateCacheRef(data_);
129       }
130
131       return *this;
132     }
133
134     /**
135      * Copy constructor/assignment cannot copy the cached range.
136      */
137     WritableRangeCache(const WritableRangeCache& other)
138         : queue_(other.queue_) {}
139     WritableRangeCache& operator=(const WritableRangeCache& other) {
140       if (data_.attached) {
141         queue_->clearWritableRangeCache();
142       }
143
144       queue_ = other.queue_;
145
146       return *this;
147     }
148
149     ~WritableRangeCache() {
150       if (data_.attached) {
151         queue_->clearWritableRangeCache();
152       }
153     }
154
155     /**
156      * Reset the underlying IOBufQueue, will flush current cache if present.
157      */
158     void reset(IOBufQueue* q) {
159       if (data_.attached) {
160         queue_->clearWritableRangeCache();
161       }
162
163       queue_ = q;
164
165       if (queue_) {
166         fillCache();
167       }
168     }
169
170     /**
171      * Get a pointer to the underlying IOBufQueue object.
172      */
173     IOBufQueue* queue() {
174       return queue_;
175     }
176
177     /**
178      * Return a pointer to the start of cached writable tail.
179      *
180      * Note: doesn't populate cache.
181      */
182     uint8_t* writableData() {
183       dcheckIntegrity();
184       return data_.cachedRange.first;
185     }
186
187     /**
188      * Return a length of cached writable tail.
189      *
190      * Note: doesn't populate cache.
191      */
192     size_t length() {
193       dcheckIntegrity();
194       return data_.cachedRange.second - data_.cachedRange.first;
195     }
196
197     /**
198      * Mark n bytes as occupied (e.g. postallocate).
199      */
200     void append(size_t n) {
201       dcheckIntegrity();
202       // This can happen only if somebody is misusing the interface.
203       // E.g. calling append after touching IOBufQueue or without checking
204       // the length().
205       if (LIKELY(data_.cachedRange.first != nullptr)) {
206         DCHECK_LE(n, length());
207         data_.cachedRange.first += n;
208       } else {
209         appendSlow(n);
210       }
211     }
212
213     /**
214      * Same as append(n), but avoids checking if there is a cache.
215      * The caller must guarantee that the cache is set (e.g. the caller just
216      * called fillCache or checked that it's not empty).
217      */
218     void appendUnsafe(size_t n) {
219       data_.cachedRange.first += n;
220     }
221
222     /**
223      * Fill the cache of writable tail from the underlying IOBufQueue.
224      */
225     void fillCache() {
226       queue_->fillWritableRangeCache(data_);
227     }
228
229    private:
230     WritableRangeCacheData data_;
231     IOBufQueue* queue_;
232
233     FOLLY_NOINLINE void appendSlow(size_t n) {
234       queue_->postallocate(n);
235     }
236
237     void dcheckIntegrity() {
238       // Tail start should always be less than tail end.
239       DCHECK_LE(
240           (void*)data_.cachedRange.first, (void*)data_.cachedRange.second);
241       DCHECK(
242           data_.cachedRange.first != nullptr ||
243           data_.cachedRange.second == nullptr);
244
245       // Cached range should be always empty if the cache is not attached.
246       DCHECK(
247           data_.attached ||
248           (data_.cachedRange.first == nullptr &&
249            data_.cachedRange.second == nullptr));
250
251       // We cannot be in attached state if the queue_ is not set.
252       DCHECK(queue_ != nullptr || !data_.attached);
253
254       // If we're attached and the cache is not empty, then it should coincide
255       // with the tail buffer.
256       DCHECK(
257           !data_.attached || data_.cachedRange.first == nullptr ||
258           (queue_->head_ != nullptr &&
259            data_.cachedRange.first >= queue_->head_->prev()->writableTail() &&
260            data_.cachedRange.second ==
261                queue_->head_->prev()->writableTail() +
262                    queue_->head_->prev()->tailroom()));
263     }
264   };
265
266   explicit IOBufQueue(const Options& options = Options());
267   ~IOBufQueue();
268
269   /**
270    * Return a space to prepend bytes and the amount of headroom available.
271    */
272   std::pair<void*, uint64_t> headroom();
273
274   /**
275    * Indicate that n bytes from the headroom have been used.
276    */
277   void markPrepended(uint64_t n);
278
279   /**
280    * Prepend an existing range; throws std::overflow_error if not enough
281    * room.
282    */
283   void prepend(const void* buf, uint64_t n);
284
285   /**
286    * Add a buffer or buffer chain to the end of this queue. The
287    * queue takes ownership of buf.
288    *
289    * If pack is true, we try to reduce wastage at the end of this queue
290    * by copying some data from the first buffers in the buf chain (and
291    * releasing the buffers), if possible.  If pack is false, we leave
292    * the chain topology unchanged.
293    */
294   void append(std::unique_ptr<folly::IOBuf>&& buf,
295               bool pack=false);
296
297   /**
298    * Add a queue to the end of this queue. The queue takes ownership of
299    * all buffers from the other queue.
300    */
301   void append(IOBufQueue& other, bool pack=false);
302   void append(IOBufQueue&& other, bool pack=false) {
303     append(other, pack);  // call lvalue reference overload, above
304   }
305
306   /**
307    * Copy len bytes, starting at buf, to the end of this queue.
308    * The caller retains ownership of the source data.
309    */
310   void append(const void* buf, size_t len);
311
312   /**
313    * Copy a string to the end of this queue.
314    * The caller retains ownership of the source data.
315    */
316   void append(StringPiece sp) {
317     append(sp.data(), sp.size());
318   }
319
320   /**
321    * Append a chain of IOBuf objects that point to consecutive regions
322    * within buf.
323    *
324    * Just like IOBuf::wrapBuffer, this should only be used when the caller
325    * knows ahead of time and can ensure that all IOBuf objects that will point
326    * to this buffer will be destroyed before the buffer itself is destroyed;
327    * all other caveats from wrapBuffer also apply.
328    *
329    * Every buffer except for the last will wrap exactly blockSize bytes.
330    * Importantly, this method may be used to wrap buffers larger than 4GB.
331    */
332   void wrapBuffer(const void* buf, size_t len,
333                   uint64_t blockSize=(1U << 31));  // default block size: 2GB
334
335   /**
336    * Obtain a writable block of contiguous bytes at the end of this
337    * queue, allocating more space if necessary.  The amount of space
338    * reserved will be at least min.  If min contiguous space is not
339    * available at the end of the queue, and IOBuf with size newAllocationSize
340    * is appended to the chain and returned.  The actual available space
341    * may be larger than newAllocationSize, but will be truncated to max,
342    * if specified.
343    *
344    * If the caller subsequently writes anything into the returned space,
345    * it must call the postallocate() method.
346    *
347    * @return The starting address of the block and the length in bytes.
348    *
349    * @note The point of the preallocate()/postallocate() mechanism is
350    *       to support I/O APIs such as Thrift's TAsyncSocket::ReadCallback
351    *       that request a buffer from the application and then, in a later
352    *       callback, tell the application how much of the buffer they've
353    *       filled with data.
354    */
355   std::pair<void*,uint64_t> preallocate(
356     uint64_t min, uint64_t newAllocationSize,
357     uint64_t max = std::numeric_limits<uint64_t>::max()) {
358     dcheckCacheIntegrity();
359
360     if (LIKELY(writableTail() != nullptr && tailroom() >= min)) {
361       return std::make_pair(
362           writableTail(), std::min<uint64_t>(max, tailroom()));
363     }
364
365     return preallocateSlow(min, newAllocationSize, max);
366   }
367
368   /**
369    * Tell the queue that the caller has written data into the first n
370    * bytes provided by the previous preallocate() call.
371    *
372    * @note n should be less than or equal to the size returned by
373    *       preallocate().  If n is zero, the caller may skip the call
374    *       to postallocate().  If n is nonzero, the caller must not
375    *       invoke any other non-const methods on this IOBufQueue between
376    *       the call to preallocate and the call to postallocate().
377    */
378   void postallocate(uint64_t n) {
379     dcheckCacheIntegrity();
380     DCHECK_LE(
381         (void*)(cachePtr_->cachedRange.first + n),
382         (void*)cachePtr_->cachedRange.second);
383     cachePtr_->cachedRange.first += n;
384   }
385
386   /**
387    * Obtain a writable block of n contiguous bytes, allocating more space
388    * if necessary, and mark it as used.  The caller can fill it later.
389    */
390   void* allocate(uint64_t n) {
391     void* p = preallocate(n, n).first;
392     postallocate(n);
393     return p;
394   }
395
396   void* writableTail() const {
397     dcheckCacheIntegrity();
398     return cachePtr_->cachedRange.first;
399   }
400
401   size_t tailroom() const {
402     dcheckCacheIntegrity();
403     return cachePtr_->cachedRange.second - cachePtr_->cachedRange.first;
404   }
405
406   /**
407    * Split off the first n bytes of the queue into a separate IOBuf chain,
408    * and transfer ownership of the new chain to the caller.  The IOBufQueue
409    * retains ownership of everything after the split point.
410    *
411    * @warning If the split point lies in the middle of some IOBuf within
412    *          the chain, this function may, as an implementation detail,
413    *          clone that IOBuf.
414    *
415    * @throws std::underflow_error if n exceeds the number of bytes
416    *         in the queue.
417    */
418   std::unique_ptr<folly::IOBuf> split(size_t n) {
419     return split(n, true);
420   }
421
422   /**
423    * Similar to split, but will return the entire queue instead of throwing
424    * if n exceeds the number of bytes in the queue.
425    */
426   std::unique_ptr<folly::IOBuf> splitAtMost(size_t n) {
427     return split(n, false);
428   }
429
430   /**
431    * Similar to IOBuf::trimStart, but works on the whole queue.  Will
432    * pop off buffers that have been completely trimmed.
433    */
434   void trimStart(size_t amount);
435
436   /**
437    * Similar to trimStart, but will trim at most amount bytes and returns
438    * the number of bytes trimmed.
439    */
440   size_t trimStartAtMost(size_t amount);
441
442   /**
443    * Similar to IOBuf::trimEnd, but works on the whole queue.  Will
444    * pop off buffers that have been completely trimmed.
445    */
446   void trimEnd(size_t amount);
447
448   /**
449    * Similar to trimEnd, but will trim at most amount bytes and returns
450    * the number of bytes trimmed.
451    */
452   size_t trimEndAtMost(size_t amount);
453
454   /**
455    * Transfer ownership of the queue's entire IOBuf chain to the caller.
456    */
457   std::unique_ptr<folly::IOBuf> move() {
458     auto guard = updateGuard();
459     std::unique_ptr<folly::IOBuf> res = std::move(head_);
460     chainLength_ = 0;
461     return res;
462   }
463
464   /**
465    * Access the front IOBuf.
466    *
467    * Note: caller will see the current state of the chain, but may not see
468    *       future updates immediately, due to the presence of a tail cache.
469    * Note: the caller may potentially clone the chain, thus marking all buffers
470    *       as shared. We may still continue writing to the tail of the last
471    *       IOBuf without checking if it's shared, but this is fine, since the
472    *       cloned IOBufs won't reference that data.
473    */
474   const folly::IOBuf* front() const {
475     flushCache();
476     return head_.get();
477   }
478
479   /**
480    * returns the first IOBuf in the chain and removes it from the chain
481    *
482    * @return first IOBuf in the chain or nullptr if none.
483    */
484   std::unique_ptr<folly::IOBuf> pop_front();
485
486   /**
487    * Total chain length, only valid if cacheLength was specified in the
488    * constructor.
489    */
490   size_t chainLength() const {
491     if (UNLIKELY(!options_.cacheChainLength)) {
492       throw std::invalid_argument("IOBufQueue: chain length not cached");
493     }
494     dcheckCacheIntegrity();
495     return chainLength_ + (cachePtr_->cachedRange.first - tailStart_);
496   }
497
498   /**
499    * Returns true iff the IOBuf chain length is 0.
500    */
501   bool empty() const {
502     dcheckCacheIntegrity();
503     return !head_ ||
504         (head_->empty() && cachePtr_->cachedRange.first == tailStart_);
505   }
506
507   const Options& options() const {
508     return options_;
509   }
510
511   /**
512    * Clear the queue.  Note that this does not release the buffers, it
513    * just sets their length to zero; useful if you want to reuse the
514    * same queue without reallocating.
515    */
516   void clear();
517
518   /**
519    * Append the queue to a std::string. Non-destructive.
520    */
521   void appendToString(std::string& out) const;
522
523   /**
524    * Calls IOBuf::gather() on the head of the queue, if it exists.
525    */
526   void gather(uint64_t maxLength);
527
528   /** Movable */
529   IOBufQueue(IOBufQueue&&) noexcept;
530   IOBufQueue& operator=(IOBufQueue&&);
531
532  private:
533   std::unique_ptr<folly::IOBuf> split(size_t n, bool throwOnUnderflow);
534
535   static const size_t kChainLengthNotCached = (size_t)-1;
536   /** Not copyable */
537   IOBufQueue(const IOBufQueue&) = delete;
538   IOBufQueue& operator=(const IOBufQueue&) = delete;
539
540   Options options_;
541
542   // NOTE that chainLength_ is still updated even if !options_.cacheChainLength
543   // because doing it unchecked in postallocate() is faster (no (mis)predicted
544   // branch)
545   mutable size_t chainLength_{0};
546   /**
547    * Everything that has been appended but not yet discarded or moved out
548    * Note: anything that needs to operate on a tail should either call
549    * flushCache() or grab updateGuard() (it will flush the cache itself).
550    */
551   std::unique_ptr<folly::IOBuf> head_;
552
553   mutable uint8_t* tailStart_{nullptr};
554   WritableRangeCacheData* cachePtr_{nullptr};
555   WritableRangeCacheData localCache_;
556
557   void dcheckCacheIntegrity() const {
558     // Tail start should always be less than tail end.
559     DCHECK_LE((void*)tailStart_, (void*)cachePtr_->cachedRange.first);
560     DCHECK_LE(
561         (void*)cachePtr_->cachedRange.first,
562         (void*)cachePtr_->cachedRange.second);
563     DCHECK(
564         cachePtr_->cachedRange.first != nullptr ||
565         cachePtr_->cachedRange.second == nullptr);
566
567     // There is always an attached cache instance.
568     DCHECK(cachePtr_->attached);
569
570     // Either cache is empty or it coincides with the tail.
571     DCHECK(
572         cachePtr_->cachedRange.first == nullptr ||
573         (head_ != nullptr && tailStart_ == head_->prev()->writableTail() &&
574          tailStart_ <= cachePtr_->cachedRange.first &&
575          cachePtr_->cachedRange.first >= head_->prev()->writableTail() &&
576          cachePtr_->cachedRange.second ==
577              head_->prev()->writableTail() + head_->prev()->tailroom()));
578   }
579
580   /**
581    * Populate dest with writable tail range cache.
582    */
583   void fillWritableRangeCache(WritableRangeCacheData& dest) {
584     dcheckCacheIntegrity();
585     if (cachePtr_ != &dest) {
586       dest = std::move(*cachePtr_);
587       cachePtr_ = &dest;
588     }
589   }
590
591   /**
592    * Clear current writable tail cache and reset it to localCache_
593    */
594   void clearWritableRangeCache() {
595     flushCache();
596
597     if (cachePtr_ != &localCache_) {
598       localCache_ = std::move(*cachePtr_);
599       cachePtr_ = &localCache_;
600     }
601
602     DCHECK(cachePtr_ == &localCache_ && localCache_.attached);
603   }
604
605   /**
606    * Commit any pending changes to the tail of the queue.
607    */
608   void flushCache() const {
609     dcheckCacheIntegrity();
610
611     if (tailStart_ != cachePtr_->cachedRange.first) {
612       auto buf = head_->prev();
613       DCHECK_EQ(
614           (void*)(buf->writableTail() + buf->tailroom()),
615           (void*)cachePtr_->cachedRange.second);
616       auto len = cachePtr_->cachedRange.first - tailStart_;
617       buf->append(len);
618       chainLength_ += len;
619       tailStart_ += len;
620     }
621   }
622
623   // For WritableRangeCache move assignment/construction.
624   void updateCacheRef(WritableRangeCacheData& newRef) {
625     cachePtr_ = &newRef;
626   }
627
628   /**
629    * Update cached writable tail range. Called by updateGuard()
630    */
631   void updateWritableTailCache() {
632     if (LIKELY(head_ != nullptr)) {
633       IOBuf* buf = head_->prev();
634       if (LIKELY(!buf->isSharedOne())) {
635         tailStart_ = buf->writableTail();
636         cachePtr_->cachedRange = std::pair<uint8_t*, uint8_t*>(
637             tailStart_, tailStart_ + buf->tailroom());
638         return;
639       }
640     }
641     tailStart_ = nullptr;
642     cachePtr_->cachedRange = std::pair<uint8_t*, uint8_t*>();
643   }
644
645   std::pair<void*, uint64_t>
646   preallocateSlow(uint64_t min, uint64_t newAllocationSize, uint64_t max);
647 };
648
649 } // namespace folly