benchmark silo added
[c11concurrency-benchmarks.git] / silo / masstree / masstree_split.hh
1 /* Masstree
2  * Eddie Kohler, Yandong Mao, Robert Morris
3  * Copyright (c) 2012-2014 President and Fellows of Harvard College
4  * Copyright (c) 2012-2014 Massachusetts Institute of Technology
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a
7  * copy of this software and associated documentation files (the "Software"),
8  * to deal in the Software without restriction, subject to the conditions
9  * listed in the Masstree LICENSE file. These conditions include: you must
10  * preserve this copyright notice, and you cannot mention the copyright
11  * holders in advertising related to the Software without their permission.
12  * The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This
13  * notice is a summary of the Masstree LICENSE file; the license in that file
14  * is legally binding.
15  */
16 #ifndef MASSTREE_SPLIT_HH
17 #define MASSTREE_SPLIT_HH 1
18 #include "masstree_tcursor.hh"
19 #include "btree_leaflink.hh"
20 namespace Masstree {
21
22 /** @brief Return ikey at position @a i, assuming insert of @a ka at @a ka_i. */
23 template <typename P>
24 inline typename P::ikey_type
25 leaf<P>::ikey_after_insert(const permuter_type& perm, int i,
26                            const key_type& ka, int ka_i) const
27 {
28     if (i < ka_i)
29         return this->ikey0_[perm[i]];
30     else if (i == ka_i)
31         return ka.ikey();
32     else
33         return this->ikey0_[perm[i - 1]];
34 }
35
36 /** @brief Split this node into *@a nr and insert @a ka at position @a p.
37     @pre *@a nr is a new empty leaf
38     @pre this->locked() && @a nr->locked()
39     @post split_ikey is the first key in *@a nr
40     @return split type
41
42     If @a p == this->size() and *this is the rightmost node in the layer,
43     then this code assumes we're inserting nodes in sequential order, and
44     the split does not move any keys.
45
46     The split type is 0 if @a ka went into *this, 1 if the @a ka went into
47     *@a nr, and 2 for the sequential-order optimization (@a ka went into *@a
48     nr and no other keys were moved). */
49 template <typename P>
50 int leaf<P>::split_into(leaf<P>* nr, int p, const key_type& ka,
51                         ikey_type& split_ikey, threadinfo& ti)
52 {
53     // B+tree leaf insertion.
54     // Split *this, with items [0,T::width), into *this + nr, simultaneously
55     // inserting "ka:value" at position "p" (0 <= p <= T::width).
56
57     // Let mid = floor(T::width / 2) + 1. After the split,
58     // "*this" contains [0,mid) and "nr" contains [mid,T::width+1).
59     // If p < mid, then x goes into *this, and the first element of nr
60     //   will be former item (mid - 1).
61     // If p >= mid, then x goes into nr.
62     masstree_precondition(!this->concurrent || (this->locked() && nr->locked()));
63     masstree_precondition(this->size() >= this->width - 1);
64
65     int width = this->size();   // == this->width or this->width - 1
66     int mid = this->width / 2 + 1;
67     if (p == 0 && !this->prev_)
68         mid = 1;
69     else if (p == width && !this->next_.ptr)
70         mid = width;
71
72     // Never separate keys with the same ikey0.
73     permuter_type perml(this->permutation_);
74     ikey_type mid_ikey = ikey_after_insert(perml, mid, ka, p);
75     if (mid_ikey == ikey_after_insert(perml, mid - 1, ka, p)) {
76         int midl = mid - 2, midr = mid + 1;
77         while (1) {
78             if (midr <= width
79                 && mid_ikey != ikey_after_insert(perml, midr, ka, p)) {
80                 mid = midr;
81                 break;
82             } else if (midl >= 0
83                        && mid_ikey != ikey_after_insert(perml, midl, ka, p)) {
84                 mid = midl + 1;
85                 break;
86             }
87             --midl, ++midr;
88         }
89         masstree_invariant(mid > 0 && mid <= width);
90     }
91
92     typename permuter_type::value_type pv = perml.value_from(mid - (p < mid));
93     for (int x = mid; x <= width; ++x)
94         if (x == p)
95             nr->assign_initialize(x - mid, ka, ti);
96         else {
97             nr->assign_initialize(x - mid, this, pv & 15, ti);
98             pv >>= 4;
99         }
100     permuter_type permr = permuter_type::make_sorted(width + 1 - mid);
101     if (p >= mid)
102         permr.remove_to_back(p - mid);
103     nr->permutation_ = permr.value();
104
105     btree_leaflink<leaf<P>, P::concurrent>::link_split(this, nr);
106
107     split_ikey = nr->ikey0_[0];
108     return p >= mid ? 1 + (mid == width) : 0;
109 }
110
111 template <typename P>
112 int internode<P>::split_into(internode<P> *nr, int p, ikey_type ka,
113                              node_base<P> *value, ikey_type& split_ikey,
114                              int split_type)
115 {
116     // B+tree internal node insertion.
117     // Split *this, with items [0,T::width), into *this + nr, simultaneously
118     // inserting "ka:value" at position "p" (0 <= p <= T::width).
119     // The midpoint element of the result is stored in "split_ikey".
120
121     // Let mid = ceil(T::width / 2). After the split, the key at
122     // post-insertion position mid is stored in split_ikey. *this contains keys
123     // [0,mid) and nr contains keys [mid+1,T::width+1).
124     // If p < mid, then x goes into *this, pre-insertion item mid-1 goes into
125     //   split_ikey, and the first element of nr is pre-insertion item mid.
126     // If p == mid, then x goes into split_ikey and the first element of
127     //   nr is pre-insertion item mid.
128     // If p > mid, then x goes into nr, pre-insertion item mid goes into
129     //   split_ikey, and the first element of nr is post-insertion item mid+1.
130     masstree_precondition(!this->concurrent || (this->locked() && nr->locked()));
131
132     int mid = (split_type == 2 ? this->width : (this->width + 1) / 2);
133     nr->nkeys_ = this->width + 1 - (mid + 1);
134
135     if (p < mid) {
136         nr->child_[0] = this->child_[mid];
137         nr->shift_from(0, this, mid, this->width - mid);
138         split_ikey = this->ikey0_[mid - 1];
139     } else if (p == mid) {
140         nr->child_[0] = value;
141         nr->shift_from(0, this, mid, this->width - mid);
142         split_ikey = ka;
143     } else {
144         nr->child_[0] = this->child_[mid + 1];
145         nr->shift_from(0, this, mid + 1, p - (mid + 1));
146         nr->assign(p - (mid + 1), ka, value);
147         nr->shift_from(p + 1 - (mid + 1), this, p, this->width - p);
148         split_ikey = this->ikey0_[mid];
149     }
150
151     for (int i = 0; i <= nr->nkeys_; ++i)
152         nr->child_[i]->set_parent(nr);
153
154     this->mark_split();
155     if (p < mid) {
156         this->nkeys_ = mid - 1;
157         return p;
158     } else {
159         this->nkeys_ = mid;
160         return -1;
161     }
162 }
163
164
165 template <typename P>
166 bool tcursor<P>::make_split(threadinfo& ti)
167 {
168     // We reach here if we might need to split, either because the node is
169     // full, or because we're trying to insert into position 0 (which holds
170     // the ikey_bound). But in the latter case, perhaps we can rearrange the
171     // permutation to do an insert instead.
172     if (n_->size() < n_->width) {
173         permuter_type perm(n_->permutation_);
174         perm.exchange(perm.size(), n_->width - 1);
175         kx_.p = perm.back();
176         if (kx_.p != 0) {
177             n_->permutation_ = perm.value();
178             fence();
179             n_->assign(kx_.p, ka_, ti);
180             return false;
181         }
182     }
183
184     node_type* n = n_;
185     node_type* child = leaf_type::make(n_->ksuf_used_capacity(), n_->node_ts_, ti);
186     child->assign_version(*n_);
187     ikey_type xikey[2];
188     int split_type = n_->split_into(static_cast<leaf_type *>(child),
189                                     kx_.i, ka_, xikey[0], ti);
190     bool sense = false;
191
192     while (1) {
193         masstree_invariant(!n->concurrent || (n->locked() && child->locked() && (n->isleaf() || n->splitting())));
194         internode_type *next_child = 0;
195
196         internode_type *p = n->locked_parent(ti);
197
198         if (!node_type::parent_exists(p)) {
199             internode_type *nn = internode_type::make(ti);
200             nn->child_[0] = n;
201             nn->assign(0, xikey[sense], child);
202             nn->nkeys_ = 1;
203             nn->parent_ = p;
204             nn->mark_root();
205             fence();
206             n->set_parent(nn);
207         } else {
208             int kp = internode_type::bound_type::upper(xikey[sense], *p);
209
210             if (p->size() < p->width)
211                 p->mark_insert();
212             else {
213                 next_child = internode_type::make(ti);
214                 next_child->assign_version(*p);
215                 next_child->mark_nonroot();
216                 kp = p->split_into(next_child, kp, xikey[sense],
217                                    child, xikey[!sense], split_type);
218             }
219
220             if (kp >= 0) {
221                 p->shift_up(kp + 1, kp, p->size() - kp);
222                 p->assign(kp, xikey[sense], child);
223                 fence();
224                 ++p->nkeys_;
225             }
226         }
227
228         if (n->isleaf()) {
229             leaf_type *nl = static_cast<leaf_type *>(n);
230             leaf_type *nr = static_cast<leaf_type *>(child);
231             permuter_type perml(nl->permutation_);
232             int width = perml.size();
233             perml.set_size(width - nr->size());
234             // removed item, if any, must be @ perml.size()
235             if (width != nl->width)
236                 perml.exchange(perml.size(), nl->width - 1);
237             nl->mark_split();
238             nl->permutation_ = perml.value();
239             if (split_type == 0) {
240                 kx_.p = perml.back();
241                 nl->assign(kx_.p, ka_, ti);
242             } else {
243                 kx_.i = kx_.p = kx_.i - perml.size();
244                 n_ = nr;
245             }
246             // versions/sizes shouldn't change after this
247             if (nl != n_) {
248                 assert(nr == n_);
249                 // we don't add n_ until lp.finish() is called (this avoids next_version_value() annoyances)
250                 updated_v_ = nl->full_unlocked_version_value();
251             } else
252                 new_nodes_.emplace_back(nr, nr->full_unlocked_version_value());
253         }
254
255         if (n != n_)
256             n->unlock();
257         if (child != n_)
258             child->unlock();
259         if (next_child) {
260             n = p;
261             child = next_child;
262             sense = !sense;
263         } else if (p) {
264             p->unlock();
265             break;
266         } else
267             break;
268     }
269
270     return false;
271 }
272
273 } // namespace Masstree
274 #endif