diff --git a/folly/io/IOBuf.cpp b/folly/io/IOBuf.cpp
index 5bab8716..56986061 100644
--- a/folly/io/IOBuf.cpp
+++ b/folly/io/IOBuf.cpp
@@ -1,5 +1,5 @@
 /*
- * Copyright 2013 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -14,12 +14,19 @@
  * limitations under the License.
  */
 
+#ifndef __STDC_LIMIT_MACROS
 #define __STDC_LIMIT_MACROS
+#endif
 
-#include "folly/io/IOBuf.h"
+#include <folly/io/IOBuf.h>
 
-#include "folly/Malloc.h"
-#include "folly/Likely.h"
+#include <folly/Conv.h>
+#include <folly/Likely.h>
+#include <folly/Malloc.h>
+#include <folly/Memory.h>
+#include <folly/ScopeGuard.h>
+#include <folly/SpookyHashV2.h>
+#include <folly/io/Cursor.h>
 
 #include <stdexcept>
 #include <assert.h>
@@ -28,20 +35,91 @@
 
 using std::unique_ptr;
 
+namespace {
+
+enum : uint16_t {
+  kHeapMagic = 0xa5a5,
+  // This memory segment contains an IOBuf that is still in use
+  kIOBufInUse = 0x01,
+  // This memory segment contains buffer data that is still in use
+  kDataInUse = 0x02,
+};
+
+enum : uint64_t {
+  // When create() is called for buffers less than kDefaultCombinedBufSize,
+  // we allocate a single combined memory segment for the IOBuf and the data
+  // together. See the comments for createCombined()/createSeparate() for more
+  // details.
+  //
+  // (The size of 1k is largely just a guess here. We could probably do
+  // benchmarks of real applications to see if adjusting this number makes a
+  // difference. Callers that know their exact use case can also explicitly
+  // call createCombined() or createSeparate().)
+  kDefaultCombinedBufSize = 1024
+};
+
+// Helper function for IOBuf::takeOwnership()
+void takeOwnershipError(bool freeOnError, void* buf,
+                        folly::IOBuf::FreeFunction freeFn,
+                        void* userData) {
+  if (!freeOnError) {
+    return;
+  }
+  if (!freeFn) {
+    free(buf);
+    return;
+  }
+  try {
+    freeFn(buf, userData);
+  } catch (...) {
+    // The user's free function is not allowed to throw.
+    // (We are already in the middle of throwing an exception, so
+    // we cannot let this exception go unhandled.)
+    abort();
+  }
+}
+
+} // unnamed namespace
+
 namespace folly {
 
-const uint32_t IOBuf::kMaxIOBufSize;
-// Note: Applying offsetof() to an IOBuf is legal according to C++11, since
-// IOBuf is a standard-layout class. However, this isn't legal with earlier
-// C++ standards, which require that offsetof() only be used with POD types.
-//
-// This code compiles with g++ 4.6, but not with g++ 4.4 or earlier versions.
-const uint32_t IOBuf::kMaxInternalDataSize =
-  kMaxIOBufSize - offsetof(folly::IOBuf, int_.buf);
+struct IOBuf::HeapPrefix {
+  HeapPrefix(uint16_t flg)
+    : magic(kHeapMagic),
+      flags(flg) {}
+  ~HeapPrefix() {
+    // Reset magic to 0 on destruction. This is solely for debugging purposes
+    // to help catch bugs where someone tries to use HeapStorage after it has
+    // been deleted.
+    magic = 0;
+  }
+
+  uint16_t magic;
+  std::atomic<uint16_t> flags;
+};
+
+struct IOBuf::HeapStorage {
+  HeapPrefix prefix;
+  // The IOBuf is last in the HeapStorage object.
+  // This way operator new will work even if allocating a subclass of IOBuf
+  // that requires more space.
+  folly::IOBuf buf;
+};
+
+struct IOBuf::HeapFullStorage {
+  // Make sure jemalloc allocates from the 64-byte class. Putting this here
+  // because HeapStorage is private so it can't be at namespace level.
+  static_assert(sizeof(HeapStorage) <= 64,
+                "IOBuf may not grow over 56 bytes!");
+
+  HeapStorage hs;
+  SharedInfo shared;
+  std::max_align_t align;
+};
 
 IOBuf::SharedInfo::SharedInfo()
-  : freeFn(NULL),
-    userData(NULL) {
+  : freeFn(nullptr),
+    userData(nullptr) {
   // Use relaxed memory ordering here. Since we are creating a new SharedInfo,
   // no other threads should be referring to it yet.
   refcount.store(1, std::memory_order_relaxed);
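The two "in use" flags above are what let a single malloc()'d block back both the IOBuf object and its buffer data while allowing either one to be released first. A minimal standalone sketch of the same release protocol (illustrative names only, not folly's API; the real releaseStorage() below uses a compare_exchange_weak() loop so it can also destroy HeapPrefix and validate the magic value):

    #include <atomic>
    #include <cstdlib>

    struct Block {
      // Bitmask of live owners: 0x1 = object still alive, 0x2 = data still alive.
      std::atomic<unsigned> users{0x1 | 0x2};
    };

    // Each owner clears its own bit when it dies; whoever clears the last
    // bit frees the block. Assumes *b was placement-new'd into malloc() memory.
    void release(Block* b, unsigned bit) {
      unsigned prev = b->users.fetch_and(~bit, std::memory_order_acq_rel);
      if ((prev & ~bit) == 0) {
        b->~Block();
        free(b);
      }
    }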
@@ -56,68 +134,135 @@ IOBuf::SharedInfo::SharedInfo(FreeFunction fn, void* arg)
 }
 
 void* IOBuf::operator new(size_t size) {
-  // Since IOBuf::create() manually allocates space for some IOBuf objects
-  // using malloc(), override operator new so that all IOBuf objects are
-  // always allocated using malloc(). This way operator delete can always know
-  // that free() is the correct way to deallocate the memory.
-  void* ptr = malloc(size);
-
-  // operator new is not allowed to return NULL
-  if (UNLIKELY(ptr == NULL)) {
+  size_t fullSize = offsetof(HeapStorage, buf) + size;
+  auto* storage = static_cast<HeapStorage*>(malloc(fullSize));
+  // operator new is not allowed to return nullptr
+  if (UNLIKELY(storage == nullptr)) {
     throw std::bad_alloc();
   }
-  return ptr;
+  new (&storage->prefix) HeapPrefix(kIOBufInUse);
+  return &(storage->buf);
 }
 
-void* IOBuf::operator new(size_t size, void* ptr) {
-  assert(size <= kMaxIOBufSize);
-  return ptr;
-}
+void* IOBuf::operator new(size_t /* size */, void* ptr) { return ptr; }
 
 void IOBuf::operator delete(void* ptr) {
-  // For small buffers, IOBuf::create() manually allocates the space for the
-  // IOBuf object using malloc(). Therefore we override delete to ensure that
-  // the IOBuf space is freed using free() rather than a normal delete.
-  free(ptr);
-}
-
-unique_ptr<IOBuf> IOBuf::create(uint32_t capacity) {
-  // If the desired capacity is less than kMaxInternalDataSize,
-  // just allocate a single region large enough for both the IOBuf header and
-  // the data.
-  if (capacity <= kMaxInternalDataSize) {
-    void* buf = malloc(kMaxIOBufSize);
-    if (UNLIKELY(buf == NULL)) {
-      throw std::bad_alloc();
+  auto* storageAddr = static_cast<uint8_t*>(ptr) - offsetof(HeapStorage, buf);
+  auto* storage = reinterpret_cast<HeapStorage*>(storageAddr);
+  releaseStorage(storage, kIOBufInUse);
+}
+
+void IOBuf::releaseStorage(HeapStorage* storage, uint16_t freeFlags) {
+  CHECK_EQ(storage->prefix.magic, static_cast<uint16_t>(kHeapMagic));
+
+  // Use an acquire load here. If we are unlucky and happen to get
+  // out-of-date data the compare_exchange_weak() call below will catch
+  // it and load new data with memory_order_acq_rel.
+  auto flags = storage->prefix.flags.load(std::memory_order_acquire);
+  DCHECK_EQ((flags & freeFlags), freeFlags);
+
+  while (true) {
+    uint16_t newFlags = uint16_t(flags & ~freeFlags);
+    if (newFlags == 0) {
+      // The storage space is now unused. Free it.
+      storage->prefix.HeapPrefix::~HeapPrefix();
+      free(storage);
+      return;
+    }
+
+    // This storage segment still contains portions that are in use.
+    // Just clear the flags specified in freeFlags for now.
+    auto ret = storage->prefix.flags.compare_exchange_weak(
+        flags, newFlags, std::memory_order_acq_rel);
+    if (ret) {
+      // We successfully updated the flags.
+      return;
     }
-    uint8_t* bufEnd = static_cast<uint8_t*>(buf) + kMaxIOBufSize;
-    unique_ptr<IOBuf> iobuf(new(buf) IOBuf(bufEnd));
-    assert(iobuf->capacity() >= capacity);
-    return iobuf;
+
+    // We failed to update the flags. Some other thread probably updated them
+    // and cleared some of the other bits. Continue around the loop to see if
+    // we are the last user now, or if we need to try updating the flags again.
   }
+}
 
-  // Allocate an external buffer
-  uint8_t* buf;
-  SharedInfo* sharedInfo;
-  uint32_t actualCapacity;
-  allocExtBuffer(capacity, &buf, &sharedInfo, &actualCapacity);
+void IOBuf::freeInternalBuf(void* /* buf */, void* userData) {
+  auto* storage = static_cast<HeapStorage*>(userData);
+  releaseStorage(storage, kDataInUse);
+}
 
-  // Allocate the IOBuf header
-  try {
-    return unique_ptr<IOBuf>(new IOBuf(kExtAllocated, 0,
-                                       buf, actualCapacity,
-                                       buf, 0,
-                                       sharedInfo));
-  } catch (...) {
-    free(buf);
-    throw;
+IOBuf::IOBuf(CreateOp, uint64_t capacity)
+  : next_(this),
+    prev_(this),
+    data_(nullptr),
+    length_(0),
+    flagsAndSharedInfo_(0) {
+  SharedInfo* info;
+  allocExtBuffer(capacity, &buf_, &info, &capacity_);
+  setSharedInfo(info);
+  data_ = buf_;
+}
+
+IOBuf::IOBuf(CopyBufferOp /* op */,
+             const void* buf,
+             uint64_t size,
+             uint64_t headroom,
+             uint64_t minTailroom)
+  : IOBuf(CREATE, headroom + size + minTailroom) {
+  advance(headroom);
+  if (size > 0) {
+    assert(buf != nullptr);
+    memcpy(writableData(), buf, size);
+    append(size);
+  }
+}
+
+IOBuf::IOBuf(CopyBufferOp op, ByteRange br,
+             uint64_t headroom, uint64_t minTailroom)
+  : IOBuf(op, br.data(), br.size(), headroom, minTailroom) {
+}
+
+unique_ptr<IOBuf> IOBuf::create(uint64_t capacity) {
+  // For smaller-sized buffers, allocate the IOBuf, SharedInfo, and the buffer
+  // all with a single allocation.
+  //
+  // We don't do this for larger buffers since it can be wasteful if the user
+  // needs to reallocate the buffer but keeps using the same IOBuf object.
+  // In this case we can't free the data space until the IOBuf is also
+  // destroyed. Callers can explicitly call createCombined() or
+  // createSeparate() if they know their use case better, and know if they are
+  // likely to reallocate the buffer later.
+  if (capacity <= kDefaultCombinedBufSize) {
+    return createCombined(capacity);
   }
+  return createSeparate(capacity);
+}
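The three factory functions introduced here read naturally in use. A brief sketch of the calling side (buffer sizes are arbitrary, not from this commit):

    #include <folly/io/IOBuf.h>

    void factoryExamples() {
      // Small request: create() picks the combined layout, one malloc()
      // for IOBuf + SharedInfo + data.
      auto small = folly::IOBuf::create(256);

      // Large request: create() falls back to separate allocations.
      auto large = folly::IOBuf::create(64 * 1024);

      // Callers that expect to reallocate the data later can force the
      // separate layout even for small buffers.
      auto separate = folly::IOBuf::createSeparate(256);

      // Callers that know the buffer is fixed for its lifetime can force
      // the combined layout.
      auto combined = folly::IOBuf::createCombined(4096);
    }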
+unique_ptr<IOBuf> IOBuf::createCombined(uint64_t capacity) {
+  // To save a memory allocation, allocate space for the IOBuf object, the
+  // SharedInfo struct, and the data itself all with a single call to malloc().
+  size_t requiredStorage = offsetof(HeapFullStorage, align) + capacity;
+  size_t mallocSize = goodMallocSize(requiredStorage);
+  auto* storage = static_cast<HeapFullStorage*>(malloc(mallocSize));
+  if (UNLIKELY(storage == nullptr)) {
+    throw std::bad_alloc();
+  }
+
+  new (&storage->hs.prefix) HeapPrefix(kIOBufInUse | kDataInUse);
+  new (&storage->shared) SharedInfo(freeInternalBuf, storage);
+
+  uint8_t* bufAddr = reinterpret_cast<uint8_t*>(&storage->align);
+  uint8_t* storageEnd = reinterpret_cast<uint8_t*>(storage) + mallocSize;
+  size_t actualCapacity = size_t(storageEnd - bufAddr);
+  unique_ptr<IOBuf> ret(new (&storage->hs.buf) IOBuf(
+      InternalConstructor(), packFlagsAndSharedInfo(0, &storage->shared),
+      bufAddr, actualCapacity, bufAddr, 0));
+  return ret;
+}
+
+unique_ptr<IOBuf> IOBuf::createSeparate(uint64_t capacity) {
+  return std::make_unique<IOBuf>(CREATE, capacity);
 }
 
 unique_ptr<IOBuf> IOBuf::createChain(
-    size_t totalCapacity, uint32_t maxBufCapacity) {
+    size_t totalCapacity, uint64_t maxBufCapacity) {
   unique_ptr<IOBuf> out = create(
       std::min(totalCapacity, size_t(maxBufCapacity)));
   size_t allocatedCapacity = out->capacity();
@@ -132,80 +277,120 @@ unique_ptr<IOBuf> IOBuf::createChain(
   return out;
 }
 
-unique_ptr<IOBuf> IOBuf::takeOwnership(void* buf, uint32_t capacity,
-                                       uint32_t length,
+IOBuf::IOBuf(TakeOwnershipOp, void* buf, uint64_t capacity, uint64_t length,
+             FreeFunction freeFn, void* userData,
+             bool freeOnError)
+  : next_(this),
+    prev_(this),
+    data_(static_cast<uint8_t*>(buf)),
+    buf_(static_cast<uint8_t*>(buf)),
+    length_(length),
+    capacity_(capacity),
+    flagsAndSharedInfo_(packFlagsAndSharedInfo(kFlagFreeSharedInfo, nullptr)) {
+  try {
+    setSharedInfo(new SharedInfo(freeFn, userData));
+  } catch (...) {
+    takeOwnershipError(freeOnError, buf, freeFn, userData);
+    throw;
+  }
+}
+
+unique_ptr<IOBuf> IOBuf::takeOwnership(void* buf, uint64_t capacity,
+                                       uint64_t length,
                                        FreeFunction freeFn,
                                        void* userData,
                                        bool freeOnError) {
-  SharedInfo* sharedInfo = NULL;
   try {
-    sharedInfo = new SharedInfo(freeFn, userData);
-
-    uint8_t* bufPtr = static_cast<uint8_t*>(buf);
-    return unique_ptr<IOBuf>(new IOBuf(kExtUserSupplied, kFlagFreeSharedInfo,
-                                       bufPtr, capacity,
-                                       bufPtr, length,
-                                       sharedInfo));
+    // TODO: We could allocate the IOBuf object and SharedInfo all in a single
+    // memory allocation. We could use the existing HeapStorage class, and
+    // define a new kSharedInfoInUse flag. We could then change our code to
+    // call releaseStorage(kSharedInfoInUse) when kFlagFreeSharedInfo is set,
+    // rather than directly calling delete.
+    //
+    // Note that we always pass freeOnError as false to the constructor.
+    // If the constructor throws we'll handle it below. (We have to handle
+    // allocation failures from std::make_unique too.)
+    return std::make_unique<IOBuf>(
+        TAKE_OWNERSHIP, buf, capacity, length, freeFn, userData, false);
   } catch (...) {
-    delete sharedInfo;
-    if (freeOnError) {
-      if (freeFn) {
-        try {
-          freeFn(buf, userData);
-        } catch (...) {
-          // The user's free function is not allowed to throw.
-          abort();
-        }
-      } else {
-        free(buf);
-      }
-    }
+    takeOwnershipError(freeOnError, buf, freeFn, userData);
     throw;
   }
 }
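takeOwnership() is easiest to see with a concrete deleter. A hedged usage sketch (the buffer source and free callback are made up for illustration):

    #include <folly/io/IOBuf.h>
    #include <cstdlib>

    // Hypothetical free callback matching folly::IOBuf::FreeFunction.
    void freeRaw(void* buf, void* /* userData */) {
      free(buf);  // must not throw; IOBuf aborts if it does
    }

    void takeOwnershipExample() {
      const uint64_t cap = 4096;
      void* raw = malloc(cap);
      // The IOBuf now owns `raw`; freeRaw() runs when the last clone
      // referencing it goes away. With the default freeOnError=true, a
      // failure inside takeOwnership() also cleans up `raw` for us.
      auto buf = folly::IOBuf::takeOwnership(raw, cap, freeRaw);
    }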
 
-unique_ptr<IOBuf> IOBuf::wrapBuffer(const void* buf, uint32_t capacity) {
-  // We cast away the const-ness of the buffer here.
-  // This is okay since IOBuf users must use unshare() to create a copy of
-  // this buffer before writing to the buffer.
-  uint8_t* bufPtr = static_cast<uint8_t*>(const_cast<void*>(buf));
-  return unique_ptr<IOBuf>(new IOBuf(kExtUserSupplied, kFlagUserOwned,
-                                     bufPtr, capacity,
-                                     bufPtr, capacity,
-                                     NULL));
+IOBuf::IOBuf(WrapBufferOp, const void* buf, uint64_t capacity)
+  : IOBuf(InternalConstructor(), 0,
+          // We cast away the const-ness of the buffer here.
+          // This is okay since IOBuf users must use unshare() to create a copy
+          // of this buffer before writing to the buffer.
+          static_cast<uint8_t*>(const_cast<void*>(buf)), capacity,
+          static_cast<uint8_t*>(const_cast<void*>(buf)), capacity) {
 }
 
-IOBuf::IOBuf(uint8_t* end)
-  : next_(this),
-    prev_(this),
-    data_(int_.buf),
-    length_(0),
-    flags_(0) {
-  assert(end - int_.buf == kMaxInternalDataSize);
-  assert(end - reinterpret_cast<uint8_t*>(this) == kMaxIOBufSize);
+IOBuf::IOBuf(WrapBufferOp op, ByteRange br)
+  : IOBuf(op, br.data(), br.size()) {
+}
+
+unique_ptr<IOBuf> IOBuf::wrapBuffer(const void* buf, uint64_t capacity) {
+  return std::make_unique<IOBuf>(WRAP_BUFFER, buf, capacity);
+}
+
+IOBuf IOBuf::wrapBufferAsValue(const void* buf, uint64_t capacity) {
+  return IOBuf(WrapBufferOp::WRAP_BUFFER, buf, capacity);
 }
 
-IOBuf::IOBuf(ExtBufTypeEnum type,
-             uint32_t flags,
+IOBuf::IOBuf() noexcept {
+}
+
+IOBuf::IOBuf(IOBuf&& other) noexcept
+  : data_(other.data_),
+    buf_(other.buf_),
+    length_(other.length_),
+    capacity_(other.capacity_),
+    flagsAndSharedInfo_(other.flagsAndSharedInfo_) {
+  // Reset other so it is in a clean state to be destroyed.
+  other.data_ = nullptr;
+  other.buf_ = nullptr;
+  other.length_ = 0;
+  other.capacity_ = 0;
+  other.flagsAndSharedInfo_ = 0;
+
+  // If other was part of the chain, assume ownership of the rest of its chain.
+  // (It's only valid to perform move assignment on the head of a chain.)
+  if (other.next_ != &other) {
+    next_ = other.next_;
+    next_->prev_ = this;
+    other.next_ = &other;
+
+    prev_ = other.prev_;
+    prev_->next_ = this;
+    other.prev_ = &other;
+  }
+
+  // Sanity check to make sure that other is in a valid state to be destroyed.
+  DCHECK_EQ(other.prev_, &other);
+  DCHECK_EQ(other.next_, &other);
+}
+
+IOBuf::IOBuf(const IOBuf& other) {
+  *this = other.cloneAsValue();
+}
+
+IOBuf::IOBuf(InternalConstructor,
+             uintptr_t flagsAndSharedInfo,
              uint8_t* buf,
-             uint32_t capacity,
+             uint64_t capacity,
              uint8_t* data,
-             uint32_t length,
-             SharedInfo* sharedInfo)
+             uint64_t length)
   : next_(this),
     prev_(this),
     data_(data),
+    buf_(buf),
     length_(length),
-    flags_(kFlagExt | flags) {
-  ext_.capacity = capacity;
-  ext_.type = type;
-  ext_.buf = buf;
-  ext_.sharedInfo = sharedInfo;
-
+    capacity_(capacity),
+    flagsAndSharedInfo_(flagsAndSharedInfo) {
   assert(data >= buf);
   assert(data + length <= buf + capacity);
-  assert(static_cast<bool>(flags & kFlagUserOwned) ==
-         (sharedInfo == NULL));
 }
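wrapBuffer() is the zero-copy entry point: the resulting IOBuf has no SharedInfo, so it never frees the memory, and the caller must keep the bytes alive. A hedged sketch of the intended usage pattern:

    #include <folly/io/IOBuf.h>

    void wrapExample() {
      static const char kGreeting[] = "hello world";

      // No allocation for the data; the caller retains ownership and must
      // keep kGreeting alive as long as any clone of `wrapped` exists.
      auto wrapped = folly::IOBuf::wrapBuffer(kGreeting, sizeof(kGreeting) - 1);

      // The wrapped bytes are logically read-only. unshare() forces a
      // private, heap-allocated copy before any write.
      wrapped->unshare();
      wrapped->writableData()[0] = 'H';
    }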
 
 IOBuf::~IOBuf() {
@@ -218,9 +403,61 @@ IOBuf::~IOBuf() {
     (void)next_->unlink();
   }
 
-  if (flags_ & kFlagExt) {
-    decrementRefcount();
+  decrementRefcount();
+}
+
+IOBuf& IOBuf::operator=(IOBuf&& other) noexcept {
+  if (this == &other) {
+    return *this;
+  }
+
+  // If we are part of a chain, delete the rest of the chain.
+  while (next_ != this) {
+    // Since unlink() returns unique_ptr() and we don't store it,
+    // it will automatically delete the unlinked element.
+    (void)next_->unlink();
+  }
+
+  // Decrement our refcount on the current buffer
+  decrementRefcount();
+
+  // Take ownership of the other buffer's data
+  data_ = other.data_;
+  buf_ = other.buf_;
+  length_ = other.length_;
+  capacity_ = other.capacity_;
+  flagsAndSharedInfo_ = other.flagsAndSharedInfo_;
+  // Reset other so it is in a clean state to be destroyed.
+  other.data_ = nullptr;
+  other.buf_ = nullptr;
+  other.length_ = 0;
+  other.capacity_ = 0;
+  other.flagsAndSharedInfo_ = 0;
+
+  // If other was part of the chain, assume ownership of the rest of its chain.
+  // (It's only valid to perform move assignment on the head of a chain.)
+  if (other.next_ != &other) {
+    next_ = other.next_;
+    next_->prev_ = this;
+    other.next_ = &other;
+
+    prev_ = other.prev_;
+    prev_->next_ = this;
+    other.prev_ = &other;
+  }
+
+  // Sanity check to make sure that other is in a valid state to be destroyed.
+  DCHECK_EQ(other.prev_, &other);
+  DCHECK_EQ(other.next_, &other);
+
+  return *this;
+}
+
+IOBuf& IOBuf::operator=(const IOBuf& other) {
+  if (this != &other) {
+    *this = IOBuf(other);
   }
+  return *this;
 }
 
 bool IOBuf::empty() const {
@@ -234,8 +471,8 @@ bool IOBuf::empty() const {
   return true;
 }
 
-uint32_t IOBuf::countChainElements() const {
-  uint32_t numElements = 1;
+size_t IOBuf::countChainElements() const {
+  size_t numElements = 1;
   for (IOBuf* current = next_; current != this; current = current->next_) {
     ++numElements;
   }
@@ -269,67 +506,95 @@ void IOBuf::prependChain(unique_ptr<IOBuf>&& iobuf) {
 }
 
 unique_ptr<IOBuf> IOBuf::clone() const {
-  unique_ptr<IOBuf> newHead(cloneOne());
+  return std::make_unique<IOBuf>(cloneAsValue());
+}
+
+unique_ptr<IOBuf> IOBuf::cloneOne() const {
+  return std::make_unique<IOBuf>(cloneOneAsValue());
+}
+
+unique_ptr<IOBuf> IOBuf::cloneCoalesced() const {
+  return std::make_unique<IOBuf>(cloneCoalescedAsValue());
+}
+
+IOBuf IOBuf::cloneAsValue() const {
+  auto tmp = cloneOneAsValue();
 
   for (IOBuf* current = next_; current != this; current = current->next_) {
-    newHead->prependChain(current->cloneOne());
+    tmp.prependChain(current->cloneOne());
   }
 
-  return newHead;
+  return tmp;
 }
 
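Cloning is a metadata copy: the clone shares the refcounted data buffer with the original until one side needs to mutate it. A sketch of the observable behavior (the assertions reflect my reading of isShared()/unshare() in this file):

    #include <folly/io/IOBuf.h>
    #include <cassert>
    #include <cstring>

    void cloneExample() {
      auto buf = folly::IOBuf::create(64);
      memcpy(buf->writableTail(), "abc", 3);
      buf->append(3);

      auto copy = buf->clone();   // bumps the SharedInfo refcount, no memcpy
      assert(buf->isShared());

      copy->unshare();            // gives `copy` a private buffer
      assert(!buf->isShared());   // original is the sole owner again
    }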
-unique_ptr<IOBuf> IOBuf::cloneOne() const {
-  if (flags_ & kFlagExt) {
-    if (ext_.sharedInfo) {
-      flags_ |= kFlagMaybeShared;
-    }
-    unique_ptr<IOBuf> iobuf(new IOBuf(static_cast<ExtBufTypeEnum>(ext_.type),
-                                      flags_, ext_.buf, ext_.capacity,
-                                      data_, length_,
-                                      ext_.sharedInfo));
-    if (ext_.sharedInfo) {
-      ext_.sharedInfo->refcount.fetch_add(1, std::memory_order_acq_rel);
-    }
-    return iobuf;
-  } else {
-    // We have an internal data buffer that cannot be shared
-    // Allocate a new IOBuf and copy the data into it.
-    unique_ptr<IOBuf> iobuf(IOBuf::create(kMaxInternalDataSize));
-    assert((iobuf->flags_ & kFlagExt) == 0);
-    iobuf->data_ += headroom();
-    memcpy(iobuf->data_, data_, length_);
-    iobuf->length_ = length_;
-    return iobuf;
+IOBuf IOBuf::cloneOneAsValue() const {
+  if (SharedInfo* info = sharedInfo()) {
+    setFlags(kFlagMaybeShared);
+    info->refcount.fetch_add(1, std::memory_order_acq_rel);
   }
+  return IOBuf(
+      InternalConstructor(),
+      flagsAndSharedInfo_,
+      buf_,
+      capacity_,
+      data_,
+      length_);
 }
 
-void IOBuf::unshareOneSlow() {
-  // Internal buffers are always unshared, so unshareOneSlow() can only be
-  // called for external buffers
-  assert(flags_ & kFlagExt);
+IOBuf IOBuf::cloneCoalescedAsValue() const {
+  if (!isChained()) {
+    return cloneOneAsValue();
+  }
+  // Coalesce into newBuf
+  const uint64_t newLength = computeChainDataLength();
+  const uint64_t newHeadroom = headroom();
+  const uint64_t newTailroom = prev()->tailroom();
+  const uint64_t newCapacity = newLength + newHeadroom + newTailroom;
+  IOBuf newBuf{CREATE, newCapacity};
+  newBuf.advance(newHeadroom);
+
+  auto current = this;
+  do {
+    if (current->length() > 0) {
+      DCHECK_NOTNULL(current->data());
+      DCHECK_LE(current->length(), newBuf.tailroom());
+      memcpy(newBuf.writableTail(), current->data(), current->length());
+      newBuf.append(current->length());
+    }
+    current = current->next();
+  } while (current != this);
+
+  DCHECK_EQ(newLength, newBuf.length());
+  DCHECK_EQ(newHeadroom, newBuf.headroom());
+  DCHECK_LE(newTailroom, newBuf.tailroom());
+  return newBuf;
+}
+
+void IOBuf::unshareOneSlow() {
   // Allocate a new buffer for the data
   uint8_t* buf;
   SharedInfo* sharedInfo;
-  uint32_t actualCapacity;
-  allocExtBuffer(ext_.capacity, &buf, &sharedInfo, &actualCapacity);
+  uint64_t actualCapacity;
+  allocExtBuffer(capacity_, &buf, &sharedInfo, &actualCapacity);
 
   // Copy the data
   // Maintain the same amount of headroom. Since we maintained the same
   // minimum capacity we also maintain at least the same amount of tailroom.
-  uint32_t headlen = headroom();
-  memcpy(buf + headlen, data_, length_);
+  uint64_t headlen = headroom();
+  if (length_ > 0) {
+    assert(data_ != nullptr);
+    memcpy(buf + headlen, data_, length_);
+  }
 
   // Release our reference on the old buffer
   decrementRefcount();
 
-  // Make sure kFlagExt is set, and kFlagUserOwned and kFlagFreeSharedInfo
-  // are not set.
-  flags_ = kFlagExt;
+  // Make sure kFlagMaybeShared and kFlagFreeSharedInfo are all cleared.
+  setFlagsAndSharedInfo(0, sharedInfo);
 
   // Update the buffer pointers to point to the new buffer
   data_ = buf + headlen;
-  ext_.buf = buf;
-  ext_.sharedInfo = sharedInfo;
+  buf_ = buf;
 }
 
 void IOBuf::unshareChained() {
@@ -356,11 +621,31 @@ void IOBuf::unshareChained() {
   coalesceSlow();
 }
 
-void IOBuf::coalesceSlow(size_t maxLength) {
+void IOBuf::markExternallyShared() {
+  IOBuf* current = this;
+  do {
+    current->markExternallySharedOne();
+    current = current->next_;
+  } while (current != this);
+}
+
+void IOBuf::makeManagedChained() {
+  assert(isChained());
+
+  IOBuf* current = this;
+  while (true) {
+    current->makeManagedOne();
+    current = current->next_;
+    if (current == this) {
+      break;
+    }
+  }
+}
+
+void IOBuf::coalesceSlow() {
   // coalesceSlow() should only be called if we are part of a chain of multiple
   // IOBufs. The caller should have already verified this.
-  assert(isChained());
-  assert(length_ < maxLength);
+  DCHECK(isChained());
 
   // Compute the length of the entire chain
   uint64_t newLength = 0;
@@ -368,13 +653,37 @@ void IOBuf::coalesceSlow(size_t maxLength) {
   IOBuf* end = this;
   do {
     newLength += end->length_;
    end = end->next_;
-  } while (newLength < maxLength && end != this);
+  } while (end != this);
 
-  uint64_t newHeadroom = headroom();
-  uint64_t newTailroom = end->prev_->tailroom();
-  coalesceAndReallocate(newHeadroom, newLength, end, newTailroom);
+  coalesceAndReallocate(newLength, end);
   // We should be only element left in the chain now
-  assert(length_ >= maxLength || !isChained());
+  DCHECK(!isChained());
+}
+
+void IOBuf::coalesceSlow(size_t maxLength) {
+  // coalesceSlow() should only be called if we are part of a chain of multiple
+  // IOBufs. The caller should have already verified this.
+  DCHECK(isChained());
+  DCHECK_LT(length_, maxLength);
+
+  // Compute the length of the entire chain
+  uint64_t newLength = 0;
+  IOBuf* end = this;
+  while (true) {
+    newLength += end->length_;
+    end = end->next_;
+    if (newLength >= maxLength) {
+      break;
+    }
+    if (end == this) {
+      throw std::overflow_error("attempted to coalesce more data than "
+                                "available");
+    }
+  }
+
+  coalesceAndReallocate(newLength, end);
+  // We should have the requested length now
+  DCHECK_GE(length_, maxLength);
 }
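Both coalesceSlow() overloads are the slow paths behind the public coalesce() and gather() calls. From the caller's side, a hedged sketch:

    #include <folly/io/IOBuf.h>

    void coalesceExample(folly::IOBuf& head) {
      // Flatten the whole chain into one contiguous buffer (a no-op when
      // the chain is already a single unshared IOBuf).
      folly::ByteRange whole = head.coalesce();

      // Or flatten only until the first 128 bytes are contiguous; per the
      // code above this throws std::overflow_error if the chain holds less.
      head.gather(128);
      (void)whole;
    }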
 
 void IOBuf::coalesceAndReallocate(size_t newHeadroom,
@@ -382,16 +691,13 @@ void IOBuf::coalesceAndReallocate(size_t newHeadroom,
                                   IOBuf* end,
                                   size_t newTailroom) {
   uint64_t newCapacity = newLength + newHeadroom + newTailroom;
-  if (newCapacity > UINT32_MAX) {
-    throw std::overflow_error("IOBuf chain too large to coalesce");
-  }
 
   // Allocate space for the coalesced buffer.
   // We always convert to an external buffer, even if we happened to be an
   // internal buffer before.
   uint8_t* newBuf;
   SharedInfo* newInfo;
-  uint32_t actualCapacity;
+  uint64_t actualCapacity;
   allocExtBuffer(newCapacity, &newBuf, &newInfo, &actualCapacity);
 
   // Copy the data into the new buffer
@@ -400,27 +706,25 @@ void IOBuf::coalesceAndReallocate(size_t newHeadroom,
   IOBuf* current = this;
   size_t remaining = newLength;
   do {
-    assert(current->length_ <= remaining);
-    remaining -= current->length_;
-    memcpy(p, current->data_, current->length_);
-    p += current->length_;
+    if (current->length_ > 0) {
+      assert(current->length_ <= remaining);
+      assert(current->data_ != nullptr);
+      remaining -= current->length_;
+      memcpy(p, current->data_, current->length_);
+      p += current->length_;
+    }
     current = current->next_;
   } while (current != end);
 
   assert(remaining == 0);
 
   // Point at the new buffer
-  if (flags_ & kFlagExt) {
-    decrementRefcount();
-  }
+  decrementRefcount();
 
-  // Make sure kFlagExt is set, and kFlagUserOwned and kFlagFreeSharedInfo
-  // are not set.
-  flags_ = kFlagExt;
+  // Make sure kFlagMaybeShared and kFlagFreeSharedInfo are all cleared.
+  setFlagsAndSharedInfo(0, newInfo);
 
-  ext_.capacity = actualCapacity;
-  ext_.type = kExtAllocated;
-  ext_.buf = newBuf;
-  ext_.sharedInfo = newInfo;
+  capacity_ = actualCapacity;
+  buf_ = newBuf;
 
   data_ = newData;
   length_ = newLength;
@@ -433,17 +737,15 @@ void IOBuf::coalesceAndReallocate(size_t newHeadroom,
 }
 
 void IOBuf::decrementRefcount() {
-  assert(flags_ & kFlagExt);
-
   // Externally owned buffers don't have a SharedInfo object and aren't managed
   // by the reference count
-  if (flags_ & kFlagUserOwned) {
-    assert(ext_.sharedInfo == NULL);
+  SharedInfo* info = sharedInfo();
+  if (!info) {
     return;
   }
 
   // Decrement the refcount
-  uint32_t newcnt = ext_.sharedInfo->refcount.fetch_sub(
+  uint32_t newcnt = info->refcount.fetch_sub(
       1, std::memory_order_acq_rel);
   // Note that fetch_sub() returns the value before we decremented.
   // If it is 1, we were the only remaining user; if it is greater there are
@@ -465,12 +767,12 @@ void IOBuf::decrementRefcount() {
   // takeOwnership() store the user's free function with its allocated
   // SharedInfo object.) However, handling this specially with a flag seems
   // like it shouldn't be problematic.
-  if (flags_ & kFlagFreeSharedInfo) {
-    delete ext_.sharedInfo;
+  if (flags() & kFlagFreeSharedInfo) {
+    delete sharedInfo();
   }
 }
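reserveSlow(), below, backs the public reserve(minHeadroom, minTailroom), which only takes the slow path when the current buffer cannot satisfy the request. A hedged usage sketch:

    #include <folly/io/IOBuf.h>
    #include <cstring>

    void reserveExample() {
      auto buf = folly::IOBuf::create(16);

      // Ask for 8 bytes of headroom (e.g. for a protocol header prepended
      // later) and 1024 bytes of tailroom.
      buf->reserve(8, 1024);

      memcpy(buf->writableTail(), "payload", 7);
      buf->append(7);

      // Claim 4 bytes of the headroom for a length prefix.
      buf->prepend(4);
      memcpy(buf->writableData(), "\x00\x00\x00\x07", 4);
    }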
 
-void IOBuf::reserveSlow(uint32_t minHeadroom, uint32_t minTailroom) {
+void IOBuf::reserveSlow(uint64_t minHeadroom, uint64_t minTailroom) {
   size_t newCapacity = (size_t)length_ + minHeadroom + minTailroom;
   DCHECK_LT(newCapacity, UINT32_MAX);
 
@@ -500,45 +802,39 @@ void IOBuf::reserveSlow(uint32_t minHeadroom, uint32_t minTailroom) {
     return;
   }
 
-  size_t newAllocatedCapacity = goodExtBufferSize(newCapacity);
+  size_t newAllocatedCapacity = 0;
   uint8_t* newBuffer = nullptr;
-  uint32_t newHeadroom = 0;
-  uint32_t oldHeadroom = headroom();
+  uint64_t newHeadroom = 0;
+  uint64_t oldHeadroom = headroom();
 
   // If we have a buffer allocated with malloc and we just need more tailroom,
-  // try to use realloc()/rallocm() to grow the buffer in place.
-  if ((flags_ & (kFlagExt | kFlagUserOwned)) == kFlagExt &&
-      (ext_.sharedInfo->freeFn == nullptr) &&
-      length_ != 0 && oldHeadroom >= minHeadroom) {
+  // try to use realloc()/xallocx() to grow the buffer in place.
+  SharedInfo* info = sharedInfo();
+  if (info && (info->freeFn == nullptr) && length_ != 0 &&
+      oldHeadroom >= minHeadroom) {
+    size_t headSlack = oldHeadroom - minHeadroom;
+    newAllocatedCapacity = goodExtBufferSize(newCapacity + headSlack);
    if (usingJEMalloc()) {
-      size_t headSlack = oldHeadroom - minHeadroom;
       // We assume that tailroom is more useful and more important than
-      // headroom (not least because realloc / rallocm allow us to grow the
+      // headroom (not least because realloc / xallocx allow us to grow the
      // buffer at the tail, but not at the head). So, if we have more headroom
      // than we need, we consider that "wasted". We arbitrarily define "too
      // much" headroom to be 25% of the capacity.
      if (headSlack * 4 <= newCapacity) {
        size_t allocatedCapacity = capacity() + sizeof(SharedInfo);
-        void* p = ext_.buf;
+        void* p = buf_;
        if (allocatedCapacity >= jemallocMinInPlaceExpandable) {
-          int r = rallocm(&p, &newAllocatedCapacity, newAllocatedCapacity,
-                          0, ALLOCM_NO_MOVE);
-          if (r == ALLOCM_SUCCESS) {
+          if (xallocx(p, newAllocatedCapacity, 0, 0) == newAllocatedCapacity) {
             newBuffer = static_cast<uint8_t*>(p);
             newHeadroom = oldHeadroom;
-          } else if (r == ALLOCM_ERR_OOM) {
-            // shouldn't happen as we don't actually allocate new memory
-            // (due to ALLOCM_NO_MOVE)
-            throw std::bad_alloc();
           }
-          // if ALLOCM_ERR_NOT_MOVED, do nothing, fall back to
-          // malloc/memcpy/free
+          // if xallocx failed, do nothing, fall back to malloc/memcpy/free
         }
       }
     } else { // Not using jemalloc
       size_t copySlack = capacity() - length_;
       if (copySlack * 2 <= length_) {
-        void* p = realloc(ext_.buf, newAllocatedCapacity);
+        void* p = realloc(buf_, newAllocatedCapacity);
         if (UNLIKELY(p == nullptr)) {
           throw std::bad_alloc();
         }
@@ -551,41 +847,43 @@ void IOBuf::reserveSlow(uint32_t minHeadroom, uint32_t minTailroom) {
   // None of the previous reallocation strategies worked (or we're using
   // an internal buffer). malloc/copy/free.
   if (newBuffer == nullptr) {
+    newAllocatedCapacity = goodExtBufferSize(newCapacity);
     void* p = malloc(newAllocatedCapacity);
     if (UNLIKELY(p == nullptr)) {
       throw std::bad_alloc();
     }
     newBuffer = static_cast<uint8_t*>(p);
-    memcpy(newBuffer + minHeadroom, data_, length_);
-    if ((flags_ & (kFlagExt | kFlagUserOwned)) == kFlagExt) {
+    if (length_ > 0) {
+      assert(data_ != nullptr);
+      memcpy(newBuffer + minHeadroom, data_, length_);
+    }
+    if (sharedInfo()) {
       freeExtBuffer();
     }
     newHeadroom = minHeadroom;
   }
 
-  SharedInfo* info;
-  uint32_t cap;
+  uint64_t cap;
   initExtBuffer(newBuffer, newAllocatedCapacity, &info, &cap);
 
-  if (flags_ & kFlagFreeSharedInfo) {
-    delete ext_.sharedInfo;
+  if (flags() & kFlagFreeSharedInfo) {
+    delete sharedInfo();
   }
 
-  flags_ = kFlagExt;
-  ext_.capacity = cap;
-  ext_.type = kExtAllocated;
-  ext_.buf = newBuffer;
-  ext_.sharedInfo = info;
+  setFlagsAndSharedInfo(0, info);
+  capacity_ = cap;
+  buf_ = newBuffer;
   data_ = newBuffer + newHeadroom;
   // length_ is unchanged
 }
 
 void IOBuf::freeExtBuffer() {
-  DCHECK((flags_ & (kFlagExt | kFlagUserOwned)) == kFlagExt);
+  SharedInfo* info = sharedInfo();
+  DCHECK(info);
 
-  if (ext_.sharedInfo->freeFn) {
+  if (info->freeFn) {
     try {
-      ext_.sharedInfo->freeFn(ext_.buf, ext_.sharedInfo->userData);
+      info->freeFn(buf_, info->userData);
     } catch (...) {
       // The user's free function should never throw. Otherwise we might
       // throw from the IOBuf destructor. Other code paths like coalesce()
@@ -593,28 +891,28 @@ void IOBuf::freeExtBuffer() {
       abort();
     }
   } else {
-    free(ext_.buf);
+    free(buf_);
   }
 }
 
-void IOBuf::allocExtBuffer(uint32_t minCapacity,
+void IOBuf::allocExtBuffer(uint64_t minCapacity,
                            uint8_t** bufReturn,
                            SharedInfo** infoReturn,
-                           uint32_t* capacityReturn) {
+                           uint64_t* capacityReturn) {
   size_t mallocSize = goodExtBufferSize(minCapacity);
   uint8_t* buf = static_cast<uint8_t*>(malloc(mallocSize));
-  if (UNLIKELY(buf == NULL)) {
+  if (UNLIKELY(buf == nullptr)) {
     throw std::bad_alloc();
   }
   initExtBuffer(buf, mallocSize, infoReturn, capacityReturn);
   *bufReturn = buf;
 }
 
-size_t IOBuf::goodExtBufferSize(uint32_t minCapacity) {
+size_t IOBuf::goodExtBufferSize(uint64_t minCapacity) {
   // Determine how much space we should allocate. We'll store the SharedInfo
   // for the external buffer just after the buffer itself. (We store it just
   // after the buffer rather than just before so that the code can still just
-  // use free(ext_.buf) to free the buffer.)
+  // use free(buf_) to free the buffer.)
   size_t minSize = static_cast<size_t>(minCapacity) + sizeof(SharedInfo);
   // Add room for padding so that the SharedInfo will be aligned on an 8-byte
   // boundary.
@@ -628,33 +926,25 @@
 void IOBuf::initExtBuffer(uint8_t* buf, size_t mallocSize,
                           SharedInfo** infoReturn,
-                          uint32_t* capacityReturn) {
+                          uint64_t* capacityReturn) {
   // Find the SharedInfo storage at the end of the buffer
   // and construct the SharedInfo.
   uint8_t* infoStart = (buf + mallocSize) - sizeof(SharedInfo);
   SharedInfo* sharedInfo = new(infoStart) SharedInfo;
 
-  size_t actualCapacity = infoStart - buf;
-  // On the unlikely possibility that the actual capacity is larger than can
-  // fit in a uint32_t after adding room for the refcount and calling
-  // goodMallocSize(), truncate downwards if necessary.
-  if (actualCapacity >= UINT32_MAX) {
-    *capacityReturn = UINT32_MAX;
-  } else {
-    *capacityReturn = actualCapacity;
-  }
-
+  *capacityReturn = uint64_t(infoStart - buf);
   *infoReturn = sharedInfo;
 }
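moveToFbString(), below, hands the malloc()'d data block to an fbstring when the fast-path conditions hold, and coalesces/copies otherwise. A hedged sketch of the caller's view:

    #include <folly/io/IOBuf.h>
    #include <folly/FBString.h>

    folly::fbstring toString(std::unique_ptr<folly::IOBuf> buf) {
      // Consumes the buffer's contents; afterwards *buf is empty but valid.
      // When the data is a plain malloc()'d block with no headroom and a
      // spare byte of tailroom for the NUL, this is O(1); otherwise it
      // reallocates and copies first.
      return buf->moveToFbString();
    }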
 
 fbstring IOBuf::moveToFbString() {
-  // Externally allocated buffers (malloc) are just fine, everything else needs
+  // malloc-allocated buffers are just fine, everything else needs
   // to be turned into one.
-  if (flags_ != kFlagExt ||  // not malloc()-ed
-      headroom() != 0 ||     // malloc()-ed block doesn't start at beginning
-      tailroom() == 0 ||     // no room for NUL terminator
-      isShared() ||          // shared
-      isChained()) {         // chained
+  if (!sharedInfo() ||         // user owned, not ours to give up
+      sharedInfo()->freeFn ||  // not malloc()-ed
+      headroom() != 0 ||       // malloc()-ed block doesn't start at beginning
+      tailroom() == 0 ||       // no room for NUL terminator
+      isShared() ||            // shared
+      isChained()) {           // chained
     // We might as well get rid of all head and tailroom if we're going
     // to reallocate; we need 1 byte for NUL terminator.
     coalesceAndReallocate(0, computeChainDataLength(), this, 1);
   }
 
@@ -666,8 +956,13 @@ fbstring IOBuf::moveToFbString() {
         length(), capacity(),
         AcquireMallocatedString());
-  // Reset to internal buffer.
-  flags_ = 0;
+  if (flags() & kFlagFreeSharedInfo) {
+    delete sharedInfo();
+  }
+
+  // Reset to a state where we can be deleted cleanly
+  flagsAndSharedInfo_ = 0;
+  buf_ = nullptr;
   clear();
   return str;
 }
@@ -683,15 +978,76 @@ IOBuf::Iterator IOBuf::cend() const {
 folly::fbvector<struct iovec> IOBuf::getIov() const {
   folly::fbvector<struct iovec> iov;
   iov.reserve(countChainElements());
+  appendToIov(&iov);
+  return iov;
+}
+
+void IOBuf::appendToIov(folly::fbvector<struct iovec>* iov) const {
   IOBuf const* p = this;
   do {
     // some code can get confused by empty iovs, so skip them
     if (p->length() > 0) {
-      iov.push_back({(void*)p->data(), p->length()});
+      iov->push_back({(void*)p->data(), folly::to<size_t>(p->length())});
     }
     p = p->next();
   } while (p != this);
-  return iov;
+}
+
+size_t IOBuf::fillIov(struct iovec* iov, size_t len) const {
+  IOBuf const* p = this;
+  size_t i = 0;
+  while (i < len) {
+    // some code can get confused by empty iovs, so skip them
+    if (p->length() > 0) {
+      iov[i].iov_base = const_cast<uint8_t*>(p->data());
+      iov[i].iov_len = p->length();
+      i++;
+    }
+    p = p->next();
+    if (p == this) {
+      return i;
+    }
+  }
+  return 0;
+}
+
+size_t IOBufHash::operator()(const IOBuf& buf) const {
+  folly::hash::SpookyHashV2 hasher;
+  hasher.Init(0, 0);
+  io::Cursor cursor(&buf);
+  for (;;) {
+    auto b = cursor.peekBytes();
+    if (b.empty()) {
+      break;
+    }
+    hasher.Update(b.data(), b.size());
+    cursor.skip(b.size());
+  }
+  uint64_t h1;
+  uint64_t h2;
+  hasher.Final(&h1, &h2);
+  return h1;
+}
+
+bool IOBufEqual::operator()(const IOBuf& a, const IOBuf& b) const {
+  io::Cursor ca(&a);
+  io::Cursor cb(&b);
+  for (;;) {
+    auto ba = ca.peekBytes();
+    auto bb = cb.peekBytes();
+    if (ba.empty() && bb.empty()) {
+      return true;
+    } else if (ba.empty() || bb.empty()) {
+      return false;
+    }
+    size_t n = std::min(ba.size(), bb.size());
+    DCHECK_GT(n, 0u);
+    if (memcmp(ba.data(), bb.data(), n)) {
+      return false;
+    }
+    ca.skip(n);
+    cb.skip(n);
+  }
+}
 
 } // folly
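IOBufHash and IOBufEqual make buffers usable as hash-map keys by logical byte content: two IOBufs with the same bytes hash and compare equal even when chained differently. A hedged sketch, assuming the unique_ptr overloads these functors carry in IOBuf.h:

    #include <folly/io/IOBuf.h>
    #include <unordered_set>

    // Hypothetical dedup set keyed by buffer contents.
    using BufSet = std::unordered_set<std::unique_ptr<folly::IOBuf>,
                                      folly::IOBufHash,
                                      folly::IOBufEqual>;

    // Returns true if this content had not been seen before.
    bool insertOnce(BufSet& seen, std::unique_ptr<folly::IOBuf> buf) {
      return seen.insert(std::move(buf)).second;
    }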