make ConcurrentSkipList ctor public
[folly.git] / folly / ConcurrentSkipList.h
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 // @author: Xin Liu <xliux@fb.com>
18 //
19 // A concurrent skip list (CSL) implementation.
20 // Ref: http://www.cs.tau.ac.il/~shanir/nir-pubs-web/Papers/OPODIS2006-BA.pdf
21
22 /*
23
24 This implements a sorted associative container that supports only
25 unique keys.  (Similar to std::set.)
26
27 Features:
28
29   1. Small memory overhead: ~40% less memory overhead compared with
30      std::set (1.6 words per node versus 3). It has an minimum of 4
31      words (7 words if there nodes got deleted) per-list overhead
32      though.
33
34   2. Read accesses (count, find iterator, skipper) are lock-free and
35      mostly wait-free (the only wait a reader may need to do is when
36      the node it is visiting is in a pending stage, i.e. deleting,
37      adding and not fully linked).  Write accesses (remove, add) need
38      to acquire locks, but locks are local to the predecessor nodes
39      and/or successor nodes.
40
41   3. Good high contention performance, comparable single-thread
42      performance.  In the multithreaded case (12 workers), CSL tested
43      10x faster than a RWSpinLocked std::set for an averaged sized
44      list (1K - 1M nodes).
45
46      Comparable read performance to std::set when single threaded,
47      especially when the list size is large, and scales better to
48      larger lists: when the size is small, CSL can be 20-50% slower on
49      find()/contains().  As the size gets large (> 1M elements),
50      find()/contains() can be 30% faster.
51
52      Iterating through a skiplist is similar to iterating through a
53      linked list, thus is much (2-6x) faster than on a std::set
54      (tree-based).  This is especially true for short lists due to
55      better cache locality.  Based on that, it's also faster to
56      intersect two skiplists.
57
58   4. Lazy removal with GC support.  The removed nodes get deleted when
59      the last Accessor to the skiplist is destroyed.
60
61 Caveats:
62
63   1. Write operations are usually 30% slower than std::set in a single
64      threaded environment.
65
66   2. Need to have a head node for each list, which has a 4 word
67      overhead.
68
69   3. When the list is quite small (< 1000 elements), single threaded
70      benchmarks show CSL can be 10x slower than std:set.
71
72   4. The interface requires using an Accessor to access the skiplist.
73     (See below.)
74
75   5. Currently x64 only, due to use of MicroSpinLock.
76
77   6. Freed nodes will not be reclaimed as long as there are ongoing
78      uses of the list.
79
80 Sample usage:
81
82      typedef ConcurrentSkipList<int> SkipListT;
83      shared_ptr<SkipListT> sl(SkipListT::createInstance(init_head_height);
84      {
85        // It's usually good practice to hold an accessor only during
86        // its necessary life cycle (but not in a tight loop as
87        // Accessor creation incurs ref-counting overhead).
88        //
89        // Holding it longer delays garbage-collecting the deleted
90        // nodes in the list.
91        SkipListT::Accessor accessor(sl);
92        accessor.insert(23);
93        accessor.erase(2);
94        for (auto &elem : accessor) {
95          // use elem to access data
96        }
97        ... ...
98      }
99
100  Another useful type is the Skipper accessor.  This is useful if you
101  want to skip to locations in the way std::lower_bound() works,
102  i.e. it can be used for going through the list by skipping to the
103  node no less than a specified key.  The Skipper keeps its location as
104  state, which makes it convenient for things like implementing
105  intersection of two sets efficiently, as it can start from the last
106  visited position.
107
108      {
109        SkipListT::Accessor accessor(sl);
110        SkipListT::Skipper skipper(accessor);
111        skipper.to(30);
112        if (skipper) {
113          CHECK_LE(30, *skipper);
114        }
115        ...  ...
116        // GC may happen when the accessor gets destructed.
117      }
118 */
119
120 #ifndef FOLLY_CONCURRENT_SKIP_LIST_H_
121 #define FOLLY_CONCURRENT_SKIP_LIST_H_
122
123 #include <algorithm>
124 #include <atomic>
125 #include <climits>
126 #include <memory>
127 #include <type_traits>
128 #include <utility>
129 #include <vector>
130 #include <boost/iterator/iterator_facade.hpp>
131 #include <glog/logging.h>
132
133 #include "folly/ConcurrentSkipList-inl.h"
134 #include "folly/Likely.h"
135 #include "folly/SmallLocks.h"
136
137 namespace folly {
138
139 template<typename T, typename Comp=std::less<T>, int MAX_HEIGHT=24>
140 class ConcurrentSkipList {
141   // MAX_HEIGHT needs to be at least 2 to suppress compiler
142   // warnings/errors (Werror=uninitialized tiggered due to preds_[1]
143   // being treated as a scalar in the compiler).
144   static_assert(MAX_HEIGHT >= 2 && MAX_HEIGHT < 64,
145       "MAX_HEIGHT can only be in the range of [2, 64)");
146   typedef std::unique_lock<folly::MicroSpinLock> ScopedLocker;
147   typedef ConcurrentSkipList<T, Comp, MAX_HEIGHT> SkipListType;
148
149  public:
150   typedef detail::SkipListNode<T> NodeType;
151   typedef T value_type;
152   typedef T key_type;
153
154   typedef detail::csl_iterator<value_type, NodeType> iterator;
155   typedef detail::csl_iterator<const value_type, const NodeType> const_iterator;
156
157   class Accessor;
158   class Skipper;
159
160   explicit ConcurrentSkipList(int height)
161     : head_(NodeType::create(height, value_type(), true)), size_(0) { }
162
163   // Convenient function to get an Accessor to a new instance.
164   static Accessor create(int height = 1) {
165     return Accessor(createInstance(height));
166   }
167
168   // Create a shared_ptr skiplist object with initial head height.
169   static std::shared_ptr<SkipListType> createInstance(int height = 1) {
170     return std::make_shared<ConcurrentSkipList>(height);
171   }
172
173   //===================================================================
174   // Below are implementation details.
175   // Please see ConcurrentSkipList::Accessor for stdlib-like APIs.
176   //===================================================================
177
178   ~ConcurrentSkipList() {
179     CHECK_EQ(recycler_.refs(), 0);
180     for (NodeType* current = head_.load(std::memory_order_relaxed); current; ) {
181       NodeType* tmp = current->skip(0);
182       NodeType::destroy(current);
183       current = tmp;
184     }
185   }
186
187  private:
188   static bool greater(const value_type &data, const NodeType *node) {
189     return node && Comp()(node->data(), data);
190   }
191
192   static bool less(const value_type &data, const NodeType *node) {
193     return (node == nullptr) || Comp()(data, node->data());
194   }
195
196   static int findInsertionPoint(NodeType *cur, int cur_layer,
197       const value_type &data,
198       NodeType *preds[], NodeType *succs[]) {
199     int foundLayer = -1;
200     NodeType *pred = cur;
201     NodeType *foundNode = nullptr;
202     for (int layer = cur_layer; layer >= 0; --layer) {
203       NodeType *node = pred->skip(layer);
204       while (greater(data, node)) {
205         pred = node;
206         node = node->skip(layer);
207       }
208       if (foundLayer == -1 && !less(data, node)) { // the two keys equal
209         foundLayer = layer;
210         foundNode = node;
211       }
212       preds[layer] = pred;
213
214       // if found, succs[0..foundLayer] need to point to the cached foundNode,
215       // as foundNode might be deleted at the same time thus pred->skip() can
216       // return NULL or another node.
217       succs[layer] = foundNode ? foundNode : node;
218     }
219     return foundLayer;
220   }
221
222   struct Recycler : private boost::noncopyable {
223     Recycler() : refs_(0), dirty_(false) { lock_.init(); }
224
225     ~Recycler() {
226       if (nodes_) {
227         for (auto& node : *nodes_) {
228           NodeType::destroy(node);
229         }
230       }
231     }
232
233     void add(NodeType* node) {
234       std::lock_guard<MicroSpinLock> g(lock_);
235       if (nodes_.get() == nullptr) {
236         nodes_.reset(new std::vector<NodeType*>(1, node));
237       } else {
238         nodes_->push_back(node);
239       }
240       DCHECK_GT(refs(), 0);
241       dirty_.store(true, std::memory_order_relaxed);
242     }
243
244     int refs() const {
245       return refs_.load(std::memory_order_relaxed);
246     }
247
248     int addRef() {
249       return refs_.fetch_add(1, std::memory_order_relaxed);
250     }
251
252     int release() {
253       // We don't expect to clean the recycler immediately everytime it is OK
254       // to do so. Here, it is possible that multiple accessors all release at
255       // the same time but nobody would clean the recycler here. If this
256       // happens, the recycler will usually still get cleaned when
257       // such a race doesn't happen. The worst case is the recycler will
258       // eventually get deleted along with the skiplist.
259       if (LIKELY(!dirty_.load(std::memory_order_relaxed) || refs() > 1)) {
260         return refs_.fetch_add(-1, std::memory_order_relaxed);
261       }
262
263       std::unique_ptr<std::vector<NodeType*>> newNodes;
264       {
265         std::lock_guard<MicroSpinLock> g(lock_);
266         if (nodes_.get() == nullptr || refs() > 1) {
267           return refs_.fetch_add(-1, std::memory_order_relaxed);
268         }
269         // once refs_ reaches 1 and there is no other accessor, it is safe to
270         // remove all the current nodes in the recycler, as we already acquired
271         // the lock here so no more new nodes can be added, even though new
272         // accessors may be added after that.
273         newNodes.swap(nodes_);
274         dirty_.store(false, std::memory_order_relaxed);
275       }
276
277       // TODO(xliu) should we spawn a thread to do this when there are large
278       // number of nodes in the recycler?
279       for (auto& node : *newNodes) {
280         NodeType::destroy(node);
281       }
282
283       // decrease the ref count at the very end, to minimize the
284       // chance of other threads acquiring lock_ to clear the deleted
285       // nodes again.
286       return refs_.fetch_add(-1, std::memory_order_relaxed);
287     }
288
289    private:
290     std::unique_ptr<std::vector<NodeType*>> nodes_;
291     std::atomic<int32_t> refs_; // current number of visitors to the list
292     std::atomic<bool> dirty_; // whether *nodes_ is non-empty
293     MicroSpinLock lock_; // protects access to *nodes_
294   };  // class ConcurrentSkipList::Recycler
295
296   size_t size() const { return size_.load(std::memory_order_relaxed); }
297   int height() const {
298     return head_.load(std::memory_order_consume)->height();
299   }
300   int maxLayer() const { return height() - 1; }
301
302   size_t incrementSize(int delta) {
303     return size_.fetch_add(delta, std::memory_order_relaxed) + delta;
304   }
305
306   // Returns the node if found, nullptr otherwise.
307   NodeType* find(const value_type &data) {
308     auto ret = findNode(data);
309     if (ret.second && !ret.first->markedForRemoval()) return ret.first;
310     return nullptr;
311   }
312
313   // lock all the necessary nodes for changing (adding or removing) the list.
314   // returns true if all the lock acquried successfully and the related nodes
315   // are all validate (not in certain pending states), false otherwise.
316   bool lockNodesForChange(int nodeHeight,
317       ScopedLocker guards[MAX_HEIGHT],
318       NodeType *preds[MAX_HEIGHT],
319       NodeType *succs[MAX_HEIGHT],
320       bool adding=true) {
321     NodeType *pred, *succ, *prevPred = nullptr;
322     bool valid = true;
323     for (int layer = 0; valid && layer < nodeHeight; ++layer) {
324       pred = preds[layer];
325       DCHECK(pred != nullptr) << "layer=" << layer << " height=" << height()
326         << " nodeheight=" << nodeHeight;
327       succ = succs[layer];
328       if (pred != prevPred) {
329         guards[layer] = pred->acquireGuard();
330         prevPred = pred;
331       }
332       valid = !pred->markedForRemoval() &&
333         pred->skip(layer) == succ;  // check again after locking
334
335       if (adding) {  // when adding a node, the succ shouldn't be going away
336         valid = valid && (succ == nullptr || !succ->markedForRemoval());
337       }
338     }
339
340     return valid;
341   }
342
343   // Returns a paired value:
344   //   pair.first always stores the pointer to the node with the same input key.
345   //     It could be either the newly added data, or the existed data in the
346   //     list with the same key.
347   //   pair.second stores whether the data is added successfully:
348   //     0 means not added, otherwise reutrns the new size.
349   template<typename U>
350   std::pair<NodeType*, size_t> addOrGetData(U &&data) {
351     NodeType *preds[MAX_HEIGHT], *succs[MAX_HEIGHT];
352     NodeType *newNode;
353     size_t newSize;
354     while (true) {
355       int max_layer = 0;
356       int layer = findInsertionPointGetMaxLayer(data, preds, succs, &max_layer);
357
358       if (layer >= 0) {
359         NodeType *nodeFound = succs[layer];
360         DCHECK(nodeFound != nullptr);
361         if (nodeFound->markedForRemoval()) {
362           continue;  // if it's getting deleted retry finding node.
363         }
364         // wait until fully linked.
365         while (UNLIKELY(!nodeFound->fullyLinked())) {}
366         return std::make_pair(nodeFound, 0);
367       }
368
369       // need to capped at the original height -- the real height may have grown
370       int nodeHeight = detail::SkipListRandomHeight::instance()->
371         getHeight(max_layer + 1);
372
373       ScopedLocker guards[MAX_HEIGHT];
374       if (!lockNodesForChange(nodeHeight, guards, preds, succs)) {
375         continue; // give up the locks and retry until all valid
376       }
377
378       // locks acquired and all valid, need to modify the links under the locks.
379       newNode = NodeType::create(nodeHeight, std::forward<U>(data));
380       for (int layer = 0; layer < nodeHeight; ++layer) {
381         newNode->setSkip(layer, succs[layer]);
382         preds[layer]->setSkip(layer, newNode);
383       }
384
385       newNode->setFullyLinked();
386       newSize = incrementSize(1);
387       break;
388     }
389
390     int hgt = height();
391     size_t sizeLimit =
392       detail::SkipListRandomHeight::instance()->getSizeLimit(hgt);
393
394     if (hgt < MAX_HEIGHT && newSize > sizeLimit) {
395       growHeight(hgt + 1);
396     }
397     CHECK_GT(newSize, 0);
398     return std::make_pair(newNode, newSize);
399   }
400
401   bool remove(const value_type &data) {
402     NodeType *nodeToDelete = nullptr;
403     ScopedLocker nodeGuard;
404     bool isMarked = false;
405     int nodeHeight = 0;
406     NodeType* preds[MAX_HEIGHT], *succs[MAX_HEIGHT];
407
408     while (true) {
409       int max_layer = 0;
410       int layer = findInsertionPointGetMaxLayer(data, preds, succs, &max_layer);
411       if (!isMarked && (layer < 0 || !okToDelete(succs[layer], layer))) {
412         return false;
413       }
414
415       if (!isMarked) {
416         nodeToDelete = succs[layer];
417         nodeHeight = nodeToDelete->height();
418         nodeGuard = nodeToDelete->acquireGuard();
419         if (nodeToDelete->markedForRemoval()) return false;
420         nodeToDelete->setMarkedForRemoval();
421         isMarked = true;
422       }
423
424       // acquire pred locks from bottom layer up
425       ScopedLocker guards[MAX_HEIGHT];
426       if (!lockNodesForChange(nodeHeight, guards, preds, succs, false)) {
427         continue;  // this will unlock all the locks
428       }
429
430       for (int layer = nodeHeight - 1; layer >= 0; --layer) {
431         preds[layer]->setSkip(layer, nodeToDelete->skip(layer));
432       }
433
434       incrementSize(-1);
435       break;
436     }
437     recycle(nodeToDelete);
438     return true;
439   }
440
441   const value_type *first() const {
442     auto node = head_.load(std::memory_order_consume)->skip(0);
443     return node ? &node->data() : nullptr;
444   }
445
446   const value_type *last() const {
447     NodeType *pred = head_.load(std::memory_order_consume);
448     NodeType *node = nullptr;
449     for (int layer = maxLayer(); layer >= 0; --layer) {
450       do {
451         node = pred->skip(layer);
452         if (node) pred = node;
453       } while (node != nullptr);
454     }
455     return pred == head_.load(std::memory_order_relaxed)
456       ? nullptr : &pred->data();
457   }
458
459   static bool okToDelete(NodeType *candidate, int layer) {
460     DCHECK(candidate != nullptr);
461     return candidate->fullyLinked() &&
462       candidate->maxLayer() == layer &&
463       !candidate->markedForRemoval();
464   }
465
466   // find node for insertion/deleting
467   int findInsertionPointGetMaxLayer(const value_type &data,
468       NodeType *preds[], NodeType *succs[], int *max_layer) const {
469     *max_layer = maxLayer();
470     return findInsertionPoint(head_.load(std::memory_order_consume),
471       *max_layer, data, preds, succs);
472   }
473
474   // Find node for access. Returns a paired values:
475   // pair.first = the first node that no-less than data value
476   // pair.second = 1 when the data value is founded, or 0 otherwise.
477   // This is like lower_bound, but not exact: we could have the node marked for
478   // removal so still need to check that.
479   std::pair<NodeType*, int> findNode(const value_type &data) const {
480     return findNodeDownRight(data);
481   }
482
483   // Find node by first stepping down then stepping right. Based on benchmark
484   // results, this is slightly faster than findNodeRightDown for better
485   // localality on the skipping pointers.
486   std::pair<NodeType*, int> findNodeDownRight(const value_type &data) const {
487     NodeType *pred = head_.load(std::memory_order_consume);
488     int ht = pred->height();
489     NodeType *node = nullptr;
490
491     bool found = false;
492     while (!found) {
493       // stepping down
494       for (; ht > 0 && less(data, pred->skip(ht - 1)); --ht) {}
495       if (ht == 0) return std::make_pair(pred->skip(0), 0);  // not found
496
497       node = pred->skip(--ht);  // node <= data now
498       // stepping right
499       while (greater(data, node)) {
500         pred = node;
501         node = node->skip(ht);
502       }
503       found = !less(data, node);
504     }
505     return std::make_pair(node, found);
506   }
507
508   // find node by first stepping right then stepping down.
509   // We still keep this for reference purposes.
510   std::pair<NodeType*, int> findNodeRightDown(const value_type &data) const {
511     NodeType *pred = head_.load(std::memory_order_consume);
512     NodeType *node = nullptr;
513     auto top = maxLayer();
514     int found = 0;
515     for (int layer = top; !found && layer >= 0; --layer) {
516       node = pred->skip(layer);
517       while (greater(data, node)) {
518         pred = node;
519         node = node->skip(layer);
520       }
521       found = !less(data, node);
522     }
523     return std::make_pair(node, found);
524   }
525
526   NodeType* lower_bound(const value_type &data) const {
527     auto node = findNode(data).first;
528     while (node != nullptr && node->markedForRemoval()) {
529       node = node->skip(0);
530     }
531     return node;
532   }
533
534   void growHeight(int height) {
535     NodeType* oldHead = head_.load(std::memory_order_consume);
536     if (oldHead->height() >= height) {  // someone else already did this
537       return;
538     }
539
540     NodeType* newHead = NodeType::create(height, value_type(), true);
541
542     { // need to guard the head node in case others are adding/removing
543       // nodes linked to the head.
544       ScopedLocker g = oldHead->acquireGuard();
545       newHead->copyHead(oldHead);
546       NodeType* expected = oldHead;
547       if (!head_.compare_exchange_strong(expected, newHead,
548           std::memory_order_release)) {
549         // if someone has already done the swap, just return.
550         NodeType::destroy(newHead);
551         return;
552       }
553       oldHead->setMarkedForRemoval();
554     }
555     recycle(oldHead);
556   }
557
558   void recycle(NodeType *node) {
559     recycler_.add(node);
560   }
561
562   std::atomic<NodeType*> head_;
563   Recycler recycler_;
564   std::atomic<size_t> size_;
565 };
566
567 template<typename T, typename Comp, int MAX_HEIGHT>
568 class ConcurrentSkipList<T, Comp, MAX_HEIGHT>::Accessor {
569   typedef detail::SkipListNode<T> NodeType;
570   typedef ConcurrentSkipList<T, Comp, MAX_HEIGHT> SkipListType;
571  public:
572   typedef T value_type;
573   typedef T key_type;
574   typedef T& reference;
575   typedef T* pointer;
576   typedef const T& const_reference;
577   typedef const T* const_pointer;
578   typedef size_t size_type;
579   typedef Comp key_compare;
580   typedef Comp value_compare;
581
582   typedef typename SkipListType::iterator iterator;
583   typedef typename SkipListType::const_iterator const_iterator;
584   typedef typename SkipListType::Skipper Skipper;
585
586   explicit Accessor(std::shared_ptr<ConcurrentSkipList> skip_list)
587     : slHolder_(std::move(skip_list))
588   {
589     sl_ = slHolder_.get();
590     DCHECK(sl_ != nullptr);
591     sl_->recycler_.addRef();
592   }
593
594   // Unsafe initializer: the caller assumes the responsibility to keep
595   // skip_list valid during the whole life cycle of the Acessor.
596   explicit Accessor(ConcurrentSkipList *skip_list) : sl_(skip_list) {
597     DCHECK(sl_ != nullptr);
598     sl_->recycler_.addRef();
599   }
600
601   Accessor(const Accessor &accessor) :
602       sl_(accessor.sl_),
603       slHolder_(accessor.slHolder_) {
604     sl_->recycler_.addRef();
605   }
606
607   Accessor& operator=(const Accessor &accessor) {
608     if (this != &accessor) {
609       slHolder_ = accessor.slHolder_;
610       sl_->recycler_.release();
611       sl_ = accessor.sl_;
612       sl_->recycler_.addRef();
613     }
614     return *this;
615   }
616
617   ~Accessor() {
618     sl_->recycler_.release();
619   }
620
621   bool empty() const { return sl_->size() == 0; }
622   size_t size() const { return sl_->size(); }
623   size_type max_size() const { return std::numeric_limits<size_type>::max(); }
624
625   // returns end() if the value is not in the list, otherwise returns an
626   // iterator pointing to the data, and it's guaranteed that the data is valid
627   // as far as the Accessor is hold.
628   iterator find(const key_type &value) { return iterator(sl_->find(value)); }
629   const_iterator find(const key_type &value) const {
630     return iterator(sl_->find(value));
631   }
632   size_type count(const key_type &data) const { return contains(data); }
633
634   iterator begin() const {
635     NodeType* head = sl_->head_.load(std::memory_order_consume);
636     return iterator(head->next());
637   }
638   iterator end() const { return iterator(nullptr); }
639   const_iterator cbegin() const { return begin(); }
640   const_iterator cend() const { return end(); }
641
642   template<typename U,
643     typename=typename std::enable_if<std::is_convertible<U, T>::value>::type>
644   std::pair<iterator, bool> insert(U&& data) {
645     auto ret = sl_->addOrGetData(std::forward<U>(data));
646     return std::make_pair(iterator(ret.first), ret.second);
647   }
648   size_t erase(const key_type &data) { return remove(data); }
649
650   iterator lower_bound(const key_type &data) const {
651     return iterator(sl_->lower_bound(data));
652   }
653
654   size_t height() const { return sl_->height(); }
655
656   // first() returns pointer to the first element in the skiplist, or
657   // nullptr if empty.
658   //
659   // last() returns the pointer to the last element in the skiplist,
660   // nullptr if list is empty.
661   //
662   // Note: As concurrent writing can happen, first() is not
663   //   guaranteed to be the min_element() in the list. Similarly
664   //   last() is not guaranteed to be the max_element(), and both of them can
665   //   be invalid (i.e. nullptr), so we name them differently from front() and
666   //   tail() here.
667   const key_type *first() const { return sl_->first(); }
668   const key_type *last() const { return sl_->last(); }
669
670   // Try to remove the last element in the skip list.
671   //
672   // Returns true if we removed it, false if either the list is empty
673   // or a race condition happened (i.e. the used-to-be last element
674   // was already removed by another thread).
675   bool pop_back() {
676     auto last = sl_->last();
677     return last ? sl_->remove(*last) : false;
678   }
679
680   std::pair<key_type*, bool> addOrGetData(const key_type &data) {
681     auto ret = sl_->addOrGetData(data);
682     return std::make_pair(&ret.first->data(), ret.second);
683   }
684
685   SkipListType* skiplist() const { return sl_; }
686
687   // legacy interfaces
688   // TODO:(xliu) remove these.
689   // Returns true if the node is added successfully, false if not, i.e. the
690   // node with the same key already existed in the list.
691   bool contains(const key_type &data) const { return sl_->find(data); }
692   bool add(const key_type &data) { return sl_->addOrGetData(data).second; }
693   bool remove(const key_type &data) { return sl_->remove(data); }
694
695  private:
696   SkipListType *sl_;
697   std::shared_ptr<SkipListType> slHolder_;
698 };
699
700 // implements forward iterator concept.
701 template<typename ValT, typename NodeT>
702 class detail::csl_iterator :
703   public boost::iterator_facade<csl_iterator<ValT, NodeT>,
704     ValT, boost::forward_traversal_tag> {
705  public:
706   typedef ValT value_type;
707   typedef value_type& reference;
708   typedef value_type* pointer;
709   typedef ptrdiff_t difference_type;
710
711   explicit csl_iterator(NodeT* node = nullptr) : node_(node) {}
712
713   template<typename OtherVal, typename OtherNode>
714   csl_iterator(const csl_iterator<OtherVal, OtherNode> &other,
715       typename std::enable_if<std::is_convertible<OtherVal, ValT>::value>::type*
716       = 0) : node_(other.node_) {}
717
718   size_t nodeSize() const {
719     return node_ == nullptr ? 0 :
720       node_->height() * sizeof(NodeT*) + sizeof(*this);
721   }
722
723   bool good() const { return node_ != nullptr; }
724
725  private:
726   friend class boost::iterator_core_access;
727   template<class,class> friend class csl_iterator;
728
729   void increment() { node_ = node_->next(); };
730   bool equal(const csl_iterator& other) const { return node_ == other.node_; }
731   value_type& dereference() const { return node_->data(); }
732
733   NodeT* node_;
734 };
735
736 // Skipper interface
737 template<typename T, typename Comp, int MAX_HEIGHT>
738 class ConcurrentSkipList<T, Comp, MAX_HEIGHT>::Skipper {
739   typedef detail::SkipListNode<T> NodeType;
740   typedef ConcurrentSkipList<T, Comp, MAX_HEIGHT> SkipListType;
741   typedef typename SkipListType::Accessor Accessor;
742
743  public:
744   typedef T  value_type;
745   typedef T& reference;
746   typedef T* pointer;
747   typedef ptrdiff_t difference_type;
748
749   Skipper(const std::shared_ptr<SkipListType>& skipList) :
750     accessor_(skipList) {
751     init();
752   }
753
754   Skipper(const Accessor& accessor) : accessor_(accessor) {
755     init();
756   }
757
758   void init() {
759     // need to cache the head node
760     NodeType* head_node = head();
761     headHeight_ = head_node->height();
762     for (int i = 0; i < headHeight_; ++i) {
763       preds_[i] = head_node;
764       succs_[i] = head_node->skip(i);
765     }
766     int max_layer = maxLayer();
767     for (int i = 0; i < max_layer; ++i) {
768       hints_[i] = i + 1;
769     }
770     hints_[max_layer] = max_layer;
771   }
772
773   // advance to the next node in the list.
774   Skipper& operator ++() {
775     preds_[0] = succs_[0];
776     succs_[0] = preds_[0]->skip(0);
777     int height = curHeight();
778     for (int i = 1; i < height && preds_[0] == succs_[i]; ++i) {
779       preds_[i] = succs_[i];
780       succs_[i] = preds_[i]->skip(i);
781     }
782     return *this;
783   }
784
785   bool good() const { return succs_[0] != nullptr; }
786
787   int maxLayer() const { return headHeight_ - 1; }
788
789   int curHeight() const {
790     // need to cap the height to the cached head height, as the current node
791     // might be some newly inserted node and also during the time period the
792     // head height may have grown.
793     return succs_[0] ? std::min(headHeight_, succs_[0]->height()) : 0;
794   }
795
796   const value_type &data() const {
797     DCHECK(succs_[0] != nullptr);
798     return succs_[0]->data();
799   }
800
801   value_type &operator *() const {
802     DCHECK(succs_[0] != nullptr);
803     return succs_[0]->data();
804   }
805
806   value_type *operator->() {
807     DCHECK(succs_[0] != nullptr);
808     return &succs_[0]->data();
809   }
810
811   /*
812    * Skip to the position whose data is no less than the parameter.
813    * (I.e. the lower_bound).
814    *
815    * Returns true if the data is found, false otherwise.
816    */
817   bool to(const value_type &data) {
818     int layer = curHeight() - 1;
819     if (layer < 0) return false;   // reaches the end of the list
820
821     int lyr = hints_[layer];
822     int max_layer = maxLayer();
823     while (SkipListType::greater(data, succs_[lyr]) && lyr < max_layer) {
824       ++lyr;
825     }
826     hints_[layer] = lyr;  // update the hint
827
828     int foundLayer = SkipListType::
829       findInsertionPoint(preds_[lyr], lyr, data, preds_, succs_);
830     if (foundLayer < 0) return false;
831
832     DCHECK(succs_[0] != nullptr) << "lyr=" << lyr << "; max_layer=" << max_layer;
833     return !succs_[0]->markedForRemoval();
834   }
835
836  private:
837   NodeType* head() const {
838     return accessor_.skiplist()->head_.load(std::memory_order_consume);
839   }
840
841   Accessor accessor_;
842   int headHeight_;
843   NodeType *succs_[MAX_HEIGHT], *preds_[MAX_HEIGHT];
844   uint8_t hints_[MAX_HEIGHT];
845 };
846
847 } // namespace folly
848
849 #endif  // FOLLY_CONCURRENT_SKIP_LIST_H_