Improve QueueAppender/IOBufQueue performance
[folly.git] / folly / io / IOBufQueue.h
index f499492e9049a0193fa08eada52d7fa1d863a043..5c627bc9f41e72f4bf3f4e51408dd096d4d67ae7 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2014 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * limitations under the License.
  */
 
-#ifndef FOLLY_IO_IOBUF_QUEUE_H
-#define FOLLY_IO_IOBUF_QUEUE_H
+#pragma once
 
-#include "folly/io/IOBuf.h"
+#include <folly/ScopeGuard.h>
+#include <folly/io/IOBuf.h>
 
 #include <stdexcept>
 #include <string>
@@ -33,9 +33,46 @@ namespace folly {
  * chain, if any.
  */
 class IOBufQueue {
+ private:
+  /**
+   * This guard should be taken by any method that intends to do any changes
+   * to in data_ (e.g. appending to it).
+   *
+   * It flushes the writable tail cache and refills it on destruction.
+   */
+  auto updateGuard() {
+    flushCache();
+    return folly::makeGuard([this] { updateWritableTailCache(); });
+  }
+
+  struct WritableRangeCacheData {
+    std::pair<uint8_t*, uint8_t*> cachedRange;
+    bool attached{false};
+
+    WritableRangeCacheData() = default;
+
+    WritableRangeCacheData(WritableRangeCacheData&& other)
+        : cachedRange(other.cachedRange), attached(other.attached) {
+      other.cachedRange = {};
+      other.attached = false;
+    }
+    WritableRangeCacheData& operator=(WritableRangeCacheData&& other) {
+      cachedRange = other.cachedRange;
+      attached = other.attached;
+
+      other.cachedRange = {};
+      other.attached = false;
+
+      return *this;
+    }
+
+    WritableRangeCacheData(const WritableRangeCacheData&) = delete;
+    WritableRangeCacheData& operator=(const WritableRangeCacheData&) = delete;
+  };
+
  public:
   struct Options {
-    Options() : cacheChainLength(false) { }
+    Options() : cacheChainLength(false) {}
     bool cacheChainLength;
   };
 
@@ -49,23 +86,200 @@ class IOBufQueue {
     return options;
   }
 
+  /**
+   * WritableRangeCache represents a cache of current writable tail and provides
+   * cheap and simple interface to append to it that avoids paying the cost of
+   * preallocate/postallocate pair (i.e. indirections and checks).
+   *
+   * The cache is flushed on destruction/copy/move and on non-const accesses to
+   * the underlying IOBufQueue.
+   *
+   * Note: there can be only one active cache for a given IOBufQueue, i.e. when
+   *       you fill a cache object it automatically invalidates other
+   *       cache (if any).
+   */
+  class WritableRangeCache {
+   public:
+    explicit WritableRangeCache(folly::IOBufQueue* q = nullptr) : queue_(q) {
+      if (queue_) {
+        fillCache();
+      }
+    }
+
+    /**
+     * Move constructor/assignment can move the cached range, but must update
+     * the reference in IOBufQueue.
+     */
+    WritableRangeCache(WritableRangeCache&& other)
+        : data_(std::move(other.data_)), queue_(other.queue_) {
+      if (data_.attached) {
+        queue_->updateCacheRef(data_);
+      }
+    }
+    WritableRangeCache& operator=(WritableRangeCache&& other) {
+      if (data_.attached) {
+        queue_->clearWritableRangeCache();
+      }
+
+      data_ = std::move(other.data_);
+      queue_ = other.queue_;
+
+      if (data_.attached) {
+        queue_->updateCacheRef(data_);
+      }
+
+      return *this;
+    }
+
+    /**
+     * Copy constructor/assignment cannot copy the cached range.
+     */
+    WritableRangeCache(const WritableRangeCache& other)
+        : queue_(other.queue_) {}
+    WritableRangeCache& operator=(const WritableRangeCache& other) {
+      if (data_.attached) {
+        queue_->clearWritableRangeCache();
+      }
+
+      queue_ = other.queue_;
+
+      return *this;
+    }
+
+    ~WritableRangeCache() {
+      if (data_.attached) {
+        queue_->clearWritableRangeCache();
+      }
+    }
+
+    /**
+     * Reset the underlying IOBufQueue, will flush current cache if present.
+     */
+    void reset(IOBufQueue* q) {
+      if (data_.attached) {
+        queue_->clearWritableRangeCache();
+      }
+
+      queue_ = q;
+
+      if (queue_) {
+        fillCache();
+      }
+    }
+
+    /**
+     * Get a pointer to the underlying IOBufQueue object.
+     */
+    IOBufQueue* queue() {
+      return queue_;
+    }
+
+    /**
+     * Return a pointer to the start of cached writable tail.
+     *
+     * Note: doesn't populate cache.
+     */
+    uint8_t* writableData() {
+      dcheckIntegrity();
+      return data_.cachedRange.first;
+    }
+
+    /**
+     * Return a length of cached writable tail.
+     *
+     * Note: doesn't populate cache.
+     */
+    size_t length() {
+      dcheckIntegrity();
+      return data_.cachedRange.second - data_.cachedRange.first;
+    }
+
+    /**
+     * Mark n bytes as occupied (e.g. postallocate).
+     */
+    void append(size_t n) {
+      dcheckIntegrity();
+      // This can happen only if somebody is misusing the interface.
+      // E.g. calling append after touching IOBufQueue or without checking
+      // the length().
+      if (LIKELY(data_.cachedRange.first != nullptr)) {
+        DCHECK_LE(n, length());
+        data_.cachedRange.first += n;
+      } else {
+        appendSlow(n);
+      }
+    }
+
+    /**
+     * Same as append(n), but avoids checking if there is a cache.
+     * The caller must guarantee that the cache is set (e.g. the caller just
+     * called fillCache or checked that it's not empty).
+     */
+    void appendUnsafe(size_t n) {
+      data_.cachedRange.first += n;
+    }
+
+    /**
+     * Fill the cache of writable tail from the underlying IOBufQueue.
+     */
+    void fillCache() {
+      queue_->fillWritableRangeCache(data_);
+    }
+
+   private:
+    WritableRangeCacheData data_;
+    IOBufQueue* queue_;
+
+    FOLLY_NOINLINE void appendSlow(size_t n) {
+      queue_->postallocate(n);
+    }
+
+    void dcheckIntegrity() {
+      // Tail start should always be less than tail end.
+      DCHECK_LE(data_.cachedRange.first, data_.cachedRange.second);
+      DCHECK(
+          data_.cachedRange.first != nullptr ||
+          data_.cachedRange.second == nullptr);
+
+      // Cached range should be always empty if the cache is not attached.
+      DCHECK(
+          data_.attached ||
+          (data_.cachedRange.first == nullptr &&
+           data_.cachedRange.second == nullptr));
+
+      // We cannot be in attached state if the queue_ is not set.
+      DCHECK(queue_ != nullptr || !data_.attached);
+
+      // If we're attached and the cache is not empty, then it should coincide
+      // with the tail buffer.
+      DCHECK(
+          !data_.attached || data_.cachedRange.first == nullptr ||
+          (queue_->head_ != nullptr &&
+           data_.cachedRange.first >= queue_->head_->prev()->writableTail() &&
+           data_.cachedRange.second ==
+               queue_->head_->prev()->writableTail() +
+                   queue_->head_->prev()->tailroom()));
+    }
+  };
+
   explicit IOBufQueue(const Options& options = Options());
+  ~IOBufQueue();
 
   /**
    * Return a space to prepend bytes and the amount of headroom available.
    */
-  std::pair<void*, uint32_t> headroom();
+  std::pair<void*, uint64_t> headroom();
 
   /**
    * Indicate that n bytes from the headroom have been used.
    */
-  void markPrepended(uint32_t n);
+  void markPrepended(uint64_t n);
 
   /**
    * Prepend an existing range; throws std::overflow_error if not enough
    * room.
    */
-  void prepend(const void* buf, uint32_t n);
+  void prepend(const void* buf, uint64_t n);
 
   /**
    * Add a buffer or buffer chain to the end of this queue. The
@@ -98,8 +312,8 @@ class IOBufQueue {
    * Copy a string to the end of this queue.
    * The caller retains ownership of the source data.
    */
-  void append(const std::string& buf) {
-    append(buf.data(), buf.length());
+  void append(StringPiece sp) {
+    append(sp.data(), sp.size());
   }
 
   /**
@@ -115,7 +329,7 @@ class IOBufQueue {
    * Importantly, this method may be used to wrap buffers larger than 4GB.
    */
   void wrapBuffer(const void* buf, size_t len,
-                  uint32_t blockSize=(1U << 31));  // default block size: 2GB
+                  uint64_t blockSize=(1U << 31));  // default block size: 2GB
 
   /**
    * Obtain a writable block of contiguous bytes at the end of this
@@ -137,13 +351,14 @@ class IOBufQueue {
    *       callback, tell the application how much of the buffer they've
    *       filled with data.
    */
-  std::pair<void*,uint32_t> preallocate(
-    uint32_t min, uint32_t newAllocationSize,
-    uint32_t max = std::numeric_limits<uint32_t>::max()) {
-    auto buf = tailBuf();
-    if (LIKELY(buf && buf->tailroom() >= min)) {
-      return std::make_pair(buf->writableTail(),
-                            std::min(max, buf->tailroom()));
+  std::pair<void*,uint64_t> preallocate(
+    uint64_t min, uint64_t newAllocationSize,
+    uint64_t max = std::numeric_limits<uint64_t>::max()) {
+    dcheckCacheIntegrity();
+
+    if (LIKELY(writableTail() != nullptr && tailroom() >= min)) {
+      return std::make_pair(
+          writableTail(), std::min<uint64_t>(max, tailroom()));
     }
 
     return preallocateSlow(min, newAllocationSize, max);
@@ -159,29 +374,30 @@ class IOBufQueue {
    *       invoke any other non-const methods on this IOBufQueue between
    *       the call to preallocate and the call to postallocate().
    */
-  void postallocate(uint32_t n) {
-    head_->prev()->append(n);
-    chainLength_ += n;
+  void postallocate(uint64_t n) {
+    dcheckCacheIntegrity();
+    DCHECK_LE(cachePtr_->cachedRange.first + n, cachePtr_->cachedRange.second);
+    cachePtr_->cachedRange.first += n;
   }
 
   /**
    * Obtain a writable block of n contiguous bytes, allocating more space
    * if necessary, and mark it as used.  The caller can fill it later.
    */
-  void* allocate(uint32_t n) {
+  void* allocate(uint64_t n) {
     void* p = preallocate(n, n).first;
     postallocate(n);
     return p;
   }
 
   void* writableTail() const {
-    auto buf = tailBuf();
-    return buf ? buf->writableTail() : nullptr;
+    dcheckCacheIntegrity();
+    return cachePtr_->cachedRange.first;
   }
 
   size_t tailroom() const {
-    auto buf = tailBuf();
-    return buf ? buf->tailroom() : 0;
+    dcheckCacheIntegrity();
+    return cachePtr_->cachedRange.second - cachePtr_->cachedRange.first;
   }
 
   /**
@@ -196,7 +412,17 @@ class IOBufQueue {
    * @throws std::underflow_error if n exceeds the number of bytes
    *         in the queue.
    */
-  std::unique_ptr<folly::IOBuf> split(size_t n);
+  std::unique_ptr<folly::IOBuf> split(size_t n) {
+    return split(n, true);
+  }
+
+  /**
+   * Similar to split, but will return the entire queue instead of throwing
+   * if n exceeds the number of bytes in the queue.
+   */
+  std::unique_ptr<folly::IOBuf> splitAtMost(size_t n) {
+    return split(n, false);
+  }
 
   /**
    * Similar to IOBuf::trimStart, but works on the whole queue.  Will
@@ -204,24 +430,46 @@ class IOBufQueue {
    */
   void trimStart(size_t amount);
 
+  /**
+   * Similar to trimStart, but will trim at most amount bytes and returns
+   * the number of bytes trimmed.
+   */
+  size_t trimStartAtMost(size_t amount);
+
   /**
    * Similar to IOBuf::trimEnd, but works on the whole queue.  Will
    * pop off buffers that have been completely trimmed.
    */
   void trimEnd(size_t amount);
 
+  /**
+   * Similar to trimEnd, but will trim at most amount bytes and returns
+   * the number of bytes trimmed.
+   */
+  size_t trimEndAtMost(size_t amount);
+
   /**
    * Transfer ownership of the queue's entire IOBuf chain to the caller.
    */
   std::unique_ptr<folly::IOBuf> move() {
+    auto guard = updateGuard();
+    std::unique_ptr<folly::IOBuf> res = std::move(head_);
     chainLength_ = 0;
-    return std::move(head_);
+    return res;
   }
 
   /**
-   * Access
+   * Access the front IOBuf.
+   *
+   * Note: caller will see the current state of the chain, but may not see
+   *       future updates immediately, due to the presence of a tail cache.
+   * Note: the caller may potentially clone the chain, thus marking all buffers
+   *       as shared. We may still continue writing to the tail of the last
+   *       IOBuf without checking if it's shared, but this is fine, since the
+   *       cloned IOBufs won't reference that data.
    */
   const folly::IOBuf* front() const {
+    flushCache();
     return head_.get();
   }
 
@@ -240,14 +488,17 @@ class IOBufQueue {
     if (UNLIKELY(!options_.cacheChainLength)) {
       throw std::invalid_argument("IOBufQueue: chain length not cached");
     }
-    return chainLength_;
+    dcheckCacheIntegrity();
+    return chainLength_ + (cachePtr_->cachedRange.first - tailStart_);
   }
 
   /**
    * Returns true iff the IOBuf chain length is 0.
    */
   bool empty() const {
-    return !head_ || head_->empty();
+    dcheckCacheIntegrity();
+    return !head_ ||
+        (head_->empty() && cachePtr_->cachedRange.first == tailStart_);
   }
 
   const Options& options() const {
@@ -261,18 +512,22 @@ class IOBufQueue {
    */
   void clear();
 
+  /**
+   * Append the queue to a std::string. Non-destructive.
+   */
+  void appendToString(std::string& out) const;
+
+  /**
+   * Calls IOBuf::gather() on the head of the queue, if it exists.
+   */
+  void gather(uint64_t maxLength);
+
   /** Movable */
-  IOBufQueue(IOBufQueue&&);
+  IOBufQueue(IOBufQueue&&) noexcept;
   IOBufQueue& operator=(IOBufQueue&&);
 
  private:
-  IOBuf* tailBuf() const {
-    if (UNLIKELY(!head_)) return nullptr;
-    IOBuf* buf = head_->prev();
-    return LIKELY(!buf->isSharedOne()) ? buf : nullptr;
-  }
-  std::pair<void*,uint32_t> preallocateSlow(
-    uint32_t min, uint32_t newAllocationSize, uint32_t max);
+  std::unique_ptr<folly::IOBuf> split(size_t n, bool throwOnUnderflow);
 
   static const size_t kChainLengthNotCached = (size_t)-1;
   /** Not copyable */
@@ -284,11 +539,105 @@ class IOBufQueue {
   // NOTE that chainLength_ is still updated even if !options_.cacheChainLength
   // because doing it unchecked in postallocate() is faster (no (mis)predicted
   // branch)
-  size_t chainLength_;
-  /** Everything that has been appended but not yet discarded or moved out */
+  mutable size_t chainLength_{0};
+  /**
+   * Everything that has been appended but not yet discarded or moved out
+   * Note: anything that needs to operate on a tail should either call
+   * flushCache() or grab updateGuard() (it will flush the cache itself).
+   */
   std::unique_ptr<folly::IOBuf> head_;
-};
 
-} // folly
+  mutable uint8_t* tailStart_{nullptr};
+  WritableRangeCacheData* cachePtr_{nullptr};
+  WritableRangeCacheData localCache_;
+
+  void dcheckCacheIntegrity() const {
+    // Tail start should always be less than tail end.
+    DCHECK_LE(tailStart_, cachePtr_->cachedRange.first);
+    DCHECK_LE(cachePtr_->cachedRange.first, cachePtr_->cachedRange.second);
+    DCHECK(
+        cachePtr_->cachedRange.first != nullptr ||
+        cachePtr_->cachedRange.second == nullptr);
+
+    // There is always an attached cache instance.
+    DCHECK(cachePtr_->attached);
+
+    // Either cache is empty or it coincides with the tail.
+    DCHECK(
+        cachePtr_->cachedRange.first == nullptr ||
+        (head_ != nullptr && tailStart_ == head_->prev()->writableTail() &&
+         tailStart_ <= cachePtr_->cachedRange.first &&
+         cachePtr_->cachedRange.first >= head_->prev()->writableTail() &&
+         cachePtr_->cachedRange.second ==
+             head_->prev()->writableTail() + head_->prev()->tailroom()));
+  }
+
+  /**
+   * Populate dest with writable tail range cache.
+   */
+  void fillWritableRangeCache(WritableRangeCacheData& dest) {
+    dcheckCacheIntegrity();
+    if (cachePtr_ != &dest) {
+      dest = std::move(*cachePtr_);
+      cachePtr_ = &dest;
+    }
+  }
+
+  /**
+   * Clear current writable tail cache and reset it to localCache_
+   */
+  void clearWritableRangeCache() {
+    flushCache();
+
+    if (cachePtr_ != &localCache_) {
+      localCache_ = std::move(*cachePtr_);
+      cachePtr_ = &localCache_;
+    }
+
+    DCHECK(cachePtr_ == &localCache_ && localCache_.attached);
+  }
+
+  /**
+   * Commit any pending changes to the tail of the queue.
+   */
+  void flushCache() const {
+    dcheckCacheIntegrity();
+
+    if (tailStart_ != cachePtr_->cachedRange.first) {
+      auto buf = head_->prev();
+      DCHECK_EQ(
+          buf->writableTail() + buf->tailroom(), cachePtr_->cachedRange.second);
+      auto len = cachePtr_->cachedRange.first - tailStart_;
+      buf->append(len);
+      chainLength_ += len;
+      tailStart_ += len;
+    }
+  }
+
+  // For WritableRangeCache move assignment/construction.
+  void updateCacheRef(WritableRangeCacheData& newRef) {
+    cachePtr_ = &newRef;
+  }
+
+  /**
+   * Update cached writable tail range. Called by updateGuard()
+   */
+  void updateWritableTailCache() {
+    if (LIKELY(head_ != nullptr)) {
+      IOBuf* buf = head_->prev();
+      if (LIKELY(!buf->isSharedOne())) {
+        tailStart_ = buf->writableTail();
+        cachePtr_->cachedRange = std::pair<uint8_t*, uint8_t*>(
+            tailStart_, tailStart_ + buf->tailroom());
+        return;
+      }
+    }
+    tailStart_ = nullptr;
+    cachePtr_->cachedRange = std::pair<uint8_t*, uint8_t*>();
+  }
+
+  std::pair<void*, uint64_t>
+  preallocateSlow(uint64_t min, uint64_t newAllocationSize, uint64_t max);
+};
 
-#endif // FOLLY_IO_IOBUF_QUEUE_H
+} // namespace folly