edits
[c11concurrency-benchmarks.git] / silo / new-benchmarks / bench.h
1 #ifndef _NDB_BENCH_H_
2 #define _NDB_BENCH_H_
3
4 #include <stdint.h>
5
6 #include <map>
7 #include <memory>
8 #include <vector>
9 #include <utility>
10 #include <string>
11
12 #include "abstract_db.h"
13 #include "../macros.h"
14 #include "../thread.h"
15 #include "../util.h"
16 #include "../spinbarrier.h"
17 #include "../rcu.h"
18
19 struct persistconfig {
20   persistconfig()
21     : nofsync_(0), do_compress_(0), fake_writes_(0),
22       disable_gc_(0), disable_snapshots_(0) {}
23   int nofsync_;
24   int do_compress_;
25   int fake_writes_;
26   int disable_gc_;
27   int disable_snapshots_;
28   std::vector<std::string> logfiles_;
29   std::vector<std::vector<unsigned>> assignments_;
30 };
31
32 extern void tpcc_do_test(
33     const std::string &dbtype,
34     const persistconfig &cfg,
35     int argc, char **argv);
36
37 enum { RUNMODE_TIME = 0,
38        RUNMODE_OPS  = 1};
39
40 // benchmark global variables
41 extern size_t nthreads;
42 extern volatile bool running;
43 extern int verbose;
44 extern uint64_t txn_flags;
45 extern double scale_factor;
46 extern uint64_t runtime;
47 extern uint64_t ops_per_worker;
48 extern int run_mode;
49 extern int enable_parallel_loading;
50 extern int pin_cpus;
51 extern int slow_exit;
52 extern int retry_aborted_transaction;
53 extern int no_reset_counters;
54 extern int backoff_aborted_transaction;
55
56 // NOTE: the typed_* versions of classes exist so we don't have to convert all
57 // classes to templatetized [for sanity in compliation times]; we trade off
58 // a bit of type-safety for more rapid development cycles
59
60 class scoped_db_thread_ctx {
61 public:
62   scoped_db_thread_ctx(const scoped_db_thread_ctx &) = delete;
63   scoped_db_thread_ctx(scoped_db_thread_ctx &&) = delete;
64   scoped_db_thread_ctx &operator=(const scoped_db_thread_ctx &) = delete;
65
66   scoped_db_thread_ctx(abstract_db *db, bool loader)
67     : db(db)
68   {
69     db->thread_init(loader);
70   }
71   ~scoped_db_thread_ctx()
72   {
73     db->thread_end();
74   }
75 private:
76   abstract_db *const db;
77 };
78
79 class bench_loader : public ndb_thread {
80 public:
81   bench_loader(unsigned long seed, abstract_db *db)
82     : r(seed), db(db), b(0)
83   {
84   }
85   inline void
86   set_barrier(spin_barrier &b)
87   {
88     ALWAYS_ASSERT(!this->b);
89     this->b = &b;
90   }
91   virtual void
92   run()
93   {
94     { // XXX(stephentu): this is a hack
95       scoped_rcu_region r; // register this thread in rcu region
96     }
97     ALWAYS_ASSERT(b);
98     b->count_down();
99     b->wait_for();
100     scoped_db_thread_ctx ctx(db, true);
101     load();
102   }
103 protected:
104
105   virtual void load() = 0;
106
107   util::fast_random r;
108   abstract_db *const db;
109   spin_barrier *b;
110   str_arena arena;
111 };
112
113 template <typename Database>
114 class typed_bench_loader : public bench_loader {
115 public:
116   typed_bench_loader(unsigned long seed, Database *db)
117     : bench_loader(seed, db) {}
118   inline Database *
119   typed_db()
120   {
121     return static_cast<Database *>(db);
122   }
123   inline const Database *
124   typed_db() const
125   {
126     return static_cast<const Database *>(db);
127   }
128 };
129
130 class bench_worker : public ndb_thread {
131 public:
132
133   bench_worker(unsigned int worker_id,
134                bool set_core_id,
135                unsigned long seed, abstract_db *db,
136                spin_barrier *barrier_a, spin_barrier *barrier_b)
137     : worker_id(worker_id), set_core_id(set_core_id),
138       r(seed), db(db),
139       barrier_a(barrier_a), barrier_b(barrier_b),
140       // the ntxn_* numbers are per worker
141       ntxn_commits(0), ntxn_aborts(0),
142       latency_numer_us(0),
143       backoff_shifts(0), // spin between [0, 2^backoff_shifts) times before retry
144       size_delta(0)
145   {
146   }
147
148   virtual ~bench_worker() {}
149
150   // returns [did_commit?, size_increase_bytes]
151   typedef std::pair<bool, ssize_t> txn_result;
152   typedef txn_result (*txn_fn_t)(bench_worker *);
153
154   struct workload_desc {
155     workload_desc() {}
156     workload_desc(const std::string &name, double frequency, txn_fn_t fn)
157       : name(name), frequency(frequency), fn(fn)
158     {
159       ALWAYS_ASSERT(frequency > 0.0);
160       ALWAYS_ASSERT(frequency <= 1.0);
161     }
162     std::string name;
163     double frequency;
164     txn_fn_t fn;
165   };
166   typedef std::vector<workload_desc> workload_desc_vec;
167   virtual workload_desc_vec get_workload() const = 0;
168
169   virtual void run();
170
171   inline size_t get_ntxn_commits() const { return ntxn_commits; }
172   inline size_t get_ntxn_aborts() const { return ntxn_aborts; }
173
174   inline uint64_t get_latency_numer_us() const { return latency_numer_us; }
175
176   inline double
177   get_avg_latency_us() const
178   {
179     return double(latency_numer_us) / double(ntxn_commits);
180   }
181
182   std::map<std::string, size_t> get_txn_counts() const;
183
184   typedef abstract_db::counter_map counter_map;
185   typedef abstract_db::txn_counter_map txn_counter_map;
186
187 #ifdef ENABLE_BENCH_TXN_COUNTERS
188   inline txn_counter_map
189   get_local_txn_counters() const
190   {
191     return local_txn_counters;
192   }
193 #endif
194
195   inline ssize_t get_size_delta() const { return size_delta; }
196
197 protected:
198
199   virtual void on_run_setup() {}
200
201   unsigned int worker_id;
202   bool set_core_id;
203   util::fast_random r;
204   abstract_db *const db;
205   spin_barrier *const barrier_a;
206   spin_barrier *const barrier_b;
207
208 private:
209   size_t ntxn_commits;
210   size_t ntxn_aborts;
211   uint64_t latency_numer_us;
212   unsigned backoff_shifts;
213
214 protected:
215
216 //#ifdef ENABLE_BENCH_TXN_COUNTERS
217 //  txn_counter_map local_txn_counters;
218 //  void measure_txn_counters(void *txn, const char *txn_name);
219 //#else
220 //  inline ALWAYS_INLINE void measure_txn_counters(void *txn, const char *txn_name) {}
221 //#endif
222
223   std::vector<size_t> txn_counts; // breakdown of txns
224   ssize_t size_delta; // how many logical bytes (of values) did the worker add to the DB
225
226   str_arena arena;
227 };
228
229 class bench_runner {
230 public:
231   bench_runner(const bench_runner &) = delete;
232   bench_runner(bench_runner &&) = delete;
233   bench_runner &operator=(const bench_runner &) = delete;
234
235   bench_runner(abstract_db *db)
236     : db(db), barrier_a(nthreads), barrier_b(1) {}
237   virtual ~bench_runner() {}
238   void run();
239 protected:
240   // only called once
241   virtual std::vector<std::unique_ptr<bench_loader>> make_loaders() = 0;
242
243   // only called once
244   virtual std::vector<std::unique_ptr<bench_worker>> make_workers() = 0;
245
246   abstract_db *const db;
247   std::map<std::string, std::shared_ptr<abstract_ordered_index>> open_tables;
248
249   // barriers for actual benchmark execution
250   spin_barrier barrier_a;
251   spin_barrier barrier_b;
252 };
253
254 template <typename Database>
255 class typed_bench_runner : public bench_runner {
256 public:
257   typed_bench_runner(Database *db)
258     : bench_runner(db) {}
259   inline Database *
260   typed_db()
261   {
262     return static_cast<Database *>(db);
263   }
264   inline const Database *
265   typed_db() const
266   {
267     return static_cast<const Database *>(db);
268   }
269 };
270
271 template <typename Index>
272 class latest_key_callback : public Index::bytes_search_range_callback {
273 public:
274
275   latest_key_callback(std::string &k, ssize_t limit = -1)
276     : limit(limit), n(0), k(&k)
277   {
278     ALWAYS_ASSERT(limit == -1 || limit > 0);
279   }
280
281   virtual bool invoke(
282       const std::string &key,
283       const std::string &value)
284   {
285     INVARIANT(limit == -1 || n < size_t(limit));
286     // see the note in bytes_static_limit_callback for why we explicitly
287     // copy over regular (ref-counting) assignment
288     k->assign(key.data(), key.size());
289     ++n;
290     return (limit == -1) || (n < size_t(limit));
291   }
292
293   inline size_t size() const { return n; }
294   inline std::string &kstr() { return *k; }
295
296 private:
297   ssize_t limit;
298   size_t n;
299   std::string *k;
300 };
301
302 namespace private_ {
303   template <typename T, bool enable>
304   struct container {
305     container() {}
306     container(const T &t) {}
307     T & get(); // not defined
308   };
309
310   template <typename T>
311   struct container<T, true> {
312     container() {}
313     container(const T &t) : t(t) {}
314     inline T & get() { return t; }
315     T t;
316   };
317 }
318
319 // explicitly copies keys, because btree::search_range_call() interally
320 // re-uses a single string to pass keys (so using standard string assignment
321 // will force a re-allocation b/c of shared ref-counting)
322 //
323 // this isn't done for values, because each value has a distinct string from
324 // the string allocator, so there are no mutations while holding > 1 ref-count
325 template <typename Index, size_t N, bool ignore_key>
326 class bytes_static_limit_callback : public Index::bytes_search_range_callback {
327 public:
328
329   static_assert(N > 0, "xx");
330
331   bytes_static_limit_callback(str_arena *arena)
332     : arena(arena)
333   {
334   }
335
336   virtual bool invoke(
337       const std::string &key,
338       const std::string &value) OVERRIDE
339   {
340     INVARIANT(size() < N);
341     INVARIANT(arena->manages(&key));
342     INVARIANT(arena->manages(&value));
343     if (ignore_key) {
344       values.emplace_back(nullptr, &value);
345     } else {
346       // see note above
347       std::string * const s_px = arena->next();
348       INVARIANT(s_px && s_px->empty());
349       s_px->assign(key.data(), key.size());
350       values.emplace_back(s_px, &value);
351     }
352     return size() < N;
353   }
354
355   inline size_t
356   size() const
357   {
358     return values.size();
359   }
360
361   inline const std::string &
362   key(size_t i) const
363   {
364     return *values[i].first.get();
365   }
366
367   inline const std::string &
368   value(size_t i) const
369   {
370     return *values[i].second;
371   }
372
373 private:
374   typedef std::pair<
375     private_::container<const std::string *, !ignore_key>,
376     const std::string *> kv_pair;
377   typename util::vec<kv_pair, N>::type values;
378   str_arena *arena;
379 };
380
381 template <typename Index, size_t N, bool ignore_key>
382 class static_limit_callback : public Index::search_range_callback {
383 public:
384
385   static_assert(N > 0, "xx");
386
387   virtual bool
388   invoke(
389       const typename Index::key_type &key,
390       const typename Index::value_type &value) OVERRIDE
391   {
392     INVARIANT(size() < N);
393     values.emplace_back(key, value);
394     return size() < N;
395   }
396
397   inline size_t
398   size() const
399   {
400     return values.size();
401   }
402
403   inline typename Index::key_type &
404   key(size_t i)
405   {
406     return values[i].first.get();
407   }
408
409   inline typename Index::value_type &
410   value(size_t i)
411   {
412     return values[i].second;
413   }
414
415 private:
416   typedef std::pair<
417     private_::container<typename Index::key_type, !ignore_key>,
418     typename Index::value_type> kv_pair;
419   typename util::vec<kv_pair, N>::type values;
420 };
421
422 #endif /* _NDB_BENCH_H_ */