benchmark silo added
[c11concurrency-benchmarks.git] / silo / typed_txn_btree.h
diff --git a/silo/typed_txn_btree.h b/silo/typed_txn_btree.h
new file mode 100644 (file)
index 0000000..e3a62e2
--- /dev/null
@@ -0,0 +1,593 @@
+#ifndef _NDB_TYPED_TXN_BTREE_H_
+#define _NDB_TYPED_TXN_BTREE_H_
+
+#include "base_txn_btree.h"
+#include "txn_btree.h"
+#include "record/cursor.h"
+
+template <typename Schema>
+struct typed_txn_btree_ {
+
+  typedef typename Schema::base_type base_type;
+  typedef typename Schema::key_type key_type;
+  typedef typename Schema::value_type value_type;
+  typedef typename Schema::value_descriptor_type value_descriptor_type;
+  typedef typename Schema::key_encoder_type key_encoder_type;
+  typedef typename Schema::value_encoder_type value_encoder_type;
+
+  static_assert(value_descriptor_type::nfields() <= 64, "xx");
+  static const uint64_t AllFieldsMask = (1UL << value_descriptor_type::nfields()) - 1;
+
+  static inline constexpr bool
+  IsAllFields(uint64_t m)
+  {
+    return (m & AllFieldsMask) == AllFieldsMask;
+  }
+
+  class key_reader {
+  public:
+    constexpr key_reader(bool no_key_results) : no_key_results(no_key_results) {}
+    inline const key_type &
+    operator()(const std::string &s)
+    {
+      const typename Schema::key_encoder_type key_encoder;
+      if (!no_key_results)
+        key_encoder.read(s, &k);
+      return k;
+    }
+#if NDB_MASSTREE
+    inline const key_type &
+    operator()(lcdf::Str s)
+    {
+      const typename Schema::key_encoder_type key_encoder;
+      if (!no_key_results)
+        key_encoder.read(s, &k);
+      return k;
+    }
+#endif
+  private:
+    key_type k;
+    bool no_key_results;
+  };
+
+  static inline bool
+  do_record_read(const uint8_t *data, size_t sz, uint64_t fields_mask, value_type *v)
+  {
+    if (IsAllFields(fields_mask)) {
+      // read the entire record
+      const value_encoder_type value_encoder;
+      return value_encoder.failsafe_read(data, sz, v);
+    } else {
+      // pick individual fields
+      read_record_cursor<base_type> r(data, sz);
+      for (uint64_t i = 0; i < value_descriptor_type::nfields(); i++) {
+        if ((1UL << i) & fields_mask) {
+          r.skip_to(i);
+          if (unlikely(!r.read_current_and_advance(v)))
+            return false;
+        }
+      }
+      return true;
+    }
+  }
+
+  class single_value_reader {
+  public:
+    typedef typename Schema::value_type value_type;
+
+    constexpr single_value_reader(value_type &v, uint64_t fields_mask)
+      : v(&v), fields_mask(fields_mask) {}
+
+    template <typename StringAllocator>
+    inline bool
+    operator()(const uint8_t *data, size_t sz, StringAllocator &sa)
+    {
+      return do_record_read(data, sz, fields_mask, v);
+    }
+
+    inline value_type &
+    results()
+    {
+      return *v;
+    }
+
+    inline const value_type &
+    results() const
+    {
+      return *v;
+    }
+
+    template <typename StringAllocator>
+    inline void
+    dup(const value_type &vdup, StringAllocator &sa)
+    {
+      *v = vdup;
+    }
+
+  private:
+    value_type *v;
+    uint64_t fields_mask;
+  };
+
+  class value_reader {
+  public:
+    typedef typename Schema::value_type value_type;
+
+    constexpr value_reader(uint64_t fields_mask) : fields_mask(fields_mask) {}
+
+    template <typename StringAllocator>
+    inline bool
+    operator()(const uint8_t *data, size_t sz, StringAllocator &sa)
+    {
+      return do_record_read(data, sz, fields_mask, &v);
+    }
+
+    inline value_type &
+    results()
+    {
+      return v;
+    }
+
+    inline const value_type &
+    results() const
+    {
+      return v;
+    }
+
+    template <typename StringAllocator>
+    inline void
+    dup(const value_type &vdup, StringAllocator &sa)
+    {
+      v = vdup;
+    }
+
+  private:
+    value_type v;
+    uint64_t fields_mask;
+  };
+
+  class key_writer {
+  public:
+    constexpr key_writer(const key_type *k) : k(k) {}
+
+    template <typename StringAllocator>
+    inline const std::string *
+    fully_materialize(bool stable_input, StringAllocator &sa)
+    {
+      if (!k)
+        return nullptr;
+      std::string * const ret = sa();
+      const key_encoder_type key_encoder;
+      key_encoder.write(*ret, k);
+      return ret;
+    }
+  private:
+    const key_type *k;
+  };
+
+  static inline size_t
+  compute_needed_standalone(
+    const value_type *v, uint64_t fields,
+    const uint8_t *buf, size_t sz)
+  {
+    if (fields == 0) {
+      // delete
+      INVARIANT(!v);
+      return 0;
+    }
+    INVARIANT(v);
+    if (sz == 0) {
+      // new record (insert)
+      INVARIANT(IsAllFields(fields));
+      const value_encoder_type value_encoder;
+      return value_encoder.nbytes(v);
+    }
+
+    ssize_t new_updates_sum = 0;
+    for (uint64_t i = 0; i < value_descriptor_type::nfields(); i++) {
+      if ((1UL << i) & fields) {
+        const uint8_t * px = reinterpret_cast<const uint8_t *>(v) +
+          value_descriptor_type::cstruct_offsetof(i);
+        new_updates_sum += value_descriptor_type::nbytes_fn(i)(px);
+      }
+    }
+
+    // XXX: should try to cache pointers discovered by read_record_cursor
+    ssize_t old_updates_sum = 0;
+    read_record_cursor<base_type> rc(buf, sz);
+    for (uint64_t i = 0; i < value_descriptor_type::nfields(); i++) {
+      if ((1UL << i) & fields) {
+        rc.skip_to(i);
+        const size_t sz = rc.read_current_raw_size_and_advance();
+        INVARIANT(sz);
+        old_updates_sum += sz;
+      }
+    }
+
+    // XXX: see if approximate version works almost as well (approx version is
+    // to assume that each field has the minimum possible size, which is
+    // overly conservative but correct)
+
+    const ssize_t ret = static_cast<ssize_t>(sz) - old_updates_sum + new_updates_sum;
+    INVARIANT(ret > 0);
+    return ret;
+  }
+
+  // how many bytes do we need to encode a delta record
+  static inline size_t
+  compute_needed_delta_standalone(
+      const value_type *v, uint64_t fields)
+  {
+    size_t size_needed = 0;
+    size_needed += sizeof(uint64_t);
+    if (fields == 0) {
+      // delete
+      INVARIANT(!v);
+      return size_needed;
+    }
+    INVARIANT(v);
+    if (IsAllFields(fields)) {
+      // new record (insert)
+      const value_encoder_type value_encoder;
+      size_needed += value_encoder.nbytes(v);
+      return size_needed;
+    }
+
+    for (uint64_t i = 0; i < value_descriptor_type::nfields(); i++) {
+      if ((1UL << i) & fields) {
+        const uint8_t * px = reinterpret_cast<const uint8_t *>(v) +
+          value_descriptor_type::cstruct_offsetof(i);
+        size_needed += value_descriptor_type::nbytes_fn(i)(px);
+      }
+    }
+
+    return size_needed;
+  }
+
+  static inline void
+  do_write_standalone(
+      const value_type *v, uint64_t fields,
+      uint8_t *buf, size_t sz)
+  {
+    if (fields == 0) {
+      // no-op for delete
+      INVARIANT(!v);
+      return;
+    }
+    if (IsAllFields(fields)) {
+      // special case, just use the standard encoder (faster)
+      // because it's straight-line w/ no branching
+      const value_encoder_type value_encoder;
+      value_encoder.write(buf, v);
+      return;
+    }
+    write_record_cursor<base_type> wc(buf);
+    for (uint64_t i = 0; i < value_descriptor_type::nfields(); i++) {
+      if ((1UL << i) & fields) {
+        wc.skip_to(i);
+        wc.write_current_and_advance(v, nullptr);
+      }
+    }
+  }
+
+  static inline void
+  do_delta_write_standalone(
+      const value_type *v, uint64_t fields,
+      uint8_t *buf, size_t sz)
+  {
+    serializer<uint64_t, false> s_uint64_t;
+
+#ifdef CHECK_INVARIANTS
+    const uint8_t * const orig_buf = buf;
+#endif
+
+    buf = s_uint64_t.write(buf, fields);
+    if (fields == 0) {
+      // no-op for delete
+      INVARIANT(!v);
+      return;
+    }
+    if (IsAllFields(fields)) {
+      // special case, just use the standard encoder (faster)
+      // because it's straight-line w/ no branching
+      const value_encoder_type value_encoder;
+      value_encoder.write(buf, v);
+      return;
+    }
+    for (uint64_t i = 0; i < value_descriptor_type::nfields(); i++) {
+      if ((1UL << i) & fields) {
+        const uint8_t * px = reinterpret_cast<const uint8_t *>(v) +
+          value_descriptor_type::cstruct_offsetof(i);
+        buf = value_descriptor_type::write_fn(i)(buf, px);
+      }
+    }
+
+    INVARIANT(buf - orig_buf == ptrdiff_t(sz));
+  }
+
+  template <uint64_t Fields>
+  static inline size_t
+  tuple_writer(dbtuple::TupleWriterMode mode, const void *v, uint8_t *p, size_t sz)
+  {
+    const value_type *vx = reinterpret_cast<const value_type *>(v);
+    switch (mode) {
+    case dbtuple::TUPLE_WRITER_NEEDS_OLD_VALUE:
+      return 1;
+    case dbtuple::TUPLE_WRITER_COMPUTE_NEEDED:
+      return compute_needed_standalone(vx, Fields, p, sz);
+    case dbtuple::TUPLE_WRITER_COMPUTE_DELTA_NEEDED:
+      return compute_needed_delta_standalone(vx, Fields);
+    case dbtuple::TUPLE_WRITER_DO_WRITE:
+      do_write_standalone(vx, Fields, p, sz);
+      return 0;
+    case dbtuple::TUPLE_WRITER_DO_DELTA_WRITE:
+      do_delta_write_standalone(vx, Fields, p, sz);
+      return 0;
+    }
+    ALWAYS_ASSERT(false);
+    return 0;
+  }
+
+  class value_writer {
+  public:
+    constexpr value_writer(const value_type *v, uint64_t fields)
+      : v(v), fields(fields) {}
+
+    // old version of record is stored at
+    // [buf, buf+sz).
+    //
+    // compute the new required size for the update
+    inline size_t
+    compute_needed(const uint8_t *buf, size_t sz)
+    {
+      return compute_needed_standalone(v, fields, buf, sz);
+    }
+
+    template <typename StringAllocator>
+    inline const std::string *
+    fully_materialize(bool stable_input, StringAllocator &sa)
+    {
+      INVARIANT(IsAllFields(fields) || fields == 0);
+      if (fields == 0) {
+        // delete
+        INVARIANT(!v);
+        return nullptr;
+      }
+      std::string * const ret = sa();
+      const value_encoder_type value_encoder;
+      value_encoder.write(*ret, v);
+      return ret;
+    }
+
+    // the old value lives in [buf, buf+sz), but [buf, buf+compute_needed())
+    // is valid memory to write to
+    inline void
+    operator()(uint8_t *buf, size_t sz)
+    {
+      do_write_standalone(v, fields, buf, sz);
+    }
+
+  private:
+    const value_type *v;
+    uint64_t fields;
+  };
+
+  typedef key_type Key;
+  typedef key_writer KeyWriter;
+  typedef value_type Value;
+  typedef value_writer ValueWriter;
+  typedef uint64_t ValueInfo;
+
+  //typedef key_reader KeyReader;
+  //typedef single_value_reader SingleValueReader;
+  //typedef value_reader ValueReader;
+
+};
+
+template <template <typename> class Transaction, typename Schema>
+class typed_txn_btree : public base_txn_btree<Transaction, typed_txn_btree_<Schema>> {
+  typedef base_txn_btree<Transaction, typed_txn_btree_<Schema>> super_type;
+public:
+
+  typedef typename super_type::string_type string_type;
+  typedef typename super_type::size_type size_type;
+
+  typedef typename Schema::base_type base_type;
+  typedef typename Schema::key_type key_type;
+  typedef typename Schema::value_type value_type;
+  typedef typename Schema::value_descriptor_type value_descriptor_type;
+  typedef typename Schema::key_encoder_type key_encoder_type;
+  typedef typename Schema::value_encoder_type value_encoder_type;
+
+private:
+
+  typedef txn_btree_::key_reader bytes_key_reader;
+  typedef txn_btree_::single_value_reader bytes_single_value_reader;
+  typedef txn_btree_::value_reader bytes_value_reader;
+
+  typedef
+    typename typed_txn_btree_<Schema>::key_writer
+    key_writer;
+
+  typedef
+    typename typed_txn_btree_<Schema>::key_reader
+    key_reader;
+  typedef
+    typename typed_txn_btree_<Schema>::single_value_reader
+    single_value_reader;
+  typedef
+    typename typed_txn_btree_<Schema>::value_reader
+    value_reader;
+
+  template <typename Traits>
+  static constexpr inline bool
+  IsSupportable()
+  {
+    return Traits::stable_input_memory ||
+      (private_::is_trivially_copyable<key_type>::value &&
+       private_::is_trivially_destructible<key_type>::value &&
+       private_::is_trivially_copyable<value_type>::value &&
+       private_::is_trivially_destructible<value_type>::value);
+  }
+
+public:
+
+  static const uint64_t AllFieldsMask = typed_txn_btree_<Schema>::AllFieldsMask;
+  typedef util::Fields<AllFieldsMask> AllFields;
+
+  struct search_range_callback {
+  public:
+    virtual ~search_range_callback() {}
+    virtual bool invoke(const key_type &k, const value_type &v) = 0;
+  };
+
+  struct bytes_search_range_callback {
+  public:
+    virtual ~bytes_search_range_callback() {}
+    virtual bool invoke(const string_type &k, const string_type &v) = 0;
+  };
+
+  typed_txn_btree(size_type value_size_hint = 128,
+                  bool mostly_append = false,
+                  const std::string &name = "<unknown>")
+    : super_type(value_size_hint, mostly_append, name)
+  {}
+
+  template <typename Traits, typename FieldsMask = AllFields>
+  inline bool search(
+      Transaction<Traits> &t, const key_type &k, value_type &v,
+      FieldsMask fm = FieldsMask());
+
+  template <typename Traits, typename FieldsMask = AllFields>
+  inline void search_range_call(
+      Transaction<Traits> &t, const key_type &lower, const key_type *upper,
+      search_range_callback &callback,
+      bool no_key_results = false /* skip decoding of keys? */,
+      FieldsMask fm = FieldsMask());
+
+  // a lower-level variant which does not bother to decode the key/values
+  template <typename Traits>
+  inline void bytes_search_range_call(
+      Transaction<Traits> &t, const key_type &lower, const key_type *upper,
+      bytes_search_range_callback &callback,
+      size_type value_fields_prefix = std::numeric_limits<size_type>::max());
+
+  template <typename Traits, typename FieldsMask = AllFields>
+  inline void put(
+      Transaction<Traits> &t, const key_type &k, const value_type &v,
+      FieldsMask fm = FieldsMask());
+
+  template <typename Traits>
+  inline void insert(
+      Transaction<Traits> &t, const key_type &k, const value_type &v);
+
+  template <typename Traits>
+  inline void remove(
+      Transaction<Traits> &t, const key_type &k);
+
+private:
+
+  template <typename Traits>
+  static inline const std::string *
+  stablize(Transaction<Traits> &t, const key_type &k)
+  {
+    key_writer writer(&k);
+    return writer.fully_materialize(
+        Traits::stable_input_memory, t.string_allocator());
+  }
+
+  template <typename Traits>
+  static inline const value_type *
+  stablize(Transaction<Traits> &t, const value_type &v)
+  {
+    if (Traits::stable_input_memory)
+      return &v;
+    std::string * const px = t.string_allocator()();
+    px->assign(reinterpret_cast<const char *>(&v), sizeof(v));
+    return reinterpret_cast<const value_type *>(px->data());
+  }
+
+  key_encoder_type key_encoder;
+  value_encoder_type value_encoder;
+};
+
+template <template <typename> class Transaction, typename Schema>
+template <typename Traits, typename FieldsMask>
+bool
+typed_txn_btree<Transaction, Schema>::search(
+    Transaction<Traits> &t, const key_type &k, value_type &v,
+    FieldsMask fm)
+{
+  // XXX: template single_value_reader with mask
+  single_value_reader vr(v, FieldsMask::value);
+  return this->do_search(t, k, vr);
+}
+
+template <template <typename> class Transaction, typename Schema>
+template <typename Traits, typename FieldsMask>
+void
+typed_txn_btree<Transaction, Schema>::search_range_call(
+    Transaction<Traits> &t,
+    const key_type &lower, const key_type *upper,
+    search_range_callback &callback,
+    bool no_key_results,
+    FieldsMask fm)
+{
+  key_reader kr(no_key_results);
+  value_reader vr(FieldsMask::value);
+  this->do_search_range_call(t, lower, upper, callback, kr, vr);
+}
+
+template <template <typename> class Transaction, typename Schema>
+template <typename Traits>
+void
+typed_txn_btree<Transaction, Schema>::bytes_search_range_call(
+    Transaction<Traits> &t, const key_type &lower, const key_type *upper,
+    bytes_search_range_callback &callback,
+    size_type value_fields_prefix)
+{
+  const value_encoder_type value_encoder;
+  const size_t max_bytes_read =
+    value_encoder.encode_max_nbytes_prefix(value_fields_prefix);
+  bytes_key_reader kr;
+  bytes_value_reader vr(max_bytes_read);
+  this->do_search_range_call(t, lower, upper, callback, kr, vr);
+}
+
+template <template <typename> class Transaction, typename Schema>
+template <typename Traits, typename FieldsMask>
+void
+typed_txn_btree<Transaction, Schema>::put(
+    Transaction<Traits> &t, const key_type &k, const value_type &v, FieldsMask fm)
+{
+  static_assert(IsSupportable<Traits>(), "xx");
+  const dbtuple::tuple_writer_t tw =
+    &typed_txn_btree_<Schema>::template tuple_writer<FieldsMask::value>;
+  this->do_tree_put(t, stablize(t, k), stablize(t, v), tw, false);
+}
+
+template <template <typename> class Transaction, typename Schema>
+template <typename Traits>
+void
+typed_txn_btree<Transaction, Schema>::insert(
+    Transaction<Traits> &t, const key_type &k, const value_type &v)
+{
+  static_assert(IsSupportable<Traits>(), "xx");
+  const dbtuple::tuple_writer_t tw =
+    &typed_txn_btree_<Schema>::template tuple_writer<AllFieldsMask>;
+  this->do_tree_put(t, stablize(t, k), stablize(t, v), tw, true);
+}
+
+template <template <typename> class Transaction, typename Schema>
+template <typename Traits>
+void
+typed_txn_btree<Transaction, Schema>::remove(
+    Transaction<Traits> &t, const key_type &k)
+{
+  static_assert(IsSupportable<Traits>(), "xx");
+  const dbtuple::tuple_writer_t tw =
+    &typed_txn_btree_<Schema>::template tuple_writer<0>;
+  this->do_tree_put(t, stablize(t, k), nullptr, tw, false);
+}
+
+#endif /* _NDB_TYPED_TXN_BTREE_H_ */