fix build when sanitizers are enabled and jemalloc is disabled
[folly.git] / folly / ConcurrentSkipList.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 // @author: Xin Liu <xliux@fb.com>
18 //
19 // A concurrent skip list (CSL) implementation.
20 // Ref: http://www.cs.tau.ac.il/~shanir/nir-pubs-web/Papers/OPODIS2006-BA.pdf
21
22 /*
23
24 This implements a sorted associative container that supports only
25 unique keys.  (Similar to std::set.)
26
27 Features:
28
29   1. Small memory overhead: ~40% less memory overhead compared with
30      std::set (1.6 words per node versus 3). It has an minimum of 4
31      words (7 words if there nodes got deleted) per-list overhead
32      though.
33
34   2. Read accesses (count, find iterator, skipper) are lock-free and
35      mostly wait-free (the only wait a reader may need to do is when
36      the node it is visiting is in a pending stage, i.e. deleting,
37      adding and not fully linked).  Write accesses (remove, add) need
38      to acquire locks, but locks are local to the predecessor nodes
39      and/or successor nodes.
40
41   3. Good high contention performance, comparable single-thread
42      performance.  In the multithreaded case (12 workers), CSL tested
43      10x faster than a RWSpinLocked std::set for an averaged sized
44      list (1K - 1M nodes).
45
46      Comparable read performance to std::set when single threaded,
47      especially when the list size is large, and scales better to
48      larger lists: when the size is small, CSL can be 20-50% slower on
49      find()/contains().  As the size gets large (> 1M elements),
50      find()/contains() can be 30% faster.
51
52      Iterating through a skiplist is similar to iterating through a
53      linked list, thus is much (2-6x) faster than on a std::set
54      (tree-based).  This is especially true for short lists due to
55      better cache locality.  Based on that, it's also faster to
56      intersect two skiplists.
57
58   4. Lazy removal with GC support.  The removed nodes get deleted when
59      the last Accessor to the skiplist is destroyed.
60
61 Caveats:
62
63   1. Write operations are usually 30% slower than std::set in a single
64      threaded environment.
65
66   2. Need to have a head node for each list, which has a 4 word
67      overhead.
68
69   3. When the list is quite small (< 1000 elements), single threaded
70      benchmarks show CSL can be 10x slower than std:set.
71
72   4. The interface requires using an Accessor to access the skiplist.
73     (See below.)
74
75   5. Currently x64 only, due to use of MicroSpinLock.
76
77   6. Freed nodes will not be reclaimed as long as there are ongoing
78      uses of the list.
79
80 Sample usage:
81
82      typedef ConcurrentSkipList<int> SkipListT;
83      shared_ptr<SkipListT> sl(SkipListT::createInstance(init_head_height);
84      {
85        // It's usually good practice to hold an accessor only during
86        // its necessary life cycle (but not in a tight loop as
87        // Accessor creation incurs ref-counting overhead).
88        //
89        // Holding it longer delays garbage-collecting the deleted
90        // nodes in the list.
91        SkipListT::Accessor accessor(sl);
92        accessor.insert(23);
93        accessor.erase(2);
94        for (auto &elem : accessor) {
95          // use elem to access data
96        }
97        ... ...
98      }
99
100  Another useful type is the Skipper accessor.  This is useful if you
101  want to skip to locations in the way std::lower_bound() works,
102  i.e. it can be used for going through the list by skipping to the
103  node no less than a specified key.  The Skipper keeps its location as
104  state, which makes it convenient for things like implementing
105  intersection of two sets efficiently, as it can start from the last
106  visited position.
107
108      {
109        SkipListT::Accessor accessor(sl);
110        SkipListT::Skipper skipper(accessor);
111        skipper.to(30);
112        if (skipper) {
113          CHECK_LE(30, *skipper);
114        }
115        ...  ...
116        // GC may happen when the accessor gets destructed.
117      }
118 */
119
120 #pragma once
121
122 #include <algorithm>
123 #include <atomic>
124 #include <limits>
125 #include <memory>
126 #include <type_traits>
127
128 #include <boost/iterator/iterator_facade.hpp>
129 #include <glog/logging.h>
130
131 #include <folly/ConcurrentSkipList-inl.h>
132 #include <folly/Likely.h>
133 #include <folly/Memory.h>
134 #include <folly/MicroSpinLock.h>
135
136 namespace folly {
137
138 template <
139     typename T,
140     typename Comp = std::less<T>,
141     // All nodes are allocated using provided SimpleAllocator,
142     // it should be thread-safe.
143     typename NodeAlloc = SysAlloc,
144     int MAX_HEIGHT = 24>
145 class ConcurrentSkipList {
146   // MAX_HEIGHT needs to be at least 2 to suppress compiler
147   // warnings/errors (Werror=uninitialized tiggered due to preds_[1]
148   // being treated as a scalar in the compiler).
149   static_assert(MAX_HEIGHT >= 2 && MAX_HEIGHT < 64,
150       "MAX_HEIGHT can only be in the range of [2, 64)");
151   typedef std::unique_lock<folly::MicroSpinLock> ScopedLocker;
152   typedef ConcurrentSkipList<T, Comp, NodeAlloc, MAX_HEIGHT> SkipListType;
153
154  public:
155   typedef detail::SkipListNode<T> NodeType;
156   typedef T value_type;
157   typedef T key_type;
158
159   typedef detail::csl_iterator<value_type, NodeType> iterator;
160   typedef detail::csl_iterator<const value_type, const NodeType> const_iterator;
161
162   class Accessor;
163   class Skipper;
164
165   explicit ConcurrentSkipList(int height, const NodeAlloc& alloc)
166       : recycler_(alloc),
167         head_(NodeType::create(recycler_.alloc(), height, value_type(), true)),
168         size_(0) {}
169
170   explicit ConcurrentSkipList(int height)
171       : recycler_(),
172         head_(NodeType::create(recycler_.alloc(), height, value_type(), true)),
173         size_(0) {}
174
175   // Convenient function to get an Accessor to a new instance.
176   static Accessor create(int height, const NodeAlloc& alloc) {
177     return Accessor(createInstance(height, alloc));
178   }
179
180   static Accessor create(int height = 1) {
181     return Accessor(createInstance(height));
182   }
183
184   // Create a shared_ptr skiplist object with initial head height.
185   static std::shared_ptr<SkipListType> createInstance(int height,
186                                                       const NodeAlloc& alloc) {
187     return std::make_shared<ConcurrentSkipList>(height, alloc);
188   }
189
190   static std::shared_ptr<SkipListType> createInstance(int height = 1) {
191     return std::make_shared<ConcurrentSkipList>(height);
192   }
193
194   //===================================================================
195   // Below are implementation details.
196   // Please see ConcurrentSkipList::Accessor for stdlib-like APIs.
197   //===================================================================
198
199   ~ConcurrentSkipList() {
200     /* static */ if (NodeType::template DestroyIsNoOp<NodeAlloc>::value) {
201       // Avoid traversing the list if using arena allocator.
202       return;
203     }
204     for (NodeType* current = head_.load(std::memory_order_relaxed); current; ) {
205       NodeType* tmp = current->skip(0);
206       NodeType::destroy(recycler_.alloc(), current);
207       current = tmp;
208     }
209   }
210
211  private:
212   static bool greater(const value_type &data, const NodeType *node) {
213     return node && Comp()(node->data(), data);
214   }
215
216   static bool less(const value_type &data, const NodeType *node) {
217     return (node == nullptr) || Comp()(data, node->data());
218   }
219
220   static int findInsertionPoint(NodeType *cur, int cur_layer,
221       const value_type &data,
222       NodeType *preds[], NodeType *succs[]) {
223     int foundLayer = -1;
224     NodeType *pred = cur;
225     NodeType *foundNode = nullptr;
226     for (int layer = cur_layer; layer >= 0; --layer) {
227       NodeType *node = pred->skip(layer);
228       while (greater(data, node)) {
229         pred = node;
230         node = node->skip(layer);
231       }
232       if (foundLayer == -1 && !less(data, node)) { // the two keys equal
233         foundLayer = layer;
234         foundNode = node;
235       }
236       preds[layer] = pred;
237
238       // if found, succs[0..foundLayer] need to point to the cached foundNode,
239       // as foundNode might be deleted at the same time thus pred->skip() can
240       // return nullptr or another node.
241       succs[layer] = foundNode ? foundNode : node;
242     }
243     return foundLayer;
244   }
245
246   size_t size() const { return size_.load(std::memory_order_relaxed); }
247
248   int height() const {
249     return head_.load(std::memory_order_consume)->height();
250   }
251
252   int maxLayer() const { return height() - 1; }
253
254   size_t incrementSize(int delta) {
255     return size_.fetch_add(delta, std::memory_order_relaxed) + delta;
256   }
257
258   // Returns the node if found, nullptr otherwise.
259   NodeType* find(const value_type &data) {
260     auto ret = findNode(data);
261     if (ret.second && !ret.first->markedForRemoval()) {
262       return ret.first;
263     }
264     return nullptr;
265   }
266
267   // lock all the necessary nodes for changing (adding or removing) the list.
268   // returns true if all the lock acquried successfully and the related nodes
269   // are all validate (not in certain pending states), false otherwise.
270   bool lockNodesForChange(int nodeHeight,
271       ScopedLocker guards[MAX_HEIGHT],
272       NodeType *preds[MAX_HEIGHT],
273       NodeType *succs[MAX_HEIGHT],
274       bool adding=true) {
275     NodeType *pred, *succ, *prevPred = nullptr;
276     bool valid = true;
277     for (int layer = 0; valid && layer < nodeHeight; ++layer) {
278       pred = preds[layer];
279       DCHECK(pred != nullptr) << "layer=" << layer << " height=" << height()
280         << " nodeheight=" << nodeHeight;
281       succ = succs[layer];
282       if (pred != prevPred) {
283         guards[layer] = pred->acquireGuard();
284         prevPred = pred;
285       }
286       valid = !pred->markedForRemoval() &&
287         pred->skip(layer) == succ;  // check again after locking
288
289       if (adding) {  // when adding a node, the succ shouldn't be going away
290         valid = valid && (succ == nullptr || !succ->markedForRemoval());
291       }
292     }
293
294     return valid;
295   }
296
297   // Returns a paired value:
298   //   pair.first always stores the pointer to the node with the same input key.
299   //     It could be either the newly added data, or the existed data in the
300   //     list with the same key.
301   //   pair.second stores whether the data is added successfully:
302   //     0 means not added, otherwise reutrns the new size.
303   template <typename U>
304   std::pair<NodeType*, size_t> addOrGetData(U &&data) {
305     NodeType *preds[MAX_HEIGHT], *succs[MAX_HEIGHT];
306     NodeType *newNode;
307     size_t newSize;
308     while (true) {
309       int max_layer = 0;
310       int layer = findInsertionPointGetMaxLayer(data, preds, succs, &max_layer);
311
312       if (layer >= 0) {
313         NodeType *nodeFound = succs[layer];
314         DCHECK(nodeFound != nullptr);
315         if (nodeFound->markedForRemoval()) {
316           continue;  // if it's getting deleted retry finding node.
317         }
318         // wait until fully linked.
319         while (UNLIKELY(!nodeFound->fullyLinked())) {}
320         return std::make_pair(nodeFound, 0);
321       }
322
323       // need to capped at the original height -- the real height may have grown
324       int nodeHeight = detail::SkipListRandomHeight::instance()->
325         getHeight(max_layer + 1);
326
327       ScopedLocker guards[MAX_HEIGHT];
328       if (!lockNodesForChange(nodeHeight, guards, preds, succs)) {
329         continue; // give up the locks and retry until all valid
330       }
331
332       // locks acquired and all valid, need to modify the links under the locks.
333       newNode =
334         NodeType::create(recycler_.alloc(), nodeHeight, std::forward<U>(data));
335       for (int k = 0; k < nodeHeight; ++k) {
336         newNode->setSkip(k, succs[k]);
337         preds[k]->setSkip(k, newNode);
338       }
339
340       newNode->setFullyLinked();
341       newSize = incrementSize(1);
342       break;
343     }
344
345     int hgt = height();
346     size_t sizeLimit =
347       detail::SkipListRandomHeight::instance()->getSizeLimit(hgt);
348
349     if (hgt < MAX_HEIGHT && newSize > sizeLimit) {
350       growHeight(hgt + 1);
351     }
352     CHECK_GT(newSize, 0);
353     return std::make_pair(newNode, newSize);
354   }
355
356   bool remove(const value_type &data) {
357     NodeType *nodeToDelete = nullptr;
358     ScopedLocker nodeGuard;
359     bool isMarked = false;
360     int nodeHeight = 0;
361     NodeType* preds[MAX_HEIGHT], *succs[MAX_HEIGHT];
362
363     while (true) {
364       int max_layer = 0;
365       int layer = findInsertionPointGetMaxLayer(data, preds, succs, &max_layer);
366       if (!isMarked && (layer < 0 || !okToDelete(succs[layer], layer))) {
367         return false;
368       }
369
370       if (!isMarked) {
371         nodeToDelete = succs[layer];
372         nodeHeight = nodeToDelete->height();
373         nodeGuard = nodeToDelete->acquireGuard();
374         if (nodeToDelete->markedForRemoval()) {
375           return false;
376         }
377         nodeToDelete->setMarkedForRemoval();
378         isMarked = true;
379       }
380
381       // acquire pred locks from bottom layer up
382       ScopedLocker guards[MAX_HEIGHT];
383       if (!lockNodesForChange(nodeHeight, guards, preds, succs, false)) {
384         continue;  // this will unlock all the locks
385       }
386
387       for (int k = nodeHeight - 1; k >= 0; --k) {
388         preds[k]->setSkip(k, nodeToDelete->skip(k));
389       }
390
391       incrementSize(-1);
392       break;
393     }
394     recycle(nodeToDelete);
395     return true;
396   }
397
398   const value_type *first() const {
399     auto node = head_.load(std::memory_order_consume)->skip(0);
400     return node ? &node->data() : nullptr;
401   }
402
403   const value_type *last() const {
404     NodeType *pred = head_.load(std::memory_order_consume);
405     NodeType *node = nullptr;
406     for (int layer = maxLayer(); layer >= 0; --layer) {
407       do {
408         node = pred->skip(layer);
409         if (node) {
410           pred = node;
411         }
412       } while (node != nullptr);
413     }
414     return pred == head_.load(std::memory_order_relaxed)
415       ? nullptr : &pred->data();
416   }
417
418   static bool okToDelete(NodeType *candidate, int layer) {
419     DCHECK(candidate != nullptr);
420     return candidate->fullyLinked() &&
421       candidate->maxLayer() == layer &&
422       !candidate->markedForRemoval();
423   }
424
425   // find node for insertion/deleting
426   int findInsertionPointGetMaxLayer(const value_type &data,
427       NodeType *preds[], NodeType *succs[], int *max_layer) const {
428     *max_layer = maxLayer();
429     return findInsertionPoint(head_.load(std::memory_order_consume),
430       *max_layer, data, preds, succs);
431   }
432
433   // Find node for access. Returns a paired values:
434   // pair.first = the first node that no-less than data value
435   // pair.second = 1 when the data value is founded, or 0 otherwise.
436   // This is like lower_bound, but not exact: we could have the node marked for
437   // removal so still need to check that.
438   std::pair<NodeType*, int> findNode(const value_type &data) const {
439     return findNodeDownRight(data);
440   }
441
442   // Find node by first stepping down then stepping right. Based on benchmark
443   // results, this is slightly faster than findNodeRightDown for better
444   // localality on the skipping pointers.
445   std::pair<NodeType*, int> findNodeDownRight(const value_type &data) const {
446     NodeType *pred = head_.load(std::memory_order_consume);
447     int ht = pred->height();
448     NodeType *node = nullptr;
449
450     bool found = false;
451     while (!found) {
452       // stepping down
453       for (; ht > 0 && less(data, node = pred->skip(ht - 1)); --ht) {}
454       if (ht == 0) {
455         return std::make_pair(node, 0); // not found
456       }
457       // node <= data now, but we need to fix up ht
458       --ht;
459
460       // stepping right
461       while (greater(data, node)) {
462         pred = node;
463         node = node->skip(ht);
464       }
465       found = !less(data, node);
466     }
467     return std::make_pair(node, found);
468   }
469
470   // find node by first stepping right then stepping down.
471   // We still keep this for reference purposes.
472   std::pair<NodeType*, int> findNodeRightDown(const value_type &data) const {
473     NodeType *pred = head_.load(std::memory_order_consume);
474     NodeType *node = nullptr;
475     auto top = maxLayer();
476     int found = 0;
477     for (int layer = top; !found && layer >= 0; --layer) {
478       node = pred->skip(layer);
479       while (greater(data, node)) {
480         pred = node;
481         node = node->skip(layer);
482       }
483       found = !less(data, node);
484     }
485     return std::make_pair(node, found);
486   }
487
488   NodeType* lower_bound(const value_type &data) const {
489     auto node = findNode(data).first;
490     while (node != nullptr && node->markedForRemoval()) {
491       node = node->skip(0);
492     }
493     return node;
494   }
495
496   void growHeight(int height) {
497     NodeType* oldHead = head_.load(std::memory_order_consume);
498     if (oldHead->height() >= height) {  // someone else already did this
499       return;
500     }
501
502     NodeType* newHead =
503       NodeType::create(recycler_.alloc(), height, value_type(), true);
504
505     { // need to guard the head node in case others are adding/removing
506       // nodes linked to the head.
507       ScopedLocker g = oldHead->acquireGuard();
508       newHead->copyHead(oldHead);
509       NodeType* expected = oldHead;
510       if (!head_.compare_exchange_strong(expected, newHead,
511           std::memory_order_release)) {
512         // if someone has already done the swap, just return.
513         NodeType::destroy(recycler_.alloc(), newHead);
514         return;
515       }
516       oldHead->setMarkedForRemoval();
517     }
518     recycle(oldHead);
519   }
520
521   void recycle(NodeType *node) {
522     recycler_.add(node);
523   }
524
525   detail::NodeRecycler<NodeType, NodeAlloc> recycler_;
526   std::atomic<NodeType*> head_;
527   std::atomic<size_t> size_;
528 };
529
530 template <typename T, typename Comp, typename NodeAlloc, int MAX_HEIGHT>
531 class ConcurrentSkipList<T, Comp, NodeAlloc, MAX_HEIGHT>::Accessor {
532   typedef detail::SkipListNode<T> NodeType;
533   typedef ConcurrentSkipList<T, Comp, NodeAlloc, MAX_HEIGHT> SkipListType;
534  public:
535   typedef T value_type;
536   typedef T key_type;
537   typedef T& reference;
538   typedef T* pointer;
539   typedef const T& const_reference;
540   typedef const T* const_pointer;
541   typedef size_t size_type;
542   typedef Comp key_compare;
543   typedef Comp value_compare;
544
545   typedef typename SkipListType::iterator iterator;
546   typedef typename SkipListType::const_iterator const_iterator;
547   typedef typename SkipListType::Skipper Skipper;
548
549   explicit Accessor(std::shared_ptr<ConcurrentSkipList> skip_list)
550     : slHolder_(std::move(skip_list))
551   {
552     sl_ = slHolder_.get();
553     DCHECK(sl_ != nullptr);
554     sl_->recycler_.addRef();
555   }
556
557   // Unsafe initializer: the caller assumes the responsibility to keep
558   // skip_list valid during the whole life cycle of the Acessor.
559   explicit Accessor(ConcurrentSkipList *skip_list) : sl_(skip_list) {
560     DCHECK(sl_ != nullptr);
561     sl_->recycler_.addRef();
562   }
563
564   Accessor(const Accessor &accessor) :
565       sl_(accessor.sl_),
566       slHolder_(accessor.slHolder_) {
567     sl_->recycler_.addRef();
568   }
569
570   Accessor& operator=(const Accessor &accessor) {
571     if (this != &accessor) {
572       slHolder_ = accessor.slHolder_;
573       sl_->recycler_.releaseRef();
574       sl_ = accessor.sl_;
575       sl_->recycler_.addRef();
576     }
577     return *this;
578   }
579
580   ~Accessor() {
581     sl_->recycler_.releaseRef();
582   }
583
584   bool empty() const { return sl_->size() == 0; }
585   size_t size() const { return sl_->size(); }
586   size_type max_size() const { return std::numeric_limits<size_type>::max(); }
587
588   // returns end() if the value is not in the list, otherwise returns an
589   // iterator pointing to the data, and it's guaranteed that the data is valid
590   // as far as the Accessor is hold.
591   iterator find(const key_type &value) { return iterator(sl_->find(value)); }
592   const_iterator find(const key_type &value) const {
593     return iterator(sl_->find(value));
594   }
595   size_type count(const key_type &data) const { return contains(data); }
596
597   iterator begin() const {
598     NodeType* head = sl_->head_.load(std::memory_order_consume);
599     return iterator(head->next());
600   }
601   iterator end() const { return iterator(nullptr); }
602   const_iterator cbegin() const { return begin(); }
603   const_iterator cend() const { return end(); }
604
605   template <
606       typename U,
607       typename =
608           typename std::enable_if<std::is_convertible<U, T>::value>::type>
609   std::pair<iterator, bool> insert(U&& data) {
610     auto ret = sl_->addOrGetData(std::forward<U>(data));
611     return std::make_pair(iterator(ret.first), ret.second);
612   }
613   size_t erase(const key_type &data) { return remove(data); }
614
615   iterator lower_bound(const key_type &data) const {
616     return iterator(sl_->lower_bound(data));
617   }
618
619   size_t height() const { return sl_->height(); }
620
621   // first() returns pointer to the first element in the skiplist, or
622   // nullptr if empty.
623   //
624   // last() returns the pointer to the last element in the skiplist,
625   // nullptr if list is empty.
626   //
627   // Note: As concurrent writing can happen, first() is not
628   //   guaranteed to be the min_element() in the list. Similarly
629   //   last() is not guaranteed to be the max_element(), and both of them can
630   //   be invalid (i.e. nullptr), so we name them differently from front() and
631   //   tail() here.
632   const key_type *first() const { return sl_->first(); }
633   const key_type *last() const { return sl_->last(); }
634
635   // Try to remove the last element in the skip list.
636   //
637   // Returns true if we removed it, false if either the list is empty
638   // or a race condition happened (i.e. the used-to-be last element
639   // was already removed by another thread).
640   bool pop_back() {
641     auto last = sl_->last();
642     return last ? sl_->remove(*last) : false;
643   }
644
645   std::pair<key_type*, bool> addOrGetData(const key_type &data) {
646     auto ret = sl_->addOrGetData(data);
647     return std::make_pair(&ret.first->data(), ret.second);
648   }
649
650   SkipListType* skiplist() const { return sl_; }
651
652   // legacy interfaces
653   // TODO:(xliu) remove these.
654   // Returns true if the node is added successfully, false if not, i.e. the
655   // node with the same key already existed in the list.
656   bool contains(const key_type &data) const { return sl_->find(data); }
657   bool add(const key_type &data) { return sl_->addOrGetData(data).second; }
658   bool remove(const key_type &data) { return sl_->remove(data); }
659
660  private:
661   SkipListType *sl_;
662   std::shared_ptr<SkipListType> slHolder_;
663 };
664
665 // implements forward iterator concept.
666 template <typename ValT, typename NodeT>
667 class detail::csl_iterator :
668   public boost::iterator_facade<csl_iterator<ValT, NodeT>,
669     ValT, boost::forward_traversal_tag> {
670  public:
671   typedef ValT value_type;
672   typedef value_type& reference;
673   typedef value_type* pointer;
674   typedef ptrdiff_t difference_type;
675
676   explicit csl_iterator(NodeT* node = nullptr) : node_(node) {}
677
678   template <typename OtherVal, typename OtherNode>
679   csl_iterator(
680       const csl_iterator<OtherVal, OtherNode>& other,
681       typename std::enable_if<
682           std::is_convertible<OtherVal, ValT>::value>::type* = nullptr)
683       : node_(other.node_) {}
684
685   size_t nodeSize() const {
686     return node_ == nullptr ? 0 :
687       node_->height() * sizeof(NodeT*) + sizeof(*this);
688   }
689
690   bool good() const { return node_ != nullptr; }
691
692  private:
693   friend class boost::iterator_core_access;
694   template <class, class> friend class csl_iterator;
695
696   void increment() { node_ = node_->next(); }
697   bool equal(const csl_iterator& other) const { return node_ == other.node_; }
698   value_type& dereference() const { return node_->data(); }
699
700   NodeT* node_;
701 };
702
703 // Skipper interface
704 template <typename T, typename Comp, typename NodeAlloc, int MAX_HEIGHT>
705 class ConcurrentSkipList<T, Comp, NodeAlloc, MAX_HEIGHT>::Skipper {
706   typedef detail::SkipListNode<T> NodeType;
707   typedef ConcurrentSkipList<T, Comp, NodeAlloc, MAX_HEIGHT> SkipListType;
708   typedef typename SkipListType::Accessor Accessor;
709
710  public:
711   typedef T  value_type;
712   typedef T& reference;
713   typedef T* pointer;
714   typedef ptrdiff_t difference_type;
715
716   Skipper(const std::shared_ptr<SkipListType>& skipList) :
717     accessor_(skipList) {
718     init();
719   }
720
721   Skipper(const Accessor& accessor) : accessor_(accessor) {
722     init();
723   }
724
725   void init() {
726     // need to cache the head node
727     NodeType* head_node = head();
728     headHeight_ = head_node->height();
729     for (int i = 0; i < headHeight_; ++i) {
730       preds_[i] = head_node;
731       succs_[i] = head_node->skip(i);
732     }
733     int max_layer = maxLayer();
734     for (int i = 0; i < max_layer; ++i) {
735       hints_[i] = uint8_t(i + 1);
736     }
737     hints_[max_layer] = max_layer;
738   }
739
740   // advance to the next node in the list.
741   Skipper& operator ++() {
742     preds_[0] = succs_[0];
743     succs_[0] = preds_[0]->skip(0);
744     int height = curHeight();
745     for (int i = 1; i < height && preds_[0] == succs_[i]; ++i) {
746       preds_[i] = succs_[i];
747       succs_[i] = preds_[i]->skip(i);
748     }
749     return *this;
750   }
751
752   bool good() const { return succs_[0] != nullptr; }
753
754   int maxLayer() const { return headHeight_ - 1; }
755
756   int curHeight() const {
757     // need to cap the height to the cached head height, as the current node
758     // might be some newly inserted node and also during the time period the
759     // head height may have grown.
760     return succs_[0] ? std::min(headHeight_, succs_[0]->height()) : 0;
761   }
762
763   const value_type &data() const {
764     DCHECK(succs_[0] != nullptr);
765     return succs_[0]->data();
766   }
767
768   value_type &operator *() const {
769     DCHECK(succs_[0] != nullptr);
770     return succs_[0]->data();
771   }
772
773   value_type *operator->() {
774     DCHECK(succs_[0] != nullptr);
775     return &succs_[0]->data();
776   }
777
778   /*
779    * Skip to the position whose data is no less than the parameter.
780    * (I.e. the lower_bound).
781    *
782    * Returns true if the data is found, false otherwise.
783    */
784   bool to(const value_type &data) {
785     int layer = curHeight() - 1;
786     if (layer < 0) {
787       return false; // reaches the end of the list
788     }
789
790     int lyr = hints_[layer];
791     int max_layer = maxLayer();
792     while (SkipListType::greater(data, succs_[lyr]) && lyr < max_layer) {
793       ++lyr;
794     }
795     hints_[layer] = lyr;  // update the hint
796
797     int foundLayer = SkipListType::
798       findInsertionPoint(preds_[lyr], lyr, data, preds_, succs_);
799     if (foundLayer < 0) {
800       return false;
801     }
802
803     DCHECK(succs_[0] != nullptr) << "lyr=" << lyr
804                                  << "; max_layer=" << max_layer;
805     return !succs_[0]->markedForRemoval();
806   }
807
808  private:
809   NodeType* head() const {
810     return accessor_.skiplist()->head_.load(std::memory_order_consume);
811   }
812
813   Accessor accessor_;
814   int headHeight_;
815   NodeType *succs_[MAX_HEIGHT], *preds_[MAX_HEIGHT];
816   uint8_t hints_[MAX_HEIGHT];
817 };
818
819 } // namespace folly