11 #include <sys/sysinfo.h>
15 #include "../counter.h"
16 #include "../scopedperf.hh"
17 #include "../allocator.h"
// Cannot include <jemalloc/jemalloc.h> here because it conflicts with malloc.h.
21 //#include <jemalloc/jemalloc.h>
22 extern "C" void malloc_stats_print(void (*write_cb)(void *, const char *), void *cbopaque, const char *opts);
23 extern "C" int mallctl(const char *name, void *oldp, size_t *oldlenp, void *newp, size_t newlen);
26 #include <google/heap-profiler.h>
33 volatile bool running = true;
35 uint64_t txn_flags = 0;
36 double scale_factor = 1.0;
37 uint64_t runtime = 30;
38 uint64_t ops_per_worker = 0;
39 int run_mode = RUNMODE_TIME;
40 int enable_parallel_loading = false;
43 int retry_aborted_transaction = 0;
44 int no_reset_counters = 0;
45 int backoff_aborted_transaction = 0;
// Frees every element of a vector of heap-allocated pointers.
// NOTE(review): excerpt — the template header and the loop body
// (presumably `delete pts[i];`) are elided from this view.
delete_pointers(const vector<T *> &pts)
  for (size_t i = 0; i < pts.size(); i++)
// Element-wise sum of two equal-length vectors.
// NOTE(review): excerpt — the loop body and return statement are elided
// from this view.
elemwise_sum(const vector<T> &a, const vector<T> &b)
  INVARIANT(a.size() == b.size());  // sizes must match
  vector<T> ret(a.size());
  for (size_t i = 0; i < a.size(); i++)
