update readme
[c11concurrency-benchmarks.git] / silo / btree.h
1 #pragma once
2
3 #include <assert.h>
4 #include <malloc.h>
5 #include <pthread.h>
6 #include <stdint.h>
7 #include <stdlib.h>
8 #include <string.h>
9
10 #include <iostream>
11 #include <string>
12 #include <vector>
13 #include <utility>
14 #include <atomic>
15 #include <thread>
16
17 #include "log2.hh"
18 #include "ndb_type_traits.h"
19 #include "varkey.h"
20 #include "counter.h"
21 #include "macros.h"
22 #include "prefetch.h"
23 #include "amd64.h"
24 #include "rcu.h"
25 #include "util.h"
26 #include "small_vector.h"
27 #include "ownership_checker.h"
28
29 namespace private_ {
30   template <typename T, typename P> struct u64manip;
31   template <typename P>
32   struct u64manip<uint64_t, P> {
33     static inline uint64_t Load(uint64_t t) { return t; }
34     static inline void Store(uint64_t &t, uint64_t v) { t = v; }
35     static inline uint64_t
36     Lock(uint64_t &t)
37     {
38 #ifdef CHECK_INVARIANTS
39       INVARIANT(!P::IsLocked(t));
40       t |= P::HDR_LOCKED_MASK;
41       return t;
42 #endif
43       return 0;
44     }
45     static inline uint64_t
46     LockWithSpinCount(uint64_t &t, unsigned &spins)
47     {
48       const uint64_t ret = Lock(t);
49       spins = 0;
50       return ret;
51     }
52     static inline void
53     Unlock(uint64_t &t)
54     {
55 #ifdef CHECK_INVARIANTS
56       INVARIANT(P::IsLocked(t));
57       t &= ~(P::HDR_LOCKED_MASK | P::HDR_MODIFYING_MASK);
58 #endif
59     }
60     static inline uint64_t
61     StableVersion(uint64_t t)
62     {
63       INVARIANT(!P::IsModifying(t));
64       return t;
65     }
66     static inline bool
67     CheckVersion(uint64_t t, uint64_t stablev)
68     {
69       INVARIANT(!P::IsModifying(stablev));
70       INVARIANT((t & ~P::HDR_LOCKED_MASK) == (stablev & ~P::HDR_LOCKED_MASK));
71       return true;
72     }
73   };
74   template <typename P>
75   struct u64manip<std::atomic<uint64_t>, P> {
76     static inline uint64_t
77     Load(const std::atomic<uint64_t> &t)
78     {
79       return t.load(std::memory_order_acquire);
80     }
81     static inline void
82     Store(std::atomic<uint64_t> &t, uint64_t v)
83     {
84       t.store(v, std::memory_order_release);
85     }
86     static inline uint64_t
87     Lock(std::atomic<uint64_t> &t)
88     {
89 #ifdef SPINLOCK_BACKOFF
90       uint64_t backoff_shift = 0;
91 #endif
92       uint64_t v = Load(t);
93       while ((v & P::HDR_LOCKED_MASK) ||
94              !t.compare_exchange_strong(v, v | P::HDR_LOCKED_MASK)) {
95 #ifdef SPINLOCK_BACKOFF
96         if (backoff_shift < 63)
97           backoff_shift++;
98         uint64_t spins = (1UL << backoff_shift) * BACKOFF_SPINS_FACTOR;
99         while (spins) {
100           nop_pause();
101           spins--;
102         }
103 #else
104         nop_pause();
105 #endif
106         v = Load(t);
107       }
108       COMPILER_MEMORY_FENCE;
109       return v;
110     }
111     static inline uint64_t
112     LockWithSpinCount(std::atomic<uint64_t> &t, unsigned &spins)
113     {
114 #ifdef SPINLOCK_BACKOFF
115       uint64_t backoff_shift = 0;
116 #endif
117       spins = 0;
118       uint64_t v = Load(t);
119       while ((v & P::HDR_LOCKED_MASK) ||
120              !t.compare_exchange_strong(v, v | P::HDR_LOCKED_MASK)) {
121 #ifdef SPINLOCK_BACKOFF
122         if (backoff_shift < 63)
123           backoff_shift++;
124         uint64_t backoff_spins = (1UL << backoff_shift) * BACKOFF_SPINS_FACTOR;
125         while (backoff_spins) {
126           nop_pause();
127           backoff_spins--;
128         }
129 #else
130         nop_pause();
131 #endif
132         v = Load(t);
133         spins++;
134       }
135       COMPILER_MEMORY_FENCE;
136       return v;
137     }
138     static inline void
139     Unlock(std::atomic<uint64_t> &v)
140     {
141       INVARIANT(P::IsLocked(v));
142       const uint64_t oldh = Load(v);
143       uint64_t h = oldh;
144       bool newv = false;
145       if ((h & P::HDR_MODIFYING_MASK) ||
146           (h & P::HDR_DELETING_MASK)) {
147         newv = true;
148         const uint64_t n = (h & P::HDR_VERSION_MASK) >> P::HDR_VERSION_SHIFT;
149         h &= ~P::HDR_VERSION_MASK;
150         h |= (((n + 1) << P::HDR_VERSION_SHIFT) & P::HDR_VERSION_MASK);
151       }
152       // clear locked + modifying bits
153       h &= ~(P::HDR_LOCKED_MASK | P::HDR_MODIFYING_MASK);
154       if (newv)
155         INVARIANT(!CheckVersion(oldh, h));
156       INVARIANT(!(h & P::HDR_LOCKED_MASK));
157       INVARIANT(!(h & P::HDR_MODIFYING_MASK));
158       COMPILER_MEMORY_FENCE;
159       Store(v, h);
160     }
161     static inline uint64_t
162     StableVersion(const std::atomic<uint64_t> &t)
163     {
164       uint64_t v = Load(t);
165       while ((v & P::HDR_MODIFYING_MASK)) {
166         nop_pause();
167         v = Load(t);
168       }
169       COMPILER_MEMORY_FENCE;
170       return v;
171     }
172     static inline bool
173     CheckVersion(uint64_t t, uint64_t stablev)
174     {
175       INVARIANT(!(stablev & P::HDR_MODIFYING_MASK));
176       COMPILER_MEMORY_FENCE;
177       return (t & ~P::HDR_LOCKED_MASK) ==
178              (stablev & ~P::HDR_LOCKED_MASK);
179     }
180   };
181 }
182
183 /**
184  * manipulates a btree version
185  *
186  * hdr bits: layout is (actual bytes depend on the NKeysPerNode parameter)
187  *
188  * <-- low bits
189  * [type | key_slots_used | locked | is_root | modifying | deleting | version ]
190  * [0:1  | 1:5            | 5:6    | 6:7     | 7:8       | 8:9      | 9:64    ]
191  *
192  * bit invariants:
193  *   1) modifying => locked
194  *   2) deleting  => locked
195  *
196  * WARNING: the correctness of our concurrency scheme relies on being able
197  * to do a memory reads/writes from/to hdr atomically. x86 architectures
198  * guarantee that aligned writes are atomic (see intel spec)
199  */
200 template <typename VersionType, unsigned NKeysPerNode>
201 class btree_version_manip {
202
203   typedef
204     typename private_::typeutil<VersionType>::func_param_type
205     LoadVersionType;
206
207   typedef
208     private_::u64manip<
209       VersionType,
210       btree_version_manip<VersionType, NKeysPerNode>>
211     U64Manip;
212
213   static inline constexpr uint64_t
214   LowMask(uint64_t nbits)
215   {
216     return (1UL << nbits) - 1UL;
217   }
218
219 public:
220
221   static const uint64_t HDR_TYPE_BITS = 1;
222   static const uint64_t HDR_TYPE_MASK = 0x1;
223
224   static const uint64_t HDR_KEY_SLOTS_SHIFT = HDR_TYPE_BITS;
225   static const uint64_t HDR_KEY_SLOTS_BITS = ceil_log2_const(NKeysPerNode);
226   static const uint64_t HDR_KEY_SLOTS_MASK = LowMask(HDR_KEY_SLOTS_BITS) << HDR_KEY_SLOTS_SHIFT;
227
228   static const uint64_t HDR_LOCKED_SHIFT = HDR_KEY_SLOTS_SHIFT + HDR_KEY_SLOTS_BITS;
229   static const uint64_t HDR_LOCKED_BITS = 1;
230   static const uint64_t HDR_LOCKED_MASK = LowMask(HDR_LOCKED_BITS) << HDR_LOCKED_SHIFT;
231
232   static const uint64_t HDR_IS_ROOT_SHIFT = HDR_LOCKED_SHIFT + HDR_LOCKED_BITS;
233   static const uint64_t HDR_IS_ROOT_BITS = 1;
234   static const uint64_t HDR_IS_ROOT_MASK = LowMask(HDR_IS_ROOT_BITS) << HDR_IS_ROOT_SHIFT;
235
236   static const uint64_t HDR_MODIFYING_SHIFT = HDR_IS_ROOT_SHIFT + HDR_IS_ROOT_BITS;
237   static const uint64_t HDR_MODIFYING_BITS = 1;
238   static const uint64_t HDR_MODIFYING_MASK = LowMask(HDR_MODIFYING_BITS) << HDR_MODIFYING_SHIFT;
239
240   static const uint64_t HDR_DELETING_SHIFT = HDR_MODIFYING_SHIFT + HDR_MODIFYING_BITS;
241   static const uint64_t HDR_DELETING_BITS = 1;
242   static const uint64_t HDR_DELETING_MASK = LowMask(HDR_DELETING_BITS) << HDR_DELETING_SHIFT;
243
244   static const uint64_t HDR_VERSION_SHIFT = HDR_DELETING_SHIFT + HDR_DELETING_BITS;
245   static const uint64_t HDR_VERSION_MASK = ((uint64_t)-1) << HDR_VERSION_SHIFT;
246
247   // sanity checks
248   static_assert(NKeysPerNode >= 1, "XX");
249
250   static_assert(std::numeric_limits<uint64_t>::max() == (
251       HDR_TYPE_MASK |
252       HDR_KEY_SLOTS_MASK |
253       HDR_LOCKED_MASK |
254       HDR_IS_ROOT_MASK |
255       HDR_MODIFYING_MASK |
256       HDR_DELETING_MASK |
257       HDR_VERSION_MASK
258         ), "XX");
259
260   static_assert( !(HDR_TYPE_MASK & HDR_KEY_SLOTS_MASK) , "XX");
261   static_assert( !(HDR_KEY_SLOTS_MASK & HDR_LOCKED_MASK) , "XX");
262   static_assert( !(HDR_LOCKED_MASK & HDR_IS_ROOT_MASK) , "XX");
263   static_assert( !(HDR_IS_ROOT_MASK & HDR_MODIFYING_MASK) , "XX");
264   static_assert( !(HDR_MODIFYING_MASK & HDR_DELETING_MASK) , "XX");
265   static_assert( !(HDR_DELETING_MASK & HDR_VERSION_MASK) , "XX");
266
267   // low level ops
268
269   static inline uint64_t
270   Load(LoadVersionType v)
271   {
272     return U64Manip::Load(v);
273   }
274
275   static inline void
276   Store(VersionType &t, uint64_t v)
277   {
278     U64Manip::Store(t, v);
279   }
280
281   // accessors
282
283   static inline bool
284   IsLeafNode(LoadVersionType v)
285   {
286     return (Load(v) & HDR_TYPE_MASK) == 0;
287   }
288
289   static inline bool
290   IsInternalNode(LoadVersionType v)
291   {
292     return !IsLeafNode(v);
293   }
294
295   static inline size_t
296   KeySlotsUsed(LoadVersionType v)
297   {
298     return (Load(v) & HDR_KEY_SLOTS_MASK) >> HDR_KEY_SLOTS_SHIFT;
299   }
300
301   static inline bool
302   IsLocked(LoadVersionType v)
303   {
304     return (Load(v) & HDR_LOCKED_MASK);
305   }
306
307   static inline bool
308   IsRoot(LoadVersionType v)
309   {
310     return (Load(v) & HDR_IS_ROOT_MASK);
311   }
312
313   static inline bool
314   IsModifying(LoadVersionType v)
315   {
316     return (Load(v) & HDR_MODIFYING_MASK);
317   }
318
319   static inline bool
320   IsDeleting(LoadVersionType v)
321   {
322     return (Load(v) & HDR_DELETING_MASK);
323   }
324
325   static inline uint64_t
326   Version(LoadVersionType v)
327   {
328     return (Load(v) & HDR_VERSION_MASK) >> HDR_VERSION_SHIFT;
329   }
330
331   static std::string
332   VersionInfoStr(LoadVersionType v)
333   {
334     std::ostringstream buf;
335     buf << "[";
336
337     if (IsLeafNode(v))
338       buf << "LEAF";
339     else
340       buf << "INT";
341     buf << " | ";
342
343     buf << KeySlotsUsed(v) << " | ";
344
345     if (IsLocked(v))
346       buf << "LOCKED";
347     else
348       buf << "-";
349     buf << " | ";
350
351     if (IsRoot(v))
352       buf << "ROOT";
353     else
354       buf << "-";
355     buf << " | ";
356
357     if (IsModifying(v))
358       buf << "MOD";
359     else
360       buf << "-";
361     buf << " | ";
362
363     if (IsDeleting(v))
364       buf << "DEL";
365     else
366       buf << "-";
367     buf << " | ";
368
369     buf << Version(v);
370
371     buf << "]";
372     return buf.str();
373   }
374
375   // mutators
376
377   static inline void
378   SetKeySlotsUsed(VersionType &v, size_t n)
379   {
380     INVARIANT(n <= NKeysPerNode);
381     INVARIANT(IsModifying(v));
382     uint64_t h = Load(v);
383     h &= ~HDR_KEY_SLOTS_MASK;
384     h |= (n << HDR_KEY_SLOTS_SHIFT);
385     Store(v, h);
386   }
387
388   static inline void
389   IncKeySlotsUsed(VersionType &v)
390   {
391     SetKeySlotsUsed(v, KeySlotsUsed(v) + 1);
392   }
393
394   static inline void
395   DecKeySlotsUsed(VersionType &v)
396   {
397     INVARIANT(KeySlotsUsed(v) > 0);
398     SetKeySlotsUsed(v, KeySlotsUsed(v) - 1);
399   }
400
401   static inline void
402   SetRoot(VersionType &v)
403   {
404     INVARIANT(IsLocked(v));
405     INVARIANT(!IsRoot(v));
406     uint64_t h = Load(v);
407     h |= HDR_IS_ROOT_MASK;
408     Store(v, h);
409   }
410
411   static inline void
412   ClearRoot(VersionType &v)
413   {
414     INVARIANT(IsLocked(v));
415     INVARIANT(IsRoot(v));
416     uint64_t h = Load(v);
417     h &= ~HDR_IS_ROOT_MASK;
418     Store(v, h);
419   }
420
421   static inline void
422   MarkModifying(VersionType &v)
423   {
424     INVARIANT(IsLocked(v));
425     INVARIANT(!IsModifying(v));
426     uint64_t h = Load(v);
427     h |= HDR_MODIFYING_MASK;
428     Store(v, h);
429   }
430
431   static inline void
432   MarkDeleting(VersionType &v)
433   {
434     INVARIANT(IsLocked(v));
435     INVARIANT(!IsDeleting(v));
436     uint64_t h = Load(v);
437     h |= HDR_DELETING_MASK;
438     Store(v, h);
439   }
440
441   // concurrency control
442
443   static inline uint64_t
444   StableVersion(LoadVersionType v)
445   {
446     return U64Manip::StableVersion(v);
447   }
448
449   static inline uint64_t
450   UnstableVersion(LoadVersionType v)
451   {
452     return Load(v);
453   }
454
455   static inline bool
456   CheckVersion(LoadVersionType v, uint64_t stablev)
457   {
458     return U64Manip::CheckVersion(Load(v), stablev);
459   }
460
461   static inline uint64_t
462   Lock(VersionType &v)
463   {
464     return U64Manip::Lock(v);
465   }
466
467   static inline uint64_t
468   LockWithSpinCount(VersionType &v, unsigned &spins)
469   {
470     return U64Manip::LockWithSpinCount(v, spins);
471   }
472
473   static inline void
474   Unlock(VersionType &v)
475   {
476     U64Manip::Unlock(v);
477   }
478 };
479
480 struct base_btree_config {
481   static const unsigned int NKeysPerNode = 15;
482   static const bool RcuRespCaller = true;
483 };
484
485 struct concurrent_btree_traits : public base_btree_config {
486   typedef std::atomic<uint64_t> VersionType;
487 };
488
489 struct single_threaded_btree_traits : public base_btree_config {
490   typedef uint64_t VersionType;
491 };
492
493 /**
494  * A concurrent, variable key length b+-tree, optimized for read heavy
495  * workloads.
496  *
497  * This b+-tree maps uninterpreted binary strings (key_type) of arbitrary
498  * length to a single pointer (value_type). Binary string values are copied
499  * into the b+-tree, so the caller does not have to worry about preserving
500  * memory for key values.
501  *
502  * This b+-tree does not manage the memory pointed to by value_type. The
503  * pointer is treated completely opaquely.
504  *
505  * So far, this b+-tree has only been tested on 64-bit intel x86 processors.
506  * It's correctness, as it is implemented (not conceptually), requires the
507  * semantics of total store order (TSO) for correctness. To fix this, we would
508  * change compiler fences into actual memory fences, at the very least.
509  */
510 template <typename P>
511 class btree {
512   template <template <typename> class, typename>
513     friend class base_txn_btree;
514 public:
515   typedef varkey key_type;
516   typedef std::string string_type;
517   typedef uint64_t key_slice;
518   typedef uint8_t* value_type;
519   typedef typename std::conditional<!P::RcuRespCaller,
520       scoped_rcu_region,
521       disabled_rcu_region>::type rcu_region;
522
523   // public to assist in testing
524   static const unsigned int NKeysPerNode    = P::NKeysPerNode;
525   static const unsigned int NMinKeysPerNode = P::NKeysPerNode / 2;
526
527 private:
528
529   typedef std::pair<ssize_t, size_t> key_search_ret;
530
531   typedef
532     btree_version_manip<typename P::VersionType, NKeysPerNode>
533     VersionManip;
534   typedef
535     btree_version_manip<uint64_t, NKeysPerNode>
536     RawVersionManip;
537
538   struct node {
539
540     typename P::VersionType hdr_;
541
542 #ifdef BTREE_LOCK_OWNERSHIP_CHECKING
543     std::thread::id lock_owner_;
544 #endif /* BTREE_LOCK_OWNERSHIP_CHECKING */
545
546     /**
547      * Keys are assumed to be stored in contiguous sorted order, so that all
548      * the used slots are grouped together. That is, elems in positions
549      * [0, key_slots_used) are valid, and elems in positions
550      * [key_slots_used, NKeysPerNode) are empty
551      */
552     key_slice keys_[NKeysPerNode];
553
554     node() :
555       hdr_()
556 #ifdef BTREE_LOCK_OWNERSHIP_CHECKING
557       , lock_owner_()
558 #endif
559     {}
560     ~node()
561     {
562       INVARIANT(!is_locked());
563       INVARIANT(is_deleting());
564     }
565
566     inline bool
567     is_leaf_node() const
568     {
569       return VersionManip::IsLeafNode(hdr_);
570     }
571
572     inline bool
573     is_internal_node() const
574     {
575       return !is_leaf_node();
576     }
577
578     inline size_t
579     key_slots_used() const
580     {
581       return VersionManip::KeySlotsUsed(hdr_);
582     }
583
584     inline void
585     set_key_slots_used(size_t n)
586     {
587       VersionManip::SetKeySlotsUsed(hdr_, n);
588     }
589
590     inline void
591     inc_key_slots_used()
592     {
593       VersionManip::IncKeySlotsUsed(hdr_);
594     }
595
596     inline void
597     dec_key_slots_used()
598     {
599       VersionManip::DecKeySlotsUsed(hdr_);
600     }
601
602     inline bool
603     is_locked() const
604     {
605       return VersionManip::IsLocked(hdr_);
606     }
607
608 #ifdef BTREE_LOCK_OWNERSHIP_CHECKING
609     inline bool
610     is_lock_owner() const
611     {
612       return std::this_thread::get_id() == lock_owner_;
613     }
614 #else
615     inline bool
616     is_lock_owner() const
617     {
618       return true;
619     }
620 #endif /* BTREE_LOCK_OWNERSHIP_CHECKING */
621
622     inline uint64_t
623     lock()
624     {
625 #ifdef ENABLE_EVENT_COUNTERS
626       static event_avg_counter
627         evt_avg_btree_leaf_node_lock_acquire_spins(
628             util::cxx_typename<btree<P>>::value() +
629             std::string("_avg_btree_leaf_node_lock_acquire_spins"));
630       static event_avg_counter
631         evt_avg_btree_internal_node_lock_acquire_spins(
632             util::cxx_typename<btree<P>>::value() +
633             std::string("_avg_btree_internal_node_lock_acquire_spins"));
634       unsigned spins;
635       const uint64_t ret = VersionManip::LockWithSpinCount(hdr_, spins);
636       if (is_leaf_node())
637         evt_avg_btree_leaf_node_lock_acquire_spins.offer(spins);
638       else
639         evt_avg_btree_internal_node_lock_acquire_spins.offer(spins);
640 #else
641       const uint64_t ret = VersionManip::Lock(hdr_);
642 #endif
643 #ifdef BTREE_LOCK_OWNERSHIP_CHECKING
644       lock_owner_ = std::this_thread::get_id();
645       AddNodeToLockRegion(this);
646       INVARIANT(is_lock_owner());
647 #endif
648       return ret;
649     }
650
651     inline void
652     unlock()
653     {
654 #ifdef BTREE_LOCK_OWNERSHIP_CHECKING
655       lock_owner_ = std::thread::id();
656       INVARIANT(!is_lock_owner());
657 #endif
658       VersionManip::Unlock(hdr_);
659     }
660
661     inline bool
662     is_root() const
663     {
664       return VersionManip::IsRoot(hdr_);
665     }
666
667     inline void
668     set_root()
669     {
670       VersionManip::SetRoot(hdr_);
671     }
672
673     inline void
674     clear_root()
675     {
676       VersionManip::ClearRoot(hdr_);
677     }
678
679     inline bool
680     is_modifying() const
681     {
682       return VersionManip::IsModifying(hdr_);
683     }
684
685     inline void
686     mark_modifying()
687     {
688       VersionManip::MarkModifying(hdr_);
689     }
690
691     inline bool
692     is_deleting() const
693     {
694       return VersionManip::IsDeleting(hdr_);
695     }
696
697     inline void
698     mark_deleting()
699     {
700       VersionManip::MarkDeleting(hdr_);
701     }
702
703     inline uint64_t
704     unstable_version() const
705     {
706       return VersionManip::UnstableVersion(hdr_);
707     }
708
709     /**
710      * spin until we get a version which is not modifying (but can be locked)
711      */
712     inline uint64_t
713     stable_version() const
714     {
715       return VersionManip::StableVersion(hdr_);
716     }
717
718     inline bool
719     check_version(uint64_t version) const
720     {
721       return VersionManip::CheckVersion(hdr_, version);
722     }
723
724     inline std::string
725     version_info_str() const
726     {
727       return VersionManip::VersionInfoStr(hdr_);
728     }
729
730     void base_invariant_unique_keys_check() const;
731
732     // [min_key, max_key)
733     void
734     base_invariant_checker(const key_slice *min_key,
735                            const key_slice *max_key,
736                            bool is_root) const;
737
738     /** manually simulated virtual function (so we don't make node virtual) */
739     void
740     invariant_checker(const key_slice *min_key,
741                       const key_slice *max_key,
742                       const node *left_sibling,
743                       const node *right_sibling,
744                       bool is_root) const;
745
746     /** another manually simulated virtual function */
747     inline void prefetch() const;
748   };
749
750   struct leaf_node : public node {
751     union value_or_node_ptr {
752       value_type v_;
753       node *n_;
754     };
755
756     key_slice min_key_; // really is min_key's key slice
757     value_or_node_ptr values_[NKeysPerNode];
758
759     // format is:
760     // [ slice_length | type | unused ]
761     // [    0:4       |  4:5 |  5:8   ]
762     uint8_t lengths_[NKeysPerNode];
763
764     leaf_node *prev_;
765     leaf_node *next_;
766
767     // starts out empty- once set, doesn't get freed until dtor (even if all
768     // keys w/ suffixes get removed)
769     imstring *suffixes_;
770
771     inline ALWAYS_INLINE varkey
772     suffix(size_t i) const
773     {
774       return suffixes_ ? varkey(suffixes_[i]) : varkey();
775     }
776
777     //static event_counter g_evt_suffixes_array_created;
778
779     inline void
780     alloc_suffixes()
781     {
782       INVARIANT(this->is_modifying());
783       INVARIANT(!suffixes_);
784       suffixes_ = new imstring[NKeysPerNode];
785       //++g_evt_suffixes_array_created;
786     }
787
788     inline void
789     ensure_suffixes()
790     {
791       INVARIANT(this->is_modifying());
792       if (!suffixes_)
793         alloc_suffixes();
794     }
795
796     leaf_node();
797     ~leaf_node();
798
799     static const uint64_t LEN_LEN_MASK = 0xf;
800
801     static const uint64_t LEN_TYPE_SHIFT = 4;
802     static const uint64_t LEN_TYPE_MASK = 0x1 << LEN_TYPE_SHIFT;
803
804     inline void
805     prefetch() const
806     {
807 #ifdef BTREE_NODE_PREFETCH
808       prefetch_object(this);
809 #endif
810     }
811
812     inline size_t
813     keyslice_length(size_t n) const
814     {
815       INVARIANT(n < NKeysPerNode);
816       return lengths_[n] & LEN_LEN_MASK;
817     }
818
819     inline void
820     keyslice_set_length(size_t n, size_t len, bool layer)
821     {
822       INVARIANT(n < NKeysPerNode);
823       INVARIANT(this->is_modifying());
824       INVARIANT(len <= 9);
825       INVARIANT(!layer || len == 9);
826       lengths_[n] = (len | (layer ? LEN_TYPE_MASK : 0));
827     }
828
829     inline bool
830     value_is_layer(size_t n) const
831     {
832       INVARIANT(n < NKeysPerNode);
833       return lengths_[n] & LEN_TYPE_MASK;
834     }
835
836     inline void
837     value_set_layer(size_t n)
838     {
839       INVARIANT(n < NKeysPerNode);
840       INVARIANT(this->is_modifying());
841       INVARIANT(keyslice_length(n) == 9);
842       INVARIANT(!value_is_layer(n));
843       lengths_[n] |= LEN_TYPE_MASK;
844     }
845
846     /**
847      * keys[key_search(k).first] == k if key_search(k).first != -1
848      * key does not exist otherwise. considers key length also
849      */
850     inline key_search_ret
851     key_search(key_slice k, size_t len) const
852     {
853       size_t n = this->key_slots_used();
854       ssize_t lower = 0;
855       ssize_t upper = n;
856       while (lower < upper) {
857         ssize_t i = (lower + upper) / 2;
858         key_slice k0 = this->keys_[i];
859         size_t len0 = this->keyslice_length(i);
860         if (k0 < k || (k0 == k && len0 < len))
861           lower = i + 1;
862         else if (k0 == k && len0 == len)
863           return key_search_ret(i, n);
864         else
865           upper = i;
866       }
867       return key_search_ret(-1, n);
868     }
869
870     /**
871      * tightest lower bound key, -1 if no such key exists. operates only
872      * on key slices (internal nodes have unique key slices)
873      */
874     inline key_search_ret
875     key_lower_bound_search(key_slice k, size_t len) const
876     {
877       ssize_t ret = -1;
878       size_t n = this->key_slots_used();
879       ssize_t lower = 0;
880       ssize_t upper = n;
881       while (lower < upper) {
882         ssize_t i = (lower + upper) / 2;
883         key_slice k0 = this->keys_[i];
884         size_t len0 = this->keyslice_length(i);
885         if (k0 < k || (k0 == k && len0 < len)) {
886           ret = i;
887           lower = i + 1;
888         } else if (k0 == k && len0 == len) {
889           return key_search_ret(i, n);
890         } else {
891           upper = i;
892         }
893       }
894       return key_search_ret(ret, n);
895     }
896
897     void
898     invariant_checker_impl(const key_slice *min_key,
899                            const key_slice *max_key,
900                            const node *left_sibling,
901                            const node *right_sibling,
902                            bool is_root) const;
903
904     static inline leaf_node*
905     alloc()
906     {
907       void * const p = rcu::s_instance.alloc(LeafNodeAllocSize);
908       INVARIANT(p);
909       return new (p) leaf_node;
910     }
911
912     static void
913     deleter(void *p)
914     {
915       leaf_node *n = (leaf_node *) p;
916       INVARIANT(n->is_deleting());
917       INVARIANT(!n->is_locked());
918       n->~leaf_node();
919       rcu::s_instance.dealloc(p, LeafNodeAllocSize);
920     }
921
922     static inline void
923     release(leaf_node *n)
924     {
925       if (unlikely(!n))
926         return;
927       n->mark_deleting();
928       rcu::s_instance.free_with_fn(n, deleter);
929     }
930
931   };
932
933   struct internal_node : public node {
934     /**
935      * child at position child_idx is responsible for keys
936      * [keys[child_idx - 1], keys[child_idx])
937      *
938      * in the case where child_idx == 0 or child_idx == this->key_slots_used(), then
939      * the responsiblity value of the min/max key, respectively, is determined
940      * by the parent
941      */
942     node *children_[NKeysPerNode + 1];
943
944     internal_node();
945     ~internal_node();
946
947     inline void
948     prefetch() const
949     {
950 #ifdef BTREE_NODE_PREFETCH
951       prefetch_object(this);
952 #endif
953     }
954
955     /**
956      * keys[key_search(k).first] == k if key_search(k).first != -1
957      * key does not exist otherwise. operates ony on key slices
958      * (internal nodes have unique key slices)
959      */
960     inline key_search_ret
961     key_search(key_slice k) const
962     {
963       size_t n = this->key_slots_used();
964       ssize_t lower = 0;
965       ssize_t upper = n;
966       while (lower < upper) {
967         ssize_t i = (lower + upper) / 2;
968         key_slice k0 = this->keys_[i];
969         if (k0 == k)
970           return key_search_ret(i, n);
971         else if (k0 > k)
972           upper = i;
973         else
974           lower = i + 1;
975       }
976       return key_search_ret(-1, n);
977     }
978
979     /**
980      * tightest lower bound key, -1 if no such key exists. operates only
981      * on key slices (internal nodes have unique key slices)
982      */
983     inline key_search_ret
984     key_lower_bound_search(key_slice k) const
985     {
986       ssize_t ret = -1;
987       size_t n = this->key_slots_used();
988       ssize_t lower = 0;
989       ssize_t upper = n;
990       while (lower < upper) {
991         ssize_t i = (lower + upper) / 2;
992         key_slice k0 = this->keys_[i];
993         if (k0 == k)
994           return key_search_ret(i, n);
995         else if (k0 > k)
996           upper = i;
997         else {
998           ret = i;
999           lower = i + 1;
1000         }
1001       }
1002       return key_search_ret(ret, n);
1003     }
1004
1005     void
1006     invariant_checker_impl(const key_slice *min_key,
1007                            const key_slice *max_key,
1008                            const node *left_sibling,
1009                            const node *right_sibling,
1010                            bool is_root) const;
1011
1012     // XXX: alloc(), deleter(), and release() are copied from leaf_node-
1013     // we should templatize them to avoid code duplication
1014
1015     static inline internal_node*
1016     alloc()
1017     {
1018       void * const p = rcu::s_instance.alloc(InternalNodeAllocSize);
1019       INVARIANT(p);
1020       return new (p) internal_node;
1021     }
1022
1023     static void
1024     deleter(void *p)
1025     {
1026       internal_node *n = (internal_node *) p;
1027       INVARIANT(n->is_deleting());
1028       INVARIANT(!n->is_locked());
1029       n->~internal_node();
1030       rcu::s_instance.dealloc(p, InternalNodeAllocSize);
1031     }
1032
1033     static inline void
1034     release(internal_node *n)
1035     {
1036       if (unlikely(!n))
1037         return;
1038       n->mark_deleting();
1039       rcu::s_instance.free_with_fn(n, deleter);
1040     }
1041
1042   } PACKED;
1043
1044 #ifdef BTREE_LOCK_OWNERSHIP_CHECKING
1045 public:
1046   static inline void
1047   NodeLockRegionBegin()
1048   {
1049     ownership_checker<btree<P>, typename btree<P>::node>::NodeLockRegionBegin();
1050   }
1051   static inline void
1052   AssertAllNodeLocksReleased()
1053   {
1054     ownership_checker<btree<P>, typename btree<P>::node>::AssertAllNodeLocksReleased();
1055   }
1056 private:
1057   static inline void
1058   AddNodeToLockRegion(const node *n)
1059   {
1060     ownership_checker<btree<P>, typename btree<P>::node>::AddNodeToLockRegion(n);
1061   }
1062 #endif
1063
1064 #ifdef BTREE_NODE_ALLOC_CACHE_ALIGNED
1065   static const size_t LeafNodeAllocSize = util::round_up<size_t, LG_CACHELINE_SIZE>(sizeof(leaf_node));
1066   static const size_t InternalNodeAllocSize = util::round_up<size_t, LG_CACHELINE_SIZE>(sizeof(internal_node));
1067 #else
1068   static const size_t LeafNodeAllocSize = sizeof(leaf_node);
1069   static const size_t InternalNodeAllocSize = sizeof(internal_node);
1070 #endif
1071
1072   static inline leaf_node*
1073   AsLeaf(node *n)
1074   {
1075     INVARIANT(!n || n->is_leaf_node());
1076     return static_cast<leaf_node *>(n);
1077   }
1078
1079   static inline const leaf_node*
1080   AsLeaf(const node *n)
1081   {
1082     return AsLeaf(const_cast<node *>(n));
1083   }
1084
1085   static inline internal_node*
1086   AsInternal(node *n)
1087   {
1088     INVARIANT(!n || n->is_internal_node());
1089     return static_cast<internal_node *>(n);
1090   }
1091
1092   static inline const internal_node*
1093   AsInternal(const node *n)
1094   {
1095     return AsInternal(const_cast<node *>(n));
1096   }
1097
1098   static inline leaf_node *
1099   AsLeafCheck(node *n, uint64_t v)
1100   {
1101     return likely(n) && RawVersionManip::IsLeafNode(v) ?
1102       static_cast<leaf_node *>(n) : NULL;
1103   }
1104
1105   static inline leaf_node*
1106   AsLeafCheck(node *n)
1107   {
1108     return likely(n) && n->is_leaf_node() ?
1109       static_cast<leaf_node *>(n) : NULL;
1110   }
1111
1112   static inline const leaf_node*
1113   AsLeafCheck(const node *n, uint64_t v)
1114   {
1115     return AsLeafCheck(const_cast<node *>(n), v);
1116   }
1117
1118   static inline const leaf_node*
1119   AsLeafCheck(const node *n)
1120   {
1121     return AsLeafCheck(const_cast<node *>(n));
1122   }
1123
1124   static inline internal_node*
1125   AsInternalCheck(node *n)
1126   {
1127     return likely(n) && n->is_internal_node() ? static_cast<internal_node *>(n) : NULL;
1128   }
1129
1130   static inline const internal_node*
1131   AsInternalCheck(const node *n)
1132   {
1133     return AsInternalCheck(const_cast<node *>(n));
1134   }
1135
1136   static inline void
1137   UnlockNodes(typename util::vec<node *>::type &locked_nodes)
1138   {
1139     for (auto it = locked_nodes.begin(); it != locked_nodes.end(); ++it)
1140       (*it)->unlock();
1141     locked_nodes.clear();
1142   }
1143
1144   template <typename T>
1145   static inline T
1146   UnlockAndReturn(typename util::vec<node *>::type &locked_nodes, T t)
1147   {
1148     UnlockNodes(locked_nodes);
1149     return t;
1150   }
1151
1152   static bool
1153   CheckVersion(uint64_t a, uint64_t b)
1154   {
1155     return VersionManip::CheckVersion(a, b);
1156   }
1157
1158   /**
1159    * Is not thread safe, and does not use RCU to free memory
1160    *
1161    * Should only be called when there are no outstanding operations on
1162    * any nodes reachable from n
1163    */
1164   static void recursive_delete(node *n);
1165
1166   node *volatile root_;
1167
1168 public:
1169
1170   // XXX(stephentu): trying out a very opaque node API for now
1171   typedef struct node node_opaque_t;
1172   typedef std::pair< const node_opaque_t *, uint64_t > versioned_node_t;
1173   struct insert_info_t {
1174     const node_opaque_t* node;
1175     uint64_t old_version;
1176     uint64_t new_version;
1177   };
1178
1179   btree() : root_(leaf_node::alloc())
1180   {
1181     static_assert(
1182         NKeysPerNode > (sizeof(key_slice) + 2), "XX"); // so we can always do a split
1183     static_assert(
1184         NKeysPerNode <=
1185         (VersionManip::HDR_KEY_SLOTS_MASK >> VersionManip::HDR_KEY_SLOTS_SHIFT), "XX");
1186
1187 #ifdef CHECK_INVARIANTS
1188     root_->lock();
1189     root_->set_root();
1190     root_->unlock();
1191 #else
1192     root_->set_root();
1193 #endif /* CHECK_INVARIANTS */
1194   }
1195
1196   ~btree()
1197   {
1198     // NOTE: it is assumed on deletion time there are no
1199     // outstanding requests to the btree, so deletion proceeds
1200     // in a non-threadsafe manner
1201     recursive_delete(root_);
1202     root_ = NULL;
1203   }
1204
1205   /**
1206    * NOT THREAD SAFE
1207    */
1208   inline void
1209   clear()
1210   {
1211     recursive_delete(root_);
1212     root_ = leaf_node::alloc();
1213 #ifdef CHECK_INVARIANTS
1214     root_->lock();
1215     root_->set_root();
1216     root_->unlock();
1217 #else
1218     root_->set_root();
1219 #endif /* CHECK_INVARIANTS */
1220   }
1221
1222   /** Note: invariant checking is not thread safe */
1223   inline void
1224   invariant_checker() const
1225   {
1226     root_->invariant_checker(NULL, NULL, NULL, NULL, true);
1227   }
1228
1229           /** NOTE: the public interface assumes that the caller has taken care
1230            * of setting up RCU */
1231
1232   inline bool
1233   search(const key_type &k, value_type &v,
1234          versioned_node_t *search_info = nullptr) const
1235   {
1236     rcu_region guard;
1237     typename util::vec<leaf_node *>::type ns;
1238     return search_impl(k, v, ns, search_info);
1239   }
1240
1241   /**
1242    * The low level callback interface is as follows:
1243    *
1244    * Consider a scan in the range [a, b):
1245    *   1) on_resp_node() is called at least once per node which
1246    *      has a responibility range that overlaps with the scan range
1247    *   2) invoke() is called per <k, v>-pair such that k is in [a, b)
1248    *
1249    * The order of calling on_resp_node() and invoke() is up to the implementation.
1250    */
1251   class low_level_search_range_callback {
1252   public:
1253     virtual ~low_level_search_range_callback() {}
1254
1255     /**
1256      * This node lies within the search range (at version v)
1257      */
1258     virtual void on_resp_node(const node_opaque_t *n, uint64_t version) = 0;
1259
1260     /**
1261      * This key/value pair was read from node n @ version
1262      */
1263     virtual bool invoke(const string_type &k, value_type v,
1264                         const node_opaque_t *n, uint64_t version) = 0;
1265   };
1266
1267   /**
1268    * A higher level interface if you don't care about node and version numbers
1269    */
1270   class search_range_callback : public low_level_search_range_callback {
1271   public:
1272     virtual void
1273     on_resp_node(const node_opaque_t *n, uint64_t version)
1274     {
1275     }
1276
1277     virtual bool
1278     invoke(const string_type &k, value_type v,
1279            const node_opaque_t *n, uint64_t version)
1280     {
1281       return invoke(k, v);
1282     }
1283
1284     virtual bool invoke(const string_type &k, value_type v) = 0;
1285   };
1286
1287 private:
1288   template <typename T>
1289   class type_callback_wrapper : public search_range_callback {
1290   public:
1291     type_callback_wrapper(T *callback) : callback_(callback) {}
1292     virtual bool
1293     invoke(const string_type &k, value_type v)
1294     {
1295       return callback_->operator()(k, v);
1296     }
1297   private:
1298     T *const callback_;
1299   };
1300
1301   struct leaf_kvinfo {
1302     key_slice key_; // in host endian
1303     key_slice key_big_endian_;
1304     typename leaf_node::value_or_node_ptr vn_;
1305     bool layer_;
1306     size_t length_;
1307     varkey suffix_;
1308     leaf_kvinfo() {} // for STL
1309     leaf_kvinfo(key_slice key,
1310                 typename leaf_node::value_or_node_ptr vn,
1311                 bool layer,
1312                 size_t length,
1313                 const varkey &suffix)
1314       : key_(key), key_big_endian_(util::big_endian_trfm<key_slice>()(key)),
1315         vn_(vn), layer_(layer), length_(length), suffix_(suffix)
1316     {}
1317
1318     inline const char *
1319     keyslice() const
1320     {
1321       return (const char *) &key_big_endian_;
1322     }
1323   };
1324
1325   bool search_range_at_layer(leaf_node *leaf,
1326                              string_type &prefix,
1327                              const key_type &lower,
1328                              bool inc_lower,
1329                              const key_type *upper,
1330                              low_level_search_range_callback &callback) const;
1331
1332 public:
1333
1334   /**
1335    * For all keys in [lower, *upper), invoke callback in ascending order.
1336    * If upper is NULL, then there is no upper bound
1337    *
1338
1339    * This function by default provides a weakly consistent view of the b-tree. For
1340    * instance, consider the following tree, where n = 3 is the max number of
1341    * keys in a node:
1342    *
1343    *              [D|G]
1344    *             /  |  \
1345    *            /   |   \
1346    *           /    |    \
1347    *          /     |     \
1348    *   [A|B|C]<->[D|E|F]<->[G|H|I]
1349    *
1350    * Suppose we want to scan [A, inf), so we traverse to the leftmost leaf node
1351    * and start a left-to-right walk. Suppose we have emitted keys A, B, and C,
1352    * and we are now just about to scan the middle leaf node.  Now suppose
1353    * another thread concurrently does delete(A), followed by a delete(H).  Now
1354    * the scaning thread resumes and emits keys D, E, F, G, and I, omitting H
1355    * because H was deleted. This is an inconsistent view of the b-tree, since
1356    * the scanning thread has observed the deletion of H but did not observe the
1357    * deletion of A, but we know that delete(A) happens before delete(H).
1358    *
1359    * The weakly consistent guarantee provided is the following: all keys
1360    * which, at the time of invocation, are known to exist in the btree
1361    * will be discovered on a scan (provided the key falls within the scan's range),
1362    * and provided there are no concurrent modifications/removals of that key
1363    *
1364    * Note that scans within a single node are consistent
1365    *
1366    * XXX: add other modes which provide better consistency:
1367    * A) locking mode
1368    * B) optimistic validation mode
1369    *
1370    * the last string parameter is an optional string buffer to use:
1371    * if null, a stack allocated string will be used. if not null, must
1372    * ensure:
1373    *   A) buf->empty() at the beginning
1374    *   B) no concurrent mutation of string
1375    * note that string contents upon return are arbitrary
1376    */
1377   void
1378   search_range_call(const key_type &lower,
1379                     const key_type *upper,
1380                     low_level_search_range_callback &callback,
1381                     string_type *buf = nullptr) const;
1382
1383   // (lower, upper]
1384   void
1385   rsearch_range_call(const key_type &upper,
1386                      const key_type *lower,
1387                      low_level_search_range_callback &callback,
1388                      std::string *buf = nullptr) const
1389   {
1390     NDB_UNIMPLEMENTED("rsearch_range_call");
1391   }
1392
1393   /**
1394    * Callback is expected to implement bool operator()(key_slice k, value_type v),
1395    * where the callback returns true if it wants to keep going, false otherwise
1396    *
1397    */
1398   template <typename T>
1399   inline void
1400   search_range(const key_type &lower,
1401                const key_type *upper,
1402                T &callback,
1403                string_type *buf = nullptr) const
1404   {
1405     type_callback_wrapper<T> w(&callback);
1406     search_range_call(lower, upper, w, buf);
1407   }
1408
1409   template <typename F>
1410   inline void
1411   rsearch_range(const key_type &upper,
1412                 const key_type *lower,
1413                 F& callback,
1414                 std::string *buf = nullptr) const
1415   {
1416     NDB_UNIMPLEMENTED("rsearch_range");
1417   }
1418
1419   /**
1420    * returns true if key k did not already exist, false otherwise
1421    * If k exists with a different mapping, still returns false
1422    *
1423    * If false and old_v is not NULL, then the overwritten value of v
1424    * is written into old_v
1425    */
1426   inline bool
1427   insert(const key_type &k, value_type v,
1428          value_type *old_v = NULL,
1429          insert_info_t *insert_info = NULL)
1430   {
1431     rcu_region guard;
1432     return insert_stable_location((node **) &root_, k, v, false, old_v, insert_info);
1433   }
1434
1435   /**
1436    * Only puts k=>v if k does not exist in map. returns true
1437    * if k inserted, false otherwise (k exists already)
1438    */
1439   inline bool
1440   insert_if_absent(const key_type &k, value_type v,
1441                    insert_info_t *insert_info = NULL)
1442   {
1443     rcu_region guard;
1444     return insert_stable_location((node **) &root_, k, v, true, NULL, insert_info);
1445   }
1446
1447   /**
1448    * return true if a value was removed, false otherwise.
1449    *
1450    * if true and old_v is not NULL, then the removed value of v
1451    * is written into old_v
1452    */
1453   inline bool
1454   remove(const key_type &k, value_type *old_v = NULL)
1455   {
1456     rcu_region guard;
1457     return remove_stable_location((node **) &root_, k, old_v);
1458   }
1459
1460 private:
1461   bool
1462   insert_stable_location(node **root_location, const key_type &k, value_type v,
1463                          bool only_if_absent, value_type *old_v,
1464                          insert_info_t *insert_info);
1465
1466   bool
1467   remove_stable_location(node **root_location, const key_type &k, value_type *old_v);
1468
1469 public:
1470
1471   /**
1472    * The tree walk API is a bit strange, due to the optimistic nature of the
1473    * btree.
1474    *
1475    * The way it works is that, on_node_begin() is first called. In
1476    * on_node_begin(), a callback function should read (but not modify) the
1477    * values it is interested in, and save them.
1478    *
1479    * Then, either one of on_node_success() or on_node_failure() is called. If
1480    * on_node_success() is called, then the previous values read in
1481    * on_node_begin() are indeed valid.  If on_node_failure() is called, then
1482    * the previous values are not valid and should be discarded.
1483    */
1484   class tree_walk_callback {
1485   public:
1486     virtual ~tree_walk_callback() {}
1487     virtual void on_node_begin(const node_opaque_t *n) = 0;
1488     virtual void on_node_success() = 0;
1489     virtual void on_node_failure() = 0;
1490   };
1491
1492   void tree_walk(tree_walk_callback &callback) const;
1493
1494 private:
1495   class size_walk_callback : public tree_walk_callback {
1496   public:
1497     size_walk_callback() : spec_size_(0), size_(0) {}
1498     virtual void on_node_begin(const node_opaque_t *n);
1499     virtual void on_node_success();
1500     virtual void on_node_failure();
1501     inline size_t get_size() const { return size_; }
1502   private:
1503     size_t spec_size_;
1504     size_t size_;
1505   };
1506
1507 public:
1508   /**
1509    * Is thread-safe, but not really designed to perform well with concurrent
1510    * modifications. also the value returned is not consistent given concurrent
1511    * modifications
1512    */
1513   inline size_t
1514   size() const
1515   {
1516     size_walk_callback c;
1517     tree_walk(c);
1518     return c.get_size();
1519   }
1520
1521   static inline uint64_t
1522   ExtractVersionNumber(const node_opaque_t *n)
1523   {
1524     // XXX(stephentu): I think we must use stable_version() for
1525     // correctness, but I am not 100% sure. It's definitely correct to use it,
1526     // but maybe we can get away with unstable_version()?
1527     return RawVersionManip::Version(n->stable_version());
1528   }
1529
1530   // [value, has_suffix]
1531   static std::vector< std::pair<value_type, bool> >
1532   ExtractValues(const node_opaque_t *n);
1533
1534   void print() {
1535   }
1536
1537   /**
1538    * Not well defined if n is being concurrently modified, just for debugging
1539    */
1540   static std::string
1541   NodeStringify(const node_opaque_t *n);
1542
1543   static inline size_t
1544   InternalNodeSize()
1545   {
1546     return sizeof(internal_node);
1547   }
1548
1549   static inline size_t
1550   LeafNodeSize()
1551   {
1552     return sizeof(leaf_node);
1553   }
1554
1555 private:
1556
1557   /**
1558    * Move the array slice from [p, n) to the right by k position, occupying [p + k, n + k),
1559    * leaving the values of array[p] to array[p + k -1] undefined. Has no effect if p >= n
1560    *
1561    * Note: Assumes that array[n] is valid memory.
1562    */
1563   template <typename T>
1564   static inline ALWAYS_INLINE void
1565   sift_right(T *array, size_t p, size_t n, size_t k = 1)
1566   {
1567     if (k == 0)
1568       return;
1569     for (size_t i = n + k - 1; i > p + k - 1; i--)
1570       array[i] = array[i - k];
1571   }
1572
1573   // use swap() to do moves, for efficiency- avoid this
1574   // variant when using primitive arrays
1575   template <typename T>
1576   static inline ALWAYS_INLINE void
1577   sift_swap_right(T *array, size_t p, size_t n, size_t k = 1)
1578   {
1579     if (k == 0)
1580       return;
1581     for (size_t i = n + k - 1; i > p + k - 1; i--)
1582       array[i].swap(array[i - k]);
1583   }
1584
1585   /**
1586    * Move the array slice from [p + k, n) to the left by k positions, occupying [p, n - k),
1587    * overwriting array[p]..array[p+k-1] Has no effect if p + k >= n
1588    */
1589   template <typename T>
1590   static inline ALWAYS_INLINE void
1591   sift_left(T *array, size_t p, size_t n, size_t k = 1)
1592   {
1593     if (unlikely(p + k >= n))
1594       return;
1595     for (size_t i = p; i < n - k; i++)
1596       array[i] = array[i + k];
1597   }
1598
1599   template <typename T>
1600   static inline ALWAYS_INLINE void
1601   sift_swap_left(T *array, size_t p, size_t n, size_t k = 1)
1602   {
1603     if (unlikely(p + k >= n))
1604       return;
1605     for (size_t i = p; i < n - k; i++)
1606       array[i].swap(array[i + k]);
1607   }
1608
1609   /**
1610    * Copy [p, n) from source into dest. Has no effect if p >= n
1611    */
1612   template <typename T>
1613   static inline ALWAYS_INLINE void
1614   copy_into(T *dest, T *source, size_t p, size_t n)
1615   {
1616     for (size_t i = p; i < n; i++)
1617       *dest++ = source[i];
1618   }
1619
1620   template <typename T>
1621   static inline ALWAYS_INLINE void
1622   swap_with(T *dest, T *source, size_t p, size_t n)
1623   {
1624     for (size_t i = p; i < n; i++)
1625       (dest++)->swap(source[i]);
1626   }
1627
1628   leaf_node *leftmost_descend_layer(node *n) const;
1629
1630   /**
1631    * Assumes RCU region scope is held
1632    */
1633   bool search_impl(const key_type &k, value_type &v,
1634                    typename util::vec<leaf_node *>::type &leaf_nodes,
1635                    versioned_node_t *search_info = nullptr) const;
1636
1637   static leaf_node *
1638   FindRespLeafNode(
1639       leaf_node *leaf, uint64_t kslice, uint64_t &version);
1640
1641   /**
1642    * traverses the lower leaf levels for a leaf node resp for kslice such that
1643    * version is stable and not deleting. resp info is given via idxmatch +
1644    * idxlowerbound
1645    *
1646    * if idxmatch != -1, then ignore idxlowerbound
1647    *
1648    * note to actually use the info, you still need to validate it (the info is
1649    * tentative as of the version)
1650    */
1651   static leaf_node *
1652   FindRespLeafLowerBound(
1653       leaf_node *leaf, uint64_t kslice,
1654       size_t kslicelen, uint64_t &version,
1655       size_t &n, ssize_t &idxmatch, ssize_t &idxlowerbound);
1656
1657   static leaf_node *
1658   FindRespLeafExact(
1659       leaf_node *leaf, uint64_t kslice,
1660       size_t kslicelen, uint64_t &version,
1661       size_t &n, ssize_t &idxmatch);
1662
1663   typedef std::pair<node *, uint64_t> insert_parent_entry;
1664
1665   enum insert_status {
1666     I_NONE_NOMOD, // no nodes split nor modified
1667     I_NONE_MOD, // no nodes split, but modified
1668     I_RETRY,
1669     I_SPLIT, // node(s) split
1670   };
1671
1672   /**
1673    * insert k=>v into node n. if this insert into n causes it to split into two
1674    * nodes, return the new node (upper half of keys). in this case, min_key is set to the
1675    * smallest key that the new node is responsible for. otherwise return null, in which
1676    * case min_key's value is not defined.
1677    *
1678    * NOTE: our implementation of insert0() is not as efficient as possible, in favor
1679    * of code clarity
1680    */
1681   insert_status
1682   insert0(node *n,
1683           const key_type &k,
1684           value_type v,
1685           bool only_if_absent,
1686           value_type *old_v,
1687           insert_info_t *insert_info,
1688           key_slice &min_key,
1689           node *&new_node,
1690           typename util::vec<insert_parent_entry>::type &parents,
1691           typename util::vec<node *>::type &locked_nodes);
1692
1693   enum remove_status {
1694     R_NONE_NOMOD,
1695     R_NONE_MOD,
1696     R_RETRY,
1697     R_STOLE_FROM_LEFT,
1698     R_STOLE_FROM_RIGHT,
1699     R_MERGE_WITH_LEFT,
1700     R_MERGE_WITH_RIGHT,
1701     R_REPLACE_NODE,
1702   };
1703
1704   inline ALWAYS_INLINE void
1705   remove_pos_from_leaf_node(leaf_node *leaf, size_t pos, size_t n)
1706   {
1707     INVARIANT(leaf->key_slots_used() == n);
1708     INVARIANT(pos < n);
1709     if (leaf->value_is_layer(pos)) {
1710 #ifdef CHECK_INVARIANTS
1711       leaf->values_[pos].n_->lock();
1712 #endif
1713       leaf->values_[pos].n_->mark_deleting();
1714       INVARIANT(leaf->values_[pos].n_->is_leaf_node());
1715       INVARIANT(leaf->values_[pos].n_->key_slots_used() == 0);
1716       leaf_node::release((leaf_node *) leaf->values_[pos].n_);
1717 #ifdef CHECK_INVARIANTS
1718       leaf->values_[pos].n_->unlock();
1719 #endif
1720     }
1721     sift_left(leaf->keys_, pos, n);
1722     sift_left(leaf->values_, pos, n);
1723     sift_left(leaf->lengths_, pos, n);
1724     if (leaf->suffixes_)
1725       sift_swap_left(leaf->suffixes_, pos, n);
1726     leaf->dec_key_slots_used();
1727   }
1728
1729   inline ALWAYS_INLINE void
1730   remove_pos_from_internal_node(
1731       internal_node *internal, size_t key_pos, size_t child_pos, size_t n)
1732   {
1733     INVARIANT(internal->key_slots_used() == n);
1734     INVARIANT(key_pos < n);
1735     INVARIANT(child_pos < n + 1);
1736     sift_left(internal->keys_, key_pos, n);
1737     sift_left(internal->children_, child_pos, n + 1);
1738     internal->dec_key_slots_used();
1739   }
1740
1741   struct remove_parent_entry {
1742     // non-const members for STL
1743     node *parent_;
1744     node *parent_left_sibling_;
1745     node *parent_right_sibling_;
1746     uint64_t parent_version_;
1747
1748     // default ctor for STL
1749     remove_parent_entry()
1750       : parent_(NULL), parent_left_sibling_(NULL),
1751         parent_right_sibling_(NULL), parent_version_(0)
1752     {}
1753
1754     remove_parent_entry(node *parent,
1755                         node *parent_left_sibling,
1756                         node *parent_right_sibling,
1757                         uint64_t parent_version)
1758       : parent_(parent), parent_left_sibling_(parent_left_sibling),
1759         parent_right_sibling_(parent_right_sibling),
1760         parent_version_(parent_version)
1761     {}
1762   };
1763
1764   remove_status
1765   remove0(node *np,
1766           key_slice *min_key,
1767           key_slice *max_key,
1768           const key_type &k,
1769           value_type *old_v,
1770           node *left_node,
1771           node *right_node,
1772           key_slice &new_key,
1773           node *&replace_node,
1774           typename util::vec<remove_parent_entry>::type &parents,
1775           typename util::vec<node *>::type &locked_nodes);
1776 };
1777
1778 template <typename P>
1779 inline void
1780 btree<P>::node::prefetch() const
1781 {
1782   if (is_leaf_node())
1783     AsLeaf(this)->prefetch();
1784   else
1785     AsInternal(this)->prefetch();
1786 }
1787
1788 extern void TestConcurrentBtreeFast();
1789 extern void TestConcurrentBtreeSlow();
1790
1791 #if !NDB_MASSTREE
1792 typedef btree<concurrent_btree_traits> concurrent_btree;
1793 typedef btree<single_threaded_btree_traits> single_threaded_btree;
1794 #endif