From bae9c76c2a7f76355d2d31d8fb98718314bb6a82 Mon Sep 17 00:00:00 2001 From: Tudor Bosman Date: Tue, 27 Aug 2013 18:31:27 -0700 Subject: [PATCH] folly::io::QueueAppender speedup Summary: Make QueueAppender (as used in the thrift2 protocol writer) much faster. Benchmarks in a separate diff (for thrift2). A few things: - remove IOBuf range checks in optimized mode - simplify QueueAppender: maxGrowth wasn't used, removed it - simplify QueueAppender: don't duplicate the work in IOBufQueue, let IOBufQueue keep track of memory - speed up IOBuf::isSharedOne(): fast-path case where the buffer was never shared (so don't read the reference count in that case) - fast-path in QueueAppender if size is known at compile time @override-unit-failures compilation errors outside of folly fixed in subsequent diff Test Plan: folly/io/test, both dbg and opt Reviewed By: davejwatson@fb.com FB internal diff: D946907 --- folly/io/Cursor.h | 83 +++++++++++-------------------- folly/io/IOBuf.cpp | 5 +- folly/io/IOBuf.h | 36 ++++++++++---- folly/io/IOBufQueue.cpp | 62 +++++------------------ folly/io/IOBufQueue.h | 39 +++++++++++++-- folly/io/test/IOBufCursorTest.cpp | 4 +- 6 files changed, 107 insertions(+), 122 deletions(-) diff --git a/folly/io/Cursor.h b/folly/io/Cursor.h index 79fd4cc7..48ba6baf 100644 --- a/folly/io/Cursor.h +++ b/folly/io/Cursor.h @@ -371,17 +371,20 @@ class Writable { typename std::enable_if::value>::type write(T value) { const uint8_t* u8 = reinterpret_cast(&value); + Derived* d = static_cast(this); push(u8, sizeof(T)); } template void writeBE(T value) { - write(Endian::big(value)); + Derived* d = static_cast(this); + d->write(Endian::big(value)); } template void writeLE(T value) { - write(Endian::little(value)); + Derived* d = static_cast(this); + d->write(Endian::little(value)); } void push(const uint8_t* buf, size_t len) { @@ -603,90 +606,62 @@ class QueueAppender : public detail::Writable { * Create an Appender that writes to a IOBufQueue. When we allocate * space in the queue, we grow no more than growth bytes at once * (unless you call ensure() with a bigger value yourself). - * Throw if we ever need to allocate more than maxTotalGrowth. */ - QueueAppender(IOBufQueue* queue, - uint32_t growth, - size_t maxTotalGrowth = std::numeric_limits::max()) { - reset(queue, growth, maxTotalGrowth); + QueueAppender(IOBufQueue* queue, uint32_t growth) { + reset(queue, growth); } - void reset(IOBufQueue* queue, - uint32_t growth, - size_t maxTotalGrowth = std::numeric_limits::max()) { + void reset(IOBufQueue* queue, uint32_t growth) { queue_ = queue; growth_ = growth; - remainingGrowth_ = maxTotalGrowth; - next_ = nullptr; - available_ = 0; } - uint8_t* writableData() { return next_; } + uint8_t* writableData() { + return static_cast(queue_->writableTail()); + } - size_t length() const { return available_; } + size_t length() const { return queue_->tailroom(); } - void append(size_t n) { - assert(n <= available_); - assert(n <= remainingGrowth_); - queue_->postallocate(n); - next_ += n; - available_ -= n; - remainingGrowth_ -= n; - } + void append(size_t n) { queue_->postallocate(n); } // Ensure at least n contiguous; can go above growth_, throws if // not enough room. - void ensure(uint32_t n) { - if (UNLIKELY(n > remainingGrowth_)) { - throw std::out_of_range("overflow"); - } - - if (LIKELY(length() >= n)) { - return; - } + void ensure(uint32_t n) { queue_->preallocate(n, growth_); } - size_t desired = std::min(growth_, remainingGrowth_ - n); - - // Grab some more. - auto p = queue_->preallocate(n, desired); - - next_ = static_cast(p.first); - available_ = p.second; + template + typename std::enable_if::value>::type + write(T value) { + // We can't fail. + auto p = queue_->preallocate(sizeof(T), growth_); + storeUnaligned(p.first, value); + queue_->postallocate(sizeof(T)); } - size_t pushAtMost(const uint8_t* buf, size_t len) { - if (UNLIKELY(len > remainingGrowth_)) { - len = remainingGrowth_; - } + size_t pushAtMost(const uint8_t* buf, size_t len) { size_t remaining = len; while (remaining != 0) { - ensure(std::min(remaining, growth_)); - size_t n = std::min(remaining, available_); - memcpy(next_, buf, n); - buf += n; - remaining -= n; - append(n); + auto p = queue_->preallocate(std::min(remaining, growth_), + growth_, + remaining); + memcpy(p.first, buf, p.second); + queue_->postallocate(p.second); + buf += p.second; + remaining -= p.second; } return len; } - // insert doesn't count towards maxTotalGrowth void insert(std::unique_ptr buf) { if (buf) { queue_->append(std::move(buf), true); - next_ = nullptr; - available_ = 0; } } private: folly::IOBufQueue* queue_; size_t growth_; - size_t remainingGrowth_; - uint8_t* next_; - size_t available_; }; }} // folly::io diff --git a/folly/io/IOBuf.cpp b/folly/io/IOBuf.cpp index 8fc040a5..13ebdac3 100644 --- a/folly/io/IOBuf.cpp +++ b/folly/io/IOBuf.cpp @@ -280,6 +280,9 @@ unique_ptr IOBuf::clone() const { unique_ptr IOBuf::cloneOne() const { if (flags_ & kFlagExt) { + if (ext_.sharedInfo) { + flags_ |= kFlagMaybeShared; + } unique_ptr iobuf(new IOBuf(static_cast(ext_.type), flags_, ext_.buf, ext_.capacity, data_, length_, @@ -480,7 +483,7 @@ void IOBuf::decrementRefcount() { void IOBuf::reserveSlow(uint32_t minHeadroom, uint32_t minTailroom) { size_t newCapacity = (size_t)length_ + minHeadroom + minTailroom; - CHECK_LT(newCapacity, UINT32_MAX); + DCHECK_LT(newCapacity, UINT32_MAX); // We'll need to reallocate the buffer. // There are a few options. diff --git a/folly/io/IOBuf.h b/folly/io/IOBuf.h index 3a2fa383..3473d05e 100644 --- a/folly/io/IOBuf.h +++ b/folly/io/IOBuf.h @@ -556,7 +556,7 @@ class IOBuf { * This does not modify any actual data in the buffer. */ void prepend(uint32_t amount) { - CHECK(amount <= headroom()); + DCHECK_LE(amount, headroom()); data_ -= amount; length_ += amount; } @@ -572,7 +572,7 @@ class IOBuf { * This does not modify any actual data in the buffer. */ void append(uint32_t amount) { - CHECK(amount <= tailroom()); + DCHECK_LE(amount, tailroom()); length_ += amount; } @@ -586,7 +586,7 @@ class IOBuf { * This does not modify any actual data in the buffer. */ void trimStart(uint32_t amount) { - CHECK(amount <= length_); + DCHECK_LE(amount, length_); data_ += amount; length_ -= amount; } @@ -601,7 +601,7 @@ class IOBuf { * This does not modify any actual data in the buffer. */ void trimEnd(uint32_t amount) { - CHECK(amount <= length_); + DCHECK_LE(amount, length_); length_ -= amount; } @@ -809,16 +809,29 @@ class IOBuf { * This only checks the current IOBuf, and not other IOBufs in the chain. */ bool isSharedOne() const { + if (LIKELY(flags_ & (kFlagUserOwned | kFlagMaybeShared)) == 0) { + return false; + } + // If this is a user-owned buffer, it is always considered shared if (flags_ & kFlagUserOwned) { return true; } - if (flags_ & kFlagExt) { - return ext_.sharedInfo->refcount.load(std::memory_order_acquire) > 1; - } else { - return false; + // an internal buffer wouldn't have kFlagMaybeShared or kFlagUserOwned + // so we would have returned false already. The only remaining case + // is an external buffer which may be shared, so we need to read + // the reference count. + assert((flags_ & (kFlagExt | kFlagMaybeShared)) == + (kFlagExt | kFlagMaybeShared)); + + bool shared = + ext_.sharedInfo->refcount.load(std::memory_order_acquire) > 1; + if (!shared) { + // we're the last one left + flags_ &= ~kFlagMaybeShared; } + return shared; } /** @@ -972,10 +985,11 @@ class IOBuf { Iterator end() const; private: - enum FlagsEnum { + enum FlagsEnum : uint32_t { kFlagExt = 0x1, kFlagUserOwned = 0x2, kFlagFreeSharedInfo = 0x4, + kFlagMaybeShared = 0x8, }; // Values for the ExternalBuf type field. @@ -1082,7 +1096,7 @@ class IOBuf { */ uint8_t* data_; uint32_t length_; - uint32_t flags_; + mutable uint32_t flags_; union { ExternalBuf ext_; @@ -1123,7 +1137,7 @@ typename std::enable_if::value, std::unique_ptr>::type IOBuf::takeOwnership(UniquePtr&& buf, size_t count) { size_t size = count * sizeof(typename UniquePtr::element_type); - CHECK_LT(size, size_t(std::numeric_limits::max())); + DCHECK_LT(size, size_t(std::numeric_limits::max())); auto deleter = new UniquePtrDeleter(buf.get_deleter()); return takeOwnership(buf.release(), size, diff --git a/folly/io/IOBufQueue.cpp b/folly/io/IOBufQueue.cpp index c3137256..5106e063 100644 --- a/folly/io/IOBufQueue.cpp +++ b/folly/io/IOBufQueue.cpp @@ -104,9 +104,7 @@ IOBufQueue::markPrepended(uint32_t n) { } assert(head_); head_->prepend(n); - if (options_.cacheChainLength) { - chainLength_ += n; - } + chainLength_ += n; } void @@ -162,9 +160,7 @@ IOBufQueue::append(const void* buf, size_t len) { memcpy(last->writableTail(), src, copyLen); src += copyLen; last->append(copyLen); - if (options_.cacheChainLength) { - chainLength_ += copyLen; - } + chainLength_ += copyLen; len -= copyLen; } } @@ -181,18 +177,8 @@ IOBufQueue::wrapBuffer(const void* buf, size_t len, uint32_t blockSize) { } pair -IOBufQueue::preallocate(uint32_t min, uint32_t newAllocationSize, - uint32_t max) { - if (head_ != nullptr) { - // If there's enough space left over at the end of the queue, use that. - IOBuf* last = head_->prev(); - if (!last->isSharedOne()) { - uint32_t avail = last->tailroom(); - if (avail >= min) { - return make_pair(last->writableTail(), std::min(max, avail)); - } - } - } +IOBufQueue::preallocateSlow(uint32_t min, uint32_t newAllocationSize, + uint32_t max) { // Allocate a new buffer of the requested max size. unique_ptr newBuf(IOBuf::create(std::max(min, newAllocationSize))); appendToChain(head_, std::move(newBuf), false); @@ -201,14 +187,6 @@ IOBufQueue::preallocate(uint32_t min, uint32_t newAllocationSize, std::min(max, last->tailroom())); } -void -IOBufQueue::postallocate(uint32_t n) { - head_->prev()->append(n); - if (options_.cacheChainLength) { - chainLength_ += n; - } -} - unique_ptr IOBufQueue::split(size_t n) { unique_ptr result; @@ -218,9 +196,7 @@ IOBufQueue::split(size_t n) { "Attempt to remove more bytes than are present in IOBufQueue"); } else if (head_->length() <= n) { n -= head_->length(); - if (options_.cacheChainLength) { - chainLength_ -= head_->length(); - } + chainLength_ -= head_->length(); unique_ptr remainder = head_->pop(); appendToChain(result, std::move(head_), false); head_ = std::move(remainder); @@ -229,9 +205,7 @@ IOBufQueue::split(size_t n) { clone->trimEnd(clone->length() - n); appendToChain(result, std::move(clone), false); head_->trimStart(n); - if (options_.cacheChainLength) { - chainLength_ -= n; - } + chainLength_ -= n; break; } } @@ -246,15 +220,11 @@ void IOBufQueue::trimStart(size_t amount) { } if (head_->length() > amount) { head_->trimStart(amount); - if (options_.cacheChainLength) { - chainLength_ -= amount; - } + chainLength_ -= amount; break; } amount -= head_->length(); - if (options_.cacheChainLength) { - chainLength_ -= head_->length(); - } + chainLength_ -= head_->length(); head_ = head_->pop(); } } @@ -267,15 +237,11 @@ void IOBufQueue::trimEnd(size_t amount) { } if (head_->prev()->length() > amount) { head_->prev()->trimEnd(amount); - if (options_.cacheChainLength) { - chainLength_ -= amount; - } + chainLength_ -= amount; break; } amount -= head_->prev()->length(); - if (options_.cacheChainLength) { - chainLength_ -= head_->prev()->length(); - } + chainLength_ -= head_->prev()->length(); if (head_->isChained()) { head_->prev()->unlink(); @@ -289,9 +255,7 @@ std::unique_ptr IOBufQueue::pop_front() { if (!head_) { return nullptr; } - if (options_.cacheChainLength) { - chainLength_ -= head_->length(); - } + chainLength_ -= head_->length(); std::unique_ptr retBuf = std::move(head_); head_ = retBuf->pop(); return retBuf; @@ -306,9 +270,7 @@ void IOBufQueue::clear() { buf->clear(); buf = buf->next(); } while (buf != head_.get()); - if (options_.cacheChainLength) { - chainLength_ = 0; - } + chainLength_ = 0; } } // folly diff --git a/folly/io/IOBufQueue.h b/folly/io/IOBufQueue.h index 71966f49..7046dc00 100644 --- a/folly/io/IOBufQueue.h +++ b/folly/io/IOBufQueue.h @@ -139,7 +139,15 @@ class IOBufQueue { */ std::pair preallocate( uint32_t min, uint32_t newAllocationSize, - uint32_t max = std::numeric_limits::max()); + uint32_t max = std::numeric_limits::max()) { + auto buf = tailBuf(); + if (LIKELY(buf && buf->tailroom() >= min)) { + return std::make_pair(buf->writableTail(), + std::min(max, buf->tailroom())); + } + + return preallocateSlow(min, newAllocationSize, max); + } /** * Tell the queue that the caller has written data into the first n @@ -151,7 +159,10 @@ class IOBufQueue { * invoke any other non-const methods on this IOBufQueue between * the call to preallocate and the call to postallocate(). */ - void postallocate(uint32_t n); + void postallocate(uint32_t n) { + head_->prev()->append(n); + chainLength_ += n; + } /** * Obtain a writable block of n contiguous bytes, allocating more space @@ -163,6 +174,16 @@ class IOBufQueue { return p; } + void* writableTail() const { + auto buf = tailBuf(); + return buf ? buf->writableTail() : nullptr; + } + + size_t tailroom() const { + auto buf = tailBuf(); + return buf ? buf->tailroom() : 0; + } + /** * Split off the first n bytes of the queue into a separate IOBuf chain, * and transfer ownership of the new chain to the caller. The IOBufQueue @@ -216,7 +237,7 @@ class IOBufQueue { * constructor. */ size_t chainLength() const { - if (!options_.cacheChainLength) { + if (UNLIKELY(!options_.cacheChainLength)) { throw std::invalid_argument("IOBufQueue: chain length not cached"); } return chainLength_; @@ -245,12 +266,24 @@ class IOBufQueue { IOBufQueue& operator=(IOBufQueue&&); private: + IOBuf* tailBuf() const { + if (UNLIKELY(!head_)) return nullptr; + IOBuf* buf = head_->prev(); + return LIKELY(!buf->isSharedOne()) ? buf : nullptr; + } + std::pair preallocateSlow( + uint32_t min, uint32_t newAllocationSize, uint32_t max); + static const size_t kChainLengthNotCached = (size_t)-1; /** Not copyable */ IOBufQueue(const IOBufQueue&) = delete; IOBufQueue& operator=(const IOBufQueue&) = delete; Options options_; + + // NOTE that chainLength_ is still updated even if !options_.cacheChainLength + // because doing it unchecked in postallocate() is faster (no (mis)predicted + // branch) size_t chainLength_; /** Everything that has been appended but not yet discarded or moved out */ std::unique_ptr head_; diff --git a/folly/io/test/IOBufCursorTest.cpp b/folly/io/test/IOBufCursorTest.cpp index 4a5366dd..54536e67 100644 --- a/folly/io/test/IOBufCursorTest.cpp +++ b/folly/io/test/IOBufCursorTest.cpp @@ -330,14 +330,12 @@ TEST(IOBuf, QueueAppender) { folly::IOBufQueue queue; // Allocate 100 bytes at once, but don't grow past 1024 - QueueAppender app(&queue, 100, 1024); + QueueAppender app(&queue, 100); size_t n = 1024 / sizeof(uint32_t); for (uint32_t i = 0; i < n; ++i) { app.writeBE(i); } - EXPECT_THROW({app.writeBE(0);}, std::out_of_range); - // There must be a goodMallocSize between 100 and 1024... EXPECT_LT(1, queue.front()->countChainElements()); const IOBuf* buf = queue.front(); -- 2.34.1