benchmark silo added
[c11concurrency-benchmarks.git] / silo / masstree / kvrow.hh
diff --git a/silo/masstree/kvrow.hh b/silo/masstree/kvrow.hh
new file mode 100644 (file)
index 0000000..81e71d8
--- /dev/null
@@ -0,0 +1,321 @@
+/* Masstree
+ * Eddie Kohler, Yandong Mao, Robert Morris
+ * Copyright (c) 2012-2013 President and Fellows of Harvard College
+ * Copyright (c) 2012-2013 Massachusetts Institute of Technology
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, subject to the conditions
+ * listed in the Masstree LICENSE file. These conditions include: you must
+ * preserve this copyright notice, and you cannot mention the copyright
+ * holders in advertising related to the Software without their permission.
+ * The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This
+ * notice is a summary of the Masstree LICENSE file; the license in that file
+ * is legally binding.
+ */
+#ifndef KVROW_HH
+#define KVROW_HH 1
+#include "kvthread.hh"
+#include "kvproto.hh"
+#include "log.hh"
+#include "json.hh"
+#include <algorithm>
+
+#if MASSTREE_ROW_TYPE_ARRAY
+# include "value_array.hh"
+typedef value_array row_type;
+#elif MASSTREE_ROW_TYPE_ARRAY_VER
+# include "value_versioned_array.hh"
+typedef value_versioned_array row_type;
+#elif MASSTREE_ROW_TYPE_STR
+# include "value_string.hh"
+typedef value_string row_type;
+#else
+# include "value_bag.hh"
+typedef value_bag<uint16_t> row_type;
+#endif
+
+template <typename R>
+struct query_helper {
+    inline const R* snapshot(const R* row, const std::vector<typename R::index_type>&, threadinfo&) {
+        return row;
+    }
+};
+
+template <typename R> class query_json_scanner;
+
+template <typename R>
+class query {
+  public:
+    typedef lcdf::Json Json;
+
+    template <typename T>
+    void run_get(T& table, Json& req, threadinfo& ti);
+    template <typename T>
+    bool run_get1(T& table, Str key, int col, Str& value, threadinfo& ti);
+
+    template <typename T>
+    result_t run_put(T& table, Str key,
+                     const Json* firstreq, const Json* lastreq, threadinfo& ti);
+    template <typename T>
+    result_t run_replace(T& table, Str key, Str value, threadinfo& ti);
+    template <typename T>
+    bool run_remove(T& table, Str key, threadinfo& ti);
+
+    template <typename T>
+    void run_scan(T& table, Json& request, threadinfo& ti);
+    template <typename T>
+    void run_rscan(T& table, Json& request, threadinfo& ti);
+
+    const loginfo::query_times& query_times() const {
+        return qtimes_;
+    }
+
+  private:
+    std::vector<typename R::index_type> f_;
+    loginfo::query_times qtimes_;
+    query_helper<R> helper_;
+    lcdf::String scankey_;
+    int scankeypos_;
+
+    void emit_fields(const R* value, Json& req, threadinfo& ti);
+    void emit_fields1(const R* value, Json& req, threadinfo& ti);
+    void assign_timestamp(threadinfo& ti);
+    void assign_timestamp(threadinfo& ti, kvtimestamp_t t);
+    inline bool apply_put(R*& value, bool found, const Json* firstreq,
+                          const Json* lastreq, threadinfo& ti);
+    inline bool apply_replace(R*& value, bool found, Str new_value,
+                              threadinfo& ti);
+    inline void apply_remove(R*& value, kvtimestamp_t& node_ts, threadinfo& ti);
+
+    template <typename RR> friend class query_json_scanner;
+};
+
+
+template <typename R>
+void query<R>::emit_fields(const R* value, Json& req, threadinfo& ti) {
+    const R* snapshot = helper_.snapshot(value, f_, ti);
+    if (f_.empty()) {
+        for (int i = 0; i != snapshot->ncol(); ++i)
+            req.push_back(lcdf::String::make_stable(snapshot->col(i)));
+    } else {
+        for (int i = 0; i != (int) f_.size(); ++i)
+            req.push_back(lcdf::String::make_stable(snapshot->col(f_[i])));
+    }
+}
+
+template <typename R>
+void query<R>::emit_fields1(const R* value, Json& req, threadinfo& ti) {
+    const R* snapshot = helper_.snapshot(value, f_, ti);
+    if ((f_.empty() && snapshot->ncol() == 1) || f_.size() == 1)
+        req = lcdf::String::make_stable(snapshot->col(f_.empty() ? 0 : f_[0]));
+    else if (f_.empty()) {
+        for (int i = 0; i != snapshot->ncol(); ++i)
+            req.push_back(lcdf::String::make_stable(snapshot->col(i)));
+    } else {
+        for (int i = 0; i != (int) f_.size(); ++i)
+            req.push_back(lcdf::String::make_stable(snapshot->col(f_[i])));
+    }
+}
+
+
+template <typename R> template <typename T>
+void query<R>::run_get(T& table, Json& req, threadinfo& ti) {
+    typename T::unlocked_cursor_type lp(table, req[2].as_s());
+    bool found = lp.find_unlocked(ti);
+    if (found && row_is_marker(lp.value()))
+        found = false;
+    if (found) {
+        f_.clear();
+        for (int i = 3; i != req.size(); ++i)
+            f_.push_back(req[i].as_i());
+        req.resize(2);
+        emit_fields(lp.value(), req, ti);
+    }
+}
+
+template <typename R> template <typename T>
+bool query<R>::run_get1(T& table, Str key, int col, Str& value, threadinfo& ti) {
+    typename T::unlocked_cursor_type lp(table, key);
+    bool found = lp.find_unlocked(ti);
+    if (found && row_is_marker(lp.value()))
+        found = false;
+    if (found)
+        value = lp.value()->col(col);
+    return found;
+}
+
+
+template <typename R>
+inline void query<R>::assign_timestamp(threadinfo& ti) {
+    qtimes_.ts = ti.update_timestamp();
+    qtimes_.prev_ts = 0;
+}
+
+template <typename R>
+inline void query<R>::assign_timestamp(threadinfo& ti, kvtimestamp_t min_ts) {
+    qtimes_.ts = ti.update_timestamp(min_ts);
+    qtimes_.prev_ts = min_ts;
+}
+
+
+template <typename R> template <typename T>
+result_t query<R>::run_put(T& table, Str key,
+                           const Json* firstreq, const Json* lastreq,
+                           threadinfo& ti) {
+    typename T::cursor_type lp(table, key);
+    bool found = lp.find_insert(ti);
+    if (!found)
+        ti.advance_timestamp(lp.node_timestamp());
+    bool inserted = apply_put(lp.value(), found, firstreq, lastreq, ti);
+    lp.finish(1, ti);
+    return inserted ? Inserted : Updated;
+}
+
+template <typename R>
+inline bool query<R>::apply_put(R*& value, bool found, const Json* firstreq,
+                                const Json* lastreq, threadinfo& ti) {
+    if (loginfo* log = ti.logger()) {
+       log->acquire();
+       qtimes_.epoch = global_log_epoch;
+    }
+
+    if (!found) {
+    insert:
+       assign_timestamp(ti);
+        value = R::create(firstreq, lastreq, qtimes_.ts, ti);
+        return true;
+    }
+
+    R* old_value = value;
+    assign_timestamp(ti, old_value->timestamp());
+    if (row_is_marker(old_value)) {
+       old_value->deallocate_rcu(ti);
+       goto insert;
+    }
+
+    R* updated = old_value->update(firstreq, lastreq, qtimes_.ts, ti);
+    if (updated != old_value) {
+       value = updated;
+       old_value->deallocate_rcu_after_update(firstreq, lastreq, ti);
+    }
+    return false;
+}
+
+template <typename R> template <typename T>
+result_t query<R>::run_replace(T& table, Str key, Str value, threadinfo& ti) {
+    typename T::cursor_type lp(table, key);
+    bool found = lp.find_insert(ti);
+    if (!found)
+        ti.advance_timestamp(lp.node_timestamp());
+    bool inserted = apply_replace(lp.value(), found, value, ti);
+    lp.finish(1, ti);
+    return inserted ? Inserted : Updated;
+}
+
+template <typename R>
+inline bool query<R>::apply_replace(R*& value, bool found, Str new_value,
+                                    threadinfo& ti) {
+    if (loginfo* log = ti.logger()) {
+       log->acquire();
+       qtimes_.epoch = global_log_epoch;
+    }
+
+    bool inserted = !found || row_is_marker(value);
+    if (!found)
+       assign_timestamp(ti);
+    else {
+        assign_timestamp(ti, value->timestamp());
+        value->deallocate_rcu(ti);
+    }
+
+    value = R::create1(new_value, qtimes_.ts, ti);
+    return inserted;
+}
+
+template <typename R> template <typename T>
+bool query<R>::run_remove(T& table, Str key, threadinfo& ti) {
+    typename T::cursor_type lp(table, key);
+    bool found = lp.find_locked(ti);
+    if (found)
+        apply_remove(lp.value(), lp.node_timestamp(), ti);
+    lp.finish(-1, ti);
+    return found;
+}
+
+template <typename R>
+inline void query<R>::apply_remove(R*& value, kvtimestamp_t& node_ts,
+                                   threadinfo& ti) {
+    if (loginfo* log = ti.logger()) {
+       log->acquire();
+       qtimes_.epoch = global_log_epoch;
+    }
+
+    R* old_value = value;
+    assign_timestamp(ti, old_value->timestamp());
+    if (circular_int<kvtimestamp_t>::less_equal(node_ts, qtimes_.ts))
+       node_ts = qtimes_.ts + 2;
+    old_value->deallocate_rcu(ti);
+}
+
+
+template <typename R>
+class query_json_scanner {
+  public:
+    query_json_scanner(query<R> &q, lcdf::Json& request)
+       : q_(q), nleft_(request[3].as_i()), request_(request) {
+        std::swap(request[2].value().as_s(), firstkey_);
+        request_.resize(2);
+        q_.scankeypos_ = 0;
+    }
+    const lcdf::String& firstkey() const {
+        return firstkey_;
+    }
+    template <typename SS, typename K>
+    void visit_leaf(const SS&, const K&, threadinfo&) {
+    }
+    bool visit_value(Str key, R* value, threadinfo& ti) {
+        if (row_is_marker(value))
+            return true;
+        // NB the `key` is not stable! We must save space for it.
+        while (q_.scankeypos_ + key.length() > q_.scankey_.length()) {
+            q_.scankey_ = lcdf::String::make_uninitialized(q_.scankey_.length() ? q_.scankey_.length() * 2 : 1024);
+            q_.scankeypos_ = 0;
+        }
+        memcpy(const_cast<char*>(q_.scankey_.data() + q_.scankeypos_),
+               key.data(), key.length());
+        request_.push_back(q_.scankey_.substr(q_.scankeypos_, key.length()));
+        q_.scankeypos_ += key.length();
+        request_.push_back(lcdf::Json());
+        q_.emit_fields1(value, request_.back(), ti);
+        --nleft_;
+        return nleft_ != 0;
+    }
+  private:
+    query<R> &q_;
+    int nleft_;
+    lcdf::Json& request_;
+    lcdf::String firstkey_;
+};
+
+template <typename R> template <typename T>
+void query<R>::run_scan(T& table, Json& request, threadinfo& ti) {
+    assert(request[3].as_i() > 0);
+    f_.clear();
+    for (int i = 4; i != request.size(); ++i)
+        f_.push_back(request[i].as_i());
+    query_json_scanner<R> scanf(*this, request);
+    table.scan(scanf.firstkey(), true, scanf, ti);
+}
+
+template <typename R> template <typename T>
+void query<R>::run_rscan(T& table, Json& request, threadinfo& ti) {
+    assert(request[3].as_i() > 0);
+    f_.clear();
+    for (int i = 4; i != request.size(); ++i)
+        f_.push_back(request[i].as_i());
+    query_json_scanner<R> scanf(*this, request);
+    table.rscan(scanf.firstkey(), true, scanf, ti);
+}
+
+#endif