/*
 * Copyright 2014 Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// @author: Xin Liu <xliux@fb.com>

#ifndef FOLLY_CONCURRENTSKIPLIST_INL_H_
#define FOLLY_CONCURRENTSKIPLIST_INL_H_

#include <atomic>
#include <cmath>
#include <limits>
#include <memory>
#include <mutex>
#include <type_traits>
#include <vector>

#include <boost/noncopyable.hpp>
#include <boost/random.hpp>
#include <boost/type_traits.hpp>
#include <glog/logging.h>

#include <folly/Memory.h>
#include <folly/SmallLocks.h>
#include <folly/ThreadLocal.h>

namespace folly { namespace detail {

template<typename ValT, typename NodeT> class csl_iterator;

template<typename T>
class SkipListNode : private boost::noncopyable {
  enum {
    IS_HEAD_NODE = 1,
    MARKED_FOR_REMOVAL = (1 << 1),
    FULLY_LINKED = (1 << 2),
  };

 public:
  typedef T value_type;

  template<typename NodeAlloc, typename U,
    typename=typename std::enable_if<std::is_convertible<U, T>::value>::type>
  static SkipListNode* create(
      NodeAlloc& alloc, int height, U&& data, bool isHead = false) {
    DCHECK(height >= 1 && height < 64) << height;

    size_t size = sizeof(SkipListNode) +
      height * sizeof(std::atomic<SkipListNode*>);
    auto* node = static_cast<SkipListNode*>(alloc.allocate(size));
    // placement new into the raw memory obtained from the allocator
    new (node) SkipListNode(height, std::forward<U>(data), isHead);
    return node;
  }
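
  // Layout note (added commentary): each node is one contiguous block holding
  // the fixed SkipListNode header followed by `height` atomic forward
  // pointers, which is what the sizeof(SkipListNode) +
  // height * sizeof(std::atomic<SkipListNode*>) computation above accounts
  // for; skip_[] below is the zero-length tail that aliases that trailing
  // pointer array.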

  template<typename NodeAlloc>
  static void destroy(NodeAlloc& alloc, SkipListNode* node) {
    node->~SkipListNode();
    alloc.deallocate(node);
  }

  template<typename NodeAlloc>
  static constexpr bool destroyIsNoOp() {
    return IsArenaAllocator<NodeAlloc>::value &&
           boost::has_trivial_destructor<std::atomic<SkipListNode*>>::value;
  }
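
  // Added note: when the allocator is an arena, node memory is reclaimed
  // wholesale when the arena goes away, so the recycler below can treat
  // destroy() as a no-op and skip per-node bookkeeping entirely.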

  // copy the head node to a new head node assuming lock acquired
  SkipListNode* copyHead(SkipListNode* node) {
    DCHECK(node != nullptr && height_ > node->height_);
    setFlags(node->getFlags());
    for (int i = 0; i < node->height_; ++i) {
      setSkip(i, node->skip(i));
    }
    return this;
  }

  inline SkipListNode* skip(int layer) const {
    DCHECK_LT(layer, height_);
    return skip_[layer].load(std::memory_order_consume);
  }

  // next valid node as in the linked list
  SkipListNode* next() {
    SkipListNode* node;
    for (node = skip(0);
        (node != nullptr && node->markedForRemoval());
        node = node->skip(0)) {}
    return node;
  }

  void setSkip(uint8_t h, SkipListNode* next) {
    DCHECK_LT(h, height_);
    skip_[h].store(next, std::memory_order_release);
  }

  value_type& data() { return data_; }
  const value_type& data() const { return data_; }
  int maxLayer() const { return height_ - 1; }
  int height() const { return height_; }

  std::unique_lock<MicroSpinLock> acquireGuard() {
    return std::unique_lock<MicroSpinLock>(spinLock_);
  }

  bool fullyLinked() const      { return getFlags() & FULLY_LINKED; }
  bool markedForRemoval() const { return getFlags() & MARKED_FOR_REMOVAL; }
  bool isHeadNode() const       { return getFlags() & IS_HEAD_NODE; }

  void setIsHeadNode() {
    setFlags(getFlags() | IS_HEAD_NODE);
  }
  void setFullyLinked() {
    setFlags(getFlags() | FULLY_LINKED);
  }
  void setMarkedForRemoval() {
    setFlags(getFlags() | MARKED_FOR_REMOVAL);
  }

 private:
  // Note! this can only be called from create() as a placement new.
  template<typename U>
  SkipListNode(uint8_t height, U&& data, bool isHead) :
      height_(height), data_(std::forward<U>(data)) {
    spinLock_.init();
    setFlags(0);
    if (isHead) setIsHeadNode();
    // need to explicitly init the dynamic atomic pointer array
    for (uint8_t i = 0; i < height_; ++i) {
      new (&skip_[i]) std::atomic<SkipListNode*>(nullptr);
    }
  }

  ~SkipListNode() {
    // explicitly destroy the atomics that were placement-new'd above
    for (uint8_t i = 0; i < height_; ++i) {
      skip_[i].~atomic();
    }
  }

  uint16_t getFlags() const {
    return flags_.load(std::memory_order_consume);
  }
  void setFlags(uint16_t flags) {
    flags_.store(flags, std::memory_order_release);
  }

  // TODO(xliu): on x86_64, it's possible to squeeze these into
  // skip_[0] to maybe save 8 bytes depending on the data alignments.
  // NOTE: currently this is x86_64 only anyway, due to the
  // MicroSpinLock.
  std::atomic<uint16_t> flags_;
  const uint8_t height_;
  MicroSpinLock spinLock_;

  value_type data_;

  std::atomic<SkipListNode*> skip_[0];
};
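
// Illustrative sketch (added, not part of the original header): how a node is
// typically created, linked and retired.  The allocator type is a
// placeholder; any allocator exposing allocate(size)/deallocate(ptr) in the
// style expected by create()/destroy() would do.
//
//   MyAlloc alloc;                                     // hypothetical
//   auto* node = SkipListNode<int>::create(alloc, /*height=*/3, 42);
//   {
//     auto guard = node->acquireGuard();               // spinlock held
//     node->setSkip(0, nullptr);
//     node->setFullyLinked();
//   }
//   node->setMarkedForRemoval();                       // logical delete
//   SkipListNode<int>::destroy(alloc, node);           // physical delete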

class SkipListRandomHeight {
  enum { kMaxHeight = 64 };

 public:
  // make it a singleton.
  static SkipListRandomHeight *instance() {
    static SkipListRandomHeight instance_;
    return &instance_;
  }

  int getHeight(int maxHeight) const {
    DCHECK_LE(maxHeight, kMaxHeight) << "max height too big!";
    double p = randomProb();
    for (int i = 0; i < maxHeight; ++i) {
      if (p < lookupTable_[i]) {
        return i + 1;
      }
    }
    return maxHeight;
  }

  size_t getSizeLimit(int height) const {
    DCHECK_LT(height, kMaxHeight);
    return sizeLimitTable_[height];
  }

 private:
  SkipListRandomHeight() { initLookupTable(); }

  void initLookupTable() {
    // set skip prob = 1/E
    static const double kProbInv = exp(1);
    static const double kProb = 1.0 / kProbInv;
    static const size_t kMaxSizeLimit = std::numeric_limits<size_t>::max();

    double sizeLimit = 1;
    double p = lookupTable_[0] = (1 - kProb);
    sizeLimitTable_[0] = 1;
    for (int i = 1; i < kMaxHeight - 1; ++i) {
      p *= kProb;
      sizeLimit *= kProbInv;
      lookupTable_[i] = lookupTable_[i - 1] + p;
      sizeLimitTable_[i] = sizeLimit > kMaxSizeLimit ?
        kMaxSizeLimit : static_cast<size_t>(sizeLimit);
    }
    lookupTable_[kMaxHeight - 1] = 1;
    sizeLimitTable_[kMaxHeight - 1] = kMaxSizeLimit;
  }
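
  // Added commentary: the tables above encode a geometric height distribution
  // with ratio kProb = 1/e.  lookupTable_[i] is the cumulative probability
  // that a node's height is at most i + 1, i.e. 1 - (1/e)^(i+1), so
  // getHeight() returns h with probability (1 - 1/e) * (1/e)^(h-1).
  // sizeLimitTable_[h] grows roughly as e^h and serves as the size threshold
  // for deciding when the skip list should grow to a taller head.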

  static double randomProb() {
    static ThreadLocal<boost::lagged_fibonacci2281> rng_;
    return (*rng_)();
  }

  double lookupTable_[kMaxHeight];
  size_t sizeLimitTable_[kMaxHeight];
};

template<typename NodeType, typename NodeAlloc, typename = void>
class NodeRecycler;

template<typename NodeType, typename NodeAlloc>
class NodeRecycler<NodeType, NodeAlloc, typename std::enable_if<
  !NodeType::template destroyIsNoOp<NodeAlloc>()>::type> {
 public:
  explicit NodeRecycler(const NodeAlloc& alloc)
    : refs_(0), dirty_(false), alloc_(alloc) { lock_.init(); }

  ~NodeRecycler() {
    CHECK_EQ(refs(), 0);
    if (nodes_) {
      for (auto& node : *nodes_) {
        NodeType::destroy(alloc_, node);
      }
    }
  }

  void add(NodeType* node) {
    std::lock_guard<MicroSpinLock> g(lock_);
    if (nodes_.get() == nullptr) {
      nodes_.reset(new std::vector<NodeType*>(1, node));
    } else {
      nodes_->push_back(node);
    }
    DCHECK_GT(refs(), 0);
    dirty_.store(true, std::memory_order_relaxed);
  }

  int addRef() {
    return refs_.fetch_add(1, std::memory_order_relaxed);
  }

  int releaseRef() {
    // We don't expect to clean the recycler immediately every time it is OK
    // to do so. Here, it is possible that multiple accessors all release at
    // the same time but nobody ends up cleaning the recycler. If this
    // happens, the recycler will usually still get cleaned by a later
    // release that doesn't race. The worst case is that the recycler is only
    // deleted along with the skiplist itself.
    if (LIKELY(!dirty_.load(std::memory_order_relaxed) || refs() > 1)) {
      return refs_.fetch_add(-1, std::memory_order_relaxed);
    }

    std::unique_ptr<std::vector<NodeType*>> newNodes;
    {
      std::lock_guard<MicroSpinLock> g(lock_);
      if (nodes_.get() == nullptr || refs() > 1) {
        return refs_.fetch_add(-1, std::memory_order_relaxed);
      }
      // once refs_ reaches 1 and there is no other accessor, it is safe to
      // remove all the current nodes in the recycler, as we already acquired
      // the lock here so no more new nodes can be added, even though new
      // accessors may be added after that.
      newNodes.swap(nodes_);
      dirty_.store(false, std::memory_order_relaxed);
    }

    // TODO(xliu) should we spawn a thread to do this when there is a large
    // number of nodes in the recycler?
    for (auto& node : *newNodes) {
      NodeType::destroy(alloc_, node);
    }

    // decrease the ref count at the very end, to minimize the
    // chance of other threads acquiring lock_ to clear the deleted
    // nodes again.
    return refs_.fetch_add(-1, std::memory_order_relaxed);
  }
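
  // Illustrative sketch (added, not in the original): the intended pairing
  // with a reader/writer of the skip list, where `recycler` is the list's
  // NodeRecycler instance.
  //
  //   recycler.addRef();                 // accessor enters
  //   ... read or unlink nodes; hand unlinked nodes to recycler.add(n) ...
  //   recycler.releaseRef();             // last dirty accessor frees them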

  NodeAlloc& alloc() { return alloc_; }

 private:
  int refs() const {
    return refs_.load(std::memory_order_relaxed);
  }

  std::unique_ptr<std::vector<NodeType*>> nodes_;
  std::atomic<int32_t> refs_; // current number of visitors to the list
  std::atomic<bool> dirty_; // whether *nodes_ is non-empty
  MicroSpinLock lock_; // protects access to *nodes_
  NodeAlloc alloc_;
};

// In case of an arena allocator, no recycling is necessary, and it's
// possible to save on the ConcurrentSkipList size.
template<typename NodeType, typename NodeAlloc>
class NodeRecycler<NodeType, NodeAlloc, typename std::enable_if<
  NodeType::template destroyIsNoOp<NodeAlloc>()>::type> {
 public:
  explicit NodeRecycler(const NodeAlloc& alloc) : alloc_(alloc) { }

  void addRef() { }
  void releaseRef() { }

  void add(NodeType* node) { }

  NodeAlloc& alloc() { return alloc_; }

 private:
  NodeAlloc alloc_;
};

}}  // namespaces
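
// Added note: which NodeRecycler specialization is instantiated is decided at
// compile time by NodeType::destroyIsNoOp<NodeAlloc>().  With a regular
// heap-backed allocator the recycling specialization above is used; with an
// arena-style allocator recognized by folly's IsArenaAllocator trait, the
// no-op specialization is selected and retired nodes are simply left for the
// arena to reclaim in bulk.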

#endif  // FOLLY_CONCURRENTSKIPLIST_INL_H_