Add a default timeout parameter to HHWheelTimer.
[folly.git] / folly / ConcurrentSkipList.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 // @author: Xin Liu <xliux@fb.com>
18 //
19 // A concurrent skip list (CSL) implementation.
20 // Ref: http://www.cs.tau.ac.il/~shanir/nir-pubs-web/Papers/OPODIS2006-BA.pdf
21
22 /*
23
24 This implements a sorted associative container that supports only
25 unique keys.  (Similar to std::set.)
26
27 Features:
28
29   1. Small memory overhead: ~40% less memory overhead compared with
30      std::set (1.6 words per node versus 3). It has an minimum of 4
31      words (7 words if there nodes got deleted) per-list overhead
32      though.
33
34   2. Read accesses (count, find iterator, skipper) are lock-free and
35      mostly wait-free (the only wait a reader may need to do is when
36      the node it is visiting is in a pending stage, i.e. deleting,
37      adding and not fully linked).  Write accesses (remove, add) need
38      to acquire locks, but locks are local to the predecessor nodes
39      and/or successor nodes.
40
41   3. Good high contention performance, comparable single-thread
42      performance.  In the multithreaded case (12 workers), CSL tested
43      10x faster than a RWSpinLocked std::set for an averaged sized
44      list (1K - 1M nodes).
45
46      Comparable read performance to std::set when single threaded,
47      especially when the list size is large, and scales better to
48      larger lists: when the size is small, CSL can be 20-50% slower on
49      find()/contains().  As the size gets large (> 1M elements),
50      find()/contains() can be 30% faster.
51
52      Iterating through a skiplist is similar to iterating through a
53      linked list, thus is much (2-6x) faster than on a std::set
54      (tree-based).  This is especially true for short lists due to
55      better cache locality.  Based on that, it's also faster to
56      intersect two skiplists.
57
58   4. Lazy removal with GC support.  The removed nodes get deleted when
59      the last Accessor to the skiplist is destroyed.
60
61 Caveats:
62
63   1. Write operations are usually 30% slower than std::set in a single
64      threaded environment.
65
66   2. Need to have a head node for each list, which has a 4 word
67      overhead.
68
69   3. When the list is quite small (< 1000 elements), single threaded
70      benchmarks show CSL can be 10x slower than std:set.
71
72   4. The interface requires using an Accessor to access the skiplist.
73     (See below.)
74
75   5. Currently x64 only, due to use of MicroSpinLock.
76
77   6. Freed nodes will not be reclaimed as long as there are ongoing
78      uses of the list.
79
80 Sample usage:
81
82      typedef ConcurrentSkipList<int> SkipListT;
83      shared_ptr<SkipListT> sl(SkipListT::createInstance(init_head_height);
84      {
85        // It's usually good practice to hold an accessor only during
86        // its necessary life cycle (but not in a tight loop as
87        // Accessor creation incurs ref-counting overhead).
88        //
89        // Holding it longer delays garbage-collecting the deleted
90        // nodes in the list.
91        SkipListT::Accessor accessor(sl);
92        accessor.insert(23);
93        accessor.erase(2);
94        for (auto &elem : accessor) {
95          // use elem to access data
96        }
97        ... ...
98      }
99
100  Another useful type is the Skipper accessor.  This is useful if you
101  want to skip to locations in the way std::lower_bound() works,
102  i.e. it can be used for going through the list by skipping to the
103  node no less than a specified key.  The Skipper keeps its location as
104  state, which makes it convenient for things like implementing
105  intersection of two sets efficiently, as it can start from the last
106  visited position.
107
108      {
109        SkipListT::Accessor accessor(sl);
110        SkipListT::Skipper skipper(accessor);
111        skipper.to(30);
112        if (skipper) {
113          CHECK_LE(30, *skipper);
114        }
115        ...  ...
116        // GC may happen when the accessor gets destructed.
117      }
118 */
119
120 #ifndef FOLLY_CONCURRENT_SKIP_LIST_H_
121 #define FOLLY_CONCURRENT_SKIP_LIST_H_
122
123 #include <algorithm>
124 #include <atomic>
125 #include <limits>
126 #include <memory>
127 #include <type_traits>
128 #include <boost/iterator/iterator_facade.hpp>
129 #include <glog/logging.h>
130
131 #include <folly/ConcurrentSkipList-inl.h>
132 #include <folly/Likely.h>
133 #include <folly/Memory.h>
134 #include <folly/MicroSpinLock.h>
135
136 namespace folly {
137
138 template<typename T,
139          typename Comp = std::less<T>,
140          // All nodes are allocated using provided SimpleAllocator,
141          // it should be thread-safe.
142          typename NodeAlloc = SysAlloc,
143          int MAX_HEIGHT = 24>
144 class ConcurrentSkipList {
145   // MAX_HEIGHT needs to be at least 2 to suppress compiler
146   // warnings/errors (Werror=uninitialized tiggered due to preds_[1]
147   // being treated as a scalar in the compiler).
148   static_assert(MAX_HEIGHT >= 2 && MAX_HEIGHT < 64,
149       "MAX_HEIGHT can only be in the range of [2, 64)");
150   typedef std::unique_lock<folly::MicroSpinLock> ScopedLocker;
151   typedef ConcurrentSkipList<T, Comp, NodeAlloc, MAX_HEIGHT> SkipListType;
152
153  public:
154   typedef detail::SkipListNode<T> NodeType;
155   typedef T value_type;
156   typedef T key_type;
157
158   typedef detail::csl_iterator<value_type, NodeType> iterator;
159   typedef detail::csl_iterator<const value_type, const NodeType> const_iterator;
160
161   class Accessor;
162   class Skipper;
163
164   explicit ConcurrentSkipList(int height, const NodeAlloc& alloc = NodeAlloc())
165     : recycler_(alloc),
166       head_(NodeType::create(recycler_.alloc(), height, value_type(), true)),
167       size_(0) { }
168
169   // Convenient function to get an Accessor to a new instance.
170   static Accessor create(int height = 1, const NodeAlloc& alloc = NodeAlloc()) {
171     return Accessor(createInstance(height, alloc));
172   }
173
174   // Create a shared_ptr skiplist object with initial head height.
175   static std::shared_ptr<SkipListType> createInstance(
176       int height = 1, const NodeAlloc& alloc = NodeAlloc()) {
177     return std::make_shared<ConcurrentSkipList>(height, alloc);
178   }
179
180   //===================================================================
181   // Below are implementation details.
182   // Please see ConcurrentSkipList::Accessor for stdlib-like APIs.
183   //===================================================================
184
185   ~ConcurrentSkipList() {
186     /* static */ if (NodeType::template destroyIsNoOp<NodeAlloc>()) {
187       // Avoid traversing the list if using arena allocator.
188       return;
189     }
190     for (NodeType* current = head_.load(std::memory_order_relaxed); current; ) {
191       NodeType* tmp = current->skip(0);
192       NodeType::destroy(recycler_.alloc(), current);
193       current = tmp;
194     }
195   }
196
197  private:
198   static bool greater(const value_type &data, const NodeType *node) {
199     return node && Comp()(node->data(), data);
200   }
201
202   static bool less(const value_type &data, const NodeType *node) {
203     return (node == nullptr) || Comp()(data, node->data());
204   }
205
206   static int findInsertionPoint(NodeType *cur, int cur_layer,
207       const value_type &data,
208       NodeType *preds[], NodeType *succs[]) {
209     int foundLayer = -1;
210     NodeType *pred = cur;
211     NodeType *foundNode = nullptr;
212     for (int layer = cur_layer; layer >= 0; --layer) {
213       NodeType *node = pred->skip(layer);
214       while (greater(data, node)) {
215         pred = node;
216         node = node->skip(layer);
217       }
218       if (foundLayer == -1 && !less(data, node)) { // the two keys equal
219         foundLayer = layer;
220         foundNode = node;
221       }
222       preds[layer] = pred;
223
224       // if found, succs[0..foundLayer] need to point to the cached foundNode,
225       // as foundNode might be deleted at the same time thus pred->skip() can
226       // return NULL or another node.
227       succs[layer] = foundNode ? foundNode : node;
228     }
229     return foundLayer;
230   }
231
232   size_t size() const { return size_.load(std::memory_order_relaxed); }
233
234   int height() const {
235     return head_.load(std::memory_order_consume)->height();
236   }
237
238   int maxLayer() const { return height() - 1; }
239
240   size_t incrementSize(int delta) {
241     return size_.fetch_add(delta, std::memory_order_relaxed) + delta;
242   }
243
244   // Returns the node if found, nullptr otherwise.
245   NodeType* find(const value_type &data) {
246     auto ret = findNode(data);
247     if (ret.second && !ret.first->markedForRemoval()) return ret.first;
248     return nullptr;
249   }
250
251   // lock all the necessary nodes for changing (adding or removing) the list.
252   // returns true if all the lock acquried successfully and the related nodes
253   // are all validate (not in certain pending states), false otherwise.
254   bool lockNodesForChange(int nodeHeight,
255       ScopedLocker guards[MAX_HEIGHT],
256       NodeType *preds[MAX_HEIGHT],
257       NodeType *succs[MAX_HEIGHT],
258       bool adding=true) {
259     NodeType *pred, *succ, *prevPred = nullptr;
260     bool valid = true;
261     for (int layer = 0; valid && layer < nodeHeight; ++layer) {
262       pred = preds[layer];
263       DCHECK(pred != nullptr) << "layer=" << layer << " height=" << height()
264         << " nodeheight=" << nodeHeight;
265       succ = succs[layer];
266       if (pred != prevPred) {
267         guards[layer] = pred->acquireGuard();
268         prevPred = pred;
269       }
270       valid = !pred->markedForRemoval() &&
271         pred->skip(layer) == succ;  // check again after locking
272
273       if (adding) {  // when adding a node, the succ shouldn't be going away
274         valid = valid && (succ == nullptr || !succ->markedForRemoval());
275       }
276     }
277
278     return valid;
279   }
280
281   // Returns a paired value:
282   //   pair.first always stores the pointer to the node with the same input key.
283   //     It could be either the newly added data, or the existed data in the
284   //     list with the same key.
285   //   pair.second stores whether the data is added successfully:
286   //     0 means not added, otherwise reutrns the new size.
287   template<typename U>
288   std::pair<NodeType*, size_t> addOrGetData(U &&data) {
289     NodeType *preds[MAX_HEIGHT], *succs[MAX_HEIGHT];
290     NodeType *newNode;
291     size_t newSize;
292     while (true) {
293       int max_layer = 0;
294       int layer = findInsertionPointGetMaxLayer(data, preds, succs, &max_layer);
295
296       if (layer >= 0) {
297         NodeType *nodeFound = succs[layer];
298         DCHECK(nodeFound != nullptr);
299         if (nodeFound->markedForRemoval()) {
300           continue;  // if it's getting deleted retry finding node.
301         }
302         // wait until fully linked.
303         while (UNLIKELY(!nodeFound->fullyLinked())) {}
304         return std::make_pair(nodeFound, 0);
305       }
306
307       // need to capped at the original height -- the real height may have grown
308       int nodeHeight = detail::SkipListRandomHeight::instance()->
309         getHeight(max_layer + 1);
310
311       ScopedLocker guards[MAX_HEIGHT];
312       if (!lockNodesForChange(nodeHeight, guards, preds, succs)) {
313         continue; // give up the locks and retry until all valid
314       }
315
316       // locks acquired and all valid, need to modify the links under the locks.
317       newNode =
318         NodeType::create(recycler_.alloc(), nodeHeight, std::forward<U>(data));
319       for (int layer = 0; layer < nodeHeight; ++layer) {
320         newNode->setSkip(layer, succs[layer]);
321         preds[layer]->setSkip(layer, newNode);
322       }
323
324       newNode->setFullyLinked();
325       newSize = incrementSize(1);
326       break;
327     }
328
329     int hgt = height();
330     size_t sizeLimit =
331       detail::SkipListRandomHeight::instance()->getSizeLimit(hgt);
332
333     if (hgt < MAX_HEIGHT && newSize > sizeLimit) {
334       growHeight(hgt + 1);
335     }
336     CHECK_GT(newSize, 0);
337     return std::make_pair(newNode, newSize);
338   }
339
340   bool remove(const value_type &data) {
341     NodeType *nodeToDelete = nullptr;
342     ScopedLocker nodeGuard;
343     bool isMarked = false;
344     int nodeHeight = 0;
345     NodeType* preds[MAX_HEIGHT], *succs[MAX_HEIGHT];
346
347     while (true) {
348       int max_layer = 0;
349       int layer = findInsertionPointGetMaxLayer(data, preds, succs, &max_layer);
350       if (!isMarked && (layer < 0 || !okToDelete(succs[layer], layer))) {
351         return false;
352       }
353
354       if (!isMarked) {
355         nodeToDelete = succs[layer];
356         nodeHeight = nodeToDelete->height();
357         nodeGuard = nodeToDelete->acquireGuard();
358         if (nodeToDelete->markedForRemoval()) return false;
359         nodeToDelete->setMarkedForRemoval();
360         isMarked = true;
361       }
362
363       // acquire pred locks from bottom layer up
364       ScopedLocker guards[MAX_HEIGHT];
365       if (!lockNodesForChange(nodeHeight, guards, preds, succs, false)) {
366         continue;  // this will unlock all the locks
367       }
368
369       for (int layer = nodeHeight - 1; layer >= 0; --layer) {
370         preds[layer]->setSkip(layer, nodeToDelete->skip(layer));
371       }
372
373       incrementSize(-1);
374       break;
375     }
376     recycle(nodeToDelete);
377     return true;
378   }
379
380   const value_type *first() const {
381     auto node = head_.load(std::memory_order_consume)->skip(0);
382     return node ? &node->data() : nullptr;
383   }
384
385   const value_type *last() const {
386     NodeType *pred = head_.load(std::memory_order_consume);
387     NodeType *node = nullptr;
388     for (int layer = maxLayer(); layer >= 0; --layer) {
389       do {
390         node = pred->skip(layer);
391         if (node) pred = node;
392       } while (node != nullptr);
393     }
394     return pred == head_.load(std::memory_order_relaxed)
395       ? nullptr : &pred->data();
396   }
397
398   static bool okToDelete(NodeType *candidate, int layer) {
399     DCHECK(candidate != nullptr);
400     return candidate->fullyLinked() &&
401       candidate->maxLayer() == layer &&
402       !candidate->markedForRemoval();
403   }
404
405   // find node for insertion/deleting
406   int findInsertionPointGetMaxLayer(const value_type &data,
407       NodeType *preds[], NodeType *succs[], int *max_layer) const {
408     *max_layer = maxLayer();
409     return findInsertionPoint(head_.load(std::memory_order_consume),
410       *max_layer, data, preds, succs);
411   }
412
413   // Find node for access. Returns a paired values:
414   // pair.first = the first node that no-less than data value
415   // pair.second = 1 when the data value is founded, or 0 otherwise.
416   // This is like lower_bound, but not exact: we could have the node marked for
417   // removal so still need to check that.
418   std::pair<NodeType*, int> findNode(const value_type &data) const {
419     return findNodeDownRight(data);
420   }
421
422   // Find node by first stepping down then stepping right. Based on benchmark
423   // results, this is slightly faster than findNodeRightDown for better
424   // localality on the skipping pointers.
425   std::pair<NodeType*, int> findNodeDownRight(const value_type &data) const {
426     NodeType *pred = head_.load(std::memory_order_consume);
427     int ht = pred->height();
428     NodeType *node = nullptr;
429
430     bool found = false;
431     while (!found) {
432       // stepping down
433       for (; ht > 0 && less(data, node = pred->skip(ht - 1)); --ht) {}
434       if (ht == 0) return std::make_pair(node, 0);  // not found
435       // node <= data now, but we need to fix up ht
436       --ht;
437
438       // stepping right
439       while (greater(data, node)) {
440         pred = node;
441         node = node->skip(ht);
442       }
443       found = !less(data, node);
444     }
445     return std::make_pair(node, found);
446   }
447
448   // find node by first stepping right then stepping down.
449   // We still keep this for reference purposes.
450   std::pair<NodeType*, int> findNodeRightDown(const value_type &data) const {
451     NodeType *pred = head_.load(std::memory_order_consume);
452     NodeType *node = nullptr;
453     auto top = maxLayer();
454     int found = 0;
455     for (int layer = top; !found && layer >= 0; --layer) {
456       node = pred->skip(layer);
457       while (greater(data, node)) {
458         pred = node;
459         node = node->skip(layer);
460       }
461       found = !less(data, node);
462     }
463     return std::make_pair(node, found);
464   }
465
466   NodeType* lower_bound(const value_type &data) const {
467     auto node = findNode(data).first;
468     while (node != nullptr && node->markedForRemoval()) {
469       node = node->skip(0);
470     }
471     return node;
472   }
473
474   void growHeight(int height) {
475     NodeType* oldHead = head_.load(std::memory_order_consume);
476     if (oldHead->height() >= height) {  // someone else already did this
477       return;
478     }
479
480     NodeType* newHead =
481       NodeType::create(recycler_.alloc(), height, value_type(), true);
482
483     { // need to guard the head node in case others are adding/removing
484       // nodes linked to the head.
485       ScopedLocker g = oldHead->acquireGuard();
486       newHead->copyHead(oldHead);
487       NodeType* expected = oldHead;
488       if (!head_.compare_exchange_strong(expected, newHead,
489           std::memory_order_release)) {
490         // if someone has already done the swap, just return.
491         NodeType::destroy(recycler_.alloc(), newHead);
492         return;
493       }
494       oldHead->setMarkedForRemoval();
495     }
496     recycle(oldHead);
497   }
498
499   void recycle(NodeType *node) {
500     recycler_.add(node);
501   }
502
503   detail::NodeRecycler<NodeType, NodeAlloc> recycler_;
504   std::atomic<NodeType*> head_;
505   std::atomic<size_t> size_;
506 };
507
508 template<typename T, typename Comp, typename NodeAlloc, int MAX_HEIGHT>
509 class ConcurrentSkipList<T, Comp, NodeAlloc, MAX_HEIGHT>::Accessor {
510   typedef detail::SkipListNode<T> NodeType;
511   typedef ConcurrentSkipList<T, Comp, NodeAlloc, MAX_HEIGHT> SkipListType;
512  public:
513   typedef T value_type;
514   typedef T key_type;
515   typedef T& reference;
516   typedef T* pointer;
517   typedef const T& const_reference;
518   typedef const T* const_pointer;
519   typedef size_t size_type;
520   typedef Comp key_compare;
521   typedef Comp value_compare;
522
523   typedef typename SkipListType::iterator iterator;
524   typedef typename SkipListType::const_iterator const_iterator;
525   typedef typename SkipListType::Skipper Skipper;
526
527   explicit Accessor(std::shared_ptr<ConcurrentSkipList> skip_list)
528     : slHolder_(std::move(skip_list))
529   {
530     sl_ = slHolder_.get();
531     DCHECK(sl_ != nullptr);
532     sl_->recycler_.addRef();
533   }
534
535   // Unsafe initializer: the caller assumes the responsibility to keep
536   // skip_list valid during the whole life cycle of the Acessor.
537   explicit Accessor(ConcurrentSkipList *skip_list) : sl_(skip_list) {
538     DCHECK(sl_ != nullptr);
539     sl_->recycler_.addRef();
540   }
541
542   Accessor(const Accessor &accessor) :
543       sl_(accessor.sl_),
544       slHolder_(accessor.slHolder_) {
545     sl_->recycler_.addRef();
546   }
547
548   Accessor& operator=(const Accessor &accessor) {
549     if (this != &accessor) {
550       slHolder_ = accessor.slHolder_;
551       sl_->recycler_.releaseRef();
552       sl_ = accessor.sl_;
553       sl_->recycler_.addRef();
554     }
555     return *this;
556   }
557
558   ~Accessor() {
559     sl_->recycler_.releaseRef();
560   }
561
562   bool empty() const { return sl_->size() == 0; }
563   size_t size() const { return sl_->size(); }
564   size_type max_size() const { return std::numeric_limits<size_type>::max(); }
565
566   // returns end() if the value is not in the list, otherwise returns an
567   // iterator pointing to the data, and it's guaranteed that the data is valid
568   // as far as the Accessor is hold.
569   iterator find(const key_type &value) { return iterator(sl_->find(value)); }
570   const_iterator find(const key_type &value) const {
571     return iterator(sl_->find(value));
572   }
573   size_type count(const key_type &data) const { return contains(data); }
574
575   iterator begin() const {
576     NodeType* head = sl_->head_.load(std::memory_order_consume);
577     return iterator(head->next());
578   }
579   iterator end() const { return iterator(nullptr); }
580   const_iterator cbegin() const { return begin(); }
581   const_iterator cend() const { return end(); }
582
583   template<typename U,
584     typename=typename std::enable_if<std::is_convertible<U, T>::value>::type>
585   std::pair<iterator, bool> insert(U&& data) {
586     auto ret = sl_->addOrGetData(std::forward<U>(data));
587     return std::make_pair(iterator(ret.first), ret.second);
588   }
589   size_t erase(const key_type &data) { return remove(data); }
590
591   iterator lower_bound(const key_type &data) const {
592     return iterator(sl_->lower_bound(data));
593   }
594
595   size_t height() const { return sl_->height(); }
596
597   // first() returns pointer to the first element in the skiplist, or
598   // nullptr if empty.
599   //
600   // last() returns the pointer to the last element in the skiplist,
601   // nullptr if list is empty.
602   //
603   // Note: As concurrent writing can happen, first() is not
604   //   guaranteed to be the min_element() in the list. Similarly
605   //   last() is not guaranteed to be the max_element(), and both of them can
606   //   be invalid (i.e. nullptr), so we name them differently from front() and
607   //   tail() here.
608   const key_type *first() const { return sl_->first(); }
609   const key_type *last() const { return sl_->last(); }
610
611   // Try to remove the last element in the skip list.
612   //
613   // Returns true if we removed it, false if either the list is empty
614   // or a race condition happened (i.e. the used-to-be last element
615   // was already removed by another thread).
616   bool pop_back() {
617     auto last = sl_->last();
618     return last ? sl_->remove(*last) : false;
619   }
620
621   std::pair<key_type*, bool> addOrGetData(const key_type &data) {
622     auto ret = sl_->addOrGetData(data);
623     return std::make_pair(&ret.first->data(), ret.second);
624   }
625
626   SkipListType* skiplist() const { return sl_; }
627
628   // legacy interfaces
629   // TODO:(xliu) remove these.
630   // Returns true if the node is added successfully, false if not, i.e. the
631   // node with the same key already existed in the list.
632   bool contains(const key_type &data) const { return sl_->find(data); }
633   bool add(const key_type &data) { return sl_->addOrGetData(data).second; }
634   bool remove(const key_type &data) { return sl_->remove(data); }
635
636  private:
637   SkipListType *sl_;
638   std::shared_ptr<SkipListType> slHolder_;
639 };
640
641 // implements forward iterator concept.
642 template<typename ValT, typename NodeT>
643 class detail::csl_iterator :
644   public boost::iterator_facade<csl_iterator<ValT, NodeT>,
645     ValT, boost::forward_traversal_tag> {
646  public:
647   typedef ValT value_type;
648   typedef value_type& reference;
649   typedef value_type* pointer;
650   typedef ptrdiff_t difference_type;
651
652   explicit csl_iterator(NodeT* node = nullptr) : node_(node) {}
653
654   template<typename OtherVal, typename OtherNode>
655   csl_iterator(const csl_iterator<OtherVal, OtherNode> &other,
656       typename std::enable_if<std::is_convertible<OtherVal, ValT>::value>::type*
657       = 0) : node_(other.node_) {}
658
659   size_t nodeSize() const {
660     return node_ == nullptr ? 0 :
661       node_->height() * sizeof(NodeT*) + sizeof(*this);
662   }
663
664   bool good() const { return node_ != nullptr; }
665
666  private:
667   friend class boost::iterator_core_access;
668   template<class,class> friend class csl_iterator;
669
670   void increment() { node_ = node_->next(); };
671   bool equal(const csl_iterator& other) const { return node_ == other.node_; }
672   value_type& dereference() const { return node_->data(); }
673
674   NodeT* node_;
675 };
676
677 // Skipper interface
678 template<typename T, typename Comp, typename NodeAlloc, int MAX_HEIGHT>
679 class ConcurrentSkipList<T, Comp, NodeAlloc, MAX_HEIGHT>::Skipper {
680   typedef detail::SkipListNode<T> NodeType;
681   typedef ConcurrentSkipList<T, Comp, NodeAlloc, MAX_HEIGHT> SkipListType;
682   typedef typename SkipListType::Accessor Accessor;
683
684  public:
685   typedef T  value_type;
686   typedef T& reference;
687   typedef T* pointer;
688   typedef ptrdiff_t difference_type;
689
690   Skipper(const std::shared_ptr<SkipListType>& skipList) :
691     accessor_(skipList) {
692     init();
693   }
694
695   Skipper(const Accessor& accessor) : accessor_(accessor) {
696     init();
697   }
698
699   void init() {
700     // need to cache the head node
701     NodeType* head_node = head();
702     headHeight_ = head_node->height();
703     for (int i = 0; i < headHeight_; ++i) {
704       preds_[i] = head_node;
705       succs_[i] = head_node->skip(i);
706     }
707     int max_layer = maxLayer();
708     for (int i = 0; i < max_layer; ++i) {
709       hints_[i] = i + 1;
710     }
711     hints_[max_layer] = max_layer;
712   }
713
714   // advance to the next node in the list.
715   Skipper& operator ++() {
716     preds_[0] = succs_[0];
717     succs_[0] = preds_[0]->skip(0);
718     int height = curHeight();
719     for (int i = 1; i < height && preds_[0] == succs_[i]; ++i) {
720       preds_[i] = succs_[i];
721       succs_[i] = preds_[i]->skip(i);
722     }
723     return *this;
724   }
725
726   bool good() const { return succs_[0] != nullptr; }
727
728   int maxLayer() const { return headHeight_ - 1; }
729
730   int curHeight() const {
731     // need to cap the height to the cached head height, as the current node
732     // might be some newly inserted node and also during the time period the
733     // head height may have grown.
734     return succs_[0] ? std::min(headHeight_, succs_[0]->height()) : 0;
735   }
736
737   const value_type &data() const {
738     DCHECK(succs_[0] != nullptr);
739     return succs_[0]->data();
740   }
741
742   value_type &operator *() const {
743     DCHECK(succs_[0] != nullptr);
744     return succs_[0]->data();
745   }
746
747   value_type *operator->() {
748     DCHECK(succs_[0] != nullptr);
749     return &succs_[0]->data();
750   }
751
752   /*
753    * Skip to the position whose data is no less than the parameter.
754    * (I.e. the lower_bound).
755    *
756    * Returns true if the data is found, false otherwise.
757    */
758   bool to(const value_type &data) {
759     int layer = curHeight() - 1;
760     if (layer < 0) return false;   // reaches the end of the list
761
762     int lyr = hints_[layer];
763     int max_layer = maxLayer();
764     while (SkipListType::greater(data, succs_[lyr]) && lyr < max_layer) {
765       ++lyr;
766     }
767     hints_[layer] = lyr;  // update the hint
768
769     int foundLayer = SkipListType::
770       findInsertionPoint(preds_[lyr], lyr, data, preds_, succs_);
771     if (foundLayer < 0) return false;
772
773     DCHECK(succs_[0] != nullptr) << "lyr=" << lyr
774                                  << "; max_layer=" << max_layer;
775     return !succs_[0]->markedForRemoval();
776   }
777
778  private:
779   NodeType* head() const {
780     return accessor_.skiplist()->head_.load(std::memory_order_consume);
781   }
782
783   Accessor accessor_;
784   int headHeight_;
785   NodeType *succs_[MAX_HEIGHT], *preds_[MAX_HEIGHT];
786   uint8_t hints_[MAX_HEIGHT];
787 };
788
789 } // namespace folly
790
791 #endif  // FOLLY_CONCURRENT_SKIP_LIST_H_