benchmark silo added
[c11concurrency-benchmarks.git] / silo / masstree / kvrow.hh
1 /* Masstree
2  * Eddie Kohler, Yandong Mao, Robert Morris
3  * Copyright (c) 2012-2013 President and Fellows of Harvard College
4  * Copyright (c) 2012-2013 Massachusetts Institute of Technology
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a
7  * copy of this software and associated documentation files (the "Software"),
8  * to deal in the Software without restriction, subject to the conditions
9  * listed in the Masstree LICENSE file. These conditions include: you must
10  * preserve this copyright notice, and you cannot mention the copyright
11  * holders in advertising related to the Software without their permission.
12  * The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This
13  * notice is a summary of the Masstree LICENSE file; the license in that file
14  * is legally binding.
15  */
16 #ifndef KVROW_HH
17 #define KVROW_HH 1
18 #include "kvthread.hh"
19 #include "kvproto.hh"
20 #include "log.hh"
21 #include "json.hh"
22 #include <algorithm>
23
24 #if MASSTREE_ROW_TYPE_ARRAY
25 # include "value_array.hh"
26 typedef value_array row_type;
27 #elif MASSTREE_ROW_TYPE_ARRAY_VER
28 # include "value_versioned_array.hh"
29 typedef value_versioned_array row_type;
30 #elif MASSTREE_ROW_TYPE_STR
31 # include "value_string.hh"
32 typedef value_string row_type;
33 #else
34 # include "value_bag.hh"
35 typedef value_bag<uint16_t> row_type;
36 #endif
37
38 template <typename R>
39 struct query_helper {
40     inline const R* snapshot(const R* row, const std::vector<typename R::index_type>&, threadinfo&) {
41         return row;
42     }
43 };
44
45 template <typename R> class query_json_scanner;
46
47 template <typename R>
48 class query {
49   public:
50     typedef lcdf::Json Json;
51
52     template <typename T>
53     void run_get(T& table, Json& req, threadinfo& ti);
54     template <typename T>
55     bool run_get1(T& table, Str key, int col, Str& value, threadinfo& ti);
56
57     template <typename T>
58     result_t run_put(T& table, Str key,
59                      const Json* firstreq, const Json* lastreq, threadinfo& ti);
60     template <typename T>
61     result_t run_replace(T& table, Str key, Str value, threadinfo& ti);
62     template <typename T>
63     bool run_remove(T& table, Str key, threadinfo& ti);
64
65     template <typename T>
66     void run_scan(T& table, Json& request, threadinfo& ti);
67     template <typename T>
68     void run_rscan(T& table, Json& request, threadinfo& ti);
69
70     const loginfo::query_times& query_times() const {
71         return qtimes_;
72     }
73
74   private:
75     std::vector<typename R::index_type> f_;
76     loginfo::query_times qtimes_;
77     query_helper<R> helper_;
78     lcdf::String scankey_;
79     int scankeypos_;
80
81     void emit_fields(const R* value, Json& req, threadinfo& ti);
82     void emit_fields1(const R* value, Json& req, threadinfo& ti);
83     void assign_timestamp(threadinfo& ti);
84     void assign_timestamp(threadinfo& ti, kvtimestamp_t t);
85     inline bool apply_put(R*& value, bool found, const Json* firstreq,
86                           const Json* lastreq, threadinfo& ti);
87     inline bool apply_replace(R*& value, bool found, Str new_value,
88                               threadinfo& ti);
89     inline void apply_remove(R*& value, kvtimestamp_t& node_ts, threadinfo& ti);
90
91     template <typename RR> friend class query_json_scanner;
92 };
93
94
95 template <typename R>
96 void query<R>::emit_fields(const R* value, Json& req, threadinfo& ti) {
97     const R* snapshot = helper_.snapshot(value, f_, ti);
98     if (f_.empty()) {
99         for (int i = 0; i != snapshot->ncol(); ++i)
100             req.push_back(lcdf::String::make_stable(snapshot->col(i)));
101     } else {
102         for (int i = 0; i != (int) f_.size(); ++i)
103             req.push_back(lcdf::String::make_stable(snapshot->col(f_[i])));
104     }
105 }
106
107 template <typename R>
108 void query<R>::emit_fields1(const R* value, Json& req, threadinfo& ti) {
109     const R* snapshot = helper_.snapshot(value, f_, ti);
110     if ((f_.empty() && snapshot->ncol() == 1) || f_.size() == 1)
111         req = lcdf::String::make_stable(snapshot->col(f_.empty() ? 0 : f_[0]));
112     else if (f_.empty()) {
113         for (int i = 0; i != snapshot->ncol(); ++i)
114             req.push_back(lcdf::String::make_stable(snapshot->col(i)));
115     } else {
116         for (int i = 0; i != (int) f_.size(); ++i)
117             req.push_back(lcdf::String::make_stable(snapshot->col(f_[i])));
118     }
119 }
120
121
122 template <typename R> template <typename T>
123 void query<R>::run_get(T& table, Json& req, threadinfo& ti) {
124     typename T::unlocked_cursor_type lp(table, req[2].as_s());
125     bool found = lp.find_unlocked(ti);
126     if (found && row_is_marker(lp.value()))
127         found = false;
128     if (found) {
129         f_.clear();
130         for (int i = 3; i != req.size(); ++i)
131             f_.push_back(req[i].as_i());
132         req.resize(2);
133         emit_fields(lp.value(), req, ti);
134     }
135 }
136
137 template <typename R> template <typename T>
138 bool query<R>::run_get1(T& table, Str key, int col, Str& value, threadinfo& ti) {
139     typename T::unlocked_cursor_type lp(table, key);
140     bool found = lp.find_unlocked(ti);
141     if (found && row_is_marker(lp.value()))
142         found = false;
143     if (found)
144         value = lp.value()->col(col);
145     return found;
146 }
147
148
149 template <typename R>
150 inline void query<R>::assign_timestamp(threadinfo& ti) {
151     qtimes_.ts = ti.update_timestamp();
152     qtimes_.prev_ts = 0;
153 }
154
155 template <typename R>
156 inline void query<R>::assign_timestamp(threadinfo& ti, kvtimestamp_t min_ts) {
157     qtimes_.ts = ti.update_timestamp(min_ts);
158     qtimes_.prev_ts = min_ts;
159 }
160
161
162 template <typename R> template <typename T>
163 result_t query<R>::run_put(T& table, Str key,
164                            const Json* firstreq, const Json* lastreq,
165                            threadinfo& ti) {
166     typename T::cursor_type lp(table, key);
167     bool found = lp.find_insert(ti);
168     if (!found)
169         ti.advance_timestamp(lp.node_timestamp());
170     bool inserted = apply_put(lp.value(), found, firstreq, lastreq, ti);
171     lp.finish(1, ti);
172     return inserted ? Inserted : Updated;
173 }
174
175 template <typename R>
176 inline bool query<R>::apply_put(R*& value, bool found, const Json* firstreq,
177                                 const Json* lastreq, threadinfo& ti) {
178     if (loginfo* log = ti.logger()) {
179         log->acquire();
180         qtimes_.epoch = global_log_epoch;
181     }
182
183     if (!found) {
184     insert:
185         assign_timestamp(ti);
186         value = R::create(firstreq, lastreq, qtimes_.ts, ti);
187         return true;
188     }
189
190     R* old_value = value;
191     assign_timestamp(ti, old_value->timestamp());
192     if (row_is_marker(old_value)) {
193         old_value->deallocate_rcu(ti);
194         goto insert;
195     }
196
197     R* updated = old_value->update(firstreq, lastreq, qtimes_.ts, ti);
198     if (updated != old_value) {
199         value = updated;
200         old_value->deallocate_rcu_after_update(firstreq, lastreq, ti);
201     }
202     return false;
203 }
204
205 template <typename R> template <typename T>
206 result_t query<R>::run_replace(T& table, Str key, Str value, threadinfo& ti) {
207     typename T::cursor_type lp(table, key);
208     bool found = lp.find_insert(ti);
209     if (!found)
210         ti.advance_timestamp(lp.node_timestamp());
211     bool inserted = apply_replace(lp.value(), found, value, ti);
212     lp.finish(1, ti);
213     return inserted ? Inserted : Updated;
214 }
215
216 template <typename R>
217 inline bool query<R>::apply_replace(R*& value, bool found, Str new_value,
218                                     threadinfo& ti) {
219     if (loginfo* log = ti.logger()) {
220         log->acquire();
221         qtimes_.epoch = global_log_epoch;
222     }
223
224     bool inserted = !found || row_is_marker(value);
225     if (!found)
226         assign_timestamp(ti);
227     else {
228         assign_timestamp(ti, value->timestamp());
229         value->deallocate_rcu(ti);
230     }
231
232     value = R::create1(new_value, qtimes_.ts, ti);
233     return inserted;
234 }
235
236 template <typename R> template <typename T>
237 bool query<R>::run_remove(T& table, Str key, threadinfo& ti) {
238     typename T::cursor_type lp(table, key);
239     bool found = lp.find_locked(ti);
240     if (found)
241         apply_remove(lp.value(), lp.node_timestamp(), ti);
242     lp.finish(-1, ti);
243     return found;
244 }
245
246 template <typename R>
247 inline void query<R>::apply_remove(R*& value, kvtimestamp_t& node_ts,
248                                    threadinfo& ti) {
249     if (loginfo* log = ti.logger()) {
250         log->acquire();
251         qtimes_.epoch = global_log_epoch;
252     }
253
254     R* old_value = value;
255     assign_timestamp(ti, old_value->timestamp());
256     if (circular_int<kvtimestamp_t>::less_equal(node_ts, qtimes_.ts))
257         node_ts = qtimes_.ts + 2;
258     old_value->deallocate_rcu(ti);
259 }
260
261
262 template <typename R>
263 class query_json_scanner {
264   public:
265     query_json_scanner(query<R> &q, lcdf::Json& request)
266         : q_(q), nleft_(request[3].as_i()), request_(request) {
267         std::swap(request[2].value().as_s(), firstkey_);
268         request_.resize(2);
269         q_.scankeypos_ = 0;
270     }
271     const lcdf::String& firstkey() const {
272         return firstkey_;
273     }
274     template <typename SS, typename K>
275     void visit_leaf(const SS&, const K&, threadinfo&) {
276     }
277     bool visit_value(Str key, R* value, threadinfo& ti) {
278         if (row_is_marker(value))
279             return true;
280         // NB the `key` is not stable! We must save space for it.
281         while (q_.scankeypos_ + key.length() > q_.scankey_.length()) {
282             q_.scankey_ = lcdf::String::make_uninitialized(q_.scankey_.length() ? q_.scankey_.length() * 2 : 1024);
283             q_.scankeypos_ = 0;
284         }
285         memcpy(const_cast<char*>(q_.scankey_.data() + q_.scankeypos_),
286                key.data(), key.length());
287         request_.push_back(q_.scankey_.substr(q_.scankeypos_, key.length()));
288         q_.scankeypos_ += key.length();
289         request_.push_back(lcdf::Json());
290         q_.emit_fields1(value, request_.back(), ti);
291         --nleft_;
292         return nleft_ != 0;
293     }
294   private:
295     query<R> &q_;
296     int nleft_;
297     lcdf::Json& request_;
298     lcdf::String firstkey_;
299 };
300
301 template <typename R> template <typename T>
302 void query<R>::run_scan(T& table, Json& request, threadinfo& ti) {
303     assert(request[3].as_i() > 0);
304     f_.clear();
305     for (int i = 4; i != request.size(); ++i)
306         f_.push_back(request[i].as_i());
307     query_json_scanner<R> scanf(*this, request);
308     table.scan(scanf.firstkey(), true, scanf, ti);
309 }
310
311 template <typename R> template <typename T>
312 void query<R>::run_rscan(T& table, Json& request, threadinfo& ti) {
313     assert(request[3].as_i() > 0);
314     f_.clear();
315     for (int i = 4; i != request.size(); ++i)
316         f_.push_back(request[i].as_i());
317     query_json_scanner<R> scanf(*this, request);
318     table.rscan(scanf.firstkey(), true, scanf, ti);
319 }
320
321 #endif