benchmark silo added
[c11concurrency-benchmarks.git] / silo / benchmarks / kvdb_wrapper_impl.h
1 #ifndef _KVDB_WRAPPER_IMPL_H_
2 #define _KVDB_WRAPPER_IMPL_H_
3
4 #include <vector>
5 #include <limits>
6 #include <utility>
7
8 #include "kvdb_wrapper.h"
9 #include "../varint.h"
10 #include "../macros.h"
11 #include "../util.h"
12 #include "../amd64.h"
13 #include "../lockguard.h"
14 #include "../prefetch.h"
15 #include "../scopedperf.hh"
16 #include "../counter.h"
17
18 namespace private_ {
19   static event_avg_counter evt_avg_kvdb_stable_version_spins("avg_kvdb_stable_version_spins");
20   static event_avg_counter evt_avg_kvdb_lock_acquire_spins("avg_kvdb_lock_acquire_spins");
21   static event_avg_counter evt_avg_kvdb_read_retries("avg_kvdb_read_retries");
22
23   STATIC_COUNTER_DECL(scopedperf::tsc_ctr, kvdb_get_probe0, kvdb_get_probe0_cg);
24   STATIC_COUNTER_DECL(scopedperf::tsc_ctr, kvdb_get_probe1, kvdb_get_probe1_cg);
25   STATIC_COUNTER_DECL(scopedperf::tsc_ctr, kvdb_put_probe0, kvdb_put_probe0_cg);
26   STATIC_COUNTER_DECL(scopedperf::tsc_ctr, kvdb_insert_probe0, kvdb_insert_probe0_cg);
27   STATIC_COUNTER_DECL(scopedperf::tsc_ctr, kvdb_scan_probe0, kvdb_scan_probe0_cg);
28   STATIC_COUNTER_DECL(scopedperf::tsc_ctr, kvdb_remove_probe0, kvdb_remove_probe0_cg);
29 }
30
31 // defines single-threaded version
32 template <bool UseConcurrencyControl>
33 struct record_version {
34   uint16_t sz;
35
36   inline ALWAYS_INLINE bool
37   is_locked() const
38   {
39     return false;
40   }
41
42   inline ALWAYS_INLINE void lock() {}
43
44   inline ALWAYS_INLINE void unlock() {}
45
46   static inline ALWAYS_INLINE size_t
47   Size(uint32_t v)
48   {
49     return 0;
50   }
51
52   inline ALWAYS_INLINE size_t
53   size() const
54   {
55     return sz;
56   }
57
58   inline ALWAYS_INLINE void
59   set_size(size_t s)
60   {
61     INVARIANT(s <= std::numeric_limits<uint16_t>::max());
62     sz = s;
63   }
64
65   inline ALWAYS_INLINE uint32_t
66   stable_version() const
67   {
68     return 0;
69   }
70
71   inline ALWAYS_INLINE bool
72   check_version(uint32_t version) const
73   {
74     return true;
75   }
76 };
77
78 // concurrency control version
79 template <>
80 struct record_version<true> {
81   // [ locked | size  | version ]
82   // [  0..1  | 1..17 | 17..32  ]
83
84   static const uint32_t HDR_LOCKED_MASK = 0x1;
85
86   static const uint32_t HDR_SIZE_SHIFT = 1;
87   static const uint32_t HDR_SIZE_MASK = std::numeric_limits<uint16_t>::max() << HDR_SIZE_SHIFT;
88
89   static const uint32_t HDR_VERSION_SHIFT = 17;
90   static const uint32_t HDR_VERSION_MASK = ((uint32_t)-1) << HDR_VERSION_SHIFT;
91
92   record_version<true>() : hdr(0) {}
93
94   volatile uint32_t hdr;
95
96   static inline bool
97   IsLocked(uint32_t v)
98   {
99     return v & HDR_LOCKED_MASK;
100   }
101
102   inline bool
103   is_locked() const
104   {
105     return IsLocked(hdr);
106   }
107
108   inline void
109   lock()
110   {
111 #ifdef ENABLE_EVENT_COUNTERS
112     unsigned long nspins = 0;
113 #endif
114     uint32_t v = hdr;
115     while (IsLocked(v) ||
116            !__sync_bool_compare_and_swap(&hdr, v, v | HDR_LOCKED_MASK)) {
117       nop_pause();
118       v = hdr;
119 #ifdef ENABLE_EVENT_COUNTERS
120       ++nspins;
121 #endif
122     }
123     COMPILER_MEMORY_FENCE;
124 #ifdef ENABLE_EVENT_COUNTERS
125     private_::evt_avg_kvdb_lock_acquire_spins.offer(nspins);
126 #endif
127   }
128
129   inline void
130   unlock()
131   {
132     uint32_t v = hdr;
133     INVARIANT(IsLocked(v));
134     const uint32_t n = Version(v);
135     v &= ~HDR_VERSION_MASK;
136     v |= (((n + 1) << HDR_VERSION_SHIFT) & HDR_VERSION_MASK);
137     v &= ~HDR_LOCKED_MASK;
138     INVARIANT(!IsLocked(v));
139     COMPILER_MEMORY_FENCE;
140     hdr = v;
141   }
142
143   static inline size_t
144   Size(uint32_t v)
145   {
146     return (v & HDR_SIZE_MASK) >> HDR_SIZE_SHIFT;
147   }
148
149   inline size_t
150   size() const
151   {
152     return Size(hdr);
153   }
154
155   inline void
156   set_size(size_t s)
157   {
158     INVARIANT(s <= std::numeric_limits<uint16_t>::max());
159     INVARIANT(is_locked());
160     const uint16_t new_sz = static_cast<uint16_t>(s);
161     hdr &= ~HDR_SIZE_MASK;
162     hdr |= (new_sz << HDR_SIZE_SHIFT);
163     INVARIANT(size() == s);
164   }
165
166   static inline uint32_t
167   Version(uint32_t v)
168   {
169     return (v & HDR_VERSION_MASK) >> HDR_VERSION_SHIFT;
170   }
171
172   inline uint32_t
173   stable_version() const
174   {
175     uint32_t v = hdr;
176 #ifdef ENABLE_EVENT_COUNTERS
177     unsigned long nspins = 0;
178 #endif
179     while (IsLocked(v)) {
180       nop_pause();
181       v = hdr;
182 #ifdef ENABLE_EVENT_COUNTERS
183       ++nspins;
184 #endif
185     }
186     COMPILER_MEMORY_FENCE;
187 #ifdef ENABLE_EVENT_COUNTERS
188     private_::evt_avg_kvdb_stable_version_spins.offer(nspins);
189 #endif
190     return v;
191   }
192
193   inline bool
194   check_version(uint32_t version) const
195   {
196     COMPILER_MEMORY_FENCE;
197     return hdr == version;
198   }
199 };
200
201 template <bool UseConcurrencyControl>
202 struct basic_kvdb_record : public record_version<UseConcurrencyControl> {
203   typedef record_version<UseConcurrencyControl> super_type;
204   uint16_t alloc_size;
205   char data[0];
206
207   basic_kvdb_record(uint16_t alloc_size, const std::string &s)
208     : record_version<UseConcurrencyControl>(),
209       alloc_size(alloc_size)
210   {
211 #ifdef CHECK_INVARIANTS
212     this->lock();
213     this->set_size(s.size());
214     do_write(s);
215     this->unlock();
216 #else
217     this->set_size(s.size());
218     do_write(s);
219 #endif
220   }
221
222   inline void
223   prefetch() const
224   {
225 #ifdef PREFETCH
226     prefetch_bytes(this, sizeof(*this) + size());
227 #endif
228   }
229
230   inline void
231   do_read(std::string &s, size_t max_bytes_read) const
232   {
233     if (UseConcurrencyControl) {
234 #ifdef ENABLE_EVENT_COUNTERS
235       unsigned long nretries = 0;
236 #endif
237     retry:
238       const uint32_t v = this->stable_version();
239       const size_t sz = std::min(super_type::Size(v), max_bytes_read);
240       s.assign(&data[0], sz);
241       if (unlikely(!this->check_version(v))) {
242 #ifdef ENABLE_EVENT_COUNTERS
243         ++nretries;
244 #endif
245         goto retry;
246       }
247 #ifdef ENABLE_EVENT_COUNTERS
248       private_::evt_avg_kvdb_read_retries.offer(nretries);
249 #endif
250     } else {
251       const size_t sz = std::min(this->size(), max_bytes_read);
252       s.assign(&data[0], sz);
253     }
254   }
255
256   inline bool
257   do_write(const std::string &s)
258   {
259     INVARIANT(!UseConcurrencyControl || this->is_locked());
260     if (unlikely(s.size() > alloc_size))
261       return false;
262     this->set_size(s.size());
263     NDB_MEMCPY(&data[0], s.data(), s.size());
264     return true;
265   }
266
267   static basic_kvdb_record *
268   alloc(const std::string &s)
269   {
270     const size_t sz = s.size();
271     const size_t max_alloc_sz =
272       std::numeric_limits<uint16_t>::max() + sizeof(basic_kvdb_record);
273     const size_t alloc_sz =
274       std::min(
275           util::round_up<size_t, allocator::LgAllocAlignment>(sizeof(basic_kvdb_record) + sz),
276           max_alloc_sz);
277     char * const p = reinterpret_cast<char *>(rcu::s_instance.alloc(alloc_sz));
278     INVARIANT(p);
279     return new (p) basic_kvdb_record(alloc_sz - sizeof(basic_kvdb_record), s);
280   }
281
282 private:
283   static inline void
284   deleter(void *r)
285   {
286     basic_kvdb_record * const px =
287       reinterpret_cast<basic_kvdb_record *>(r);
288     const size_t alloc_sz = px->alloc_size + sizeof(*px);
289     px->~basic_kvdb_record();
290     rcu::s_instance.dealloc(px, alloc_sz);
291   }
292
293 public:
294   static void
295   release(basic_kvdb_record *r)
296   {
297     if (unlikely(!r))
298       return;
299     rcu::s_instance.free_with_fn(r, deleter);
300   }
301
302   static void
303   release_no_rcu(basic_kvdb_record *r)
304   {
305     if (unlikely(!r))
306       return;
307     deleter(r);
308   }
309
310 } PACKED;
311
312 template <bool UseConcurrencyControl>
313 bool
314 kvdb_ordered_index<UseConcurrencyControl>::get(
315     void *txn,
316     const std::string &key,
317     std::string &value, size_t max_bytes_read)
318 {
319   typedef basic_kvdb_record<UseConcurrencyControl> kvdb_record;
320   ANON_REGION("kvdb_ordered_index::get:", &private_::kvdb_get_probe0_cg);
321   typename my_btree::value_type v = 0;
322   if (btr.search(varkey(key), v)) {
323     ANON_REGION("kvdb_ordered_index::get:do_read:", &private_::kvdb_get_probe1_cg);
324     const kvdb_record * const r = (const kvdb_record *) v;
325     r->prefetch();
326     r->do_read(value, max_bytes_read);
327     return true;
328   }
329   return false;
330 }
331
332 template <bool UseConcurrencyControl>
333 const char *
334 kvdb_ordered_index<UseConcurrencyControl>::put(
335     void *txn,
336     const std::string &key,
337     const std::string &value)
338 {
339   typedef basic_kvdb_record<UseConcurrencyControl> kvdb_record;
340   ANON_REGION("kvdb_ordered_index::put:", &private_::kvdb_put_probe0_cg);
341   typename my_btree::value_type v = 0, v_old = 0;
342   if (btr.search(varkey(key), v)) {
343     // easy
344     kvdb_record * const r = (kvdb_record *) v;
345     r->prefetch();
346     lock_guard<kvdb_record> guard(*r);
347     if (r->do_write(value))
348       return 0;
349     // replace
350     kvdb_record * const rnew = kvdb_record::alloc(value);
351     btr.insert(varkey(key), (typename my_btree::value_type) rnew, &v_old, 0);
352     INVARIANT((typename my_btree::value_type) r == v_old);
353     // rcu-free the old record
354     kvdb_record::release(r);
355     return 0;
356   }
357   kvdb_record * const rnew = kvdb_record::alloc(value);
358   if (!btr.insert(varkey(key), (typename my_btree::value_type) rnew, &v_old, 0)) {
359     kvdb_record * const r = (kvdb_record *) v_old;
360     kvdb_record::release(r);
361   }
362   return 0;
363 }
364
365 template <bool UseConcurrencyControl>
366 const char *
367 kvdb_ordered_index<UseConcurrencyControl>::insert(void *txn,
368                            const std::string &key,
369                            const std::string &value)
370 {
371   typedef basic_kvdb_record<UseConcurrencyControl> kvdb_record;
372   ANON_REGION("kvdb_ordered_index::insert:", &private_::kvdb_insert_probe0_cg);
373   kvdb_record * const rnew = kvdb_record::alloc(value);
374   typename my_btree::value_type v_old = 0;
375   if (!btr.insert(varkey(key), (typename my_btree::value_type) rnew, &v_old, 0)) {
376     kvdb_record * const r = (kvdb_record *) v_old;
377     kvdb_record::release(r);
378   }
379   return 0;
380 }
381
382 template <typename Btree, bool UseConcurrencyControl>
383 class kvdb_wrapper_search_range_callback : public Btree::search_range_callback {
384 public:
385   typedef basic_kvdb_record<UseConcurrencyControl> kvdb_record;
386   kvdb_wrapper_search_range_callback(
387       abstract_ordered_index::scan_callback &upcall,
388       str_arena *arena)
389     : upcall(&upcall), arena(arena) {}
390
391   virtual bool
392   invoke(const typename Btree::string_type &k, typename Btree::value_type v)
393   {
394     const kvdb_record * const r = (const kvdb_record *) v;
395     std::string * const s_px = likely(arena) ? arena->next() : nullptr;
396     INVARIANT(s_px && s_px->empty());
397     r->prefetch();
398     r->do_read(*s_px, std::numeric_limits<size_t>::max());
399     return upcall->invoke(k.data(), k.length(), *s_px);
400   }
401
402 private:
403   abstract_ordered_index::scan_callback *upcall;
404   str_arena *arena;
405 };
406
407 template <bool UseConcurrencyControl>
408 void
409 kvdb_ordered_index<UseConcurrencyControl>::scan(
410     void *txn,
411     const std::string &start_key,
412     const std::string *end_key,
413     scan_callback &callback,
414     str_arena *arena)
415 {
416   ANON_REGION("kvdb_ordered_index::scan:", &private_::kvdb_scan_probe0_cg);
417   kvdb_wrapper_search_range_callback<my_btree, UseConcurrencyControl> c(callback, arena);
418   key_type end(end_key ? key_type(*end_key) : key_type());
419   btr.search_range_call(key_type(start_key), end_key ? &end : 0, c, arena->next());
420 }
421
422 template <bool UseConcurrencyControl>
423 void
424 kvdb_ordered_index<UseConcurrencyControl>::rscan(
425     void *txn,
426     const std::string &start_key,
427     const std::string *end_key,
428     scan_callback &callback,
429     str_arena *arena)
430 {
431   ANON_REGION("kvdb_ordered_index::rscan:", &private_::kvdb_scan_probe0_cg);
432   kvdb_wrapper_search_range_callback<my_btree, UseConcurrencyControl> c(callback, arena);
433   key_type end(end_key ? key_type(*end_key) : key_type());
434   btr.rsearch_range_call(key_type(start_key), end_key ? &end : 0, c, arena->next());
435 }
436
437 template <bool UseConcurrencyControl>
438 void
439 kvdb_ordered_index<UseConcurrencyControl>::remove(void *txn, const std::string &key)
440 {
441   typedef basic_kvdb_record<UseConcurrencyControl> kvdb_record;
442   ANON_REGION("kvdb_ordered_index::remove:", &private_::kvdb_remove_probe0_cg);
443   typename my_btree::value_type v = 0;
444   if (btr.remove(varkey(key), &v)) {
445     kvdb_record * const r = (kvdb_record *) v;
446     kvdb_record::release(r);
447   }
448 }
449
450 template <bool UseConcurrencyControl>
451 size_t
452 kvdb_ordered_index<UseConcurrencyControl>::size() const
453 {
454   return btr.size();
455 }
456
457 template <typename Btree, bool UseConcurrencyControl>
458 struct purge_tree_walker : public Btree::tree_walk_callback {
459   typedef basic_kvdb_record<UseConcurrencyControl> kvdb_record;
460
461 #ifdef TXN_BTREE_DUMP_PURGE_STATS
462   purge_tree_walker()
463     : purge_stats_nodes(0),
464       purge_stats_nosuffix_nodes(0) {}
465   std::map<size_t, size_t> purge_stats_record_size_counts; // just the record
466   std::map<size_t, size_t> purge_stats_alloc_size_counts; // includes overhead
467   std::vector<uint16_t> purge_stats_nkeys_node;
468   size_t purge_stats_nodes;
469   size_t purge_stats_nosuffix_nodes;
470
471   void
472   dump_stats()
473   {
474     size_t v = 0;
475     for (std::vector<uint16_t>::iterator it = purge_stats_nkeys_node.begin();
476         it != purge_stats_nkeys_node.end(); ++it)
477       v += *it;
478     const double avg_nkeys_node = double(v)/double(purge_stats_nkeys_node.size());
479     const double avg_fill_factor = avg_nkeys_node/double(Btree::NKeysPerNode);
480     std::cerr << "btree node stats" << std::endl;
481     std::cerr << "    avg_nkeys_node: " << avg_nkeys_node << std::endl;
482     std::cerr << "    avg_fill_factor: " << avg_fill_factor << std::endl;
483     std::cerr << "    num_nodes: " << purge_stats_nodes << std::endl;
484     std::cerr << "    num_nosuffix_nodes: " << purge_stats_nosuffix_nodes << std::endl;
485     std::cerr << "record size stats (nbytes => count)" << std::endl;
486     for (std::map<size_t, size_t>::iterator it = purge_stats_record_size_counts.begin();
487         it != purge_stats_record_size_counts.end(); ++it)
488       std::cerr << "    " << it->first << " => " << it->second << std::endl;
489     std::cerr << "alloc size stats  (nbytes => count)" << std::endl;
490     for (std::map<size_t, size_t>::iterator it = purge_stats_alloc_size_counts.begin();
491         it != purge_stats_alloc_size_counts.end(); ++it)
492       std::cerr << "    " << (it->first + sizeof(kvdb_record)) << " => " << it->second << std::endl;
493   }
494 #endif
495
496   virtual void
497   on_node_begin(const typename Btree::node_opaque_t *n)
498   {
499     INVARIANT(spec_values.empty());
500     spec_values = Btree::ExtractValues(n);
501   }
502
503   virtual void
504   on_node_success()
505   {
506     for (size_t i = 0; i < spec_values.size(); i++) {
507       kvdb_record * const r = (kvdb_record *) spec_values[i].first;
508       purge_stats_record_size_counts[r->size()]++;
509       purge_stats_alloc_size_counts[r->alloc_size]++;
510       kvdb_record::release_no_rcu(r);
511     }
512 #ifdef TXN_BTREE_DUMP_PURGE_STATS
513     purge_stats_nkeys_node.push_back(spec_values.size());
514     purge_stats_nodes++;
515     for (size_t i = 0; i < spec_values.size(); i++)
516       if (spec_values[i].second)
517         goto done;
518     purge_stats_nosuffix_nodes++;
519 done:
520 #endif
521     spec_values.clear();
522   }
523
524   virtual void
525   on_node_failure()
526   {
527     spec_values.clear();
528   }
529
530 private:
531   std::vector<std::pair<typename Btree::value_type, bool>> spec_values;
532 };
533
534 template <bool UseConcurrencyControl>
535 std::map<std::string, uint64_t>
536 kvdb_ordered_index<UseConcurrencyControl>::clear()
537 {
538
539   purge_tree_walker<my_btree, UseConcurrencyControl> w;
540   scoped_rcu_region guard;
541   btr.tree_walk(w);
542   btr.clear();
543 #ifdef TXN_BTREE_DUMP_PURGE_STATS
544   std::cerr << "purging kvdb index: " << name << std::endl;
545   w.dump_stats();
546 #endif
547   return std::map<std::string, uint64_t>();
548 }
549
550 template <bool UseConcurrencyControl>
551 abstract_ordered_index *
552 kvdb_wrapper<UseConcurrencyControl>::open_index(
553     const std::string &name, size_t value_size_hint, bool mostly_append)
554 {
555   return new kvdb_ordered_index<UseConcurrencyControl>(name);
556 }
557
558 #endif /* _KVDB_WRAPPER_IMPL_H_ */