Clean up build and remove generated files
[c11concurrency-benchmarks.git] / silo / btree_impl.h
1 #pragma once
2
3 #include <unistd.h>
4
5 #include <iostream>
6 #include <map>
7 #include <set>
8 #include <stack>
9 #include <vector>
10 #include <sstream>
11 #include <atomic>
12 #include <memory>
13
14 #include "core.h"
15 #include "btree.h"
16 #include "thread.h"
17 #include "txn.h"
18 #include "util.h"
19 #include "scopedperf.hh"
20
21 template <typename P>
22 void
23 btree<P>::node::base_invariant_unique_keys_check() const
24 {
25   size_t n = this->key_slots_used();
26   if (n == 0)
27     return;
28   if (is_leaf_node()) {
29     const leaf_node *leaf = AsLeaf(this);
30     typedef std::pair<key_slice, size_t> leaf_key;
31     leaf_key prev;
32     prev.first = keys_[0];
33     prev.second = leaf->keyslice_length(0);
34     ALWAYS_ASSERT(prev.second <= 9);
35     ALWAYS_ASSERT(!leaf->is_layer(0) || prev.second == 9);
36     if (!leaf->is_layer(0) && prev.second == 9) {
37       ALWAYS_ASSERT(leaf->suffixes_);
38       ALWAYS_ASSERT(leaf->suffixes_[0].size() >= 1);
39     }
40     for (size_t i = 1; i < n; i++) {
41       leaf_key cur_key;
42       cur_key.first = keys_[i];
43       cur_key.second = leaf->keyslice_length(i);
44       ALWAYS_ASSERT(cur_key.second <= 9);
45       ALWAYS_ASSERT(!leaf->is_layer(i) || cur_key.second == 9);
46       if (!leaf->is_layer(i) && cur_key.second == 9) {
47         ALWAYS_ASSERT(leaf->suffixes_);
48         ALWAYS_ASSERT(leaf->suffixes_[i].size() >= 1);
49       }
50       ALWAYS_ASSERT(cur_key > prev);
51       prev = cur_key;
52     }
53   } else {
54     key_slice prev = keys_[0];
55     for (size_t i = 1; i < n; i++) {
56       ALWAYS_ASSERT(keys_[i] > prev);
57       prev = keys_[i];
58     }
59   }
60 }
61
62 template <typename P>
63 void
64 btree<P>::node::base_invariant_checker(const key_slice *min_key,
65                                        const key_slice *max_key,
66                                        bool is_root) const
67 {
68   ALWAYS_ASSERT(!is_locked());
69   ALWAYS_ASSERT(!is_modifying());
70   ALWAYS_ASSERT(this->is_root() == is_root);
71   size_t n = this->key_slots_used();
72   ALWAYS_ASSERT(n <= NKeysPerNode);
73   if (is_root) {
74     if (is_internal_node())
75       ALWAYS_ASSERT(n >= 1);
76   } else {
77     if (is_internal_node())
78       ALWAYS_ASSERT(n >= NMinKeysPerNode);
79     else
80       // key-slices constrain splits
81       ALWAYS_ASSERT(n >= 1);
82   }
83   for (size_t i = 0; i < n; i++) {
84     ALWAYS_ASSERT(!min_key || keys_[i] >= *min_key);
85     ALWAYS_ASSERT(!max_key || keys_[i] < *max_key);
86   }
87   base_invariant_unique_keys_check();
88 }
89
90 template <typename P>
91 void
92 btree<P>::node::invariant_checker(const key_slice *min_key,
93                                   const key_slice *max_key,
94                                   const node *left_sibling,
95                                   const node *right_sibling,
96                                   bool is_root) const
97 {
98   is_leaf_node() ?
99     AsLeaf(this)->invariant_checker_impl(min_key, max_key, left_sibling, right_sibling, is_root) :
100     AsInternal(this)->invariant_checker_impl(min_key, max_key, left_sibling, right_sibling, is_root) ;
101 }
102
103 //static event_counter evt_btree_leaf_node_creates("btree_leaf_node_creates");
104 //static event_counter evt_btree_leaf_node_deletes("btree_leaf_node_deletes");
105
106 //event_counter btree<P>::leaf_node::g_evt_suffixes_array_created("btree_leaf_node_suffix_array_creates");
107
108 template <typename P>
109 btree<P>::leaf_node::leaf_node()
110   : node(), min_key_(0), prev_(NULL), next_(NULL), suffixes_(NULL)
111 {
112   //++evt_btree_leaf_node_creates;
113 }
114
115 template <typename P>
116 btree<P>::leaf_node::~leaf_node()
117 {
118   if (suffixes_)
119     delete [] suffixes_;
120   //suffixes_ = NULL;
121   //++evt_btree_leaf_node_deletes;
122 }
123
124 template <typename P>
125 void
126 btree<P>::leaf_node::invariant_checker_impl(const key_slice *min_key,
127                                             const key_slice *max_key,
128                                             const node *left_sibling,
129                                             const node *right_sibling,
130                                             bool is_root) const
131 {
132   this->base_invariant_checker(min_key, max_key, is_root);
133   ALWAYS_ASSERT(!min_key || *min_key == this->min_key_);
134   ALWAYS_ASSERT(!is_root || min_key == NULL);
135   ALWAYS_ASSERT(!is_root || max_key == NULL);
136   ALWAYS_ASSERT(is_root || this->key_slots_used() > 0);
137   size_t n = this->key_slots_used();
138   for (size_t i = 0; i < n; i++)
139     if (this->is_layer(i))
140       this->values_[i].n_->invariant_checker(NULL, NULL, NULL, NULL, true);
141 }
142
143 //static event_counter evt_btree_internal_node_creates("btree_internal_node_creates");
144 //static event_counter evt_btree_internal_node_deletes("btree_internal_node_deletes");
145
146 template <typename P>
147 btree<P>::internal_node::internal_node()
148 {
149   VersionManip::Store(this->hdr_, 1);
150   //++evt_btree_internal_node_creates;
151 }
152
153 template <typename P>
154 btree<P>::internal_node::~internal_node()
155 {
156   //++evt_btree_internal_node_deletes;
157 }
158
159 template <typename P>
160 void
161 btree<P>::internal_node::invariant_checker_impl(const key_slice *min_key,
162                                                 const key_slice *max_key,
163                                                 const node *left_sibling,
164                                                 const node *right_sibling,
165                                                 bool is_root) const
166 {
167   this->base_invariant_checker(min_key, max_key, is_root);
168   size_t n = this->key_slots_used();
169   for (size_t i = 0; i <= n; i++) {
170     ALWAYS_ASSERT(this->children_[i] != NULL);
171     if (i == 0) {
172       const node *left_child_sibling = NULL;
173       if (left_sibling)
174         left_child_sibling = AsInternal(left_sibling)->children_[left_sibling->key_slots_used()];
175       this->children_[0]->invariant_checker(min_key, &this->keys_[0], left_child_sibling, this->children_[i + 1], false);
176     } else if (i == n) {
177       const node *right_child_sibling = NULL;
178       if (right_sibling)
179         right_child_sibling = AsInternal(right_sibling)->children_[0];
180       this->children_[n]->invariant_checker(&this->keys_[n - 1], max_key, this->children_[i - 1], right_child_sibling, false);
181     } else {
182       this->children_[i]->invariant_checker(&this->keys_[i - 1], &this->keys_[i], this->children_[i - 1], this->children_[i + 1], false);
183     }
184   }
185   if (!n || this->children_[0]->is_internal_node())
186     return;
187   for (size_t i = 0; i <= n; i++) {
188     const node *left_child_sibling = NULL;
189     const node *right_child_sibling = NULL;
190     if (left_sibling)
191       left_child_sibling = AsInternal(left_sibling)->children_[left_sibling->key_slots_used()];
192     if (right_sibling)
193       right_child_sibling = AsInternal(right_sibling)->children_[0];
194     const leaf_node *child_prev = (i == 0) ? AsLeaf(left_child_sibling) : AsLeaf(this->children_[i - 1]);
195     const leaf_node *child_next = (i == n) ? AsLeaf(right_child_sibling) : AsLeaf(this->children_[i + 1]);
196     ALWAYS_ASSERT(AsLeaf(this->children_[i])->prev_ == child_prev);
197     ALWAYS_ASSERT(AsLeaf(this->children_[i])->next_ == child_next);
198   }
199 }
200
201 template <typename P>
202 void
203 btree<P>::recursive_delete(node *n)
204 {
205   if (leaf_node *leaf = AsLeafCheck(n)) {
206 #ifdef CHECK_INVARIANTS
207     leaf->lock();
208     leaf->mark_deleting();
209     leaf->unlock();
210 #endif
211     size_t n = leaf->key_slots_used();
212     for (size_t i = 0; i < n; i++)
213       if (leaf->is_layer(i))
214         recursive_delete(leaf->values_[i].n_);
215     leaf_node::deleter(leaf);
216   } else {
217     internal_node *internal = AsInternal(n);
218     size_t n = internal->key_slots_used();
219     for (size_t i = 0; i < n + 1; i++)
220       recursive_delete(internal->children_[i]);
221 #ifdef CHECK_INVARIANTS
222     internal->lock();
223     internal->mark_deleting();
224     internal->unlock();
225 #endif
226     internal_node::deleter(internal);
227   }
228 }
229
230 //STATIC_COUNTER_DECL(scopedperf::tsc_ctr, btree_search_impl_tsc, btree_search_impl_perf_cg);
231
232 template <typename P>
233 bool
234 btree<P>::search_impl(const key_type &k, value_type &v,
235                       typename util::vec<leaf_node *>::type &leaf_nodes,
236                       versioned_node_t *search_info) const
237 {
238   INVARIANT(rcu::s_instance.in_rcu_region());
239   //ANON_REGION("btree<P>::search_impl:", &btree_search_impl_perf_cg);
240   INVARIANT(leaf_nodes.empty());
241
242 retry:
243   node *cur;
244   key_type kcur;
245   uint64_t kslice;
246   size_t kslicelen;
247   if (likely(leaf_nodes.empty())) {
248     kcur = k;
249     cur = root_;
250     kslice = k.slice();
251     kslicelen = std::min(k.size(), size_t(9));
252   } else {
253     kcur = k.shift_many(leaf_nodes.size() - 1);
254     cur = leaf_nodes.back();
255     kslice = kcur.slice();
256     kslicelen = std::min(kcur.size(), size_t(9));
257     leaf_nodes.pop_back();
258   }
259
260   while (true) {
261     // each iteration of this while loop tries to descend
262     // down node "cur", looking for key kcur
263
264 process:
265     uint64_t version = cur->stable_version();
266     if (unlikely(RawVersionManip::IsDeleting(version)))
267       // XXX: maybe we can only retry at the parent of this node, not the
268       // root node of the b-tree *layer*
269       goto retry;
270     if (leaf_node *leaf = AsLeafCheck(cur, version)) {
271       leaf->prefetch();
272       if (search_info) {
273         search_info->first = leaf;
274         search_info->second = RawVersionManip::Version(version);
275       }
276       key_search_ret kret = leaf->key_search(kslice, kslicelen);
277       ssize_t ret = kret.first;
278       if (ret != -1) {
279         // found
280         typename leaf_node::value_or_node_ptr vn = leaf->values_[ret];
281         const bool is_layer = leaf->is_layer(ret);
282         INVARIANT(!is_layer || kslicelen == 9);
283         varkey suffix(leaf->suffix(ret));
284         if (unlikely(!leaf->check_version(version)))
285           goto process;
286         leaf_nodes.push_back(leaf);
287
288         if (!is_layer) {
289           // check suffixes
290           if (kslicelen == 9 && suffix != kcur.shift())
291             return false;
292           v = vn.v_;
293           return true;
294         }
295
296         // search the next layer
297         cur = vn.n_;
298         kcur = kcur.shift();
299         kslice = kcur.slice();
300         kslicelen = std::min(kcur.size(), size_t(9));
301         continue;
302       }
303
304       // leaf might have lost responsibility for key k during the descend. we
305       // need to check this and adjust accordingly
306       if (unlikely(kslice < leaf->min_key_)) {
307         // try to go left
308         leaf_node *left_sibling = leaf->prev_;
309         if (unlikely(!leaf->check_version(version)))
310           goto process;
311         if (likely(left_sibling)) {
312           cur = left_sibling;
313           continue;
314         } else {
315           // XXX: this case shouldn't be possible...
316           goto retry;
317         }
318       } else {
319         // try to go right
320         leaf_node *right_sibling = leaf->next_;
321         if (unlikely(!leaf->check_version(version)))
322           goto process;
323         if (unlikely(!right_sibling)) {
324           leaf_nodes.push_back(leaf);
325           return false;
326         }
327         right_sibling->prefetch();
328         uint64_t right_version = right_sibling->stable_version();
329         key_slice right_min_key = right_sibling->min_key_;
330         if (unlikely(!right_sibling->check_version(right_version)))
331           goto process;
332         if (unlikely(kslice >= right_min_key)) {
333           cur = right_sibling;
334           continue;
335         }
336       }
337
338       leaf_nodes.push_back(leaf);
339       return false;
340     } else {
341       internal_node *internal = AsInternal(cur);
342       internal->prefetch();
343       key_search_ret kret = internal->key_lower_bound_search(kslice);
344       ssize_t ret = kret.first;
345       if (ret != -1)
346         cur = internal->children_[ret + 1];
347       else
348         cur = internal->children_[0];
349       if (unlikely(!internal->check_version(version)))
350         goto process;
351       INVARIANT(kret.second);
352     }
353   }
354 }
355
356 template <typename S>
357 class string_restore {
358 public:
359   inline string_restore(S &s, size_t n)
360     : s_(&s), n_(n) {}
361   inline ~string_restore()
362   {
363     s_->resize(n_);
364   }
365 private:
366   S *s_;
367   size_t n_;
368 };
369
370 // recursively read the range from the layer down
371 //
372 // all args relative to prefix.
373 //
374 // prefix is non-const, meaning that this function might modify the contents.
375 // it is guaranteed, however, to restore prefix to the state it was before the
376 // invocation
377 //
378 // returns true if we keep going
379 template <typename P>
380 bool
381 btree<P>::search_range_at_layer(
382     leaf_node *leaf,
383     string_type &prefix,
384     const key_type &lower,
385     bool inc_lower,
386     const key_type *upper,
387     low_level_search_range_callback &callback) const
388 {
389   VERBOSE(std::cerr << "search_range_at_layer: prefix.size()=" << prefix.size() << std::endl);
390
391   key_slice last_keyslice = 0;
392   size_t last_keyslice_len = 0;
393   bool emitted_last_keyslice = false;
394   if (!inc_lower) {
395     last_keyslice = lower.slice();
396     last_keyslice_len = std::min(lower.size(), size_t(9));
397     emitted_last_keyslice = true;
398   }
399
400   key_slice lower_slice = lower.slice();
401   key_slice next_key = lower_slice;
402   const size_t prefix_size = prefix.size();
403   // NB: DON'T CALL RESERVE()
404   //prefix.reserve(prefix_size + 8); // allow for next layer
405   const uint64_t upper_slice = upper ? upper->slice() : 0;
406   string_restore<string_type> restorer(prefix, prefix_size);
407   while (!upper || next_key <= upper_slice) {
408     leaf->prefetch();
409
410     typename util::vec<leaf_kvinfo>::type buf;
411     const uint64_t version = leaf->stable_version();
412     key_slice leaf_min_key = leaf->min_key_;
413     if (leaf_min_key > next_key) {
414       // go left
415       leaf_node *left_sibling = leaf->prev_;
416       if (unlikely(!leaf->check_version(version)))
417         // try this node again
418         continue;
419       // try from left_sibling
420       leaf = left_sibling;
421       INVARIANT(leaf);
422       continue;
423     }
424
425     // grab all keys in [lower_slice, upper_slice]. we'll do boundary condition
426     // checking later (outside of the critical section)
427     for (size_t i = 0; i < leaf->key_slots_used(); i++) {
428       // XXX(stephentu): this 1st filter is overly conservative and we can do
429       // better
430       if ((leaf->keys_[i] > lower_slice ||
431            (leaf->keys_[i] == lower_slice &&
432             leaf->keyslice_length(i) >= std::min(lower.size(), size_t(9)))) &&
433           (!upper || leaf->keys_[i] <= upper_slice))
434         buf.emplace_back(
435             leaf->keys_[i],
436             leaf->values_[i],
437             leaf->is_layer(i),
438             leaf->keyslice_length(i),
439             leaf->suffix(i));
440     }
441
442     leaf_node *const right_sibling = leaf->next_;
443     key_slice leaf_max_key = right_sibling ? right_sibling->min_key_ : 0;
444
445     if (unlikely(!leaf->check_version(version)))
446       continue;
447
448     callback.on_resp_node(leaf, RawVersionManip::Version(version));
449
450     for (size_t i = 0; i < buf.size(); i++) {
451       // check to see if we already omitted a key <= buf[i]: if so, don't omit it
452       if (emitted_last_keyslice &&
453           ((buf[i].key_ < last_keyslice) ||
454            (buf[i].key_ == last_keyslice && buf[i].length_ <= last_keyslice_len)))
455         continue;
456       const size_t ncpy = std::min(buf[i].length_, size_t(8));
457       // XXX: prefix.end() calls _M_leak()
458       //prefix.replace(prefix.begin() + prefix_size, prefix.end(), buf[i].keyslice(), ncpy);
459       prefix.replace(prefix_size, string_type::npos, buf[i].keyslice(), ncpy);
460       if (buf[i].layer_) {
461         // recurse into layer
462         leaf_node *const next_layer = leftmost_descend_layer(buf[i].vn_.n_);
463         varkey zerokey;
464         if (emitted_last_keyslice && last_keyslice == buf[i].key_)
465           // NB(stephentu): this is implied by the filter above
466           INVARIANT(last_keyslice_len <= 8);
467         if (!search_range_at_layer(next_layer, prefix, zerokey, false, NULL, callback))
468           return false;
469       } else {
470         // check if we are before the start
471         if (buf[i].key_ == lower_slice) {
472           if (buf[i].length_ <= 8) {
473             if (buf[i].length_ < lower.size())
474               // skip
475               continue;
476           } else {
477             INVARIANT(buf[i].length_ == 9);
478             if (lower.size() > 8 && buf[i].suffix_ < lower.shift())
479               // skip
480               continue;
481           }
482         }
483
484         // check if we are after the end
485         if (upper && buf[i].key_ == upper_slice) {
486           if (buf[i].length_ == 9) {
487             if (upper->size() <= 8)
488               break;
489             if (buf[i].suffix_ >= upper->shift())
490               break;
491           } else if (buf[i].length_ >= upper->size()) {
492             break;
493           }
494         }
495         if (buf[i].length_ == 9)
496           prefix.append((const char *) buf[i].suffix_.data(), buf[i].suffix_.size());
497         // we give the actual version # minus all the other bits, b/c they are not
498         // important here and make comparison easier at higher layers
499         if (!callback.invoke(prefix, buf[i].vn_.v_, leaf, RawVersionManip::Version(version)))
500           return false;
501       }
502       last_keyslice = buf[i].key_;
503       last_keyslice_len = buf[i].length_;
504       emitted_last_keyslice = true;
505     }
506
507     if (!right_sibling)
508       // we're done
509       return true;
510
511     next_key = leaf_max_key;
512     leaf = right_sibling;
513   }
514
515   return true;
516 }
517
518 template <typename P>
519 void
520 btree<P>::search_range_call(const key_type &lower,
521                             const key_type *upper,
522                             low_level_search_range_callback &callback,
523                             string_type *buf) const
524 {
525   rcu_region guard;
526   INVARIANT(rcu::s_instance.in_rcu_region());
527   if (unlikely(upper && *upper <= lower))
528     return;
529   typename util::vec<leaf_node *>::type leaf_nodes;
530   value_type v = 0;
531   search_impl(lower, v, leaf_nodes);
532   INVARIANT(!leaf_nodes.empty());
533   bool first = true;
534   string_type prefix_tmp, *prefix_px;
535   if (buf)
536     prefix_px = buf;
537   else
538     prefix_px = &prefix_tmp;
539   string_type &prefix(*prefix_px);
540   INVARIANT(prefix.empty());
541   prefix.assign((const char *) lower.data(), 8 * (leaf_nodes.size() - 1));
542   while (!leaf_nodes.empty()) {
543     leaf_node *cur = leaf_nodes.back();
544     leaf_nodes.pop_back();
545     key_type layer_upper;
546     bool layer_has_upper = false;
547     if (upper && upper->size() >= (8 * leaf_nodes.size())) {
548       layer_upper = upper->shift_many(leaf_nodes.size());
549       layer_has_upper = true;
550     }
551 #ifdef CHECK_INVARIANTS
552     string_type prefix_before(prefix);
553 #endif
554     if (!search_range_at_layer(
555           cur, prefix, lower.shift_many(leaf_nodes.size()),
556           first, layer_has_upper ? &layer_upper : NULL, callback))
557       return;
558 #ifdef CHECK_INVARIANTS
559     INVARIANT(prefix == prefix_before);
560 #endif
561     first = false;
562     if (!leaf_nodes.empty()) {
563       INVARIANT(prefix.size() >= 8);
564       prefix.resize(prefix.size() - 8);
565     }
566   }
567 }
568
569 template <typename P>
570 bool
571 btree<P>::remove_stable_location(node **root_location, const key_type &k, value_type *old_v)
572 {
573   INVARIANT(rcu::s_instance.in_rcu_region());
574 retry:
575   key_slice new_key;
576   node *replace_node = NULL;
577   typename util::vec<remove_parent_entry>::type parents;
578   typename util::vec<node *>::type locked_nodes;
579   node *local_root = *root_location;
580   remove_status status = remove0(local_root,
581       NULL, /* min_key */
582       NULL, /* max_key */
583       k,
584       old_v,
585       NULL, /* left_node */
586       NULL, /* right_node */
587       new_key,
588       replace_node,
589       parents,
590       locked_nodes);
591   switch (status) {
592   case R_NONE_NOMOD:
593     return false;
594   case R_NONE_MOD:
595     return true;
596   case R_RETRY:
597     goto retry;
598   case R_REPLACE_NODE:
599     INVARIANT(local_root->is_deleting());
600     INVARIANT(local_root->is_lock_owner());
601     INVARIANT(local_root->is_root());
602     INVARIANT(local_root == *root_location);
603     replace_node->set_root();
604     local_root->clear_root();
605     COMPILER_MEMORY_FENCE;
606     *root_location = replace_node;
607     // locks are still held here
608     return UnlockAndReturn(locked_nodes, true);
609   default:
610     ALWAYS_ASSERT(false);
611     return false;
612   }
613   ALWAYS_ASSERT(false);
614   return false;
615 }
616
617 template <typename P>
618 typename btree<P>::leaf_node *
619 btree<P>::leftmost_descend_layer(node *n) const
620 {
621   node *cur = n;
622   while (true) {
623     if (leaf_node *leaf = AsLeafCheck(cur))
624       return leaf;
625     internal_node *internal = AsInternal(cur);
626     uint64_t version = cur->stable_version();
627     node *child = internal->children_[0];
628     if (unlikely(!internal->check_version(version)))
629       continue;
630     cur = child;
631   }
632 }
633
634 template <typename P>
635 void
636 btree<P>::tree_walk(tree_walk_callback &callback) const
637 {
638   rcu_region guard;
639   INVARIANT(rcu::s_instance.in_rcu_region());
640   std::vector<node *> q;
641   // XXX: not sure if cast is safe
642   q.push_back((node *) root_);
643   while (!q.empty()) {
644     node *cur = q.back();
645     q.pop_back();
646     cur->prefetch();
647     leaf_node *leaf = leftmost_descend_layer(cur);
648     INVARIANT(leaf);
649     while (leaf) {
650       leaf->prefetch();
651     process:
652       const uint64_t version = leaf->stable_version();
653       const size_t n = leaf->key_slots_used();
654       std::vector<node *> layers;
655       for (size_t i = 0; i < n; i++)
656         if (leaf->is_layer(i))
657           layers.push_back(leaf->values_[i].n_);
658       leaf_node *next = leaf->next_;
659       callback.on_node_begin(leaf);
660       if (unlikely(!leaf->check_version(version))) {
661         callback.on_node_failure();
662         goto process;
663       }
664       callback.on_node_success();
665       leaf = next;
666       q.insert(q.end(), layers.begin(), layers.end());
667     }
668   }
669 }
670
671 template <typename P>
672 void
673 btree<P>::size_walk_callback::on_node_begin(const node_opaque_t *n)
674 {
675   INVARIANT(n->is_leaf_node());
676   INVARIANT(spec_size_ == 0);
677   const leaf_node *leaf = (const leaf_node *) n;
678   const size_t sz = leaf->key_slots_used();
679   for (size_t i = 0; i < sz; i++)
680     if (!leaf->is_layer(i))
681       spec_size_++;
682 }
683
684 template <typename P>
685 void
686 btree<P>::size_walk_callback::on_node_success()
687 {
688   size_ += spec_size_;
689   spec_size_ = 0;
690 }
691
692 template <typename P>
693 void
694 btree<P>::size_walk_callback::on_node_failure()
695 {
696   spec_size_ = 0;
697 }
698
699 template <typename P>
700 typename btree<P>::leaf_node *
701 btree<P>::FindRespLeafNode(
702       leaf_node *leaf,
703       uint64_t kslice,
704       uint64_t &version)
705 {
706 retry:
707   version = leaf->stable_version();
708   if (unlikely(leaf->is_deleting())) {
709     leaf_node *left = leaf->prev_;
710     if (left) {
711       leaf = left;
712       goto retry;
713     }
714     leaf_node *right = leaf->next_;
715     if (right) {
716       leaf = right;
717       goto retry;
718     }
719     // XXX(stephentu): not sure if we can *really* depend on this,
720     // need to convince ourselves why this is not possible!
721     ALWAYS_ASSERT(false);
722   }
723   if (unlikely(kslice < leaf->min_key_)) {
724     // we need to go left
725     leaf_node *left = leaf->prev_;
726     if (left)
727       leaf = left;
728     goto retry;
729   }
730   leaf_node *right = leaf->next_;
731
732   // NB(stephentu): the only way for right->min_key_ to decrease, is for the
733   // current leaf node to split. therefore, it is un-necessary to ensure stable
734   // version on the next node (we only need to ensure it on the current node)
735   if (likely(right) && unlikely(kslice >= right->min_key_)) {
736     leaf = right;
737     goto retry;
738   }
739
740   //if (likely(right)) {
741   //  const uint64_t right_version = right->stable_version();
742   //  const uint64_t right_min_key = right->min_key_;
743   //  if (unlikely(!right->check_version(right_version)))
744   //    goto retry;
745   //  if (unlikely(kslice >= right_min_key)) {
746   //    leaf = right;
747   //    goto retry;
748   //  }
749   //}
750
751   return leaf;
752 }
753
754 template <typename P>
755 typename btree<P>::leaf_node *
756 btree<P>::FindRespLeafLowerBound(
757       leaf_node *leaf,
758       uint64_t kslice,
759       size_t kslicelen,
760       uint64_t &version,
761       size_t &n,
762       ssize_t &idxmatch,
763       ssize_t &idxlowerbound)
764 {
765   leaf = FindRespLeafNode(leaf, kslice, version);
766
767   // use 0 for slice length, so we can a pointer <= all elements
768   // with the same slice
769   const key_search_ret kret = leaf->key_lower_bound_search(kslice, 0);
770   const ssize_t ret = kret.first;
771   n = kret.second;
772
773   // count the number of values already here with the same kslice.
774   idxmatch = -1;
775   idxlowerbound = ret;
776   for (size_t i = (ret == -1 ? 0 : ret); i < n; i++) {
777     if (leaf->keys_[i] < kslice) {
778       continue;
779     } else if (leaf->keys_[i] == kslice) {
780       const size_t kslicelen0 = leaf->keyslice_length(i);
781       if (kslicelen0 <= kslicelen) {
782         // invariant doesn't hold, b/c values can be changing
783         // concurrently (leaf is not assumed to be locked)
784         //INVARIANT(idxmatch == -1);
785         idxlowerbound = i;
786         if (kslicelen0 == kslicelen)
787           idxmatch = i;
788       }
789     } else {
790       break;
791     }
792   }
793
794   return leaf;
795 }
796
797 template <typename P>
798 typename btree<P>::leaf_node *
799 btree<P>::FindRespLeafExact(
800       leaf_node *leaf,
801       uint64_t kslice,
802       size_t kslicelen,
803       uint64_t &version,
804       size_t &n,
805       ssize_t &idxmatch)
806 {
807   leaf = FindRespLeafNode(leaf, kslice, version);
808   key_search_ret kret = leaf->key_search(kslice, kslicelen);
809   idxmatch = kret.first;
810   n = kret.second;
811   return leaf;
812 }
813
814 template <typename P>
815 typename btree<P>::insert_status
816 btree<P>::insert0(node *np,
817                   const key_type &k,
818                   value_type v,
819                   bool only_if_absent,
820                   value_type *old_v,
821                   insert_info_t *insert_info,
822                   key_slice &min_key,
823                   node *&new_node,
824                   typename util::vec<insert_parent_entry>::type &parents,
825                   typename util::vec<node *>::type &locked_nodes)
826 {
827   uint64_t kslice = k.slice();
828   size_t kslicelen = std::min(k.size(), size_t(9));
829
830   np->prefetch();
831   if (leaf_node *leaf = AsLeafCheck(np)) {
832     // locked nodes are acquired bottom to top
833     INVARIANT(locked_nodes.empty());
834
835 retry_cur_leaf:
836     uint64_t version;
837     size_t n;
838     ssize_t lenmatch, lenlowerbound;
839     leaf_node *resp_leaf = FindRespLeafLowerBound(
840         leaf, kslice, kslicelen, version, n, lenmatch, lenlowerbound);
841
842     // len match case
843     if (lenmatch != -1) {
844       // exact match case
845       if (kslicelen <= 8 ||
846           (!resp_leaf->is_layer(lenmatch) &&
847            resp_leaf->suffix(lenmatch) == k.shift())) {
848         const uint64_t locked_version = resp_leaf->lock();
849         if (unlikely(!btree::CheckVersion(version, locked_version))) {
850           resp_leaf->unlock();
851           goto retry_cur_leaf;
852         }
853         locked_nodes.push_back(resp_leaf);
854         // easy case- we don't modify the node itself
855         if (old_v)
856           *old_v = resp_leaf->values_[lenmatch].v_;
857         if (!only_if_absent)
858           resp_leaf->values_[lenmatch].v_ = v;
859         if (insert_info)
860           insert_info->node = 0;
861         return UnlockAndReturn(locked_nodes, I_NONE_NOMOD);
862       }
863       INVARIANT(kslicelen == 9);
864       if (resp_leaf->is_layer(lenmatch)) {
865         node *subroot = resp_leaf->values_[lenmatch].n_;
866         INVARIANT(subroot);
867         if (unlikely(!resp_leaf->check_version(version)))
868           goto retry_cur_leaf;
869         key_slice mk;
870         node *ret;
871         typename util::vec<insert_parent_entry>::type subparents;
872         typename util::vec<node *>::type sub_locked_nodes;
873         const insert_status status =
874           insert0(subroot, k.shift(), v, only_if_absent, old_v, insert_info,
875               mk, ret, subparents, sub_locked_nodes);
876
877         switch (status) {
878         case I_NONE_NOMOD:
879         case I_NONE_MOD:
880         case I_RETRY:
881           INVARIANT(sub_locked_nodes.empty());
882           return status;
883
884         case I_SPLIT:
885           // the subroot split, so we need to find the leaf again, lock the
886           // node, and create a new internal node
887
888           INVARIANT(ret);
889           INVARIANT(ret->key_slots_used() > 0);
890
891           for (;;) {
892             resp_leaf = FindRespLeafLowerBound(
893                 resp_leaf, kslice, kslicelen, version, n, lenmatch, lenlowerbound);
894             const uint64_t locked_version = resp_leaf->lock();
895             if (likely(btree::CheckVersion(version, locked_version))) {
896               locked_nodes.push_back(resp_leaf);
897               break;
898             }
899             resp_leaf->unlock();
900           }
901
902           INVARIANT(lenmatch != -1);
903           INVARIANT(resp_leaf->is_layer(lenmatch));
904           subroot = resp_leaf->values_[lenmatch].n_;
905           INVARIANT(subroot->is_modifying());
906           INVARIANT(subroot->is_lock_owner());
907           INVARIANT(subroot->is_root());
908
909           internal_node *new_root = internal_node::alloc();
910 #ifdef CHECK_INVARIANTS
911           new_root->lock();
912           new_root->mark_modifying();
913           locked_nodes.push_back(new_root);
914 #endif /* CHECK_INVARIANTS */
915           new_root->children_[0] = subroot;
916           new_root->children_[1] = ret;
917           new_root->keys_[0] = mk;
918           new_root->set_key_slots_used(1);
919           new_root->set_root();
920           subroot->clear_root();
921           resp_leaf->values_[lenmatch].n_ = new_root;
922
923           // locks are still held here
924           UnlockNodes(sub_locked_nodes);
925           return UnlockAndReturn(locked_nodes, I_NONE_MOD);
926         }
927         ALWAYS_ASSERT(false);
928
929       } else {
930         const uint64_t locked_version = resp_leaf->lock();
931         if (unlikely(!btree::CheckVersion(version, locked_version))) {
932           resp_leaf->unlock();
933           goto retry_cur_leaf;
934         }
935         locked_nodes.push_back(resp_leaf);
936
937         INVARIANT(resp_leaf->suffixes_); // b/c lenmatch != -1 and this is not a layer
938         // need to create a new btree layer, and add both existing key and
939         // new key to it
940
941         // XXX: need to mark modifying because we cannot change both the
942         // value and the type atomically
943         resp_leaf->mark_modifying();
944
945         leaf_node *new_root = leaf_node::alloc();
946 #ifdef CHECK_INVARIANTS
947         new_root->lock();
948         new_root->mark_modifying();
949 #endif /* CHECK_INVARIANTS */
950         new_root->set_root();
951         varkey old_slice(resp_leaf->suffix(lenmatch));
952         new_root->keys_[0] = old_slice.slice();
953         new_root->values_[0] = resp_leaf->values_[lenmatch];
954         new_root->keyslice_set_length(0, std::min(old_slice.size(), size_t(9)), false);
955         new_root->inc_key_slots_used();
956         if (new_root->keyslice_length(0) == 9) {
957           new_root->alloc_suffixes();
958           rcu_imstring i(old_slice.data() + 8, old_slice.size() - 8);
959           new_root->suffixes_[0].swap(i);
960         }
961         resp_leaf->values_[lenmatch].n_ = new_root;
962         {
963           rcu_imstring i;
964           resp_leaf->suffixes_[lenmatch].swap(i);
965         }
966         resp_leaf->value_set_layer(lenmatch);
967 #ifdef CHECK_INVARIANTS
968         new_root->unlock();
969 #endif /* CHECK_INVARIANTS */
970
971         key_slice mk;
972         node *ret;
973         typename util::vec<insert_parent_entry>::type subparents;
974         typename util::vec<node *>::type sub_locked_nodes;
975         const insert_status status =
976           insert0(new_root, k.shift(), v, only_if_absent, old_v, insert_info,
977               mk, ret, subparents, sub_locked_nodes);
978         if (status != I_NONE_MOD)
979           INVARIANT(false);
980         INVARIANT(sub_locked_nodes.empty());
981         return UnlockAndReturn(locked_nodes, I_NONE_MOD);
982       }
983     }
984
985     // lenlowerbound + 1 is the slot (0-based index) we want the new key to go
986     // into, in the leaf node
987     if (n < NKeysPerNode) {
988       const uint64_t locked_version = resp_leaf->lock();
989       if (unlikely(!btree::CheckVersion(version, locked_version))) {
990         resp_leaf->unlock();
991         goto retry_cur_leaf;
992       }
993       locked_nodes.push_back(resp_leaf);
994
995       // also easy case- we only need to make local modifications
996       resp_leaf->mark_modifying();
997
998       sift_right(resp_leaf->keys_, lenlowerbound + 1, n);
999       resp_leaf->keys_[lenlowerbound + 1] = kslice;
1000       sift_right(resp_leaf->values_, lenlowerbound + 1, n);
1001       resp_leaf->values_[lenlowerbound + 1].v_ = v;
1002       sift_right(resp_leaf->lengths_, lenlowerbound + 1, n);
1003       resp_leaf->keyslice_set_length(lenlowerbound + 1, kslicelen, false);
1004       if (resp_leaf->suffixes_)
1005         sift_swap_right(resp_leaf->suffixes_, lenlowerbound + 1, n);
1006       if (kslicelen == 9) {
1007         resp_leaf->ensure_suffixes();
1008         rcu_imstring i(k.data() + 8, k.size() - 8);
1009         resp_leaf->suffixes_[lenlowerbound + 1].swap(i);
1010       } else if (resp_leaf->suffixes_) {
1011         rcu_imstring i;
1012         resp_leaf->suffixes_[lenlowerbound + 1].swap(i);
1013       }
1014       resp_leaf->inc_key_slots_used();
1015
1016 //#ifdef CHECK_INVARIANTS
1017 //      resp_leaf->base_invariant_unique_keys_check();
1018 //#endif
1019       if (insert_info) {
1020         insert_info->node = resp_leaf;
1021         insert_info->old_version = RawVersionManip::Version(resp_leaf->unstable_version()); // we hold lock on leaf
1022         insert_info->new_version = insert_info->old_version + 1;
1023       }
1024       return UnlockAndReturn(locked_nodes, I_NONE_MOD);
1025     } else {
1026       INVARIANT(n == NKeysPerNode);
1027
1028       if (unlikely(resp_leaf != leaf))
1029         // sigh, we really do need parent points- if resp_leaf != leaf, then
1030         // all the parent points we saved on the way down are no longer valid
1031         return UnlockAndReturn(locked_nodes, I_RETRY);
1032       const uint64_t locked_version = resp_leaf->lock();
1033       if (unlikely(!btree::CheckVersion(version, locked_version))) {
1034         resp_leaf->unlock();
1035         goto retry_cur_leaf;
1036       }
1037       locked_nodes.push_back(resp_leaf);
1038
1039       // we need to split the current node, potentially causing a bunch of
1040       // splits to happen in ancestors. to make this safe w/o
1041       // using a very complicated locking protocol, we will first acquire all
1042       // locks on nodes which will be modified, in left-to-right,
1043       // bottom-to-top order
1044
1045       if (parents.empty()) {
1046         if (unlikely(!resp_leaf->is_root()))
1047           return UnlockAndReturn(locked_nodes, I_RETRY);
1048         //INVARIANT(resp_leaf == root);
1049       } else {
1050         for (auto rit = parents.rbegin(); rit != parents.rend(); ++rit) {
1051           // lock the parent
1052           node *p = rit->first;
1053           p->lock();
1054           locked_nodes.push_back(p);
1055           if (unlikely(!p->check_version(rit->second)))
1056             // in traversing down the tree, an ancestor of this node was
1057             // modified- to be safe, we start over
1058             return UnlockAndReturn(locked_nodes, I_RETRY);
1059           if ((rit + 1) == parents.rend()) {
1060             // did the root change?
1061             if (unlikely(!p->is_root()))
1062               return UnlockAndReturn(locked_nodes, I_RETRY);
1063             //INVARIANT(p == root);
1064           }
1065
1066           // since the child needs a split, see if we have room in the parent-
1067           // if we don't have room, we'll also need to split the parent, in which
1068           // case we must grab its parent's lock
1069           INVARIANT(p->is_internal_node());
1070           size_t parent_n = p->key_slots_used();
1071           INVARIANT(parent_n > 0 && parent_n <= NKeysPerNode);
1072           if (parent_n < NKeysPerNode)
1073             // can stop locking up now, since this node won't split
1074             break;
1075         }
1076       }
1077
1078       // at this point, we have locked all nodes which will be split/modified
1079       // modulo the new nodes to be created
1080       resp_leaf->mark_modifying();
1081
1082       leaf_node *new_leaf = leaf_node::alloc();
1083       new_leaf->prefetch();
1084
1085 #ifdef CHECK_INVARIANTS
1086       new_leaf->lock();
1087       new_leaf->mark_modifying();
1088       locked_nodes.push_back(new_leaf);
1089 #endif /* CHECK_INVARIANTS */
1090
1091       if (!resp_leaf->next_ && resp_leaf->keys_[n - 1] < kslice) {
1092         // sequential insert optimization- in this case, we don't bother
1093         // splitting the node. instead, keep the current leaf node full, and
1094         // insert the new key into the new leaf node (violating the btree invariant)
1095         //
1096         // this optimization is commonly implemented, including in masstree and
1097         // berkeley db- w/o this optimization, sequential inserts leave the all
1098         // nodes half full
1099
1100         new_leaf->keys_[0] = kslice;
1101         new_leaf->values_[0].v_ = v;
1102         new_leaf->keyslice_set_length(0, kslicelen, false);
1103         if (kslicelen == 9) {
1104           new_leaf->alloc_suffixes();
1105           rcu_imstring i(k.data() + 8, k.size() - 8);
1106           new_leaf->suffixes_[0].swap(i);
1107         }
1108         new_leaf->set_key_slots_used(1);
1109
1110       } else {
1111         // regular case
1112
1113         // compute how many keys the smaller node would have if we put the
1114         // new key in the left (old) or right (new) side of the split.
1115         // then choose the split which maximizes the mininum.
1116         //
1117         // XXX: do this in a more elegant way
1118
1119         // a split point S is a number such that indices [0, s) go into the left
1120         // partition, and indices [s, N) go into the right partition
1121         size_t left_split_point, right_split_point;
1122
1123         left_split_point = NKeysPerNode / 2;
1124         for (ssize_t i = left_split_point - 1; i >= 0; i--) {
1125           if (likely(resp_leaf->keys_[i] != resp_leaf->keys_[left_split_point]))
1126             break;
1127           left_split_point--;
1128         }
1129         INVARIANT(left_split_point <= NKeysPerNode);
1130         INVARIANT(left_split_point == 0 || resp_leaf->keys_[left_split_point - 1] != resp_leaf->keys_[left_split_point]);
1131
1132         right_split_point = NKeysPerNode / 2;
1133         for (ssize_t i = right_split_point - 1; i >= 0 && i < ssize_t(NKeysPerNode) - 1; i++) {
1134           if (likely(resp_leaf->keys_[i] != resp_leaf->keys_[right_split_point]))
1135             break;
1136           right_split_point++;
1137         }
1138         INVARIANT(right_split_point <= NKeysPerNode);
1139         INVARIANT(right_split_point == 0 || resp_leaf->keys_[right_split_point - 1] != resp_leaf->keys_[right_split_point]);
1140
1141         size_t split_point;
1142         if (std::min(left_split_point, NKeysPerNode - left_split_point) <
1143             std::min(right_split_point, NKeysPerNode - right_split_point))
1144           split_point = right_split_point;
1145         else
1146           split_point = left_split_point;
1147
1148         if (split_point <= size_t(lenlowerbound + 1) && resp_leaf->keys_[split_point - 1] != kslice) {
1149           // put new key in new leaf (right)
1150           size_t pos = lenlowerbound + 1 - split_point;
1151
1152           copy_into(&new_leaf->keys_[0], resp_leaf->keys_, split_point, lenlowerbound + 1);
1153           new_leaf->keys_[pos] = kslice;
1154           copy_into(&new_leaf->keys_[pos + 1], resp_leaf->keys_, lenlowerbound + 1, NKeysPerNode);
1155
1156           copy_into(&new_leaf->values_[0], resp_leaf->values_, split_point, lenlowerbound + 1);
1157           new_leaf->values_[pos].v_ = v;
1158           copy_into(&new_leaf->values_[pos + 1], resp_leaf->values_, lenlowerbound + 1, NKeysPerNode);
1159
1160           copy_into(&new_leaf->lengths_[0], resp_leaf->lengths_, split_point, lenlowerbound + 1);
1161           new_leaf->keyslice_set_length(pos, kslicelen, false);
1162           copy_into(&new_leaf->lengths_[pos + 1], resp_leaf->lengths_, lenlowerbound + 1, NKeysPerNode);
1163
1164           if (resp_leaf->suffixes_) {
1165             new_leaf->ensure_suffixes();
1166             swap_with(&new_leaf->suffixes_[0], resp_leaf->suffixes_, split_point, lenlowerbound + 1);
1167           }
1168           if (kslicelen == 9) {
1169             new_leaf->ensure_suffixes();
1170             rcu_imstring i(k.data() + 8, k.size() - 8);
1171             new_leaf->suffixes_[pos].swap(i);
1172           } else if (new_leaf->suffixes_) {
1173             rcu_imstring i;
1174             new_leaf->suffixes_[pos].swap(i);
1175           }
1176           if (resp_leaf->suffixes_) {
1177             new_leaf->ensure_suffixes();
1178             swap_with(&new_leaf->suffixes_[pos + 1], resp_leaf->suffixes_, lenlowerbound + 1, NKeysPerNode);
1179           }
1180
1181           resp_leaf->set_key_slots_used(split_point);
1182           new_leaf->set_key_slots_used(NKeysPerNode - split_point + 1);
1183
1184 #ifdef CHECK_INVARIANTS
1185           resp_leaf->base_invariant_unique_keys_check();
1186           new_leaf->base_invariant_unique_keys_check();
1187           INVARIANT(resp_leaf->keys_[split_point - 1] < new_leaf->keys_[0]);
1188 #endif /* CHECK_INVARIANTS */
1189
1190         } else {
1191           // XXX: not really sure if this invariant is true, but we rely
1192           // on it for now
1193           INVARIANT(size_t(lenlowerbound + 1) <= split_point);
1194
1195           // put new key in original leaf
1196           copy_into(&new_leaf->keys_[0], resp_leaf->keys_, split_point, NKeysPerNode);
1197           copy_into(&new_leaf->values_[0], resp_leaf->values_, split_point, NKeysPerNode);
1198           copy_into(&new_leaf->lengths_[0], resp_leaf->lengths_, split_point, NKeysPerNode);
1199           if (resp_leaf->suffixes_) {
1200             new_leaf->ensure_suffixes();
1201             swap_with(&new_leaf->suffixes_[0], resp_leaf->suffixes_, split_point, NKeysPerNode);
1202           }
1203
1204           sift_right(resp_leaf->keys_, lenlowerbound + 1, split_point);
1205           resp_leaf->keys_[lenlowerbound + 1] = kslice;
1206           sift_right(resp_leaf->values_, lenlowerbound + 1, split_point);
1207           resp_leaf->values_[lenlowerbound + 1].v_ = v;
1208           sift_right(resp_leaf->lengths_, lenlowerbound + 1, split_point);
1209           resp_leaf->keyslice_set_length(lenlowerbound + 1, kslicelen, false);
1210           if (resp_leaf->suffixes_)
1211             sift_swap_right(resp_leaf->suffixes_, lenlowerbound + 1, split_point);
1212           if (kslicelen == 9) {
1213             resp_leaf->ensure_suffixes();
1214             rcu_imstring i(k.data() + 8, k.size() - 8);
1215             resp_leaf->suffixes_[lenlowerbound + 1].swap(i);
1216           } else if (resp_leaf->suffixes_) {
1217             rcu_imstring i;
1218             resp_leaf->suffixes_[lenlowerbound + 1].swap(i);
1219           }
1220
1221           resp_leaf->set_key_slots_used(split_point + 1);
1222           new_leaf->set_key_slots_used(NKeysPerNode - split_point);
1223
1224 #ifdef CHECK_INVARIANTS
1225           resp_leaf->base_invariant_unique_keys_check();
1226           new_leaf->base_invariant_unique_keys_check();
1227           INVARIANT(resp_leaf->keys_[split_point] < new_leaf->keys_[0]);
1228 #endif /* CHECK_INVARIANTS */
1229         }
1230       }
1231
1232       // pointer adjustment
1233       new_leaf->prev_ = resp_leaf;
1234       new_leaf->next_ = resp_leaf->next_;
1235       if (resp_leaf->next_)
1236         resp_leaf->next_->prev_ = new_leaf;
1237       resp_leaf->next_ = new_leaf;
1238
1239       min_key = new_leaf->keys_[0];
1240       new_leaf->min_key_ = min_key;
1241       new_node = new_leaf;
1242
1243       if (insert_info) {
1244         insert_info->node = resp_leaf;
1245         insert_info->old_version = RawVersionManip::Version(resp_leaf->unstable_version()); // we hold lock on leaf
1246         insert_info->new_version = insert_info->old_version + 1;
1247       }
1248
1249       return I_SPLIT;
1250     }
1251   } else {
1252     internal_node *internal = AsInternal(np);
1253     uint64_t version = internal->stable_version();
1254     if (unlikely(RawVersionManip::IsDeleting(version)))
1255       return UnlockAndReturn(locked_nodes, I_RETRY);
1256     key_search_ret kret = internal->key_lower_bound_search(kslice);
1257     ssize_t ret = kret.first;
1258     size_t n = kret.second;
1259     size_t child_idx = (ret == -1) ? 0 : ret + 1;
1260     node *child_ptr = internal->children_[child_idx];
1261     if (unlikely(!internal->check_version(version)))
1262       return UnlockAndReturn(locked_nodes, I_RETRY);
1263     parents.push_back(insert_parent_entry(internal, version));
1264     key_slice mk = 0;
1265     node *new_child = NULL;
1266     insert_status status =
1267       insert0(child_ptr, k, v, only_if_absent, old_v, insert_info,
1268               mk, new_child, parents, locked_nodes);
1269     if (status != I_SPLIT) {
1270       INVARIANT(locked_nodes.empty());
1271       return status;
1272     }
1273     INVARIANT(new_child);
1274     INVARIANT(internal->is_locked()); // previous call to insert0() must lock internal node for insertion
1275     INVARIANT(internal->is_lock_owner());
1276     INVARIANT(internal->check_version(version));
1277     INVARIANT(new_child->key_slots_used() > 0);
1278     INVARIANT(n > 0);
1279     internal->mark_modifying();
1280     if (n < NKeysPerNode) {
1281       sift_right(internal->keys_, child_idx, n);
1282       internal->keys_[child_idx] = mk;
1283       sift_right(internal->children_, child_idx + 1, n + 1);
1284       internal->children_[child_idx + 1] = new_child;
1285       internal->inc_key_slots_used();
1286       return UnlockAndReturn(locked_nodes, I_NONE_MOD);
1287     } else {
1288       INVARIANT(n == NKeysPerNode);
1289       INVARIANT(ret == internal->key_lower_bound_search(mk).first);
1290
1291       internal_node *new_internal = internal_node::alloc();
1292       new_internal->prefetch();
1293 #ifdef CHECK_INVARIANTS
1294       new_internal->lock();
1295       new_internal->mark_modifying();
1296       locked_nodes.push_back(new_internal);
1297 #endif /* CHECK_INVARIANTS */
1298
1299       // there are three cases post-split:
1300       // (1) mk goes in the original node
1301       // (2) mk is the key we push up
1302       // (3) mk goes in the new node
1303
1304       const ssize_t split_point = NMinKeysPerNode - 1;
1305       if (ret < split_point) {
1306         // case (1)
1307         min_key = internal->keys_[split_point];
1308
1309         copy_into(&new_internal->keys_[0], internal->keys_, NMinKeysPerNode, NKeysPerNode);
1310         copy_into(&new_internal->children_[0], internal->children_, NMinKeysPerNode, NKeysPerNode + 1);
1311         new_internal->set_key_slots_used(NKeysPerNode - NMinKeysPerNode);
1312
1313         sift_right(internal->keys_, child_idx, NMinKeysPerNode - 1);
1314         internal->keys_[child_idx] = mk;
1315         sift_right(internal->children_, child_idx + 1, NMinKeysPerNode);
1316         internal->children_[child_idx + 1] = new_child;
1317         internal->set_key_slots_used(NMinKeysPerNode);
1318
1319       } else if (ret == split_point) {
1320         // case (2)
1321         min_key = mk;
1322
1323         copy_into(&new_internal->keys_[0], internal->keys_, NMinKeysPerNode, NKeysPerNode);
1324         copy_into(&new_internal->children_[1], internal->children_, NMinKeysPerNode + 1, NKeysPerNode + 1);
1325         new_internal->children_[0] = new_child;
1326         new_internal->set_key_slots_used(NKeysPerNode - NMinKeysPerNode);
1327         internal->set_key_slots_used(NMinKeysPerNode);
1328
1329       } else {
1330         // case (3)
1331         min_key = internal->keys_[NMinKeysPerNode];
1332
1333         size_t pos = child_idx - NMinKeysPerNode - 1;
1334
1335         copy_into(&new_internal->keys_[0], internal->keys_, NMinKeysPerNode + 1, child_idx);
1336         new_internal->keys_[pos] = mk;
1337         copy_into(&new_internal->keys_[pos + 1], internal->keys_, child_idx, NKeysPerNode);
1338
1339         copy_into(&new_internal->children_[0], internal->children_, NMinKeysPerNode + 1, child_idx + 1);
1340         new_internal->children_[pos + 1] = new_child;
1341         copy_into(&new_internal->children_[pos + 2], internal->children_, child_idx + 1, NKeysPerNode + 1);
1342
1343         new_internal->set_key_slots_used(NKeysPerNode - NMinKeysPerNode);
1344         internal->set_key_slots_used(NMinKeysPerNode);
1345       }
1346
1347       INVARIANT(internal->keys_[internal->key_slots_used() - 1] < new_internal->keys_[0]);
1348       new_node = new_internal;
1349       return I_SPLIT;
1350     }
1351   }
1352 }
1353
1354 template <typename P>
1355 bool
1356 btree<P>::insert_stable_location(
1357     node **root_location, const key_type &k, value_type v,
1358     bool only_if_absent, value_type *old_v,
1359     insert_info_t *insert_info)
1360 {
1361   INVARIANT(rcu::s_instance.in_rcu_region());
1362 retry:
1363   key_slice mk;
1364   node *ret;
1365   typename util::vec<insert_parent_entry>::type parents;
1366   typename util::vec<node *>::type locked_nodes;
1367   node *local_root = *root_location;
1368   const insert_status status =
1369     insert0(local_root, k, v, only_if_absent, old_v, insert_info,
1370             mk, ret, parents, locked_nodes);
1371   INVARIANT(status == I_SPLIT || locked_nodes.empty());
1372   switch (status) {
1373   case I_NONE_NOMOD:
1374     return false;
1375   case I_NONE_MOD:
1376     return true;
1377   case I_RETRY:
1378     goto retry;
1379   case I_SPLIT:
1380     INVARIANT(ret);
1381     INVARIANT(ret->key_slots_used() > 0);
1382     INVARIANT(local_root->is_modifying());
1383     INVARIANT(local_root->is_lock_owner());
1384     INVARIANT(local_root->is_root());
1385     INVARIANT(local_root == *root_location);
1386     internal_node *new_root = internal_node::alloc();
1387 #ifdef CHECK_INVARIANTS
1388     new_root->lock();
1389     new_root->mark_modifying();
1390     locked_nodes.push_back(new_root);
1391 #endif /* CHECK_INVARIANTS */
1392     new_root->children_[0] = local_root;
1393     new_root->children_[1] = ret;
1394     new_root->keys_[0] = mk;
1395     new_root->set_key_slots_used(1);
1396     new_root->set_root();
1397     local_root->clear_root();
1398     COMPILER_MEMORY_FENCE;
1399     *root_location = new_root;
1400     // locks are still held here
1401     return UnlockAndReturn(locked_nodes, true);
1402   }
1403   ALWAYS_ASSERT(false);
1404   return false;
1405 }
1406
1407 /**
1408  * remove is very tricky to get right!
1409  *
1410  * XXX: optimize remove so it holds less locks
1411  */
1412 template <typename P>
1413 typename btree<P>::remove_status
1414 btree<P>::remove0(node *np,
1415                   key_slice *min_key,
1416                   key_slice *max_key,
1417                   const key_type &k,
1418                   value_type *old_v,
1419                   node *left_node,
1420                   node *right_node,
1421                   key_slice &new_key,
1422                   node *&replace_node,
1423                   typename util::vec<remove_parent_entry>::type &parents,
1424                   typename util::vec<node *>::type &locked_nodes)
1425 {
1426   uint64_t kslice = k.slice();
1427   size_t kslicelen = std::min(k.size(), size_t(9));
1428
1429   np->prefetch();
1430   if (leaf_node *leaf = AsLeafCheck(np)) {
1431     INVARIANT(locked_nodes.empty());
1432
1433     SINGLE_THREADED_INVARIANT(!left_node || (leaf->prev_ == left_node && AsLeaf(left_node)->next == leaf));
1434     SINGLE_THREADED_INVARIANT(!right_node || (leaf->next_ == right_node && AsLeaf(right_node)->prev == leaf));
1435
1436 retry_cur_leaf:
1437     uint64_t version;
1438     size_t n;
1439     ssize_t ret;
1440     leaf_node *resp_leaf = FindRespLeafExact(
1441         leaf, kslice, kslicelen, version, n, ret);
1442
1443     if (ret == -1) {
1444       if (unlikely(!resp_leaf->check_version(version)))
1445         goto retry_cur_leaf;
1446       return UnlockAndReturn(locked_nodes, R_NONE_NOMOD);
1447     }
1448     if (kslicelen == 9) {
1449       if (resp_leaf->is_layer(ret)) {
1450         node *subroot = resp_leaf->values_[ret].n_;
1451         INVARIANT(subroot);
1452         if (unlikely(!resp_leaf->check_version(version)))
1453           goto retry_cur_leaf;
1454
1455         key_slice new_key;
1456         node *replace_node = NULL;
1457         typename util::vec<remove_parent_entry>::type sub_parents;
1458         typename util::vec<node *>::type sub_locked_nodes;
1459         remove_status status = remove0(subroot,
1460             NULL, /* min_key */
1461             NULL, /* max_key */
1462             k.shift(),
1463             old_v,
1464             NULL, /* left_node */
1465             NULL, /* right_node */
1466             new_key,
1467             replace_node,
1468             sub_parents,
1469             sub_locked_nodes);
1470         switch (status) {
1471         case R_NONE_NOMOD:
1472         case R_NONE_MOD:
1473         case R_RETRY:
1474           INVARIANT(sub_locked_nodes.empty());
1475           return status;
1476
1477         case R_REPLACE_NODE:
1478           INVARIANT(replace_node);
1479           for (;;) {
1480             resp_leaf = FindRespLeafExact(
1481                 resp_leaf, kslice, kslicelen, version, n, ret);
1482             const uint64_t locked_version = resp_leaf->lock();
1483             if (likely(btree::CheckVersion(version, locked_version))) {
1484               locked_nodes.push_back(resp_leaf);
1485               break;
1486             }
1487             resp_leaf->unlock();
1488           }
1489
1490           INVARIANT(subroot->is_deleting());
1491           INVARIANT(subroot->is_lock_owner());
1492           INVARIANT(subroot->is_root());
1493           replace_node->set_root();
1494           subroot->clear_root();
1495           resp_leaf->values_[ret].n_ = replace_node;
1496
1497           // XXX: need to re-merge back when layer size becomes 1, but skip this
1498           // for now
1499
1500           // locks are still held here
1501           UnlockNodes(sub_locked_nodes);
1502           return UnlockAndReturn(locked_nodes, R_NONE_MOD);
1503
1504         default:
1505           break;
1506         }
1507         ALWAYS_ASSERT(false);
1508
1509       } else {
1510         // suffix check
1511         if (resp_leaf->suffix(ret) != k.shift()) {
1512           if (unlikely(!resp_leaf->check_version(version)))
1513             goto retry_cur_leaf;
1514           return UnlockAndReturn(locked_nodes, R_NONE_NOMOD);
1515         }
1516       }
1517     }
1518
1519     //INVARIANT(!resp_leaf->is_layer(ret));
1520     if (n > NMinKeysPerNode) {
1521       const uint64_t locked_version = resp_leaf->lock();
1522       if (unlikely(!btree::CheckVersion(version, locked_version))) {
1523         resp_leaf->unlock();
1524         goto retry_cur_leaf;
1525       }
1526       locked_nodes.push_back(resp_leaf);
1527       if (old_v)
1528         *old_v = resp_leaf->values_[ret].v_;
1529       resp_leaf->mark_modifying();
1530       remove_pos_from_leaf_node(resp_leaf, ret, n);
1531       return UnlockAndReturn(locked_nodes, R_NONE_MOD);
1532     } else {
1533
1534       if (unlikely(resp_leaf != leaf))
1535         return UnlockAndReturn(locked_nodes, R_RETRY);
1536       const uint64_t locked_version = leaf->lock();
1537       if (unlikely(!btree::CheckVersion(version, locked_version))) {
1538         leaf->unlock();
1539         goto retry_cur_leaf;
1540       }
1541       locked_nodes.push_back(leaf);
1542       if (old_v)
1543         *old_v = leaf->values_[ret].v_;
1544
1545       uint64_t leaf_version = leaf->unstable_version();
1546
1547       leaf_node *left_sibling = AsLeaf(left_node);
1548       leaf_node *right_sibling = AsLeaf(right_node);
1549
1550       // NOTE: remember that our locking discipline is left-to-right,
1551       // bottom-to-top. Here, we must acquire all locks on nodes being
1552       // modified in the tree. How we choose to handle removes is in the
1553       // following preference:
1554       //   1) steal from right node
1555       //   2) merge with right node
1556       //   3) steal from left node
1557       //   4) merge with left node
1558       //
1559       // Btree invariants guarantee that at least one of the options above is
1560       // available (except for the root node). We pick the right node first,
1561       // because our locking discipline allows us to directly lock the right
1562       // node.  If (1) and (2) cannot be satisfied, then we must first
1563       // *unlock* the current node, lock the left node, relock the current
1564       // node, and check nothing changed in between.
1565
1566       if (right_sibling) {
1567         right_sibling->lock();
1568         locked_nodes.push_back(right_sibling);
1569       } else if (left_sibling) {
1570         leaf->unlock();
1571         left_sibling->lock();
1572         locked_nodes.push_back(left_sibling);
1573         leaf->lock();
1574         if (unlikely(!leaf->check_version(leaf_version)))
1575           return UnlockAndReturn(locked_nodes, R_RETRY);
1576       } else {
1577         INVARIANT(parents.empty());
1578         if (unlikely(!leaf->is_root()))
1579           return UnlockAndReturn(locked_nodes, R_RETRY);
1580         //INVARIANT(leaf == root);
1581       }
1582
1583       for (typename util::vec<remove_parent_entry>::type::reverse_iterator rit = parents.rbegin();
1584            rit != parents.rend(); ++rit) {
1585         node *p = rit->parent_;
1586         node *l = rit->parent_left_sibling_;
1587         node *r = rit->parent_right_sibling_;
1588         uint64_t p_version = rit->parent_version_;
1589         p->lock();
1590         locked_nodes.push_back(p);
1591         if (unlikely(!p->check_version(p_version)))
1592           return UnlockAndReturn(locked_nodes, R_RETRY);
1593         size_t p_n = p->key_slots_used();
1594         if (p_n > NMinKeysPerNode)
1595           break;
1596         if (r) {
1597           r->lock();
1598           locked_nodes.push_back(r);
1599         } else if (l) {
1600           p->unlock();
1601           l->lock();
1602           locked_nodes.push_back(l);
1603           p->lock();
1604           if (unlikely(!p->check_version(p_version)))
1605             return UnlockAndReturn(locked_nodes, R_RETRY);
1606         } else {
1607           if (unlikely(!p->is_root()))
1608             return UnlockAndReturn(locked_nodes, R_RETRY);
1609           //INVARIANT(p == root);
1610         }
1611       }
1612
1613       leaf->mark_modifying();
1614
1615       if (right_sibling) {
1616         right_sibling->mark_modifying();
1617         size_t right_n = right_sibling->key_slots_used();
1618         if (right_n > NMinKeysPerNode) {
1619           // steal first contiguous key slices from right
1620           INVARIANT(right_sibling->keys_[0] > leaf->keys_[n - 1]);
1621
1622           // indices [0, steal_point) will be taken from the right
1623           size_t steal_point = 1;
1624           for (size_t i = 0; i < right_n - 1; i++, steal_point++)
1625             if (likely(right_sibling->keys_[i] != right_sibling->keys_[steal_point]))
1626               break;
1627
1628           INVARIANT(steal_point <= sizeof(key_slice) + 2);
1629           INVARIANT(steal_point <= right_n);
1630
1631           // to steal, we need to ensure:
1632           // 1) we have enough room to steal
1633           // 2) the right sibling will not be empty after the steal
1634           if ((n - 1 + steal_point) <= NKeysPerNode && steal_point < right_n) {
1635             sift_left(leaf->keys_, ret, n);
1636             copy_into(&leaf->keys_[n - 1], right_sibling->keys_, 0, steal_point);
1637             sift_left(leaf->values_, ret, n);
1638             copy_into(&leaf->values_[n - 1], right_sibling->values_, 0, steal_point);
1639             sift_left(leaf->lengths_, ret, n);
1640             copy_into(&leaf->lengths_[n - 1], right_sibling->lengths_, 0, steal_point);
1641             if (leaf->suffixes_)
1642               sift_swap_left(leaf->suffixes_, ret, n);
1643             if (right_sibling->suffixes_) {
1644               leaf->ensure_suffixes();
1645               swap_with(&leaf->suffixes_[n - 1], right_sibling->suffixes_, 0, steal_point);
1646             }
1647
1648             sift_left(right_sibling->keys_, 0, right_n, steal_point);
1649             sift_left(right_sibling->values_, 0, right_n, steal_point);
1650             sift_left(right_sibling->lengths_, 0, right_n, steal_point);
1651             if (right_sibling->suffixes_)
1652               sift_swap_left(right_sibling->suffixes_, 0, right_n, steal_point);
1653             leaf->set_key_slots_used(n - 1 + steal_point);
1654             right_sibling->set_key_slots_used(right_n - steal_point);
1655             new_key = right_sibling->keys_[0];
1656             right_sibling->min_key_ = new_key;
1657
1658 #ifdef CHECK_INVARIANTS
1659             leaf->base_invariant_unique_keys_check();
1660             right_sibling->base_invariant_unique_keys_check();
1661             INVARIANT(leaf->keys_[n - 1 + steal_point - 1] < new_key);
1662 #endif /* CHECK_INVARIANTS */
1663
1664             return R_STOLE_FROM_RIGHT;
1665           } else {
1666             // can't steal, so try merging- but only merge if we have room,
1667             // otherwise just allow this node to have less elements
1668             if ((n - 1 + right_n) > NKeysPerNode) {
1669               INVARIANT(n > 1); // if we can't steal or merge, we must have
1670                                 // enough elements to just remove one w/o going empty
1671               remove_pos_from_leaf_node(leaf, ret, n);
1672               return UnlockAndReturn(locked_nodes, R_NONE_MOD);
1673             }
1674           }
1675         }
1676
1677         // merge right sibling into this node
1678         INVARIANT(right_sibling->keys_[0] > leaf->keys_[n - 1]);
1679         INVARIANT((right_n + (n - 1)) <= NKeysPerNode);
1680
1681         sift_left(leaf->keys_, ret, n);
1682         copy_into(&leaf->keys_[n - 1], right_sibling->keys_, 0, right_n);
1683
1684         sift_left(leaf->values_, ret, n);
1685         copy_into(&leaf->values_[n - 1], right_sibling->values_, 0, right_n);
1686
1687         sift_left(leaf->lengths_, ret, n);
1688         copy_into(&leaf->lengths_[n - 1], right_sibling->lengths_, 0, right_n);
1689
1690         if (leaf->suffixes_)
1691           sift_swap_left(leaf->suffixes_, ret, n);
1692
1693         if (right_sibling->suffixes_) {
1694           leaf->ensure_suffixes();
1695           swap_with(&leaf->suffixes_[n - 1], right_sibling->suffixes_, 0, right_n);
1696         }
1697
1698         leaf->set_key_slots_used(right_n + (n - 1));
1699         leaf->next_ = right_sibling->next_;
1700         if (right_sibling->next_)
1701           right_sibling->next_->prev_ = leaf;
1702
1703         // leaf->next_->prev won't change because we hold lock for both leaf
1704         // and right_sibling
1705         INVARIANT(!leaf->next_ || leaf->next_->prev_ == leaf);
1706
1707         // leaf->prev_->next might change, however, since the left node could be
1708         // splitting (and we might hold a pointer to the left-split of the left node,
1709         // before it gets updated)
1710         SINGLE_THREADED_INVARIANT(!leaf->prev_ || leaf->prev_->next_ == leaf);
1711
1712 //#ifdef CHECK_INVARIANTS
1713 //        leaf->base_invariant_unique_keys_check();
1714 //#endif
1715         leaf_node::release(right_sibling);
1716         return R_MERGE_WITH_RIGHT;
1717       }
1718
1719       if (left_sibling) {
1720         left_sibling->mark_modifying();
1721         size_t left_n = left_sibling->key_slots_used();
1722         if (left_n > NMinKeysPerNode) {
1723           // try to steal from left
1724           INVARIANT(left_sibling->keys_[left_n - 1] < leaf->keys_[0]);
1725
1726           // indices [steal_point, left_n) will be taken from the left
1727           size_t steal_point = left_n - 1;
1728           for (ssize_t i = steal_point - 1; i >= 0; i--, steal_point--)
1729             if (likely(left_sibling->keys_[i] != left_sibling->keys_[steal_point]))
1730               break;
1731
1732           size_t nstolen = left_n - steal_point;
1733           INVARIANT(nstolen <= sizeof(key_slice) + 2);
1734           INVARIANT(steal_point < left_n);
1735
1736           if ((n - 1 + nstolen) <= NKeysPerNode && steal_point > 0) {
1737             sift_right(leaf->keys_, ret + 1, n, nstolen - 1);
1738             sift_right(leaf->keys_, 0, ret, nstolen);
1739             copy_into(&leaf->keys_[0], &left_sibling->keys_[0], left_n - nstolen, left_n);
1740
1741             sift_right(leaf->values_, ret + 1, n, nstolen - 1);
1742             sift_right(leaf->values_, 0, ret, nstolen);
1743             copy_into(&leaf->values_[0], &left_sibling->values_[0], left_n - nstolen, left_n);
1744
1745             sift_right(leaf->lengths_, ret + 1, n, nstolen - 1);
1746             sift_right(leaf->lengths_, 0, ret, nstolen);
1747             copy_into(&leaf->lengths_[0], &left_sibling->lengths_[0], left_n - nstolen, left_n);
1748
1749             if (leaf->suffixes_) {
1750               sift_swap_right(leaf->suffixes_, ret + 1, n, nstolen - 1);
1751               sift_swap_right(leaf->suffixes_, 0, ret, nstolen);
1752             }
1753             if (left_sibling->suffixes_) {
1754               leaf->ensure_suffixes();
1755               swap_with(&leaf->suffixes_[0], &left_sibling->suffixes_[0], left_n - nstolen, left_n);
1756             }
1757
1758             left_sibling->set_key_slots_used(left_n - nstolen);
1759             leaf->set_key_slots_used(n - 1 + nstolen);
1760             new_key = leaf->keys_[0];
1761             leaf->min_key_ = new_key;
1762
1763 #ifdef CHECK_INVARIANTS
1764             leaf->base_invariant_unique_keys_check();
1765             left_sibling->base_invariant_unique_keys_check();
1766             INVARIANT(left_sibling->keys_[left_n - nstolen - 1] < new_key);
1767 #endif /* CHECK_INVARIANTS */
1768
1769             return R_STOLE_FROM_LEFT;
1770           } else {
1771             if ((left_n + (n - 1)) > NKeysPerNode) {
1772               INVARIANT(n > 1);
1773               remove_pos_from_leaf_node(leaf, ret, n);
1774               return UnlockAndReturn(locked_nodes, R_NONE_MOD);
1775             }
1776           }
1777         }
1778
1779         // merge this node into left sibling
1780         INVARIANT(left_sibling->keys_[left_n - 1] < leaf->keys_[0]);
1781         INVARIANT((left_n + (n - 1)) <= NKeysPerNode);
1782
1783         copy_into(&left_sibling->keys_[left_n], leaf->keys_, 0, ret);
1784         copy_into(&left_sibling->keys_[left_n + ret], leaf->keys_, ret + 1, n);
1785
1786         copy_into(&left_sibling->values_[left_n], leaf->values_, 0, ret);
1787         copy_into(&left_sibling->values_[left_n + ret], leaf->values_, ret + 1, n);
1788
1789         copy_into(&left_sibling->lengths_[left_n], leaf->lengths_, 0, ret);
1790         copy_into(&left_sibling->lengths_[left_n + ret], leaf->lengths_, ret + 1, n);
1791
1792         if (leaf->suffixes_) {
1793           left_sibling->ensure_suffixes();
1794           swap_with(&left_sibling->suffixes_[left_n], leaf->suffixes_, 0, ret);
1795           swap_with(&left_sibling->suffixes_[left_n + ret], leaf->suffixes_, ret + 1, n);
1796         }
1797
1798         left_sibling->set_key_slots_used(left_n + (n - 1));
1799         left_sibling->next_ = leaf->next_;
1800         if (leaf->next_)
1801           leaf->next_->prev_ = left_sibling;
1802
1803         // see comments in right_sibling case above, for why one of them is INVARIANT and
1804         // the other is SINGLE_THREADED_INVARIANT
1805         INVARIANT(!left_sibling->next_ || left_sibling->next_->prev_ == left_sibling);
1806         SINGLE_THREADED_INVARIANT(
1807             !left_sibling->prev_ ||
1808             left_sibling->prev_->next_ == left_sibling);
1809
1810         //left_sibling->base_invariant_unique_keys_check();
1811         leaf_node::release(leaf);
1812         return R_MERGE_WITH_LEFT;
1813       }
1814
1815       // root node, so we are ok
1816       //INVARIANT(leaf == root);
1817       INVARIANT(leaf->is_root());
1818       remove_pos_from_leaf_node(leaf, ret, n);
1819       return UnlockAndReturn(locked_nodes, R_NONE_MOD);
1820     }
1821   } else {
1822     internal_node *internal = AsInternal(np);
1823     uint64_t version = internal->stable_version();
1824     if (unlikely(RawVersionManip::IsDeleting(version)))
1825       return UnlockAndReturn(locked_nodes, R_RETRY);
1826     key_search_ret kret = internal->key_lower_bound_search(kslice);
1827     ssize_t ret = kret.first;
1828     size_t n = kret.second;
1829     size_t child_idx = (ret == -1) ? 0 : ret + 1;
1830     node *child_ptr = internal->children_[child_idx];
1831     key_slice *child_min_key = child_idx == 0 ? NULL : &internal->keys_[child_idx - 1];
1832     key_slice *child_max_key = child_idx == n ? NULL : &internal->keys_[child_idx];
1833     node *child_left_sibling = child_idx == 0 ? NULL : internal->children_[child_idx - 1];
1834     node *child_right_sibling = child_idx == n ? NULL : internal->children_[child_idx + 1];
1835     if (unlikely(!internal->check_version(version)))
1836       return UnlockAndReturn(locked_nodes, R_RETRY);
1837     parents.push_back(remove_parent_entry(internal, left_node, right_node, version));
1838     INVARIANT(n > 0);
1839     key_slice nk;
1840     node *rn;
1841     remove_status status = remove0(child_ptr,
1842         child_min_key,
1843         child_max_key,
1844         k,
1845         old_v,
1846         child_left_sibling,
1847         child_right_sibling,
1848         nk,
1849         rn,
1850         parents,
1851         locked_nodes);
1852     switch (status) {
1853       case R_NONE_NOMOD:
1854       case R_NONE_MOD:
1855       case R_RETRY:
1856         return status;
1857
1858       case R_STOLE_FROM_LEFT:
1859         INVARIANT(internal->is_locked());
1860         INVARIANT(internal->is_lock_owner());
1861         internal->keys_[child_idx - 1] = nk;
1862         return UnlockAndReturn(locked_nodes, R_NONE_MOD);
1863
1864       case R_STOLE_FROM_RIGHT:
1865         INVARIANT(internal->is_locked());
1866         INVARIANT(internal->is_lock_owner());
1867         internal->keys_[child_idx] = nk;
1868         return UnlockAndReturn(locked_nodes, R_NONE_MOD);
1869
1870       case R_MERGE_WITH_LEFT:
1871       case R_MERGE_WITH_RIGHT:
1872         {
1873           internal->mark_modifying();
1874
1875           size_t del_key_idx, del_child_idx;
1876           if (status == R_MERGE_WITH_LEFT) {
1877             // need to delete key at position (child_idx - 1), and child at
1878             // position (child_idx)
1879             del_key_idx = child_idx - 1;
1880             del_child_idx = child_idx;
1881           } else {
1882             // need to delete key at position (child_idx), and child at
1883             // posiiton (child_idx + 1)
1884             del_key_idx = child_idx;
1885             del_child_idx = child_idx + 1;
1886           }
1887
1888           if (n > NMinKeysPerNode) {
1889             remove_pos_from_internal_node(internal, del_key_idx, del_child_idx, n);
1890             return UnlockAndReturn(locked_nodes, R_NONE_MOD);
1891           }
1892
1893           internal_node *left_sibling = AsInternal(left_node);
1894           internal_node *right_sibling = AsInternal(right_node);
1895
1896           // WARNING: if you change the order of events here, then you must also
1897           // change the locking protocol (see the comment in the leaf node case)
1898
1899           if (right_sibling) {
1900             right_sibling->mark_modifying();
1901             size_t right_n = right_sibling->key_slots_used();
1902             INVARIANT(max_key);
1903             INVARIANT(right_sibling->keys_[0] > internal->keys_[n - 1]);
1904             INVARIANT(*max_key > internal->keys_[n - 1]);
1905             if (right_n > NMinKeysPerNode) {
1906               // steal from right
1907               sift_left(internal->keys_, del_key_idx, n);
1908               internal->keys_[n - 1] = *max_key;
1909
1910               sift_left(internal->children_, del_child_idx, n + 1);
1911               internal->children_[n] = right_sibling->children_[0];
1912
1913               new_key = right_sibling->keys_[0];
1914
1915               sift_left(right_sibling->keys_, 0, right_n);
1916               sift_left(right_sibling->children_, 0, right_n + 1);
1917               right_sibling->dec_key_slots_used();
1918
1919               return R_STOLE_FROM_RIGHT;
1920             } else {
1921               // merge with right
1922               INVARIANT(max_key);
1923
1924               sift_left(internal->keys_, del_key_idx, n);
1925               internal->keys_[n - 1] = *max_key;
1926               copy_into(&internal->keys_[n], right_sibling->keys_, 0, right_n);
1927
1928               sift_left(internal->children_, del_child_idx, n + 1);
1929               copy_into(&internal->children_[n], right_sibling->children_, 0, right_n + 1);
1930
1931               internal->set_key_slots_used(n + right_n);
1932               internal_node::release(right_sibling);
1933               return R_MERGE_WITH_RIGHT;
1934             }
1935           }
1936
1937           if (left_sibling) {
1938             left_sibling->mark_modifying();
1939             size_t left_n = left_sibling->key_slots_used();
1940             INVARIANT(min_key);
1941             INVARIANT(left_sibling->keys_[left_n - 1] < internal->keys_[0]);
1942             INVARIANT(left_sibling->keys_[left_n - 1] < *min_key);
1943             INVARIANT(*min_key < internal->keys_[0]);
1944             if (left_n > NMinKeysPerNode) {
1945               // steal from left
1946               sift_right(internal->keys_, 0, del_key_idx);
1947               internal->keys_[0] = *min_key;
1948
1949               sift_right(internal->children_, 0, del_child_idx);
1950               internal->children_[0] = left_sibling->children_[left_n];
1951
1952               new_key = left_sibling->keys_[left_n - 1];
1953               left_sibling->dec_key_slots_used();
1954
1955               return R_STOLE_FROM_LEFT;
1956             } else {
1957               // merge into left sibling
1958               INVARIANT(min_key);
1959
1960               size_t left_key_j = left_n;
1961               size_t left_child_j = left_n + 1;
1962
1963               left_sibling->keys_[left_key_j++] = *min_key;
1964
1965               copy_into(&left_sibling->keys_[left_key_j], internal->keys_, 0, del_key_idx);
1966               left_key_j += del_key_idx;
1967               copy_into(&left_sibling->keys_[left_key_j], internal->keys_, del_key_idx + 1, n);
1968
1969               copy_into(&left_sibling->children_[left_child_j], internal->children_, 0, del_child_idx);
1970               left_child_j += del_child_idx;
1971               copy_into(&left_sibling->children_[left_child_j], internal->children_, del_child_idx + 1, n + 1);
1972
1973               left_sibling->set_key_slots_used(n + left_n);
1974               internal_node::release(internal);
1975               return R_MERGE_WITH_LEFT;
1976             }
1977           }
1978
1979           //INVARIANT(internal == root);
1980           INVARIANT(internal->is_root());
1981           remove_pos_from_internal_node(internal, del_key_idx, del_child_idx, n);
1982           INVARIANT(internal->key_slots_used() + 1 == n);
1983           if ((n - 1) == 0) {
1984             replace_node = internal->children_[0];
1985             internal_node::release(internal);
1986             return R_REPLACE_NODE;
1987           }
1988
1989           return UnlockAndReturn(locked_nodes, R_NONE_MOD);
1990         }
1991
1992       default:
1993         ALWAYS_ASSERT(false);
1994         return UnlockAndReturn(locked_nodes, R_NONE_NOMOD);
1995     }
1996   }
1997 }
1998
1999 template <typename P>
2000 std::string
2001 btree<P>::NodeStringify(const node_opaque_t *n)
2002 {
2003   std::vector<std::string> keys;
2004   for (size_t i = 0; i < n->key_slots_used(); i++)
2005     keys.push_back(std::string("0x") + util::hexify(n->keys_[i]));
2006
2007   std::ostringstream b;
2008   b << "node[v=" << n->version_info_str()
2009     << ", keys=" << util::format_list(keys.begin(), keys.end());
2010
2011   if (n->is_leaf_node()) {
2012     const leaf_node *leaf = AsLeaf(n);
2013     std::vector<std::string> lengths;
2014     for (size_t i = 0; i < leaf->key_slots_used(); i++) {
2015       std::ostringstream inf;
2016       inf << "<l=" << leaf->keyslice_length(i) << ",is_layer=" << leaf->is_layer(i) << ">";
2017       lengths.push_back(inf.str());
2018     }
2019     b << ", lengths=" << util::format_list(lengths.begin(), lengths.end());
2020   } else {
2021     //const internal_node *internal = AsInternal(n);
2022     // nothing for now
2023   }
2024
2025   b << "]";
2026   return b.str();
2027 }
2028
2029 template <typename P>
2030 std::vector<std::pair<typename btree<P>::value_type, bool>>
2031 btree<P>::ExtractValues(const node_opaque_t *n)
2032 {
2033   std::vector< std::pair<value_type, bool> > ret;
2034   if (!n->is_leaf_node())
2035     return ret;
2036   const leaf_node *leaf = (const leaf_node *) n;
2037   const size_t sz = leaf->key_slots_used();
2038   for (size_t i = 0; i < sz; i++)
2039     if (!leaf->is_layer(i))
2040       ret.emplace_back(leaf->values_[i].v_, leaf->keyslice_length(i) > 8);
2041   return ret;
2042 }