benchmark silo added
[c11concurrency-benchmarks.git] / silo / new-benchmarks / tpcc.cc
1 /**
2  * An implementation of TPC-C based off of:
3  * https://github.com/oltpbenchmark/oltpbench/tree/master/src/com/oltpbenchmark/benchmarks/tpcc
4  */
5
6 #include <sys/time.h>
7 #include <string>
8 #include <ctype.h>
9 #include <stdlib.h>
10 #include <malloc.h>
11
12 #include <stdlib.h>
13 #include <unistd.h>
14 #include <getopt.h>
15
16 #include <set>
17 #include <vector>
18
19 #include "../txn.h"
20 #include "../macros.h"
21 #include "../scopedperf.hh"
22 #include "../spinlock.h"
23
24 #include "bench.h"
25 #include "tpcc.h"
26
27 #include "ndb_database.h"
28 #include "kvdb_database.h"
29
30 using namespace std;
31 using namespace util;
32
33 static inline ALWAYS_INLINE size_t
34 NumWarehouses()
35 {
36   return (size_t) scale_factor;
37 }
38
39 // config constants
40
41 static constexpr inline ALWAYS_INLINE size_t
42 NumItems()
43 {
44   return 100000;
45 }
46
47 static constexpr inline ALWAYS_INLINE size_t
48 NumDistrictsPerWarehouse()
49 {
50   return 10;
51 }
52
53 static constexpr inline ALWAYS_INLINE size_t
54 NumCustomersPerDistrict()
55 {
56   return 3000;
57 }
58
59 // T must implement lock()/unlock(). Both must *not* throw exceptions
60 template <typename T>
61 class scoped_multilock {
62 public:
63   inline scoped_multilock()
64     : did_lock(false)
65   {
66   }
67
68   inline ~scoped_multilock()
69   {
70     if (did_lock)
71       for (auto &t : locks)
72         t->unlock();
73   }
74
75   inline void
76   enq(T &t)
77   {
78     ALWAYS_ASSERT(!did_lock);
79     locks.emplace_back(&t);
80   }
81
82   inline void
83   multilock()
84   {
85     ALWAYS_ASSERT(!did_lock);
86     if (locks.size() > 1)
87       sort(locks.begin(), locks.end());
88 #ifdef CHECK_INVARIANTS
89     if (set<T *>(locks.begin(), locks.end()).size() != locks.size()) {
90       for (auto &t : locks)
91         cerr << "lock: " << hexify(t) << endl;
92       INVARIANT(false && "duplicate locks found");
93     }
94 #endif
95     for (auto &t : locks)
96       t->lock();
97     did_lock = true;
98   }
99
100 private:
101   bool did_lock;
102   typename util::vec<T *, 64>::type locks;
103 };
104
105 // like a lock_guard, but has the option of not acquiring
106 template <typename T>
107 class scoped_lock_guard {
108 public:
109   inline scoped_lock_guard(T &l)
110     : l(&l)
111   {
112     this->l->lock();
113   }
114
115   inline scoped_lock_guard(T *l)
116     : l(l)
117   {
118     if (this->l)
119       this->l->lock();
120   }
121
122   inline ~scoped_lock_guard()
123   {
124     if (l)
125       l->unlock();
126   }
127
128 private:
129   T *l;
130 };
131
132 // configuration flags
133 static int g_disable_xpartition_txn = 0;
134 static int g_disable_read_only_scans = 0;
135 static int g_enable_partition_locks = 0;
136 static int g_enable_separate_tree_per_partition = 0;
137 static int g_new_order_remote_item_pct = 1;
138 static int g_new_order_fast_id_gen = 0;
139 static int g_uniform_item_dist = 0;
140 static unsigned g_txn_workload_mix[] = { 45, 43, 4, 4, 4 }; // default TPC-C workload mix
141
142 static aligned_padded_elem<spinlock> *g_partition_locks = nullptr;
143 static aligned_padded_elem<atomic<uint64_t>> *g_district_ids = nullptr;
144
145 // maps a wid => partition id
146 static inline ALWAYS_INLINE unsigned int
147 PartitionId(unsigned int wid)
148 {
149   INVARIANT(wid >= 1 && wid <= NumWarehouses());
150   wid -= 1; // 0-idx
151   if (NumWarehouses() <= nthreads)
152     // more workers than partitions, so its easy
153     return wid;
154   const unsigned nwhse_per_partition = NumWarehouses() / nthreads;
155   const unsigned partid = wid / nwhse_per_partition;
156   if (partid >= nthreads)
157     return nthreads - 1;
158   return partid;
159 }
160
161 static inline ALWAYS_INLINE spinlock &
162 LockForPartition(unsigned int wid)
163 {
164   INVARIANT(g_enable_partition_locks);
165   return g_partition_locks[PartitionId(wid)].elem;
166 }
167
168 static inline atomic<uint64_t> &
169 NewOrderIdHolder(unsigned warehouse, unsigned district)
170 {
171   INVARIANT(warehouse >= 1 && warehouse <= NumWarehouses());
172   INVARIANT(district >= 1 && district <= NumDistrictsPerWarehouse());
173   const unsigned idx =
174     (warehouse - 1) * NumDistrictsPerWarehouse() + (district - 1);
175   return g_district_ids[idx].elem;
176 }
177
178 static inline uint64_t
179 FastNewOrderIdGen(unsigned warehouse, unsigned district)
180 {
181   return NewOrderIdHolder(warehouse, district).fetch_add(1, memory_order_acq_rel);
182 }
183
184 struct checker {
185   // these sanity checks are just a few simple checks to make sure
186   // the data is not entirely corrupted
187
188   static inline ALWAYS_INLINE void
189   SanityCheckCustomer(const customer::key *k, const customer::value *v)
190   {
191     INVARIANT(k->c_w_id >= 1 && static_cast<size_t>(k->c_w_id) <= NumWarehouses());
192     INVARIANT(k->c_d_id >= 1 && static_cast<size_t>(k->c_d_id) <= NumDistrictsPerWarehouse());
193     INVARIANT(k->c_id >= 1 && static_cast<size_t>(k->c_id) <= NumCustomersPerDistrict());
194 #ifdef DISABLE_FIELD_SELECTION
195     INVARIANT(v->c_credit == "BC" || v->c_credit == "GC");
196     INVARIANT(v->c_middle == "OE");
197 #endif
198   }
199
200   static inline ALWAYS_INLINE void
201   SanityCheckWarehouse(const warehouse::key *k, const warehouse::value *v)
202   {
203     INVARIANT(k->w_id >= 1 && static_cast<size_t>(k->w_id) <= NumWarehouses());
204 #ifdef DISABLE_FIELD_SELECTION
205     INVARIANT(v->w_state.size() == 2);
206     INVARIANT(v->w_zip == "123456789");
207 #endif
208   }
209
210   static inline ALWAYS_INLINE void
211   SanityCheckDistrict(const district::key *k, const district::value *v)
212   {
213     INVARIANT(k->d_w_id >= 1 && static_cast<size_t>(k->d_w_id) <= NumWarehouses());
214     INVARIANT(k->d_id >= 1 && static_cast<size_t>(k->d_id) <= NumDistrictsPerWarehouse());
215 #ifdef DISABLE_FIELD_SELECTION
216     INVARIANT(v->d_next_o_id >= 3001);
217     INVARIANT(v->d_state.size() == 2);
218     INVARIANT(v->d_zip == "123456789");
219 #endif
220   }
221
222   static inline ALWAYS_INLINE void
223   SanityCheckItem(const item::key *k, const item::value *v)
224   {
225     INVARIANT(k->i_id >= 1 && static_cast<size_t>(k->i_id) <= NumItems());
226 #ifdef DISABLE_FIELD_SELECTION
227     INVARIANT(v->i_price >= 1.0 && v->i_price <= 100.0);
228 #endif
229   }
230
231   static inline ALWAYS_INLINE void
232   SanityCheckStock(const stock::key *k, const stock::value *v)
233   {
234     INVARIANT(k->s_w_id >= 1 && static_cast<size_t>(k->s_w_id) <= NumWarehouses());
235     INVARIANT(k->s_i_id >= 1 && static_cast<size_t>(k->s_i_id) <= NumItems());
236   }
237
238   static inline ALWAYS_INLINE void
239   SanityCheckNewOrder(const new_order::key *k, const new_order::value *v)
240   {
241     INVARIANT(k->no_w_id >= 1 && static_cast<size_t>(k->no_w_id) <= NumWarehouses());
242     INVARIANT(k->no_d_id >= 1 && static_cast<size_t>(k->no_d_id) <= NumDistrictsPerWarehouse());
243   }
244
245   static inline ALWAYS_INLINE void
246   SanityCheckOOrder(const oorder::key *k, const oorder::value *v)
247   {
248     INVARIANT(k->o_w_id >= 1 && static_cast<size_t>(k->o_w_id) <= NumWarehouses());
249     INVARIANT(k->o_d_id >= 1 && static_cast<size_t>(k->o_d_id) <= NumDistrictsPerWarehouse());
250 #ifdef DISABLE_FIELD_SELECTION
251     INVARIANT(v->o_c_id >= 1 && static_cast<size_t>(v->o_c_id) <= NumCustomersPerDistrict());
252     INVARIANT(v->o_carrier_id >= 0 && static_cast<size_t>(v->o_carrier_id) <= NumDistrictsPerWarehouse());
253     INVARIANT(v->o_ol_cnt >= 5 && v->o_ol_cnt <= 15);
254 #endif
255   }
256
257   static inline ALWAYS_INLINE void
258   SanityCheckOrderLine(const order_line::key *k, const order_line::value *v)
259   {
260     INVARIANT(k->ol_w_id >= 1 && static_cast<size_t>(k->ol_w_id) <= NumWarehouses());
261     INVARIANT(k->ol_d_id >= 1 && static_cast<size_t>(k->ol_d_id) <= NumDistrictsPerWarehouse());
262     INVARIANT(k->ol_number >= 1 && k->ol_number <= 15);
263 #ifdef DISABLE_FIELD_SELECTION
264     INVARIANT(v->ol_i_id >= 1 && static_cast<size_t>(v->ol_i_id) <= NumItems());
265 #endif
266   }
267 };
268
269 static string NameTokens[] =
270   {
271     string("BAR"),
272     string("OUGHT"),
273     string("ABLE"),
274     string("PRI"),
275     string("PRES"),
276     string("ESE"),
277     string("ANTI"),
278     string("CALLY"),
279     string("ATION"),
280     string("EING"),
281   };
282
283 class tpcc_worker_mixin {
284 public:
285
286   // only TPCC loaders need to call this- workers are automatically
287   // pinned by their worker id (which corresponds to warehouse id
288   // in TPCC)
289   //
290   // pins the *calling* thread
291   static void
292   PinToWarehouseId(unsigned int wid)
293   {
294     const unsigned int partid = PartitionId(wid);
295     ALWAYS_ASSERT(partid < nthreads);
296     const unsigned int pinid  = partid;
297     if (verbose)
298       cerr << "PinToWarehouseId(): coreid=" << coreid::core_id()
299            << " pinned to whse=" << wid << " (partid=" << partid << ")"
300            << endl;
301     rcu::s_instance.pin_current_thread(pinid);
302     rcu::s_instance.fault_region();
303   }
304
305   static inline uint32_t
306   GetCurrentTimeMillis()
307   {
308     //struct timeval tv;
309     //ALWAYS_ASSERT(gettimeofday(&tv, 0) == 0);
310     //return tv.tv_sec * 1000;
311
312     // XXX(stephentu): implement a scalable GetCurrentTimeMillis()
313     // for now, we just give each core an increasing number
314
315     static __thread uint32_t tl_hack = 0;
316     return tl_hack++;
317   }
318
319   // utils for generating random #s and strings
320
321   static inline ALWAYS_INLINE int
322   CheckBetweenInclusive(int v, int lower, int upper)
323   {
324     INVARIANT(v >= lower);
325     INVARIANT(v <= upper);
326     return v;
327   }
328
329   static inline ALWAYS_INLINE int
330   RandomNumber(fast_random &r, int min, int max)
331   {
332     return CheckBetweenInclusive((int) (r.next_uniform() * (max - min + 1) + min), min, max);
333   }
334
335   static inline ALWAYS_INLINE int
336   NonUniformRandom(fast_random &r, int A, int C, int min, int max)
337   {
338     return (((RandomNumber(r, 0, A) | RandomNumber(r, min, max)) + C) % (max - min + 1)) + min;
339   }
340
341   static inline ALWAYS_INLINE int
342   GetItemId(fast_random &r)
343   {
344     return CheckBetweenInclusive(
345         g_uniform_item_dist ?
346           RandomNumber(r, 1, NumItems()) :
347           NonUniformRandom(r, 8191, 7911, 1, NumItems()),
348         1, NumItems());
349   }
350
351   static inline ALWAYS_INLINE int
352   GetCustomerId(fast_random &r)
353   {
354     return CheckBetweenInclusive(NonUniformRandom(r, 1023, 259, 1, NumCustomersPerDistrict()), 1, NumCustomersPerDistrict());
355   }
356
357   // pick a number between [start, end)
358   static inline ALWAYS_INLINE unsigned
359   PickWarehouseId(fast_random &r, unsigned start, unsigned end)
360   {
361     INVARIANT(start < end);
362     const unsigned diff = end - start;
363     if (diff == 1)
364       return start;
365     return (r.next() % diff) + start;
366   }
367   // all tokens are at most 5 chars long
368   static const size_t CustomerLastNameMaxSize = 5 * 3;
369
370   static inline size_t
371   GetCustomerLastName(uint8_t *buf, fast_random &r, int num)
372   {
373     const string &s0 = NameTokens[num / 100];
374     const string &s1 = NameTokens[(num / 10) % 10];
375     const string &s2 = NameTokens[num % 10];
376     uint8_t *const begin = buf;
377     const size_t s0_sz = s0.size();
378     const size_t s1_sz = s1.size();
379     const size_t s2_sz = s2.size();
380     NDB_MEMCPY(buf, s0.data(), s0_sz); buf += s0_sz;
381     NDB_MEMCPY(buf, s1.data(), s1_sz); buf += s1_sz;
382     NDB_MEMCPY(buf, s2.data(), s2_sz); buf += s2_sz;
383     return buf - begin;
384   }
385
386   static inline ALWAYS_INLINE size_t
387   GetCustomerLastName(char *buf, fast_random &r, int num)
388   {
389     return GetCustomerLastName((uint8_t *) buf, r, num);
390   }
391
392   static inline string
393   GetCustomerLastName(fast_random &r, int num)
394   {
395     string ret;
396     ret.resize(CustomerLastNameMaxSize);
397     ret.resize(GetCustomerLastName((uint8_t *) &ret[0], r, num));
398     return ret;
399   }
400
401   static inline ALWAYS_INLINE string
402   GetNonUniformCustomerLastNameLoad(fast_random &r)
403   {
404     return GetCustomerLastName(r, NonUniformRandom(r, 255, 157, 0, 999));
405   }
406
407   static inline ALWAYS_INLINE size_t
408   GetNonUniformCustomerLastNameRun(uint8_t *buf, fast_random &r)
409   {
410     return GetCustomerLastName(buf, r, NonUniformRandom(r, 255, 223, 0, 999));
411   }
412
413   static inline ALWAYS_INLINE size_t
414   GetNonUniformCustomerLastNameRun(char *buf, fast_random &r)
415   {
416     return GetNonUniformCustomerLastNameRun((uint8_t *) buf, r);
417   }
418
419   static inline ALWAYS_INLINE string
420   GetNonUniformCustomerLastNameRun(fast_random &r)
421   {
422     return GetCustomerLastName(r, NonUniformRandom(r, 255, 223, 0, 999));
423   }
424
425   // following oltpbench, we really generate strings of len - 1...
426   static inline string
427   RandomStr(fast_random &r, uint len)
428   {
429     // this is a property of the oltpbench implementation...
430     if (!len)
431       return "";
432
433     uint i = 0;
434     string buf(len - 1, 0);
435     while (i < (len - 1)) {
436       const char c = (char) r.next_char();
437       // XXX(stephentu): oltpbench uses java's Character.isLetter(), which
438       // is a less restrictive filter than isalnum()
439       if (!isalnum(c))
440         continue;
441       buf[i++] = c;
442     }
443     return buf;
444   }
445
446   // RandomNStr() actually produces a string of length len
447   static inline string
448   RandomNStr(fast_random &r, uint len)
449   {
450     const char base = '0';
451     string buf(len, 0);
452     for (uint i = 0; i < len; i++)
453       buf[i] = (char)(base + (r.next() % 10));
454     return buf;
455   }
456
457 };
458
459 STATIC_COUNTER_DECL(scopedperf::tsc_ctr, tpcc_txn, tpcc_txn_cg)
460
461 template <typename Database, bool AllowReadOnlyScans>
462 class tpcc_worker : public bench_worker, public tpcc_worker_mixin {
463 public:
464   tpcc_worker(unsigned int worker_id,
465               unsigned long seed, Database *db,
466               const tpcc_tables<Database> &tables,
467               spin_barrier *barrier_a, spin_barrier *barrier_b,
468               uint warehouse_id_start, uint warehouse_id_end)
469     : bench_worker(worker_id, true, seed, db,
470                    barrier_a, barrier_b),
471       tpcc_worker_mixin(),
472       tables(tables),
473       warehouse_id_start(warehouse_id_start),
474       warehouse_id_end(warehouse_id_end)
475   {
476     ALWAYS_ASSERT(g_disable_read_only_scans == !AllowReadOnlyScans);
477     INVARIANT(warehouse_id_start >= 1);
478     INVARIANT(warehouse_id_start <= NumWarehouses());
479     INVARIANT(warehouse_id_end > warehouse_id_start);
480     INVARIANT(warehouse_id_end <= (NumWarehouses() + 1));
481     NDB_MEMSET(&last_no_o_ids[0], 0, sizeof(last_no_o_ids));
482     if (verbose) {
483       cerr << "tpcc: worker id " << worker_id
484         << " => warehouses [" << warehouse_id_start
485         << ", " << warehouse_id_end << ")"
486         << endl;
487     }
488   }
489
490   // XXX(stephentu): tune this
491   static const size_t NMaxCustomerIdxScanElems = 512;
492
493   txn_result txn_new_order();
494
495   static txn_result
496   TxnNewOrder(bench_worker *w)
497   {
498     ANON_REGION("TxnNewOrder:", &tpcc_txn_cg);
499     return static_cast<tpcc_worker *>(w)->txn_new_order();
500   }
501
502   txn_result txn_delivery();
503
504   static txn_result
505   TxnDelivery(bench_worker *w)
506   {
507     ANON_REGION("TxnDelivery:", &tpcc_txn_cg);
508     return static_cast<tpcc_worker *>(w)->txn_delivery();
509   }
510
511   txn_result txn_payment();
512
513   static txn_result
514   TxnPayment(bench_worker *w)
515   {
516     ANON_REGION("TxnPayment:", &tpcc_txn_cg);
517     return static_cast<tpcc_worker *>(w)->txn_payment();
518   }
519
520   txn_result txn_order_status();
521
522   static txn_result
523   TxnOrderStatus(bench_worker *w)
524   {
525     ANON_REGION("TxnOrderStatus:", &tpcc_txn_cg);
526     return static_cast<tpcc_worker *>(w)->txn_order_status();
527   }
528
529   txn_result txn_stock_level();
530
531   static txn_result
532   TxnStockLevel(bench_worker *w)
533   {
534     ANON_REGION("TxnStockLevel:", &tpcc_txn_cg);
535     return static_cast<tpcc_worker *>(w)->txn_stock_level();
536   }
537
538   virtual workload_desc_vec
539   get_workload() const OVERRIDE
540   {
541     workload_desc_vec w;
542     // numbers from sigmod.csail.mit.edu:
543     //w.push_back(workload_desc("NewOrder", 1.0, TxnNewOrder)); // ~10k ops/sec
544     //w.push_back(workload_desc("Payment", 1.0, TxnPayment)); // ~32k ops/sec
545     //w.push_back(workload_desc("Delivery", 1.0, TxnDelivery)); // ~104k ops/sec
546     //w.push_back(workload_desc("OrderStatus", 1.0, TxnOrderStatus)); // ~33k ops/sec
547     //w.push_back(workload_desc("StockLevel", 1.0, TxnStockLevel)); // ~2k ops/sec
548     unsigned m = 0;
549     for (size_t i = 0; i < ARRAY_NELEMS(g_txn_workload_mix); i++)
550       m += g_txn_workload_mix[i];
551     ALWAYS_ASSERT(m == 100);
552     if (g_txn_workload_mix[0])
553       w.push_back(workload_desc("NewOrder", double(g_txn_workload_mix[0])/100.0, TxnNewOrder));
554     if (g_txn_workload_mix[1])
555       w.push_back(workload_desc("Payment", double(g_txn_workload_mix[1])/100.0, TxnPayment));
556     if (g_txn_workload_mix[2])
557       w.push_back(workload_desc("Delivery", double(g_txn_workload_mix[2])/100.0, TxnDelivery));
558     if (g_txn_workload_mix[3])
559       w.push_back(workload_desc("OrderStatus", double(g_txn_workload_mix[3])/100.0, TxnOrderStatus));
560     if (g_txn_workload_mix[4])
561       w.push_back(workload_desc("StockLevel", double(g_txn_workload_mix[4])/100.0, TxnStockLevel));
562     return w;
563   }
564
565 protected:
566
567   virtual void
568   on_run_setup() OVERRIDE
569   {
570     if (!pin_cpus)
571       return;
572     const size_t a = worker_id % coreid::num_cpus_online();
573     const size_t b = a % nthreads;
574     rcu::s_instance.pin_current_thread(b);
575     rcu::s_instance.fault_region();
576   }
577
578 private:
579   tpcc_tables<Database> tables;
580   const uint warehouse_id_start;
581   const uint warehouse_id_end;
582   int32_t last_no_o_ids[10]; // XXX(stephentu): hack
583 };
584
585 template <typename Database>
586 class tpcc_warehouse_loader : public typed_bench_loader<Database>,
587                               public tpcc_worker_mixin {
588 public:
589   tpcc_warehouse_loader(unsigned long seed,
590                         Database *db,
591                         const tpcc_tables<Database> &tables)
592     : typed_bench_loader<Database>(seed, db),
593       tpcc_worker_mixin(),
594       tables(tables)
595   {}
596
597 protected:
598   virtual void
599   load()
600   {
601     uint64_t warehouse_total_sz = 0, n_warehouses = 0;
602     try {
603       vector<warehouse::value> warehouses;
604       {
605         scoped_str_arena s_arena(this->arena);
606         typename Database::template
607           TransactionType<abstract_db::HINT_DEFAULT>::type txn(txn_flags, this->arena);
608         for (uint i = 1; i <= NumWarehouses(); i++) {
609           const warehouse::key k(i);
610
611           const string w_name = RandomStr(this->r, RandomNumber(this->r, 6, 10));
612           const string w_street_1 = RandomStr(this->r, RandomNumber(this->r, 10, 20));
613           const string w_street_2 = RandomStr(this->r, RandomNumber(this->r, 10, 20));
614           const string w_city = RandomStr(this->r, RandomNumber(this->r, 10, 20));
615           const string w_state = RandomStr(this->r, 3);
616           const string w_zip = "123456789";
617
618           warehouse::value v;
619           v.w_ytd = 300000;
620           v.w_tax = (float) RandomNumber(this->r, 0, 2000) / 10000.0;
621           v.w_name.assign(w_name);
622           v.w_street_1.assign(w_street_1);
623           v.w_street_2.assign(w_street_2);
624           v.w_city.assign(w_city);
625           v.w_state.assign(w_state);
626           v.w_zip.assign(w_zip);
627
628           checker::SanityCheckWarehouse(&k, &v);
629           const size_t sz = Size(v);
630           warehouse_total_sz += sz;
631           n_warehouses++;
632           tables.tbl_warehouse(i)->insert(txn, k, v);
633           warehouses.push_back(v);
634         }
635         ALWAYS_ASSERT(txn.commit());
636       }
637       {
638         scoped_str_arena s_arena(this->arena);
639         typename Database::template
640           TransactionType<abstract_db::HINT_DEFAULT>::type txn(txn_flags, this->arena);
641         for (uint i = 1; i <= NumWarehouses(); i++) {
642           const warehouse::key k(i);
643           warehouse::value v;
644           ALWAYS_ASSERT(tables.tbl_warehouse(i)->search(txn, k, v));
645           ALWAYS_ASSERT(warehouses[i - 1] == v);
646           checker::SanityCheckWarehouse(&k, &v);
647         }
648         ALWAYS_ASSERT(txn.commit());
649       }
650     } catch (typename Database::abort_exception_type &e) {
651       // shouldn't abort on loading!
652       ALWAYS_ASSERT(false);
653     }
654     if (verbose) {
655       cerr << "[INFO] finished loading warehouse" << endl;
656       cerr << "[INFO]   * average warehouse record length: "
657            << (double(warehouse_total_sz)/double(n_warehouses)) << " bytes" << endl;
658     }
659   }
660
661   tpcc_tables<Database> tables;
662 };
663
664 template <typename Database>
665 class tpcc_item_loader : public typed_bench_loader<Database>,
666                          public tpcc_worker_mixin {
667 public:
668   tpcc_item_loader(unsigned long seed,
669                    Database *db,
670                    const tpcc_tables<Database> &tables)
671     : typed_bench_loader<Database>(seed, db),
672       tpcc_worker_mixin(),
673       tables(tables)
674   {}
675
676 protected:
677   virtual void
678   load()
679   {
680     const ssize_t bsize = this->typed_db()->txn_max_batch_size();
681     auto txn = this->typed_db()->template new_txn<abstract_db::HINT_DEFAULT>(txn_flags, this->arena);
682     uint64_t total_sz = 0;
683     try {
684       for (uint i = 1; i <= NumItems(); i++) {
685         // items don't "belong" to a certain warehouse, so no pinning
686         const item::key k(i);
687
688         item::value v;
689         const string i_name = RandomStr(this->r, RandomNumber(this->r, 14, 24));
690         v.i_name.assign(i_name);
691         v.i_price = (float) RandomNumber(this->r, 100, 10000) / 100.0;
692         const int len = RandomNumber(this->r, 26, 50);
693         if (RandomNumber(this->r, 1, 100) > 10) {
694           const string i_data = RandomStr(this->r, len);
695           v.i_data.assign(i_data);
696         } else {
697           const int startOriginal = RandomNumber(this->r, 2, (len - 8));
698           const string i_data = RandomStr(this->r, startOriginal + 1) + "ORIGINAL" + RandomStr(this->r, len - startOriginal - 7);
699           v.i_data.assign(i_data);
700         }
701         v.i_im_id = RandomNumber(this->r, 1, 10000);
702
703         checker::SanityCheckItem(&k, &v);
704         const size_t sz = Size(v);
705         total_sz += sz;
706         // XXX: replicate items table across all NUMA nodes
707         tables.tbl_item(1)->insert(*txn, k, v); // this table is shared, so any partition is OK
708
709         if (bsize != -1 && !(i % bsize)) {
710           ALWAYS_ASSERT(txn->commit());
711           txn = this->typed_db()->template new_txn<abstract_db::HINT_DEFAULT>(txn_flags, this->arena);
712           this->arena.reset();
713         }
714       }
715       ALWAYS_ASSERT(txn->commit());
716     } catch (typename Database::abort_exception_type &e) {
717       // shouldn't abort on loading!
718       ALWAYS_ASSERT(false);
719     }
720     if (verbose) {
721       cerr << "[INFO] finished loading item" << endl;
722       cerr << "[INFO]   * average item record length: "
723            << (double(total_sz)/double(NumItems())) << " bytes" << endl;
724     }
725   }
726
727   tpcc_tables<Database> tables;
728 };
729
730 template <typename Database>
731 class tpcc_stock_loader : public typed_bench_loader<Database>,
732                           public tpcc_worker_mixin {
733 public:
734   tpcc_stock_loader(unsigned long seed,
735                     Database *db,
736                     const tpcc_tables<Database> &tables,
737                     ssize_t warehouse_id)
738     : typed_bench_loader<Database>(seed, db),
739       tpcc_worker_mixin(),
740       tables(tables),
741       warehouse_id(warehouse_id)
742   {
743     ALWAYS_ASSERT(warehouse_id == -1 ||
744                   (warehouse_id >= 1 &&
745                    static_cast<size_t>(warehouse_id) <= NumWarehouses()));
746   }
747
748 protected:
749   virtual void
750   load()
751   {
752     uint64_t stock_total_sz = 0, n_stocks = 0;
753     const uint w_start = (warehouse_id == -1) ?
754       1 : static_cast<uint>(warehouse_id);
755     const uint w_end   = (warehouse_id == -1) ?
756       NumWarehouses() : static_cast<uint>(warehouse_id);
757
758     for (uint w = w_start; w <= w_end; w++) {
759       const size_t batchsize =
760         (this->typed_db()->txn_max_batch_size() == -1) ?
761           NumItems() : this->typed_db()->txn_max_batch_size();
762       const size_t nbatches = (batchsize > NumItems()) ? 1 : (NumItems() / batchsize);
763
764       if (pin_cpus)
765         PinToWarehouseId(w);
766
767       for (uint b = 0; b < nbatches;) {
768         scoped_str_arena s_arena(this->arena);
769         auto txn = this->typed_db()->template new_txn<abstract_db::HINT_DEFAULT>(txn_flags, this->arena);
770         try {
771           const size_t iend = std::min((b + 1) * batchsize + 1, NumItems());
772           for (uint i = (b * batchsize + 1); i <= iend; i++) {
773             const stock::key k(w, i);
774             const stock_data::key k_data(w, i);
775
776             stock::value v;
777             v.s_quantity = RandomNumber(this->r, 10, 100);
778             v.s_ytd = 0;
779             v.s_order_cnt = 0;
780             v.s_remote_cnt = 0;
781
782             stock_data::value v_data;
783             const int len = RandomNumber(this->r, 26, 50);
784             if (RandomNumber(this->r, 1, 100) > 10) {
785               const string s_data = RandomStr(this->r, len);
786               v_data.s_data.assign(s_data);
787             } else {
788               const int startOriginal = RandomNumber(this->r, 2, (len - 8));
789               const string s_data = RandomStr(this->r, startOriginal + 1) +
790                 "ORIGINAL" + RandomStr(this->r, len - startOriginal - 7);
791               v_data.s_data.assign(s_data);
792             }
793             v_data.s_dist_01.assign(RandomStr(this->r, 24));
794             v_data.s_dist_02.assign(RandomStr(this->r, 24));
795             v_data.s_dist_03.assign(RandomStr(this->r, 24));
796             v_data.s_dist_04.assign(RandomStr(this->r, 24));
797             v_data.s_dist_05.assign(RandomStr(this->r, 24));
798             v_data.s_dist_06.assign(RandomStr(this->r, 24));
799             v_data.s_dist_07.assign(RandomStr(this->r, 24));
800             v_data.s_dist_08.assign(RandomStr(this->r, 24));
801             v_data.s_dist_09.assign(RandomStr(this->r, 24));
802             v_data.s_dist_10.assign(RandomStr(this->r, 24));
803
804             checker::SanityCheckStock(&k, &v);
805             const size_t sz = Size(v);
806             stock_total_sz += sz;
807             n_stocks++;
808             tables.tbl_stock(w)->insert(*txn, k, v);
809             tables.tbl_stock_data(w)->insert(*txn, k_data, v_data);
810           }
811           if (txn->commit()) {
812             b++;
813           } else {
814             if (verbose)
815               cerr << "[WARNING] stock loader loading abort" << endl;
816           }
817         } catch (typename Database::abort_exception_type &e) {
818           txn->abort();
819           ALWAYS_ASSERT(warehouse_id != -1);
820           if (verbose)
821             cerr << "[WARNING] stock loader loading abort" << endl;
822         }
823       }
824     }
825
826     if (verbose) {
827       if (warehouse_id == -1) {
828         cerr << "[INFO] finished loading stock" << endl;
829         cerr << "[INFO]   * average stock record length: "
830              << (double(stock_total_sz)/double(n_stocks)) << " bytes" << endl;
831       } else {
832         cerr << "[INFO] finished loading stock (w=" << warehouse_id << ")" << endl;
833       }
834     }
835   }
836
837 private:
838   tpcc_tables<Database> tables;
839   ssize_t warehouse_id;
840 };
841
842 template <typename Database>
843 class tpcc_district_loader : public typed_bench_loader<Database>,
844                              public tpcc_worker_mixin {
845 public:
846   tpcc_district_loader(unsigned long seed,
847                        Database *db,
848                        const tpcc_tables<Database> &tables)
849     : typed_bench_loader<Database>(seed, db),
850       tpcc_worker_mixin(),
851       tables(tables)
852   {}
853
854 protected:
855   virtual void
856   load()
857   {
858     const ssize_t bsize = this->typed_db()->txn_max_batch_size();
859     auto txn = this->typed_db()->template new_txn<abstract_db::HINT_DEFAULT>(txn_flags, this->arena);
860     uint64_t district_total_sz = 0, n_districts = 0;
861     try {
862       uint cnt = 0;
863       for (uint w = 1; w <= NumWarehouses(); w++) {
864         if (pin_cpus)
865           PinToWarehouseId(w);
866         for (uint d = 1; d <= NumDistrictsPerWarehouse(); d++, cnt++) {
867           const district::key k(w, d);
868
869           district::value v;
870           v.d_ytd = 30000;
871           v.d_tax = (float) (RandomNumber(this->r, 0, 2000) / 10000.0);
872           v.d_next_o_id = 3001;
873           v.d_name.assign(RandomStr(this->r, RandomNumber(this->r, 6, 10)));
874           v.d_street_1.assign(RandomStr(this->r, RandomNumber(this->r, 10, 20)));
875           v.d_street_2.assign(RandomStr(this->r, RandomNumber(this->r, 10, 20)));
876           v.d_city.assign(RandomStr(this->r, RandomNumber(this->r, 10, 20)));
877           v.d_state.assign(RandomStr(this->r, 3));
878           v.d_zip.assign("123456789");
879
880           checker::SanityCheckDistrict(&k, &v);
881           const size_t sz = Size(v);
882           district_total_sz += sz;
883           n_districts++;
884           tables.tbl_district(w)->insert(*txn, k, v);
885
886           if (bsize != -1 && !((cnt + 1) % bsize)) {
887             ALWAYS_ASSERT(txn->commit());
888             txn = this->typed_db()->template new_txn<abstract_db::HINT_DEFAULT>(txn_flags, this->arena);
889             this->arena.reset();
890           }
891         }
892       }
893       ALWAYS_ASSERT(txn->commit());
894     } catch (typename Database::abort_exception_type &e) {
895       // shouldn't abort on loading!
896       ALWAYS_ASSERT(false);
897     }
898     if (verbose) {
899       cerr << "[INFO] finished loading district" << endl;
900       cerr << "[INFO]   * average district record length: "
901            << (double(district_total_sz)/double(n_districts)) << " bytes" << endl;
902     }
903   }
904
905   tpcc_tables<Database> tables;
906 };
907
908 template <typename Database>
909 class tpcc_customer_loader : public typed_bench_loader<Database>,
910                              public tpcc_worker_mixin {
911 public:
912   tpcc_customer_loader(unsigned long seed,
913                        Database *db,
914                        const tpcc_tables<Database> &tables,
915                        ssize_t warehouse_id)
916     : typed_bench_loader<Database>(seed, db),
917       tpcc_worker_mixin(),
918       tables(tables),
919       warehouse_id(warehouse_id)
920   {
921     ALWAYS_ASSERT(warehouse_id == -1 ||
922                   (warehouse_id >= 1 &&
923                    static_cast<size_t>(warehouse_id) <= NumWarehouses()));
924   }
925
926 protected:
927   virtual void
928   load()
929   {
930     const uint w_start = (warehouse_id == -1) ?
931       1 : static_cast<uint>(warehouse_id);
932     const uint w_end   = (warehouse_id == -1) ?
933       NumWarehouses() : static_cast<uint>(warehouse_id);
934     const size_t batchsize =
935       (this->typed_db()->txn_max_batch_size() == -1) ?
936         NumCustomersPerDistrict() : this->typed_db()->txn_max_batch_size();
937     const size_t nbatches =
938       (batchsize > NumCustomersPerDistrict()) ?
939         1 : (NumCustomersPerDistrict() / batchsize);
940     cerr << "num batches: " << nbatches << endl;
941
942     uint64_t total_sz = 0;
943
944     for (uint w = w_start; w <= w_end; w++) {
945       if (pin_cpus)
946         PinToWarehouseId(w);
947       for (uint d = 1; d <= NumDistrictsPerWarehouse(); d++) {
948         for (uint batch = 0; batch < nbatches;) {
949           scoped_str_arena s_arena(this->arena);
950           typename Database::template TransactionType<abstract_db::HINT_DEFAULT>::type txn(txn_flags, this->arena);
951           const size_t cstart = batch * batchsize;
952           const size_t cend = std::min((batch + 1) * batchsize, NumCustomersPerDistrict());
953           try {
954             for (uint cidx0 = cstart; cidx0 < cend; cidx0++) {
955               const uint c = cidx0 + 1;
956               const customer::key k(w, d, c);
957
958               customer::value v;
959               v.c_discount = (float) (RandomNumber(this->r, 1, 5000) / 10000.0);
960               if (RandomNumber(this->r, 1, 100) <= 10)
961                 v.c_credit.assign("BC");
962               else
963                 v.c_credit.assign("GC");
964
965               if (c <= 1000)
966                 v.c_last.assign(GetCustomerLastName(this->r, c - 1));
967               else
968                 v.c_last.assign(GetNonUniformCustomerLastNameLoad(this->r));
969
970               v.c_first.assign(RandomStr(this->r, RandomNumber(this->r, 8, 16)));
971               v.c_credit_lim = 50000;
972
973               v.c_balance = -10;
974               v.c_ytd_payment = 10;
975               v.c_payment_cnt = 1;
976               v.c_delivery_cnt = 0;
977
978               v.c_street_1.assign(RandomStr(this->r, RandomNumber(this->r, 10, 20)));
979               v.c_street_2.assign(RandomStr(this->r, RandomNumber(this->r, 10, 20)));
980               v.c_city.assign(RandomStr(this->r, RandomNumber(this->r, 10, 20)));
981               v.c_state.assign(RandomStr(this->r, 3));
982               v.c_zip.assign(RandomNStr(this->r, 4) + "11111");
983               v.c_phone.assign(RandomNStr(this->r, 16));
984               v.c_since = GetCurrentTimeMillis();
985               v.c_middle.assign("OE");
986               v.c_data.assign(RandomStr(this->r, RandomNumber(this->r, 300, 500)));
987
988               checker::SanityCheckCustomer(&k, &v);
989               const size_t sz = Size(v);
990               total_sz += sz;
991               tables.tbl_customer(w)->insert(txn, k, v);
992
993               // customer name index
994               const customer_name_idx::key k_idx(k.c_w_id, k.c_d_id, v.c_last.str(true), v.c_first.str(true));
995               const customer_name_idx::value v_idx(k.c_id);
996
997               // index structure is:
998               // (c_w_id, c_d_id, c_last, c_first) -> (c_id)
999
1000               tables.tbl_customer_name_idx(w)->insert(txn, k_idx, v_idx);
1001
1002               history::key k_hist;
1003               k_hist.h_c_id = c;
1004               k_hist.h_c_d_id = d;
1005               k_hist.h_c_w_id = w;
1006               k_hist.h_d_id = d;
1007               k_hist.h_w_id = w;
1008               k_hist.h_date = GetCurrentTimeMillis();
1009
1010               history::value v_hist;
1011               v_hist.h_amount = 10;
1012               v_hist.h_data.assign(RandomStr(this->r, RandomNumber(this->r, 10, 24)));
1013
1014               tables.tbl_history(w)->insert(txn, k_hist, v_hist);
1015             }
1016             if (txn.commit()) {
1017               batch++;
1018             } else {
1019               txn.abort();
1020               if (verbose)
1021                 cerr << "[WARNING] customer loader loading abort" << endl;
1022             }
1023           } catch (typename Database::abort_exception_type &e) {
1024             txn.abort();
1025             if (verbose)
1026               cerr << "[WARNING] customer loader loading abort" << endl;
1027           }
1028         }
1029       }
1030     }
1031
1032     if (verbose) {
1033       if (warehouse_id == -1) {
1034         cerr << "[INFO] finished loading customer" << endl;
1035         cerr << "[INFO]   * average customer record length: "
1036              << (double(total_sz)/double(NumWarehouses()*NumDistrictsPerWarehouse()*NumCustomersPerDistrict()))
1037              << " bytes " << endl;
1038       } else {
1039         cerr << "[INFO] finished loading customer (w=" << warehouse_id << ")" << endl;
1040       }
1041     }
1042   }
1043
1044 private:
1045   tpcc_tables<Database> tables;
1046   ssize_t warehouse_id;
1047 };
1048
1049 template <typename Database>
1050 class tpcc_order_loader : public typed_bench_loader<Database>,
1051                           public tpcc_worker_mixin {
1052 public:
1053   tpcc_order_loader(unsigned long seed,
1054                     Database *db,
1055                     const tpcc_tables<Database> &tables,
1056                     ssize_t warehouse_id)
1057     : typed_bench_loader<Database>(seed, db),
1058       tpcc_worker_mixin(),
1059       tables(tables),
1060       warehouse_id(warehouse_id)
1061   {
1062     ALWAYS_ASSERT(warehouse_id == -1 ||
1063                   (warehouse_id >= 1 &&
1064                    static_cast<size_t>(warehouse_id) <= NumWarehouses()));
1065   }
1066
1067 protected:
1068   virtual void
1069   load()
1070   {
1071     uint64_t order_line_total_sz = 0, n_order_lines = 0;
1072     uint64_t oorder_total_sz = 0, n_oorders = 0;
1073     uint64_t new_order_total_sz = 0, n_new_orders = 0;
1074
1075     const uint w_start = (warehouse_id == -1) ?
1076       1 : static_cast<uint>(warehouse_id);
1077     const uint w_end   = (warehouse_id == -1) ?
1078       NumWarehouses() : static_cast<uint>(warehouse_id);
1079
1080     for (uint w = w_start; w <= w_end; w++) {
1081       if (pin_cpus)
1082         PinToWarehouseId(w);
1083       for (uint d = 1; d <= NumDistrictsPerWarehouse(); d++) {
1084         set<uint> c_ids_s;
1085         vector<uint> c_ids;
1086         while (c_ids.size() != NumCustomersPerDistrict()) {
1087           const auto x = (this->r.next() % NumCustomersPerDistrict()) + 1;
1088           if (c_ids_s.count(x))
1089             continue;
1090           c_ids_s.insert(x);
1091           c_ids.emplace_back(x);
1092         }
1093         for (uint c = 1; c <= NumCustomersPerDistrict();) {
1094           scoped_str_arena s_arena(this->arena);
1095           typename Database::template TransactionType<abstract_db::HINT_DEFAULT>::type txn(txn_flags, this->arena);
1096           try {
1097             const oorder::key k_oo(w, d, c);
1098
1099             oorder::value v_oo;
1100             v_oo.o_c_id = c_ids[c - 1];
1101             if (k_oo.o_id < 2101)
1102               v_oo.o_carrier_id = RandomNumber(this->r, 1, 10);
1103             else
1104               v_oo.o_carrier_id = 0;
1105             v_oo.o_ol_cnt = RandomNumber(this->r, 5, 15);
1106             v_oo.o_all_local = 1;
1107             v_oo.o_entry_d = GetCurrentTimeMillis();
1108
1109             checker::SanityCheckOOrder(&k_oo, &v_oo);
1110             const size_t sz = Size(v_oo);
1111             oorder_total_sz += sz;
1112             n_oorders++;
1113             tables.tbl_oorder(w)->insert(txn, k_oo, v_oo);
1114
1115             const oorder_c_id_idx::key k_oo_idx(k_oo.o_w_id, k_oo.o_d_id, v_oo.o_c_id, k_oo.o_id);
1116             const oorder_c_id_idx::value v_oo_idx(0);
1117
1118             tables.tbl_oorder_c_id_idx(w)->insert(txn, k_oo_idx, v_oo_idx);
1119
1120             if (c >= 2101) {
1121               const new_order::key k_no(w, d, c);
1122               const new_order::value v_no;
1123
1124               checker::SanityCheckNewOrder(&k_no, &v_no);
1125               const size_t sz = Size(v_no);
1126               new_order_total_sz += sz;
1127               n_new_orders++;
1128               tables.tbl_new_order(w)->insert(txn, k_no, v_no);
1129             }
1130
1131             for (uint l = 1; l <= uint(v_oo.o_ol_cnt); l++) {
1132               const order_line::key k_ol(w, d, c, l);
1133
1134               order_line::value v_ol;
1135               v_ol.ol_i_id = RandomNumber(this->r, 1, 100000);
1136               if (k_ol.ol_o_id < 2101) {
1137                 v_ol.ol_delivery_d = v_oo.o_entry_d;
1138                 v_ol.ol_amount = 0;
1139               } else {
1140                 v_ol.ol_delivery_d = 0;
1141                 // random within [0.01 .. 9,999.99]
1142                 v_ol.ol_amount = (float) (RandomNumber(this->r, 1, 999999) / 100.0);
1143               }
1144
1145               v_ol.ol_supply_w_id = k_ol.ol_w_id;
1146               v_ol.ol_quantity = 5;
1147               // v_ol.ol_dist_info comes from stock_data(ol_supply_w_id, ol_o_id)
1148               //v_ol.ol_dist_info = RandomStr(this->r, 24);
1149
1150               checker::SanityCheckOrderLine(&k_ol, &v_ol);
1151               const size_t sz = Size(v_ol);
1152               order_line_total_sz += sz;
1153               n_order_lines++;
1154               tables.tbl_order_line(w)->insert(txn, k_ol, v_ol);
1155             }
1156             if (txn.commit()) {
1157               c++;
1158             } else {
1159               txn.abort();
1160               ALWAYS_ASSERT(warehouse_id != -1);
1161               if (verbose)
1162                 cerr << "[WARNING] order loader loading abort" << endl;
1163             }
1164           } catch (typename Database::abort_exception_type &e) {
1165             txn.abort();
1166             ALWAYS_ASSERT(warehouse_id != -1);
1167             if (verbose)
1168               cerr << "[WARNING] order loader loading abort" << endl;
1169           }
1170         }
1171       }
1172     }
1173
1174     if (verbose) {
1175       if (warehouse_id == -1) {
1176         cerr << "[INFO] finished loading order" << endl;
1177         cerr << "[INFO]   * average order_line record length: "
1178              << (double(order_line_total_sz)/double(n_order_lines)) << " bytes" << endl;
1179         cerr << "[INFO]   * average oorder record length: "
1180              << (double(oorder_total_sz)/double(n_oorders)) << " bytes" << endl;
1181         cerr << "[INFO]   * average new_order record length: "
1182              << (double(new_order_total_sz)/double(n_new_orders)) << " bytes" << endl;
1183       } else {
1184         cerr << "[INFO] finished loading order (w=" << warehouse_id << ")" << endl;
1185       }
1186     }
1187   }
1188
1189 private:
1190   tpcc_tables<Database> tables;
1191   ssize_t warehouse_id;
1192 };
1193
1194 static event_counter evt_tpcc_cross_partition_new_order_txns("tpcc_cross_partition_new_order_txns");
1195 static event_counter evt_tpcc_cross_partition_payment_txns("tpcc_cross_partition_payment_txns");
1196
1197 template <typename Database, bool AllowReadOnlyScans>
1198 typename tpcc_worker<Database, AllowReadOnlyScans>::txn_result
1199 tpcc_worker<Database, AllowReadOnlyScans>::txn_new_order()
1200 {
1201   const uint warehouse_id = PickWarehouseId(this->r, warehouse_id_start, warehouse_id_end);
1202   const uint districtID = RandomNumber(this->r, 1, 10);
1203   const uint customerID = GetCustomerId(r);
1204   const uint numItems = RandomNumber(this->r, 5, 15);
1205   uint itemIDs[15], supplierWarehouseIDs[15], orderQuantities[15];
1206   bool allLocal = true;
1207   for (uint i = 0; i < numItems; i++) {
1208     itemIDs[i] = GetItemId(r);
1209     if (likely(g_disable_xpartition_txn ||
1210                NumWarehouses() == 1 ||
1211                RandomNumber(this->r, 1, 100) > g_new_order_remote_item_pct)) {
1212       supplierWarehouseIDs[i] = warehouse_id;
1213     } else {
1214       do {
1215        supplierWarehouseIDs[i] = RandomNumber(this->r, 1, NumWarehouses());
1216       } while (supplierWarehouseIDs[i] == warehouse_id);
1217       allLocal = false;
1218     }
1219     orderQuantities[i] = RandomNumber(this->r, 1, 10);
1220   }
1221   INVARIANT(!g_disable_xpartition_txn || allLocal);
1222   if (!allLocal)
1223     ++evt_tpcc_cross_partition_new_order_txns;
1224
1225   // XXX(stephentu): implement rollback
1226   //
1227   // worst case txn profile:
1228   //   1 customer get
1229   //   1 warehouse get
1230   //   1 district get
1231   //   1 new_order insert
1232   //   1 district put
1233   //   1 oorder insert
1234   //   1 oorder_cid_idx insert
1235   //   15 times:
1236   //      1 item get
1237   //      1 stock get
1238   //      1 stock put
1239   //      1 order_line insert
1240   //
1241   // output from txn counters:
1242   //   max_absent_range_set_size : 0
1243   //   max_absent_set_size : 0
1244   //   max_node_scan_size : 0
1245   //   max_read_set_size : 15
1246   //   max_write_set_size : 15
1247   //   num_txn_contexts : 9
1248   typename Database::template TransactionType<abstract_db::HINT_TPCC_NEW_ORDER>::type txn(txn_flags, this->arena);
1249   scoped_str_arena s_arena(this->arena);
1250   scoped_multilock<spinlock> mlock;
1251   if (g_enable_partition_locks) {
1252     if (allLocal) {
1253       mlock.enq(LockForPartition(warehouse_id));
1254     } else {
1255       small_unordered_map<unsigned int, bool, 64> lockset;
1256       mlock.enq(LockForPartition(warehouse_id));
1257       lockset[PartitionId(warehouse_id)] = 1;
1258       for (uint i = 0; i < numItems; i++) {
1259         if (lockset.find(PartitionId(supplierWarehouseIDs[i])) == lockset.end()) {
1260           mlock.enq(LockForPartition(supplierWarehouseIDs[i]));
1261           lockset[PartitionId(supplierWarehouseIDs[i])] = 1;
1262         }
1263       }
1264     }
1265     mlock.multilock();
1266   }
1267   try {
1268     ssize_t ret = 0;
1269     const customer::key k_c(warehouse_id, districtID, customerID);
1270     customer::value v_c;
1271     ALWAYS_ASSERT(
1272         tables.tbl_customer(warehouse_id)->search(txn, k_c, v_c,
1273           GUARDED_FIELDS(
1274             customer::value::c_discount_field,
1275             customer::value::c_last_field,
1276             customer::value::c_credit_field)));
1277     checker::SanityCheckCustomer(&k_c, &v_c);
1278
1279     const warehouse::key k_w(warehouse_id);
1280     warehouse::value v_w;
1281     ALWAYS_ASSERT(
1282         tables.tbl_warehouse(warehouse_id)->search(txn, k_w, v_w,
1283           GUARDED_FIELDS(warehouse::value::w_tax_field)));
1284     checker::SanityCheckWarehouse(&k_w, &v_w);
1285
1286     const district::key k_d(warehouse_id, districtID);
1287     district::value v_d;
1288     ALWAYS_ASSERT(
1289         tables.tbl_district(warehouse_id)->search(txn, k_d, v_d,
1290           GUARDED_FIELDS(
1291             district::value::d_next_o_id_field,
1292             district::value::d_tax_field)));
1293     checker::SanityCheckDistrict(&k_d, &v_d);
1294
1295     const uint64_t my_next_o_id = g_new_order_fast_id_gen ?
1296         FastNewOrderIdGen(warehouse_id, districtID) : v_d.d_next_o_id;
1297
1298     const new_order::key k_no(warehouse_id, districtID, my_next_o_id);
1299     const new_order::value v_no;
1300     const size_t new_order_sz = Size(v_no);
1301     tables.tbl_new_order(warehouse_id)->insert(txn, k_no, v_no);
1302     ret += new_order_sz;
1303
1304     if (!g_new_order_fast_id_gen) {
1305       v_d.d_next_o_id++;
1306       tables.tbl_district(warehouse_id)->put(txn, k_d, v_d,
1307           GUARDED_FIELDS(district::value::d_next_o_id_field));
1308     }
1309
1310     const oorder::key k_oo(warehouse_id, districtID, k_no.no_o_id);
1311     oorder::value v_oo;
1312     v_oo.o_c_id = int32_t(customerID);
1313     v_oo.o_carrier_id = 0; // seems to be ignored
1314     v_oo.o_ol_cnt = int8_t(numItems);
1315     v_oo.o_all_local = allLocal;
1316     v_oo.o_entry_d = GetCurrentTimeMillis();
1317
1318     const size_t oorder_sz = Size(v_oo);
1319     tables.tbl_oorder(warehouse_id)->insert(txn, k_oo, v_oo);
1320     ret += oorder_sz;
1321
1322     const oorder_c_id_idx::key k_oo_idx(warehouse_id, districtID, customerID, k_no.no_o_id);
1323     const oorder_c_id_idx::value v_oo_idx(0);
1324
1325     tables.tbl_oorder_c_id_idx(warehouse_id)->insert(txn, k_oo_idx, v_oo_idx);
1326
1327     for (uint ol_number = 1; ol_number <= numItems; ol_number++) {
1328       const uint ol_supply_w_id = supplierWarehouseIDs[ol_number - 1];
1329       const uint ol_i_id = itemIDs[ol_number - 1];
1330       const uint ol_quantity = orderQuantities[ol_number - 1];
1331
1332       const item::key k_i(ol_i_id);
1333       item::value v_i;
1334       ALWAYS_ASSERT(tables.tbl_item(1)->search(txn, k_i, v_i,
1335             GUARDED_FIELDS(
1336               item::value::i_price_field,
1337               item::value::i_name_field,
1338               item::value::i_data_field)));
1339       checker::SanityCheckItem(&k_i, &v_i);
1340
1341       const stock::key k_s(ol_supply_w_id, ol_i_id);
1342       stock::value v_s;
1343       ALWAYS_ASSERT(tables.tbl_stock(ol_supply_w_id)->search(txn, k_s, v_s));
1344       checker::SanityCheckStock(&k_s, &v_s);
1345
1346       stock::value v_s_new(v_s);
1347       if (v_s_new.s_quantity - ol_quantity >= 10)
1348         v_s_new.s_quantity -= ol_quantity;
1349       else
1350         v_s_new.s_quantity += -int32_t(ol_quantity) + 91;
1351       v_s_new.s_ytd += ol_quantity;
1352       v_s_new.s_remote_cnt += (ol_supply_w_id == warehouse_id) ? 0 : 1;
1353
1354       tables.tbl_stock(ol_supply_w_id)->put(txn, k_s, v_s_new);
1355
1356       const order_line::key k_ol(warehouse_id, districtID, k_no.no_o_id, ol_number);
1357       order_line::value v_ol;
1358       v_ol.ol_i_id = int32_t(ol_i_id);
1359       v_ol.ol_delivery_d = 0; // not delivered yet
1360       v_ol.ol_amount = float(ol_quantity) * v_i.i_price;
1361       v_ol.ol_supply_w_id = int32_t(ol_supply_w_id);
1362       v_ol.ol_quantity = int8_t(ol_quantity);
1363
1364       const size_t order_line_sz = Size(v_ol);
1365       tables.tbl_order_line(warehouse_id)->insert(txn, k_ol, v_ol);
1366       ret += order_line_sz;
1367     }
1368
1369     //measure_txn_counters(txn, "txn_new_order");
1370     if (likely(txn.commit()))
1371       return txn_result(true, ret);
1372   } catch (typename Database::abort_exception_type &e) {
1373     txn.abort();
1374   }
1375   return txn_result(false, 0);
1376 }
1377
1378 template <typename Database>
1379 class new_order_scan_callback : public
1380   Database::template IndexType<schema<new_order>>::type::search_range_callback {
1381 public:
1382   new_order_scan_callback() : invoked(false) {}
1383   virtual bool
1384   invoke(const new_order::key &key, const new_order::value &value) OVERRIDE
1385   {
1386     INVARIANT(!invoked);
1387     k_no = key;
1388     invoked = true;
1389 #ifdef CHECK_INVARIANTS
1390     checker::SanityCheckNewOrder(&key, &value);
1391 #endif
1392     return false;
1393   }
1394   inline const new_order::key &
1395   get_key() const
1396   {
1397     return k_no;
1398   }
1399   inline bool was_invoked() const { return invoked; }
1400 private:
1401   new_order::key k_no;
1402   bool invoked;
1403 };
1404
1405 STATIC_COUNTER_DECL(scopedperf::tod_ctr, delivery_probe0_tod, delivery_probe0_cg)
1406
1407 template <typename Database, bool AllowReadOnlyScans>
1408 typename tpcc_worker<Database, AllowReadOnlyScans>::txn_result
1409 tpcc_worker<Database, AllowReadOnlyScans>::txn_delivery()
1410 {
1411   const uint warehouse_id = PickWarehouseId(this->r, warehouse_id_start, warehouse_id_end);
1412   const uint o_carrier_id = RandomNumber(this->r, 1, NumDistrictsPerWarehouse());
1413   const uint32_t ts = GetCurrentTimeMillis();
1414
1415   // worst case txn profile:
1416   //   10 times:
1417   //     1 new_order scan node
1418   //     1 oorder get
1419   //     2 order_line scan nodes
1420   //     15 order_line puts
1421   //     1 new_order remove
1422   //     1 oorder put
1423   //     1 customer get
1424   //     1 customer put
1425   //
1426   // output from counters:
1427   //   max_absent_range_set_size : 0
1428   //   max_absent_set_size : 0
1429   //   max_node_scan_size : 21
1430   //   max_read_set_size : 133
1431   //   max_write_set_size : 133
1432   //   num_txn_contexts : 4
1433   typename Database::template TransactionType<abstract_db::HINT_TPCC_DELIVERY>::type txn(txn_flags, this->arena);
1434   scoped_str_arena s_arena(this->arena);
1435   scoped_lock_guard<spinlock> slock(
1436       g_enable_partition_locks ? &LockForPartition(warehouse_id) : nullptr);
1437   try {
1438     ssize_t ret = 0;
1439     for (uint d = 1; d <= NumDistrictsPerWarehouse(); d++) {
1440       const new_order::key k_no_0(warehouse_id, d, last_no_o_ids[d - 1]);
1441       const new_order::key k_no_1(warehouse_id, d, numeric_limits<int32_t>::max());
1442       new_order_scan_callback<Database> new_order_c;
1443       {
1444         ANON_REGION("DeliverNewOrderScan:", &delivery_probe0_cg);
1445         tables.tbl_new_order(warehouse_id)->search_range_call(
1446             txn, k_no_0, &k_no_1, new_order_c);
1447       }
1448
1449       const new_order::key &k_no = new_order_c.get_key();
1450       if (unlikely(!new_order_c.was_invoked()))
1451           continue;
1452       last_no_o_ids[d - 1] = k_no.no_o_id + 1; // XXX: update last seen
1453
1454       const oorder::key k_oo(warehouse_id, d, k_no.no_o_id);
1455       oorder::value v_oo;
1456       if (unlikely(!tables.tbl_oorder(warehouse_id)->search(txn, k_oo, v_oo,
1457               GUARDED_FIELDS(
1458                 oorder::value::o_c_id_field,
1459                 oorder::value::o_carrier_id_field)))) {
1460         // even if we read the new order entry, there's no guarantee
1461         // we will read the oorder entry: in this case the txn will abort,
1462         // but we're simply bailing out early
1463         txn.abort();
1464         return txn_result(false, 0);
1465       }
1466       checker::SanityCheckOOrder(&k_oo, &v_oo);
1467
1468       // never more than 15 order_lines per order
1469       static_limit_callback<typename Database::template IndexType<schema<order_line>>::type, 15, false> c;
1470       const order_line::key k_oo_0(warehouse_id, d, k_no.no_o_id, 0);
1471       const order_line::key k_oo_1(warehouse_id, d, k_no.no_o_id, numeric_limits<int32_t>::max());
1472
1473       // XXX(stephentu): mutable scans would help here
1474       tables.tbl_order_line(warehouse_id)->search_range_call(
1475           txn, k_oo_0, &k_oo_1, c, false,
1476           GUARDED_FIELDS(
1477             order_line::value::ol_amount_field,
1478             order_line::value::ol_delivery_d_field));
1479       float sum = 0.0;
1480       for (size_t i = 0; i < c.size(); i++) {
1481 #if defined(CHECK_INVARIANTS) && defined(DISABLE_FIELD_SELECTION)
1482         checker::SanityCheckOrderLine(&c.key(i), &c.value(i));
1483 #endif
1484         sum += c.value(i).ol_amount;
1485         c.value(i).ol_delivery_d = ts;
1486         tables.tbl_order_line(warehouse_id)->put(txn, c.key(i), c.value(i),
1487             GUARDED_FIELDS(order_line::value::ol_delivery_d_field));
1488       }
1489
1490       // delete new order
1491       tables.tbl_new_order(warehouse_id)->remove(txn, k_no);
1492       ret -= 0 /*new_order_c.get_value_size()*/;
1493
1494       // update oorder
1495       v_oo.o_carrier_id = o_carrier_id;
1496       tables.tbl_oorder(warehouse_id)->put(txn, k_oo, v_oo,
1497           GUARDED_FIELDS(oorder::value::o_carrier_id_field));
1498
1499       const uint c_id = v_oo.o_c_id;
1500       const float ol_total = sum;
1501
1502       // update customer
1503       const customer::key k_c(warehouse_id, d, c_id);
1504       customer::value v_c;
1505       ALWAYS_ASSERT(tables.tbl_customer(warehouse_id)->search(txn, k_c, v_c,
1506             GUARDED_FIELDS(customer::value::c_balance_field)));
1507       v_c.c_balance += ol_total;
1508       tables.tbl_customer(warehouse_id)->put(txn, k_c, v_c,
1509             GUARDED_FIELDS(customer::value::c_balance_field));
1510     }
1511     //measure_txn_counters(txn, "txn_delivery");
1512     if (likely(txn.commit()))
1513       return txn_result(true, ret);
1514   } catch (typename Database::abort_exception_type &e) {
1515     txn.abort();
1516   }
1517   return txn_result(false, 0);
1518 }
1519
1520 static event_avg_counter evt_avg_cust_name_idx_scan_size("avg_cust_name_idx_scan_size");
1521
1522 template <typename Database, bool AllowReadOnlyScans>
1523 typename tpcc_worker<Database, AllowReadOnlyScans>::txn_result
1524 tpcc_worker<Database, AllowReadOnlyScans>::txn_payment()
1525 {
1526   const uint warehouse_id = PickWarehouseId(this->r, warehouse_id_start, warehouse_id_end);
1527   const uint districtID = RandomNumber(this->r, 1, NumDistrictsPerWarehouse());
1528   uint customerDistrictID, customerWarehouseID;
1529   if (likely(g_disable_xpartition_txn ||
1530              NumWarehouses() == 1 ||
1531              RandomNumber(this->r, 1, 100) <= 85)) {
1532     customerDistrictID = districtID;
1533     customerWarehouseID = warehouse_id;
1534   } else {
1535     customerDistrictID = RandomNumber(this->r, 1, NumDistrictsPerWarehouse());
1536     do {
1537       customerWarehouseID = RandomNumber(this->r, 1, NumWarehouses());
1538     } while (customerWarehouseID == warehouse_id);
1539   }
1540   const float paymentAmount = (float) (RandomNumber(this->r, 100, 500000) / 100.0);
1541   const uint32_t ts = GetCurrentTimeMillis();
1542   INVARIANT(!g_disable_xpartition_txn || customerWarehouseID == warehouse_id);
1543
1544   // output from txn counters:
1545   //   max_absent_range_set_size : 0
1546   //   max_absent_set_size : 0
1547   //   max_node_scan_size : 10
1548   //   max_read_set_size : 71
1549   //   max_write_set_size : 1
1550   //   num_txn_contexts : 5
1551   typename Database::template TransactionType<abstract_db::HINT_TPCC_PAYMENT>::type txn(txn_flags, this->arena);
1552   scoped_str_arena s_arena(this->arena);
1553   scoped_multilock<spinlock> mlock;
1554   if (g_enable_partition_locks) {
1555     mlock.enq(LockForPartition(warehouse_id));
1556     if (PartitionId(customerWarehouseID) != PartitionId(warehouse_id))
1557       mlock.enq(LockForPartition(customerWarehouseID));
1558     mlock.multilock();
1559   }
1560   if (customerWarehouseID != warehouse_id)
1561     ++evt_tpcc_cross_partition_payment_txns;
1562   try {
1563     ssize_t ret = 0;
1564
1565     const warehouse::key k_w(warehouse_id);
1566     warehouse::value v_w;
1567     ALWAYS_ASSERT(
1568         tables.tbl_warehouse(warehouse_id)->search(txn, k_w, v_w, GUARDED_FIELDS(
1569             warehouse::value::w_ytd_field,
1570             warehouse::value::w_street_1_field,
1571             warehouse::value::w_street_1_field,
1572             warehouse::value::w_city_field,
1573             warehouse::value::w_state_field,
1574             warehouse::value::w_zip_field,
1575             warehouse::value::w_name_field)));
1576     checker::SanityCheckWarehouse(&k_w, &v_w);
1577
1578     v_w.w_ytd += paymentAmount;
1579     tables.tbl_warehouse(warehouse_id)->put(txn, k_w, v_w, GUARDED_FIELDS(warehouse::value::w_ytd_field));
1580
1581     const district::key k_d(warehouse_id, districtID);
1582     district::value v_d;
1583     ALWAYS_ASSERT(tables.tbl_district(warehouse_id)->search(txn, k_d, v_d, GUARDED_FIELDS(
1584             district::value::d_ytd_field,
1585             district::value::d_street_1_field,
1586             district::value::d_street_1_field,
1587             district::value::d_city_field,
1588             district::value::d_state_field,
1589             district::value::d_zip_field,
1590             district::value::d_name_field)));
1591     checker::SanityCheckDistrict(&k_d, &v_d);
1592
1593     v_d.d_ytd += paymentAmount;
1594     tables.tbl_district(warehouse_id)->put(txn, k_d, v_d, GUARDED_FIELDS(district::value::d_ytd_field));
1595
1596     customer::key k_c;
1597     customer::value v_c;
1598     if (RandomNumber(this->r, 1, 100) <= 60) {
1599       // cust by name
1600       uint8_t lastname_buf[CustomerLastNameMaxSize + 1];
1601       static_assert(sizeof(lastname_buf) == 16, "XX");
1602       NDB_MEMSET(lastname_buf, 0, sizeof(lastname_buf));
1603       GetNonUniformCustomerLastNameRun(lastname_buf, r);
1604
1605       static const string zeros(16, 0);
1606       static const string ones(16, 255);
1607
1608       customer_name_idx::key k_c_idx_0;
1609       k_c_idx_0.c_w_id = customerWarehouseID;
1610       k_c_idx_0.c_d_id = customerDistrictID;
1611       k_c_idx_0.c_last.assign((const char *) lastname_buf, 16);
1612       k_c_idx_0.c_first.assign(zeros);
1613
1614       customer_name_idx::key k_c_idx_1;
1615       k_c_idx_1.c_w_id = customerWarehouseID;
1616       k_c_idx_1.c_d_id = customerDistrictID;
1617       k_c_idx_1.c_last.assign((const char *) lastname_buf, 16);
1618       k_c_idx_1.c_first.assign(ones);
1619
1620       // probably a safe bet for now
1621       bytes_static_limit_callback<
1622         typename Database::template IndexType<schema<customer_name_idx>>::type,
1623         NMaxCustomerIdxScanElems, true> c(s_arena.get());
1624       tables.tbl_customer_name_idx(customerWarehouseID)->bytes_search_range_call(
1625         txn, k_c_idx_0, &k_c_idx_1, c);
1626       ALWAYS_ASSERT(c.size() > 0);
1627       INVARIANT(c.size() < NMaxCustomerIdxScanElems); // we should detect this
1628       int index = c.size() / 2;
1629       if (c.size() % 2 == 0)
1630         index--;
1631       evt_avg_cust_name_idx_scan_size.offer(c.size());
1632
1633       customer_name_idx::value v_c_idx_temp;
1634       const customer_name_idx::value *v_c_idx = Decode(c.value(index), v_c_idx_temp);
1635
1636       k_c.c_w_id = customerWarehouseID;
1637       k_c.c_d_id = customerDistrictID;
1638       k_c.c_id = v_c_idx->c_id;
1639       ALWAYS_ASSERT(tables.tbl_customer(customerWarehouseID)->search(txn, k_c, v_c));
1640     } else {
1641       // cust by ID
1642       const uint customerID = GetCustomerId(r);
1643       k_c.c_w_id = customerWarehouseID;
1644       k_c.c_d_id = customerDistrictID;
1645       k_c.c_id = customerID;
1646       ALWAYS_ASSERT(tables.tbl_customer(customerWarehouseID)->search(txn, k_c, v_c));
1647     }
1648     checker::SanityCheckCustomer(&k_c, &v_c);
1649
1650     v_c.c_balance -= paymentAmount;
1651     v_c.c_ytd_payment += paymentAmount;
1652     v_c.c_payment_cnt++;
1653     if (strncmp(v_c.c_credit.data(), "BC", 2) == 0) {
1654       char buf[501];
1655       int n = snprintf(buf, sizeof(buf), "%d %d %d %d %d %f | %s",
1656                        k_c.c_id,
1657                        k_c.c_d_id,
1658                        k_c.c_w_id,
1659                        districtID,
1660                        warehouse_id,
1661                        paymentAmount,
1662                        v_c.c_data.c_str());
1663       v_c.c_data.resize_junk(
1664           min(static_cast<size_t>(n), v_c.c_data.max_size()));
1665       NDB_MEMCPY((void *) v_c.c_data.data(), &buf[0], v_c.c_data.size());
1666     }
1667
1668     tables.tbl_customer(customerWarehouseID)->put(txn, k_c, v_c);
1669
1670     const history::key k_h(k_c.c_d_id, k_c.c_w_id, k_c.c_id, districtID, warehouse_id, ts);
1671     history::value v_h;
1672     v_h.h_amount = paymentAmount;
1673     v_h.h_data.resize_junk(v_h.h_data.max_size());
1674     int n = snprintf((char *) v_h.h_data.data(), v_h.h_data.max_size() + 1,
1675                      "%.10s    %.10s",
1676                      v_w.w_name.c_str(),
1677                      v_d.d_name.c_str());
1678     v_h.h_data.resize_junk(min(static_cast<size_t>(n), v_h.h_data.max_size()));
1679
1680     const size_t history_sz = Size(v_h);
1681     tables.tbl_history(warehouse_id)->insert(txn, k_h, v_h);
1682     ret += history_sz;
1683
1684     //measure_txn_counters(txn, "txn_payment");
1685     if (likely(txn.commit()))
1686       return txn_result(true, ret);
1687   } catch (typename Database::abort_exception_type &e) {
1688     txn.abort();
1689   }
1690   return txn_result(false, 0);
1691 }
1692
1693 template <typename Database>
1694 class order_line_nop_callback : public
1695   Database::template IndexType<schema<order_line>>::type::bytes_search_range_callback {
1696
1697 public:
1698   order_line_nop_callback() : n(0) {}
1699   virtual bool invoke(
1700       const string &key,
1701       const string &value)
1702   {
1703     INVARIANT(key.size() == sizeof(order_line::key));
1704     order_line::value v_ol_temp;
1705     const order_line::value *v_ol UNUSED = Decode(value, v_ol_temp);
1706 #ifdef CHECK_INVARIANTS
1707     order_line::key k_ol_temp;
1708     const order_line::key *k_ol = Decode(key, k_ol_temp);
1709     checker::SanityCheckOrderLine(k_ol, v_ol);
1710 #endif
1711     ++n;
1712     return true;
1713   }
1714   size_t n;
1715 };
1716
1717 STATIC_COUNTER_DECL(scopedperf::tod_ctr, order_status_probe0_tod, order_status_probe0_cg)
1718
1719 template <typename Database, bool AllowReadOnlyScans>
1720 typename tpcc_worker<Database, AllowReadOnlyScans>::txn_result
1721 tpcc_worker<Database, AllowReadOnlyScans>::txn_order_status()
1722 {
1723   const uint warehouse_id = PickWarehouseId(this->r, warehouse_id_start, warehouse_id_end);
1724   const uint districtID = RandomNumber(this->r, 1, NumDistrictsPerWarehouse());
1725
1726   // output from txn counters:
1727   //   max_absent_range_set_size : 0
1728   //   max_absent_set_size : 0
1729   //   max_node_scan_size : 13
1730   //   max_read_set_size : 81
1731   //   max_write_set_size : 0
1732   //   num_txn_contexts : 4
1733   const uint64_t read_only_mask =
1734     !AllowReadOnlyScans ? 0 : transaction_base::TXN_FLAG_READ_ONLY;
1735   typename Database::template TransactionType<
1736     AllowReadOnlyScans ?
1737       abstract_db::HINT_TPCC_ORDER_STATUS_READ_ONLY :
1738       abstract_db::HINT_TPCC_ORDER_STATUS>::type txn(
1739           txn_flags | read_only_mask, this->arena);
1740   scoped_str_arena s_arena(this->arena);
1741   // NB: since txn_order_status() is a RO txn, we assume that
1742   // locking is un-necessary (since we can just read from some old snapshot)
1743   try {
1744
1745     customer::key k_c;
1746     customer::value v_c;
1747     if (RandomNumber(this->r, 1, 100) <= 60) {
1748       // cust by name
1749       uint8_t lastname_buf[CustomerLastNameMaxSize + 1];
1750       static_assert(sizeof(lastname_buf) == 16, "xx");
1751       NDB_MEMSET(lastname_buf, 0, sizeof(lastname_buf));
1752       GetNonUniformCustomerLastNameRun(lastname_buf, r);
1753
1754       static const string zeros(16, 0);
1755       static const string ones(16, 255);
1756
1757       customer_name_idx::key k_c_idx_0;
1758       k_c_idx_0.c_w_id = warehouse_id;
1759       k_c_idx_0.c_d_id = districtID;
1760       k_c_idx_0.c_last.assign((const char *) lastname_buf, 16);
1761       k_c_idx_0.c_first.assign(zeros);
1762
1763       customer_name_idx::key k_c_idx_1;
1764       k_c_idx_1.c_w_id = warehouse_id;
1765       k_c_idx_1.c_d_id = districtID;
1766       k_c_idx_1.c_last.assign((const char *) lastname_buf, 16);
1767       k_c_idx_1.c_first.assign(ones);
1768
1769       // NMaxCustomerIdxScanElems is probably a safe bet for now
1770       bytes_static_limit_callback<
1771         typename Database::template IndexType<schema<customer_name_idx>>::type,
1772         NMaxCustomerIdxScanElems, true> c(s_arena.get());
1773       tables.tbl_customer_name_idx(warehouse_id)->bytes_search_range_call(
1774           txn, k_c_idx_0, &k_c_idx_1, c);
1775       ALWAYS_ASSERT(c.size() > 0);
1776       INVARIANT(c.size() < NMaxCustomerIdxScanElems); // we should detect this
1777       int index = c.size() / 2;
1778       if (c.size() % 2 == 0)
1779         index--;
1780       evt_avg_cust_name_idx_scan_size.offer(c.size());
1781
1782       customer_name_idx::value v_c_idx_temp;
1783       const customer_name_idx::value *v_c_idx = Decode(c.value(index), v_c_idx_temp);
1784
1785       k_c.c_w_id = warehouse_id;
1786       k_c.c_d_id = districtID;
1787       k_c.c_id = v_c_idx->c_id;
1788       ALWAYS_ASSERT(tables.tbl_customer(warehouse_id)->search(txn, k_c, v_c));
1789
1790     } else {
1791       // cust by ID
1792       const uint customerID = GetCustomerId(r);
1793       k_c.c_w_id = warehouse_id;
1794       k_c.c_d_id = districtID;
1795       k_c.c_id = customerID;
1796       ALWAYS_ASSERT(tables.tbl_customer(warehouse_id)->search(txn, k_c, v_c));
1797     }
1798     checker::SanityCheckCustomer(&k_c, &v_c);
1799
1800     // XXX(stephentu): HACK- we bound the # of elems returned by this scan to
1801     // 15- this is because we don't have reverse scans. In an ideal system, a
1802     // reverse scan would only need to read 1 btree node. We could simulate a
1803     // lookup by only reading the first element- but then we would *always*
1804     // read the first order by any customer.  To make this more interesting, we
1805     // randomly select which elem to pick within the 1st or 2nd btree nodes.
1806     // This is obviously a deviation from TPC-C, but it shouldn't make that
1807     // much of a difference in terms of performance numbers (in fact we are
1808     // making it worse for us)
1809     latest_key_callback<
1810       typename Database::template IndexType<schema<oorder_c_id_idx>>::type> c_oorder(
1811           *this->arena.next(), (r.next() % 15) + 1);
1812     const oorder_c_id_idx::key k_oo_idx_0(warehouse_id, districtID, k_c.c_id, 0);
1813     const oorder_c_id_idx::key k_oo_idx_1(warehouse_id, districtID, k_c.c_id, numeric_limits<int32_t>::max());
1814     {
1815       ANON_REGION("OrderStatusOOrderScan:", &order_status_probe0_cg);
1816       tables.tbl_oorder_c_id_idx(warehouse_id)->bytes_search_range_call(
1817           txn, k_oo_idx_0, &k_oo_idx_1, c_oorder);
1818     }
1819     ALWAYS_ASSERT(c_oorder.size());
1820
1821     oorder_c_id_idx::key k_oo_idx_temp;
1822     const oorder_c_id_idx::key *k_oo_idx = Decode(c_oorder.kstr(), k_oo_idx_temp);
1823     const uint o_id = k_oo_idx->o_o_id;
1824
1825     // XXX: fix
1826     // XXX(stephentu): what's wrong w/ it?
1827     order_line_nop_callback<Database> c_order_line;
1828     const order_line::key k_ol_0(warehouse_id, districtID, o_id, 0);
1829     const order_line::key k_ol_1(warehouse_id, districtID, o_id, numeric_limits<int32_t>::max());
1830     tables.tbl_order_line(warehouse_id)->bytes_search_range_call(
1831         txn, k_ol_0, &k_ol_1, c_order_line);
1832     ALWAYS_ASSERT(c_order_line.n >= 5 && c_order_line.n <= 15);
1833
1834     //measure_txn_counters(txn, "txn_order_status");
1835     if (likely(txn.commit()))
1836       return txn_result(true, 0);
1837   } catch (typename Database::abort_exception_type &e) {
1838     txn.abort();
1839   }
1840   return txn_result(false, 0);
1841 }
1842
1843 template <typename Database>
1844 class order_line_scan_callback : public
1845   Database::template IndexType<schema<order_line>>::type::bytes_search_range_callback {
1846 public:
1847   order_line_scan_callback() : n(0) {}
1848   virtual bool invoke(
1849       const string &key,
1850       const string &value)
1851   {
1852     INVARIANT(key.size() == sizeof(order_line::key));
1853     order_line::value v_ol;
1854
1855 #ifdef DISABLE_FIELD_SELECTION
1856     const uint64_t mask = numeric_limits<uint64_t>::max();
1857 #else
1858     const uint64_t mask = compute_fields_mask(0);
1859 #endif
1860
1861     typed_txn_btree_<schema<order_line>>::do_record_read(
1862         (const uint8_t *) value.data(), value.size(), mask, &v_ol);
1863
1864 #if defined(CHECK_INVARIANTS) && defined(DISABLE_FIELD_SELECTION)
1865     order_line::key k_ol_temp;
1866     const order_line::key *k_ol = Decode(key, k_ol_temp);
1867     checker::SanityCheckOrderLine(k_ol, &v_ol);
1868 #endif
1869
1870     s_i_ids[v_ol.ol_i_id] = 1;
1871     n++;
1872     return true;
1873   }
1874   size_t n;
1875   small_unordered_map<uint, bool, 512> s_i_ids;
1876 };
1877
1878 STATIC_COUNTER_DECL(scopedperf::tod_ctr, stock_level_probe0_tod, stock_level_probe0_cg)
1879 STATIC_COUNTER_DECL(scopedperf::tod_ctr, stock_level_probe1_tod, stock_level_probe1_cg)
1880 STATIC_COUNTER_DECL(scopedperf::tod_ctr, stock_level_probe2_tod, stock_level_probe2_cg)
1881
1882 static event_avg_counter evt_avg_stock_level_loop_join_lookups("stock_level_loop_join_lookups");
1883
1884 template <typename Database, bool AllowReadOnlyScans>
1885 typename tpcc_worker<Database, AllowReadOnlyScans>::txn_result
1886 tpcc_worker<Database, AllowReadOnlyScans>::txn_stock_level()
1887 {
1888   const uint warehouse_id = PickWarehouseId(this->r, warehouse_id_start, warehouse_id_end);
1889   const uint threshold = RandomNumber(this->r, 10, 20);
1890   const uint districtID = RandomNumber(this->r, 1, NumDistrictsPerWarehouse());
1891
1892   // output from txn counters:
1893   //   max_absent_range_set_size : 0
1894   //   max_absent_set_size : 0
1895   //   max_node_scan_size : 19
1896   //   max_read_set_size : 241
1897   //   max_write_set_size : 0
1898   //   n_node_scan_large_instances : 1
1899   //   n_read_set_large_instances : 2
1900   //   num_txn_contexts : 3
1901   const uint64_t read_only_mask =
1902     !AllowReadOnlyScans ? 0 : transaction_base::TXN_FLAG_READ_ONLY;
1903   typename Database::template TransactionType<
1904     AllowReadOnlyScans ?
1905       abstract_db::HINT_TPCC_STOCK_LEVEL_READ_ONLY :
1906       abstract_db::HINT_TPCC_STOCK_LEVEL>::type txn(
1907           txn_flags | read_only_mask, this->arena);
1908   scoped_str_arena s_arena(this->arena);
1909   // NB: since txn_stock_level() is a RO txn, we assume that
1910   // locking is un-necessary (since we can just read from some old snapshot)
1911   try {
1912     const district::key k_d(warehouse_id, districtID);
1913     district::value v_d;
1914     ALWAYS_ASSERT(tables.tbl_district(warehouse_id)->search(txn, k_d, v_d));
1915     checker::SanityCheckDistrict(&k_d, &v_d);
1916     const uint64_t cur_next_o_id = g_new_order_fast_id_gen ?
1917       NewOrderIdHolder(warehouse_id, districtID).load(memory_order_acquire) :
1918       v_d.d_next_o_id;
1919
1920     // manual joins are fun!
1921     order_line_scan_callback<Database> c;
1922     const int32_t lower = cur_next_o_id >= 20 ? (cur_next_o_id - 20) : 0;
1923     const order_line::key k_ol_0(warehouse_id, districtID, lower, 0);
1924     const order_line::key k_ol_1(warehouse_id, districtID, cur_next_o_id, 0);
1925     {
1926       // mask must be kept in sync w/ order_line_scan_callback
1927 #ifdef DISABLE_FIELD_SELECTION
1928       const size_t nfields = order_line::value::NFIELDS;
1929 #else
1930       const size_t nfields = 1;
1931 #endif
1932       ANON_REGION("StockLevelOrderLineScan:", &stock_level_probe0_cg);
1933       tables.tbl_order_line(warehouse_id)->bytes_search_range_call(
1934           txn, k_ol_0, &k_ol_1, c, nfields);
1935     }
1936     {
1937       small_unordered_map<uint, bool, 512> s_i_ids_distinct;
1938       for (auto &p : c.s_i_ids) {
1939         ANON_REGION("StockLevelLoopJoinIter:", &stock_level_probe1_cg);
1940         const stock::key k_s(warehouse_id, p.first);
1941         stock::value v_s;
1942         INVARIANT(p.first >= 1 && p.first <= NumItems());
1943         {
1944           ANON_REGION("StockLevelLoopJoinGet:", &stock_level_probe2_cg);
1945           ALWAYS_ASSERT(tables.tbl_stock(warehouse_id)->search(txn, k_s, v_s, GUARDED_FIELDS(stock::value::s_quantity_field)));
1946         }
1947         if (v_s.s_quantity < int(threshold))
1948           s_i_ids_distinct[p.first] = 1;
1949       }
1950       evt_avg_stock_level_loop_join_lookups.offer(c.s_i_ids.size());
1951       // NB(stephentu): s_i_ids_distinct.size() is the computed result of this txn
1952     }
1953     //measure_txn_counters(txn, "txn_stock_level");
1954     if (likely(txn.commit()))
1955       return txn_result(true, 0);
1956   } catch (typename Database::abort_exception_type &e) {
1957     txn.abort();
1958   }
1959   return txn_result(false, 0);
1960 }
1961
1962 template <typename T>
1963 static vector<T>
1964 unique_filter(const vector<T> &v)
1965 {
1966   set<T> seen;
1967   vector<T> ret;
1968   for (auto &e : v)
1969     if (!seen.count(e)) {
1970       ret.emplace_back(e);
1971       seen.insert(e);
1972     }
1973   return ret;
1974 }
1975
1976 template <typename Database, bool AllowReadOnlyScans>
1977 class tpcc_bench_runner : public typed_bench_runner<Database> {
1978 private:
1979
1980   static bool
1981   IsTableReadOnly(const char *name)
1982   {
1983     return strcmp("item", name) == 0;
1984   }
1985
1986   static bool
1987   IsTableAppendOnly(const char *name)
1988   {
1989     return strcmp("history", name) == 0 ||
1990            strcmp("oorder_c_id_idx", name) == 0;
1991   }
1992
1993   template <typename Schema>
1994   static vector<shared_ptr<typename Database::template IndexType<Schema>::type>>
1995   OpenTablesForTablespace(Database *db, const char *name, size_t expected_size)
1996   {
1997     const bool is_read_only = IsTableReadOnly(name);
1998     const bool is_append_only = IsTableAppendOnly(name);
1999     const string s_name(name);
2000     vector<typename Database::template IndexType<Schema>::ptr_type> ret(NumWarehouses());
2001     if (g_enable_separate_tree_per_partition && !is_read_only) {
2002       if (NumWarehouses() <= nthreads) {
2003         for (size_t i = 0; i < NumWarehouses(); i++)
2004           ret[i] = db->template open_index<Schema>(s_name + "_" + to_string(i), expected_size, is_append_only);
2005       } else {
2006         const unsigned nwhse_per_partition = NumWarehouses() / nthreads;
2007         for (size_t partid = 0; partid < nthreads; partid++) {
2008           const unsigned wstart = partid * nwhse_per_partition;
2009           const unsigned wend   = (partid + 1 == nthreads) ?
2010             NumWarehouses() : (partid + 1) * nwhse_per_partition;
2011           auto idx =
2012             db->template open_index<Schema>(s_name + "_" + to_string(partid), expected_size, is_append_only);
2013           for (size_t i = wstart; i < wend; i++)
2014             ret[i] = idx;
2015         }
2016       }
2017     } else {
2018       auto idx = db->template open_index<Schema>(s_name, expected_size, is_append_only);
2019       for (size_t i = 0; i < NumWarehouses(); i++)
2020         ret[i] = idx;
2021     }
2022     return ret;
2023   }
2024
2025 public:
2026   tpcc_bench_runner(Database *db)
2027     : typed_bench_runner<Database>(db)
2028   {
2029
2030 #define OPEN_TABLESPACE_X(x) \
2031     do { \
2032       tables.tbl_ ## x ## _vec = OpenTablesForTablespace<schema<x>>(db, #x, sizeof(x::value)); \
2033       auto v = unique_filter(tables.tbl_ ## x ## _vec); \
2034       for (size_t i = 0; i < v.size(); i++) \
2035         this->open_tables[string(#x) + "_" + to_string(i)] = v[i]; \
2036     } while (0);
2037
2038     TPCC_TABLE_LIST(OPEN_TABLESPACE_X);
2039
2040 #undef OPEN_TABLESPACE_X
2041
2042     if (g_enable_partition_locks) {
2043       static_assert(sizeof(aligned_padded_elem<spinlock>) == CACHELINE_SIZE, "xx");
2044       void * const px = memalign(CACHELINE_SIZE, sizeof(aligned_padded_elem<spinlock>) * nthreads);
2045       ALWAYS_ASSERT(px);
2046       ALWAYS_ASSERT(reinterpret_cast<uintptr_t>(px) % CACHELINE_SIZE == 0);
2047       g_partition_locks = reinterpret_cast<aligned_padded_elem<spinlock> *>(px);
2048       for (size_t i = 0; i < nthreads; i++) {
2049         new (&g_partition_locks[i]) aligned_padded_elem<spinlock>();
2050         ALWAYS_ASSERT(!g_partition_locks[i].elem.is_locked());
2051       }
2052     }
2053
2054     if (g_new_order_fast_id_gen) {
2055       void * const px =
2056         memalign(
2057             CACHELINE_SIZE,
2058             sizeof(aligned_padded_elem<atomic<uint64_t>>) *
2059               NumWarehouses() * NumDistrictsPerWarehouse());
2060       g_district_ids = reinterpret_cast<aligned_padded_elem<atomic<uint64_t>> *>(px);
2061       for (size_t i = 0; i < NumWarehouses() * NumDistrictsPerWarehouse(); i++)
2062         new (&g_district_ids[i]) atomic<uint64_t>(3001);
2063     }
2064   }
2065
2066 protected:
2067   virtual vector<unique_ptr<bench_loader>>
2068   make_loaders()
2069   {
2070     vector<unique_ptr<bench_loader>> ret;
2071     ret.emplace_back(new tpcc_warehouse_loader<Database>(9324, this->typed_db(), tables));
2072     ret.emplace_back(new tpcc_item_loader<Database>(235443, this->typed_db(), tables));
2073     if (enable_parallel_loading) {
2074       fast_random r(89785943);
2075       for (uint i = 1; i <= NumWarehouses(); i++)
2076         ret.emplace_back(new tpcc_stock_loader<Database>(r.next(), this->typed_db(), tables, i));
2077     } else {
2078       ret.emplace_back(new tpcc_stock_loader<Database>(89785943, this->typed_db(), tables, -1));
2079     }
2080     ret.emplace_back(new tpcc_district_loader<Database>(129856349, this->typed_db(), tables));
2081     if (enable_parallel_loading) {
2082       fast_random r(923587856425);
2083       for (uint i = 1; i <= NumWarehouses(); i++)
2084         ret.emplace_back(new tpcc_customer_loader<Database>(r.next(), this->typed_db(), tables, i));
2085     } else {
2086       ret.emplace_back(new tpcc_customer_loader<Database>(923587856425, this->typed_db(), tables, -1));
2087     }
2088     if (enable_parallel_loading) {
2089       fast_random r(2343352);
2090       for (uint i = 1; i <= NumWarehouses(); i++)
2091         ret.emplace_back(new tpcc_order_loader<Database>(r.next(), this->typed_db(), tables, i));
2092     } else {
2093       ret.emplace_back(new tpcc_order_loader<Database>(2343352, this->typed_db(), tables, -1));
2094     }
2095     return ret;
2096   }
2097
2098 private:
2099   template <bool RO>
2100   vector<unique_ptr<bench_worker>>
2101   make_workers_impl()
2102   {
2103     const unsigned alignment = coreid::num_cpus_online();
2104     const int blockstart =
2105       coreid::allocate_contiguous_aligned_block(nthreads, alignment);
2106     ALWAYS_ASSERT(blockstart >= 0);
2107     ALWAYS_ASSERT((blockstart % alignment) == 0);
2108     fast_random r(23984543);
2109     vector<unique_ptr<bench_worker>> ret;
2110     if (NumWarehouses() <= nthreads) {
2111       for (size_t i = 0; i < nthreads; i++)
2112         ret.emplace_back(
2113           new tpcc_worker<Database, RO>(
2114             blockstart + i,
2115             r.next(), this->typed_db(), tables,
2116             &this->barrier_a, &this->barrier_b,
2117             (i % NumWarehouses()) + 1, (i % NumWarehouses()) + 2));
2118     } else {
2119       const unsigned nwhse_per_partition = NumWarehouses() / nthreads;
2120       for (size_t i = 0; i < nthreads; i++) {
2121         const unsigned wstart = i * nwhse_per_partition;
2122         const unsigned wend   = (i + 1 == nthreads) ?
2123           NumWarehouses() : (i + 1) * nwhse_per_partition;
2124         ret.emplace_back(
2125           new tpcc_worker<Database, RO>(
2126             blockstart + i,
2127             r.next(), this->typed_db(), tables,
2128             &this->barrier_a, &this->barrier_b, wstart+1, wend+2));
2129       }
2130     }
2131     return ret;
2132   }
2133
2134 protected:
2135   virtual vector<unique_ptr<bench_worker>>
2136   make_workers()
2137   {
2138     return g_disable_read_only_scans ? make_workers_impl<false>() : make_workers_impl<true>();
2139   }
2140
2141 private:
2142   tpcc_tables<Database> tables;
2143 };
2144
2145 template <typename Database>
2146 static unique_ptr<bench_runner>
2147 MakeBenchRunner(Database *db)
2148 {
2149   return unique_ptr<bench_runner>(
2150     g_disable_read_only_scans ?
2151       static_cast<bench_runner *>(new tpcc_bench_runner<Database, false>(db)) :
2152       static_cast<bench_runner *>(new tpcc_bench_runner<Database, true>(db)));
2153 }
2154
2155 void
2156 tpcc_do_test(const string &dbtype,
2157              const persistconfig &cfg,
2158              int argc, char **argv)
2159 {
2160   // parse options
2161   optind = 1;
2162   bool did_spec_remote_pct = false;
2163   while (1) {
2164     static struct option long_options[] =
2165     {
2166       {"disable-cross-partition-transactions" , no_argument       , &g_disable_xpartition_txn             , 1}   ,
2167       {"disable-read-only-snapshots"          , no_argument       , &g_disable_read_only_scans            , 1}   ,
2168       {"enable-partition-locks"               , no_argument       , &g_enable_partition_locks             , 1}   ,
2169       {"enable-separate-tree-per-partition"   , no_argument       , &g_enable_separate_tree_per_partition , 1}   ,
2170       {"new-order-remote-item-pct"            , required_argument , 0                                     , 'r'} ,
2171       {"new-order-fast-id-gen"                , no_argument       , &g_new_order_fast_id_gen              , 1}   ,
2172       {"uniform-item-dist"                    , no_argument       , &g_uniform_item_dist                  , 1}   ,
2173       {"workload-mix"                         , required_argument , 0                                     , 'w'} ,
2174       {0, 0, 0, 0}
2175     };
2176     int option_index = 0;
2177     int c = getopt_long(argc, argv, "r:", long_options, &option_index);
2178     if (c == -1)
2179       break;
2180     switch (c) {
2181     case 0:
2182       if (long_options[option_index].flag != 0)
2183         break;
2184       abort();
2185       break;
2186
2187     case 'r':
2188       g_new_order_remote_item_pct = strtoul(optarg, NULL, 10);
2189       ALWAYS_ASSERT(g_new_order_remote_item_pct >= 0 && g_new_order_remote_item_pct <= 100);
2190       did_spec_remote_pct = true;
2191       break;
2192
2193     case 'w':
2194       {
2195         const vector<string> toks = split(optarg, ',');
2196         ALWAYS_ASSERT(toks.size() == ARRAY_NELEMS(g_txn_workload_mix));
2197         unsigned s = 0;
2198         for (size_t i = 0; i < toks.size(); i++) {
2199           unsigned p = strtoul(toks[i].c_str(), nullptr, 10);
2200           ALWAYS_ASSERT(p >= 0 && p <= 100);
2201           s += p;
2202           g_txn_workload_mix[i] = p;
2203         }
2204         ALWAYS_ASSERT(s == 100);
2205       }
2206       break;
2207
2208     case '?':
2209       /* getopt_long already printed an error message. */
2210       exit(1);
2211
2212     default:
2213       abort();
2214     }
2215   }
2216
2217   if (did_spec_remote_pct && g_disable_xpartition_txn) {
2218     cerr << "WARNING: --new-order-remote-item-pct given with --disable-cross-partition-transactions" << endl;
2219     cerr << "  --new-order-remote-item-pct will have no effect" << endl;
2220   }
2221
2222   if (verbose) {
2223     cerr << "tpcc settings:" << endl;
2224     cerr << "  cross_partition_transactions : " << !g_disable_xpartition_txn << endl;
2225     cerr << "  read_only_snapshots          : " << !g_disable_read_only_scans << endl;
2226     cerr << "  partition_locks              : " << g_enable_partition_locks << endl;
2227     cerr << "  separate_tree_per_partition  : " << g_enable_separate_tree_per_partition << endl;
2228     cerr << "  new_order_remote_item_pct    : " << g_new_order_remote_item_pct << endl;
2229     cerr << "  new_order_fast_id_gen        : " << g_new_order_fast_id_gen << endl;
2230     cerr << "  uniform_item_dist            : " << g_uniform_item_dist << endl;
2231     cerr << "  workload_mix                 : " <<
2232       format_list(g_txn_workload_mix,
2233                   g_txn_workload_mix + ARRAY_NELEMS(g_txn_workload_mix)) << endl;
2234   }
2235
2236   unique_ptr<abstract_db> db;
2237   unique_ptr<bench_runner> r;
2238
2239   if (dbtype == "ndb-proto2") {
2240     if (!cfg.logfiles_.empty()) {
2241       vector<vector<unsigned>> assignments_used;
2242       txn_logger::Init(
2243           nthreads, cfg.logfiles_, cfg.assignments_, &assignments_used,
2244           !cfg.nofsync_,
2245           cfg.do_compress_,
2246           cfg.fake_writes_);
2247       if (verbose) {
2248         cerr << "[logging subsystem]" << endl;
2249         cerr << "  assignments: " << assignments_used  << endl;
2250         cerr << "  call fsync : " << !cfg.nofsync_     << endl;
2251         cerr << "  compression: " << cfg.do_compress_  << endl;
2252         cerr << "  fake_writes: " << cfg.fake_writes_  << endl;
2253       }
2254     }
2255 #ifdef PROTO2_CAN_DISABLE_GC
2256     if (!cfg.disable_gc_)
2257       transaction_proto2_static::InitGC();
2258 #endif
2259 #ifdef PROTO2_CAN_DISABLE_SNAPSHOTS
2260     if (cfg.disable_snapshots_)
2261       transaction_proto2_static::DisableSnapshots();
2262 #endif
2263     typedef ndb_database<transaction_proto2> Database;
2264     Database *raw = new Database;
2265     db.reset(raw);
2266     r = MakeBenchRunner(raw);
2267   } else if (dbtype == "kvdb-st") {
2268     typedef kvdb_database<false> Database;
2269     Database *raw = new Database;
2270     db.reset(raw);
2271     r = MakeBenchRunner(raw);
2272   } else
2273     ALWAYS_ASSERT(false);
2274
2275   r->run();
2276 }