From 64072ab5f788cd1081bdd98ba6dbaabbcd1fafbd Mon Sep 17 00:00:00 2001
From: Stepan Palamarchuk
Date: Mon, 4 Dec 2017 15:08:24 -0800
Subject: [PATCH] Improve QueueAppender/IOBufQueue performance

Summary: Currently QueueAppender needs to follow a chain of 4 indirections
(QueueAppender->IOBufQueue->IOBuf(head)->IOBuf(tail)->data).

This diff adds a cache of the writable tail range to IOBufQueue and allows it
to be placed externally (QueueAppender keeps it in its own WritableRangeCache).

Before this diff, QueueAppender::write on the hot path was ~167 bytes of code
(with the majority of it actually executed); after this diff it's down to ~30
bytes:

   0x0000000000419d10 <+0>:   mov    (%rdi),%rax
   0x0000000000419d13 <+3>:   cmp    %rax,0x8(%rdi)
   0x0000000000419d17 <+7>:   je     0x419d28 <folly::io::QueueAppender::write<signed char>(signed char)+24>
   0x0000000000419d19 <+9>:   mov    %sil,(%rax)
   0x0000000000419d1c <+12>:  addq   $0x1,(%rdi)
   0x0000000000419d20 <+16>:  retq
   0x0000000000419d21 <+17>:  nopl   0x0(%rax)
   0x0000000000419d28 <+24>:  movsbl %sil,%esi
   0x0000000000419d2c <+28>:  jmpq   0x419ca0 <folly::io::QueueAppender::writeSlow<signed char>(signed char)>

With this diff, Thrift serialization performance is improved by up to 2x on
production workloads (and in the benchmarks below, by up to 2x for compact and
3x for binary).

Thrift benchmark output:

Before:
============================================================================
thrift/lib/cpp2/test/ProtocolBench.cpp          relative  time/iter  iters/s
============================================================================
BinaryProtocol_write_Empty                 58.05ns    17.23M
BinaryProtocol_write_SmallInt              75.17ns    13.30M
BinaryProtocol_write_BigInt                74.60ns    13.41M
BinaryProtocol_write_SmallString           85.12ns    11.75M
BinaryProtocol_write_BigString            802.96ns     1.25M
BinaryProtocol_write_BigBinary            174.69ns     5.72M
BinaryProtocol_write_LargeBinary          171.81ns     5.82M
BinaryProtocol_write_Mixed                130.97ns     7.64M
BinaryProtocol_write_SmallListInt         123.99ns     8.06M
BinaryProtocol_write_BigListInt            40.72us    24.56K
BinaryProtocol_write_BigListMixed         784.78us     1.27K
BinaryProtocol_write_LargeListMixed        98.84ms     10.12
CompactProtocol_write_Empty                64.38ns    15.53M
CompactProtocol_write_SmallInt             76.74ns    13.03M
CompactProtocol_write_BigInt               83.62ns    11.96M
CompactProtocol_write_SmallString          86.05ns    11.62M
CompactProtocol_write_BigString           786.18ns     1.27M
CompactProtocol_write_BigBinary           184.91ns     5.41M
CompactProtocol_write_LargeBinary         182.12ns     5.49M
CompactProtocol_write_Mixed               120.89ns     8.27M
CompactProtocol_write_SmallListInt        119.74ns     8.35M
CompactProtocol_write_BigListInt           43.76us    22.85K
CompactProtocol_write_BigListMixed        595.90us     1.68K
CompactProtocol_write_LargeListMixed       72.80ms     13.74
============================================================================

After:
============================================================================
thrift/lib/cpp2/test/ProtocolBench.cpp          relative  time/iter  iters/s
============================================================================
BinaryProtocol_write_Empty                 65.97ns    15.16M
BinaryProtocol_write_SmallInt              72.31ns    13.83M
BinaryProtocol_write_BigInt                72.67ns    13.76M
BinaryProtocol_write_SmallString           77.56ns    12.89M
BinaryProtocol_write_BigString            782.07ns     1.28M
BinaryProtocol_write_BigBinary            179.69ns     5.57M
BinaryProtocol_write_LargeBinary          182.62ns     5.48M
BinaryProtocol_write_Mixed                 91.62ns    10.92M
BinaryProtocol_write_SmallListInt          96.22ns    10.39M
BinaryProtocol_write_BigListInt            19.65us    50.90K
BinaryProtocol_write_BigListMixed         245.69us     4.07K
BinaryProtocol_write_LargeListMixed        46.56ms     21.48
CompactProtocol_write_Empty                74.44ns    13.43M
CompactProtocol_write_SmallInt             80.35ns    12.45M
CompactProtocol_write_BigInt               85.30ns    11.72M
CompactProtocol_write_SmallString          82.61ns    12.10M
CompactProtocol_write_BigString           784.77ns     1.27M
CompactProtocol_write_BigBinary           193.20ns     5.18M
CompactProtocol_write_LargeBinary         192.53ns     5.19M
CompactProtocol_write_Mixed                99.78ns    10.02M
CompactProtocol_write_SmallListInt        104.77ns     9.54M
CompactProtocol_write_BigListInt           25.62us    39.03K
CompactProtocol_write_BigListMixed        272.42us     3.67K
CompactProtocol_write_LargeListMixed       38.32ms     26.09
============================================================================

QueueAppender benchmark output (although not very representative, because it
is a tight loop):

Before:
============================================================================
folly/io/test/QueueAppenderBenchmark.cpp        relative  time/iter  iters/s
============================================================================
write_uint8                                10.50us    95.20K
write_uint16                                5.48us   182.49K
write_uint32                                2.73us   366.22K
push_64b                                    9.77us   102.36K
push_1024b                                112.87us     8.86K
append                                     64.21us    15.57K
preallocate_postallocate_1b                16.34us    61.19K
preallocate_postallocate_4b                15.56us    64.26K
preallocate_postallocate_32b               22.17us    45.11K
preallocate_postallocate_256b             149.55us     6.69K
============================================================================

After:
============================================================================
folly/io/test/QueueAppenderBenchmark.cpp        relative  time/iter  iters/s
============================================================================
write_uint8                                 8.86us   112.81K
write_uint16                                3.91us   255.68K
write_uint32                                2.08us   481.78K
push_64b                                    8.24us   121.39K
push_1024b                                115.50us     8.66K
append                                     67.52us    14.81K
preallocate_postallocate_1b                13.86us    72.17K
preallocate_postallocate_4b                11.67us    85.71K
preallocate_postallocate_32b               20.35us    49.14K
preallocate_postallocate_256b             148.57us     6.73K
============================================================================

Reviewed By: yfeldblum

Differential Revision: D6427749

fbshipit-source-id: 8495cc74b6106b15d201e37533ae4c0a1abc9d74
---
 folly/io/Cursor.h                        |  69 +++--
 folly/io/IOBufQueue.cpp                  |  81 ++++-
 folly/io/IOBufQueue.h                    | 370 +++++++++++++++++++++--
 folly/io/test/QueueAppenderBenchmark.cpp | 136 +++++++++
 4 files changed, 591 insertions(+), 65 deletions(-)
 create mode 100644 folly/io/test/QueueAppenderBenchmark.cpp

diff --git a/folly/io/Cursor.h b/folly/io/Cursor.h
index ac5ec91f..ba0dd36f 100644
--- a/folly/io/Cursor.h
+++ b/folly/io/Cursor.h
@@ -982,34 +982,43 @@ class QueueAppender : public detail::Writable<QueueAppender> {
    * space in the queue, we grow no more than growth bytes at once
    * (unless you call ensure() with a bigger value yourself).
    */
-  QueueAppender(IOBufQueue* queue, uint64_t growth) {
-    reset(queue, growth);
-  }
+  QueueAppender(IOBufQueue* queue, uint64_t growth)
+      : queueCache_(queue), growth_(growth) {}

   void reset(IOBufQueue* queue, uint64_t growth) {
-    queue_ = queue;
+    queueCache_.reset(queue);
     growth_ = growth;
   }

   uint8_t* writableData() {
-    return static_cast<uint8_t*>(queue_->writableTail());
+    return queueCache_.writableData();
   }

-  size_t length() const { return queue_->tailroom(); }
+  size_t length() {
+    return queueCache_.length();
+  }

-  void append(size_t n) { queue_->postallocate(n); }
+  void append(size_t n) {
+    queueCache_.append(n);
+  }

   // Ensure at least n contiguous; can go above growth_, throws if
   // not enough room.
-  void ensure(uint64_t n) { queue_->preallocate(n, growth_); }
+  void ensure(size_t n) {
+    if (length() < n) {
+      ensureSlow(n);
+    }
+  }

   template <class T>
-  typename std::enable_if<std::is_arithmetic<T>::value>::type
-  write(T value) {
+  typename std::enable_if<std::is_arithmetic<T>::value>::type write(T value) {
     // We can't fail.
- auto p = queue_->preallocate(sizeof(T), growth_); - storeUnaligned(p.first, value); - queue_->postallocate(sizeof(T)); + if (length() >= sizeof(T)) { + storeUnaligned(queueCache_.writableData(), value); + queueCache_.appendUnsafe(sizeof(T)); + } else { + writeSlow(value); + } } using detail::Writable::pushAtMost; @@ -1018,27 +1027,25 @@ class QueueAppender : public detail::Writable { const size_t copyLength = std::min(len, length()); if (copyLength != 0) { memcpy(writableData(), buf, copyLength); - append(copyLength); + queueCache_.appendUnsafe(copyLength); buf += copyLength; } - // Allocate more buffers as necessary size_t remaining = len - copyLength; + // Allocate more buffers as necessary while (remaining != 0) { - auto p = queue_->preallocate(std::min(remaining, growth_), - growth_, - remaining); + auto p = queueCache_.queue()->preallocate( + std::min(remaining, growth_), growth_, remaining); memcpy(p.first, buf, p.second); - queue_->postallocate(p.second); + queueCache_.queue()->postallocate(p.second); buf += p.second; remaining -= p.second; } - return len; } void insert(std::unique_ptr buf) { if (buf) { - queue_->append(std::move(buf), true); + queueCache_.queue()->append(std::move(buf), true); } } @@ -1047,9 +1054,25 @@ class QueueAppender : public detail::Writable { } private: - folly::IOBufQueue* queue_; - size_t growth_; + folly::IOBufQueue::WritableRangeCache queueCache_{nullptr}; + size_t growth_{0}; + + FOLLY_NOINLINE void ensureSlow(size_t n) { + queueCache_.queue()->preallocate(n, growth_); + queueCache_.fillCache(); + } + + template + typename std::enable_if::value>::type FOLLY_NOINLINE + writeSlow(T value) { + queueCache_.queue()->preallocate(sizeof(T), growth_); + queueCache_.fillCache(); + + storeUnaligned(queueCache_.writableData(), value); + queueCache_.appendUnsafe(sizeof(T)); + } }; + } // namespace io } // namespace folly diff --git a/folly/io/IOBufQueue.cpp b/folly/io/IOBufQueue.cpp index df1518e0..d54761e4 100644 --- a/folly/io/IOBufQueue.cpp +++ b/folly/io/IOBufQueue.cpp @@ -67,29 +67,53 @@ appendToChain(unique_ptr& dst, unique_ptr&& src, bool pack) { namespace folly { IOBufQueue::IOBufQueue(const Options& options) - : options_(options), - chainLength_(0) { + : options_(options), cachePtr_(&localCache_) { + localCache_.attached = true; +} + +IOBufQueue::~IOBufQueue() { + clearWritableRangeCache(); } IOBufQueue::IOBufQueue(IOBufQueue&& other) noexcept - : options_(other.options_), - chainLength_(other.chainLength_), - head_(std::move(other.head_)) { + : options_(other.options_), cachePtr_(&localCache_) { + other.clearWritableRangeCache(); + head_ = std::move(other.head_); + chainLength_ = other.chainLength_; + + tailStart_ = other.tailStart_; + localCache_.cachedRange = other.localCache_.cachedRange; + localCache_.attached = true; + other.chainLength_ = 0; + other.tailStart_ = nullptr; + other.localCache_.cachedRange = {nullptr, nullptr}; } IOBufQueue& IOBufQueue::operator=(IOBufQueue&& other) { if (&other != this) { + other.clearWritableRangeCache(); + clearWritableRangeCache(); + options_ = other.options_; - chainLength_ = other.chainLength_; head_ = std::move(other.head_); + chainLength_ = other.chainLength_; + + tailStart_ = other.tailStart_; + localCache_.cachedRange = other.localCache_.cachedRange; + localCache_.attached = true; + other.chainLength_ = 0; + other.tailStart_ = nullptr; + other.localCache_.cachedRange = {nullptr, nullptr}; } return *this; } std::pair IOBufQueue::headroom() { + // Note, headroom is independent from the tail, so we don't need 
to flush the + // cache. if (head_) { return std::make_pair(head_->writableBuffer(), head_->headroom()); } else { @@ -102,6 +126,8 @@ IOBufQueue::markPrepended(uint64_t n) { if (n == 0) { return; } + // Note, headroom is independent from the tail, so we don't need to flush the + // cache. assert(head_); head_->prepend(n); chainLength_ += n; @@ -109,12 +135,14 @@ IOBufQueue::markPrepended(uint64_t n) { void IOBufQueue::prepend(const void* buf, uint64_t n) { - auto p = headroom(); - if (n > p.second) { + // We're not touching the tail, so we don't need to flush the cache. + auto hroom = head_->headroom(); + if (!head_ || hroom < n) { throw std::overflow_error("Not enough room to prepend"); } - memcpy(static_cast(p.first) + p.second - n, buf, n); - markPrepended(n); + memcpy(head_->writableBuffer() + hroom - n, buf, n); + head_->prepend(n); + chainLength_ += n; } void @@ -122,6 +150,7 @@ IOBufQueue::append(unique_ptr&& buf, bool pack) { if (!buf) { return; } + auto guard = updateGuard(); if (options_.cacheChainLength) { chainLength_ += buf->computeChainDataLength(); } @@ -133,6 +162,9 @@ IOBufQueue::append(IOBufQueue& other, bool pack) { if (!other.head_) { return; } + // We're going to chain other, thus we need to grab both guards. + auto otherGuard = other.updateGuard(); + auto guard = updateGuard(); if (options_.cacheChainLength) { if (other.options_.cacheChainLength) { chainLength_ += other.chainLength_; @@ -146,6 +178,7 @@ IOBufQueue::append(IOBufQueue& other, bool pack) { void IOBufQueue::append(const void* buf, size_t len) { + auto guard = updateGuard(); auto src = static_cast(buf); while (len != 0) { if ((head_ == nullptr) || head_->prev()->isSharedOne() || @@ -179,15 +212,20 @@ IOBufQueue::wrapBuffer(const void* buf, size_t len, uint64_t blockSize) { pair IOBufQueue::preallocateSlow(uint64_t min, uint64_t newAllocationSize, uint64_t max) { + // Avoid grabbing update guard, since we're manually setting the cache ptrs. + flushCache(); // Allocate a new buffer of the requested max size. unique_ptr newBuf(IOBuf::create(std::max(min, newAllocationSize))); + + tailStart_ = newBuf->writableTail(); + cachePtr_->cachedRange = std::pair( + tailStart_, tailStart_ + newBuf->tailroom()); appendToChain(head_, std::move(newBuf), false); - IOBuf* last = head_->prev(); - return make_pair(last->writableTail(), - std::min(max, last->tailroom())); + return make_pair(writableTail(), std::min(max, tailroom())); } unique_ptr IOBufQueue::split(size_t n, bool throwOnUnderflow) { + auto guard = updateGuard(); unique_ptr result; while (n != 0) { if (head_ == nullptr) { @@ -227,6 +265,7 @@ void IOBufQueue::trimStart(size_t amount) { } size_t IOBufQueue::trimStartAtMost(size_t amount) { + auto guard = updateGuard(); auto original = amount; while (amount > 0) { if (!head_) { @@ -254,6 +293,7 @@ void IOBufQueue::trimEnd(size_t amount) { } size_t IOBufQueue::trimEndAtMost(size_t amount) { + auto guard = updateGuard(); auto original = amount; while (amount > 0) { if (!head_) { @@ -278,6 +318,7 @@ size_t IOBufQueue::trimEndAtMost(size_t amount) { } std::unique_ptr IOBufQueue::pop_front() { + auto guard = updateGuard(); if (!head_) { return nullptr; } @@ -291,6 +332,7 @@ void IOBufQueue::clear() { if (!head_) { return; } + auto guard = updateGuard(); IOBuf* buf = head_.get(); do { buf->clear(); @@ -303,16 +345,25 @@ void IOBufQueue::appendToString(std::string& out) const { if (!head_) { return; } - auto len = - options_.cacheChainLength ? 
chainLength_ : head_->computeChainDataLength(); + auto len = options_.cacheChainLength + ? chainLength_ + (cachePtr_->cachedRange.first - tailStart_) + : head_->computeChainDataLength() + + (cachePtr_->cachedRange.first - tailStart_); out.reserve(out.size() + len); for (auto range : *head_) { out.append(reinterpret_cast(range.data()), range.size()); } + + if (tailStart_ != cachePtr_->cachedRange.first) { + out.append( + reinterpret_cast(tailStart_), + cachePtr_->cachedRange.first - tailStart_); + } } void IOBufQueue::gather(uint64_t maxLength) { + auto guard = updateGuard(); if (head_ != nullptr) { head_->gather(maxLength); } diff --git a/folly/io/IOBufQueue.h b/folly/io/IOBufQueue.h index c801920d..5c627bc9 100644 --- a/folly/io/IOBufQueue.h +++ b/folly/io/IOBufQueue.h @@ -16,6 +16,7 @@ #pragma once +#include #include #include @@ -32,9 +33,46 @@ namespace folly { * chain, if any. */ class IOBufQueue { + private: + /** + * This guard should be taken by any method that intends to do any changes + * to in data_ (e.g. appending to it). + * + * It flushes the writable tail cache and refills it on destruction. + */ + auto updateGuard() { + flushCache(); + return folly::makeGuard([this] { updateWritableTailCache(); }); + } + + struct WritableRangeCacheData { + std::pair cachedRange; + bool attached{false}; + + WritableRangeCacheData() = default; + + WritableRangeCacheData(WritableRangeCacheData&& other) + : cachedRange(other.cachedRange), attached(other.attached) { + other.cachedRange = {}; + other.attached = false; + } + WritableRangeCacheData& operator=(WritableRangeCacheData&& other) { + cachedRange = other.cachedRange; + attached = other.attached; + + other.cachedRange = {}; + other.attached = false; + + return *this; + } + + WritableRangeCacheData(const WritableRangeCacheData&) = delete; + WritableRangeCacheData& operator=(const WritableRangeCacheData&) = delete; + }; + public: struct Options { - Options() : cacheChainLength(false) { } + Options() : cacheChainLength(false) {} bool cacheChainLength; }; @@ -48,7 +86,184 @@ class IOBufQueue { return options; } + /** + * WritableRangeCache represents a cache of current writable tail and provides + * cheap and simple interface to append to it that avoids paying the cost of + * preallocate/postallocate pair (i.e. indirections and checks). + * + * The cache is flushed on destruction/copy/move and on non-const accesses to + * the underlying IOBufQueue. + * + * Note: there can be only one active cache for a given IOBufQueue, i.e. when + * you fill a cache object it automatically invalidates other + * cache (if any). + */ + class WritableRangeCache { + public: + explicit WritableRangeCache(folly::IOBufQueue* q = nullptr) : queue_(q) { + if (queue_) { + fillCache(); + } + } + + /** + * Move constructor/assignment can move the cached range, but must update + * the reference in IOBufQueue. + */ + WritableRangeCache(WritableRangeCache&& other) + : data_(std::move(other.data_)), queue_(other.queue_) { + if (data_.attached) { + queue_->updateCacheRef(data_); + } + } + WritableRangeCache& operator=(WritableRangeCache&& other) { + if (data_.attached) { + queue_->clearWritableRangeCache(); + } + + data_ = std::move(other.data_); + queue_ = other.queue_; + + if (data_.attached) { + queue_->updateCacheRef(data_); + } + + return *this; + } + + /** + * Copy constructor/assignment cannot copy the cached range. 
+ */ + WritableRangeCache(const WritableRangeCache& other) + : queue_(other.queue_) {} + WritableRangeCache& operator=(const WritableRangeCache& other) { + if (data_.attached) { + queue_->clearWritableRangeCache(); + } + + queue_ = other.queue_; + + return *this; + } + + ~WritableRangeCache() { + if (data_.attached) { + queue_->clearWritableRangeCache(); + } + } + + /** + * Reset the underlying IOBufQueue, will flush current cache if present. + */ + void reset(IOBufQueue* q) { + if (data_.attached) { + queue_->clearWritableRangeCache(); + } + + queue_ = q; + + if (queue_) { + fillCache(); + } + } + + /** + * Get a pointer to the underlying IOBufQueue object. + */ + IOBufQueue* queue() { + return queue_; + } + + /** + * Return a pointer to the start of cached writable tail. + * + * Note: doesn't populate cache. + */ + uint8_t* writableData() { + dcheckIntegrity(); + return data_.cachedRange.first; + } + + /** + * Return a length of cached writable tail. + * + * Note: doesn't populate cache. + */ + size_t length() { + dcheckIntegrity(); + return data_.cachedRange.second - data_.cachedRange.first; + } + + /** + * Mark n bytes as occupied (e.g. postallocate). + */ + void append(size_t n) { + dcheckIntegrity(); + // This can happen only if somebody is misusing the interface. + // E.g. calling append after touching IOBufQueue or without checking + // the length(). + if (LIKELY(data_.cachedRange.first != nullptr)) { + DCHECK_LE(n, length()); + data_.cachedRange.first += n; + } else { + appendSlow(n); + } + } + + /** + * Same as append(n), but avoids checking if there is a cache. + * The caller must guarantee that the cache is set (e.g. the caller just + * called fillCache or checked that it's not empty). + */ + void appendUnsafe(size_t n) { + data_.cachedRange.first += n; + } + + /** + * Fill the cache of writable tail from the underlying IOBufQueue. + */ + void fillCache() { + queue_->fillWritableRangeCache(data_); + } + + private: + WritableRangeCacheData data_; + IOBufQueue* queue_; + + FOLLY_NOINLINE void appendSlow(size_t n) { + queue_->postallocate(n); + } + + void dcheckIntegrity() { + // Tail start should always be less than tail end. + DCHECK_LE(data_.cachedRange.first, data_.cachedRange.second); + DCHECK( + data_.cachedRange.first != nullptr || + data_.cachedRange.second == nullptr); + + // Cached range should be always empty if the cache is not attached. + DCHECK( + data_.attached || + (data_.cachedRange.first == nullptr && + data_.cachedRange.second == nullptr)); + + // We cannot be in attached state if the queue_ is not set. + DCHECK(queue_ != nullptr || !data_.attached); + + // If we're attached and the cache is not empty, then it should coincide + // with the tail buffer. + DCHECK( + !data_.attached || data_.cachedRange.first == nullptr || + (queue_->head_ != nullptr && + data_.cachedRange.first >= queue_->head_->prev()->writableTail() && + data_.cachedRange.second == + queue_->head_->prev()->writableTail() + + queue_->head_->prev()->tailroom())); + } + }; + explicit IOBufQueue(const Options& options = Options()); + ~IOBufQueue(); /** * Return a space to prepend bytes and the amount of headroom available. 
@@ -139,10 +354,11 @@ class IOBufQueue { std::pair preallocate( uint64_t min, uint64_t newAllocationSize, uint64_t max = std::numeric_limits::max()) { - auto buf = tailBuf(); - if (LIKELY(buf && buf->tailroom() >= min)) { - return std::make_pair(buf->writableTail(), - std::min(max, buf->tailroom())); + dcheckCacheIntegrity(); + + if (LIKELY(writableTail() != nullptr && tailroom() >= min)) { + return std::make_pair( + writableTail(), std::min(max, tailroom())); } return preallocateSlow(min, newAllocationSize, max); @@ -159,8 +375,9 @@ class IOBufQueue { * the call to preallocate and the call to postallocate(). */ void postallocate(uint64_t n) { - head_->prev()->append(n); - chainLength_ += n; + dcheckCacheIntegrity(); + DCHECK_LE(cachePtr_->cachedRange.first + n, cachePtr_->cachedRange.second); + cachePtr_->cachedRange.first += n; } /** @@ -174,13 +391,13 @@ class IOBufQueue { } void* writableTail() const { - auto buf = tailBuf(); - return buf ? buf->writableTail() : nullptr; + dcheckCacheIntegrity(); + return cachePtr_->cachedRange.first; } size_t tailroom() const { - auto buf = tailBuf(); - return buf ? buf->tailroom() : 0; + dcheckCacheIntegrity(); + return cachePtr_->cachedRange.second - cachePtr_->cachedRange.first; } /** @@ -235,14 +452,24 @@ class IOBufQueue { * Transfer ownership of the queue's entire IOBuf chain to the caller. */ std::unique_ptr move() { + auto guard = updateGuard(); + std::unique_ptr res = std::move(head_); chainLength_ = 0; - return std::move(head_); + return res; } /** - * Access + * Access the front IOBuf. + * + * Note: caller will see the current state of the chain, but may not see + * future updates immediately, due to the presence of a tail cache. + * Note: the caller may potentially clone the chain, thus marking all buffers + * as shared. We may still continue writing to the tail of the last + * IOBuf without checking if it's shared, but this is fine, since the + * cloned IOBufs won't reference that data. */ const folly::IOBuf* front() const { + flushCache(); return head_.get(); } @@ -261,14 +488,17 @@ class IOBufQueue { if (UNLIKELY(!options_.cacheChainLength)) { throw std::invalid_argument("IOBufQueue: chain length not cached"); } - return chainLength_; + dcheckCacheIntegrity(); + return chainLength_ + (cachePtr_->cachedRange.first - tailStart_); } /** * Returns true iff the IOBuf chain length is 0. */ bool empty() const { - return !head_ || head_->empty(); + dcheckCacheIntegrity(); + return !head_ || + (head_->empty() && cachePtr_->cachedRange.first == tailStart_); } const Options& options() const { @@ -297,16 +527,6 @@ class IOBufQueue { IOBufQueue& operator=(IOBufQueue&&); private: - IOBuf* tailBuf() const { - if (UNLIKELY(!head_)) { - return nullptr; - } - IOBuf* buf = head_->prev(); - return LIKELY(!buf->isSharedOne()) ? 
buf : nullptr; - } - std::pair preallocateSlow( - uint64_t min, uint64_t newAllocationSize, uint64_t max); - std::unique_ptr split(size_t n, bool throwOnUnderflow); static const size_t kChainLengthNotCached = (size_t)-1; @@ -319,9 +539,105 @@ class IOBufQueue { // NOTE that chainLength_ is still updated even if !options_.cacheChainLength // because doing it unchecked in postallocate() is faster (no (mis)predicted // branch) - size_t chainLength_; - /** Everything that has been appended but not yet discarded or moved out */ + mutable size_t chainLength_{0}; + /** + * Everything that has been appended but not yet discarded or moved out + * Note: anything that needs to operate on a tail should either call + * flushCache() or grab updateGuard() (it will flush the cache itself). + */ std::unique_ptr head_; + + mutable uint8_t* tailStart_{nullptr}; + WritableRangeCacheData* cachePtr_{nullptr}; + WritableRangeCacheData localCache_; + + void dcheckCacheIntegrity() const { + // Tail start should always be less than tail end. + DCHECK_LE(tailStart_, cachePtr_->cachedRange.first); + DCHECK_LE(cachePtr_->cachedRange.first, cachePtr_->cachedRange.second); + DCHECK( + cachePtr_->cachedRange.first != nullptr || + cachePtr_->cachedRange.second == nullptr); + + // There is always an attached cache instance. + DCHECK(cachePtr_->attached); + + // Either cache is empty or it coincides with the tail. + DCHECK( + cachePtr_->cachedRange.first == nullptr || + (head_ != nullptr && tailStart_ == head_->prev()->writableTail() && + tailStart_ <= cachePtr_->cachedRange.first && + cachePtr_->cachedRange.first >= head_->prev()->writableTail() && + cachePtr_->cachedRange.second == + head_->prev()->writableTail() + head_->prev()->tailroom())); + } + + /** + * Populate dest with writable tail range cache. + */ + void fillWritableRangeCache(WritableRangeCacheData& dest) { + dcheckCacheIntegrity(); + if (cachePtr_ != &dest) { + dest = std::move(*cachePtr_); + cachePtr_ = &dest; + } + } + + /** + * Clear current writable tail cache and reset it to localCache_ + */ + void clearWritableRangeCache() { + flushCache(); + + if (cachePtr_ != &localCache_) { + localCache_ = std::move(*cachePtr_); + cachePtr_ = &localCache_; + } + + DCHECK(cachePtr_ == &localCache_ && localCache_.attached); + } + + /** + * Commit any pending changes to the tail of the queue. + */ + void flushCache() const { + dcheckCacheIntegrity(); + + if (tailStart_ != cachePtr_->cachedRange.first) { + auto buf = head_->prev(); + DCHECK_EQ( + buf->writableTail() + buf->tailroom(), cachePtr_->cachedRange.second); + auto len = cachePtr_->cachedRange.first - tailStart_; + buf->append(len); + chainLength_ += len; + tailStart_ += len; + } + } + + // For WritableRangeCache move assignment/construction. + void updateCacheRef(WritableRangeCacheData& newRef) { + cachePtr_ = &newRef; + } + + /** + * Update cached writable tail range. 
Called by updateGuard() + */ + void updateWritableTailCache() { + if (LIKELY(head_ != nullptr)) { + IOBuf* buf = head_->prev(); + if (LIKELY(!buf->isSharedOne())) { + tailStart_ = buf->writableTail(); + cachePtr_->cachedRange = std::pair( + tailStart_, tailStart_ + buf->tailroom()); + return; + } + } + tailStart_ = nullptr; + cachePtr_->cachedRange = std::pair(); + } + + std::pair + preallocateSlow(uint64_t min, uint64_t newAllocationSize, uint64_t max); }; } // namespace folly diff --git a/folly/io/test/QueueAppenderBenchmark.cpp b/folly/io/test/QueueAppenderBenchmark.cpp new file mode 100644 index 00000000..9640beb2 --- /dev/null +++ b/folly/io/test/QueueAppenderBenchmark.cpp @@ -0,0 +1,136 @@ +/* + * Copyright 2017-present Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +DECLARE_bool(benchmark); + +using namespace folly::io; + +constexpr size_t kBenchmarkSize = 4096; + +template +void runArithmeticBench(int64_t iters) { + while (iters--) { + folly::IOBufQueue queue; + QueueAppender appender(&queue, kBenchmarkSize); + for (size_t i = 0; i < kBenchmarkSize / sizeof(T); ++i) { + appender.write((T)0xFB); + } + folly::doNotOptimizeAway(queue.move()); + } +} + +BENCHMARK(write_uint8, iters) { + runArithmeticBench(iters); +} + +BENCHMARK(write_uint16, iters) { + runArithmeticBench(iters); +} + +BENCHMARK(write_uint32, iters) { + runArithmeticBench(iters); +} + +void runPushBenchmark(int64_t iters, const std::string& str) { + constexpr size_t kNumPushPerIter = 1024; + while (iters--) { + folly::IOBufQueue queue; + QueueAppender appender(&queue, kBenchmarkSize); + for (size_t i = 0; i < kNumPushPerIter; ++i) { + appender.push(reinterpret_cast(str.data()), str.size()); + } + folly::doNotOptimizeAway(queue.move()); + } +} + +BENCHMARK(push_64b, iters) { + std::string data; + BENCHMARK_SUSPEND { + data = std::string(64, 'f'); + } + runPushBenchmark(iters, data); +} + +BENCHMARK(push_1024b, iters) { + std::string data; + BENCHMARK_SUSPEND { + data = std::string(1024, 'b'); + } + runPushBenchmark(iters, data); +} + +BENCHMARK(append, iters) { + constexpr size_t kNumAppendPerIter = 1024; + + std::unique_ptr largeBuffer; + BENCHMARK_SUSPEND { + largeBuffer = folly::IOBuf::create(1024); + largeBuffer->append(1024); + } + + while (iters--) { + folly::IOBufQueue queue; + QueueAppender appender(&queue, kBenchmarkSize); + for (size_t i = 0; i < kNumAppendPerIter; ++i) { + appender.insert(largeBuffer->clone()); + } + folly::doNotOptimizeAway(queue.move()); + } +} + +void preallocate_postallocate_bench(int64_t iters, size_t size) { + std::string data; + BENCHMARK_SUSPEND { + data = std::string(size, 'f'); + } + while (iters--) { + folly::IOBufQueue queue; + for (size_t i = 0; i < kBenchmarkSize; ++i) { + auto range = queue.preallocate(size, kBenchmarkSize); + memcpy(range.first, data.data(), size); + queue.postallocate(size); + } + folly::doNotOptimizeAway(queue.move()); + } +} + +BENCHMARK(preallocate_postallocate_1b, iters) { 
+ preallocate_postallocate_bench(iters, 1); +} + +BENCHMARK(preallocate_postallocate_4b, iters) { + preallocate_postallocate_bench(iters, 4); +} + +BENCHMARK(preallocate_postallocate_32b, iters) { + preallocate_postallocate_bench(iters, 32); +} + +BENCHMARK(preallocate_postallocate_256b, iters) { + preallocate_postallocate_bench(iters, 256); +} + +int main(int argc, char** argv) { + gflags::ParseCommandLineFlags(&argc, &argv, true); + folly::runBenchmarks(); + return 0; +} -- 2.34.1
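Illustrative sketch (not part of the patch): the core idea above is that the
writer keeps a cached [begin, end) range of the tail buffer's free space, so
the hot path never walks QueueAppender->IOBufQueue->IOBuf(head)->IOBuf(tail)->
data. A minimal standalone version of that idea, with a toy chunk list
standing in for the IOBuf chain, looks roughly like this:

// Toy illustration only: a fixed-size chunk list stands in for the IOBuf
// chain, and {cacheBegin_, cacheEnd_} is the cached writable tail range.
#include <cstddef>
#include <cstdint>
#include <memory>
#include <vector>

class ToyTailCacheWriter {
 public:
  void write8(uint8_t value) {
    // Fast path: one compare, one store, one pointer bump -- no walk
    // through the chunk chain.
    if (cacheBegin_ != cacheEnd_) {
      *cacheBegin_++ = value;
    } else {
      writeSlow(value);
    }
  }

 private:
  static constexpr size_t kChunkSize = 4096;

  // Slow path: allocate a fresh tail chunk, refill the cached range,
  // then retry the store.
  void writeSlow(uint8_t value) {
    chunks_.push_back(std::make_unique<uint8_t[]>(kChunkSize));
    cacheBegin_ = chunks_.back().get();
    cacheEnd_ = cacheBegin_ + kChunkSize;
    *cacheBegin_++ = value;
  }

  uint8_t* cacheBegin_ = nullptr;
  uint8_t* cacheEnd_ = nullptr;
  std::vector<std::unique_ptr<uint8_t[]>> chunks_;
};

The fast path is essentially the ~30-byte disassembly quoted in the summary;
in the real patch the cached range lives in IOBufQueue's WritableRangeCacheData
and QueueAppender hosts it externally through WritableRangeCache.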
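Because bytes written through the cache are not immediately reflected in the
tail IOBuf's length, every structural operation in IOBufQueue.cpp above grabs
updateGuard(): flush the cache into the real tail before mutating the chain,
then refill it from the (possibly new) tail afterwards. A rough sketch of that
pattern follows; a hand-rolled scope guard stands in for folly::makeGuard
(folly/ScopeGuard.h), and Queue, flushCache and refillCache are placeholders,
not folly's real API:

#include <utility>

template <class F>
class ScopeExit {
 public:
  explicit ScopeExit(F fn) : fn_(std::move(fn)) {}
  ScopeExit(ScopeExit&& other) noexcept
      : fn_(std::move(other.fn_)), armed_(other.armed_) {
    other.armed_ = false;
  }
  ScopeExit(const ScopeExit&) = delete;
  ~ScopeExit() {
    if (armed_) {
      fn_();
    }
  }

 private:
  F fn_;
  bool armed_ = true;
};

template <class F>
ScopeExit<F> makeScopeExit(F fn) {
  return ScopeExit<F>(std::move(fn));
}

class Queue {
 public:
  void appendChunk(/* buffers to splice onto the chain */) {
    // Any method that mutates the chain takes the guard: the cache is
    // flushed immediately and refilled when the method returns.
    auto guard = updateGuard();
    // ... splice new buffers onto the chain here ...
  }

 private:
  auto updateGuard() {
    flushCache();
    return makeScopeExit([this] { refillCache(); });
  }
  void flushCache() {
    // In the real patch: append (cachedRange.first - tailStart_) bytes to
    // the tail IOBuf and bump chainLength_.
  }
  void refillCache() {
    // In the real patch: point the cache at the new tail's writable range,
    // or clear it if the tail buffer is shared.
  }
};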
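For completeness, this is roughly how the optimized path is driven from user
code; it mirrors runArithmeticBench in the new benchmark file. The #include
targets were stripped from the patch text above, so the header paths below are
assumptions:

#include <cstdint>
#include <memory>
#include <folly/io/Cursor.h>
#include <folly/io/IOBuf.h>
#include <folly/io/IOBufQueue.h>

int main() {
  folly::IOBufQueue queue(folly::IOBufQueue::cacheChainLength());
  // 4096 is the growth hint: how much tailroom to preallocate whenever the
  // cached range runs out.
  folly::io::QueueAppender appender(&queue, 4096);

  for (uint32_t i = 0; i < 1024; ++i) {
    appender.write(i); // cached-tail fast path on almost every iteration
  }

  std::unique_ptr<folly::IOBuf> buf = queue.move(); // take the whole chain
  return buf->computeChainDataLength() == 1024 * sizeof(uint32_t) ? 0 : 1;
}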
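The preallocate()/postallocate() pair remains the generic way to reserve and
commit tailroom (it is what preallocate_postallocate_bench exercises, and what
the cached fast path avoids paying per write). A hedged example of that
interface; the 4096-byte growth step is an arbitrary choice for illustration:

#include <algorithm>
#include <cstddef>
#include <cstring>
#include <folly/io/IOBufQueue.h>

// Copy an arbitrary blob into the queue using preallocate/postallocate.
void copyIntoQueue(folly::IOBufQueue& queue, const void* data, size_t len) {
  const char* src = static_cast<const char*>(data);
  size_t written = 0;
  while (written < len) {
    // Returns {writable pointer, contiguous bytes available}, allocating a
    // new tail buffer of roughly 4096 bytes if there is no tailroom left.
    auto range = queue.preallocate(1, 4096, len - written);
    size_t n = std::min<size_t>(range.second, len - written);
    std::memcpy(range.first, src + written, n);
    queue.postallocate(n); // commit only the bytes actually written
    written += n;
  }
}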