Update the folly/README
[folly.git] / folly / ConcurrentSkipList.h
1 /*
2  * Copyright 2012 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 // @author: Xin Liu <xliux@fb.com>
18 //
19 // A concurrent skip list (CSL) implementation.
20 // Ref: http://www.cs.tau.ac.il/~shanir/nir-pubs-web/Papers/OPODIS2006-BA.pdf
21
22 /*
23
24 This implements a sorted associative container that supports only
25 unique keys.  (Similar to std::set.)
26
27 Features:
28
29   1. Small memory overhead: ~40% less memory overhead compared with
30      std::set (1.6 words per node versus 3). It has an minimum of 4
31      words (7 words if there nodes got deleted) per-list overhead
32      though.
33
34   2. Read accesses (count, find iterator, skipper) are lock-free and
35      mostly wait-free (the only wait a reader may need to do is when
36      the node it is visiting is in a pending stage, i.e. deleting,
37      adding and not fully linked).  Write accesses (remove, add) need
38      to acquire locks, but locks are local to the predecessor nodes
39      and/or successor nodes.
40
41   3. Good high contention performance, comparable single-thread
42      performance.  In the multithreaded case (12 workers), CSL tested
43      10x faster than a RWSpinLocked std::set for an averaged sized
44      list (1K - 1M nodes).
45
46      Comparable read performance to std::set when single threaded,
47      especially when the list size is large, and scales better to
48      larger lists: when the size is small, CSL can be 20-50% slower on
49      find()/contains().  As the size gets large (> 1M elements),
50      find()/contains() can be 30% faster.
51
52      Iterating through a skiplist is similar to iterating through a
53      linked list, thus is much (2-6x) faster than on a std::set
54      (tree-based).  This is especially true for short lists due to
55      better cache locality.  Based on that, it's also faster to
56      intersect two skiplists.
57
58   4. Lazy removal with GC support.  The removed nodes get deleted when
59      the last Accessor to the skiplist is destroyed.
60
61 Caveats:
62
63   1. Write operations are usually 30% slower than std::set in a single
64      threaded environment.
65
66   2. Need to have a head node for each list, which has a 4 word
67      overhead.
68
69   3. When the list is quite small (< 1000 elements), single threaded
70      benchmarks show CSL can be 10x slower than std:set.
71
72   4. The interface requires using an Accessor to access the skiplist.
73     (See below.)
74
75   5. Currently x64 only, due to use of MicroSpinLock.
76
77   6. Freed nodes will not be reclaimed as long as there are ongoing
78      uses of the list.
79
80 Sample usage:
81
82      typedef ConcurrentSkipList<int> SkipListT;
83      shared_ptr<SkipListT> sl(SkipListT::createInstance(init_head_height);
84      {
85        // It's usually good practice to hold an accessor only during
86        // its necessary life cycle (but not in a tight loop as
87        // Accessor creation incurs ref-counting overhead).
88        //
89        // Holding it longer delays garbage-collecting the deleted
90        // nodes in the list.
91        SkipListT::Accessor accessor(sl);
92        accessor.insert(23);
93        accessor.erase(2);
94        for (auto &elem : accessor) {
95          // use elem to access data
96        }
97        ... ...
98      }
99
100  Another useful type is the Skipper accessor.  This is useful if you
101  want to skip to locations in the way std::lower_bound() works,
102  i.e. it can be used for going through the list by skipping to the
103  node no less than a specified key.  The Skipper keeps its location as
104  state, which makes it convenient for things like implementing
105  intersection of two sets efficiently, as it can start from the last
106  visited position.
107
108      {
109        SkipListT::Accessor accessor(sl);
110        SkipListT::Skipper skipper(accessor);
111        skipper.to(30);
112        if (skipper) {
113          CHECK_LE(30, *skipper);
114        }
115        ...  ...
116        // GC may happen when the accessor gets destructed.
117      }
118 */
119
120 #ifndef FOLLY_CONCURRENT_SKIP_LIST_H_
121 #define FOLLY_CONCURRENT_SKIP_LIST_H_
122
123 #include <algorithm>
124 #include <climits>
125 #include <type_traits>
126 #include <utility>
127 #include <vector>
128 #include <atomic>
129 #include <thread>
130 #include <boost/iterator/iterator_facade.hpp>
131 #include <boost/scoped_ptr.hpp>
132 #include <boost/shared_ptr.hpp>
133
134 #include <glog/logging.h>
135 #include "folly/ConcurrentSkipList-inl.h"
136 #include "folly/Likely.h"
137 #include "folly/SmallLocks.h"
138
139 namespace folly {
140
141 template<typename T, typename Comp=std::less<T>, int MAX_HEIGHT=24>
142 class ConcurrentSkipList {
143   // MAX_HEIGHT needs to be at least 2 to suppress compiler
144   // warnings/errors (Werror=uninitialized tiggered due to preds_[1]
145   // being treated as a scalar in the compiler).
146   static_assert(MAX_HEIGHT >= 2 && MAX_HEIGHT < 64,
147       "MAX_HEIGHT can only be in the range of [2, 64)");
148   typedef detail::SkipListNode<T> NodeType;
149   typedef std::unique_lock<folly::MicroSpinLock> ScopedLocker;
150   typedef ConcurrentSkipList<T, Comp, MAX_HEIGHT> SkipListType;
151
152  public:
153   typedef T value_type;
154   typedef T key_type;
155
156
157   typedef detail::csl_iterator<value_type, NodeType> iterator;
158   typedef detail::csl_iterator<const value_type, const NodeType> const_iterator;
159
160   class Accessor;
161   class Skipper;
162
163   // convenient function to get an Accessor to a new instance.
164   static Accessor create(int height=1) {
165     return Accessor(createInstance(height));
166   }
167
168   // create a shared_ptr skiplist object with initial head height.
169   static boost::shared_ptr<SkipListType> createInstance(int height=1) {
170     return boost::shared_ptr<SkipListType>(new SkipListType(height));
171   }
172
173   //===================================================================
174   // Below are implementation details.
175   // Please see ConcurrentSkipList::Accessor for stdlib-like APIs.
176   //===================================================================
177
178   ~ConcurrentSkipList() {
179     LOG_IF(FATAL, recycler_.refs() > 0)
180       << "number of accessors is not 0, " << recycler_.refs() << " instead!"
181       << " This shouldn't have happened!";
182     while (NodeType* current = head_.load(std::memory_order_relaxed)) {
183       NodeType* tmp = current->skip(0);
184       NodeType::destroy(current);
185       head_.store(tmp, std::memory_order_relaxed);
186     }
187   }
188
189  private:
190
191   static bool greater(const value_type &data, const NodeType *node) {
192     return node && Comp()(node->data(), data);
193   }
194
195   static bool less(const value_type &data, const NodeType *node) {
196     return (node == nullptr) || Comp()(data, node->data());
197   }
198
199   static int findInsertionPoint(NodeType *cur, int cur_layer,
200       const value_type &data,
201       NodeType *preds[], NodeType *succs[]) {
202     int foundLayer = -1;
203     NodeType *pred = cur;
204     NodeType *foundNode = nullptr;
205     for (int layer = cur_layer; layer >= 0; --layer) {
206       NodeType *node = pred->skip(layer);
207       while (greater(data, node)) {
208         pred = node;
209         node = node->skip(layer);
210       }
211       if (foundLayer == -1 && !less(data, node)) { // the two keys equal
212         foundLayer = layer;
213         foundNode = node;
214       }
215       preds[layer] = pred;
216
217       // if found, succs[0..foundLayer] need to point to the cached foundNode,
218       // as foundNode might be deleted at the same time thus pred->skip() can
219       // return NULL or another node.
220       succs[layer] = foundNode ? foundNode : node;
221     }
222     return foundLayer;
223   }
224
225   struct Recycler : private boost::noncopyable {
226     Recycler() : refs_(0), dirty_(false) { lock_.init(); }
227
228     ~Recycler() {
229       if (nodes_) {
230         for (auto& node : *nodes_) {
231           NodeType::destroy(node);
232         }
233       }
234     }
235
236     void add(NodeType* node) {
237       std::lock_guard<MicroSpinLock> g(lock_);
238       if (nodes_.get() == nullptr) {
239         nodes_.reset(new std::vector<NodeType*>(1, node));
240       } else {
241         nodes_->push_back(node);
242       }
243       DCHECK_GT(refs(), 0);
244       dirty_.store(true, std::memory_order_relaxed);
245     }
246
247     int refs() const {
248       return refs_.load(std::memory_order_relaxed);
249     }
250
251     int addRef() {
252       return refs_.fetch_add(1, std::memory_order_relaxed);
253     }
254
255     int release() {
256       // We don't expect to clean the recycler immediately everytime it is OK
257       // to do so. Here, it is possible that multiple accessors all release at
258       // the same time but nobody would clean the recycler here. If this
259       // happens, the recycler will usually still get cleaned when
260       // such a race doesn't happen. The worst case is the recycler will
261       // eventually get deleted along with the skiplist.
262       if (LIKELY(!dirty_.load(std::memory_order_relaxed) || refs() > 1)) {
263         return refs_.fetch_add(-1, std::memory_order_relaxed);
264       }
265
266       boost::scoped_ptr<std::vector<NodeType*> > newNodes;
267       {
268         std::lock_guard<MicroSpinLock> g(lock_);
269         if (nodes_.get() == nullptr || refs() > 1) {
270           return refs_.fetch_add(-1, std::memory_order_relaxed);
271         }
272         // once refs_ reaches 1 and there is no other accessor, it is safe to
273         // remove all the current nodes in the recycler, as we already acquired
274         // the lock here so no more new nodes can be added, even though new
275         // accessors may be added after that.
276         newNodes.swap(nodes_);
277         dirty_.store(false, std::memory_order_relaxed);
278       }
279
280       // TODO(xliu) should we spawn a thread to do this when there are large
281       // number of nodes in the recycler?
282       for (auto& node : *newNodes) {
283         NodeType::destroy(node);
284       }
285
286       // decrease the ref count at the very end, to minimize the
287       // chance of other threads acquiring lock_ to clear the deleted
288       // nodes again.
289       return refs_.fetch_add(-1, std::memory_order_relaxed);
290     }
291
292    private:
293     boost::scoped_ptr<std::vector<NodeType*>> nodes_;
294     std::atomic<int32_t> refs_; // current number of visitors to the list
295     std::atomic<bool> dirty_; // whether *nodes_ is non-empty
296     MicroSpinLock lock_; // protects access to *nodes_
297   };  // class ConcurrentSkipList::Recycler
298
299   explicit ConcurrentSkipList(int height) :
300     head_(NodeType::create(height, value_type(), true)), size_(0) {}
301
302   size_t size() const { return size_.load(std::memory_order_relaxed); }
303   int height() const {
304     return head_.load(std::memory_order_consume)->height();
305   }
306   int maxLayer() const { return height() - 1; }
307
308   size_t incrementSize(int delta) {
309     return size_.fetch_add(delta, std::memory_order_relaxed) + delta;
310   }
311
312   // Returns the node if found, nullptr otherwise.
313   NodeType* find(const value_type &data) {
314     auto ret = findNode(data);
315     if (ret.second && !ret.first->markedForRemoval()) return ret.first;
316     return nullptr;
317   }
318
319   // lock all the necessary nodes for changing (adding or removing) the list.
320   // returns true if all the lock acquried successfully and the related nodes
321   // are all validate (not in certain pending states), false otherwise.
322   bool lockNodesForChange(int nodeHeight,
323       ScopedLocker guards[MAX_HEIGHT],
324       NodeType *preds[MAX_HEIGHT],
325       NodeType *succs[MAX_HEIGHT],
326       bool adding=true) {
327     NodeType *pred, *succ, *prevPred = nullptr;
328     bool valid = true;
329     for (int layer = 0; valid && layer < nodeHeight; ++layer) {
330       pred = preds[layer];
331       DCHECK(pred != nullptr) << "layer=" << layer << " height=" << height()
332         << " nodeheight=" << nodeHeight;
333       succ = succs[layer];
334       if (pred != prevPred) {
335         guards[layer] = pred->acquireGuard();
336         prevPred = pred;
337       }
338       valid = !pred->markedForRemoval() &&
339         pred->skip(layer) == succ;  // check again after locking
340
341       if (adding) {  // when adding a node, the succ shouldn't be going away
342         valid = valid && (succ == nullptr || !succ->markedForRemoval());
343       }
344     }
345
346     return valid;
347   }
348
349   // Returns a paired value:
350   //   pair.first always stores the pointer to the node with the same input key.
351   //     It could be either the newly added data, or the existed data in the
352   //     list with the same key.
353   //   pair.second stores whether the data is added successfully:
354   //     0 means not added, otherwise reutrns the new size.
355   std::pair<NodeType*, size_t> addOrGetData(const value_type &data) {
356     NodeType *preds[MAX_HEIGHT], *succs[MAX_HEIGHT];
357     NodeType *newNode;
358     size_t newSize;
359     while (true) {
360       int max_layer = 0;
361       int layer = findInsertionPointGetMaxLayer(data, preds, succs, &max_layer);
362
363       if (layer >= 0) {
364         NodeType *nodeFound = succs[layer];
365         DCHECK(nodeFound != nullptr);
366         if (nodeFound->markedForRemoval()) {
367           continue;  // if it's getting deleted retry finding node.
368         }
369         // wait until fully linked.
370         while (UNLIKELY(!nodeFound->fullyLinked())) {}
371         return std::make_pair(nodeFound, 0);
372       }
373
374       // need to capped at the original height -- the real height may have grown
375       int nodeHeight = detail::SkipListRandomHeight::instance()->
376         getHeight(max_layer + 1);
377
378       ScopedLocker guards[MAX_HEIGHT];
379       if (!lockNodesForChange(nodeHeight, guards, preds, succs)) {
380         continue; // give up the locks and retry until all valid
381       }
382
383       // locks acquired and all valid, need to modify the links under the locks.
384       newNode = NodeType::create(nodeHeight, data);
385       for (int layer = 0; layer < nodeHeight; ++layer) {
386         newNode->setSkip(layer, succs[layer]);
387         preds[layer]->setSkip(layer, newNode);
388       }
389
390       newNode->setFullyLinked();
391       newSize = incrementSize(1);
392       break;
393     }
394
395     int hgt = height();
396     size_t sizeLimit =
397       detail::SkipListRandomHeight::instance()->getSizeLimit(hgt);
398
399     if (hgt < MAX_HEIGHT && newSize > sizeLimit) {
400       growHeight(hgt + 1);
401     }
402     CHECK_GT(newSize, 0);
403     return std::make_pair(newNode, newSize);
404   }
405
406   bool remove(const value_type &data) {
407     NodeType *nodeToDelete = nullptr;
408     ScopedLocker nodeGuard;
409     bool isMarked = false;
410     int nodeHeight = 0;
411     NodeType* preds[MAX_HEIGHT], *succs[MAX_HEIGHT];
412
413     while (true) {
414       int max_layer = 0;
415       int layer = findInsertionPointGetMaxLayer(data, preds, succs, &max_layer);
416       if (!isMarked && (layer < 0 || !okToDelete(succs[layer], layer))) {
417         return false;
418       }
419
420       if (!isMarked) {
421         nodeToDelete = succs[layer];
422         nodeHeight = nodeToDelete->height();
423         nodeGuard = nodeToDelete->acquireGuard();
424         if (nodeToDelete->markedForRemoval()) return false;
425         nodeToDelete->setMarkedForRemoval();
426         isMarked = true;
427       }
428
429       // acquire pred locks from bottom layer up
430       ScopedLocker guards[MAX_HEIGHT];
431       if (!lockNodesForChange(nodeHeight, guards, preds, succs, false)) {
432         continue;  // this will unlock all the locks
433       }
434
435       for (int layer = nodeHeight - 1; layer >= 0; --layer) {
436         preds[layer]->setSkip(layer, nodeToDelete->skip(layer));
437       }
438
439       incrementSize(-1);
440       break;
441     }
442     recycle(nodeToDelete);
443     return true;
444   }
445
446   const value_type *first() const {
447     auto node = head_.load(std::memory_order_consume)->skip(0);
448     return node ? &node->data() : nullptr;
449   }
450
451   const value_type *last() const {
452     NodeType *pred = head_.load(std::memory_order_consume);
453     NodeType *node = nullptr;
454     for (int layer = maxLayer(); layer >= 0; --layer) {
455       do {
456         node = pred->skip(layer);
457         if (node) pred = node;
458       } while (node != nullptr);
459     }
460     return pred == head_.load(std::memory_order_relaxed)
461       ? nullptr : &pred->data();
462   }
463
464   static bool okToDelete(NodeType *candidate, int layer) {
465     DCHECK(candidate != nullptr);
466     return candidate->fullyLinked() &&
467       candidate->maxLayer() == layer &&
468       !candidate->markedForRemoval();
469   }
470
471   // find node for insertion/deleting
472   int findInsertionPointGetMaxLayer(const value_type &data,
473       NodeType *preds[], NodeType *succs[], int *max_layer) const {
474     *max_layer = maxLayer();
475     return findInsertionPoint(head_.load(std::memory_order_consume),
476       *max_layer, data, preds, succs);
477   }
478
479   // Find node for access. Returns a paired values:
480   // pair.first = the first node that no-less than data value
481   // pair.second = 1 when the data value is founded, or 0 otherwise.
482   // This is like lower_bound, but not exact: we could have the node marked for
483   // removal so still need to check that.
484   std::pair<NodeType*, int> findNode(const value_type &data) const {
485     return findNodeDownRight(data);
486   }
487
488   // Find node by first stepping down then stepping right. Based on benchmark
489   // results, this is slightly faster than findNodeRightDown for better
490   // localality on the skipping pointers.
491   std::pair<NodeType*, int> findNodeDownRight(const value_type &data) const {
492     NodeType *pred = head_.load(std::memory_order_consume);
493     int ht = pred->height();
494     NodeType *node = nullptr;
495
496     bool found = false;
497     while (!found) {
498       // stepping down
499       for (; ht > 0 && less(data, pred->skip(ht - 1)); --ht) {}
500       if (ht == 0) return std::make_pair(pred->skip(0), 0);  // not found
501
502       node = pred->skip(--ht);  // node <= data now
503       // stepping right
504       while (greater(data, node)) {
505         pred = node;
506         node = node->skip(ht);
507       }
508       found = !less(data, node);
509     }
510     return std::make_pair(node, found);
511   }
512
513   // find node by first stepping right then stepping down.
514   // We still keep this for reference purposes.
515   std::pair<NodeType*, int> findNodeRightDown(const value_type &data) const {
516     NodeType *pred = head_.load(std::memory_order_consume);
517     NodeType *node = nullptr;
518     auto top = maxLayer();
519     int found = 0;
520     for (int layer = top; !found && layer >= 0; --layer) {
521       node = pred->skip(layer);
522       while (greater(data, node)) {
523         pred = node;
524         node = node->skip(layer);
525       }
526       found = !less(data, node);
527     }
528     return std::make_pair(node, found);
529   }
530
531   NodeType* lower_bound(const value_type &data) const {
532     auto node = findNode(data).first;
533     while (node != nullptr && node->markedForRemoval()) {
534       node = node->skip(0);
535     }
536     return node;
537   }
538
539   void growHeight(int height) {
540     NodeType* oldHead = head_.load(std::memory_order_consume);
541     if (oldHead->height() >= height) {  // someone else already did this
542       return;
543     }
544
545     NodeType* newHead = NodeType::create(height, value_type(), true);
546
547     { // need to guard the head node in case others are adding/removing
548       // nodes linked to the head.
549       ScopedLocker g = oldHead->acquireGuard();
550       newHead->promoteFrom(oldHead);
551       NodeType* expected = oldHead;
552       if (!head_.compare_exchange_strong(expected, newHead,
553           std::memory_order_release)) {
554         // if someone has already done the swap, just return.
555         NodeType::destroy(newHead);
556         return;
557       }
558       oldHead->setMarkedForRemoval();
559     }
560     recycle(oldHead);
561   }
562
563   void recycle(NodeType *node) {
564     recycler_.add(node);
565   }
566
567   std::atomic<NodeType*> head_;
568   Recycler recycler_;
569   std::atomic<size_t> size_;
570 };
571
572 template<typename T, typename Comp, int MAX_HEIGHT>
573 class ConcurrentSkipList<T, Comp, MAX_HEIGHT>::Accessor {
574   typedef detail::SkipListNode<T> NodeType;
575   typedef ConcurrentSkipList<T, Comp, MAX_HEIGHT> SkipListType;
576  public:
577   typedef T value_type;
578   typedef T key_type;
579   typedef T& reference;
580   typedef T* pointer;
581   typedef const T& const_reference;
582   typedef const T* const_pointer;
583   typedef size_t size_type;
584   typedef Comp key_compare;
585   typedef Comp value_compare;
586
587   typedef typename SkipListType::iterator iterator;
588   typedef typename SkipListType::const_iterator const_iterator;
589   typedef typename SkipListType::Skipper Skipper;
590
591   explicit Accessor(boost::shared_ptr<ConcurrentSkipList> skip_list)
592     : slHolder_(std::move(skip_list))
593   {
594     sl_ = slHolder_.get();
595     DCHECK(sl_ != nullptr);
596     sl_->recycler_.addRef();
597   }
598
599   // Unsafe initializer: the caller assumes the responsibility to keep
600   // skip_list valid during the whole life cycle of the Acessor.
601   explicit Accessor(ConcurrentSkipList *skip_list) : sl_(skip_list) {
602     DCHECK(sl_ != nullptr);
603     sl_->recycler_.addRef();
604   }
605
606   Accessor(const Accessor &accessor) :
607       sl_(accessor.sl_),
608       slHolder_(accessor.slHolder_) {
609     sl_->recycler_.addRef();
610   }
611
612   Accessor& operator=(const Accessor &accessor) {
613     if (this != &accessor) {
614       slHolder_ = accessor.slHolder_;
615       sl_->recycler_.release();
616       sl_ = accessor.sl_;
617       sl_->recycler_.addRef();
618     }
619     return *this;
620   }
621
622   ~Accessor() {
623     sl_->recycler_.release();
624   }
625
626   bool empty() const { return sl_->size() == 0; }
627   size_t size() const { return sl_->size(); }
628   size_type max_size() const { return std::numeric_limits<size_type>::max(); }
629
630   // returns end() if the value is not in the list, otherwise returns an
631   // iterator pointing to the data, and it's guaranteed that the data is valid
632   // as far as the Accessor is hold.
633   iterator find(const key_type &value) { return iterator(sl_->find(value)); }
634   const_iterator find(const key_type &value) const {
635     return iterator(sl_->find(value));
636   }
637   size_type count(const key_type &data) const { return contains(data); }
638
639   iterator begin() const {
640     NodeType* head = sl_->head_.load(std::memory_order_consume);
641     return iterator(head->next());
642   }
643   iterator end() const { return iterator(nullptr); }
644   const_iterator cbegin() const { return begin(); }
645   const_iterator cend() const { return end(); }
646
647   std::pair<iterator, bool> insert(const key_type &data) {
648     auto ret = sl_->addOrGetData(data);
649     return std::make_pair(iterator(ret.first), ret.second);
650   }
651   size_t erase(const key_type &data) { return remove(data); }
652
653   iterator lower_bound(const key_type &data) const {
654     return iterator(sl_->lower_bound(data));
655   }
656
657   size_t height() const { return sl_->height(); }
658
659   // first() returns pointer to the first element in the skiplist, or
660   // nullptr if empty.
661   //
662   // last() returns the pointer to the last element in the skiplist,
663   // nullptr if list is empty.
664   //
665   // Note: As concurrent writing can happen, first() is not
666   //   guaranteed to be the min_element() in the list. Similarly
667   //   last() is not guaranteed to be the max_element(), and both of them can
668   //   be invalid (i.e. nullptr), so we name them differently from front() and
669   //   tail() here.
670   const key_type *first() const { return sl_->first(); }
671   const key_type *last() const { return sl_->last(); }
672
673   // Try to remove the last element in the skip list.
674   //
675   // Returns true if we removed it, false if either the list is empty
676   // or a race condition happened (i.e. the used-to-be last element
677   // was already removed by another thread).
678   bool pop_back() {
679     auto last = sl_->last();
680     return last ? sl_->remove(*last) : false;
681   }
682
683   std::pair<key_type*, bool> addOrGetData(const key_type &data) {
684     auto ret = sl_->addOrGetData(data);
685     return std::make_pair(&ret.first->data(), ret.second);
686   }
687
688   SkipListType* skiplist() const { return sl_; }
689
690   // legacy interfaces
691   // TODO:(xliu) remove these.
692   // Returns true if the node is added successfully, false if not, i.e. the
693   // node with the same key already existed in the list.
694   bool contains(const key_type &data) const { return sl_->find(data); }
695   bool add(const key_type &data) { return sl_->addOrGetData(data).second; }
696   bool remove(const key_type &data) { return sl_->remove(data); }
697
698  private:
699   SkipListType *sl_;
700   boost::shared_ptr<SkipListType> slHolder_;
701 };
702
703 // implements forward iterator concept.
704 template<typename ValT, typename NodeT>
705 class detail::csl_iterator :
706   public boost::iterator_facade<csl_iterator<ValT, NodeT>,
707     ValT, boost::forward_traversal_tag> {
708  public:
709   typedef ValT value_type;
710   typedef value_type& reference;
711   typedef value_type* pointer;
712   typedef ptrdiff_t difference_type;
713
714   explicit csl_iterator(NodeT* node = nullptr) : node_(node) {}
715
716   template<typename OtherVal, typename OtherNode>
717   csl_iterator(const csl_iterator<OtherVal, OtherNode> &other,
718       typename std::enable_if<std::is_convertible<OtherVal, ValT>::value>::type*
719       = 0) : node_(other.node_) {}
720
721   size_t nodeSize() const {
722     return node_ == nullptr ? 0 :
723       node_->height() * sizeof(NodeT*) + sizeof(*this);
724   }
725
726   bool good() const { return node_ != nullptr; }
727
728  private:
729   friend class boost::iterator_core_access;
730   template<class,class> friend class csl_iterator;
731
732   void increment() { node_ = node_->next(); };
733   bool equal(const csl_iterator& other) const { return node_ == other.node_; }
734   value_type& dereference() const { return node_->data(); }
735
736   NodeT* node_;
737 };
738
739 // Skipper interface
740 template<typename T, typename Comp, int MAX_HEIGHT>
741 class ConcurrentSkipList<T, Comp, MAX_HEIGHT>::Skipper {
742   typedef detail::SkipListNode<T> NodeType;
743   typedef ConcurrentSkipList<T, Comp, MAX_HEIGHT> SkipListType;
744   typedef typename SkipListType::Accessor Accessor;
745
746  public:
747   typedef T  value_type;
748   typedef T& reference;
749   typedef T* pointer;
750   typedef ptrdiff_t difference_type;
751
752   Skipper(const boost::shared_ptr<SkipListType>& skipList) :
753     accessor_(skipList) {
754     init();
755   }
756
757   Skipper(const Accessor& accessor) : accessor_(accessor) {
758     init();
759   }
760
761   void init() {
762     // need to cache the head node
763     NodeType* head_node = head();
764     headHeight_ = head_node->height();
765     for (int i = 0; i < headHeight_; ++i) {
766       preds_[i] = head_node;
767       succs_[i] = head_node->skip(i);
768     }
769     int max_layer = maxLayer();
770     for (int i = 0; i < max_layer; ++i) {
771       hints_[i] = i + 1;
772     }
773     hints_[max_layer] = max_layer;
774   }
775
776   // advance to the next node in the list.
777   Skipper& operator ++() {
778     preds_[0] = succs_[0];
779     succs_[0] = preds_[0]->skip(0);
780     int height = curHeight();
781     for (int i = 1; i < height && preds_[0] == succs_[i]; ++i) {
782       preds_[i] = succs_[i];
783       succs_[i] = preds_[i]->skip(i);
784     }
785     return *this;
786   }
787
788   bool good() const { return succs_[0] != nullptr; }
789
790   int maxLayer() const { return headHeight_ - 1; }
791
792   int curHeight() const {
793     // need to cap the height to the cached head height, as the current node
794     // might be some newly inserted node and also during the time period the
795     // head height may have grown.
796     return succs_[0] ? std::min(headHeight_, succs_[0]->height()) : 0;
797   }
798
799   const value_type &data() const {
800     DCHECK(succs_[0] != NULL);
801     return succs_[0]->data();
802   }
803
804   value_type &operator *() const {
805     DCHECK(succs_[0] != NULL);
806     return succs_[0]->data();
807   }
808
809   value_type *operator->() {
810     DCHECK(succs_[0] != NULL);
811     return &succs_[0]->data();
812   }
813
814   /*
815    * Skip to the position whose data is no less than the parameter.
816    * (I.e. the lower_bound).
817    *
818    * Returns true if the data is found, false otherwise.
819    */
820   bool to(const value_type &data) {
821     int layer = curHeight() - 1;
822     if (layer < 0) return false;   // reaches the end of the list
823
824     int lyr = hints_[layer];
825     int max_layer = maxLayer();
826     while (SkipListType::greater(data, succs_[lyr]) && lyr < max_layer) {
827       ++lyr;
828     }
829     hints_[layer] = lyr;  // update the hint
830
831     int foundLayer = SkipListType::
832       findInsertionPoint(preds_[lyr], lyr, data, preds_, succs_);
833     if (foundLayer < 0) return false;
834
835     DCHECK(succs_[0] != NULL) << "lyr=" << lyr << "; max_layer=" << max_layer;
836     return !succs_[0]->markedForRemoval();
837   }
838
839  private:
840   NodeType* head() const {
841     return accessor_.skiplist()->head_.load(std::memory_order_consume);
842   }
843
844   Accessor accessor_;
845   int headHeight_;
846   NodeType *succs_[MAX_HEIGHT], *preds_[MAX_HEIGHT];
847   uint8_t hints_[MAX_HEIGHT];
848 };
849
850 } // namespace folly
851
852 #endif  // FOLLY_CONCURRENT_SKIP_LIST_H_