fixing double destruction of CSL::data_type
[folly.git] / folly / ConcurrentSkipList.h
1 /*
2  * Copyright 2012 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 // @author: Xin Liu <xliux@fb.com>
18 //
19 // A concurrent skip list (CSL) implementation.
20 // Ref: http://www.cs.tau.ac.il/~shanir/nir-pubs-web/Papers/OPODIS2006-BA.pdf
21
22 /*
23
24 This implements a sorted associative container that supports only
25 unique keys.  (Similar to std::set.)
26
27 Features:
28
29   1. Small memory overhead: ~40% less memory overhead compared with
30      std::set (1.6 words per node versus 3). It has an minimum of 4
31      words (7 words if there nodes got deleted) per-list overhead
32      though.
33
34   2. Read accesses (count, find iterator, skipper) are lock-free and
35      mostly wait-free (the only wait a reader may need to do is when
36      the node it is visiting is in a pending stage, i.e. deleting,
37      adding and not fully linked).  Write accesses (remove, add) need
38      to acquire locks, but locks are local to the predecessor nodes
39      and/or successor nodes.
40
41   3. Good high contention performance, comparable single-thread
42      performance.  In the multithreaded case (12 workers), CSL tested
43      10x faster than a RWSpinLocked std::set for an averaged sized
44      list (1K - 1M nodes).
45
46      Comparable read performance to std::set when single threaded,
47      especially when the list size is large, and scales better to
48      larger lists: when the size is small, CSL can be 20-50% slower on
49      find()/contains().  As the size gets large (> 1M elements),
50      find()/contains() can be 30% faster.
51
52      Iterating through a skiplist is similar to iterating through a
53      linked list, thus is much (2-6x) faster than on a std::set
54      (tree-based).  This is especially true for short lists due to
55      better cache locality.  Based on that, it's also faster to
56      intersect two skiplists.
57
58   4. Lazy removal with GC support.  The removed nodes get deleted when
59      the last Accessor to the skiplist is destroyed.
60
61 Caveats:
62
63   1. Write operations are usually 30% slower than std::set in a single
64      threaded environment.
65
66   2. Need to have a head node for each list, which has a 4 word
67      overhead.
68
69   3. When the list is quite small (< 1000 elements), single threaded
70      benchmarks show CSL can be 10x slower than std:set.
71
72   4. The interface requires using an Accessor to access the skiplist.
73     (See below.)
74
75   5. Currently x64 only, due to use of MicroSpinLock.
76
77   6. Freed nodes will not be reclaimed as long as there are ongoing
78      uses of the list.
79
80 Sample usage:
81
82      typedef ConcurrentSkipList<int> SkipListT;
83      shared_ptr<SkipListT> sl(SkipListT::createInstance(init_head_height);
84      {
85        // It's usually good practice to hold an accessor only during
86        // its necessary life cycle (but not in a tight loop as
87        // Accessor creation incurs ref-counting overhead).
88        //
89        // Holding it longer delays garbage-collecting the deleted
90        // nodes in the list.
91        SkipListT::Accessor accessor(sl);
92        accessor.insert(23);
93        accessor.erase(2);
94        for (auto &elem : accessor) {
95          // use elem to access data
96        }
97        ... ...
98      }
99
100  Another useful type is the Skipper accessor.  This is useful if you
101  want to skip to locations in the way std::lower_bound() works,
102  i.e. it can be used for going through the list by skipping to the
103  node no less than a specified key.  The Skipper keeps its location as
104  state, which makes it convenient for things like implementing
105  intersection of two sets efficiently, as it can start from the last
106  visited position.
107
108      {
109        SkipListT::Accessor accessor(sl);
110        SkipListT::Skipper skipper(accessor);
111        skipper.to(30);
112        if (skipper) {
113          CHECK_LE(30, *skipper);
114        }
115        ...  ...
116        // GC may happen when the accessor gets destructed.
117      }
118 */
119
120 #ifndef FOLLY_CONCURRENT_SKIP_LIST_H_
121 #define FOLLY_CONCURRENT_SKIP_LIST_H_
122
123 #include <algorithm>
124 #include <climits>
125 #include <type_traits>
126 #include <utility>
127 #include <vector>
128 #include <atomic>
129 #include <thread>
130 #include <boost/iterator/iterator_facade.hpp>
131 #include <boost/scoped_ptr.hpp>
132 #include <boost/shared_ptr.hpp>
133
134 #include <glog/logging.h>
135 #include "folly/ConcurrentSkipList-inl.h"
136 #include "folly/Likely.h"
137 #include "folly/SmallLocks.h"
138
139 namespace folly {
140
141 template<typename T, typename Comp=std::less<T>, int MAX_HEIGHT=24>
142 class ConcurrentSkipList {
143   // MAX_HEIGHT needs to be at least 2 to suppress compiler
144   // warnings/errors (Werror=uninitialized tiggered due to preds_[1]
145   // being treated as a scalar in the compiler).
146   static_assert(MAX_HEIGHT >= 2 && MAX_HEIGHT < 64,
147       "MAX_HEIGHT can only be in the range of [2, 64)");
148   typedef std::unique_lock<folly::MicroSpinLock> ScopedLocker;
149   typedef ConcurrentSkipList<T, Comp, MAX_HEIGHT> SkipListType;
150
151  public:
152   typedef detail::SkipListNode<T> NodeType;
153   typedef T value_type;
154   typedef T key_type;
155
156
157   typedef detail::csl_iterator<value_type, NodeType> iterator;
158   typedef detail::csl_iterator<const value_type, const NodeType> const_iterator;
159
160   class Accessor;
161   class Skipper;
162
163   // convenient function to get an Accessor to a new instance.
164   static Accessor create(int height=1) {
165     return Accessor(createInstance(height));
166   }
167
168   // create a shared_ptr skiplist object with initial head height.
169   static boost::shared_ptr<SkipListType> createInstance(int height=1) {
170     return boost::shared_ptr<SkipListType>(new SkipListType(height));
171   }
172
173   // create a unique_ptr skiplist object with initial head height.
174   static std::unique_ptr<SkipListType> createRawInstance(int height=1) {
175     return std::unique_ptr<SkipListType>(new SkipListType(height));
176   }
177
178
179   //===================================================================
180   // Below are implementation details.
181   // Please see ConcurrentSkipList::Accessor for stdlib-like APIs.
182   //===================================================================
183
184   ~ConcurrentSkipList() {
185     LOG_IF(FATAL, recycler_.refs() > 0)
186       << "number of accessors is not 0, " << recycler_.refs() << " instead!"
187       << " This shouldn't have happened!";
188     while (NodeType* current = head_.load(std::memory_order_relaxed)) {
189       NodeType* tmp = current->skip(0);
190       NodeType::destroy(current);
191       head_.store(tmp, std::memory_order_relaxed);
192     }
193   }
194
195  private:
196
197   static bool greater(const value_type &data, const NodeType *node) {
198     return node && Comp()(node->data(), data);
199   }
200
201   static bool less(const value_type &data, const NodeType *node) {
202     return (node == nullptr) || Comp()(data, node->data());
203   }
204
205   static int findInsertionPoint(NodeType *cur, int cur_layer,
206       const value_type &data,
207       NodeType *preds[], NodeType *succs[]) {
208     int foundLayer = -1;
209     NodeType *pred = cur;
210     NodeType *foundNode = nullptr;
211     for (int layer = cur_layer; layer >= 0; --layer) {
212       NodeType *node = pred->skip(layer);
213       while (greater(data, node)) {
214         pred = node;
215         node = node->skip(layer);
216       }
217       if (foundLayer == -1 && !less(data, node)) { // the two keys equal
218         foundLayer = layer;
219         foundNode = node;
220       }
221       preds[layer] = pred;
222
223       // if found, succs[0..foundLayer] need to point to the cached foundNode,
224       // as foundNode might be deleted at the same time thus pred->skip() can
225       // return NULL or another node.
226       succs[layer] = foundNode ? foundNode : node;
227     }
228     return foundLayer;
229   }
230
231   struct Recycler : private boost::noncopyable {
232     Recycler() : refs_(0), dirty_(false) { lock_.init(); }
233
234     ~Recycler() {
235       if (nodes_) {
236         for (auto& node : *nodes_) {
237           NodeType::destroy(node);
238         }
239       }
240     }
241
242     void add(NodeType* node) {
243       std::lock_guard<MicroSpinLock> g(lock_);
244       if (nodes_.get() == nullptr) {
245         nodes_.reset(new std::vector<NodeType*>(1, node));
246       } else {
247         nodes_->push_back(node);
248       }
249       DCHECK_GT(refs(), 0);
250       dirty_.store(true, std::memory_order_relaxed);
251     }
252
253     int refs() const {
254       return refs_.load(std::memory_order_relaxed);
255     }
256
257     int addRef() {
258       return refs_.fetch_add(1, std::memory_order_relaxed);
259     }
260
261     int release() {
262       // We don't expect to clean the recycler immediately everytime it is OK
263       // to do so. Here, it is possible that multiple accessors all release at
264       // the same time but nobody would clean the recycler here. If this
265       // happens, the recycler will usually still get cleaned when
266       // such a race doesn't happen. The worst case is the recycler will
267       // eventually get deleted along with the skiplist.
268       if (LIKELY(!dirty_.load(std::memory_order_relaxed) || refs() > 1)) {
269         return refs_.fetch_add(-1, std::memory_order_relaxed);
270       }
271
272       boost::scoped_ptr<std::vector<NodeType*> > newNodes;
273       {
274         std::lock_guard<MicroSpinLock> g(lock_);
275         if (nodes_.get() == nullptr || refs() > 1) {
276           return refs_.fetch_add(-1, std::memory_order_relaxed);
277         }
278         // once refs_ reaches 1 and there is no other accessor, it is safe to
279         // remove all the current nodes in the recycler, as we already acquired
280         // the lock here so no more new nodes can be added, even though new
281         // accessors may be added after that.
282         newNodes.swap(nodes_);
283         dirty_.store(false, std::memory_order_relaxed);
284       }
285
286       // TODO(xliu) should we spawn a thread to do this when there are large
287       // number of nodes in the recycler?
288       for (auto& node : *newNodes) {
289         NodeType::destroy(node);
290       }
291
292       // decrease the ref count at the very end, to minimize the
293       // chance of other threads acquiring lock_ to clear the deleted
294       // nodes again.
295       return refs_.fetch_add(-1, std::memory_order_relaxed);
296     }
297
298    private:
299     boost::scoped_ptr<std::vector<NodeType*>> nodes_;
300     std::atomic<int32_t> refs_; // current number of visitors to the list
301     std::atomic<bool> dirty_; // whether *nodes_ is non-empty
302     MicroSpinLock lock_; // protects access to *nodes_
303   };  // class ConcurrentSkipList::Recycler
304
305   explicit ConcurrentSkipList(int height) :
306     head_(NodeType::create(height, value_type(), true)), size_(0) {}
307
308   size_t size() const { return size_.load(std::memory_order_relaxed); }
309   int height() const {
310     return head_.load(std::memory_order_consume)->height();
311   }
312   int maxLayer() const { return height() - 1; }
313
314   size_t incrementSize(int delta) {
315     return size_.fetch_add(delta, std::memory_order_relaxed) + delta;
316   }
317
318   // Returns the node if found, nullptr otherwise.
319   NodeType* find(const value_type &data) {
320     auto ret = findNode(data);
321     if (ret.second && !ret.first->markedForRemoval()) return ret.first;
322     return nullptr;
323   }
324
325   // lock all the necessary nodes for changing (adding or removing) the list.
326   // returns true if all the lock acquried successfully and the related nodes
327   // are all validate (not in certain pending states), false otherwise.
328   bool lockNodesForChange(int nodeHeight,
329       ScopedLocker guards[MAX_HEIGHT],
330       NodeType *preds[MAX_HEIGHT],
331       NodeType *succs[MAX_HEIGHT],
332       bool adding=true) {
333     NodeType *pred, *succ, *prevPred = nullptr;
334     bool valid = true;
335     for (int layer = 0; valid && layer < nodeHeight; ++layer) {
336       pred = preds[layer];
337       DCHECK(pred != nullptr) << "layer=" << layer << " height=" << height()
338         << " nodeheight=" << nodeHeight;
339       succ = succs[layer];
340       if (pred != prevPred) {
341         guards[layer] = pred->acquireGuard();
342         prevPred = pred;
343       }
344       valid = !pred->markedForRemoval() &&
345         pred->skip(layer) == succ;  // check again after locking
346
347       if (adding) {  // when adding a node, the succ shouldn't be going away
348         valid = valid && (succ == nullptr || !succ->markedForRemoval());
349       }
350     }
351
352     return valid;
353   }
354
355   // Returns a paired value:
356   //   pair.first always stores the pointer to the node with the same input key.
357   //     It could be either the newly added data, or the existed data in the
358   //     list with the same key.
359   //   pair.second stores whether the data is added successfully:
360   //     0 means not added, otherwise reutrns the new size.
361   template<typename U>
362   std::pair<NodeType*, size_t> addOrGetData(U &&data) {
363     NodeType *preds[MAX_HEIGHT], *succs[MAX_HEIGHT];
364     NodeType *newNode;
365     size_t newSize;
366     while (true) {
367       int max_layer = 0;
368       int layer = findInsertionPointGetMaxLayer(data, preds, succs, &max_layer);
369
370       if (layer >= 0) {
371         NodeType *nodeFound = succs[layer];
372         DCHECK(nodeFound != nullptr);
373         if (nodeFound->markedForRemoval()) {
374           continue;  // if it's getting deleted retry finding node.
375         }
376         // wait until fully linked.
377         while (UNLIKELY(!nodeFound->fullyLinked())) {}
378         return std::make_pair(nodeFound, 0);
379       }
380
381       // need to capped at the original height -- the real height may have grown
382       int nodeHeight = detail::SkipListRandomHeight::instance()->
383         getHeight(max_layer + 1);
384
385       ScopedLocker guards[MAX_HEIGHT];
386       if (!lockNodesForChange(nodeHeight, guards, preds, succs)) {
387         continue; // give up the locks and retry until all valid
388       }
389
390       // locks acquired and all valid, need to modify the links under the locks.
391       newNode = NodeType::create(nodeHeight, std::forward<U>(data));
392       for (int layer = 0; layer < nodeHeight; ++layer) {
393         newNode->setSkip(layer, succs[layer]);
394         preds[layer]->setSkip(layer, newNode);
395       }
396
397       newNode->setFullyLinked();
398       newSize = incrementSize(1);
399       break;
400     }
401
402     int hgt = height();
403     size_t sizeLimit =
404       detail::SkipListRandomHeight::instance()->getSizeLimit(hgt);
405
406     if (hgt < MAX_HEIGHT && newSize > sizeLimit) {
407       growHeight(hgt + 1);
408     }
409     CHECK_GT(newSize, 0);
410     return std::make_pair(newNode, newSize);
411   }
412
413   bool remove(const value_type &data) {
414     NodeType *nodeToDelete = nullptr;
415     ScopedLocker nodeGuard;
416     bool isMarked = false;
417     int nodeHeight = 0;
418     NodeType* preds[MAX_HEIGHT], *succs[MAX_HEIGHT];
419
420     while (true) {
421       int max_layer = 0;
422       int layer = findInsertionPointGetMaxLayer(data, preds, succs, &max_layer);
423       if (!isMarked && (layer < 0 || !okToDelete(succs[layer], layer))) {
424         return false;
425       }
426
427       if (!isMarked) {
428         nodeToDelete = succs[layer];
429         nodeHeight = nodeToDelete->height();
430         nodeGuard = nodeToDelete->acquireGuard();
431         if (nodeToDelete->markedForRemoval()) return false;
432         nodeToDelete->setMarkedForRemoval();
433         isMarked = true;
434       }
435
436       // acquire pred locks from bottom layer up
437       ScopedLocker guards[MAX_HEIGHT];
438       if (!lockNodesForChange(nodeHeight, guards, preds, succs, false)) {
439         continue;  // this will unlock all the locks
440       }
441
442       for (int layer = nodeHeight - 1; layer >= 0; --layer) {
443         preds[layer]->setSkip(layer, nodeToDelete->skip(layer));
444       }
445
446       incrementSize(-1);
447       break;
448     }
449     recycle(nodeToDelete);
450     return true;
451   }
452
453   const value_type *first() const {
454     auto node = head_.load(std::memory_order_consume)->skip(0);
455     return node ? &node->data() : nullptr;
456   }
457
458   const value_type *last() const {
459     NodeType *pred = head_.load(std::memory_order_consume);
460     NodeType *node = nullptr;
461     for (int layer = maxLayer(); layer >= 0; --layer) {
462       do {
463         node = pred->skip(layer);
464         if (node) pred = node;
465       } while (node != nullptr);
466     }
467     return pred == head_.load(std::memory_order_relaxed)
468       ? nullptr : &pred->data();
469   }
470
471   static bool okToDelete(NodeType *candidate, int layer) {
472     DCHECK(candidate != nullptr);
473     return candidate->fullyLinked() &&
474       candidate->maxLayer() == layer &&
475       !candidate->markedForRemoval();
476   }
477
478   // find node for insertion/deleting
479   int findInsertionPointGetMaxLayer(const value_type &data,
480       NodeType *preds[], NodeType *succs[], int *max_layer) const {
481     *max_layer = maxLayer();
482     return findInsertionPoint(head_.load(std::memory_order_consume),
483       *max_layer, data, preds, succs);
484   }
485
486   // Find node for access. Returns a paired values:
487   // pair.first = the first node that no-less than data value
488   // pair.second = 1 when the data value is founded, or 0 otherwise.
489   // This is like lower_bound, but not exact: we could have the node marked for
490   // removal so still need to check that.
491   std::pair<NodeType*, int> findNode(const value_type &data) const {
492     return findNodeDownRight(data);
493   }
494
495   // Find node by first stepping down then stepping right. Based on benchmark
496   // results, this is slightly faster than findNodeRightDown for better
497   // localality on the skipping pointers.
498   std::pair<NodeType*, int> findNodeDownRight(const value_type &data) const {
499     NodeType *pred = head_.load(std::memory_order_consume);
500     int ht = pred->height();
501     NodeType *node = nullptr;
502
503     bool found = false;
504     while (!found) {
505       // stepping down
506       for (; ht > 0 && less(data, pred->skip(ht - 1)); --ht) {}
507       if (ht == 0) return std::make_pair(pred->skip(0), 0);  // not found
508
509       node = pred->skip(--ht);  // node <= data now
510       // stepping right
511       while (greater(data, node)) {
512         pred = node;
513         node = node->skip(ht);
514       }
515       found = !less(data, node);
516     }
517     return std::make_pair(node, found);
518   }
519
520   // find node by first stepping right then stepping down.
521   // We still keep this for reference purposes.
522   std::pair<NodeType*, int> findNodeRightDown(const value_type &data) const {
523     NodeType *pred = head_.load(std::memory_order_consume);
524     NodeType *node = nullptr;
525     auto top = maxLayer();
526     int found = 0;
527     for (int layer = top; !found && layer >= 0; --layer) {
528       node = pred->skip(layer);
529       while (greater(data, node)) {
530         pred = node;
531         node = node->skip(layer);
532       }
533       found = !less(data, node);
534     }
535     return std::make_pair(node, found);
536   }
537
538   NodeType* lower_bound(const value_type &data) const {
539     auto node = findNode(data).first;
540     while (node != nullptr && node->markedForRemoval()) {
541       node = node->skip(0);
542     }
543     return node;
544   }
545
546   void growHeight(int height) {
547     NodeType* oldHead = head_.load(std::memory_order_consume);
548     if (oldHead->height() >= height) {  // someone else already did this
549       return;
550     }
551
552     NodeType* newHead = NodeType::create(height, value_type(), true);
553
554     { // need to guard the head node in case others are adding/removing
555       // nodes linked to the head.
556       ScopedLocker g = oldHead->acquireGuard();
557       newHead->copyHead(oldHead);
558       NodeType* expected = oldHead;
559       if (!head_.compare_exchange_strong(expected, newHead,
560           std::memory_order_release)) {
561         // if someone has already done the swap, just return.
562         NodeType::destroy(newHead);
563         return;
564       }
565       oldHead->setMarkedForRemoval();
566     }
567     recycle(oldHead);
568   }
569
570   void recycle(NodeType *node) {
571     recycler_.add(node);
572   }
573
574   std::atomic<NodeType*> head_;
575   Recycler recycler_;
576   std::atomic<size_t> size_;
577 };
578
579 template<typename T, typename Comp, int MAX_HEIGHT>
580 class ConcurrentSkipList<T, Comp, MAX_HEIGHT>::Accessor {
581   typedef detail::SkipListNode<T> NodeType;
582   typedef ConcurrentSkipList<T, Comp, MAX_HEIGHT> SkipListType;
583  public:
584   typedef T value_type;
585   typedef T key_type;
586   typedef T& reference;
587   typedef T* pointer;
588   typedef const T& const_reference;
589   typedef const T* const_pointer;
590   typedef size_t size_type;
591   typedef Comp key_compare;
592   typedef Comp value_compare;
593
594   typedef typename SkipListType::iterator iterator;
595   typedef typename SkipListType::const_iterator const_iterator;
596   typedef typename SkipListType::Skipper Skipper;
597
598   explicit Accessor(boost::shared_ptr<ConcurrentSkipList> skip_list)
599     : slHolder_(std::move(skip_list))
600   {
601     sl_ = slHolder_.get();
602     DCHECK(sl_ != nullptr);
603     sl_->recycler_.addRef();
604   }
605
606   // Unsafe initializer: the caller assumes the responsibility to keep
607   // skip_list valid during the whole life cycle of the Acessor.
608   explicit Accessor(ConcurrentSkipList *skip_list) : sl_(skip_list) {
609     DCHECK(sl_ != nullptr);
610     sl_->recycler_.addRef();
611   }
612
613   Accessor(const Accessor &accessor) :
614       sl_(accessor.sl_),
615       slHolder_(accessor.slHolder_) {
616     sl_->recycler_.addRef();
617   }
618
619   Accessor& operator=(const Accessor &accessor) {
620     if (this != &accessor) {
621       slHolder_ = accessor.slHolder_;
622       sl_->recycler_.release();
623       sl_ = accessor.sl_;
624       sl_->recycler_.addRef();
625     }
626     return *this;
627   }
628
629   ~Accessor() {
630     sl_->recycler_.release();
631   }
632
633   bool empty() const { return sl_->size() == 0; }
634   size_t size() const { return sl_->size(); }
635   size_type max_size() const { return std::numeric_limits<size_type>::max(); }
636
637   // returns end() if the value is not in the list, otherwise returns an
638   // iterator pointing to the data, and it's guaranteed that the data is valid
639   // as far as the Accessor is hold.
640   iterator find(const key_type &value) { return iterator(sl_->find(value)); }
641   const_iterator find(const key_type &value) const {
642     return iterator(sl_->find(value));
643   }
644   size_type count(const key_type &data) const { return contains(data); }
645
646   iterator begin() const {
647     NodeType* head = sl_->head_.load(std::memory_order_consume);
648     return iterator(head->next());
649   }
650   iterator end() const { return iterator(nullptr); }
651   const_iterator cbegin() const { return begin(); }
652   const_iterator cend() const { return end(); }
653
654   template<typename U,
655     typename=typename std::enable_if<std::is_convertible<U, T>::value>::type>
656   std::pair<iterator, bool> insert(U&& data) {
657     auto ret = sl_->addOrGetData(std::forward<U>(data));
658     return std::make_pair(iterator(ret.first), ret.second);
659   }
660   size_t erase(const key_type &data) { return remove(data); }
661
662   iterator lower_bound(const key_type &data) const {
663     return iterator(sl_->lower_bound(data));
664   }
665
666   size_t height() const { return sl_->height(); }
667
668   // first() returns pointer to the first element in the skiplist, or
669   // nullptr if empty.
670   //
671   // last() returns the pointer to the last element in the skiplist,
672   // nullptr if list is empty.
673   //
674   // Note: As concurrent writing can happen, first() is not
675   //   guaranteed to be the min_element() in the list. Similarly
676   //   last() is not guaranteed to be the max_element(), and both of them can
677   //   be invalid (i.e. nullptr), so we name them differently from front() and
678   //   tail() here.
679   const key_type *first() const { return sl_->first(); }
680   const key_type *last() const { return sl_->last(); }
681
682   // Try to remove the last element in the skip list.
683   //
684   // Returns true if we removed it, false if either the list is empty
685   // or a race condition happened (i.e. the used-to-be last element
686   // was already removed by another thread).
687   bool pop_back() {
688     auto last = sl_->last();
689     return last ? sl_->remove(*last) : false;
690   }
691
692   std::pair<key_type*, bool> addOrGetData(const key_type &data) {
693     auto ret = sl_->addOrGetData(data);
694     return std::make_pair(&ret.first->data(), ret.second);
695   }
696
697   SkipListType* skiplist() const { return sl_; }
698
699   // legacy interfaces
700   // TODO:(xliu) remove these.
701   // Returns true if the node is added successfully, false if not, i.e. the
702   // node with the same key already existed in the list.
703   bool contains(const key_type &data) const { return sl_->find(data); }
704   bool add(const key_type &data) { return sl_->addOrGetData(data).second; }
705   bool remove(const key_type &data) { return sl_->remove(data); }
706
707  private:
708   SkipListType *sl_;
709   boost::shared_ptr<SkipListType> slHolder_;
710 };
711
712 // implements forward iterator concept.
713 template<typename ValT, typename NodeT>
714 class detail::csl_iterator :
715   public boost::iterator_facade<csl_iterator<ValT, NodeT>,
716     ValT, boost::forward_traversal_tag> {
717  public:
718   typedef ValT value_type;
719   typedef value_type& reference;
720   typedef value_type* pointer;
721   typedef ptrdiff_t difference_type;
722
723   explicit csl_iterator(NodeT* node = nullptr) : node_(node) {}
724
725   template<typename OtherVal, typename OtherNode>
726   csl_iterator(const csl_iterator<OtherVal, OtherNode> &other,
727       typename std::enable_if<std::is_convertible<OtherVal, ValT>::value>::type*
728       = 0) : node_(other.node_) {}
729
730   size_t nodeSize() const {
731     return node_ == nullptr ? 0 :
732       node_->height() * sizeof(NodeT*) + sizeof(*this);
733   }
734
735   bool good() const { return node_ != nullptr; }
736
737  private:
738   friend class boost::iterator_core_access;
739   template<class,class> friend class csl_iterator;
740
741   void increment() { node_ = node_->next(); };
742   bool equal(const csl_iterator& other) const { return node_ == other.node_; }
743   value_type& dereference() const { return node_->data(); }
744
745   NodeT* node_;
746 };
747
748 // Skipper interface
749 template<typename T, typename Comp, int MAX_HEIGHT>
750 class ConcurrentSkipList<T, Comp, MAX_HEIGHT>::Skipper {
751   typedef detail::SkipListNode<T> NodeType;
752   typedef ConcurrentSkipList<T, Comp, MAX_HEIGHT> SkipListType;
753   typedef typename SkipListType::Accessor Accessor;
754
755  public:
756   typedef T  value_type;
757   typedef T& reference;
758   typedef T* pointer;
759   typedef ptrdiff_t difference_type;
760
761   Skipper(const boost::shared_ptr<SkipListType>& skipList) :
762     accessor_(skipList) {
763     init();
764   }
765
766   Skipper(const Accessor& accessor) : accessor_(accessor) {
767     init();
768   }
769
770   void init() {
771     // need to cache the head node
772     NodeType* head_node = head();
773     headHeight_ = head_node->height();
774     for (int i = 0; i < headHeight_; ++i) {
775       preds_[i] = head_node;
776       succs_[i] = head_node->skip(i);
777     }
778     int max_layer = maxLayer();
779     for (int i = 0; i < max_layer; ++i) {
780       hints_[i] = i + 1;
781     }
782     hints_[max_layer] = max_layer;
783   }
784
785   // advance to the next node in the list.
786   Skipper& operator ++() {
787     preds_[0] = succs_[0];
788     succs_[0] = preds_[0]->skip(0);
789     int height = curHeight();
790     for (int i = 1; i < height && preds_[0] == succs_[i]; ++i) {
791       preds_[i] = succs_[i];
792       succs_[i] = preds_[i]->skip(i);
793     }
794     return *this;
795   }
796
797   bool good() const { return succs_[0] != nullptr; }
798
799   int maxLayer() const { return headHeight_ - 1; }
800
801   int curHeight() const {
802     // need to cap the height to the cached head height, as the current node
803     // might be some newly inserted node and also during the time period the
804     // head height may have grown.
805     return succs_[0] ? std::min(headHeight_, succs_[0]->height()) : 0;
806   }
807
808   const value_type &data() const {
809     DCHECK(succs_[0] != NULL);
810     return succs_[0]->data();
811   }
812
813   value_type &operator *() const {
814     DCHECK(succs_[0] != NULL);
815     return succs_[0]->data();
816   }
817
818   value_type *operator->() {
819     DCHECK(succs_[0] != NULL);
820     return &succs_[0]->data();
821   }
822
823   /*
824    * Skip to the position whose data is no less than the parameter.
825    * (I.e. the lower_bound).
826    *
827    * Returns true if the data is found, false otherwise.
828    */
829   bool to(const value_type &data) {
830     int layer = curHeight() - 1;
831     if (layer < 0) return false;   // reaches the end of the list
832
833     int lyr = hints_[layer];
834     int max_layer = maxLayer();
835     while (SkipListType::greater(data, succs_[lyr]) && lyr < max_layer) {
836       ++lyr;
837     }
838     hints_[layer] = lyr;  // update the hint
839
840     int foundLayer = SkipListType::
841       findInsertionPoint(preds_[lyr], lyr, data, preds_, succs_);
842     if (foundLayer < 0) return false;
843
844     DCHECK(succs_[0] != NULL) << "lyr=" << lyr << "; max_layer=" << max_layer;
845     return !succs_[0]->markedForRemoval();
846   }
847
848  private:
849   NodeType* head() const {
850     return accessor_.skiplist()->head_.load(std::memory_order_consume);
851   }
852
853   Accessor accessor_;
854   int headHeight_;
855   NodeType *succs_[MAX_HEIGHT], *preds_[MAX_HEIGHT];
856   uint8_t hints_[MAX_HEIGHT];
857 };
858
859 } // namespace folly
860
861 #endif  // FOLLY_CONCURRENT_SKIP_LIST_H_