Improve QueueAppender/IOBufQueue performance
[folly.git] / folly / io / IOBufQueue.h
index c801920d34f793cd9e60608af254207652657564..5c627bc9f41e72f4bf3f4e51408dd096d4d67ae7 100644 (file)
@@ -16,6 +16,7 @@
 
 #pragma once
 
+#include <folly/ScopeGuard.h>
 #include <folly/io/IOBuf.h>
 
 #include <stdexcept>
@@ -32,9 +33,46 @@ namespace folly {
  * chain, if any.
  */
 class IOBufQueue {
+ private:
+  /**
+   * This guard should be taken by any method that intends to make any changes
+   * to the underlying chain (e.g. appending to it).
+   *
+   * It flushes the writable tail cache and refills it on destruction.
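+   *
+   * A minimal sketch of how the mutating methods below use it:
+   *
+   *   auto guard = updateGuard();
+   *   // ... modify head_; the cache is refilled when the guard is destroyed.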
+   */
+  auto updateGuard() {
+    flushCache();
+    return folly::makeGuard([this] { updateWritableTailCache(); });
+  }
+
+  struct WritableRangeCacheData {
+    std::pair<uint8_t*, uint8_t*> cachedRange;
+    bool attached{false};
+
+    WritableRangeCacheData() = default;
+
+    WritableRangeCacheData(WritableRangeCacheData&& other)
+        : cachedRange(other.cachedRange), attached(other.attached) {
+      other.cachedRange = {};
+      other.attached = false;
+    }
+    WritableRangeCacheData& operator=(WritableRangeCacheData&& other) {
+      cachedRange = other.cachedRange;
+      attached = other.attached;
+
+      other.cachedRange = {};
+      other.attached = false;
+
+      return *this;
+    }
+
+    WritableRangeCacheData(const WritableRangeCacheData&) = delete;
+    WritableRangeCacheData& operator=(const WritableRangeCacheData&) = delete;
+  };
+
  public:
   struct Options {
-    Options() : cacheChainLength(false) { }
+    Options() : cacheChainLength(false) {}
     bool cacheChainLength;
   };
 
@@ -48,7 +86,184 @@ class IOBufQueue {
     return options;
   }
 
+  /**
+   * WritableRangeCache represents a cache of the current writable tail and
+   * provides a cheap and simple interface for appending to it that avoids
+   * paying the cost of the preallocate/postallocate pair (i.e. indirections
+   * and checks).
+   *
+   * The cache is flushed on destruction/copy/move and on non-const accesses to
+   * the underlying IOBufQueue.
+   *
+   * Note: there can be only one active cache for a given IOBufQueue, i.e. when
+   *       you fill a cache object it automatically invalidates the previously
+   *       active cache (if any).
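+   *
+   * A minimal usage sketch (illustrative only; the 128/1024 sizes and the
+   * "data" payload are arbitrary):
+   *
+   *   folly::IOBufQueue queue;
+   *   queue.preallocate(128, 1024);  // ensure there is a writable tail
+   *   folly::IOBufQueue::WritableRangeCache cache(&queue);
+   *   if (cache.length() >= 4) {
+   *     memcpy(cache.writableData(), "data", 4);
+   *     cache.append(4);             // cheap append through the cache
+   *   }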
+   */
+  class WritableRangeCache {
+   public:
+    explicit WritableRangeCache(folly::IOBufQueue* q = nullptr) : queue_(q) {
+      if (queue_) {
+        fillCache();
+      }
+    }
+
+    /**
+     * Move constructor/assignment can move the cached range, but must update
+     * the reference in IOBufQueue.
+     */
+    WritableRangeCache(WritableRangeCache&& other)
+        : data_(std::move(other.data_)), queue_(other.queue_) {
+      if (data_.attached) {
+        queue_->updateCacheRef(data_);
+      }
+    }
+    WritableRangeCache& operator=(WritableRangeCache&& other) {
+      if (data_.attached) {
+        queue_->clearWritableRangeCache();
+      }
+
+      data_ = std::move(other.data_);
+      queue_ = other.queue_;
+
+      if (data_.attached) {
+        queue_->updateCacheRef(data_);
+      }
+
+      return *this;
+    }
+
+    /**
+     * Copy constructor/assignment cannot copy the cached range.
+     */
+    WritableRangeCache(const WritableRangeCache& other)
+        : queue_(other.queue_) {}
+    WritableRangeCache& operator=(const WritableRangeCache& other) {
+      if (data_.attached) {
+        queue_->clearWritableRangeCache();
+      }
+
+      queue_ = other.queue_;
+
+      return *this;
+    }
+
+    ~WritableRangeCache() {
+      if (data_.attached) {
+        queue_->clearWritableRangeCache();
+      }
+    }
+
+    /**
+     * Reset the underlying IOBufQueue; flushes the current cache if present.
+     */
+    void reset(IOBufQueue* q) {
+      if (data_.attached) {
+        queue_->clearWritableRangeCache();
+      }
+
+      queue_ = q;
+
+      if (queue_) {
+        fillCache();
+      }
+    }
+
+    /**
+     * Get a pointer to the underlying IOBufQueue object.
+     */
+    IOBufQueue* queue() {
+      return queue_;
+    }
+
+    /**
+     * Return a pointer to the start of the cached writable tail.
+     *
+     * Note: does not populate the cache.
+     */
+    uint8_t* writableData() {
+      dcheckIntegrity();
+      return data_.cachedRange.first;
+    }
+
+    /**
+     * Return the length of the cached writable tail.
+     *
+     * Note: does not populate the cache.
+     */
+    size_t length() {
+      dcheckIntegrity();
+      return data_.cachedRange.second - data_.cachedRange.first;
+    }
+
+    /**
+     * Mark n bytes as occupied (the cache-level equivalent of postallocate()).
+     */
+    void append(size_t n) {
+      dcheckIntegrity();
+      // A null cached range can happen only if somebody is misusing the
+      // interface, e.g. calling append() after touching the IOBufQueue or
+      // without checking length().
+      if (LIKELY(data_.cachedRange.first != nullptr)) {
+        DCHECK_LE(n, length());
+        data_.cachedRange.first += n;
+      } else {
+        appendSlow(n);
+      }
+    }
+
+    /**
+     * Same as append(n), but avoids checking whether there is a cache.
+     * The caller must guarantee that the cache is set (e.g. the caller has
+     * just called fillCache() or verified that it is not empty).
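+     *
+     * A minimal usage sketch (illustrative only; `data`, `n` and the 4096
+     * growth hint are assumptions):
+     *
+     *   if (cache.length() < n) {
+     *     cache.queue()->preallocate(n, 4096);  // make room in the queue...
+     *     cache.fillCache();                    // ...and re-sync the cache
+     *   }
+     *   memcpy(cache.writableData(), data, n);
+     *   cache.appendUnsafe(n);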
+     */
+    void appendUnsafe(size_t n) {
+      data_.cachedRange.first += n;
+    }
+
+    /**
+     * Fill the writable tail cache from the underlying IOBufQueue.
+     */
+    void fillCache() {
+      queue_->fillWritableRangeCache(data_);
+    }
+
+   private:
+    WritableRangeCacheData data_;
+    IOBufQueue* queue_;
+
+    FOLLY_NOINLINE void appendSlow(size_t n) {
+      queue_->postallocate(n);
+    }
+
+    void dcheckIntegrity() {
+      // Tail start should always be less than or equal to tail end.
+      DCHECK_LE(data_.cachedRange.first, data_.cachedRange.second);
+      DCHECK(
+          data_.cachedRange.first != nullptr ||
+          data_.cachedRange.second == nullptr);
+
+      // Cached range should always be empty if the cache is not attached.
+      DCHECK(
+          data_.attached ||
+          (data_.cachedRange.first == nullptr &&
+           data_.cachedRange.second == nullptr));
+
+      // We cannot be in the attached state if queue_ is not set.
+      DCHECK(queue_ != nullptr || !data_.attached);
+
+      // If we're attached and the cache is not empty, then it should coincide
+      // with the tail buffer.
+      DCHECK(
+          !data_.attached || data_.cachedRange.first == nullptr ||
+          (queue_->head_ != nullptr &&
+           data_.cachedRange.first >= queue_->head_->prev()->writableTail() &&
+           data_.cachedRange.second ==
+               queue_->head_->prev()->writableTail() +
+                   queue_->head_->prev()->tailroom()));
+    }
+  };
+
   explicit IOBufQueue(const Options& options = Options());
+  ~IOBufQueue();
 
   /**
    * Return a space to prepend bytes and the amount of headroom available.
@@ -139,10 +354,11 @@ class IOBufQueue {
   std::pair<void*,uint64_t> preallocate(
     uint64_t min, uint64_t newAllocationSize,
     uint64_t max = std::numeric_limits<uint64_t>::max()) {
-    auto buf = tailBuf();
-    if (LIKELY(buf && buf->tailroom() >= min)) {
-      return std::make_pair(buf->writableTail(),
-                            std::min(max, buf->tailroom()));
+    dcheckCacheIntegrity();
+
+    if (LIKELY(writableTail() != nullptr && tailroom() >= min)) {
+      return std::make_pair(
+          writableTail(), std::min<uint64_t>(max, tailroom()));
     }
 
     return preallocateSlow(min, newAllocationSize, max);
@@ -159,8 +375,9 @@ class IOBufQueue {
    *       the call to preallocate and the call to postallocate().
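+   *
+   * A minimal sketch of the preallocate/postallocate pair (illustrative only;
+   * `queue`, `data`, `len` and the 4096 allocation hint are assumptions):
+   *
+   *   auto range = queue.preallocate(len, 4096);  // at least len writable bytes
+   *   memcpy(range.first, data, len);
+   *   queue.postallocate(len);  // commit exactly the bytes written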
    */
   void postallocate(uint64_t n) {
-    head_->prev()->append(n);
-    chainLength_ += n;
+    dcheckCacheIntegrity();
+    DCHECK_LE(cachePtr_->cachedRange.first + n, cachePtr_->cachedRange.second);
+    cachePtr_->cachedRange.first += n;
   }
 
   /**
@@ -174,13 +391,13 @@ class IOBufQueue {
   }
 
   void* writableTail() const {
-    auto buf = tailBuf();
-    return buf ? buf->writableTail() : nullptr;
+    dcheckCacheIntegrity();
+    return cachePtr_->cachedRange.first;
   }
 
   size_t tailroom() const {
-    auto buf = tailBuf();
-    return buf ? buf->tailroom() : 0;
+    dcheckCacheIntegrity();
+    return cachePtr_->cachedRange.second - cachePtr_->cachedRange.first;
   }
 
   /**
@@ -235,14 +452,24 @@ class IOBufQueue {
    * Transfer ownership of the queue's entire IOBuf chain to the caller.
    */
   std::unique_ptr<folly::IOBuf> move() {
+    auto guard = updateGuard();
+    std::unique_ptr<folly::IOBuf> res = std::move(head_);
     chainLength_ = 0;
-    return std::move(head_);
+    return res;
   }
 
   /**
-   * Access
+   * Access the front IOBuf.
+   *
+   * Note: the caller will see the current state of the chain, but may not see
+   *       future updates immediately, due to the presence of a tail cache.
+   * Note: the caller may clone the chain, thus marking all buffers as shared.
+   *       We may still continue writing to the tail of the last IOBuf without
+   *       checking whether it's shared, but this is fine, since the cloned
+   *       IOBufs won't reference that data.
    */
   const folly::IOBuf* front() const {
+    flushCache();
     return head_.get();
   }
 
@@ -261,14 +488,17 @@ class IOBufQueue {
     if (UNLIKELY(!options_.cacheChainLength)) {
       throw std::invalid_argument("IOBufQueue: chain length not cached");
     }
-    return chainLength_;
+    dcheckCacheIntegrity();
+    return chainLength_ + (cachePtr_->cachedRange.first - tailStart_);
   }
 
   /**
    * Returns true iff the IOBuf chain length is 0.
    */
   bool empty() const {
-    return !head_ || head_->empty();
+    dcheckCacheIntegrity();
+    return !head_ ||
+        (head_->empty() && cachePtr_->cachedRange.first == tailStart_);
   }
 
   const Options& options() const {
@@ -297,16 +527,6 @@ class IOBufQueue {
   IOBufQueue& operator=(IOBufQueue&&);
 
  private:
-  IOBuf* tailBuf() const {
-    if (UNLIKELY(!head_)) {
-      return nullptr;
-    }
-    IOBuf* buf = head_->prev();
-    return LIKELY(!buf->isSharedOne()) ? buf : nullptr;
-  }
-  std::pair<void*,uint64_t> preallocateSlow(
-    uint64_t min, uint64_t newAllocationSize, uint64_t max);
-
   std::unique_ptr<folly::IOBuf> split(size_t n, bool throwOnUnderflow);
 
   static const size_t kChainLengthNotCached = (size_t)-1;
@@ -319,9 +539,105 @@ class IOBufQueue {
   // NOTE that chainLength_ is still updated even if !options_.cacheChainLength
   // because doing it unchecked in postallocate() is faster (no (mis)predicted
   // branch)
-  size_t chainLength_;
-  /** Everything that has been appended but not yet discarded or moved out */
+  mutable size_t chainLength_{0};
+  /**
+   * Everything that has been appended but not yet discarded or moved out.
+   * Note: anything that needs to operate on the tail should either call
+   * flushCache() or grab updateGuard() (which flushes the cache itself).
+   */
   std::unique_ptr<folly::IOBuf> head_;
+
+  mutable uint8_t* tailStart_{nullptr};
+  WritableRangeCacheData* cachePtr_{nullptr};
+  WritableRangeCacheData localCache_;
+
+  void dcheckCacheIntegrity() const {
+    // Tail start should always be less than or equal to tail end.
+    DCHECK_LE(tailStart_, cachePtr_->cachedRange.first);
+    DCHECK_LE(cachePtr_->cachedRange.first, cachePtr_->cachedRange.second);
+    DCHECK(
+        cachePtr_->cachedRange.first != nullptr ||
+        cachePtr_->cachedRange.second == nullptr);
+
+    // There is always an attached cache instance.
+    DCHECK(cachePtr_->attached);
+
+    // Either the cache is empty or it coincides with the tail.
+    DCHECK(
+        cachePtr_->cachedRange.first == nullptr ||
+        (head_ != nullptr && tailStart_ == head_->prev()->writableTail() &&
+         tailStart_ <= cachePtr_->cachedRange.first &&
+         cachePtr_->cachedRange.first >= head_->prev()->writableTail() &&
+         cachePtr_->cachedRange.second ==
+             head_->prev()->writableTail() + head_->prev()->tailroom()));
+  }
+
+  /**
+   * Populate dest with the writable tail range cache and make it the active one.
+   */
+  void fillWritableRangeCache(WritableRangeCacheData& dest) {
+    dcheckCacheIntegrity();
+    if (cachePtr_ != &dest) {
+      dest = std::move(*cachePtr_);
+      cachePtr_ = &dest;
+    }
+  }
+
+  /**
+   * Clear the current writable tail cache and reset it to localCache_.
+   */
+  void clearWritableRangeCache() {
+    flushCache();
+
+    if (cachePtr_ != &localCache_) {
+      localCache_ = std::move(*cachePtr_);
+      cachePtr_ = &localCache_;
+    }
+
+    DCHECK(cachePtr_ == &localCache_ && localCache_.attached);
+  }
+
+  /**
+   * Commit any pending changes to the tail of the queue.
+   */
+  void flushCache() const {
+    dcheckCacheIntegrity();
+
+    if (tailStart_ != cachePtr_->cachedRange.first) {
+      auto buf = head_->prev();
+      DCHECK_EQ(
+          buf->writableTail() + buf->tailroom(), cachePtr_->cachedRange.second);
+      auto len = cachePtr_->cachedRange.first - tailStart_;
+      buf->append(len);
+      chainLength_ += len;
+      tailStart_ += len;
+    }
+  }
+
+  // For WritableRangeCache move assignment/construction.
+  void updateCacheRef(WritableRangeCacheData& newRef) {
+    cachePtr_ = &newRef;
+  }
+
+  /**
+   * Update the cached writable tail range. Called by updateGuard().
+   */
+  void updateWritableTailCache() {
+    if (LIKELY(head_ != nullptr)) {
+      IOBuf* buf = head_->prev();
+      if (LIKELY(!buf->isSharedOne())) {
+        tailStart_ = buf->writableTail();
+        cachePtr_->cachedRange = std::pair<uint8_t*, uint8_t*>(
+            tailStart_, tailStart_ + buf->tailroom());
+        return;
+      }
+    }
+    tailStart_ = nullptr;
+    cachePtr_->cachedRange = std::pair<uint8_t*, uint8_t*>();
+  }
+
+  std::pair<void*, uint64_t>
+  preallocateSlow(uint64_t min, uint64_t newAllocationSize, uint64_t max);
 };
 
 } // namespace folly