Sort #include lines
[folly.git] / folly / ConcurrentSkipList.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 // @author: Xin Liu <xliux@fb.com>
18 //
19 // A concurrent skip list (CSL) implementation.
20 // Ref: http://www.cs.tau.ac.il/~shanir/nir-pubs-web/Papers/OPODIS2006-BA.pdf
21
22 /*
23
24 This implements a sorted associative container that supports only
25 unique keys.  (Similar to std::set.)
26
27 Features:
28
29   1. Small memory overhead: ~40% less memory overhead compared with
30      std::set (1.6 words per node versus 3). It has an minimum of 4
31      words (7 words if there nodes got deleted) per-list overhead
32      though.
33
34   2. Read accesses (count, find iterator, skipper) are lock-free and
35      mostly wait-free (the only wait a reader may need to do is when
36      the node it is visiting is in a pending stage, i.e. deleting,
37      adding and not fully linked).  Write accesses (remove, add) need
38      to acquire locks, but locks are local to the predecessor nodes
39      and/or successor nodes.
40
41   3. Good high contention performance, comparable single-thread
42      performance.  In the multithreaded case (12 workers), CSL tested
43      10x faster than a RWSpinLocked std::set for an averaged sized
44      list (1K - 1M nodes).
45
46      Comparable read performance to std::set when single threaded,
47      especially when the list size is large, and scales better to
48      larger lists: when the size is small, CSL can be 20-50% slower on
49      find()/contains().  As the size gets large (> 1M elements),
50      find()/contains() can be 30% faster.
51
52      Iterating through a skiplist is similar to iterating through a
53      linked list, thus is much (2-6x) faster than on a std::set
54      (tree-based).  This is especially true for short lists due to
55      better cache locality.  Based on that, it's also faster to
56      intersect two skiplists.
57
58   4. Lazy removal with GC support.  The removed nodes get deleted when
59      the last Accessor to the skiplist is destroyed.
60
61 Caveats:
62
63   1. Write operations are usually 30% slower than std::set in a single
64      threaded environment.
65
66   2. Need to have a head node for each list, which has a 4 word
67      overhead.
68
69   3. When the list is quite small (< 1000 elements), single threaded
70      benchmarks show CSL can be 10x slower than std:set.
71
72   4. The interface requires using an Accessor to access the skiplist.
73     (See below.)
74
75   5. Currently x64 only, due to use of MicroSpinLock.
76
77   6. Freed nodes will not be reclaimed as long as there are ongoing
78      uses of the list.
79
80 Sample usage:
81
82      typedef ConcurrentSkipList<int> SkipListT;
83      shared_ptr<SkipListT> sl(SkipListT::createInstance(init_head_height);
84      {
85        // It's usually good practice to hold an accessor only during
86        // its necessary life cycle (but not in a tight loop as
87        // Accessor creation incurs ref-counting overhead).
88        //
89        // Holding it longer delays garbage-collecting the deleted
90        // nodes in the list.
91        SkipListT::Accessor accessor(sl);
92        accessor.insert(23);
93        accessor.erase(2);
94        for (auto &elem : accessor) {
95          // use elem to access data
96        }
97        ... ...
98      }
99
100  Another useful type is the Skipper accessor.  This is useful if you
101  want to skip to locations in the way std::lower_bound() works,
102  i.e. it can be used for going through the list by skipping to the
103  node no less than a specified key.  The Skipper keeps its location as
104  state, which makes it convenient for things like implementing
105  intersection of two sets efficiently, as it can start from the last
106  visited position.
107
108      {
109        SkipListT::Accessor accessor(sl);
110        SkipListT::Skipper skipper(accessor);
111        skipper.to(30);
112        if (skipper) {
113          CHECK_LE(30, *skipper);
114        }
115        ...  ...
116        // GC may happen when the accessor gets destructed.
117      }
118 */
119
120 #pragma once
121
122 #include <algorithm>
123 #include <atomic>
124 #include <limits>
125 #include <memory>
126 #include <type_traits>
127
128 #include <boost/iterator/iterator_facade.hpp>
129 #include <glog/logging.h>
130
131 #include <folly/ConcurrentSkipList-inl.h>
132 #include <folly/Likely.h>
133 #include <folly/Memory.h>
134 #include <folly/MicroSpinLock.h>
135
136 namespace folly {
137
138 template<typename T,
139          typename Comp = std::less<T>,
140          // All nodes are allocated using provided SimpleAllocator,
141          // it should be thread-safe.
142          typename NodeAlloc = SysAlloc,
143          int MAX_HEIGHT = 24>
144 class ConcurrentSkipList {
145   // MAX_HEIGHT needs to be at least 2 to suppress compiler
146   // warnings/errors (Werror=uninitialized tiggered due to preds_[1]
147   // being treated as a scalar in the compiler).
148   static_assert(MAX_HEIGHT >= 2 && MAX_HEIGHT < 64,
149       "MAX_HEIGHT can only be in the range of [2, 64)");
150   typedef std::unique_lock<folly::MicroSpinLock> ScopedLocker;
151   typedef ConcurrentSkipList<T, Comp, NodeAlloc, MAX_HEIGHT> SkipListType;
152
153  public:
154   typedef detail::SkipListNode<T> NodeType;
155   typedef T value_type;
156   typedef T key_type;
157
158   typedef detail::csl_iterator<value_type, NodeType> iterator;
159   typedef detail::csl_iterator<const value_type, const NodeType> const_iterator;
160
161   class Accessor;
162   class Skipper;
163
164   explicit ConcurrentSkipList(int height, const NodeAlloc& alloc)
165       : recycler_(alloc),
166         head_(NodeType::create(recycler_.alloc(), height, value_type(), true)),
167         size_(0) {}
168
169   explicit ConcurrentSkipList(int height)
170       : recycler_(),
171         head_(NodeType::create(recycler_.alloc(), height, value_type(), true)),
172         size_(0) {}
173
174   // Convenient function to get an Accessor to a new instance.
175   static Accessor create(int height, const NodeAlloc& alloc) {
176     return Accessor(createInstance(height, alloc));
177   }
178
179   static Accessor create(int height = 1) {
180     return Accessor(createInstance(height));
181   }
182
183   // Create a shared_ptr skiplist object with initial head height.
184   static std::shared_ptr<SkipListType> createInstance(int height,
185                                                       const NodeAlloc& alloc) {
186     return std::make_shared<ConcurrentSkipList>(height, alloc);
187   }
188
189   static std::shared_ptr<SkipListType> createInstance(int height = 1) {
190     return std::make_shared<ConcurrentSkipList>(height);
191   }
192
193   //===================================================================
194   // Below are implementation details.
195   // Please see ConcurrentSkipList::Accessor for stdlib-like APIs.
196   //===================================================================
197
198   ~ConcurrentSkipList() {
199     /* static */ if (NodeType::template DestroyIsNoOp<NodeAlloc>::value) {
200       // Avoid traversing the list if using arena allocator.
201       return;
202     }
203     for (NodeType* current = head_.load(std::memory_order_relaxed); current; ) {
204       NodeType* tmp = current->skip(0);
205       NodeType::destroy(recycler_.alloc(), current);
206       current = tmp;
207     }
208   }
209
210  private:
211   static bool greater(const value_type &data, const NodeType *node) {
212     return node && Comp()(node->data(), data);
213   }
214
215   static bool less(const value_type &data, const NodeType *node) {
216     return (node == nullptr) || Comp()(data, node->data());
217   }
218
219   static int findInsertionPoint(NodeType *cur, int cur_layer,
220       const value_type &data,
221       NodeType *preds[], NodeType *succs[]) {
222     int foundLayer = -1;
223     NodeType *pred = cur;
224     NodeType *foundNode = nullptr;
225     for (int layer = cur_layer; layer >= 0; --layer) {
226       NodeType *node = pred->skip(layer);
227       while (greater(data, node)) {
228         pred = node;
229         node = node->skip(layer);
230       }
231       if (foundLayer == -1 && !less(data, node)) { // the two keys equal
232         foundLayer = layer;
233         foundNode = node;
234       }
235       preds[layer] = pred;
236
237       // if found, succs[0..foundLayer] need to point to the cached foundNode,
238       // as foundNode might be deleted at the same time thus pred->skip() can
239       // return nullptr or another node.
240       succs[layer] = foundNode ? foundNode : node;
241     }
242     return foundLayer;
243   }
244
245   size_t size() const { return size_.load(std::memory_order_relaxed); }
246
247   int height() const {
248     return head_.load(std::memory_order_consume)->height();
249   }
250
251   int maxLayer() const { return height() - 1; }
252
253   size_t incrementSize(int delta) {
254     return size_.fetch_add(delta, std::memory_order_relaxed) + delta;
255   }
256
257   // Returns the node if found, nullptr otherwise.
258   NodeType* find(const value_type &data) {
259     auto ret = findNode(data);
260     if (ret.second && !ret.first->markedForRemoval()) return ret.first;
261     return nullptr;
262   }
263
264   // lock all the necessary nodes for changing (adding or removing) the list.
265   // returns true if all the lock acquried successfully and the related nodes
266   // are all validate (not in certain pending states), false otherwise.
267   bool lockNodesForChange(int nodeHeight,
268       ScopedLocker guards[MAX_HEIGHT],
269       NodeType *preds[MAX_HEIGHT],
270       NodeType *succs[MAX_HEIGHT],
271       bool adding=true) {
272     NodeType *pred, *succ, *prevPred = nullptr;
273     bool valid = true;
274     for (int layer = 0; valid && layer < nodeHeight; ++layer) {
275       pred = preds[layer];
276       DCHECK(pred != nullptr) << "layer=" << layer << " height=" << height()
277         << " nodeheight=" << nodeHeight;
278       succ = succs[layer];
279       if (pred != prevPred) {
280         guards[layer] = pred->acquireGuard();
281         prevPred = pred;
282       }
283       valid = !pred->markedForRemoval() &&
284         pred->skip(layer) == succ;  // check again after locking
285
286       if (adding) {  // when adding a node, the succ shouldn't be going away
287         valid = valid && (succ == nullptr || !succ->markedForRemoval());
288       }
289     }
290
291     return valid;
292   }
293
294   // Returns a paired value:
295   //   pair.first always stores the pointer to the node with the same input key.
296   //     It could be either the newly added data, or the existed data in the
297   //     list with the same key.
298   //   pair.second stores whether the data is added successfully:
299   //     0 means not added, otherwise reutrns the new size.
300   template<typename U>
301   std::pair<NodeType*, size_t> addOrGetData(U &&data) {
302     NodeType *preds[MAX_HEIGHT], *succs[MAX_HEIGHT];
303     NodeType *newNode;
304     size_t newSize;
305     while (true) {
306       int max_layer = 0;
307       int layer = findInsertionPointGetMaxLayer(data, preds, succs, &max_layer);
308
309       if (layer >= 0) {
310         NodeType *nodeFound = succs[layer];
311         DCHECK(nodeFound != nullptr);
312         if (nodeFound->markedForRemoval()) {
313           continue;  // if it's getting deleted retry finding node.
314         }
315         // wait until fully linked.
316         while (UNLIKELY(!nodeFound->fullyLinked())) {}
317         return std::make_pair(nodeFound, 0);
318       }
319
320       // need to capped at the original height -- the real height may have grown
321       int nodeHeight = detail::SkipListRandomHeight::instance()->
322         getHeight(max_layer + 1);
323
324       ScopedLocker guards[MAX_HEIGHT];
325       if (!lockNodesForChange(nodeHeight, guards, preds, succs)) {
326         continue; // give up the locks and retry until all valid
327       }
328
329       // locks acquired and all valid, need to modify the links under the locks.
330       newNode =
331         NodeType::create(recycler_.alloc(), nodeHeight, std::forward<U>(data));
332       for (int k = 0; k < nodeHeight; ++k) {
333         newNode->setSkip(k, succs[k]);
334         preds[k]->setSkip(k, newNode);
335       }
336
337       newNode->setFullyLinked();
338       newSize = incrementSize(1);
339       break;
340     }
341
342     int hgt = height();
343     size_t sizeLimit =
344       detail::SkipListRandomHeight::instance()->getSizeLimit(hgt);
345
346     if (hgt < MAX_HEIGHT && newSize > sizeLimit) {
347       growHeight(hgt + 1);
348     }
349     CHECK_GT(newSize, 0);
350     return std::make_pair(newNode, newSize);
351   }
352
353   bool remove(const value_type &data) {
354     NodeType *nodeToDelete = nullptr;
355     ScopedLocker nodeGuard;
356     bool isMarked = false;
357     int nodeHeight = 0;
358     NodeType* preds[MAX_HEIGHT], *succs[MAX_HEIGHT];
359
360     while (true) {
361       int max_layer = 0;
362       int layer = findInsertionPointGetMaxLayer(data, preds, succs, &max_layer);
363       if (!isMarked && (layer < 0 || !okToDelete(succs[layer], layer))) {
364         return false;
365       }
366
367       if (!isMarked) {
368         nodeToDelete = succs[layer];
369         nodeHeight = nodeToDelete->height();
370         nodeGuard = nodeToDelete->acquireGuard();
371         if (nodeToDelete->markedForRemoval()) return false;
372         nodeToDelete->setMarkedForRemoval();
373         isMarked = true;
374       }
375
376       // acquire pred locks from bottom layer up
377       ScopedLocker guards[MAX_HEIGHT];
378       if (!lockNodesForChange(nodeHeight, guards, preds, succs, false)) {
379         continue;  // this will unlock all the locks
380       }
381
382       for (int k = nodeHeight - 1; k >= 0; --k) {
383         preds[k]->setSkip(k, nodeToDelete->skip(k));
384       }
385
386       incrementSize(-1);
387       break;
388     }
389     recycle(nodeToDelete);
390     return true;
391   }
392
393   const value_type *first() const {
394     auto node = head_.load(std::memory_order_consume)->skip(0);
395     return node ? &node->data() : nullptr;
396   }
397
398   const value_type *last() const {
399     NodeType *pred = head_.load(std::memory_order_consume);
400     NodeType *node = nullptr;
401     for (int layer = maxLayer(); layer >= 0; --layer) {
402       do {
403         node = pred->skip(layer);
404         if (node) pred = node;
405       } while (node != nullptr);
406     }
407     return pred == head_.load(std::memory_order_relaxed)
408       ? nullptr : &pred->data();
409   }
410
411   static bool okToDelete(NodeType *candidate, int layer) {
412     DCHECK(candidate != nullptr);
413     return candidate->fullyLinked() &&
414       candidate->maxLayer() == layer &&
415       !candidate->markedForRemoval();
416   }
417
418   // find node for insertion/deleting
419   int findInsertionPointGetMaxLayer(const value_type &data,
420       NodeType *preds[], NodeType *succs[], int *max_layer) const {
421     *max_layer = maxLayer();
422     return findInsertionPoint(head_.load(std::memory_order_consume),
423       *max_layer, data, preds, succs);
424   }
425
426   // Find node for access. Returns a paired values:
427   // pair.first = the first node that no-less than data value
428   // pair.second = 1 when the data value is founded, or 0 otherwise.
429   // This is like lower_bound, but not exact: we could have the node marked for
430   // removal so still need to check that.
431   std::pair<NodeType*, int> findNode(const value_type &data) const {
432     return findNodeDownRight(data);
433   }
434
435   // Find node by first stepping down then stepping right. Based on benchmark
436   // results, this is slightly faster than findNodeRightDown for better
437   // localality on the skipping pointers.
438   std::pair<NodeType*, int> findNodeDownRight(const value_type &data) const {
439     NodeType *pred = head_.load(std::memory_order_consume);
440     int ht = pred->height();
441     NodeType *node = nullptr;
442
443     bool found = false;
444     while (!found) {
445       // stepping down
446       for (; ht > 0 && less(data, node = pred->skip(ht - 1)); --ht) {}
447       if (ht == 0) return std::make_pair(node, 0);  // not found
448       // node <= data now, but we need to fix up ht
449       --ht;
450
451       // stepping right
452       while (greater(data, node)) {
453         pred = node;
454         node = node->skip(ht);
455       }
456       found = !less(data, node);
457     }
458     return std::make_pair(node, found);
459   }
460
461   // find node by first stepping right then stepping down.
462   // We still keep this for reference purposes.
463   std::pair<NodeType*, int> findNodeRightDown(const value_type &data) const {
464     NodeType *pred = head_.load(std::memory_order_consume);
465     NodeType *node = nullptr;
466     auto top = maxLayer();
467     int found = 0;
468     for (int layer = top; !found && layer >= 0; --layer) {
469       node = pred->skip(layer);
470       while (greater(data, node)) {
471         pred = node;
472         node = node->skip(layer);
473       }
474       found = !less(data, node);
475     }
476     return std::make_pair(node, found);
477   }
478
479   NodeType* lower_bound(const value_type &data) const {
480     auto node = findNode(data).first;
481     while (node != nullptr && node->markedForRemoval()) {
482       node = node->skip(0);
483     }
484     return node;
485   }
486
487   void growHeight(int height) {
488     NodeType* oldHead = head_.load(std::memory_order_consume);
489     if (oldHead->height() >= height) {  // someone else already did this
490       return;
491     }
492
493     NodeType* newHead =
494       NodeType::create(recycler_.alloc(), height, value_type(), true);
495
496     { // need to guard the head node in case others are adding/removing
497       // nodes linked to the head.
498       ScopedLocker g = oldHead->acquireGuard();
499       newHead->copyHead(oldHead);
500       NodeType* expected = oldHead;
501       if (!head_.compare_exchange_strong(expected, newHead,
502           std::memory_order_release)) {
503         // if someone has already done the swap, just return.
504         NodeType::destroy(recycler_.alloc(), newHead);
505         return;
506       }
507       oldHead->setMarkedForRemoval();
508     }
509     recycle(oldHead);
510   }
511
512   void recycle(NodeType *node) {
513     recycler_.add(node);
514   }
515
516   detail::NodeRecycler<NodeType, NodeAlloc> recycler_;
517   std::atomic<NodeType*> head_;
518   std::atomic<size_t> size_;
519 };
520
521 template<typename T, typename Comp, typename NodeAlloc, int MAX_HEIGHT>
522 class ConcurrentSkipList<T, Comp, NodeAlloc, MAX_HEIGHT>::Accessor {
523   typedef detail::SkipListNode<T> NodeType;
524   typedef ConcurrentSkipList<T, Comp, NodeAlloc, MAX_HEIGHT> SkipListType;
525  public:
526   typedef T value_type;
527   typedef T key_type;
528   typedef T& reference;
529   typedef T* pointer;
530   typedef const T& const_reference;
531   typedef const T* const_pointer;
532   typedef size_t size_type;
533   typedef Comp key_compare;
534   typedef Comp value_compare;
535
536   typedef typename SkipListType::iterator iterator;
537   typedef typename SkipListType::const_iterator const_iterator;
538   typedef typename SkipListType::Skipper Skipper;
539
540   explicit Accessor(std::shared_ptr<ConcurrentSkipList> skip_list)
541     : slHolder_(std::move(skip_list))
542   {
543     sl_ = slHolder_.get();
544     DCHECK(sl_ != nullptr);
545     sl_->recycler_.addRef();
546   }
547
548   // Unsafe initializer: the caller assumes the responsibility to keep
549   // skip_list valid during the whole life cycle of the Acessor.
550   explicit Accessor(ConcurrentSkipList *skip_list) : sl_(skip_list) {
551     DCHECK(sl_ != nullptr);
552     sl_->recycler_.addRef();
553   }
554
555   Accessor(const Accessor &accessor) :
556       sl_(accessor.sl_),
557       slHolder_(accessor.slHolder_) {
558     sl_->recycler_.addRef();
559   }
560
561   Accessor& operator=(const Accessor &accessor) {
562     if (this != &accessor) {
563       slHolder_ = accessor.slHolder_;
564       sl_->recycler_.releaseRef();
565       sl_ = accessor.sl_;
566       sl_->recycler_.addRef();
567     }
568     return *this;
569   }
570
571   ~Accessor() {
572     sl_->recycler_.releaseRef();
573   }
574
575   bool empty() const { return sl_->size() == 0; }
576   size_t size() const { return sl_->size(); }
577   size_type max_size() const { return std::numeric_limits<size_type>::max(); }
578
579   // returns end() if the value is not in the list, otherwise returns an
580   // iterator pointing to the data, and it's guaranteed that the data is valid
581   // as far as the Accessor is hold.
582   iterator find(const key_type &value) { return iterator(sl_->find(value)); }
583   const_iterator find(const key_type &value) const {
584     return iterator(sl_->find(value));
585   }
586   size_type count(const key_type &data) const { return contains(data); }
587
588   iterator begin() const {
589     NodeType* head = sl_->head_.load(std::memory_order_consume);
590     return iterator(head->next());
591   }
592   iterator end() const { return iterator(nullptr); }
593   const_iterator cbegin() const { return begin(); }
594   const_iterator cend() const { return end(); }
595
596   template<typename U,
597     typename=typename std::enable_if<std::is_convertible<U, T>::value>::type>
598   std::pair<iterator, bool> insert(U&& data) {
599     auto ret = sl_->addOrGetData(std::forward<U>(data));
600     return std::make_pair(iterator(ret.first), ret.second);
601   }
602   size_t erase(const key_type &data) { return remove(data); }
603
604   iterator lower_bound(const key_type &data) const {
605     return iterator(sl_->lower_bound(data));
606   }
607
608   size_t height() const { return sl_->height(); }
609
610   // first() returns pointer to the first element in the skiplist, or
611   // nullptr if empty.
612   //
613   // last() returns the pointer to the last element in the skiplist,
614   // nullptr if list is empty.
615   //
616   // Note: As concurrent writing can happen, first() is not
617   //   guaranteed to be the min_element() in the list. Similarly
618   //   last() is not guaranteed to be the max_element(), and both of them can
619   //   be invalid (i.e. nullptr), so we name them differently from front() and
620   //   tail() here.
621   const key_type *first() const { return sl_->first(); }
622   const key_type *last() const { return sl_->last(); }
623
624   // Try to remove the last element in the skip list.
625   //
626   // Returns true if we removed it, false if either the list is empty
627   // or a race condition happened (i.e. the used-to-be last element
628   // was already removed by another thread).
629   bool pop_back() {
630     auto last = sl_->last();
631     return last ? sl_->remove(*last) : false;
632   }
633
634   std::pair<key_type*, bool> addOrGetData(const key_type &data) {
635     auto ret = sl_->addOrGetData(data);
636     return std::make_pair(&ret.first->data(), ret.second);
637   }
638
639   SkipListType* skiplist() const { return sl_; }
640
641   // legacy interfaces
642   // TODO:(xliu) remove these.
643   // Returns true if the node is added successfully, false if not, i.e. the
644   // node with the same key already existed in the list.
645   bool contains(const key_type &data) const { return sl_->find(data); }
646   bool add(const key_type &data) { return sl_->addOrGetData(data).second; }
647   bool remove(const key_type &data) { return sl_->remove(data); }
648
649  private:
650   SkipListType *sl_;
651   std::shared_ptr<SkipListType> slHolder_;
652 };
653
654 // implements forward iterator concept.
655 template<typename ValT, typename NodeT>
656 class detail::csl_iterator :
657   public boost::iterator_facade<csl_iterator<ValT, NodeT>,
658     ValT, boost::forward_traversal_tag> {
659  public:
660   typedef ValT value_type;
661   typedef value_type& reference;
662   typedef value_type* pointer;
663   typedef ptrdiff_t difference_type;
664
665   explicit csl_iterator(NodeT* node = nullptr) : node_(node) {}
666
667   template<typename OtherVal, typename OtherNode>
668   csl_iterator(const csl_iterator<OtherVal, OtherNode> &other,
669       typename std::enable_if<std::is_convertible<OtherVal, ValT>::value>::type*
670       = 0) : node_(other.node_) {}
671
672   size_t nodeSize() const {
673     return node_ == nullptr ? 0 :
674       node_->height() * sizeof(NodeT*) + sizeof(*this);
675   }
676
677   bool good() const { return node_ != nullptr; }
678
679  private:
680   friend class boost::iterator_core_access;
681   template<class,class> friend class csl_iterator;
682
683   void increment() { node_ = node_->next(); }
684   bool equal(const csl_iterator& other) const { return node_ == other.node_; }
685   value_type& dereference() const { return node_->data(); }
686
687   NodeT* node_;
688 };
689
690 // Skipper interface
691 template<typename T, typename Comp, typename NodeAlloc, int MAX_HEIGHT>
692 class ConcurrentSkipList<T, Comp, NodeAlloc, MAX_HEIGHT>::Skipper {
693   typedef detail::SkipListNode<T> NodeType;
694   typedef ConcurrentSkipList<T, Comp, NodeAlloc, MAX_HEIGHT> SkipListType;
695   typedef typename SkipListType::Accessor Accessor;
696
697  public:
698   typedef T  value_type;
699   typedef T& reference;
700   typedef T* pointer;
701   typedef ptrdiff_t difference_type;
702
703   Skipper(const std::shared_ptr<SkipListType>& skipList) :
704     accessor_(skipList) {
705     init();
706   }
707
708   Skipper(const Accessor& accessor) : accessor_(accessor) {
709     init();
710   }
711
712   void init() {
713     // need to cache the head node
714     NodeType* head_node = head();
715     headHeight_ = head_node->height();
716     for (int i = 0; i < headHeight_; ++i) {
717       preds_[i] = head_node;
718       succs_[i] = head_node->skip(i);
719     }
720     int max_layer = maxLayer();
721     for (int i = 0; i < max_layer; ++i) {
722       hints_[i] = uint8_t(i + 1);
723     }
724     hints_[max_layer] = max_layer;
725   }
726
727   // advance to the next node in the list.
728   Skipper& operator ++() {
729     preds_[0] = succs_[0];
730     succs_[0] = preds_[0]->skip(0);
731     int height = curHeight();
732     for (int i = 1; i < height && preds_[0] == succs_[i]; ++i) {
733       preds_[i] = succs_[i];
734       succs_[i] = preds_[i]->skip(i);
735     }
736     return *this;
737   }
738
739   bool good() const { return succs_[0] != nullptr; }
740
741   int maxLayer() const { return headHeight_ - 1; }
742
743   int curHeight() const {
744     // need to cap the height to the cached head height, as the current node
745     // might be some newly inserted node and also during the time period the
746     // head height may have grown.
747     return succs_[0] ? std::min(headHeight_, succs_[0]->height()) : 0;
748   }
749
750   const value_type &data() const {
751     DCHECK(succs_[0] != nullptr);
752     return succs_[0]->data();
753   }
754
755   value_type &operator *() const {
756     DCHECK(succs_[0] != nullptr);
757     return succs_[0]->data();
758   }
759
760   value_type *operator->() {
761     DCHECK(succs_[0] != nullptr);
762     return &succs_[0]->data();
763   }
764
765   /*
766    * Skip to the position whose data is no less than the parameter.
767    * (I.e. the lower_bound).
768    *
769    * Returns true if the data is found, false otherwise.
770    */
771   bool to(const value_type &data) {
772     int layer = curHeight() - 1;
773     if (layer < 0) return false;   // reaches the end of the list
774
775     int lyr = hints_[layer];
776     int max_layer = maxLayer();
777     while (SkipListType::greater(data, succs_[lyr]) && lyr < max_layer) {
778       ++lyr;
779     }
780     hints_[layer] = lyr;  // update the hint
781
782     int foundLayer = SkipListType::
783       findInsertionPoint(preds_[lyr], lyr, data, preds_, succs_);
784     if (foundLayer < 0) return false;
785
786     DCHECK(succs_[0] != nullptr) << "lyr=" << lyr
787                                  << "; max_layer=" << max_layer;
788     return !succs_[0]->markedForRemoval();
789   }
790
791  private:
792   NodeType* head() const {
793     return accessor_.skiplist()->head_.load(std::memory_order_consume);
794   }
795
796   Accessor accessor_;
797   int headHeight_;
798   NodeType *succs_[MAX_HEIGHT], *preds_[MAX_HEIGHT];
799   uint8_t hints_[MAX_HEIGHT];
800 };
801
802 } // namespace folly