folly::io::QueueAppender speedup
authorTudor Bosman <tudorb@fb.com>
Wed, 28 Aug 2013 01:31:27 +0000 (18:31 -0700)
committerSara Golemon <sgolemon@fb.com>
Thu, 29 Aug 2013 16:54:26 +0000 (09:54 -0700)
Summary:
Make QueueAppender (as used in the thrift2 protocol writer) much faster.
Benchmarks in a separate diff (for thrift2).

A few things:
- remove IOBuf range checks in optimized mode
- simplify QueueAppender: maxGrowth wasn't used, removed it
- simplify QueueAppender: don't duplicate the work in IOBufQueue, let
IOBufQueue keep track of memory
- speed up IOBuf::isSharedOne(): fast-path case where the buffer was never
shared (so don't read the reference count in that case)
- fast-path in QueueAppender if size is known at compile time

@override-unit-failures
compilation errors outside of folly fixed in subsequent diff

Test Plan: folly/io/test, both dbg and opt

Reviewed By: davejwatson@fb.com

FB internal diff: D946907

folly/io/Cursor.h
folly/io/IOBuf.cpp
folly/io/IOBuf.h
folly/io/IOBufQueue.cpp
folly/io/IOBufQueue.h
folly/io/test/IOBufCursorTest.cpp

index 79fd4cc..48ba6ba 100644 (file)
@@ -371,17 +371,20 @@ class Writable {
   typename std::enable_if<std::is_integral<T>::value>::type
   write(T value) {
     const uint8_t* u8 = reinterpret_cast<const uint8_t*>(&value);
+    Derived* d = static_cast<Derived*>(this);
     push(u8, sizeof(T));
   }
 
   template <class T>
   void writeBE(T value) {
-    write(Endian::big(value));
+    Derived* d = static_cast<Derived*>(this);
+    d->write(Endian::big(value));
   }
 
   template <class T>
   void writeLE(T value) {
-    write(Endian::little(value));
+    Derived* d = static_cast<Derived*>(this);
+    d->write(Endian::little(value));
   }
 
   void push(const uint8_t* buf, size_t len) {
@@ -603,90 +606,62 @@ class QueueAppender : public detail::Writable<QueueAppender> {
    * Create an Appender that writes to a IOBufQueue.  When we allocate
    * space in the queue, we grow no more than growth bytes at once
    * (unless you call ensure() with a bigger value yourself).
-   * Throw if we ever need to allocate more than maxTotalGrowth.
    */
-  QueueAppender(IOBufQueue* queue,
-                uint32_t growth,
-                size_t maxTotalGrowth = std::numeric_limits<size_t>::max()) {
-    reset(queue, growth, maxTotalGrowth);
+  QueueAppender(IOBufQueue* queue, uint32_t growth) {
+    reset(queue, growth);
   }
 
-  void reset(IOBufQueue* queue,
-             uint32_t growth,
-             size_t maxTotalGrowth = std::numeric_limits<size_t>::max()) {
+  void reset(IOBufQueue* queue, uint32_t growth) {
     queue_ = queue;
     growth_ = growth;
-    remainingGrowth_ = maxTotalGrowth;
-    next_ = nullptr;
-    available_ = 0;
   }
 
-  uint8_t* writableData() { return next_; }
+  uint8_t* writableData() {
+    return static_cast<uint8_t*>(queue_->writableTail());
+  }
 
-  size_t length() const { return available_; }
+  size_t length() const { return queue_->tailroom(); }
 
-  void append(size_t n) {
-    assert(n <= available_);
-    assert(n <= remainingGrowth_);
-    queue_->postallocate(n);
-    next_ += n;
-    available_ -= n;
-    remainingGrowth_ -= n;
-  }
+  void append(size_t n) { queue_->postallocate(n); }
 
   // Ensure at least n contiguous; can go above growth_, throws if
   // not enough room.
-  void ensure(uint32_t n) {
-    if (UNLIKELY(n > remainingGrowth_)) {
-      throw std::out_of_range("overflow");
-    }
-
-    if (LIKELY(length() >= n)) {
-      return;
-    }
+  void ensure(uint32_t n) { queue_->preallocate(n, growth_); }
 
-    size_t desired = std::min(growth_, remainingGrowth_ - n);
-
-    // Grab some more.
-    auto p = queue_->preallocate(n, desired);
-
-    next_ = static_cast<uint8_t*>(p.first);
-    available_ = p.second;
+  template <class T>
+  typename std::enable_if<std::is_integral<T>::value>::type
+  write(T value) {
+    // We can't fail.
+    auto p = queue_->preallocate(sizeof(T), growth_);
+    storeUnaligned(p.first, value);
+    queue_->postallocate(sizeof(T));
   }
 
-  size_t pushAtMost(const uint8_t* buf, size_t len) {
-    if (UNLIKELY(len > remainingGrowth_)) {
-      len = remainingGrowth_;
-    }
 
+  size_t pushAtMost(const uint8_t* buf, size_t len) {
     size_t remaining = len;
     while (remaining != 0) {
-      ensure(std::min(remaining, growth_));
-      size_t n = std::min(remaining, available_);
-      memcpy(next_, buf, n);
-      buf += n;
-      remaining -= n;
-      append(n);
+      auto p = queue_->preallocate(std::min(remaining, growth_),
+                                   growth_,
+                                   remaining);
+      memcpy(p.first, buf, p.second);
+      queue_->postallocate(p.second);
+      buf += p.second;
+      remaining -= p.second;
     }
 
     return len;
   }
 
-  // insert doesn't count towards maxTotalGrowth
   void insert(std::unique_ptr<folly::IOBuf> buf) {
     if (buf) {
       queue_->append(std::move(buf), true);
-      next_ = nullptr;
-      available_ = 0;
     }
   }
 
  private:
   folly::IOBufQueue* queue_;
   size_t growth_;
-  size_t remainingGrowth_;
-  uint8_t* next_;
-  size_t available_;
 };
 
 }}  // folly::io
index 8fc040a..13ebdac 100644 (file)
@@ -280,6 +280,9 @@ unique_ptr<IOBuf> IOBuf::clone() const {
 
 unique_ptr<IOBuf> IOBuf::cloneOne() const {
   if (flags_ & kFlagExt) {
+    if (ext_.sharedInfo) {
+      flags_ |= kFlagMaybeShared;
+    }
     unique_ptr<IOBuf> iobuf(new IOBuf(static_cast<ExtBufTypeEnum>(ext_.type),
                                       flags_, ext_.buf, ext_.capacity,
                                       data_, length_,
@@ -480,7 +483,7 @@ void IOBuf::decrementRefcount() {
 
 void IOBuf::reserveSlow(uint32_t minHeadroom, uint32_t minTailroom) {
   size_t newCapacity = (size_t)length_ + minHeadroom + minTailroom;
-  CHECK_LT(newCapacity, UINT32_MAX);
+  DCHECK_LT(newCapacity, UINT32_MAX);
 
   // We'll need to reallocate the buffer.
   // There are a few options.
index 3a2fa38..3473d05 100644 (file)
@@ -556,7 +556,7 @@ class IOBuf {
    * This does not modify any actual data in the buffer.
    */
   void prepend(uint32_t amount) {
-    CHECK(amount <= headroom());
+    DCHECK_LE(amount, headroom());
     data_ -= amount;
     length_ += amount;
   }
@@ -572,7 +572,7 @@ class IOBuf {
    * This does not modify any actual data in the buffer.
    */
   void append(uint32_t amount) {
-    CHECK(amount <= tailroom());
+    DCHECK_LE(amount, tailroom());
     length_ += amount;
   }
 
@@ -586,7 +586,7 @@ class IOBuf {
    * This does not modify any actual data in the buffer.
    */
   void trimStart(uint32_t amount) {
-    CHECK(amount <= length_);
+    DCHECK_LE(amount, length_);
     data_ += amount;
     length_ -= amount;
   }
@@ -601,7 +601,7 @@ class IOBuf {
    * This does not modify any actual data in the buffer.
    */
   void trimEnd(uint32_t amount) {
-    CHECK(amount <= length_);
+    DCHECK_LE(amount, length_);
     length_ -= amount;
   }
 
@@ -809,16 +809,29 @@ class IOBuf {
    * This only checks the current IOBuf, and not other IOBufs in the chain.
    */
   bool isSharedOne() const {
+    if (LIKELY(flags_ & (kFlagUserOwned | kFlagMaybeShared)) == 0) {
+      return false;
+    }
+
     // If this is a user-owned buffer, it is always considered shared
     if (flags_ & kFlagUserOwned) {
       return true;
     }
 
-    if (flags_ & kFlagExt) {
-      return ext_.sharedInfo->refcount.load(std::memory_order_acquire) > 1;
-    } else {
-      return false;
+    // an internal buffer wouldn't have kFlagMaybeShared or kFlagUserOwned
+    // so we would have returned false already.  The only remaining case
+    // is an external buffer which may be shared, so we need to read
+    // the reference count.
+    assert((flags_ & (kFlagExt | kFlagMaybeShared)) ==
+           (kFlagExt | kFlagMaybeShared));
+
+    bool shared =
+      ext_.sharedInfo->refcount.load(std::memory_order_acquire) > 1;
+    if (!shared) {
+      // we're the last one left
+      flags_ &= ~kFlagMaybeShared;
     }
+    return shared;
   }
 
   /**
@@ -972,10 +985,11 @@ class IOBuf {
   Iterator end() const;
 
  private:
-  enum FlagsEnum {
+  enum FlagsEnum : uint32_t {
     kFlagExt = 0x1,
     kFlagUserOwned = 0x2,
     kFlagFreeSharedInfo = 0x4,
+    kFlagMaybeShared = 0x8,
   };
 
   // Values for the ExternalBuf type field.
@@ -1082,7 +1096,7 @@ class IOBuf {
    */
   uint8_t* data_;
   uint32_t length_;
-  uint32_t flags_;
+  mutable uint32_t flags_;
 
   union {
     ExternalBuf ext_;
@@ -1123,7 +1137,7 @@ typename std::enable_if<detail::IsUniquePtrToSL<UniquePtr>::value,
                         std::unique_ptr<IOBuf>>::type
 IOBuf::takeOwnership(UniquePtr&& buf, size_t count) {
   size_t size = count * sizeof(typename UniquePtr::element_type);
-  CHECK_LT(size, size_t(std::numeric_limits<uint32_t>::max()));
+  DCHECK_LT(size, size_t(std::numeric_limits<uint32_t>::max()));
   auto deleter = new UniquePtrDeleter<UniquePtr>(buf.get_deleter());
   return takeOwnership(buf.release(),
                        size,
index c313725..5106e06 100644 (file)
@@ -104,9 +104,7 @@ IOBufQueue::markPrepended(uint32_t n) {
   }
   assert(head_);
   head_->prepend(n);
-  if (options_.cacheChainLength) {
-    chainLength_ += n;
-  }
+  chainLength_ += n;
 }
 
 void
@@ -162,9 +160,7 @@ IOBufQueue::append(const void* buf, size_t len) {
     memcpy(last->writableTail(), src, copyLen);
     src += copyLen;
     last->append(copyLen);
-    if (options_.cacheChainLength) {
-      chainLength_ += copyLen;
-    }
+    chainLength_ += copyLen;
     len -= copyLen;
   }
 }
@@ -181,18 +177,8 @@ IOBufQueue::wrapBuffer(const void* buf, size_t len, uint32_t blockSize) {
 }
 
 pair<void*,uint32_t>
-IOBufQueue::preallocate(uint32_t min, uint32_t newAllocationSize,
-                        uint32_t max) {
-  if (head_ != nullptr) {
-    // If there's enough space left over at the end of the queue, use that.
-    IOBuf* last = head_->prev();
-    if (!last->isSharedOne()) {
-      uint32_t avail = last->tailroom();
-      if (avail >= min) {
-        return make_pair(last->writableTail(), std::min(max, avail));
-      }
-    }
-  }
+IOBufQueue::preallocateSlow(uint32_t min, uint32_t newAllocationSize,
+                            uint32_t max) {
   // Allocate a new buffer of the requested max size.
   unique_ptr<IOBuf> newBuf(IOBuf::create(std::max(min, newAllocationSize)));
   appendToChain(head_, std::move(newBuf), false);
@@ -201,14 +187,6 @@ IOBufQueue::preallocate(uint32_t min, uint32_t newAllocationSize,
                    std::min(max, last->tailroom()));
 }
 
-void
-IOBufQueue::postallocate(uint32_t n) {
-  head_->prev()->append(n);
-  if (options_.cacheChainLength) {
-    chainLength_ += n;
-  }
-}
-
 unique_ptr<IOBuf>
 IOBufQueue::split(size_t n) {
   unique_ptr<IOBuf> result;
@@ -218,9 +196,7 @@ IOBufQueue::split(size_t n) {
           "Attempt to remove more bytes than are present in IOBufQueue");
     } else if (head_->length() <= n) {
       n -= head_->length();
-      if (options_.cacheChainLength) {
-        chainLength_ -= head_->length();
-      }
+      chainLength_ -= head_->length();
       unique_ptr<IOBuf> remainder = head_->pop();
       appendToChain(result, std::move(head_), false);
       head_ = std::move(remainder);
@@ -229,9 +205,7 @@ IOBufQueue::split(size_t n) {
       clone->trimEnd(clone->length() - n);
       appendToChain(result, std::move(clone), false);
       head_->trimStart(n);
-      if (options_.cacheChainLength) {
-        chainLength_ -= n;
-      }
+      chainLength_ -= n;
       break;
     }
   }
@@ -246,15 +220,11 @@ void IOBufQueue::trimStart(size_t amount) {
     }
     if (head_->length() > amount) {
       head_->trimStart(amount);
-      if (options_.cacheChainLength) {
-        chainLength_ -= amount;
-      }
+      chainLength_ -= amount;
       break;
     }
     amount -= head_->length();
-    if (options_.cacheChainLength) {
-      chainLength_ -= head_->length();
-    }
+    chainLength_ -= head_->length();
     head_ = head_->pop();
   }
 }
@@ -267,15 +237,11 @@ void IOBufQueue::trimEnd(size_t amount) {
     }
     if (head_->prev()->length() > amount) {
       head_->prev()->trimEnd(amount);
-      if (options_.cacheChainLength) {
-        chainLength_ -= amount;
-      }
+      chainLength_ -= amount;
       break;
     }
     amount -= head_->prev()->length();
-    if (options_.cacheChainLength) {
-      chainLength_ -= head_->prev()->length();
-    }
+    chainLength_ -= head_->prev()->length();
 
     if (head_->isChained()) {
       head_->prev()->unlink();
@@ -289,9 +255,7 @@ std::unique_ptr<folly::IOBuf> IOBufQueue::pop_front() {
   if (!head_) {
     return nullptr;
   }
-  if (options_.cacheChainLength) {
-    chainLength_ -= head_->length();
-  }
+  chainLength_ -= head_->length();
   std::unique_ptr<folly::IOBuf> retBuf = std::move(head_);
   head_ = retBuf->pop();
   return retBuf;
@@ -306,9 +270,7 @@ void IOBufQueue::clear() {
     buf->clear();
     buf = buf->next();
   } while (buf != head_.get());
-  if (options_.cacheChainLength) {
-    chainLength_ = 0;
-  }
+  chainLength_ = 0;
 }
 
 } // folly
index 71966f4..7046dc0 100644 (file)
@@ -139,7 +139,15 @@ class IOBufQueue {
    */
   std::pair<void*,uint32_t> preallocate(
     uint32_t min, uint32_t newAllocationSize,
-    uint32_t max = std::numeric_limits<uint32_t>::max());
+    uint32_t max = std::numeric_limits<uint32_t>::max()) {
+    auto buf = tailBuf();
+    if (LIKELY(buf && buf->tailroom() >= min)) {
+      return std::make_pair(buf->writableTail(),
+                            std::min(max, buf->tailroom()));
+    }
+
+    return preallocateSlow(min, newAllocationSize, max);
+  }
 
   /**
    * Tell the queue that the caller has written data into the first n
@@ -151,7 +159,10 @@ class IOBufQueue {
    *       invoke any other non-const methods on this IOBufQueue between
    *       the call to preallocate and the call to postallocate().
    */
-  void postallocate(uint32_t n);
+  void postallocate(uint32_t n) {
+    head_->prev()->append(n);
+    chainLength_ += n;
+  }
 
   /**
    * Obtain a writable block of n contiguous bytes, allocating more space
@@ -163,6 +174,16 @@ class IOBufQueue {
     return p;
   }
 
+  void* writableTail() const {
+    auto buf = tailBuf();
+    return buf ? buf->writableTail() : nullptr;
+  }
+
+  size_t tailroom() const {
+    auto buf = tailBuf();
+    return buf ? buf->tailroom() : 0;
+  }
+
   /**
    * Split off the first n bytes of the queue into a separate IOBuf chain,
    * and transfer ownership of the new chain to the caller.  The IOBufQueue
@@ -216,7 +237,7 @@ class IOBufQueue {
    * constructor.
    */
   size_t chainLength() const {
-    if (!options_.cacheChainLength) {
+    if (UNLIKELY(!options_.cacheChainLength)) {
       throw std::invalid_argument("IOBufQueue: chain length not cached");
     }
     return chainLength_;
@@ -245,12 +266,24 @@ class IOBufQueue {
   IOBufQueue& operator=(IOBufQueue&&);
 
  private:
+  IOBuf* tailBuf() const {
+    if (UNLIKELY(!head_)) return nullptr;
+    IOBuf* buf = head_->prev();
+    return LIKELY(!buf->isSharedOne()) ? buf : nullptr;
+  }
+  std::pair<void*,uint32_t> preallocateSlow(
+    uint32_t min, uint32_t newAllocationSize, uint32_t max);
+
   static const size_t kChainLengthNotCached = (size_t)-1;
   /** Not copyable */
   IOBufQueue(const IOBufQueue&) = delete;
   IOBufQueue& operator=(const IOBufQueue&) = delete;
 
   Options options_;
+
+  // NOTE that chainLength_ is still updated even if !options_.cacheChainLength
+  // because doing it unchecked in postallocate() is faster (no (mis)predicted
+  // branch)
   size_t chainLength_;
   /** Everything that has been appended but not yet discarded or moved out */
   std::unique_ptr<folly::IOBuf> head_;
index 4a5366d..54536e6 100644 (file)
@@ -330,14 +330,12 @@ TEST(IOBuf, QueueAppender) {
   folly::IOBufQueue queue;
 
   // Allocate 100 bytes at once, but don't grow past 1024
-  QueueAppender app(&queue, 100, 1024);
+  QueueAppender app(&queue, 100);
   size_t n = 1024 / sizeof(uint32_t);
   for (uint32_t i = 0; i < n; ++i) {
     app.writeBE(i);
   }
 
-  EXPECT_THROW({app.writeBE(0);}, std::out_of_range);
-
   // There must be a goodMallocSize between 100 and 1024...
   EXPECT_LT(1, queue.front()->countChainElements());
   const IOBuf* buf = queue.front();