benchmark silo added
[c11concurrency-benchmarks.git] / silo / btree_impl.h
diff --git a/silo/btree_impl.h b/silo/btree_impl.h
new file mode 100644 (file)
index 0000000..0595b90
--- /dev/null
@@ -0,0 +1,2042 @@
+#pragma once
+
+#include <unistd.h>
+
+#include <iostream>
+#include <map>
+#include <set>
+#include <stack>
+#include <vector>
+#include <sstream>
+#include <atomic>
+#include <memory>
+
+#include "core.h"
+#include "btree.h"
+#include "thread.h"
+#include "txn.h"
+#include "util.h"
+#include "scopedperf.hh"
+
+template <typename P>
+void
+btree<P>::node::base_invariant_unique_keys_check() const
+{
+  size_t n = this->key_slots_used();
+  if (n == 0)
+    return;
+  if (is_leaf_node()) {
+    const leaf_node *leaf = AsLeaf(this);
+    typedef std::pair<key_slice, size_t> leaf_key;
+    leaf_key prev;
+    prev.first = keys_[0];
+    prev.second = leaf->keyslice_length(0);
+    ALWAYS_ASSERT(prev.second <= 9);
+    ALWAYS_ASSERT(!leaf->is_layer(0) || prev.second == 9);
+    if (!leaf->is_layer(0) && prev.second == 9) {
+      ALWAYS_ASSERT(leaf->suffixes_);
+      ALWAYS_ASSERT(leaf->suffixes_[0].size() >= 1);
+    }
+    for (size_t i = 1; i < n; i++) {
+      leaf_key cur_key;
+      cur_key.first = keys_[i];
+      cur_key.second = leaf->keyslice_length(i);
+      ALWAYS_ASSERT(cur_key.second <= 9);
+      ALWAYS_ASSERT(!leaf->is_layer(i) || cur_key.second == 9);
+      if (!leaf->is_layer(i) && cur_key.second == 9) {
+        ALWAYS_ASSERT(leaf->suffixes_);
+        ALWAYS_ASSERT(leaf->suffixes_[i].size() >= 1);
+      }
+      ALWAYS_ASSERT(cur_key > prev);
+      prev = cur_key;
+    }
+  } else {
+    key_slice prev = keys_[0];
+    for (size_t i = 1; i < n; i++) {
+      ALWAYS_ASSERT(keys_[i] > prev);
+      prev = keys_[i];
+    }
+  }
+}
+
+template <typename P>
+void
+btree<P>::node::base_invariant_checker(const key_slice *min_key,
+                                       const key_slice *max_key,
+                                       bool is_root) const
+{
+  ALWAYS_ASSERT(!is_locked());
+  ALWAYS_ASSERT(!is_modifying());
+  ALWAYS_ASSERT(this->is_root() == is_root);
+  size_t n = this->key_slots_used();
+  ALWAYS_ASSERT(n <= NKeysPerNode);
+  if (is_root) {
+    if (is_internal_node())
+      ALWAYS_ASSERT(n >= 1);
+  } else {
+    if (is_internal_node())
+      ALWAYS_ASSERT(n >= NMinKeysPerNode);
+    else
+      // key-slices constrain splits
+      ALWAYS_ASSERT(n >= 1);
+  }
+  for (size_t i = 0; i < n; i++) {
+    ALWAYS_ASSERT(!min_key || keys_[i] >= *min_key);
+    ALWAYS_ASSERT(!max_key || keys_[i] < *max_key);
+  }
+  base_invariant_unique_keys_check();
+}
+
+template <typename P>
+void
+btree<P>::node::invariant_checker(const key_slice *min_key,
+                                  const key_slice *max_key,
+                                  const node *left_sibling,
+                                  const node *right_sibling,
+                                  bool is_root) const
+{
+  is_leaf_node() ?
+    AsLeaf(this)->invariant_checker_impl(min_key, max_key, left_sibling, right_sibling, is_root) :
+    AsInternal(this)->invariant_checker_impl(min_key, max_key, left_sibling, right_sibling, is_root) ;
+}
+
+//static event_counter evt_btree_leaf_node_creates("btree_leaf_node_creates");
+//static event_counter evt_btree_leaf_node_deletes("btree_leaf_node_deletes");
+
+//event_counter btree<P>::leaf_node::g_evt_suffixes_array_created("btree_leaf_node_suffix_array_creates");
+
+template <typename P>
+btree<P>::leaf_node::leaf_node()
+  : node(), min_key_(0), prev_(NULL), next_(NULL), suffixes_(NULL)
+{
+  //++evt_btree_leaf_node_creates;
+}
+
+template <typename P>
+btree<P>::leaf_node::~leaf_node()
+{
+  if (suffixes_)
+    delete [] suffixes_;
+  //suffixes_ = NULL;
+  //++evt_btree_leaf_node_deletes;
+}
+
+template <typename P>
+void
+btree<P>::leaf_node::invariant_checker_impl(const key_slice *min_key,
+                                            const key_slice *max_key,
+                                            const node *left_sibling,
+                                            const node *right_sibling,
+                                            bool is_root) const
+{
+  this->base_invariant_checker(min_key, max_key, is_root);
+  ALWAYS_ASSERT(!min_key || *min_key == this->min_key_);
+  ALWAYS_ASSERT(!is_root || min_key == NULL);
+  ALWAYS_ASSERT(!is_root || max_key == NULL);
+  ALWAYS_ASSERT(is_root || this->key_slots_used() > 0);
+  size_t n = this->key_slots_used();
+  for (size_t i = 0; i < n; i++)
+    if (this->is_layer(i))
+      this->values_[i].n_->invariant_checker(NULL, NULL, NULL, NULL, true);
+}
+
+//static event_counter evt_btree_internal_node_creates("btree_internal_node_creates");
+//static event_counter evt_btree_internal_node_deletes("btree_internal_node_deletes");
+
+template <typename P>
+btree<P>::internal_node::internal_node()
+{
+  VersionManip::Store(this->hdr_, 1);
+  //++evt_btree_internal_node_creates;
+}
+
+template <typename P>
+btree<P>::internal_node::~internal_node()
+{
+  //++evt_btree_internal_node_deletes;
+}
+
+template <typename P>
+void
+btree<P>::internal_node::invariant_checker_impl(const key_slice *min_key,
+                                                const key_slice *max_key,
+                                                const node *left_sibling,
+                                                const node *right_sibling,
+                                                bool is_root) const
+{
+  this->base_invariant_checker(min_key, max_key, is_root);
+  size_t n = this->key_slots_used();
+  for (size_t i = 0; i <= n; i++) {
+    ALWAYS_ASSERT(this->children_[i] != NULL);
+    if (i == 0) {
+      const node *left_child_sibling = NULL;
+      if (left_sibling)
+        left_child_sibling = AsInternal(left_sibling)->children_[left_sibling->key_slots_used()];
+      this->children_[0]->invariant_checker(min_key, &this->keys_[0], left_child_sibling, this->children_[i + 1], false);
+    } else if (i == n) {
+      const node *right_child_sibling = NULL;
+      if (right_sibling)
+        right_child_sibling = AsInternal(right_sibling)->children_[0];
+      this->children_[n]->invariant_checker(&this->keys_[n - 1], max_key, this->children_[i - 1], right_child_sibling, false);
+    } else {
+      this->children_[i]->invariant_checker(&this->keys_[i - 1], &this->keys_[i], this->children_[i - 1], this->children_[i + 1], false);
+    }
+  }
+  if (!n || this->children_[0]->is_internal_node())
+    return;
+  for (size_t i = 0; i <= n; i++) {
+    const node *left_child_sibling = NULL;
+    const node *right_child_sibling = NULL;
+    if (left_sibling)
+      left_child_sibling = AsInternal(left_sibling)->children_[left_sibling->key_slots_used()];
+    if (right_sibling)
+      right_child_sibling = AsInternal(right_sibling)->children_[0];
+    const leaf_node *child_prev = (i == 0) ? AsLeaf(left_child_sibling) : AsLeaf(this->children_[i - 1]);
+    const leaf_node *child_next = (i == n) ? AsLeaf(right_child_sibling) : AsLeaf(this->children_[i + 1]);
+    ALWAYS_ASSERT(AsLeaf(this->children_[i])->prev_ == child_prev);
+    ALWAYS_ASSERT(AsLeaf(this->children_[i])->next_ == child_next);
+  }
+}
+
+template <typename P>
+void
+btree<P>::recursive_delete(node *n)
+{
+  if (leaf_node *leaf = AsLeafCheck(n)) {
+#ifdef CHECK_INVARIANTS
+    leaf->lock();
+    leaf->mark_deleting();
+    leaf->unlock();
+#endif
+    size_t n = leaf->key_slots_used();
+    for (size_t i = 0; i < n; i++)
+      if (leaf->is_layer(i))
+        recursive_delete(leaf->values_[i].n_);
+    leaf_node::deleter(leaf);
+  } else {
+    internal_node *internal = AsInternal(n);
+    size_t n = internal->key_slots_used();
+    for (size_t i = 0; i < n + 1; i++)
+      recursive_delete(internal->children_[i]);
+#ifdef CHECK_INVARIANTS
+    internal->lock();
+    internal->mark_deleting();
+    internal->unlock();
+#endif
+    internal_node::deleter(internal);
+  }
+}
+
+//STATIC_COUNTER_DECL(scopedperf::tsc_ctr, btree_search_impl_tsc, btree_search_impl_perf_cg);
+
+template <typename P>
+bool
+btree<P>::search_impl(const key_type &k, value_type &v,
+                      typename util::vec<leaf_node *>::type &leaf_nodes,
+                      versioned_node_t *search_info) const
+{
+  INVARIANT(rcu::s_instance.in_rcu_region());
+  //ANON_REGION("btree<P>::search_impl:", &btree_search_impl_perf_cg);
+  INVARIANT(leaf_nodes.empty());
+
+retry:
+  node *cur;
+  key_type kcur;
+  uint64_t kslice;
+  size_t kslicelen;
+  if (likely(leaf_nodes.empty())) {
+    kcur = k;
+    cur = root_;
+    kslice = k.slice();
+    kslicelen = std::min(k.size(), size_t(9));
+  } else {
+    kcur = k.shift_many(leaf_nodes.size() - 1);
+    cur = leaf_nodes.back();
+    kslice = kcur.slice();
+    kslicelen = std::min(kcur.size(), size_t(9));
+    leaf_nodes.pop_back();
+  }
+
+  while (true) {
+    // each iteration of this while loop tries to descend
+    // down node "cur", looking for key kcur
+
+process:
+    uint64_t version = cur->stable_version();
+    if (unlikely(RawVersionManip::IsDeleting(version)))
+      // XXX: maybe we can only retry at the parent of this node, not the
+      // root node of the b-tree *layer*
+      goto retry;
+    if (leaf_node *leaf = AsLeafCheck(cur, version)) {
+      leaf->prefetch();
+      if (search_info) {
+        search_info->first = leaf;
+        search_info->second = RawVersionManip::Version(version);
+      }
+      key_search_ret kret = leaf->key_search(kslice, kslicelen);
+      ssize_t ret = kret.first;
+      if (ret != -1) {
+        // found
+        typename leaf_node::value_or_node_ptr vn = leaf->values_[ret];
+        const bool is_layer = leaf->is_layer(ret);
+        INVARIANT(!is_layer || kslicelen == 9);
+        varkey suffix(leaf->suffix(ret));
+        if (unlikely(!leaf->check_version(version)))
+          goto process;
+        leaf_nodes.push_back(leaf);
+
+        if (!is_layer) {
+          // check suffixes
+          if (kslicelen == 9 && suffix != kcur.shift())
+            return false;
+          v = vn.v_;
+          return true;
+        }
+
+        // search the next layer
+        cur = vn.n_;
+        kcur = kcur.shift();
+        kslice = kcur.slice();
+        kslicelen = std::min(kcur.size(), size_t(9));
+        continue;
+      }
+
+      // leaf might have lost responsibility for key k during the descend. we
+      // need to check this and adjust accordingly
+      if (unlikely(kslice < leaf->min_key_)) {
+        // try to go left
+        leaf_node *left_sibling = leaf->prev_;
+        if (unlikely(!leaf->check_version(version)))
+          goto process;
+        if (likely(left_sibling)) {
+          cur = left_sibling;
+          continue;
+        } else {
+          // XXX: this case shouldn't be possible...
+          goto retry;
+        }
+      } else {
+        // try to go right
+        leaf_node *right_sibling = leaf->next_;
+        if (unlikely(!leaf->check_version(version)))
+          goto process;
+        if (unlikely(!right_sibling)) {
+          leaf_nodes.push_back(leaf);
+          return false;
+        }
+        right_sibling->prefetch();
+        uint64_t right_version = right_sibling->stable_version();
+        key_slice right_min_key = right_sibling->min_key_;
+        if (unlikely(!right_sibling->check_version(right_version)))
+          goto process;
+        if (unlikely(kslice >= right_min_key)) {
+          cur = right_sibling;
+          continue;
+        }
+      }
+
+      leaf_nodes.push_back(leaf);
+      return false;
+    } else {
+      internal_node *internal = AsInternal(cur);
+      internal->prefetch();
+      key_search_ret kret = internal->key_lower_bound_search(kslice);
+      ssize_t ret = kret.first;
+      if (ret != -1)
+        cur = internal->children_[ret + 1];
+      else
+        cur = internal->children_[0];
+      if (unlikely(!internal->check_version(version)))
+        goto process;
+      INVARIANT(kret.second);
+    }
+  }
+}
+
+template <typename S>
+class string_restore {
+public:
+  inline string_restore(S &s, size_t n)
+    : s_(&s), n_(n) {}
+  inline ~string_restore()
+  {
+    s_->resize(n_);
+  }
+private:
+  S *s_;
+  size_t n_;
+};
+
+// recursively read the range from the layer down
+//
+// all args relative to prefix.
+//
+// prefix is non-const, meaning that this function might modify the contents.
+// it is guaranteed, however, to restore prefix to the state it was before the
+// invocation
+//
+// returns true if we keep going
+template <typename P>
+bool
+btree<P>::search_range_at_layer(
+    leaf_node *leaf,
+    string_type &prefix,
+    const key_type &lower,
+    bool inc_lower,
+    const key_type *upper,
+    low_level_search_range_callback &callback) const
+{
+  VERBOSE(std::cerr << "search_range_at_layer: prefix.size()=" << prefix.size() << std::endl);
+
+  key_slice last_keyslice = 0;
+  size_t last_keyslice_len = 0;
+  bool emitted_last_keyslice = false;
+  if (!inc_lower) {
+    last_keyslice = lower.slice();
+    last_keyslice_len = std::min(lower.size(), size_t(9));
+    emitted_last_keyslice = true;
+  }
+
+  key_slice lower_slice = lower.slice();
+  key_slice next_key = lower_slice;
+  const size_t prefix_size = prefix.size();
+  // NB: DON'T CALL RESERVE()
+  //prefix.reserve(prefix_size + 8); // allow for next layer
+  const uint64_t upper_slice = upper ? upper->slice() : 0;
+  string_restore<string_type> restorer(prefix, prefix_size);
+  while (!upper || next_key <= upper_slice) {
+    leaf->prefetch();
+
+    typename util::vec<leaf_kvinfo>::type buf;
+    const uint64_t version = leaf->stable_version();
+    key_slice leaf_min_key = leaf->min_key_;
+    if (leaf_min_key > next_key) {
+      // go left
+      leaf_node *left_sibling = leaf->prev_;
+      if (unlikely(!leaf->check_version(version)))
+        // try this node again
+        continue;
+      // try from left_sibling
+      leaf = left_sibling;
+      INVARIANT(leaf);
+      continue;
+    }
+
+    // grab all keys in [lower_slice, upper_slice]. we'll do boundary condition
+    // checking later (outside of the critical section)
+    for (size_t i = 0; i < leaf->key_slots_used(); i++) {
+      // XXX(stephentu): this 1st filter is overly conservative and we can do
+      // better
+      if ((leaf->keys_[i] > lower_slice ||
+           (leaf->keys_[i] == lower_slice &&
+            leaf->keyslice_length(i) >= std::min(lower.size(), size_t(9)))) &&
+          (!upper || leaf->keys_[i] <= upper_slice))
+        buf.emplace_back(
+            leaf->keys_[i],
+            leaf->values_[i],
+            leaf->is_layer(i),
+            leaf->keyslice_length(i),
+            leaf->suffix(i));
+    }
+
+    leaf_node *const right_sibling = leaf->next_;
+    key_slice leaf_max_key = right_sibling ? right_sibling->min_key_ : 0;
+
+    if (unlikely(!leaf->check_version(version)))
+      continue;
+
+    callback.on_resp_node(leaf, RawVersionManip::Version(version));
+
+    for (size_t i = 0; i < buf.size(); i++) {
+      // check to see if we already omitted a key <= buf[i]: if so, don't omit it
+      if (emitted_last_keyslice &&
+          ((buf[i].key_ < last_keyslice) ||
+           (buf[i].key_ == last_keyslice && buf[i].length_ <= last_keyslice_len)))
+        continue;
+      const size_t ncpy = std::min(buf[i].length_, size_t(8));
+      // XXX: prefix.end() calls _M_leak()
+      //prefix.replace(prefix.begin() + prefix_size, prefix.end(), buf[i].keyslice(), ncpy);
+      prefix.replace(prefix_size, string_type::npos, buf[i].keyslice(), ncpy);
+      if (buf[i].layer_) {
+        // recurse into layer
+        leaf_node *const next_layer = leftmost_descend_layer(buf[i].vn_.n_);
+        varkey zerokey;
+        if (emitted_last_keyslice && last_keyslice == buf[i].key_)
+          // NB(stephentu): this is implied by the filter above
+          INVARIANT(last_keyslice_len <= 8);
+        if (!search_range_at_layer(next_layer, prefix, zerokey, false, NULL, callback))
+          return false;
+      } else {
+        // check if we are before the start
+        if (buf[i].key_ == lower_slice) {
+          if (buf[i].length_ <= 8) {
+            if (buf[i].length_ < lower.size())
+              // skip
+              continue;
+          } else {
+            INVARIANT(buf[i].length_ == 9);
+            if (lower.size() > 8 && buf[i].suffix_ < lower.shift())
+              // skip
+              continue;
+          }
+        }
+
+        // check if we are after the end
+        if (upper && buf[i].key_ == upper_slice) {
+          if (buf[i].length_ == 9) {
+            if (upper->size() <= 8)
+              break;
+            if (buf[i].suffix_ >= upper->shift())
+              break;
+          } else if (buf[i].length_ >= upper->size()) {
+            break;
+          }
+        }
+        if (buf[i].length_ == 9)
+          prefix.append((const char *) buf[i].suffix_.data(), buf[i].suffix_.size());
+        // we give the actual version # minus all the other bits, b/c they are not
+        // important here and make comparison easier at higher layers
+        if (!callback.invoke(prefix, buf[i].vn_.v_, leaf, RawVersionManip::Version(version)))
+          return false;
+      }
+      last_keyslice = buf[i].key_;
+      last_keyslice_len = buf[i].length_;
+      emitted_last_keyslice = true;
+    }
+
+    if (!right_sibling)
+      // we're done
+      return true;
+
+    next_key = leaf_max_key;
+    leaf = right_sibling;
+  }
+
+  return true;
+}
+
+template <typename P>
+void
+btree<P>::search_range_call(const key_type &lower,
+                            const key_type *upper,
+                            low_level_search_range_callback &callback,
+                            string_type *buf) const
+{
+  rcu_region guard;
+  INVARIANT(rcu::s_instance.in_rcu_region());
+  if (unlikely(upper && *upper <= lower))
+    return;
+  typename util::vec<leaf_node *>::type leaf_nodes;
+  value_type v = 0;
+  search_impl(lower, v, leaf_nodes);
+  INVARIANT(!leaf_nodes.empty());
+  bool first = true;
+  string_type prefix_tmp, *prefix_px;
+  if (buf)
+    prefix_px = buf;
+  else
+    prefix_px = &prefix_tmp;
+  string_type &prefix(*prefix_px);
+  INVARIANT(prefix.empty());
+  prefix.assign((const char *) lower.data(), 8 * (leaf_nodes.size() - 1));
+  while (!leaf_nodes.empty()) {
+    leaf_node *cur = leaf_nodes.back();
+    leaf_nodes.pop_back();
+    key_type layer_upper;
+    bool layer_has_upper = false;
+    if (upper && upper->size() >= (8 * leaf_nodes.size())) {
+      layer_upper = upper->shift_many(leaf_nodes.size());
+      layer_has_upper = true;
+    }
+#ifdef CHECK_INVARIANTS
+    string_type prefix_before(prefix);
+#endif
+    if (!search_range_at_layer(
+          cur, prefix, lower.shift_many(leaf_nodes.size()),
+          first, layer_has_upper ? &layer_upper : NULL, callback))
+      return;
+#ifdef CHECK_INVARIANTS
+    INVARIANT(prefix == prefix_before);
+#endif
+    first = false;
+    if (!leaf_nodes.empty()) {
+      INVARIANT(prefix.size() >= 8);
+      prefix.resize(prefix.size() - 8);
+    }
+  }
+}
+
+template <typename P>
+bool
+btree<P>::remove_stable_location(node **root_location, const key_type &k, value_type *old_v)
+{
+  INVARIANT(rcu::s_instance.in_rcu_region());
+retry:
+  key_slice new_key;
+  node *replace_node = NULL;
+  typename util::vec<remove_parent_entry>::type parents;
+  typename util::vec<node *>::type locked_nodes;
+  node *local_root = *root_location;
+  remove_status status = remove0(local_root,
+      NULL, /* min_key */
+      NULL, /* max_key */
+      k,
+      old_v,
+      NULL, /* left_node */
+      NULL, /* right_node */
+      new_key,
+      replace_node,
+      parents,
+      locked_nodes);
+  switch (status) {
+  case R_NONE_NOMOD:
+    return false;
+  case R_NONE_MOD:
+    return true;
+  case R_RETRY:
+    goto retry;
+  case R_REPLACE_NODE:
+    INVARIANT(local_root->is_deleting());
+    INVARIANT(local_root->is_lock_owner());
+    INVARIANT(local_root->is_root());
+    INVARIANT(local_root == *root_location);
+    replace_node->set_root();
+    local_root->clear_root();
+    COMPILER_MEMORY_FENCE;
+    *root_location = replace_node;
+    // locks are still held here
+    return UnlockAndReturn(locked_nodes, true);
+  default:
+    ALWAYS_ASSERT(false);
+    return false;
+  }
+  ALWAYS_ASSERT(false);
+  return false;
+}
+
+template <typename P>
+typename btree<P>::leaf_node *
+btree<P>::leftmost_descend_layer(node *n) const
+{
+  node *cur = n;
+  while (true) {
+    if (leaf_node *leaf = AsLeafCheck(cur))
+      return leaf;
+    internal_node *internal = AsInternal(cur);
+    uint64_t version = cur->stable_version();
+    node *child = internal->children_[0];
+    if (unlikely(!internal->check_version(version)))
+      continue;
+    cur = child;
+  }
+}
+
+template <typename P>
+void
+btree<P>::tree_walk(tree_walk_callback &callback) const
+{
+  rcu_region guard;
+  INVARIANT(rcu::s_instance.in_rcu_region());
+  std::vector<node *> q;
+  // XXX: not sure if cast is safe
+  q.push_back((node *) root_);
+  while (!q.empty()) {
+    node *cur = q.back();
+    q.pop_back();
+    cur->prefetch();
+    leaf_node *leaf = leftmost_descend_layer(cur);
+    INVARIANT(leaf);
+    while (leaf) {
+      leaf->prefetch();
+    process:
+      const uint64_t version = leaf->stable_version();
+      const size_t n = leaf->key_slots_used();
+      std::vector<node *> layers;
+      for (size_t i = 0; i < n; i++)
+        if (leaf->is_layer(i))
+          layers.push_back(leaf->values_[i].n_);
+      leaf_node *next = leaf->next_;
+      callback.on_node_begin(leaf);
+      if (unlikely(!leaf->check_version(version))) {
+        callback.on_node_failure();
+        goto process;
+      }
+      callback.on_node_success();
+      leaf = next;
+      q.insert(q.end(), layers.begin(), layers.end());
+    }
+  }
+}
+
+template <typename P>
+void
+btree<P>::size_walk_callback::on_node_begin(const node_opaque_t *n)
+{
+  INVARIANT(n->is_leaf_node());
+  INVARIANT(spec_size_ == 0);
+  const leaf_node *leaf = (const leaf_node *) n;
+  const size_t sz = leaf->key_slots_used();
+  for (size_t i = 0; i < sz; i++)
+    if (!leaf->is_layer(i))
+      spec_size_++;
+}
+
+template <typename P>
+void
+btree<P>::size_walk_callback::on_node_success()
+{
+  size_ += spec_size_;
+  spec_size_ = 0;
+}
+
+template <typename P>
+void
+btree<P>::size_walk_callback::on_node_failure()
+{
+  spec_size_ = 0;
+}
+
+template <typename P>
+typename btree<P>::leaf_node *
+btree<P>::FindRespLeafNode(
+      leaf_node *leaf,
+      uint64_t kslice,
+      uint64_t &version)
+{
+retry:
+  version = leaf->stable_version();
+  if (unlikely(leaf->is_deleting())) {
+    leaf_node *left = leaf->prev_;
+    if (left) {
+      leaf = left;
+      goto retry;
+    }
+    leaf_node *right = leaf->next_;
+    if (right) {
+      leaf = right;
+      goto retry;
+    }
+    // XXX(stephentu): not sure if we can *really* depend on this,
+    // need to convince ourselves why this is not possible!
+    ALWAYS_ASSERT(false);
+  }
+  if (unlikely(kslice < leaf->min_key_)) {
+    // we need to go left
+    leaf_node *left = leaf->prev_;
+    if (left)
+      leaf = left;
+    goto retry;
+  }
+  leaf_node *right = leaf->next_;
+
+  // NB(stephentu): the only way for right->min_key_ to decrease, is for the
+  // current leaf node to split. therefore, it is un-necessary to ensure stable
+  // version on the next node (we only need to ensure it on the current node)
+  if (likely(right) && unlikely(kslice >= right->min_key_)) {
+    leaf = right;
+    goto retry;
+  }
+
+  //if (likely(right)) {
+  //  const uint64_t right_version = right->stable_version();
+  //  const uint64_t right_min_key = right->min_key_;
+  //  if (unlikely(!right->check_version(right_version)))
+  //    goto retry;
+  //  if (unlikely(kslice >= right_min_key)) {
+  //    leaf = right;
+  //    goto retry;
+  //  }
+  //}
+
+  return leaf;
+}
+
+template <typename P>
+typename btree<P>::leaf_node *
+btree<P>::FindRespLeafLowerBound(
+      leaf_node *leaf,
+      uint64_t kslice,
+      size_t kslicelen,
+      uint64_t &version,
+      size_t &n,
+      ssize_t &idxmatch,
+      ssize_t &idxlowerbound)
+{
+  leaf = FindRespLeafNode(leaf, kslice, version);
+
+  // use 0 for slice length, so we can a pointer <= all elements
+  // with the same slice
+  const key_search_ret kret = leaf->key_lower_bound_search(kslice, 0);
+  const ssize_t ret = kret.first;
+  n = kret.second;
+
+  // count the number of values already here with the same kslice.
+  idxmatch = -1;
+  idxlowerbound = ret;
+  for (size_t i = (ret == -1 ? 0 : ret); i < n; i++) {
+    if (leaf->keys_[i] < kslice) {
+      continue;
+    } else if (leaf->keys_[i] == kslice) {
+      const size_t kslicelen0 = leaf->keyslice_length(i);
+      if (kslicelen0 <= kslicelen) {
+        // invariant doesn't hold, b/c values can be changing
+        // concurrently (leaf is not assumed to be locked)
+        //INVARIANT(idxmatch == -1);
+        idxlowerbound = i;
+        if (kslicelen0 == kslicelen)
+          idxmatch = i;
+      }
+    } else {
+      break;
+    }
+  }
+
+  return leaf;
+}
+
+template <typename P>
+typename btree<P>::leaf_node *
+btree<P>::FindRespLeafExact(
+      leaf_node *leaf,
+      uint64_t kslice,
+      size_t kslicelen,
+      uint64_t &version,
+      size_t &n,
+      ssize_t &idxmatch)
+{
+  leaf = FindRespLeafNode(leaf, kslice, version);
+  key_search_ret kret = leaf->key_search(kslice, kslicelen);
+  idxmatch = kret.first;
+  n = kret.second;
+  return leaf;
+}
+
+template <typename P>
+typename btree<P>::insert_status
+btree<P>::insert0(node *np,
+                  const key_type &k,
+                  value_type v,
+                  bool only_if_absent,
+                  value_type *old_v,
+                  insert_info_t *insert_info,
+                  key_slice &min_key,
+                  node *&new_node,
+                  typename util::vec<insert_parent_entry>::type &parents,
+                  typename util::vec<node *>::type &locked_nodes)
+{
+  uint64_t kslice = k.slice();
+  size_t kslicelen = std::min(k.size(), size_t(9));
+
+  np->prefetch();
+  if (leaf_node *leaf = AsLeafCheck(np)) {
+    // locked nodes are acquired bottom to top
+    INVARIANT(locked_nodes.empty());
+
+retry_cur_leaf:
+    uint64_t version;
+    size_t n;
+    ssize_t lenmatch, lenlowerbound;
+    leaf_node *resp_leaf = FindRespLeafLowerBound(
+        leaf, kslice, kslicelen, version, n, lenmatch, lenlowerbound);
+
+    // len match case
+    if (lenmatch != -1) {
+      // exact match case
+      if (kslicelen <= 8 ||
+          (!resp_leaf->is_layer(lenmatch) &&
+           resp_leaf->suffix(lenmatch) == k.shift())) {
+        const uint64_t locked_version = resp_leaf->lock();
+        if (unlikely(!btree::CheckVersion(version, locked_version))) {
+          resp_leaf->unlock();
+          goto retry_cur_leaf;
+        }
+        locked_nodes.push_back(resp_leaf);
+        // easy case- we don't modify the node itself
+        if (old_v)
+          *old_v = resp_leaf->values_[lenmatch].v_;
+        if (!only_if_absent)
+          resp_leaf->values_[lenmatch].v_ = v;
+        if (insert_info)
+          insert_info->node = 0;
+        return UnlockAndReturn(locked_nodes, I_NONE_NOMOD);
+      }
+      INVARIANT(kslicelen == 9);
+      if (resp_leaf->is_layer(lenmatch)) {
+        node *subroot = resp_leaf->values_[lenmatch].n_;
+        INVARIANT(subroot);
+        if (unlikely(!resp_leaf->check_version(version)))
+          goto retry_cur_leaf;
+        key_slice mk;
+        node *ret;
+        typename util::vec<insert_parent_entry>::type subparents;
+        typename util::vec<node *>::type sub_locked_nodes;
+        const insert_status status =
+          insert0(subroot, k.shift(), v, only_if_absent, old_v, insert_info,
+              mk, ret, subparents, sub_locked_nodes);
+
+        switch (status) {
+        case I_NONE_NOMOD:
+        case I_NONE_MOD:
+        case I_RETRY:
+          INVARIANT(sub_locked_nodes.empty());
+          return status;
+
+        case I_SPLIT:
+          // the subroot split, so we need to find the leaf again, lock the
+          // node, and create a new internal node
+
+          INVARIANT(ret);
+          INVARIANT(ret->key_slots_used() > 0);
+
+          for (;;) {
+            resp_leaf = FindRespLeafLowerBound(
+                resp_leaf, kslice, kslicelen, version, n, lenmatch, lenlowerbound);
+            const uint64_t locked_version = resp_leaf->lock();
+            if (likely(btree::CheckVersion(version, locked_version))) {
+              locked_nodes.push_back(resp_leaf);
+              break;
+            }
+            resp_leaf->unlock();
+          }
+
+          INVARIANT(lenmatch != -1);
+          INVARIANT(resp_leaf->is_layer(lenmatch));
+          subroot = resp_leaf->values_[lenmatch].n_;
+          INVARIANT(subroot->is_modifying());
+          INVARIANT(subroot->is_lock_owner());
+          INVARIANT(subroot->is_root());
+
+          internal_node *new_root = internal_node::alloc();
+#ifdef CHECK_INVARIANTS
+          new_root->lock();
+          new_root->mark_modifying();
+          locked_nodes.push_back(new_root);
+#endif /* CHECK_INVARIANTS */
+          new_root->children_[0] = subroot;
+          new_root->children_[1] = ret;
+          new_root->keys_[0] = mk;
+          new_root->set_key_slots_used(1);
+          new_root->set_root();
+          subroot->clear_root();
+          resp_leaf->values_[lenmatch].n_ = new_root;
+
+          // locks are still held here
+          UnlockNodes(sub_locked_nodes);
+          return UnlockAndReturn(locked_nodes, I_NONE_MOD);
+        }
+        ALWAYS_ASSERT(false);
+
+      } else {
+        const uint64_t locked_version = resp_leaf->lock();
+        if (unlikely(!btree::CheckVersion(version, locked_version))) {
+          resp_leaf->unlock();
+          goto retry_cur_leaf;
+        }
+        locked_nodes.push_back(resp_leaf);
+
+        INVARIANT(resp_leaf->suffixes_); // b/c lenmatch != -1 and this is not a layer
+        // need to create a new btree layer, and add both existing key and
+        // new key to it
+
+        // XXX: need to mark modifying because we cannot change both the
+        // value and the type atomically
+        resp_leaf->mark_modifying();
+
+        leaf_node *new_root = leaf_node::alloc();
+#ifdef CHECK_INVARIANTS
+        new_root->lock();
+        new_root->mark_modifying();
+#endif /* CHECK_INVARIANTS */
+        new_root->set_root();
+        varkey old_slice(resp_leaf->suffix(lenmatch));
+        new_root->keys_[0] = old_slice.slice();
+        new_root->values_[0] = resp_leaf->values_[lenmatch];
+        new_root->keyslice_set_length(0, std::min(old_slice.size(), size_t(9)), false);
+        new_root->inc_key_slots_used();
+        if (new_root->keyslice_length(0) == 9) {
+          new_root->alloc_suffixes();
+          rcu_imstring i(old_slice.data() + 8, old_slice.size() - 8);
+          new_root->suffixes_[0].swap(i);
+        }
+        resp_leaf->values_[lenmatch].n_ = new_root;
+        {
+          rcu_imstring i;
+          resp_leaf->suffixes_[lenmatch].swap(i);
+        }
+        resp_leaf->value_set_layer(lenmatch);
+#ifdef CHECK_INVARIANTS
+        new_root->unlock();
+#endif /* CHECK_INVARIANTS */
+
+        key_slice mk;
+        node *ret;
+        typename util::vec<insert_parent_entry>::type subparents;
+        typename util::vec<node *>::type sub_locked_nodes;
+        const insert_status status =
+          insert0(new_root, k.shift(), v, only_if_absent, old_v, insert_info,
+              mk, ret, subparents, sub_locked_nodes);
+        if (status != I_NONE_MOD)
+          INVARIANT(false);
+        INVARIANT(sub_locked_nodes.empty());
+        return UnlockAndReturn(locked_nodes, I_NONE_MOD);
+      }
+    }
+
+    // lenlowerbound + 1 is the slot (0-based index) we want the new key to go
+    // into, in the leaf node
+    if (n < NKeysPerNode) {
+      const uint64_t locked_version = resp_leaf->lock();
+      if (unlikely(!btree::CheckVersion(version, locked_version))) {
+        resp_leaf->unlock();
+        goto retry_cur_leaf;
+      }
+      locked_nodes.push_back(resp_leaf);
+
+      // also easy case- we only need to make local modifications
+      resp_leaf->mark_modifying();
+
+      sift_right(resp_leaf->keys_, lenlowerbound + 1, n);
+      resp_leaf->keys_[lenlowerbound + 1] = kslice;
+      sift_right(resp_leaf->values_, lenlowerbound + 1, n);
+      resp_leaf->values_[lenlowerbound + 1].v_ = v;
+      sift_right(resp_leaf->lengths_, lenlowerbound + 1, n);
+      resp_leaf->keyslice_set_length(lenlowerbound + 1, kslicelen, false);
+      if (resp_leaf->suffixes_)
+        sift_swap_right(resp_leaf->suffixes_, lenlowerbound + 1, n);
+      if (kslicelen == 9) {
+        resp_leaf->ensure_suffixes();
+        rcu_imstring i(k.data() + 8, k.size() - 8);
+        resp_leaf->suffixes_[lenlowerbound + 1].swap(i);
+      } else if (resp_leaf->suffixes_) {
+        rcu_imstring i;
+        resp_leaf->suffixes_[lenlowerbound + 1].swap(i);
+      }
+      resp_leaf->inc_key_slots_used();
+
+//#ifdef CHECK_INVARIANTS
+//      resp_leaf->base_invariant_unique_keys_check();
+//#endif
+      if (insert_info) {
+        insert_info->node = resp_leaf;
+        insert_info->old_version = RawVersionManip::Version(resp_leaf->unstable_version()); // we hold lock on leaf
+        insert_info->new_version = insert_info->old_version + 1;
+      }
+      return UnlockAndReturn(locked_nodes, I_NONE_MOD);
+    } else {
+      INVARIANT(n == NKeysPerNode);
+
+      if (unlikely(resp_leaf != leaf))
+        // sigh, we really do need parent points- if resp_leaf != leaf, then
+        // all the parent points we saved on the way down are no longer valid
+        return UnlockAndReturn(locked_nodes, I_RETRY);
+      const uint64_t locked_version = resp_leaf->lock();
+      if (unlikely(!btree::CheckVersion(version, locked_version))) {
+        resp_leaf->unlock();
+        goto retry_cur_leaf;
+      }
+      locked_nodes.push_back(resp_leaf);
+
+      // we need to split the current node, potentially causing a bunch of
+      // splits to happen in ancestors. to make this safe w/o
+      // using a very complicated locking protocol, we will first acquire all
+      // locks on nodes which will be modified, in left-to-right,
+      // bottom-to-top order
+
+      if (parents.empty()) {
+        if (unlikely(!resp_leaf->is_root()))
+          return UnlockAndReturn(locked_nodes, I_RETRY);
+        //INVARIANT(resp_leaf == root);
+      } else {
+        for (auto rit = parents.rbegin(); rit != parents.rend(); ++rit) {
+          // lock the parent
+          node *p = rit->first;
+          p->lock();
+          locked_nodes.push_back(p);
+          if (unlikely(!p->check_version(rit->second)))
+            // in traversing down the tree, an ancestor of this node was
+            // modified- to be safe, we start over
+            return UnlockAndReturn(locked_nodes, I_RETRY);
+          if ((rit + 1) == parents.rend()) {
+            // did the root change?
+            if (unlikely(!p->is_root()))
+              return UnlockAndReturn(locked_nodes, I_RETRY);
+            //INVARIANT(p == root);
+          }
+
+          // since the child needs a split, see if we have room in the parent-
+          // if we don't have room, we'll also need to split the parent, in which
+          // case we must grab its parent's lock
+          INVARIANT(p->is_internal_node());
+          size_t parent_n = p->key_slots_used();
+          INVARIANT(parent_n > 0 && parent_n <= NKeysPerNode);
+          if (parent_n < NKeysPerNode)
+            // can stop locking up now, since this node won't split
+            break;
+        }
+      }
+
+      // at this point, we have locked all nodes which will be split/modified
+      // modulo the new nodes to be created
+      resp_leaf->mark_modifying();
+
+      leaf_node *new_leaf = leaf_node::alloc();
+      new_leaf->prefetch();
+
+#ifdef CHECK_INVARIANTS
+      new_leaf->lock();
+      new_leaf->mark_modifying();
+      locked_nodes.push_back(new_leaf);
+#endif /* CHECK_INVARIANTS */
+
+      if (!resp_leaf->next_ && resp_leaf->keys_[n - 1] < kslice) {
+        // sequential insert optimization- in this case, we don't bother
+        // splitting the node. instead, keep the current leaf node full, and
+        // insert the new key into the new leaf node (violating the btree invariant)
+        //
+        // this optimization is commonly implemented, including in masstree and
+        // berkeley db- w/o this optimization, sequential inserts leave the all
+        // nodes half full
+
+        new_leaf->keys_[0] = kslice;
+        new_leaf->values_[0].v_ = v;
+        new_leaf->keyslice_set_length(0, kslicelen, false);
+        if (kslicelen == 9) {
+          new_leaf->alloc_suffixes();
+          rcu_imstring i(k.data() + 8, k.size() - 8);
+          new_leaf->suffixes_[0].swap(i);
+        }
+        new_leaf->set_key_slots_used(1);
+
+      } else {
+        // regular case
+
+        // compute how many keys the smaller node would have if we put the
+        // new key in the left (old) or right (new) side of the split.
+        // then choose the split which maximizes the mininum.
+        //
+        // XXX: do this in a more elegant way
+
+        // a split point S is a number such that indices [0, s) go into the left
+        // partition, and indices [s, N) go into the right partition
+        size_t left_split_point, right_split_point;
+
+        left_split_point = NKeysPerNode / 2;
+        for (ssize_t i = left_split_point - 1; i >= 0; i--) {
+          if (likely(resp_leaf->keys_[i] != resp_leaf->keys_[left_split_point]))
+            break;
+          left_split_point--;
+        }
+        INVARIANT(left_split_point <= NKeysPerNode);
+        INVARIANT(left_split_point == 0 || resp_leaf->keys_[left_split_point - 1] != resp_leaf->keys_[left_split_point]);
+
+        right_split_point = NKeysPerNode / 2;
+        for (ssize_t i = right_split_point - 1; i >= 0 && i < ssize_t(NKeysPerNode) - 1; i++) {
+          if (likely(resp_leaf->keys_[i] != resp_leaf->keys_[right_split_point]))
+            break;
+          right_split_point++;
+        }
+        INVARIANT(right_split_point <= NKeysPerNode);
+        INVARIANT(right_split_point == 0 || resp_leaf->keys_[right_split_point - 1] != resp_leaf->keys_[right_split_point]);
+
+        size_t split_point;
+        if (std::min(left_split_point, NKeysPerNode - left_split_point) <
+            std::min(right_split_point, NKeysPerNode - right_split_point))
+          split_point = right_split_point;
+        else
+          split_point = left_split_point;
+
+        if (split_point <= size_t(lenlowerbound + 1) && resp_leaf->keys_[split_point - 1] != kslice) {
+          // put new key in new leaf (right)
+          size_t pos = lenlowerbound + 1 - split_point;
+
+          copy_into(&new_leaf->keys_[0], resp_leaf->keys_, split_point, lenlowerbound + 1);
+          new_leaf->keys_[pos] = kslice;
+          copy_into(&new_leaf->keys_[pos + 1], resp_leaf->keys_, lenlowerbound + 1, NKeysPerNode);
+
+          copy_into(&new_leaf->values_[0], resp_leaf->values_, split_point, lenlowerbound + 1);
+          new_leaf->values_[pos].v_ = v;
+          copy_into(&new_leaf->values_[pos + 1], resp_leaf->values_, lenlowerbound + 1, NKeysPerNode);
+
+          copy_into(&new_leaf->lengths_[0], resp_leaf->lengths_, split_point, lenlowerbound + 1);
+          new_leaf->keyslice_set_length(pos, kslicelen, false);
+          copy_into(&new_leaf->lengths_[pos + 1], resp_leaf->lengths_, lenlowerbound + 1, NKeysPerNode);
+
+          if (resp_leaf->suffixes_) {
+            new_leaf->ensure_suffixes();
+            swap_with(&new_leaf->suffixes_[0], resp_leaf->suffixes_, split_point, lenlowerbound + 1);
+          }
+          if (kslicelen == 9) {
+            new_leaf->ensure_suffixes();
+            rcu_imstring i(k.data() + 8, k.size() - 8);
+            new_leaf->suffixes_[pos].swap(i);
+          } else if (new_leaf->suffixes_) {
+            rcu_imstring i;
+            new_leaf->suffixes_[pos].swap(i);
+          }
+          if (resp_leaf->suffixes_) {
+            new_leaf->ensure_suffixes();
+            swap_with(&new_leaf->suffixes_[pos + 1], resp_leaf->suffixes_, lenlowerbound + 1, NKeysPerNode);
+          }
+
+          resp_leaf->set_key_slots_used(split_point);
+          new_leaf->set_key_slots_used(NKeysPerNode - split_point + 1);
+
+#ifdef CHECK_INVARIANTS
+          resp_leaf->base_invariant_unique_keys_check();
+          new_leaf->base_invariant_unique_keys_check();
+          INVARIANT(resp_leaf->keys_[split_point - 1] < new_leaf->keys_[0]);
+#endif /* CHECK_INVARIANTS */
+
+        } else {
+          // XXX: not really sure if this invariant is true, but we rely
+          // on it for now
+          INVARIANT(size_t(lenlowerbound + 1) <= split_point);
+
+          // put new key in original leaf
+          copy_into(&new_leaf->keys_[0], resp_leaf->keys_, split_point, NKeysPerNode);
+          copy_into(&new_leaf->values_[0], resp_leaf->values_, split_point, NKeysPerNode);
+          copy_into(&new_leaf->lengths_[0], resp_leaf->lengths_, split_point, NKeysPerNode);
+          if (resp_leaf->suffixes_) {
+            new_leaf->ensure_suffixes();
+            swap_with(&new_leaf->suffixes_[0], resp_leaf->suffixes_, split_point, NKeysPerNode);
+          }
+
+          sift_right(resp_leaf->keys_, lenlowerbound + 1, split_point);
+          resp_leaf->keys_[lenlowerbound + 1] = kslice;
+          sift_right(resp_leaf->values_, lenlowerbound + 1, split_point);
+          resp_leaf->values_[lenlowerbound + 1].v_ = v;
+          sift_right(resp_leaf->lengths_, lenlowerbound + 1, split_point);
+          resp_leaf->keyslice_set_length(lenlowerbound + 1, kslicelen, false);
+          if (resp_leaf->suffixes_)
+            sift_swap_right(resp_leaf->suffixes_, lenlowerbound + 1, split_point);
+          if (kslicelen == 9) {
+            resp_leaf->ensure_suffixes();
+            rcu_imstring i(k.data() + 8, k.size() - 8);
+            resp_leaf->suffixes_[lenlowerbound + 1].swap(i);
+          } else if (resp_leaf->suffixes_) {
+            rcu_imstring i;
+            resp_leaf->suffixes_[lenlowerbound + 1].swap(i);
+          }
+
+          resp_leaf->set_key_slots_used(split_point + 1);
+          new_leaf->set_key_slots_used(NKeysPerNode - split_point);
+
+#ifdef CHECK_INVARIANTS
+          resp_leaf->base_invariant_unique_keys_check();
+          new_leaf->base_invariant_unique_keys_check();
+          INVARIANT(resp_leaf->keys_[split_point] < new_leaf->keys_[0]);
+#endif /* CHECK_INVARIANTS */
+        }
+      }
+
+      // pointer adjustment
+      new_leaf->prev_ = resp_leaf;
+      new_leaf->next_ = resp_leaf->next_;
+      if (resp_leaf->next_)
+        resp_leaf->next_->prev_ = new_leaf;
+      resp_leaf->next_ = new_leaf;
+
+      min_key = new_leaf->keys_[0];
+      new_leaf->min_key_ = min_key;
+      new_node = new_leaf;
+
+      if (insert_info) {
+        insert_info->node = resp_leaf;
+        insert_info->old_version = RawVersionManip::Version(resp_leaf->unstable_version()); // we hold lock on leaf
+        insert_info->new_version = insert_info->old_version + 1;
+      }
+
+      return I_SPLIT;
+    }
+  } else {
+    internal_node *internal = AsInternal(np);
+    uint64_t version = internal->stable_version();
+    if (unlikely(RawVersionManip::IsDeleting(version)))
+      return UnlockAndReturn(locked_nodes, I_RETRY);
+    key_search_ret kret = internal->key_lower_bound_search(kslice);
+    ssize_t ret = kret.first;
+    size_t n = kret.second;
+    size_t child_idx = (ret == -1) ? 0 : ret + 1;
+    node *child_ptr = internal->children_[child_idx];
+    if (unlikely(!internal->check_version(version)))
+      return UnlockAndReturn(locked_nodes, I_RETRY);
+    parents.push_back(insert_parent_entry(internal, version));
+    key_slice mk = 0;
+    node *new_child = NULL;
+    insert_status status =
+      insert0(child_ptr, k, v, only_if_absent, old_v, insert_info,
+              mk, new_child, parents, locked_nodes);
+    if (status != I_SPLIT) {
+      INVARIANT(locked_nodes.empty());
+      return status;
+    }
+    INVARIANT(new_child);
+    INVARIANT(internal->is_locked()); // previous call to insert0() must lock internal node for insertion
+    INVARIANT(internal->is_lock_owner());
+    INVARIANT(internal->check_version(version));
+    INVARIANT(new_child->key_slots_used() > 0);
+    INVARIANT(n > 0);
+    internal->mark_modifying();
+    if (n < NKeysPerNode) {
+      sift_right(internal->keys_, child_idx, n);
+      internal->keys_[child_idx] = mk;
+      sift_right(internal->children_, child_idx + 1, n + 1);
+      internal->children_[child_idx + 1] = new_child;
+      internal->inc_key_slots_used();
+      return UnlockAndReturn(locked_nodes, I_NONE_MOD);
+    } else {
+      INVARIANT(n == NKeysPerNode);
+      INVARIANT(ret == internal->key_lower_bound_search(mk).first);
+
+      internal_node *new_internal = internal_node::alloc();
+      new_internal->prefetch();
+#ifdef CHECK_INVARIANTS
+      new_internal->lock();
+      new_internal->mark_modifying();
+      locked_nodes.push_back(new_internal);
+#endif /* CHECK_INVARIANTS */
+
+      // there are three cases post-split:
+      // (1) mk goes in the original node
+      // (2) mk is the key we push up
+      // (3) mk goes in the new node
+
+      const ssize_t split_point = NMinKeysPerNode - 1;
+      if (ret < split_point) {
+        // case (1)
+        min_key = internal->keys_[split_point];
+
+        copy_into(&new_internal->keys_[0], internal->keys_, NMinKeysPerNode, NKeysPerNode);
+        copy_into(&new_internal->children_[0], internal->children_, NMinKeysPerNode, NKeysPerNode + 1);
+        new_internal->set_key_slots_used(NKeysPerNode - NMinKeysPerNode);
+
+        sift_right(internal->keys_, child_idx, NMinKeysPerNode - 1);
+        internal->keys_[child_idx] = mk;
+        sift_right(internal->children_, child_idx + 1, NMinKeysPerNode);
+        internal->children_[child_idx + 1] = new_child;
+        internal->set_key_slots_used(NMinKeysPerNode);
+
+      } else if (ret == split_point) {
+        // case (2)
+        min_key = mk;
+
+        copy_into(&new_internal->keys_[0], internal->keys_, NMinKeysPerNode, NKeysPerNode);
+        copy_into(&new_internal->children_[1], internal->children_, NMinKeysPerNode + 1, NKeysPerNode + 1);
+        new_internal->children_[0] = new_child;
+        new_internal->set_key_slots_used(NKeysPerNode - NMinKeysPerNode);
+        internal->set_key_slots_used(NMinKeysPerNode);
+
+      } else {
+        // case (3)
+        min_key = internal->keys_[NMinKeysPerNode];
+
+        size_t pos = child_idx - NMinKeysPerNode - 1;
+
+        copy_into(&new_internal->keys_[0], internal->keys_, NMinKeysPerNode + 1, child_idx);
+        new_internal->keys_[pos] = mk;
+        copy_into(&new_internal->keys_[pos + 1], internal->keys_, child_idx, NKeysPerNode);
+
+        copy_into(&new_internal->children_[0], internal->children_, NMinKeysPerNode + 1, child_idx + 1);
+        new_internal->children_[pos + 1] = new_child;
+        copy_into(&new_internal->children_[pos + 2], internal->children_, child_idx + 1, NKeysPerNode + 1);
+
+        new_internal->set_key_slots_used(NKeysPerNode - NMinKeysPerNode);
+        internal->set_key_slots_used(NMinKeysPerNode);
+      }
+
+      INVARIANT(internal->keys_[internal->key_slots_used() - 1] < new_internal->keys_[0]);
+      new_node = new_internal;
+      return I_SPLIT;
+    }
+  }
+}
+
+template <typename P>
+bool
+btree<P>::insert_stable_location(
+    node **root_location, const key_type &k, value_type v,
+    bool only_if_absent, value_type *old_v,
+    insert_info_t *insert_info)
+{
+  INVARIANT(rcu::s_instance.in_rcu_region());
+retry:
+  key_slice mk;
+  node *ret;
+  typename util::vec<insert_parent_entry>::type parents;
+  typename util::vec<node *>::type locked_nodes;
+  node *local_root = *root_location;
+  const insert_status status =
+    insert0(local_root, k, v, only_if_absent, old_v, insert_info,
+            mk, ret, parents, locked_nodes);
+  INVARIANT(status == I_SPLIT || locked_nodes.empty());
+  switch (status) {
+  case I_NONE_NOMOD:
+    return false;
+  case I_NONE_MOD:
+    return true;
+  case I_RETRY:
+    goto retry;
+  case I_SPLIT:
+    INVARIANT(ret);
+    INVARIANT(ret->key_slots_used() > 0);
+    INVARIANT(local_root->is_modifying());
+    INVARIANT(local_root->is_lock_owner());
+    INVARIANT(local_root->is_root());
+    INVARIANT(local_root == *root_location);
+    internal_node *new_root = internal_node::alloc();
+#ifdef CHECK_INVARIANTS
+    new_root->lock();
+    new_root->mark_modifying();
+    locked_nodes.push_back(new_root);
+#endif /* CHECK_INVARIANTS */
+    new_root->children_[0] = local_root;
+    new_root->children_[1] = ret;
+    new_root->keys_[0] = mk;
+    new_root->set_key_slots_used(1);
+    new_root->set_root();
+    local_root->clear_root();
+    COMPILER_MEMORY_FENCE;
+    *root_location = new_root;
+    // locks are still held here
+    return UnlockAndReturn(locked_nodes, true);
+  }
+  ALWAYS_ASSERT(false);
+  return false;
+}
+
+/**
+ * remove is very tricky to get right!
+ *
+ * XXX: optimize remove so it holds less locks
+ */
+template <typename P>
+typename btree<P>::remove_status
+btree<P>::remove0(node *np,
+                  key_slice *min_key,
+                  key_slice *max_key,
+                  const key_type &k,
+                  value_type *old_v,
+                  node *left_node,
+                  node *right_node,
+                  key_slice &new_key,
+                  node *&replace_node,
+                  typename util::vec<remove_parent_entry>::type &parents,
+                  typename util::vec<node *>::type &locked_nodes)
+{
+  uint64_t kslice = k.slice();
+  size_t kslicelen = std::min(k.size(), size_t(9));
+
+  np->prefetch();
+  if (leaf_node *leaf = AsLeafCheck(np)) {
+    INVARIANT(locked_nodes.empty());
+
+    SINGLE_THREADED_INVARIANT(!left_node || (leaf->prev_ == left_node && AsLeaf(left_node)->next == leaf));
+    SINGLE_THREADED_INVARIANT(!right_node || (leaf->next_ == right_node && AsLeaf(right_node)->prev == leaf));
+
+retry_cur_leaf:
+    uint64_t version;
+    size_t n;
+    ssize_t ret;
+    leaf_node *resp_leaf = FindRespLeafExact(
+        leaf, kslice, kslicelen, version, n, ret);
+
+    if (ret == -1) {
+      if (unlikely(!resp_leaf->check_version(version)))
+        goto retry_cur_leaf;
+      return UnlockAndReturn(locked_nodes, R_NONE_NOMOD);
+    }
+    if (kslicelen == 9) {
+      if (resp_leaf->is_layer(ret)) {
+        node *subroot = resp_leaf->values_[ret].n_;
+        INVARIANT(subroot);
+        if (unlikely(!resp_leaf->check_version(version)))
+          goto retry_cur_leaf;
+
+        key_slice new_key;
+        node *replace_node = NULL;
+        typename util::vec<remove_parent_entry>::type sub_parents;
+        typename util::vec<node *>::type sub_locked_nodes;
+        remove_status status = remove0(subroot,
+            NULL, /* min_key */
+            NULL, /* max_key */
+            k.shift(),
+            old_v,
+            NULL, /* left_node */
+            NULL, /* right_node */
+            new_key,
+            replace_node,
+            sub_parents,
+            sub_locked_nodes);
+        switch (status) {
+        case R_NONE_NOMOD:
+        case R_NONE_MOD:
+        case R_RETRY:
+          INVARIANT(sub_locked_nodes.empty());
+          return status;
+
+        case R_REPLACE_NODE:
+          INVARIANT(replace_node);
+          for (;;) {
+            resp_leaf = FindRespLeafExact(
+                resp_leaf, kslice, kslicelen, version, n, ret);
+            const uint64_t locked_version = resp_leaf->lock();
+            if (likely(btree::CheckVersion(version, locked_version))) {
+              locked_nodes.push_back(resp_leaf);
+              break;
+            }
+            resp_leaf->unlock();
+          }
+
+          INVARIANT(subroot->is_deleting());
+          INVARIANT(subroot->is_lock_owner());
+          INVARIANT(subroot->is_root());
+          replace_node->set_root();
+          subroot->clear_root();
+          resp_leaf->values_[ret].n_ = replace_node;
+
+          // XXX: need to re-merge back when layer size becomes 1, but skip this
+          // for now
+
+          // locks are still held here
+          UnlockNodes(sub_locked_nodes);
+          return UnlockAndReturn(locked_nodes, R_NONE_MOD);
+
+        default:
+          break;
+        }
+        ALWAYS_ASSERT(false);
+
+      } else {
+        // suffix check
+        if (resp_leaf->suffix(ret) != k.shift()) {
+          if (unlikely(!resp_leaf->check_version(version)))
+            goto retry_cur_leaf;
+          return UnlockAndReturn(locked_nodes, R_NONE_NOMOD);
+        }
+      }
+    }
+
+    //INVARIANT(!resp_leaf->is_layer(ret));
+    if (n > NMinKeysPerNode) {
+      const uint64_t locked_version = resp_leaf->lock();
+      if (unlikely(!btree::CheckVersion(version, locked_version))) {
+        resp_leaf->unlock();
+        goto retry_cur_leaf;
+      }
+      locked_nodes.push_back(resp_leaf);
+      if (old_v)
+        *old_v = resp_leaf->values_[ret].v_;
+      resp_leaf->mark_modifying();
+      remove_pos_from_leaf_node(resp_leaf, ret, n);
+      return UnlockAndReturn(locked_nodes, R_NONE_MOD);
+    } else {
+
+      if (unlikely(resp_leaf != leaf))
+        return UnlockAndReturn(locked_nodes, R_RETRY);
+      const uint64_t locked_version = leaf->lock();
+      if (unlikely(!btree::CheckVersion(version, locked_version))) {
+        leaf->unlock();
+        goto retry_cur_leaf;
+      }
+      locked_nodes.push_back(leaf);
+      if (old_v)
+        *old_v = leaf->values_[ret].v_;
+
+      uint64_t leaf_version = leaf->unstable_version();
+
+      leaf_node *left_sibling = AsLeaf(left_node);
+      leaf_node *right_sibling = AsLeaf(right_node);
+
+      // NOTE: remember that our locking discipline is left-to-right,
+      // bottom-to-top. Here, we must acquire all locks on nodes being
+      // modified in the tree. How we choose to handle removes is in the
+      // following preference:
+      //   1) steal from right node
+      //   2) merge with right node
+      //   3) steal from left node
+      //   4) merge with left node
+      //
+      // Btree invariants guarantee that at least one of the options above is
+      // available (except for the root node). We pick the right node first,
+      // because our locking discipline allows us to directly lock the right
+      // node.  If (1) and (2) cannot be satisfied, then we must first
+      // *unlock* the current node, lock the left node, relock the current
+      // node, and check nothing changed in between.
+
+      if (right_sibling) {
+        right_sibling->lock();
+        locked_nodes.push_back(right_sibling);
+      } else if (left_sibling) {
+        leaf->unlock();
+        left_sibling->lock();
+        locked_nodes.push_back(left_sibling);
+        leaf->lock();
+        if (unlikely(!leaf->check_version(leaf_version)))
+          return UnlockAndReturn(locked_nodes, R_RETRY);
+      } else {
+        INVARIANT(parents.empty());
+        if (unlikely(!leaf->is_root()))
+          return UnlockAndReturn(locked_nodes, R_RETRY);
+        //INVARIANT(leaf == root);
+      }
+
+      for (typename util::vec<remove_parent_entry>::type::reverse_iterator rit = parents.rbegin();
+           rit != parents.rend(); ++rit) {
+        node *p = rit->parent_;
+        node *l = rit->parent_left_sibling_;
+        node *r = rit->parent_right_sibling_;
+        uint64_t p_version = rit->parent_version_;
+        p->lock();
+        locked_nodes.push_back(p);
+        if (unlikely(!p->check_version(p_version)))
+          return UnlockAndReturn(locked_nodes, R_RETRY);
+        size_t p_n = p->key_slots_used();
+        if (p_n > NMinKeysPerNode)
+          break;
+        if (r) {
+          r->lock();
+          locked_nodes.push_back(r);
+        } else if (l) {
+          p->unlock();
+          l->lock();
+          locked_nodes.push_back(l);
+          p->lock();
+          if (unlikely(!p->check_version(p_version)))
+            return UnlockAndReturn(locked_nodes, R_RETRY);
+        } else {
+          if (unlikely(!p->is_root()))
+            return UnlockAndReturn(locked_nodes, R_RETRY);
+          //INVARIANT(p == root);
+        }
+      }
+
+      leaf->mark_modifying();
+
+      if (right_sibling) {
+        right_sibling->mark_modifying();
+        size_t right_n = right_sibling->key_slots_used();
+        if (right_n > NMinKeysPerNode) {
+          // steal first contiguous key slices from right
+          INVARIANT(right_sibling->keys_[0] > leaf->keys_[n - 1]);
+
+          // indices [0, steal_point) will be taken from the right
+          size_t steal_point = 1;
+          for (size_t i = 0; i < right_n - 1; i++, steal_point++)
+            if (likely(right_sibling->keys_[i] != right_sibling->keys_[steal_point]))
+              break;
+
+          INVARIANT(steal_point <= sizeof(key_slice) + 2);
+          INVARIANT(steal_point <= right_n);
+
+          // to steal, we need to ensure:
+          // 1) we have enough room to steal
+          // 2) the right sibling will not be empty after the steal
+          if ((n - 1 + steal_point) <= NKeysPerNode && steal_point < right_n) {
+            sift_left(leaf->keys_, ret, n);
+            copy_into(&leaf->keys_[n - 1], right_sibling->keys_, 0, steal_point);
+            sift_left(leaf->values_, ret, n);
+            copy_into(&leaf->values_[n - 1], right_sibling->values_, 0, steal_point);
+            sift_left(leaf->lengths_, ret, n);
+            copy_into(&leaf->lengths_[n - 1], right_sibling->lengths_, 0, steal_point);
+            if (leaf->suffixes_)
+              sift_swap_left(leaf->suffixes_, ret, n);
+            if (right_sibling->suffixes_) {
+              leaf->ensure_suffixes();
+              swap_with(&leaf->suffixes_[n - 1], right_sibling->suffixes_, 0, steal_point);
+            }
+
+            sift_left(right_sibling->keys_, 0, right_n, steal_point);
+            sift_left(right_sibling->values_, 0, right_n, steal_point);
+            sift_left(right_sibling->lengths_, 0, right_n, steal_point);
+            if (right_sibling->suffixes_)
+              sift_swap_left(right_sibling->suffixes_, 0, right_n, steal_point);
+            leaf->set_key_slots_used(n - 1 + steal_point);
+            right_sibling->set_key_slots_used(right_n - steal_point);
+            new_key = right_sibling->keys_[0];
+            right_sibling->min_key_ = new_key;
+
+#ifdef CHECK_INVARIANTS
+            leaf->base_invariant_unique_keys_check();
+            right_sibling->base_invariant_unique_keys_check();
+            INVARIANT(leaf->keys_[n - 1 + steal_point - 1] < new_key);
+#endif /* CHECK_INVARIANTS */
+
+            return R_STOLE_FROM_RIGHT;
+          } else {
+            // can't steal, so try merging- but only merge if we have room,
+            // otherwise just allow this node to have less elements
+            if ((n - 1 + right_n) > NKeysPerNode) {
+              INVARIANT(n > 1); // if we can't steal or merge, we must have
+                                // enough elements to just remove one w/o going empty
+              remove_pos_from_leaf_node(leaf, ret, n);
+              return UnlockAndReturn(locked_nodes, R_NONE_MOD);
+            }
+          }
+        }
+
+        // merge right sibling into this node
+        INVARIANT(right_sibling->keys_[0] > leaf->keys_[n - 1]);
+        INVARIANT((right_n + (n - 1)) <= NKeysPerNode);
+
+        sift_left(leaf->keys_, ret, n);
+        copy_into(&leaf->keys_[n - 1], right_sibling->keys_, 0, right_n);
+
+        sift_left(leaf->values_, ret, n);
+        copy_into(&leaf->values_[n - 1], right_sibling->values_, 0, right_n);
+
+        sift_left(leaf->lengths_, ret, n);
+        copy_into(&leaf->lengths_[n - 1], right_sibling->lengths_, 0, right_n);
+
+        if (leaf->suffixes_)
+          sift_swap_left(leaf->suffixes_, ret, n);
+
+        if (right_sibling->suffixes_) {
+          leaf->ensure_suffixes();
+          swap_with(&leaf->suffixes_[n - 1], right_sibling->suffixes_, 0, right_n);
+        }
+
+        leaf->set_key_slots_used(right_n + (n - 1));
+        leaf->next_ = right_sibling->next_;
+        if (right_sibling->next_)
+          right_sibling->next_->prev_ = leaf;
+
+        // leaf->next_->prev won't change because we hold lock for both leaf
+        // and right_sibling
+        INVARIANT(!leaf->next_ || leaf->next_->prev_ == leaf);
+
+        // leaf->prev_->next might change, however, since the left node could be
+        // splitting (and we might hold a pointer to the left-split of the left node,
+        // before it gets updated)
+        SINGLE_THREADED_INVARIANT(!leaf->prev_ || leaf->prev_->next_ == leaf);
+
+//#ifdef CHECK_INVARIANTS
+//        leaf->base_invariant_unique_keys_check();
+//#endif
+        leaf_node::release(right_sibling);
+        return R_MERGE_WITH_RIGHT;
+      }
+
+      if (left_sibling) {
+        left_sibling->mark_modifying();
+        size_t left_n = left_sibling->key_slots_used();
+        if (left_n > NMinKeysPerNode) {
+          // try to steal from left
+          INVARIANT(left_sibling->keys_[left_n - 1] < leaf->keys_[0]);
+
+          // indices [steal_point, left_n) will be taken from the left
+          size_t steal_point = left_n - 1;
+          for (ssize_t i = steal_point - 1; i >= 0; i--, steal_point--)
+            if (likely(left_sibling->keys_[i] != left_sibling->keys_[steal_point]))
+              break;
+
+          size_t nstolen = left_n - steal_point;
+          INVARIANT(nstolen <= sizeof(key_slice) + 2);
+          INVARIANT(steal_point < left_n);
+
+          if ((n - 1 + nstolen) <= NKeysPerNode && steal_point > 0) {
+            sift_right(leaf->keys_, ret + 1, n, nstolen - 1);
+            sift_right(leaf->keys_, 0, ret, nstolen);
+            copy_into(&leaf->keys_[0], &left_sibling->keys_[0], left_n - nstolen, left_n);
+
+            sift_right(leaf->values_, ret + 1, n, nstolen - 1);
+            sift_right(leaf->values_, 0, ret, nstolen);
+            copy_into(&leaf->values_[0], &left_sibling->values_[0], left_n - nstolen, left_n);
+
+            sift_right(leaf->lengths_, ret + 1, n, nstolen - 1);
+            sift_right(leaf->lengths_, 0, ret, nstolen);
+            copy_into(&leaf->lengths_[0], &left_sibling->lengths_[0], left_n - nstolen, left_n);
+
+            if (leaf->suffixes_) {
+              sift_swap_right(leaf->suffixes_, ret + 1, n, nstolen - 1);
+              sift_swap_right(leaf->suffixes_, 0, ret, nstolen);
+            }
+            if (left_sibling->suffixes_) {
+              leaf->ensure_suffixes();
+              swap_with(&leaf->suffixes_[0], &left_sibling->suffixes_[0], left_n - nstolen, left_n);
+            }
+
+            left_sibling->set_key_slots_used(left_n - nstolen);
+            leaf->set_key_slots_used(n - 1 + nstolen);
+            new_key = leaf->keys_[0];
+            leaf->min_key_ = new_key;
+
+#ifdef CHECK_INVARIANTS
+            leaf->base_invariant_unique_keys_check();
+            left_sibling->base_invariant_unique_keys_check();
+            INVARIANT(left_sibling->keys_[left_n - nstolen - 1] < new_key);
+#endif /* CHECK_INVARIANTS */
+
+            return R_STOLE_FROM_LEFT;
+          } else {
+            if ((left_n + (n - 1)) > NKeysPerNode) {
+              INVARIANT(n > 1);
+              remove_pos_from_leaf_node(leaf, ret, n);
+              return UnlockAndReturn(locked_nodes, R_NONE_MOD);
+            }
+          }
+        }
+
+        // merge this node into left sibling
+        INVARIANT(left_sibling->keys_[left_n - 1] < leaf->keys_[0]);
+        INVARIANT((left_n + (n - 1)) <= NKeysPerNode);
+
+        copy_into(&left_sibling->keys_[left_n], leaf->keys_, 0, ret);
+        copy_into(&left_sibling->keys_[left_n + ret], leaf->keys_, ret + 1, n);
+
+        copy_into(&left_sibling->values_[left_n], leaf->values_, 0, ret);
+        copy_into(&left_sibling->values_[left_n + ret], leaf->values_, ret + 1, n);
+
+        copy_into(&left_sibling->lengths_[left_n], leaf->lengths_, 0, ret);
+        copy_into(&left_sibling->lengths_[left_n + ret], leaf->lengths_, ret + 1, n);
+
+        if (leaf->suffixes_) {
+          left_sibling->ensure_suffixes();
+          swap_with(&left_sibling->suffixes_[left_n], leaf->suffixes_, 0, ret);
+          swap_with(&left_sibling->suffixes_[left_n + ret], leaf->suffixes_, ret + 1, n);
+        }
+
+        left_sibling->set_key_slots_used(left_n + (n - 1));
+        left_sibling->next_ = leaf->next_;
+        if (leaf->next_)
+          leaf->next_->prev_ = left_sibling;
+
+        // see comments in right_sibling case above, for why one of them is INVARIANT and
+        // the other is SINGLE_THREADED_INVARIANT
+        INVARIANT(!left_sibling->next_ || left_sibling->next_->prev_ == left_sibling);
+        SINGLE_THREADED_INVARIANT(
+            !left_sibling->prev_ ||
+            left_sibling->prev_->next_ == left_sibling);
+
+        //left_sibling->base_invariant_unique_keys_check();
+        leaf_node::release(leaf);
+        return R_MERGE_WITH_LEFT;
+      }
+
+      // root node, so we are ok
+      //INVARIANT(leaf == root);
+      INVARIANT(leaf->is_root());
+      remove_pos_from_leaf_node(leaf, ret, n);
+      return UnlockAndReturn(locked_nodes, R_NONE_MOD);
+    }
+  } else {
+    internal_node *internal = AsInternal(np);
+    uint64_t version = internal->stable_version();
+    if (unlikely(RawVersionManip::IsDeleting(version)))
+      return UnlockAndReturn(locked_nodes, R_RETRY);
+    key_search_ret kret = internal->key_lower_bound_search(kslice);
+    ssize_t ret = kret.first;
+    size_t n = kret.second;
+    size_t child_idx = (ret == -1) ? 0 : ret + 1;
+    node *child_ptr = internal->children_[child_idx];
+    key_slice *child_min_key = child_idx == 0 ? NULL : &internal->keys_[child_idx - 1];
+    key_slice *child_max_key = child_idx == n ? NULL : &internal->keys_[child_idx];
+    node *child_left_sibling = child_idx == 0 ? NULL : internal->children_[child_idx - 1];
+    node *child_right_sibling = child_idx == n ? NULL : internal->children_[child_idx + 1];
+    if (unlikely(!internal->check_version(version)))
+      return UnlockAndReturn(locked_nodes, R_RETRY);
+    parents.push_back(remove_parent_entry(internal, left_node, right_node, version));
+    INVARIANT(n > 0);
+    key_slice nk;
+    node *rn;
+    remove_status status = remove0(child_ptr,
+        child_min_key,
+        child_max_key,
+        k,
+        old_v,
+        child_left_sibling,
+        child_right_sibling,
+        nk,
+        rn,
+        parents,
+        locked_nodes);
+    switch (status) {
+      case R_NONE_NOMOD:
+      case R_NONE_MOD:
+      case R_RETRY:
+        return status;
+
+      case R_STOLE_FROM_LEFT:
+        INVARIANT(internal->is_locked());
+        INVARIANT(internal->is_lock_owner());
+        internal->keys_[child_idx - 1] = nk;
+        return UnlockAndReturn(locked_nodes, R_NONE_MOD);
+
+      case R_STOLE_FROM_RIGHT:
+        INVARIANT(internal->is_locked());
+        INVARIANT(internal->is_lock_owner());
+        internal->keys_[child_idx] = nk;
+        return UnlockAndReturn(locked_nodes, R_NONE_MOD);
+
+      case R_MERGE_WITH_LEFT:
+      case R_MERGE_WITH_RIGHT:
+        {
+          internal->mark_modifying();
+
+          size_t del_key_idx, del_child_idx;
+          if (status == R_MERGE_WITH_LEFT) {
+            // need to delete key at position (child_idx - 1), and child at
+            // position (child_idx)
+            del_key_idx = child_idx - 1;
+            del_child_idx = child_idx;
+          } else {
+            // need to delete key at position (child_idx), and child at
+            // posiiton (child_idx + 1)
+            del_key_idx = child_idx;
+            del_child_idx = child_idx + 1;
+          }
+
+          if (n > NMinKeysPerNode) {
+            remove_pos_from_internal_node(internal, del_key_idx, del_child_idx, n);
+            return UnlockAndReturn(locked_nodes, R_NONE_MOD);
+          }
+
+          internal_node *left_sibling = AsInternal(left_node);
+          internal_node *right_sibling = AsInternal(right_node);
+
+          // WARNING: if you change the order of events here, then you must also
+          // change the locking protocol (see the comment in the leaf node case)
+
+          if (right_sibling) {
+            right_sibling->mark_modifying();
+            size_t right_n = right_sibling->key_slots_used();
+            INVARIANT(max_key);
+            INVARIANT(right_sibling->keys_[0] > internal->keys_[n - 1]);
+            INVARIANT(*max_key > internal->keys_[n - 1]);
+            if (right_n > NMinKeysPerNode) {
+              // steal from right
+              sift_left(internal->keys_, del_key_idx, n);
+              internal->keys_[n - 1] = *max_key;
+
+              sift_left(internal->children_, del_child_idx, n + 1);
+              internal->children_[n] = right_sibling->children_[0];
+
+              new_key = right_sibling->keys_[0];
+
+              sift_left(right_sibling->keys_, 0, right_n);
+              sift_left(right_sibling->children_, 0, right_n + 1);
+              right_sibling->dec_key_slots_used();
+
+              return R_STOLE_FROM_RIGHT;
+            } else {
+              // merge with right
+              INVARIANT(max_key);
+
+              sift_left(internal->keys_, del_key_idx, n);
+              internal->keys_[n - 1] = *max_key;
+              copy_into(&internal->keys_[n], right_sibling->keys_, 0, right_n);
+
+              sift_left(internal->children_, del_child_idx, n + 1);
+              copy_into(&internal->children_[n], right_sibling->children_, 0, right_n + 1);
+
+              internal->set_key_slots_used(n + right_n);
+              internal_node::release(right_sibling);
+              return R_MERGE_WITH_RIGHT;
+            }
+          }
+
+          if (left_sibling) {
+            left_sibling->mark_modifying();
+            size_t left_n = left_sibling->key_slots_used();
+            INVARIANT(min_key);
+            INVARIANT(left_sibling->keys_[left_n - 1] < internal->keys_[0]);
+            INVARIANT(left_sibling->keys_[left_n - 1] < *min_key);
+            INVARIANT(*min_key < internal->keys_[0]);
+            if (left_n > NMinKeysPerNode) {
+              // steal from left
+              sift_right(internal->keys_, 0, del_key_idx);
+              internal->keys_[0] = *min_key;
+
+              sift_right(internal->children_, 0, del_child_idx);
+              internal->children_[0] = left_sibling->children_[left_n];
+
+              new_key = left_sibling->keys_[left_n - 1];
+              left_sibling->dec_key_slots_used();
+
+              return R_STOLE_FROM_LEFT;
+            } else {
+              // merge into left sibling
+              INVARIANT(min_key);
+
+              size_t left_key_j = left_n;
+              size_t left_child_j = left_n + 1;
+
+              left_sibling->keys_[left_key_j++] = *min_key;
+
+              copy_into(&left_sibling->keys_[left_key_j], internal->keys_, 0, del_key_idx);
+              left_key_j += del_key_idx;
+              copy_into(&left_sibling->keys_[left_key_j], internal->keys_, del_key_idx + 1, n);
+
+              copy_into(&left_sibling->children_[left_child_j], internal->children_, 0, del_child_idx);
+              left_child_j += del_child_idx;
+              copy_into(&left_sibling->children_[left_child_j], internal->children_, del_child_idx + 1, n + 1);
+
+              left_sibling->set_key_slots_used(n + left_n);
+              internal_node::release(internal);
+              return R_MERGE_WITH_LEFT;
+            }
+          }
+
+          //INVARIANT(internal == root);
+          INVARIANT(internal->is_root());
+          remove_pos_from_internal_node(internal, del_key_idx, del_child_idx, n);
+          INVARIANT(internal->key_slots_used() + 1 == n);
+          if ((n - 1) == 0) {
+            replace_node = internal->children_[0];
+            internal_node::release(internal);
+            return R_REPLACE_NODE;
+          }
+
+          return UnlockAndReturn(locked_nodes, R_NONE_MOD);
+        }
+
+      default:
+        ALWAYS_ASSERT(false);
+        return UnlockAndReturn(locked_nodes, R_NONE_NOMOD);
+    }
+  }
+}
+
+template <typename P>
+std::string
+btree<P>::NodeStringify(const node_opaque_t *n)
+{
+  std::vector<std::string> keys;
+  for (size_t i = 0; i < n->key_slots_used(); i++)
+    keys.push_back(std::string("0x") + util::hexify(n->keys_[i]));
+
+  std::ostringstream b;
+  b << "node[v=" << n->version_info_str()
+    << ", keys=" << util::format_list(keys.begin(), keys.end());
+
+  if (n->is_leaf_node()) {
+    const leaf_node *leaf = AsLeaf(n);
+    std::vector<std::string> lengths;
+    for (size_t i = 0; i < leaf->key_slots_used(); i++) {
+      std::ostringstream inf;
+      inf << "<l=" << leaf->keyslice_length(i) << ",is_layer=" << leaf->is_layer(i) << ">";
+      lengths.push_back(inf.str());
+    }
+    b << ", lengths=" << util::format_list(lengths.begin(), lengths.end());
+  } else {
+    //const internal_node *internal = AsInternal(n);
+    // nothing for now
+  }
+
+  b << "]";
+  return b.str();
+}
+
+template <typename P>
+std::vector<std::pair<typename btree<P>::value_type, bool>>
+btree<P>::ExtractValues(const node_opaque_t *n)
+{
+  std::vector< std::pair<value_type, bool> > ret;
+  if (!n->is_leaf_node())
+    return ret;
+  const leaf_node *leaf = (const leaf_node *) n;
+  const size_t sz = leaf->key_slots_used();
+  for (size_t i = 0; i < sz; i++)
+    if (!leaf->is_layer(i))
+      ret.emplace_back(leaf->values_[i].v_, leaf->keyslice_length(i) > 8);
+  return ret;
+}