X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=folly%2Fio%2FIOBufQueue.h;h=fb8653c6a35af59195dae63f313ad4e041546761;hb=HEAD;hp=21dd020e4ee4531067b473b85cf459481b397f70;hpb=81823a9cb036baed2a3cfe5b352832e6340e6a39;p=folly.git

diff --git a/folly/io/IOBufQueue.h b/folly/io/IOBufQueue.h
index 21dd020e..fb8653c6 100644
--- a/folly/io/IOBufQueue.h
+++ b/folly/io/IOBufQueue.h
@@ -1,5 +1,5 @@
 /*
- * Copyright 2014 Facebook, Inc.
+ * Copyright 2013-present Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -14,10 +14,10 @@
  * limitations under the License.
  */
 
-#ifndef FOLLY_IO_IOBUF_QUEUE_H
-#define FOLLY_IO_IOBUF_QUEUE_H
+#pragma once
 
-#include "folly/io/IOBuf.h"
+#include <folly/ScopeGuard.h>
+#include <folly/io/IOBuf.h>
 
 #include <stdarg.h>
 #include <stdexcept>
@@ -33,9 +33,46 @@ namespace folly {
  * chain, if any.
  */
 class IOBufQueue {
+ private:
+  /**
+   * This guard should be taken by any method that intends to do any changes
+   * to data_ (e.g. appending to it).
+   *
+   * It flushes the writable tail cache and refills it on destruction.
+   */
+  auto updateGuard() {
+    flushCache();
+    return folly::makeGuard([this] { updateWritableTailCache(); });
+  }
+
+  struct WritableRangeCacheData {
+    std::pair<uint8_t*, uint8_t*> cachedRange;
+    bool attached{false};
+
+    WritableRangeCacheData() = default;
+
+    WritableRangeCacheData(WritableRangeCacheData&& other)
+        : cachedRange(other.cachedRange), attached(other.attached) {
+      other.cachedRange = {};
+      other.attached = false;
+    }
+    WritableRangeCacheData& operator=(WritableRangeCacheData&& other) {
+      cachedRange = other.cachedRange;
+      attached = other.attached;
+
+      other.cachedRange = {};
+      other.attached = false;
+
+      return *this;
+    }
+
+    WritableRangeCacheData(const WritableRangeCacheData&) = delete;
+    WritableRangeCacheData& operator=(const WritableRangeCacheData&) = delete;
+  };
+
  public:
   struct Options {
-    Options() : cacheChainLength(false) { }
+    Options() : cacheChainLength(false) {}
     bool cacheChainLength;
   };
 
@@ -49,7 +86,185 @@ class IOBufQueue {
     return options;
   }
 
+  /**
+   * WritableRangeCache represents a cache of the current writable tail and
+   * provides a cheap and simple interface to append to it that avoids paying
+   * the cost of a preallocate/postallocate pair (i.e. indirections and checks).
+   *
+   * The cache is flushed on destruction/copy/move and on non-const accesses to
+   * the underlying IOBufQueue.
+   *
+   * Note: there can be only one active cache for a given IOBufQueue, i.e. when
+   *       you fill a cache object it automatically invalidates the other
+   *       cache (if any).
+   */
+  class WritableRangeCache {
+   public:
+    explicit WritableRangeCache(folly::IOBufQueue* q = nullptr) : queue_(q) {
+      if (queue_) {
+        fillCache();
+      }
+    }
+
+    /**
+     * Move constructor/assignment can move the cached range, but must update
+     * the reference in IOBufQueue.
+     */
+    WritableRangeCache(WritableRangeCache&& other)
+        : data_(std::move(other.data_)), queue_(other.queue_) {
+      if (data_.attached) {
+        queue_->updateCacheRef(data_);
+      }
+    }
+    WritableRangeCache& operator=(WritableRangeCache&& other) {
+      if (data_.attached) {
+        queue_->clearWritableRangeCache();
+      }
+
+      data_ = std::move(other.data_);
+      queue_ = other.queue_;
+
+      if (data_.attached) {
+        queue_->updateCacheRef(data_);
+      }
+
+      return *this;
+    }
+
+    /**
+     * Copy constructor/assignment cannot copy the cached range.
+     */
+    WritableRangeCache(const WritableRangeCache& other)
+        : queue_(other.queue_) {}
+    WritableRangeCache& operator=(const WritableRangeCache& other) {
+      if (data_.attached) {
+        queue_->clearWritableRangeCache();
+      }
+
+      queue_ = other.queue_;
+
+      return *this;
+    }
+
+    ~WritableRangeCache() {
+      if (data_.attached) {
+        queue_->clearWritableRangeCache();
+      }
+    }
+
+    /**
+     * Reset the underlying IOBufQueue, will flush current cache if present.
+     */
+    void reset(IOBufQueue* q) {
+      if (data_.attached) {
+        queue_->clearWritableRangeCache();
+      }
+
+      queue_ = q;
+
+      if (queue_) {
+        fillCache();
+      }
+    }
+
+    /**
+     * Get a pointer to the underlying IOBufQueue object.
+     */
+    IOBufQueue* queue() {
+      return queue_;
+    }
+
+    /**
+     * Return a pointer to the start of the cached writable tail.
+     *
+     * Note: doesn't populate cache.
+     */
+    uint8_t* writableData() {
+      dcheckIntegrity();
+      return data_.cachedRange.first;
+    }
+
+    /**
+     * Return the length of the cached writable tail.
+     *
+     * Note: doesn't populate cache.
+     */
+    size_t length() {
+      dcheckIntegrity();
+      return data_.cachedRange.second - data_.cachedRange.first;
+    }
+
+    /**
+     * Mark n bytes as occupied (e.g. postallocate).
+     */
+    void append(size_t n) {
+      dcheckIntegrity();
+      // This can happen only if somebody is misusing the interface.
+      // E.g. calling append after touching IOBufQueue or without checking
+      // the length().
+      if (LIKELY(data_.cachedRange.first != nullptr)) {
+        DCHECK_LE(n, length());
+        data_.cachedRange.first += n;
+      } else {
+        appendSlow(n);
+      }
+    }
+
+    /**
+     * Same as append(n), but avoids checking if there is a cache.
+     * The caller must guarantee that the cache is set (e.g. the caller just
+     * called fillCache or checked that it's not empty).
+     */
+    void appendUnsafe(size_t n) {
+      data_.cachedRange.first += n;
+    }
+
+    /**
+     * Fill the cache of writable tail from the underlying IOBufQueue.
+     */
+    void fillCache() {
+      queue_->fillWritableRangeCache(data_);
+    }
+
+   private:
+    WritableRangeCacheData data_;
+    IOBufQueue* queue_;
+
+    FOLLY_NOINLINE void appendSlow(size_t n) {
+      queue_->postallocate(n);
+    }
+
+    void dcheckIntegrity() {
+      // Tail start should always be less than tail end.
+      DCHECK_LE(
+          (void*)data_.cachedRange.first, (void*)data_.cachedRange.second);
+      DCHECK(
+          data_.cachedRange.first != nullptr ||
+          data_.cachedRange.second == nullptr);
+
+      // Cached range should be always empty if the cache is not attached.
+      DCHECK(
+          data_.attached ||
+          (data_.cachedRange.first == nullptr &&
+           data_.cachedRange.second == nullptr));
+
+      // We cannot be in attached state if the queue_ is not set.
+      DCHECK(queue_ != nullptr || !data_.attached);
+
+      // If we're attached and the cache is not empty, then it should coincide
+      // with the tail buffer.
+      DCHECK(
+          !data_.attached || data_.cachedRange.first == nullptr ||
+          (queue_->head_ != nullptr &&
+           data_.cachedRange.first >= queue_->head_->prev()->writableTail() &&
+           data_.cachedRange.second ==
+               queue_->head_->prev()->writableTail() +
+                   queue_->head_->prev()->tailroom()));
+    }
+  };
+
   explicit IOBufQueue(const Options& options = Options());
+  ~IOBufQueue();
 
   /**
    * Return a space to prepend bytes and the amount of headroom available.
@@ -98,8 +313,8 @@ class IOBufQueue {
    * Copy a string to the end of this queue.
    * The caller retains ownership of the source data.
    */
-  void append(const std::string& buf) {
-    append(buf.data(), buf.length());
+  void append(StringPiece sp) {
+    append(sp.data(), sp.size());
   }
 
   /**
@@ -140,10 +355,11 @@ class IOBufQueue {
   std::pair<void*, uint64_t> preallocate(
       uint64_t min, uint64_t newAllocationSize,
       uint64_t max = std::numeric_limits<uint64_t>::max()) {
-    auto buf = tailBuf();
-    if (LIKELY(buf && buf->tailroom() >= min)) {
-      return std::make_pair(buf->writableTail(),
-                            std::min(max, buf->tailroom()));
+    dcheckCacheIntegrity();
+
+    if (LIKELY(writableTail() != nullptr && tailroom() >= min)) {
+      return std::make_pair(
+          writableTail(), std::min(max, tailroom()));
     }
 
     return preallocateSlow(min, newAllocationSize, max);
@@ -160,8 +376,11 @@ class IOBufQueue {
    * the call to preallocate and the call to postallocate().
    */
   void postallocate(uint64_t n) {
-    head_->prev()->append(n);
-    chainLength_ += n;
+    dcheckCacheIntegrity();
+    DCHECK_LE(
+        (void*)(cachePtr_->cachedRange.first + n),
+        (void*)cachePtr_->cachedRange.second);
+    cachePtr_->cachedRange.first += n;
   }
 
   /**
@@ -175,13 +394,13 @@ class IOBufQueue {
   }
 
   void* writableTail() const {
-    auto buf = tailBuf();
-    return buf ? buf->writableTail() : nullptr;
+    dcheckCacheIntegrity();
+    return cachePtr_->cachedRange.first;
  }
 
   size_t tailroom() const {
-    auto buf = tailBuf();
-    return buf ? buf->tailroom() : 0;
+    dcheckCacheIntegrity();
+    return cachePtr_->cachedRange.second - cachePtr_->cachedRange.first;
   }
 
   /**
@@ -196,7 +415,17 @@ class IOBufQueue {
    * @throws std::underflow_error if n exceeds the number of bytes
    * in the queue.
    */
-  std::unique_ptr<folly::IOBuf> split(size_t n);
+  std::unique_ptr<folly::IOBuf> split(size_t n) {
+    return split(n, true);
+  }
+
+  /**
+   * Similar to split, but will return the entire queue instead of throwing
+   * if n exceeds the number of bytes in the queue.
+   */
+  std::unique_ptr<folly::IOBuf> splitAtMost(size_t n) {
+    return split(n, false);
+  }
 
   /**
    * Similar to IOBuf::trimStart, but works on the whole queue. Will
@@ -204,24 +433,46 @@ class IOBufQueue {
    */
   void trimStart(size_t amount);
 
+  /**
+   * Similar to trimStart, but will trim at most amount bytes and returns
+   * the number of bytes trimmed.
+   */
+  size_t trimStartAtMost(size_t amount);
+
   /**
    * Similar to IOBuf::trimEnd, but works on the whole queue. Will
    * pop off buffers that have been completely trimmed.
    */
   void trimEnd(size_t amount);
 
+  /**
+   * Similar to trimEnd, but will trim at most amount bytes and returns
+   * the number of bytes trimmed.
+   */
+  size_t trimEndAtMost(size_t amount);
+
   /**
    * Transfer ownership of the queue's entire IOBuf chain to the caller.
   */
   std::unique_ptr<folly::IOBuf> move() {
+    auto guard = updateGuard();
+    std::unique_ptr<folly::IOBuf> res = std::move(head_);
     chainLength_ = 0;
-    return std::move(head_);
+    return res;
   }
 
   /**
-   * Access
+   * Access the front IOBuf.
+   *
+   * Note: caller will see the current state of the chain, but may not see
+   *       future updates immediately, due to the presence of a tail cache.
+   * Note: the caller may potentially clone the chain, thus marking all buffers
+   *       as shared. We may still continue writing to the tail of the last
+   *       IOBuf without checking if it's shared, but this is fine, since the
+   *       cloned IOBufs won't reference that data.
    */
   const folly::IOBuf* front() const {
+    flushCache();
     return head_.get();
   }
 
@@ -240,14 +491,17 @@ class IOBufQueue {
     if (UNLIKELY(!options_.cacheChainLength)) {
       throw std::invalid_argument("IOBufQueue: chain length not cached");
     }
-    return chainLength_;
+    dcheckCacheIntegrity();
+    return chainLength_ + (cachePtr_->cachedRange.first - tailStart_);
   }
 
   /**
    * Returns true iff the IOBuf chain length is 0.
    */
   bool empty() const {
-    return !head_ || head_->empty();
+    dcheckCacheIntegrity();
+    return !head_ ||
+        (head_->empty() && cachePtr_->cachedRange.first == tailStart_);
   }
 
   const Options& options() const {
@@ -261,18 +515,22 @@ class IOBufQueue {
    */
   void clear();
 
+  /**
+   * Append the queue to a std::string. Non-destructive.
+   */
+  void appendToString(std::string& out) const;
+
+  /**
+   * Calls IOBuf::gather() on the head of the queue, if it exists.
+   */
+  void gather(uint64_t maxLength);
+
   /** Movable */
-  IOBufQueue(IOBufQueue&&);
+  IOBufQueue(IOBufQueue&&) noexcept;
   IOBufQueue& operator=(IOBufQueue&&);
 
  private:
-  IOBuf* tailBuf() const {
-    if (UNLIKELY(!head_)) return nullptr;
-    IOBuf* buf = head_->prev();
-    return LIKELY(!buf->isSharedOne()) ? buf : nullptr;
-  }
-  std::pair<void*, uint64_t> preallocateSlow(
-      uint64_t min, uint64_t newAllocationSize, uint64_t max);
+  std::unique_ptr<folly::IOBuf> split(size_t n, bool throwOnUnderflow);
 
   static const size_t kChainLengthNotCached = (size_t)-1;
   /** Not copyable */
@@ -284,11 +542,108 @@ class IOBufQueue {
   // NOTE that chainLength_ is still updated even if !options_.cacheChainLength
   // because doing it unchecked in postallocate() is faster (no (mis)predicted
   // branch)
-  size_t chainLength_;
-  /** Everything that has been appended but not yet discarded or moved out */
+  mutable size_t chainLength_{0};
+  /**
+   * Everything that has been appended but not yet discarded or moved out.
+   * Note: anything that needs to operate on a tail should either call
+   * flushCache() or grab updateGuard() (it will flush the cache itself).
+   */
   std::unique_ptr<folly::IOBuf> head_;
-};
-
-} // folly
+
+  mutable uint8_t* tailStart_{nullptr};
+  WritableRangeCacheData* cachePtr_{nullptr};
+  WritableRangeCacheData localCache_;
+
+  void dcheckCacheIntegrity() const {
+    // Tail start should always be less than tail end.
+    DCHECK_LE((void*)tailStart_, (void*)cachePtr_->cachedRange.first);
+    DCHECK_LE(
+        (void*)cachePtr_->cachedRange.first,
+        (void*)cachePtr_->cachedRange.second);
+    DCHECK(
+        cachePtr_->cachedRange.first != nullptr ||
+        cachePtr_->cachedRange.second == nullptr);
+
+    // There is always an attached cache instance.
+    DCHECK(cachePtr_->attached);
+
+    // Either cache is empty or it coincides with the tail.
+    DCHECK(
+        cachePtr_->cachedRange.first == nullptr ||
+        (head_ != nullptr && tailStart_ == head_->prev()->writableTail() &&
+         tailStart_ <= cachePtr_->cachedRange.first &&
+         cachePtr_->cachedRange.first >= head_->prev()->writableTail() &&
+         cachePtr_->cachedRange.second ==
+             head_->prev()->writableTail() + head_->prev()->tailroom()));
+  }
+
+  /**
+   * Populate dest with writable tail range cache.
+ */ + void fillWritableRangeCache(WritableRangeCacheData& dest) { + dcheckCacheIntegrity(); + if (cachePtr_ != &dest) { + dest = std::move(*cachePtr_); + cachePtr_ = &dest; + } + } + + /** + * Clear current writable tail cache and reset it to localCache_ + */ + void clearWritableRangeCache() { + flushCache(); + + if (cachePtr_ != &localCache_) { + localCache_ = std::move(*cachePtr_); + cachePtr_ = &localCache_; + } + + DCHECK(cachePtr_ == &localCache_ && localCache_.attached); + } + + /** + * Commit any pending changes to the tail of the queue. + */ + void flushCache() const { + dcheckCacheIntegrity(); + + if (tailStart_ != cachePtr_->cachedRange.first) { + auto buf = head_->prev(); + DCHECK_EQ( + (void*)(buf->writableTail() + buf->tailroom()), + (void*)cachePtr_->cachedRange.second); + auto len = cachePtr_->cachedRange.first - tailStart_; + buf->append(len); + chainLength_ += len; + tailStart_ += len; + } + } + + // For WritableRangeCache move assignment/construction. + void updateCacheRef(WritableRangeCacheData& newRef) { + cachePtr_ = &newRef; + } + + /** + * Update cached writable tail range. Called by updateGuard() + */ + void updateWritableTailCache() { + if (LIKELY(head_ != nullptr)) { + IOBuf* buf = head_->prev(); + if (LIKELY(!buf->isSharedOne())) { + tailStart_ = buf->writableTail(); + cachePtr_->cachedRange = std::pair( + tailStart_, tailStart_ + buf->tailroom()); + return; + } + } + tailStart_ = nullptr; + cachePtr_->cachedRange = std::pair(); + } + + std::pair + preallocateSlow(uint64_t min, uint64_t newAllocationSize, uint64_t max); +}; -#endif // FOLLY_IO_IOBUF_QUEUE_H +} // namespace folly