13 #include "../macros.h"
14 #include "../varkey.h"
15 #include "../thread.h"
17 #include "../spinbarrier.h"
// Size in bytes of every record value inserted/written by this benchmark
// (see the 'a'/'b'/'c' fill strings below).
26 static const size_t YCSBRecordSize = 100;
// Transaction mix as integer percentages: { Read, Write, ReadModifyWrite, Scan }.
// Entries must sum to 100 (asserted both in get_workload and in the
// --workload-mix option parser). Overridable at runtime via -w / --workload-mix.
29 // we're missing remove for now
30 // the default is a modification of YCSB "A" we made (80/20 R/W)
31 static unsigned g_txn_workload_mix[] = { 80, 20, 0, 0 };
// Per-thread benchmark worker: runs the YCSB transaction mix against the
// single "USERTABLE" index. One instance per worker thread; obj_key0/obj_key1/
// obj_v are per-worker scratch strings reused across transactions to avoid
// per-txn allocations.
33 class ycsb_worker : public bench_worker {
35   ycsb_worker(unsigned int worker_id,
36               unsigned long seed, abstract_db *db,
37               const map<string, abstract_ordered_index *> &open_tables,
38               spin_barrier *barrier_a, spin_barrier *barrier_b)
39     : bench_worker(worker_id, true, seed, db,
40                    open_tables, barrier_a, barrier_b),
// Cache the USERTABLE index pointer; .at() throws if the runner did not
// open the table first.
41       tbl(open_tables.at("USERTABLE")),
// Pre-reserve the scratch buffers so key/value marshalling below does not
// reallocate on the hot path.
44     obj_key0.reserve(str_arena::MinStrReserveLength);
45     obj_key1.reserve(str_arena::MinStrReserveLength);
46     obj_v.reserve(str_arena::MinStrReserveLength);
// Read transaction: fetch one uniformly-random key from USERTABLE.
// Returns txn_result(true, 0) on commit, txn_result(false, 0) on abort.
52     void * const txn = db->new_txn(txn_flags, arena, txn_buf(), abstract_db::HINT_KV_GET_PUT);
// RAII guard: releases all strings handed out by the arena when the
// transaction scope ends.
53     scoped_str_arena s_arena(arena);
// Uniform key distribution over the preloaded keyspace [0, nkeys).
55     const uint64_t k = r.next() % nkeys;
// The table is fully preloaded, so the get must succeed.
56     ALWAYS_ASSERT(tbl->get(txn, u64_varkey(k).str(obj_key0), obj_v));
// Fold the value size into a running counter — keeps the read from being
// optimized away and gives a cheap sanity statistic.
57     computation_n += obj_v.size();
58     measure_txn_counters(txn, "txn_read");
59     if (likely(db->commit_txn(txn)))
60       return txn_result(true, 0);
// Aborts are expected under contention; report failure, don't crash.
61   } catch (abstract_db::abstract_abort_exception &ex) {
64     return txn_result(false, 0);
// Static trampoline used by the workload_desc table: downcasts the generic
// bench_worker back to ycsb_worker and dispatches.
68   TxnRead(bench_worker *w)
70     return static_cast<ycsb_worker *>(w)->txn_read();
// Write transaction: overwrite one uniformly-random key with a fresh
// YCSBRecordSize-byte value of 'b's. Blind write (no prior read).
76     void * const txn = db->new_txn(txn_flags, arena, txn_buf(), abstract_db::HINT_KV_GET_PUT);
77     scoped_str_arena s_arena(arena);
// Both key and value buffers come from the arena via str(); freed when
// s_arena goes out of scope.
79     tbl->put(txn, u64_varkey(r.next() % nkeys).str(str()), str().assign(YCSBRecordSize, 'b'));
80     measure_txn_counters(txn, "txn_write");
81     if (likely(db->commit_txn(txn)))
82       return txn_result(true, 0);
// Aborted transactions count as failures; the framework retries per policy.
83   } catch (abstract_db::abstract_abort_exception &ex) {
86     return txn_result(false, 0);
// Static trampoline for the workload_desc table.
90   TxnWrite(bench_worker *w)
92     return static_cast<ycsb_worker *>(w)->txn_write();
// Read-modify-write transaction: read a random key, then write it back with
// a fresh value — the classic YCSB RMW operation. Uses the RMW hint so the
// engine can acquire write intent on the initial read.
98     void * const txn = db->new_txn(txn_flags, arena, txn_buf(), abstract_db::HINT_KV_RMW);
99     scoped_str_arena s_arena(arena);
101     const uint64_t key = r.next() % nkeys;
// Must succeed: keyspace is fully preloaded.
102     ALWAYS_ASSERT(tbl->get(txn, u64_varkey(key).str(obj_key0), obj_v));
103     computation_n += obj_v.size();
// Reuse obj_key0 (already holds the marshalled key) for the write-back.
104     tbl->put(txn, obj_key0, str().assign(YCSBRecordSize, 'c'));
105     measure_txn_counters(txn, "txn_rmw");
106     if (likely(db->commit_txn(txn)))
107       return txn_result(true, 0);
108   } catch (abstract_db::abstract_abort_exception &ex) {
111     return txn_result(false, 0);
// Static trampoline for the workload_desc table.
115   TxnRmw(bench_worker *w)
117     return static_cast<ycsb_worker *>(w)->txn_rmw();
// Scan callback: counts rows returned by a range scan (the framework invokes
// invoke() once per matching record; n accumulates — presumably the total
// value bytes or row count, exact accumulation is on a line not shown here).
120   class worker_scan_callback : public abstract_ordered_index::scan_callback {
122     worker_scan_callback() : n(0) {}
124     invoke(const char *, size_t, const string &value)
// Scan transaction: range-scan 100 consecutive keys starting from a
// uniformly-random key.
135     void * const txn = db->new_txn(txn_flags, arena, txn_buf(), abstract_db::HINT_KV_SCAN);
136     scoped_str_arena s_arena(arena);
137     const size_t kstart = r.next() % nkeys;
// Inclusive start / exclusive-or-inclusive end semantics depend on
// abstract_ordered_index::scan — range is [kstart, kstart + 100].
// NOTE(review): kstart + 100 may exceed nkeys near the top of the keyspace;
// the scan then simply returns fewer rows.
138     const string &kbegin = u64_varkey(kstart).str(obj_key0);
139     const string &kend = u64_varkey(kstart + 100).str(obj_key1);
140     worker_scan_callback c;
142     tbl->scan(txn, kbegin, &kend, c);
143     computation_n += c.n;
144     measure_txn_counters(txn, "txn_scan");
145     if (likely(db->commit_txn(txn)))
146       return txn_result(true, 0);
147   } catch (abstract_db::abstract_abort_exception &ex) {
150     return txn_result(false, 0);
// Static trampoline for the workload_desc table.
154   TxnScan(bench_worker *w)
156     return static_cast<ycsb_worker *>(w)->txn_scan();
// Build the workload table from g_txn_workload_mix: one entry per non-zero
// percentage, with its fraction of the mix and the static trampoline to run.
// The commented-out blocks below are historical fixed mixes kept for reference.
159   virtual workload_desc_vec
162     //w.push_back(workload_desc("Read", 0.95, TxnRead));
163     //w.push_back(workload_desc("ReadModifyWrite", 0.04, TxnRmw));
164     //w.push_back(workload_desc("Write", 0.01, TxnWrite));
166     //w.push_back(workload_desc("Read", 1.0, TxnRead));
167     //w.push_back(workload_desc("Write", 1.0, TxnWrite));
169     // YCSB workload "A" - 50/50 read/write
170     //w.push_back(workload_desc("Read", 0.5, TxnRead));
171     //w.push_back(workload_desc("Write", 0.5, TxnWrite));
173     // YCSB workload custom - 80/20 read/write
174     //w.push_back(workload_desc("Read", 0.8, TxnRead));
175     //w.push_back(workload_desc("Write", 0.2, TxnWrite));
// Sanity-check the (possibly user-supplied) mix sums to exactly 100%.
179     for (size_t i = 0; i < ARRAY_NELEMS(g_txn_workload_mix); i++)
180       m += g_txn_workload_mix[i];
181     ALWAYS_ASSERT(m == 100);
// Mix slots, in order: [0]=Read, [1]=Write, [2]=ReadModifyWrite, [3]=Scan.
182     if (g_txn_workload_mix[0])
183       w.push_back(workload_desc("Read", double(g_txn_workload_mix[0])/100.0, TxnRead));
184     if (g_txn_workload_mix[1])
185       w.push_back(workload_desc("Write", double(g_txn_workload_mix[1])/100.0, TxnWrite));
186     if (g_txn_workload_mix[2])
187       w.push_back(workload_desc("ReadModifyWrite", double(g_txn_workload_mix[2])/100.0, TxnRmw));
188     if (g_txn_workload_mix[3])
189       w.push_back(workload_desc("Scan", double(g_txn_workload_mix[3])/100.0, TxnScan));
// Called once per worker before the measured run: pin this thread to a core.
// worker_id is folded modulo the online-CPU count and then modulo nthreads —
// presumably so worker block-start offsets (see make_workers) map back into
// [0, nthreads); confirm against coreid::allocate_contiguous_aligned_block.
196   on_run_setup() OVERRIDE
200     const size_t a = worker_id % coreid::num_cpus_online();
201     const size_t b = a % nthreads;
202     rcu::s_instance.pin_current_thread(b);
// Hand out the next scratch string from the per-transaction arena.
205   inline ALWAYS_INLINE string &
207     return *arena.next();
// Cached USERTABLE index (set in the constructor from open_tables).
211   abstract_ordered_index *tbl;
// Accumulator of bytes read across all transactions (anti-DCE + sanity stat).
217   uint64_t computation_n;
// Bulk-load routine for a key range [keystart, keyend) of USERTABLE
// (enclosing function signature is partially outside this excerpt).
226                abstract_ordered_index *tbl,
// Pin the loader thread to its assigned core and pre-fault its RCU region
// so page faults don't land inside timed loading.
232     ALWAYS_ASSERT(pinid < nthreads);
233     rcu::s_instance.pin_current_thread(pinid);
234     rcu::s_instance.fault_region();
// Insert in batches: either the engine's max batch size, or 10000 rows per
// transaction when the engine imposes no limit (txn_max_batch_size() == -1).
237     const size_t batchsize = (db->txn_max_batch_size() == -1) ?
238       10000 : db->txn_max_batch_size();
239     ALWAYS_ASSERT(batchsize > 0);
240     const size_t nkeys = keyend - keystart;
241     ALWAYS_ASSERT(nkeys > 0);
242     const size_t nbatches = nkeys < batchsize ? 1 : (nkeys / batchsize);
// NOTE: batchid is not advanced in the for-header — it is presumably
// incremented only after a successful commit (on a line not shown), so an
// aborted batch is retried from the top.
243     for (size_t batchid = 0; batchid < nbatches;) {
244       scoped_str_arena s_arena(arena);
245       void * const txn = db->new_txn(txn_flags, arena, txn_buf);
// The last batch absorbs the remainder so the range ends exactly at keyend.
247         const size_t rend = (batchid + 1 == nbatches) ?
248           keyend : keystart + ((batchid + 1) * batchsize);
249         for (size_t i = batchid * batchsize + keystart; i < rend; i++) {
250           ALWAYS_ASSERT(i >= keystart && i < keyend);
// Keys are big-endian-encoded u64s (u64_varkey); values are 100 'a' bytes.
251           const string k = u64_varkey(i).str();
252           const string v(YCSBRecordSize, 'a');
253           tbl->insert(txn, k, v);
255         if (db->commit_txn(txn))
// Aborts during load are tolerated; the batch will be re-run.
259       } catch (abstract_db::abstract_abort_exception &ex) {
264       cerr << "[INFO] finished loading USERTABLE range [kstart="
265            << keystart << ", kend=" << keyend << ") - nkeys: " << nkeys << endl;
// Single-threaded (sequential) loader: carves the whole keyspace into
// nthreads contiguous slices and loads them one after another from this
// one loader object. Used when parallel loading is disabled or not worthwhile.
268 class ycsb_usertable_loader : public bench_loader {
270   ycsb_usertable_loader(unsigned long seed,
272                         const map<string, abstract_ordered_index *> &open_tables)
273     : bench_loader(seed, db, open_tables)
280     abstract_ordered_index *tbl = open_tables.at("USERTABLE");
281     const size_t nkeysperthd = nkeys / nthreads;
282     for (size_t i = 0; i < nthreads; i++) {
283       const size_t keystart = i * nkeysperthd;
// min() clamps the last slice so rounding never runs past nkeys.
// NOTE(review): with truncating division, keys in [nthreads*nkeysperthd,
// nkeys) appear to be left unloaded — verify against the original file.
284       const size_t keyend = min((i + 1) * nkeysperthd, nkeys);
// Parallel loader: one instance per loader thread, each responsible for a
// fixed key range [keystart, keyend) and pinned to CPU `pinid`.
298 class ycsb_parallel_usertable_loader : public bench_loader {
300   ycsb_parallel_usertable_loader(unsigned long seed,
302                                  const map<string, abstract_ordered_index *> &open_tables,
306     : bench_loader(seed, db, open_tables),
307       pinid(pinid), keystart(keystart), keyend(keyend)
// Each loader must own a non-empty range.
309     INVARIANT(keyend > keystart);
311       cerr << "[INFO] YCSB par loader cpu " << pinid
312            << " [" << keystart << ", " << keyend << ")" << endl;
319     abstract_ordered_index *tbl = open_tables.at("USERTABLE");
// Top-level benchmark runner: opens the single USERTABLE index (with the
// fixed record size as a value-size hint) and manufactures loaders/workers.
338 class ycsb_bench_runner : public bench_runner {
340   ycsb_bench_runner(abstract_db *db)
343     open_tables["USERTABLE"] = db->open_index("USERTABLE", YCSBRecordSize);
// Build the loader set. Three strategies:
//   1) parallel loading, nthreads > ncpus: one loader per CPU, keyspace split
//      evenly, loader i pinned to CPU i;
//   2) parallel loading, nthreads <= ncpus: loaders spread round-robin across
//      the NUMA nodes NOT already used by worker threads;
//   3) otherwise: a single sequential loader.
347   virtual vector<bench_loader *>
350     vector<bench_loader *> ret;
351     const unsigned long ncpus = coreid::num_cpus_online();
352     if (enable_parallel_loading && nkeys >= nthreads) {
353       // divide the key space amongst all the loaders
354       const size_t nkeysperloader = nkeys / ncpus;
355       if (nthreads > ncpus) {
356         for (size_t i = 0; i < ncpus; i++) {
// Last loader absorbs the rounding remainder so coverage ends at nkeys.
357           const uint64_t kend = (i + 1 == ncpus) ?
358             nkeys : (i + 1) * nkeysperloader;
360             new ycsb_parallel_usertable_loader(
361               0, db, open_tables, i,
362               i * nkeysperloader, kend));
365         // load balance the loaders amongst numa nodes in RR fashion
367         // XXX: here we hardcode an assumption about the NUMA topology of
369         const vector<unsigned> numa_nodes_used = get_numa_nodes_used(nthreads);
371         // assign loaders to cores based on numa node assignment in RR fashion
372         const unsigned loaders_per_node = ncpus / numa_nodes_used.size();
// Start every node with the even share, then hand the remainder out one
// extra loader at a time to the first few nodes.
374         vector<unsigned> node_allocations(numa_nodes_used.size(), loaders_per_node);
377              i < (ncpus - loaders_per_node * numa_nodes_used.size());
379           node_allocations[i]++;
382         for (size_t i = 0; i < numa_nodes_used.size(); i++) {
383           // allocate loaders_per_node loaders to this numa node
384           const vector<unsigned> cpus = numa_node_to_cpus(numa_nodes_used[i]);
// exclude() presumably filters out CPUs claimed by worker threads — confirm
// against its (truncated) definition below.
385           const vector<unsigned> cpus_avail = exclude(cpus, nthreads);
386           const unsigned nloaders = node_allocations[i];
387           for (size_t j = 0; j < nloaders; j++, loader_i++) {
388             const uint64_t kend = (loader_i + 1 == ncpus) ?
389               nkeys : (loader_i + 1) * nkeysperloader;
391               new ycsb_parallel_usertable_loader(
// Wrap within the node's available CPUs if it gets more loaders than cores.
392                 0, db, open_tables, cpus_avail[j % cpus_avail.size()],
393                 loader_i * nkeysperloader, kend));
// Fallback: single sequential loader over the whole keyspace.
398       ret.push_back(new ycsb_usertable_loader(0, db, open_tables));
// Build the worker set: reserve a contiguous, CPU-count-aligned block of
// core IDs, then create one ycsb_worker per thread with IDs
// blockstart..blockstart+nthreads-1, all seeded from one deterministic PRNG.
403   virtual vector<bench_worker *>
406     const unsigned alignment = coreid::num_cpus_online();
407     const int blockstart =
408       coreid::allocate_contiguous_aligned_block(nthreads, alignment);
409     ALWAYS_ASSERT(blockstart >= 0);
410     ALWAYS_ASSERT((blockstart % alignment) == 0);
// Fixed seed => reproducible per-worker seeds across runs.
411     fast_random r(8544290);
412     vector<bench_worker *> ret;
413     for (size_t i = 0; i < nthreads; i++)
416           blockstart + i, r.next(), db, open_tables,
417           &barrier_a, &barrier_b));
// Return the distinct NUMA node IDs covering CPUs [0, nthds), assuming
// worker threads occupy exactly those CPUs. Result is collected into a set
// (declaration not shown) and returned as a vector.
423   static vector<unsigned>
424   get_numa_nodes_used(unsigned nthds)
426     // assuming CPUs [0, nthds) are used, what are all the
427     // NUMA nodes touched by [0, nthds)
429     for (unsigned i = 0; i < nthds; i++) {
// libnuma: numa_node_of_cpu() returns -1 on error; treat that as fatal.
430       const int node = numa_node_of_cpu(i);
431       ALWAYS_ASSERT(node >= 0);
434     return vector<unsigned>(ret.begin(), ret.end());
// Return the CPU IDs belonging to NUMA node `node`, via libnuma.
// Wraps ::numa_node_to_cpus (which fills a bitmask and returns 0 on success);
// the bitmask is freed before returning.
437   static vector<unsigned>
438   numa_node_to_cpus(unsigned node)
440     struct bitmask *bm = numa_allocate_cpumask();
441     ALWAYS_ASSERT(!::numa_node_to_cpus(node, bm));
442     vector<unsigned> ret;
443     for (int i = 0; i < numa_num_configured_cpus(); i++)
444       if (numa_bitmask_isbitset(bm, i))
446     numa_free_cpumask(bm);
// Filter a CPU list against the worker-thread CPUs. Body is truncated in this
// excerpt — presumably returns the members of `cpus` that are >= nthds (i.e.
// not occupied by workers); confirm against the full source.
450   static vector<unsigned>
451   exclude(const vector<unsigned> &cpus, unsigned nthds)
453     vector<unsigned> ret;
// Benchmark entry point: size the keyspace from the global scale factor,
// parse YCSB-specific command-line options (currently just --workload-mix),
// print the effective settings, and run the benchmark.
463 ycsb_do_test(abstract_db *db, int argc, char **argv)
// 1000 keys per unit of scale_factor.
465   nkeys = size_t(scale_factor * 1000.0);
466   ALWAYS_ASSERT(nkeys > 0);
471     static struct option long_options[] = {
472       {"workload-mix" , required_argument , 0 , 'w'},
475     int option_index = 0;
476     int c = getopt_long(argc, argv, "w:", long_options, &option_index);
// flag != 0 means getopt_long already stored the value; nothing else to do.
481       if (long_options[option_index].flag != 0)
// -w takes a comma-separated list of 4 percentages (read,write,rmw,scan).
488         const vector<string> toks = split(optarg, ',');
489         ALWAYS_ASSERT(toks.size() == ARRAY_NELEMS(g_txn_workload_mix));
491         for (size_t i = 0; i < toks.size(); i++) {
// NOTE(review): p is unsigned, so p >= 0 is vacuously true; strtoul also
// silently accepts malformed tokens as 0 — worth tightening in the original.
492           unsigned p = strtoul(toks[i].c_str(), nullptr, 10);
493           ALWAYS_ASSERT(p >= 0 && p <= 100);
495           g_txn_workload_mix[i] = p;
// The user-supplied mix must also sum to exactly 100%.
497         ALWAYS_ASSERT(s == 100);
502       /* getopt_long already printed an error message.  */
// Echo the effective configuration before running.
511     cerr << "ycsb settings:" << endl;
512     cerr << "  workload_mix: "
513          << format_list(g_txn_workload_mix, g_txn_workload_mix + ARRAY_NELEMS(g_txn_workload_mix))
517   ycsb_bench_runner r(db);