#ifndef _NDB_BENCH_H_
#define _NDB_BENCH_H_

#include <stdint.h>

#include <map>
#include <vector>
#include <utility>
#include <string>

#include "abstract_db.h"
#include "../macros.h"
#include "../thread.h"
#include "../util.h"
#include "../spinbarrier.h"
#include "../rcu.h"
extern void ycsb_do_test(abstract_db *db, int argc, char **argv);
extern void tpcc_do_test(abstract_db *db, int argc, char **argv);
extern void queue_do_test(abstract_db *db, int argc, char **argv);
extern void encstress_do_test(abstract_db *db, int argc, char **argv);
extern void bid_do_test(abstract_db *db, int argc, char **argv);
// benchmark global variables
extern size_t nthreads;
extern volatile bool running;
extern uint64_t txn_flags;
extern double scale_factor;
extern uint64_t runtime;
extern uint64_t ops_per_worker;
extern int enable_parallel_loading;
extern int retry_aborted_transaction;
extern int no_reset_counters;
extern int backoff_aborted_transaction;
class scoped_db_thread_ctx {
public:
  scoped_db_thread_ctx(const scoped_db_thread_ctx &) = delete;
  scoped_db_thread_ctx(scoped_db_thread_ctx &&) = delete;
  scoped_db_thread_ctx &operator=(const scoped_db_thread_ctx &) = delete;

  // RAII wrapper: registers the calling thread with the db on entry,
  // and unregisters it on every exit path
  scoped_db_thread_ctx(abstract_db *db, bool loader)
    : db(db)
  {
    db->thread_init(loader);
  }
  ~scoped_db_thread_ctx()
  {
    db->thread_end();
  }
private:
  abstract_db *const db;
};
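// Usage sketch (illustrative only; `worker_thread_body` is a hypothetical
// caller). Constructing the ctx on the stack guarantees thread_end() runs
// no matter how the scope is exited:
//
//   void worker_thread_body(abstract_db *db)
//   {
//     scoped_db_thread_ctx ctx(db, false); // false = not a loader thread
//     // ... run transactions; db->thread_end() fires when ctx goes out of scope
//   }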
class bench_loader : public ndb_thread {
public:
  bench_loader(unsigned long seed, abstract_db *db,
               const std::map<std::string, abstract_ordered_index *> &open_tables)
    : r(seed), db(db), open_tables(open_tables), b(0)
  {
    txn_obj_buf.reserve(str_arena::MinStrReserveLength);
    txn_obj_buf.resize(db->sizeof_txn_object(txn_flags));
  }
  inline void
  set_barrier(spin_barrier &b)
  {
    ALWAYS_ASSERT(!this->b);
    this->b = &b;
  }
  virtual void
  run()
  {
    { // XXX(stephentu): this is a hack
      scoped_rcu_region r; // register this thread in rcu region
    }
    ALWAYS_ASSERT(b);
    b->count_down();
    b->wait_for();
    scoped_db_thread_ctx ctx(db, true);
    load();
  }
protected:
  inline void *txn_buf() { return (void *) txn_obj_buf.data(); }

  virtual void load() = 0;

  util::fast_random r;
  abstract_db *const db;
  std::map<std::string, abstract_ordered_index *> open_tables;
  spin_barrier *b;
  std::string txn_obj_buf;
  str_arena arena;
};
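// Sketch of a concrete loader (hypothetical "item" table and key/value
// literals; new_txn()/commit_txn() per the abstract_db interface):
//
//   class item_loader : public bench_loader {
//   public:
//     item_loader(unsigned long seed, abstract_db *db,
//                 const std::map<std::string, abstract_ordered_index *> &t)
//       : bench_loader(seed, db, t) {}
//   protected:
//     virtual void load()
//     {
//       abstract_ordered_index *tbl = open_tables.at("item");
//       void *txn = db->new_txn(txn_flags, arena, txn_buf());
//       tbl->insert(txn, "key0", "val0");
//       ALWAYS_ASSERT(db->commit_txn(txn));
//     }
//   };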
class bench_worker : public ndb_thread {
public:

  bench_worker(unsigned int worker_id, bool set_core_id,
               unsigned long seed, abstract_db *db,
               const std::map<std::string, abstract_ordered_index *> &open_tables,
               spin_barrier *barrier_a, spin_barrier *barrier_b)
    : worker_id(worker_id), set_core_id(set_core_id),
      r(seed), db(db), open_tables(open_tables),
      barrier_a(barrier_a), barrier_b(barrier_b),
      // the ntxn_* numbers are per worker
      ntxn_commits(0), ntxn_aborts(0),
      latency_numer_us(0),
      backoff_shifts(0), // spin between [0, 2^backoff_shifts) times before retry
      size_delta(0)
  {
    txn_obj_buf.reserve(str_arena::MinStrReserveLength);
    txn_obj_buf.resize(db->sizeof_txn_object(txn_flags));
  }

  virtual ~bench_worker() {}
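  // Sketch of the backoff policy the backoff_shifts comment describes (the
  // real retry loop lives in the .cc file; this is illustrative only):
  //
  //   if (backoff_aborted_transaction) {
  //     if (backoff_shifts < 63)
  //       backoff_shifts++;                     // widen the window after each abort
  //     uint64_t spins = 1UL << backoff_shifts; // spin ~2^backoff_shifts times
  //     while (spins--)
  //       nop_pause();                          // CPU pause between iterations
  //   }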
  // returns [did_commit?, size_increase_bytes]
  typedef std::pair<bool, ssize_t> txn_result;
  typedef txn_result (*txn_fn_t)(bench_worker *);

  struct workload_desc {
    workload_desc() {}
    workload_desc(const std::string &name, double frequency, txn_fn_t fn)
      : name(name), frequency(frequency), fn(fn)
    {
      ALWAYS_ASSERT(frequency > 0.0);
      ALWAYS_ASSERT(frequency <= 1.0);
    }
    std::string name;
    double frequency;
    txn_fn_t fn;
  };
  typedef std::vector<workload_desc> workload_desc_vec;
  virtual workload_desc_vec get_workload() const = 0;

  virtual void run();
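  // Sketch of a workload definition (hypothetical worker subclass and txn
  // functions; each frequency must lie in (0, 1] per the asserts above):
  //
  //   class my_worker : public bench_worker {
  //   public:
  //     static txn_result TxnRead(bench_worker *w)  { /* ... */ return txn_result(true, 0); }
  //     static txn_result TxnWrite(bench_worker *w) { /* ... */ return txn_result(true, 64); }
  //     virtual workload_desc_vec get_workload() const
  //     {
  //       workload_desc_vec w;
  //       w.push_back(workload_desc("Read", 0.80, TxnRead));
  //       w.push_back(workload_desc("Write", 0.20, TxnWrite));
  //       return w;
  //     }
  //   };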
  inline size_t get_ntxn_commits() const { return ntxn_commits; }
  inline size_t get_ntxn_aborts() const { return ntxn_aborts; }

  inline uint64_t get_latency_numer_us() const { return latency_numer_us; }

  inline double
  get_avg_latency_us() const
  {
    return double(latency_numer_us) / double(ntxn_commits);
  }

  std::map<std::string, size_t> get_txn_counts() const;

  typedef abstract_db::counter_map counter_map;
  typedef abstract_db::txn_counter_map txn_counter_map;

#ifdef ENABLE_BENCH_TXN_COUNTERS
  inline txn_counter_map
  get_local_txn_counters() const
  {
    return local_txn_counters;
  }
#endif

  inline ssize_t get_size_delta() const { return size_delta; }

protected:

  virtual void on_run_setup() {}

  inline void *txn_buf() { return (void *) txn_obj_buf.data(); }
  unsigned int worker_id;
  bool set_core_id;
  util::fast_random r;
  abstract_db *const db;
  std::map<std::string, abstract_ordered_index *> open_tables;
  spin_barrier *const barrier_a;
  spin_barrier *const barrier_b;

private:
  size_t ntxn_commits;
  size_t ntxn_aborts;
  uint64_t latency_numer_us;
  unsigned backoff_shifts;

protected:

#ifdef ENABLE_BENCH_TXN_COUNTERS
  txn_counter_map local_txn_counters;
  void measure_txn_counters(void *txn, const char *txn_name);
#else
  inline ALWAYS_INLINE void measure_txn_counters(void *txn, const char *txn_name) {}
#endif

  std::vector<size_t> txn_counts; // breakdown of txns
  ssize_t size_delta; // how many logical bytes (of values) the worker added to the DB

  std::string txn_obj_buf;
  str_arena arena;
};
class bench_runner {
public:
  bench_runner(const bench_runner &) = delete;
  bench_runner(bench_runner &&) = delete;
  bench_runner &operator=(const bench_runner &) = delete;

  bench_runner(abstract_db *db)
    : db(db), barrier_a(nthreads), barrier_b(1) {}
  virtual ~bench_runner() {}
  void run();

protected:
  // only called once
  virtual std::vector<bench_loader*> make_loaders() = 0;

  // only called once
  virtual std::vector<bench_worker*> make_workers() = 0;

  abstract_db *const db;
  std::map<std::string, abstract_ordered_index *> open_tables;

  // barriers for actual benchmark execution
  spin_barrier barrier_a;
  spin_barrier barrier_b;
};
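// How the two barriers pair up (a sketch of the startup handshake; the real
// sequencing lives in bench_runner::run() in the .cc file). barrier_a is
// sized to nthreads so the main thread can wait until every worker finishes
// setup; barrier_b is sized to 1 so a single count_down() releases all
// workers into the measured region at once:
//
//   std::vector<bench_worker *> workers = make_workers();
//   for (size_t i = 0; i < workers.size(); i++)
//     workers[i]->start();   // ndb_thread entry point
//   barrier_a.wait_for();    // all workers have checked in
//   running = true;
//   barrier_b.count_down();  // release them simultaneously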
// XXX(stephentu): limit_callback is not optimal, should use
// static_limit_callback if possible
class limit_callback : public abstract_ordered_index::scan_callback {
public:
  limit_callback(ssize_t limit = -1)
    : limit(limit), n(0)
  {
    ALWAYS_ASSERT(limit == -1 || limit > 0);
  }

  virtual bool invoke(
      const char *keyp, size_t keylen,
      const std::string &value)
  {
    INVARIANT(limit == -1 || n < size_t(limit));
    values.emplace_back(std::string(keyp, keylen), value);
    return (limit == -1) || (++n < size_t(limit));
  }

  typedef std::pair<std::string, std::string> kv_pair;
  std::vector<kv_pair> values;

  const ssize_t limit;
private:
  size_t n;
};
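// Usage sketch (hypothetical index handle and key range; scan() per the
// abstract_ordered_index interface):
//
//   limit_callback cb(10); // stop after 10 key/value pairs
//   tbl->scan(txn, start_key, &end_key, cb);
//   for (size_t i = 0; i < cb.values.size(); i++)
//     handle(cb.values[i].first, cb.values[i].second);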
class latest_key_callback : public abstract_ordered_index::scan_callback {
public:
  latest_key_callback(std::string &k, ssize_t limit = -1)
    : limit(limit), n(0), k(&k)
  {
    ALWAYS_ASSERT(limit == -1 || limit > 0);
  }

  virtual bool invoke(
      const char *keyp, size_t keylen,
      const std::string &value)
  {
    INVARIANT(limit == -1 || n < size_t(limit));
    k->assign(keyp, keylen);
    ++n;
    return (limit == -1) || (n < size_t(limit));
  }

  inline size_t size() const { return n; }
  inline std::string &kstr() { return *k; }

private:
  ssize_t limit;
  size_t n;
  std::string *k;
};
// explicitly copies keys, because btree::search_range_call() internally
// re-uses a single string to pass keys (so using standard string assignment
// will force a re-allocation b/c of shared ref-counting)
//
// this isn't done for values, because each value has a distinct string from
// the string allocator, so there are no mutations while holding > 1 ref-count
template <size_t N>
class static_limit_callback : public abstract_ordered_index::scan_callback {
public:
  // XXX: push ignore_key into lower layer
  static_limit_callback(str_arena *arena, bool ignore_key)
    : n(0), arena(arena), ignore_key(ignore_key)
  {
    static_assert(N > 0, "N must be positive");
  }

  virtual bool invoke(
      const char *keyp, size_t keylen,
      const std::string &value)
  {
    INVARIANT(n < N);
    INVARIANT(arena->manages(&value));
    if (ignore_key) {
      values.emplace_back(nullptr, &value);
    } else {
      std::string * const s_px = arena->next();
      INVARIANT(s_px && s_px->empty());
      s_px->assign(keyp, keylen);
      values.emplace_back(s_px, &value);
    }
    return ++n < N;
  }

  inline size_t
  size() const
  {
    return values.size();
  }

  typedef std::pair<const std::string *, const std::string *> kv_pair;
  typename util::vec<kv_pair, N>::type values;

private:
  size_t n;
  str_arena *arena;
  bool ignore_key;
};
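// Usage sketch (hypothetical index handle; note that both keys and values
// point into the scan's str_arena, so they are only valid while the arena
// is alive and unreset):
//
//   static_limit_callback<16> cb(&arena, false /* keep keys */);
//   tbl->scan(txn, start_key, &end_key, cb, &arena);
//   for (size_t i = 0; i < cb.size(); i++) {
//     const std::string *key = cb.values[i].first;  // nullptr if ignore_key
//     const std::string *val = cb.values[i].second;
//     // ...
//   }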
#endif /* _NDB_BENCH_H_ */