2 * Eddie Kohler, Yandong Mao, Robert Morris
3 * Copyright (c) 2012-2014 President and Fellows of Harvard College
4 * Copyright (c) 2012-2014 Massachusetts Institute of Technology
6 * Permission is hereby granted, free of charge, to any person obtaining a
7 * copy of this software and associated documentation files (the "Software"),
8 * to deal in the Software without restriction, subject to the conditions
9 * listed in the Masstree LICENSE file. These conditions include: you must
10 * preserve this copyright notice, and you cannot mention the copyright
11 * holders in advertising related to the Software without their permission.
12 * The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This
13 * notice is a summary of the Masstree LICENSE file; the license in that file
16 #ifndef MASSTREE_SPLIT_HH
17 #define MASSTREE_SPLIT_HH 1
18 #include "masstree_tcursor.hh"
19 #include "btree_leaflink.hh"
22 /** @brief Return ikey at position @a i, assuming insert of @a ka at @a ka_i.
    @param perm  permutation used to translate a position into an index into
                 this->ikey0_
    @param i     position being queried
    @param ka    key that is assumed to be inserted
    @param ka_i  position at which @a ka is assumed to be inserted
    @return an ikey of type P::ikey_type

    NOTE(review): only the two stored-key lookups are shown below. Presumably
    the first covers positions before @a ka_i and the second covers positions
    after it (hence the index shifted down by one), with @a ka's own ikey
    returned when @a i == @a ka_i -- confirm against the branch conditions. */
