diff --git a/folly/io/IOBuf.cpp b/folly/io/IOBuf.cpp
index 8fc040a5..06bcca46 100644
--- a/folly/io/IOBuf.cpp
+++ b/folly/io/IOBuf.cpp
@@ -1,5 +1,5 @@
 /*
- * Copyright 2013 Facebook, Inc.
+ * Copyright 2014 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -18,8 +18,11 @@
 
 #include "folly/io/IOBuf.h"
 
-#include "folly/Malloc.h"
+#include "folly/Conv.h"
 #include "folly/Likely.h"
+#include "folly/Malloc.h"
+#include "folly/Memory.h"
+#include "folly/ScopeGuard.h"
 
 #include <stdexcept>
 #include <assert.h>
@@ -28,16 +31,87 @@
 
 using std::unique_ptr;
 
+namespace {
+
+enum : uint16_t {
+  kHeapMagic = 0xa5a5,
+  // This memory segment contains an IOBuf that is still in use
+  kIOBufInUse = 0x01,
+  // This memory segment contains buffer data that is still in use
+  kDataInUse = 0x02,
+};
+
+enum : uint64_t {
+  // When create() is called for buffers less than kDefaultCombinedBufSize,
+  // we allocate a single combined memory segment for the IOBuf and the data
+  // together.  See the comments for createCombined()/createSeparate() for more
+  // details.
+  //
+  // (The size of 1k is largely just a guess here.  We could probably do
+  // benchmarks of real applications to see if adjusting this number makes a
+  // difference.  Callers that know their exact use case can also explicitly
+  // call createCombined() or createSeparate().)
+  kDefaultCombinedBufSize = 1024
+};
+
+// Helper function for IOBuf::takeOwnership()
+void takeOwnershipError(bool freeOnError, void* buf,
+                        folly::IOBuf::FreeFunction freeFn,
+                        void* userData) {
+  if (!freeOnError) {
+    return;
+  }
+  if (!freeFn) {
+    free(buf);
+    return;
+  }
+  try {
+    freeFn(buf, userData);
+  } catch (...) {
+    // The user's free function is not allowed to throw.
+    // (We are already in the middle of throwing an exception, so
+    // we cannot let this exception go unhandled.)
+    abort();
+  }
+}
+
+} // unnamed namespace
+
 namespace folly {
 
-const uint32_t IOBuf::kMaxIOBufSize;
-// Note: Applying offsetof() to an IOBuf is legal according to C++11, since
-// IOBuf is a standard-layout class.  However, this isn't legal with earlier
-// C++ standards, which require that offsetof() only be used with POD types.
-//
-// This code compiles with g++ 4.6, but not with g++ 4.4 or earlier versions.
-const uint32_t IOBuf::kMaxInternalDataSize =
-  kMaxIOBufSize - offsetof(folly::IOBuf, int_.buf);
+struct IOBuf::HeapPrefix {
+  HeapPrefix(uint16_t flg)
+    : magic(kHeapMagic),
+      flags(flg) {}
+  ~HeapPrefix() {
+    // Reset magic to 0 on destruction.  This is solely for debugging purposes
+    // to help catch bugs where someone tries to use HeapStorage after it has
+    // been deleted.
+    magic = 0;
+  }
+
+  uint16_t magic;
+  std::atomic<uint16_t> flags;
+};
+
+struct IOBuf::HeapStorage {
+  HeapPrefix prefix;
+  // The IOBuf is last in the HeapStorage object.
+  // This way operator new will work even if allocating a subclass of IOBuf
+  // that requires more space.
+  folly::IOBuf buf;
+};
+
+struct IOBuf::HeapFullStorage {
+  // Make sure jemalloc allocates from the 64-byte class.  Putting this here
+  // because HeapStorage is private so it can't be at namespace level.
+  static_assert(sizeof(HeapStorage) <= 64,
+                "IOBuf may not grow over 56 bytes!");
+
+  HeapStorage hs;
+  SharedInfo shared;
+  MaxAlign align;
+};
 
 IOBuf::SharedInfo::SharedInfo()
   : freeFn(NULL),
@@ -56,68 +130,131 @@ IOBuf::SharedInfo::SharedInfo(FreeFunction fn, void* arg)
 }
 
 void* IOBuf::operator new(size_t size) {
-  // Since IOBuf::create() manually allocates space for some IOBuf objects
-  // using malloc(), override operator new so that all IOBuf objects are
-  // always allocated using malloc().  This way operator delete can always know
-  // that free() is the correct way to deallocate the memory.
-  void* ptr = malloc(size);
-
+  size_t fullSize = offsetof(HeapStorage, buf) + size;
+  auto* storage = static_cast<HeapStorage*>(malloc(fullSize));
   // operator new is not allowed to return NULL
-  if (UNLIKELY(ptr == NULL)) {
+  if (UNLIKELY(storage == nullptr)) {
     throw std::bad_alloc();
   }
-
-  return ptr;
+
+  new (&storage->prefix) HeapPrefix(kIOBufInUse);
+  return &(storage->buf);
 }
 
 void* IOBuf::operator new(size_t size, void* ptr) {
-  assert(size <= kMaxIOBufSize);
   return ptr;
 }
 
 void IOBuf::operator delete(void* ptr) {
-  // For small buffers, IOBuf::create() manually allocates the space for the
-  // IOBuf object using malloc().  Therefore we override delete to ensure that
-  // the IOBuf space is freed using free() rather than a normal delete.
-  free(ptr);
-}
-
-unique_ptr<IOBuf> IOBuf::create(uint32_t capacity) {
-  // If the desired capacity is less than kMaxInternalDataSize,
-  // just allocate a single region large enough for both the IOBuf header and
-  // the data.
-  if (capacity <= kMaxInternalDataSize) {
-    void* buf = malloc(kMaxIOBufSize);
-    if (UNLIKELY(buf == NULL)) {
-      throw std::bad_alloc();
+  auto* storageAddr = static_cast<uint8_t*>(ptr) - offsetof(HeapStorage, buf);
+  auto* storage = reinterpret_cast<HeapStorage*>(storageAddr);
+  releaseStorage(storage, kIOBufInUse);
+}
+
+void IOBuf::releaseStorage(HeapStorage* storage, uint16_t freeFlags) {
+  CHECK_EQ(storage->prefix.magic, static_cast<uint16_t>(kHeapMagic));
+
+  // Use relaxed memory order here.  If we are unlucky and happen to get
+  // out-of-date data the compare_exchange_weak() call below will catch
+  // it and load new data with memory_order_acq_rel.
+  auto flags = storage->prefix.flags.load(std::memory_order_acquire);
+  DCHECK_EQ((flags & freeFlags), freeFlags);
+
+  while (true) {
+    uint16_t newFlags = (flags & ~freeFlags);
+    if (newFlags == 0) {
+      // The storage space is now unused.  Free it.
+      storage->prefix.HeapPrefix::~HeapPrefix();
+      free(storage);
+      return;
     }
-    uint8_t* bufEnd = static_cast<uint8_t*>(buf) + kMaxIOBufSize;
-    unique_ptr<IOBuf> iobuf(new(buf) IOBuf(bufEnd));
-    assert(iobuf->capacity() >= capacity);
-    return iobuf;
+
+    // This storage segment still contains portions that are in use.
+    // Just clear the flags specified in freeFlags for now.
+    auto ret = storage->prefix.flags.compare_exchange_weak(
+        flags, newFlags, std::memory_order_acq_rel);
+    if (ret) {
+      // We successfully updated the flags.
+      return;
+    }
+
+    // We failed to update the flags.  Some other thread probably updated them
+    // and cleared some of the other bits.  Continue around the loop to see if
+    // we are the last user now, or if we need to try updating the flags again.
   }
+}
 
-  // Allocate an external buffer
-  uint8_t* buf;
-  SharedInfo* sharedInfo;
-  uint32_t actualCapacity;
-  allocExtBuffer(capacity, &buf, &sharedInfo, &actualCapacity);
+void IOBuf::freeInternalBuf(void* buf, void* userData) {
+  auto* storage = static_cast<HeapStorage*>(userData);
+  releaseStorage(storage, kDataInUse);
+}
 
-  // Allocate the IOBuf header
-  try {
-    return unique_ptr<IOBuf>(new IOBuf(kExtAllocated, 0,
-                                       buf, actualCapacity,
-                                       buf, 0,
-                                       sharedInfo));
-  } catch (...) {
-    free(buf);
-    throw;
+IOBuf::IOBuf(CreateOp, uint64_t capacity)
+  : next_(this),
+    prev_(this),
+    data_(nullptr),
+    length_(0),
+    flagsAndSharedInfo_(0) {
+  SharedInfo* info;
+  allocExtBuffer(capacity, &buf_, &info, &capacity_);
+  setSharedInfo(info);
+  data_ = buf_;
+}
+
+IOBuf::IOBuf(CopyBufferOp op, const void* buf, uint64_t size,
+             uint64_t headroom, uint64_t minTailroom)
+  : IOBuf(CREATE, headroom + size + minTailroom) {
+  advance(headroom);
+  memcpy(writableData(), buf, size);
+  append(size);
+}
+
+IOBuf::IOBuf(CopyBufferOp op, ByteRange br,
+             uint64_t headroom, uint64_t minTailroom)
+  : IOBuf(op, br.data(), br.size(), headroom, minTailroom) {
+}
+
+unique_ptr<IOBuf> IOBuf::create(uint64_t capacity) {
+  // For smaller-sized buffers, allocate the IOBuf, SharedInfo, and the buffer
+  // all with a single allocation.
+  //
+  // We don't do this for larger buffers since it can be wasteful if the user
+  // needs to reallocate the buffer but keeps using the same IOBuf object.
+  // In this case we can't free the data space until the IOBuf is also
+  // destroyed.  Callers can explicitly call createCombined() or
+  // createSeparate() if they know their use case better, and know if they are
+  // likely to reallocate the buffer later.
+  if (capacity <= kDefaultCombinedBufSize) {
+    return createCombined(capacity);
   }
+  return createSeparate(capacity);
+}
+
+unique_ptr<IOBuf> IOBuf::createCombined(uint64_t capacity) {
+  // To save a memory allocation, allocate space for the IOBuf object, the
+  // SharedInfo struct, and the data itself all with a single call to malloc().
+  size_t requiredStorage = offsetof(HeapFullStorage, align) + capacity;
+  size_t mallocSize = goodMallocSize(requiredStorage);
+  auto* storage = static_cast<HeapFullStorage*>(malloc(mallocSize));
+
+  new (&storage->hs.prefix) HeapPrefix(kIOBufInUse | kDataInUse);
+  new (&storage->shared) SharedInfo(freeInternalBuf, storage);
+
+  uint8_t* bufAddr = reinterpret_cast<uint8_t*>(&storage->align);
+  uint8_t* storageEnd = reinterpret_cast<uint8_t*>(storage) + mallocSize;
+  size_t actualCapacity = storageEnd - bufAddr;
+  unique_ptr<IOBuf> ret(new (&storage->hs.buf) IOBuf(
+      InternalConstructor(), packFlagsAndSharedInfo(0, &storage->shared),
+      bufAddr, actualCapacity, bufAddr, 0));
+  return ret;
+}
+
+unique_ptr<IOBuf> IOBuf::createSeparate(uint64_t capacity) {
+  return make_unique<IOBuf>(CREATE, capacity);
 }
 
 unique_ptr<IOBuf> IOBuf::createChain(
-    size_t totalCapacity, uint32_t maxBufCapacity) {
+    size_t totalCapacity, uint64_t maxBufCapacity) {
   unique_ptr<IOBuf> out = create(
       std::min(totalCapacity, size_t(maxBufCapacity)));
   size_t allocatedCapacity = out->capacity();
@@ -132,80 +269,86 @@ unique_ptr<IOBuf> IOBuf::createChain(
   return out;
 }
 
-unique_ptr<IOBuf> IOBuf::takeOwnership(void* buf, uint32_t capacity,
-                                       uint32_t length,
+IOBuf::IOBuf(TakeOwnershipOp, void* buf, uint64_t capacity, uint64_t length,
+             FreeFunction freeFn, void* userData,
+             bool freeOnError)
+  : next_(this),
+    prev_(this),
+    data_(static_cast<uint8_t*>(buf)),
+    buf_(static_cast<uint8_t*>(buf)),
+    length_(length),
+    capacity_(capacity),
+    flagsAndSharedInfo_(packFlagsAndSharedInfo(kFlagFreeSharedInfo, nullptr)) {
+  try {
+    setSharedInfo(new SharedInfo(freeFn, userData));
+  } catch (...) {
+    takeOwnershipError(freeOnError, buf, freeFn, userData);
+    throw;
+  }
+}
+
+unique_ptr<IOBuf> IOBuf::takeOwnership(void* buf, uint64_t capacity,
+                                       uint64_t length,
                                        FreeFunction freeFn, void* userData,
                                        bool freeOnError) {
-  SharedInfo* sharedInfo = NULL;
   try {
-    sharedInfo = new SharedInfo(freeFn, userData);
-
-    uint8_t* bufPtr = static_cast<uint8_t*>(buf);
-    return unique_ptr<IOBuf>(new IOBuf(kExtUserSupplied, kFlagFreeSharedInfo,
-                                       bufPtr, capacity,
-                                       bufPtr, length,
-                                       sharedInfo));
+    // TODO: We could allocate the IOBuf object and SharedInfo all in a single
+    // memory allocation.  We could use the existing HeapStorage class, and
+    // define a new kSharedInfoInUse flag.  We could change our code to call
+    // releaseStorage(kFlagFreeSharedInfo) when this flag is set, rather than
+    // directly calling delete.
+    //
+    // Note that we always pass freeOnError as false to the constructor.
+    // If the constructor throws we'll handle it below.  (We have to handle
+    // allocation failures from make_unique too.)
+    return make_unique<IOBuf>(TAKE_OWNERSHIP, buf, capacity, length,
+                              freeFn, userData, false);
   } catch (...) {
-    delete sharedInfo;
-    if (freeOnError) {
-      if (freeFn) {
-        try {
-          freeFn(buf, userData);
-        } catch (...) {
-          // The user's free function is not allowed to throw.
-          abort();
-        }
-      } else {
-        free(buf);
-      }
-    }
+    takeOwnershipError(freeOnError, buf, freeFn, userData);
     throw;
   }
 }
 
-unique_ptr<IOBuf> IOBuf::wrapBuffer(const void* buf, uint32_t capacity) {
-  // We cast away the const-ness of the buffer here.
-  // This is okay since IOBuf users must use unshare() to create a copy of
-  // this buffer before writing to the buffer.
-  uint8_t* bufPtr = static_cast<uint8_t*>(const_cast<void*>(buf));
-  return unique_ptr<IOBuf>(new IOBuf(kExtUserSupplied, kFlagUserOwned,
-                                     bufPtr, capacity,
-                                     bufPtr, capacity,
-                                     NULL));
+IOBuf::IOBuf(WrapBufferOp, const void* buf, uint64_t capacity)
+  : IOBuf(InternalConstructor(), 0,
+          // We cast away the const-ness of the buffer here.
+          // This is okay since IOBuf users must use unshare() to create a copy
+          // of this buffer before writing to the buffer.
+          static_cast<uint8_t*>(const_cast<void*>(buf)), capacity,
+          static_cast<uint8_t*>(const_cast<void*>(buf)), capacity) {
 }
 
-IOBuf::IOBuf(uint8_t* end)
-  : next_(this),
-    prev_(this),
-    data_(int_.buf),
-    length_(0),
-    flags_(0) {
-  assert(end - int_.buf == kMaxInternalDataSize);
-  assert(end - reinterpret_cast<uint8_t*>(this) == kMaxIOBufSize);
+IOBuf::IOBuf(WrapBufferOp op, ByteRange br)
+  : IOBuf(op, br.data(), br.size()) {
+}
+
+unique_ptr<IOBuf> IOBuf::wrapBuffer(const void* buf, uint64_t capacity) {
+  return make_unique<IOBuf>(WRAP_BUFFER, buf, capacity);
+}
+
+IOBuf::IOBuf() noexcept {
 }
 
-IOBuf::IOBuf(ExtBufTypeEnum type,
-             uint32_t flags,
+IOBuf::IOBuf(IOBuf&& other) noexcept {
+  *this = std::move(other);
+}
+
+IOBuf::IOBuf(InternalConstructor,
+             uintptr_t flagsAndSharedInfo,
              uint8_t* buf,
-             uint32_t capacity,
+             uint64_t capacity,
              uint8_t* data,
-             uint32_t length,
-             SharedInfo* sharedInfo)
+             uint64_t length)
   : next_(this),
     prev_(this),
     data_(data),
+    buf_(buf),
     length_(length),
-    flags_(kFlagExt | flags) {
-  ext_.capacity = capacity;
-  ext_.type = type;
-  ext_.buf = buf;
-  ext_.sharedInfo = sharedInfo;
-
+    capacity_(capacity),
+    flagsAndSharedInfo_(flagsAndSharedInfo) {
   assert(data >= buf);
   assert(data + length <= buf + capacity);
-  assert(static_cast<bool>(flags & kFlagUserOwned) ==
-         (sharedInfo == NULL));
 }
 
 IOBuf::~IOBuf() {
@@ -218,9 +361,50 @@ IOBuf::~IOBuf() {
     (void)next_->unlink();
   }
 
-  if (flags_ & kFlagExt) {
-    decrementRefcount();
+  decrementRefcount();
+}
+
+IOBuf& IOBuf::operator=(IOBuf&& other) noexcept {
+  // If we are part of a chain, delete the rest of the chain.
+  while (next_ != this) {
+    // Since unlink() returns unique_ptr() and we don't store it,
+    // it will automatically delete the unlinked element.
+    (void)next_->unlink();
   }
+
+  // Decrement our refcount on the current buffer
+  decrementRefcount();
+
+  // Take ownership of the other buffer's data
+  data_ = other.data_;
+  buf_ = other.buf_;
+  length_ = other.length_;
+  capacity_ = other.capacity_;
+  flagsAndSharedInfo_ = other.flagsAndSharedInfo_;
+  // Reset other so it is in a clean state to be destroyed.
+  other.data_ = nullptr;
+  other.buf_ = nullptr;
+  other.length_ = 0;
+  other.capacity_ = 0;
+  other.flagsAndSharedInfo_ = 0;
+
+  // If other was part of the chain, assume ownership of the rest of its chain.
+  // (It's only valid to perform move assignment on the head of a chain.)
+  if (other.next_ != &other) {
+    next_ = other.next_;
+    next_->prev_ = this;
+    other.next_ = &other;
+
+    prev_ = other.prev_;
+    prev_->next_ = this;
+    other.prev_ = &other;
+  }
+
+  // Sanity check to make sure that other is in a valid state to be destroyed.
+  DCHECK_EQ(other.prev_, &other);
+  DCHECK_EQ(other.next_, &other);
+
+  return *this;
 }
 
 bool IOBuf::empty() const {
@@ -234,8 +418,8 @@ bool IOBuf::empty() const {
   return true;
 }
 
-uint32_t IOBuf::countChainElements() const {
-  uint32_t numElements = 1;
+size_t IOBuf::countChainElements() const {
+  size_t numElements = 1;
   for (IOBuf* current = next_; current != this; current = current->next_) {
     ++numElements;
   }
@@ -269,64 +453,62 @@ void IOBuf::prependChain(unique_ptr<IOBuf>&& iobuf) {
 }
 
 unique_ptr<IOBuf> IOBuf::clone() const {
-  unique_ptr<IOBuf> newHead(cloneOne());
+  unique_ptr<IOBuf> ret = make_unique<IOBuf>();
+  cloneInto(*ret);
+  return ret;
+}
+
+unique_ptr<IOBuf> IOBuf::cloneOne() const {
+  unique_ptr<IOBuf> ret = make_unique<IOBuf>();
+  cloneOneInto(*ret);
+  return ret;
+}
+
+void IOBuf::cloneInto(IOBuf& other) const {
+  IOBuf tmp;
+  cloneOneInto(tmp);
 
   for (IOBuf* current = next_; current != this; current = current->next_) {
-    newHead->prependChain(current->cloneOne());
+    tmp.prependChain(current->cloneOne());
   }
 
-  return newHead;
+  other = std::move(tmp);
 }
 
-unique_ptr<IOBuf> IOBuf::cloneOne() const {
-  if (flags_ & kFlagExt) {
-    unique_ptr<IOBuf> iobuf(new IOBuf(static_cast<ExtBufTypeEnum>(ext_.type),
-                                      flags_, ext_.buf, ext_.capacity,
-                                      data_, length_,
-                                      ext_.sharedInfo));
-    if (ext_.sharedInfo) {
-      ext_.sharedInfo->refcount.fetch_add(1, std::memory_order_acq_rel);
-    }
-    return iobuf;
-  } else {
-    // We have an internal data buffer that cannot be shared
-    // Allocate a new IOBuf and copy the data into it.
-    unique_ptr<IOBuf> iobuf(IOBuf::create(kMaxInternalDataSize));
-    assert((iobuf->flags_ & kFlagExt) == 0);
-    iobuf->data_ += headroom();
-    memcpy(iobuf->data_, data_, length_);
-    iobuf->length_ = length_;
-    return iobuf;
+void IOBuf::cloneOneInto(IOBuf& other) const {
+  SharedInfo* info = sharedInfo();
+  if (info) {
+    setFlags(kFlagMaybeShared);
+  }
+  other = IOBuf(InternalConstructor(),
+                flagsAndSharedInfo_, buf_, capacity_,
+                data_, length_);
+  if (info) {
+    info->refcount.fetch_add(1, std::memory_order_acq_rel);
   }
 }
 
 void IOBuf::unshareOneSlow() {
-  // Internal buffers are always unshared, so unshareOneSlow() can only be
-  // called for external buffers
-  assert(flags_ & kFlagExt);
-
   // Allocate a new buffer for the data
   uint8_t* buf;
   SharedInfo* sharedInfo;
-  uint32_t actualCapacity;
-  allocExtBuffer(ext_.capacity, &buf, &sharedInfo, &actualCapacity);
+  uint64_t actualCapacity;
+  allocExtBuffer(capacity_, &buf, &sharedInfo, &actualCapacity);
 
   // Copy the data
   // Maintain the same amount of headroom.  Since we maintained the same
   // minimum capacity we also maintain at least the same amount of tailroom.
-  uint32_t headlen = headroom();
+  uint64_t headlen = headroom();
   memcpy(buf + headlen, data_, length_);
 
   // Release our reference on the old buffer
   decrementRefcount();
-  // Make sure kFlagExt is set, and kFlagUserOwned and kFlagFreeSharedInfo
-  // are not set.
-  flags_ = kFlagExt;
+  // Make sure kFlagMaybeShared and kFlagFreeSharedInfo are both cleared.
+  setFlagsAndSharedInfo(0, sharedInfo);
 
   // Update the buffer pointers to point to the new buffer
   data_ = buf + headlen;
-  ext_.buf = buf;
-  ext_.sharedInfo = sharedInfo;
+  buf_ = buf;
 }
 
 void IOBuf::unshareChained() {
@@ -353,11 +535,10 @@ void IOBuf::unshareChained() {
   coalesceSlow();
 }
 
-void IOBuf::coalesceSlow(size_t maxLength) {
+void IOBuf::coalesceSlow() {
   // coalesceSlow() should only be called if we are part of a chain of multiple
   // IOBufs.  The caller should have already verified this.
-  assert(isChained());
-  assert(length_ < maxLength);
+  DCHECK(isChained());
 
   // Compute the length of the entire chain
   uint64_t newLength = 0;
@@ -365,13 +546,37 @@
   do {
     newLength += end->length_;
     end = end->next_;
-  } while (newLength < maxLength && end != this);
+  } while (end != this);
 
-  uint64_t newHeadroom = headroom();
-  uint64_t newTailroom = end->prev_->tailroom();
-  coalesceAndReallocate(newHeadroom, newLength, end, newTailroom);
+  coalesceAndReallocate(newLength, end);
   // We should be the only element left in the chain now
-  assert(length_ >= maxLength || !isChained());
+  DCHECK(!isChained());
+}
+
+void IOBuf::coalesceSlow(size_t maxLength) {
+  // coalesceSlow() should only be called if we are part of a chain of multiple
+  // IOBufs.  The caller should have already verified this.
+  DCHECK(isChained());
+  DCHECK_LT(length_, maxLength);
+
+  // Compute the length of the entire chain
+  uint64_t newLength = 0;
+  IOBuf* end = this;
+  while (true) {
+    newLength += end->length_;
+    end = end->next_;
+    if (newLength >= maxLength) {
+      break;
+    }
+    if (end == this) {
+      throw std::overflow_error("attempted to coalesce more data than "
+                                "available");
+    }
+  }
+
+  coalesceAndReallocate(newLength, end);
+  // We should have the requested length now
+  DCHECK_GE(length_, maxLength);
 }
 
 void IOBuf::coalesceAndReallocate(size_t newHeadroom,
@@ -388,7 +593,7 @@ void IOBuf::coalesceAndReallocate(size_t newHeadroom,
   // internal buffer before.
   uint8_t* newBuf;
   SharedInfo* newInfo;
-  uint32_t actualCapacity;
+  uint64_t actualCapacity;
   allocExtBuffer(newCapacity, &newBuf, &newInfo, &actualCapacity);
 
   // Copy the data into the new buffer
@@ -406,18 +611,13 @@ void IOBuf::coalesceAndReallocate(size_t newHeadroom,
   assert(remaining == 0);
 
   // Point at the new buffer
-  if (flags_ & kFlagExt) {
-    decrementRefcount();
-  }
+  decrementRefcount();
 
-  // Make sure kFlagExt is set, and kFlagUserOwned and kFlagFreeSharedInfo
-  // are not set.
-  flags_ = kFlagExt;
+  // Make sure kFlagMaybeShared and kFlagFreeSharedInfo are both cleared.
+  setFlagsAndSharedInfo(0, newInfo);
 
-  ext_.capacity = actualCapacity;
-  ext_.type = kExtAllocated;
-  ext_.buf = newBuf;
-  ext_.sharedInfo = newInfo;
+  capacity_ = actualCapacity;
+  buf_ = newBuf;
 
   data_ = newData;
   length_ = newLength;
@@ -430,17 +630,15 @@ void IOBuf::coalesceAndReallocate(size_t newHeadroom,
 }
 
 void IOBuf::decrementRefcount() {
-  assert(flags_ & kFlagExt);
-
   // Externally owned buffers don't have a SharedInfo object and aren't managed
   // by the reference count
-  if (flags_ & kFlagUserOwned) {
-    assert(ext_.sharedInfo == NULL);
+  SharedInfo* info = sharedInfo();
+  if (!info) {
     return;
   }
 
   // Decrement the refcount
-  uint32_t newcnt = ext_.sharedInfo->refcount.fetch_sub(
+  uint32_t newcnt = info->refcount.fetch_sub(
       1, std::memory_order_acq_rel);
   // Note that fetch_sub() returns the value before we decremented.
   // If it is 1, we were the only remaining user; if it is greater there are
@@ -450,18 +648,7 @@ void IOBuf::decrementRefcount() {
   }
 
   // We were the last user.  Free the buffer
-  if (ext_.sharedInfo->freeFn != NULL) {
-    try {
-      ext_.sharedInfo->freeFn(ext_.buf, ext_.sharedInfo->userData);
-    } catch (...) {
-      // The user's free function should never throw.  Otherwise we might
-      // throw from the IOBuf destructor.  Other code paths like coalesce()
-      // also assume that decrementRefcount() cannot throw.
-      abort();
-    }
-  } else {
-    free(ext_.buf);
-  }
+  freeExtBuffer();
 
   // Free the SharedInfo if it was allocated separately.
   //
@@ -473,14 +660,19 @@
   // takeOwnership() store the user's free function with its allocated
   // SharedInfo object.)  However, handling this specially with a flag seems
   // like it shouldn't be problematic.
-  if (flags_ & kFlagFreeSharedInfo) {
-    delete ext_.sharedInfo;
+  if (flags() & kFlagFreeSharedInfo) {
+    delete sharedInfo();
   }
 }
 
-void IOBuf::reserveSlow(uint32_t minHeadroom, uint32_t minTailroom) {
+void IOBuf::reserveSlow(uint64_t minHeadroom, uint64_t minTailroom) {
   size_t newCapacity = (size_t)length_ + minHeadroom + minTailroom;
-  CHECK_LT(newCapacity, UINT32_MAX);
+  DCHECK_LT(newCapacity, UINT32_MAX);
+
+  // reserveSlow() is dangerous if anyone else is sharing the buffer, as we may
+  // reallocate and free the original buffer.  It should only ever be called if
+  // we are the only user of the buffer.
+  DCHECK(!isSharedOne());
 
   // We'll need to reallocate the buffer.
   // There are a few options.
@@ -505,26 +697,35 @@
   size_t newAllocatedCapacity = goodExtBufferSize(newCapacity);
   uint8_t* newBuffer = nullptr;
-  uint32_t newHeadroom = 0;
-  uint32_t oldHeadroom = headroom();
-
-  if ((flags_ & kFlagExt) && length_ != 0 && oldHeadroom >= minHeadroom) {
+  uint64_t newHeadroom = 0;
+  uint64_t oldHeadroom = headroom();
+
+  // If we have a buffer allocated with malloc and we just need more tailroom,
+  // try to use realloc()/rallocm() to grow the buffer in place.
+  SharedInfo* info = sharedInfo();
+  if (info && (info->freeFn == nullptr) && length_ != 0 &&
+      oldHeadroom >= minHeadroom) {
     if (usingJEMalloc()) {
       size_t headSlack = oldHeadroom - minHeadroom;
       // We assume that tailroom is more useful and more important than
-      // tailroom (not least because realloc / rallocm allow us to grow the
+      // headroom (not least because realloc / rallocm allow us to grow the
      // buffer at the tail, but not at the head)  So, if we have more headroom
      // than we need, we consider that "wasted".  We arbitrarily define "too
      // much" headroom to be 25% of the capacity.
      if (headSlack * 4 <= newCapacity) {
        size_t allocatedCapacity = capacity() + sizeof(SharedInfo);
-        void* p = ext_.buf;
+        void* p = buf_;
        if (allocatedCapacity >= jemallocMinInPlaceExpandable) {
-          int r = rallocm(&p, &newAllocatedCapacity, newAllocatedCapacity,
+          // rallocm can write to its 2nd arg even if it returns
+          // ALLOCM_ERR_NOT_MOVED.  So, we pass a temporary to its 2nd arg and
+          // update newAllocatedCapacity only on success.
+          size_t allocatedSize;
+          int r = rallocm(&p, &allocatedSize, newAllocatedCapacity,
                           0, ALLOCM_NO_MOVE);
           if (r == ALLOCM_SUCCESS) {
             newBuffer = static_cast<uint8_t*>(p);
             newHeadroom = oldHeadroom;
+            newAllocatedCapacity = allocatedSize;
           } else if (r == ALLOCM_ERR_OOM) {
             // shouldn't happen as we don't actually allocate new memory
             // (due to ALLOCM_NO_MOVE)
@@ -537,7 +738,7 @@ void IOBuf::reserveSlow(uint32_t minHeadroom, uint32_t minTailroom) {
     } else {  // Not using jemalloc
       size_t copySlack = capacity() - length_;
       if (copySlack * 2 <= length_) {
-        void* p = realloc(ext_.buf, newAllocatedCapacity);
+        void* p = realloc(buf_, newAllocatedCapacity);
         if (UNLIKELY(p == nullptr)) {
           throw std::bad_alloc();
         }
@@ -556,30 +757,48 @@ void IOBuf::reserveSlow(uint32_t minHeadroom, uint32_t minTailroom) {
     }
     newBuffer = static_cast<uint8_t*>(p);
     memcpy(newBuffer + minHeadroom, data_, length_);
-    if (flags_ & kFlagExt) {
-      free(ext_.buf);
+    if (sharedInfo()) {
+      freeExtBuffer();
     }
     newHeadroom = minHeadroom;
   }
 
-  SharedInfo* info;
-  uint32_t cap;
+  uint64_t cap;
   initExtBuffer(newBuffer, newAllocatedCapacity, &info, &cap);
 
-  flags_ = kFlagExt;
+  if (flags() & kFlagFreeSharedInfo) {
+    delete sharedInfo();
+  }
 
-  ext_.capacity = cap;
-  ext_.type = kExtAllocated;
-  ext_.buf = newBuffer;
-  ext_.sharedInfo = info;
+  setFlagsAndSharedInfo(0, info);
+  capacity_ = cap;
+  buf_ = newBuffer;
   data_ = newBuffer + newHeadroom;
   // length_ is unchanged
 }
 
-void IOBuf::allocExtBuffer(uint32_t minCapacity,
+void IOBuf::freeExtBuffer() {
+  SharedInfo* info = sharedInfo();
+  DCHECK(info);
+
+  if (info->freeFn) {
+    try {
+      info->freeFn(buf_, info->userData);
+    } catch (...) {
+      // The user's free function should never throw.  Otherwise we might
+      // throw from the IOBuf destructor.  Other code paths like coalesce()
+      // also assume that decrementRefcount() cannot throw.
+      abort();
    }
+  } else {
+    free(buf_);
+  }
+}
+
+void IOBuf::allocExtBuffer(uint64_t minCapacity,
                            uint8_t** bufReturn,
                            SharedInfo** infoReturn,
-                           uint32_t* capacityReturn) {
+                           uint64_t* capacityReturn) {
   size_t mallocSize = goodExtBufferSize(minCapacity);
   uint8_t* buf = static_cast<uint8_t*>(malloc(mallocSize));
   if (UNLIKELY(buf == NULL)) {
@@ -589,11 +808,11 @@
   *bufReturn = buf;
 }
 
-size_t IOBuf::goodExtBufferSize(uint32_t minCapacity) {
+size_t IOBuf::goodExtBufferSize(uint64_t minCapacity) {
   // Determine how much space we should allocate.  We'll store the SharedInfo
   // for the external buffer just after the buffer itself.  (We store it just
   // after the buffer rather than just before so that the code can still just
-  // use free(ext_.buf) to free the buffer.)
+  // use free(buf_) to free the buffer.)
   size_t minSize = static_cast<size_t>(minCapacity) + sizeof(SharedInfo);
   // Add room for padding so that the SharedInfo will be aligned on an 8-byte
   // boundary.
@@ -607,33 +826,25 @@
 void IOBuf::initExtBuffer(uint8_t* buf, size_t mallocSize,
                           SharedInfo** infoReturn,
-                          uint32_t* capacityReturn) {
+                          uint64_t* capacityReturn) {
   // Find the SharedInfo storage at the end of the buffer
   // and construct the SharedInfo.
   uint8_t* infoStart = (buf + mallocSize) - sizeof(SharedInfo);
   SharedInfo* sharedInfo = new(infoStart) SharedInfo;
 
-  size_t actualCapacity = infoStart - buf;
-  // On the unlikely possibility that the actual capacity is larger than can
-  // fit in a uint32_t after adding room for the refcount and calling
-  // goodMallocSize(), truncate downwards if necessary.
-  if (actualCapacity >= UINT32_MAX) {
-    *capacityReturn = UINT32_MAX;
-  } else {
-    *capacityReturn = actualCapacity;
-  }
-
+  *capacityReturn = infoStart - buf;
   *infoReturn = sharedInfo;
 }
 
 fbstring IOBuf::moveToFbString() {
-  // Externally allocated buffers (malloc) are just fine, everything else needs
+  // malloc-allocated buffers are just fine, everything else needs
   // to be turned into one.
-  if (flags_ != kFlagExt ||  // not malloc()-ed
-      headroom() != 0 ||     // malloc()-ed block doesn't start at beginning
-      tailroom() == 0 ||     // no room for NUL terminator
-      isShared() ||          // shared
-      isChained()) {         // chained
+  if (!sharedInfo() ||         // user owned, not ours to give up
+      sharedInfo()->freeFn ||  // not malloc()-ed
+      headroom() != 0 ||       // malloc()-ed block doesn't start at beginning
+      tailroom() == 0 ||       // no room for NUL terminator
+      isShared() ||            // shared
+      isChained()) {           // chained
     // We might as well get rid of all head and tailroom if we're going
     // to reallocate; we need 1 byte for NUL terminator.
     coalesceAndReallocate(0, computeChainDataLength(), this, 1);
@@ -645,8 +856,13 @@ fbstring IOBuf::moveToFbString() {
                length(),
                capacity(),
                AcquireMallocatedString());
-  // Reset to internal buffer.
-  flags_ = 0;
+  if (flags() & kFlagFreeSharedInfo) {
+    delete sharedInfo();
+  }
+
+  // Reset to a state where we can be deleted cleanly
+  flagsAndSharedInfo_ = 0;
+  buf_ = nullptr;
   clear();
   return str;
 }
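
The core of this patch is the two-flag release protocol in releaseStorage(): the IOBuf header and its data buffer can live in one malloc'd segment (created with kIOBufInUse | kDataInUse) and die in either order, with the segment freed only when the last flag clears. A minimal standalone sketch of that idea, using hypothetical Segment/releaseSegment names rather than folly's actual types:

#include <atomic>
#include <cstdint>
#include <cstdlib>
#include <new>

enum : uint16_t { kObjInUse = 0x01, kDataInUse = 0x02 };

// One malloc'd segment holding two logically separate resources, each
// guarded by a bit in an atomic flag word (compare folly's HeapPrefix).
struct Segment {
  std::atomic<uint16_t> flags{kObjInUse | kDataInUse};
};

void releaseSegment(Segment* seg, uint16_t freeFlags) {
  auto flags = seg->flags.load(std::memory_order_acquire);
  while (true) {
    uint16_t newFlags = static_cast<uint16_t>(flags & ~freeFlags);
    if (newFlags == 0) {
      // Both parts are now dead; tear down and free the whole segment.
      seg->~Segment();
      free(seg);
      return;
    }
    // The other part is still live: just clear our bits.  On CAS failure,
    // `flags` is reloaded and the loop retries, so we may discover that we
    // became the last user in the meantime.
    if (seg->flags.compare_exchange_weak(flags, newFlags,
                                         std::memory_order_acq_rel)) {
      return;
    }
  }
}

In the patch itself, operator delete calls releaseStorage(storage, kIOBufInUse) while freeInternalBuf() (installed as the SharedInfo free function) passes kDataInUse, so whichever of the IOBuf and its last data reference is destroyed second frees the combined segment.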
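The separate-allocation path keeps the older layout: goodExtBufferSize() reserves room for a SharedInfo at the tail of the data buffer so that free(buf_) still frees the whole block. A rough sketch of the size arithmetic, with SharedInfoLike as a hypothetical stand-in for folly's SharedInfo:

#include <atomic>
#include <cstddef>
#include <cstdint>

struct SharedInfoLike {
  void (*freeFn)(void*, void*);
  void* userData;
  std::atomic<uint32_t> refcount;
};

// Space for the data plus the trailing SharedInfo, padded so the
// SharedInfo lands on an 8-byte boundary.  (folly then rounds the result
// up again with goodMallocSize() to a friendly jemalloc size class.)
size_t goodExtBufferSizeSketch(uint64_t minCapacity) {
  size_t minSize = static_cast<size_t>(minCapacity) + sizeof(SharedInfoLike);
  return (minSize + 7) & ~static_cast<size_t>(7);
}

// Given the size actually allocated, the usable data capacity is everything
// in front of the SharedInfo (compare initExtBuffer(), where the new code
// simply sets capacity = infoStart - buf).
size_t usableCapacitySketch(size_t mallocSize) {
  return mallocSize - sizeof(SharedInfoLike);
}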
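For reference, a usage sketch of the factory functions and move support this patch introduces or reworks. The sizes and data are made up, and takeOwnership() is assumed to have the capacity-only convenience overload (length defaulting to the full capacity, buffer released with free()):

#include "folly/io/IOBuf.h"

#include <cstdlib>
#include <cstring>
#include <utility>

using folly::IOBuf;

void iobufExamples() {
  // Small request: create() takes the combined path (IOBuf + SharedInfo +
  // data in one malloc); callers can also force either behavior explicitly.
  auto small = IOBuf::create(256);               // <= kDefaultCombinedBufSize
  auto large = IOBuf::createSeparate(64 * 1024);

  memcpy(small->writableTail(), "abc", 3);
  small->append(3);

  // Wrap caller-owned memory without copying; no SharedInfo is allocated,
  // so the IOBuf will never free it.
  static const char kGreeting[] = "hello";
  auto wrapped = IOBuf::wrapBuffer(kGreeting, sizeof(kGreeting) - 1);

  // Hand off a malloc'd buffer; it is released with free() once the last
  // reference (including clones) goes away.
  void* raw = malloc(1024);
  auto owned = IOBuf::takeOwnership(raw, 1024);

  // IOBuf is now movable: the target adopts the whole chain and `*owned`
  // is left empty but safely destructible.
  IOBuf head = std::move(*owned);
}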