IndexedMemPool - pool allocator tailored for lock-free data structures
authorNathan Bronson <ngbronson@fb.com>
Wed, 15 Jan 2014 23:15:31 +0000 (15:15 -0800)
committerJordan DeLong <jdelong@fb.com>
Thu, 16 Jan 2014 19:21:42 +0000 (11:21 -0800)
Summary:
Instances of IndexedMemPool dynamically allocate and then pool
their element type (T), returning 4-byte integer indices that can be
passed to the pool's operator[] method to access or obtain pointers to
the actual elements.  Once they are constructed, elements are never
destroyed.  These two features are useful for lock-free algorithms.
The indexing behavior makes it easy to build tagged pointer-like-things,
since a large number of elements can be managed using fewer bits than a
full pointer.  The pooling behavior makes it safe to read from T-s even
after they have been recycled

Test Plan:
1. unit tests
2. unit tests using DeterministicSchedule
3. this code is moved from tao/queues where it is in production use

Reviewed By: davejwatson@fb.com

FB internal diff: D1089053

folly/IndexedMemPool.h [new file with mode: 0644]
folly/Makefile.am
folly/test/IndexedMemPoolTest.cpp [new file with mode: 0644]

diff --git a/folly/IndexedMemPool.h b/folly/IndexedMemPool.h
new file mode 100644 (file)
index 0000000..cef4d0e
--- /dev/null
@@ -0,0 +1,427 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FOLLY_DETAIL_INDEXEDMEMPOOL_H
+#define FOLLY_DETAIL_INDEXEDMEMPOOL_H
+
+#include <stdint.h>
+#include <assert.h>
+#include <unistd.h>
+#include <sys/mman.h>
+#include <boost/noncopyable.hpp>
+#include <folly/AtomicStruct.h>
+#include <folly/detail/CacheLocality.h>
+
+namespace folly {
+
+namespace detail {
+template <typename Pool>
+struct IndexedMemPoolRecycler;
+}
+
+/// Instances of IndexedMemPool dynamically allocate and then pool
+/// their element type (T), returning 4-byte integer indices that can be
+/// passed to the pool's operator[] method to access or obtain pointers
+/// to the actual elements.  Once they are constructed, elements are
+/// never destroyed.  These two features are useful for lock-free
+/// algorithms.  The indexing behavior makes it easy to build tagged
+/// pointer-like-things, since a large number of elements can be managed
+/// using fewer bits than a full pointer.  The pooling behavior makes
+/// it safe to read from T-s even after they have been recycled, since
+/// it is guaranteed that the memory won't have been returned to the OS
+/// and unmapped (the algorithm must still use a mechanism to validate
+/// that the read was correct, but it doesn't have to worry about page
+/// faults), and if the elements use internal sequence numbers it can be
+/// guaranteed that there won't be an ABA match due to the element being
+/// overwritten with a different type that has the same bit pattern.
+///
+/// IMPORTANT: Space for extra elements is allocated to account for those
+/// that are inaccessible because they are in other local lists, so the
+/// actual number of items that can be allocated ranges from capacity to
+/// capacity + (NumLocalLists_-1)*LocalListLimit_.  This is important if
+/// you are trying to maximize the capacity of the pool while constraining
+/// the bit size of the resulting pointers, because the pointers will
+/// actually range up to the boosted capacity.  See maxIndexForCapacity
+/// and capacityForMaxIndex.
+///
+/// To avoid contention, NumLocalLists_ free lists of limited (less than
+/// or equal to LocalListLimit_) size are maintained, and each thread
+/// retrieves and returns entries from its associated local list.  If the
+/// local list becomes too large then elements are placed in bulk in a
+/// global free list.  This allows items to be efficiently recirculated
+/// from consumers to producers.  AccessSpreader is used to access the
+/// local lists, so there is no performance advantage to having more
+/// local lists than L1 caches.
+///
+/// The pool mmap-s the entire necessary address space when the pool is
+/// constructed, but delays element construction.  This means that only
+/// elements that are actually returned to the caller get paged into the
+/// process's resident set (RSS).
+template <typename T,
+          int NumLocalLists_ = 32,
+          int LocalListLimit_ = 200,
+          template<typename> class Atom = std::atomic>
+struct IndexedMemPool : boost::noncopyable {
+  typedef T value_type;
+
+  typedef std::unique_ptr<T, detail::IndexedMemPoolRecycler<IndexedMemPool>>
+      UniquePtr;
+
+  static_assert(LocalListLimit_ <= 255, "LocalListLimit must fit in 8 bits");
+  enum {
+    NumLocalLists = NumLocalLists_,
+    LocalListLimit = LocalListLimit_
+  };
+
+
+  // these are public because clients may need to reason about the number
+  // of bits required to hold indices from a pool, given its capacity
+
+  static constexpr uint32_t maxIndexForCapacity(uint32_t capacity) {
+    // index of uint32_t(-1) == UINT32_MAX is reserved for isAllocated tracking
+    return std::min(uint64_t(capacity) + (NumLocalLists - 1) * LocalListLimit,
+                    uint64_t(uint32_t(-1) - 1));
+  }
+
+  static constexpr uint32_t capacityForMaxIndex(uint32_t maxIndex) {
+    return maxIndex - (NumLocalLists - 1) * LocalListLimit;
+  }
+
+
+  /// Constructs a pool that can allocate at least _capacity_ elements,
+  /// even if all the local lists are full
+  explicit IndexedMemPool(uint32_t capacity)
+    : actualCapacity_(maxIndexForCapacity(capacity))
+    , size_(0)
+    , globalHead_(TaggedPtr{})
+  {
+    const size_t needed = sizeof(Slot) * (actualCapacity_ + 1);
+    long pagesize = sysconf(_SC_PAGESIZE);
+    mmapLength_ = ((needed - 1) & ~(pagesize - 1)) + pagesize;
+    assert(needed <= mmapLength_ && mmapLength_ < needed + pagesize);
+    assert((mmapLength_ % pagesize) == 0);
+
+    slots_ = static_cast<Slot*>(mmap(nullptr, mmapLength_,
+                                     PROT_READ | PROT_WRITE,
+                                     MAP_PRIVATE | MAP_ANONYMOUS, -1, 0));
+    if (slots_ == nullptr) {
+      assert(errno == ENOMEM);
+      throw std::bad_alloc();
+    }
+  }
+
+  /// Destroys all of the contained elements
+  ~IndexedMemPool() {
+    for (size_t i = size_; i > 0; --i) {
+      slots_[i].~Slot();
+    }
+    munmap(slots_, mmapLength_);
+  }
+
+  /// Returns a lower bound on the number of elements that may be
+  /// simultaneously allocated and not yet recycled.  Because of the
+  /// local lists it is possible that more elements than this are returned
+  /// successfully
+  size_t capacity() {
+    return capacityForMaxIndex(actualCapacity_);
+  }
+
+  /// Grants ownership of (*this)[retval], or returns 0 if no elements
+  /// are available
+  uint32_t allocIndex() {
+    return localPop(localHead());
+  }
+
+  /// If an element is available, returns a std::unique_ptr to it that will
+  /// recycle the element to the pool when it is reclaimed, otherwise returns
+  /// a null (falsy) std::unique_ptr
+  UniquePtr allocElem() {
+    auto idx = allocIndex();
+    auto ptr = idx == 0 ? nullptr : &slot(idx).elem;
+    return UniquePtr(ptr, typename UniquePtr::deleter_type(this));
+  }
+
+  /// Gives up ownership previously granted by alloc()
+  void recycleIndex(uint32_t idx) {
+    assert(isAllocated(idx));
+    localPush(localHead(), idx);
+  }
+
+  /// Provides access to the pooled element referenced by idx
+  T& operator[](uint32_t idx) {
+    return slot(idx).elem;
+  }
+
+  /// Provides access to the pooled element referenced by idx
+  const T& operator[](uint32_t idx) const {
+    return slot(idx).elem;
+  }
+
+  /// If elem == &pool[idx], then pool.locateElem(elem) == idx.  Also,
+  /// pool.locateElem(nullptr) == 0
+  uint32_t locateElem(const T* elem) const {
+    if (!elem) {
+      return 0;
+    }
+
+    static_assert(std::is_standard_layout<Slot>::value, "offsetof needs POD");
+
+    auto slot = reinterpret_cast<const Slot*>(
+        reinterpret_cast<const char*>(elem) - offsetof(Slot, elem));
+    auto rv = slot - slots_;
+
+    // this assert also tests that rv is in range
+    assert(elem == &(*this)[rv]);
+    return rv;
+  }
+
+  /// Returns true iff idx has been alloc()ed and not recycleIndex()ed
+  bool isAllocated(uint32_t idx) const {
+    return slot(idx).localNext == uint32_t(-1);
+  }
+
+
+ private:
+  ///////////// types
+
+  struct Slot {
+    T elem;
+    uint32_t localNext;
+    uint32_t globalNext;
+
+    Slot() : localNext{}, globalNext{} {}
+  };
+
+  struct TaggedPtr {
+    uint32_t idx;
+
+    // size is bottom 8 bits, tag in top 24.  g++'s code generation for
+    // bitfields seems to depend on the phase of the moon, plus we can
+    // do better because we can rely on other checks to avoid masking
+    uint32_t tagAndSize;
+
+    enum : uint32_t {
+        SizeBits = 8,
+        SizeMask = (1U << SizeBits) - 1,
+        TagIncr = 1U << SizeBits,
+    };
+
+    uint32_t size() const {
+      return tagAndSize & SizeMask;
+    }
+
+    TaggedPtr withSize(uint32_t repl) const {
+      assert(repl <= LocalListLimit);
+      return TaggedPtr{ idx, (tagAndSize & ~SizeMask) | repl };
+    }
+
+    TaggedPtr withSizeIncr() const {
+      assert(size() < LocalListLimit);
+      return TaggedPtr{ idx, tagAndSize + 1 };
+    }
+
+    TaggedPtr withSizeDecr() const {
+      assert(size() > 0);
+      return TaggedPtr{ idx, tagAndSize - 1 };
+    }
+
+    TaggedPtr withIdx(uint32_t repl) const {
+      return TaggedPtr{ repl, tagAndSize + TagIncr };
+    }
+
+    TaggedPtr withEmpty() const {
+      return withIdx(0).withSize(0);
+    }
+  };
+
+  struct FOLLY_ALIGN_TO_AVOID_FALSE_SHARING LocalList {
+    AtomicStruct<TaggedPtr,Atom> head;
+
+    LocalList() : head(TaggedPtr{}) {}
+  };
+
+  ////////// fields
+
+  /// the actual number of slots that we will allocate, to guarantee
+  /// that we will satisfy the capacity requested at construction time.
+  /// They will be numbered 1..actualCapacity_ (note the 1-based counting),
+  /// and occupy slots_[1..actualCapacity_].
+  size_t actualCapacity_;
+
+  /// the number of bytes allocated from mmap, which is a multiple of
+  /// the page size of the machine
+  size_t mmapLength_;
+
+  /// this records the number of slots that have actually been constructed.
+  /// To allow use of atomic ++ instead of CAS, we let this overflow.
+  /// The actual number of constructed elements is min(actualCapacity_,
+  /// size_)
+  std::atomic<uint32_t> size_;
+
+  /// raw storage, only 1..min(size_,actualCapacity_) (inclusive) are
+  /// actually constructed.  Note that slots_[0] is not constructed or used
+  Slot* FOLLY_ALIGN_TO_AVOID_FALSE_SHARING slots_;
+
+  /// use AccessSpreader to find your list.  We use stripes instead of
+  /// thread-local to avoid the need to grow or shrink on thread start
+  /// or join.   These are heads of lists chained with localNext
+  LocalList local_[NumLocalLists];
+
+  /// this is the head of a list of node chained by globalNext, that are
+  /// themselves each the head of a list chained by localNext
+  AtomicStruct<TaggedPtr,Atom> FOLLY_ALIGN_TO_AVOID_FALSE_SHARING globalHead_;
+
+  ///////////// private methods
+
+  size_t slotIndex(uint32_t idx) const {
+    assert(0 < idx &&
+           idx <= actualCapacity_ &&
+           idx <= size_.load(std::memory_order_acquire));
+    return idx;
+  }
+
+  Slot& slot(uint32_t idx) {
+    return slots_[slotIndex(idx)];
+  }
+
+  const Slot& slot(uint32_t idx) const {
+    return slots_[slotIndex(idx)];
+  }
+
+  // localHead references a full list chained by localNext.  s should
+  // reference slot(localHead), it is passed as a micro-optimization
+  void globalPush(Slot& s, uint32_t localHead) {
+    while (true) {
+      TaggedPtr gh = globalHead_.load(std::memory_order_acquire);
+      s.globalNext = gh.idx;
+      if (globalHead_.compare_exchange_strong(gh, gh.withIdx(localHead))) {
+        // success
+        return;
+      }
+    }
+  }
+
+  // idx references a single node
+  void localPush(AtomicStruct<TaggedPtr,Atom>& head, uint32_t idx) {
+    Slot& s = slot(idx);
+    TaggedPtr h = head.load(std::memory_order_acquire);
+    while (true) {
+      s.localNext = h.idx;
+
+      if (h.size() == LocalListLimit) {
+        // push will overflow local list, steal it instead
+        if (head.compare_exchange_strong(h, h.withEmpty())) {
+          // steal was successful, put everything in the global list
+          globalPush(s, idx);
+          return;
+        }
+      } else {
+        // local list has space
+        if (head.compare_exchange_strong(h, h.withIdx(idx).withSizeIncr())) {
+          // success
+          return;
+        }
+      }
+      // h was updated by failing CAS
+    }
+  }
+
+  // returns 0 if empty
+  uint32_t globalPop() {
+    while (true) {
+      TaggedPtr gh = globalHead_.load(std::memory_order_acquire);
+      if (gh.idx == 0 || globalHead_.compare_exchange_strong(
+                  gh, gh.withIdx(slot(gh.idx).globalNext))) {
+        // global list is empty, or pop was successful
+        return gh.idx;
+      }
+    }
+  }
+
+  // returns 0 if allocation failed
+  uint32_t localPop(AtomicStruct<TaggedPtr,Atom>& head) {
+    while (true) {
+      TaggedPtr h = head.load(std::memory_order_acquire);
+      if (h.idx != 0) {
+        // local list is non-empty, try to pop
+        Slot& s = slot(h.idx);
+        if (head.compare_exchange_strong(
+                    h, h.withIdx(s.localNext).withSizeDecr())) {
+          // success
+          s.localNext = uint32_t(-1);
+          return h.idx;
+        }
+        continue;
+      }
+
+      uint32_t idx = globalPop();
+      if (idx == 0) {
+        // global list is empty, allocate and construct new slot
+        if (size_.load(std::memory_order_relaxed) >= actualCapacity_ ||
+            (idx = ++size_) > actualCapacity_) {
+          // allocation failed
+          return 0;
+        }
+        // construct it
+        new (&slot(idx)) T();
+        slot(idx).localNext = uint32_t(-1);
+        return idx;
+      }
+
+      Slot& s = slot(idx);
+      if (head.compare_exchange_strong(
+                  h, h.withIdx(s.localNext).withSize(LocalListLimit))) {
+        // global list moved to local list, keep head for us
+        s.localNext = uint32_t(-1);
+        return idx;
+      }
+      // local bulk push failed, return idx to the global list and try again
+      globalPush(s, idx);
+    }
+  }
+
+  AtomicStruct<TaggedPtr,Atom>& localHead() {
+    auto stripe = detail::AccessSpreader<Atom>::current(NumLocalLists);
+    return local_[stripe].head;
+  }
+};
+
+namespace detail {
+
+/// This is a stateful Deleter functor, which allows std::unique_ptr
+/// to track elements allocated from an IndexedMemPool by tracking the
+/// associated pool.  See IndexedMemPool::allocElem.
+template <typename Pool>
+struct IndexedMemPoolRecycler {
+  Pool* pool;
+
+  explicit IndexedMemPoolRecycler(Pool* pool) : pool(pool) {}
+
+  IndexedMemPoolRecycler(const IndexedMemPoolRecycler<Pool>& rhs)
+      = default;
+  IndexedMemPoolRecycler& operator= (const IndexedMemPoolRecycler<Pool>& rhs)
+      = default;
+
+  void operator()(typename Pool::value_type* elem) const {
+    pool->recycleIndex(pool->locateElem(elem));
+  }
+};
+
+}
+
+} // namespace folly
+
+#endif
index f0a3493..ead0045 100644 (file)
@@ -65,6 +65,7 @@ nobase_follyinclude_HEADERS = \
        Format-inl.h \
        GroupVarint.h \
        Hash.h \
