logging: implement FATAL and DFATAL log levels
[folly.git] / folly / ConcurrentSkipList.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 // @author: Xin Liu <xliux@fb.com>
18 //
19 // A concurrent skip list (CSL) implementation.
20 // Ref: http://www.cs.tau.ac.il/~shanir/nir-pubs-web/Papers/OPODIS2006-BA.pdf
21
22 /*
23
24 This implements a sorted associative container that supports only
25 unique keys.  (Similar to std::set.)
26
27 Features:
28
29   1. Small memory overhead: ~40% less memory overhead compared with
30      std::set (1.6 words per node versus 3). It has an minimum of 4
31      words (7 words if there nodes got deleted) per-list overhead
32      though.
33
34   2. Read accesses (count, find iterator, skipper) are lock-free and
35      mostly wait-free (the only wait a reader may need to do is when
36      the node it is visiting is in a pending stage, i.e. deleting,
37      adding and not fully linked).  Write accesses (remove, add) need
38      to acquire locks, but locks are local to the predecessor nodes
39      and/or successor nodes.
40
41   3. Good high contention performance, comparable single-thread
42      performance.  In the multithreaded case (12 workers), CSL tested
43      10x faster than a RWSpinLocked std::set for an averaged sized
44      list (1K - 1M nodes).
45
46      Comparable read performance to std::set when single threaded,
47      especially when the list size is large, and scales better to
48      larger lists: when the size is small, CSL can be 20-50% slower on
49      find()/contains().  As the size gets large (> 1M elements),
50      find()/contains() can be 30% faster.
51
52      Iterating through a skiplist is similar to iterating through a
53      linked list, thus is much (2-6x) faster than on a std::set
54      (tree-based).  This is especially true for short lists due to
55      better cache locality.  Based on that, it's also faster to
56      intersect two skiplists.
57
58   4. Lazy removal with GC support.  The removed nodes get deleted when
59      the last Accessor to the skiplist is destroyed.
60
61 Caveats:
62
63   1. Write operations are usually 30% slower than std::set in a single
64      threaded environment.
65
66   2. Need to have a head node for each list, which has a 4 word
67      overhead.
68
69   3. When the list is quite small (< 1000 elements), single threaded
70      benchmarks show CSL can be 10x slower than std:set.
71
72   4. The interface requires using an Accessor to access the skiplist.
73     (See below.)
74
75   5. Currently x64 only, due to use of MicroSpinLock.
76
77   6. Freed nodes will not be reclaimed as long as there are ongoing
78      uses of the list.
79
80 Sample usage:
81
82      typedef ConcurrentSkipList<int> SkipListT;
83      shared_ptr<SkipListT> sl(SkipListT::createInstance(init_head_height);
84      {
85        // It's usually good practice to hold an accessor only during
86        // its necessary life cycle (but not in a tight loop as
87        // Accessor creation incurs ref-counting overhead).
88        //
89        // Holding it longer delays garbage-collecting the deleted
90        // nodes in the list.
91        SkipListT::Accessor accessor(sl);
92        accessor.insert(23);
93        accessor.erase(2);
94        for (auto &elem : accessor) {
95          // use elem to access data
96        }
97        ... ...
98      }
99
100  Another useful type is the Skipper accessor.  This is useful if you
101  want to skip to locations in the way std::lower_bound() works,
102  i.e. it can be used for going through the list by skipping to the
103  node no less than a specified key.  The Skipper keeps its location as
104  state, which makes it convenient for things like implementing
105  intersection of two sets efficiently, as it can start from the last
106  visited position.
107
108      {
109        SkipListT::Accessor accessor(sl);
110        SkipListT::Skipper skipper(accessor);
111        skipper.to(30);
112        if (skipper) {
113          CHECK_LE(30, *skipper);
114        }
115        ...  ...
116        // GC may happen when the accessor gets destructed.
117      }
118 */
119
120 #pragma once
121
122 #include <algorithm>
123 #include <atomic>
124 #include <limits>
125 #include <memory>
126 #include <type_traits>
127 #include <boost/iterator/iterator_facade.hpp>
128 #include <glog/logging.h>
129
130 #include <folly/ConcurrentSkipList-inl.h>
131 #include <folly/Likely.h>
132 #include <folly/Memory.h>
133 #include <folly/MicroSpinLock.h>
134
135 namespace folly {
136
137 template<typename T,
138          typename Comp = std::less<T>,
139          // All nodes are allocated using provided SimpleAllocator,
140          // it should be thread-safe.
141          typename NodeAlloc = SysAlloc,
142          int MAX_HEIGHT = 24>
143 class ConcurrentSkipList {
144   // MAX_HEIGHT needs to be at least 2 to suppress compiler
145   // warnings/errors (Werror=uninitialized tiggered due to preds_[1]
146   // being treated as a scalar in the compiler).
147   static_assert(MAX_HEIGHT >= 2 && MAX_HEIGHT < 64,
148       "MAX_HEIGHT can only be in the range of [2, 64)");
149   typedef std::unique_lock<folly::MicroSpinLock> ScopedLocker;
150   typedef ConcurrentSkipList<T, Comp, NodeAlloc, MAX_HEIGHT> SkipListType;
151
152  public:
153   typedef detail::SkipListNode<T> NodeType;
154   typedef T value_type;
155   typedef T key_type;
156
157   typedef detail::csl_iterator<value_type, NodeType> iterator;
158   typedef detail::csl_iterator<const value_type, const NodeType> const_iterator;
159
160   class Accessor;
161   class Skipper;
162
163   explicit ConcurrentSkipList(int height, const NodeAlloc& alloc)
164       : recycler_(alloc),
165         head_(NodeType::create(recycler_.alloc(), height, value_type(), true)),
166         size_(0) {}
167
168   explicit ConcurrentSkipList(int height)
169       : recycler_(),
170         head_(NodeType::create(recycler_.alloc(), height, value_type(), true)),
171         size_(0) {}
172
173   // Convenient function to get an Accessor to a new instance.
174   static Accessor create(int height, const NodeAlloc& alloc) {
175     return Accessor(createInstance(height, alloc));
176   }
177
178   static Accessor create(int height = 1) {
179     return Accessor(createInstance(height));
180   }
181
182   // Create a shared_ptr skiplist object with initial head height.
183   static std::shared_ptr<SkipListType> createInstance(int height,
184                                                       const NodeAlloc& alloc) {
185     return std::make_shared<ConcurrentSkipList>(height, alloc);
186   }
187
188   static std::shared_ptr<SkipListType> createInstance(int height = 1) {
189     return std::make_shared<ConcurrentSkipList>(height);
190   }
191
192   //===================================================================
193   // Below are implementation details.
194   // Please see ConcurrentSkipList::Accessor for stdlib-like APIs.
195   //===================================================================
196
197   ~ConcurrentSkipList() {
198     /* static */ if (NodeType::template DestroyIsNoOp<NodeAlloc>::value) {
199       // Avoid traversing the list if using arena allocator.
200       return;
201     }
202     for (NodeType* current = head_.load(std::memory_order_relaxed); current; ) {
203       NodeType* tmp = current->skip(0);
204       NodeType::destroy(recycler_.alloc(), current);
205       current = tmp;
206     }
207   }
208
209  private:
210   static bool greater(const value_type &data, const NodeType *node) {
211     return node && Comp()(node->data(), data);
212   }
213
214   static bool less(const value_type &data, const NodeType *node) {
215     return (node == nullptr) || Comp()(data, node->data());
216   }
217
218   static int findInsertionPoint(NodeType *cur, int cur_layer,
219       const value_type &data,
220       NodeType *preds[], NodeType *succs[]) {
221     int foundLayer = -1;
222     NodeType *pred = cur;
223     NodeType *foundNode = nullptr;
224     for (int layer = cur_layer; layer >= 0; --layer) {
225       NodeType *node = pred->skip(layer);
226       while (greater(data, node)) {
227         pred = node;
228         node = node->skip(layer);
229       }
230       if (foundLayer == -1 && !less(data, node)) { // the two keys equal
231         foundLayer = layer;
232         foundNode = node;
233       }
234       preds[layer] = pred;
235
236       // if found, succs[0..foundLayer] need to point to the cached foundNode,
237       // as foundNode might be deleted at the same time thus pred->skip() can
238       // return nullptr or another node.
239       succs[layer] = foundNode ? foundNode : node;
240     }
241     return foundLayer;
242   }
243
244   size_t size() const { return size_.load(std::memory_order_relaxed); }
245
246   int height() const {
247     return head_.load(std::memory_order_consume)->height();
248   }
249
250   int maxLayer() const { return height() - 1; }
251
252   size_t incrementSize(int delta) {
253     return size_.fetch_add(delta, std::memory_order_relaxed) + delta;
254   }
255
256   // Returns the node if found, nullptr otherwise.
257   NodeType* find(const value_type &data) {
258     auto ret = findNode(data);
259     if (ret.second && !ret.first->markedForRemoval()) return ret.first;
260     return nullptr;
261   }
262
263   // lock all the necessary nodes for changing (adding or removing) the list.
264   // returns true if all the lock acquried successfully and the related nodes
265   // are all validate (not in certain pending states), false otherwise.
266   bool lockNodesForChange(int nodeHeight,
267       ScopedLocker guards[MAX_HEIGHT],
268       NodeType *preds[MAX_HEIGHT],
269       NodeType *succs[MAX_HEIGHT],
270       bool adding=true) {
271     NodeType *pred, *succ, *prevPred = nullptr;
272     bool valid = true;
273     for (int layer = 0; valid && layer < nodeHeight; ++layer) {
274       pred = preds[layer];
275       DCHECK(pred != nullptr) << "layer=" << layer << " height=" << height()
276         << " nodeheight=" << nodeHeight;
277       succ = succs[layer];
278       if (pred != prevPred) {
279         guards[layer] = pred->acquireGuard();
280         prevPred = pred;
281       }
282       valid = !pred->markedForRemoval() &&
283         pred->skip(layer) == succ;  // check again after locking
284
285       if (adding) {  // when adding a node, the succ shouldn't be going away
286         valid = valid && (succ == nullptr || !succ->markedForRemoval());
287       }
288     }
289
290     return valid;
291   }
292
293   // Returns a paired value:
294   //   pair.first always stores the pointer to the node with the same input key.
295   //     It could be either the newly added data, or the existed data in the
296   //     list with the same key.
297   //   pair.second stores whether the data is added successfully:
298   //     0 means not added, otherwise reutrns the new size.
299   template<typename U>
300   std::pair<NodeType*, size_t> addOrGetData(U &&data) {
301     NodeType *preds[MAX_HEIGHT], *succs[MAX_HEIGHT];
302     NodeType *newNode;
303     size_t newSize;
304     while (true) {
305       int max_layer = 0;
306       int layer = findInsertionPointGetMaxLayer(data, preds, succs, &max_layer);
307
308       if (layer >= 0) {
309         NodeType *nodeFound = succs[layer];
310         DCHECK(nodeFound != nullptr);
311         if (nodeFound->markedForRemoval()) {
312           continue;  // if it's getting deleted retry finding node.
313         }
314         // wait until fully linked.
315         while (UNLIKELY(!nodeFound->fullyLinked())) {}
316         return std::make_pair(nodeFound, 0);
317       }
318
319       // need to capped at the original height -- the real height may have grown
320       int nodeHeight = detail::SkipListRandomHeight::instance()->
321         getHeight(max_layer + 1);
322
323       ScopedLocker guards[MAX_HEIGHT];
324       if (!lockNodesForChange(nodeHeight, guards, preds, succs)) {
325         continue; // give up the locks and retry until all valid
326       }
327
328       // locks acquired and all valid, need to modify the links under the locks.
329       newNode =
330         NodeType::create(recycler_.alloc(), nodeHeight, std::forward<U>(data));
331       for (int k = 0; k < nodeHeight; ++k) {
332         newNode->setSkip(k, succs[k]);
333         preds[k]->setSkip(k, newNode);
334       }
335
336       newNode->setFullyLinked();
337       newSize = incrementSize(1);
338       break;
339     }
340
341     int hgt = height();
342     size_t sizeLimit =
343       detail::SkipListRandomHeight::instance()->getSizeLimit(hgt);
344
345     if (hgt < MAX_HEIGHT && newSize > sizeLimit) {
346       growHeight(hgt + 1);
347     }
348     CHECK_GT(newSize, 0);
349     return std::make_pair(newNode, newSize);
350   }
351
352   bool remove(const value_type &data) {
353     NodeType *nodeToDelete = nullptr;
354     ScopedLocker nodeGuard;
355     bool isMarked = false;
356     int nodeHeight = 0;
357     NodeType* preds[MAX_HEIGHT], *succs[MAX_HEIGHT];
358
359     while (true) {
360       int max_layer = 0;
361       int layer = findInsertionPointGetMaxLayer(data, preds, succs, &max_layer);
362       if (!isMarked && (layer < 0 || !okToDelete(succs[layer], layer))) {
363         return false;
364       }
365
366       if (!isMarked) {
367         nodeToDelete = succs[layer];
368         nodeHeight = nodeToDelete->height();
369         nodeGuard = nodeToDelete->acquireGuard();
370         if (nodeToDelete->markedForRemoval()) return false;
371         nodeToDelete->setMarkedForRemoval();
372         isMarked = true;
373       }
374
375       // acquire pred locks from bottom layer up
376       ScopedLocker guards[MAX_HEIGHT];
377       if (!lockNodesForChange(nodeHeight, guards, preds, succs, false)) {
378         continue;  // this will unlock all the locks
379       }
380
381       for (int k = nodeHeight - 1; k >= 0; --k) {
382         preds[k]->setSkip(k, nodeToDelete->skip(k));
383       }
384
385       incrementSize(-1);
386       break;
387     }
388     recycle(nodeToDelete);
389     return true;
390   }
391
392   const value_type *first() const {
393     auto node = head_.load(std::memory_order_consume)->skip(0);
394     return node ? &node->data() : nullptr;
395   }
396
397   const value_type *last() const {
398     NodeType *pred = head_.load(std::memory_order_consume);
399     NodeType *node = nullptr;
400     for (int layer = maxLayer(); layer >= 0; --layer) {
401       do {
402         node = pred->skip(layer);
403         if (node) pred = node;
404       } while (node != nullptr);
405     }
406     return pred == head_.load(std::memory_order_relaxed)
407       ? nullptr : &pred->data();
408   }
409
410   static bool okToDelete(NodeType *candidate, int layer) {
411     DCHECK(candidate != nullptr);
412     return candidate->fullyLinked() &&
413       candidate->maxLayer() == layer &&
414       !candidate->markedForRemoval();
415   }
416
417   // find node for insertion/deleting
418   int findInsertionPointGetMaxLayer(const value_type &data,
419       NodeType *preds[], NodeType *succs[], int *max_layer) const {
420     *max_layer = maxLayer();
421     return findInsertionPoint(head_.load(std::memory_order_consume),
422       *max_layer, data, preds, succs);
423   }
424
425   // Find node for access. Returns a paired values:
426   // pair.first = the first node that no-less than data value
427   // pair.second = 1 when the data value is founded, or 0 otherwise.
428   // This is like lower_bound, but not exact: we could have the node marked for
429   // removal so still need to check that.
430   std::pair<NodeType*, int> findNode(const value_type &data) const {
431     return findNodeDownRight(data);
432   }
433
434   // Find node by first stepping down then stepping right. Based on benchmark
435   // results, this is slightly faster than findNodeRightDown for better
436   // localality on the skipping pointers.
437   std::pair<NodeType*, int> findNodeDownRight(const value_type &data) const {
438     NodeType *pred = head_.load(std::memory_order_consume);
439     int ht = pred->height();
440     NodeType *node = nullptr;
441
442     bool found = false;
443     while (!found) {
444       // stepping down
445       for (; ht > 0 && less(data, node = pred->skip(ht - 1)); --ht) {}
446       if (ht == 0) return std::make_pair(node, 0);  // not found
447       // node <= data now, but we need to fix up ht
448       --ht;
449
450       // stepping right
451       while (greater(data, node)) {
452         pred = node;
453         node = node->skip(ht);
454       }
455       found = !less(data, node);
456     }
457     return std::make_pair(node, found);
458   }
459
460   // find node by first stepping right then stepping down.
461   // We still keep this for reference purposes.
462   std::pair<NodeType*, int> findNodeRightDown(const value_type &data) const {
463     NodeType *pred = head_.load(std::memory_order_consume);
464     NodeType *node = nullptr;
465     auto top = maxLayer();
466     int found = 0;
467     for (int layer = top; !found && layer >= 0; --layer) {
468       node = pred->skip(layer);
469       while (greater(data, node)) {
470         pred = node;
471         node = node->skip(layer);
472       }
473       found = !less(data, node);
474     }
475     return std::make_pair(node, found);
476   }
477
478   NodeType* lower_bound(const value_type &data) const {
479     auto node = findNode(data).first;
480     while (node != nullptr && node->markedForRemoval()) {
481       node = node->skip(0);
482     }
483     return node;
484   }
485
486   void growHeight(int height) {
487     NodeType* oldHead = head_.load(std::memory_order_consume);
488     if (oldHead->height() >= height) {  // someone else already did this
489       return;
490     }
491
492     NodeType* newHead =
493       NodeType::create(recycler_.alloc(), height, value_type(), true);
494
495     { // need to guard the head node in case others are adding/removing
496       // nodes linked to the head.
497       ScopedLocker g = oldHead->acquireGuard();
498       newHead->copyHead(oldHead);
499       NodeType* expected = oldHead;
500       if (!head_.compare_exchange_strong(expected, newHead,
501           std::memory_order_release)) {
502         // if someone has already done the swap, just return.
503         NodeType::destroy(recycler_.alloc(), newHead);
504         return;
505       }
506       oldHead->setMarkedForRemoval();
507     }
508     recycle(oldHead);
509   }
510
511   void recycle(NodeType *node) {
512     recycler_.add(node);
513   }
514
515   detail::NodeRecycler<NodeType, NodeAlloc> recycler_;
516   std::atomic<NodeType*> head_;
517   std::atomic<size_t> size_;
518 };
519
520 template<typename T, typename Comp, typename NodeAlloc, int MAX_HEIGHT>
521 class ConcurrentSkipList<T, Comp, NodeAlloc, MAX_HEIGHT>::Accessor {
522   typedef detail::SkipListNode<T> NodeType;
523   typedef ConcurrentSkipList<T, Comp, NodeAlloc, MAX_HEIGHT> SkipListType;
524  public:
525   typedef T value_type;
526   typedef T key_type;
527   typedef T& reference;
528   typedef T* pointer;
529   typedef const T& const_reference;
530   typedef const T* const_pointer;
531   typedef size_t size_type;
532   typedef Comp key_compare;
533   typedef Comp value_compare;
534
535   typedef typename SkipListType::iterator iterator;
536   typedef typename SkipListType::const_iterator const_iterator;
537   typedef typename SkipListType::Skipper Skipper;
538
539   explicit Accessor(std::shared_ptr<ConcurrentSkipList> skip_list)
540     : slHolder_(std::move(skip_list))
541   {
542     sl_ = slHolder_.get();
543     DCHECK(sl_ != nullptr);
544     sl_->recycler_.addRef();
545   }
546
547   // Unsafe initializer: the caller assumes the responsibility to keep
548   // skip_list valid during the whole life cycle of the Acessor.
549   explicit Accessor(ConcurrentSkipList *skip_list) : sl_(skip_list) {
550     DCHECK(sl_ != nullptr);
551     sl_->recycler_.addRef();
552   }
553
554   Accessor(const Accessor &accessor) :
555       sl_(accessor.sl_),
556       slHolder_(accessor.slHolder_) {
557     sl_->recycler_.addRef();
558   }
559
560   Accessor& operator=(const Accessor &accessor) {
561     if (this != &accessor) {
562       slHolder_ = accessor.slHolder_;
563       sl_->recycler_.releaseRef();
564       sl_ = accessor.sl_;
565       sl_->recycler_.addRef();
566     }
567     return *this;
568   }
569
570   ~Accessor() {
571     sl_->recycler_.releaseRef();
572   }
573
574   bool empty() const { return sl_->size() == 0; }
575   size_t size() const { return sl_->size(); }
576   size_type max_size() const { return std::numeric_limits<size_type>::max(); }
577
578   // returns end() if the value is not in the list, otherwise returns an
579   // iterator pointing to the data, and it's guaranteed that the data is valid
580   // as far as the Accessor is hold.
581   iterator find(const key_type &value) { return iterator(sl_->find(value)); }
582   const_iterator find(const key_type &value) const {
583     return iterator(sl_->find(value));
584   }
585   size_type count(const key_type &data) const { return contains(data); }
586
587   iterator begin() const {
588     NodeType* head = sl_->head_.load(std::memory_order_consume);
589     return iterator(head->next());
590   }
591   iterator end() const { return iterator(nullptr); }
592   const_iterator cbegin() const { return begin(); }
593   const_iterator cend() const { return end(); }
594
595   template<typename U,
596     typename=typename std::enable_if<std::is_convertible<U, T>::value>::type>
597   std::pair<iterator, bool> insert(U&& data) {
598     auto ret = sl_->addOrGetData(std::forward<U>(data));
599     return std::make_pair(iterator(ret.first), ret.second);
600   }
601   size_t erase(const key_type &data) { return remove(data); }
602
603   iterator lower_bound(const key_type &data) const {
604     return iterator(sl_->lower_bound(data));
605   }
606
607   size_t height() const { return sl_->height(); }
608
609   // first() returns pointer to the first element in the skiplist, or
610   // nullptr if empty.
611   //
612   // last() returns the pointer to the last element in the skiplist,
613   // nullptr if list is empty.
614   //
615   // Note: As concurrent writing can happen, first() is not
616   //   guaranteed to be the min_element() in the list. Similarly
617   //   last() is not guaranteed to be the max_element(), and both of them can
618   //   be invalid (i.e. nullptr), so we name them differently from front() and
619   //   tail() here.
620   const key_type *first() const { return sl_->first(); }
621   const key_type *last() const { return sl_->last(); }
622
623   // Try to remove the last element in the skip list.
624   //
625   // Returns true if we removed it, false if either the list is empty
626   // or a race condition happened (i.e. the used-to-be last element
627   // was already removed by another thread).
628   bool pop_back() {
629     auto last = sl_->last();
630     return last ? sl_->remove(*last) : false;
631   }
632
633   std::pair<key_type*, bool> addOrGetData(const key_type &data) {
634     auto ret = sl_->addOrGetData(data);
635     return std::make_pair(&ret.first->data(), ret.second);
636   }
637
638   SkipListType* skiplist() const { return sl_; }
639
640   // legacy interfaces
641   // TODO:(xliu) remove these.
642   // Returns true if the node is added successfully, false if not, i.e. the
643   // node with the same key already existed in the list.
644   bool contains(const key_type &data) const { return sl_->find(data); }
645   bool add(const key_type &data) { return sl_->addOrGetData(data).second; }
646   bool remove(const key_type &data) { return sl_->remove(data); }
647
648  private:
649   SkipListType *sl_;
650   std::shared_ptr<SkipListType> slHolder_;
651 };
652
653 // implements forward iterator concept.
654 template<typename ValT, typename NodeT>
655 class detail::csl_iterator :
656   public boost::iterator_facade<csl_iterator<ValT, NodeT>,
657     ValT, boost::forward_traversal_tag> {
658  public:
659   typedef ValT value_type;
660   typedef value_type& reference;
661   typedef value_type* pointer;
662   typedef ptrdiff_t difference_type;
663
664   explicit csl_iterator(NodeT* node = nullptr) : node_(node) {}
665
666   template<typename OtherVal, typename OtherNode>
667   csl_iterator(const csl_iterator<OtherVal, OtherNode> &other,
668       typename std::enable_if<std::is_convertible<OtherVal, ValT>::value>::type*
669       = 0) : node_(other.node_) {}
670
671   size_t nodeSize() const {
672     return node_ == nullptr ? 0 :
673       node_->height() * sizeof(NodeT*) + sizeof(*this);
674   }
675
676   bool good() const { return node_ != nullptr; }
677
678  private:
679   friend class boost::iterator_core_access;
680   template<class,class> friend class csl_iterator;
681
682   void increment() { node_ = node_->next(); }
683   bool equal(const csl_iterator& other) const { return node_ == other.node_; }
684   value_type& dereference() const { return node_->data(); }
685
686   NodeT* node_;
687 };
688
689 // Skipper interface
690 template<typename T, typename Comp, typename NodeAlloc, int MAX_HEIGHT>
691 class ConcurrentSkipList<T, Comp, NodeAlloc, MAX_HEIGHT>::Skipper {
692   typedef detail::SkipListNode<T> NodeType;
693   typedef ConcurrentSkipList<T, Comp, NodeAlloc, MAX_HEIGHT> SkipListType;
694   typedef typename SkipListType::Accessor Accessor;
695
696  public:
697   typedef T  value_type;
698   typedef T& reference;
699   typedef T* pointer;
700   typedef ptrdiff_t difference_type;
701
702   Skipper(const std::shared_ptr<SkipListType>& skipList) :
703     accessor_(skipList) {
704     init();
705   }
706
707   Skipper(const Accessor& accessor) : accessor_(accessor) {
708     init();
709   }
710
711   void init() {
712     // need to cache the head node
713     NodeType* head_node = head();
714     headHeight_ = head_node->height();
715     for (int i = 0; i < headHeight_; ++i) {
716       preds_[i] = head_node;
717       succs_[i] = head_node->skip(i);
718     }
719     int max_layer = maxLayer();
720     for (int i = 0; i < max_layer; ++i) {
721       hints_[i] = uint8_t(i + 1);
722     }
723     hints_[max_layer] = max_layer;
724   }
725
726   // advance to the next node in the list.
727   Skipper& operator ++() {
728     preds_[0] = succs_[0];
729     succs_[0] = preds_[0]->skip(0);
730     int height = curHeight();
731     for (int i = 1; i < height && preds_[0] == succs_[i]; ++i) {
732       preds_[i] = succs_[i];
733       succs_[i] = preds_[i]->skip(i);
734     }
735     return *this;
736   }
737
738   bool good() const { return succs_[0] != nullptr; }
739
740   int maxLayer() const { return headHeight_ - 1; }
741
742   int curHeight() const {
743     // need to cap the height to the cached head height, as the current node
744     // might be some newly inserted node and also during the time period the
745     // head height may have grown.
746     return succs_[0] ? std::min(headHeight_, succs_[0]->height()) : 0;
747   }
748
749   const value_type &data() const {
750     DCHECK(succs_[0] != nullptr);
751     return succs_[0]->data();
752   }
753
754   value_type &operator *() const {
755     DCHECK(succs_[0] != nullptr);
756     return succs_[0]->data();
757   }
758
759   value_type *operator->() {
760     DCHECK(succs_[0] != nullptr);
761     return &succs_[0]->data();
762   }
763
764   /*
765    * Skip to the position whose data is no less than the parameter.
766    * (I.e. the lower_bound).
767    *
768    * Returns true if the data is found, false otherwise.
769    */
770   bool to(const value_type &data) {
771     int layer = curHeight() - 1;
772     if (layer < 0) return false;   // reaches the end of the list
773
774     int lyr = hints_[layer];
775     int max_layer = maxLayer();
776     while (SkipListType::greater(data, succs_[lyr]) && lyr < max_layer) {
777       ++lyr;
778     }
779     hints_[layer] = lyr;  // update the hint
780
781     int foundLayer = SkipListType::
782       findInsertionPoint(preds_[lyr], lyr, data, preds_, succs_);
783     if (foundLayer < 0) return false;
784
785     DCHECK(succs_[0] != nullptr) << "lyr=" << lyr
786                                  << "; max_layer=" << max_layer;
787     return !succs_[0]->markedForRemoval();
788   }
789
790  private:
791   NodeType* head() const {
792     return accessor_.skiplist()->head_.load(std::memory_order_consume);
793   }
794
795   Accessor accessor_;
796   int headHeight_;
797   NodeType *succs_[MAX_HEIGHT], *preds_[MAX_HEIGHT];
798   uint8_t hints_[MAX_HEIGHT];
799 };
800
801 } // namespace folly