benchmark silo added
[c11concurrency-benchmarks.git] / silo / benchmarks / bench.h
1 #ifndef _NDB_BENCH_H_
2 #define _NDB_BENCH_H_
3
4 #include <stdint.h>
5
6 #include <map>
7 #include <vector>
8 #include <utility>
9 #include <string>
10
11 #include "abstract_db.h"
12 #include "../macros.h"
13 #include "../thread.h"
14 #include "../util.h"
15 #include "../spinbarrier.h"
16 #include "../rcu.h"
17
18 extern void ycsb_do_test(abstract_db *db, int argc, char **argv);
19 extern void tpcc_do_test(abstract_db *db, int argc, char **argv);
20 extern void queue_do_test(abstract_db *db, int argc, char **argv);
21 extern void encstress_do_test(abstract_db *db, int argc, char **argv);
22 extern void bid_do_test(abstract_db *db, int argc, char **argv);
23
24 enum {
25   RUNMODE_TIME = 0,
26   RUNMODE_OPS  = 1
27 };
28
29 // benchmark global variables
30 extern size_t nthreads;
31 extern volatile bool running;
32 extern int verbose;
33 extern uint64_t txn_flags;
34 extern double scale_factor;
35 extern uint64_t runtime;
36 extern uint64_t ops_per_worker;
37 extern int run_mode;
38 extern int enable_parallel_loading;
39 extern int pin_cpus;
40 extern int slow_exit;
41 extern int retry_aborted_transaction;
42 extern int no_reset_counters;
43 extern int backoff_aborted_transaction;
44
45 class scoped_db_thread_ctx {
46 public:
47   scoped_db_thread_ctx(const scoped_db_thread_ctx &) = delete;
48   scoped_db_thread_ctx(scoped_db_thread_ctx &&) = delete;
49   scoped_db_thread_ctx &operator=(const scoped_db_thread_ctx &) = delete;
50
51   scoped_db_thread_ctx(abstract_db *db, bool loader)
52     : db(db)
53   {
54     db->thread_init(loader);
55   }
56   ~scoped_db_thread_ctx()
57   {
58     db->thread_end();
59   }
60 private:
61   abstract_db *const db;
62 };
63
64 class bench_loader : public ndb_thread {
65 public:
66   bench_loader(unsigned long seed, abstract_db *db,
67                const std::map<std::string, abstract_ordered_index *> &open_tables)
68     : r(seed), db(db), open_tables(open_tables), b(0)
69   {
70     txn_obj_buf.reserve(str_arena::MinStrReserveLength);
71     txn_obj_buf.resize(db->sizeof_txn_object(txn_flags));
72   }
73   inline void
74   set_barrier(spin_barrier &b)
75   {
76     ALWAYS_ASSERT(!this->b);
77     this->b = &b;
78   }
79   virtual void
80   run()
81   {
82     { // XXX(stephentu): this is a hack
83       scoped_rcu_region r; // register this thread in rcu region
84     }
85     ALWAYS_ASSERT(b);
86     b->count_down();
87     b->wait_for();
88     scoped_db_thread_ctx ctx(db, true);
89     load();
90   }
91 protected:
92   inline void *txn_buf() { return (void *) txn_obj_buf.data(); }
93
94   virtual void load() = 0;
95
96   util::fast_random r;
97   abstract_db *const db;
98   std::map<std::string, abstract_ordered_index *> open_tables;
99   spin_barrier *b;
100   std::string txn_obj_buf;
101   str_arena arena;
102 };
103
104 class bench_worker : public ndb_thread {
105 public:
106
107   bench_worker(unsigned int worker_id,
108                bool set_core_id,
109                unsigned long seed, abstract_db *db,
110                const std::map<std::string, abstract_ordered_index *> &open_tables,
111                spin_barrier *barrier_a, spin_barrier *barrier_b)
112     : worker_id(worker_id), set_core_id(set_core_id),
113       r(seed), db(db), open_tables(open_tables),
114       barrier_a(barrier_a), barrier_b(barrier_b),
115       // the ntxn_* numbers are per worker
116       ntxn_commits(0), ntxn_aborts(0),
117       latency_numer_us(0),
118       backoff_shifts(0), // spin between [0, 2^backoff_shifts) times before retry
119       size_delta(0)
120   {
121     txn_obj_buf.reserve(str_arena::MinStrReserveLength);
122     txn_obj_buf.resize(db->sizeof_txn_object(txn_flags));
123   }
124
125   virtual ~bench_worker() {}
126
127   // returns [did_commit?, size_increase_bytes]
128   typedef std::pair<bool, ssize_t> txn_result;
129   typedef txn_result (*txn_fn_t)(bench_worker *);
130
131   struct workload_desc {
132     workload_desc() {}
133     workload_desc(const std::string &name, double frequency, txn_fn_t fn)
134       : name(name), frequency(frequency), fn(fn)
135     {
136       ALWAYS_ASSERT(frequency > 0.0);
137       ALWAYS_ASSERT(frequency <= 1.0);
138     }
139     std::string name;
140     double frequency;
141     txn_fn_t fn;
142   };
143   typedef std::vector<workload_desc> workload_desc_vec;
144   virtual workload_desc_vec get_workload() const = 0;
145
146   virtual void run();
147
148   inline size_t get_ntxn_commits() const { return ntxn_commits; }
149   inline size_t get_ntxn_aborts() const { return ntxn_aborts; }
150
151   inline uint64_t get_latency_numer_us() const { return latency_numer_us; }
152
153   inline double
154   get_avg_latency_us() const
155   {
156     return double(latency_numer_us) / double(ntxn_commits);
157   }
158
159   std::map<std::string, size_t> get_txn_counts() const;
160
161   typedef abstract_db::counter_map counter_map;
162   typedef abstract_db::txn_counter_map txn_counter_map;
163
164 #ifdef ENABLE_BENCH_TXN_COUNTERS
165   inline txn_counter_map
166   get_local_txn_counters() const
167   {
168     return local_txn_counters;
169   }
170 #endif
171
172   inline ssize_t get_size_delta() const { return size_delta; }
173
174 protected:
175
176   virtual void on_run_setup() {}
177
178   inline void *txn_buf() { return (void *) txn_obj_buf.data(); }
179
180   unsigned int worker_id;
181   bool set_core_id;
182   util::fast_random r;
183   abstract_db *const db;
184   std::map<std::string, abstract_ordered_index *> open_tables;
185   spin_barrier *const barrier_a;
186   spin_barrier *const barrier_b;
187
188 private:
189   size_t ntxn_commits;
190   size_t ntxn_aborts;
191   uint64_t latency_numer_us;
192   unsigned backoff_shifts;
193
194 protected:
195
196 #ifdef ENABLE_BENCH_TXN_COUNTERS
197   txn_counter_map local_txn_counters;
198   void measure_txn_counters(void *txn, const char *txn_name);
199 #else
200   inline ALWAYS_INLINE void measure_txn_counters(void *txn, const char *txn_name) {}
201 #endif
202
203   std::vector<size_t> txn_counts; // breakdown of txns
204   ssize_t size_delta; // how many logical bytes (of values) did the worker add to the DB
205
206   std::string txn_obj_buf;
207   str_arena arena;
208 };
209
210 class bench_runner {
211 public:
212   bench_runner(const bench_runner &) = delete;
213   bench_runner(bench_runner &&) = delete;
214   bench_runner &operator=(const bench_runner &) = delete;
215
216   bench_runner(abstract_db *db)
217     : db(db), barrier_a(nthreads), barrier_b(1) {}
218   virtual ~bench_runner() {}
219   void run();
220 protected:
221   // only called once
222   virtual std::vector<bench_loader*> make_loaders() = 0;
223
224   // only called once
225   virtual std::vector<bench_worker*> make_workers() = 0;
226
227   abstract_db *const db;
228   std::map<std::string, abstract_ordered_index *> open_tables;
229
230   // barriers for actual benchmark execution
231   spin_barrier barrier_a;
232   spin_barrier barrier_b;
233 };
234
235 // XXX(stephentu): limit_callback is not optimal, should use
236 // static_limit_callback if possible
237 class limit_callback : public abstract_ordered_index::scan_callback {
238 public:
239   limit_callback(ssize_t limit = -1)
240     : limit(limit), n(0)
241   {
242     ALWAYS_ASSERT(limit == -1 || limit > 0);
243   }
244
245   virtual bool invoke(
246       const char *keyp, size_t keylen,
247       const std::string &value)
248   {
249     INVARIANT(limit == -1 || n < size_t(limit));
250     values.emplace_back(std::string(keyp, keylen), value);
251     return (limit == -1) || (++n < size_t(limit));
252   }
253
254   typedef std::pair<std::string, std::string> kv_pair;
255   std::vector<kv_pair> values;
256
257   const ssize_t limit;
258 private:
259   size_t n;
260 };
261
262
263 class latest_key_callback : public abstract_ordered_index::scan_callback {
264 public:
265   latest_key_callback(std::string &k, ssize_t limit = -1)
266     : limit(limit), n(0), k(&k)
267   {
268     ALWAYS_ASSERT(limit == -1 || limit > 0);
269   }
270
271   virtual bool invoke(
272       const char *keyp, size_t keylen,
273       const std::string &value)
274   {
275     INVARIANT(limit == -1 || n < size_t(limit));
276     k->assign(keyp, keylen);
277     ++n;
278     return (limit == -1) || (n < size_t(limit));
279   }
280
281   inline size_t size() const { return n; }
282   inline std::string &kstr() { return *k; }
283
284 private:
285   ssize_t limit;
286   size_t n;
287   std::string *k;
288 };
289
290 // explicitly copies keys, because btree::search_range_call() interally
291 // re-uses a single string to pass keys (so using standard string assignment
292 // will force a re-allocation b/c of shared ref-counting)
293 //
294 // this isn't done for values, because each value has a distinct string from
295 // the string allocator, so there are no mutations while holding > 1 ref-count
296 template <size_t N>
297 class static_limit_callback : public abstract_ordered_index::scan_callback {
298 public:
299   // XXX: push ignore_key into lower layer
300   static_limit_callback(str_arena *arena, bool ignore_key)
301     : n(0), arena(arena), ignore_key(ignore_key)
302   {
303     static_assert(N > 0, "xx");
304   }
305
306   virtual bool invoke(
307       const char *keyp, size_t keylen,
308       const std::string &value)
309   {
310     INVARIANT(n < N);
311     INVARIANT(arena->manages(&value));
312     if (ignore_key) {
313       values.emplace_back(nullptr, &value);
314     } else {
315       std::string * const s_px = arena->next();
316       INVARIANT(s_px && s_px->empty());
317       s_px->assign(keyp, keylen);
318       values.emplace_back(s_px, &value);
319     }
320     return ++n < N;
321   }
322
323   inline size_t
324   size() const
325   {
326     return values.size();
327   }
328
329   typedef std::pair<const std::string *, const std::string *> kv_pair;
330   typename util::vec<kv_pair, N>::type values;
331
332 private:
333   size_t n;
334   str_arena *arena;
335   bool ignore_key;
336 };
337
338 #endif /* _NDB_BENCH_H_ */