Merge
[c11concurrency-benchmarks.git] / silo / masstree / masstree_remove.hh
1 /* Masstree
2  * Eddie Kohler, Yandong Mao, Robert Morris
3  * Copyright (c) 2012-2014 President and Fellows of Harvard College
4  * Copyright (c) 2012-2014 Massachusetts Institute of Technology
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a
7  * copy of this software and associated documentation files (the "Software"),
8  * to deal in the Software without restriction, subject to the conditions
9  * listed in the Masstree LICENSE file. These conditions include: you must
10  * preserve this copyright notice, and you cannot mention the copyright
11  * holders in advertising related to the Software without their permission.
12  * The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This
13  * notice is a summary of the Masstree LICENSE file; the license in that file
14  * is legally binding.
15  */
16 #ifndef MASSTREE_REMOVE_HH
17 #define MASSTREE_REMOVE_HH
18 #include "masstree_get.hh"
19 #include "btree_leaflink.hh"
20 #include "circular_int.hh"
21 namespace Masstree {
22
23 template <typename P>
24 bool tcursor<P>::gc_layer(threadinfo& ti)
25 {
26     find_locked(ti);
27     masstree_precondition(!n_->deleted() && !n_->deleted_layer());
28
29     // find_locked might return early if another gc_layer attempt has
30     // succeeded at removing multiple tree layers. So check that the whole
31     // key has been consumed
32     if (ka_.has_suffix())
33         return false;
34
35     // find the slot for the child tree
36     // ka_ is a multiple of ikey_size bytes long. We are looking for the entry
37     // for the next tree layer, which has keylenx_ corresponding to ikey_size+1.
38     // So if has_value(), then we found an entry for the same ikey, but with
39     // length ikey_size; we need to adjust ki_.
40     kx_.i += has_value();
41     if (kx_.i >= n_->size())
42         return false;
43     permuter_type perm(n_->permutation_);
44     kx_.p = perm[kx_.i];
45     if (n_->ikey0_[kx_.p] != ka_.ikey() || !n_->is_layer(kx_.p))
46         return false;
47
48     // remove redundant internode layers
49     node_type *layer;
50     while (1) {
51         layer = n_->lv_[kx_.p].layer();
52         if (layer->has_split())
53             n_->lv_[kx_.p] = layer = layer->unsplit_ancestor();
54         if (layer->isleaf())
55             break;
56
57         internode_type *in = static_cast<internode_type *>(layer);
58         if (in->size() > 0 && !in->has_split())
59             return false;
60         in->lock(*in, ti.lock_fence(tc_internode_lock));
61         if (in->has_split() && !in->has_parent())
62             in->mark_root();
63         if (in->size() > 0 || in->has_split()) {
64             in->unlock();
65             return false;
66         }
67
68         node_type *child = in->child_[0];
69         child->set_parent(node_type::parent_for_layer_root(n_));
70         n_->lv_[kx_.p] = child;
71         in->mark_split();
72         in->set_parent(child);  // ensure concurrent reader finds true root
73                                 // NB: now p->parent() might weirdly be a LEAF!
74         in->unlock();
75         in->deallocate_rcu(ti);
76     }
77
78     // we are left with a leaf child
79     leaf_type *lf = static_cast<leaf_type *>(layer);
80     if (lf->size() > 0 && !lf->has_split())
81         return false;
82     lf->lock(*lf, ti.lock_fence(tc_leaf_lock));
83     if (lf->has_split() && !lf->has_parent())
84         lf->mark_root();
85     if (lf->size() > 0 || lf->has_split()) {
86         lf->unlock();
87         return false;
88     }
89
90     // child is an empty leaf: kill it
91     masstree_invariant(!lf->prev_ && !lf->next_.ptr);
92     masstree_invariant(!lf->deleted());
93     masstree_invariant(!lf->deleted_layer());
94     if (circular_int<kvtimestamp_t>::less(n_->node_ts_, lf->node_ts_))
95         n_->node_ts_ = lf->node_ts_;
96     lf->mark_deleted_layer();   // NB DO NOT mark as deleted (see above)
97     lf->unlock();
98     lf->deallocate_rcu(ti);
99     return true;
100 }
101
102 template <typename P>
103 struct gc_layer_rcu_callback : public P::threadinfo_type::rcu_callback {
104     typedef typename P::threadinfo_type threadinfo;
105     node_base<P>* root_;
106     int len_;
107     char s_[0];
108     gc_layer_rcu_callback(node_base<P>* root, Str prefix)
109         : root_(root), len_(prefix.length()) {
110         memcpy(s_, prefix.data(), len_);
111     }
112     void operator()(threadinfo& ti);
113     size_t size() const {
114         return len_ + sizeof(*this);
115     }
116     static void make(node_base<P>* root, Str prefix, threadinfo& ti);
117 };
118
119 template <typename P>
120 void gc_layer_rcu_callback<P>::operator()(threadinfo& ti)
121 {
122     root_ = root_->unsplit_ancestor();
123     if (!root_->deleted()) {    // if not destroying tree...
124         tcursor<P> lp(root_, s_, len_);
125         bool do_remove = lp.gc_layer(ti);
126         if (!do_remove || !lp.finish_remove(ti))
127             lp.n_->unlock();
128         ti.deallocate(this, size(), memtag_masstree_gc);
129     }
130 }
131
132 template <typename P>
133 void gc_layer_rcu_callback<P>::make(node_base<P>* root, Str prefix,
134                                     threadinfo& ti)
135 {
136     size_t sz = prefix.len + sizeof(gc_layer_rcu_callback<P>);
137     void *data = ti.allocate(sz, memtag_masstree_gc);
138     gc_layer_rcu_callback<P> *cb =
139         new(data) gc_layer_rcu_callback<P>(root, prefix);
140     ti.rcu_register(cb);
141 }
142
143 template <typename P>
144 bool tcursor<P>::finish_remove(threadinfo& ti)
145 {
146     if (n_->modstate_ == leaf<P>::modstate_insert) {
147         n_->mark_insert();
148         n_->modstate_ = leaf<P>::modstate_remove;
149     }
150
151     permuter_type perm(n_->permutation_);
152     perm.remove(kx_.i);
153     n_->permutation_ = perm.value();
154     if (perm.size())
155         return false;
156     else
157         return remove_leaf(n_, root_, ka_.prefix_string(), ti);
158 }
159
160 template <typename P>
161 bool tcursor<P>::remove_leaf(leaf_type* leaf, node_type* root,
162                              Str prefix, threadinfo& ti)
163 {
164     if (!leaf->prev_) {
165         if (!leaf->next_.ptr && !prefix.empty())
166             gc_layer_rcu_callback<P>::make(root, prefix, ti);
167         return false;
168     }
169
170     // mark leaf deleted, RCU-free
171     leaf->mark_deleted();
172     leaf->deallocate_rcu(ti);
173
174     // Ensure node that becomes responsible for our keys has its node_ts_ kept
175     // up to date
176     while (1) {
177         leaf_type *prev = leaf->prev_;
178         kvtimestamp_t prev_ts = prev->node_ts_;
179         while (circular_int<kvtimestamp_t>::less(prev_ts, leaf->node_ts_)
180                && !bool_cmpxchg(&prev->node_ts_, prev_ts, leaf->node_ts_))
181             prev_ts = prev->node_ts_;
182         fence();
183         if (prev == leaf->prev_)
184             break;
185     }
186
187     // Unlink leaf from doubly-linked leaf list
188     btree_leaflink<leaf_type>::unlink(leaf);
189
190     // Remove leaf from tree. This is simple unless the leaf is the first
191     // child of its parent, in which case we need to traverse up until we find
192     // its key.
193     node_type *n = leaf;
194     ikey_type ikey = leaf->ikey_bound(), reshape_ikey = 0;
195     bool reshaping = false;
196
197     while (1) {
198         internode_type *p = n->locked_parent(ti);
199         masstree_invariant(p);
200         n->unlock();
201
202         int kp = internode_type::bound_type::upper(ikey, *p);
203         masstree_invariant(kp == 0 || p->compare_key(ikey, kp - 1) == 0);
204
205         if (kp > 0) {
206             p->mark_insert();
207             if (!reshaping) {
208                 p->shift_down(kp - 1, kp, p->nkeys_ - kp);
209                 --p->nkeys_;
210             } else
211                 p->ikey0_[kp - 1] = reshape_ikey;
212             if (kp > 1 || p->child_[0]) {
213                 if (p->size() == 0)
214                     collapse(p, ikey, root, prefix, ti);
215                 else
216                     p->unlock();
217                 break;
218             }
219         }
220
221         if (!reshaping) {
222             if (p->size() == 0) {
223                 p->mark_deleted();
224                 p->deallocate_rcu(ti);
225             } else {
226                 reshaping = true;
227                 reshape_ikey = p->ikey0_[0];
228                 p->child_[0] = 0;
229             }
230         }
231
232         n = p;
233     }
234
235     return true;
236 }
237
238 template <typename P>
239 void tcursor<P>::collapse(internode_type* p, ikey_type ikey,
240                           node_type* root, Str prefix, threadinfo& ti)
241 {
242     masstree_precondition(p && p->locked());
243
244     while (1) {
245         internode_type *gp = p->locked_parent(ti);
246         if (!internode_type::parent_exists(gp)) {
247             if (!prefix.empty())
248                 gc_layer_rcu_callback<P>::make(root, prefix, ti);
249             p->unlock();
250             break;
251         }
252
253         int kp = key_upper_bound(ikey, *gp);
254         masstree_invariant(gp->child_[kp] == p);
255         gp->child_[kp] = p->child_[0];
256         p->child_[0]->set_parent(gp);
257
258         p->mark_deleted();
259         p->unlock();
260         p->deallocate_rcu(ti);
261
262         p = gp;
263         if (p->size() != 0) {
264             p->unlock();
265             break;
266         }
267     }
268 }
269
270 template <typename P>
271 struct destroy_rcu_callback : public P::threadinfo_type::rcu_callback {
272     typedef typename P::threadinfo_type threadinfo;
273     typedef typename node_base<P>::leaf_type leaf_type;
274     typedef typename node_base<P>::internode_type internode_type;
275     node_base<P>* root_;
276     int count_;
277     destroy_rcu_callback(node_base<P>* root)
278         : root_(root), count_(0) {
279     }
280     void operator()(threadinfo& ti);
281     static void make(node_base<P>* root, Str prefix, threadinfo& ti);
282   private:
283     static inline node_base<P>** link_ptr(node_base<P>* n);
284     static inline void enqueue(node_base<P>* n, node_base<P>**& tailp);
285 };
286
287 template <typename P>
288 inline node_base<P>** destroy_rcu_callback<P>::link_ptr(node_base<P>* n) {
289     if (n->isleaf())
290         return &static_cast<leaf_type*>(n)->parent_;
291     else
292         return &static_cast<internode_type*>(n)->parent_;
293 }
294
295 template <typename P>
296 inline void destroy_rcu_callback<P>::enqueue(node_base<P>* n,
297                                              node_base<P>**& tailp) {
298     *tailp = n;
299     tailp = link_ptr(n);
300 }
301
302 template <typename P>
303 void destroy_rcu_callback<P>::operator()(threadinfo& ti) {
304     if (++count_ == 1) {
305         root_ = root_->unsplit_ancestor();
306         root_->lock();
307         root_->mark_deleted_tree(); // i.e., deleted but not splitting
308         root_->unlock();
309         ti.rcu_register(this);
310         return;
311     }
312
313     node_base<P>* workq;
314     node_base<P>** tailp = &workq;
315     enqueue(root_, tailp);
316
317     while (node_base<P>* n = workq) {
318         node_base<P>** linkp = link_ptr(n);
319         if (linkp != tailp)
320             workq = *linkp;
321         else {
322             workq = 0;
323             tailp = &workq;
324         }
325
326         if (n->isleaf()) {
327             leaf_type* l = static_cast<leaf_type*>(n);
328             typename leaf_type::permuter_type perm = l->permutation();
329             for (int i = 0; i != l->size(); ++i) {
330                 int p = perm[i];
331                 if (l->is_layer(p))
332                     enqueue(l->lv_[p].layer(), tailp);
333             }
334             l->deallocate(ti);
335         } else {
336             internode_type* in = static_cast<internode_type*>(n);
337             for (int i = 0; i != in->size() + 1; ++i)
338                 if (in->child_[i])
339                     enqueue(in->child_[i], tailp);
340             in->deallocate(ti);
341         }
342     }
343     ti.deallocate(this, sizeof(this), memtag_masstree_gc);
344 }
345
346 template <typename P>
347 void basic_table<P>::destroy(threadinfo& ti) {
348     if (root_) {
349         void* data = ti.allocate(sizeof(destroy_rcu_callback<P>), memtag_masstree_gc);
350         destroy_rcu_callback<P>* cb = new(data) destroy_rcu_callback<P>(root_);
351         ti.rcu_register(cb);
352         root_ = 0;
353     }
354 }
355
356 } // namespace Masstree
357 #endif