2 * Copyright 2012 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 // @author: Xin Liu <xliux@fb.com>
19 // A concurrent skip list (CSL) implementation.
20 // Ref: http://www.cs.tau.ac.il/~shanir/nir-pubs-web/Papers/OPODIS2006-BA.pdf
24 This implements a sorted associative container that supports only
25 unique keys. (Similar to std::set.)
29 1. Small memory overhead: ~40% less memory overhead compared with
30 std::set (1.6 words per node versus 3). It has a minimum of 4
31 words (7 words if nodes got deleted) per-list overhead
34 2. Read accesses (count, find iterator, skipper) are lock-free and
35 mostly wait-free (the only wait a reader may need to do is when
36 the node it is visiting is in a pending stage, i.e. deleting,
37 adding and not fully linked). Write accesses (remove, add) need
38 to acquire locks, but locks are local to the predecessor nodes
39 and/or successor nodes.
41 3. Good high contention performance, comparable single-thread
42 performance. In the multithreaded case (12 workers), CSL tested
43 10x faster than a RWSpinLocked std::set for an averaged sized
46 Comparable read performance to std::set when single threaded,
47 especially when the list size is large, and scales better to
48 larger lists: when the size is small, CSL can be 20-50% slower on
49 find()/contains(). As the size gets large (> 1M elements),
50 find()/contains() can be 30% faster.
52 Iterating through a skiplist is similar to iterating through a
53 linked list, thus is much (2-6x) faster than on a std::set
54 (tree-based). This is especially true for short lists due to
55 better cache locality. Based on that, it's also faster to
56 intersect two skiplists.
58 4. Lazy removal with GC support. The removed nodes get deleted when
59 the last Accessor to the skiplist is destroyed.
63 1. Write operations are usually 30% slower than std::set in a single
66 2. Need to have a head node for each list, which has a 4 word
69 3. When the list is quite small (< 1000 elements), single threaded
70 benchmarks show CSL can be 10x slower than std::set.
72 4. The interface requires using an Accessor to access the skiplist.
75 5. Currently x64 only, due to use of MicroSpinLock.
77 6. Freed nodes will not be reclaimed as long as there are ongoing
82 typedef ConcurrentSkipList<int> SkipListT;
83 shared_ptr<SkipListT> sl(SkipListT::createInstance(init_head_height));
85 // It's usually good practice to hold an accessor only during
86 // its necessary life cycle (but not in a tight loop as
87 // Accessor creation incurs ref-counting overhead).
89 // Holding it longer delays garbage-collecting the deleted
91 SkipListT::Accessor accessor(sl);
94 for (auto &elem : accessor) {
95 // use elem to access data
100 Another useful type is the Skipper accessor. This is useful if you
101 want to skip to locations in the way std::lower_bound() works,
102 i.e. it can be used for going through the list by skipping to the
103 node no less than a specified key. The Skipper keeps its location as
104 state, which makes it convenient for things like implementing
105 intersection of two sets efficiently, as it can start from the last
109 SkipListT::Accessor accessor(sl);
110 SkipListT::Skipper skipper(accessor);
113 CHECK_LE(30, *skipper);
116 // GC may happen when the accessor gets destructed.
120 #ifndef FOLLY_CONCURRENT_SKIP_LIST_H_
121 #define FOLLY_CONCURRENT_SKIP_LIST_H_
125 #include <type_traits>
130 #include <boost/iterator/iterator_facade.hpp>
131 #include <boost/scoped_ptr.hpp>
132 #include <boost/shared_ptr.hpp>
134 #include <glog/logging.h>
135 #include "folly/ConcurrentSkipList-inl.h"
136 #include "folly/Likely.h"
137 #include "folly/SmallLocks.h"
141 template<typename T, typename Comp=std::less<T>, int MAX_HEIGHT=24>
142 class ConcurrentSkipList {
143 // MAX_HEIGHT needs to be at least 2 to suppress compiler
144 // warnings/errors (Werror=uninitialized tiggered due to preds_[1]
145 // being treated as a scalar in the compiler).
146 static_assert(MAX_HEIGHT >= 2 && MAX_HEIGHT < 64,
147 "MAX_HEIGHT can only be in the range of [2, 64)");
148 typedef detail::SkipListNode<T> NodeType;
149 typedef std::unique_lock<folly::MicroSpinLock> ScopedLocker;
150 typedef ConcurrentSkipList<T, Comp, MAX_HEIGHT> SkipListType;
153 typedef T value_type;
157 typedef detail::csl_iterator<value_type, NodeType> iterator;
158 typedef detail::csl_iterator<const value_type, const NodeType> const_iterator;
163 // convenient function to get an Accessor to a new instance.
164 static Accessor create(int height=1) {
165 return Accessor(createInstance(height));
168 // create a shared_ptr skiplist object with initial head height.
169 static boost::shared_ptr<SkipListType> createInstance(int height=1) {
170 return boost::shared_ptr<SkipListType>(new SkipListType(height));
173 //===================================================================
174 // Below are implementation details.
175 // Please see ConcurrentSkipList::Accessor for stdlib-like APIs.
176 //===================================================================
178 ~ConcurrentSkipList() {
179 LOG_IF(FATAL, recycler_.refs() > 0)
180 << "number of accessors is not 0, " << recycler_.refs() << " instead!"
181 << " This shouldn't have happened!";
182 while (NodeType* current = head_.load(std::memory_order_relaxed)) {
183 NodeType* tmp = current->skip(0);
184 NodeType::destroy(current);
185 head_.store(tmp, std::memory_order_relaxed);
191 static bool greater(const value_type &data, const NodeType *node) {
192 return node && Comp()(node->data(), data);
195 static bool less(const value_type &data, const NodeType *node) {
196 return (node == nullptr) || Comp()(data, node->data());
199 static int findInsertionPoint(NodeType *cur, int cur_layer,
200 const value_type &data,
201 NodeType *preds[], NodeType *succs[]) {
203 NodeType *pred = cur;
204 NodeType *foundNode = nullptr;
205 for (int layer = cur_layer; layer >= 0; --layer) {
206 NodeType *node = pred->skip(layer);
207 while (greater(data, node)) {
209 node = node->skip(layer);
211 if (foundLayer == -1 && !less(data, node)) { // the two keys equal
217 // if found, succs[0..foundLayer] need to point to the cached foundNode,
218 // as foundNode might be deleted at the same time thus pred->skip() can
219 // return NULL or another node.
220 succs[layer] = foundNode ? foundNode : node;
225 struct Recycler : private boost::noncopyable {
226 Recycler() : refs_(0), dirty_(false) { lock_.init(); }
230 for (auto& node : *nodes_) {
231 NodeType::destroy(node);
236 void add(NodeType* node) {
237 std::lock_guard<MicroSpinLock> g(lock_);
238 if (nodes_.get() == nullptr) {
239 nodes_.reset(new std::vector<NodeType*>(1, node));
241 nodes_->push_back(node);
243 DCHECK_GT(refs(), 0);
244 dirty_.store(true, std::memory_order_relaxed);
248 return refs_.load(std::memory_order_relaxed);
252 return refs_.fetch_add(1, std::memory_order_relaxed);
256 // We don't expect to clean the recycler immediately everytime it is OK
257 // to do so. Here, it is possible that multiple accessors all release at
258 // the same time but nobody would clean the recycler here. If this
259 // happens, the recycler will usually still get cleaned when
260 // such a race doesn't happen. The worst case is the recycler will
261 // eventually get deleted along with the skiplist.
262 if (LIKELY(!dirty_.load(std::memory_order_relaxed) || refs() > 1)) {
263 return refs_.fetch_add(-1, std::memory_order_relaxed);
266 boost::scoped_ptr<std::vector<NodeType*> > newNodes;
268 std::lock_guard<MicroSpinLock> g(lock_);
269 if (nodes_.get() == nullptr || refs() > 1) {
270 return refs_.fetch_add(-1, std::memory_order_relaxed);
272 // once refs_ reaches 1 and there is no other accessor, it is safe to
273 // remove all the current nodes in the recycler, as we already acquired
274 // the lock here so no more new nodes can be added, even though new
275 // accessors may be added after that.
276 newNodes.swap(nodes_);
277 dirty_.store(false, std::memory_order_relaxed);
280 // TODO(xliu) should we spawn a thread to do this when there are large
281 // number of nodes in the recycler?
282 for (auto& node : *newNodes) {
283 NodeType::destroy(node);
286 // decrease the ref count at the very end, to minimize the
287 // chance of other threads acquiring lock_ to clear the deleted
289 return refs_.fetch_add(-1, std::memory_order_relaxed);
293 boost::scoped_ptr<std::vector<NodeType*>> nodes_;
294 std::atomic<int32_t> refs_; // current number of visitors to the list
295 std::atomic<bool> dirty_; // whether *nodes_ is non-empty
296 MicroSpinLock lock_; // protects access to *nodes_
297 }; // class ConcurrentSkipList::Recycler
299 explicit ConcurrentSkipList(int height) :
300 head_(NodeType::create(height, value_type(), true)), size_(0) {}
302 size_t size() const { return size_.load(std::memory_order_relaxed); }
304 return head_.load(std::memory_order_consume)->height();
306 int maxLayer() const { return height() - 1; }
308 size_t incrementSize(int delta) {
309 return size_.fetch_add(delta, std::memory_order_relaxed) + delta;
312 // Returns the node if found, nullptr otherwise.
313 NodeType* find(const value_type &data) {
314 auto ret = findNode(data);
315 if (ret.second && !ret.first->markedForRemoval()) return ret.first;
319 // lock all the necessary nodes for changing (adding or removing) the list.
320 // returns true if all the lock acquried successfully and the related nodes
321 // are all validate (not in certain pending states), false otherwise.
322 bool lockNodesForChange(int nodeHeight,
323 ScopedLocker guards[MAX_HEIGHT],
324 NodeType *preds[MAX_HEIGHT],
325 NodeType *succs[MAX_HEIGHT],
327 NodeType *pred, *succ, *prevPred = nullptr;
329 for (int layer = 0; valid && layer < nodeHeight; ++layer) {
331 DCHECK(pred != nullptr) << "layer=" << layer << " height=" << height()
332 << " nodeheight=" << nodeHeight;
334 if (pred != prevPred) {
335 guards[layer] = pred->acquireGuard();
338 valid = !pred->markedForRemoval() &&
339 pred->skip(layer) == succ; // check again after locking
341 if (adding) { // when adding a node, the succ shouldn't be going away
342 valid = valid && (succ == nullptr || !succ->markedForRemoval());
349 // Returns a paired value:
350 // pair.first always stores the pointer to the node with the same input key.
351 // It could be either the newly added data, or the existed data in the
352 // list with the same key.
353 // pair.second stores whether the data is added successfully:
354 // 0 means not added, otherwise reutrns the new size.
355 std::pair<NodeType*, size_t> addOrGetData(const value_type &data) {
356 NodeType *preds[MAX_HEIGHT], *succs[MAX_HEIGHT];
361 int layer = findInsertionPointGetMaxLayer(data, preds, succs, &max_layer);
364 NodeType *nodeFound = succs[layer];
365 DCHECK(nodeFound != nullptr);
366 if (nodeFound->markedForRemoval()) {
367 continue; // if it's getting deleted retry finding node.
369 // wait until fully linked.
370 while (UNLIKELY(!nodeFound->fullyLinked())) {}
371 return std::make_pair(nodeFound, 0);
374 // need to capped at the original height -- the real height may have grown
375 int nodeHeight = detail::SkipListRandomHeight::instance()->
376 getHeight(max_layer + 1);
378 ScopedLocker guards[MAX_HEIGHT];
379 if (!lockNodesForChange(nodeHeight, guards, preds, succs)) {
380 continue; // give up the locks and retry until all valid
383 // locks acquired and all valid, need to modify the links under the locks.
384 newNode = NodeType::create(nodeHeight, data);
385 for (int layer = 0; layer < nodeHeight; ++layer) {
386 newNode->setSkip(layer, succs[layer]);
387 preds[layer]->setSkip(layer, newNode);
390 newNode->setFullyLinked();
391 newSize = incrementSize(1);
397 detail::SkipListRandomHeight::instance()->getSizeLimit(hgt);
399 if (hgt < MAX_HEIGHT && newSize > sizeLimit) {
402 CHECK_GT(newSize, 0);
403 return std::make_pair(newNode, newSize);
406 bool remove(const value_type &data) {
407 NodeType *nodeToDelete = nullptr;
408 ScopedLocker nodeGuard;
409 bool isMarked = false;
411 NodeType* preds[MAX_HEIGHT], *succs[MAX_HEIGHT];
415 int layer = findInsertionPointGetMaxLayer(data, preds, succs, &max_layer);
416 if (!isMarked && (layer < 0 || !okToDelete(succs[layer], layer))) {
421 nodeToDelete = succs[layer];
422 nodeHeight = nodeToDelete->height();
423 nodeGuard = nodeToDelete->acquireGuard();
424 if (nodeToDelete->markedForRemoval()) return false;
425 nodeToDelete->setMarkedForRemoval();
429 // acquire pred locks from bottom layer up
430 ScopedLocker guards[MAX_HEIGHT];
431 if (!lockNodesForChange(nodeHeight, guards, preds, succs, false)) {
432 continue; // this will unlock all the locks
435 for (int layer = nodeHeight - 1; layer >= 0; --layer) {
436 preds[layer]->setSkip(layer, nodeToDelete->skip(layer));
442 recycle(nodeToDelete);
446 const value_type *first() const {
447 auto node = head_.load(std::memory_order_consume)->skip(0);
448 return node ? &node->data() : nullptr;
451 const value_type *last() const {
452 NodeType *pred = head_.load(std::memory_order_consume);
453 NodeType *node = nullptr;
454 for (int layer = maxLayer(); layer >= 0; --layer) {
456 node = pred->skip(layer);
457 if (node) pred = node;
458 } while (node != nullptr);
460 return pred == head_.load(std::memory_order_relaxed)
461 ? nullptr : &pred->data();
464 static bool okToDelete(NodeType *candidate, int layer) {
465 DCHECK(candidate != nullptr);
466 return candidate->fullyLinked() &&
467 candidate->maxLayer() == layer &&
468 !candidate->markedForRemoval();
471 // find node for insertion/deleting
472 int findInsertionPointGetMaxLayer(const value_type &data,
473 NodeType *preds[], NodeType *succs[], int *max_layer) const {
474 *max_layer = maxLayer();
475 return findInsertionPoint(head_.load(std::memory_order_consume),
476 *max_layer, data, preds, succs);
479 // Find node for access. Returns a paired values:
480 // pair.first = the first node that no-less than data value
481 // pair.second = 1 when the data value is founded, or 0 otherwise.
482 // This is like lower_bound, but not exact: we could have the node marked for
483 // removal so still need to check that.
484 std::pair<NodeType*, int> findNode(const value_type &data) const {
485 return findNodeDownRight(data);
488 // Find node by first stepping down then stepping right. Based on benchmark
489 // results, this is slightly faster than findNodeRightDown for better
490 // localality on the skipping pointers.
491 std::pair<NodeType*, int> findNodeDownRight(const value_type &data) const {
492 NodeType *pred = head_.load(std::memory_order_consume);
493 int ht = pred->height();
494 NodeType *node = nullptr;
499 for (; ht > 0 && less(data, pred->skip(ht - 1)); --ht) {}
500 if (ht == 0) return std::make_pair(pred->skip(0), 0); // not found
502 node = pred->skip(--ht); // node <= data now
504 while (greater(data, node)) {
506 node = node->skip(ht);
508 found = !less(data, node);
510 return std::make_pair(node, found);
513 // find node by first stepping right then stepping down.
514 // We still keep this for reference purposes.
515 std::pair<NodeType*, int> findNodeRightDown(const value_type &data) const {
516 NodeType *pred = head_.load(std::memory_order_consume);
517 NodeType *node = nullptr;
518 auto top = maxLayer();
520 for (int layer = top; !found && layer >= 0; --layer) {
521 node = pred->skip(layer);
522 while (greater(data, node)) {
524 node = node->skip(layer);
526 found = !less(data, node);
528 return std::make_pair(node, found);
531 NodeType* lower_bound(const value_type &data) const {
532 auto node = findNode(data).first;
533 while (node != nullptr && node->markedForRemoval()) {
534 node = node->skip(0);
539 void growHeight(int height) {
540 NodeType* oldHead = head_.load(std::memory_order_consume);
541 if (oldHead->height() >= height) { // someone else already did this
545 NodeType* newHead = NodeType::create(height, value_type(), true);
547 { // need to guard the head node in case others are adding/removing
548 // nodes linked to the head.
549 ScopedLocker g = oldHead->acquireGuard();
550 newHead->promoteFrom(oldHead);
551 NodeType* expected = oldHead;
552 if (!head_.compare_exchange_strong(expected, newHead,
553 std::memory_order_release)) {
554 // if someone has already done the swap, just return.
555 NodeType::destroy(newHead);
558 oldHead->setMarkedForRemoval();
563 void recycle(NodeType *node) {
567 std::atomic<NodeType*> head_;
569 std::atomic<size_t> size_;
572 template<typename T, typename Comp, int MAX_HEIGHT>
573 class ConcurrentSkipList<T, Comp, MAX_HEIGHT>::Accessor {
574 typedef detail::SkipListNode<T> NodeType;
575 typedef ConcurrentSkipList<T, Comp, MAX_HEIGHT> SkipListType;
577 typedef T value_type;
579 typedef T& reference;
581 typedef const T& const_reference;
582 typedef const T* const_pointer;
583 typedef size_t size_type;
584 typedef Comp key_compare;
585 typedef Comp value_compare;
587 typedef typename SkipListType::iterator iterator;
588 typedef typename SkipListType::const_iterator const_iterator;
589 typedef typename SkipListType::Skipper Skipper;
// Shares ownership of the list and registers with the recycler, which
// pins lazily-removed nodes while this accessor is alive.
591 explicit Accessor(boost::shared_ptr<ConcurrentSkipList> skip_list)
592 : slHolder_(std::move(skip_list))
594 sl_ = slHolder_.get();
595 DCHECK(sl_ != nullptr);
596 sl_->recycler_.addRef();
599 // Unsafe initializer: the caller assumes the responsibility to keep
600 // skip_list valid during the whole life cycle of the Accessor.
601 explicit Accessor(ConcurrentSkipList *skip_list) : sl_(skip_list) {
602 DCHECK(sl_ != nullptr);
603 sl_->recycler_.addRef();
// Copying an accessor shares the list and takes an extra recycler ref.
606 Accessor(const Accessor &accessor) :
608 slHolder_(accessor.slHolder_) {
609 sl_->recycler_.addRef();
// Rebinds to accessor's list, moving the recycler reference from the
// old list to the new one.
612 Accessor& operator=(const Accessor &accessor) {
613 if (this != &accessor) {
614 slHolder_ = accessor.slHolder_;
615 sl_->recycler_.release();
617 sl_->recycler_.addRef();
623 sl_->recycler_.release();
626 bool empty() const { return sl_->size() == 0; }
627 size_t size() const { return sl_->size(); }
628 size_type max_size() const { return std::numeric_limits<size_type>::max(); }
630 // returns end() if the value is not in the list, otherwise returns an
631 // iterator pointing to the data, and it's guaranteed that the data is valid
632 // as long as the Accessor is held.
633 iterator find(const key_type &value) { return iterator(sl_->find(value)); }
634 const_iterator find(const key_type &value) const {
635 return iterator(sl_->find(value));
637 size_type count(const key_type &data) const { return contains(data); }
639 iterator begin() const {
640 NodeType* head = sl_->head_.load(std::memory_order_consume);
641 return iterator(head->next());
643 iterator end() const { return iterator(nullptr); }
644 const_iterator cbegin() const { return begin(); }
645 const_iterator cend() const { return end(); }
// std::set-style insert: iterator to the (new or pre-existing) element,
// plus whether the insertion took place (addOrGetData's size is nonzero
// only on a successful add).
647 std::pair<iterator, bool> insert(const key_type &data) {
648 auto ret = sl_->addOrGetData(data);
649 return std::make_pair(iterator(ret.first), ret.second);
651 size_t erase(const key_type &data) { return remove(data); }
653 iterator lower_bound(const key_type &data) const {
654 return iterator(sl_->lower_bound(data));
657 size_t height() const { return sl_->height(); }
659 // first() returns pointer to the first element in the skiplist, or
662 // last() returns the pointer to the last element in the skiplist,
663 // nullptr if list is empty.
665 // Note: As concurrent writing can happen, first() is not
666 // guaranteed to be the min_element() in the list. Similarly
667 // last() is not guaranteed to be the max_element(), and both of them can
668 // be invalid (i.e. nullptr), so we name them differently from front() and
670 const key_type *first() const { return sl_->first(); }
671 const key_type *last() const { return sl_->last(); }
673 // Try to remove the last element in the skip list.
675 // Returns true if we removed it, false if either the list is empty
676 // or a race condition happened (i.e. the used-to-be last element
677 // was already removed by another thread).
679 auto last = sl_->last();
680 return last ? sl_->remove(*last) : false;
683 std::pair<key_type*, bool> addOrGetData(const key_type &data) {
684 auto ret = sl_->addOrGetData(data);
685 return std::make_pair(&ret.first->data(), ret.second);
688 SkipListType* skiplist() const { return sl_; }
691 // TODO:(xliu) remove these.
692 // Returns true if the node is added successfully, false if not, i.e. the
693 // node with the same key already existed in the list.
694 bool contains(const key_type &data) const { return sl_->find(data); }
695 bool add(const key_type &data) { return sl_->addOrGetData(data).second; }
696 bool remove(const key_type &data) { return sl_->remove(data); }
700 boost::shared_ptr<SkipListType> slHolder_;
703 // implements forward iterator concept.
704 template<typename ValT, typename NodeT>
705 class detail::csl_iterator :
706 public boost::iterator_facade<csl_iterator<ValT, NodeT>,
707 ValT, boost::forward_traversal_tag> {
709 typedef ValT value_type;
710 typedef value_type& reference;
711 typedef value_type* pointer;
712 typedef ptrdiff_t difference_type;
714 explicit csl_iterator(NodeT* node = nullptr) : node_(node) {}
// Converting constructor: allows iterator -> const_iterator (enabled
// only when OtherVal is convertible to ValT).
716 template<typename OtherVal, typename OtherNode>
717 csl_iterator(const csl_iterator<OtherVal, OtherNode> &other,
718 typename std::enable_if<std::is_convertible<OtherVal, ValT>::value>::type*
719 = 0) : node_(other.node_) {}
// Approximate memory footprint of the node pointed to (plus this
// iterator); 0 for the end iterator.
721 size_t nodeSize() const {
722 return node_ == nullptr ? 0 :
723 node_->height() * sizeof(NodeT*) + sizeof(*this);
726 bool good() const { return node_ != nullptr; }
// iterator_facade hooks below; iterator_core_access needs private access.
729 friend class boost::iterator_core_access;
730 template<class,class> friend class csl_iterator;
732 void increment() { node_ = node_->next(); };
733 bool equal(const csl_iterator& other) const { return node_ == other.node_; }
734 value_type& dereference() const { return node_->data(); }
740 template<typename T, typename Comp, int MAX_HEIGHT>
741 class ConcurrentSkipList<T, Comp, MAX_HEIGHT>::Skipper {
742 typedef detail::SkipListNode<T> NodeType;
743 typedef ConcurrentSkipList<T, Comp, MAX_HEIGHT> SkipListType;
744 typedef typename SkipListType::Accessor Accessor;
747 typedef T value_type;
748 typedef T& reference;
750 typedef ptrdiff_t difference_type;
// Construct from the list itself; holds its own Accessor for the
// lifetime of the Skipper.
752 Skipper(const boost::shared_ptr<SkipListType>& skipList) :
753 accessor_(skipList) {
757 Skipper(const Accessor& accessor) : accessor_(accessor) {
762 // need to cache the head node
763 NodeType* head_node = head();
764 headHeight_ = head_node->height();
765 for (int i = 0; i < headHeight_; ++i) {
766 preds_[i] = head_node;
767 succs_[i] = head_node->skip(i);
769 int max_layer = maxLayer();
770 for (int i = 0; i < max_layer; ++i) {
773 hints_[max_layer] = max_layer;
776 // advance to the next node in the list.
777 Skipper& operator ++() {
778 preds_[0] = succs_[0];
779 succs_[0] = preds_[0]->skip(0);
780 int height = curHeight();
// Refresh the higher layers whose successor was the node we just
// stepped past.
781 for (int i = 1; i < height && preds_[0] == succs_[i]; ++i) {
782 preds_[i] = succs_[i];
783 succs_[i] = preds_[i]->skip(i);
788 bool good() const { return succs_[0] != nullptr; }
790 int maxLayer() const { return headHeight_ - 1; }
792 int curHeight() const {
793 // need to cap the height to the cached head height, as the current node
794 // might be some newly inserted node and also during the time period the
795 // head height may have grown.
796 return succs_[0] ? std::min(headHeight_, succs_[0]->height()) : 0;
799 const value_type &data() const {
800 DCHECK(succs_[0] != NULL);
801 return succs_[0]->data();
804 value_type &operator *() const {
805 DCHECK(succs_[0] != NULL);
806 return succs_[0]->data();
809 value_type *operator->() {
810 DCHECK(succs_[0] != NULL);
811 return &succs_[0]->data();
815 * Skip to the position whose data is no less than the parameter.
816 * (I.e. the lower_bound).
818 * Returns true if the data is found, false otherwise.
820 bool to(const value_type &data) {
821 int layer = curHeight() - 1;
822 if (layer < 0) return false; // reaches the end of the list
// Climb using the per-layer hint until the target is no longer ahead
// of the successor on that layer.
824 int lyr = hints_[layer];
825 int max_layer = maxLayer();
826 while (SkipListType::greater(data, succs_[lyr]) && lyr < max_layer) {
829 hints_[layer] = lyr; // update the hint
831 int foundLayer = SkipListType::
832 findInsertionPoint(preds_[lyr], lyr, data, preds_, succs_);
833 if (foundLayer < 0) return false;
835 DCHECK(succs_[0] != NULL) << "lyr=" << lyr << "; max_layer=" << max_layer;
836 return !succs_[0]->markedForRemoval();
// Current head node (re-read each call; the list may have grown).
840 NodeType* head() const {
841 return accessor_.skiplist()->head_.load(std::memory_order_consume);
846 NodeType *succs_[MAX_HEIGHT], *preds_[MAX_HEIGHT];
847 uint8_t hints_[MAX_HEIGHT];
852 #endif // FOLLY_CONCURRENT_SKIP_LIST_H_