change shared_ptr<>* to raw pointer
[folly.git] / folly / ConcurrentSkipList.h
1 /*
2  * Copyright 2012 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 // @author: Xin Liu <xliux@fb.com>
18 //
19 // A concurrent skip list (CSL) implementation.
20 // Ref: http://www.cs.tau.ac.il/~shanir/nir-pubs-web/Papers/OPODIS2006-BA.pdf
21
22 /*
23
24 This implements a sorted associative container that supports only
25 unique keys.  (Similar to std::set.)
26
27 Features:
28
29   1. Small memory overhead: ~40% less memory overhead compared with
30      std::set (1.6 words per node versus 3). It has an minimum of 4
31      words (7 words if there nodes got deleted) per-list overhead
32      though.
33
34   2. Read accesses (count, find iterator, skipper) are lock-free and
35      mostly wait-free (the only wait a reader may need to do is when
36      the node it is visiting is in a pending stage, i.e. deleting,
37      adding and not fully linked).  Write accesses (remove, add) need
38      to acquire locks, but locks are local to the predecessor nodes
39      and/or successor nodes.
40
41   3. Good high contention performance, comparable single-thread
42      performance.  In the multithreaded case (12 workers), CSL tested
43      10x faster than a RWSpinLocked std::set for an averaged sized
44      list (1K - 1M nodes).
45
46      Comparable read performance to std::set when single threaded,
47      especially when the list size is large, and scales better to
48      larger lists: when the size is small, CSL can be 20-50% slower on
49      find()/contains().  As the size gets large (> 1M elements),
50      find()/contains() can be 30% faster.
51
52      Iterating through a skiplist is similar to iterating through a
53      linked list, thus is much (2-6x) faster than on a std::set
54      (tree-based).  This is especially true for short lists due to
55      better cache locality.  Based on that, it's also faster to
56      intersect two skiplists.
57
58   4. Lazy removal with GC support.  The removed nodes get deleted when
59      the last Accessor to the skiplist is destroyed.
60
61 Caveats:
62
63   1. Write operations are usually 30% slower than std::set in a single
64      threaded environment.
65
66   2. Need to have a head node for each list, which has a 4 word
67      overhead.
68
69   3. When the list is quite small (< 1000 elements), single threaded
70      benchmarks show CSL can be 10x slower than std:set.
71
72   4. The interface requires using an Accessor to access the skiplist.
73     (See below.)
74
75   5. Currently x64 only, due to use of MicroSpinLock.
76
77   6. Freed nodes will not be reclaimed as long as there are ongoing
78      uses of the list.
79
80 Sample usage:
81
82      typedef ConcurrentSkipList<int> SkipListT;
83      shared_ptr<SkipListT> sl(SkipListT::createInstance(init_head_height);
84      {
85        // It's usually good practice to hold an accessor only during
86        // its necessary life cycle (but not in a tight loop as
87        // Accessor creation incurs ref-counting overhead).
88        //
89        // Holding it longer delays garbage-collecting the deleted
90        // nodes in the list.
91        SkipListT::Accessor accessor(sl);
92        accessor.insert(23);
93        accessor.erase(2);
94        for (auto &elem : accessor) {
95          // use elem to access data
96        }
97        ... ...
98      }
99
100  Another useful type is the Skipper accessor.  This is useful if you
101  want to skip to locations in the way std::lower_bound() works,
102  i.e. it can be used for going through the list by skipping to the
103  node no less than a specified key.  The Skipper keeps its location as
104  state, which makes it convenient for things like implementing
105  intersection of two sets efficiently, as it can start from the last
106  visited position.
107
108      {
109        SkipListT::Accessor accessor(sl);
110        SkipListT::Skipper skipper(accessor);
111        skipper.to(30);
112        if (skipper) {
113          CHECK_LE(30, *skipper);
114        }
115        ...  ...
116        // GC may happen when the accessor gets destructed.
117      }
118 */
119
120 #ifndef FOLLY_CONCURRENT_SKIP_LIST_H_
121 #define FOLLY_CONCURRENT_SKIP_LIST_H_
122
123 #include <algorithm>
124 #include <climits>
125 #include <type_traits>
126 #include <utility>
127 #include <vector>
128 #include <atomic>
129 #include <thread>
130 #include <boost/iterator/iterator_facade.hpp>
131 #include <boost/scoped_ptr.hpp>
132 #include <boost/shared_ptr.hpp>
133
134 #include <glog/logging.h>
135 #include "folly/ConcurrentSkipList-inl.h"
136 #include "folly/Likely.h"
137 #include "folly/SmallLocks.h"
138
139 namespace folly {
140
141 template<typename T, typename Comp=std::less<T>, int MAX_HEIGHT=24>
142 class ConcurrentSkipList {
143   // MAX_HEIGHT needs to be at least 2 to suppress compiler
144   // warnings/errors (Werror=uninitialized tiggered due to preds_[1]
145   // being treated as a scalar in the compiler).
146   static_assert(MAX_HEIGHT >= 2 && MAX_HEIGHT < 64,
147       "MAX_HEIGHT can only be in the range of [2, 64)");
148   typedef std::unique_lock<folly::MicroSpinLock> ScopedLocker;
149   typedef ConcurrentSkipList<T, Comp, MAX_HEIGHT> SkipListType;
150
151  public:
152   typedef detail::SkipListNode<T> NodeType;
153   typedef T value_type;
154   typedef T key_type;
155
156
157   typedef detail::csl_iterator<value_type, NodeType> iterator;
158   typedef detail::csl_iterator<const value_type, const NodeType> const_iterator;
159
160   class Accessor;
161   class Skipper;
162
163   // convenient function to get an Accessor to a new instance.
164   static Accessor create(int height=1) {
165     return Accessor(createInstance(height));
166   }
167
168   // create a shared_ptr skiplist object with initial head height.
169   static boost::shared_ptr<SkipListType> createInstance(int height=1) {
170     return boost::shared_ptr<SkipListType>(new SkipListType(height));
171   }
172
173   // create a unique_ptr skiplist object with initial head height.
174   static std::unique_ptr<SkipListType> createRawInstance(int height=1) {
175     return std::unique_ptr<SkipListType>(new SkipListType(height));
176   }
177
178
179   //===================================================================
180   // Below are implementation details.
181   // Please see ConcurrentSkipList::Accessor for stdlib-like APIs.
182   //===================================================================
183
184   ~ConcurrentSkipList() {
185     LOG_IF(FATAL, recycler_.refs() > 0)
186       << "number of accessors is not 0, " << recycler_.refs() << " instead!"
187       << " This shouldn't have happened!";
188     while (NodeType* current = head_.load(std::memory_order_relaxed)) {
189       NodeType* tmp = current->skip(0);
190       NodeType::destroy(current);
191       head_.store(tmp, std::memory_order_relaxed);
192     }
193   }
194
195  private:
196
197   static bool greater(const value_type &data, const NodeType *node) {
198     return node && Comp()(node->data(), data);
199   }
200
201   static bool less(const value_type &data, const NodeType *node) {
202     return (node == nullptr) || Comp()(data, node->data());
203   }
204
205   static int findInsertionPoint(NodeType *cur, int cur_layer,
206       const value_type &data,
207       NodeType *preds[], NodeType *succs[]) {
208     int foundLayer = -1;
209     NodeType *pred = cur;
210     NodeType *foundNode = nullptr;
211     for (int layer = cur_layer; layer >= 0; --layer) {
212       NodeType *node = pred->skip(layer);
213       while (greater(data, node)) {
214         pred = node;
215         node = node->skip(layer);
216       }
217       if (foundLayer == -1 && !less(data, node)) { // the two keys equal
218         foundLayer = layer;
219         foundNode = node;
220       }
221       preds[layer] = pred;
222
223       // if found, succs[0..foundLayer] need to point to the cached foundNode,
224       // as foundNode might be deleted at the same time thus pred->skip() can
225       // return NULL or another node.
226       succs[layer] = foundNode ? foundNode : node;
227     }
228     return foundLayer;
229   }
230
231   struct Recycler : private boost::noncopyable {
232     Recycler() : refs_(0), dirty_(false) { lock_.init(); }
233
234     ~Recycler() {
235       if (nodes_) {
236         for (auto& node : *nodes_) {
237           NodeType::destroy(node);
238         }
239       }
240     }
241
242     void add(NodeType* node) {
243       std::lock_guard<MicroSpinLock> g(lock_);
244       if (nodes_.get() == nullptr) {
245         nodes_.reset(new std::vector<NodeType*>(1, node));
246       } else {
247         nodes_->push_back(node);
248       }
249       DCHECK_GT(refs(), 0);
250       dirty_.store(true, std::memory_order_relaxed);
251     }
252
253     int refs() const {
254       return refs_.load(std::memory_order_relaxed);
255     }
256
257     int addRef() {
258       return refs_.fetch_add(1, std::memory_order_relaxed);
259     }
260
261     int release() {
262       // We don't expect to clean the recycler immediately everytime it is OK
263       // to do so. Here, it is possible that multiple accessors all release at
264       // the same time but nobody would clean the recycler here. If this
265       // happens, the recycler will usually still get cleaned when
266       // such a race doesn't happen. The worst case is the recycler will
267       // eventually get deleted along with the skiplist.
268       if (LIKELY(!dirty_.load(std::memory_order_relaxed) || refs() > 1)) {
269         return refs_.fetch_add(-1, std::memory_order_relaxed);
270       }
271
272       boost::scoped_ptr<std::vector<NodeType*> > newNodes;
273       {
274         std::lock_guard<MicroSpinLock> g(lock_);
275         if (nodes_.get() == nullptr || refs() > 1) {
276           return refs_.fetch_add(-1, std::memory_order_relaxed);
277         }
278         // once refs_ reaches 1 and there is no other accessor, it is safe to
279         // remove all the current nodes in the recycler, as we already acquired
280         // the lock here so no more new nodes can be added, even though new
281         // accessors may be added after that.
282         newNodes.swap(nodes_);
283         dirty_.store(false, std::memory_order_relaxed);
284       }
285
286       // TODO(xliu) should we spawn a thread to do this when there are large
287       // number of nodes in the recycler?
288       for (auto& node : *newNodes) {
289         NodeType::destroy(node);
290       }
291
292       // decrease the ref count at the very end, to minimize the
293       // chance of other threads acquiring lock_ to clear the deleted
294       // nodes again.
295       return refs_.fetch_add(-1, std::memory_order_relaxed);
296     }
297
298    private:
299     boost::scoped_ptr<std::vector<NodeType*>> nodes_;
300     std::atomic<int32_t> refs_; // current number of visitors to the list
301     std::atomic<bool> dirty_; // whether *nodes_ is non-empty
302     MicroSpinLock lock_; // protects access to *nodes_
303   };  // class ConcurrentSkipList::Recycler
304
305   explicit ConcurrentSkipList(int height) :
306     head_(NodeType::create(height, value_type(), true)), size_(0) {}
307
308   size_t size() const { return size_.load(std::memory_order_relaxed); }
309   int height() const {
310     return head_.load(std::memory_order_consume)->height();
311   }
312   int maxLayer() const { return height() - 1; }
313
314   size_t incrementSize(int delta) {
315     return size_.fetch_add(delta, std::memory_order_relaxed) + delta;
316   }
317
318   // Returns the node if found, nullptr otherwise.
319   NodeType* find(const value_type &data) {
320     auto ret = findNode(data);
321     if (ret.second && !ret.first->markedForRemoval()) return ret.first;
322     return nullptr;
323   }
324
325   // lock all the necessary nodes for changing (adding or removing) the list.
326   // returns true if all the lock acquried successfully and the related nodes
327   // are all validate (not in certain pending states), false otherwise.
328   bool lockNodesForChange(int nodeHeight,
329       ScopedLocker guards[MAX_HEIGHT],
330       NodeType *preds[MAX_HEIGHT],
331       NodeType *succs[MAX_HEIGHT],
332       bool adding=true) {
333     NodeType *pred, *succ, *prevPred = nullptr;
334     bool valid = true;
335     for (int layer = 0; valid && layer < nodeHeight; ++layer) {
336       pred = preds[layer];
337       DCHECK(pred != nullptr) << "layer=" << layer << " height=" << height()
338         << " nodeheight=" << nodeHeight;
339       succ = succs[layer];
340       if (pred != prevPred) {
341         guards[layer] = pred->acquireGuard();
342         prevPred = pred;
343       }
344       valid = !pred->markedForRemoval() &&
345         pred->skip(layer) == succ;  // check again after locking
346
347       if (adding) {  // when adding a node, the succ shouldn't be going away
348         valid = valid && (succ == nullptr || !succ->markedForRemoval());
349       }
350     }
351
352     return valid;
353   }
354
355   // Returns a paired value:
356   //   pair.first always stores the pointer to the node with the same input key.
357   //     It could be either the newly added data, or the existed data in the
358   //     list with the same key.
359   //   pair.second stores whether the data is added successfully:
360   //     0 means not added, otherwise reutrns the new size.
361   std::pair<NodeType*, size_t> addOrGetData(const value_type &data) {
362     NodeType *preds[MAX_HEIGHT], *succs[MAX_HEIGHT];
363     NodeType *newNode;
364     size_t newSize;
365     while (true) {
366       int max_layer = 0;
367       int layer = findInsertionPointGetMaxLayer(data, preds, succs, &max_layer);
368
369       if (layer >= 0) {
370         NodeType *nodeFound = succs[layer];
371         DCHECK(nodeFound != nullptr);
372         if (nodeFound->markedForRemoval()) {
373           continue;  // if it's getting deleted retry finding node.
374         }
375         // wait until fully linked.
376         while (UNLIKELY(!nodeFound->fullyLinked())) {}
377         return std::make_pair(nodeFound, 0);
378       }
379
380       // need to capped at the original height -- the real height may have grown
381       int nodeHeight = detail::SkipListRandomHeight::instance()->
382         getHeight(max_layer + 1);
383
384       ScopedLocker guards[MAX_HEIGHT];
385       if (!lockNodesForChange(nodeHeight, guards, preds, succs)) {
386         continue; // give up the locks and retry until all valid
387       }
388
389       // locks acquired and all valid, need to modify the links under the locks.
390       newNode = NodeType::create(nodeHeight, data);
391       for (int layer = 0; layer < nodeHeight; ++layer) {
392         newNode->setSkip(layer, succs[layer]);
393         preds[layer]->setSkip(layer, newNode);
394       }
395
396       newNode->setFullyLinked();
397       newSize = incrementSize(1);
398       break;
399     }
400
401     int hgt = height();
402     size_t sizeLimit =
403       detail::SkipListRandomHeight::instance()->getSizeLimit(hgt);
404
405     if (hgt < MAX_HEIGHT && newSize > sizeLimit) {
406       growHeight(hgt + 1);
407     }
408     CHECK_GT(newSize, 0);
409     return std::make_pair(newNode, newSize);
410   }
411
412   bool remove(const value_type &data) {
413     NodeType *nodeToDelete = nullptr;
414     ScopedLocker nodeGuard;
415     bool isMarked = false;
416     int nodeHeight = 0;
417     NodeType* preds[MAX_HEIGHT], *succs[MAX_HEIGHT];
418
419     while (true) {
420       int max_layer = 0;
421       int layer = findInsertionPointGetMaxLayer(data, preds, succs, &max_layer);
422       if (!isMarked && (layer < 0 || !okToDelete(succs[layer], layer))) {
423         return false;
424       }
425
426       if (!isMarked) {
427         nodeToDelete = succs[layer];
428         nodeHeight = nodeToDelete->height();
429         nodeGuard = nodeToDelete->acquireGuard();
430         if (nodeToDelete->markedForRemoval()) return false;
431         nodeToDelete->setMarkedForRemoval();
432         isMarked = true;
433       }
434
435       // acquire pred locks from bottom layer up
436       ScopedLocker guards[MAX_HEIGHT];
437       if (!lockNodesForChange(nodeHeight, guards, preds, succs, false)) {
438         continue;  // this will unlock all the locks
439       }
440
441       for (int layer = nodeHeight - 1; layer >= 0; --layer) {
442         preds[layer]->setSkip(layer, nodeToDelete->skip(layer));
443       }
444
445       incrementSize(-1);
446       break;
447     }
448     recycle(nodeToDelete);
449     return true;
450   }
451
452   const value_type *first() const {
453     auto node = head_.load(std::memory_order_consume)->skip(0);
454     return node ? &node->data() : nullptr;
455   }
456
457   const value_type *last() const {
458     NodeType *pred = head_.load(std::memory_order_consume);
459     NodeType *node = nullptr;
460     for (int layer = maxLayer(); layer >= 0; --layer) {
461       do {
462         node = pred->skip(layer);
463         if (node) pred = node;
464       } while (node != nullptr);
465     }
466     return pred == head_.load(std::memory_order_relaxed)
467       ? nullptr : &pred->data();
468   }
469
470   static bool okToDelete(NodeType *candidate, int layer) {
471     DCHECK(candidate != nullptr);
472     return candidate->fullyLinked() &&
473       candidate->maxLayer() == layer &&
474       !candidate->markedForRemoval();
475   }
476
477   // find node for insertion/deleting
478   int findInsertionPointGetMaxLayer(const value_type &data,
479       NodeType *preds[], NodeType *succs[], int *max_layer) const {
480     *max_layer = maxLayer();
481     return findInsertionPoint(head_.load(std::memory_order_consume),
482       *max_layer, data, preds, succs);
483   }
484
485   // Find node for access. Returns a paired values:
486   // pair.first = the first node that no-less than data value
487   // pair.second = 1 when the data value is founded, or 0 otherwise.
488   // This is like lower_bound, but not exact: we could have the node marked for
489   // removal so still need to check that.
490   std::pair<NodeType*, int> findNode(const value_type &data) const {
491     return findNodeDownRight(data);
492   }
493
494   // Find node by first stepping down then stepping right. Based on benchmark
495   // results, this is slightly faster than findNodeRightDown for better
496   // localality on the skipping pointers.
497   std::pair<NodeType*, int> findNodeDownRight(const value_type &data) const {
498     NodeType *pred = head_.load(std::memory_order_consume);
499     int ht = pred->height();
500     NodeType *node = nullptr;
501
502     bool found = false;
503     while (!found) {
504       // stepping down
505       for (; ht > 0 && less(data, pred->skip(ht - 1)); --ht) {}
506       if (ht == 0) return std::make_pair(pred->skip(0), 0);  // not found
507
508       node = pred->skip(--ht);  // node <= data now
509       // stepping right
510       while (greater(data, node)) {
511         pred = node;
512         node = node->skip(ht);
513       }
514       found = !less(data, node);
515     }
516     return std::make_pair(node, found);
517   }
518
519   // find node by first stepping right then stepping down.
520   // We still keep this for reference purposes.
521   std::pair<NodeType*, int> findNodeRightDown(const value_type &data) const {
522     NodeType *pred = head_.load(std::memory_order_consume);
523     NodeType *node = nullptr;
524     auto top = maxLayer();
525     int found = 0;
526     for (int layer = top; !found && layer >= 0; --layer) {
527       node = pred->skip(layer);
528       while (greater(data, node)) {
529         pred = node;
530         node = node->skip(layer);
531       }
532       found = !less(data, node);
533     }
534     return std::make_pair(node, found);
535   }
536
537   NodeType* lower_bound(const value_type &data) const {
538     auto node = findNode(data).first;
539     while (node != nullptr && node->markedForRemoval()) {
540       node = node->skip(0);
541     }
542     return node;
543   }
544
545   void growHeight(int height) {
546     NodeType* oldHead = head_.load(std::memory_order_consume);
547     if (oldHead->height() >= height) {  // someone else already did this
548       return;
549     }
550
551     NodeType* newHead = NodeType::create(height, value_type(), true);
552
553     { // need to guard the head node in case others are adding/removing
554       // nodes linked to the head.
555       ScopedLocker g = oldHead->acquireGuard();
556       newHead->promoteFrom(oldHead);
557       NodeType* expected = oldHead;
558       if (!head_.compare_exchange_strong(expected, newHead,
559           std::memory_order_release)) {
560         // if someone has already done the swap, just return.
561         NodeType::destroy(newHead);
562         return;
563       }
564       oldHead->setMarkedForRemoval();
565     }
566     recycle(oldHead);
567   }
568
569   void recycle(NodeType *node) {
570     recycler_.add(node);
571   }
572
573   std::atomic<NodeType*> head_;
574   Recycler recycler_;
575   std::atomic<size_t> size_;
576 };
577
578 template<typename T, typename Comp, int MAX_HEIGHT>
579 class ConcurrentSkipList<T, Comp, MAX_HEIGHT>::Accessor {
580   typedef detail::SkipListNode<T> NodeType;
581   typedef ConcurrentSkipList<T, Comp, MAX_HEIGHT> SkipListType;
582  public:
583   typedef T value_type;
584   typedef T key_type;
585   typedef T& reference;
586   typedef T* pointer;
587   typedef const T& const_reference;
588   typedef const T* const_pointer;
589   typedef size_t size_type;
590   typedef Comp key_compare;
591   typedef Comp value_compare;
592
593   typedef typename SkipListType::iterator iterator;
594   typedef typename SkipListType::const_iterator const_iterator;
595   typedef typename SkipListType::Skipper Skipper;
596
597   explicit Accessor(boost::shared_ptr<ConcurrentSkipList> skip_list)
598     : slHolder_(std::move(skip_list))
599   {
600     sl_ = slHolder_.get();
601     DCHECK(sl_ != nullptr);
602     sl_->recycler_.addRef();
603   }
604
605   // Unsafe initializer: the caller assumes the responsibility to keep
606   // skip_list valid during the whole life cycle of the Acessor.
607   explicit Accessor(ConcurrentSkipList *skip_list) : sl_(skip_list) {
608     DCHECK(sl_ != nullptr);
609     sl_->recycler_.addRef();
610   }
611
612   Accessor(const Accessor &accessor) :
613       sl_(accessor.sl_),
614       slHolder_(accessor.slHolder_) {
615     sl_->recycler_.addRef();
616   }
617
618   Accessor& operator=(const Accessor &accessor) {
619     if (this != &accessor) {
620       slHolder_ = accessor.slHolder_;
621       sl_->recycler_.release();
622       sl_ = accessor.sl_;
623       sl_->recycler_.addRef();
624     }
625     return *this;
626   }
627
628   ~Accessor() {
629     sl_->recycler_.release();
630   }
631
632   bool empty() const { return sl_->size() == 0; }
633   size_t size() const { return sl_->size(); }
634   size_type max_size() const { return std::numeric_limits<size_type>::max(); }
635
636   // returns end() if the value is not in the list, otherwise returns an
637   // iterator pointing to the data, and it's guaranteed that the data is valid
638   // as far as the Accessor is hold.
639   iterator find(const key_type &value) { return iterator(sl_->find(value)); }
640   const_iterator find(const key_type &value) const {
641     return iterator(sl_->find(value));
642   }
643   size_type count(const key_type &data) const { return contains(data); }
644
645   iterator begin() const {
646     NodeType* head = sl_->head_.load(std::memory_order_consume);
647     return iterator(head->next());
648   }
649   iterator end() const { return iterator(nullptr); }
650   const_iterator cbegin() const { return begin(); }
651   const_iterator cend() const { return end(); }
652
653   std::pair<iterator, bool> insert(const key_type &data) {
654     auto ret = sl_->addOrGetData(data);
655     return std::make_pair(iterator(ret.first), ret.second);
656   }
657   size_t erase(const key_type &data) { return remove(data); }
658
659   iterator lower_bound(const key_type &data) const {
660     return iterator(sl_->lower_bound(data));
661   }
662
663   size_t height() const { return sl_->height(); }
664
665   // first() returns pointer to the first element in the skiplist, or
666   // nullptr if empty.
667   //
668   // last() returns the pointer to the last element in the skiplist,
669   // nullptr if list is empty.
670   //
671   // Note: As concurrent writing can happen, first() is not
672   //   guaranteed to be the min_element() in the list. Similarly
673   //   last() is not guaranteed to be the max_element(), and both of them can
674   //   be invalid (i.e. nullptr), so we name them differently from front() and
675   //   tail() here.
676   const key_type *first() const { return sl_->first(); }
677   const key_type *last() const { return sl_->last(); }
678
679   // Try to remove the last element in the skip list.
680   //
681   // Returns true if we removed it, false if either the list is empty
682   // or a race condition happened (i.e. the used-to-be last element
683   // was already removed by another thread).
684   bool pop_back() {
685     auto last = sl_->last();
686     return last ? sl_->remove(*last) : false;
687   }
688
689   std::pair<key_type*, bool> addOrGetData(const key_type &data) {
690     auto ret = sl_->addOrGetData(data);
691     return std::make_pair(&ret.first->data(), ret.second);
692   }
693
694   SkipListType* skiplist() const { return sl_; }
695
696   // legacy interfaces
697   // TODO:(xliu) remove these.
698   // Returns true if the node is added successfully, false if not, i.e. the
699   // node with the same key already existed in the list.
700   bool contains(const key_type &data) const { return sl_->find(data); }
701   bool add(const key_type &data) { return sl_->addOrGetData(data).second; }
702   bool remove(const key_type &data) { return sl_->remove(data); }
703
704  private:
705   SkipListType *sl_;
706   boost::shared_ptr<SkipListType> slHolder_;
707 };
708
709 // implements forward iterator concept.
710 template<typename ValT, typename NodeT>
711 class detail::csl_iterator :
712   public boost::iterator_facade<csl_iterator<ValT, NodeT>,
713     ValT, boost::forward_traversal_tag> {
714  public:
715   typedef ValT value_type;
716   typedef value_type& reference;
717   typedef value_type* pointer;
718   typedef ptrdiff_t difference_type;
719
720   explicit csl_iterator(NodeT* node = nullptr) : node_(node) {}
721
722   template<typename OtherVal, typename OtherNode>
723   csl_iterator(const csl_iterator<OtherVal, OtherNode> &other,
724       typename std::enable_if<std::is_convertible<OtherVal, ValT>::value>::type*
725       = 0) : node_(other.node_) {}
726
727   size_t nodeSize() const {
728     return node_ == nullptr ? 0 :
729       node_->height() * sizeof(NodeT*) + sizeof(*this);
730   }
731
732   bool good() const { return node_ != nullptr; }
733
734  private:
735   friend class boost::iterator_core_access;
736   template<class,class> friend class csl_iterator;
737
738   void increment() { node_ = node_->next(); };
739   bool equal(const csl_iterator& other) const { return node_ == other.node_; }
740   value_type& dereference() const { return node_->data(); }
741
742   NodeT* node_;
743 };
744
745 // Skipper interface
746 template<typename T, typename Comp, int MAX_HEIGHT>
747 class ConcurrentSkipList<T, Comp, MAX_HEIGHT>::Skipper {
748   typedef detail::SkipListNode<T> NodeType;
749   typedef ConcurrentSkipList<T, Comp, MAX_HEIGHT> SkipListType;
750   typedef typename SkipListType::Accessor Accessor;
751
752  public:
753   typedef T  value_type;
754   typedef T& reference;
755   typedef T* pointer;
756   typedef ptrdiff_t difference_type;
757
758   Skipper(const boost::shared_ptr<SkipListType>& skipList) :
759     accessor_(skipList) {
760     init();
761   }
762
763   Skipper(const Accessor& accessor) : accessor_(accessor) {
764     init();
765   }
766
767   void init() {
768     // need to cache the head node
769     NodeType* head_node = head();
770     headHeight_ = head_node->height();
771     for (int i = 0; i < headHeight_; ++i) {
772       preds_[i] = head_node;
773       succs_[i] = head_node->skip(i);
774     }
775     int max_layer = maxLayer();
776     for (int i = 0; i < max_layer; ++i) {
777       hints_[i] = i + 1;
778     }
779     hints_[max_layer] = max_layer;
780   }
781
782   // advance to the next node in the list.
783   Skipper& operator ++() {
784     preds_[0] = succs_[0];
785     succs_[0] = preds_[0]->skip(0);
786     int height = curHeight();
787     for (int i = 1; i < height && preds_[0] == succs_[i]; ++i) {
788       preds_[i] = succs_[i];
789       succs_[i] = preds_[i]->skip(i);
790     }
791     return *this;
792   }
793
794   bool good() const { return succs_[0] != nullptr; }
795
796   int maxLayer() const { return headHeight_ - 1; }
797
798   int curHeight() const {
799     // need to cap the height to the cached head height, as the current node
800     // might be some newly inserted node and also during the time period the
801     // head height may have grown.
802     return succs_[0] ? std::min(headHeight_, succs_[0]->height()) : 0;
803   }
804
805   const value_type &data() const {
806     DCHECK(succs_[0] != NULL);
807     return succs_[0]->data();
808   }
809
810   value_type &operator *() const {
811     DCHECK(succs_[0] != NULL);
812     return succs_[0]->data();
813   }
814
815   value_type *operator->() {
816     DCHECK(succs_[0] != NULL);
817     return &succs_[0]->data();
818   }
819
820   /*
821    * Skip to the position whose data is no less than the parameter.
822    * (I.e. the lower_bound).
823    *
824    * Returns true if the data is found, false otherwise.
825    */
826   bool to(const value_type &data) {
827     int layer = curHeight() - 1;
828     if (layer < 0) return false;   // reaches the end of the list
829
830     int lyr = hints_[layer];
831     int max_layer = maxLayer();
832     while (SkipListType::greater(data, succs_[lyr]) && lyr < max_layer) {
833       ++lyr;
834     }
835     hints_[layer] = lyr;  // update the hint
836
837     int foundLayer = SkipListType::
838       findInsertionPoint(preds_[lyr], lyr, data, preds_, succs_);
839     if (foundLayer < 0) return false;
840
841     DCHECK(succs_[0] != NULL) << "lyr=" << lyr << "; max_layer=" << max_layer;
842     return !succs_[0]->markedForRemoval();
843   }
844
845  private:
846   NodeType* head() const {
847     return accessor_.skiplist()->head_.load(std::memory_order_consume);
848   }
849
850   Accessor accessor_;
851   int headHeight_;
852   NodeType *succs_[MAX_HEIGHT], *preds_[MAX_HEIGHT];
853   uint8_t hints_[MAX_HEIGHT];
854 };
855
856 } // namespace folly
857
858 #endif  // FOLLY_CONCURRENT_SKIP_LIST_H_