Harden failure signal handler in the face of memory corruptions
[folly.git] / folly / ConcurrentSkipList.h
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 // @author: Xin Liu <xliux@fb.com>
18 //
19 // A concurrent skip list (CSL) implementation.
20 // Ref: http://www.cs.tau.ac.il/~shanir/nir-pubs-web/Papers/OPODIS2006-BA.pdf
21
22 /*
23
24 This implements a sorted associative container that supports only
25 unique keys.  (Similar to std::set.)
26
27 Features:
28
29   1. Small memory overhead: ~40% less memory overhead compared with
30      std::set (1.6 words per node versus 3). It has an minimum of 4
31      words (7 words if there nodes got deleted) per-list overhead
32      though.
33
34   2. Read accesses (count, find iterator, skipper) are lock-free and
35      mostly wait-free (the only wait a reader may need to do is when
36      the node it is visiting is in a pending stage, i.e. deleting,
37      adding and not fully linked).  Write accesses (remove, add) need
38      to acquire locks, but locks are local to the predecessor nodes
39      and/or successor nodes.
40
41   3. Good high contention performance, comparable single-thread
42      performance.  In the multithreaded case (12 workers), CSL tested
43      10x faster than a RWSpinLocked std::set for an averaged sized
44      list (1K - 1M nodes).
45
46      Comparable read performance to std::set when single threaded,
47      especially when the list size is large, and scales better to
48      larger lists: when the size is small, CSL can be 20-50% slower on
49      find()/contains().  As the size gets large (> 1M elements),
50      find()/contains() can be 30% faster.
51
52      Iterating through a skiplist is similar to iterating through a
53      linked list, thus is much (2-6x) faster than on a std::set
54      (tree-based).  This is especially true for short lists due to
55      better cache locality.  Based on that, it's also faster to
56      intersect two skiplists.
57
58   4. Lazy removal with GC support.  The removed nodes get deleted when
59      the last Accessor to the skiplist is destroyed.
60
61 Caveats:
62
63   1. Write operations are usually 30% slower than std::set in a single
64      threaded environment.
65
66   2. Need to have a head node for each list, which has a 4 word
67      overhead.
68
69   3. When the list is quite small (< 1000 elements), single threaded
70      benchmarks show CSL can be 10x slower than std:set.
71
72   4. The interface requires using an Accessor to access the skiplist.
73     (See below.)
74
75   5. Currently x64 only, due to use of MicroSpinLock.
76
77   6. Freed nodes will not be reclaimed as long as there are ongoing
78      uses of the list.
79
80 Sample usage:
81
82      typedef ConcurrentSkipList<int> SkipListT;
83      shared_ptr<SkipListT> sl(SkipListT::createInstance(init_head_height);
84      {
85        // It's usually good practice to hold an accessor only during
86        // its necessary life cycle (but not in a tight loop as
87        // Accessor creation incurs ref-counting overhead).
88        //
89        // Holding it longer delays garbage-collecting the deleted
90        // nodes in the list.
91        SkipListT::Accessor accessor(sl);
92        accessor.insert(23);
93        accessor.erase(2);
94        for (auto &elem : accessor) {
95          // use elem to access data
96        }
97        ... ...
98      }
99
100  Another useful type is the Skipper accessor.  This is useful if you
101  want to skip to locations in the way std::lower_bound() works,
102  i.e. it can be used for going through the list by skipping to the
103  node no less than a specified key.  The Skipper keeps its location as
104  state, which makes it convenient for things like implementing
105  intersection of two sets efficiently, as it can start from the last
106  visited position.
107
108      {
109        SkipListT::Accessor accessor(sl);
110        SkipListT::Skipper skipper(accessor);
111        skipper.to(30);
112        if (skipper) {
113          CHECK_LE(30, *skipper);
114        }
115        ...  ...
116        // GC may happen when the accessor gets destructed.
117      }
118 */
119
120 #ifndef FOLLY_CONCURRENT_SKIP_LIST_H_
121 #define FOLLY_CONCURRENT_SKIP_LIST_H_
122
123 #include <algorithm>
124 #include <climits>
125 #include <type_traits>
126 #include <utility>
127 #include <vector>
128 #include <atomic>
129 #include <thread>
130 #include <boost/iterator/iterator_facade.hpp>
131 #include <boost/scoped_ptr.hpp>
132 #include <memory>
133
134 #include <glog/logging.h>
135 #include "folly/ConcurrentSkipList-inl.h"
136 #include "folly/Likely.h"
137 #include "folly/SmallLocks.h"
138
139 namespace folly {
140
141 template<typename T, typename Comp=std::less<T>, int MAX_HEIGHT=24>
142 class ConcurrentSkipList {
143   // MAX_HEIGHT needs to be at least 2 to suppress compiler
144   // warnings/errors (Werror=uninitialized tiggered due to preds_[1]
145   // being treated as a scalar in the compiler).
146   static_assert(MAX_HEIGHT >= 2 && MAX_HEIGHT < 64,
147       "MAX_HEIGHT can only be in the range of [2, 64)");
148   typedef std::unique_lock<folly::MicroSpinLock> ScopedLocker;
149   typedef ConcurrentSkipList<T, Comp, MAX_HEIGHT> SkipListType;
150
151  public:
152   typedef detail::SkipListNode<T> NodeType;
153   typedef T value_type;
154   typedef T key_type;
155
156
157   typedef detail::csl_iterator<value_type, NodeType> iterator;
158   typedef detail::csl_iterator<const value_type, const NodeType> const_iterator;
159
160   class Accessor;
161   class Skipper;
162
163   // convenient function to get an Accessor to a new instance.
164   static Accessor create(int height=1) {
165     return Accessor(createInstance(height));
166   }
167
168   // create a shared_ptr skiplist object with initial head height.
169   static std::shared_ptr<SkipListType> createInstance(int height=1) {
170     return std::shared_ptr<SkipListType>(new SkipListType(height));
171   }
172
173   // create a unique_ptr skiplist object with initial head height.
174   static std::unique_ptr<SkipListType> createRawInstance(int height=1) {
175     return std::unique_ptr<SkipListType>(new SkipListType(height));
176   }
177
178
179   //===================================================================
180   // Below are implementation details.
181   // Please see ConcurrentSkipList::Accessor for stdlib-like APIs.
182   //===================================================================
183
184   ~ConcurrentSkipList() {
185     CHECK_EQ(recycler_.refs(), 0);
186     while (NodeType* current = head_.load(std::memory_order_relaxed)) {
187       NodeType* tmp = current->skip(0);
188       NodeType::destroy(current);
189       head_.store(tmp, std::memory_order_relaxed);
190     }
191   }
192
193  private:
194
195   static bool greater(const value_type &data, const NodeType *node) {
196     return node && Comp()(node->data(), data);
197   }
198
199   static bool less(const value_type &data, const NodeType *node) {
200     return (node == nullptr) || Comp()(data, node->data());
201   }
202
203   static int findInsertionPoint(NodeType *cur, int cur_layer,
204       const value_type &data,
205       NodeType *preds[], NodeType *succs[]) {
206     int foundLayer = -1;
207     NodeType *pred = cur;
208     NodeType *foundNode = nullptr;
209     for (int layer = cur_layer; layer >= 0; --layer) {
210       NodeType *node = pred->skip(layer);
211       while (greater(data, node)) {
212         pred = node;
213         node = node->skip(layer);
214       }
215       if (foundLayer == -1 && !less(data, node)) { // the two keys equal
216         foundLayer = layer;
217         foundNode = node;
218       }
219       preds[layer] = pred;
220
221       // if found, succs[0..foundLayer] need to point to the cached foundNode,
222       // as foundNode might be deleted at the same time thus pred->skip() can
223       // return NULL or another node.
224       succs[layer] = foundNode ? foundNode : node;
225     }
226     return foundLayer;
227   }
228
229   struct Recycler : private boost::noncopyable {
230     Recycler() : refs_(0), dirty_(false) { lock_.init(); }
231
232     ~Recycler() {
233       if (nodes_) {
234         for (auto& node : *nodes_) {
235           NodeType::destroy(node);
236         }
237       }
238     }
239
240     void add(NodeType* node) {
241       std::lock_guard<MicroSpinLock> g(lock_);
242       if (nodes_.get() == nullptr) {
243         nodes_.reset(new std::vector<NodeType*>(1, node));
244       } else {
245         nodes_->push_back(node);
246       }
247       DCHECK_GT(refs(), 0);
248       dirty_.store(true, std::memory_order_relaxed);
249     }
250
251     int refs() const {
252       return refs_.load(std::memory_order_relaxed);
253     }
254
255     int addRef() {
256       return refs_.fetch_add(1, std::memory_order_relaxed);
257     }
258
259     int release() {
260       // We don't expect to clean the recycler immediately everytime it is OK
261       // to do so. Here, it is possible that multiple accessors all release at
262       // the same time but nobody would clean the recycler here. If this
263       // happens, the recycler will usually still get cleaned when
264       // such a race doesn't happen. The worst case is the recycler will
265       // eventually get deleted along with the skiplist.
266       if (LIKELY(!dirty_.load(std::memory_order_relaxed) || refs() > 1)) {
267         return refs_.fetch_add(-1, std::memory_order_relaxed);
268       }
269
270       boost::scoped_ptr<std::vector<NodeType*> > newNodes;
271       {
272         std::lock_guard<MicroSpinLock> g(lock_);
273         if (nodes_.get() == nullptr || refs() > 1) {
274           return refs_.fetch_add(-1, std::memory_order_relaxed);
275         }
276         // once refs_ reaches 1 and there is no other accessor, it is safe to
277         // remove all the current nodes in the recycler, as we already acquired
278         // the lock here so no more new nodes can be added, even though new
279         // accessors may be added after that.
280         newNodes.swap(nodes_);
281         dirty_.store(false, std::memory_order_relaxed);
282       }
283
284       // TODO(xliu) should we spawn a thread to do this when there are large
285       // number of nodes in the recycler?
286       for (auto& node : *newNodes) {
287         NodeType::destroy(node);
288       }
289
290       // decrease the ref count at the very end, to minimize the
291       // chance of other threads acquiring lock_ to clear the deleted
292       // nodes again.
293       return refs_.fetch_add(-1, std::memory_order_relaxed);
294     }
295
296    private:
297     boost::scoped_ptr<std::vector<NodeType*>> nodes_;
298     std::atomic<int32_t> refs_; // current number of visitors to the list
299     std::atomic<bool> dirty_; // whether *nodes_ is non-empty
300     MicroSpinLock lock_; // protects access to *nodes_
301   };  // class ConcurrentSkipList::Recycler
302
303   explicit ConcurrentSkipList(int height) :
304     head_(NodeType::create(height, value_type(), true)), size_(0) {}
305
306   size_t size() const { return size_.load(std::memory_order_relaxed); }
307   int height() const {
308     return head_.load(std::memory_order_consume)->height();
309   }
310   int maxLayer() const { return height() - 1; }
311
312   size_t incrementSize(int delta) {
313     return size_.fetch_add(delta, std::memory_order_relaxed) + delta;
314   }
315
316   // Returns the node if found, nullptr otherwise.
317   NodeType* find(const value_type &data) {
318     auto ret = findNode(data);
319     if (ret.second && !ret.first->markedForRemoval()) return ret.first;
320     return nullptr;
321   }
322
323   // lock all the necessary nodes for changing (adding or removing) the list.
324   // returns true if all the lock acquried successfully and the related nodes
325   // are all validate (not in certain pending states), false otherwise.
326   bool lockNodesForChange(int nodeHeight,
327       ScopedLocker guards[MAX_HEIGHT],
328       NodeType *preds[MAX_HEIGHT],
329       NodeType *succs[MAX_HEIGHT],
330       bool adding=true) {
331     NodeType *pred, *succ, *prevPred = nullptr;
332     bool valid = true;
333     for (int layer = 0; valid && layer < nodeHeight; ++layer) {
334       pred = preds[layer];
335       DCHECK(pred != nullptr) << "layer=" << layer << " height=" << height()
336         << " nodeheight=" << nodeHeight;
337       succ = succs[layer];
338       if (pred != prevPred) {
339         guards[layer] = pred->acquireGuard();
340         prevPred = pred;
341       }
342       valid = !pred->markedForRemoval() &&
343         pred->skip(layer) == succ;  // check again after locking
344
345       if (adding) {  // when adding a node, the succ shouldn't be going away
346         valid = valid && (succ == nullptr || !succ->markedForRemoval());
347       }
348     }
349
350     return valid;
351   }
352
353   // Returns a paired value:
354   //   pair.first always stores the pointer to the node with the same input key.
355   //     It could be either the newly added data, or the existed data in the
356   //     list with the same key.
357   //   pair.second stores whether the data is added successfully:
358   //     0 means not added, otherwise reutrns the new size.
359   template<typename U>
360   std::pair<NodeType*, size_t> addOrGetData(U &&data) {
361     NodeType *preds[MAX_HEIGHT], *succs[MAX_HEIGHT];
362     NodeType *newNode;
363     size_t newSize;
364     while (true) {
365       int max_layer = 0;
366       int layer = findInsertionPointGetMaxLayer(data, preds, succs, &max_layer);
367
368       if (layer >= 0) {
369         NodeType *nodeFound = succs[layer];
370         DCHECK(nodeFound != nullptr);
371         if (nodeFound->markedForRemoval()) {
372           continue;  // if it's getting deleted retry finding node.
373         }
374         // wait until fully linked.
375         while (UNLIKELY(!nodeFound->fullyLinked())) {}
376         return std::make_pair(nodeFound, 0);
377       }
378
379       // need to capped at the original height -- the real height may have grown
380       int nodeHeight = detail::SkipListRandomHeight::instance()->
381         getHeight(max_layer + 1);
382
383       ScopedLocker guards[MAX_HEIGHT];
384       if (!lockNodesForChange(nodeHeight, guards, preds, succs)) {
385         continue; // give up the locks and retry until all valid
386       }
387
388       // locks acquired and all valid, need to modify the links under the locks.
389       newNode = NodeType::create(nodeHeight, std::forward<U>(data));
390       for (int layer = 0; layer < nodeHeight; ++layer) {
391         newNode->setSkip(layer, succs[layer]);
392         preds[layer]->setSkip(layer, newNode);
393       }
394
395       newNode->setFullyLinked();
396       newSize = incrementSize(1);
397       break;
398     }
399
400     int hgt = height();
401     size_t sizeLimit =
402       detail::SkipListRandomHeight::instance()->getSizeLimit(hgt);
403
404     if (hgt < MAX_HEIGHT && newSize > sizeLimit) {
405       growHeight(hgt + 1);
406     }
407     CHECK_GT(newSize, 0);
408     return std::make_pair(newNode, newSize);
409   }
410
411   bool remove(const value_type &data) {
412     NodeType *nodeToDelete = nullptr;
413     ScopedLocker nodeGuard;
414     bool isMarked = false;
415     int nodeHeight = 0;
416     NodeType* preds[MAX_HEIGHT], *succs[MAX_HEIGHT];
417
418     while (true) {
419       int max_layer = 0;
420       int layer = findInsertionPointGetMaxLayer(data, preds, succs, &max_layer);
421       if (!isMarked && (layer < 0 || !okToDelete(succs[layer], layer))) {
422         return false;
423       }
424
425       if (!isMarked) {
426         nodeToDelete = succs[layer];
427         nodeHeight = nodeToDelete->height();
428         nodeGuard = nodeToDelete->acquireGuard();
429         if (nodeToDelete->markedForRemoval()) return false;
430         nodeToDelete->setMarkedForRemoval();
431         isMarked = true;
432       }
433
434       // acquire pred locks from bottom layer up
435       ScopedLocker guards[MAX_HEIGHT];
436       if (!lockNodesForChange(nodeHeight, guards, preds, succs, false)) {
437         continue;  // this will unlock all the locks
438       }
439
440       for (int layer = nodeHeight - 1; layer >= 0; --layer) {
441         preds[layer]->setSkip(layer, nodeToDelete->skip(layer));
442       }
443
444       incrementSize(-1);
445       break;
446     }
447     recycle(nodeToDelete);
448     return true;
449   }
450
451   const value_type *first() const {
452     auto node = head_.load(std::memory_order_consume)->skip(0);
453     return node ? &node->data() : nullptr;
454   }
455
456   const value_type *last() const {
457     NodeType *pred = head_.load(std::memory_order_consume);
458     NodeType *node = nullptr;
459     for (int layer = maxLayer(); layer >= 0; --layer) {
460       do {
461         node = pred->skip(layer);
462         if (node) pred = node;
463       } while (node != nullptr);
464     }
465     return pred == head_.load(std::memory_order_relaxed)
466       ? nullptr : &pred->data();
467   }
468
469   static bool okToDelete(NodeType *candidate, int layer) {
470     DCHECK(candidate != nullptr);
471     return candidate->fullyLinked() &&
472       candidate->maxLayer() == layer &&
473       !candidate->markedForRemoval();
474   }
475
476   // find node for insertion/deleting
477   int findInsertionPointGetMaxLayer(const value_type &data,
478       NodeType *preds[], NodeType *succs[], int *max_layer) const {
479     *max_layer = maxLayer();
480     return findInsertionPoint(head_.load(std::memory_order_consume),
481       *max_layer, data, preds, succs);
482   }
483
484   // Find node for access. Returns a paired values:
485   // pair.first = the first node that no-less than data value
486   // pair.second = 1 when the data value is founded, or 0 otherwise.
487   // This is like lower_bound, but not exact: we could have the node marked for
488   // removal so still need to check that.
489   std::pair<NodeType*, int> findNode(const value_type &data) const {
490     return findNodeDownRight(data);
491   }
492
493   // Find node by first stepping down then stepping right. Based on benchmark
494   // results, this is slightly faster than findNodeRightDown for better
495   // localality on the skipping pointers.
496   std::pair<NodeType*, int> findNodeDownRight(const value_type &data) const {
497     NodeType *pred = head_.load(std::memory_order_consume);
498     int ht = pred->height();
499     NodeType *node = nullptr;
500
501     bool found = false;
502     while (!found) {
503       // stepping down
504       for (; ht > 0 && less(data, pred->skip(ht - 1)); --ht) {}
505       if (ht == 0) return std::make_pair(pred->skip(0), 0);  // not found
506
507       node = pred->skip(--ht);  // node <= data now
508       // stepping right
509       while (greater(data, node)) {
510         pred = node;
511         node = node->skip(ht);
512       }
513       found = !less(data, node);
514     }
515     return std::make_pair(node, found);
516   }
517
518   // find node by first stepping right then stepping down.
519   // We still keep this for reference purposes.
520   std::pair<NodeType*, int> findNodeRightDown(const value_type &data) const {
521     NodeType *pred = head_.load(std::memory_order_consume);
522     NodeType *node = nullptr;
523     auto top = maxLayer();
524     int found = 0;
525     for (int layer = top; !found && layer >= 0; --layer) {
526       node = pred->skip(layer);
527       while (greater(data, node)) {
528         pred = node;
529         node = node->skip(layer);
530       }
531       found = !less(data, node);
532     }
533     return std::make_pair(node, found);
534   }
535
536   NodeType* lower_bound(const value_type &data) const {
537     auto node = findNode(data).first;
538     while (node != nullptr && node->markedForRemoval()) {
539       node = node->skip(0);
540     }
541     return node;
542   }
543
544   void growHeight(int height) {
545     NodeType* oldHead = head_.load(std::memory_order_consume);
546     if (oldHead->height() >= height) {  // someone else already did this
547       return;
548     }
549
550     NodeType* newHead = NodeType::create(height, value_type(), true);
551
552     { // need to guard the head node in case others are adding/removing
553       // nodes linked to the head.
554       ScopedLocker g = oldHead->acquireGuard();
555       newHead->copyHead(oldHead);
556       NodeType* expected = oldHead;
557       if (!head_.compare_exchange_strong(expected, newHead,
558           std::memory_order_release)) {
559         // if someone has already done the swap, just return.
560         NodeType::destroy(newHead);
561         return;
562       }
563       oldHead->setMarkedForRemoval();
564     }
565     recycle(oldHead);
566   }
567
568   void recycle(NodeType *node) {
569     recycler_.add(node);
570   }
571
572   std::atomic<NodeType*> head_;
573   Recycler recycler_;
574   std::atomic<size_t> size_;
575 };
576
577 template<typename T, typename Comp, int MAX_HEIGHT>
578 class ConcurrentSkipList<T, Comp, MAX_HEIGHT>::Accessor {
579   typedef detail::SkipListNode<T> NodeType;
580   typedef ConcurrentSkipList<T, Comp, MAX_HEIGHT> SkipListType;
581  public:
582   typedef T value_type;
583   typedef T key_type;
584   typedef T& reference;
585   typedef T* pointer;
586   typedef const T& const_reference;
587   typedef const T* const_pointer;
588   typedef size_t size_type;
589   typedef Comp key_compare;
590   typedef Comp value_compare;
591
592   typedef typename SkipListType::iterator iterator;
593   typedef typename SkipListType::const_iterator const_iterator;
594   typedef typename SkipListType::Skipper Skipper;
595
596   explicit Accessor(std::shared_ptr<ConcurrentSkipList> skip_list)
597     : slHolder_(std::move(skip_list))
598   {
599     sl_ = slHolder_.get();
600     DCHECK(sl_ != nullptr);
601     sl_->recycler_.addRef();
602   }
603
604   // Unsafe initializer: the caller assumes the responsibility to keep
605   // skip_list valid during the whole life cycle of the Acessor.
606   explicit Accessor(ConcurrentSkipList *skip_list) : sl_(skip_list) {
607     DCHECK(sl_ != nullptr);
608     sl_->recycler_.addRef();
609   }
610
611   Accessor(const Accessor &accessor) :
612       sl_(accessor.sl_),
613       slHolder_(accessor.slHolder_) {
614     sl_->recycler_.addRef();
615   }
616
617   Accessor& operator=(const Accessor &accessor) {
618     if (this != &accessor) {
619       slHolder_ = accessor.slHolder_;
620       sl_->recycler_.release();
621       sl_ = accessor.sl_;
622       sl_->recycler_.addRef();
623     }
624     return *this;
625   }
626
627   ~Accessor() {
628     sl_->recycler_.release();
629   }
630
631   bool empty() const { return sl_->size() == 0; }
632   size_t size() const { return sl_->size(); }
633   size_type max_size() const { return std::numeric_limits<size_type>::max(); }
634
635   // returns end() if the value is not in the list, otherwise returns an
636   // iterator pointing to the data, and it's guaranteed that the data is valid
637   // as far as the Accessor is hold.
638   iterator find(const key_type &value) { return iterator(sl_->find(value)); }
639   const_iterator find(const key_type &value) const {
640     return iterator(sl_->find(value));
641   }
642   size_type count(const key_type &data) const { return contains(data); }
643
644   iterator begin() const {
645     NodeType* head = sl_->head_.load(std::memory_order_consume);
646     return iterator(head->next());
647   }
648   iterator end() const { return iterator(nullptr); }
649   const_iterator cbegin() const { return begin(); }
650   const_iterator cend() const { return end(); }
651
652   template<typename U,
653     typename=typename std::enable_if<std::is_convertible<U, T>::value>::type>
654   std::pair<iterator, bool> insert(U&& data) {
655     auto ret = sl_->addOrGetData(std::forward<U>(data));
656     return std::make_pair(iterator(ret.first), ret.second);
657   }
658   size_t erase(const key_type &data) { return remove(data); }
659
660   iterator lower_bound(const key_type &data) const {
661     return iterator(sl_->lower_bound(data));
662   }
663
664   size_t height() const { return sl_->height(); }
665
666   // first() returns pointer to the first element in the skiplist, or
667   // nullptr if empty.
668   //
669   // last() returns the pointer to the last element in the skiplist,
670   // nullptr if list is empty.
671   //
672   // Note: As concurrent writing can happen, first() is not
673   //   guaranteed to be the min_element() in the list. Similarly
674   //   last() is not guaranteed to be the max_element(), and both of them can
675   //   be invalid (i.e. nullptr), so we name them differently from front() and
676   //   tail() here.
677   const key_type *first() const { return sl_->first(); }
678   const key_type *last() const { return sl_->last(); }
679
680   // Try to remove the last element in the skip list.
681   //
682   // Returns true if we removed it, false if either the list is empty
683   // or a race condition happened (i.e. the used-to-be last element
684   // was already removed by another thread).
685   bool pop_back() {
686     auto last = sl_->last();
687     return last ? sl_->remove(*last) : false;
688   }
689
690   std::pair<key_type*, bool> addOrGetData(const key_type &data) {
691     auto ret = sl_->addOrGetData(data);
692     return std::make_pair(&ret.first->data(), ret.second);
693   }
694
695   SkipListType* skiplist() const { return sl_; }
696
697   // legacy interfaces
698   // TODO:(xliu) remove these.
699   // Returns true if the node is added successfully, false if not, i.e. the
700   // node with the same key already existed in the list.
701   bool contains(const key_type &data) const { return sl_->find(data); }
702   bool add(const key_type &data) { return sl_->addOrGetData(data).second; }
703   bool remove(const key_type &data) { return sl_->remove(data); }
704
705  private:
706   SkipListType *sl_;
707   std::shared_ptr<SkipListType> slHolder_;
708 };
709
710 // implements forward iterator concept.
711 template<typename ValT, typename NodeT>
712 class detail::csl_iterator :
713   public boost::iterator_facade<csl_iterator<ValT, NodeT>,
714     ValT, boost::forward_traversal_tag> {
715  public:
716   typedef ValT value_type;
717   typedef value_type& reference;
718   typedef value_type* pointer;
719   typedef ptrdiff_t difference_type;
720
721   explicit csl_iterator(NodeT* node = nullptr) : node_(node) {}
722
723   template<typename OtherVal, typename OtherNode>
724   csl_iterator(const csl_iterator<OtherVal, OtherNode> &other,
725       typename std::enable_if<std::is_convertible<OtherVal, ValT>::value>::type*
726       = 0) : node_(other.node_) {}
727
728   size_t nodeSize() const {
729     return node_ == nullptr ? 0 :
730       node_->height() * sizeof(NodeT*) + sizeof(*this);
731   }
732
733   bool good() const { return node_ != nullptr; }
734
735  private:
736   friend class boost::iterator_core_access;
737   template<class,class> friend class csl_iterator;
738
739   void increment() { node_ = node_->next(); };
740   bool equal(const csl_iterator& other) const { return node_ == other.node_; }
741   value_type& dereference() const { return node_->data(); }
742
743   NodeT* node_;
744 };
745
746 // Skipper interface
747 template<typename T, typename Comp, int MAX_HEIGHT>
748 class ConcurrentSkipList<T, Comp, MAX_HEIGHT>::Skipper {
749   typedef detail::SkipListNode<T> NodeType;
750   typedef ConcurrentSkipList<T, Comp, MAX_HEIGHT> SkipListType;
751   typedef typename SkipListType::Accessor Accessor;
752
753  public:
754   typedef T  value_type;
755   typedef T& reference;
756   typedef T* pointer;
757   typedef ptrdiff_t difference_type;
758
759   Skipper(const std::shared_ptr<SkipListType>& skipList) :
760     accessor_(skipList) {
761     init();
762   }
763
764   Skipper(const Accessor& accessor) : accessor_(accessor) {
765     init();
766   }
767
768   void init() {
769     // need to cache the head node
770     NodeType* head_node = head();
771     headHeight_ = head_node->height();
772     for (int i = 0; i < headHeight_; ++i) {
773       preds_[i] = head_node;
774       succs_[i] = head_node->skip(i);
775     }
776     int max_layer = maxLayer();
777     for (int i = 0; i < max_layer; ++i) {
778       hints_[i] = i + 1;
779     }
780     hints_[max_layer] = max_layer;
781   }
782
783   // advance to the next node in the list.
784   Skipper& operator ++() {
785     preds_[0] = succs_[0];
786     succs_[0] = preds_[0]->skip(0);
787     int height = curHeight();
788     for (int i = 1; i < height && preds_[0] == succs_[i]; ++i) {
789       preds_[i] = succs_[i];
790       succs_[i] = preds_[i]->skip(i);
791     }
792     return *this;
793   }
794
795   bool good() const { return succs_[0] != nullptr; }
796
797   int maxLayer() const { return headHeight_ - 1; }
798
799   int curHeight() const {
800     // need to cap the height to the cached head height, as the current node
801     // might be some newly inserted node and also during the time period the
802     // head height may have grown.
803     return succs_[0] ? std::min(headHeight_, succs_[0]->height()) : 0;
804   }
805
806   const value_type &data() const {
807     DCHECK(succs_[0] != NULL);
808     return succs_[0]->data();
809   }
810
811   value_type &operator *() const {
812     DCHECK(succs_[0] != NULL);
813     return succs_[0]->data();
814   }
815
816   value_type *operator->() {
817     DCHECK(succs_[0] != NULL);
818     return &succs_[0]->data();
819   }
820
821   /*
822    * Skip to the position whose data is no less than the parameter.
823    * (I.e. the lower_bound).
824    *
825    * Returns true if the data is found, false otherwise.
826    */
827   bool to(const value_type &data) {
828     int layer = curHeight() - 1;
829     if (layer < 0) return false;   // reaches the end of the list
830
831     int lyr = hints_[layer];
832     int max_layer = maxLayer();
833     while (SkipListType::greater(data, succs_[lyr]) && lyr < max_layer) {
834       ++lyr;
835     }
836     hints_[layer] = lyr;  // update the hint
837
838     int foundLayer = SkipListType::
839       findInsertionPoint(preds_[lyr], lyr, data, preds_, succs_);
840     if (foundLayer < 0) return false;
841
842     DCHECK(succs_[0] != NULL) << "lyr=" << lyr << "; max_layer=" << max_layer;
843     return !succs_[0]->markedForRemoval();
844   }
845
846  private:
847   NodeType* head() const {
848     return accessor_.skiplist()->head_.load(std::memory_order_consume);
849   }
850
851   Accessor accessor_;
852   int headHeight_;
853   NodeType *succs_[MAX_HEIGHT], *preds_[MAX_HEIGHT];
854   uint8_t hints_[MAX_HEIGHT];
855 };
856
857 } // namespace folly
858
859 #endif  // FOLLY_CONCURRENT_SKIP_LIST_H_