Fix RequestContext held too long issue in EventBase
[folly.git] / folly / io / IOBufQueue.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/ScopeGuard.h>
20 #include <folly/io/IOBuf.h>
21
22 #include <stdexcept>
23 #include <string>
24
25 namespace folly {
26
27 /**
28  * An IOBufQueue encapsulates a chain of IOBufs and provides
29  * convenience functions to append data to the back of the chain
30  * and remove data from the front.
31  *
32  * You may also prepend data into the headroom of the first buffer in the
33  * chain, if any.
34  */
35 class IOBufQueue {
36  private:
37   /**
38    * This guard should be taken by any method that intends to do any changes
39    * to in data_ (e.g. appending to it).
40    *
41    * It flushes the writable tail cache and refills it on destruction.
42    */
43   auto updateGuard() {
44     flushCache();
45     return folly::makeGuard([this] { updateWritableTailCache(); });
46   }
47
48   struct WritableRangeCacheData {
49     std::pair<uint8_t*, uint8_t*> cachedRange;
50     bool attached{false};
51
52     WritableRangeCacheData() = default;
53
54     WritableRangeCacheData(WritableRangeCacheData&& other)
55         : cachedRange(other.cachedRange), attached(other.attached) {
56       other.cachedRange = {};
57       other.attached = false;
58     }
59     WritableRangeCacheData& operator=(WritableRangeCacheData&& other) {
60       cachedRange = other.cachedRange;
61       attached = other.attached;
62
63       other.cachedRange = {};
64       other.attached = false;
65
66       return *this;
67     }
68
69     WritableRangeCacheData(const WritableRangeCacheData&) = delete;
70     WritableRangeCacheData& operator=(const WritableRangeCacheData&) = delete;
71   };
72
73  public:
74   struct Options {
75     Options() : cacheChainLength(false) {}
76     bool cacheChainLength;
77   };
78
79   /**
80    * Commonly used Options, currently the only possible value other than
81    * the default.
82    */
83   static Options cacheChainLength() {
84     Options options;
85     options.cacheChainLength = true;
86     return options;
87   }
88
89   /**
90    * WritableRangeCache represents a cache of current writable tail and provides
91    * cheap and simple interface to append to it that avoids paying the cost of
92    * preallocate/postallocate pair (i.e. indirections and checks).
93    *
94    * The cache is flushed on destruction/copy/move and on non-const accesses to
95    * the underlying IOBufQueue.
96    *
97    * Note: there can be only one active cache for a given IOBufQueue, i.e. when
98    *       you fill a cache object it automatically invalidates other
99    *       cache (if any).
100    */
101   class WritableRangeCache {
102    public:
103     explicit WritableRangeCache(folly::IOBufQueue* q = nullptr) : queue_(q) {
104       if (queue_) {
105         fillCache();
106       }
107     }
108
109     /**
110      * Move constructor/assignment can move the cached range, but must update
111      * the reference in IOBufQueue.
112      */
113     WritableRangeCache(WritableRangeCache&& other)
114         : data_(std::move(other.data_)), queue_(other.queue_) {
115       if (data_.attached) {
116         queue_->updateCacheRef(data_);
117       }
118     }
119     WritableRangeCache& operator=(WritableRangeCache&& other) {
120       if (data_.attached) {
121         queue_->clearWritableRangeCache();
122       }
123
124       data_ = std::move(other.data_);
125       queue_ = other.queue_;
126
127       if (data_.attached) {
128         queue_->updateCacheRef(data_);
129       }
130
131       return *this;
132     }
133
134     /**
135      * Copy constructor/assignment cannot copy the cached range.
136      */
137     WritableRangeCache(const WritableRangeCache& other)
138         : queue_(other.queue_) {}
139     WritableRangeCache& operator=(const WritableRangeCache& other) {
140       if (data_.attached) {
141         queue_->clearWritableRangeCache();
142       }
143
144       queue_ = other.queue_;
145
146       return *this;
147     }
148
149     ~WritableRangeCache() {
150       if (data_.attached) {
151         queue_->clearWritableRangeCache();
152       }
153     }
154
155     /**
156      * Reset the underlying IOBufQueue, will flush current cache if present.
157      */
158     void reset(IOBufQueue* q) {
159       if (data_.attached) {
160         queue_->clearWritableRangeCache();
161       }
162
163       queue_ = q;
164
165       if (queue_) {
166         fillCache();
167       }
168     }
169
170     /**
171      * Get a pointer to the underlying IOBufQueue object.
172      */
173     IOBufQueue* queue() {
174       return queue_;
175     }
176
177     /**
178      * Return a pointer to the start of cached writable tail.
179      *
180      * Note: doesn't populate cache.
181      */
182     uint8_t* writableData() {
183       dcheckIntegrity();
184       return data_.cachedRange.first;
185     }
186
187     /**
188      * Return a length of cached writable tail.
189      *
190      * Note: doesn't populate cache.
191      */
192     size_t length() {
193       dcheckIntegrity();
194       return data_.cachedRange.second - data_.cachedRange.first;
195     }
196
197     /**
198      * Mark n bytes as occupied (e.g. postallocate).
199      */
200     void append(size_t n) {
201       dcheckIntegrity();
202       // This can happen only if somebody is misusing the interface.
203       // E.g. calling append after touching IOBufQueue or without checking
204       // the length().
205       if (LIKELY(data_.cachedRange.first != nullptr)) {
206         DCHECK_LE(n, length());
207         data_.cachedRange.first += n;
208       } else {
209         appendSlow(n);
210       }
211     }
212
213     /**
214      * Same as append(n), but avoids checking if there is a cache.
215      * The caller must guarantee that the cache is set (e.g. the caller just
216      * called fillCache or checked that it's not empty).
217      */
218     void appendUnsafe(size_t n) {
219       data_.cachedRange.first += n;
220     }
221
222     /**
223      * Fill the cache of writable tail from the underlying IOBufQueue.
224      */
225     void fillCache() {
226       queue_->fillWritableRangeCache(data_);
227     }
228
229    private:
230     WritableRangeCacheData data_;
231     IOBufQueue* queue_;
232
233     FOLLY_NOINLINE void appendSlow(size_t n) {
234       queue_->postallocate(n);
235     }
236
237     void dcheckIntegrity() {
238       // Tail start should always be less than tail end.
239       DCHECK_LE(data_.cachedRange.first, data_.cachedRange.second);
240       DCHECK(
241           data_.cachedRange.first != nullptr ||
242           data_.cachedRange.second == nullptr);
243
244       // Cached range should be always empty if the cache is not attached.
245       DCHECK(
246           data_.attached ||
247           (data_.cachedRange.first == nullptr &&
248            data_.cachedRange.second == nullptr));
249
250       // We cannot be in attached state if the queue_ is not set.
251       DCHECK(queue_ != nullptr || !data_.attached);
252
253       // If we're attached and the cache is not empty, then it should coincide
254       // with the tail buffer.
255       DCHECK(
256           !data_.attached || data_.cachedRange.first == nullptr ||
257           (queue_->head_ != nullptr &&
258            data_.cachedRange.first >= queue_->head_->prev()->writableTail() &&
259            data_.cachedRange.second ==
260                queue_->head_->prev()->writableTail() +
261                    queue_->head_->prev()->tailroom()));
262     }
263   };
264
265   explicit IOBufQueue(const Options& options = Options());
266   ~IOBufQueue();
267
268   /**
269    * Return a space to prepend bytes and the amount of headroom available.
270    */
271   std::pair<void*, uint64_t> headroom();
272
273   /**
274    * Indicate that n bytes from the headroom have been used.
275    */
276   void markPrepended(uint64_t n);
277
278   /**
279    * Prepend an existing range; throws std::overflow_error if not enough
280    * room.
281    */
282   void prepend(const void* buf, uint64_t n);
283
284   /**
285    * Add a buffer or buffer chain to the end of this queue. The
286    * queue takes ownership of buf.
287    *
288    * If pack is true, we try to reduce wastage at the end of this queue
289    * by copying some data from the first buffers in the buf chain (and
290    * releasing the buffers), if possible.  If pack is false, we leave
291    * the chain topology unchanged.
292    */
293   void append(std::unique_ptr<folly::IOBuf>&& buf,
294               bool pack=false);
295
296   /**
297    * Add a queue to the end of this queue. The queue takes ownership of
298    * all buffers from the other queue.
299    */
300   void append(IOBufQueue& other, bool pack=false);
301   void append(IOBufQueue&& other, bool pack=false) {
302     append(other, pack);  // call lvalue reference overload, above
303   }
304
305   /**
306    * Copy len bytes, starting at buf, to the end of this queue.
307    * The caller retains ownership of the source data.
308    */
309   void append(const void* buf, size_t len);
310
311   /**
312    * Copy a string to the end of this queue.
313    * The caller retains ownership of the source data.
314    */
315   void append(StringPiece sp) {
316     append(sp.data(), sp.size());
317   }
318
319   /**
320    * Append a chain of IOBuf objects that point to consecutive regions
321    * within buf.
322    *
323    * Just like IOBuf::wrapBuffer, this should only be used when the caller
324    * knows ahead of time and can ensure that all IOBuf objects that will point
325    * to this buffer will be destroyed before the buffer itself is destroyed;
326    * all other caveats from wrapBuffer also apply.
327    *
328    * Every buffer except for the last will wrap exactly blockSize bytes.
329    * Importantly, this method may be used to wrap buffers larger than 4GB.
330    */
331   void wrapBuffer(const void* buf, size_t len,
332                   uint64_t blockSize=(1U << 31));  // default block size: 2GB
333
334   /**
335    * Obtain a writable block of contiguous bytes at the end of this
336    * queue, allocating more space if necessary.  The amount of space
337    * reserved will be at least min.  If min contiguous space is not
338    * available at the end of the queue, and IOBuf with size newAllocationSize
339    * is appended to the chain and returned.  The actual available space
340    * may be larger than newAllocationSize, but will be truncated to max,
341    * if specified.
342    *
343    * If the caller subsequently writes anything into the returned space,
344    * it must call the postallocate() method.
345    *
346    * @return The starting address of the block and the length in bytes.
347    *
348    * @note The point of the preallocate()/postallocate() mechanism is
349    *       to support I/O APIs such as Thrift's TAsyncSocket::ReadCallback
350    *       that request a buffer from the application and then, in a later
351    *       callback, tell the application how much of the buffer they've
352    *       filled with data.
353    */
354   std::pair<void*,uint64_t> preallocate(
355     uint64_t min, uint64_t newAllocationSize,
356     uint64_t max = std::numeric_limits<uint64_t>::max()) {
357     dcheckCacheIntegrity();
358
359     if (LIKELY(writableTail() != nullptr && tailroom() >= min)) {
360       return std::make_pair(
361           writableTail(), std::min<uint64_t>(max, tailroom()));
362     }
363
364     return preallocateSlow(min, newAllocationSize, max);
365   }
366
367   /**
368    * Tell the queue that the caller has written data into the first n
369    * bytes provided by the previous preallocate() call.
370    *
371    * @note n should be less than or equal to the size returned by
372    *       preallocate().  If n is zero, the caller may skip the call
373    *       to postallocate().  If n is nonzero, the caller must not
374    *       invoke any other non-const methods on this IOBufQueue between
375    *       the call to preallocate and the call to postallocate().
376    */
377   void postallocate(uint64_t n) {
378     dcheckCacheIntegrity();
379     DCHECK_LE(cachePtr_->cachedRange.first + n, cachePtr_->cachedRange.second);
380     cachePtr_->cachedRange.first += n;
381   }
382
383   /**
384    * Obtain a writable block of n contiguous bytes, allocating more space
385    * if necessary, and mark it as used.  The caller can fill it later.
386    */
387   void* allocate(uint64_t n) {
388     void* p = preallocate(n, n).first;
389     postallocate(n);
390     return p;
391   }
392
393   void* writableTail() const {
394     dcheckCacheIntegrity();
395     return cachePtr_->cachedRange.first;
396   }
397
398   size_t tailroom() const {
399     dcheckCacheIntegrity();
400     return cachePtr_->cachedRange.second - cachePtr_->cachedRange.first;
401   }
402
403   /**
404    * Split off the first n bytes of the queue into a separate IOBuf chain,
405    * and transfer ownership of the new chain to the caller.  The IOBufQueue
406    * retains ownership of everything after the split point.
407    *
408    * @warning If the split point lies in the middle of some IOBuf within
409    *          the chain, this function may, as an implementation detail,
410    *          clone that IOBuf.
411    *
412    * @throws std::underflow_error if n exceeds the number of bytes
413    *         in the queue.
414    */
415   std::unique_ptr<folly::IOBuf> split(size_t n) {
416     return split(n, true);
417   }
418
419   /**
420    * Similar to split, but will return the entire queue instead of throwing
421    * if n exceeds the number of bytes in the queue.
422    */
423   std::unique_ptr<folly::IOBuf> splitAtMost(size_t n) {
424     return split(n, false);
425   }
426
427   /**
428    * Similar to IOBuf::trimStart, but works on the whole queue.  Will
429    * pop off buffers that have been completely trimmed.
430    */
431   void trimStart(size_t amount);
432
433   /**
434    * Similar to trimStart, but will trim at most amount bytes and returns
435    * the number of bytes trimmed.
436    */
437   size_t trimStartAtMost(size_t amount);
438
439   /**
440    * Similar to IOBuf::trimEnd, but works on the whole queue.  Will
441    * pop off buffers that have been completely trimmed.
442    */
443   void trimEnd(size_t amount);
444
445   /**
446    * Similar to trimEnd, but will trim at most amount bytes and returns
447    * the number of bytes trimmed.
448    */
449   size_t trimEndAtMost(size_t amount);
450
451   /**
452    * Transfer ownership of the queue's entire IOBuf chain to the caller.
453    */
454   std::unique_ptr<folly::IOBuf> move() {
455     auto guard = updateGuard();
456     std::unique_ptr<folly::IOBuf> res = std::move(head_);
457     chainLength_ = 0;
458     return res;
459   }
460
461   /**
462    * Access the front IOBuf.
463    *
464    * Note: caller will see the current state of the chain, but may not see
465    *       future updates immediately, due to the presence of a tail cache.
466    * Note: the caller may potentially clone the chain, thus marking all buffers
467    *       as shared. We may still continue writing to the tail of the last
468    *       IOBuf without checking if it's shared, but this is fine, since the
469    *       cloned IOBufs won't reference that data.
470    */
471   const folly::IOBuf* front() const {
472     flushCache();
473     return head_.get();
474   }
475
476   /**
477    * returns the first IOBuf in the chain and removes it from the chain
478    *
479    * @return first IOBuf in the chain or nullptr if none.
480    */
481   std::unique_ptr<folly::IOBuf> pop_front();
482
483   /**
484    * Total chain length, only valid if cacheLength was specified in the
485    * constructor.
486    */
487   size_t chainLength() const {
488     if (UNLIKELY(!options_.cacheChainLength)) {
489       throw std::invalid_argument("IOBufQueue: chain length not cached");
490     }
491     dcheckCacheIntegrity();
492     return chainLength_ + (cachePtr_->cachedRange.first - tailStart_);
493   }
494
495   /**
496    * Returns true iff the IOBuf chain length is 0.
497    */
498   bool empty() const {
499     dcheckCacheIntegrity();
500     return !head_ ||
501         (head_->empty() && cachePtr_->cachedRange.first == tailStart_);
502   }
503
504   const Options& options() const {
505     return options_;
506   }
507
508   /**
509    * Clear the queue.  Note that this does not release the buffers, it
510    * just sets their length to zero; useful if you want to reuse the
511    * same queue without reallocating.
512    */
513   void clear();
514
515   /**
516    * Append the queue to a std::string. Non-destructive.
517    */
518   void appendToString(std::string& out) const;
519
520   /**
521    * Calls IOBuf::gather() on the head of the queue, if it exists.
522    */
523   void gather(uint64_t maxLength);
524
525   /** Movable */
526   IOBufQueue(IOBufQueue&&) noexcept;
527   IOBufQueue& operator=(IOBufQueue&&);
528
529  private:
530   std::unique_ptr<folly::IOBuf> split(size_t n, bool throwOnUnderflow);
531
532   static const size_t kChainLengthNotCached = (size_t)-1;
533   /** Not copyable */
534   IOBufQueue(const IOBufQueue&) = delete;
535   IOBufQueue& operator=(const IOBufQueue&) = delete;
536
537   Options options_;
538
539   // NOTE that chainLength_ is still updated even if !options_.cacheChainLength
540   // because doing it unchecked in postallocate() is faster (no (mis)predicted
541   // branch)
542   mutable size_t chainLength_{0};
543   /**
544    * Everything that has been appended but not yet discarded or moved out
545    * Note: anything that needs to operate on a tail should either call
546    * flushCache() or grab updateGuard() (it will flush the cache itself).
547    */
548   std::unique_ptr<folly::IOBuf> head_;
549
550   mutable uint8_t* tailStart_{nullptr};
551   WritableRangeCacheData* cachePtr_{nullptr};
552   WritableRangeCacheData localCache_;
553
554   void dcheckCacheIntegrity() const {
555     // Tail start should always be less than tail end.
556     DCHECK_LE(tailStart_, cachePtr_->cachedRange.first);
557     DCHECK_LE(cachePtr_->cachedRange.first, cachePtr_->cachedRange.second);
558     DCHECK(
559         cachePtr_->cachedRange.first != nullptr ||
560         cachePtr_->cachedRange.second == nullptr);
561
562     // There is always an attached cache instance.
563     DCHECK(cachePtr_->attached);
564
565     // Either cache is empty or it coincides with the tail.
566     DCHECK(
567         cachePtr_->cachedRange.first == nullptr ||
568         (head_ != nullptr && tailStart_ == head_->prev()->writableTail() &&
569          tailStart_ <= cachePtr_->cachedRange.first &&
570          cachePtr_->cachedRange.first >= head_->prev()->writableTail() &&
571          cachePtr_->cachedRange.second ==
572              head_->prev()->writableTail() + head_->prev()->tailroom()));
573   }
574
575   /**
576    * Populate dest with writable tail range cache.
577    */
578   void fillWritableRangeCache(WritableRangeCacheData& dest) {
579     dcheckCacheIntegrity();
580     if (cachePtr_ != &dest) {
581       dest = std::move(*cachePtr_);
582       cachePtr_ = &dest;
583     }
584   }
585
586   /**
587    * Clear current writable tail cache and reset it to localCache_
588    */
589   void clearWritableRangeCache() {
590     flushCache();
591
592     if (cachePtr_ != &localCache_) {
593       localCache_ = std::move(*cachePtr_);
594       cachePtr_ = &localCache_;
595     }
596
597     DCHECK(cachePtr_ == &localCache_ && localCache_.attached);
598   }
599
600   /**
601    * Commit any pending changes to the tail of the queue.
602    */
603   void flushCache() const {
604     dcheckCacheIntegrity();
605
606     if (tailStart_ != cachePtr_->cachedRange.first) {
607       auto buf = head_->prev();
608       DCHECK_EQ(
609           buf->writableTail() + buf->tailroom(), cachePtr_->cachedRange.second);
610       auto len = cachePtr_->cachedRange.first - tailStart_;
611       buf->append(len);
612       chainLength_ += len;
613       tailStart_ += len;
614     }
615   }
616
617   // For WritableRangeCache move assignment/construction.
618   void updateCacheRef(WritableRangeCacheData& newRef) {
619     cachePtr_ = &newRef;
620   }
621
622   /**
623    * Update cached writable tail range. Called by updateGuard()
624    */
625   void updateWritableTailCache() {
626     if (LIKELY(head_ != nullptr)) {
627       IOBuf* buf = head_->prev();
628       if (LIKELY(!buf->isSharedOne())) {
629         tailStart_ = buf->writableTail();
630         cachePtr_->cachedRange = std::pair<uint8_t*, uint8_t*>(
631             tailStart_, tailStart_ + buf->tailroom());
632         return;
633       }
634     }
635     tailStart_ = nullptr;
636     cachePtr_->cachedRange = std::pair<uint8_t*, uint8_t*>();
637   }
638
639   std::pair<void*, uint64_t>
640   preallocateSlow(uint64_t min, uint64_t newAllocationSize, uint64_t max);
641 };
642
643 } // namespace folly