+       IndexedMemPool.h \
        IntrusiveList.h \
        io/Cursor.h \
        io/IOBuf.h \
diff --git a/folly/test/IndexedMemPoolTest.cpp b/folly/test/IndexedMemPoolTest.cpp
new file mode 100644 (file)
index 0000000..40f3dd1
--- /dev/null
@@ -0,0 +1,162 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/IndexedMemPool.h>
+#include <folly/test/DeterministicSchedule.h>
+#include <thread>
+#include <unistd.h>
+#include <semaphore.h>
+#include <gflags/gflags.h>
+#include <gtest/gtest.h>
+
+using namespace folly;
+using namespace folly::test;
+
+TEST(IndexedMemPool, unique_ptr) {
+  typedef IndexedMemPool<size_t> Pool;
+  Pool pool(100);
+
+  for (size_t i = 0; i < 100000; ++i) {
+    auto ptr = pool.allocElem();
+    EXPECT_TRUE(!!ptr);
+    *ptr = i;
+  }
+
+  std::vector<Pool::UniquePtr> leak;
+  while (true) {
+    auto ptr = pool.allocElem();
+    if (!ptr) {
+      // good, we finally ran out
+      break;
+    }
+    leak.emplace_back(std::move(ptr));
+    EXPECT_LT(leak.size(), 10000);
+  }
+}
+
+TEST(IndexedMemPool, no_starvation) {
+  const int count = 1000;
+  const int poolSize = 100;
+
+  typedef DeterministicSchedule Sched;
+  Sched sched(Sched::uniform(0));
+
+  typedef IndexedMemPool<int,8,8,DeterministicAtomic> Pool;
+  Pool pool(poolSize);
+
+  for (auto pass = 0; pass < 10; ++pass) {
+    int fd[2];
+    EXPECT_EQ(pipe(fd), 0);
+
+    // makes sure we wait for available nodes, rather than fail allocIndex
+    sem_t allocSem;
+    sem_init(&allocSem, 0, poolSize);
+
+    // this semaphore is only needed for deterministic replay, so that we
+    // always block in an Sched:: operation rather than in a read() syscall
+    sem_t readSem;
+    sem_init(&readSem, 0, 0);
+
+    std::thread produce = Sched::thread([&]() {
+      for (auto i = 0; i < count; ++i) {
+        Sched::wait(&allocSem);
+        uint32_t idx = pool.allocIndex();
+        EXPECT_NE(idx, 0);
+        EXPECT_LE(idx,
+            poolSize + (pool.NumLocalLists - 1) * pool.LocalListLimit);
+        pool[idx] = i;
+        EXPECT_EQ(write(fd[1], &idx, sizeof(idx)), sizeof(idx));
+        Sched::post(&readSem);
+      }
+    });
+
+    std::thread consume = Sched::thread([&]() {
+      for (auto i = 0; i < count; ++i) {
+        uint32_t idx;
+        Sched::wait(&readSem);
+        EXPECT_EQ(read(fd[0], &idx, sizeof(idx)), sizeof(idx));
+        EXPECT_NE(idx, 0);
+        EXPECT_GE(idx, 1);
+        EXPECT_LE(idx,
+            poolSize + (Pool::NumLocalLists - 1) * Pool::LocalListLimit);
+        EXPECT_EQ(pool[idx], i);
+        pool.recycleIndex(idx);
+        Sched::post(&allocSem);
+      }
+    });
+
+    Sched::join(produce);
+    Sched::join(consume);
+    close(fd[0]);
+    close(fd[1]);
+  }
+}
+
+TEST(IndexedMemPool, st_capacity) {
+  // only one local list => capacity is exact
+  typedef IndexedMemPool<int,1,32> Pool;
+  Pool pool(10);
+
+  EXPECT_EQ(pool.capacity(), 10);
+  EXPECT_EQ(Pool::maxIndexForCapacity(10), 10);
+  for (auto i = 0; i < 10; ++i) {
+    EXPECT_NE(pool.allocIndex(), 0);
+  }
+  EXPECT_EQ(pool.allocIndex(), 0);
+}
+
+TEST(IndexedMemPool, mt_capacity) {
+  typedef IndexedMemPool<int,16,32> Pool;
+  Pool pool(1000);
+
+  std::thread threads[10];
+  for (auto i = 0; i < 10; ++i) {
+    threads[i] = std::thread([&]() {
+      for (auto j = 0; j < 100; ++j) {
+        uint32_t idx = pool.allocIndex();
+        EXPECT_NE(idx, 0);
+      }
+    });
+  }
+
+  for (auto i = 0; i < 10; ++i) {
+    threads[i].join();
+  }
+
+  for (auto i = 0; i < 16 * 32; ++i) {
+    pool.allocIndex();
+  }
+  EXPECT_EQ(pool.allocIndex(), 0);
+}
+
+TEST(IndexedMemPool, locate_elem) {
+  IndexedMemPool<int> pool(1000);
+
+  for (auto i = 0; i < 1000; ++i) {
+    auto idx = pool.allocIndex();
+    EXPECT_FALSE(idx == 0);
+    int* elem = &pool[idx];
+    EXPECT_TRUE(idx == pool.locateElem(elem));
+  }
+
+  EXPECT_EQ(pool.locateElem(nullptr), 0);
+}
+
+int main(int argc, char** argv) {
+  testing::InitGoogleTest(&argc, argv);
+  google::ParseCommandLineFlags(&argc, &argv, true);
+  return RUN_ALL_TESTS();
+}