Make IOBuf support 64-bit length and capacity
[folly.git] / folly / io / IOBuf.cpp
index 8fc040a57103ec8d391386800329140375fad3c0..06bcca464cc1c02842bbfc40657ee40e3d7c7596 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2013 Facebook, Inc.
+ * Copyright 2014 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 #include "folly/io/IOBuf.h"
 
-#include "folly/Malloc.h"
+#include "folly/Conv.h"
 #include "folly/Likely.h"
+#include "folly/Malloc.h"
+#include "folly/Memory.h"
+#include "folly/ScopeGuard.h"
 
 #include <stdexcept>
 #include <assert.h>
 
 using std::unique_ptr;
 
+namespace {
+
+enum : uint16_t {
+  kHeapMagic = 0xa5a5,
+  // This memory segment contains an IOBuf that is still in use
+  kIOBufInUse = 0x01,
+  // This memory segment contains buffer data that is still in use
+  kDataInUse = 0x02,
+};
+
+enum : uint64_t {
+  // When create() is called for buffers less than kDefaultCombinedBufSize,
+  // we allocate a single combined memory segment for the IOBuf and the data
+  // together.  See the comments for createCombined()/createSeparate() for more
+  // details.
+  //
+  // (The size of 1k is largely just a guess here.  We could could probably do
+  // benchmarks of real applications to see if adjusting this number makes a
+  // difference.  Callers that know their exact use case can also explicitly
+  // call createCombined() or createSeparate().)
+  kDefaultCombinedBufSize = 1024
+};
+
+// Helper function for IOBuf::takeOwnership()
+void takeOwnershipError(bool freeOnError, void* buf,
+                        folly::IOBuf::FreeFunction freeFn,
+                        void* userData) {
+  if (!freeOnError) {
+    return;
+  }
+  if (!freeFn) {
+    free(buf);
+    return;
+  }
+  try {
+    freeFn(buf, userData);
+  } catch (...) {
+    // The user's free function is not allowed to throw.
+    // (We are already in the middle of throwing an exception, so
+    // we cannot let this exception go unhandled.)
+    abort();
+  }
+}
+
+} // unnamed namespace
+
 namespace folly {
 
-const uint32_t IOBuf::kMaxIOBufSize;
-// Note: Applying offsetof() to an IOBuf is legal according to C++11, since
-// IOBuf is a standard-layout class.  However, this isn't legal with earlier
-// C++ standards, which require that offsetof() only be used with POD types.
-//
-// This code compiles with g++ 4.6, but not with g++ 4.4 or earlier versions.
-const uint32_t IOBuf::kMaxInternalDataSize =
-  kMaxIOBufSize - offsetof(folly::IOBuf, int_.buf);
+struct IOBuf::HeapPrefix {
+  HeapPrefix(uint16_t flg)
+    : magic(kHeapMagic),
+      flags(flg) {}
+  ~HeapPrefix() {
+    // Reset magic to 0 on destruction.  This is solely for debugging purposes
+    // to help catch bugs where someone tries to use HeapStorage after it has
+    // been deleted.
+    magic = 0;
+  }
+
+  uint16_t magic;
+  std::atomic<uint16_t> flags;
+};
+
+struct IOBuf::HeapStorage {
+  HeapPrefix prefix;
+  // The IOBuf is last in the HeapStorage object.
+  // This way operator new will work even if allocating a subclass of IOBuf
+  // that requires more space.
+  folly::IOBuf buf;
+};
+
+struct IOBuf::HeapFullStorage {
+  // Make sure jemalloc allocates from the 64-byte class.  Putting this here
+  // because HeapStorage is private so it can't be at namespace level.
+  static_assert(sizeof(HeapStorage) <= 64,
+                "IOBuf may not grow over 56 bytes!");
+
+  HeapStorage hs;
+  SharedInfo shared;
+  MaxAlign align;
+};
 
 IOBuf::SharedInfo::SharedInfo()
   : freeFn(NULL),
@@ -56,68 +130,131 @@ IOBuf::SharedInfo::SharedInfo(FreeFunction fn, void* arg)
 }
 
 void* IOBuf::operator new(size_t size) {
-  // Since IOBuf::create() manually allocates space for some IOBuf objects
-  // using malloc(), override operator new so that all IOBuf objects are
-  // always allocated using malloc().  This way operator delete can always know
-  // that free() is the correct way to deallocate the memory.
-  void* ptr = malloc(size);
-
+  size_t fullSize = offsetof(HeapStorage, buf) + size;
+  auto* storage = static_cast<HeapStorage*>(malloc(fullSize));
   // operator new is not allowed to return NULL
-  if (UNLIKELY(ptr == NULL)) {
+  if (UNLIKELY(storage == nullptr)) {
     throw std::bad_alloc();
   }
 
-  return ptr;
+  new (&storage->prefix) HeapPrefix(kIOBufInUse);
+  return &(storage->buf);
 }
 
 void* IOBuf::operator new(size_t size, void* ptr) {
-  assert(size <= kMaxIOBufSize);
   return ptr;
 }
 
 void IOBuf::operator delete(void* ptr) {
-  // For small buffers, IOBuf::create() manually allocates the space for the
-  // IOBuf object using malloc().  Therefore we override delete to ensure that
-  // the IOBuf space is freed using free() rather than a normal delete.
-  free(ptr);
-}
-
-unique_ptr<IOBuf> IOBuf::create(uint32_t capacity) {
-  // If the desired capacity is less than kMaxInternalDataSize,
-  // just allocate a single region large enough for both the IOBuf header and
-  // the data.
-  if (capacity <= kMaxInternalDataSize) {
-    void* buf = malloc(kMaxIOBufSize);
-    if (UNLIKELY(buf == NULL)) {
-      throw std::bad_alloc();
+  auto* storageAddr = static_cast<uint8_t*>(ptr) - offsetof(HeapStorage, buf);
+  auto* storage = reinterpret_cast<HeapStorage*>(storageAddr);
+  releaseStorage(storage, kIOBufInUse);
+}
+
+void IOBuf::releaseStorage(HeapStorage* storage, uint16_t freeFlags) {
+  CHECK_EQ(storage->prefix.magic, static_cast<uint16_t>(kHeapMagic));
+
+  // Use relaxed memory order here.  If we are unlucky and happen to get
+  // out-of-date data the compare_exchange_weak() call below will catch
+  // it and load new data with memory_order_acq_rel.
+  auto flags = storage->prefix.flags.load(std::memory_order_acquire);
+  DCHECK_EQ((flags & freeFlags), freeFlags);
+
+  while (true) {
+    uint16_t newFlags = (flags & ~freeFlags);
+    if (newFlags == 0) {
+      // The storage space is now unused.  Free it.
+      storage->prefix.HeapPrefix::~HeapPrefix();
+      free(storage);
+      return;
     }
 
-    uint8_t* bufEnd = static_cast<uint8_t*>(buf) + kMaxIOBufSize;
-    unique_ptr<IOBuf> iobuf(new(buf) IOBuf(bufEnd));
-    assert(iobuf->capacity() >= capacity);
-    return iobuf;
+    // This storage segment still contains portions that are in use.
+    // Just clear the flags specified in freeFlags for now.
+    auto ret = storage->prefix.flags.compare_exchange_weak(
+        flags, newFlags, std::memory_order_acq_rel);
+    if (ret) {
+      // We successfully updated the flags.
+      return;
+    }
+
+    // We failed to update the flags.  Some other thread probably updated them
+    // and cleared some of the other bits.  Continue around the loop to see if
+    // we are the last user now, or if we need to try updating the flags again.
   }
+}
 
-  // Allocate an external buffer
-  uint8_t* buf;
-  SharedInfo* sharedInfo;
-  uint32_t actualCapacity;
-  allocExtBuffer(capacity, &buf, &sharedInfo, &actualCapacity);
+void IOBuf::freeInternalBuf(void* buf, void* userData) {
+  auto* storage = static_cast<HeapStorage*>(userData);
+  releaseStorage(storage, kDataInUse);
+}
 
-  // Allocate the IOBuf header
-  try {
-    return unique_ptr<IOBuf>(new IOBuf(kExtAllocated, 0,
-                                       buf, actualCapacity,
-                                       buf, 0,
-                                       sharedInfo));
-  } catch (...) {
-    free(buf);
-    throw;
+IOBuf::IOBuf(CreateOp, uint64_t capacity)
+  : next_(this),
+    prev_(this),
+    data_(nullptr),
+    length_(0),
+    flagsAndSharedInfo_(0) {
+  SharedInfo* info;
+  allocExtBuffer(capacity, &buf_, &info, &capacity_);
+  setSharedInfo(info);
+  data_ = buf_;
+}
+
+IOBuf::IOBuf(CopyBufferOp op, const void* buf, uint64_t size,
+             uint64_t headroom, uint64_t minTailroom)
+  : IOBuf(CREATE, headroom + size + minTailroom) {
+  advance(headroom);
+  memcpy(writableData(), buf, size);
+  append(size);
+}
+
+IOBuf::IOBuf(CopyBufferOp op, ByteRange br,
+             uint64_t headroom, uint64_t minTailroom)
+  : IOBuf(op, br.data(), br.size(), headroom, minTailroom) {
+}
+
+unique_ptr<IOBuf> IOBuf::create(uint64_t capacity) {
+  // For smaller-sized buffers, allocate the IOBuf, SharedInfo, and the buffer
+  // all with a single allocation.
+  //
+  // We don't do this for larger buffers since it can be wasteful if the user
+  // needs to reallocate the buffer but keeps using the same IOBuf object.
+  // In this case we can't free the data space until the IOBuf is also
+  // destroyed.  Callers can explicitly call createCombined() or
+  // createSeparate() if they know their use case better, and know if they are
+  // likely to reallocate the buffer later.
+  if (capacity <= kDefaultCombinedBufSize) {
+    return createCombined(capacity);
   }
+  return createSeparate(capacity);
+}
+
+unique_ptr<IOBuf> IOBuf::createCombined(uint64_t capacity) {
+  // To save a memory allocation, allocate space for the IOBuf object, the
+  // SharedInfo struct, and the data itself all with a single call to malloc().
+  size_t requiredStorage = offsetof(HeapFullStorage, align) + capacity;
+  size_t mallocSize = goodMallocSize(requiredStorage);
+  auto* storage = static_cast<HeapFullStorage*>(malloc(mallocSize));
+
+  new (&storage->hs.prefix) HeapPrefix(kIOBufInUse | kDataInUse);
+  new (&storage->shared) SharedInfo(freeInternalBuf, storage);
+
+  uint8_t* bufAddr = reinterpret_cast<uint8_t*>(&storage->align);
+  uint8_t* storageEnd = reinterpret_cast<uint8_t*>(storage) + mallocSize;
+  size_t actualCapacity = storageEnd - bufAddr;
+  unique_ptr<IOBuf> ret(new (&storage->hs.buf) IOBuf(
+        InternalConstructor(), packFlagsAndSharedInfo(0, &storage->shared),
+        bufAddr, actualCapacity, bufAddr, 0));
+  return ret;
+}
+
+unique_ptr<IOBuf> IOBuf::createSeparate(uint64_t capacity) {
+  return make_unique<IOBuf>(CREATE, capacity);
 }
 
 unique_ptr<IOBuf> IOBuf::createChain(
-    size_t totalCapacity, uint32_t maxBufCapacity) {
+    size_t totalCapacity, uint64_t maxBufCapacity) {
   unique_ptr<IOBuf> out = create(
       std::min(totalCapacity, size_t(maxBufCapacity)));
   size_t allocatedCapacity = out->capacity();
@@ -132,80 +269,86 @@ unique_ptr<IOBuf> IOBuf::createChain(
   return out;
 }
 
-unique_ptr<IOBuf> IOBuf::takeOwnership(void* buf, uint32_t capacity,
-                                       uint32_t length,
+IOBuf::IOBuf(TakeOwnershipOp, void* buf, uint64_t capacity, uint64_t length,
+             FreeFunction freeFn, void* userData,
+             bool freeOnError)
+  : next_(this),
+    prev_(this),
+    data_(static_cast<uint8_t*>(buf)),
+    buf_(static_cast<uint8_t*>(buf)),
+    length_(length),
+    capacity_(capacity),
+    flagsAndSharedInfo_(packFlagsAndSharedInfo(kFlagFreeSharedInfo, nullptr)) {
+  try {
+    setSharedInfo(new SharedInfo(freeFn, userData));
+  } catch (...) {
+    takeOwnershipError(freeOnError, buf, freeFn, userData);
+    throw;
+  }
+}
+
+unique_ptr<IOBuf> IOBuf::takeOwnership(void* buf, uint64_t capacity,
+                                       uint64_t length,
                                        FreeFunction freeFn,
                                        void* userData,
                                        bool freeOnError) {
-  SharedInfo* sharedInfo = NULL;
   try {
-    sharedInfo = new SharedInfo(freeFn, userData);
-
-    uint8_t* bufPtr = static_cast<uint8_t*>(buf);
-    return unique_ptr<IOBuf>(new IOBuf(kExtUserSupplied, kFlagFreeSharedInfo,
-                                       bufPtr, capacity,
-                                       bufPtr, length,
-                                       sharedInfo));
+    // TODO: We could allocate the IOBuf object and SharedInfo all in a single
+    // memory allocation.  We could use the existing HeapStorage class, and
+    // define a new kSharedInfoInUse flag.  We could change our code to call
+    // releaseStorage(kFlagFreeSharedInfo) when this kFlagFreeSharedInfo,
+    // rather than directly calling delete.
+    //
+    // Note that we always pass freeOnError as false to the constructor.
+    // If the constructor throws we'll handle it below.  (We have to handle
+    // allocation failures from make_unique too.)
+    return make_unique<IOBuf>(TAKE_OWNERSHIP, buf, capacity, length,
+                              freeFn, userData, false);
   } catch (...) {
-    delete sharedInfo;
-    if (freeOnError) {
-      if (freeFn) {
-        try {
-          freeFn(buf, userData);
-        } catch (...) {
-          // The user's free function is not allowed to throw.
-          abort();
-        }
-      } else {
-        free(buf);
-      }
-    }
+    takeOwnershipError(freeOnError, buf, freeFn, userData);
     throw;
   }
 }
 
-unique_ptr<IOBuf> IOBuf::wrapBuffer(const void* buf, uint32_t capacity) {
-  // We cast away the const-ness of the buffer here.
-  // This is okay since IOBuf users must use unshare() to create a copy of
-  // this buffer before writing to the buffer.
-  uint8_t* bufPtr = static_cast<uint8_t*>(const_cast<void*>(buf));
-  return unique_ptr<IOBuf>(new IOBuf(kExtUserSupplied, kFlagUserOwned,
-                                     bufPtr, capacity,
-                                     bufPtr, capacity,
-                                     NULL));
+IOBuf::IOBuf(WrapBufferOp, const void* buf, uint64_t capacity)
+  : IOBuf(InternalConstructor(), 0,
+          // We cast away the const-ness of the buffer here.
+          // This is okay since IOBuf users must use unshare() to create a copy
+          // of this buffer before writing to the buffer.
+          static_cast<uint8_t*>(const_cast<void*>(buf)), capacity,
+          static_cast<uint8_t*>(const_cast<void*>(buf)), capacity) {
 }
 
-IOBuf::IOBuf(uint8_t* end)
-  : next_(this),
-    prev_(this),
-    data_(int_.buf),
-    length_(0),
-    flags_(0) {
-  assert(end - int_.buf == kMaxInternalDataSize);
-  assert(end - reinterpret_cast<uint8_t*>(this) == kMaxIOBufSize);
+IOBuf::IOBuf(WrapBufferOp op, ByteRange br)
+  : IOBuf(op, br.data(), br.size()) {
+}
+
+unique_ptr<IOBuf> IOBuf::wrapBuffer(const void* buf, uint64_t capacity) {
+  return make_unique<IOBuf>(WRAP_BUFFER, buf, capacity);
+}
+
+IOBuf::IOBuf() noexcept {
 }
 
-IOBuf::IOBuf(ExtBufTypeEnum type,
-             uint32_t flags,
+IOBuf::IOBuf(IOBuf&& other) noexcept {
+  *this = std::move(other);
+}
+
+IOBuf::IOBuf(InternalConstructor,
+             uintptr_t flagsAndSharedInfo,
              uint8_t* buf,
-             uint32_t capacity,
+             uint64_t capacity,
              uint8_t* data,
-             uint32_t length,
-             SharedInfo* sharedInfo)
+             uint64_t length)
   : next_(this),
     prev_(this),
     data_(data),
+    buf_(buf),
     length_(length),
-    flags_(kFlagExt | flags) {
-  ext_.capacity = capacity;
-  ext_.type = type;
-  ext_.buf = buf;
-  ext_.sharedInfo = sharedInfo;
-
+    capacity_(capacity),
+    flagsAndSharedInfo_(flagsAndSharedInfo) {
   assert(data >= buf);
   assert(data + length <= buf + capacity);
-  assert(static_cast<bool>(flags & kFlagUserOwned) ==
-         (sharedInfo == NULL));
 }
 
 IOBuf::~IOBuf() {
@@ -218,9 +361,50 @@ IOBuf::~IOBuf() {
     (void)next_->unlink();
   }
 
-  if (flags_ & kFlagExt) {
-    decrementRefcount();
+  decrementRefcount();
+}
+
+IOBuf& IOBuf::operator=(IOBuf&& other) noexcept {
+  // If we are part of a chain, delete the rest of the chain.
+  while (next_ != this) {
+    // Since unlink() returns unique_ptr() and we don't store it,
+    // it will automatically delete the unlinked element.
+    (void)next_->unlink();
+  }
+
+  // Decrement our refcount on the current buffer
+  decrementRefcount();
+
+  // Take ownership of the other buffer's data
+  data_ = other.data_;
+  buf_ = other.buf_;
+  length_ = other.length_;
+  capacity_ = other.capacity_;
+  flagsAndSharedInfo_ = other.flagsAndSharedInfo_;
+  // Reset other so it is a clean state to be destroyed.
+  other.data_ = nullptr;
+  other.buf_ = nullptr;
+  other.length_ = 0;
+  other.capacity_ = 0;
+  other.flagsAndSharedInfo_ = 0;
+
+  // If other was part of the chain, assume ownership of the rest of its chain.
+  // (It's only valid to perform move assignment on the head of a chain.)
+  if (other.next_ != &other) {
+    next_ = other.next_;
+    next_->prev_ = this;
+    other.next_ = &other;
+
+    prev_ = other.prev_;
+    prev_->next_ = this;
+    other.prev_ = &other;
   }
+
+  // Sanity check to make sure that other is in a valid state to be destroyed.
+  DCHECK_EQ(other.prev_, &other);
+  DCHECK_EQ(other.next_, &other);
+
+  return *this;
 }
 
 bool IOBuf::empty() const {
@@ -234,8 +418,8 @@ bool IOBuf::empty() const {
   return true;
 }
 
-uint32_t IOBuf::countChainElements() const {
-  uint32_t numElements = 1;
+size_t IOBuf::countChainElements() const {
+  size_t numElements = 1;
   for (IOBuf* current = next_; current != this; current = current->next_) {
     ++numElements;
   }
@@ -269,64 +453,62 @@ void IOBuf::prependChain(unique_ptr<IOBuf>&& iobuf) {
 }
 
 unique_ptr<IOBuf> IOBuf::clone() const {
-  unique_ptr<IOBuf> newHead(cloneOne());
+  unique_ptr<IOBuf> ret = make_unique<IOBuf>();
+  cloneInto(*ret);
+  return ret;
+}
+
+unique_ptr<IOBuf> IOBuf::cloneOne() const {
+  unique_ptr<IOBuf> ret = make_unique<IOBuf>();
+  cloneOneInto(*ret);
+  return ret;
+}
+
+void IOBuf::cloneInto(IOBuf& other) const {
+  IOBuf tmp;
+  cloneOneInto(tmp);
 
   for (IOBuf* current = next_; current != this; current = current->next_) {
-    newHead->prependChain(current->cloneOne());
+    tmp.prependChain(current->cloneOne());
   }
 
-  return newHead;
+  other = std::move(tmp);
 }
 
-unique_ptr<IOBuf> IOBuf::cloneOne() const {
-  if (flags_ & kFlagExt) {
-    unique_ptr<IOBuf> iobuf(new IOBuf(static_cast<ExtBufTypeEnum>(ext_.type),
-                                      flags_, ext_.buf, ext_.capacity,
-                                      data_, length_,
-                                      ext_.sharedInfo));
-    if (ext_.sharedInfo) {
-      ext_.sharedInfo->refcount.fetch_add(1, std::memory_order_acq_rel);
-    }
-    return iobuf;
-  } else {
-    // We have an internal data buffer that cannot be shared
-    // Allocate a new IOBuf and copy the data into it.
-    unique_ptr<IOBuf> iobuf(IOBuf::create(kMaxInternalDataSize));
-    assert((iobuf->flags_ & kFlagExt) == 0);
-    iobuf->data_ += headroom();
-    memcpy(iobuf->data_, data_, length_);
-    iobuf->length_ = length_;
-    return iobuf;
+void IOBuf::cloneOneInto(IOBuf& other) const {
+  SharedInfo* info = sharedInfo();
+  if (info) {
+    setFlags(kFlagMaybeShared);
+  }
+  other = IOBuf(InternalConstructor(),
+                flagsAndSharedInfo_, buf_, capacity_,
+                data_, length_);
+  if (info) {
+    info->refcount.fetch_add(1, std::memory_order_acq_rel);
   }
 }
 
 void IOBuf::unshareOneSlow() {
-  // Internal buffers are always unshared, so unshareOneSlow() can only be
-  // called for external buffers
-  assert(flags_ & kFlagExt);
-
   // Allocate a new buffer for the data
   uint8_t* buf;
   SharedInfo* sharedInfo;
-  uint32_t actualCapacity;
-  allocExtBuffer(ext_.capacity, &buf, &sharedInfo, &actualCapacity);
+  uint64_t actualCapacity;
+  allocExtBuffer(capacity_, &buf, &sharedInfo, &actualCapacity);
 
   // Copy the data
   // Maintain the same amount of headroom.  Since we maintained the same
   // minimum capacity we also maintain at least the same amount of tailroom.
-  uint32_t headlen = headroom();
+  uint64_t headlen = headroom();
   memcpy(buf + headlen, data_, length_);
 
   // Release our reference on the old buffer
   decrementRefcount();
-  // Make sure kFlagExt is set, and kFlagUserOwned and kFlagFreeSharedInfo
-  // are not set.
-  flags_ = kFlagExt;
+  // Make sure kFlagMaybeShared and kFlagFreeSharedInfo are all cleared.
+  setFlagsAndSharedInfo(0, sharedInfo);
 
   // Update the buffer pointers to point to the new buffer
   data_ = buf + headlen;
-  ext_.buf = buf;
-  ext_.sharedInfo = sharedInfo;
+  buf_ = buf;
 }
 
 void IOBuf::unshareChained() {
@@ -353,11 +535,10 @@ void IOBuf::unshareChained() {
   coalesceSlow();
 }
 
-void IOBuf::coalesceSlow(size_t maxLength) {
+void IOBuf::coalesceSlow() {
   // coalesceSlow() should only be called if we are part of a chain of multiple
   // IOBufs.  The caller should have already verified this.
-  assert(isChained());
-  assert(length_ < maxLength);
+  DCHECK(isChained());
 
   // Compute the length of the entire chain
   uint64_t newLength = 0;
@@ -365,13 +546,37 @@ void IOBuf::coalesceSlow(size_t maxLength) {
   do {
     newLength += end->length_;
     end = end->next_;
-  } while (newLength < maxLength && end != this);
+  } while (end != this);
 
-  uint64_t newHeadroom = headroom();
-  uint64_t newTailroom = end->prev_->tailroom();
-  coalesceAndReallocate(newHeadroom, newLength, end, newTailroom);
+  coalesceAndReallocate(newLength, end);
   // We should be only element left in the chain now
-  assert(length_ >= maxLength || !isChained());
+  DCHECK(!isChained());
+}
+
+void IOBuf::coalesceSlow(size_t maxLength) {
+  // coalesceSlow() should only be called if we are part of a chain of multiple
+  // IOBufs.  The caller should have already verified this.
+  DCHECK(isChained());
+  DCHECK_LT(length_, maxLength);
+
+  // Compute the length of the entire chain
+  uint64_t newLength = 0;
+  IOBuf* end = this;
+  while (true) {
+    newLength += end->length_;
+    end = end->next_;
+    if (newLength >= maxLength) {
+      break;
+    }
+    if (end == this) {
+      throw std::overflow_error("attempted to coalesce more data than "
+                                "available");
+    }
+  }
+
+  coalesceAndReallocate(newLength, end);
+  // We should have the requested length now
+  DCHECK_GE(length_, maxLength);
 }
 
 void IOBuf::coalesceAndReallocate(size_t newHeadroom,
@@ -388,7 +593,7 @@ void IOBuf::coalesceAndReallocate(size_t newHeadroom,
   // internal buffer before.
   uint8_t* newBuf;
   SharedInfo* newInfo;
-  uint32_t actualCapacity;
+  uint64_t actualCapacity;
   allocExtBuffer(newCapacity, &newBuf, &newInfo, &actualCapacity);
 
   // Copy the data into the new buffer
@@ -406,18 +611,13 @@ void IOBuf::coalesceAndReallocate(size_t newHeadroom,
   assert(remaining == 0);
 
   // Point at the new buffer
-  if (flags_ & kFlagExt) {
-    decrementRefcount();
-  }
+  decrementRefcount();
 
-  // Make sure kFlagExt is set, and kFlagUserOwned and kFlagFreeSharedInfo
-  // are not set.
-  flags_ = kFlagExt;
+  // Make sure kFlagMaybeShared and kFlagFreeSharedInfo are all cleared.
+  setFlagsAndSharedInfo(0, newInfo);
 
-  ext_.capacity = actualCapacity;
-  ext_.type = kExtAllocated;
-  ext_.buf = newBuf;
-  ext_.sharedInfo = newInfo;
+  capacity_ = actualCapacity;
+  buf_ = newBuf;
   data_ = newData;
   length_ = newLength;
 
@@ -430,17 +630,15 @@ void IOBuf::coalesceAndReallocate(size_t newHeadroom,
 }
 
 void IOBuf::decrementRefcount() {
-  assert(flags_ & kFlagExt);
-
   // Externally owned buffers don't have a SharedInfo object and aren't managed
   // by the reference count
-  if (flags_ & kFlagUserOwned) {
-    assert(ext_.sharedInfo == NULL);
+  SharedInfo* info = sharedInfo();
+  if (!info) {
     return;
   }
 
   // Decrement the refcount
-  uint32_t newcnt = ext_.sharedInfo->refcount.fetch_sub(
+  uint32_t newcnt = info->refcount.fetch_sub(
       1, std::memory_order_acq_rel);
   // Note that fetch_sub() returns the value before we decremented.
   // If it is 1, we were the only remaining user; if it is greater there are
@@ -450,18 +648,7 @@ void IOBuf::decrementRefcount() {
   }
 
   // We were the last user.  Free the buffer
-  if (ext_.sharedInfo->freeFn != NULL) {
-    try {
-      ext_.sharedInfo->freeFn(ext_.buf, ext_.sharedInfo->userData);
-    } catch (...) {
-      // The user's free function should never throw.  Otherwise we might
-      // throw from the IOBuf destructor.  Other code paths like coalesce()
-      // also assume that decrementRefcount() cannot throw.
-      abort();
-    }
-  } else {
-    free(ext_.buf);
-  }
+  freeExtBuffer();
 
   // Free the SharedInfo if it was allocated separately.
   //
@@ -473,14 +660,19 @@ void IOBuf::decrementRefcount() {
   // takeOwnership() store the user's free function with its allocated
   // SharedInfo object.)  However, handling this specially with a flag seems
   // like it shouldn't be problematic.
-  if (flags_ & kFlagFreeSharedInfo) {
-    delete ext_.sharedInfo;
+  if (flags() & kFlagFreeSharedInfo) {
+    delete sharedInfo();
   }
 }
 
-void IOBuf::reserveSlow(uint32_t minHeadroom, uint32_t minTailroom) {
+void IOBuf::reserveSlow(uint64_t minHeadroom, uint64_t minTailroom) {
   size_t newCapacity = (size_t)length_ + minHeadroom + minTailroom;
-  CHECK_LT(newCapacity, UINT32_MAX);
+  DCHECK_LT(newCapacity, UINT32_MAX);
+
+  // reserveSlow() is dangerous if anyone else is sharing the buffer, as we may
+  // reallocate and free the original buffer.  It should only ever be called if
+  // we are the only user of the buffer.
+  DCHECK(!isSharedOne());
 
   // We'll need to reallocate the buffer.
   // There are a few options.
@@ -505,26 +697,35 @@ void IOBuf::reserveSlow(uint32_t minHeadroom, uint32_t minTailroom) {
 
   size_t newAllocatedCapacity = goodExtBufferSize(newCapacity);
   uint8_t* newBuffer = nullptr;
-  uint32_t newHeadroom = 0;
-  uint32_t oldHeadroom = headroom();
-
-  if ((flags_ & kFlagExt) && length_ != 0 && oldHeadroom >= minHeadroom) {
+  uint64_t newHeadroom = 0;
+  uint64_t oldHeadroom = headroom();
+
+  // If we have a buffer allocated with malloc and we just need more tailroom,
+  // try to use realloc()/rallocm() to grow the buffer in place.
+  SharedInfo* info = sharedInfo();
+  if (info && (info->freeFn == nullptr) && length_ != 0 &&
+      oldHeadroom >= minHeadroom) {
     if (usingJEMalloc()) {
       size_t headSlack = oldHeadroom - minHeadroom;
       // We assume that tailroom is more useful and more important than
-      // tailroom (not least because realloc / rallocm allow us to grow the
+      // headroom (not least because realloc / rallocm allow us to grow the
       // buffer at the tail, but not at the head)  So, if we have more headroom
       // than we need, we consider that "wasted".  We arbitrarily define "too
       // much" headroom to be 25% of the capacity.
       if (headSlack * 4 <= newCapacity) {
         size_t allocatedCapacity = capacity() + sizeof(SharedInfo);
-        void* p = ext_.buf;
+        void* p = buf_;
         if (allocatedCapacity >= jemallocMinInPlaceExpandable) {
-          int r = rallocm(&p, &newAllocatedCapacity, newAllocatedCapacity,
+          // rallocm can write to its 2nd arg even if it returns
+          // ALLOCM_ERR_NOT_MOVED. So, we pass a temporary to its 2nd arg and
+          // update newAllocatedCapacity only on success.
+          size_t allocatedSize;
+          int r = rallocm(&p, &allocatedSize, newAllocatedCapacity,
                           0, ALLOCM_NO_MOVE);
           if (r == ALLOCM_SUCCESS) {
             newBuffer = static_cast<uint8_t*>(p);
             newHeadroom = oldHeadroom;
+            newAllocatedCapacity = allocatedSize;
           } else if (r == ALLOCM_ERR_OOM) {
             // shouldn't happen as we don't actually allocate new memory
             // (due to ALLOCM_NO_MOVE)
@@ -537,7 +738,7 @@ void IOBuf::reserveSlow(uint32_t minHeadroom, uint32_t minTailroom) {
     } else {  // Not using jemalloc
       size_t copySlack = capacity() - length_;
       if (copySlack * 2 <= length_) {
-        void* p = realloc(ext_.buf, newAllocatedCapacity);
+        void* p = realloc(buf_, newAllocatedCapacity);
         if (UNLIKELY(p == nullptr)) {
           throw std::bad_alloc();
         }
@@ -556,30 +757,48 @@ void IOBuf::reserveSlow(uint32_t minHeadroom, uint32_t minTailroom) {
     }
     newBuffer = static_cast<uint8_t*>(p);
     memcpy(newBuffer + minHeadroom, data_, length_);
-    if (flags_ & kFlagExt) {
-      free(ext_.buf);
+    if (sharedInfo()) {
+      freeExtBuffer();
     }
     newHeadroom = minHeadroom;
   }
 
-  SharedInfo* info;
-  uint32_t cap;
+  uint64_t cap;
   initExtBuffer(newBuffer, newAllocatedCapacity, &info, &cap);
 
-  flags_ = kFlagExt;
+  if (flags() & kFlagFreeSharedInfo) {
+    delete sharedInfo();
+  }
 
-  ext_.capacity = cap;
-  ext_.type = kExtAllocated;
-  ext_.buf = newBuffer;
-  ext_.sharedInfo = info;
+  setFlagsAndSharedInfo(0, info);
+  capacity_ = cap;
+  buf_ = newBuffer;
   data_ = newBuffer + newHeadroom;
   // length_ is unchanged
 }
 
-void IOBuf::allocExtBuffer(uint32_t minCapacity,
+void IOBuf::freeExtBuffer() {
+  SharedInfo* info = sharedInfo();
+  DCHECK(info);
+
+  if (info->freeFn) {
+    try {
+      info->freeFn(buf_, info->userData);
+    } catch (...) {
+      // The user's free function should never throw.  Otherwise we might
+      // throw from the IOBuf destructor.  Other code paths like coalesce()
+      // also assume that decrementRefcount() cannot throw.
+      abort();
+    }
+  } else {
+    free(buf_);
+  }
+}
+
+void IOBuf::allocExtBuffer(uint64_t minCapacity,
                            uint8_t** bufReturn,
                            SharedInfo** infoReturn,
-                           uint32_t* capacityReturn) {
+                           uint64_t* capacityReturn) {
   size_t mallocSize = goodExtBufferSize(minCapacity);
   uint8_t* buf = static_cast<uint8_t*>(malloc(mallocSize));
   if (UNLIKELY(buf == NULL)) {
@@ -589,11 +808,11 @@ void IOBuf::allocExtBuffer(uint32_t minCapacity,
   *bufReturn = buf;
 }
 
-size_t IOBuf::goodExtBufferSize(uint32_t minCapacity) {
+size_t IOBuf::goodExtBufferSize(uint64_t minCapacity) {
   // Determine how much space we should allocate.  We'll store the SharedInfo
   // for the external buffer just after the buffer itself.  (We store it just
   // after the buffer rather than just before so that the code can still just
-  // use free(ext_.buf) to free the buffer.)
+  // use free(buf_) to free the buffer.)
   size_t minSize = static_cast<size_t>(minCapacity) + sizeof(SharedInfo);
   // Add room for padding so that the SharedInfo will be aligned on an 8-byte
   // boundary.
@@ -607,33 +826,25 @@ size_t IOBuf::goodExtBufferSize(uint32_t minCapacity) {
 
 void IOBuf::initExtBuffer(uint8_t* buf, size_t mallocSize,
                           SharedInfo** infoReturn,
-                          uint32_t* capacityReturn) {
+                          uint64_t* capacityReturn) {
   // Find the SharedInfo storage at the end of the buffer
   // and construct the SharedInfo.
   uint8_t* infoStart = (buf + mallocSize) - sizeof(SharedInfo);
   SharedInfo* sharedInfo = new(infoStart) SharedInfo;
 
-  size_t actualCapacity = infoStart - buf;
-  // On the unlikely possibility that the actual capacity is larger than can
-  // fit in a uint32_t after adding room for the refcount and calling
-  // goodMallocSize(), truncate downwards if necessary.
-  if (actualCapacity >= UINT32_MAX) {
-    *capacityReturn = UINT32_MAX;
-  } else {
-    *capacityReturn = actualCapacity;
-  }
-
+  *capacityReturn = infoStart - buf;
   *infoReturn = sharedInfo;
 }
 
 fbstring IOBuf::moveToFbString() {
-  // Externally allocated buffers (malloc) are just fine, everything else needs
+  // malloc-allocated buffers are just fine, everything else needs
   // to be turned into one.
-  if (flags_ != kFlagExt ||  // not malloc()-ed
-      headroom() != 0 ||     // malloc()-ed block doesn't start at beginning
-      tailroom() == 0 ||     // no room for NUL terminator
-      isShared() ||          // shared
-      isChained()) {         // chained
+  if (!sharedInfo() ||         // user owned, not ours to give up
+      sharedInfo()->freeFn ||  // not malloc()-ed
+      headroom() != 0 ||       // malloc()-ed block doesn't start at beginning
+      tailroom() == 0 ||       // no room for NUL terminator
+      isShared() ||            // shared
+      isChained()) {           // chained
     // We might as well get rid of all head and tailroom if we're going
     // to reallocate; we need 1 byte for NUL terminator.
     coalesceAndReallocate(0, computeChainDataLength(), this, 1);
@@ -645,8 +856,13 @@ fbstring IOBuf::moveToFbString() {
                length(),  capacity(),
                AcquireMallocatedString());
 
-  // Reset to internal buffer.
-  flags_ = 0;
+  if (flags() & kFlagFreeSharedInfo) {
+    delete sharedInfo();
+  }
+
+  // Reset to a state where we can be deleted cleanly
+  flagsAndSharedInfo_ = 0;
+  buf_ = nullptr;
   clear();
   return str;
 }