#ifndef _NDB_BENCH_H_
#define _NDB_BENCH_H_

#include <stdint.h>
#include <sys/types.h>

#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "abstract_db.h"
#include "../macros.h"
#include "../thread.h"
#include "../util.h"
#include "../spinbarrier.h"
#include "../rcu.h"
struct persistconfig {
  persistconfig()
    : nofsync_(0), do_compress_(0), fake_writes_(0),
      disable_gc_(0), disable_snapshots_(0) {}
  int nofsync_;
  int do_compress_;
  int fake_writes_;
  int disable_gc_;
  int disable_snapshots_;
  std::vector<std::string> logfiles_;
  std::vector<std::vector<unsigned>> assignments_;
};
extern void tpcc_do_test(
    const std::string &dbtype,
    const persistconfig &cfg,
    int argc, char **argv);
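// A usage sketch (hypothetical driver code, not part of this header): a
// caller fills in a persistconfig and hands it to the benchmark entry point.
// The "ndb-proto2" dbtype string is illustrative only, and the assignments_
// semantics are inferred (one vector of worker ids per logfile):
//
//   persistconfig cfg;
//   cfg.nofsync_ = 1;                   // don't fsync() log writes
//   cfg.logfiles_.push_back("/tmp/ndb-log0");
//   cfg.assignments_.push_back({0, 1}); // workers 0 and 1 feed logfile 0
//   tpcc_do_test("ndb-proto2", cfg, argc, argv);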
enum {
  RUNMODE_TIME = 0,
  RUNMODE_OPS  = 1
};
// benchmark global variables
extern size_t nthreads;
extern volatile bool running;
extern uint64_t txn_flags;
extern double scale_factor;
extern uint64_t runtime;
extern uint64_t ops_per_worker;
extern int run_mode;
extern int enable_parallel_loading;
extern int retry_aborted_transaction;
extern int no_reset_counters;
extern int backoff_aborted_transaction;
// NOTE: the typed_* versions of classes exist so we don't have to convert all
// classes to be templatized [for sanity in compilation times]; we trade off
// a bit of type-safety for more rapid development cycles
class scoped_db_thread_ctx {
public:
  scoped_db_thread_ctx(const scoped_db_thread_ctx &) = delete;
  scoped_db_thread_ctx(scoped_db_thread_ctx &&) = delete;
  scoped_db_thread_ctx &operator=(const scoped_db_thread_ctx &) = delete;

  scoped_db_thread_ctx(abstract_db *db, bool loader)
    : db(db)
  {
    db->thread_init(loader);
  }
  ~scoped_db_thread_ctx()
  {
    db->thread_end();
  }
private:
  abstract_db *const db;
};
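// scoped_db_thread_ctx is a plain RAII guard: the constructor registers the
// calling thread with the db, and the destructor undoes it (via
// db->thread_end(), as reconstructed above), so registration cannot leak on
// an early return. Sketch of intended use inside a thread body:
//
//   void thread_body(abstract_db *db)
//   {
//     scoped_db_thread_ctx ctx(db, false); // false: worker, not loader
//     // ... issue transactions ...
//   } // thread_end() runs here, even on exceptions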
class bench_loader : public ndb_thread {
public:
  bench_loader(unsigned long seed, abstract_db *db)
    : r(seed), db(db), b(0)
  {}
  inline void
  set_barrier(spin_barrier &b)
  {
    ALWAYS_ASSERT(!this->b);
    this->b = &b;
  }
  virtual void
  run()
  {
    { // XXX(stephentu): this is a hack
      scoped_rcu_region r; // register this thread in rcu region
    }
    ALWAYS_ASSERT(b);
    b->count_down();
    b->wait_for();
    scoped_db_thread_ctx ctx(db, true);
    load();
  }
protected:
  virtual void load() = 0;

  util::fast_random r;
  abstract_db *const db;
  spin_barrier *b;
};
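// A minimal concrete loader, as a sketch (the class name and body are
// illustrative): subclasses only implement load(); run() above takes care of
// RCU registration, the start barrier, and per-thread db setup.
//
//   class toy_loader : public bench_loader {
//   public:
//     toy_loader(unsigned long seed, abstract_db *db)
//       : bench_loader(seed, db) {}
//   protected:
//     virtual void
//     load()
//     {
//       // bulk-insert initial records here; runs once per loader thread
//     }
//   };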
template <typename Database>
class typed_bench_loader : public bench_loader {
public:
  typed_bench_loader(unsigned long seed, Database *db)
    : bench_loader(seed, db) {}
  inline Database *
  typed_db()
  {
    return static_cast<Database *>(db);
  }
  inline const Database *
  typed_db() const
  {
    return static_cast<const Database *>(db);
  }
};
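// Per the NOTE above, the typed_* wrappers recover the concrete db type at
// one choke point instead of templatizing every benchmark class; a subclass
// can write
//
//   typed_db()->some_database_specific_call(); // hypothetical method
//
// rather than repeating static_cast<Database *>(db) at each call site. The
// downcast is unchecked, so the pointer handed to the constructor must
// really point at a Database.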
class bench_worker : public ndb_thread {
public:
  bench_worker(unsigned int worker_id,
               bool set_core_id,
               unsigned long seed, abstract_db *db,
               spin_barrier *barrier_a, spin_barrier *barrier_b)
    : worker_id(worker_id), set_core_id(set_core_id),
      r(seed), db(db),
      barrier_a(barrier_a), barrier_b(barrier_b),
      // the ntxn_* numbers are per worker
      ntxn_commits(0), ntxn_aborts(0),
      latency_numer_us(0),
      backoff_shifts(0), // spin between [0, 2^backoff_shifts) times before retry
      size_delta(0)
  {}

  virtual ~bench_worker() {}
  // returns [did_commit?, size_increase_bytes]
  typedef std::pair<bool, ssize_t> txn_result;
  typedef txn_result (*txn_fn_t)(bench_worker *);
  struct workload_desc {
    workload_desc() {}
    workload_desc(const std::string &name, double frequency, txn_fn_t fn)
      : name(name), frequency(frequency), fn(fn)
    {
      ALWAYS_ASSERT(frequency > 0.0);
      ALWAYS_ASSERT(frequency <= 1.0);
    }
    std::string name;
    double frequency;
    txn_fn_t fn;
  };
  typedef std::vector<workload_desc> workload_desc_vec;
  virtual workload_desc_vec get_workload() const = 0;

  virtual void run();
  inline size_t get_ntxn_commits() const { return ntxn_commits; }
  inline size_t get_ntxn_aborts() const { return ntxn_aborts; }

  inline uint64_t get_latency_numer_us() const { return latency_numer_us; }

  inline double
  get_avg_latency_us() const
  {
    return double(latency_numer_us) / double(ntxn_commits);
  }
  std::map<std::string, size_t> get_txn_counts() const;

  typedef abstract_db::counter_map counter_map;
  typedef abstract_db::txn_counter_map txn_counter_map;
#ifdef ENABLE_BENCH_TXN_COUNTERS
  inline txn_counter_map
  get_local_txn_counters() const
  {
    return local_txn_counters;
  }
#endif

  inline ssize_t get_size_delta() const { return size_delta; }
protected:

  virtual void on_run_setup() {}
  unsigned int worker_id;
  bool set_core_id;
  util::fast_random r;
  abstract_db *const db;
  spin_barrier *const barrier_a;
  spin_barrier *const barrier_b;

private:
  size_t ntxn_commits;
  size_t ntxn_aborts;
  uint64_t latency_numer_us;
  unsigned backoff_shifts;

protected:

#ifdef ENABLE_BENCH_TXN_COUNTERS
  txn_counter_map local_txn_counters;
  void measure_txn_counters(void *txn, const char *txn_name);
#else
  inline ALWAYS_INLINE void measure_txn_counters(void *txn, const char *txn_name) {}
#endif
  std::vector<size_t> txn_counts; // breakdown of txns
  ssize_t size_delta; // how many logical bytes (of values) did the worker add to the DB
};
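// Sketch of a concrete worker (all names illustrative): get_workload()
// returns a static transaction mix whose frequencies should sum to 1.0, and
// each txn_fn_t reports [did_commit?, size_increase_bytes]:
//
//   class toy_worker : public bench_worker {
//   public:
//     toy_worker(unsigned int id, unsigned long seed, abstract_db *db,
//                spin_barrier *a, spin_barrier *b)
//       : bench_worker(id, false, seed, db, a, b) {}
//     static txn_result TxnRead(bench_worker *w)  { return txn_result(true, 0); }
//     static txn_result TxnWrite(bench_worker *w) { return txn_result(true, 128); }
//     virtual workload_desc_vec
//     get_workload() const
//     {
//       workload_desc_vec w;
//       w.push_back(workload_desc("Read", 0.80, TxnRead));
//       w.push_back(workload_desc("Write", 0.20, TxnWrite));
//       return w;
//     }
//   };
//
// Per the backoff_shifts comment in the constructor, an aborted transaction
// (when backoff_aborted_transaction is set) is retried after spinning a
// random number of iterations in [0, 2^backoff_shifts); presumably
// backoff_shifts grows with consecutive aborts, giving randomized
// exponential backoff.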
class bench_runner {
public:
  bench_runner(const bench_runner &) = delete;
  bench_runner(bench_runner &&) = delete;
  bench_runner &operator=(const bench_runner &) = delete;

  bench_runner(abstract_db *db)
    : db(db), barrier_a(nthreads), barrier_b(1) {}
  virtual ~bench_runner() {}
  void run();

protected:
  virtual std::vector<std::unique_ptr<bench_loader>> make_loaders() = 0;
  virtual std::vector<std::unique_ptr<bench_worker>> make_workers() = 0;
  abstract_db *const db;
  std::map<std::string, std::shared_ptr<abstract_ordered_index>> open_tables;

  // barriers for actual benchmark execution
  spin_barrier barrier_a;
  spin_barrier barrier_b;
};
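// Note on the two barriers (the exact sequencing lives in bench_runner::run()
// in the .cc file; this is the protocol the constructor arguments suggest):
// barrier_a is sized to nthreads so the coordinator can wait for every worker
// to finish initializing, while barrier_b is sized to 1 so a single
// count_down() releases all workers at the same instant:
//
//   // each worker:                     // coordinator:
//   barrier_a->count_down();            barrier_a.wait_for();   // all ready
//   barrier_b->wait_for();              barrier_b.count_down(); // go!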
template <typename Database>
class typed_bench_runner : public bench_runner {
public:
  typed_bench_runner(Database *db)
    : bench_runner(db) {}
  inline Database *
  typed_db()
  {
    return static_cast<Database *>(db);
  }
  inline const Database *
  typed_db() const
  {
    return static_cast<const Database *>(db);
  }
};
template <typename Index>
class latest_key_callback : public Index::bytes_search_range_callback {
public:
  latest_key_callback(std::string &k, ssize_t limit = -1)
    : limit(limit), n(0), k(&k)
  {
    ALWAYS_ASSERT(limit == -1 || limit > 0);
  }

  virtual bool
  invoke(const std::string &key,
         const std::string &value)
  {
    INVARIANT(limit == -1 || n < size_t(limit));
    // see the note in bytes_static_limit_callback for why we explicitly
    // copy over regular (ref-counting) assignment
    k->assign(key.data(), key.size());
    ++n;
    return (limit == -1) || (n < size_t(limit));
  }

  inline size_t size() const { return n; }
  inline std::string &kstr() { return *k; }

private:
  ssize_t limit;
  size_t n;
  std::string *k;
};
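// Usage sketch (the index/transaction objects and the
// bytes_search_range_call() entry point are assumptions here, mirroring the
// bytes_search_range_callback base class): scanning with a
// latest_key_callback leaves the last key visited in the caller's string,
// which is useful for resuming a paged scan:
//
//   std::string k;
//   latest_key_callback<my_index_t> cb(k, 10); // visit at most 10 keys
//   idx->bytes_search_range_call(txn, lo_key, &hi_key, cb);
//   // cb.size() == number of keys seen; k holds the last key visited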
namespace private_ {
  template <typename T, bool enable>
  struct container {
    container(const T &t) {}
    T & get(); // not defined
  };

  template <typename T>
  struct container<T, true> {
    container(const T &t) : t(t) {}
    inline T & get() { return t; }
    T t;
  };
}
// explicitly copies keys, because btree::search_range_call() internally
// re-uses a single string to pass keys (so using standard string assignment
// will force a re-allocation b/c of shared ref-counting)
//
// this isn't done for values, because each value has a distinct string from
// the string allocator, so there are no mutations while holding > 1 ref-count
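// to illustrate (a sketch; `scratch` stands for the one string the btree
// re-uses across invoke() calls):
//
//   k = scratch;                               // COW: k shares scratch's buffer
//   k.assign(scratch.data(), scratch.size());  // deep copy: k owns its buffer
//
// with a ref-counted (copy-on-write) std::string, the plain assignment
// leaves both strings sharing one buffer, so the btree's next write into
// scratch must first un-share it (a re-allocation); the explicit assign()
// pays for one copy up front and keeps the callback's copy independent.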
template <typename Index, size_t N, bool ignore_key>
class bytes_static_limit_callback : public Index::bytes_search_range_callback {
public:
  static_assert(N > 0, "N must be positive");

  bytes_static_limit_callback(str_arena *arena)
    : arena(arena)
  {}

  virtual bool
  invoke(const std::string &key,
         const std::string &value) OVERRIDE
  {
    INVARIANT(size() < N);
    INVARIANT(arena->manages(&key));
    INVARIANT(arena->manages(&value));
    if (ignore_key) {
      values.emplace_back(nullptr, &value);
    } else {
      std::string * const s_px = arena->next();
      INVARIANT(s_px && s_px->empty());
      s_px->assign(key.data(), key.size());
      values.emplace_back(s_px, &value);
    }
    return size() < N;
  }

  inline size_t
  size() const
  {
    return values.size();
  }

  inline const std::string &
  key(size_t i)
  {
    return *values[i].first.get();
  }

  inline const std::string &
  value(size_t i) const
  {
    return *values[i].second;
  }

private:
  typedef std::pair<
    private_::container<const std::string *, !ignore_key>,
    const std::string *> kv_pair;
  typename util::vec<kv_pair, N>::type values;
  str_arena *arena;
};
template <typename Index, size_t N, bool ignore_key>
class static_limit_callback : public Index::search_range_callback {
public:
  static_assert(N > 0, "N must be positive");

  virtual bool
  invoke(const typename Index::key_type &key,
         const typename Index::value_type &value) OVERRIDE
  {
    INVARIANT(size() < N);
    values.emplace_back(key, value);
    return size() < N;
  }

  inline size_t
  size() const
  {
    return values.size();
  }

  inline typename Index::key_type &
  key(size_t i)
  {
    return values[i].first.get();
  }

  inline typename Index::value_type &
  value(size_t i)
  {
    return values[i].second;
  }

private:
  typedef std::pair<
    private_::container<typename Index::key_type, !ignore_key>,
    typename Index::value_type> kv_pair;
  typename util::vec<kv_pair, N>::type values;
};
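// Usage sketch (the index/transaction objects and process() are
// hypothetical): collect at most N results from a range scan, then read them
// back by position:
//
//   static_limit_callback<my_index_t, 16, false> cb;
//   idx->search_range_call(txn, lo_key, &hi_key, cb);
//   for (size_t i = 0; i < cb.size(); i++)
//     process(cb.key(i), cb.value(i));
//
// With ignore_key = true the keys are dropped at insertion time, and any
// call to key() fails to link because container<T, false>::get() is
// deliberately left undefined.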
#endif /* _NDB_BENCH_H_ */