+template <typename NodeType, typename NodeAlloc, typename = void>
+class NodeRecycler;
+
+template <typename NodeType, typename NodeAlloc>
+class NodeRecycler<NodeType, NodeAlloc, typename std::enable_if<
+ !NodeType::template DestroyIsNoOp<NodeAlloc>::value>::type> {
+ public:
+ explicit NodeRecycler(const NodeAlloc& alloc)
+ : refs_(0), dirty_(false), alloc_(alloc) { lock_.init(); }
+
+ explicit NodeRecycler() : refs_(0), dirty_(false) { lock_.init(); }
+
+ ~NodeRecycler() {
+ CHECK_EQ(refs(), 0);
+ if (nodes_) {
+ for (auto& node : *nodes_) {
+ NodeType::destroy(alloc_, node);
+ }
+ }
+ }
+
+ void add(NodeType* node) {
+ std::lock_guard<MicroSpinLock> g(lock_);
+ if (nodes_.get() == nullptr) {
+ nodes_ = std::make_unique<std::vector<NodeType*>>(1, node);
+ } else {
+ nodes_->push_back(node);
+ }
+ DCHECK_GT(refs(), 0);
+ dirty_.store(true, std::memory_order_relaxed);
+ }
+
+ int addRef() {
+ return refs_.fetch_add(1, std::memory_order_relaxed);
+ }
+
+ int releaseRef() {
+ // We don't expect to clean the recycler immediately everytime it is OK
+ // to do so. Here, it is possible that multiple accessors all release at
+ // the same time but nobody would clean the recycler here. If this
+ // happens, the recycler will usually still get cleaned when
+ // such a race doesn't happen. The worst case is the recycler will
+ // eventually get deleted along with the skiplist.
+ if (LIKELY(!dirty_.load(std::memory_order_relaxed) || refs() > 1)) {
+ return refs_.fetch_add(-1, std::memory_order_relaxed);
+ }
+
+ std::unique_ptr<std::vector<NodeType*>> newNodes;
+ {
+ std::lock_guard<MicroSpinLock> g(lock_);
+ if (nodes_.get() == nullptr || refs() > 1) {
+ return refs_.fetch_add(-1, std::memory_order_relaxed);
+ }
+ // once refs_ reaches 1 and there is no other accessor, it is safe to
+ // remove all the current nodes in the recycler, as we already acquired
+ // the lock here so no more new nodes can be added, even though new
+ // accessors may be added after that.
+ newNodes.swap(nodes_);
+ dirty_.store(false, std::memory_order_relaxed);
+ }
+
+ // TODO(xliu) should we spawn a thread to do this when there are large
+ // number of nodes in the recycler?
+ for (auto& node : *newNodes) {
+ NodeType::destroy(alloc_, node);
+ }
+
+ // decrease the ref count at the very end, to minimize the
+ // chance of other threads acquiring lock_ to clear the deleted
+ // nodes again.
+ return refs_.fetch_add(-1, std::memory_order_relaxed);
+ }
+
+ NodeAlloc& alloc() { return alloc_; }
+
+ private:
+ int refs() const {
+ return refs_.load(std::memory_order_relaxed);
+ }
+
+ std::unique_ptr<std::vector<NodeType*>> nodes_;
+ std::atomic<int32_t> refs_; // current number of visitors to the list
+ std::atomic<bool> dirty_; // whether *nodes_ is non-empty
+ MicroSpinLock lock_; // protects access to *nodes_
+ NodeAlloc alloc_;
+};
+
+// In case of arena allocator, no recycling is necessary, and it's possible
+// to save on ConcurrentSkipList size.
+template <typename NodeType, typename NodeAlloc>
+class NodeRecycler<NodeType, NodeAlloc, typename std::enable_if<
+ NodeType::template DestroyIsNoOp<NodeAlloc>::value>::type> {
+ public:
+ explicit NodeRecycler(const NodeAlloc& alloc) : alloc_(alloc) { }
+
+ void addRef() { }
+ void releaseRef() { }
+
+ void add(NodeType* /* node */) {}
+
+ NodeAlloc& alloc() { return alloc_; }
+
+ private:
+ NodeAlloc alloc_;
+};