// Accumulates every entry of m into agg: agg[k] += m[k]. Keys missing from
// agg are value-initialized by operator[] before the +=, so this is safe for
// numeric V. (Excerpt — return type, braces and loop increment are elided.)
template <typename K, typename V>
map_agg(map<K, V> &agg, const map<K, V> &m)
  for (typename map<K, V>::const_iterator it = m.begin();
    agg[it->first] += it->second;
// returns <free_bytes, total_bytes>
// Queries sysinfo(2); mem_unit scales the raw freeram/totalram counts into
// bytes. (Excerpt — the `struct sysinfo inf` declaration and the sysinfo()
// call are elided from this view.)
// NOTE(review): if the sysinfo fields are narrower than 64 bits on some
// target, `inf.mem_unit * inf.freeram` could overflow before widening to
// uint64_t — confirm field widths, or widen the left operand first.
static pair<uint64_t, uint64_t>
get_system_memory_info()
  return make_pair(inf.mem_unit * inf.freeram, inf.mem_unit * inf.totalram);
// Truncates the named file. (Excerpt — return type and body are elided;
// its result is used below to seed a one-time static initializer.)
clear_file(const char *name)

// jemalloc stats callback: appends the stats string s to "jemalloc.stats".
// Matches the write_cb signature expected by malloc_stats_print().
write_cb(void *p, const char *s) UNUSED;

write_cb(void *p, const char *s)
  const char *f = "jemalloc.stats";
  // Function-local static: the file is truncated exactly once per process,
  // on the first invocation of this callback.
  static bool s_clear_file UNUSED = clear_file(f);
  ofstream ofs(f, ofstream::app);
static event_avg_counter evt_avg_abort_spins("avg_abort_spins");

// Worker main loop (excerpt — several statements and closing braces between
// the visible lines are elided from this view; indentation reconstructed).
// XXX(stephentu): so many nasty hacks here. should actually
// fix some of this stuff one day
coreid::set_core_id(worker_id); // cringe
scoped_rcu_region r; // register this thread in rcu region
scoped_db_thread_ctx ctx(db, false);
const workload_desc_vec workload = get_workload();
txn_counts.resize(workload.size());  // one count slot per workload txn type
barrier_a->count_down();             // signal: this worker is ready
barrier_b->wait_for();               // wait for the starting gun
// Run until stopped (RUNMODE_TIME) or the per-worker op budget is
// exhausted (RUNMODE_OPS).
while (running && (run_mode != RUNMODE_OPS || ntxn_commits < ops_per_worker)) {
  // Select a transaction type: sample d uniformly and walk the frequency
  // table, subtracting each entry's frequency; the last entry is the
  // unconditional fallback so some txn always runs.
  double d = r.next_uniform();
  for (size_t i = 0; i < workload.size(); i++) {
    if ((i + 1) == workload.size() || d < workload[i].frequency) {
      // Snapshot the RNG seed so an aborted txn can be replayed with the
      // exact same random choices on retry.
      const unsigned long old_seed = r.get_seed();
      const auto ret = workload[i].fn(this);
      if (likely(ret.first)) {
        // Commit path: account latency and decay the abort backoff.
        latency_numer_us += t.lap();
        backoff_shifts >>= 1;
      // Abort path (elided branch structure):
      if (retry_aborted_transaction && running) {
        if (backoff_aborted_transaction) {
          if (backoff_shifts < 63)
          // Exponential backoff: spin roughly 100 * 2^backoff_shifts times.
          uint64_t spins = 1UL << backoff_shifts;
          spins *= 100; // XXX: tuned pretty arbitrarily
          evt_avg_abort_spins.offer(spins);
      // Restore the seed so the retry replays the identical transaction.
      r.set_seed(old_seed);
    size_delta += ret.second; // should be zero on abort
    txn_counts[i]++; // txn_counts aren't used to compute throughput (is
    // just an informative number to print to the console
    d -= workload[i].frequency;
// bench_runner::run (excerpt — many intervening lines, closing braces and
// scope blocks are elided from this view; indentation reconstructed).

// ---- loading phase -------------------------------------------------------
const vector<bench_loader *> loaders = make_loaders();
spin_barrier b(loaders.size());
const pair<uint64_t, uint64_t> mem_info_before = get_system_memory_info();
scoped_timer t("dataloading", verbose);
for (vector<bench_loader *>::const_iterator it = loaders.begin();
     it != loaders.end(); ++it) {
  (*it)->set_barrier(b);
// (loader start/join calls elided between these two loops)
for (vector<bench_loader *>::const_iterator it = loaders.begin();
     it != loaders.end(); ++it)
// Estimate resident DB size as the drop in system free memory across
// loading. Rough: other processes can perturb this number.
const pair<uint64_t, uint64_t> mem_info_after = get_system_memory_info();
const int64_t delta = int64_t(mem_info_before.first) - int64_t(mem_info_after.first); // free mem
const double delta_mb = double(delta)/1048576.0;
cerr << "DB size: " << delta_mb << " MB" << endl;

db->do_txn_epoch_sync(); // also waits for worker threads to be persisted
// Loading-phase persistence sanity check (this and the later identically
// named persisted_info locals live in separate, elided scopes).
const auto persisted_info = db->get_ntxn_persisted();
if (get<0>(persisted_info) != get<1>(persisted_info))
  cerr << "ERROR: " << persisted_info << endl;
//ALWAYS_ASSERT(get<0>(persisted_info) == get<1>(persisted_info));
cerr << persisted_info << " txns persisted in loading phase" << endl;
db->reset_ntxn_persisted();
// Reset counters so loading-phase activity doesn't pollute benchmark stats.
if (!no_reset_counters) {
  event_counter::reset_all_counters(); // XXX: for now - we really should have a before/after loading
  PERF_EXPR(scopedperf::perfsum_base::resetall());
// Sanity: after the reset, nothing may be recorded as persisted.
const auto persisted_info = db->get_ntxn_persisted();
if (get<0>(persisted_info) != 0 ||
    get<1>(persisted_info) != 0 ||
    get<2>(persisted_info) != 0.0) {
  cerr << persisted_info << endl;
  ALWAYS_ASSERT(false);

// Record per-table sizes before the run so deltas can be reported after.
map<string, size_t> table_sizes_before;
for (map<string, abstract_ordered_index *>::iterator it = open_tables.begin();
     it != open_tables.end(); ++it) {
  scoped_rcu_region guard;
  const size_t s = it->second->size();
  cerr << "table " << it->first << " size " << s << endl;
  table_sizes_before[it->first] = s;
cerr << "starting benchmark..." << endl;

// ---- measurement phase ---------------------------------------------------
const pair<uint64_t, uint64_t> mem_info_before = get_system_memory_info();
const vector<bench_worker *> workers = make_workers();
ALWAYS_ASSERT(!workers.empty());
for (vector<bench_worker *>::const_iterator it = workers.begin();
     it != workers.end(); ++it)
barrier_a.wait_for(); // wait for all threads to start up
barrier_b.count_down(); // bombs away!
if (run_mode == RUNMODE_TIME) {
// (sleep for `runtime` seconds and clear `running` — elided)
__sync_synchronize();
for (size_t i = 0; i < nthreads; i++)
// elapsed_nosync: wall time before waiting for persistence, used for the
// "nosync" throughput numbers below.
const unsigned long elapsed_nosync = t_nosync.lap();
db->do_txn_finish(); // waits for all worker txns to persist
size_t n_commits = 0;
// (n_aborts declaration elided)
uint64_t latency_numer_us = 0;
for (size_t i = 0; i < nthreads; i++) {
  n_commits += workers[i]->get_ntxn_commits();
  n_aborts += workers[i]->get_ntxn_aborts();
  latency_numer_us += workers[i]->get_latency_numer_us();
const auto persisted_info = db->get_ntxn_persisted();
const unsigned long elapsed = t.lap(); // lap() must come after do_txn_finish(),
// because do_txn_finish() potentially

// various sanity checks
ALWAYS_ASSERT(get<0>(persisted_info) == get<1>(persisted_info));
// not == b/c persisted_info does not count read-only txns
ALWAYS_ASSERT(n_commits >= get<1>(persisted_info));

// ---- derived metrics -----------------------------------------------------
// Elapsed times are in microseconds; throughputs in ops/sec.
const double elapsed_nosync_sec = double(elapsed_nosync) / 1000000.0;
const double agg_nosync_throughput = double(n_commits) / elapsed_nosync_sec;
const double avg_nosync_per_core_throughput = agg_nosync_throughput / double(workers.size());

const double elapsed_sec = double(elapsed) / 1000000.0;
const double agg_throughput = double(n_commits) / elapsed_sec;
const double avg_per_core_throughput = agg_throughput / double(workers.size());

const double agg_abort_rate = double(n_aborts) / elapsed_sec;
const double avg_per_core_abort_rate = agg_abort_rate / double(workers.size());

// we can use n_commits here, because we explicitly wait for all txns
const double agg_persist_throughput = double(n_commits) / elapsed_sec;
const double avg_per_core_persist_throughput =
  agg_persist_throughput / double(workers.size());

// XXX(stephentu): latency currently doesn't account for read-only txns
// NOTE(review): divides by n_commits — NaN/inf if no txn committed.
const double avg_latency_us =
  double(latency_numer_us) / double(n_commits);
const double avg_latency_ms = avg_latency_us / 1000.0;
const double avg_persist_latency_ms =
  get<2>(persisted_info) / 1000.0;

// Memory delta across the measurement phase (same free-memory heuristic
// as the loading phase above).
const pair<uint64_t, uint64_t> mem_info_after = get_system_memory_info();
const int64_t delta = int64_t(mem_info_before.first) - int64_t(mem_info_after.first); // free mem
const double delta_mb = double(delta)/1048576.0;
// Aggregate per-worker txn counts and logical size deltas (worker 0 seeds
// the accumulators; the loop folds in the rest).
map<string, size_t> agg_txn_counts = workers[0]->get_txn_counts();
ssize_t size_delta = workers[0]->get_size_delta();
for (size_t i = 1; i < workers.size(); i++) {
  map_agg(agg_txn_counts, workers[i]->get_txn_counts());
  size_delta += workers[i]->get_size_delta();
const double size_delta_mb = double(size_delta)/1048576.0;
map<string, counter_data> ctrs = event_counter::get_all_counters();

// ---- reporting -----------------------------------------------------------
cerr << "--- table statistics ---" << endl;
for (map<string, abstract_ordered_index *>::iterator it = open_tables.begin();
     it != open_tables.end(); ++it) {
  scoped_rcu_region guard;
  const size_t s = it->second->size();
  const ssize_t delta = ssize_t(s) - ssize_t(table_sizes_before[it->first]);
  // NOTE(review): size() is called a second time here; reusing `s` would
  // avoid the redundant (and possibly inconsistent) lookup.
  cerr << "table " << it->first << " size " << it->second->size();
  // (negative/positive branch structure elided)
  cerr << " (" << delta << " records)" << endl;
  cerr << " (+" << delta << " records)" << endl;
#ifdef ENABLE_BENCH_TXN_COUNTERS
cerr << "--- txn counter statistics ---" << endl;
// take from thread 0 for now
abstract_db::txn_counter_map agg = workers[0]->get_local_txn_counters();
for (auto &p : agg) {
  cerr << p.first << ":" << endl;
  for (auto &q : p.second)
    cerr << "  " << q.first << " : " << q.second << endl;

cerr << "--- benchmark statistics ---" << endl;
cerr << "runtime: " << elapsed_sec << " sec" << endl;
cerr << "memory delta: " << delta_mb  << " MB" << endl;
cerr << "memory delta rate: " << (delta_mb / elapsed_sec)  << " MB/sec" << endl;
cerr << "logical memory delta: " << size_delta_mb << " MB" << endl;
cerr << "logical memory delta rate: " << (size_delta_mb / elapsed_sec) << " MB/sec" << endl;
cerr << "agg_nosync_throughput: " << agg_nosync_throughput << " ops/sec" << endl;
cerr << "avg_nosync_per_core_throughput: " << avg_nosync_per_core_throughput << " ops/sec/core" << endl;
cerr << "agg_throughput: " << agg_throughput << " ops/sec" << endl;
cerr << "avg_per_core_throughput: " << avg_per_core_throughput << " ops/sec/core" << endl;
cerr << "agg_persist_throughput: " << agg_persist_throughput << " ops/sec" << endl;
cerr << "avg_per_core_persist_throughput: " << avg_per_core_persist_throughput << " ops/sec/core" << endl;
cerr << "avg_latency: " << avg_latency_ms << " ms" << endl;
cerr << "avg_persist_latency: " << avg_persist_latency_ms << " ms" << endl;
cerr << "agg_abort_rate: " << agg_abort_rate << " aborts/sec" << endl;
cerr << "avg_per_core_abort_rate: " << avg_per_core_abort_rate << " aborts/sec/core" << endl;
cerr << "txn breakdown: " << format_list(agg_txn_counts.begin(), agg_txn_counts.end()) << endl;
cerr << "--- system counters (for benchmark) ---" << endl;
for (map<string, counter_data>::iterator it = ctrs.begin();
     it != ctrs.end(); ++it)
  cerr << it->first << ": " << it->second << endl;
cerr << "--- perf counters (if enabled, for benchmark) ---" << endl;
PERF_EXPR(scopedperf::perfsum_base::printall());
cerr << "--- allocator stats ---" << endl;
::allocator::DumpStats();
cerr << "---------------------------------------" << endl;

// Optional profiling dumps (jemalloc / gperftools; guards elided).
cerr << "dumping heap profile..." << endl;
mallctl("prof.dump", NULL, NULL, NULL, 0);
cerr << "printing jemalloc stats..." << endl;
malloc_stats_print(write_cb, NULL, "");
HeapProfilerDump("before-exit");

// output for plotting script
cout << agg_throughput << " "
     << agg_persist_throughput << " "
     << avg_latency_ms << " "
     << avg_persist_latency_ms << " "
     << agg_abort_rate << endl;

// Tear down the tables, aggregating clear() stats per key across tables.
map<string, uint64_t> agg_stats;
for (map<string, abstract_ordered_index *>::iterator it = open_tables.begin();
     it != open_tables.end(); ++it) {
  map_agg(agg_stats, it->second->clear());
for (auto &p : agg_stats)
  cerr << p.first << " : " << p.second << endl;

delete_pointers(loaders);
delete_pointers(workers);
// Functor: for every key in m, keep the element-wise maximum in agg
// (agg[k] = max(agg[k], m[k])). Missing keys are value-initialized by
// operator[] first. (Excerpt — the struct header, return type, braces and
// loop increment are elided from this view.)
template <typename K, typename V>
  typedef map<K, V> map_type;
  operator()(map_type &agg, const map_type &m) const
    for (typename map_type::const_iterator it = m.begin();
      agg[it->first] = std::max(agg[it->first], it->second);
405 //template <typename KOuter, typename KInner, typename VInner>
406 //struct map_maxer<KOuter, map<KInner, VInner>> {
407 // typedef map<KInner, VInner> inner_map_type;
408 // typedef map<KOuter, inner_map_type> map_type;
#ifdef ENABLE_BENCH_TXN_COUNTERS
// Folds the DB-reported counters for `txn` into this worker's local
// per-txn-name record, keeping the per-key maximum seen so far.
// (Excerpt — return type line and braces are elided from this view.)
bench_worker::measure_txn_counters(void *txn, const char *txn_name)
  auto ret = db->get_txn_counters(txn);
  map_maxer<string, uint64_t>()(local_txn_counters[txn_name], ret);
421 bench_worker::get_txn_counts() const
423 map<string, size_t> m;
424 const workload_desc_vec workload = get_workload();
425 for (size_t i = 0; i < txn_counts.size(); i++)
426 m[workload[i].name] = txn_counts[i];