1a2fdadcd5d6f71fb638aaa52b23de0689fa1548
[folly.git] / folly / ConcurrentSkipList.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 // @author: Xin Liu <xliux@fb.com>
18 //
19 // A concurrent skip list (CSL) implementation.
20 // Ref: http://www.cs.tau.ac.il/~shanir/nir-pubs-web/Papers/OPODIS2006-BA.pdf
21
22 /*
23
24 This implements a sorted associative container that supports only
25 unique keys.  (Similar to std::set.)
26
27 Features:
28
29   1. Small memory overhead: ~40% less memory overhead compared with
30      std::set (1.6 words per node versus 3). It has an minimum of 4
31      words (7 words if there nodes got deleted) per-list overhead
32      though.
33
34   2. Read accesses (count, find iterator, skipper) are lock-free and
35      mostly wait-free (the only wait a reader may need to do is when
36      the node it is visiting is in a pending stage, i.e. deleting,
37      adding and not fully linked).  Write accesses (remove, add) need
38      to acquire locks, but locks are local to the predecessor nodes
39      and/or successor nodes.
40
41   3. Good high contention performance, comparable single-thread
42      performance.  In the multithreaded case (12 workers), CSL tested
43      10x faster than a RWSpinLocked std::set for an averaged sized
44      list (1K - 1M nodes).
45
46      Comparable read performance to std::set when single threaded,
47      especially when the list size is large, and scales better to
48      larger lists: when the size is small, CSL can be 20-50% slower on
49      find()/contains().  As the size gets large (> 1M elements),
50      find()/contains() can be 30% faster.
51
52      Iterating through a skiplist is similar to iterating through a
53      linked list, thus is much (2-6x) faster than on a std::set
54      (tree-based).  This is especially true for short lists due to
55      better cache locality.  Based on that, it's also faster to
56      intersect two skiplists.
57
58   4. Lazy removal with GC support.  The removed nodes get deleted when
59      the last Accessor to the skiplist is destroyed.
60
61 Caveats:
62
63   1. Write operations are usually 30% slower than std::set in a single
64      threaded environment.
65
66   2. Need to have a head node for each list, which has a 4 word
67      overhead.
68
69   3. When the list is quite small (< 1000 elements), single threaded
70      benchmarks show CSL can be 10x slower than std:set.
71
72   4. The interface requires using an Accessor to access the skiplist.
73     (See below.)
74
75   5. Currently x64 only, due to use of MicroSpinLock.
76
77   6. Freed nodes will not be reclaimed as long as there are ongoing
78      uses of the list.
79
80 Sample usage:
81
82      typedef ConcurrentSkipList<int> SkipListT;
83      shared_ptr<SkipListT> sl(SkipListT::createInstance(init_head_height);
84      {
85        // It's usually good practice to hold an accessor only during
86        // its necessary life cycle (but not in a tight loop as
87        // Accessor creation incurs ref-counting overhead).
88        //
89        // Holding it longer delays garbage-collecting the deleted
90        // nodes in the list.
91        SkipListT::Accessor accessor(sl);
92        accessor.insert(23);
93        accessor.erase(2);
94        for (auto &elem : accessor) {
95          // use elem to access data
96        }
97        ... ...
98      }
99
100  Another useful type is the Skipper accessor.  This is useful if you
101  want to skip to locations in the way std::lower_bound() works,
102  i.e. it can be used for going through the list by skipping to the
103  node no less than a specified key.  The Skipper keeps its location as
104  state, which makes it convenient for things like implementing
105  intersection of two sets efficiently, as it can start from the last
106  visited position.
107
108      {
109        SkipListT::Accessor accessor(sl);
110        SkipListT::Skipper skipper(accessor);
111        skipper.to(30);
112        if (skipper) {
113          CHECK_LE(30, *skipper);
114        }
115        ...  ...
116        // GC may happen when the accessor gets destructed.
117      }
118 */
119
120 #pragma once
121
122 #include <algorithm>
123 #include <atomic>
124 #include <limits>
125 #include <memory>
126 #include <type_traits>
127
128 #include <boost/iterator/iterator_facade.hpp>
129 #include <glog/logging.h>
130
131 #include <folly/ConcurrentSkipList-inl.h>
132 #include <folly/Likely.h>
133 #include <folly/Memory.h>
134 #include <folly/MicroSpinLock.h>
135
136 namespace folly {
137
138 template <
139     typename T,
140     typename Comp = std::less<T>,
141     // All nodes are allocated using provided SimpleAllocator,
142     // it should be thread-safe.
143     typename NodeAlloc = SysAlloc,
144     int MAX_HEIGHT = 24>
145 class ConcurrentSkipList {
146   // MAX_HEIGHT needs to be at least 2 to suppress compiler
147   // warnings/errors (Werror=uninitialized tiggered due to preds_[1]
148   // being treated as a scalar in the compiler).
149   static_assert(MAX_HEIGHT >= 2 && MAX_HEIGHT < 64,
150       "MAX_HEIGHT can only be in the range of [2, 64)");
151   typedef std::unique_lock<folly::MicroSpinLock> ScopedLocker;
152   typedef ConcurrentSkipList<T, Comp, NodeAlloc, MAX_HEIGHT> SkipListType;
153
154  public:
155   typedef detail::SkipListNode<T> NodeType;
156   typedef T value_type;
157   typedef T key_type;
158
159   typedef detail::csl_iterator<value_type, NodeType> iterator;
160   typedef detail::csl_iterator<const value_type, const NodeType> const_iterator;
161
162   class Accessor;
163   class Skipper;
164
165   explicit ConcurrentSkipList(int height, const NodeAlloc& alloc)
166       : recycler_(alloc),
167         head_(NodeType::create(recycler_.alloc(), height, value_type(), true)),
168         size_(0) {}
169
170   explicit ConcurrentSkipList(int height)
171       : recycler_(),
172         head_(NodeType::create(recycler_.alloc(), height, value_type(), true)),
173         size_(0) {}
174
175   // Convenient function to get an Accessor to a new instance.
176   static Accessor create(int height, const NodeAlloc& alloc) {
177     return Accessor(createInstance(height, alloc));
178   }
179
180   static Accessor create(int height = 1) {
181     return Accessor(createInstance(height));
182   }
183
184   // Create a shared_ptr skiplist object with initial head height.
185   static std::shared_ptr<SkipListType> createInstance(int height,
186                                                       const NodeAlloc& alloc) {
187     return std::make_shared<ConcurrentSkipList>(height, alloc);
188   }
189
190   static std::shared_ptr<SkipListType> createInstance(int height = 1) {
191     return std::make_shared<ConcurrentSkipList>(height);
192   }
193
194   //===================================================================
195   // Below are implementation details.
196   // Please see ConcurrentSkipList::Accessor for stdlib-like APIs.
197   //===================================================================
198
199   ~ConcurrentSkipList() {
200     /* static */ if (NodeType::template DestroyIsNoOp<NodeAlloc>::value) {
201       // Avoid traversing the list if using arena allocator.
202       return;
203     }
204     for (NodeType* current = head_.load(std::memory_order_relaxed); current; ) {
205       NodeType* tmp = current->skip(0);
206       NodeType::destroy(recycler_.alloc(), current);
207       current = tmp;
208     }
209   }
210
211  private:
212   static bool greater(const value_type &data, const NodeType *node) {
213     return node && Comp()(node->data(), data);
214   }
215
216   static bool less(const value_type &data, const NodeType *node) {
217     return (node == nullptr) || Comp()(data, node->data());
218   }
219
220   static int findInsertionPoint(NodeType *cur, int cur_layer,
221       const value_type &data,
222       NodeType *preds[], NodeType *succs[]) {
223     int foundLayer = -1;
224     NodeType *pred = cur;
225     NodeType *foundNode = nullptr;
226     for (int layer = cur_layer; layer >= 0; --layer) {
227       NodeType *node = pred->skip(layer);
228       while (greater(data, node)) {
229         pred = node;
230         node = node->skip(layer);
231       }
232       if (foundLayer == -1 && !less(data, node)) { // the two keys equal
233         foundLayer = layer;
234         foundNode = node;
235       }
236       preds[layer] = pred;
237
238       // if found, succs[0..foundLayer] need to point to the cached foundNode,
239       // as foundNode might be deleted at the same time thus pred->skip() can
240       // return nullptr or another node.
241       succs[layer] = foundNode ? foundNode : node;
242     }
243     return foundLayer;
244   }
245
246   size_t size() const { return size_.load(std::memory_order_relaxed); }
247
248   int height() const {
249     return head_.load(std::memory_order_consume)->height();
250   }
251
252   int maxLayer() const { return height() - 1; }
253
254   size_t incrementSize(int delta) {
255     return size_.fetch_add(delta, std::memory_order_relaxed) + delta;
256   }
257
258   // Returns the node if found, nullptr otherwise.
259   NodeType* find(const value_type &data) {
260     auto ret = findNode(data);
261     if (ret.second && !ret.first->markedForRemoval()) return ret.first;
262     return nullptr;
263   }
264
265   // lock all the necessary nodes for changing (adding or removing) the list.
266   // returns true if all the lock acquried successfully and the related nodes
267   // are all validate (not in certain pending states), false otherwise.
268   bool lockNodesForChange(int nodeHeight,
269       ScopedLocker guards[MAX_HEIGHT],
270       NodeType *preds[MAX_HEIGHT],
271       NodeType *succs[MAX_HEIGHT],
272       bool adding=true) {
273     NodeType *pred, *succ, *prevPred = nullptr;
274     bool valid = true;
275     for (int layer = 0; valid && layer < nodeHeight; ++layer) {
276       pred = preds[layer];
277       DCHECK(pred != nullptr) << "layer=" << layer << " height=" << height()
278         << " nodeheight=" << nodeHeight;
279       succ = succs[layer];
280       if (pred != prevPred) {
281         guards[layer] = pred->acquireGuard();
282         prevPred = pred;
283       }
284       valid = !pred->markedForRemoval() &&
285         pred->skip(layer) == succ;  // check again after locking
286
287       if (adding) {  // when adding a node, the succ shouldn't be going away
288         valid = valid && (succ == nullptr || !succ->markedForRemoval());
289       }
290     }
291
292     return valid;
293   }
294
295   // Returns a paired value:
296   //   pair.first always stores the pointer to the node with the same input key.
297   //     It could be either the newly added data, or the existed data in the
298   //     list with the same key.
299   //   pair.second stores whether the data is added successfully:
300   //     0 means not added, otherwise reutrns the new size.
301   template <typename U>
302   std::pair<NodeType*, size_t> addOrGetData(U &&data) {
303     NodeType *preds[MAX_HEIGHT], *succs[MAX_HEIGHT];
304     NodeType *newNode;
305     size_t newSize;
306     while (true) {
307       int max_layer = 0;
308       int layer = findInsertionPointGetMaxLayer(data, preds, succs, &max_layer);
309
310       if (layer >= 0) {
311         NodeType *nodeFound = succs[layer];
312         DCHECK(nodeFound != nullptr);
313         if (nodeFound->markedForRemoval()) {
314           continue;  // if it's getting deleted retry finding node.
315         }
316         // wait until fully linked.
317         while (UNLIKELY(!nodeFound->fullyLinked())) {}
318         return std::make_pair(nodeFound, 0);
319       }
320
321       // need to capped at the original height -- the real height may have grown
322       int nodeHeight = detail::SkipListRandomHeight::instance()->
323         getHeight(max_layer + 1);
324
325       ScopedLocker guards[MAX_HEIGHT];
326       if (!lockNodesForChange(nodeHeight, guards, preds, succs)) {
327         continue; // give up the locks and retry until all valid
328       }
329
330       // locks acquired and all valid, need to modify the links under the locks.
331       newNode =
332         NodeType::create(recycler_.alloc(), nodeHeight, std::forward<U>(data));
333       for (int k = 0; k < nodeHeight; ++k) {
334         newNode->setSkip(k, succs[k]);
335         preds[k]->setSkip(k, newNode);
336       }
337
338       newNode->setFullyLinked();
339       newSize = incrementSize(1);
340       break;
341     }
342
343     int hgt = height();
344     size_t sizeLimit =
345       detail::SkipListRandomHeight::instance()->getSizeLimit(hgt);
346
347     if (hgt < MAX_HEIGHT && newSize > sizeLimit) {
348       growHeight(hgt + 1);
349     }
350     CHECK_GT(newSize, 0);
351     return std::make_pair(newNode, newSize);
352   }
353
354   bool remove(const value_type &data) {
355     NodeType *nodeToDelete = nullptr;
356     ScopedLocker nodeGuard;
357     bool isMarked = false;
358     int nodeHeight = 0;
359     NodeType* preds[MAX_HEIGHT], *succs[MAX_HEIGHT];
360
361     while (true) {
362       int max_layer = 0;
363       int layer = findInsertionPointGetMaxLayer(data, preds, succs, &max_layer);
364       if (!isMarked && (layer < 0 || !okToDelete(succs[layer], layer))) {
365         return false;
366       }
367
368       if (!isMarked) {
369         nodeToDelete = succs[layer];
370         nodeHeight = nodeToDelete->height();
371         nodeGuard = nodeToDelete->acquireGuard();
372         if (nodeToDelete->markedForRemoval()) return false;
373         nodeToDelete->setMarkedForRemoval();
374         isMarked = true;
375       }
376
377       // acquire pred locks from bottom layer up
378       ScopedLocker guards[MAX_HEIGHT];
379       if (!lockNodesForChange(nodeHeight, guards, preds, succs, false)) {
380         continue;  // this will unlock all the locks
381       }
382
383       for (int k = nodeHeight - 1; k >= 0; --k) {
384         preds[k]->setSkip(k, nodeToDelete->skip(k));
385       }
386
387       incrementSize(-1);
388       break;
389     }
390     recycle(nodeToDelete);
391     return true;
392   }
393
394   const value_type *first() const {
395     auto node = head_.load(std::memory_order_consume)->skip(0);
396     return node ? &node->data() : nullptr;
397   }
398
399   const value_type *last() const {
400     NodeType *pred = head_.load(std::memory_order_consume);
401     NodeType *node = nullptr;
402     for (int layer = maxLayer(); layer >= 0; --layer) {
403       do {
404         node = pred->skip(layer);
405         if (node) pred = node;
406       } while (node != nullptr);
407     }
408     return pred == head_.load(std::memory_order_relaxed)
409       ? nullptr : &pred->data();
410   }
411
412   static bool okToDelete(NodeType *candidate, int layer) {
413     DCHECK(candidate != nullptr);
414     return candidate->fullyLinked() &&
415       candidate->maxLayer() == layer &&
416       !candidate->markedForRemoval();
417   }
418
419   // find node for insertion/deleting
420   int findInsertionPointGetMaxLayer(const value_type &data,
421       NodeType *preds[], NodeType *succs[], int *max_layer) const {
422     *max_layer = maxLayer();
423     return findInsertionPoint(head_.load(std::memory_order_consume),
424       *max_layer, data, preds, succs);
425   }
426
427   // Find node for access. Returns a paired values:
428   // pair.first = the first node that no-less than data value
429   // pair.second = 1 when the data value is founded, or 0 otherwise.
430   // This is like lower_bound, but not exact: we could have the node marked for
431   // removal so still need to check that.
432   std::pair<NodeType*, int> findNode(const value_type &data) const {
433     return findNodeDownRight(data);
434   }
435
436   // Find node by first stepping down then stepping right. Based on benchmark
437   // results, this is slightly faster than findNodeRightDown for better
438   // localality on the skipping pointers.
439   std::pair<NodeType*, int> findNodeDownRight(const value_type &data) const {
440     NodeType *pred = head_.load(std::memory_order_consume);
441     int ht = pred->height();
442     NodeType *node = nullptr;
443
444     bool found = false;
445     while (!found) {
446       // stepping down
447       for (; ht > 0 && less(data, node = pred->skip(ht - 1)); --ht) {}
448       if (ht == 0) return std::make_pair(node, 0);  // not found
449       // node <= data now, but we need to fix up ht
450       --ht;
451
452       // stepping right
453       while (greater(data, node)) {
454         pred = node;
455         node = node->skip(ht);
456       }
457       found = !less(data, node);
458     }
459     return std::make_pair(node, found);
460   }
461
462   // find node by first stepping right then stepping down.
463   // We still keep this for reference purposes.
464   std::pair<NodeType*, int> findNodeRightDown(const value_type &data) const {
465     NodeType *pred = head_.load(std::memory_order_consume);
466     NodeType *node = nullptr;
467     auto top = maxLayer();
468     int found = 0;
469     for (int layer = top; !found && layer >= 0; --layer) {
470       node = pred->skip(layer);
471       while (greater(data, node)) {
472         pred = node;
473         node = node->skip(layer);
474       }
475       found = !less(data, node);
476     }
477     return std::make_pair(node, found);
478   }
479
480   NodeType* lower_bound(const value_type &data) const {
481     auto node = findNode(data).first;
482     while (node != nullptr && node->markedForRemoval()) {
483       node = node->skip(0);
484     }
485     return node;
486   }
487
488   void growHeight(int height) {
489     NodeType* oldHead = head_.load(std::memory_order_consume);
490     if (oldHead->height() >= height) {  // someone else already did this
491       return;
492     }
493
494     NodeType* newHead =
495       NodeType::create(recycler_.alloc(), height, value_type(), true);
496
497     { // need to guard the head node in case others are adding/removing
498       // nodes linked to the head.
499       ScopedLocker g = oldHead->acquireGuard();
500       newHead->copyHead(oldHead);
501       NodeType* expected = oldHead;
502       if (!head_.compare_exchange_strong(expected, newHead,
503           std::memory_order_release)) {
504         // if someone has already done the swap, just return.
505         NodeType::destroy(recycler_.alloc(), newHead);
506         return;
507       }
508       oldHead->setMarkedForRemoval();
509     }
510     recycle(oldHead);
511   }
512
513   void recycle(NodeType *node) {
514     recycler_.add(node);
515   }
516
517   detail::NodeRecycler<NodeType, NodeAlloc> recycler_;
518   std::atomic<NodeType*> head_;
519   std::atomic<size_t> size_;
520 };
521
522 template <typename T, typename Comp, typename NodeAlloc, int MAX_HEIGHT>
523 class ConcurrentSkipList<T, Comp, NodeAlloc, MAX_HEIGHT>::Accessor {
524   typedef detail::SkipListNode<T> NodeType;
525   typedef ConcurrentSkipList<T, Comp, NodeAlloc, MAX_HEIGHT> SkipListType;
526  public:
527   typedef T value_type;
528   typedef T key_type;
529   typedef T& reference;
530   typedef T* pointer;
531   typedef const T& const_reference;
532   typedef const T* const_pointer;
533   typedef size_t size_type;
534   typedef Comp key_compare;
535   typedef Comp value_compare;
536
537   typedef typename SkipListType::iterator iterator;
538   typedef typename SkipListType::const_iterator const_iterator;
539   typedef typename SkipListType::Skipper Skipper;
540
541   explicit Accessor(std::shared_ptr<ConcurrentSkipList> skip_list)
542     : slHolder_(std::move(skip_list))
543   {
544     sl_ = slHolder_.get();
545     DCHECK(sl_ != nullptr);
546     sl_->recycler_.addRef();
547   }
548
549   // Unsafe initializer: the caller assumes the responsibility to keep
550   // skip_list valid during the whole life cycle of the Acessor.
551   explicit Accessor(ConcurrentSkipList *skip_list) : sl_(skip_list) {
552     DCHECK(sl_ != nullptr);
553     sl_->recycler_.addRef();
554   }
555
556   Accessor(const Accessor &accessor) :
557       sl_(accessor.sl_),
558       slHolder_(accessor.slHolder_) {
559     sl_->recycler_.addRef();
560   }
561
562   Accessor& operator=(const Accessor &accessor) {
563     if (this != &accessor) {
564       slHolder_ = accessor.slHolder_;
565       sl_->recycler_.releaseRef();
566       sl_ = accessor.sl_;
567       sl_->recycler_.addRef();
568     }
569     return *this;
570   }
571
572   ~Accessor() {
573     sl_->recycler_.releaseRef();
574   }
575
576   bool empty() const { return sl_->size() == 0; }
577   size_t size() const { return sl_->size(); }
578   size_type max_size() const { return std::numeric_limits<size_type>::max(); }
579
580   // returns end() if the value is not in the list, otherwise returns an
581   // iterator pointing to the data, and it's guaranteed that the data is valid
582   // as far as the Accessor is hold.
583   iterator find(const key_type &value) { return iterator(sl_->find(value)); }
584   const_iterator find(const key_type &value) const {
585     return iterator(sl_->find(value));
586   }
587   size_type count(const key_type &data) const { return contains(data); }
588
589   iterator begin() const {
590     NodeType* head = sl_->head_.load(std::memory_order_consume);
591     return iterator(head->next());
592   }
593   iterator end() const { return iterator(nullptr); }
594   const_iterator cbegin() const { return begin(); }
595   const_iterator cend() const { return end(); }
596
597   template <
598       typename U,
599       typename =
600           typename std::enable_if<std::is_convertible<U, T>::value>::type>
601   std::pair<iterator, bool> insert(U&& data) {
602     auto ret = sl_->addOrGetData(std::forward<U>(data));
603     return std::make_pair(iterator(ret.first), ret.second);
604   }
605   size_t erase(const key_type &data) { return remove(data); }
606
607   iterator lower_bound(const key_type &data) const {
608     return iterator(sl_->lower_bound(data));
609   }
610
611   size_t height() const { return sl_->height(); }
612
613   // first() returns pointer to the first element in the skiplist, or
614   // nullptr if empty.
615   //
616   // last() returns the pointer to the last element in the skiplist,
617   // nullptr if list is empty.
618   //
619   // Note: As concurrent writing can happen, first() is not
620   //   guaranteed to be the min_element() in the list. Similarly
621   //   last() is not guaranteed to be the max_element(), and both of them can
622   //   be invalid (i.e. nullptr), so we name them differently from front() and
623   //   tail() here.
624   const key_type *first() const { return sl_->first(); }
625   const key_type *last() const { return sl_->last(); }
626
627   // Try to remove the last element in the skip list.
628   //
629   // Returns true if we removed it, false if either the list is empty
630   // or a race condition happened (i.e. the used-to-be last element
631   // was already removed by another thread).
632   bool pop_back() {
633     auto last = sl_->last();
634     return last ? sl_->remove(*last) : false;
635   }
636
637   std::pair<key_type*, bool> addOrGetData(const key_type &data) {
638     auto ret = sl_->addOrGetData(data);
639     return std::make_pair(&ret.first->data(), ret.second);
640   }
641
642   SkipListType* skiplist() const { return sl_; }
643
644   // legacy interfaces
645   // TODO:(xliu) remove these.
646   // Returns true if the node is added successfully, false if not, i.e. the
647   // node with the same key already existed in the list.
648   bool contains(const key_type &data) const { return sl_->find(data); }
649   bool add(const key_type &data) { return sl_->addOrGetData(data).second; }
650   bool remove(const key_type &data) { return sl_->remove(data); }
651
652  private:
653   SkipListType *sl_;
654   std::shared_ptr<SkipListType> slHolder_;
655 };
656
657 // implements forward iterator concept.
658 template <typename ValT, typename NodeT>
659 class detail::csl_iterator :
660   public boost::iterator_facade<csl_iterator<ValT, NodeT>,
661     ValT, boost::forward_traversal_tag> {
662  public:
663   typedef ValT value_type;
664   typedef value_type& reference;
665   typedef value_type* pointer;
666   typedef ptrdiff_t difference_type;
667
668   explicit csl_iterator(NodeT* node = nullptr) : node_(node) {}
669
670   template <typename OtherVal, typename OtherNode>
671   csl_iterator(const csl_iterator<OtherVal, OtherNode> &other,
672       typename std::enable_if<std::is_convertible<OtherVal, ValT>::value>::type*
673       = 0) : node_(other.node_) {}
674
675   size_t nodeSize() const {
676     return node_ == nullptr ? 0 :
677       node_->height() * sizeof(NodeT*) + sizeof(*this);
678   }
679
680   bool good() const { return node_ != nullptr; }
681
682  private:
683   friend class boost::iterator_core_access;
684   template <class, class> friend class csl_iterator;
685
686   void increment() { node_ = node_->next(); }
687   bool equal(const csl_iterator& other) const { return node_ == other.node_; }
688   value_type& dereference() const { return node_->data(); }
689
690   NodeT* node_;
691 };
692
693 // Skipper interface
694 template <typename T, typename Comp, typename NodeAlloc, int MAX_HEIGHT>
695 class ConcurrentSkipList<T, Comp, NodeAlloc, MAX_HEIGHT>::Skipper {
696   typedef detail::SkipListNode<T> NodeType;
697   typedef ConcurrentSkipList<T, Comp, NodeAlloc, MAX_HEIGHT> SkipListType;
698   typedef typename SkipListType::Accessor Accessor;
699
700  public:
701   typedef T  value_type;
702   typedef T& reference;
703   typedef T* pointer;
704   typedef ptrdiff_t difference_type;
705
706   Skipper(const std::shared_ptr<SkipListType>& skipList) :
707     accessor_(skipList) {
708     init();
709   }
710
711   Skipper(const Accessor& accessor) : accessor_(accessor) {
712     init();
713   }
714
715   void init() {
716     // need to cache the head node
717     NodeType* head_node = head();
718     headHeight_ = head_node->height();
719     for (int i = 0; i < headHeight_; ++i) {
720       preds_[i] = head_node;
721       succs_[i] = head_node->skip(i);
722     }
723     int max_layer = maxLayer();
724     for (int i = 0; i < max_layer; ++i) {
725       hints_[i] = uint8_t(i + 1);
726     }
727     hints_[max_layer] = max_layer;
728   }
729
730   // advance to the next node in the list.
731   Skipper& operator ++() {
732     preds_[0] = succs_[0];
733     succs_[0] = preds_[0]->skip(0);
734     int height = curHeight();
735     for (int i = 1; i < height && preds_[0] == succs_[i]; ++i) {
736       preds_[i] = succs_[i];
737       succs_[i] = preds_[i]->skip(i);
738     }
739     return *this;
740   }
741
742   bool good() const { return succs_[0] != nullptr; }
743
744   int maxLayer() const { return headHeight_ - 1; }
745
746   int curHeight() const {
747     // need to cap the height to the cached head height, as the current node
748     // might be some newly inserted node and also during the time period the
749     // head height may have grown.
750     return succs_[0] ? std::min(headHeight_, succs_[0]->height()) : 0;
751   }
752
753   const value_type &data() const {
754     DCHECK(succs_[0] != nullptr);
755     return succs_[0]->data();
756   }
757
758   value_type &operator *() const {
759     DCHECK(succs_[0] != nullptr);
760     return succs_[0]->data();
761   }
762
763   value_type *operator->() {
764     DCHECK(succs_[0] != nullptr);
765     return &succs_[0]->data();
766   }
767
768   /*
769    * Skip to the position whose data is no less than the parameter.
770    * (I.e. the lower_bound).
771    *
772    * Returns true if the data is found, false otherwise.
773    */
774   bool to(const value_type &data) {
775     int layer = curHeight() - 1;
776     if (layer < 0) return false;   // reaches the end of the list
777
778     int lyr = hints_[layer];
779     int max_layer = maxLayer();
780     while (SkipListType::greater(data, succs_[lyr]) && lyr < max_layer) {
781       ++lyr;
782     }
783     hints_[layer] = lyr;  // update the hint
784
785     int foundLayer = SkipListType::
786       findInsertionPoint(preds_[lyr], lyr, data, preds_, succs_);
787     if (foundLayer < 0) return false;
788
789     DCHECK(succs_[0] != nullptr) << "lyr=" << lyr
790                                  << "; max_layer=" << max_layer;
791     return !succs_[0]->markedForRemoval();
792   }
793
794  private:
795   NodeType* head() const {
796     return accessor_.skiplist()->head_.load(std::memory_order_consume);
797   }
798
799   Accessor accessor_;
800   int headHeight_;
801   NodeType *succs_[MAX_HEIGHT], *preds_[MAX_HEIGHT];
802   uint8_t hints_[MAX_HEIGHT];
803 };
804
805 } // namespace folly