2 * Eddie Kohler, Yandong Mao, Robert Morris
3 * Copyright (c) 2012-2013 President and Fellows of Harvard College
4 * Copyright (c) 2012-2013 Massachusetts Institute of Technology
6 * Permission is hereby granted, free of charge, to any person obtaining a
7 * copy of this software and associated documentation files (the "Software"),
8 * to deal in the Software without restriction, subject to the conditions
9 * listed in the Masstree LICENSE file. These conditions include: you must
10 * preserve this copyright notice, and you cannot mention the copyright
11 * holders in advertising related to the Software without their permission.
12 * The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This
13 * notice is a summary of the Masstree LICENSE file; the license in that file
18 #include "kvthread.hh"
24 #if MASSTREE_ROW_TYPE_ARRAY
25 # include "value_array.hh"
26 typedef value_array row_type;
27 #elif MASSTREE_ROW_TYPE_ARRAY_VER
28 # include "value_versioned_array.hh"
29 typedef value_versioned_array row_type;
30 #elif MASSTREE_ROW_TYPE_STR
31 # include "value_string.hh"
32 typedef value_string row_type;
34 # include "value_bag.hh"
35 typedef value_bag<uint16_t> row_type;
40 inline const R* snapshot(const R* row, const std::vector<typename R::index_type>&, threadinfo&) {
45 template <typename R> class query_json_scanner;
50 typedef lcdf::Json Json;
53 void run_get(T& table, Json& req, threadinfo& ti);
55 bool run_get1(T& table, Str key, int col, Str& value, threadinfo& ti);
58 result_t run_put(T& table, Str key,
59 const Json* firstreq, const Json* lastreq, threadinfo& ti);
61 result_t run_replace(T& table, Str key, Str value, threadinfo& ti);
63 bool run_remove(T& table, Str key, threadinfo& ti);
66 void run_scan(T& table, Json& request, threadinfo& ti);
68 void run_rscan(T& table, Json& request, threadinfo& ti);
70 const loginfo::query_times& query_times() const {
75 std::vector<typename R::index_type> f_;
76 loginfo::query_times qtimes_;
77 query_helper<R> helper_;
78 lcdf::String scankey_;
81 void emit_fields(const R* value, Json& req, threadinfo& ti);
82 void emit_fields1(const R* value, Json& req, threadinfo& ti);
83 void assign_timestamp(threadinfo& ti);
84 void assign_timestamp(threadinfo& ti, kvtimestamp_t t);
85 inline bool apply_put(R*& value, bool found, const Json* firstreq,
86 const Json* lastreq, threadinfo& ti);
87 inline bool apply_replace(R*& value, bool found, Str new_value,
89 inline void apply_remove(R*& value, kvtimestamp_t& node_ts, threadinfo& ti);
91 template <typename RR> friend class query_json_scanner;
96 void query<R>::emit_fields(const R* value, Json& req, threadinfo& ti) {
97 const R* snapshot = helper_.snapshot(value, f_, ti);
99 for (int i = 0; i != snapshot->ncol(); ++i)
100 req.push_back(lcdf::String::make_stable(snapshot->col(i)));
102 for (int i = 0; i != (int) f_.size(); ++i)
103 req.push_back(lcdf::String::make_stable(snapshot->col(f_[i])));
107 template <typename R>
108 void query<R>::emit_fields1(const R* value, Json& req, threadinfo& ti) {
109 const R* snapshot = helper_.snapshot(value, f_, ti);
110 if ((f_.empty() && snapshot->ncol() == 1) || f_.size() == 1)
111 req = lcdf::String::make_stable(snapshot->col(f_.empty() ? 0 : f_[0]));
112 else if (f_.empty()) {
113 for (int i = 0; i != snapshot->ncol(); ++i)
114 req.push_back(lcdf::String::make_stable(snapshot->col(i)));
116 for (int i = 0; i != (int) f_.size(); ++i)
117 req.push_back(lcdf::String::make_stable(snapshot->col(f_[i])));
122 template <typename R> template <typename T>
123 void query<R>::run_get(T& table, Json& req, threadinfo& ti) {
124 typename T::unlocked_cursor_type lp(table, req[2].as_s());
125 bool found = lp.find_unlocked(ti);
126 if (found && row_is_marker(lp.value()))
130 for (int i = 3; i != req.size(); ++i)
131 f_.push_back(req[i].as_i());
133 emit_fields(lp.value(), req, ti);
137 template <typename R> template <typename T>
138 bool query<R>::run_get1(T& table, Str key, int col, Str& value, threadinfo& ti) {
139 typename T::unlocked_cursor_type lp(table, key);
140 bool found = lp.find_unlocked(ti);
141 if (found && row_is_marker(lp.value()))
144 value = lp.value()->col(col);
149 template <typename R>
150 inline void query<R>::assign_timestamp(threadinfo& ti) {
151 qtimes_.ts = ti.update_timestamp();
155 template <typename R>
156 inline void query<R>::assign_timestamp(threadinfo& ti, kvtimestamp_t min_ts) {
157 qtimes_.ts = ti.update_timestamp(min_ts);
158 qtimes_.prev_ts = min_ts;
162 template <typename R> template <typename T>
163 result_t query<R>::run_put(T& table, Str key,
164 const Json* firstreq, const Json* lastreq,
166 typename T::cursor_type lp(table, key);
167 bool found = lp.find_insert(ti);
169 ti.advance_timestamp(lp.node_timestamp());
170 bool inserted = apply_put(lp.value(), found, firstreq, lastreq, ti);
172 return inserted ? Inserted : Updated;
175 template <typename R>
176 inline bool query<R>::apply_put(R*& value, bool found, const Json* firstreq,
177 const Json* lastreq, threadinfo& ti) {
178 if (loginfo* log = ti.logger()) {
180 qtimes_.epoch = global_log_epoch;
185 assign_timestamp(ti);
186 value = R::create(firstreq, lastreq, qtimes_.ts, ti);
190 R* old_value = value;
191 assign_timestamp(ti, old_value->timestamp());
192 if (row_is_marker(old_value)) {
193 old_value->deallocate_rcu(ti);
197 R* updated = old_value->update(firstreq, lastreq, qtimes_.ts, ti);
198 if (updated != old_value) {
200 old_value->deallocate_rcu_after_update(firstreq, lastreq, ti);
205 template <typename R> template <typename T>
206 result_t query<R>::run_replace(T& table, Str key, Str value, threadinfo& ti) {
207 typename T::cursor_type lp(table, key);
208 bool found = lp.find_insert(ti);
210 ti.advance_timestamp(lp.node_timestamp());
211 bool inserted = apply_replace(lp.value(), found, value, ti);
213 return inserted ? Inserted : Updated;
216 template <typename R>
217 inline bool query<R>::apply_replace(R*& value, bool found, Str new_value,
219 if (loginfo* log = ti.logger()) {
221 qtimes_.epoch = global_log_epoch;
224 bool inserted = !found || row_is_marker(value);
226 assign_timestamp(ti);
228 assign_timestamp(ti, value->timestamp());
229 value->deallocate_rcu(ti);
232 value = R::create1(new_value, qtimes_.ts, ti);
236 template <typename R> template <typename T>
237 bool query<R>::run_remove(T& table, Str key, threadinfo& ti) {
238 typename T::cursor_type lp(table, key);
239 bool found = lp.find_locked(ti);
241 apply_remove(lp.value(), lp.node_timestamp(), ti);
246 template <typename R>
247 inline void query<R>::apply_remove(R*& value, kvtimestamp_t& node_ts,
249 if (loginfo* log = ti.logger()) {
251 qtimes_.epoch = global_log_epoch;
254 R* old_value = value;
255 assign_timestamp(ti, old_value->timestamp());
256 if (circular_int<kvtimestamp_t>::less_equal(node_ts, qtimes_.ts))
257 node_ts = qtimes_.ts + 2;
258 old_value->deallocate_rcu(ti);
262 template <typename R>
263 class query_json_scanner {
265 query_json_scanner(query<R> &q, lcdf::Json& request)
266 : q_(q), nleft_(request[3].as_i()), request_(request) {
267 std::swap(request[2].value().as_s(), firstkey_);
271 const lcdf::String& firstkey() const {
274 template <typename SS, typename K>
275 void visit_leaf(const SS&, const K&, threadinfo&) {
277 bool visit_value(Str key, R* value, threadinfo& ti) {
278 if (row_is_marker(value))
280 // NB the `key` is not stable! We must save space for it.
281 while (q_.scankeypos_ + key.length() > q_.scankey_.length()) {
282 q_.scankey_ = lcdf::String::make_uninitialized(q_.scankey_.length() ? q_.scankey_.length() * 2 : 1024);
285 memcpy(const_cast<char*>(q_.scankey_.data() + q_.scankeypos_),
286 key.data(), key.length());
287 request_.push_back(q_.scankey_.substr(q_.scankeypos_, key.length()));
288 q_.scankeypos_ += key.length();
289 request_.push_back(lcdf::Json());
290 q_.emit_fields1(value, request_.back(), ti);
297 lcdf::Json& request_;
298 lcdf::String firstkey_;
301 template <typename R> template <typename T>
302 void query<R>::run_scan(T& table, Json& request, threadinfo& ti) {
303 assert(request[3].as_i() > 0);
305 for (int i = 4; i != request.size(); ++i)
306 f_.push_back(request[i].as_i());
307 query_json_scanner<R> scanf(*this, request);
308 table.scan(scanf.firstkey(), true, scanf, ti);
311 template <typename R> template <typename T>
312 void query<R>::run_rscan(T& table, Json& request, threadinfo& ti) {
313 assert(request[3].as_i() > 0);
315 for (int i = 4; i != request.size(); ++i)
316 f_.push_back(request[i].as_i());
317 query_json_scanner<R> scanf(*this, request);
318 table.rscan(scanf.firstkey(), true, scanf, ti);