24 inline typename P::ikey_type
25 leaf<P>::ikey_after_insert(const permuter_type& perm, int i,
26 const key_type& ka, int ka_i) const
// Stored key whose slot is found by looking up position i in the permutation.
29 return this->ikey0_[perm[i]];
// Stored key whose slot is found by looking up position i - 1 instead.
33 return this->ikey0_[perm[i - 1]];
36 /** @brief Split this node into *@a nr and insert @a ka at position @a p.
37 @pre *@a nr is a new empty leaf
38 @pre this->locked() && @a nr->locked()
39 @post split_ikey is the first key in *@a nr
    @param nr  leaf that receives items from position mid upward
    @param p  position at which @a ka is inserted
    @param ka  key being inserted
    @param[out] split_ikey  set to nr->ikey0_[0] after *@a nr is filled
    @param ti  thread context forwarded to assign_initialize()
    @return the split type described below (0, 1, or 2)

    The locking precondition is only checked when this->concurrent is set.

42 If @a p == this->size() and *this is the rightmost node in the layer,
43 then this code assumes we're inserting nodes in sequential order, and
44 the split does not move any keys.
46 The split type is 0 if @a ka went into *this, 1 if @a ka went into
47 *@a nr, and 2 for the sequential-order optimization (@a ka went into *@a
48 nr and no other keys were moved). */
50 int leaf<P>::split_into(leaf<P>* nr, int p, const key_type& ka,
51 ikey_type& split_ikey, threadinfo& ti)
53 // B+tree leaf insertion.
54 // Split *this, with items [0,T::width), into *this + nr, simultaneously
55 // inserting "ka:value" at position "p" (0 <= p <= T::width).
57 // Let mid = floor(T::width / 2) + 1. After the split,
58 // "*this" contains [0,mid) and "nr" contains [mid,T::width+1).
59 // If p < mid, then x goes into *this, and the first element of nr
60 // will be former item (mid - 1).
61 // If p >= mid, then x goes into nr.
// ("x" above is the item being inserted, i.e. ka.)
// Both nodes must be locked whenever the tree is concurrent.
62 masstree_precondition(!this->concurrent || (this->locked() && nr->locked()));
// The leaf must be full or exactly one item short of full.
63 masstree_precondition(this->size() >= this->width - 1);
65 int width = this->size(); // == this->width or this->width - 1
// Default split point, matching the formula in the comment above.
66 int mid = this->width / 2 + 1;
// Special case: inserting at position 0 of a leaf that has no prev_ link.
67 if (p == 0 && !this->prev_)
// Special case: inserting past the last item of a leaf that has no next_
// link (the sequential-insert situation described in the header comment).
69 else if (p == width && !this->next_.ptr)
72 // Never separate keys with the same ikey0.
73 permuter_type perml(this->permutation_);
74 ikey_type mid_ikey = ikey_after_insert(perml, mid, ka, p);
// If post-insertion positions mid - 1 and mid carry the same ikey, the
// tentative split point would separate equal ikeys. Candidate positions
// midl (to the left) and midr (to the right) are then compared against
// mid_ikey to find a position whose ikey differs.
75 if (mid_ikey == ikey_after_insert(perml, mid - 1, ka, p)) {
76 int midl = mid - 2, midr = mid + 1;
79 && mid_ikey != ikey_after_insert(perml, midr, ka, p)) {
83 && mid_ikey != ikey_after_insert(perml, midl, ka, p)) {
// The chosen split point must lie in [1, width].
89 masstree_invariant(mid > 0 && mid <= width);
// Start from pre-insertion position mid, or mid - 1 when the new key lands
// left of the split point (p < mid shifts existing items up by one).
// NOTE(review): value_from() presumably returns the packed permutation
// entries from that position onward -- confirm against permuter_type.
92 typename permuter_type::value_type pv = perml.value_from(mid - (p < mid));
// Populate slots 0 .. width - mid of nr from post-insertion items mid .. width.
93 for (int x = mid; x <= width; ++x)
// The new key itself is written into nr.
95 nr->assign_initialize(x - mid, ka, ti);
// An existing item is copied from *this; pv & 15 keeps the low four bits of
// pv (presumably the source slot index -- confirm against permuter_type).
97 nr->assign_initialize(x - mid, this, pv & 15, ti);
// nr was filled in order, so its permutation starts out sorted over
// width + 1 - mid entries.
100 permuter_type permr = permuter_type::make_sorted(width + 1 - mid);
// p - mid is the slot (relative to nr) at which ka was written above.
102 permr.remove_to_back(p - mid);
103 nr->permutation_ = permr.value();
// Connect nr to the leaf list via the btree_leaflink helper.
105 btree_leaflink<leaf<P>, P::concurrent>::link_split(this, nr);
// Report the key stored in slot 0 of nr to the caller.
107 split_ikey = nr->ikey0_[0];
// 0 when p < mid; otherwise 1, or 2 when additionally mid == width.
108 return p >= mid ? 1 + (mid == width) : 0;
/** @brief Split this internode into *@a nr while inserting @a ka / @a value
    at position @a p.
    @param nr  internode that receives the keys and children above the
               split point
    @param p  position at which the new key is inserted
    @param ka  ikey being inserted
    @param value  child pointer inserted together with @a ka
    @param[out] split_ikey  receives the key at the split point; it is stored
                in neither node's key range (see the comment in the body)
    @pre this->locked() && @a nr->locked() when this->concurrent is set

    The body also reads split_type: when it equals 2 the split point is
    this->width rather than the midpoint.
    NOTE(review): the declaration of split_type and the return statement are
    not shown here; make_split() stores the result into its insertion
    position kp -- confirm the exact return contract. */
111 template <typename P>
112 int internode<P>::split_into(internode<P> *nr, int p, ikey_type ka,
113 node_base<P> *value, ikey_type& split_ikey,
116 // B+tree internal node insertion.
117 // Split *this, with items [0,T::width), into *this + nr, simultaneously
118 // inserting "ka:value" at position "p" (0 <= p <= T::width).
119 // The midpoint element of the result is stored in "split_ikey".
121 // Let mid = ceil(T::width / 2). After the split, the key at
122 // post-insertion position mid is stored in split_ikey. *this contains keys
123 // [0,mid) and nr contains keys [mid+1,T::width+1).
124 // If p < mid, then x goes into *this, pre-insertion item mid-1 goes into
125 // split_ikey, and the first element of nr is pre-insertion item mid.
126 // If p == mid, then x goes into split_ikey and the first element of
127 // nr is pre-insertion item mid.
128 // If p > mid, then x goes into nr, pre-insertion item mid goes into
129 // split_ikey, and the first element of nr is post-insertion item mid+1.
// ("x" above is the key being inserted, i.e. ka.)
// Both nodes must be locked whenever the tree is concurrent.
130 masstree_precondition(!this->concurrent || (this->locked() && nr->locked()));
// (width + 1) / 2 is the ceil(width / 2) from the comment above; a
// split_type of 2 overrides it with this->width.
132 int mid = (split_type == 2 ? this->width : (this->width + 1) / 2);
// Of the width + 1 post-insertion keys, mid stay in *this and one becomes
// split_ikey; nr gets the remainder.
133 nr->nkeys_ = this->width + 1 - (mid + 1);
// Presumably the p < mid case from the comment above: nr starts at
// pre-insertion item mid and pre-insertion key mid - 1 is the split key.
136 nr->child_[0] = this->child_[mid];
137 nr->shift_from(0, this, mid, this->width - mid);
138 split_ikey = this->ikey0_[mid - 1];
139 } else if (p == mid) {
// The new child becomes nr's leftmost child; keys from mid onward move to nr.
140 nr->child_[0] = value;
141 nr->shift_from(0, this, mid, this->width - mid);
// Remaining case: the new key lands inside nr. Copy the items between
// mid + 1 and p, place ka/value, then copy the items from p onward.
144 nr->child_[0] = this->child_[mid + 1];
145 nr->shift_from(0, this, mid + 1, p - (mid + 1));
146 nr->assign(p - (mid + 1), ka, value);
147 nr->shift_from(p + 1 - (mid + 1), this, p, this->width - p);
148 split_ikey = this->ikey0_[mid];
// Re-parent every child now held by nr (nkeys_ keys means nkeys_ + 1 children).
151 for (int i = 0; i <= nr->nkeys_; ++i)
152 nr->child_[i]->set_parent(nr);
// Shrink *this to mid - 1 keys.
156 this->nkeys_ = mid - 1;
/** @brief Make room for inserting ka_ into leaf n_, splitting nodes when
    needed.
    @param ti  thread context passed to node allocation and assignment calls

    Visible steps: (1) if n_ is not full, rearrange its permutation and assign
    the key in place; (2) otherwise allocate a new leaf and call
    leaf split_into(); (3) insert the new child into the parent, allocating a
    new root-level internode or splitting the parent when necessary; (4) fix
    up the left leaf's permutation and this cursor's kx_ position; (5) record
    version values for the updated and newly created leaves.
    NOTE(review): the loop structure, unlocking, and the return statements
    are not shown here -- confirm the meaning of the bool result. */
165 template <typename P>
166 bool tcursor<P>::make_split(threadinfo& ti)
168 // We reach here if we might need to split, either because the node is
169 // full, or because we're trying to insert into position 0 (which holds
170 // the ikey_bound). But in the latter case, perhaps we can rearrange the
171 // permutation to do an insert instead.
172 if (n_->size() < n_->width) {
173 permuter_type perm(n_->permutation_);
// Exchange the entry at the first unused position with the last entry.
174 perm.exchange(perm.size(), n_->width - 1);
177 n_->permutation_ = perm.value();
179 n_->assign(kx_.p, ka_, ti);
// Full leaf: allocate a sibling sized from n_'s key-suffix usage and give it
// n_'s version before splitting into it.
185 node_type* child = leaf_type::make(n_->ksuf_used_capacity(), n_->node_ts_, ti);
186 child->assign_version(*n_);
// split_type is the 0/1/2 code returned by the leaf split; xikey[0] receives
// the separator key.
188 int split_type = n_->split_into(static_cast<leaf_type *>(child),
189 kx_.i, ka_, xikey[0], ti);
// When concurrent, n and child are locked here, and n is a leaf or is
// marked as splitting.
193 masstree_invariant(!n->concurrent || (n->locked() && child->locked() && (n->isleaf() || n->splitting())));
194 internode_type *next_child = 0;
196 internode_type *p = n->locked_parent(ti);
// No parent exists: create a new internode and assign the separator and new
// child at position 0.
198 if (!node_type::parent_exists(p)) {
199 internode_type *nn = internode_type::make(ti);
201 nn->assign(0, xikey[sense], child);
// Parent exists: find the insertion position for the separator key.
208 int kp = internode_type::bound_type::upper(xikey[sense], *p);
// Tests whether the parent still has room for one more key.
210 if (p->size() < p->width)
// New internode for a parent split; it takes p's version, is marked
// non-root, and the split writes its separator into xikey[!sense].
213 next_child = internode_type::make(ti);
214 next_child->assign_version(*p);
215 next_child->mark_nonroot();
216 kp = p->split_into(next_child, kp, xikey[sense],
217 child, xikey[!sense], split_type);
// Open a gap at kp in the parent and store the separator and child there.
221 p->shift_up(kp + 1, kp, p->size() - kp);
222 p->assign(kp, xikey[sense], child);
// Leaf-level fix-up: shrink the left leaf's permutation by the number of
// items now held by the right leaf.
229 leaf_type *nl = static_cast<leaf_type *>(n);
230 leaf_type *nr = static_cast<leaf_type *>(child);
231 permuter_type perml(nl->permutation_);
232 int width = perml.size();
233 perml.set_size(width - nr->size());
234 // removed item, if any, must be @ perml.size()
235 if (width != nl->width)
236 perml.exchange(perml.size(), nl->width - 1);
238 nl->permutation_ = perml.value();
// split_type 0 means ka_ belongs in the left leaf: take the permutation's
// back slot and assign the key there.
239 if (split_type == 0) {
240 kx_.p = perml.back();
241 nl->assign(kx_.p, ka_, ti);
// Rebase the cursor position by the size of the left leaf.
243 kx_.i = kx_.p = kx_.i - perml.size();
246 // versions/sizes shouldn't change after this
249 // we don't add n_ until lp.finish() is called (this avoids next_version_value() annoyances)
250 updated_v_ = nl->full_unlocked_version_value();
252 new_nodes_.emplace_back(nr, nr->full_unlocked_version_value());
273 } // namespace Masstree