From: Tudor Bosman Date: Mon, 14 Jan 2013 21:52:31 +0000 (-0800) Subject: graduate IOBuf out of folly/experimental X-Git-Tag: v0.22.0~1090 X-Git-Url: http://plrg.eecs.uci.edu/git/?p=folly.git;a=commitdiff_plain;h=7fb1e3d7114111532bde7f15e17b1e5df89616cf graduate IOBuf out of folly/experimental Summary: Move IOBuf and related code to folly/io. Test Plan: fbconfig -r folly && fbmake runtests_opt, fbconfig unicorn/test && fbmake opt Reviewed By: andrewcox@fb.com FB internal diff: D678331 --- diff --git a/folly/Subprocess.cpp b/folly/Subprocess.cpp index 9070f4ca..e8963dc8 100644 --- a/folly/Subprocess.cpp +++ b/folly/Subprocess.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2012 Facebook, Inc. + * Copyright 2013 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,7 +33,7 @@ #include "folly/Conv.h" #include "folly/ScopeGuard.h" #include "folly/String.h" -#include "folly/experimental/io/Cursor.h" +#include "folly/io/Cursor.h" extern char** environ; diff --git a/folly/Subprocess.h b/folly/Subprocess.h index cd083fa2..47ec4260 100644 --- a/folly/Subprocess.h +++ b/folly/Subprocess.h @@ -1,5 +1,5 @@ /* - * Copyright 2012 Facebook, Inc. + * Copyright 2013 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -66,7 +66,7 @@ #include #include -#include "folly/experimental/io/IOBufQueue.h" +#include "folly/io/IOBufQueue.h" #include "folly/MapUtil.h" #include "folly/Portability.h" #include "folly/Range.h" diff --git a/folly/experimental/FileGen.h b/folly/experimental/FileGen.h index ab5af127..a819a7b7 100644 --- a/folly/experimental/FileGen.h +++ b/folly/experimental/FileGen.h @@ -1,5 +1,5 @@ /* - * Copyright 2012 Facebook, Inc. + * Copyright 2013 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,7 +19,7 @@ #include "folly/experimental/File.h" #include "folly/experimental/Gen.h" -#include "folly/experimental/io/IOBuf.h" +#include "folly/io/IOBuf.h" namespace folly { namespace gen { diff --git a/folly/experimental/StringGen-inl.h b/folly/experimental/StringGen-inl.h index e3ad981b..005381a1 100644 --- a/folly/experimental/StringGen-inl.h +++ b/folly/experimental/StringGen-inl.h @@ -1,5 +1,5 @@ /* - * Copyright 2012 Facebook, Inc. + * Copyright 2013 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +18,7 @@ #error This file may only be included from folly/experimental/StringGen.h #endif -#include "folly/experimental/io/IOBuf.h" +#include "folly/io/IOBuf.h" namespace folly { namespace gen { diff --git a/folly/io/Cursor.h b/folly/io/Cursor.h new file mode 100644 index 00000000..811c2e6a --- /dev/null +++ b/folly/io/Cursor.h @@ -0,0 +1,480 @@ +/* + * Copyright 2013 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FOLLY_CURSOR_H +#define FOLLY_CURSOR_H + +#include +#include +#include +#include +#include + +#include "folly/Bits.h" +#include "folly/io/IOBuf.h" +#include "folly/Likely.h" + +/** + * Cursor class for fast iteration over IOBuf chains. + * + * Cursor - Read-only access + * + * RWPrivateCursor - Read-write access, assumes private access to IOBuf chain + * RWUnshareCursor - Read-write access, calls unshare on write (COW) + * Appender - Write access, assumes private access to IOBuf chian + * + * Note that RW cursors write in the preallocated part of buffers (that is, + * between the buffer's data() and tail()), while Appenders append to the end + * of the buffer (between the buffer's tail() and bufferEnd()). Appenders + * automatically adjust the buffer pointers, so you may only use one + * Appender with a buffer chain; for this reason, Appenders assume private + * access to the buffer (you need to call unshare() yourself if necessary). + **/ +namespace folly { namespace io { +namespace detail { + +template +class CursorBase { + public: + const uint8_t* data() const { + return crtBuf_->data() + offset_; + } + + // Space available in the current IOBuf. May be 0; use peek() instead which + // will always point to a non-empty chunk of data or at the end of the + // chain. + size_t length() const { + return crtBuf_->length() - offset_; + } + + Derived& operator+=(size_t offset) { + Derived* p = static_cast(this); + p->skip(offset); + return *p; + } + + template + typename std::enable_if::value, T>::type + read() { + T val; + pull(&val, sizeof(T)); + return val; + } + + template + T readBE() { + return Endian::big(read()); + } + + template + T readLE() { + return Endian::little(read()); + } + + explicit CursorBase(BufType* buf) + : crtBuf_(buf) + , offset_(0) + , buffer_(buf) {} + + // Make all the templated classes friends for copy constructor. + template friend class CursorBase; + + template + explicit CursorBase(const T& cursor) { + crtBuf_ = cursor.crtBuf_; + offset_ = cursor.offset_; + buffer_ = cursor.buffer_; + } + + // reset cursor to point to a new buffer. + void reset(BufType* buf) { + crtBuf_ = buf; + buffer_ = buf; + offset_ = 0; + } + + /** + * Return the available data in the current buffer. + * If you want to gather more data from the chain into a contiguous region + * (for hopefully zero-copy access), use gather() before peek(). + */ + std::pair peek() { + // Ensure that we're pointing to valid data + size_t available = length(); + while (UNLIKELY(available == 0 && tryAdvanceBuffer())) { + available = length(); + } + + return std::make_pair(data(), available); + } + + void pull(void* buf, size_t length) { + if (UNLIKELY(pullAtMost(buf, length) != length)) { + throw std::out_of_range("underflow"); + } + } + + void clone(std::unique_ptr& buf, size_t length) { + if (UNLIKELY(cloneAtMost(buf, length) != length)) { + throw std::out_of_range("underflow"); + } + } + + void skip(size_t length) { + if (UNLIKELY(skipAtMost(length) != length)) { + throw std::out_of_range("underflow"); + } + } + + size_t pullAtMost(void* buf, size_t len) { + uint8_t* p = reinterpret_cast(buf); + size_t copied = 0; + for (;;) { + // Fast path: it all fits in one buffer. + size_t available = length(); + if (LIKELY(available >= len)) { + memcpy(p, data(), len); + offset_ += len; + return copied + len; + } + + memcpy(p, data(), available); + copied += available; + if (UNLIKELY(!tryAdvanceBuffer())) { + return copied; + } + p += available; + len -= available; + } + } + + size_t cloneAtMost(std::unique_ptr& buf, size_t len) { + buf.reset(nullptr); + + std::unique_ptr tmp; + size_t copied = 0; + for (;;) { + // Fast path: it all fits in one buffer. + size_t available = length(); + if (LIKELY(available >= len)) { + tmp = crtBuf_->cloneOne(); + tmp->trimStart(offset_); + tmp->trimEnd(tmp->length() - len); + offset_ += len; + if (!buf) { + buf = std::move(tmp); + } else { + buf->prependChain(std::move(tmp)); + } + return copied + len; + } + + tmp = crtBuf_->cloneOne(); + tmp->trimStart(offset_); + if (!buf) { + buf = std::move(tmp); + } else { + buf->prependChain(std::move(tmp)); + } + + copied += available; + if (UNLIKELY(!tryAdvanceBuffer())) { + return copied; + } + len -= available; + } + } + + size_t skipAtMost(size_t len) { + size_t skipped = 0; + for (;;) { + // Fast path: it all fits in one buffer. + size_t available = length(); + if (LIKELY(available >= len)) { + offset_ += len; + return skipped + len; + } + + skipped += available; + if (UNLIKELY(!tryAdvanceBuffer())) { + return skipped; + } + len -= available; + } + } + + protected: + BufType* crtBuf_; + size_t offset_; + + ~CursorBase(){} + + bool tryAdvanceBuffer() { + BufType* nextBuf = crtBuf_->next(); + if (UNLIKELY(nextBuf == buffer_)) { + offset_ = crtBuf_->length(); + return false; + } + + offset_ = 0; + crtBuf_ = nextBuf; + static_cast(this)->advanceDone(); + return true; + } + + private: + void advanceDone() { + } + + BufType* buffer_; +}; + +template +class Writable { + public: + template + typename std::enable_if::value>::type + write(T value) { + const uint8_t* u8 = reinterpret_cast(&value); + push(u8, sizeof(T)); + } + + template + void writeBE(T value) { + write(Endian::big(value)); + } + + template + void writeLE(T value) { + write(Endian::little(value)); + } + + void push(const uint8_t* buf, size_t len) { + Derived* d = static_cast(this); + if (d->pushAtMost(buf, len) != len) { + throw std::out_of_range("overflow"); + } + } +}; + +} // namespace detail + +class Cursor : public detail::CursorBase { + public: + explicit Cursor(const IOBuf* buf) + : detail::CursorBase(buf) {} + + template + explicit Cursor(CursorType& cursor) + : detail::CursorBase(cursor) {} +}; + +enum class CursorAccess { + PRIVATE, + UNSHARE +}; + +template +class RWCursor + : public detail::CursorBase, IOBuf>, + public detail::Writable> { + friend class detail::CursorBase, IOBuf>; + public: + explicit RWCursor(IOBuf* buf) + : detail::CursorBase, IOBuf>(buf), + maybeShared_(true) {} + + template + explicit RWCursor(CursorType& cursor) + : detail::CursorBase, IOBuf>(cursor), + maybeShared_(true) {} + /** + * Gather at least n bytes contiguously into the current buffer, + * by coalescing subsequent buffers from the chain as necessary. + */ + void gather(size_t n) { + this->crtBuf_->gather(this->offset_ + n); + } + + size_t pushAtMost(const uint8_t* buf, size_t len) { + size_t copied = 0; + for (;;) { + // Fast path: the current buffer is big enough. + size_t available = this->length(); + if (LIKELY(available >= len)) { + if (access == CursorAccess::UNSHARE) { + maybeUnshare(); + } + memcpy(writableData(), buf, len); + this->offset_ += len; + return copied + len; + } + + if (access == CursorAccess::UNSHARE) { + maybeUnshare(); + } + memcpy(writableData(), buf, available); + copied += available; + if (UNLIKELY(!this->tryAdvanceBuffer())) { + return copied; + } + buf += available; + len -= available; + } + } + + void insert(std::unique_ptr buf) { + folly::IOBuf* nextBuf; + if (this->offset_ == 0) { + // Can just prepend + nextBuf = buf.get(); + this->crtBuf_->prependChain(std::move(buf)); + } else { + std::unique_ptr remaining; + if (this->crtBuf_->length() - this->offset_ > 0) { + // Need to split current IOBuf in two. + remaining = this->crtBuf_->cloneOne(); + remaining->trimStart(this->offset_); + nextBuf = remaining.get(); + buf->prependChain(std::move(remaining)); + } else { + // Can just append + nextBuf = this->crtBuf_->next(); + } + this->crtBuf_->trimEnd(this->length()); + this->crtBuf_->appendChain(std::move(buf)); + } + // Jump past the new links + this->offset_ = 0; + this->crtBuf_ = nextBuf; + } + + uint8_t* writableData() { + return this->crtBuf_->writableData() + this->offset_; + } + + private: + void maybeUnshare() { + if (UNLIKELY(maybeShared_)) { + this->crtBuf_->unshareOne(); + maybeShared_ = false; + } + } + + void advanceDone() { + maybeShared_ = true; + } + + bool maybeShared_; +}; + +typedef RWCursor RWPrivateCursor; +typedef RWCursor RWUnshareCursor; + +/** + * Append to the end of a buffer chain, growing the chain (by allocating new + * buffers) in increments of at least growth bytes every time. Won't grow + * (and push() and ensure() will throw) if growth == 0. + * + * TODO(tudorb): add a flavor of Appender that reallocates one IOBuf instead + * of chaining. + */ +class Appender : public detail::Writable { + public: + Appender(IOBuf* buf, uint32_t growth) + : buffer_(buf), + crtBuf_(buf->prev()), + growth_(growth) { + } + + uint8_t* writableData() { + return crtBuf_->writableTail(); + } + + size_t length() const { + return crtBuf_->tailroom(); + } + + /** + * Mark n bytes (must be <= length()) as appended, as per the + * IOBuf::append() method. + */ + void append(size_t n) { + crtBuf_->append(n); + } + + /** + * Ensure at least n contiguous bytes available to write. + * Postcondition: length() >= n. + */ + void ensure(uint32_t n) { + if (LIKELY(length() >= n)) { + return; + } + + // Waste the rest of the current buffer and allocate a new one. + // Don't make it too small, either. + if (growth_ == 0) { + throw std::out_of_range("can't grow buffer chain"); + } + + n = std::max(n, growth_); + buffer_->prependChain(IOBuf::create(n)); + crtBuf_ = buffer_->prev(); + } + + size_t pushAtMost(const uint8_t* buf, size_t len) { + size_t copied = 0; + for (;;) { + // Fast path: it all fits in one buffer. + size_t available = length(); + if (LIKELY(available >= len)) { + memcpy(writableData(), buf, len); + append(len); + return copied + len; + } + + memcpy(writableData(), buf, available); + append(available); + copied += available; + if (UNLIKELY(!tryGrowChain())) { + return copied; + } + buf += available; + len -= available; + } + } + + private: + bool tryGrowChain() { + assert(crtBuf_->next() == buffer_); + if (growth_ == 0) { + return false; + } + + buffer_->prependChain(IOBuf::create(growth_)); + crtBuf_ = buffer_->prev(); + return true; + } + + IOBuf* buffer_; + IOBuf* crtBuf_; + uint32_t growth_; +}; + +}} // folly::io + +#endif // FOLLY_CURSOR_H diff --git a/folly/io/IOBuf.cpp b/folly/io/IOBuf.cpp new file mode 100644 index 00000000..aac0be51 --- /dev/null +++ b/folly/io/IOBuf.cpp @@ -0,0 +1,646 @@ +/* + * Copyright 2013 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define __STDC_LIMIT_MACROS + +#include "folly/io/IOBuf.h" + +#include "folly/Malloc.h" +#include "folly/Likely.h" + +#include +#include +#include +#include + +using std::unique_ptr; + +namespace folly { + +const uint32_t IOBuf::kMaxIOBufSize; +// Note: Applying offsetof() to an IOBuf is legal according to C++11, since +// IOBuf is a standard-layout class. However, this isn't legal with earlier +// C++ standards, which require that offsetof() only be used with POD types. +// +// This code compiles with g++ 4.6, but not with g++ 4.4 or earlier versions. +const uint32_t IOBuf::kMaxInternalDataSize = + kMaxIOBufSize - offsetof(folly::IOBuf, int_.buf); + +IOBuf::SharedInfo::SharedInfo() + : freeFn(NULL), + userData(NULL) { + // Use relaxed memory ordering here. Since we are creating a new SharedInfo, + // no other threads should be referring to it yet. + refcount.store(1, std::memory_order_relaxed); +} + +IOBuf::SharedInfo::SharedInfo(FreeFunction fn, void* arg) + : freeFn(fn), + userData(arg) { + // Use relaxed memory ordering here. Since we are creating a new SharedInfo, + // no other threads should be referring to it yet. + refcount.store(1, std::memory_order_relaxed); +} + +void* IOBuf::operator new(size_t size) { + // Since IOBuf::create() manually allocates space for some IOBuf objects + // using malloc(), override operator new so that all IOBuf objects are + // always allocated using malloc(). This way operator delete can always know + // that free() is the correct way to deallocate the memory. + void* ptr = malloc(size); + + // operator new is not allowed to return NULL + if (UNLIKELY(ptr == NULL)) { + throw std::bad_alloc(); + } + + return ptr; +} + +void* IOBuf::operator new(size_t size, void* ptr) { + assert(size <= kMaxIOBufSize); + return ptr; +} + +void IOBuf::operator delete(void* ptr) { + // For small buffers, IOBuf::create() manually allocates the space for the + // IOBuf object using malloc(). Therefore we override delete to ensure that + // the IOBuf space is freed using free() rather than a normal delete. + free(ptr); +} + +unique_ptr IOBuf::create(uint32_t capacity) { + // If the desired capacity is less than kMaxInternalDataSize, + // just allocate a single region large enough for both the IOBuf header and + // the data. + if (capacity <= kMaxInternalDataSize) { + void* buf = malloc(kMaxIOBufSize); + if (UNLIKELY(buf == NULL)) { + throw std::bad_alloc(); + } + + uint8_t* bufEnd = static_cast(buf) + kMaxIOBufSize; + unique_ptr iobuf(new(buf) IOBuf(bufEnd)); + assert(iobuf->capacity() >= capacity); + return iobuf; + } + + // Allocate an external buffer + uint8_t* buf; + SharedInfo* sharedInfo; + uint32_t actualCapacity; + allocExtBuffer(capacity, &buf, &sharedInfo, &actualCapacity); + + // Allocate the IOBuf header + try { + return unique_ptr(new IOBuf(kExtAllocated, 0, + buf, actualCapacity, + buf, 0, + sharedInfo)); + } catch (...) { + free(buf); + throw; + } +} + +unique_ptr IOBuf::takeOwnership(void* buf, uint32_t capacity, + uint32_t length, + FreeFunction freeFn, + void* userData, + bool freeOnError) { + SharedInfo* sharedInfo = NULL; + try { + sharedInfo = new SharedInfo(freeFn, userData); + + uint8_t* bufPtr = static_cast(buf); + return unique_ptr(new IOBuf(kExtUserSupplied, kFlagFreeSharedInfo, + bufPtr, capacity, + bufPtr, length, + sharedInfo)); + } catch (...) { + delete sharedInfo; + if (freeOnError) { + if (freeFn) { + try { + freeFn(buf, userData); + } catch (...) { + // The user's free function is not allowed to throw. + abort(); + } + } else { + free(buf); + } + } + throw; + } +} + +unique_ptr IOBuf::wrapBuffer(const void* buf, uint32_t capacity) { + // We cast away the const-ness of the buffer here. + // This is okay since IOBuf users must use unshare() to create a copy of + // this buffer before writing to the buffer. + uint8_t* bufPtr = static_cast(const_cast(buf)); + return unique_ptr(new IOBuf(kExtUserSupplied, kFlagUserOwned, + bufPtr, capacity, + bufPtr, capacity, + NULL)); +} + +IOBuf::IOBuf(uint8_t* end) + : next_(this), + prev_(this), + data_(int_.buf), + length_(0), + flags_(0) { + assert(end - int_.buf == kMaxInternalDataSize); + assert(end - reinterpret_cast(this) == kMaxIOBufSize); +} + +IOBuf::IOBuf(ExtBufTypeEnum type, + uint32_t flags, + uint8_t* buf, + uint32_t capacity, + uint8_t* data, + uint32_t length, + SharedInfo* sharedInfo) + : next_(this), + prev_(this), + data_(data), + length_(length), + flags_(kFlagExt | flags) { + ext_.capacity = capacity; + ext_.type = type; + ext_.buf = buf; + ext_.sharedInfo = sharedInfo; + + assert(data >= buf); + assert(data + length <= buf + capacity); + assert(static_cast(flags & kFlagUserOwned) == + (sharedInfo == NULL)); +} + +IOBuf::~IOBuf() { + // Destroying an IOBuf destroys the entire chain. + // Users of IOBuf should only explicitly delete the head of any chain. + // The other elements in the chain will be automatically destroyed. + while (next_ != this) { + // Since unlink() returns unique_ptr() and we don't store it, + // it will automatically delete the unlinked element. + (void)next_->unlink(); + } + + if (flags_ & kFlagExt) { + decrementRefcount(); + } +} + +bool IOBuf::empty() const { + const IOBuf* current = this; + do { + if (current->length() != 0) { + return false; + } + current = current->next_; + } while (current != this); + return true; +} + +uint32_t IOBuf::countChainElements() const { + uint32_t numElements = 1; + for (IOBuf* current = next_; current != this; current = current->next_) { + ++numElements; + } + return numElements; +} + +uint64_t IOBuf::computeChainDataLength() const { + uint64_t fullLength = length_; + for (IOBuf* current = next_; current != this; current = current->next_) { + fullLength += current->length_; + } + return fullLength; +} + +void IOBuf::prependChain(unique_ptr&& iobuf) { + // Take ownership of the specified IOBuf + IOBuf* other = iobuf.release(); + + // Remember the pointer to the tail of the other chain + IOBuf* otherTail = other->prev_; + + // Hook up prev_->next_ to point at the start of the other chain, + // and other->prev_ to point at prev_ + prev_->next_ = other; + other->prev_ = prev_; + + // Hook up otherTail->next_ to point at us, + // and prev_ to point back at otherTail, + otherTail->next_ = this; + prev_ = otherTail; +} + +unique_ptr IOBuf::clone() const { + unique_ptr newHead(cloneOne()); + + for (IOBuf* current = next_; current != this; current = current->next_) { + newHead->prependChain(current->cloneOne()); + } + + return newHead; +} + +unique_ptr IOBuf::cloneOne() const { + if (flags_ & kFlagExt) { + unique_ptr iobuf(new IOBuf(static_cast(ext_.type), + flags_, ext_.buf, ext_.capacity, + data_, length_, + ext_.sharedInfo)); + if (ext_.sharedInfo) { + ext_.sharedInfo->refcount.fetch_add(1, std::memory_order_acq_rel); + } + return iobuf; + } else { + // We have an internal data buffer that cannot be shared + // Allocate a new IOBuf and copy the data into it. + unique_ptr iobuf(IOBuf::create(kMaxInternalDataSize)); + assert((iobuf->flags_ & kFlagExt) == 0); + iobuf->data_ += headroom(); + memcpy(iobuf->data_, data_, length_); + iobuf->length_ = length_; + return iobuf; + } +} + +void IOBuf::unshareOneSlow() { + // Internal buffers are always unshared, so unshareOneSlow() can only be + // called for external buffers + assert(flags_ & kFlagExt); + + // Allocate a new buffer for the data + uint8_t* buf; + SharedInfo* sharedInfo; + uint32_t actualCapacity; + allocExtBuffer(ext_.capacity, &buf, &sharedInfo, &actualCapacity); + + // Copy the data + // Maintain the same amount of headroom. Since we maintained the same + // minimum capacity we also maintain at least the same amount of tailroom. + uint32_t headlen = headroom(); + memcpy(buf + headlen, data_, length_); + + // Release our reference on the old buffer + decrementRefcount(); + // Make sure kFlagExt is set, and kFlagUserOwned and kFlagFreeSharedInfo + // are not set. + flags_ = kFlagExt; + + // Update the buffer pointers to point to the new buffer + data_ = buf + headlen; + ext_.buf = buf; + ext_.sharedInfo = sharedInfo; +} + +void IOBuf::unshareChained() { + // unshareChained() should only be called if we are part of a chain of + // multiple IOBufs. The caller should have already verified this. + assert(isChained()); + + IOBuf* current = this; + while (true) { + if (current->isSharedOne()) { + // we have to unshare + break; + } + + current = current->next_; + if (current == this) { + // None of the IOBufs in the chain are shared, + // so return without doing anything + return; + } + } + + // We have to unshare. Let coalesceSlow() do the work. + coalesceSlow(); +} + +void IOBuf::coalesceSlow(size_t maxLength) { + // coalesceSlow() should only be called if we are part of a chain of multiple + // IOBufs. The caller should have already verified this. + assert(isChained()); + assert(length_ < maxLength); + + // Compute the length of the entire chain + uint64_t newLength = 0; + IOBuf* end = this; + do { + newLength += end->length_; + end = end->next_; + } while (newLength < maxLength && end != this); + + uint64_t newHeadroom = headroom(); + uint64_t newTailroom = end->prev_->tailroom(); + coalesceAndReallocate(newHeadroom, newLength, end, newTailroom); + // We should be only element left in the chain now + assert(length_ >= maxLength || !isChained()); +} + +void IOBuf::coalesceAndReallocate(size_t newHeadroom, + size_t newLength, + IOBuf* end, + size_t newTailroom) { + uint64_t newCapacity = newLength + newHeadroom + newTailroom; + if (newCapacity > UINT32_MAX) { + throw std::overflow_error("IOBuf chain too large to coalesce"); + } + + // Allocate space for the coalesced buffer. + // We always convert to an external buffer, even if we happened to be an + // internal buffer before. + uint8_t* newBuf; + SharedInfo* newInfo; + uint32_t actualCapacity; + allocExtBuffer(newCapacity, &newBuf, &newInfo, &actualCapacity); + + // Copy the data into the new buffer + uint8_t* newData = newBuf + newHeadroom; + uint8_t* p = newData; + IOBuf* current = this; + size_t remaining = newLength; + do { + assert(current->length_ <= remaining); + remaining -= current->length_; + memcpy(p, current->data_, current->length_); + p += current->length_; + current = current->next_; + } while (current != end); + assert(remaining == 0); + + // Point at the new buffer + if (flags_ & kFlagExt) { + decrementRefcount(); + } + + // Make sure kFlagExt is set, and kFlagUserOwned and kFlagFreeSharedInfo + // are not set. + flags_ = kFlagExt; + + ext_.capacity = actualCapacity; + ext_.type = kExtAllocated; + ext_.buf = newBuf; + ext_.sharedInfo = newInfo; + data_ = newData; + length_ = newLength; + + // Separate from the rest of our chain. + // Since we don't store the unique_ptr returned by separateChain(), + // this will immediately delete the returned subchain. + if (isChained()) { + (void)separateChain(next_, current->prev_); + } +} + +void IOBuf::decrementRefcount() { + assert(flags_ & kFlagExt); + + // Externally owned buffers don't have a SharedInfo object and aren't managed + // by the reference count + if (flags_ & kFlagUserOwned) { + assert(ext_.sharedInfo == NULL); + return; + } + + // Decrement the refcount + uint32_t newcnt = ext_.sharedInfo->refcount.fetch_sub( + 1, std::memory_order_acq_rel); + // Note that fetch_sub() returns the value before we decremented. + // If it is 1, we were the only remaining user; if it is greater there are + // still other users. + if (newcnt > 1) { + return; + } + + // We were the last user. Free the buffer + if (ext_.sharedInfo->freeFn != NULL) { + try { + ext_.sharedInfo->freeFn(ext_.buf, ext_.sharedInfo->userData); + } catch (...) { + // The user's free function should never throw. Otherwise we might + // throw from the IOBuf destructor. Other code paths like coalesce() + // also assume that decrementRefcount() cannot throw. + abort(); + } + } else { + free(ext_.buf); + } + + // Free the SharedInfo if it was allocated separately. + // + // This is only used by takeOwnership(). + // + // To avoid this special case handling in decrementRefcount(), we could have + // takeOwnership() set a custom freeFn() that calls the user's free function + // then frees the SharedInfo object. (This would require that + // takeOwnership() store the user's free function with its allocated + // SharedInfo object.) However, handling this specially with a flag seems + // like it shouldn't be problematic. + if (flags_ & kFlagFreeSharedInfo) { + delete ext_.sharedInfo; + } +} + +void IOBuf::reserveSlow(uint32_t minHeadroom, uint32_t minTailroom) { + size_t newCapacity = (size_t)length_ + minHeadroom + minTailroom; + CHECK_LT(newCapacity, UINT32_MAX); + + // We'll need to reallocate the buffer. + // There are a few options. + // - If we have enough total room, move the data around in the buffer + // and adjust the data_ pointer. + // - If we're using an internal buffer, we'll switch to an external + // buffer with enough headroom and tailroom. + // - If we have enough headroom (headroom() >= minHeadroom) but not too much + // (so we don't waste memory), we can try one of two things, depending on + // whether we use jemalloc or not: + // - If using jemalloc, we can try to expand in place, avoiding a memcpy() + // - If not using jemalloc and we don't have too much to copy, + // we'll use realloc() (note that realloc might have to copy + // headroom + data + tailroom, see smartRealloc in folly/Malloc.h) + // - Otherwise, bite the bullet and reallocate. + if (headroom() + tailroom() >= minHeadroom + minTailroom) { + uint8_t* newData = writableBuffer() + minHeadroom; + memmove(newData, data_, length_); + data_ = newData; + return; + } + + size_t newAllocatedCapacity = goodExtBufferSize(newCapacity); + uint8_t* newBuffer = nullptr; + uint32_t newHeadroom = 0; + uint32_t oldHeadroom = headroom(); + + if ((flags_ & kFlagExt) && length_ != 0 && oldHeadroom >= minHeadroom) { + if (usingJEMalloc()) { + size_t headSlack = oldHeadroom - minHeadroom; + // We assume that tailroom is more useful and more important than + // tailroom (not least because realloc / rallocm allow us to grow the + // buffer at the tail, but not at the head) So, if we have more headroom + // than we need, we consider that "wasted". We arbitrarily define "too + // much" headroom to be 25% of the capacity. + if (headSlack * 4 <= newCapacity) { + size_t allocatedCapacity = capacity() + sizeof(SharedInfo); + void* p = ext_.buf; + if (allocatedCapacity >= jemallocMinInPlaceExpandable) { + int r = rallocm(&p, &newAllocatedCapacity, newAllocatedCapacity, + 0, ALLOCM_NO_MOVE); + if (r == ALLOCM_SUCCESS) { + newBuffer = static_cast(p); + newHeadroom = oldHeadroom; + } else if (r == ALLOCM_ERR_OOM) { + // shouldn't happen as we don't actually allocate new memory + // (due to ALLOCM_NO_MOVE) + throw std::bad_alloc(); + } + // if ALLOCM_ERR_NOT_MOVED, do nothing, fall back to + // malloc/memcpy/free + } + } + } else { // Not using jemalloc + size_t copySlack = capacity() - length_; + if (copySlack * 2 <= length_) { + void* p = realloc(ext_.buf, newAllocatedCapacity); + if (UNLIKELY(p == nullptr)) { + throw std::bad_alloc(); + } + newBuffer = static_cast(p); + newHeadroom = oldHeadroom; + } + } + } + + // None of the previous reallocation strategies worked (or we're using + // an internal buffer). malloc/copy/free. + if (newBuffer == nullptr) { + void* p = malloc(newAllocatedCapacity); + if (UNLIKELY(p == nullptr)) { + throw std::bad_alloc(); + } + newBuffer = static_cast(p); + memcpy(newBuffer + minHeadroom, data_, length_); + if (flags_ & kFlagExt) { + free(ext_.buf); + } + newHeadroom = minHeadroom; + } + + SharedInfo* info; + uint32_t cap; + initExtBuffer(newBuffer, newAllocatedCapacity, &info, &cap); + + flags_ = kFlagExt; + + ext_.capacity = cap; + ext_.type = kExtAllocated; + ext_.buf = newBuffer; + ext_.sharedInfo = info; + data_ = newBuffer + newHeadroom; + // length_ is unchanged +} + +void IOBuf::allocExtBuffer(uint32_t minCapacity, + uint8_t** bufReturn, + SharedInfo** infoReturn, + uint32_t* capacityReturn) { + size_t mallocSize = goodExtBufferSize(minCapacity); + uint8_t* buf = static_cast(malloc(mallocSize)); + if (UNLIKELY(buf == NULL)) { + throw std::bad_alloc(); + } + initExtBuffer(buf, mallocSize, infoReturn, capacityReturn); + *bufReturn = buf; +} + +size_t IOBuf::goodExtBufferSize(uint32_t minCapacity) { + // Determine how much space we should allocate. We'll store the SharedInfo + // for the external buffer just after the buffer itself. (We store it just + // after the buffer rather than just before so that the code can still just + // use free(ext_.buf) to free the buffer.) + size_t minSize = static_cast(minCapacity) + sizeof(SharedInfo); + // Add room for padding so that the SharedInfo will be aligned on an 8-byte + // boundary. + minSize = (minSize + 7) & ~7; + + // Use goodMallocSize() to bump up the capacity to a decent size to request + // from malloc, so we can use all of the space that malloc will probably give + // us anyway. + return goodMallocSize(minSize); +} + +void IOBuf::initExtBuffer(uint8_t* buf, size_t mallocSize, + SharedInfo** infoReturn, + uint32_t* capacityReturn) { + // Find the SharedInfo storage at the end of the buffer + // and construct the SharedInfo. + uint8_t* infoStart = (buf + mallocSize) - sizeof(SharedInfo); + SharedInfo* sharedInfo = new(infoStart) SharedInfo; + + size_t actualCapacity = infoStart - buf; + // On the unlikely possibility that the actual capacity is larger than can + // fit in a uint32_t after adding room for the refcount and calling + // goodMallocSize(), truncate downwards if necessary. + if (actualCapacity >= UINT32_MAX) { + *capacityReturn = UINT32_MAX; + } else { + *capacityReturn = actualCapacity; + } + + *infoReturn = sharedInfo; +} + +fbstring IOBuf::moveToFbString() { + // Externally allocated buffers (malloc) are just fine, everything else needs + // to be turned into one. + if (flags_ != kFlagExt || // not malloc()-ed + headroom() != 0 || // malloc()-ed block doesn't start at beginning + tailroom() == 0 || // no room for NUL terminator + isShared() || // shared + isChained()) { // chained + // We might as well get rid of all head and tailroom if we're going + // to reallocate; we need 1 byte for NUL terminator. + coalesceAndReallocate(0, computeChainDataLength(), this, 1); + } + + // Ensure NUL terminated + *writableTail() = 0; + fbstring str(reinterpret_cast(writableData()), + length(), capacity(), + AcquireMallocatedString()); + + // Reset to internal buffer. + flags_ = 0; + clear(); + return str; +} + +IOBuf::Iterator IOBuf::cbegin() const { + return Iterator(this, this); +} + +IOBuf::Iterator IOBuf::cend() const { + return Iterator(nullptr, nullptr); +} + +} // folly diff --git a/folly/io/IOBuf.h b/folly/io/IOBuf.h new file mode 100644 index 00000000..90dcb1e3 --- /dev/null +++ b/folly/io/IOBuf.h @@ -0,0 +1,1197 @@ +/* + * Copyright 2013 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FOLLY_IO_IOBUF_H_ +#define FOLLY_IO_IOBUF_H_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "folly/FBString.h" +#include "folly/Range.h" + +namespace folly { + +/** + * An IOBuf is a pointer to a buffer of data. + * + * IOBuf objects are intended to be used primarily for networking code, and are + * modelled somewhat after FreeBSD's mbuf data structure, and Linux's sk_buff + * structure. + * + * IOBuf objects facilitate zero-copy network programming, by allowing multiple + * IOBuf objects to point to the same underlying buffer of data, using a + * reference count to track when the buffer is no longer needed and can be + * freed. + * + * + * Data Layout + * ----------- + * + * The IOBuf itself is a small object containing a pointer to the buffer and + * information about which segment of the buffer contains valid data. + * + * The data layout looks like this: + * + * +-------+ + * | IOBuf | + * +-------+ + * / + * | + * v + * +------------+--------------------+-----------+ + * | headroom | data | tailroom | + * +------------+--------------------+-----------+ + * ^ ^ ^ ^ + * buffer() data() tail() bufferEnd() + * + * The length() method returns the length of the valid data; capacity() + * returns the entire capacity of the buffer (from buffer() to bufferEnd()). + * The headroom() and tailroom() methods return the amount of unused capacity + * available before and after the data. + * + * + * Buffer Sharing + * -------------- + * + * The buffer itself is reference counted, and multiple IOBuf objects may point + * to the same buffer. Each IOBuf may point to a different section of valid + * data within the underlying buffer. For example, if multiple protocol + * requests are read from the network into a single buffer, a separate IOBuf + * may be created for each request, all sharing the same underlying buffer. + * + * In other words, when multiple IOBufs share the same underlying buffer, the + * data() and tail() methods on each IOBuf may point to a different segment of + * the data. However, the buffer() and bufferEnd() methods will point to the + * same location for all IOBufs sharing the same underlying buffer. + * + * +-----------+ +---------+ + * | IOBuf 1 | | IOBuf 2 | + * +-----------+ +---------+ + * | | _____/ | + * data | tail |/ data | tail + * v v v + * +-------------------------------------+ + * | | | | | + * +-------------------------------------+ + * + * If you only read data from an IOBuf, you don't need to worry about other + * IOBuf objects possibly sharing the same underlying buffer. However, if you + * ever write to the buffer you need to first ensure that no other IOBufs point + * to the same buffer. The unshare() method may be used to ensure that you + * have an unshared buffer. + * + * + * IOBuf Chains + * ------------ + * + * IOBuf objects also contain pointers to next and previous IOBuf objects. + * This can be used to represent a single logical piece of data that its stored + * in non-contiguous chunks in separate buffers. + * + * A single IOBuf object can only belong to one chain at a time. + * + * IOBuf chains are always circular. The "prev" pointer in the head of the + * chain points to the tail of the chain. However, it is up to the user to + * decide which IOBuf is the head. Internally the IOBuf code does not care + * which element is the head. + * + * The lifetime of all IOBufs in the chain are linked: when one element in the + * chain is deleted, all other chained elements are also deleted. Conceptually + * it is simplest to treat this as if the head of the chain owns all other + * IOBufs in the chain. When you delete the head of the chain, it will delete + * the other elements as well. For this reason, prependChain() and + * appendChain() take ownership of of the new elements being added to this + * chain. + * + * When the coalesce() method is used to coalesce an entire IOBuf chain into a + * single IOBuf, all other IOBufs in the chain are eliminated and automatically + * deleted. The unshare() method may coalesce the chain; if it does it will + * similarly delete all IOBufs eliminated from the chain. + * + * As discussed in the following section, it is up to the user to maintain a + * lock around the entire IOBuf chain if multiple threads need to access the + * chain. IOBuf does not provide any internal locking. + * + * + * Synchronization + * --------------- + * + * When used in multithread programs, a single IOBuf object should only be used + * in a single thread at a time. If a caller uses a single IOBuf across + * multiple threads the caller is responsible for using an external lock to + * synchronize access to the IOBuf. + * + * Two separate IOBuf objects may be accessed concurrently in separate threads + * without locking, even if they point to the same underlying buffer. The + * buffer reference count is always accessed atomically, and no other + * operations should affect other IOBufs that point to the same data segment. + * The caller is responsible for using unshare() to ensure that the data buffer + * is not shared by other IOBufs before writing to it, and this ensures that + * the data itself is not modified in one thread while also being accessed from + * another thread. + * + * For IOBuf chains, no two IOBufs in the same chain should be accessed + * simultaneously in separate threads. The caller must maintain a lock around + * the entire chain if the chain, or individual IOBufs in the chain, may be + * accessed by multiple threads. + * + * + * IOBuf Object Allocation/Sharing + * ------------------------------- + * + * IOBuf objects themselves are always allocated on the heap. The IOBuf + * constructors are private, so IOBuf objects may not be created on the stack. + * In part this is done since some IOBuf objects use small-buffer optimization + * and contain the buffer data immediately after the IOBuf object itself. The + * coalesce() and unshare() methods also expect to be able to delete subsequent + * IOBuf objects in the chain if they are no longer needed due to coalescing. + * + * The IOBuf structure also does not provide room for an intrusive refcount on + * the IOBuf object itself, only the underlying data buffer is reference + * counted. If users want to share the same IOBuf object between multiple + * parts of the code, they are responsible for managing this sharing on their + * own. (For example, by using a shared_ptr. Alternatively, users always have + * the option of using clone() to create a second IOBuf that points to the same + * underlying buffer.) + * + * With jemalloc, allocating small objects like IOBuf objects should be + * relatively fast, and the cost of allocating IOBuf objects on the heap and + * cloning new IOBufs should be relatively cheap. + */ +namespace detail { +// Is T a unique_ptr<> to a standard-layout type? +template struct IsUniquePtrToSL + : public std::false_type { }; +template +struct IsUniquePtrToSL< + std::unique_ptr, + typename std::enable_if::value>::type> + : public std::true_type { }; +} // namespace detail + +class IOBuf { + public: + class Iterator; + + typedef ByteRange value_type; + typedef Iterator iterator; + typedef Iterator const_iterator; + + typedef void (*FreeFunction)(void* buf, void* userData); + + /** + * Allocate a new IOBuf object with the requested capacity. + * + * Returns a new IOBuf object that must be (eventually) deleted by the + * caller. The returned IOBuf may actually have slightly more capacity than + * requested. + * + * The data pointer will initially point to the start of the newly allocated + * buffer, and will have a data length of 0. + * + * Throws std::bad_alloc on error. + */ + static std::unique_ptr create(uint32_t capacity); + + /** + * Create a new IOBuf pointing to an existing data buffer. + * + * The new IOBuffer will assume ownership of the buffer, and free it by + * calling the specified FreeFunction when the last IOBuf pointing to this + * buffer is destroyed. The function will be called with a pointer to the + * buffer as the first argument, and the supplied userData value as the + * second argument. The free function must never throw exceptions. + * + * If no FreeFunction is specified, the buffer will be freed using free(). + * + * The IOBuf data pointer will initially point to the start of the buffer, + * + * In the first version of this function, the length of data is unspecified + * and is initialized to the capacity of the buffer + * + * In the second version, the user specifies the valid length of data + * in the buffer + * + * On error, std::bad_alloc will be thrown. If freeOnError is true (the + * default) the buffer will be freed before throwing the error. + */ + static std::unique_ptr takeOwnership(void* buf, uint32_t capacity, + FreeFunction freeFn = NULL, + void* userData = NULL, + bool freeOnError = true) { + return takeOwnership(buf, capacity, capacity, freeFn, + userData, freeOnError); + } + + static std::unique_ptr takeOwnership(void* buf, uint32_t capacity, + uint32_t length, + FreeFunction freeFn = NULL, + void* userData = NULL, + bool freeOnError = true); + + /** + * Create a new IOBuf pointing to an existing data buffer made up of + * count objects of a given standard-layout type. + * + * This is dangerous -- it is essentially equivalent to doing + * reinterpret_cast on your data -- but it's often useful + * for serialization / deserialization. + * + * The new IOBuffer will assume ownership of the buffer, and free it + * appropriately (by calling the UniquePtr's custom deleter, or by calling + * delete or delete[] appropriately if there is no custom deleter) + * when the buffer is destroyed. The custom deleter, if any, must never + * throw exceptions. + * + * The IOBuf data pointer will initially point to the start of the buffer, + * and the length will be the full capacity of the buffer (count * + * sizeof(T)). + * + * On error, std::bad_alloc will be thrown, and the buffer will be freed + * before throwing the error. + */ + template + static typename std::enable_if::value, + std::unique_ptr>::type + takeOwnership(UniquePtr&& buf, size_t count=1); + + /** + * Create a new IOBuf object that points to an existing user-owned buffer. + * + * This should only be used when the caller knows the lifetime of the IOBuf + * object ahead of time and can ensure that all IOBuf objects that will point + * to this buffer will be destroyed before the buffer itself is destroyed. + * + * This buffer will not be freed automatically when the last IOBuf + * referencing it is destroyed. It is the caller's responsibility to free + * the buffer after the last IOBuf has been destroyed. + * + * The IOBuf data pointer will initially point to the start of the buffer, + * and the length will be the full capacity of the buffer. + * + * An IOBuf created using wrapBuffer() will always be reported as shared. + * unshare() may be used to create a writable copy of the buffer. + * + * On error, std::bad_alloc will be thrown. + */ + static std::unique_ptr wrapBuffer(const void* buf, uint32_t capacity); + + /** + * Convenience function to create a new IOBuf object that copies data from a + * user-supplied buffer, optionally allocating a given amount of + * headroom and tailroom. + */ + static std::unique_ptr copyBuffer(const void* buf, uint32_t size, + uint32_t headroom=0, + uint32_t minTailroom=0); + + /** + * Convenience function to create a new IOBuf object that copies data from a + * user-supplied string, optionally allocating a given amount of + * headroom and tailroom. + * + * Beware when attempting to invoke this function with a constant string + * literal and a headroom argument: you will likely end up invoking the + * version of copyBuffer() above. IOBuf::copyBuffer("hello", 3) will treat + * the first argument as a const void*, and will invoke the version of + * copyBuffer() above, with the size argument of 3. + */ + static std::unique_ptr copyBuffer(const std::string& buf, + uint32_t headroom=0, + uint32_t minTailroom=0); + + /** + * A version of copyBuffer() that returns a null pointer if the input string + * is empty. + */ + static std::unique_ptr maybeCopyBuffer(const std::string& buf, + uint32_t headroom=0, + uint32_t minTailroom=0); + + /** + * Convenience function to free a chain of IOBufs held by a unique_ptr. + */ + static void destroy(std::unique_ptr&& data) { + auto destroyer = std::move(data); + } + + /** + * Destroy this IOBuf. + * + * Deleting an IOBuf will automatically destroy all IOBufs in the chain. + * (See the comments above regarding the ownership model of IOBuf chains. + * All subsequent IOBufs in the chain are considered to be owned by the head + * of the chain. Users should only explicitly delete the head of a chain.) + * + * When each individual IOBuf is destroyed, it will release its reference + * count on the underlying buffer. If it was the last user of the buffer, + * the buffer will be freed. + */ + ~IOBuf(); + + /** + * Check whether the chain is empty (i.e., whether the IOBufs in the + * chain have a total data length of zero). + * + * This method is semantically equivalent to + * i->computeChainDataLength()==0 + * but may run faster because it can short-circuit as soon as it + * encounters a buffer with length()!=0 + */ + bool empty() const; + + /** + * Get the pointer to the start of the data. + */ + const uint8_t* data() const { + return data_; + } + + /** + * Get a writable pointer to the start of the data. + * + * The caller is responsible for calling unshare() first to ensure that it is + * actually safe to write to the buffer. + */ + uint8_t* writableData() { + return data_; + } + + /** + * Get the pointer to the end of the data. + */ + const uint8_t* tail() const { + return data_ + length_; + } + + /** + * Get a writable pointer to the end of the data. + * + * The caller is responsible for calling unshare() first to ensure that it is + * actually safe to write to the buffer. + */ + uint8_t* writableTail() { + return data_ + length_; + } + + /** + * Get the data length. + */ + uint32_t length() const { + return length_; + } + + /** + * Get the amount of head room. + * + * Returns the number of bytes in the buffer before the start of the data. + */ + uint32_t headroom() const { + return data_ - buffer(); + } + + /** + * Get the amount of tail room. + * + * Returns the number of bytes in the buffer after the end of the data. + */ + uint32_t tailroom() const { + return bufferEnd() - tail(); + } + + /** + * Get the pointer to the start of the buffer. + * + * Note that this is the pointer to the very beginning of the usable buffer, + * not the start of valid data within the buffer. Use the data() method to + * get a pointer to the start of the data within the buffer. + */ + const uint8_t* buffer() const { + return (flags_ & kFlagExt) ? ext_.buf : int_.buf; + } + + /** + * Get a writable pointer to the start of the buffer. + * + * The caller is responsible for calling unshare() first to ensure that it is + * actually safe to write to the buffer. + */ + uint8_t* writableBuffer() { + return (flags_ & kFlagExt) ? ext_.buf : int_.buf; + } + + /** + * Get the pointer to the end of the buffer. + * + * Note that this is the pointer to the very end of the usable buffer, + * not the end of valid data within the buffer. Use the tail() method to + * get a pointer to the end of the data within the buffer. + */ + const uint8_t* bufferEnd() const { + return (flags_ & kFlagExt) ? + ext_.buf + ext_.capacity : + int_.buf + kMaxInternalDataSize; + } + + /** + * Get the total size of the buffer. + * + * This returns the total usable length of the buffer. Use the length() + * method to get the length of the actual valid data in this IOBuf. + */ + uint32_t capacity() const { + return (flags_ & kFlagExt) ? ext_.capacity : kMaxInternalDataSize; + } + + /** + * Get a pointer to the next IOBuf in this chain. + */ + IOBuf* next() { + return next_; + } + const IOBuf* next() const { + return next_; + } + + /** + * Get a pointer to the previous IOBuf in this chain. + */ + IOBuf* prev() { + return prev_; + } + const IOBuf* prev() const { + return prev_; + } + + /** + * Shift the data forwards in the buffer. + * + * This shifts the data pointer forwards in the buffer to increase the + * headroom. This is commonly used to increase the headroom in a newly + * allocated buffer. + * + * The caller is responsible for ensuring that there is sufficient + * tailroom in the buffer before calling advance(). + * + * If there is a non-zero data length, advance() will use memmove() to shift + * the data forwards in the buffer. In this case, the caller is responsible + * for making sure the buffer is unshared, so it will not affect other IOBufs + * that may be sharing the same underlying buffer. + */ + void advance(uint32_t amount) { + // In debug builds, assert if there is a problem. + assert(amount <= tailroom()); + + if (length_ > 0) { + memmove(data_ + amount, data_, length_); + } + data_ += amount; + } + + /** + * Shift the data backwards in the buffer. + * + * The caller is responsible for ensuring that there is sufficient headroom + * in the buffer before calling retreat(). + * + * If there is a non-zero data length, retreat() will use memmove() to shift + * the data backwards in the buffer. In this case, the caller is responsible + * for making sure the buffer is unshared, so it will not affect other IOBufs + * that may be sharing the same underlying buffer. + */ + void retreat(uint32_t amount) { + // In debug builds, assert if there is a problem. + assert(amount <= headroom()); + + if (length_ > 0) { + memmove(data_ - amount, data_, length_); + } + data_ -= amount; + } + + /** + * Adjust the data pointer to include more valid data at the beginning. + * + * This moves the data pointer backwards to include more of the available + * buffer. The caller is responsible for ensuring that there is sufficient + * headroom for the new data. The caller is also responsible for populating + * this section with valid data. + * + * This does not modify any actual data in the buffer. + */ + void prepend(uint32_t amount) { + CHECK(amount <= headroom()); + data_ -= amount; + length_ += amount; + } + + /** + * Adjust the tail pointer to include more valid data at the end. + * + * This moves the tail pointer forwards to include more of the available + * buffer. The caller is responsible for ensuring that there is sufficient + * tailroom for the new data. The caller is also responsible for populating + * this section with valid data. + * + * This does not modify any actual data in the buffer. + */ + void append(uint32_t amount) { + CHECK(amount <= tailroom()); + length_ += amount; + } + + /** + * Adjust the data pointer forwards to include less valid data. + * + * This moves the data pointer forwards so that the first amount bytes are no + * longer considered valid data. The caller is responsible for ensuring that + * amount is less than or equal to the actual data length. + * + * This does not modify any actual data in the buffer. + */ + void trimStart(uint32_t amount) { + CHECK(amount <= length_); + data_ += amount; + length_ -= amount; + } + + /** + * Adjust the tail pointer backwards to include less valid data. + * + * This moves the tail pointer backwards so that the last amount bytes are no + * longer considered valid data. The caller is responsible for ensuring that + * amount is less than or equal to the actual data length. + * + * This does not modify any actual data in the buffer. + */ + void trimEnd(uint32_t amount) { + CHECK(amount <= length_); + length_ -= amount; + } + + /** + * Clear the buffer. + * + * Postcondition: headroom() == 0, length() == 0, tailroom() == capacity() + */ + void clear() { + data_ = writableBuffer(); + length_ = 0; + } + + /** + * Ensure that this buffer has at least minHeadroom headroom bytes and at + * least minTailroom tailroom bytes. The buffer must be writable + * (you must call unshare() before this, if necessary). + * + * Postcondition: headroom() >= minHeadroom, tailroom() >= minTailroom, + * the data (between data() and data() + length()) is preserved. + */ + void reserve(uint32_t minHeadroom, uint32_t minTailroom) { + // Maybe we don't need to do anything. + if (headroom() >= minHeadroom && tailroom() >= minTailroom) { + return; + } + // If the buffer is empty but we have enough total room (head + tail), + // move the data_ pointer around. + if (length() == 0 && + headroom() + tailroom() >= minHeadroom + minTailroom) { + data_ = writableBuffer() + minHeadroom; + return; + } + // Bah, we have to do actual work. + reserveSlow(minHeadroom, minTailroom); + } + + /** + * Return true if this IOBuf is part of a chain of multiple IOBufs, or false + * if this is the only IOBuf in its chain. + */ + bool isChained() const { + assert((next_ == this) == (prev_ == this)); + return next_ != this; + } + + /** + * Get the number of IOBufs in this chain. + * + * Beware that this method has to walk the entire chain. + * Use isChained() if you just want to check if this IOBuf is part of a chain + * or not. + */ + uint32_t countChainElements() const; + + /** + * Get the length of all the data in this IOBuf chain. + * + * Beware that this method has to walk the entire chain. + */ + uint64_t computeChainDataLength() const; + + /** + * Insert another IOBuf chain immediately before this IOBuf. + * + * For example, if there are two IOBuf chains (A, B, C) and (D, E, F), + * and B->prependChain(D) is called, the (D, E, F) chain will be subsumed + * and become part of the chain starting at A, which will now look like + * (A, D, E, F, B, C) + * + * Note that since IOBuf chains are circular, head->prependChain(other) can + * be used to append the other chain at the very end of the chain pointed to + * by head. For example, if there are two IOBuf chains (A, B, C) and + * (D, E, F), and A->prependChain(D) is called, the chain starting at A will + * now consist of (A, B, C, D, E, F) + * + * The elements in the specified IOBuf chain will become part of this chain, + * and will be owned by the head of this chain. When this chain is + * destroyed, all elements in the supplied chain will also be destroyed. + * + * For this reason, appendChain() only accepts an rvalue-reference to a + * unique_ptr(), to make it clear that it is taking ownership of the supplied + * chain. If you have a raw pointer, you can pass in a new temporary + * unique_ptr around the raw pointer. If you have an existing, + * non-temporary unique_ptr, you must call std::move(ptr) to make it clear + * that you are destroying the original pointer. + */ + void prependChain(std::unique_ptr&& iobuf); + + /** + * Append another IOBuf chain immediately after this IOBuf. + * + * For example, if there are two IOBuf chains (A, B, C) and (D, E, F), + * and B->appendChain(D) is called, the (D, E, F) chain will be subsumed + * and become part of the chain starting at A, which will now look like + * (A, B, D, E, F, C) + * + * The elements in the specified IOBuf chain will become part of this chain, + * and will be owned by the head of this chain. When this chain is + * destroyed, all elements in the supplied chain will also be destroyed. + * + * For this reason, appendChain() only accepts an rvalue-reference to a + * unique_ptr(), to make it clear that it is taking ownership of the supplied + * chain. If you have a raw pointer, you can pass in a new temporary + * unique_ptr around the raw pointer. If you have an existing, + * non-temporary unique_ptr, you must call std::move(ptr) to make it clear + * that you are destroying the original pointer. + */ + void appendChain(std::unique_ptr&& iobuf) { + // Just use prependChain() on the next element in our chain + next_->prependChain(std::move(iobuf)); + } + + /** + * Remove this IOBuf from its current chain. + * + * Since ownership of all elements an IOBuf chain is normally maintained by + * the head of the chain, unlink() transfers ownership of this IOBuf from the + * chain and gives it to the caller. A new unique_ptr to the IOBuf is + * returned to the caller. The caller must store the returned unique_ptr (or + * call release() on it) to take ownership, otherwise the IOBuf will be + * immediately destroyed. + * + * Since unlink transfers ownership of the IOBuf to the caller, be careful + * not to call unlink() on the head of a chain if you already maintain + * ownership on the head of the chain via other means. The pop() method + * is a better choice for that situation. + */ + std::unique_ptr unlink() { + next_->prev_ = prev_; + prev_->next_ = next_; + prev_ = this; + next_ = this; + return std::unique_ptr(this); + } + + /** + * Remove this IOBuf from its current chain and return a unique_ptr to + * the IOBuf that formerly followed it in the chain. + */ + std::unique_ptr pop() { + IOBuf *next = next_; + next_->prev_ = prev_; + prev_->next_ = next_; + prev_ = this; + next_ = this; + return std::unique_ptr((next == this) ? NULL : next); + } + + /** + * Remove a subchain from this chain. + * + * Remove the subchain starting at head and ending at tail from this chain. + * + * Returns a unique_ptr pointing to head. (In other words, ownership of the + * head of the subchain is transferred to the caller.) If the caller ignores + * the return value and lets the unique_ptr be destroyed, the subchain will + * be immediately destroyed. + * + * The subchain referenced by the specified head and tail must be part of the + * same chain as the current IOBuf, but must not contain the current IOBuf. + * However, the specified head and tail may be equal to each other (i.e., + * they may be a subchain of length 1). + */ + std::unique_ptr separateChain(IOBuf* head, IOBuf* tail) { + assert(head != this); + assert(tail != this); + + head->prev_->next_ = tail->next_; + tail->next_->prev_ = head->prev_; + + head->prev_ = tail; + tail->next_ = head; + + return std::unique_ptr(head); + } + + /** + * Return true if at least one of the IOBufs in this chain are shared, + * or false if all of the IOBufs point to unique buffers. + * + * Use isSharedOne() to only check this IOBuf rather than the entire chain. + */ + bool isShared() const { + const IOBuf* current = this; + while (true) { + if (current->isSharedOne()) { + return true; + } + current = current->next_; + if (current == this) { + return false; + } + } + } + + /** + * Return true if other IOBufs are also pointing to the buffer used by this + * IOBuf, and false otherwise. + * + * If this IOBuf points at a buffer owned by another (non-IOBuf) part of the + * code (i.e., if the IOBuf was created using wrapBuffer(), or was cloned + * from such an IOBuf), it is always considered shared. + * + * This only checks the current IOBuf, and not other IOBufs in the chain. + */ + bool isSharedOne() const { + // If this is a user-owned buffer, it is always considered shared + if (flags_ & kFlagUserOwned) { + return true; + } + + if (flags_ & kFlagExt) { + return ext_.sharedInfo->refcount.load(std::memory_order_acquire) > 1; + } else { + return false; + } + } + + /** + * Ensure that this IOBuf has a unique buffer that is not shared by other + * IOBufs. + * + * unshare() operates on an entire chain of IOBuf objects. If the chain is + * shared, it may also coalesce the chain when making it unique. If the + * chain is coalesced, subsequent IOBuf objects in the current chain will be + * automatically deleted. + * + * Note that buffers owned by other (non-IOBuf) users are automatically + * considered shared. + * + * Throws std::bad_alloc on error. On error the IOBuf chain will be + * unmodified. + * + * Currently unshare may also throw std::overflow_error if it tries to + * coalesce. (TODO: In the future it would be nice if unshare() were smart + * enough not to coalesce the entire buffer if the data is too large. + * However, in practice this seems unlikely to become an issue.) + */ + void unshare() { + if (isChained()) { + unshareChained(); + } else { + unshareOne(); + } + } + + /** + * Ensure that this IOBuf has a unique buffer that is not shared by other + * IOBufs. + * + * unshareOne() operates on a single IOBuf object. This IOBuf will have a + * unique buffer after unshareOne() returns, but other IOBufs in the chain + * may still be shared after unshareOne() returns. + * + * Throws std::bad_alloc on error. On error the IOBuf will be unmodified. + */ + void unshareOne() { + if (isSharedOne()) { + unshareOneSlow(); + } + } + + /** + * Coalesce this IOBuf chain into a single buffer. + * + * This method moves all of the data in this IOBuf chain into a single + * contiguous buffer, if it is not already in one buffer. After coalesce() + * returns, this IOBuf will be a chain of length one. Other IOBufs in the + * chain will be automatically deleted. + * + * After coalescing, the IOBuf will have at least as much headroom as the + * first IOBuf in the chain, and at least as much tailroom as the last IOBuf + * in the chain. + * + * Throws std::bad_alloc on error. On error the IOBuf chain will be + * unmodified. Throws std::overflow_error if the length of the entire chain + * larger than can be described by a uint32_t capacity. + */ + void coalesce() { + if (!isChained()) { + return; + } + coalesceSlow(); + } + + /** + * Ensure that this chain has at least maxLength bytes available as a + * contiguous memory range. + * + * This method coalesces whole buffers in the chain into this buffer as + * necessary until this buffer's length() is at least maxLength. + * + * After coalescing, the IOBuf will have at least as much headroom as the + * first IOBuf in the chain, and at least as much tailroom as the last IOBuf + * that was coalesced. + * + * Throws std::bad_alloc on error. On error the IOBuf chain will be + * unmodified. Throws std::overflow_error if the length of the coalesced + * portion of the chain is larger than can be described by a uint32_t + * capacity. (Although maxLength is uint32_t, gather() doesn't split + * buffers, so coalescing whole buffers may result in a capacity that can't + * be described in uint32_t. + * + * Upon return, either enough of the chain was coalesced into a contiguous + * region, or the entire chain was coalesced. That is, + * length() >= maxLength || !isChained() is true. + */ + void gather(uint32_t maxLength) { + if (!isChained() || length_ >= maxLength) { + return; + } + coalesceSlow(maxLength); + } + + /** + * Return a new IOBuf chain sharing the same data as this chain. + * + * The new IOBuf chain will normally point to the same underlying data + * buffers as the original chain. (The one exception to this is if some of + * the IOBufs in this chain contain small internal data buffers which cannot + * be shared.) + */ + std::unique_ptr clone() const; + + /** + * Return a new IOBuf with the same data as this IOBuf. + * + * The new IOBuf returned will not be part of a chain (even if this IOBuf is + * part of a larger chain). + */ + std::unique_ptr cloneOne() const; + + // Overridden operator new and delete. + // These directly use malloc() and free() to allocate the space for IOBuf + // objects. This is needed since IOBuf::create() manually uses malloc when + // allocating IOBuf objects with an internal buffer. + void* operator new(size_t size); + void* operator new(size_t size, void* ptr); + void operator delete(void* ptr); + + /** + * Destructively convert this IOBuf to a fbstring efficiently. + * We rely on fbstring's AcquireMallocatedString constructor to + * transfer memory. + */ + fbstring moveToFbString(); + + /** + * Iteration support: a chain of IOBufs may be iterated through using + * STL-style iterators over const ByteRanges. Iterators are only invalidated + * if the IOBuf that they currently point to is removed. + */ + Iterator cbegin() const; + Iterator cend() const; + Iterator begin() const; + Iterator end() const; + + private: + enum FlagsEnum { + kFlagExt = 0x1, + kFlagUserOwned = 0x2, + kFlagFreeSharedInfo = 0x4, + }; + + // Values for the ExternalBuf type field. + // We currently don't really use this for anything, other than to have it + // around for debugging purposes. We store it at the moment just because we + // have the 4 extra bytes in the ExternalBuf struct that would just be + // padding otherwise. + enum ExtBufTypeEnum { + kExtAllocated = 0, + kExtUserSupplied = 1, + kExtUserOwned = 2, + }; + + struct SharedInfo { + SharedInfo(); + SharedInfo(FreeFunction fn, void* arg); + + // A pointer to a function to call to free the buffer when the refcount + // hits 0. If this is NULL, free() will be used instead. + FreeFunction freeFn; + void* userData; + std::atomic refcount; + }; + struct ExternalBuf { + uint32_t capacity; + uint32_t type; + uint8_t* buf; + // SharedInfo may be NULL if kFlagUserOwned is set. It is non-NULL + // in all other cases. + SharedInfo* sharedInfo; + }; + struct InternalBuf { + uint8_t buf[] __attribute__((aligned)); + }; + + // The maximum size for an IOBuf object, including any internal data buffer + static const uint32_t kMaxIOBufSize = 256; + static const uint32_t kMaxInternalDataSize; + + // Forbidden copy constructor and assignment opererator + IOBuf(IOBuf const &); + IOBuf& operator=(IOBuf const &); + + /** + * Create a new IOBuf with internal data. + * + * end is a pointer to the end of the IOBuf's internal data buffer. + */ + explicit IOBuf(uint8_t* end); + + /** + * Create a new IOBuf pointing to an external buffer. + * + * The caller is responsible for holding a reference count for this new + * IOBuf. The IOBuf constructor does not automatically increment the + * reference count. + */ + IOBuf(ExtBufTypeEnum type, uint32_t flags, + uint8_t* buf, uint32_t capacity, + uint8_t* data, uint32_t length, + SharedInfo* sharedInfo); + + void unshareOneSlow(); + void unshareChained(); + void coalesceSlow(size_t maxLength=std::numeric_limits::max()); + // newLength must be the entire length of the buffers between this and + // end (no truncation) + void coalesceAndReallocate( + size_t newHeadroom, + size_t newLength, + IOBuf* end, + size_t newTailroom); + void decrementRefcount(); + void reserveSlow(uint32_t minHeadroom, uint32_t minTailroom); + + static size_t goodExtBufferSize(uint32_t minCapacity); + static void initExtBuffer(uint8_t* buf, size_t mallocSize, + SharedInfo** infoReturn, + uint32_t* capacityReturn); + static void allocExtBuffer(uint32_t minCapacity, + uint8_t** bufReturn, + SharedInfo** infoReturn, + uint32_t* capacityReturn); + + /* + * Member variables + */ + + /* + * Links to the next and the previous IOBuf in this chain. + * + * The chain is circularly linked (the last element in the chain points back + * at the head), and next_ and prev_ can never be NULL. If this IOBuf is the + * only element in the chain, next_ and prev_ will both point to this. + */ + IOBuf* next_; + IOBuf* prev_; + + /* + * A pointer to the start of the data referenced by this IOBuf, and the + * length of the data. + * + * This may refer to any subsection of the actual buffer capacity. + */ + uint8_t* data_; + uint32_t length_; + uint32_t flags_; + + union { + ExternalBuf ext_; + InternalBuf int_; + }; + + struct DeleterBase { + virtual ~DeleterBase() { } + virtual void dispose(void* p) = 0; + }; + + template + struct UniquePtrDeleter : public DeleterBase { + typedef typename UniquePtr::pointer Pointer; + typedef typename UniquePtr::deleter_type Deleter; + + explicit UniquePtrDeleter(Deleter deleter) : deleter_(std::move(deleter)){ } + void dispose(void* p) { + try { + deleter_(static_cast(p)); + delete this; + } catch (...) { + abort(); + } + } + + private: + Deleter deleter_; + }; + + static void freeUniquePtrBuffer(void* ptr, void* userData) { + static_cast(userData)->dispose(ptr); + } +}; + +template +typename std::enable_if::value, + std::unique_ptr>::type +IOBuf::takeOwnership(UniquePtr&& buf, size_t count) { + size_t size = count * sizeof(typename UniquePtr::element_type); + CHECK_LT(size, size_t(std::numeric_limits::max())); + auto deleter = new UniquePtrDeleter(buf.get_deleter()); + return takeOwnership(buf.release(), + size, + &IOBuf::freeUniquePtrBuffer, + deleter); +} + +inline std::unique_ptr IOBuf::copyBuffer( + const void* data, uint32_t size, uint32_t headroom, + uint32_t minTailroom) { + uint32_t capacity = headroom + size + minTailroom; + std::unique_ptr buf = create(capacity); + buf->advance(headroom); + memcpy(buf->writableData(), data, size); + buf->append(size); + return buf; +} + +inline std::unique_ptr IOBuf::copyBuffer(const std::string& buf, + uint32_t headroom, + uint32_t minTailroom) { + return copyBuffer(buf.data(), buf.size(), headroom, minTailroom); +} + +inline std::unique_ptr IOBuf::maybeCopyBuffer(const std::string& buf, + uint32_t headroom, + uint32_t minTailroom) { + if (buf.empty()) { + return nullptr; + } + return copyBuffer(buf.data(), buf.size(), headroom, minTailroom); +} + +class IOBuf::Iterator : public boost::iterator_facade< + IOBuf::Iterator, // Derived + const ByteRange, // Value + boost::forward_traversal_tag // Category or traversal + > { + friend class boost::iterator_core_access; + public: + // Note that IOBufs are stored as a circular list without a guard node, + // so pos == end is ambiguous (it may mean "begin" or "end"). To solve + // the ambiguity (at the cost of one extra comparison in the "increment" + // code path), we define end iterators as having pos_ == end_ == nullptr + // and we only allow forward iteration. + explicit Iterator(const IOBuf* pos, const IOBuf* end) + : pos_(pos), + end_(end) { + // Sadly, we must return by const reference, not by value. + if (pos_) { + setVal(); + } + } + + private: + void setVal() { + val_ = ByteRange(pos_->data(), pos_->tail()); + } + + void adjustForEnd() { + if (pos_ == end_) { + pos_ = end_ = nullptr; + val_ = ByteRange(); + } else { + setVal(); + } + } + + const ByteRange& dereference() const { + return val_; + } + + bool equal(const Iterator& other) const { + // We must compare end_ in addition to pos_, because forward traversal + // requires that if two iterators are equal (a == b) and dereferenceable, + // then ++a == ++b. + return pos_ == other.pos_ && end_ == other.end_; + } + + void increment() { + pos_ = pos_->next(); + adjustForEnd(); + } + + const IOBuf* pos_; + const IOBuf* end_; + ByteRange val_; +}; + +inline IOBuf::Iterator IOBuf::begin() const { return cbegin(); } +inline IOBuf::Iterator IOBuf::end() const { return cend(); } + +} // folly + +#endif // FOLLY_IO_IOBUF_H_ diff --git a/folly/io/IOBufQueue.cpp b/folly/io/IOBufQueue.cpp new file mode 100644 index 00000000..9cb991a4 --- /dev/null +++ b/folly/io/IOBufQueue.cpp @@ -0,0 +1,268 @@ +/* + * Copyright 2013 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "folly/io/IOBufQueue.h" + +#include + +#include + +using std::make_pair; +using std::pair; +using std::unique_ptr; + +namespace { + +using folly::IOBuf; + +const size_t MIN_ALLOC_SIZE = 2000; +const size_t MAX_ALLOC_SIZE = 8000; // Must fit within a uint32_t + +/** + * Convenience function to append chain src to chain dst. + */ +void +appendToChain(unique_ptr& dst, unique_ptr&& src) { + if (dst == NULL) { + dst = std::move(src); + } else { + dst->prev()->appendChain(std::move(src)); + } +} + +} // anonymous namespace + +namespace folly { + +IOBufQueue::IOBufQueue(const Options& options) + : options_(options), + chainLength_(0) { +} + +IOBufQueue::IOBufQueue(IOBufQueue&& other) + : options_(other.options_), + chainLength_(other.chainLength_), + head_(std::move(other.head_)) { + other.chainLength_ = 0; +} + +IOBufQueue& IOBufQueue::operator=(IOBufQueue&& other) { + if (&other != this) { + options_ = other.options_; + chainLength_ = other.chainLength_; + head_ = std::move(other.head_); + other.chainLength_ = 0; + } + return *this; +} + +std::pair +IOBufQueue::headroom() { + if (head_) { + return std::make_pair(head_->writableBuffer(), head_->headroom()); + } else { + return std::make_pair(nullptr, 0); + } +} + +void +IOBufQueue::markPrepended(uint32_t n) { + if (n == 0) { + return; + } + assert(head_); + head_->prepend(n); + if (options_.cacheChainLength) { + chainLength_ += n; + } +} + +void +IOBufQueue::prepend(const void* buf, uint32_t n) { + auto p = headroom(); + if (n > p.second) { + throw std::overflow_error("Not enough room to prepend"); + } + memcpy(static_cast(p.first) + p.second - n, buf, n); + markPrepended(n); +} + +void +IOBufQueue::append(unique_ptr&& buf) { + if (!buf) { + return; + } + if (options_.cacheChainLength) { + chainLength_ += buf->computeChainDataLength(); + } + appendToChain(head_, std::move(buf)); +} + +void +IOBufQueue::append(IOBufQueue& other) { + if (!other.head_) { + return; + } + if (options_.cacheChainLength) { + if (other.options_.cacheChainLength) { + chainLength_ += other.chainLength_; + } else { + chainLength_ += other.head_->computeChainDataLength(); + } + } + appendToChain(head_, std::move(other.head_)); + other.chainLength_ = 0; +} + +void +IOBufQueue::append(const void* buf, size_t len) { + auto src = static_cast(buf); + while (len != 0) { + if ((head_ == NULL) || head_->prev()->isSharedOne() || + (head_->prev()->tailroom() == 0)) { + appendToChain(head_, std::move( + IOBuf::create(std::max(MIN_ALLOC_SIZE, + std::min(len, MAX_ALLOC_SIZE))))); + } + IOBuf* last = head_->prev(); + uint32_t copyLen = std::min(len, (size_t)last->tailroom()); + memcpy(last->writableTail(), src, copyLen); + src += copyLen; + last->append(copyLen); + if (options_.cacheChainLength) { + chainLength_ += copyLen; + } + len -= copyLen; + } +} + +void +IOBufQueue::wrapBuffer(const void* buf, size_t len, uint32_t blockSize) { + auto src = static_cast(buf); + while (len != 0) { + size_t n = std::min(len, size_t(blockSize)); + append(IOBuf::wrapBuffer(src, n)); + src += n; + len -= n; + } +} + +pair +IOBufQueue::preallocate(uint32_t min, uint32_t newAllocationSize, + uint32_t max) { + if (head_ != NULL) { + // If there's enough space left over at the end of the queue, use that. + IOBuf* last = head_->prev(); + if (!last->isSharedOne()) { + uint32_t avail = last->tailroom(); + if (avail >= min) { + return make_pair(last->writableTail(), std::min(max, avail)); + } + } + } + // Allocate a new buffer of the requested max size. + unique_ptr newBuf(IOBuf::create(std::max(min, newAllocationSize))); + appendToChain(head_, std::move(newBuf)); + IOBuf* last = head_->prev(); + return make_pair(last->writableTail(), + std::min(max, last->tailroom())); +} + +void +IOBufQueue::postallocate(uint32_t n) { + head_->prev()->append(n); + if (options_.cacheChainLength) { + chainLength_ += n; + } +} + +unique_ptr +IOBufQueue::split(size_t n) { + unique_ptr result; + while (n != 0) { + if (head_ == NULL) { + throw std::underflow_error( + "Attempt to remove more bytes than are present in IOBufQueue"); + } else if (head_->length() <= n) { + n -= head_->length(); + if (options_.cacheChainLength) { + chainLength_ -= head_->length(); + } + unique_ptr remainder = head_->pop(); + appendToChain(result, std::move(head_)); + head_ = std::move(remainder); + } else { + unique_ptr clone = head_->cloneOne(); + clone->trimEnd(clone->length() - n); + appendToChain(result, std::move(clone)); + head_->trimStart(n); + if (options_.cacheChainLength) { + chainLength_ -= n; + } + break; + } + } + return std::move(result); +} + +void IOBufQueue::trimStart(size_t amount) { + while (amount > 0) { + if (!head_) { + throw std::underflow_error( + "Attempt to trim more bytes than are present in IOBufQueue"); + } + if (head_->length() > amount) { + head_->trimStart(amount); + if (options_.cacheChainLength) { + chainLength_ -= amount; + } + break; + } + amount -= head_->length(); + if (options_.cacheChainLength) { + chainLength_ -= head_->length(); + } + head_ = head_->pop(); + } +} + +void IOBufQueue::trimEnd(size_t amount) { + while (amount > 0) { + if (!head_) { + throw std::underflow_error( + "Attempt to trim more bytes than are present in IOBufQueue"); + } + if (head_->prev()->length() > amount) { + head_->prev()->trimEnd(amount); + if (options_.cacheChainLength) { + chainLength_ -= amount; + } + break; + } + amount -= head_->prev()->length(); + if (options_.cacheChainLength) { + chainLength_ -= head_->prev()->length(); + } + unique_ptr b = head_->prev()->unlink(); + + // Null queue if we unlinked the head. + if (b.get() == head_.get()) { + head_.reset(); + } + } +} + +} // folly diff --git a/folly/io/IOBufQueue.h b/folly/io/IOBufQueue.h new file mode 100644 index 00000000..23de232d --- /dev/null +++ b/folly/io/IOBufQueue.h @@ -0,0 +1,234 @@ +/* + * Copyright 2013 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FOLLY_IO_IOBUF_QUEUE_H +#define FOLLY_IO_IOBUF_QUEUE_H + +#include "folly/io/IOBuf.h" + +#include +#include + +namespace folly { + +/** + * An IOBufQueue encapsulates a chain of IOBufs and provides + * convenience functions to append data to the back of the chain + * and remove data from the front. + * + * You may also prepend data into the headroom of the first buffer in the + * chain, if any. + */ +class IOBufQueue { + public: + struct Options { + Options() : cacheChainLength(false) { } + bool cacheChainLength; + }; + + /** + * Commonly used Options, currently the only possible value other than + * the default. + */ + static Options cacheChainLength() { + Options options; + options.cacheChainLength = true; + return options; + } + + explicit IOBufQueue(const Options& options = Options()); + + /** + * Return a space to prepend bytes and the amount of headroom available. + */ + std::pair headroom(); + + /** + * Indicate that n bytes from the headroom have been used. + */ + void markPrepended(uint32_t n); + + /** + * Prepend an existing range; throws std::overflow_error if not enough + * room. + */ + void prepend(const void* buf, uint32_t n); + + /** + * Add a buffer or buffer chain to the end of this queue. The + * queue takes ownership of buf. + */ + void append(std::unique_ptr&& buf); + + /** + * Add a queue to the end of this queue. The queue takes ownership of + * all buffers from the other queue. + */ + void append(IOBufQueue& other); + void append(IOBufQueue&& other) { + append(other); // call lvalue reference overload, above + } + + /** + * Copy len bytes, starting at buf, to the end of this queue. + * The caller retains ownership of the source data. + */ + void append(const void* buf, size_t len); + + /** + * Copy a string to the end of this queue. + * The caller retains ownership of the source data. + */ + void append(const std::string& buf) { + append(buf.data(), buf.length()); + } + + /** + * Append a chain of IOBuf objects that point to consecutive regions + * within buf. + * + * Just like IOBuf::wrapBuffer, this should only be used when the caller + * knows ahead of time and can ensure that all IOBuf objects that will point + * to this buffer will be destroyed before the buffer itself is destroyed; + * all other caveats from wrapBuffer also apply. + * + * Every buffer except for the last will wrap exactly blockSize bytes. + * Importantly, this method may be used to wrap buffers larger than 4GB. + */ + void wrapBuffer(const void* buf, size_t len, + uint32_t blockSize=(1U << 31)); // default block size: 2GB + + /** + * Obtain a writable block of contiguous bytes at the end of this + * queue, allocating more space if necessary. The amount of space + * reserved will be at least min. If min contiguous space is not + * available at the end of the queue, and IOBuf with size newAllocationSize + * is appended to the chain and returned. The actual available space + * may be larger than newAllocationSize, but will be truncated to max, + * if specified. + * + * If the caller subsequently writes anything into the returned space, + * it must call the postallocate() method. + * + * @return The starting address of the block and the length in bytes. + * + * @note The point of the preallocate()/postallocate() mechanism is + * to support I/O APIs such as Thrift's TAsyncSocket::ReadCallback + * that request a buffer from the application and then, in a later + * callback, tell the application how much of the buffer they've + * filled with data. + */ + std::pair preallocate( + uint32_t min, uint32_t newAllocationSize, + uint32_t max = std::numeric_limits::max()); + + /** + * Tell the queue that the caller has written data into the first n + * bytes provided by the previous preallocate() call. + * + * @note n should be less than or equal to the size returned by + * preallocate(). If n is zero, the caller may skip the call + * to postallocate(). If n is nonzero, the caller must not + * invoke any other non-const methods on this IOBufQueue between + * the call to preallocate and the call to postallocate(). + */ + void postallocate(uint32_t n); + + /** + * Obtain a writable block of n contiguous bytes, allocating more space + * if necessary, and mark it as used. The caller can fill it later. + */ + void* allocate(uint32_t n) { + void* p = preallocate(n, n).first; + postallocate(n); + return p; + } + + /** + * Split off the first n bytes of the queue into a separate IOBuf chain, + * and transfer ownership of the new chain to the caller. The IOBufQueue + * retains ownership of everything after the split point. + * + * @warning If the split point lies in the middle of some IOBuf within + * the chain, this function may, as an implementation detail, + * clone that IOBuf. + * + * @throws std::underflow_error if n exceeds the number of bytes + * in the queue. + */ + std::unique_ptr split(size_t n); + + /** + * Similar to IOBuf::trimStart, but works on the whole queue. Will + * pop off buffers that have been completely trimmed. + */ + void trimStart(size_t amount); + + /** + * Similar to IOBuf::trimEnd, but works on the whole queue. Will + * pop off buffers that have been completely trimmed. + */ + void trimEnd(size_t amount); + + /** + * Transfer ownership of the queue's entire IOBuf chain to the caller. + */ + std::unique_ptr move() { + chainLength_ = 0; + return std::move(head_); + } + + /** + * Access + */ + const folly::IOBuf* front() const { + return head_.get(); + } + + /** + * Total chain length, only valid if cacheLength was specified in the + * constructor. + */ + size_t chainLength() const { + if (!options_.cacheChainLength) { + throw std::invalid_argument("IOBufQueue: chain length not cached"); + } + return chainLength_; + } + + const Options& options() const { + return options_; + } + + /** Movable */ + IOBufQueue(IOBufQueue&&); + IOBufQueue& operator=(IOBufQueue&&); + + private: + static const size_t kChainLengthNotCached = (size_t)-1; + /** Not copyable */ + IOBufQueue(const IOBufQueue&) = delete; + IOBufQueue& operator=(const IOBufQueue&) = delete; + + Options options_; + size_t chainLength_; + /** Everything that has been appended but not yet discarded or moved out */ + std::unique_ptr head_; +}; + +} // folly + +#endif // FOLLY_IO_IOBUF_QUEUE_H diff --git a/folly/io/TypedIOBuf.h b/folly/io/TypedIOBuf.h new file mode 100644 index 00000000..3e8d2297 --- /dev/null +++ b/folly/io/TypedIOBuf.h @@ -0,0 +1,206 @@ +/* + * Copyright 2013 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FOLLY_IO_TYPEDIOBUF_H_ +#define FOLLY_IO_TYPEDIOBUF_H_ + +#include +#include +#include +#include "folly/io/IOBuf.h" + +namespace folly { + +/** + * Wrapper class to handle a IOBuf as a typed buffer (to a standard layout + * class). + * + * This class punts on alignment, and assumes that you know what you're doing. + * + * All methods are wrappers around the corresponding IOBuf methods. The + * TypedIOBuf object is stateless, so it's perfectly okay to access the + * underlying IOBuf in between TypedIOBuf method calls. + */ +template +class TypedIOBuf { + static_assert(std::is_standard_layout::value, "must be standard layout"); + public: + typedef T value_type; + typedef value_type& reference; + typedef const value_type& const_reference; + typedef uint32_t size_type; + typedef value_type* iterator; + typedef const value_type* const_iterator; + + explicit TypedIOBuf(IOBuf* buf) : buf_(buf) { } + + IOBuf* ioBuf() { + return buf_; + } + const IOBuf* ioBuf() const { + return buf_; + } + + bool empty() const { + return buf_->empty(); + } + const T* data() const { + return cast(buf_->data()); + } + T* writableData() { + return cast(buf_->writableData()); + } + const T* tail() const { + return cast(buf_->tail()); + } + T* writableTail() { + return cast(buf_->writableTail()); + } + uint32_t length() const { + return sdiv(buf_->length()); + } + uint32_t size() const { return length(); } + + uint32_t headroom() const { + return sdiv(buf_->headroom()); + } + uint32_t tailroom() const { + return sdiv(buf_->tailroom()); + } + const T* buffer() const { + return cast(buf_->buffer()); + } + T* writableBuffer() { + return cast(buf_->writableBuffer()); + } + const T* bufferEnd() const { + return cast(buf_->bufferEnd()); + } + uint32_t capacity() const { + return sdiv(buf_->capacity()); + } + void advance(uint32_t n) { + buf_->advance(smul(n)); + } + void retreat(uint32_t n) { + buf_->retreat(smul(n)); + } + void prepend(uint32_t n) { + buf_->prepend(smul(n)); + } + void append(uint32_t n) { + buf_->append(smul(n)); + } + void trimStart(uint32_t n) { + buf_->trimStart(smul(n)); + } + void trimEnd(uint32_t n) { + buf_->trimEnd(smul(n)); + } + void clear() { + buf_->clear(); + } + void reserve(uint32_t minHeadroom, uint32_t minTailroom) { + buf_->reserve(smul(minHeadroom), smul(minTailroom)); + } + void reserve(uint32_t minTailroom) { reserve(0, minTailroom); } + + const T* cbegin() const { return data(); } + const T* cend() const { return tail(); } + const T* begin() const { return cbegin(); } + const T* end() const { return cend(); } + T* begin() { return writableData(); } + T* end() { return writableTail(); } + + const T& front() const { + assert(!empty()); + return *begin(); + } + T& front() { + assert(!empty()); + return *begin(); + } + const T& back() const { + assert(!empty()); + return end()[-1]; + } + T& back() { + assert(!empty()); + return end()[-1]; + } + + /** + * Simple wrapper to make it easier to treat this TypedIOBuf as an array of + * T. + */ + const T& operator[](ssize_t idx) const { + assert(idx >= 0 && idx < length()); + return data()[idx]; + } + + /** + * Append one element. + */ + void push(const T& data) { + push(&data, &data + 1); + } + void push_back(const T& data) { push(data); } + + /** + * Append multiple elements in a sequence; will call distance(). + */ + template + void push(IT begin, IT end) { + auto n = std::distance(begin, end); + reserve(headroom(), n); + std::copy(begin, end, writableTail()); + append(n); + } + + // Movable + TypedIOBuf(TypedIOBuf&&) = default; + TypedIOBuf& operator=(TypedIOBuf&&) = default; + + private: + // Non-copyable + TypedIOBuf(const TypedIOBuf&) = delete; + TypedIOBuf& operator=(const TypedIOBuf&) = delete; + + // cast to T* + static T* cast(uint8_t* p) { + return reinterpret_cast(p); + } + static const T* cast(const uint8_t* p) { + return reinterpret_cast(p); + } + // divide by size + static uint32_t sdiv(uint32_t n) { + return n / sizeof(T); + } + // multiply by size + static uint32_t smul(uint32_t n) { + // In debug mode, check for overflow + assert((uint64_t(n) * sizeof(T)) < (uint64_t(1) << 32)); + return n * sizeof(T); + } + + IOBuf* buf_; +}; + +} // namespace folly + +#endif /* FOLLY_IO_TYPEDIOBUF_H_ */ + diff --git a/folly/io/test/IOBufCursorTest.cpp b/folly/io/test/IOBufCursorTest.cpp new file mode 100644 index 00000000..c1fe8967 --- /dev/null +++ b/folly/io/test/IOBufCursorTest.cpp @@ -0,0 +1,415 @@ +/* + * Copyright 2013 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "folly/io/IOBuf.h" + +#include +#include +#include +#include "folly/Benchmark.h" +#include "folly/Range.h" +#include "folly/io/Cursor.h" + +DECLARE_bool(benchmark); + +using folly::IOBuf; +using std::unique_ptr; +using namespace folly::io; + +TEST(IOBuf, RWCursor) { + unique_ptr iobuf1(IOBuf::create(20)); + iobuf1->append(20); + unique_ptr iobuf2(IOBuf::create(20)); + iobuf2->append(20); + + IOBuf* iob2ptr = iobuf2.get(); + iobuf1->prependChain(std::move(iobuf2)); + + EXPECT_TRUE(iobuf1->isChained()); + + RWPrivateCursor wcursor(iobuf1.get()); + Cursor rcursor(iobuf1.get()); + wcursor.writeLE((uint64_t)1); + wcursor.writeLE((uint64_t)1); + wcursor.writeLE((uint64_t)1); + wcursor.write((uint8_t)1); + + EXPECT_EQ(1, rcursor.readLE()); + rcursor.skip(8); + EXPECT_EQ(1, rcursor.readLE()); + rcursor.skip(0); + EXPECT_EQ(0, rcursor.read()); + EXPECT_EQ(0, rcursor.read()); + EXPECT_EQ(0, rcursor.read()); + EXPECT_EQ(0, rcursor.read()); + EXPECT_EQ(1, rcursor.read()); +} + +TEST(IOBuf, skip) { + unique_ptr iobuf1(IOBuf::create(20)); + iobuf1->append(20); + RWPrivateCursor wcursor(iobuf1.get()); + wcursor.write((uint8_t)1); + wcursor.write((uint8_t)2); + Cursor cursor(iobuf1.get()); + cursor.skip(1); + EXPECT_EQ(2, cursor.read()); +} + +TEST(IOBuf, reset) { + unique_ptr iobuf1(IOBuf::create(20)); + iobuf1->append(20); + RWPrivateCursor wcursor(iobuf1.get()); + wcursor.write((uint8_t)1); + wcursor.write((uint8_t)2); + wcursor.reset(iobuf1.get()); + EXPECT_EQ(1, wcursor.read()); +} + +TEST(IOBuf, copy_assign_convert) { + unique_ptr iobuf1(IOBuf::create(20)); + iobuf1->append(20); + RWPrivateCursor wcursor(iobuf1.get()); + RWPrivateCursor cursor2(wcursor); + RWPrivateCursor cursor3(iobuf1.get()); + + wcursor.write((uint8_t)1); + cursor3 = wcursor; + wcursor.write((uint8_t)2); + Cursor cursor4(wcursor); + RWPrivateCursor cursor5(wcursor); + wcursor.write((uint8_t)3); + + EXPECT_EQ(1, cursor2.read()); + EXPECT_EQ(2, cursor3.read()); + EXPECT_EQ(3, cursor4.read()); +} + +TEST(IOBuf, overloading) { + unique_ptr iobuf1(IOBuf::create(20)); + iobuf1->append(20); + RWPrivateCursor wcursor(iobuf1.get()); + wcursor += 1; + wcursor.write((uint8_t)1); + Cursor cursor(iobuf1.get()); + cursor += 1; + EXPECT_EQ(1, cursor.read()); +} + +TEST(IOBuf, endian) { + unique_ptr iobuf1(IOBuf::create(20)); + iobuf1->append(20); + RWPrivateCursor wcursor(iobuf1.get()); + Cursor rcursor(iobuf1.get()); + uint16_t v = 1; + int16_t vu = -1; + wcursor.writeBE(v); + wcursor.writeBE(vu); + // Try a couple combinations to ensure they were generated correctly + wcursor.writeBE(vu); + wcursor.writeLE(vu); + wcursor.writeLE(vu); + wcursor.writeLE(v); + EXPECT_EQ(v, rcursor.readBE()); +} + +TEST(IOBuf, Cursor) { + unique_ptr iobuf1(IOBuf::create(1)); + iobuf1->append(1); + RWPrivateCursor c(iobuf1.get()); + c.write((uint8_t)40); // OK + try { + c.write((uint8_t)10); // Bad write, checked should except. + EXPECT_EQ(true, false); + } catch (...) { + } +} + +TEST(IOBuf, UnshareCursor) { + uint8_t buf = 0; + unique_ptr iobuf1(IOBuf::wrapBuffer(&buf, 1)); + unique_ptr iobuf2(IOBuf::wrapBuffer(&buf, 1)); + RWUnshareCursor c1(iobuf1.get()); + RWUnshareCursor c2(iobuf2.get()); + + c1.write((uint8_t)10); // This should duplicate the two buffers. + uint8_t t = c2.read(); + EXPECT_EQ(0, t); + + iobuf1 = IOBuf::wrapBuffer(&buf, 1); + iobuf2 = IOBuf::wrapBuffer(&buf, 1); + RWPrivateCursor c3(iobuf1.get()); + RWPrivateCursor c4(iobuf2.get()); + + c3.write((uint8_t)10); // This should _not_ duplicate the two buffers. + t = c4.read(); + EXPECT_EQ(10, t); +} + +namespace { +void append(std::unique_ptr& buf, folly::StringPiece data) { + EXPECT_LE(data.size(), buf->tailroom()); + memcpy(buf->writableData(), data.data(), data.size()); + buf->append(data.size()); +} + +void append(Appender& appender, folly::StringPiece data) { + appender.push(reinterpret_cast(data.data()), data.size()); +} + +std::string toString(const IOBuf& buf) { + std::string str; + Cursor cursor(&buf); + std::pair p; + while ((p = cursor.peek()).second) { + str.append(reinterpret_cast(p.first), p.second); + cursor.skip(p.second); + } + return str; +} + +} // namespace + +TEST(IOBuf, PullAndPeek) { + std::unique_ptr iobuf1(IOBuf::create(10)); + append(iobuf1, "he"); + std::unique_ptr iobuf2(IOBuf::create(10)); + append(iobuf2, "llo "); + std::unique_ptr iobuf3(IOBuf::create(10)); + append(iobuf3, "world"); + iobuf1->prependChain(std::move(iobuf2)); + iobuf1->prependChain(std::move(iobuf3)); + EXPECT_EQ(3, iobuf1->countChainElements()); + EXPECT_EQ(11, iobuf1->computeChainDataLength()); + + char buf[12]; + memset(buf, 0, sizeof(buf)); + Cursor(iobuf1.get()).pull(buf, 11); + EXPECT_EQ("hello world", std::string(buf)); + + memset(buf, 0, sizeof(buf)); + EXPECT_EQ(11, Cursor(iobuf1.get()).pullAtMost(buf, 20)); + EXPECT_EQ("hello world", std::string(buf)); + + EXPECT_THROW({Cursor(iobuf1.get()).pull(buf, 20);}, + std::out_of_range); + + { + RWPrivateCursor cursor(iobuf1.get()); + auto p = cursor.peek(); + EXPECT_EQ("he", std::string(reinterpret_cast(p.first), + p.second)); + cursor.skip(p.second); + p = cursor.peek(); + EXPECT_EQ("llo ", std::string(reinterpret_cast(p.first), + p.second)); + cursor.skip(p.second); + p = cursor.peek(); + EXPECT_EQ("world", std::string(reinterpret_cast(p.first), + p.second)); + cursor.skip(p.second); + EXPECT_EQ(3, iobuf1->countChainElements()); + EXPECT_EQ(11, iobuf1->computeChainDataLength()); + } + + { + RWPrivateCursor cursor(iobuf1.get()); + cursor.gather(11); + auto p = cursor.peek(); + EXPECT_EQ("hello world", std::string(reinterpret_cast(p.first), p.second)); + EXPECT_EQ(1, iobuf1->countChainElements()); + EXPECT_EQ(11, iobuf1->computeChainDataLength()); + } +} + +TEST(IOBuf, cloneAndInsert) { + std::unique_ptr iobuf1(IOBuf::create(10)); + append(iobuf1, "he"); + std::unique_ptr iobuf2(IOBuf::create(10)); + append(iobuf2, "llo "); + std::unique_ptr iobuf3(IOBuf::create(10)); + append(iobuf3, "world"); + iobuf1->prependChain(std::move(iobuf2)); + iobuf1->prependChain(std::move(iobuf3)); + EXPECT_EQ(3, iobuf1->countChainElements()); + EXPECT_EQ(11, iobuf1->computeChainDataLength()); + + std::unique_ptr cloned; + + Cursor(iobuf1.get()).clone(cloned, 3); + EXPECT_EQ(2, cloned->countChainElements()); + EXPECT_EQ(3, cloned->computeChainDataLength()); + + + EXPECT_EQ(11, Cursor(iobuf1.get()).cloneAtMost(cloned, 20)); + EXPECT_EQ(3, cloned->countChainElements()); + EXPECT_EQ(11, cloned->computeChainDataLength()); + + + EXPECT_THROW({Cursor(iobuf1.get()).clone(cloned, 20);}, + std::out_of_range); + + { + // Check that inserting in the middle of an iobuf splits + RWPrivateCursor cursor(iobuf1.get()); + Cursor(iobuf1.get()).clone(cloned, 3); + EXPECT_EQ(2, cloned->countChainElements()); + EXPECT_EQ(3, cloned->computeChainDataLength()); + + cursor.skip(1); + + cursor.insert(std::move(cloned)); + EXPECT_EQ(6, iobuf1->countChainElements()); + EXPECT_EQ(14, iobuf1->computeChainDataLength()); + // Check that nextBuf got set correctly + cursor.read(); + } + + { + // Check that inserting at the end doesn't create empty buf + RWPrivateCursor cursor(iobuf1.get()); + Cursor(iobuf1.get()).clone(cloned, 1); + EXPECT_EQ(1, cloned->countChainElements()); + EXPECT_EQ(1, cloned->computeChainDataLength()); + + cursor.skip(1); + + cursor.insert(std::move(cloned)); + EXPECT_EQ(7, iobuf1->countChainElements()); + EXPECT_EQ(15, iobuf1->computeChainDataLength()); + // Check that nextBuf got set correctly + cursor.read(); + } + { + // Check that inserting at the beginning doesn't create empty buf + RWPrivateCursor cursor(iobuf1.get()); + Cursor(iobuf1.get()).clone(cloned, 1); + EXPECT_EQ(1, cloned->countChainElements()); + EXPECT_EQ(1, cloned->computeChainDataLength()); + + cursor.insert(std::move(cloned)); + EXPECT_EQ(8, iobuf1->countChainElements()); + EXPECT_EQ(16, iobuf1->computeChainDataLength()); + // Check that nextBuf got set correctly + cursor.read(); + } +} + +TEST(IOBuf, Appender) { + std::unique_ptr head(IOBuf::create(10)); + append(head, "hello"); + + Appender app(head.get(), 10); + uint32_t cap = head->capacity(); + uint32_t len1 = app.length(); + EXPECT_EQ(cap - 5, len1); + app.ensure(len1); // won't grow + EXPECT_EQ(len1, app.length()); + app.ensure(len1 + 1); // will grow + EXPECT_LE(len1 + 1, app.length()); + + append(app, " world"); + EXPECT_EQ("hello world", toString(*head)); +} + +int benchmark_size = 1000; +unique_ptr iobuf_benchmark; + +unique_ptr iobuf_read_benchmark; + +template +void runBenchmark() { + CursClass c(iobuf_benchmark.get()); + + for(int i = 0; i < benchmark_size; i++) { + c.write((uint8_t)0); + } +} + +BENCHMARK(rwPrivateCursorBenchmark, iters) { + while (--iters) { + runBenchmark(); + } +} + +BENCHMARK(rwUnshareCursorBenchmark, iters) { + while (--iters) { + runBenchmark(); + } +} + + +BENCHMARK(cursorBenchmark, iters) { + while (--iters) { + Cursor c(iobuf_read_benchmark.get()); + for(int i = 0; i < benchmark_size ; i++) { + c.read(); + } + } +} + +BENCHMARK(skipBenchmark, iters) { + uint8_t buf; + while (--iters) { + Cursor c(iobuf_read_benchmark.get()); + for(int i = 0; i < benchmark_size ; i++) { + c.peek(); + c.skip(1); + } + } +} + +// fbmake opt +// _bin/folly/experimental/io/test/iobuf_cursor_test -benchmark +// +// Benchmark Iters Total t t/iter iter/sec +// --------------------------------------------------------------------------- +// rwPrivateCursorBenchmark 100000 142.9 ms 1.429 us 683.5 k +// rwUnshareCursorBenchmark 100000 309.3 ms 3.093 us 315.7 k +// cursorBenchmark 100000 741.4 ms 7.414 us 131.7 k +// skipBenchmark 100000 738.9 ms 7.389 us 132.2 k +// +// uname -a: +// +// Linux dev2159.snc6.facebook.com 2.6.33-7_fbk15_104e4d0 #1 SMP +// Tue Oct 19 22:40:30 PDT 2010 x86_64 x86_64 x86_64 GNU/Linux +// +// 72GB RAM, 2 CPUs (Intel(R) Xeon(R) CPU L5630 @ 2.13GHz) +// hyperthreading disabled + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + google::ParseCommandLineFlags(&argc, &argv, true); + + auto ret = RUN_ALL_TESTS(); + + if (ret == 0 && FLAGS_benchmark) { + iobuf_benchmark = IOBuf::create(benchmark_size); + iobuf_benchmark->append(benchmark_size); + + iobuf_read_benchmark = IOBuf::create(1); + for (int i = 0; i < benchmark_size; i++) { + unique_ptr iobuf2(IOBuf::create(1)); + iobuf2->append(1); + iobuf_read_benchmark->prependChain(std::move(iobuf2)); + } + + folly::runBenchmarks(); + } + + return ret; +} diff --git a/folly/io/test/IOBufQueueTest.cpp b/folly/io/test/IOBufQueueTest.cpp new file mode 100644 index 00000000..d1a5ba30 --- /dev/null +++ b/folly/io/test/IOBufQueueTest.cpp @@ -0,0 +1,274 @@ +/* + * Copyright 2013 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "folly/io/IOBufQueue.h" +#include "folly/Range.h" + +#include +#include + +#include +#include +#include + +using folly::IOBuf; +using folly::IOBufQueue; +using folly::StringPiece; +using std::pair; +using std::string; +using std::unique_ptr; + +// String Comma Length macro for string literals +#define SCL(x) (x), sizeof(x) - 1 + +namespace { + +IOBufQueue::Options clOptions; +struct Initializer { + Initializer() { + clOptions.cacheChainLength = true; + } +}; +Initializer initializer; + +unique_ptr +stringToIOBuf(const char* s, uint32_t len) { + unique_ptr buf = IOBuf::create(len); + memcpy(buf->writableTail(), s, len); + buf->append(len); + return std::move(buf); +} + +void checkConsistency(const IOBufQueue& queue) { + if (queue.options().cacheChainLength) { + size_t len = queue.front() ? queue.front()->computeChainDataLength() : 0; + EXPECT_EQ(len, queue.chainLength()); + } +} + +} + +TEST(IOBufQueue, Simple) { + IOBufQueue queue(clOptions); + EXPECT_EQ(NULL, queue.front()); + queue.append(SCL("")); + EXPECT_EQ(NULL, queue.front()); + queue.append(unique_ptr()); + EXPECT_EQ(NULL, queue.front()); + string emptyString; + queue.append(emptyString); + EXPECT_EQ(NULL, queue.front()); +} + +TEST(IOBufQueue, Append) { + IOBufQueue queue(clOptions); + queue.append(SCL("Hello")); + IOBufQueue queue2(clOptions); + queue2.append(SCL(", ")); + queue2.append(SCL("World")); + checkConsistency(queue); + checkConsistency(queue2); + queue.append(queue2.move()); + checkConsistency(queue); + checkConsistency(queue2); + const IOBuf* chain = queue.front(); + EXPECT_NE((IOBuf*)NULL, chain); + EXPECT_EQ(12, chain->computeChainDataLength()); + EXPECT_EQ(NULL, queue2.front()); +} + +TEST(IOBufQueue, Append2) { + IOBufQueue queue(clOptions); + queue.append(SCL("Hello")); + IOBufQueue queue2(clOptions); + queue2.append(SCL(", ")); + queue2.append(SCL("World")); + checkConsistency(queue); + checkConsistency(queue2); + queue.append(queue2); + checkConsistency(queue); + checkConsistency(queue2); + const IOBuf* chain = queue.front(); + EXPECT_NE((IOBuf*)NULL, chain); + EXPECT_EQ(12, chain->computeChainDataLength()); + EXPECT_EQ(NULL, queue2.front()); +} + +TEST(IOBufQueue, Split) { + IOBufQueue queue(clOptions); + queue.append(stringToIOBuf(SCL("Hello"))); + queue.append(stringToIOBuf(SCL(","))); + queue.append(stringToIOBuf(SCL(" "))); + queue.append(stringToIOBuf(SCL(""))); + queue.append(stringToIOBuf(SCL("World"))); + checkConsistency(queue); + EXPECT_EQ(12, queue.front()->computeChainDataLength()); + + unique_ptr prefix(queue.split(1)); + checkConsistency(queue); + EXPECT_EQ(1, prefix->computeChainDataLength()); + EXPECT_EQ(11, queue.front()->computeChainDataLength()); + prefix = queue.split(2); + checkConsistency(queue); + EXPECT_EQ(2, prefix->computeChainDataLength()); + EXPECT_EQ(9, queue.front()->computeChainDataLength()); + prefix = queue.split(3); + checkConsistency(queue); + EXPECT_EQ(3, prefix->computeChainDataLength()); + EXPECT_EQ(6, queue.front()->computeChainDataLength()); + prefix = queue.split(1); + checkConsistency(queue); + EXPECT_EQ(1, prefix->computeChainDataLength()); + EXPECT_EQ(5, queue.front()->computeChainDataLength()); + prefix = queue.split(5); + checkConsistency(queue); + EXPECT_EQ(5, prefix->computeChainDataLength()); + EXPECT_EQ((IOBuf*)NULL, queue.front()); + + queue.append(stringToIOBuf(SCL("Hello,"))); + queue.append(stringToIOBuf(SCL(" World"))); + checkConsistency(queue); + bool exceptionFired = false; + EXPECT_THROW({prefix = queue.split(13);}, std::underflow_error); + checkConsistency(queue); +} + +TEST(IOBufQueue, Preallocate) { + IOBufQueue queue(clOptions); + queue.append(string("Hello")); + pair writable = queue.preallocate(2, 64, 64); + checkConsistency(queue); + EXPECT_NE((void*)NULL, writable.first); + EXPECT_LE(2, writable.second); + EXPECT_GE(64, writable.second); + memcpy(writable.first, SCL(", ")); + queue.postallocate(2); + checkConsistency(queue); + EXPECT_EQ(7, queue.front()->computeChainDataLength()); + queue.append(SCL("World")); + checkConsistency(queue); + EXPECT_EQ(12, queue.front()->computeChainDataLength()); + // There are not 2048 bytes available, this will alloc a new buf + writable = queue.preallocate(2048, 4096); + checkConsistency(queue); + EXPECT_LE(2048, writable.second); + // IOBuf allocates more than newAllocationSize, and we didn't cap it + EXPECT_GE(writable.second, 4096); + queue.postallocate(writable.second); + // queue has no empty space, make sure we allocate at least min, even if + // newAllocationSize < min + writable = queue.preallocate(1024, 1, 1024); + checkConsistency(queue); + EXPECT_EQ(1024, writable.second); +} + +TEST(IOBufQueue, Wrap) { + IOBufQueue queue(clOptions); + const char* buf = "hello world goodbye"; + size_t len = strlen(buf); + queue.wrapBuffer(buf, len, 6); + auto iob = queue.move(); + EXPECT_EQ((len - 1) / 6 + 1, iob->countChainElements()); + iob->unshare(); + iob->coalesce(); + EXPECT_EQ(StringPiece(buf), + StringPiece(reinterpret_cast(iob->data()), + iob->length())); +} + +TEST(IOBufQueue, trim) { + IOBufQueue queue(clOptions); + unique_ptr a = IOBuf::create(4); + a->append(4); + queue.append(std::move(a)); + checkConsistency(queue); + a = IOBuf::create(6); + a->append(6); + queue.append(std::move(a)); + checkConsistency(queue); + a = IOBuf::create(8); + a->append(8); + queue.append(std::move(a)); + checkConsistency(queue); + a = IOBuf::create(10); + a->append(10); + queue.append(std::move(a)); + checkConsistency(queue); + + EXPECT_EQ(4, queue.front()->countChainElements()); + EXPECT_EQ(28, queue.front()->computeChainDataLength()); + EXPECT_EQ(4, queue.front()->length()); + + queue.trimStart(1); + checkConsistency(queue); + EXPECT_EQ(4, queue.front()->countChainElements()); + EXPECT_EQ(27, queue.front()->computeChainDataLength()); + EXPECT_EQ(3, queue.front()->length()); + + queue.trimStart(5); + checkConsistency(queue); + EXPECT_EQ(3, queue.front()->countChainElements()); + EXPECT_EQ(22, queue.front()->computeChainDataLength()); + EXPECT_EQ(4, queue.front()->length()); + + queue.trimEnd(1); + checkConsistency(queue); + EXPECT_EQ(3, queue.front()->countChainElements()); + EXPECT_EQ(21, queue.front()->computeChainDataLength()); + EXPECT_EQ(9, queue.front()->prev()->length()); + + queue.trimEnd(20); + checkConsistency(queue); + EXPECT_EQ(1, queue.front()->countChainElements()); + EXPECT_EQ(1, queue.front()->computeChainDataLength()); + EXPECT_EQ(1, queue.front()->prev()->length()); + + queue.trimEnd(1); + checkConsistency(queue); + EXPECT_EQ(NULL, queue.front()); + + EXPECT_THROW(queue.trimStart(2), std::underflow_error); + checkConsistency(queue); + + EXPECT_THROW(queue.trimEnd(30), std::underflow_error); + checkConsistency(queue); +} + +TEST(IOBufQueue, Prepend) { + folly::IOBufQueue queue; + + auto buf = folly::IOBuf::create(10); + buf->advance(5); + queue.append(std::move(buf)); + + queue.append(SCL(" World")); + queue.prepend(SCL("Hello")); + + EXPECT_THROW(queue.prepend(SCL("x")), std::overflow_error); + + auto out = queue.move(); + out->coalesce(); + EXPECT_EQ("Hello World", + StringPiece(reinterpret_cast(out->data()), + out->length())); +} + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + google::ParseCommandLineFlags(&argc, &argv, true); + + return RUN_ALL_TESTS(); +} diff --git a/folly/io/test/IOBufTest.cpp b/folly/io/test/IOBufTest.cpp new file mode 100644 index 00000000..3bb86cf1 --- /dev/null +++ b/folly/io/test/IOBufTest.cpp @@ -0,0 +1,772 @@ +/* + * Copyright 2013 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "folly/io/IOBuf.h" +#include "folly/io/TypedIOBuf.h" + +// googletest requires std::tr1::tuple, not std::tuple +#include + +#include +#include +#include + +#include "folly/Malloc.h" +#include "folly/Range.h" + +using folly::fbstring; +using folly::IOBuf; +using folly::TypedIOBuf; +using folly::StringPiece; +using folly::ByteRange; +using std::unique_ptr; + +void append(std::unique_ptr& buf, StringPiece str) { + EXPECT_LE(str.size(), buf->tailroom()); + memcpy(buf->writableData(), str.data(), str.size()); + buf->append(str.size()); +} + +void prepend(std::unique_ptr& buf, StringPiece str) { + EXPECT_LE(str.size(), buf->headroom()); + memcpy(buf->writableData() - str.size(), str.data(), str.size()); + buf->prepend(str.size()); +} + +TEST(IOBuf, Simple) { + unique_ptr buf(IOBuf::create(100)); + uint32_t cap = buf->capacity(); + EXPECT_LE(100, cap); + EXPECT_EQ(0, buf->headroom()); + EXPECT_EQ(0, buf->length()); + EXPECT_EQ(cap, buf->tailroom()); + + append(buf, "world"); + buf->advance(10); + EXPECT_EQ(10, buf->headroom()); + EXPECT_EQ(5, buf->length()); + EXPECT_EQ(cap - 15, buf->tailroom()); + + prepend(buf, "hello "); + EXPECT_EQ(4, buf->headroom()); + EXPECT_EQ(11, buf->length()); + EXPECT_EQ(cap - 15, buf->tailroom()); + + const char* p = reinterpret_cast(buf->data()); + EXPECT_EQ("hello world", std::string(p, buf->length())); + + buf->clear(); + EXPECT_EQ(0, buf->headroom()); + EXPECT_EQ(0, buf->length()); + EXPECT_EQ(cap, buf->tailroom()); +} + + +void testAllocSize(uint32_t requestedCapacity) { + unique_ptr iobuf(IOBuf::create(requestedCapacity)); + EXPECT_GE(iobuf->capacity(), requestedCapacity); +} + +TEST(IOBuf, AllocSizes) { + // Try with a small allocation size that should fit in the internal buffer + testAllocSize(28); + + // Try with a large allocation size that will require an external buffer. + testAllocSize(9000); + + // 220 bytes is currently the cutoff + // (It would be nice to use the IOBuf::kMaxInternalDataSize constant, + // but it's private and it doesn't seem worth making it public just for this + // test code.) + testAllocSize(220); + testAllocSize(219); + testAllocSize(221); +} + +void deleteArrayBuffer(void *buf, void* arg) { + uint32_t* deleteCount = static_cast(arg); + ++(*deleteCount); + uint8_t* bufPtr = static_cast(buf); + delete[] bufPtr; +} + +TEST(IOBuf, TakeOwnership) { + uint32_t size1 = 99; + uint8_t *buf1 = static_cast(malloc(size1)); + unique_ptr iobuf1(IOBuf::takeOwnership(buf1, size1)); + EXPECT_EQ(buf1, iobuf1->data()); + EXPECT_EQ(size1, iobuf1->length()); + EXPECT_EQ(buf1, iobuf1->buffer()); + EXPECT_EQ(size1, iobuf1->capacity()); + + uint32_t deleteCount = 0; + uint32_t size2 = 4321; + uint8_t *buf2 = new uint8_t[size2]; + unique_ptr iobuf2(IOBuf::takeOwnership(buf2, size2, + deleteArrayBuffer, + &deleteCount)); + EXPECT_EQ(buf2, iobuf2->data()); + EXPECT_EQ(size2, iobuf2->length()); + EXPECT_EQ(buf2, iobuf2->buffer()); + EXPECT_EQ(size2, iobuf2->capacity()); + EXPECT_EQ(0, deleteCount); + iobuf2.reset(); + EXPECT_EQ(1, deleteCount); + + deleteCount = 0; + uint32_t size3 = 3456; + uint8_t *buf3 = new uint8_t[size3]; + uint32_t length3 = 48; + unique_ptr iobuf3(IOBuf::takeOwnership(buf3, size3, length3, + deleteArrayBuffer, + &deleteCount)); + EXPECT_EQ(buf3, iobuf3->data()); + EXPECT_EQ(length3, iobuf3->length()); + EXPECT_EQ(buf3, iobuf3->buffer()); + EXPECT_EQ(size3, iobuf3->capacity()); + EXPECT_EQ(0, deleteCount); + iobuf3.reset(); + EXPECT_EQ(1, deleteCount); + + +} + +TEST(IOBuf, WrapBuffer) { + const uint32_t size1 = 1234; + uint8_t buf1[size1]; + unique_ptr iobuf1(IOBuf::wrapBuffer(buf1, size1)); + EXPECT_EQ(buf1, iobuf1->data()); + EXPECT_EQ(size1, iobuf1->length()); + EXPECT_EQ(buf1, iobuf1->buffer()); + EXPECT_EQ(size1, iobuf1->capacity()); + + uint32_t size2 = 0x1234; + unique_ptr buf2(new uint8_t[size2]); + unique_ptr iobuf2(IOBuf::wrapBuffer(buf2.get(), size2)); + EXPECT_EQ(buf2.get(), iobuf2->data()); + EXPECT_EQ(size2, iobuf2->length()); + EXPECT_EQ(buf2.get(), iobuf2->buffer()); + EXPECT_EQ(size2, iobuf2->capacity()); +} + +void fillBuf(uint8_t* buf, uint32_t length, boost::mt19937& gen) { + for (uint32_t n = 0; n < length; ++n) { + buf[n] = static_cast(gen() & 0xff); + } +} + +void fillBuf(IOBuf* buf, boost::mt19937& gen) { + buf->unshare(); + fillBuf(buf->writableData(), buf->length(), gen); +} + +void checkBuf(const uint8_t* buf, uint32_t length, boost::mt19937& gen) { + // Rather than using EXPECT_EQ() to check each character, + // count the number of differences and the first character that differs. + // This way on error we'll report just that information, rather than tons of + // failed checks for each byte in the buffer. + uint32_t numDifferences = 0; + uint32_t firstDiffIndex = 0; + uint8_t firstDiffExpected = 0; + for (uint32_t n = 0; n < length; ++n) { + uint8_t expected = static_cast(gen() & 0xff); + if (buf[n] == expected) { + continue; + } + + if (numDifferences == 0) { + firstDiffIndex = n; + firstDiffExpected = expected; + } + ++numDifferences; + } + + EXPECT_EQ(0, numDifferences); + if (numDifferences > 0) { + // Cast to int so it will be printed numerically + // rather than as a char if the check fails + EXPECT_EQ(static_cast(buf[firstDiffIndex]), + static_cast(firstDiffExpected)); + } +} + +void checkBuf(IOBuf* buf, boost::mt19937& gen) { + checkBuf(buf->data(), buf->length(), gen); +} + +void checkBuf(ByteRange buf, boost::mt19937& gen) { + checkBuf(buf.data(), buf.size(), gen); +} + +void checkChain(IOBuf* buf, boost::mt19937& gen) { + IOBuf *current = buf; + do { + checkBuf(current->data(), current->length(), gen); + current = current->next(); + } while (current != buf); +} + +TEST(IOBuf, Chaining) { + uint32_t fillSeed = 0x12345678; + boost::mt19937 gen(fillSeed); + + // An IOBuf with external storage + uint32_t headroom = 123; + unique_ptr iob1(IOBuf::create(2048)); + iob1->advance(headroom); + iob1->append(1500); + fillBuf(iob1.get(), gen); + + // An IOBuf with internal storage + unique_ptr iob2(IOBuf::create(20)); + iob2->append(20); + fillBuf(iob2.get(), gen); + + // An IOBuf around a buffer it doesn't own + uint8_t localbuf[1234]; + fillBuf(localbuf, 1234, gen); + unique_ptr iob3(IOBuf::wrapBuffer(localbuf, sizeof(localbuf))); + + // An IOBuf taking ownership of a user-supplied buffer + uint32_t heapBufSize = 900; + uint8_t* heapBuf = static_cast(malloc(heapBufSize)); + fillBuf(heapBuf, heapBufSize, gen); + unique_ptr iob4(IOBuf::takeOwnership(heapBuf, heapBufSize)); + + // An IOBuf taking ownership of a user-supplied buffer with + // a custom free function + uint32_t arrayBufSize = 321; + uint8_t* arrayBuf = new uint8_t[arrayBufSize]; + fillBuf(arrayBuf, arrayBufSize, gen); + uint32_t arrayBufFreeCount = 0; + unique_ptr iob5(IOBuf::takeOwnership(arrayBuf, arrayBufSize, + deleteArrayBuffer, + &arrayBufFreeCount)); + + EXPECT_FALSE(iob1->isChained()); + EXPECT_FALSE(iob2->isChained()); + EXPECT_FALSE(iob3->isChained()); + EXPECT_FALSE(iob4->isChained()); + EXPECT_FALSE(iob5->isChained()); + + EXPECT_FALSE(iob1->isSharedOne()); + EXPECT_FALSE(iob2->isSharedOne()); + EXPECT_TRUE(iob3->isSharedOne()); // since we own the buffer + EXPECT_FALSE(iob4->isSharedOne()); + EXPECT_FALSE(iob5->isSharedOne()); + + // Chain the buffers all together + // Since we are going to relinquish ownership of iob2-5 to the chain, + // store raw pointers to them so we can reference them later. + IOBuf* iob2ptr = iob2.get(); + IOBuf* iob3ptr = iob3.get(); + IOBuf* iob4ptr = iob4.get(); + IOBuf* iob5ptr = iob5.get(); + + iob1->prependChain(std::move(iob2)); + iob1->prependChain(std::move(iob4)); + iob2ptr->appendChain(std::move(iob3)); + iob1->prependChain(std::move(iob5)); + + EXPECT_EQ(iob2ptr, iob1->next()); + EXPECT_EQ(iob3ptr, iob2ptr->next()); + EXPECT_EQ(iob4ptr, iob3ptr->next()); + EXPECT_EQ(iob5ptr, iob4ptr->next()); + EXPECT_EQ(iob1.get(), iob5ptr->next()); + + EXPECT_EQ(iob5ptr, iob1->prev()); + EXPECT_EQ(iob1.get(), iob2ptr->prev()); + EXPECT_EQ(iob2ptr, iob3ptr->prev()); + EXPECT_EQ(iob3ptr, iob4ptr->prev()); + EXPECT_EQ(iob4ptr, iob5ptr->prev()); + + EXPECT_TRUE(iob1->isChained()); + EXPECT_TRUE(iob2ptr->isChained()); + EXPECT_TRUE(iob3ptr->isChained()); + EXPECT_TRUE(iob4ptr->isChained()); + EXPECT_TRUE(iob5ptr->isChained()); + + uint64_t fullLength = (iob1->length() + iob2ptr->length() + + iob3ptr->length() + iob4ptr->length() + + iob5ptr->length()); + EXPECT_EQ(5, iob1->countChainElements()); + EXPECT_EQ(fullLength, iob1->computeChainDataLength()); + + // Since iob3 is shared, the entire buffer should report itself as shared + EXPECT_TRUE(iob1->isShared()); + // Unshare just iob3 + iob3ptr->unshareOne(); + EXPECT_FALSE(iob3ptr->isSharedOne()); + // Now everything in the chain should be unshared. + // Check on all members of the chain just for good measure + EXPECT_FALSE(iob1->isShared()); + EXPECT_FALSE(iob2ptr->isShared()); + EXPECT_FALSE(iob3ptr->isShared()); + EXPECT_FALSE(iob4ptr->isShared()); + EXPECT_FALSE(iob5ptr->isShared()); + + // Check iteration + gen.seed(fillSeed); + size_t count = 0; + for (auto buf : *iob1) { + checkBuf(buf, gen); + ++count; + } + EXPECT_EQ(5, count); + + // Clone one of the IOBufs in the chain + unique_ptr iob4clone = iob4ptr->cloneOne(); + gen.seed(fillSeed); + checkBuf(iob1.get(), gen); + checkBuf(iob2ptr, gen); + checkBuf(iob3ptr, gen); + checkBuf(iob4clone.get(), gen); + checkBuf(iob5ptr, gen); + + EXPECT_TRUE(iob1->isShared()); + EXPECT_TRUE(iob2ptr->isShared()); + EXPECT_TRUE(iob3ptr->isShared()); + EXPECT_TRUE(iob4ptr->isShared()); + EXPECT_TRUE(iob5ptr->isShared()); + + EXPECT_FALSE(iob1->isSharedOne()); + EXPECT_FALSE(iob2ptr->isSharedOne()); + EXPECT_FALSE(iob3ptr->isSharedOne()); + EXPECT_TRUE(iob4ptr->isSharedOne()); + EXPECT_FALSE(iob5ptr->isSharedOne()); + + // Unshare that clone + EXPECT_TRUE(iob4clone->isSharedOne()); + iob4clone->unshare(); + EXPECT_FALSE(iob4clone->isSharedOne()); + EXPECT_FALSE(iob4ptr->isSharedOne()); + EXPECT_FALSE(iob1->isShared()); + iob4clone.reset(); + + + // Create a clone of a different IOBuf + EXPECT_FALSE(iob1->isShared()); + EXPECT_FALSE(iob3ptr->isSharedOne()); + + unique_ptr iob3clone = iob3ptr->cloneOne(); + gen.seed(fillSeed); + checkBuf(iob1.get(), gen); + checkBuf(iob2ptr, gen); + checkBuf(iob3clone.get(), gen); + checkBuf(iob4ptr, gen); + checkBuf(iob5ptr, gen); + + EXPECT_TRUE(iob1->isShared()); + EXPECT_TRUE(iob3ptr->isSharedOne()); + EXPECT_FALSE(iob1->isSharedOne()); + + // Delete the clone and make sure the original is unshared + iob3clone.reset(); + EXPECT_FALSE(iob1->isShared()); + EXPECT_FALSE(iob3ptr->isSharedOne()); + + + // Clone the entire chain + unique_ptr chainClone = iob1->clone(); + // Verify that the data is correct. + EXPECT_EQ(fullLength, chainClone->computeChainDataLength()); + gen.seed(fillSeed); + checkChain(chainClone.get(), gen); + + // Check that the buffers report sharing correctly + EXPECT_TRUE(chainClone->isShared()); + EXPECT_TRUE(iob1->isShared()); + + EXPECT_TRUE(iob1->isSharedOne()); + // since iob2 has a small internal buffer, it will never be shared + EXPECT_FALSE(iob2ptr->isSharedOne()); + EXPECT_TRUE(iob3ptr->isSharedOne()); + EXPECT_TRUE(iob4ptr->isSharedOne()); + EXPECT_TRUE(iob5ptr->isSharedOne()); + + // Unshare the cloned chain + chainClone->unshare(); + EXPECT_FALSE(chainClone->isShared()); + EXPECT_FALSE(iob1->isShared()); + + // Make sure the unshared result still has the same data + EXPECT_EQ(fullLength, chainClone->computeChainDataLength()); + gen.seed(fillSeed); + checkChain(chainClone.get(), gen); + + // Destroy this chain + chainClone.reset(); + + + // Clone a new chain + EXPECT_FALSE(iob1->isShared()); + chainClone = iob1->clone(); + EXPECT_TRUE(iob1->isShared()); + EXPECT_TRUE(chainClone->isShared()); + + // Delete the original chain + iob1.reset(); + EXPECT_FALSE(chainClone->isShared()); + + // Coalesce the chain + // + // Coalescing this chain will create a new buffer and release the last + // refcount on the original buffers we created. Also make sure + // that arrayBufFreeCount increases to one to indicate that arrayBuf was + // freed. + EXPECT_EQ(5, chainClone->countChainElements()); + EXPECT_EQ(0, arrayBufFreeCount); + + // Buffer lengths: 1500 20 1234 900 321 + // Coalesce the first 3 buffers + chainClone->gather(1521); + EXPECT_EQ(3, chainClone->countChainElements()); + EXPECT_EQ(0, arrayBufFreeCount); + + // Make sure the data is still the same after coalescing + EXPECT_EQ(fullLength, chainClone->computeChainDataLength()); + gen.seed(fillSeed); + checkChain(chainClone.get(), gen); + + // Coalesce the entire chain + chainClone->coalesce(); + EXPECT_EQ(1, chainClone->countChainElements()); + EXPECT_EQ(1, arrayBufFreeCount); + + // Make sure the data is still the same after coalescing + EXPECT_EQ(fullLength, chainClone->computeChainDataLength()); + gen.seed(fillSeed); + checkChain(chainClone.get(), gen); + + // Make a new chain to test the unlink and pop operations + iob1 = IOBuf::create(1); + iob1->append(1); + IOBuf *iob1ptr = iob1.get(); + iob2 = IOBuf::create(3); + iob2->append(3); + iob2ptr = iob2.get(); + iob3 = IOBuf::create(5); + iob3->append(5); + iob3ptr = iob3.get(); + iob4 = IOBuf::create(7); + iob4->append(7); + iob4ptr = iob4.get(); + iob1->appendChain(std::move(iob2)); + iob1->prev()->appendChain(std::move(iob3)); + iob1->prev()->appendChain(std::move(iob4)); + EXPECT_EQ(4, iob1->countChainElements()); + EXPECT_EQ(16, iob1->computeChainDataLength()); + + // Unlink from the middle of the chain + iob3 = iob3ptr->unlink(); + EXPECT_TRUE(iob3.get() == iob3ptr); + EXPECT_EQ(3, iob1->countChainElements()); + EXPECT_EQ(11, iob1->computeChainDataLength()); + + // Unlink from the end of the chain + iob4 = iob1->prev()->unlink(); + EXPECT_TRUE(iob4.get() == iob4ptr); + EXPECT_EQ(2, iob1->countChainElements()); + EXPECT_TRUE(iob1->next() == iob2ptr); + EXPECT_EQ(4, iob1->computeChainDataLength()); + + // Pop from the front of the chain + iob2 = iob1->pop(); + EXPECT_TRUE(iob1.get() == iob1ptr); + EXPECT_EQ(1, iob1->countChainElements()); + EXPECT_EQ(1, iob1->computeChainDataLength()); + EXPECT_TRUE(iob2.get() == iob2ptr); + EXPECT_EQ(1, iob2->countChainElements()); + EXPECT_EQ(3, iob2->computeChainDataLength()); +} + +TEST(IOBuf, Reserve) { + uint32_t fillSeed = 0x23456789; + boost::mt19937 gen(fillSeed); + + // Reserve does nothing if empty and doesn't have to grow the buffer + { + gen.seed(fillSeed); + unique_ptr iob(IOBuf::create(2000)); + EXPECT_EQ(0, iob->headroom()); + const void* p1 = iob->buffer(); + iob->reserve(5, 15); + EXPECT_LE(5, iob->headroom()); + EXPECT_EQ(p1, iob->buffer()); + } + + // Reserve doesn't reallocate if we have enough total room + { + gen.seed(fillSeed); + unique_ptr iob(IOBuf::create(2000)); + iob->append(100); + fillBuf(iob.get(), gen); + EXPECT_EQ(0, iob->headroom()); + EXPECT_EQ(100, iob->length()); + const void* p1 = iob->buffer(); + const uint8_t* d1 = iob->data(); + iob->reserve(100, 1800); + EXPECT_LE(100, iob->headroom()); + EXPECT_EQ(p1, iob->buffer()); + EXPECT_EQ(d1 + 100, iob->data()); + gen.seed(fillSeed); + checkBuf(iob.get(), gen); + } + + // Reserve reallocates if we don't have enough total room. + // NOTE that, with jemalloc, we know that this won't reallocate in place + // as the size is less than jemallocMinInPlaceExpanadable + { + gen.seed(fillSeed); + unique_ptr iob(IOBuf::create(2000)); + iob->append(100); + fillBuf(iob.get(), gen); + EXPECT_EQ(0, iob->headroom()); + EXPECT_EQ(100, iob->length()); + const void* p1 = iob->buffer(); + const uint8_t* d1 = iob->data(); + iob->reserve(100, 2512); // allocation sizes are multiples of 256 + EXPECT_LE(100, iob->headroom()); + if (folly::usingJEMalloc()) { + EXPECT_NE(p1, iob->buffer()); + } + gen.seed(fillSeed); + checkBuf(iob.get(), gen); + } + + // Test reserve from internal buffer, this used to segfault + { + unique_ptr iob(IOBuf::create(0)); + iob->reserve(0, 2000); + EXPECT_EQ(0, iob->headroom()); + EXPECT_LE(2000, iob->tailroom()); + } +} + +TEST(IOBuf, copyBuffer) { + std::string s("hello"); + auto buf = IOBuf::copyBuffer(s.data(), s.size(), 1, 2); + EXPECT_EQ(1, buf->headroom()); + EXPECT_EQ(s, std::string(reinterpret_cast(buf->data()), + buf->length())); + EXPECT_LE(2, buf->tailroom()); + + buf = IOBuf::copyBuffer(s, 5, 7); + EXPECT_EQ(5, buf->headroom()); + EXPECT_EQ(s, std::string(reinterpret_cast(buf->data()), + buf->length())); + EXPECT_LE(7, buf->tailroom()); + + std::string empty; + buf = IOBuf::copyBuffer(empty, 3, 6); + EXPECT_EQ(3, buf->headroom()); + EXPECT_EQ(0, buf->length()); + EXPECT_LE(6, buf->tailroom()); +} + +TEST(IOBuf, maybeCopyBuffer) { + std::string s("this is a test"); + auto buf = IOBuf::maybeCopyBuffer(s, 1, 2); + EXPECT_EQ(1, buf->headroom()); + EXPECT_EQ(s, std::string(reinterpret_cast(buf->data()), + buf->length())); + EXPECT_LE(2, buf->tailroom()); + + std::string empty; + buf = IOBuf::maybeCopyBuffer("", 5, 7); + EXPECT_EQ(nullptr, buf.get()); + + buf = IOBuf::maybeCopyBuffer(""); + EXPECT_EQ(nullptr, buf.get()); +} + +namespace { + +int customDeleterCount = 0; +int destructorCount = 0; +struct OwnershipTestClass { + explicit OwnershipTestClass(int v = 0) : val(v) { } + ~OwnershipTestClass() { + ++destructorCount; + } + int val; +}; + +typedef std::function CustomDeleter; + +void customDelete(OwnershipTestClass* p) { + ++customDeleterCount; + delete p; +} + +void customDeleteArray(OwnershipTestClass* p) { + ++customDeleterCount; + delete[] p; +} + +} // namespace + +TEST(IOBuf, takeOwnershipUniquePtr) { + destructorCount = 0; + { + std::unique_ptr p(new OwnershipTestClass()); + } + EXPECT_EQ(1, destructorCount); + + destructorCount = 0; + { + std::unique_ptr p(new OwnershipTestClass[2]); + } + EXPECT_EQ(2, destructorCount); + + destructorCount = 0; + { + std::unique_ptr p(new OwnershipTestClass()); + std::unique_ptr buf(IOBuf::takeOwnership(std::move(p))); + EXPECT_EQ(sizeof(OwnershipTestClass), buf->length()); + EXPECT_EQ(0, destructorCount); + } + EXPECT_EQ(1, destructorCount); + + destructorCount = 0; + { + std::unique_ptr p(new OwnershipTestClass[2]); + std::unique_ptr buf(IOBuf::takeOwnership(std::move(p), 2)); + EXPECT_EQ(2 * sizeof(OwnershipTestClass), buf->length()); + EXPECT_EQ(0, destructorCount); + } + EXPECT_EQ(2, destructorCount); + + customDeleterCount = 0; + destructorCount = 0; + { + std::unique_ptr + p(new OwnershipTestClass(), customDelete); + std::unique_ptr buf(IOBuf::takeOwnership(std::move(p))); + EXPECT_EQ(sizeof(OwnershipTestClass), buf->length()); + EXPECT_EQ(0, destructorCount); + } + EXPECT_EQ(1, destructorCount); + EXPECT_EQ(1, customDeleterCount); + + customDeleterCount = 0; + destructorCount = 0; + { + std::unique_ptr + p(new OwnershipTestClass[2], customDeleteArray); + std::unique_ptr buf(IOBuf::takeOwnership(std::move(p), 2)); + EXPECT_EQ(2 * sizeof(OwnershipTestClass), buf->length()); + EXPECT_EQ(0, destructorCount); + } + EXPECT_EQ(2, destructorCount); + EXPECT_EQ(1, customDeleterCount); +} + +TEST(IOBuf, Alignment) { + // max_align_t doesn't exist in gcc 4.6.2 + struct MaxAlign { + char c; + } __attribute__((aligned)); + size_t alignment = alignof(MaxAlign); + + std::vector sizes {0, 1, 64, 256, 1024, 1 << 10}; + for (size_t size : sizes) { + auto buf = IOBuf::create(size); + uintptr_t p = reinterpret_cast(buf->data()); + EXPECT_EQ(0, p & (alignment - 1)) << "size=" << size; + } +} + +TEST(TypedIOBuf, Simple) { + auto buf = IOBuf::create(0); + TypedIOBuf typed(buf.get()); + const uint64_t n = 10000; + typed.reserve(0, n); + EXPECT_LE(n, typed.capacity()); + for (uint64_t i = 0; i < n; i++) { + *typed.writableTail() = i; + typed.append(1); + } + EXPECT_EQ(n, typed.length()); + for (uint64_t i = 0; i < n; i++) { + EXPECT_EQ(i, typed.data()[i]); + } +} + +// chain element size, number of elements in chain, shared +class MoveToFbStringTest + : public ::testing::TestWithParam> { + protected: + void SetUp() { + std::tr1::tie(elementSize_, elementCount_, shared_) = GetParam(); + buf_ = makeBuf(); + for (int i = 0; i < elementCount_ - 1; ++i) { + buf_->prependChain(makeBuf()); + } + EXPECT_EQ(elementCount_, buf_->countChainElements()); + EXPECT_EQ(elementCount_ * elementSize_, buf_->computeChainDataLength()); + if (shared_) { + buf2_ = buf_->clone(); + EXPECT_EQ(elementCount_, buf2_->countChainElements()); + EXPECT_EQ(elementCount_ * elementSize_, buf2_->computeChainDataLength()); + } + } + + std::unique_ptr makeBuf() { + auto buf = IOBuf::create(elementSize_); + memset(buf->writableTail(), 'x', elementSize_); + buf->append(elementSize_); + return buf; + } + + void check(std::unique_ptr& buf) { + fbstring str = buf->moveToFbString(); + EXPECT_EQ(elementCount_ * elementSize_, str.size()); + EXPECT_EQ(elementCount_ * elementSize_, strspn(str.c_str(), "x")); + EXPECT_EQ(0, buf->length()); + EXPECT_EQ(1, buf->countChainElements()); + EXPECT_EQ(0, buf->computeChainDataLength()); + EXPECT_FALSE(buf->isChained()); + } + + int elementSize_; + int elementCount_; + bool shared_; + std::unique_ptr buf_; + std::unique_ptr buf2_; +}; + +TEST_P(MoveToFbStringTest, Simple) { + check(buf_); + if (shared_) { + check(buf2_); + } +} + +INSTANTIATE_TEST_CASE_P( + MoveToFbString, + MoveToFbStringTest, + ::testing::Combine( + ::testing::Values(0, 1, 24, 256, 1 << 10, 1 << 20), // element size + ::testing::Values(1, 2, 10), // element count + ::testing::Bool())); // shared + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + google::ParseCommandLineFlags(&argc, &argv, true); + + return RUN_ALL_TESTS(); +} diff --git a/folly/io/test/NetworkBenchmark.cpp b/folly/io/test/NetworkBenchmark.cpp new file mode 100644 index 00000000..ccba6f3e --- /dev/null +++ b/folly/io/test/NetworkBenchmark.cpp @@ -0,0 +1,172 @@ +/* + * Copyright 2013 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "folly/io/IOBuf.h" + +#include +#include "folly/Benchmark.h" +#include "folly/io/Cursor.h" + +#include + +using folly::IOBuf; +using std::unique_ptr; +using namespace folly::io; +using namespace std; + +size_t buf_size = 0; +size_t num_bufs = 0; + +BENCHMARK(reserveBenchmark, iters) { + while (--iters) { + unique_ptr iobuf1(IOBuf::create(buf_size)); + iobuf1->append(buf_size); + for (size_t bufs = num_bufs; bufs > 1; bufs --) { + iobuf1->reserve(0, buf_size); + iobuf1->append(buf_size); + } + } +} + +BENCHMARK(chainBenchmark, iters) { + while (--iters) { + unique_ptr iobuf1(IOBuf::create(buf_size)); + iobuf1->append(buf_size); + for (size_t bufs = num_bufs; bufs > 1; bufs --) { + unique_ptr iobufNext(IOBuf::create(buf_size)); + iobuf1->prependChain(std::move(iobufNext)); + } + } +} + +vector> bufPool; +inline unique_ptr poolGetIOBuf() { + if (bufPool.size() > 0) { + unique_ptr ret = std::move(bufPool.back()); + bufPool.pop_back(); + return std::move(ret); + } else { + unique_ptr iobuf(IOBuf::create(buf_size)); + iobuf->append(buf_size); + return std::move(iobuf); + } +} + +inline void poolPutIOBuf(unique_ptr&& buf) { + unique_ptr head = std::move(buf); + while (head) { + unique_ptr next = std::move(head->pop()); + bufPool.push_back(std::move(head)); + head = std::move(next); + } +} + +BENCHMARK(poolBenchmark, iters) { + while (--iters) { + unique_ptr head = std::move(poolGetIOBuf()); + for (size_t bufs = num_bufs; bufs > 1; bufs --) { + unique_ptr iobufNext = std::move(poolGetIOBuf()); + head->prependChain(std::move(iobufNext)); + } + // cleanup + poolPutIOBuf(std::move(head)); + } +} + +void setNumbers(size_t size, size_t num) { + buf_size = size; + num_bufs = num; + bufPool.clear(); + + printf("\nBuffer size: %zu, number of buffers: %zu\n\n", size, num); +} + +/* +------------------------------------------------------------------------------ +reserveBenchmark 100000 9.186 ms 91.86 ns 10.38 M +chainBenchmark 100000 59.44 ms 594.4 ns 1.604 M +poolBenchmark 100000 15.87 ms 158.7 ns 6.01 M + +Buffer size: 100, number of buffers: 10 + +Benchmark Iters Total t t/iter iter/sec +------------------------------------------------------------------------------ +reserveBenchmark 100000 62 ms 620 ns 1.538 M +chainBenchmark 100000 59.48 ms 594.8 ns 1.603 M +poolBenchmark 100000 16.07 ms 160.7 ns 5.933 M + +Buffer size: 2048, number of buffers: 10 + +Benchmark Iters Total t t/iter iter/sec +------------------------------------------------------------------------------ +reserveBenchmark 100000 148.4 ms 1.484 us 658.2 k +chainBenchmark 100000 140.9 ms 1.409 us 693 k +poolBenchmark 100000 16.73 ms 167.3 ns 5.7 M + +Buffer size: 10000, number of buffers: 10 + +Benchmark Iters Total t t/iter iter/sec +------------------------------------------------------------------------------ +reserveBenchmark 100000 234 ms 2.34 us 417.3 k +chainBenchmark 100000 142.3 ms 1.423 us 686.1 k +poolBenchmark 100000 16.78 ms 167.8 ns 5.684 M + +Buffer size: 100000, number of buffers: 10 + +Benchmark Iters Total t t/iter iter/sec +------------------------------------------------------------------------------ +reserveBenchmark 100000 186.5 ms 1.865 us 523.5 k +chainBenchmark 100000 360.5 ms 3.605 us 270.9 k +poolBenchmark 100000 16.52 ms 165.2 ns 5.772 M + +Buffer size: 1000000, number of buffers: 10 + +Benchmark Iters Total t t/iter iter/sec +------------------------------------------------------------------------------ +reserveBenchmark 156 2.084 s 13.36 ms 74.84 +chainBenchmark 30082 2.001 s 66.5 us 14.68 k +poolBenchmark 100000 18.18 ms 181.8 ns 5.244 M + + +Buffer size: 10, number of buffers: 20 + +Benchmark Iters Total t t/iter iter/sec +------------------------------------------------------------------------------ +reserveBenchmark 100000 12.54 ms 125.4 ns 7.603 M +chainBenchmark 100000 118.6 ms 1.186 us 823.2 k +poolBenchmark 100000 32.2 ms 322 ns 2.962 M +*/ +int main(int argc, char** argv) { + google::ParseCommandLineFlags(&argc, &argv, true); + + setNumbers(10, 10); + folly::runBenchmarks(); + setNumbers(100, 10); + folly::runBenchmarks(); + setNumbers(2048, 10); + folly::runBenchmarks(); + setNumbers(10000, 10); + folly::runBenchmarks(); + setNumbers(100000, 10); + folly::runBenchmarks(); + setNumbers(1000000, 10); + folly::runBenchmarks(); + + setNumbers(10, 20); + folly::runBenchmarks(); + + return 0; +}