a1c12bb7e61625f590f70096cdafcefc51e1b3a6
[c11concurrency-benchmarks.git] / silo / benchmarks / bench.cc
1 #include <iostream>
2 #include <fstream>
3 #include <sstream>
4 #include <vector>
5 #include <utility>
6 #include <string>
7
8 #include <stdlib.h>
9 #include <sched.h>
10 #include <unistd.h>
11 #include <sys/sysinfo.h>
12
13 #include "bench.h"
14
15 #include "../counter.h"
16 #include "../scopedperf.hh"
17 #include "../allocator.h"
18
19 #ifdef USE_JEMALLOC
20 //cannot include this header b/c conflicts with malloc.h
21 //#include <jemalloc/jemalloc.h>
22 extern "C" void malloc_stats_print(void (*write_cb)(void *, const char *), void *cbopaque, const char *opts);
23 extern "C" int mallctl(const char *name, void *oldp, size_t *oldlenp, void *newp, size_t newlen);
24 #endif
25 #ifdef USE_TCMALLOC
26 #include <google/heap-profiler.h>
27 #endif
28
29 using namespace std;
30 using namespace util;
31
32 size_t nthreads = 1;
33 volatile bool running = true;
34 int verbose = 0;
35 uint64_t txn_flags = 0;
36 double scale_factor = 1.0;
37 uint64_t runtime = 30;
38 uint64_t ops_per_worker = 0;
39 int run_mode = RUNMODE_TIME;
40 int enable_parallel_loading = false;
41 int pin_cpus = 0;
42 int slow_exit = 0;
43 int retry_aborted_transaction = 0;
44 int no_reset_counters = 0;
45 int backoff_aborted_transaction = 0;
46
47 template <typename T>
48 static void
49 delete_pointers(const vector<T *> &pts)
50 {
51   for (size_t i = 0; i < pts.size(); i++)
52     delete pts[i];
53 }
54
55 template <typename T>
56 static vector<T>
57 elemwise_sum(const vector<T> &a, const vector<T> &b)
58 {
59   INVARIANT(a.size() == b.size());
60   vector<T> ret(a.size());
61   for (size_t i = 0; i < a.size(); i++)
62     ret[i] = a[i] + b[i];
63   return ret;
64 }
65
66 template <typename K, typename V>
67 static void
68 map_agg(map<K, V> &agg, const map<K, V> &m)
69 {
70   for (typename map<K, V>::const_iterator it = m.begin();
71        it != m.end(); ++it)
72     agg[it->first] += it->second;
73 }
74
75 // returns <free_bytes, total_bytes>
76 static pair<uint64_t, uint64_t>
77 get_system_memory_info()
78 {
79   struct sysinfo inf;
80   sysinfo(&inf);
81   return make_pair(inf.mem_unit * inf.freeram, inf.mem_unit * inf.totalram);
82 }
83
84 static bool
85 clear_file(const char *name)
86 {
87   ofstream ofs(name);
88   ofs.close();
89   return true;
90 }
91
92 static void
93 write_cb(void *p, const char *s) UNUSED;
94 static void
95 write_cb(void *p, const char *s)
96 {
97   const char *f = "jemalloc.stats";
98   static bool s_clear_file UNUSED = clear_file(f);
99   ofstream ofs(f, ofstream::app);
100   ofs << s;
101   ofs.flush();
102   ofs.close();
103 }
104
105 static event_avg_counter evt_avg_abort_spins("avg_abort_spins");
106
107 void
108 bench_worker::run()
109 {
110   // XXX(stephentu): so many nasty hacks here. should actually
111   // fix some of this stuff one day
112   if (set_core_id)
113     coreid::set_core_id(worker_id); // cringe
114   {
115     scoped_rcu_region r; // register this thread in rcu region
116   }
117   on_run_setup();
118   scoped_db_thread_ctx ctx(db, false);
119   const workload_desc_vec workload = get_workload();
120   txn_counts.resize(workload.size());
121   barrier_a->count_down();
122   barrier_b->wait_for();
123   while (running && (run_mode != RUNMODE_OPS || ntxn_commits < ops_per_worker)) {
124     double d = r.next_uniform();
125     for (size_t i = 0; i < workload.size(); i++) {
126       if ((i + 1) == workload.size() || d < workload[i].frequency) {
127       retry:
128         timer t;
129         const unsigned long old_seed = r.get_seed();
130         const auto ret = workload[i].fn(this);
131         if (likely(ret.first)) {
132           ++ntxn_commits;
133           latency_numer_us += t.lap();
134           backoff_shifts >>= 1;
135         } else {
136           ++ntxn_aborts;
137           if (retry_aborted_transaction && running) {
138             if (backoff_aborted_transaction) {
139               if (backoff_shifts < 63)
140                 backoff_shifts++;
141               uint64_t spins = 1UL << backoff_shifts;
142               spins *= 100; // XXX: tuned pretty arbitrarily
143               evt_avg_abort_spins.offer(spins);
144               while (spins) {
145                 nop_pause();
146                 spins--;
147               }
148             }
149             r.set_seed(old_seed);
150             goto retry;
151           }
152         }
153         size_delta += ret.second; // should be zero on abort
154         txn_counts[i]++; // txn_counts aren't used to compute throughput (is
155                          // just an informative number to print to the console
156                          // in verbose mode)
157         break;
158       }
159       d -= workload[i].frequency;
160     }
161   }
162 }
163
164 void
165 bench_runner::run()
166 {
167   // load data
168   const vector<bench_loader *> loaders = make_loaders();
169   {
170     spin_barrier b(loaders.size());
171     const pair<uint64_t, uint64_t> mem_info_before = get_system_memory_info();
172     {
173       scoped_timer t("dataloading", verbose);
174       for (vector<bench_loader *>::const_iterator it = loaders.begin();
175           it != loaders.end(); ++it) {
176         (*it)->set_barrier(b);
177         (*it)->start();
178       }
179       for (vector<bench_loader *>::const_iterator it = loaders.begin();
180           it != loaders.end(); ++it)
181         (*it)->join();
182     }
183     const pair<uint64_t, uint64_t> mem_info_after = get_system_memory_info();
184     const int64_t delta = int64_t(mem_info_before.first) - int64_t(mem_info_after.first); // free mem
185     const double delta_mb = double(delta)/1048576.0;
186     if (verbose)
187       cerr << "DB size: " << delta_mb << " MB" << endl;
188   }
189
190   db->do_txn_epoch_sync(); // also waits for worker threads to be persisted
191   {
192     const auto persisted_info = db->get_ntxn_persisted();
193     if (get<0>(persisted_info) != get<1>(persisted_info))
194       cerr << "ERROR: " << persisted_info << endl;
195     //ALWAYS_ASSERT(get<0>(persisted_info) == get<1>(persisted_info));
196     if (verbose)
197       cerr << persisted_info << " txns persisted in loading phase" << endl;
198   }
199   db->reset_ntxn_persisted();
200
201   if (!no_reset_counters) {
202     event_counter::reset_all_counters(); // XXX: for now - we really should have a before/after loading
203     PERF_EXPR(scopedperf::perfsum_base::resetall());
204   }
205   {
206     const auto persisted_info = db->get_ntxn_persisted();
207     if (get<0>(persisted_info) != 0 ||
208         get<1>(persisted_info) != 0 ||
209         get<2>(persisted_info) != 0.0) {
210       cerr << persisted_info << endl;
211       ALWAYS_ASSERT(false);
212     }
213   }
214
215   map<string, size_t> table_sizes_before;
216   if (verbose) {
217     for (map<string, abstract_ordered_index *>::iterator it = open_tables.begin();
218          it != open_tables.end(); ++it) {
219       scoped_rcu_region guard;
220       const size_t s = it->second->size();
221       cerr << "table " << it->first << " size " << s << endl;
222       table_sizes_before[it->first] = s;
223     }
224     cerr << "starting benchmark..." << endl;
225   }
226
227   const pair<uint64_t, uint64_t> mem_info_before = get_system_memory_info();
228
229   const vector<bench_worker *> workers = make_workers();
230   ALWAYS_ASSERT(!workers.empty());
231   for (vector<bench_worker *>::const_iterator it = workers.begin();
232        it != workers.end(); ++it)
233     (*it)->start();
234
235   barrier_a.wait_for(); // wait for all threads to start up
236   timer t, t_nosync;
237   barrier_b.count_down(); // bombs away!
238   if (run_mode == RUNMODE_TIME) {
239     sleep(runtime);
240     running = false;
241   }
242   __sync_synchronize();
243   for (size_t i = 0; i < nthreads; i++)
244     workers[i]->join();
245   const unsigned long elapsed_nosync = t_nosync.lap();
246   db->do_txn_finish(); // waits for all worker txns to persist
247   size_t n_commits = 0;
248   size_t n_aborts = 0;
249   uint64_t latency_numer_us = 0;
250   for (size_t i = 0; i < nthreads; i++) {
251     n_commits += workers[i]->get_ntxn_commits();
252     n_aborts += workers[i]->get_ntxn_aborts();
253     latency_numer_us += workers[i]->get_latency_numer_us();
254   }
255   const auto persisted_info = db->get_ntxn_persisted();
256
257   const unsigned long elapsed = t.lap(); // lap() must come after do_txn_finish(),
258                                          // because do_txn_finish() potentially
259                                          // waits a bit
260
261   // various sanity checks
262   ALWAYS_ASSERT(get<0>(persisted_info) == get<1>(persisted_info));
263   // not == b/c persisted_info does not count read-only txns
264   ALWAYS_ASSERT(n_commits >= get<1>(persisted_info));
265
266   const double elapsed_nosync_sec = double(elapsed_nosync) / 1000000.0;
267   const double agg_nosync_throughput = double(n_commits) / elapsed_nosync_sec;
268   const double avg_nosync_per_core_throughput = agg_nosync_throughput / double(workers.size());
269
270   const double elapsed_sec = double(elapsed) / 1000000.0;
271   const double agg_throughput = double(n_commits) / elapsed_sec;
272   const double avg_per_core_throughput = agg_throughput / double(workers.size());
273
274   const double agg_abort_rate = double(n_aborts) / elapsed_sec;
275   const double avg_per_core_abort_rate = agg_abort_rate / double(workers.size());
276
277   // we can use n_commits here, because we explicitly wait for all txns
278   // run to be durable
279   const double agg_persist_throughput = double(n_commits) / elapsed_sec;
280   const double avg_per_core_persist_throughput =
281     agg_persist_throughput / double(workers.size());
282
283   // XXX(stephentu): latency currently doesn't account for read-only txns
284   const double avg_latency_us =
285     double(latency_numer_us) / double(n_commits);
286   const double avg_latency_ms = avg_latency_us / 1000.0;
287   const double avg_persist_latency_ms =
288     get<2>(persisted_info) / 1000.0;
289
290   if (verbose) {
291     const pair<uint64_t, uint64_t> mem_info_after = get_system_memory_info();
292     const int64_t delta = int64_t(mem_info_before.first) - int64_t(mem_info_after.first); // free mem
293     const double delta_mb = double(delta)/1048576.0;
294     map<string, size_t> agg_txn_counts = workers[0]->get_txn_counts();
295     ssize_t size_delta = workers[0]->get_size_delta();
296     for (size_t i = 1; i < workers.size(); i++) {
297       map_agg(agg_txn_counts, workers[i]->get_txn_counts());
298       size_delta += workers[i]->get_size_delta();
299     }
300     const double size_delta_mb = double(size_delta)/1048576.0;
301     map<string, counter_data> ctrs = event_counter::get_all_counters();
302
303     cerr << "--- table statistics ---" << endl;
304     for (map<string, abstract_ordered_index *>::iterator it = open_tables.begin();
305          it != open_tables.end(); ++it) {
306       scoped_rcu_region guard;
307       const size_t s = it->second->size();
308       const ssize_t delta = ssize_t(s) - ssize_t(table_sizes_before[it->first]);
309       cerr << "table " << it->first << " size " << it->second->size();
310       if (delta < 0)
311         cerr << " (" << delta << " records)" << endl;
312       else
313         cerr << " (+" << delta << " records)" << endl;
314     }
315 #ifdef ENABLE_BENCH_TXN_COUNTERS
316     cerr << "--- txn counter statistics ---" << endl;
317     {
318       // take from thread 0 for now
319       abstract_db::txn_counter_map agg = workers[0]->get_local_txn_counters();
320       for (auto &p : agg) {
321         cerr << p.first << ":" << endl;
322         for (auto &q : p.second)
323           cerr << "  " << q.first << " : " << q.second << endl;
324       }
325     }
326 #endif
327     cerr << "--- benchmark statistics ---" << endl;
328     cerr << "runtime: " << elapsed_sec << " sec" << endl;
329     cerr << "memory delta: " << delta_mb  << " MB" << endl;
330     cerr << "memory delta rate: " << (delta_mb / elapsed_sec)  << " MB/sec" << endl;
331     cerr << "logical memory delta: " << size_delta_mb << " MB" << endl;
332     cerr << "logical memory delta rate: " << (size_delta_mb / elapsed_sec) << " MB/sec" << endl;
333     cerr << "agg_nosync_throughput: " << agg_nosync_throughput << " ops/sec" << endl;
334     cerr << "avg_nosync_per_core_throughput: " << avg_nosync_per_core_throughput << " ops/sec/core" << endl;
335     cerr << "agg_throughput: " << agg_throughput << " ops/sec" << endl;
336     cerr << "avg_per_core_throughput: " << avg_per_core_throughput << " ops/sec/core" << endl;
337     cerr << "agg_persist_throughput: " << agg_persist_throughput << " ops/sec" << endl;
338     cerr << "avg_per_core_persist_throughput: " << avg_per_core_persist_throughput << " ops/sec/core" << endl;
339     cerr << "avg_latency: " << avg_latency_ms << " ms" << endl;
340     cerr << "avg_persist_latency: " << avg_persist_latency_ms << " ms" << endl;
341     cerr << "agg_abort_rate: " << agg_abort_rate << " aborts/sec" << endl;
342     cerr << "avg_per_core_abort_rate: " << avg_per_core_abort_rate << " aborts/sec/core" << endl;
343     cerr << "txn breakdown: " << format_list(agg_txn_counts.begin(), agg_txn_counts.end()) << endl;
344     cerr << "--- system counters (for benchmark) ---" << endl;
345     for (map<string, counter_data>::iterator it = ctrs.begin();
346          it != ctrs.end(); ++it)
347       cerr << it->first << ": " << it->second << endl;
348     cerr << "--- perf counters (if enabled, for benchmark) ---" << endl;
349     PERF_EXPR(scopedperf::perfsum_base::printall());
350     cerr << "--- allocator stats ---" << endl;
351     ::allocator::DumpStats();
352     cerr << "---------------------------------------" << endl;
353
354 #ifdef USE_JEMALLOC
355     cerr << "dumping heap profile..." << endl;
356     mallctl("prof.dump", NULL, NULL, NULL, 0);
357     cerr << "printing jemalloc stats..." << endl;
358     malloc_stats_print(write_cb, NULL, "");
359 #endif
360 #ifdef USE_TCMALLOC
361     HeapProfilerDump("before-exit");
362 #endif
363   }
364
365   // output for plotting script
366   cout << agg_throughput << " "
367        << agg_persist_throughput << " "
368        << avg_latency_ms << " "
369        << avg_persist_latency_ms << " "
370        << agg_abort_rate << endl;
371   cout.flush();
372
373   if (!slow_exit)
374     return;
375
376   map<string, uint64_t> agg_stats;
377   for (map<string, abstract_ordered_index *>::iterator it = open_tables.begin();
378        it != open_tables.end(); ++it) {
379     map_agg(agg_stats, it->second->clear());
380     delete it->second;
381   }
382   if (verbose) {
383     for (auto &p : agg_stats)
384       cerr << p.first << " : " << p.second << endl;
385
386   }
387   open_tables.clear();
388
389   delete_pointers(loaders);
390   delete_pointers(workers);
391 }
392
393 template <typename K, typename V>
394 struct map_maxer {
395   typedef map<K, V> map_type;
396   void
397   operator()(map_type &agg, const map_type &m) const
398   {
399     for (typename map_type::const_iterator it = m.begin();
400         it != m.end(); ++it)
401       agg[it->first] = std::max(agg[it->first], it->second);
402   }
403 };
404
405 //template <typename KOuter, typename KInner, typename VInner>
406 //struct map_maxer<KOuter, map<KInner, VInner>> {
407 //  typedef map<KInner, VInner> inner_map_type;
408 //  typedef map<KOuter, inner_map_type> map_type;
409 //};
410
411 #ifdef ENABLE_BENCH_TXN_COUNTERS
412 void
413 bench_worker::measure_txn_counters(void *txn, const char *txn_name)
414 {
415   auto ret = db->get_txn_counters(txn);
416   map_maxer<string, uint64_t>()(local_txn_counters[txn_name], ret);
417 }
418 #endif
419
420 map<string, size_t>
421 bench_worker::get_txn_counts() const
422 {
423   map<string, size_t> m;
424   const workload_desc_vec workload = get_workload();
425   for (size_t i = 0; i < txn_counts.size(); i++)
426     m[workload[i].name] = txn_counts[i];
427   return m;
428 }