2 * Eddie Kohler, Yandong Mao, Robert Morris
3 * Copyright (c) 2012-2014 President and Fellows of Harvard College
4 * Copyright (c) 2012-2014 Massachusetts Institute of Technology
6 * Permission is hereby granted, free of charge, to any person obtaining a
7 * copy of this software and associated documentation files (the "Software"),
8 * to deal in the Software without restriction, subject to the conditions
9 * listed in the Masstree LICENSE file. These conditions include: you must
10 * preserve this copyright notice, and you cannot mention the copyright
11 * holders in advertising related to the Software without their permission.
12 * The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This
13 * notice is a summary of the Masstree LICENSE file; the license in that file
16 #ifndef MASSTREE_REMOVE_HH
17 #define MASSTREE_REMOVE_HH
18 #include "masstree_get.hh"
19 #include "btree_leaflink.hh"
20 #include "circular_int.hh"
// Garbage-collects an emptied child layer that hangs off the current leaf n_.
// Visible steps: validate the slot kx_.p (it must hold ka_.ikey() and be a
// layer entry), splice out internodes at the top of that layer by promoting
// their child_[0], then mark the remaining leaf as a deleted layer and hand
// it to RCU for deallocation.
// The only caller visible in this file, gc_layer_rcu_callback::operator(),
// goes on to finish_remove() only when this function returns true.
// NOTE(review): this excerpt omits lines of the original function, including
// the statements controlled by the bare `if` conditions below and all
// return/unlock statements; the added comments describe only what is visible.
24 bool tcursor<P>::gc_layer(threadinfo& ti)
27 masstree_precondition(!n_->deleted() && !n_->deleted_layer());
29 // find_locked might return early if another gc_layer attempt has
30 // succeeded at removing multiple tree layers. So check that the whole
31 // key has been consumed
35 // find the slot for the child tree
36 // ka_ is a multiple of ikey_size bytes long. We are looking for the entry
37 // for the next tree layer, which has keylenx_ corresponding to ikey_size+1.
38 // So if has_value(), then we found an entry for the same ikey, but with
39 // length ikey_size; we need to adjust ki_.
41 if (kx_.i >= n_->size())
43 permuter_type perm(n_->permutation_);
// The slot must carry the same ikey as ka_ and must be a layer entry.
45 if (n_->ikey0_[kx_.p] != ka_.ikey() || !n_->is_layer(kx_.p))
48 // remove redundant internode layers
51 layer = n_->lv_[kx_.p].layer();
// If the recorded layer node has split, repoint the slot at its unsplit
// ancestor before examining it.
52 if (layer->has_split())
53 n_->lv_[kx_.p] = layer = layer->unsplit_ancestor();
57 internode_type *in = static_cast<internode_type *>(layer);
// size() and has_split() are tested once before taking the lock and again
// after it has been acquired.
58 if (in->size() > 0 && !in->has_split())
60 in->lock(*in, ti.lock_fence(tc_internode_lock));
61 if (in->has_split() && !in->has_parent())
63 if (in->size() > 0 || in->has_split()) {
// Promote child_[0]: reparent it to n_ as a layer root and store it in the
// slot, so the internode `in` can be released through RCU.
68 node_type *child = in->child_[0];
69 child->set_parent(node_type::parent_for_layer_root(n_));
70 n_->lv_[kx_.p] = child;
72 in->set_parent(child); // ensure concurrent reader finds true root
73 // NB: now p->parent() might weirdly be a LEAF!
75 in->deallocate_rcu(ti);
78 // we are left with a leaf child
79 leaf_type *lf = static_cast<leaf_type *>(layer);
// Same pattern as for internodes: test, lock, then test again.
80 if (lf->size() > 0 && !lf->has_split())
82 lf->lock(*lf, ti.lock_fence(tc_leaf_lock));
83 if (lf->has_split() && !lf->has_parent())
85 if (lf->size() > 0 || lf->has_split()) {
90 // child is an empty leaf: kill it
91 masstree_invariant(!lf->prev_ && !lf->next_.ptr);
92 masstree_invariant(!lf->deleted());
93 masstree_invariant(!lf->deleted_layer());
// Carry the dying leaf's node_ts_ over to n_ when it is the later of the two
// under circular_int ordering.
94 if (circular_int<kvtimestamp_t>::less(n_->node_ts_, lf->node_ts_))
95 n_->node_ts_ = lf->node_ts_;
96 lf->mark_deleted_layer(); // NB DO NOT mark as deleted (see above)
98 lf->deallocate_rcu(ti);
// RCU callback that performs a deferred gc_layer() attempt. The constructor
// records the root pointer and copies the bytes of `prefix` into s_;
// operator() later builds a tcursor from root_, s_ and len_.
// NOTE(review): the member declarations (root_, len_, s_) are in lines not
// shown in this excerpt; s_ is presumably trailing storage sized by make()
// — confirm against the full file.
102 template <typename P>
103 struct gc_layer_rcu_callback : public P::threadinfo_type::rcu_callback {
104 typedef typename P::threadinfo_type threadinfo;
// Stores `root` and copies prefix.length() bytes of the prefix into s_.
108 gc_layer_rcu_callback(node_base<P>* root, Str prefix)
109 : root_(root), len_(prefix.length()) {
110 memcpy(s_, prefix.data(), len_);
112 void operator()(threadinfo& ti);
// Byte count handed to ti.deallocate() in operator(): the object itself plus
// the len_ prefix bytes (make() allocates prefix.len + sizeof(callback)).
113 size_t size() const {
114 return len_ + sizeof(*this);
// Allocates and constructs a callback for (root, prefix) using `ti`.
116 static void make(node_base<P>* root, Str prefix, threadinfo& ti);
// Runs the deferred layer collection. root_ is first replaced by its unsplit
// ancestor; if that root is not marked deleted, a tcursor is positioned on the
// saved prefix (s_, len_) and gc_layer() is called, followed by
// finish_remove() when gc_layer() returned true. The callback's own storage
// is released with ti.deallocate(this, size(), ...).
// NOTE(review): the statement controlled by `if (!do_remove || ...)` and the
// closing braces are not visible here, so it cannot be told from this excerpt
// whether the deallocate call sits inside the `!root_->deleted()` block.
119 template <typename P>
120 void gc_layer_rcu_callback<P>::operator()(threadinfo& ti)
122 root_ = root_->unsplit_ancestor();
123 if (!root_->deleted()) { // if not destroying tree...
124 tcursor<P> lp(root_, s_, len_);
125 bool do_remove = lp.gc_layer(ti);
126 if (!do_remove || !lp.finish_remove(ti))
128 ti.deallocate(this, size(), memtag_masstree_gc);
// Creates a gc_layer_rcu_callback for (root, prefix). Storage for the object
// plus prefix.len extra bytes is obtained from ti.allocate() with tag
// memtag_masstree_gc, and the callback is placement-constructed in it.
// NOTE(review): the remainder of the parameter list and whatever follows the
// construction (presumably registering `cb` with `ti`) are not shown in this
// excerpt — confirm against the full file.
132 template <typename P>
133 void gc_layer_rcu_callback<P>::make(node_base<P>* root, Str prefix,
// Size of the callback object plus room for a copy of the prefix bytes.
136 size_t sz = prefix.len + sizeof(gc_layer_rcu_callback<P>);
137 void *data = ti.allocate(sz, memtag_masstree_gc);
138 gc_layer_rcu_callback<P> *cb =
139 new(data) gc_layer_rcu_callback<P>(root, prefix);
// Completes removal of an entry from leaf n_: sets the leaf's modstate_ to
// modstate_remove, writes an updated permutation back to n_->permutation_,
// and can continue into remove_leaf() for n_, whose result it returns.
// NOTE(review): the body of the modstate_insert branch, the change applied to
// `perm`, and the condition under which remove_leaf() is reached are in lines
// not shown in this excerpt.
143 template <typename P>
144 bool tcursor<P>::finish_remove(threadinfo& ti)
// A leaf still in insert mode gets extra handling (not visible) before the
// state is switched to remove mode.
146 if (n_->modstate_ == leaf<P>::modstate_insert) {
148 n_->modstate_ = leaf<P>::modstate_remove;
151 permuter_type perm(n_->permutation_);
153 n_->permutation_ = perm.value();
157 return remove_leaf(n_, root_, ka_.prefix_string(), ti);
// Removes `leaf` from the tree rooted at `root`. Visible steps: optionally
// schedule a gc_layer callback, mark the leaf deleted and RCU-free it, raise
// the predecessor's node_ts_ to at least the leaf's, unlink the leaf from the
// leaf list, then fix up ancestors starting from the locked parent (shifting
// keys down, rewriting a boundary key, collapsing or freeing emptied
// internodes).
// `prefix` is the key prefix forwarded to gc_layer_rcu_callback::make() and
// collapse().
// NOTE(review): several original lines, including loop headers, branch bodies
// and return statements, are missing from this excerpt; the added comments
// describe only the visible statements.
160 template <typename P>
161 bool tcursor<P>::remove_leaf(leaf_type* leaf, node_type* root,
162 Str prefix, threadinfo& ti)
// A leaf with no successor, reached through a non-empty prefix, schedules a
// deferred gc_layer attempt for (root, prefix).
165 if (!leaf->next_.ptr && !prefix.empty())
166 gc_layer_rcu_callback<P>::make(root, prefix, ti);
170 // mark leaf deleted, RCU-free
171 leaf->mark_deleted();
172 leaf->deallocate_rcu(ti);
174 // Ensure node that becomes responsible for our keys has its node_ts_ kept
177 leaf_type *prev = leaf->prev_;
178 kvtimestamp_t prev_ts = prev->node_ts_;
// Retry while prev's timestamp is still older than the leaf's and the
// bool_cmpxchg update did not take effect (bool_cmpxchg is presumably an
// atomic compare-and-swap — confirm).
179 while (circular_int<kvtimestamp_t>::less(prev_ts, leaf->node_ts_)
180 && !bool_cmpxchg(&prev->node_ts_, prev_ts, leaf->node_ts_))
181 prev_ts = prev->node_ts_;
// Re-reads leaf->prev_ and compares it with the value used above; the
// enclosing loop is not shown, presumably it repeats when prev has changed.
183 if (prev == leaf->prev_)
187 // Unlink leaf from doubly-linked leaf list
188 btree_leaflink<leaf_type>::unlink(leaf);
190 // Remove leaf from tree. This is simple unless the leaf is the first
191 // child of its parent, in which case we need to traverse up until we find
194 ikey_type ikey = leaf->ikey_bound(), reshape_ikey = 0;
195 bool reshaping = false;
198 internode_type *p = n->locked_parent(ti);
199 masstree_invariant(p);
// kp is the upper-bound position of ikey in p; unless it is 0, the key just
// before it must compare equal to ikey.
202 int kp = internode_type::bound_type::upper(ikey, *p);
203 masstree_invariant(kp == 0 || p->compare_key(ikey, kp - 1) == 0);
208 p->shift_down(kp - 1, kp, p->nkeys_ - kp);
211 p->ikey0_[kp - 1] = reshape_ikey;
212 if (kp > 1 || p->child_[0]) {
214 collapse(p, ikey, root, prefix, ti);
// An internode left with no keys is released through RCU.
222 if (p->size() == 0) {
224 p->deallocate_rcu(ti);
227 reshape_ikey = p->ikey0_[0];
// Splices internode `p` out of the tree: its grandparent slot that pointed at
// `p` is redirected to p->child_[0], that child is reparented to the
// grandparent, and `p` is released through RCU. When `p` has no parent
// internode, a gc_layer callback for (root, prefix) can be scheduled instead.
// Precondition (asserted): `p` is non-null and locked.
// NOTE(review): loop structure, unlock calls and the statements controlled by
// the final `if` are in lines not shown in this excerpt.
238 template <typename P>
239 void tcursor<P>::collapse(internode_type* p, ikey_type ikey,
240 node_type* root, Str prefix, threadinfo& ti)
242 masstree_precondition(p && p->locked());
245 internode_type *gp = p->locked_parent(ti);
246 if (!internode_type::parent_exists(gp)) {
248 gc_layer_rcu_callback<P>::make(root, prefix, ti);
// Locate p's slot in gp via ikey and replace it with p's first child.
253 int kp = key_upper_bound(ikey, *gp);
254 masstree_invariant(gp->child_[kp] == p);
255 gp->child_[kp] = p->child_[0];
256 p->child_[0]->set_parent(gp);
260 p->deallocate_rcu(ti);
263 if (p->size() != 0) {
// RCU callback used by basic_table<P>::destroy() to tear down a whole tree
// starting from `root`. The constructor stores the root and sets count_ to 0.
// NOTE(review): member declarations (root_, count_) are in lines not shown in
// this excerpt, and no definition of the static make() declared below is
// visible in this excerpt — confirm whether it is defined elsewhere.
270 template <typename P>
271 struct destroy_rcu_callback : public P::threadinfo_type::rcu_callback {
272 typedef typename P::threadinfo_type threadinfo;
273 typedef typename node_base<P>::leaf_type leaf_type;
274 typedef typename node_base<P>::internode_type internode_type;
277 destroy_rcu_callback(node_base<P>* root)
278 : root_(root), count_(0) {
280 void operator()(threadinfo& ti);
281 static void make(node_base<P>* root, Str prefix, threadinfo& ti);
// Helpers used by operator() to maintain its list of nodes to visit.
283 static inline node_base<P>** link_ptr(node_base<P>* n);
284 static inline void enqueue(node_base<P>* n, node_base<P>**& tailp);
// Returns the address of n's parent_ field, reached through either the leaf
// or the internode type. operator() stores the result as `linkp` for each
// node taken from its work queue.
// NOTE(review): the condition selecting between the two returns is not
// visible in this excerpt (presumably a leaf/internode test on n).
287 template <typename P>
288 inline node_base<P>** destroy_rcu_callback<P>::link_ptr(node_base<P>* n) {
290 return &static_cast<leaf_type*>(n)->parent_;
292 return &static_cast<internode_type*>(n)->parent_;
// Adds node `n` to the work list used by operator(); `tailp` is taken by
// reference, so the callee can update the caller's tail pointer.
// NOTE(review): the function body is not visible in this excerpt; this
// description is inferred from the signature and the call sites in
// operator() — confirm against the full file.
295 template <typename P>
296 inline void destroy_rcu_callback<P>::enqueue(node_base<P>* n,
297 node_base<P>**& tailp) {
// Tears down the tree rooted at root_. Visible behavior: root_ is replaced by
// its unsplit ancestor and marked with mark_deleted_tree(), and the callback
// re-registers itself with ti.rcu_register(this). The traversal part seeds a
// work queue with root_ and, for every queued node, enqueues either the layer
// roots stored in a leaf's slots or the size()+1 children of an internode.
// Finally the callback releases its own storage.
// NOTE(review): branch conditions (including the use of count_, which the
// constructor initializes to 0), node deallocation and queue advancement are
// in lines not shown in this excerpt; presumably the first invocation only
// marks and re-registers, and a later one performs the traversal — confirm.
302 template <typename P>
303 void destroy_rcu_callback<P>::operator()(threadinfo& ti) {
305 root_ = root_->unsplit_ancestor();
307 root_->mark_deleted_tree(); // i.e., deleted but not splitting
309 ti.rcu_register(this);
// Work queue of nodes still to be visited, seeded with the root.
314 node_base<P>** tailp = &workq;
315 enqueue(root_, tailp);
317 while (node_base<P>* n = workq) {
318 node_base<P>** linkp = link_ptr(n);
// Leaf case: walk the slots through the permutation and enqueue child layers.
327 leaf_type* l = static_cast<leaf_type*>(n);
328 typename leaf_type::permuter_type perm = l->permutation();
329 for (int i = 0; i != l->size(); ++i) {
332 enqueue(l->lv_[p].layer(), tailp);
// Internode case: an internode with size() keys has size() + 1 children.
336 internode_type* in = static_cast<internode_type*>(n);
337 for (int i = 0; i != in->size() + 1; ++i)
339 enqueue(in->child_[i], tailp);
// Release the callback itself. The size passed must be that of the object
// (basic_table<P>::destroy allocates sizeof(destroy_rcu_callback<P>)), not
// the size of the `this` pointer.
343 ti.deallocate(this, sizeof(*this), memtag_masstree_gc);
// Starts destruction of the table's tree: allocates storage for a
// destroy_rcu_callback from `ti` (tag memtag_masstree_gc) and
// placement-constructs it with the table's root_.
// NOTE(review): the statements that follow construction (presumably
// registering `cb` with `ti` and clearing root_) are not shown in this
// excerpt — confirm against the full file.
346 template <typename P>
347 void basic_table<P>::destroy(threadinfo& ti) {
349 void* data = ti.allocate(sizeof(destroy_rcu_callback<P>), memtag_masstree_gc);
350 destroy_rcu_callback<P>* cb = new(data) destroy_rcu_callback<P>(root_);
356 } // namespace Masstree