Codemod: use #include angle brackets in folly and thrift
[folly.git] / folly / ConcurrentSkipList.h
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 // @author: Xin Liu <xliux@fb.com>
18 //
19 // A concurrent skip list (CSL) implementation.
20 // Ref: http://www.cs.tau.ac.il/~shanir/nir-pubs-web/Papers/OPODIS2006-BA.pdf
21
22 /*
23
24 This implements a sorted associative container that supports only
25 unique keys.  (Similar to std::set.)
26
27 Features:
28
29   1. Small memory overhead: ~40% less memory overhead compared with
30      std::set (1.6 words per node versus 3). It has an minimum of 4
31      words (7 words if there nodes got deleted) per-list overhead
32      though.
33
34   2. Read accesses (count, find iterator, skipper) are lock-free and
35      mostly wait-free (the only wait a reader may need to do is when
36      the node it is visiting is in a pending stage, i.e. deleting,
37      adding and not fully linked).  Write accesses (remove, add) need
38      to acquire locks, but locks are local to the predecessor nodes
39      and/or successor nodes.
40
41   3. Good high contention performance, comparable single-thread
42      performance.  In the multithreaded case (12 workers), CSL tested
43      10x faster than a RWSpinLocked std::set for an averaged sized
44      list (1K - 1M nodes).
45
46      Comparable read performance to std::set when single threaded,
47      especially when the list size is large, and scales better to
48      larger lists: when the size is small, CSL can be 20-50% slower on
49      find()/contains().  As the size gets large (> 1M elements),
50      find()/contains() can be 30% faster.
51
52      Iterating through a skiplist is similar to iterating through a
53      linked list, thus is much (2-6x) faster than on a std::set
54      (tree-based).  This is especially true for short lists due to
55      better cache locality.  Based on that, it's also faster to
56      intersect two skiplists.
57
58   4. Lazy removal with GC support.  The removed nodes get deleted when
59      the last Accessor to the skiplist is destroyed.
60
61 Caveats:
62
63   1. Write operations are usually 30% slower than std::set in a single
64      threaded environment.
65
66   2. Need to have a head node for each list, which has a 4 word
67      overhead.
68
69   3. When the list is quite small (< 1000 elements), single threaded
70      benchmarks show CSL can be 10x slower than std:set.
71
72   4. The interface requires using an Accessor to access the skiplist.
73     (See below.)
74
75   5. Currently x64 only, due to use of MicroSpinLock.
76
77   6. Freed nodes will not be reclaimed as long as there are ongoing
78      uses of the list.
79
80 Sample usage:
81
82      typedef ConcurrentSkipList<int> SkipListT;
83      shared_ptr<SkipListT> sl(SkipListT::createInstance(init_head_height);
84      {
85        // It's usually good practice to hold an accessor only during
86        // its necessary life cycle (but not in a tight loop as
87        // Accessor creation incurs ref-counting overhead).
88        //
89        // Holding it longer delays garbage-collecting the deleted
90        // nodes in the list.
91        SkipListT::Accessor accessor(sl);
92        accessor.insert(23);
93        accessor.erase(2);
94        for (auto &elem : accessor) {
95          // use elem to access data
96        }
97        ... ...
98      }
99
100  Another useful type is the Skipper accessor.  This is useful if you
101  want to skip to locations in the way std::lower_bound() works,
102  i.e. it can be used for going through the list by skipping to the
103  node no less than a specified key.  The Skipper keeps its location as
104  state, which makes it convenient for things like implementing
105  intersection of two sets efficiently, as it can start from the last
106  visited position.
107
108      {
109        SkipListT::Accessor accessor(sl);
110        SkipListT::Skipper skipper(accessor);
111        skipper.to(30);
112        if (skipper) {
113          CHECK_LE(30, *skipper);
114        }
115        ...  ...
116        // GC may happen when the accessor gets destructed.
117      }
118 */
119
120 #ifndef FOLLY_CONCURRENT_SKIP_LIST_H_
121 #define FOLLY_CONCURRENT_SKIP_LIST_H_
122
123 #include <algorithm>
124 #include <atomic>
125 #include <limits>
126 #include <memory>
127 #include <type_traits>
128 #include <boost/iterator/iterator_facade.hpp>
129 #include <glog/logging.h>
130
131 #include <folly/ConcurrentSkipList-inl.h>
132 #include <folly/Likely.h>
133 #include <folly/Memory.h>
134 #include <folly/SmallLocks.h>
135
136 namespace folly {
137
138 template<typename T,
139          typename Comp = std::less<T>,
140          // All nodes are allocated using provided SimpleAllocator,
141          // it should be thread-safe.
142          typename NodeAlloc = SysAlloc,
143          int MAX_HEIGHT = 24>
144 class ConcurrentSkipList {
145   // MAX_HEIGHT needs to be at least 2 to suppress compiler
146   // warnings/errors (Werror=uninitialized tiggered due to preds_[1]
147   // being treated as a scalar in the compiler).
148   static_assert(MAX_HEIGHT >= 2 && MAX_HEIGHT < 64,
149       "MAX_HEIGHT can only be in the range of [2, 64)");
150   typedef std::unique_lock<folly::MicroSpinLock> ScopedLocker;
151   typedef ConcurrentSkipList<T, Comp, NodeAlloc, MAX_HEIGHT> SkipListType;
152
153  public:
154   typedef detail::SkipListNode<T> NodeType;
155   typedef T value_type;
156   typedef T key_type;
157
158   typedef detail::csl_iterator<value_type, NodeType> iterator;
159   typedef detail::csl_iterator<const value_type, const NodeType> const_iterator;
160
161   class Accessor;
162   class Skipper;
163
164   explicit ConcurrentSkipList(int height, const NodeAlloc& alloc = NodeAlloc())
165     : recycler_(alloc),
166       head_(NodeType::create(recycler_.alloc(), height, value_type(), true)),
167       size_(0) { }
168
169   // Convenient function to get an Accessor to a new instance.
170   static Accessor create(int height = 1, const NodeAlloc& alloc = NodeAlloc()) {
171     return Accessor(createInstance(height, alloc));
172   }
173
174   // Create a shared_ptr skiplist object with initial head height.
175   static std::shared_ptr<SkipListType> createInstance(
176       int height = 1, const NodeAlloc& alloc = NodeAlloc()) {
177     return std::make_shared<ConcurrentSkipList>(height, alloc);
178   }
179
180   //===================================================================
181   // Below are implementation details.
182   // Please see ConcurrentSkipList::Accessor for stdlib-like APIs.
183   //===================================================================
184
185   ~ConcurrentSkipList() {
186     /* static */ if (NodeType::template destroyIsNoOp<NodeAlloc>()) {
187       // Avoid traversing the list if using arena allocator.
188       return;
189     }
190     for (NodeType* current = head_.load(std::memory_order_relaxed); current; ) {
191       NodeType* tmp = current->skip(0);
192       NodeType::destroy(recycler_.alloc(), current);
193       current = tmp;
194     }
195   }
196
197  private:
198   static bool greater(const value_type &data, const NodeType *node) {
199     return node && Comp()(node->data(), data);
200   }
201
202   static bool less(const value_type &data, const NodeType *node) {
203     return (node == nullptr) || Comp()(data, node->data());
204   }
205
206   static int findInsertionPoint(NodeType *cur, int cur_layer,
207       const value_type &data,
208       NodeType *preds[], NodeType *succs[]) {
209     int foundLayer = -1;
210     NodeType *pred = cur;
211     NodeType *foundNode = nullptr;
212     for (int layer = cur_layer; layer >= 0; --layer) {
213       NodeType *node = pred->skip(layer);
214       while (greater(data, node)) {
215         pred = node;
216         node = node->skip(layer);
217       }
218       if (foundLayer == -1 && !less(data, node)) { // the two keys equal
219         foundLayer = layer;
220         foundNode = node;
221       }
222       preds[layer] = pred;
223
224       // if found, succs[0..foundLayer] need to point to the cached foundNode,
225       // as foundNode might be deleted at the same time thus pred->skip() can
226       // return NULL or another node.
227       succs[layer] = foundNode ? foundNode : node;
228     }
229     return foundLayer;
230   }
231
232   size_t size() const { return size_.load(std::memory_order_relaxed); }
233
234   int height() const {
235     return head_.load(std::memory_order_consume)->height();
236   }
237
238   int maxLayer() const { return height() - 1; }
239
240   size_t incrementSize(int delta) {
241     return size_.fetch_add(delta, std::memory_order_relaxed) + delta;
242   }
243
244   // Returns the node if found, nullptr otherwise.
245   NodeType* find(const value_type &data) {
246     auto ret = findNode(data);
247     if (ret.second && !ret.first->markedForRemoval()) return ret.first;
248     return nullptr;
249   }
250
251   // lock all the necessary nodes for changing (adding or removing) the list.
252   // returns true if all the lock acquried successfully and the related nodes
253   // are all validate (not in certain pending states), false otherwise.
254   bool lockNodesForChange(int nodeHeight,
255       ScopedLocker guards[MAX_HEIGHT],
256       NodeType *preds[MAX_HEIGHT],
257       NodeType *succs[MAX_HEIGHT],
258       bool adding=true) {
259     NodeType *pred, *succ, *prevPred = nullptr;
260     bool valid = true;
261     for (int layer = 0; valid && layer < nodeHeight; ++layer) {
262       pred = preds[layer];
263       DCHECK(pred != nullptr) << "layer=" << layer << " height=" << height()
264         << " nodeheight=" << nodeHeight;
265       succ = succs[layer];
266       if (pred != prevPred) {
267         guards[layer] = pred->acquireGuard();
268         prevPred = pred;
269       }
270       valid = !pred->markedForRemoval() &&
271         pred->skip(layer) == succ;  // check again after locking
272
273       if (adding) {  // when adding a node, the succ shouldn't be going away
274         valid = valid && (succ == nullptr || !succ->markedForRemoval());
275       }
276     }
277
278     return valid;
279   }
280
281   // Returns a paired value:
282   //   pair.first always stores the pointer to the node with the same input key.
283   //     It could be either the newly added data, or the existed data in the
284   //     list with the same key.
285   //   pair.second stores whether the data is added successfully:
286   //     0 means not added, otherwise reutrns the new size.
287   template<typename U>
288   std::pair<NodeType*, size_t> addOrGetData(U &&data) {
289     NodeType *preds[MAX_HEIGHT], *succs[MAX_HEIGHT];
290     NodeType *newNode;
291     size_t newSize;
292     while (true) {
293       int max_layer = 0;
294       int layer = findInsertionPointGetMaxLayer(data, preds, succs, &max_layer);
295
296       if (layer >= 0) {
297         NodeType *nodeFound = succs[layer];
298         DCHECK(nodeFound != nullptr);
299         if (nodeFound->markedForRemoval()) {
300           continue;  // if it's getting deleted retry finding node.
301         }
302         // wait until fully linked.
303         while (UNLIKELY(!nodeFound->fullyLinked())) {}
304         return std::make_pair(nodeFound, 0);
305       }
306
307       // need to capped at the original height -- the real height may have grown
308       int nodeHeight = detail::SkipListRandomHeight::instance()->
309         getHeight(max_layer + 1);
310
311       ScopedLocker guards[MAX_HEIGHT];
312       if (!lockNodesForChange(nodeHeight, guards, preds, succs)) {
313         continue; // give up the locks and retry until all valid
314       }
315
316       // locks acquired and all valid, need to modify the links under the locks.
317       newNode =
318         NodeType::create(recycler_.alloc(), nodeHeight, std::forward<U>(data));
319       for (int layer = 0; layer < nodeHeight; ++layer) {
320         newNode->setSkip(layer, succs[layer]);
321         preds[layer]->setSkip(layer, newNode);
322       }
323
324       newNode->setFullyLinked();
325       newSize = incrementSize(1);
326       break;
327     }
328
329     int hgt = height();
330     size_t sizeLimit =
331       detail::SkipListRandomHeight::instance()->getSizeLimit(hgt);
332
333     if (hgt < MAX_HEIGHT && newSize > sizeLimit) {
334       growHeight(hgt + 1);
335     }
336     CHECK_GT(newSize, 0);
337     return std::make_pair(newNode, newSize);
338   }
339
340   bool remove(const value_type &data) {
341     NodeType *nodeToDelete = nullptr;
342     ScopedLocker nodeGuard;
343     bool isMarked = false;
344     int nodeHeight = 0;
345     NodeType* preds[MAX_HEIGHT], *succs[MAX_HEIGHT];
346
347     while (true) {
348       int max_layer = 0;
349       int layer = findInsertionPointGetMaxLayer(data, preds, succs, &max_layer);
350       if (!isMarked && (layer < 0 || !okToDelete(succs[layer], layer))) {
351         return false;
352       }
353
354       if (!isMarked) {
355         nodeToDelete = succs[layer];
356         nodeHeight = nodeToDelete->height();
357         nodeGuard = nodeToDelete->acquireGuard();
358         if (nodeToDelete->markedForRemoval()) return false;
359         nodeToDelete->setMarkedForRemoval();
360         isMarked = true;
361       }
362
363       // acquire pred locks from bottom layer up
364       ScopedLocker guards[MAX_HEIGHT];
365       if (!lockNodesForChange(nodeHeight, guards, preds, succs, false)) {
366         continue;  // this will unlock all the locks
367       }
368
369       for (int layer = nodeHeight - 1; layer >= 0; --layer) {
370         preds[layer]->setSkip(layer, nodeToDelete->skip(layer));
371       }
372
373       incrementSize(-1);
374       break;
375     }
376     recycle(nodeToDelete);
377     return true;
378   }
379
380   const value_type *first() const {
381     auto node = head_.load(std::memory_order_consume)->skip(0);
382     return node ? &node->data() : nullptr;
383   }
384
385   const value_type *last() const {
386     NodeType *pred = head_.load(std::memory_order_consume);
387     NodeType *node = nullptr;
388     for (int layer = maxLayer(); layer >= 0; --layer) {
389       do {
390         node = pred->skip(layer);
391         if (node) pred = node;
392       } while (node != nullptr);
393     }
394     return pred == head_.load(std::memory_order_relaxed)
395       ? nullptr : &pred->data();
396   }
397
398   static bool okToDelete(NodeType *candidate, int layer) {
399     DCHECK(candidate != nullptr);
400     return candidate->fullyLinked() &&
401       candidate->maxLayer() == layer &&
402       !candidate->markedForRemoval();
403   }
404
405   // find node for insertion/deleting
406   int findInsertionPointGetMaxLayer(const value_type &data,
407       NodeType *preds[], NodeType *succs[], int *max_layer) const {
408     *max_layer = maxLayer();
409     return findInsertionPoint(head_.load(std::memory_order_consume),
410       *max_layer, data, preds, succs);
411   }
412
413   // Find node for access. Returns a paired values:
414   // pair.first = the first node that no-less than data value
415   // pair.second = 1 when the data value is founded, or 0 otherwise.
416   // This is like lower_bound, but not exact: we could have the node marked for
417   // removal so still need to check that.
418   std::pair<NodeType*, int> findNode(const value_type &data) const {
419     return findNodeDownRight(data);
420   }
421
422   // Find node by first stepping down then stepping right. Based on benchmark
423   // results, this is slightly faster than findNodeRightDown for better
424   // localality on the skipping pointers.
425   std::pair<NodeType*, int> findNodeDownRight(const value_type &data) const {
426     NodeType *pred = head_.load(std::memory_order_consume);
427     int ht = pred->height();
428     NodeType *node = nullptr;
429
430     bool found = false;
431     while (!found) {
432       // stepping down
433       for (; ht > 0 && less(data, pred->skip(ht - 1)); --ht) {}
434       if (ht == 0) return std::make_pair(pred->skip(0), 0);  // not found
435
436       node = pred->skip(--ht);  // node <= data now
437       // stepping right
438       while (greater(data, node)) {
439         pred = node;
440         node = node->skip(ht);
441       }
442       found = !less(data, node);
443     }
444     return std::make_pair(node, found);
445   }
446
447   // find node by first stepping right then stepping down.
448   // We still keep this for reference purposes.
449   std::pair<NodeType*, int> findNodeRightDown(const value_type &data) const {
450     NodeType *pred = head_.load(std::memory_order_consume);
451     NodeType *node = nullptr;
452     auto top = maxLayer();
453     int found = 0;
454     for (int layer = top; !found && layer >= 0; --layer) {
455       node = pred->skip(layer);
456       while (greater(data, node)) {
457         pred = node;
458         node = node->skip(layer);
459       }
460       found = !less(data, node);
461     }
462     return std::make_pair(node, found);
463   }
464
465   NodeType* lower_bound(const value_type &data) const {
466     auto node = findNode(data).first;
467     while (node != nullptr && node->markedForRemoval()) {
468       node = node->skip(0);
469     }
470     return node;
471   }
472
473   void growHeight(int height) {
474     NodeType* oldHead = head_.load(std::memory_order_consume);
475     if (oldHead->height() >= height) {  // someone else already did this
476       return;
477     }
478
479     NodeType* newHead =
480       NodeType::create(recycler_.alloc(), height, value_type(), true);
481
482     { // need to guard the head node in case others are adding/removing
483       // nodes linked to the head.
484       ScopedLocker g = oldHead->acquireGuard();
485       newHead->copyHead(oldHead);
486       NodeType* expected = oldHead;
487       if (!head_.compare_exchange_strong(expected, newHead,
488           std::memory_order_release)) {
489         // if someone has already done the swap, just return.
490         NodeType::destroy(recycler_.alloc(), newHead);
491         return;
492       }
493       oldHead->setMarkedForRemoval();
494     }
495     recycle(oldHead);
496   }
497
498   void recycle(NodeType *node) {
499     recycler_.add(node);
500   }
501
502   detail::NodeRecycler<NodeType, NodeAlloc> recycler_;
503   std::atomic<NodeType*> head_;
504   std::atomic<size_t> size_;
505 };
506
507 template<typename T, typename Comp, typename NodeAlloc, int MAX_HEIGHT>
508 class ConcurrentSkipList<T, Comp, NodeAlloc, MAX_HEIGHT>::Accessor {
509   typedef detail::SkipListNode<T> NodeType;
510   typedef ConcurrentSkipList<T, Comp, NodeAlloc, MAX_HEIGHT> SkipListType;
511  public:
512   typedef T value_type;
513   typedef T key_type;
514   typedef T& reference;
515   typedef T* pointer;
516   typedef const T& const_reference;
517   typedef const T* const_pointer;
518   typedef size_t size_type;
519   typedef Comp key_compare;
520   typedef Comp value_compare;
521
522   typedef typename SkipListType::iterator iterator;
523   typedef typename SkipListType::const_iterator const_iterator;
524   typedef typename SkipListType::Skipper Skipper;
525
526   explicit Accessor(std::shared_ptr<ConcurrentSkipList> skip_list)
527     : slHolder_(std::move(skip_list))
528   {
529     sl_ = slHolder_.get();
530     DCHECK(sl_ != nullptr);
531     sl_->recycler_.addRef();
532   }
533
534   // Unsafe initializer: the caller assumes the responsibility to keep
535   // skip_list valid during the whole life cycle of the Acessor.
536   explicit Accessor(ConcurrentSkipList *skip_list) : sl_(skip_list) {
537     DCHECK(sl_ != nullptr);
538     sl_->recycler_.addRef();
539   }
540
541   Accessor(const Accessor &accessor) :
542       sl_(accessor.sl_),
543       slHolder_(accessor.slHolder_) {
544     sl_->recycler_.addRef();
545   }
546
547   Accessor& operator=(const Accessor &accessor) {
548     if (this != &accessor) {
549       slHolder_ = accessor.slHolder_;
550       sl_->recycler_.releaseRef();
551       sl_ = accessor.sl_;
552       sl_->recycler_.addRef();
553     }
554     return *this;
555   }
556
557   ~Accessor() {
558     sl_->recycler_.releaseRef();
559   }
560
561   bool empty() const { return sl_->size() == 0; }
562   size_t size() const { return sl_->size(); }
563   size_type max_size() const { return std::numeric_limits<size_type>::max(); }
564
565   // returns end() if the value is not in the list, otherwise returns an
566   // iterator pointing to the data, and it's guaranteed that the data is valid
567   // as far as the Accessor is hold.
568   iterator find(const key_type &value) { return iterator(sl_->find(value)); }
569   const_iterator find(const key_type &value) const {
570     return iterator(sl_->find(value));
571   }
572   size_type count(const key_type &data) const { return contains(data); }
573
574   iterator begin() const {
575     NodeType* head = sl_->head_.load(std::memory_order_consume);
576     return iterator(head->next());
577   }
578   iterator end() const { return iterator(nullptr); }
579   const_iterator cbegin() const { return begin(); }
580   const_iterator cend() const { return end(); }
581
582   template<typename U,
583     typename=typename std::enable_if<std::is_convertible<U, T>::value>::type>
584   std::pair<iterator, bool> insert(U&& data) {
585     auto ret = sl_->addOrGetData(std::forward<U>(data));
586     return std::make_pair(iterator(ret.first), ret.second);
587   }
588   size_t erase(const key_type &data) { return remove(data); }
589
590   iterator lower_bound(const key_type &data) const {
591     return iterator(sl_->lower_bound(data));
592   }
593
594   size_t height() const { return sl_->height(); }
595
596   // first() returns pointer to the first element in the skiplist, or
597   // nullptr if empty.
598   //
599   // last() returns the pointer to the last element in the skiplist,
600   // nullptr if list is empty.
601   //
602   // Note: As concurrent writing can happen, first() is not
603   //   guaranteed to be the min_element() in the list. Similarly
604   //   last() is not guaranteed to be the max_element(), and both of them can
605   //   be invalid (i.e. nullptr), so we name them differently from front() and
606   //   tail() here.
607   const key_type *first() const { return sl_->first(); }
608   const key_type *last() const { return sl_->last(); }
609
610   // Try to remove the last element in the skip list.
611   //
612   // Returns true if we removed it, false if either the list is empty
613   // or a race condition happened (i.e. the used-to-be last element
614   // was already removed by another thread).
615   bool pop_back() {
616     auto last = sl_->last();
617     return last ? sl_->remove(*last) : false;
618   }
619
620   std::pair<key_type*, bool> addOrGetData(const key_type &data) {
621     auto ret = sl_->addOrGetData(data);
622     return std::make_pair(&ret.first->data(), ret.second);
623   }
624
625   SkipListType* skiplist() const { return sl_; }
626
627   // legacy interfaces
628   // TODO:(xliu) remove these.
629   // Returns true if the node is added successfully, false if not, i.e. the
630   // node with the same key already existed in the list.
631   bool contains(const key_type &data) const { return sl_->find(data); }
632   bool add(const key_type &data) { return sl_->addOrGetData(data).second; }
633   bool remove(const key_type &data) { return sl_->remove(data); }
634
635  private:
636   SkipListType *sl_;
637   std::shared_ptr<SkipListType> slHolder_;
638 };
639
640 // implements forward iterator concept.
641 template<typename ValT, typename NodeT>
642 class detail::csl_iterator :
643   public boost::iterator_facade<csl_iterator<ValT, NodeT>,
644     ValT, boost::forward_traversal_tag> {
645  public:
646   typedef ValT value_type;
647   typedef value_type& reference;
648   typedef value_type* pointer;
649   typedef ptrdiff_t difference_type;
650
651   explicit csl_iterator(NodeT* node = nullptr) : node_(node) {}
652
653   template<typename OtherVal, typename OtherNode>
654   csl_iterator(const csl_iterator<OtherVal, OtherNode> &other,
655       typename std::enable_if<std::is_convertible<OtherVal, ValT>::value>::type*
656       = 0) : node_(other.node_) {}
657
658   size_t nodeSize() const {
659     return node_ == nullptr ? 0 :
660       node_->height() * sizeof(NodeT*) + sizeof(*this);
661   }
662
663   bool good() const { return node_ != nullptr; }
664
665  private:
666   friend class boost::iterator_core_access;
667   template<class,class> friend class csl_iterator;
668
669   void increment() { node_ = node_->next(); };
670   bool equal(const csl_iterator& other) const { return node_ == other.node_; }
671   value_type& dereference() const { return node_->data(); }
672
673   NodeT* node_;
674 };
675
676 // Skipper interface
677 template<typename T, typename Comp, typename NodeAlloc, int MAX_HEIGHT>
678 class ConcurrentSkipList<T, Comp, NodeAlloc, MAX_HEIGHT>::Skipper {
679   typedef detail::SkipListNode<T> NodeType;
680   typedef ConcurrentSkipList<T, Comp, NodeAlloc, MAX_HEIGHT> SkipListType;
681   typedef typename SkipListType::Accessor Accessor;
682
683  public:
684   typedef T  value_type;
685   typedef T& reference;
686   typedef T* pointer;
687   typedef ptrdiff_t difference_type;
688
689   Skipper(const std::shared_ptr<SkipListType>& skipList) :
690     accessor_(skipList) {
691     init();
692   }
693
694   Skipper(const Accessor& accessor) : accessor_(accessor) {
695     init();
696   }
697
698   void init() {
699     // need to cache the head node
700     NodeType* head_node = head();
701     headHeight_ = head_node->height();
702     for (int i = 0; i < headHeight_; ++i) {
703       preds_[i] = head_node;
704       succs_[i] = head_node->skip(i);
705     }
706     int max_layer = maxLayer();
707     for (int i = 0; i < max_layer; ++i) {
708       hints_[i] = i + 1;
709     }
710     hints_[max_layer] = max_layer;
711   }
712
713   // advance to the next node in the list.
714   Skipper& operator ++() {
715     preds_[0] = succs_[0];
716     succs_[0] = preds_[0]->skip(0);
717     int height = curHeight();
718     for (int i = 1; i < height && preds_[0] == succs_[i]; ++i) {
719       preds_[i] = succs_[i];
720       succs_[i] = preds_[i]->skip(i);
721     }
722     return *this;
723   }
724
725   bool good() const { return succs_[0] != nullptr; }
726
727   int maxLayer() const { return headHeight_ - 1; }
728
729   int curHeight() const {
730     // need to cap the height to the cached head height, as the current node
731     // might be some newly inserted node and also during the time period the
732     // head height may have grown.
733     return succs_[0] ? std::min(headHeight_, succs_[0]->height()) : 0;
734   }
735
736   const value_type &data() const {
737     DCHECK(succs_[0] != nullptr);
738     return succs_[0]->data();
739   }
740
741   value_type &operator *() const {
742     DCHECK(succs_[0] != nullptr);
743     return succs_[0]->data();
744   }
745
746   value_type *operator->() {
747     DCHECK(succs_[0] != nullptr);
748     return &succs_[0]->data();
749   }
750
751   /*
752    * Skip to the position whose data is no less than the parameter.
753    * (I.e. the lower_bound).
754    *
755    * Returns true if the data is found, false otherwise.
756    */
757   bool to(const value_type &data) {
758     int layer = curHeight() - 1;
759     if (layer < 0) return false;   // reaches the end of the list
760
761     int lyr = hints_[layer];
762     int max_layer = maxLayer();
763     while (SkipListType::greater(data, succs_[lyr]) && lyr < max_layer) {
764       ++lyr;
765     }
766     hints_[layer] = lyr;  // update the hint
767
768     int foundLayer = SkipListType::
769       findInsertionPoint(preds_[lyr], lyr, data, preds_, succs_);
770     if (foundLayer < 0) return false;
771
772     DCHECK(succs_[0] != nullptr) << "lyr=" << lyr << "; max_layer=" << max_layer;
773     return !succs_[0]->markedForRemoval();
774   }
775
776  private:
777   NodeType* head() const {
778     return accessor_.skiplist()->head_.load(std::memory_order_consume);
779   }
780
781   Accessor accessor_;
782   int headHeight_;
783   NodeType *succs_[MAX_HEIGHT], *preds_[MAX_HEIGHT];
784   uint8_t hints_[MAX_HEIGHT];
785 };
786
787 } // namespace folly
788
789 #endif  // FOLLY_CONCURRENT_SKIP_LIST_H_