X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=folly%2Fio%2FIOBuf.cpp;h=763522b74abc1b2134c9d20966a522f69f0641af;hb=6a6efad5faf702f61820268cca8726a83bc6a9d2;hp=cf961e7315def377a2a3f7cf89a566884fb2dee2;hpb=cb5272724fbe18e937bbe8bb8a0a279027255032;p=folly.git

diff --git a/folly/io/IOBuf.cpp b/folly/io/IOBuf.cpp
index cf961e73..763522b7 100644
--- a/folly/io/IOBuf.cpp
+++ b/folly/io/IOBuf.cpp
@@ -1,5 +1,5 @@
 /*
- * Copyright 2013 Facebook, Inc.
+ * Copyright 2016 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -14,12 +14,19 @@
  * limitations under the License.
  */

+#ifndef __STDC_LIMIT_MACROS
 #define __STDC_LIMIT_MACROS
+#endif

-#include "folly/io/IOBuf.h"
+#include <folly/io/IOBuf.h>

-#include "folly/Malloc.h"
-#include "folly/Likely.h"
+#include <folly/Conv.h>
+#include <folly/Likely.h>
+#include <folly/Malloc.h>
+#include <folly/Memory.h>
+#include <folly/ScopeGuard.h>
+#include <folly/SpookyHashV2.h>
+#include <folly/io/Cursor.h>

 #include <stdexcept>
 #include <assert.h>
@@ -38,7 +45,7 @@ enum : uint16_t {
   kDataInUse = 0x02,
 };

-enum : uint32_t {
+enum : uint64_t {
   // When create() is called for buffers less than kDefaultCombinedBufSize,
   // we allocate a single combined memory segment for the IOBuf and the data
   // together.  See the comments for createCombined()/createSeparate() for more
@@ -51,8 +58,29 @@
   kDefaultCombinedBufSize = 1024
 };

+// Helper function for IOBuf::takeOwnership()
+void takeOwnershipError(bool freeOnError, void* buf,
+                        folly::IOBuf::FreeFunction freeFn,
+                        void* userData) {
+  if (!freeOnError) {
+    return;
+  }
+  if (!freeFn) {
+    free(buf);
+    return;
+  }
+  try {
+    freeFn(buf, userData);
+  } catch (...) {
+    // The user's free function is not allowed to throw.
+    // (We are already in the middle of throwing an exception, so
+    // we cannot let this exception go unhandled.)
+    abort();
+  }
 }
+}  // unnamed namespace
+
 namespace folly {

 struct IOBuf::HeapPrefix {
@@ -79,14 +107,19 @@
 };

 struct IOBuf::HeapFullStorage {
+  // Make sure jemalloc allocates from the 64-byte class.  Putting this here
+  // because HeapStorage is private so it can't be at namespace level.
+  static_assert(sizeof(HeapStorage) <= 64,
+                "IOBuf may not grow over 56 bytes!");
+
   HeapStorage hs;
   SharedInfo shared;
-  MaxAlign align;
+  std::max_align_t align;
 };

 IOBuf::SharedInfo::SharedInfo()
-  : freeFn(NULL),
-    userData(NULL) {
+  : freeFn(nullptr),
+    userData(nullptr) {
   // Use relaxed memory ordering here.  Since we are creating a new SharedInfo,
   // no other threads should be referring to it yet.
   refcount.store(1, std::memory_order_relaxed);
@@ -112,9 +145,7 @@ void* IOBuf::operator new(size_t size) {
   return &(storage->buf);
 }

-void* IOBuf::operator new(size_t size, void* ptr) {
-  return ptr;
-}
+void* IOBuf::operator new(size_t /* size */, void* ptr) { return ptr; }

 void IOBuf::operator delete(void* ptr) {
   auto* storageAddr = static_cast<uint8_t*>(ptr) - offsetof(HeapStorage, buf);
@@ -123,7 +154,7 @@ void IOBuf::operator delete(void* ptr) {
 }

 void IOBuf::releaseStorage(HeapStorage* storage, uint16_t freeFlags) {
-  CHECK_EQ(storage->prefix.magic, kHeapMagic);
+  CHECK_EQ(storage->prefix.magic, static_cast<uint16_t>(kHeapMagic));
   // Use relaxed memory order here.  If we are unlucky and happen to get
   // out-of-date data the compare_exchange_weak() call below will catch
@@ -155,12 +186,40 @@ void IOBuf::releaseStorage(HeapStorage* storage, uint16_t freeFlags) {
   }
 }

-void IOBuf::freeInternalBuf(void* buf, void* userData) {
+void IOBuf::freeInternalBuf(void* /* buf */, void* userData) {
   auto* storage = static_cast<HeapStorage*>(userData);
   releaseStorage(storage, kDataInUse);
 }

-unique_ptr<IOBuf> IOBuf::create(uint32_t capacity) {
+IOBuf::IOBuf(CreateOp, uint64_t capacity)
+  : next_(this),
+    prev_(this),
+    data_(nullptr),
+    length_(0),
+    flagsAndSharedInfo_(0) {
+  SharedInfo* info;
+  allocExtBuffer(capacity, &buf_, &info, &capacity_);
+  setSharedInfo(info);
+  data_ = buf_;
+}
+
+IOBuf::IOBuf(CopyBufferOp /* op */,
+             const void* buf,
+             uint64_t size,
+             uint64_t headroom,
+             uint64_t minTailroom)
+  : IOBuf(CREATE, headroom + size + minTailroom) {
+  advance(headroom);
+  memcpy(writableData(), buf, size);
+  append(size);
+}
+
+IOBuf::IOBuf(CopyBufferOp op, ByteRange br,
+             uint64_t headroom, uint64_t minTailroom)
+  : IOBuf(op, br.data(), br.size(), headroom, minTailroom) {
+}
+
+unique_ptr<IOBuf> IOBuf::create(uint64_t capacity) {
   // For smaller-sized buffers, allocate the IOBuf, SharedInfo, and the buffer
   // all with a single allocation.
   //
@@ -176,7 +235,7 @@ unique_ptr<IOBuf> IOBuf::create(uint32_t capacity) {
   return createSeparate(capacity);
 }

-unique_ptr<IOBuf> IOBuf::createCombined(uint32_t capacity) {
+unique_ptr<IOBuf> IOBuf::createCombined(uint64_t capacity) {
   // To save a memory allocation, allocate space for the IOBuf object, the
   // SharedInfo struct, and the data itself all with a single call to malloc().
   size_t requiredStorage = offsetof(HeapFullStorage, align) + capacity;
@@ -190,32 +249,17 @@ unique_ptr<IOBuf> IOBuf::createCombined(uint32_t capacity) {
   uint8_t* storageEnd = reinterpret_cast<uint8_t*>(storage) + mallocSize;
   size_t actualCapacity = storageEnd - bufAddr;
   unique_ptr<IOBuf> ret(new (&storage->hs.buf) IOBuf(
-      kCombinedAlloc, 0, bufAddr, actualCapacity,
-      bufAddr, 0, &storage->shared));
+      InternalConstructor(), packFlagsAndSharedInfo(0, &storage->shared),
+      bufAddr, actualCapacity, bufAddr, 0));
   return ret;
 }

-unique_ptr<IOBuf> IOBuf::createSeparate(uint32_t capacity) {
-  // Allocate an external buffer
-  uint8_t* buf;
-  SharedInfo* sharedInfo;
-  uint32_t actualCapacity;
-  allocExtBuffer(capacity, &buf, &sharedInfo, &actualCapacity);
-
-  // Allocate the IOBuf header
-  try {
-    return unique_ptr<IOBuf>(new IOBuf(kExtAllocated, 0,
-                                       buf, actualCapacity,
-                                       buf, 0,
-                                       sharedInfo));
-  } catch (...) {
-    free(buf);
-    throw;
-  }
+unique_ptr<IOBuf> IOBuf::createSeparate(uint64_t capacity) {
+  return make_unique<IOBuf>(CREATE, capacity);
 }

 unique_ptr<IOBuf> IOBuf::createChain(
-    size_t totalCapacity, uint32_t maxBufCapacity) {
+    size_t totalCapacity, uint64_t maxBufCapacity) {
   unique_ptr<IOBuf> out = create(
       std::min(totalCapacity, size_t(maxBufCapacity)));
   size_t allocatedCapacity = out->capacity();
@@ -230,74 +274,120 @@ unique_ptr<IOBuf> IOBuf::createChain(
   return out;
 }

-unique_ptr<IOBuf> IOBuf::takeOwnership(void* buf, uint32_t capacity,
-                                       uint32_t length,
+IOBuf::IOBuf(TakeOwnershipOp, void* buf, uint64_t capacity, uint64_t length,
+             FreeFunction freeFn, void* userData,
+             bool freeOnError)
+  : next_(this),
+    prev_(this),
+    data_(static_cast<uint8_t*>(buf)),
+    buf_(static_cast<uint8_t*>(buf)),
+    length_(length),
+    capacity_(capacity),
+    flagsAndSharedInfo_(packFlagsAndSharedInfo(kFlagFreeSharedInfo, nullptr)) {
+  try {
+    setSharedInfo(new SharedInfo(freeFn, userData));
+  } catch (...) {
+    takeOwnershipError(freeOnError, buf, freeFn, userData);
+    throw;
+  }
+}
+
+unique_ptr<IOBuf> IOBuf::takeOwnership(void* buf, uint64_t capacity,
+                                       uint64_t length,
                                        FreeFunction freeFn,
                                        void* userData,
                                        bool freeOnError) {
-  SharedInfo* sharedInfo = NULL;
   try {
     // TODO: We could allocate the IOBuf object and SharedInfo all in a single
     // memory allocation.  We could use the existing HeapStorage class, and
     // define a new kSharedInfoInUse flag.  We could change our code to call
     // releaseStorage(kFlagFreeSharedInfo) when this kFlagFreeSharedInfo,
     // rather than directly calling delete.
-    sharedInfo = new SharedInfo(freeFn, userData);
-
-    uint8_t* bufPtr = static_cast<uint8_t*>(buf);
-    return unique_ptr<IOBuf>(new IOBuf(kExtUserSupplied, kFlagFreeSharedInfo,
-                                       bufPtr, capacity,
-                                       bufPtr, length,
-                                       sharedInfo));
+    //
+    // Note that we always pass freeOnError as false to the constructor.
+    // If the constructor throws we'll handle it below.  (We have to handle
+    // allocation failures from make_unique too.)
+    return make_unique<IOBuf>(TAKE_OWNERSHIP, buf, capacity, length,
+                              freeFn, userData, false);
   } catch (...) {
-    delete sharedInfo;
-    if (freeOnError) {
-      if (freeFn) {
-        try {
-          freeFn(buf, userData);
-        } catch (...) {
-          // The user's free function is not allowed to throw.
-          abort();
-        }
-      } else {
-        free(buf);
-      }
-    }
+    takeOwnershipError(freeOnError, buf, freeFn, userData);
     throw;
   }
 }

-unique_ptr<IOBuf> IOBuf::wrapBuffer(const void* buf, uint32_t capacity) {
-  // We cast away the const-ness of the buffer here.
-  // This is okay since IOBuf users must use unshare() to create a copy of
-  // this buffer before writing to the buffer.
-  uint8_t* bufPtr = static_cast<uint8_t*>(const_cast<void*>(buf));
-  return unique_ptr<IOBuf>(new IOBuf(kExtUserSupplied, kFlagUserOwned,
-                                     bufPtr, capacity,
-                                     bufPtr, capacity,
-                                     NULL));
+IOBuf::IOBuf(WrapBufferOp, const void* buf, uint64_t capacity)
+  : IOBuf(InternalConstructor(), 0,
+          // We cast away the const-ness of the buffer here.
+          // This is okay since IOBuf users must use unshare() to create a copy
+          // of this buffer before writing to the buffer.
+          static_cast<uint8_t*>(const_cast<void*>(buf)), capacity,
+          static_cast<uint8_t*>(const_cast<void*>(buf)), capacity) {
+}
+
+IOBuf::IOBuf(WrapBufferOp op, ByteRange br)
+  : IOBuf(op, br.data(), br.size()) {
+}
+
+unique_ptr<IOBuf> IOBuf::wrapBuffer(const void* buf, uint64_t capacity) {
+  return make_unique<IOBuf>(WRAP_BUFFER, buf, capacity);
+}
+
+IOBuf IOBuf::wrapBufferAsValue(const void* buf, uint64_t capacity) {
+  return IOBuf(WrapBufferOp::WRAP_BUFFER, buf, capacity);
+}
+
+IOBuf::IOBuf() noexcept {
 }

-IOBuf::IOBuf(ExtBufTypeEnum type,
-             uint32_t flags,
+IOBuf::IOBuf(IOBuf&& other) noexcept
+    : data_(other.data_),
+      buf_(other.buf_),
+      length_(other.length_),
+      capacity_(other.capacity_),
+      flagsAndSharedInfo_(other.flagsAndSharedInfo_) {
+  // Reset other so it is a clean state to be destroyed.
+  other.data_ = nullptr;
+  other.buf_ = nullptr;
+  other.length_ = 0;
+  other.capacity_ = 0;
+  other.flagsAndSharedInfo_ = 0;
+
+  // If other was part of the chain, assume ownership of the rest of its chain.
+  // (It's only valid to perform move assignment on the head of a chain.)
+  if (other.next_ != &other) {
+    next_ = other.next_;
+    next_->prev_ = this;
+    other.next_ = &other;
+
+    prev_ = other.prev_;
+    prev_->next_ = this;
+    other.prev_ = &other;
+  }
+
+  // Sanity check to make sure that other is in a valid state to be destroyed.
+  DCHECK_EQ(other.prev_, &other);
+  DCHECK_EQ(other.next_, &other);
+}
+
+IOBuf::IOBuf(const IOBuf& other) {
+  *this = other.cloneAsValue();
+}
+
+IOBuf::IOBuf(InternalConstructor,
+             uintptr_t flagsAndSharedInfo,
              uint8_t* buf,
-             uint32_t capacity,
+             uint64_t capacity,
              uint8_t* data,
-             uint32_t length,
-             SharedInfo* sharedInfo)
+             uint64_t length)
   : next_(this),
     prev_(this),
     data_(data),
     buf_(buf),
     length_(length),
     capacity_(capacity),
-    flags_(flags),
-    type_(type),
-    sharedInfo_(sharedInfo) {
+    flagsAndSharedInfo_(flagsAndSharedInfo) {
   assert(data >= buf);
   assert(data + length <= buf + capacity);
-  assert(static_cast<bool>(flags & kFlagUserOwned) ==
-         (sharedInfo == NULL));
 }

 IOBuf::~IOBuf() {
@@ -313,6 +403,60 @@
   decrementRefcount();
 }

+IOBuf& IOBuf::operator=(IOBuf&& other) noexcept {
+  if (this == &other) {
+    return *this;
+  }
+
+  // If we are part of a chain, delete the rest of the chain.
+  while (next_ != this) {
+    // Since unlink() returns unique_ptr() and we don't store it,
+    // it will automatically delete the unlinked element.
+    (void)next_->unlink();
+  }
+
+  // Decrement our refcount on the current buffer
+  decrementRefcount();
+
+  // Take ownership of the other buffer's data
+  data_ = other.data_;
+  buf_ = other.buf_;
+  length_ = other.length_;
+  capacity_ = other.capacity_;
+  flagsAndSharedInfo_ = other.flagsAndSharedInfo_;
+  // Reset other so it is a clean state to be destroyed.
+  other.data_ = nullptr;
+  other.buf_ = nullptr;
+  other.length_ = 0;
+  other.capacity_ = 0;
+  other.flagsAndSharedInfo_ = 0;
+
+  // If other was part of the chain, assume ownership of the rest of its chain.
+  // (It's only valid to perform move assignment on the head of a chain.)
+  if (other.next_ != &other) {
+    next_ = other.next_;
+    next_->prev_ = this;
+    other.next_ = &other;
+
+    prev_ = other.prev_;
+    prev_->next_ = this;
+    other.prev_ = &other;
+  }
+
+  // Sanity check to make sure that other is in a valid state to be destroyed.
+  DCHECK_EQ(other.prev_, &other);
+  DCHECK_EQ(other.next_, &other);
+
+  return *this;
+}
+
+IOBuf& IOBuf::operator=(const IOBuf& other) {
+  if (this != &other) {
+    *this = IOBuf(other);
+  }
+  return *this;
+}
+
 bool IOBuf::empty() const {
   const IOBuf* current = this;
   do {
@@ -324,8 +468,8 @@ bool IOBuf::empty() const {
   return true;
 }

-uint32_t IOBuf::countChainElements() const {
-  uint32_t numElements = 1;
+size_t IOBuf::countChainElements() const {
+  size_t numElements = 1;
   for (IOBuf* current = next_; current != this; current = current->next_) {
     ++numElements;
   }
@@ -359,52 +503,58 @@ void IOBuf::prependChain(unique_ptr<IOBuf>&& iobuf) {
 }

 unique_ptr<IOBuf> IOBuf::clone() const {
-  unique_ptr<IOBuf> newHead(cloneOne());
+  return make_unique<IOBuf>(cloneAsValue());
+}
+
+unique_ptr<IOBuf> IOBuf::cloneOne() const {
+  return make_unique<IOBuf>(cloneOneAsValue());
+}
+
+IOBuf IOBuf::cloneAsValue() const {
+  auto tmp = cloneOneAsValue();

   for (IOBuf* current = next_; current != this; current = current->next_) {
-    newHead->prependChain(current->cloneOne());
+    tmp.prependChain(current->cloneOne());
   }

-  return newHead;
+  return tmp;
 }

-unique_ptr<IOBuf> IOBuf::cloneOne() const {
-  if (sharedInfo_) {
-    flags_ |= kFlagMaybeShared;
-  }
-  unique_ptr<IOBuf> iobuf(new IOBuf(static_cast<ExtBufTypeEnum>(type_),
-                                    flags_, buf_, capacity_,
-                                    data_, length_,
-                                    sharedInfo_));
-  if (sharedInfo_) {
-    sharedInfo_->refcount.fetch_add(1, std::memory_order_acq_rel);
+IOBuf IOBuf::cloneOneAsValue() const {
+  if (SharedInfo* info = sharedInfo()) {
+    setFlags(kFlagMaybeShared);
+    info->refcount.fetch_add(1, std::memory_order_acq_rel);
   }
-  return iobuf;
+  return IOBuf(
+      InternalConstructor(),
+      flagsAndSharedInfo_,
+      buf_,
+      capacity_,
+      data_,
+      length_);
 }

 void IOBuf::unshareOneSlow() {
   // Allocate a new buffer for the data
   uint8_t* buf;
   SharedInfo* sharedInfo;
-  uint32_t actualCapacity;
+  uint64_t actualCapacity;
   allocExtBuffer(capacity_, &buf, &sharedInfo, &actualCapacity);

   // Copy the data
   // Maintain the same amount of headroom.  Since we maintained the same
   // minimum capacity we also maintain at least the same amount of tailroom.
-  uint32_t headlen = headroom();
+  uint64_t headlen = headroom();
   memcpy(buf + headlen, data_, length_);

   // Release our reference on the old buffer
   decrementRefcount();
-  // Make sure kFlagUserOwned, kFlagMaybeShared, and kFlagFreeSharedInfo
-  // are all cleared.
-  flags_ = 0;
+  // Make sure kFlagMaybeShared and kFlagFreeSharedInfo are all cleared.
+  setFlagsAndSharedInfo(0, sharedInfo);

   // Update the buffer pointers to point to the new buffer
   data_ = buf + headlen;
   buf_ = buf;
-  sharedInfo_ = sharedInfo;
 }

 void IOBuf::unshareChained() {
@@ -431,11 +581,23 @@ void IOBuf::unshareChained() {
   coalesceSlow();
 }

-void IOBuf::coalesceSlow(size_t maxLength) {
+void IOBuf::makeManagedChained() {
+  assert(isChained());
+
+  IOBuf* current = this;
+  while (true) {
+    current->makeManagedOne();
+    current = current->next_;
+    if (current == this) {
+      break;
+    }
+  }
+}
+
+void IOBuf::coalesceSlow() {
   // coalesceSlow() should only be called if we are part of a chain of multiple
   // IOBufs.  The caller should have already verified this.
-  assert(isChained());
-  assert(length_ < maxLength);
+  DCHECK(isChained());

   // Compute the length of the entire chain
   uint64_t newLength = 0;
   IOBuf* end = this;
   do {
     newLength += end->length_;
     end = end->next_;
-  } while (newLength < maxLength && end != this);
+  } while (end != this);

-  uint64_t newHeadroom = headroom();
-  uint64_t newTailroom = end->prev_->tailroom();
-  coalesceAndReallocate(newHeadroom, newLength, end, newTailroom);
+  coalesceAndReallocate(newLength, end);
   // We should be only element left in the chain now
-  assert(length_ >= maxLength || !isChained());
+  DCHECK(!isChained());
+}
+
+void IOBuf::coalesceSlow(size_t maxLength) {
+  // coalesceSlow() should only be called if we are part of a chain of multiple
+  // IOBufs.  The caller should have already verified this.
+  DCHECK(isChained());
+  DCHECK_LT(length_, maxLength);
+
+  // Compute the length of the entire chain
+  uint64_t newLength = 0;
+  IOBuf* end = this;
+  while (true) {
+    newLength += end->length_;
+    end = end->next_;
+    if (newLength >= maxLength) {
+      break;
+    }
+    if (end == this) {
+      throw std::overflow_error("attempted to coalesce more data than "
+                                "available");
+    }
+  }
+
+  coalesceAndReallocate(newLength, end);
+  // We should have the requested length now
+  DCHECK_GE(length_, maxLength);
 }

 void IOBuf::coalesceAndReallocate(size_t newHeadroom,
@@ -457,16 +643,13 @@ void IOBuf::coalesceAndReallocate(size_t newHeadroom,
                                   IOBuf* end,
                                   size_t newTailroom) {
   uint64_t newCapacity = newLength + newHeadroom + newTailroom;
-  if (newCapacity > UINT32_MAX) {
-    throw std::overflow_error("IOBuf chain too large to coalesce");
-  }

   // Allocate space for the coalesced buffer.
   // We always convert to an external buffer, even if we happened to be an
   // internal buffer before.
   uint8_t* newBuf;
   SharedInfo* newInfo;
-  uint32_t actualCapacity;
+  uint64_t actualCapacity;
   allocExtBuffer(newCapacity, &newBuf, &newInfo, &actualCapacity);

   // Copy the data into the new buffer
@@ -486,14 +669,11 @@ void IOBuf::coalesceAndReallocate(size_t newHeadroom,

   // Point at the new buffer
   decrementRefcount();
-  // Make sure kFlagUserOwned, kFlagMaybeShared, and kFlagFreeSharedInfo
-  // are all cleared.
-  flags_ = 0;
+  // Make sure kFlagMaybeShared and kFlagFreeSharedInfo are all cleared.
+  setFlagsAndSharedInfo(0, newInfo);

   capacity_ = actualCapacity;
-  type_ = kExtAllocated;
   buf_ = newBuf;
-  sharedInfo_ = newInfo;
   data_ = newData;
   length_ = newLength;

@@ -508,13 +688,13 @@ void IOBuf::coalesceAndReallocate(size_t newHeadroom,
 void IOBuf::decrementRefcount() {
   // Externally owned buffers don't have a SharedInfo object and aren't managed
   // by the reference count
-  if (flags_ & kFlagUserOwned) {
-    assert(sharedInfo_ == nullptr);
+  SharedInfo* info = sharedInfo();
+  if (!info) {
     return;
   }

   // Decrement the refcount
-  uint32_t newcnt = sharedInfo_->refcount.fetch_sub(
+  uint32_t newcnt = info->refcount.fetch_sub(
       1, std::memory_order_acq_rel);
   // Note that fetch_sub() returns the value before we decremented.
   // If it is 1, we were the only remaining user; if it is greater there are
@@ -536,12 +716,12 @@ void IOBuf::decrementRefcount() {
   // takeOwnership() store the user's free function with its allocated
   // SharedInfo object.)  However, handling this specially with a flag seems
   // like it shouldn't be problematic.
-  if (flags_ & kFlagFreeSharedInfo) {
-    delete sharedInfo_;
+  if (flags() & kFlagFreeSharedInfo) {
+    delete sharedInfo();
   }
 }

-void IOBuf::reserveSlow(uint32_t minHeadroom, uint32_t minTailroom) {
+void IOBuf::reserveSlow(uint64_t minHeadroom, uint64_t minTailroom) {
   size_t newCapacity = (size_t)length_ + minHeadroom + minTailroom;
   DCHECK_LT(newCapacity, UINT32_MAX);
@@ -571,19 +751,21 @@ void IOBuf::reserveSlow(uint32_t minHeadroom, uint32_t minTailroom) {
     return;
   }

-  size_t newAllocatedCapacity = goodExtBufferSize(newCapacity);
+  size_t newAllocatedCapacity = 0;
   uint8_t* newBuffer = nullptr;
-  uint32_t newHeadroom = 0;
-  uint32_t oldHeadroom = headroom();
+  uint64_t newHeadroom = 0;
+  uint64_t oldHeadroom = headroom();

   // If we have a buffer allocated with malloc and we just need more tailroom,
-  // try to use realloc()/rallocm() to grow the buffer in place.
-  if ((flags_ & kFlagUserOwned) == 0 && (sharedInfo_->freeFn == nullptr) &&
-      length_ != 0 && oldHeadroom >= minHeadroom) {
+  // try to use realloc()/xallocx() to grow the buffer in place.
+  SharedInfo* info = sharedInfo();
+  if (info && (info->freeFn == nullptr) && length_ != 0 &&
+      oldHeadroom >= minHeadroom) {
+    size_t headSlack = oldHeadroom - minHeadroom;
+    newAllocatedCapacity = goodExtBufferSize(newCapacity + headSlack);
     if (usingJEMalloc()) {
-      size_t headSlack = oldHeadroom - minHeadroom;
       // We assume that tailroom is more useful and more important than
-      // headroom (not least because realloc / rallocm allow us to grow the
+      // headroom (not least because realloc / xallocx allow us to grow the
      // buffer at the tail, but not at the head)  So, if we have more headroom
      // than we need, we consider that "wasted".  We arbitrarily define "too
      // much" headroom to be 25% of the capacity.
@@ -591,18 +773,11 @@
         size_t allocatedCapacity = capacity() + sizeof(SharedInfo);
         void* p = buf_;
         if (allocatedCapacity >= jemallocMinInPlaceExpandable) {
-          int r = rallocm(&p, &newAllocatedCapacity, newAllocatedCapacity,
-                          0, ALLOCM_NO_MOVE);
-          if (r == ALLOCM_SUCCESS) {
+          if (xallocx(p, newAllocatedCapacity, 0, 0) == newAllocatedCapacity) {
             newBuffer = static_cast<uint8_t*>(p);
             newHeadroom = oldHeadroom;
-          } else if (r == ALLOCM_ERR_OOM) {
-            // shouldn't happen as we don't actually allocate new memory
-            // (due to ALLOCM_NO_MOVE)
-            throw std::bad_alloc();
           }
-          // if ALLOCM_ERR_NOT_MOVED, do nothing, fall back to
-          // malloc/memcpy/free
+          // if xallocx failed, do nothing, fall back to malloc/memcpy/free
         }
       }
     } else {  // Not using jemalloc
@@ -621,41 +796,40 @@
   // None of the previous reallocation strategies worked (or we're using
   // an internal buffer).  malloc/copy/free.
   if (newBuffer == nullptr) {
+    newAllocatedCapacity = goodExtBufferSize(newCapacity);
     void* p = malloc(newAllocatedCapacity);
     if (UNLIKELY(p == nullptr)) {
       throw std::bad_alloc();
     }
     newBuffer = static_cast<uint8_t*>(p);
     memcpy(newBuffer + minHeadroom, data_, length_);
-    if ((flags_ & kFlagUserOwned) == 0) {
+    if (sharedInfo()) {
       freeExtBuffer();
     }
     newHeadroom = minHeadroom;
   }

-  SharedInfo* info;
-  uint32_t cap;
+  uint64_t cap;
   initExtBuffer(newBuffer, newAllocatedCapacity, &info, &cap);

-  if (flags_ & kFlagFreeSharedInfo) {
-    delete sharedInfo_;
+  if (flags() & kFlagFreeSharedInfo) {
+    delete sharedInfo();
   }

-  flags_ = 0;
+  setFlagsAndSharedInfo(0, info);
   capacity_ = cap;
-  type_ = kExtAllocated;
   buf_ = newBuffer;
-  sharedInfo_ = info;
   data_ = newBuffer + newHeadroom;
   // length_ is unchanged
 }

 void IOBuf::freeExtBuffer() {
-  DCHECK((flags_ & kFlagUserOwned) == 0);
+  SharedInfo* info = sharedInfo();
+  DCHECK(info);

-  if (sharedInfo_->freeFn) {
+  if (info->freeFn) {
     try {
-      sharedInfo_->freeFn(buf_, sharedInfo_->userData);
+      info->freeFn(buf_, info->userData);
     } catch (...) {
       // The user's free function should never throw.  Otherwise we might
       // throw from the IOBuf destructor.  Other code paths like coalesce()
@@ -667,20 +841,20 @@
   }
 }

-void IOBuf::allocExtBuffer(uint32_t minCapacity,
+void IOBuf::allocExtBuffer(uint64_t minCapacity,
                            uint8_t** bufReturn,
                            SharedInfo** infoReturn,
-                           uint32_t* capacityReturn) {
+                           uint64_t* capacityReturn) {
   size_t mallocSize = goodExtBufferSize(minCapacity);
   uint8_t* buf = static_cast<uint8_t*>(malloc(mallocSize));
-  if (UNLIKELY(buf == NULL)) {
+  if (UNLIKELY(buf == nullptr)) {
     throw std::bad_alloc();
   }
   initExtBuffer(buf, mallocSize, infoReturn, capacityReturn);
   *bufReturn = buf;
 }

-size_t IOBuf::goodExtBufferSize(uint32_t minCapacity) {
+size_t IOBuf::goodExtBufferSize(uint64_t minCapacity) {
   // Determine how much space we should allocate.  We'll store the SharedInfo
   // for the external buffer just after the buffer itself.  (We store it just
   // after the buffer rather than just before so that the code can still just
@@ -698,34 +872,25 @@ void IOBuf::initExtBuffer(uint8_t* buf, size_t mallocSize,
                           SharedInfo** infoReturn,
-                          uint32_t* capacityReturn) {
+                          uint64_t* capacityReturn) {
   // Find the SharedInfo storage at the end of the buffer
   // and construct the SharedInfo.
   uint8_t* infoStart = (buf + mallocSize) - sizeof(SharedInfo);
   SharedInfo* sharedInfo = new(infoStart) SharedInfo;

-  size_t actualCapacity = infoStart - buf;
-  // On the unlikely possibility that the actual capacity is larger than can
-  // fit in a uint32_t after adding room for the refcount and calling
-  // goodMallocSize(), truncate downwards if necessary.
-  if (actualCapacity >= UINT32_MAX) {
-    *capacityReturn = UINT32_MAX;
-  } else {
-    *capacityReturn = actualCapacity;
-  }
-
+  *capacityReturn = infoStart - buf;
   *infoReturn = sharedInfo;
 }

 fbstring IOBuf::moveToFbString() {
   // malloc-allocated buffers are just fine, everything else needs
   // to be turned into one.
-  if ((flags_ & kFlagUserOwned) ||      // user owned, not ours to give up
-      sharedInfo_->freeFn != nullptr || // not malloc()-ed
-      headroom() != 0 ||     // malloc()-ed block doesn't start at beginning
-      tailroom() == 0 ||     // no room for NUL terminator
-      isShared() ||          // shared
-      isChained()) {         // chained
+  if (!sharedInfo() ||         // user owned, not ours to give up
+      sharedInfo()->freeFn ||  // not malloc()-ed
+      headroom() != 0 ||       // malloc()-ed block doesn't start at beginning
+      tailroom() == 0 ||       // no room for NUL terminator
+      isShared() ||            // shared
+      isChained()) {           // chained
     // We might as well get rid of all head and tailroom if we're going
     // to reallocate; we need 1 byte for NUL terminator.
     coalesceAndReallocate(0, computeChainDataLength(), this, 1);
@@ -737,13 +902,12 @@ fbstring IOBuf::moveToFbString() {
       length(),
       capacity(),
       AcquireMallocatedString());
-  if (flags_ & kFlagFreeSharedInfo) {
-    delete sharedInfo_;
+  if (flags() & kFlagFreeSharedInfo) {
+    delete sharedInfo();
   }

   // Reset to a state where we can be deleted cleanly
-  flags_ = kFlagUserOwned;
-  sharedInfo_ = nullptr;
+  flagsAndSharedInfo_ = 0;
   buf_ = nullptr;
   clear();
   return str;
@@ -760,15 +924,76 @@ IOBuf::Iterator IOBuf::cend() const {

 folly::fbvector<struct iovec> IOBuf::getIov() const {
   folly::fbvector<struct iovec> iov;
   iov.reserve(countChainElements());
+  appendToIov(&iov);
+  return iov;
+}
+
+void IOBuf::appendToIov(folly::fbvector<struct iovec>* iov) const {
   IOBuf const* p = this;
   do {
     // some code can get confused by empty iovs, so skip them
     if (p->length() > 0) {
-      iov.push_back({(void*)p->data(), p->length()});
+      iov->push_back({(void*)p->data(), folly::to<size_t>(p->length())});
     }
     p = p->next();
   } while (p != this);
-  return iov;
+}
+
+size_t IOBuf::fillIov(struct iovec* iov, size_t len) const {
+  IOBuf const* p = this;
+  size_t i = 0;
+  while (i < len) {
+    // some code can get confused by empty iovs, so skip them
+    if (p->length() > 0) {
+      iov[i].iov_base = const_cast<uint8_t*>(p->data());
+      iov[i].iov_len = p->length();
+      i++;
+    }
+    p = p->next();
+    if (p == this) {
+      return i;
+    }
+  }
+  return 0;
+}
+
+size_t IOBufHash::operator()(const IOBuf& buf) const {
+  folly::hash::SpookyHashV2 hasher;
+  hasher.Init(0, 0);
+  io::Cursor cursor(&buf);
+  for (;;) {
+    auto b = cursor.peekBytes();
+    if (b.empty()) {
+      break;
+    }
+    hasher.Update(b.data(), b.size());
+    cursor.skip(b.size());
+  }
+  uint64_t h1;
+  uint64_t h2;
+  hasher.Final(&h1, &h2);
+  return h1;
+}
+
+bool IOBufEqual::operator()(const IOBuf& a, const IOBuf& b) const {
+  io::Cursor ca(&a);
+  io::Cursor cb(&b);
+  for (;;) {
+    auto ba = ca.peekBytes();
+    auto bb = cb.peekBytes();
+    if (ba.empty() && bb.empty()) {
+      return true;
+    } else if (ba.empty() || bb.empty()) {
+      return false;
+    }
+    size_t n = std::min(ba.size(), bb.size());
+    DCHECK_GT(n, 0);
+    if (memcmp(ba.data(), bb.data(), n)) {
+      return false;
+    }
+    ca.skip(n);
+    cb.skip(n);
+  }
 }

 } // folly
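
Reader's note (not part of the diff): the change above packs the old flags_/type_/sharedInfo_ fields into a single flagsAndSharedInfo_ word and turns IOBuf into a regular value type, with public CREATE/TAKE_OWNERSHIP/WRAP_BUFFER constructors, wrapBufferAsValue(), cloneAsValue(), and real copy/move operations. A minimal usage sketch of those new entry points follows; it relies only on APIs visible in this diff, and the file name and buffer contents are illustrative:

// iobuf_sketch.cpp -- illustrative only, not part of folly
#include <folly/io/IOBuf.h>
#include <utility>

int main() {
  using folly::IOBuf;

  // Stack-allocated IOBuf owning a fresh 1 KiB heap buffer
  // (the public IOBuf(CreateOp, uint64_t) constructor added above).
  IOBuf owned(IOBuf::CREATE, 1024);

  // Wrap caller-owned memory without copying.  No SharedInfo is attached,
  // so this IOBuf will never try to free kData.
  static const char kData[] = "hello";
  IOBuf wrapped = IOBuf::wrapBufferAsValue(kData, sizeof(kData) - 1);

  // Copy construction goes through cloneAsValue(): for a managed buffer it
  // bumps the SharedInfo refcount instead of copying the bytes.
  IOBuf clone = owned;

  // Move assignment steals the buffer (and any chain) and leaves the source
  // empty but safely destructible, as operator=(IOBuf&&) above shows.
  IOBuf sink;
  sink = std::move(owned);
  return 0;
}

The value-type surface (plus IOBufHash/IOBufEqual at the end of the diff) is what lets IOBuf be stored directly in containers such as unordered maps instead of always living behind a unique_ptr.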