X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=folly%2Fio%2FIOBuf.cpp;h=cf961e7315def377a2a3f7cf89a566884fb2dee2;hb=cb5272724fbe18e937bbe8bb8a0a279027255032;hp=130e7d665493a0b4e6969cb392ae911c6f6b27bc;hpb=d9298481e96658f28b9df5083fef1a70a399eba8;p=folly.git diff --git a/folly/io/IOBuf.cpp b/folly/io/IOBuf.cpp index 130e7d66..cf961e73 100644 --- a/folly/io/IOBuf.cpp +++ b/folly/io/IOBuf.cpp @@ -28,16 +28,61 @@ using std::unique_ptr; +namespace { + +enum : uint16_t { + kHeapMagic = 0xa5a5, + // This memory segment contains an IOBuf that is still in use + kIOBufInUse = 0x01, + // This memory segment contains buffer data that is still in use + kDataInUse = 0x02, +}; + +enum : uint32_t { + // When create() is called for buffers less than kDefaultCombinedBufSize, + // we allocate a single combined memory segment for the IOBuf and the data + // together. See the comments for createCombined()/createSeparate() for more + // details. + // + // (The size of 1k is largely just a guess here. We could could probably do + // benchmarks of real applications to see if adjusting this number makes a + // difference. Callers that know their exact use case can also explicitly + // call createCombined() or createSeparate().) + kDefaultCombinedBufSize = 1024 +}; + +} + namespace folly { -const uint32_t IOBuf::kMaxIOBufSize; -// Note: Applying offsetof() to an IOBuf is legal according to C++11, since -// IOBuf is a standard-layout class. However, this isn't legal with earlier -// C++ standards, which require that offsetof() only be used with POD types. -// -// This code compiles with g++ 4.6, but not with g++ 4.4 or earlier versions. -const uint32_t IOBuf::kMaxInternalDataSize = - kMaxIOBufSize - offsetof(folly::IOBuf, int_.buf); +struct IOBuf::HeapPrefix { + HeapPrefix(uint16_t flg) + : magic(kHeapMagic), + flags(flg) {} + ~HeapPrefix() { + // Reset magic to 0 on destruction. This is solely for debugging purposes + // to help catch bugs where someone tries to use HeapStorage after it has + // been deleted. + magic = 0; + } + + uint16_t magic; + std::atomic flags; +}; + +struct IOBuf::HeapStorage { + HeapPrefix prefix; + // The IOBuf is last in the HeapStorage object. + // This way operator new will work even if allocating a subclass of IOBuf + // that requires more space. + folly::IOBuf buf; +}; + +struct IOBuf::HeapFullStorage { + HeapStorage hs; + SharedInfo shared; + MaxAlign align; +}; IOBuf::SharedInfo::SharedInfo() : freeFn(NULL), @@ -56,48 +101,101 @@ IOBuf::SharedInfo::SharedInfo(FreeFunction fn, void* arg) } void* IOBuf::operator new(size_t size) { - // Since IOBuf::create() manually allocates space for some IOBuf objects - // using malloc(), override operator new so that all IOBuf objects are - // always allocated using malloc(). This way operator delete can always know - // that free() is the correct way to deallocate the memory. - void* ptr = malloc(size); - + size_t fullSize = offsetof(HeapStorage, buf) + size; + auto* storage = static_cast(malloc(fullSize)); // operator new is not allowed to return NULL - if (UNLIKELY(ptr == NULL)) { + if (UNLIKELY(storage == nullptr)) { throw std::bad_alloc(); } - return ptr; + new (&storage->prefix) HeapPrefix(kIOBufInUse); + return &(storage->buf); } void* IOBuf::operator new(size_t size, void* ptr) { - assert(size <= kMaxIOBufSize); return ptr; } void IOBuf::operator delete(void* ptr) { - // For small buffers, IOBuf::create() manually allocates the space for the - // IOBuf object using malloc(). Therefore we override delete to ensure that - // the IOBuf space is freed using free() rather than a normal delete. - free(ptr); + auto* storageAddr = static_cast(ptr) - offsetof(HeapStorage, buf); + auto* storage = reinterpret_cast(storageAddr); + releaseStorage(storage, kIOBufInUse); } -unique_ptr IOBuf::create(uint32_t capacity) { - // If the desired capacity is less than kMaxInternalDataSize, - // just allocate a single region large enough for both the IOBuf header and - // the data. - if (capacity <= kMaxInternalDataSize) { - void* buf = malloc(kMaxIOBufSize); - if (UNLIKELY(buf == NULL)) { - throw std::bad_alloc(); +void IOBuf::releaseStorage(HeapStorage* storage, uint16_t freeFlags) { + CHECK_EQ(storage->prefix.magic, kHeapMagic); + + // Use relaxed memory order here. If we are unlucky and happen to get + // out-of-date data the compare_exchange_weak() call below will catch + // it and load new data with memory_order_acq_rel. + auto flags = storage->prefix.flags.load(std::memory_order_acquire); + DCHECK_EQ((flags & freeFlags), freeFlags); + + while (true) { + uint16_t newFlags = (flags & ~freeFlags); + if (newFlags == 0) { + // The storage space is now unused. Free it. + storage->prefix.HeapPrefix::~HeapPrefix(); + free(storage); + return; + } + + // This storage segment still contains portions that are in use. + // Just clear the flags specified in freeFlags for now. + auto ret = storage->prefix.flags.compare_exchange_weak( + flags, newFlags, std::memory_order_acq_rel); + if (ret) { + // We successfully updated the flags. + return; } - uint8_t* bufEnd = static_cast(buf) + kMaxIOBufSize; - unique_ptr iobuf(new(buf) IOBuf(bufEnd)); - assert(iobuf->capacity() >= capacity); - return iobuf; + // We failed to update the flags. Some other thread probably updated them + // and cleared some of the other bits. Continue around the loop to see if + // we are the last user now, or if we need to try updating the flags again. + } +} + +void IOBuf::freeInternalBuf(void* buf, void* userData) { + auto* storage = static_cast(userData); + releaseStorage(storage, kDataInUse); +} + +unique_ptr IOBuf::create(uint32_t capacity) { + // For smaller-sized buffers, allocate the IOBuf, SharedInfo, and the buffer + // all with a single allocation. + // + // We don't do this for larger buffers since it can be wasteful if the user + // needs to reallocate the buffer but keeps using the same IOBuf object. + // In this case we can't free the data space until the IOBuf is also + // destroyed. Callers can explicitly call createCombined() or + // createSeparate() if they know their use case better, and know if they are + // likely to reallocate the buffer later. + if (capacity <= kDefaultCombinedBufSize) { + return createCombined(capacity); } + return createSeparate(capacity); +} + +unique_ptr IOBuf::createCombined(uint32_t capacity) { + // To save a memory allocation, allocate space for the IOBuf object, the + // SharedInfo struct, and the data itself all with a single call to malloc(). + size_t requiredStorage = offsetof(HeapFullStorage, align) + capacity; + size_t mallocSize = goodMallocSize(requiredStorage); + auto* storage = static_cast(malloc(mallocSize)); + + new (&storage->hs.prefix) HeapPrefix(kIOBufInUse | kDataInUse); + new (&storage->shared) SharedInfo(freeInternalBuf, storage); + uint8_t* bufAddr = reinterpret_cast(&storage->align); + uint8_t* storageEnd = reinterpret_cast(storage) + mallocSize; + size_t actualCapacity = storageEnd - bufAddr; + unique_ptr ret(new (&storage->hs.buf) IOBuf( + kCombinedAlloc, 0, bufAddr, actualCapacity, + bufAddr, 0, &storage->shared)); + return ret; +} + +unique_ptr IOBuf::createSeparate(uint32_t capacity) { // Allocate an external buffer uint8_t* buf; SharedInfo* sharedInfo; @@ -139,6 +237,11 @@ unique_ptr IOBuf::takeOwnership(void* buf, uint32_t capacity, bool freeOnError) { SharedInfo* sharedInfo = NULL; try { + // TODO: We could allocate the IOBuf object and SharedInfo all in a single + // memory allocation. We could use the existing HeapStorage class, and + // define a new kSharedInfoInUse flag. We could change our code to call + // releaseStorage(kFlagFreeSharedInfo) when this kFlagFreeSharedInfo, + // rather than directly calling delete. sharedInfo = new SharedInfo(freeFn, userData); uint8_t* bufPtr = static_cast(buf); @@ -175,16 +278,6 @@ unique_ptr IOBuf::wrapBuffer(const void* buf, uint32_t capacity) { NULL)); } -IOBuf::IOBuf(uint8_t* end) - : next_(this), - prev_(this), - data_(int_.buf), - length_(0), - flags_(0) { - assert(end - int_.buf == kMaxInternalDataSize); - assert(end - reinterpret_cast(this) == kMaxIOBufSize); -} - IOBuf::IOBuf(ExtBufTypeEnum type, uint32_t flags, uint8_t* buf, @@ -195,13 +288,12 @@ IOBuf::IOBuf(ExtBufTypeEnum type, : next_(this), prev_(this), data_(data), + buf_(buf), length_(length), - flags_(kFlagExt | flags) { - ext_.capacity = capacity; - ext_.type = type; - ext_.buf = buf; - ext_.sharedInfo = sharedInfo; - + capacity_(capacity), + flags_(flags), + type_(type), + sharedInfo_(sharedInfo) { assert(data >= buf); assert(data + length <= buf + capacity); assert(static_cast(flags & kFlagUserOwned) == @@ -218,9 +310,7 @@ IOBuf::~IOBuf() { (void)next_->unlink(); } - if (flags_ & kFlagExt) { - decrementRefcount(); - } + decrementRefcount(); } bool IOBuf::empty() const { @@ -279,40 +369,25 @@ unique_ptr IOBuf::clone() const { } unique_ptr IOBuf::cloneOne() const { - if (flags_ & kFlagExt) { - if (ext_.sharedInfo) { - flags_ |= kFlagMaybeShared; - } - unique_ptr iobuf(new IOBuf(static_cast(ext_.type), - flags_, ext_.buf, ext_.capacity, - data_, length_, - ext_.sharedInfo)); - if (ext_.sharedInfo) { - ext_.sharedInfo->refcount.fetch_add(1, std::memory_order_acq_rel); - } - return iobuf; - } else { - // We have an internal data buffer that cannot be shared - // Allocate a new IOBuf and copy the data into it. - unique_ptr iobuf(IOBuf::create(kMaxInternalDataSize)); - assert((iobuf->flags_ & kFlagExt) == 0); - iobuf->data_ += headroom(); - memcpy(iobuf->data_, data_, length_); - iobuf->length_ = length_; - return iobuf; + if (sharedInfo_) { + flags_ |= kFlagMaybeShared; } + unique_ptr iobuf(new IOBuf(static_cast(type_), + flags_, buf_, capacity_, + data_, length_, + sharedInfo_)); + if (sharedInfo_) { + sharedInfo_->refcount.fetch_add(1, std::memory_order_acq_rel); + } + return iobuf; } void IOBuf::unshareOneSlow() { - // Internal buffers are always unshared, so unshareOneSlow() can only be - // called for external buffers - assert(flags_ & kFlagExt); - // Allocate a new buffer for the data uint8_t* buf; SharedInfo* sharedInfo; uint32_t actualCapacity; - allocExtBuffer(ext_.capacity, &buf, &sharedInfo, &actualCapacity); + allocExtBuffer(capacity_, &buf, &sharedInfo, &actualCapacity); // Copy the data // Maintain the same amount of headroom. Since we maintained the same @@ -322,14 +397,14 @@ void IOBuf::unshareOneSlow() { // Release our reference on the old buffer decrementRefcount(); - // Make sure kFlagExt is set, and kFlagUserOwned and kFlagFreeSharedInfo - // are not set. - flags_ = kFlagExt; + // Make sure kFlagUserOwned, kFlagMaybeShared, and kFlagFreeSharedInfo + // are all cleared. + flags_ = 0; // Update the buffer pointers to point to the new buffer data_ = buf + headlen; - ext_.buf = buf; - ext_.sharedInfo = sharedInfo; + buf_ = buf; + sharedInfo_ = sharedInfo; } void IOBuf::unshareChained() { @@ -409,18 +484,16 @@ void IOBuf::coalesceAndReallocate(size_t newHeadroom, assert(remaining == 0); // Point at the new buffer - if (flags_ & kFlagExt) { - decrementRefcount(); - } + decrementRefcount(); - // Make sure kFlagExt is set, and kFlagUserOwned and kFlagFreeSharedInfo - // are not set. - flags_ = kFlagExt; + // Make sure kFlagUserOwned, kFlagMaybeShared, and kFlagFreeSharedInfo + // are all cleared. + flags_ = 0; - ext_.capacity = actualCapacity; - ext_.type = kExtAllocated; - ext_.buf = newBuf; - ext_.sharedInfo = newInfo; + capacity_ = actualCapacity; + type_ = kExtAllocated; + buf_ = newBuf; + sharedInfo_ = newInfo; data_ = newData; length_ = newLength; @@ -433,17 +506,15 @@ void IOBuf::coalesceAndReallocate(size_t newHeadroom, } void IOBuf::decrementRefcount() { - assert(flags_ & kFlagExt); - // Externally owned buffers don't have a SharedInfo object and aren't managed // by the reference count if (flags_ & kFlagUserOwned) { - assert(ext_.sharedInfo == NULL); + assert(sharedInfo_ == nullptr); return; } // Decrement the refcount - uint32_t newcnt = ext_.sharedInfo->refcount.fetch_sub( + uint32_t newcnt = sharedInfo_->refcount.fetch_sub( 1, std::memory_order_acq_rel); // Note that fetch_sub() returns the value before we decremented. // If it is 1, we were the only remaining user; if it is greater there are @@ -466,7 +537,7 @@ void IOBuf::decrementRefcount() { // SharedInfo object.) However, handling this specially with a flag seems // like it shouldn't be problematic. if (flags_ & kFlagFreeSharedInfo) { - delete ext_.sharedInfo; + delete sharedInfo_; } } @@ -507,8 +578,7 @@ void IOBuf::reserveSlow(uint32_t minHeadroom, uint32_t minTailroom) { // If we have a buffer allocated with malloc and we just need more tailroom, // try to use realloc()/rallocm() to grow the buffer in place. - if ((flags_ & (kFlagExt | kFlagUserOwned)) == kFlagExt && - (ext_.sharedInfo->freeFn == nullptr) && + if ((flags_ & kFlagUserOwned) == 0 && (sharedInfo_->freeFn == nullptr) && length_ != 0 && oldHeadroom >= minHeadroom) { if (usingJEMalloc()) { size_t headSlack = oldHeadroom - minHeadroom; @@ -519,7 +589,7 @@ void IOBuf::reserveSlow(uint32_t minHeadroom, uint32_t minTailroom) { // much" headroom to be 25% of the capacity. if (headSlack * 4 <= newCapacity) { size_t allocatedCapacity = capacity() + sizeof(SharedInfo); - void* p = ext_.buf; + void* p = buf_; if (allocatedCapacity >= jemallocMinInPlaceExpandable) { int r = rallocm(&p, &newAllocatedCapacity, newAllocatedCapacity, 0, ALLOCM_NO_MOVE); @@ -538,7 +608,7 @@ void IOBuf::reserveSlow(uint32_t minHeadroom, uint32_t minTailroom) { } else { // Not using jemalloc size_t copySlack = capacity() - length_; if (copySlack * 2 <= length_) { - void* p = realloc(ext_.buf, newAllocatedCapacity); + void* p = realloc(buf_, newAllocatedCapacity); if (UNLIKELY(p == nullptr)) { throw std::bad_alloc(); } @@ -557,7 +627,7 @@ void IOBuf::reserveSlow(uint32_t minHeadroom, uint32_t minTailroom) { } newBuffer = static_cast(p); memcpy(newBuffer + minHeadroom, data_, length_); - if ((flags_ & (kFlagExt | kFlagUserOwned)) == kFlagExt) { + if ((flags_ & kFlagUserOwned) == 0) { freeExtBuffer(); } newHeadroom = minHeadroom; @@ -568,24 +638,24 @@ void IOBuf::reserveSlow(uint32_t minHeadroom, uint32_t minTailroom) { initExtBuffer(newBuffer, newAllocatedCapacity, &info, &cap); if (flags_ & kFlagFreeSharedInfo) { - delete ext_.sharedInfo; + delete sharedInfo_; } - flags_ = kFlagExt; - ext_.capacity = cap; - ext_.type = kExtAllocated; - ext_.buf = newBuffer; - ext_.sharedInfo = info; + flags_ = 0; + capacity_ = cap; + type_ = kExtAllocated; + buf_ = newBuffer; + sharedInfo_ = info; data_ = newBuffer + newHeadroom; // length_ is unchanged } void IOBuf::freeExtBuffer() { - DCHECK((flags_ & (kFlagExt | kFlagUserOwned)) == kFlagExt); + DCHECK((flags_ & kFlagUserOwned) == 0); - if (ext_.sharedInfo->freeFn) { + if (sharedInfo_->freeFn) { try { - ext_.sharedInfo->freeFn(ext_.buf, ext_.sharedInfo->userData); + sharedInfo_->freeFn(buf_, sharedInfo_->userData); } catch (...) { // The user's free function should never throw. Otherwise we might // throw from the IOBuf destructor. Other code paths like coalesce() @@ -593,7 +663,7 @@ void IOBuf::freeExtBuffer() { abort(); } } else { - free(ext_.buf); + free(buf_); } } @@ -614,7 +684,7 @@ size_t IOBuf::goodExtBufferSize(uint32_t minCapacity) { // Determine how much space we should allocate. We'll store the SharedInfo // for the external buffer just after the buffer itself. (We store it just // after the buffer rather than just before so that the code can still just - // use free(ext_.buf) to free the buffer.) + // use free(buf_) to free the buffer.) size_t minSize = static_cast(minCapacity) + sizeof(SharedInfo); // Add room for padding so that the SharedInfo will be aligned on an 8-byte // boundary. @@ -650,8 +720,8 @@ void IOBuf::initExtBuffer(uint8_t* buf, size_t mallocSize, fbstring IOBuf::moveToFbString() { // malloc-allocated buffers are just fine, everything else needs // to be turned into one. - if ((flags_ & (kFlagExt | kFlagUserOwned)) != kFlagExt || // not malloc()-ed - ext_.sharedInfo->freeFn != nullptr || // not malloc()-ed + if ((flags_ & kFlagUserOwned) || // user owned, not ours to give up + sharedInfo_->freeFn != nullptr || // not malloc()-ed headroom() != 0 || // malloc()-ed block doesn't start at beginning tailroom() == 0 || // no room for NUL terminator isShared() || // shared @@ -668,11 +738,13 @@ fbstring IOBuf::moveToFbString() { AcquireMallocatedString()); if (flags_ & kFlagFreeSharedInfo) { - delete ext_.sharedInfo; + delete sharedInfo_; } - // Reset to internal buffer. - flags_ = 0; + // Reset to a state where we can be deleted cleanly + flags_ = kFlagUserOwned; + sharedInfo_ = nullptr; + buf_ = nullptr; clear(); return str; }