Merge branch 'master' of /home/git/concurrency-benchmarks
[c11concurrency-benchmarks.git] / silo / typed_txn_btree.h
1 #ifndef _NDB_TYPED_TXN_BTREE_H_
2 #define _NDB_TYPED_TXN_BTREE_H_
3
4 #include "base_txn_btree.h"
5 #include "txn_btree.h"
6 #include "record/cursor.h"
7
8 template <typename Schema>
9 struct typed_txn_btree_ {
10
11   typedef typename Schema::base_type base_type;
12   typedef typename Schema::key_type key_type;
13   typedef typename Schema::value_type value_type;
14   typedef typename Schema::value_descriptor_type value_descriptor_type;
15   typedef typename Schema::key_encoder_type key_encoder_type;
16   typedef typename Schema::value_encoder_type value_encoder_type;
17
18   static_assert(value_descriptor_type::nfields() <= 64, "xx");
19   static const uint64_t AllFieldsMask = (1UL << value_descriptor_type::nfields()) - 1;
20
21   static inline constexpr bool
22   IsAllFields(uint64_t m)
23   {
24     return (m & AllFieldsMask) == AllFieldsMask;
25   }
26
27   class key_reader {
28   public:
29     constexpr key_reader(bool no_key_results) : no_key_results(no_key_results) {}
30     inline const key_type &
31     operator()(const std::string &s)
32     {
33       const typename Schema::key_encoder_type key_encoder;
34       if (!no_key_results)
35         key_encoder.read(s, &k);
36       return k;
37     }
38 #if NDB_MASSTREE
39     inline const key_type &
40     operator()(lcdf::Str s)
41     {
42       const typename Schema::key_encoder_type key_encoder;
43       if (!no_key_results)
44         key_encoder.read(s, &k);
45       return k;
46     }
47 #endif
48   private:
49     key_type k;
50     bool no_key_results;
51   };
52
53   static inline bool
54   do_record_read(const uint8_t *data, size_t sz, uint64_t fields_mask, value_type *v)
55   {
56     if (IsAllFields(fields_mask)) {
57       // read the entire record
58       const value_encoder_type value_encoder;
59       return value_encoder.failsafe_read(data, sz, v);
60     } else {
61       // pick individual fields
62       read_record_cursor<base_type> r(data, sz);
63       for (uint64_t i = 0; i < value_descriptor_type::nfields(); i++) {
64         if ((1UL << i) & fields_mask) {
65           r.skip_to(i);
66           if (unlikely(!r.read_current_and_advance(v)))
67             return false;
68         }
69       }
70       return true;
71     }
72   }
73
74   class single_value_reader {
75   public:
76     typedef typename Schema::value_type value_type;
77
78     constexpr single_value_reader(value_type &v, uint64_t fields_mask)
79       : v(&v), fields_mask(fields_mask) {}
80
81     template <typename StringAllocator>
82     inline bool
83     operator()(const uint8_t *data, size_t sz, StringAllocator &sa)
84     {
85       return do_record_read(data, sz, fields_mask, v);
86     }
87
88     inline value_type &
89     results()
90     {
91       return *v;
92     }
93
94     inline const value_type &
95     results() const
96     {
97       return *v;
98     }
99
100     template <typename StringAllocator>
101     inline void
102     dup(const value_type &vdup, StringAllocator &sa)
103     {
104       *v = vdup;
105     }
106
107   private:
108     value_type *v;
109     uint64_t fields_mask;
110   };
111
112   class value_reader {
113   public:
114     typedef typename Schema::value_type value_type;
115
116     constexpr value_reader(uint64_t fields_mask) : fields_mask(fields_mask) {}
117
118     template <typename StringAllocator>
119     inline bool
120     operator()(const uint8_t *data, size_t sz, StringAllocator &sa)
121     {
122       return do_record_read(data, sz, fields_mask, &v);
123     }
124
125     inline value_type &
126     results()
127     {
128       return v;
129     }
130
131     inline const value_type &
132     results() const
133     {
134       return v;
135     }
136
137     template <typename StringAllocator>
138     inline void
139     dup(const value_type &vdup, StringAllocator &sa)
140     {
141       v = vdup;
142     }
143
144   private:
145     value_type v;
146     uint64_t fields_mask;
147   };
148
149   class key_writer {
150   public:
151     constexpr key_writer(const key_type *k) : k(k) {}
152
153     template <typename StringAllocator>
154     inline const std::string *
155     fully_materialize(bool stable_input, StringAllocator &sa)
156     {
157       if (!k)
158         return nullptr;
159       std::string * const ret = sa();
160       const key_encoder_type key_encoder;
161       key_encoder.write(*ret, k);
162       return ret;
163     }
164   private:
165     const key_type *k;
166   };
167
168   static inline size_t
169   compute_needed_standalone(
170     const value_type *v, uint64_t fields,
171     const uint8_t *buf, size_t sz)
172   {
173     if (fields == 0) {
174       // delete
175       INVARIANT(!v);
176       return 0;
177     }
178     INVARIANT(v);
179     if (sz == 0) {
180       // new record (insert)
181       INVARIANT(IsAllFields(fields));
182       const value_encoder_type value_encoder;
183       return value_encoder.nbytes(v);
184     }
185
186     ssize_t new_updates_sum = 0;
187     for (uint64_t i = 0; i < value_descriptor_type::nfields(); i++) {
188       if ((1UL << i) & fields) {
189         const uint8_t * px = reinterpret_cast<const uint8_t *>(v) +
190           value_descriptor_type::cstruct_offsetof(i);
191         new_updates_sum += value_descriptor_type::nbytes_fn(i)(px);
192       }
193     }
194
195     // XXX: should try to cache pointers discovered by read_record_cursor
196     ssize_t old_updates_sum = 0;
197     read_record_cursor<base_type> rc(buf, sz);
198     for (uint64_t i = 0; i < value_descriptor_type::nfields(); i++) {
199       if ((1UL << i) & fields) {
200         rc.skip_to(i);
201         const size_t sz = rc.read_current_raw_size_and_advance();
202         INVARIANT(sz);
203         old_updates_sum += sz;
204       }
205     }
206
207     // XXX: see if approximate version works almost as well (approx version is
208     // to assume that each field has the minimum possible size, which is
209     // overly conservative but correct)
210
211     const ssize_t ret = static_cast<ssize_t>(sz) - old_updates_sum + new_updates_sum;
212     INVARIANT(ret > 0);
213     return ret;
214   }
215
216   // how many bytes do we need to encode a delta record
217   static inline size_t
218   compute_needed_delta_standalone(
219       const value_type *v, uint64_t fields)
220   {
221     size_t size_needed = 0;
222     size_needed += sizeof(uint64_t);
223     if (fields == 0) {
224       // delete
225       INVARIANT(!v);
226       return size_needed;
227     }
228     INVARIANT(v);
229     if (IsAllFields(fields)) {
230       // new record (insert)
231       const value_encoder_type value_encoder;
232       size_needed += value_encoder.nbytes(v);
233       return size_needed;
234     }
235
236     for (uint64_t i = 0; i < value_descriptor_type::nfields(); i++) {
237       if ((1UL << i) & fields) {
238         const uint8_t * px = reinterpret_cast<const uint8_t *>(v) +
239           value_descriptor_type::cstruct_offsetof(i);
240         size_needed += value_descriptor_type::nbytes_fn(i)(px);
241       }
242     }
243
244     return size_needed;
245   }
246
247   static inline void
248   do_write_standalone(
249       const value_type *v, uint64_t fields,
250       uint8_t *buf, size_t sz)
251   {
252     if (fields == 0) {
253       // no-op for delete
254       INVARIANT(!v);
255       return;
256     }
257     if (IsAllFields(fields)) {
258       // special case, just use the standard encoder (faster)
259       // because it's straight-line w/ no branching
260       const value_encoder_type value_encoder;
261       value_encoder.write(buf, v);
262       return;
263     }
264     write_record_cursor<base_type> wc(buf);
265     for (uint64_t i = 0; i < value_descriptor_type::nfields(); i++) {
266       if ((1UL << i) & fields) {
267         wc.skip_to(i);
268         wc.write_current_and_advance(v, nullptr);
269       }
270     }
271   }
272
273   static inline void
274   do_delta_write_standalone(
275       const value_type *v, uint64_t fields,
276       uint8_t *buf, size_t sz)
277   {
278     serializer<uint64_t, false> s_uint64_t;
279
280 #ifdef CHECK_INVARIANTS
281     const uint8_t * const orig_buf = buf;
282 #endif
283
284     buf = s_uint64_t.write(buf, fields);
285     if (fields == 0) {
286       // no-op for delete
287       INVARIANT(!v);
288       return;
289     }
290     if (IsAllFields(fields)) {
291       // special case, just use the standard encoder (faster)
292       // because it's straight-line w/ no branching
293       const value_encoder_type value_encoder;
294       value_encoder.write(buf, v);
295       return;
296     }
297     for (uint64_t i = 0; i < value_descriptor_type::nfields(); i++) {
298       if ((1UL << i) & fields) {
299         const uint8_t * px = reinterpret_cast<const uint8_t *>(v) +
300           value_descriptor_type::cstruct_offsetof(i);
301         buf = value_descriptor_type::write_fn(i)(buf, px);
302       }
303     }
304
305     INVARIANT(buf - orig_buf == ptrdiff_t(sz));
306   }
307
308   template <uint64_t Fields>
309   static inline size_t
310   tuple_writer(dbtuple::TupleWriterMode mode, const void *v, uint8_t *p, size_t sz)
311   {
312     const value_type *vx = reinterpret_cast<const value_type *>(v);
313     switch (mode) {
314     case dbtuple::TUPLE_WRITER_NEEDS_OLD_VALUE:
315       return 1;
316     case dbtuple::TUPLE_WRITER_COMPUTE_NEEDED:
317       return compute_needed_standalone(vx, Fields, p, sz);
318     case dbtuple::TUPLE_WRITER_COMPUTE_DELTA_NEEDED:
319       return compute_needed_delta_standalone(vx, Fields);
320     case dbtuple::TUPLE_WRITER_DO_WRITE:
321       do_write_standalone(vx, Fields, p, sz);
322       return 0;
323     case dbtuple::TUPLE_WRITER_DO_DELTA_WRITE:
324       do_delta_write_standalone(vx, Fields, p, sz);
325       return 0;
326     }
327     ALWAYS_ASSERT(false);
328     return 0;
329   }
330
331   class value_writer {
332   public:
333     constexpr value_writer(const value_type *v, uint64_t fields)
334       : v(v), fields(fields) {}
335
336     // old version of record is stored at
337     // [buf, buf+sz).
338     //
339     // compute the new required size for the update
340     inline size_t
341     compute_needed(const uint8_t *buf, size_t sz)
342     {
343       return compute_needed_standalone(v, fields, buf, sz);
344     }
345
346     template <typename StringAllocator>
347     inline const std::string *
348     fully_materialize(bool stable_input, StringAllocator &sa)
349     {
350       INVARIANT(IsAllFields(fields) || fields == 0);
351       if (fields == 0) {
352         // delete
353         INVARIANT(!v);
354         return nullptr;
355       }
356       std::string * const ret = sa();
357       const value_encoder_type value_encoder;
358       value_encoder.write(*ret, v);
359       return ret;
360     }
361
362     // the old value lives in [buf, buf+sz), but [buf, buf+compute_needed())
363     // is valid memory to write to
364     inline void
365     operator()(uint8_t *buf, size_t sz)
366     {
367       do_write_standalone(v, fields, buf, sz);
368     }
369
370   private:
371     const value_type *v;
372     uint64_t fields;
373   };
374
375   typedef key_type Key;
376   typedef key_writer KeyWriter;
377   typedef value_type Value;
378   typedef value_writer ValueWriter;
379   typedef uint64_t ValueInfo;
380
381   //typedef key_reader KeyReader;
382   //typedef single_value_reader SingleValueReader;
383   //typedef value_reader ValueReader;
384
385 };
386
387 template <template <typename> class Transaction, typename Schema>
388 class typed_txn_btree : public base_txn_btree<Transaction, typed_txn_btree_<Schema>> {
389   typedef base_txn_btree<Transaction, typed_txn_btree_<Schema>> super_type;
390 public:
391
392   typedef typename super_type::string_type string_type;
393   typedef typename super_type::size_type size_type;
394
395   typedef typename Schema::base_type base_type;
396   typedef typename Schema::key_type key_type;
397   typedef typename Schema::value_type value_type;
398   typedef typename Schema::value_descriptor_type value_descriptor_type;
399   typedef typename Schema::key_encoder_type key_encoder_type;
400   typedef typename Schema::value_encoder_type value_encoder_type;
401
402 private:
403
404   typedef txn_btree_::key_reader bytes_key_reader;
405   typedef txn_btree_::single_value_reader bytes_single_value_reader;
406   typedef txn_btree_::value_reader bytes_value_reader;
407
408   typedef
409     typename typed_txn_btree_<Schema>::key_writer
410     key_writer;
411
412   typedef
413     typename typed_txn_btree_<Schema>::key_reader
414     key_reader;
415   typedef
416     typename typed_txn_btree_<Schema>::single_value_reader
417     single_value_reader;
418   typedef
419     typename typed_txn_btree_<Schema>::value_reader
420     value_reader;
421
422   template <typename Traits>
423   static constexpr inline bool
424   IsSupportable()
425   {
426     return Traits::stable_input_memory ||
427       (private_::is_trivially_copyable<key_type>::value &&
428        private_::is_trivially_destructible<key_type>::value &&
429        private_::is_trivially_copyable<value_type>::value &&
430        private_::is_trivially_destructible<value_type>::value);
431   }
432
433 public:
434
435   static const uint64_t AllFieldsMask = typed_txn_btree_<Schema>::AllFieldsMask;
436   typedef util::Fields<AllFieldsMask> AllFields;
437
438   struct search_range_callback {
439   public:
440     virtual ~search_range_callback() {}
441     virtual bool invoke(const key_type &k, const value_type &v) = 0;
442   };
443
444   struct bytes_search_range_callback {
445   public:
446     virtual ~bytes_search_range_callback() {}
447     virtual bool invoke(const string_type &k, const string_type &v) = 0;
448   };
449
450   typed_txn_btree(size_type value_size_hint = 128,
451                   bool mostly_append = false,
452                   const std::string &name = "<unknown>")
453     : super_type(value_size_hint, mostly_append, name)
454   {}
455
456   template <typename Traits, typename FieldsMask = AllFields>
457   inline bool search(
458       Transaction<Traits> &t, const key_type &k, value_type &v,
459       FieldsMask fm = FieldsMask());
460
461   template <typename Traits, typename FieldsMask = AllFields>
462   inline void search_range_call(
463       Transaction<Traits> &t, const key_type &lower, const key_type *upper,
464       search_range_callback &callback,
465       bool no_key_results = false /* skip decoding of keys? */,
466       FieldsMask fm = FieldsMask());
467
468   // a lower-level variant which does not bother to decode the key/values
469   template <typename Traits>
470   inline void bytes_search_range_call(
471       Transaction<Traits> &t, const key_type &lower, const key_type *upper,
472       bytes_search_range_callback &callback,
473       size_type value_fields_prefix = std::numeric_limits<size_type>::max());
474
475   template <typename Traits, typename FieldsMask = AllFields>
476   inline void put(
477       Transaction<Traits> &t, const key_type &k, const value_type &v,
478       FieldsMask fm = FieldsMask());
479
480   template <typename Traits>
481   inline void insert(
482       Transaction<Traits> &t, const key_type &k, const value_type &v);
483
484   template <typename Traits>
485   inline void remove(
486       Transaction<Traits> &t, const key_type &k);
487
488 private:
489
490   template <typename Traits>
491   static inline const std::string *
492   stablize(Transaction<Traits> &t, const key_type &k)
493   {
494     key_writer writer(&k);
495     return writer.fully_materialize(
496         Traits::stable_input_memory, t.string_allocator());
497   }
498
499   template <typename Traits>
500   static inline const value_type *
501   stablize(Transaction<Traits> &t, const value_type &v)
502   {
503     if (Traits::stable_input_memory)
504       return &v;
505     std::string * const px = t.string_allocator()();
506     px->assign(reinterpret_cast<const char *>(&v), sizeof(v));
507     return reinterpret_cast<const value_type *>(px->data());
508   }
509
510   key_encoder_type key_encoder;
511   value_encoder_type value_encoder;
512 };
513
514 template <template <typename> class Transaction, typename Schema>
515 template <typename Traits, typename FieldsMask>
516 bool
517 typed_txn_btree<Transaction, Schema>::search(
518     Transaction<Traits> &t, const key_type &k, value_type &v,
519     FieldsMask fm)
520 {
521   // XXX: template single_value_reader with mask
522   single_value_reader vr(v, FieldsMask::value);
523   return this->do_search(t, k, vr);
524 }
525
526 template <template <typename> class Transaction, typename Schema>
527 template <typename Traits, typename FieldsMask>
528 void
529 typed_txn_btree<Transaction, Schema>::search_range_call(
530     Transaction<Traits> &t,
531     const key_type &lower, const key_type *upper,
532     search_range_callback &callback,
533     bool no_key_results,
534     FieldsMask fm)
535 {
536   key_reader kr(no_key_results);
537   value_reader vr(FieldsMask::value);
538   this->do_search_range_call(t, lower, upper, callback, kr, vr);
539 }
540
541 template <template <typename> class Transaction, typename Schema>
542 template <typename Traits>
543 void
544 typed_txn_btree<Transaction, Schema>::bytes_search_range_call(
545     Transaction<Traits> &t, const key_type &lower, const key_type *upper,
546     bytes_search_range_callback &callback,
547     size_type value_fields_prefix)
548 {
549   const value_encoder_type value_encoder;
550   const size_t max_bytes_read =
551     value_encoder.encode_max_nbytes_prefix(value_fields_prefix);
552   bytes_key_reader kr;
553   bytes_value_reader vr(max_bytes_read);
554   this->do_search_range_call(t, lower, upper, callback, kr, vr);
555 }
556
557 template <template <typename> class Transaction, typename Schema>
558 template <typename Traits, typename FieldsMask>
559 void
560 typed_txn_btree<Transaction, Schema>::put(
561     Transaction<Traits> &t, const key_type &k, const value_type &v, FieldsMask fm)
562 {
563   static_assert(IsSupportable<Traits>(), "xx");
564   const dbtuple::tuple_writer_t tw =
565     &typed_txn_btree_<Schema>::template tuple_writer<FieldsMask::value>;
566   this->do_tree_put(t, stablize(t, k), stablize(t, v), tw, false);
567 }
568
569 template <template <typename> class Transaction, typename Schema>
570 template <typename Traits>
571 void
572 typed_txn_btree<Transaction, Schema>::insert(
573     Transaction<Traits> &t, const key_type &k, const value_type &v)
574 {
575   static_assert(IsSupportable<Traits>(), "xx");
576   const dbtuple::tuple_writer_t tw =
577     &typed_txn_btree_<Schema>::template tuple_writer<AllFieldsMask>;
578   this->do_tree_put(t, stablize(t, k), stablize(t, v), tw, true);
579 }
580
581 template <template <typename> class Transaction, typename Schema>
582 template <typename Traits>
583 void
584 typed_txn_btree<Transaction, Schema>::remove(
585     Transaction<Traits> &t, const key_type &k)
586 {
587   static_assert(IsSupportable<Traits>(), "xx");
588   const dbtuple::tuple_writer_t tw =
589     &typed_txn_btree_<Schema>::template tuple_writer<0>;
590   this->do_tree_put(t, stablize(t, k), nullptr, tw, false);
591 }
592
593 #endif /* _NDB_TYPED_TXN_BTREE_H_ */