80313e18cbdca8a2a6d49484e90229fbc47c2c1f
[c11concurrency-benchmarks.git] / silo / benchmarks / ycsb.cc
1 #include <iostream>
2 #include <sstream>
3 #include <vector>
4 #include <utility>
5 #include <string>
6 #include <set>
7
8 #include <stdlib.h>
9 #include <unistd.h>
10 #include <getopt.h>
11 #include <numa.h>
12
13 #include "../macros.h"
14 #include "../varkey.h"
15 #include "../thread.h"
16 #include "../util.h"
17 #include "../spinbarrier.h"
18 #include "../core.h"
19
20 #include "bench.h"
21
22 using namespace std;
23 using namespace util;
24
25 static size_t nkeys;
26 static const size_t YCSBRecordSize = 100;
27
28 // [R, W, RMW, Scan]
29 // we're missing remove for now
30 // the default is a modification of YCSB "A" we made (80/20 R/W)
31 static unsigned g_txn_workload_mix[] = { 80, 20, 0, 0 };
32
33 class ycsb_worker : public bench_worker {
34 public:
35   ycsb_worker(unsigned int worker_id,
36               unsigned long seed, abstract_db *db,
37               const map<string, abstract_ordered_index *> &open_tables,
38               spin_barrier *barrier_a, spin_barrier *barrier_b)
39     : bench_worker(worker_id, true, seed, db,
40                    open_tables, barrier_a, barrier_b),
41       tbl(open_tables.at("USERTABLE")),
42       computation_n(0)
43   {
44     obj_key0.reserve(str_arena::MinStrReserveLength);
45     obj_key1.reserve(str_arena::MinStrReserveLength);
46     obj_v.reserve(str_arena::MinStrReserveLength);
47   }
48
49   txn_result
50   txn_read()
51   {
52     void * const txn = db->new_txn(txn_flags, arena, txn_buf(), abstract_db::HINT_KV_GET_PUT);
53     scoped_str_arena s_arena(arena);
54     try {
55       const uint64_t k = r.next() % nkeys;
56       ALWAYS_ASSERT(tbl->get(txn, u64_varkey(k).str(obj_key0), obj_v));
57       computation_n += obj_v.size();
58       measure_txn_counters(txn, "txn_read");
59       if (likely(db->commit_txn(txn)))
60         return txn_result(true, 0);
61     } catch (abstract_db::abstract_abort_exception &ex) {
62       db->abort_txn(txn);
63     }
64     return txn_result(false, 0);
65   }
66
67   static txn_result
68   TxnRead(bench_worker *w)
69   {
70     return static_cast<ycsb_worker *>(w)->txn_read();
71   }
72
73   txn_result
74   txn_write()
75   {
76     void * const txn = db->new_txn(txn_flags, arena, txn_buf(), abstract_db::HINT_KV_GET_PUT);
77     scoped_str_arena s_arena(arena);
78     try {
79       tbl->put(txn, u64_varkey(r.next() % nkeys).str(str()), str().assign(YCSBRecordSize, 'b'));
80       measure_txn_counters(txn, "txn_write");
81       if (likely(db->commit_txn(txn)))
82         return txn_result(true, 0);
83     } catch (abstract_db::abstract_abort_exception &ex) {
84       db->abort_txn(txn);
85     }
86     return txn_result(false, 0);
87   }
88
89   static txn_result
90   TxnWrite(bench_worker *w)
91   {
92     return static_cast<ycsb_worker *>(w)->txn_write();
93   }
94
95   txn_result
96   txn_rmw()
97   {
98     void * const txn = db->new_txn(txn_flags, arena, txn_buf(), abstract_db::HINT_KV_RMW);
99     scoped_str_arena s_arena(arena);
100     try {
101       const uint64_t key = r.next() % nkeys;
102       ALWAYS_ASSERT(tbl->get(txn, u64_varkey(key).str(obj_key0), obj_v));
103       computation_n += obj_v.size();
104       tbl->put(txn, obj_key0, str().assign(YCSBRecordSize, 'c'));
105       measure_txn_counters(txn, "txn_rmw");
106       if (likely(db->commit_txn(txn)))
107         return txn_result(true, 0);
108     } catch (abstract_db::abstract_abort_exception &ex) {
109       db->abort_txn(txn);
110     }
111     return txn_result(false, 0);
112   }
113
114   static txn_result
115   TxnRmw(bench_worker *w)
116   {
117     return static_cast<ycsb_worker *>(w)->txn_rmw();
118   }
119
120   class worker_scan_callback : public abstract_ordered_index::scan_callback {
121   public:
122     worker_scan_callback() : n(0) {}
123     virtual bool
124     invoke(const char *, size_t, const string &value)
125     {
126       n += value.size();
127       return true;
128     }
129     size_t n;
130   };
131
132   txn_result
133   txn_scan()
134   {
135     void * const txn = db->new_txn(txn_flags, arena, txn_buf(), abstract_db::HINT_KV_SCAN);
136     scoped_str_arena s_arena(arena);
137     const size_t kstart = r.next() % nkeys;
138     const string &kbegin = u64_varkey(kstart).str(obj_key0);
139     const string &kend = u64_varkey(kstart + 100).str(obj_key1);
140     worker_scan_callback c;
141     try {
142       tbl->scan(txn, kbegin, &kend, c);
143       computation_n += c.n;
144       measure_txn_counters(txn, "txn_scan");
145       if (likely(db->commit_txn(txn)))
146         return txn_result(true, 0);
147     } catch (abstract_db::abstract_abort_exception &ex) {
148       db->abort_txn(txn);
149     }
150     return txn_result(false, 0);
151   }
152
153   static txn_result
154   TxnScan(bench_worker *w)
155   {
156     return static_cast<ycsb_worker *>(w)->txn_scan();
157   }
158
159   virtual workload_desc_vec
160   get_workload() const
161   {
162     //w.push_back(workload_desc("Read", 0.95, TxnRead));
163     //w.push_back(workload_desc("ReadModifyWrite", 0.04, TxnRmw));
164     //w.push_back(workload_desc("Write", 0.01, TxnWrite));
165
166     //w.push_back(workload_desc("Read", 1.0, TxnRead));
167     //w.push_back(workload_desc("Write", 1.0, TxnWrite));
168
169     // YCSB workload "A" - 50/50 read/write
170     //w.push_back(workload_desc("Read", 0.5, TxnRead));
171     //w.push_back(workload_desc("Write", 0.5, TxnWrite));
172
173     // YCSB workload custom - 80/20 read/write
174     //w.push_back(workload_desc("Read",  0.8, TxnRead));
175     //w.push_back(workload_desc("Write", 0.2, TxnWrite));
176
177     workload_desc_vec w;
178     unsigned m = 0;
179     for (size_t i = 0; i < ARRAY_NELEMS(g_txn_workload_mix); i++)
180       m += g_txn_workload_mix[i];
181     ALWAYS_ASSERT(m == 100);
182     if (g_txn_workload_mix[0])
183       w.push_back(workload_desc("Read",  double(g_txn_workload_mix[0])/100.0, TxnRead));
184     if (g_txn_workload_mix[1])
185       w.push_back(workload_desc("Write",  double(g_txn_workload_mix[1])/100.0, TxnWrite));
186     if (g_txn_workload_mix[2])
187       w.push_back(workload_desc("ReadModifyWrite",  double(g_txn_workload_mix[2])/100.0, TxnRmw));
188     if (g_txn_workload_mix[3])
189       w.push_back(workload_desc("Scan",  double(g_txn_workload_mix[3])/100.0, TxnScan));
190     return w;
191   }
192
193 protected:
194
195   virtual void
196   on_run_setup() OVERRIDE
197   {
198     if (!pin_cpus)
199       return;
200     const size_t a = worker_id % coreid::num_cpus_online();
201     const size_t b = a % nthreads;
202     rcu::s_instance.pin_current_thread(b);
203   }
204
205   inline ALWAYS_INLINE string &
206   str() {
207     return *arena.next();
208   }
209
210 private:
211   abstract_ordered_index *tbl;
212
213   string obj_key0;
214   string obj_key1;
215   string obj_v;
216
217   uint64_t computation_n;
218 };
219
220 static void
221 ycsb_load_keyrange(
222     uint64_t keystart,
223     uint64_t keyend,
224     unsigned int pinid,
225     abstract_db *db,
226     abstract_ordered_index *tbl,
227     str_arena &arena,
228     uint64_t txn_flags,
229     void *txn_buf)
230 {
231   if (pin_cpus) {
232     ALWAYS_ASSERT(pinid < nthreads);
233     rcu::s_instance.pin_current_thread(pinid);
234     rcu::s_instance.fault_region();
235   }
236
237   const size_t batchsize = (db->txn_max_batch_size() == -1) ?
238     10000 : db->txn_max_batch_size();
239   ALWAYS_ASSERT(batchsize > 0);
240   const size_t nkeys = keyend - keystart;
241   ALWAYS_ASSERT(nkeys > 0);
242   const size_t nbatches = nkeys < batchsize ? 1 : (nkeys / batchsize);
243   for (size_t batchid = 0; batchid < nbatches;) {
244     scoped_str_arena s_arena(arena);
245     void * const txn = db->new_txn(txn_flags, arena, txn_buf);
246     try {
247       const size_t rend = (batchid + 1 == nbatches) ?
248         keyend : keystart + ((batchid + 1) * batchsize);
249       for (size_t i = batchid * batchsize + keystart; i < rend; i++) {
250         ALWAYS_ASSERT(i >= keystart && i < keyend);
251         const string k = u64_varkey(i).str();
252         const string v(YCSBRecordSize, 'a');
253         tbl->insert(txn, k, v);
254       }
255       if (db->commit_txn(txn))
256         batchid++;
257       else
258         db->abort_txn(txn);
259     } catch (abstract_db::abstract_abort_exception &ex) {
260       db->abort_txn(txn);
261     }
262   }
263   if (verbose)
264     cerr << "[INFO] finished loading USERTABLE range [kstart="
265       << keystart << ", kend=" << keyend << ") - nkeys: " << nkeys << endl;
266 }
267
268 class ycsb_usertable_loader : public bench_loader {
269 public:
270   ycsb_usertable_loader(unsigned long seed,
271                         abstract_db *db,
272                         const map<string, abstract_ordered_index *> &open_tables)
273     : bench_loader(seed, db, open_tables)
274   {}
275
276 protected:
277   virtual void
278   load()
279   {
280     abstract_ordered_index *tbl = open_tables.at("USERTABLE");
281     const size_t nkeysperthd = nkeys / nthreads;
282     for (size_t i = 0; i < nthreads; i++) {
283       const size_t keystart = i * nkeysperthd;
284       const size_t keyend = min((i + 1) * nkeysperthd, nkeys);
285       ycsb_load_keyrange(
286           keystart,
287           keyend,
288           i,
289           db,
290           tbl,
291           arena,
292           txn_flags,
293           txn_buf());
294     }
295   }
296 };
297
298 class ycsb_parallel_usertable_loader : public bench_loader {
299 public:
300   ycsb_parallel_usertable_loader(unsigned long seed,
301                                  abstract_db *db,
302                                  const map<string, abstract_ordered_index *> &open_tables,
303                                  unsigned int pinid,
304                                  uint64_t keystart,
305                                  uint64_t keyend)
306     : bench_loader(seed, db, open_tables),
307       pinid(pinid), keystart(keystart), keyend(keyend)
308   {
309     INVARIANT(keyend > keystart);
310     if (verbose)
311       cerr << "[INFO] YCSB par loader cpu " << pinid
312            << " [" << keystart << ", " << keyend << ")" << endl;
313   }
314
315 protected:
316   virtual void
317   load()
318   {
319     abstract_ordered_index *tbl = open_tables.at("USERTABLE");
320     ycsb_load_keyrange(
321         keystart,
322         keyend,
323         pinid,
324         db,
325         tbl,
326         arena,
327         txn_flags,
328         txn_buf());
329   }
330
331 private:
332   unsigned int pinid;
333   uint64_t keystart;
334   uint64_t keyend;
335 };
336
337
338 class ycsb_bench_runner : public bench_runner {
339 public:
340   ycsb_bench_runner(abstract_db *db)
341     : bench_runner(db)
342   {
343     open_tables["USERTABLE"] = db->open_index("USERTABLE", YCSBRecordSize);
344   }
345
346 protected:
347   virtual vector<bench_loader *>
348   make_loaders()
349   {
350     vector<bench_loader *> ret;
351     const unsigned long ncpus = coreid::num_cpus_online();
352     if (enable_parallel_loading && nkeys >= nthreads) {
353       // divide the key space amongst all the loaders
354       const size_t nkeysperloader = nkeys / ncpus;
355       if (nthreads > ncpus) {
356         for (size_t i = 0; i < ncpus; i++) {
357           const uint64_t kend = (i + 1 == ncpus) ?
358             nkeys : (i + 1) * nkeysperloader;
359           ret.push_back(
360               new ycsb_parallel_usertable_loader(
361                 0, db, open_tables, i,
362                 i * nkeysperloader, kend));
363         }
364       } else {
365         // load balance the loaders amongst numa nodes in RR fashion
366         //
367         // XXX: here we hardcode an assumption about the NUMA topology of
368         // the system
369         const vector<unsigned> numa_nodes_used = get_numa_nodes_used(nthreads);
370
371         // assign loaders to cores based on numa node assignment in RR fashion
372         const unsigned loaders_per_node = ncpus / numa_nodes_used.size();
373
374         vector<unsigned> node_allocations(numa_nodes_used.size(), loaders_per_node);
375         // RR the remaining
376         for (unsigned i = 0;
377              i < (ncpus - loaders_per_node * numa_nodes_used.size());
378              i++)
379           node_allocations[i]++;
380
381         size_t loader_i = 0;
382         for (size_t i = 0; i < numa_nodes_used.size(); i++) {
383           // allocate loaders_per_node loaders to this numa node
384           const vector<unsigned> cpus = numa_node_to_cpus(numa_nodes_used[i]);
385           const vector<unsigned> cpus_avail = exclude(cpus, nthreads);
386           const unsigned nloaders = node_allocations[i];
387           for (size_t j = 0; j < nloaders; j++, loader_i++) {
388             const uint64_t kend = (loader_i + 1 == ncpus) ?
389               nkeys : (loader_i + 1) * nkeysperloader;
390             ret.push_back(
391                 new ycsb_parallel_usertable_loader(
392                   0, db, open_tables, cpus_avail[j % cpus_avail.size()],
393                   loader_i * nkeysperloader, kend));
394           }
395         }
396       }
397     } else {
398       ret.push_back(new ycsb_usertable_loader(0, db, open_tables));
399     }
400     return ret;
401   }
402
403   virtual vector<bench_worker *>
404   make_workers()
405   {
406     const unsigned alignment = coreid::num_cpus_online();
407     const int blockstart =
408       coreid::allocate_contiguous_aligned_block(nthreads, alignment);
409     ALWAYS_ASSERT(blockstart >= 0);
410     ALWAYS_ASSERT((blockstart % alignment) == 0);
411     fast_random r(8544290);
412     vector<bench_worker *> ret;
413     for (size_t i = 0; i < nthreads; i++)
414       ret.push_back(
415         new ycsb_worker(
416           blockstart + i, r.next(), db, open_tables,
417           &barrier_a, &barrier_b));
418     return ret;
419   }
420
421 private:
422
423   static vector<unsigned>
424   get_numa_nodes_used(unsigned nthds)
425   {
426     // assuming CPUs [0, nthds) are used, what are all the
427     // NUMA nodes touched by [0, nthds)
428     set<unsigned> ret;
429     for (unsigned i = 0; i < nthds; i++) {
430       const int node = numa_node_of_cpu(i);
431       ALWAYS_ASSERT(node >= 0);
432       ret.insert(node);
433     }
434     return vector<unsigned>(ret.begin(), ret.end());
435   }
436
437   static vector<unsigned>
438   numa_node_to_cpus(unsigned node)
439   {
440     struct bitmask *bm = numa_allocate_cpumask();
441     ALWAYS_ASSERT(!::numa_node_to_cpus(node, bm));
442     vector<unsigned> ret;
443     for (int i = 0; i < numa_num_configured_cpus(); i++)
444       if (numa_bitmask_isbitset(bm, i))
445         ret.push_back(i);
446     numa_free_cpumask(bm);
447     return ret;
448   }
449
450   static vector<unsigned>
451   exclude(const vector<unsigned> &cpus, unsigned nthds)
452   {
453     vector<unsigned> ret;
454     for (auto n : cpus)
455       if (n < nthds)
456         ret.push_back(n);
457     return ret;
458   }
459
460 };
461
462 void
463 ycsb_do_test(abstract_db *db, int argc, char **argv)
464 {
465   nkeys = size_t(scale_factor * 1000.0);
466   ALWAYS_ASSERT(nkeys > 0);
467
468   // parse options
469   optind = 1;
470   while (1) {
471     static struct option long_options[] = {
472       {"workload-mix" , required_argument , 0 , 'w'},
473       {0, 0, 0, 0}
474     };
475     int option_index = 0;
476     int c = getopt_long(argc, argv, "w:", long_options, &option_index);
477     if (c == -1)
478       break;
479     switch (c) {
480     case 0:
481       if (long_options[option_index].flag != 0)
482         break;
483       abort();
484       break;
485
486     case 'w':
487       {
488         const vector<string> toks = split(optarg, ',');
489         ALWAYS_ASSERT(toks.size() == ARRAY_NELEMS(g_txn_workload_mix));
490         unsigned s = 0;
491         for (size_t i = 0; i < toks.size(); i++) {
492           unsigned p = strtoul(toks[i].c_str(), nullptr, 10);
493           ALWAYS_ASSERT(p >= 0 && p <= 100);
494           s += p;
495           g_txn_workload_mix[i] = p;
496         }
497         ALWAYS_ASSERT(s == 100);
498       }
499       break;
500
501     case '?':
502       /* getopt_long already printed an error message. */
503       exit(1);
504
505     default:
506       abort();
507     }
508   }
509
510   if (verbose) {
511     cerr << "ycsb settings:" << endl;
512     cerr << "  workload_mix: "
513          << format_list(g_txn_workload_mix, g_txn_workload_mix + ARRAY_NELEMS(g_txn_workload_mix))
514          << endl;
515   }
516
517   ycsb_bench_runner r(db);
518   r.run();
519 }