benchmark silo added
[c11concurrency-benchmarks.git] / silo / benchmarks / tpcc.cc
diff --git a/silo/benchmarks/tpcc.cc b/silo/benchmarks/tpcc.cc
new file mode 100644 (file)
index 0000000..7619c4a
--- /dev/null
@@ -0,0 +1,2214 @@
+/**
+ * An implementation of TPC-C based off of:
+ * https://github.com/oltpbenchmark/oltpbench/tree/master/src/com/oltpbenchmark/benchmarks/tpcc
+ */
+
+#include <sys/time.h>
+#include <string>
+#include <ctype.h>
+#include <stdlib.h>
+#include <malloc.h>
+
+#include <stdlib.h>
+#include <unistd.h>
+#include <getopt.h>
+
+#include <set>
+#include <vector>
+
+#include "../txn.h"
+#include "../macros.h"
+#include "../scopedperf.hh"
+#include "../spinlock.h"
+
+#include "bench.h"
+#include "tpcc.h"
+using namespace std;
+using namespace util;
+
+#define TPCC_TABLE_LIST(x) \
+  x(customer) \
+  x(customer_name_idx) \
+  x(district) \
+  x(history) \
+  x(item) \
+  x(new_order) \
+  x(oorder) \
+  x(oorder_c_id_idx) \
+  x(order_line) \
+  x(stock) \
+  x(stock_data) \
+  x(warehouse)
+
+static inline ALWAYS_INLINE size_t
+NumWarehouses()
+{
+  return (size_t) scale_factor;
+}
+
+// config constants
+
+static constexpr inline ALWAYS_INLINE size_t
+NumItems()
+{
+  return 100000;
+}
+
+static constexpr inline ALWAYS_INLINE size_t
+NumDistrictsPerWarehouse()
+{
+  return 10;
+}
+
+static constexpr inline ALWAYS_INLINE size_t
+NumCustomersPerDistrict()
+{
+  return 3000;
+}
+
+// T must implement lock()/unlock(). Both must *not* throw exceptions
+template <typename T>
+class scoped_multilock {
+public:
+  inline scoped_multilock()
+    : did_lock(false)
+  {
+  }
+
+  inline ~scoped_multilock()
+  {
+    if (did_lock)
+      for (auto &t : locks)
+        t->unlock();
+  }
+
+  inline void
+  enq(T &t)
+  {
+    ALWAYS_ASSERT(!did_lock);
+    locks.emplace_back(&t);
+  }
+
+  inline void
+  multilock()
+  {
+    ALWAYS_ASSERT(!did_lock);
+    if (locks.size() > 1)
+      sort(locks.begin(), locks.end());
+#ifdef CHECK_INVARIANTS
+    if (set<T *>(locks.begin(), locks.end()).size() != locks.size()) {
+      for (auto &t : locks)
+        cerr << "lock: " << hexify(t) << endl;
+      INVARIANT(false && "duplicate locks found");
+    }
+#endif
+    for (auto &t : locks)
+      t->lock();
+    did_lock = true;
+  }
+
+private:
+  bool did_lock;
+  typename util::vec<T *, 64>::type locks;
+};
+
+// like a lock_guard, but has the option of not acquiring
+template <typename T>
+class scoped_lock_guard {
+public:
+  inline scoped_lock_guard(T &l)
+    : l(&l)
+  {
+    this->l->lock();
+  }
+
+  inline scoped_lock_guard(T *l)
+    : l(l)
+  {
+    if (this->l)
+      this->l->lock();
+  }
+
+  inline ~scoped_lock_guard()
+  {
+    if (l)
+      l->unlock();
+  }
+
+private:
+  T *l;
+};
+
+// configuration flags
+static int g_disable_xpartition_txn = 0;
+static int g_disable_read_only_scans = 0;
+static int g_enable_partition_locks = 0;
+static int g_enable_separate_tree_per_partition = 0;
+static int g_new_order_remote_item_pct = 1;
+static int g_new_order_fast_id_gen = 0;
+static int g_uniform_item_dist = 0;
+static int g_order_status_scan_hack = 0;
+static unsigned g_txn_workload_mix[] = { 45, 43, 4, 4, 4 }; // default TPC-C workload mix
+
+static aligned_padded_elem<spinlock> *g_partition_locks = nullptr;
+static aligned_padded_elem<atomic<uint64_t>> *g_district_ids = nullptr;
+
+// maps a wid => partition id
+static inline ALWAYS_INLINE unsigned int
+PartitionId(unsigned int wid)
+{
+  INVARIANT(wid >= 1 && wid <= NumWarehouses());
+  wid -= 1; // 0-idx
+  if (NumWarehouses() <= nthreads)
+    // more workers than partitions, so its easy
+    return wid;
+  const unsigned nwhse_per_partition = NumWarehouses() / nthreads;
+  const unsigned partid = wid / nwhse_per_partition;
+  if (partid >= nthreads)
+    return nthreads - 1;
+  return partid;
+}
+
+static inline ALWAYS_INLINE spinlock &
+LockForPartition(unsigned int wid)
+{
+  INVARIANT(g_enable_partition_locks);
+  return g_partition_locks[PartitionId(wid)].elem;
+}
+
+static inline atomic<uint64_t> &
+NewOrderIdHolder(unsigned warehouse, unsigned district)
+{
+  INVARIANT(warehouse >= 1 && warehouse <= NumWarehouses());
+  INVARIANT(district >= 1 && district <= NumDistrictsPerWarehouse());
+  const unsigned idx =
+    (warehouse - 1) * NumDistrictsPerWarehouse() + (district - 1);
+  return g_district_ids[idx].elem;
+}
+
+static inline uint64_t
+FastNewOrderIdGen(unsigned warehouse, unsigned district)
+{
+  return NewOrderIdHolder(warehouse, district).fetch_add(1, memory_order_acq_rel);
+}
+
+struct checker {
+  // these sanity checks are just a few simple checks to make sure
+  // the data is not entirely corrupted
+
+  static inline ALWAYS_INLINE void
+  SanityCheckCustomer(const customer::key *k, const customer::value *v)
+  {
+    INVARIANT(k->c_w_id >= 1 && static_cast<size_t>(k->c_w_id) <= NumWarehouses());
+    INVARIANT(k->c_d_id >= 1 && static_cast<size_t>(k->c_d_id) <= NumDistrictsPerWarehouse());
+    INVARIANT(k->c_id >= 1 && static_cast<size_t>(k->c_id) <= NumCustomersPerDistrict());
+    INVARIANT(v->c_credit == "BC" || v->c_credit == "GC");
+    INVARIANT(v->c_middle == "OE");
+  }
+
+  static inline ALWAYS_INLINE void
+  SanityCheckWarehouse(const warehouse::key *k, const warehouse::value *v)
+  {
+    INVARIANT(k->w_id >= 1 && static_cast<size_t>(k->w_id) <= NumWarehouses());
+    INVARIANT(v->w_state.size() == 2);
+    INVARIANT(v->w_zip == "123456789");
+  }
+
+  static inline ALWAYS_INLINE void
+  SanityCheckDistrict(const district::key *k, const district::value *v)
+  {
+    INVARIANT(k->d_w_id >= 1 && static_cast<size_t>(k->d_w_id) <= NumWarehouses());
+    INVARIANT(k->d_id >= 1 && static_cast<size_t>(k->d_id) <= NumDistrictsPerWarehouse());
+    INVARIANT(v->d_next_o_id >= 3001);
+    INVARIANT(v->d_state.size() == 2);
+    INVARIANT(v->d_zip == "123456789");
+  }
+
+  static inline ALWAYS_INLINE void
+  SanityCheckItem(const item::key *k, const item::value *v)
+  {
+    INVARIANT(k->i_id >= 1 && static_cast<size_t>(k->i_id) <= NumItems());
+    INVARIANT(v->i_price >= 1.0 && v->i_price <= 100.0);
+  }
+
+  static inline ALWAYS_INLINE void
+  SanityCheckStock(const stock::key *k, const stock::value *v)
+  {
+    INVARIANT(k->s_w_id >= 1 && static_cast<size_t>(k->s_w_id) <= NumWarehouses());
+    INVARIANT(k->s_i_id >= 1 && static_cast<size_t>(k->s_i_id) <= NumItems());
+  }
+
+  static inline ALWAYS_INLINE void
+  SanityCheckNewOrder(const new_order::key *k, const new_order::value *v)
+  {
+    INVARIANT(k->no_w_id >= 1 && static_cast<size_t>(k->no_w_id) <= NumWarehouses());
+    INVARIANT(k->no_d_id >= 1 && static_cast<size_t>(k->no_d_id) <= NumDistrictsPerWarehouse());
+  }
+
+  static inline ALWAYS_INLINE void
+  SanityCheckOOrder(const oorder::key *k, const oorder::value *v)
+  {
+    INVARIANT(k->o_w_id >= 1 && static_cast<size_t>(k->o_w_id) <= NumWarehouses());
+    INVARIANT(k->o_d_id >= 1 && static_cast<size_t>(k->o_d_id) <= NumDistrictsPerWarehouse());
+    INVARIANT(v->o_c_id >= 1 && static_cast<size_t>(v->o_c_id) <= NumCustomersPerDistrict());
+    INVARIANT(v->o_carrier_id >= 0 && static_cast<size_t>(v->o_carrier_id) <= NumDistrictsPerWarehouse());
+    INVARIANT(v->o_ol_cnt >= 5 && v->o_ol_cnt <= 15);
+  }
+
+  static inline ALWAYS_INLINE void
+  SanityCheckOrderLine(const order_line::key *k, const order_line::value *v)
+  {
+    INVARIANT(k->ol_w_id >= 1 && static_cast<size_t>(k->ol_w_id) <= NumWarehouses());
+    INVARIANT(k->ol_d_id >= 1 && static_cast<size_t>(k->ol_d_id) <= NumDistrictsPerWarehouse());
+    INVARIANT(k->ol_number >= 1 && k->ol_number <= 15);
+    INVARIANT(v->ol_i_id >= 1 && static_cast<size_t>(v->ol_i_id) <= NumItems());
+  }
+
+};
+
+
+struct _dummy {}; // exists so we can inherit from it, so we can use a macro in
+                  // an init list...
+
+class tpcc_worker_mixin : private _dummy {
+
+#define DEFN_TBL_INIT_X(name) \
+  , tbl_ ## name ## _vec(partitions.at(#name))
+
+public:
+  tpcc_worker_mixin(const map<string, vector<abstract_ordered_index *>> &partitions) :
+    _dummy() // so hacky...
+    TPCC_TABLE_LIST(DEFN_TBL_INIT_X)
+  {
+    ALWAYS_ASSERT(NumWarehouses() >= 1);
+  }
+
+#undef DEFN_TBL_INIT_X
+
+protected:
+
+#define DEFN_TBL_ACCESSOR_X(name) \
+private:  \
+  vector<abstract_ordered_index *> tbl_ ## name ## _vec; \
+protected: \
+  inline ALWAYS_INLINE abstract_ordered_index * \
+  tbl_ ## name (unsigned int wid) \
+  { \
+    INVARIANT(wid >= 1 && wid <= NumWarehouses()); \
+    INVARIANT(tbl_ ## name ## _vec.size() == NumWarehouses()); \
+    return tbl_ ## name ## _vec[wid - 1]; \
+  }
+
+  TPCC_TABLE_LIST(DEFN_TBL_ACCESSOR_X)
+
+#undef DEFN_TBL_ACCESSOR_X
+
+  // only TPCC loaders need to call this- workers are automatically
+  // pinned by their worker id (which corresponds to warehouse id
+  // in TPCC)
+  //
+  // pins the *calling* thread
+  static void
+  PinToWarehouseId(unsigned int wid)
+  {
+    const unsigned int partid = PartitionId(wid);
+    ALWAYS_ASSERT(partid < nthreads);
+    const unsigned int pinid  = partid;
+    if (verbose)
+      cerr << "PinToWarehouseId(): coreid=" << coreid::core_id()
+           << " pinned to whse=" << wid << " (partid=" << partid << ")"
+           << endl;
+    rcu::s_instance.pin_current_thread(pinid);
+    rcu::s_instance.fault_region();
+  }
+
+public:
+
+  static inline uint32_t
+  GetCurrentTimeMillis()
+  {
+    //struct timeval tv;
+    //ALWAYS_ASSERT(gettimeofday(&tv, 0) == 0);
+    //return tv.tv_sec * 1000;
+
+    // XXX(stephentu): implement a scalable GetCurrentTimeMillis()
+    // for now, we just give each core an increasing number
+
+    static __thread uint32_t tl_hack = 0;
+    return tl_hack++;
+  }
+
+  // utils for generating random #s and strings
+
+  static inline ALWAYS_INLINE int
+  CheckBetweenInclusive(int v, int lower, int upper)
+  {
+    INVARIANT(v >= lower);
+    INVARIANT(v <= upper);
+    return v;
+  }
+
+  static inline ALWAYS_INLINE int
+  RandomNumber(fast_random &r, int min, int max)
+  {
+    return CheckBetweenInclusive((int) (r.next_uniform() * (max - min + 1) + min), min, max);
+  }
+
+  static inline ALWAYS_INLINE int
+  NonUniformRandom(fast_random &r, int A, int C, int min, int max)
+  {
+    return (((RandomNumber(r, 0, A) | RandomNumber(r, min, max)) + C) % (max - min + 1)) + min;
+  }
+
+  static inline ALWAYS_INLINE int
+  GetItemId(fast_random &r)
+  {
+    return CheckBetweenInclusive(
+        g_uniform_item_dist ?
+          RandomNumber(r, 1, NumItems()) :
+          NonUniformRandom(r, 8191, 7911, 1, NumItems()),
+        1, NumItems());
+  }
+
+  static inline ALWAYS_INLINE int
+  GetCustomerId(fast_random &r)
+  {
+    return CheckBetweenInclusive(NonUniformRandom(r, 1023, 259, 1, NumCustomersPerDistrict()), 1, NumCustomersPerDistrict());
+  }
+
+  // pick a number between [start, end)
+  static inline ALWAYS_INLINE unsigned
+  PickWarehouseId(fast_random &r, unsigned start, unsigned end)
+  {
+    INVARIANT(start < end);
+    const unsigned diff = end - start;
+    if (diff == 1)
+      return start;
+    return (r.next() % diff) + start;
+  }
+
+  static string NameTokens[];
+
+  // all tokens are at most 5 chars long
+  static const size_t CustomerLastNameMaxSize = 5 * 3;
+
+  static inline size_t
+  GetCustomerLastName(uint8_t *buf, fast_random &r, int num)
+  {
+    const string &s0 = NameTokens[num / 100];
+    const string &s1 = NameTokens[(num / 10) % 10];
+    const string &s2 = NameTokens[num % 10];
+    uint8_t *const begin = buf;
+    const size_t s0_sz = s0.size();
+    const size_t s1_sz = s1.size();
+    const size_t s2_sz = s2.size();
+    NDB_MEMCPY(buf, s0.data(), s0_sz); buf += s0_sz;
+    NDB_MEMCPY(buf, s1.data(), s1_sz); buf += s1_sz;
+    NDB_MEMCPY(buf, s2.data(), s2_sz); buf += s2_sz;
+    return buf - begin;
+  }
+
+  static inline ALWAYS_INLINE size_t
+  GetCustomerLastName(char *buf, fast_random &r, int num)
+  {
+    return GetCustomerLastName((uint8_t *) buf, r, num);
+  }
+
+  static inline string
+  GetCustomerLastName(fast_random &r, int num)
+  {
+    string ret;
+    ret.resize(CustomerLastNameMaxSize);
+    ret.resize(GetCustomerLastName((uint8_t *) &ret[0], r, num));
+    return ret;
+  }
+
+  static inline ALWAYS_INLINE string
+  GetNonUniformCustomerLastNameLoad(fast_random &r)
+  {
+    return GetCustomerLastName(r, NonUniformRandom(r, 255, 157, 0, 999));
+  }
+
+  static inline ALWAYS_INLINE size_t
+  GetNonUniformCustomerLastNameRun(uint8_t *buf, fast_random &r)
+  {
+    return GetCustomerLastName(buf, r, NonUniformRandom(r, 255, 223, 0, 999));
+  }
+
+  static inline ALWAYS_INLINE size_t
+  GetNonUniformCustomerLastNameRun(char *buf, fast_random &r)
+  {
+    return GetNonUniformCustomerLastNameRun((uint8_t *) buf, r);
+  }
+
+  static inline ALWAYS_INLINE string
+  GetNonUniformCustomerLastNameRun(fast_random &r)
+  {
+    return GetCustomerLastName(r, NonUniformRandom(r, 255, 223, 0, 999));
+  }
+
+  // following oltpbench, we really generate strings of len - 1...
+  static inline string
+  RandomStr(fast_random &r, uint len)
+  {
+    // this is a property of the oltpbench implementation...
+    if (!len)
+      return "";
+
+    uint i = 0;
+    string buf(len - 1, 0);
+    while (i < (len - 1)) {
+      const char c = (char) r.next_char();
+      // XXX(stephentu): oltpbench uses java's Character.isLetter(), which
+      // is a less restrictive filter than isalnum()
+      if (!isalnum(c))
+        continue;
+      buf[i++] = c;
+    }
+    return buf;
+  }
+
+  // RandomNStr() actually produces a string of length len
+  static inline string
+  RandomNStr(fast_random &r, uint len)
+  {
+    const char base = '0';
+    string buf(len, 0);
+    for (uint i = 0; i < len; i++)
+      buf[i] = (char)(base + (r.next() % 10));
+    return buf;
+  }
+};
+
+string tpcc_worker_mixin::NameTokens[] =
+  {
+    string("BAR"),
+    string("OUGHT"),
+    string("ABLE"),
+    string("PRI"),
+    string("PRES"),
+    string("ESE"),
+    string("ANTI"),
+    string("CALLY"),
+    string("ATION"),
+    string("EING"),
+  };
+
+STATIC_COUNTER_DECL(scopedperf::tsc_ctr, tpcc_txn, tpcc_txn_cg)
+
+class tpcc_worker : public bench_worker, public tpcc_worker_mixin {
+public:
+  // resp for [warehouse_id_start, warehouse_id_end)
+  tpcc_worker(unsigned int worker_id,
+              unsigned long seed, abstract_db *db,
+              const map<string, abstract_ordered_index *> &open_tables,
+              const map<string, vector<abstract_ordered_index *>> &partitions,
+              spin_barrier *barrier_a, spin_barrier *barrier_b,
+              uint warehouse_id_start, uint warehouse_id_end)
+    : bench_worker(worker_id, true, seed, db,
+                   open_tables, barrier_a, barrier_b),
+      tpcc_worker_mixin(partitions),
+      warehouse_id_start(warehouse_id_start),
+      warehouse_id_end(warehouse_id_end)
+  {
+    INVARIANT(warehouse_id_start >= 1);
+    INVARIANT(warehouse_id_start <= NumWarehouses());
+    INVARIANT(warehouse_id_end > warehouse_id_start);
+    INVARIANT(warehouse_id_end <= (NumWarehouses() + 1));
+    NDB_MEMSET(&last_no_o_ids[0], 0, sizeof(last_no_o_ids));
+    if (verbose) {
+      cerr << "tpcc: worker id " << worker_id
+        << " => warehouses [" << warehouse_id_start
+        << ", " << warehouse_id_end << ")"
+        << endl;
+    }
+    obj_key0.reserve(str_arena::MinStrReserveLength);
+    obj_key1.reserve(str_arena::MinStrReserveLength);
+    obj_v.reserve(str_arena::MinStrReserveLength);
+  }
+
+  // XXX(stephentu): tune this
+  static const size_t NMaxCustomerIdxScanElems = 512;
+
+  txn_result txn_new_order();
+
+  static txn_result
+  TxnNewOrder(bench_worker *w)
+  {
+    ANON_REGION("TxnNewOrder:", &tpcc_txn_cg);
+    return static_cast<tpcc_worker *>(w)->txn_new_order();
+  }
+
+  txn_result txn_delivery();
+
+  static txn_result
+  TxnDelivery(bench_worker *w)
+  {
+    ANON_REGION("TxnDelivery:", &tpcc_txn_cg);
+    return static_cast<tpcc_worker *>(w)->txn_delivery();
+  }
+
+  txn_result txn_payment();
+
+  static txn_result
+  TxnPayment(bench_worker *w)
+  {
+    ANON_REGION("TxnPayment:", &tpcc_txn_cg);
+    return static_cast<tpcc_worker *>(w)->txn_payment();
+  }
+
+  txn_result txn_order_status();
+
+  static txn_result
+  TxnOrderStatus(bench_worker *w)
+  {
+    ANON_REGION("TxnOrderStatus:", &tpcc_txn_cg);
+    return static_cast<tpcc_worker *>(w)->txn_order_status();
+  }
+
+  txn_result txn_stock_level();
+
+  static txn_result
+  TxnStockLevel(bench_worker *w)
+  {
+    ANON_REGION("TxnStockLevel:", &tpcc_txn_cg);
+    return static_cast<tpcc_worker *>(w)->txn_stock_level();
+  }
+
+  virtual workload_desc_vec
+  get_workload() const
+  {
+    workload_desc_vec w;
+    // numbers from sigmod.csail.mit.edu:
+    //w.push_back(workload_desc("NewOrder", 1.0, TxnNewOrder)); // ~10k ops/sec
+    //w.push_back(workload_desc("Payment", 1.0, TxnPayment)); // ~32k ops/sec
+    //w.push_back(workload_desc("Delivery", 1.0, TxnDelivery)); // ~104k ops/sec
+    //w.push_back(workload_desc("OrderStatus", 1.0, TxnOrderStatus)); // ~33k ops/sec
+    //w.push_back(workload_desc("StockLevel", 1.0, TxnStockLevel)); // ~2k ops/sec
+    unsigned m = 0;
+    for (size_t i = 0; i < ARRAY_NELEMS(g_txn_workload_mix); i++)
+      m += g_txn_workload_mix[i];
+    ALWAYS_ASSERT(m == 100);
+    if (g_txn_workload_mix[0])
+      w.push_back(workload_desc("NewOrder", double(g_txn_workload_mix[0])/100.0, TxnNewOrder));
+    if (g_txn_workload_mix[1])
+      w.push_back(workload_desc("Payment", double(g_txn_workload_mix[1])/100.0, TxnPayment));
+    if (g_txn_workload_mix[2])
+      w.push_back(workload_desc("Delivery", double(g_txn_workload_mix[2])/100.0, TxnDelivery));
+    if (g_txn_workload_mix[3])
+      w.push_back(workload_desc("OrderStatus", double(g_txn_workload_mix[3])/100.0, TxnOrderStatus));
+    if (g_txn_workload_mix[4])
+      w.push_back(workload_desc("StockLevel", double(g_txn_workload_mix[4])/100.0, TxnStockLevel));
+    return w;
+  }
+
+protected:
+
+  virtual void
+  on_run_setup() OVERRIDE
+  {
+    if (!pin_cpus)
+      return;
+    const size_t a = worker_id % coreid::num_cpus_online();
+    const size_t b = a % nthreads;
+    rcu::s_instance.pin_current_thread(b);
+    rcu::s_instance.fault_region();
+  }
+
+  inline ALWAYS_INLINE string &
+  str()
+  {
+    return *arena.next();
+  }
+
+private:
+  const uint warehouse_id_start;
+  const uint warehouse_id_end;
+  int32_t last_no_o_ids[10]; // XXX(stephentu): hack
+
+  // some scratch buffer space
+  string obj_key0;
+  string obj_key1;
+  string obj_v;
+};
+
+class tpcc_warehouse_loader : public bench_loader, public tpcc_worker_mixin {
+public:
+  tpcc_warehouse_loader(unsigned long seed,
+                        abstract_db *db,
+                        const map<string, abstract_ordered_index *> &open_tables,
+                        const map<string, vector<abstract_ordered_index *>> &partitions)
+    : bench_loader(seed, db, open_tables),
+      tpcc_worker_mixin(partitions)
+  {}
+
+protected:
+  virtual void
+  load()
+  {
+    string obj_buf;
+    void *txn = db->new_txn(txn_flags, arena, txn_buf());
+    uint64_t warehouse_total_sz = 0, n_warehouses = 0;
+    try {
+      vector<warehouse::value> warehouses;
+      for (uint i = 1; i <= NumWarehouses(); i++) {
+        const warehouse::key k(i);
+
+        const string w_name = RandomStr(r, RandomNumber(r, 6, 10));
+        const string w_street_1 = RandomStr(r, RandomNumber(r, 10, 20));
+        const string w_street_2 = RandomStr(r, RandomNumber(r, 10, 20));
+        const string w_city = RandomStr(r, RandomNumber(r, 10, 20));
+        const string w_state = RandomStr(r, 3);
+        const string w_zip = "123456789";
+
+        warehouse::value v;
+        v.w_ytd = 300000;
+        v.w_tax = (float) RandomNumber(r, 0, 2000) / 10000.0;
+        v.w_name.assign(w_name);
+        v.w_street_1.assign(w_street_1);
+        v.w_street_2.assign(w_street_2);
+        v.w_city.assign(w_city);
+        v.w_state.assign(w_state);
+        v.w_zip.assign(w_zip);
+
+        checker::SanityCheckWarehouse(&k, &v);
+        const size_t sz = Size(v);
+        warehouse_total_sz += sz;
+        n_warehouses++;
+        tbl_warehouse(i)->insert(txn, Encode(k), Encode(obj_buf, v));
+
+        warehouses.push_back(v);
+      }
+      ALWAYS_ASSERT(db->commit_txn(txn));
+      arena.reset();
+      txn = db->new_txn(txn_flags, arena, txn_buf());
+      for (uint i = 1; i <= NumWarehouses(); i++) {
+        const warehouse::key k(i);
+        string warehouse_v;
+        ALWAYS_ASSERT(tbl_warehouse(i)->get(txn, Encode(k), warehouse_v));
+        warehouse::value warehouse_temp;
+        const warehouse::value *v = Decode(warehouse_v, warehouse_temp);
+        ALWAYS_ASSERT(warehouses[i - 1] == *v);
+
+        checker::SanityCheckWarehouse(&k, v);
+      }
+      ALWAYS_ASSERT(db->commit_txn(txn));
+    } catch (abstract_db::abstract_abort_exception &ex) {
+      // shouldn't abort on loading!
+      ALWAYS_ASSERT(false);
+    }
+    if (verbose) {
+      cerr << "[INFO] finished loading warehouse" << endl;
+      cerr << "[INFO]   * average warehouse record length: "
+           << (double(warehouse_total_sz)/double(n_warehouses)) << " bytes" << endl;
+    }
+  }
+};
+
+class tpcc_item_loader : public bench_loader, public tpcc_worker_mixin {
+public:
+  tpcc_item_loader(unsigned long seed,
+                   abstract_db *db,
+                   const map<string, abstract_ordered_index *> &open_tables,
+                   const map<string, vector<abstract_ordered_index *>> &partitions)
+    : bench_loader(seed, db, open_tables),
+      tpcc_worker_mixin(partitions)
+  {}
+
+protected:
+  virtual void
+  load()
+  {
+    string obj_buf;
+    const ssize_t bsize = db->txn_max_batch_size();
+    void *txn = db->new_txn(txn_flags, arena, txn_buf());
+    uint64_t total_sz = 0;
+    try {
+      for (uint i = 1; i <= NumItems(); i++) {
+        // items don't "belong" to a certain warehouse, so no pinning
+        const item::key k(i);
+
+        item::value v;
+        const string i_name = RandomStr(r, RandomNumber(r, 14, 24));
+        v.i_name.assign(i_name);
+        v.i_price = (float) RandomNumber(r, 100, 10000) / 100.0;
+        const int len = RandomNumber(r, 26, 50);
+        if (RandomNumber(r, 1, 100) > 10) {
+          const string i_data = RandomStr(r, len);
+          v.i_data.assign(i_data);
+        } else {
+          const int startOriginal = RandomNumber(r, 2, (len - 8));
+          const string i_data = RandomStr(r, startOriginal + 1) + "ORIGINAL" + RandomStr(r, len - startOriginal - 7);
+          v.i_data.assign(i_data);
+        }
+        v.i_im_id = RandomNumber(r, 1, 10000);
+
+        checker::SanityCheckItem(&k, &v);
+        const size_t sz = Size(v);
+        total_sz += sz;
+        tbl_item(1)->insert(txn, Encode(k), Encode(obj_buf, v)); // this table is shared, so any partition is OK
+
+        if (bsize != -1 && !(i % bsize)) {
+          ALWAYS_ASSERT(db->commit_txn(txn));
+          txn = db->new_txn(txn_flags, arena, txn_buf());
+          arena.reset();
+        }
+      }
+      ALWAYS_ASSERT(db->commit_txn(txn));
+    } catch (abstract_db::abstract_abort_exception &ex) {
+      // shouldn't abort on loading!
+      ALWAYS_ASSERT(false);
+    }
+    if (verbose) {
+      cerr << "[INFO] finished loading item" << endl;
+      cerr << "[INFO]   * average item record length: "
+           << (double(total_sz)/double(NumItems())) << " bytes" << endl;
+    }
+  }
+};
+
+class tpcc_stock_loader : public bench_loader, public tpcc_worker_mixin {
+public:
+  tpcc_stock_loader(unsigned long seed,
+                    abstract_db *db,
+                    const map<string, abstract_ordered_index *> &open_tables,
+                    const map<string, vector<abstract_ordered_index *>> &partitions,
+                    ssize_t warehouse_id)
+    : bench_loader(seed, db, open_tables),
+      tpcc_worker_mixin(partitions),
+      warehouse_id(warehouse_id)
+  {
+    ALWAYS_ASSERT(warehouse_id == -1 ||
+                  (warehouse_id >= 1 &&
+                   static_cast<size_t>(warehouse_id) <= NumWarehouses()));
+  }
+
+protected:
+  virtual void
+  load()
+  {
+    string obj_buf, obj_buf1;
+
+    uint64_t stock_total_sz = 0, n_stocks = 0;
+    const uint w_start = (warehouse_id == -1) ?
+      1 : static_cast<uint>(warehouse_id);
+    const uint w_end   = (warehouse_id == -1) ?
+      NumWarehouses() : static_cast<uint>(warehouse_id);
+
+    for (uint w = w_start; w <= w_end; w++) {
+      const size_t batchsize =
+        (db->txn_max_batch_size() == -1) ? NumItems() : db->txn_max_batch_size();
+      const size_t nbatches = (batchsize > NumItems()) ? 1 : (NumItems() / batchsize);
+
+      if (pin_cpus)
+        PinToWarehouseId(w);
+
+      for (uint b = 0; b < nbatches;) {
+        scoped_str_arena s_arena(arena);
+        void * const txn = db->new_txn(txn_flags, arena, txn_buf());
+        try {
+          const size_t iend = std::min((b + 1) * batchsize + 1, NumItems());
+          for (uint i = (b * batchsize + 1); i <= iend; i++) {
+            const stock::key k(w, i);
+            const stock_data::key k_data(w, i);
+
+            stock::value v;
+            v.s_quantity = RandomNumber(r, 10, 100);
+            v.s_ytd = 0;
+            v.s_order_cnt = 0;
+            v.s_remote_cnt = 0;
+
+            stock_data::value v_data;
+            const int len = RandomNumber(r, 26, 50);
+            if (RandomNumber(r, 1, 100) > 10) {
+              const string s_data = RandomStr(r, len);
+              v_data.s_data.assign(s_data);
+            } else {
+              const int startOriginal = RandomNumber(r, 2, (len - 8));
+              const string s_data = RandomStr(r, startOriginal + 1) + "ORIGINAL" + RandomStr(r, len - startOriginal - 7);
+              v_data.s_data.assign(s_data);
+            }
+            v_data.s_dist_01.assign(RandomStr(r, 24));
+            v_data.s_dist_02.assign(RandomStr(r, 24));
+            v_data.s_dist_03.assign(RandomStr(r, 24));
+            v_data.s_dist_04.assign(RandomStr(r, 24));
+            v_data.s_dist_05.assign(RandomStr(r, 24));
+            v_data.s_dist_06.assign(RandomStr(r, 24));
+            v_data.s_dist_07.assign(RandomStr(r, 24));
+            v_data.s_dist_08.assign(RandomStr(r, 24));
+            v_data.s_dist_09.assign(RandomStr(r, 24));
+            v_data.s_dist_10.assign(RandomStr(r, 24));
+
+            checker::SanityCheckStock(&k, &v);
+            const size_t sz = Size(v);
+            stock_total_sz += sz;
+            n_stocks++;
+            tbl_stock(w)->insert(txn, Encode(k), Encode(obj_buf, v));
+            tbl_stock_data(w)->insert(txn, Encode(k_data), Encode(obj_buf1, v_data));
+          }
+          if (db->commit_txn(txn)) {
+            b++;
+          } else {
+            db->abort_txn(txn);
+            if (verbose)
+              cerr << "[WARNING] stock loader loading abort" << endl;
+          }
+        } catch (abstract_db::abstract_abort_exception &ex) {
+          db->abort_txn(txn);
+          ALWAYS_ASSERT(warehouse_id != -1);
+          if (verbose)
+            cerr << "[WARNING] stock loader loading abort" << endl;
+        }
+      }
+    }
+
+    if (verbose) {
+      if (warehouse_id == -1) {
+        cerr << "[INFO] finished loading stock" << endl;
+        cerr << "[INFO]   * average stock record length: "
+             << (double(stock_total_sz)/double(n_stocks)) << " bytes" << endl;
+      } else {
+        cerr << "[INFO] finished loading stock (w=" << warehouse_id << ")" << endl;
+      }
+    }
+  }
+
+private:
+  ssize_t warehouse_id;
+};
+
+class tpcc_district_loader : public bench_loader, public tpcc_worker_mixin {
+public:
+  tpcc_district_loader(unsigned long seed,
+                       abstract_db *db,
+                       const map<string, abstract_ordered_index *> &open_tables,
+                       const map<string, vector<abstract_ordered_index *>> &partitions)
+    : bench_loader(seed, db, open_tables),
+      tpcc_worker_mixin(partitions)
+  {}
+
+protected:
+  virtual void
+  load()
+  {
+    string obj_buf;
+
+    const ssize_t bsize = db->txn_max_batch_size();
+    void *txn = db->new_txn(txn_flags, arena, txn_buf());
+    uint64_t district_total_sz = 0, n_districts = 0;
+    try {
+      uint cnt = 0;
+      for (uint w = 1; w <= NumWarehouses(); w++) {
+        if (pin_cpus)
+          PinToWarehouseId(w);
+        for (uint d = 1; d <= NumDistrictsPerWarehouse(); d++, cnt++) {
+          const district::key k(w, d);
+
+          district::value v;
+          v.d_ytd = 30000;
+          v.d_tax = (float) (RandomNumber(r, 0, 2000) / 10000.0);
+          v.d_next_o_id = 3001;
+          v.d_name.assign(RandomStr(r, RandomNumber(r, 6, 10)));
+          v.d_street_1.assign(RandomStr(r, RandomNumber(r, 10, 20)));
+          v.d_street_2.assign(RandomStr(r, RandomNumber(r, 10, 20)));
+          v.d_city.assign(RandomStr(r, RandomNumber(r, 10, 20)));
+          v.d_state.assign(RandomStr(r, 3));
+          v.d_zip.assign("123456789");
+
+          checker::SanityCheckDistrict(&k, &v);
+          const size_t sz = Size(v);
+          district_total_sz += sz;
+          n_districts++;
+          tbl_district(w)->insert(txn, Encode(k), Encode(obj_buf, v));
+
+          if (bsize != -1 && !((cnt + 1) % bsize)) {
+            ALWAYS_ASSERT(db->commit_txn(txn));
+            txn = db->new_txn(txn_flags, arena, txn_buf());
+            arena.reset();
+          }
+        }
+      }
+      ALWAYS_ASSERT(db->commit_txn(txn));
+    } catch (abstract_db::abstract_abort_exception &ex) {
+      // shouldn't abort on loading!
+      ALWAYS_ASSERT(false);
+    }
+    if (verbose) {
+      cerr << "[INFO] finished loading district" << endl;
+      cerr << "[INFO]   * average district record length: "
+           << (double(district_total_sz)/double(n_districts)) << " bytes" << endl;
+    }
+  }
+};
+
+class tpcc_customer_loader : public bench_loader, public tpcc_worker_mixin {
+public:
+  tpcc_customer_loader(unsigned long seed,
+                       abstract_db *db,
+                       const map<string, abstract_ordered_index *> &open_tables,
+                       const map<string, vector<abstract_ordered_index *>> &partitions,
+                       ssize_t warehouse_id)
+    : bench_loader(seed, db, open_tables),
+      tpcc_worker_mixin(partitions),
+      warehouse_id(warehouse_id)
+  {
+    ALWAYS_ASSERT(warehouse_id == -1 ||
+                  (warehouse_id >= 1 &&
+                   static_cast<size_t>(warehouse_id) <= NumWarehouses()));
+  }
+
+protected:
+  virtual void
+  load()
+  {
+    string obj_buf;
+
+    const uint w_start = (warehouse_id == -1) ?
+      1 : static_cast<uint>(warehouse_id);
+    const uint w_end   = (warehouse_id == -1) ?
+      NumWarehouses() : static_cast<uint>(warehouse_id);
+    const size_t batchsize =
+      (db->txn_max_batch_size() == -1) ?
+        NumCustomersPerDistrict() : db->txn_max_batch_size();
+    const size_t nbatches =
+      (batchsize > NumCustomersPerDistrict()) ?
+        1 : (NumCustomersPerDistrict() / batchsize);
+    cerr << "num batches: " << nbatches << endl;
+
+    uint64_t total_sz = 0;
+
+    for (uint w = w_start; w <= w_end; w++) {
+      if (pin_cpus)
+        PinToWarehouseId(w);
+      for (uint d = 1; d <= NumDistrictsPerWarehouse(); d++) {
+        for (uint batch = 0; batch < nbatches;) {
+          scoped_str_arena s_arena(arena);
+          void * const txn = db->new_txn(txn_flags, arena, txn_buf());
+          const size_t cstart = batch * batchsize;
+          const size_t cend = std::min((batch + 1) * batchsize, NumCustomersPerDistrict());
+          try {
+            for (uint cidx0 = cstart; cidx0 < cend; cidx0++) {
+              const uint c = cidx0 + 1;
+              const customer::key k(w, d, c);
+
+              customer::value v;
+              v.c_discount = (float) (RandomNumber(r, 1, 5000) / 10000.0);
+              if (RandomNumber(r, 1, 100) <= 10)
+                v.c_credit.assign("BC");
+              else
+                v.c_credit.assign("GC");
+
+              if (c <= 1000)
+                v.c_last.assign(GetCustomerLastName(r, c - 1));
+              else
+                v.c_last.assign(GetNonUniformCustomerLastNameLoad(r));
+
+              v.c_first.assign(RandomStr(r, RandomNumber(r, 8, 16)));
+              v.c_credit_lim = 50000;
+
+              v.c_balance = -10;
+              v.c_ytd_payment = 10;
+              v.c_payment_cnt = 1;
+              v.c_delivery_cnt = 0;
+
+              v.c_street_1.assign(RandomStr(r, RandomNumber(r, 10, 20)));
+              v.c_street_2.assign(RandomStr(r, RandomNumber(r, 10, 20)));
+              v.c_city.assign(RandomStr(r, RandomNumber(r, 10, 20)));
+              v.c_state.assign(RandomStr(r, 3));
+              v.c_zip.assign(RandomNStr(r, 4) + "11111");
+              v.c_phone.assign(RandomNStr(r, 16));
+              v.c_since = GetCurrentTimeMillis();
+              v.c_middle.assign("OE");
+              v.c_data.assign(RandomStr(r, RandomNumber(r, 300, 500)));
+
+              checker::SanityCheckCustomer(&k, &v);
+              const size_t sz = Size(v);
+              total_sz += sz;
+              tbl_customer(w)->insert(txn, Encode(k), Encode(obj_buf, v));
+
+              // customer name index
+              const customer_name_idx::key k_idx(k.c_w_id, k.c_d_id, v.c_last.str(true), v.c_first.str(true));
+              const customer_name_idx::value v_idx(k.c_id);
+
+              // index structure is:
+              // (c_w_id, c_d_id, c_last, c_first) -> (c_id)
+
+              tbl_customer_name_idx(w)->insert(txn, Encode(k_idx), Encode(obj_buf, v_idx));
+
+              history::key k_hist;
+              k_hist.h_c_id = c;
+              k_hist.h_c_d_id = d;
+              k_hist.h_c_w_id = w;
+              k_hist.h_d_id = d;
+              k_hist.h_w_id = w;
+              k_hist.h_date = GetCurrentTimeMillis();
+
+              history::value v_hist;
+              v_hist.h_amount = 10;
+              v_hist.h_data.assign(RandomStr(r, RandomNumber(r, 10, 24)));
+
+              tbl_history(w)->insert(txn, Encode(k_hist), Encode(obj_buf, v_hist));
+            }
+            if (db->commit_txn(txn)) {
+              batch++;
+            } else {
+              db->abort_txn(txn);
+              if (verbose)
+                cerr << "[WARNING] customer loader loading abort" << endl;
+            }
+          } catch (abstract_db::abstract_abort_exception &ex) {
+            db->abort_txn(txn);
+            if (verbose)
+              cerr << "[WARNING] customer loader loading abort" << endl;
+          }
+        }
+      }
+    }
+
+    if (verbose) {
+      if (warehouse_id == -1) {
+        cerr << "[INFO] finished loading customer" << endl;
+        cerr << "[INFO]   * average customer record length: "
+             << (double(total_sz)/double(NumWarehouses()*NumDistrictsPerWarehouse()*NumCustomersPerDistrict()))
+             << " bytes " << endl;
+      } else {
+        cerr << "[INFO] finished loading customer (w=" << warehouse_id << ")" << endl;
+      }
+    }
+  }
+
+private:
+  ssize_t warehouse_id;
+};
+
+class tpcc_order_loader : public bench_loader, public tpcc_worker_mixin {
+public:
+  tpcc_order_loader(unsigned long seed,
+                    abstract_db *db,
+                    const map<string, abstract_ordered_index *> &open_tables,
+                    const map<string, vector<abstract_ordered_index *>> &partitions,
+                    ssize_t warehouse_id)
+    : bench_loader(seed, db, open_tables),
+      tpcc_worker_mixin(partitions),
+      warehouse_id(warehouse_id)
+  {
+    ALWAYS_ASSERT(warehouse_id == -1 ||
+                  (warehouse_id >= 1 &&
+                   static_cast<size_t>(warehouse_id) <= NumWarehouses()));
+  }
+
+protected:
+  virtual void
+  load()
+  {
+    string obj_buf;
+
+    uint64_t order_line_total_sz = 0, n_order_lines = 0;
+    uint64_t oorder_total_sz = 0, n_oorders = 0;
+    uint64_t new_order_total_sz = 0, n_new_orders = 0;
+
+    const uint w_start = (warehouse_id == -1) ?
+      1 : static_cast<uint>(warehouse_id);
+    const uint w_end   = (warehouse_id == -1) ?
+      NumWarehouses() : static_cast<uint>(warehouse_id);
+
+    for (uint w = w_start; w <= w_end; w++) {
+      if (pin_cpus)
+        PinToWarehouseId(w);
+      for (uint d = 1; d <= NumDistrictsPerWarehouse(); d++) {
+        set<uint> c_ids_s;
+        vector<uint> c_ids;
+        while (c_ids.size() != NumCustomersPerDistrict()) {
+          const auto x = (r.next() % NumCustomersPerDistrict()) + 1;
+          if (c_ids_s.count(x))
+            continue;
+          c_ids_s.insert(x);
+          c_ids.emplace_back(x);
+        }
+        for (uint c = 1; c <= NumCustomersPerDistrict();) {
+          scoped_str_arena s_arena(arena);
+          void * const txn = db->new_txn(txn_flags, arena, txn_buf());
+          try {
+            const oorder::key k_oo(w, d, c);
+
+            oorder::value v_oo;
+            v_oo.o_c_id = c_ids[c - 1];
+            if (k_oo.o_id < 2101)
+              v_oo.o_carrier_id = RandomNumber(r, 1, 10);
+            else
+              v_oo.o_carrier_id = 0;
+            v_oo.o_ol_cnt = RandomNumber(r, 5, 15);
+            v_oo.o_all_local = 1;
+            v_oo.o_entry_d = GetCurrentTimeMillis();
+
+            checker::SanityCheckOOrder(&k_oo, &v_oo);
+            const size_t sz = Size(v_oo);
+            oorder_total_sz += sz;
+            n_oorders++;
+            tbl_oorder(w)->insert(txn, Encode(k_oo), Encode(obj_buf, v_oo));
+
+            const oorder_c_id_idx::key k_oo_idx(k_oo.o_w_id, k_oo.o_d_id, v_oo.o_c_id, k_oo.o_id);
+            const oorder_c_id_idx::value v_oo_idx(0);
+
+            tbl_oorder_c_id_idx(w)->insert(txn, Encode(k_oo_idx), Encode(obj_buf, v_oo_idx));
+
+            if (c >= 2101) {
+              const new_order::key k_no(w, d, c);
+              const new_order::value v_no;
+
+              checker::SanityCheckNewOrder(&k_no, &v_no);
+              const size_t sz = Size(v_no);
+              new_order_total_sz += sz;
+              n_new_orders++;
+              tbl_new_order(w)->insert(txn, Encode(k_no), Encode(obj_buf, v_no));
+            }
+
+            for (uint l = 1; l <= uint(v_oo.o_ol_cnt); l++) {
+              const order_line::key k_ol(w, d, c, l);
+
+              order_line::value v_ol;
+              v_ol.ol_i_id = RandomNumber(r, 1, 100000);
+              if (k_ol.ol_o_id < 2101) {
+                v_ol.ol_delivery_d = v_oo.o_entry_d;
+                v_ol.ol_amount = 0;
+              } else {
+                v_ol.ol_delivery_d = 0;
+                // random within [0.01 .. 9,999.99]
+                v_ol.ol_amount = (float) (RandomNumber(r, 1, 999999) / 100.0);
+              }
+
+              v_ol.ol_supply_w_id = k_ol.ol_w_id;
+              v_ol.ol_quantity = 5;
+              // v_ol.ol_dist_info comes from stock_data(ol_supply_w_id, ol_o_id)
+              //v_ol.ol_dist_info = RandomStr(r, 24);
+
+              checker::SanityCheckOrderLine(&k_ol, &v_ol);
+              const size_t sz = Size(v_ol);
+              order_line_total_sz += sz;
+              n_order_lines++;
+              tbl_order_line(w)->insert(txn, Encode(k_ol), Encode(obj_buf, v_ol));
+            }
+            if (db->commit_txn(txn)) {
+              c++;
+            } else {
+              db->abort_txn(txn);
+              ALWAYS_ASSERT(warehouse_id != -1);
+              if (verbose)
+                cerr << "[WARNING] order loader loading abort" << endl;
+            }
+          } catch (abstract_db::abstract_abort_exception &ex) {
+            db->abort_txn(txn);
+            ALWAYS_ASSERT(warehouse_id != -1);
+            if (verbose)
+              cerr << "[WARNING] order loader loading abort" << endl;
+          }
+        }
+      }
+    }
+
+    if (verbose) {
+      if (warehouse_id == -1) {
+        cerr << "[INFO] finished loading order" << endl;
+        cerr << "[INFO]   * average order_line record length: "
+             << (double(order_line_total_sz)/double(n_order_lines)) << " bytes" << endl;
+        cerr << "[INFO]   * average oorder record length: "
+             << (double(oorder_total_sz)/double(n_oorders)) << " bytes" << endl;
+        cerr << "[INFO]   * average new_order record length: "
+             << (double(new_order_total_sz)/double(n_new_orders)) << " bytes" << endl;
+      } else {
+        cerr << "[INFO] finished loading order (w=" << warehouse_id << ")" << endl;
+      }
+    }
+  }
+
+private:
+  ssize_t warehouse_id;
+};
+
+static event_counter evt_tpcc_cross_partition_new_order_txns("tpcc_cross_partition_new_order_txns");
+static event_counter evt_tpcc_cross_partition_payment_txns("tpcc_cross_partition_payment_txns");
+
+tpcc_worker::txn_result
+tpcc_worker::txn_new_order()
+{
+  const uint warehouse_id = PickWarehouseId(r, warehouse_id_start, warehouse_id_end);
+  const uint districtID = RandomNumber(r, 1, 10);
+  const uint customerID = GetCustomerId(r);
+  const uint numItems = RandomNumber(r, 5, 15);
+  uint itemIDs[15], supplierWarehouseIDs[15], orderQuantities[15];
+  bool allLocal = true;
+  for (uint i = 0; i < numItems; i++) {
+    itemIDs[i] = GetItemId(r);
+    if (likely(g_disable_xpartition_txn ||
+               NumWarehouses() == 1 ||
+               RandomNumber(r, 1, 100) > g_new_order_remote_item_pct)) {
+      supplierWarehouseIDs[i] = warehouse_id;
+    } else {
+      do {
+       supplierWarehouseIDs[i] = RandomNumber(r, 1, NumWarehouses());
+      } while (supplierWarehouseIDs[i] == warehouse_id);
+      allLocal = false;
+    }
+    orderQuantities[i] = RandomNumber(r, 1, 10);
+  }
+  INVARIANT(!g_disable_xpartition_txn || allLocal);
+  if (!allLocal)
+    ++evt_tpcc_cross_partition_new_order_txns;
+
+  // XXX(stephentu): implement rollback
+  //
+  // worst case txn profile:
+  //   1 customer get
+  //   1 warehouse get
+  //   1 district get
+  //   1 new_order insert
+  //   1 district put
+  //   1 oorder insert
+  //   1 oorder_cid_idx insert
+  //   15 times:
+  //      1 item get
+  //      1 stock get
+  //      1 stock put
+  //      1 order_line insert
+  //
+  // output from txn counters:
+  //   max_absent_range_set_size : 0
+  //   max_absent_set_size : 0
+  //   max_node_scan_size : 0
+  //   max_read_set_size : 15
+  //   max_write_set_size : 15
+  //   num_txn_contexts : 9
+  void *txn = db->new_txn(txn_flags, arena, txn_buf(), abstract_db::HINT_TPCC_NEW_ORDER);
+  scoped_str_arena s_arena(arena);
+  scoped_multilock<spinlock> mlock;
+  if (g_enable_partition_locks) {
+    if (allLocal) {
+      mlock.enq(LockForPartition(warehouse_id));
+    } else {
+      small_unordered_map<unsigned int, bool, 64> lockset;
+      mlock.enq(LockForPartition(warehouse_id));
+      lockset[PartitionId(warehouse_id)] = 1;
+      for (uint i = 0; i < numItems; i++) {
+        if (lockset.find(PartitionId(supplierWarehouseIDs[i])) == lockset.end()) {
+          mlock.enq(LockForPartition(supplierWarehouseIDs[i]));
+          lockset[PartitionId(supplierWarehouseIDs[i])] = 1;
+        }
+      }
+    }
+    mlock.multilock();
+  }
+  try {
+    ssize_t ret = 0;
+    const customer::key k_c(warehouse_id, districtID, customerID);
+    ALWAYS_ASSERT(tbl_customer(warehouse_id)->get(txn, Encode(obj_key0, k_c), obj_v));
+    customer::value v_c_temp;
+    const customer::value *v_c = Decode(obj_v, v_c_temp);
+    checker::SanityCheckCustomer(&k_c, v_c);
+
+    const warehouse::key k_w(warehouse_id);
+    ALWAYS_ASSERT(tbl_warehouse(warehouse_id)->get(txn, Encode(obj_key0, k_w), obj_v));
+    warehouse::value v_w_temp;
+    const warehouse::value *v_w = Decode(obj_v, v_w_temp);
+    checker::SanityCheckWarehouse(&k_w, v_w);
+
+    const district::key k_d(warehouse_id, districtID);
+    ALWAYS_ASSERT(tbl_district(warehouse_id)->get(txn, Encode(obj_key0, k_d), obj_v));
+    district::value v_d_temp;
+    const district::value *v_d = Decode(obj_v, v_d_temp);
+    checker::SanityCheckDistrict(&k_d, v_d);
+
+    const uint64_t my_next_o_id = g_new_order_fast_id_gen ?
+        FastNewOrderIdGen(warehouse_id, districtID) : v_d->d_next_o_id;
+
+    const new_order::key k_no(warehouse_id, districtID, my_next_o_id);
+    const new_order::value v_no;
+    const size_t new_order_sz = Size(v_no);
+    tbl_new_order(warehouse_id)->insert(txn, Encode(str(), k_no), Encode(str(), v_no));
+    ret += new_order_sz;
+
+    if (!g_new_order_fast_id_gen) {
+      district::value v_d_new(*v_d);
+      v_d_new.d_next_o_id++;
+      tbl_district(warehouse_id)->put(txn, Encode(str(), k_d), Encode(str(), v_d_new));
+    }
+
+    const oorder::key k_oo(warehouse_id, districtID, k_no.no_o_id);
+    oorder::value v_oo;
+    v_oo.o_c_id = int32_t(customerID);
+    v_oo.o_carrier_id = 0; // seems to be ignored
+    v_oo.o_ol_cnt = int8_t(numItems);
+    v_oo.o_all_local = allLocal;
+    v_oo.o_entry_d = GetCurrentTimeMillis();
+
+    const size_t oorder_sz = Size(v_oo);
+    tbl_oorder(warehouse_id)->insert(txn, Encode(str(), k_oo), Encode(str(), v_oo));
+    ret += oorder_sz;
+
+    const oorder_c_id_idx::key k_oo_idx(warehouse_id, districtID, customerID, k_no.no_o_id);
+    const oorder_c_id_idx::value v_oo_idx(0);
+
+    tbl_oorder_c_id_idx(warehouse_id)->insert(txn, Encode(str(), k_oo_idx), Encode(str(), v_oo_idx));
+
+    for (uint ol_number = 1; ol_number <= numItems; ol_number++) {
+      const uint ol_supply_w_id = supplierWarehouseIDs[ol_number - 1];
+      const uint ol_i_id = itemIDs[ol_number - 1];
+      const uint ol_quantity = orderQuantities[ol_number - 1];
+
+      const item::key k_i(ol_i_id);
+      ALWAYS_ASSERT(tbl_item(1)->get(txn, Encode(obj_key0, k_i), obj_v));
+      item::value v_i_temp;
+      const item::value *v_i = Decode(obj_v, v_i_temp);
+      checker::SanityCheckItem(&k_i, v_i);
+
+      const stock::key k_s(ol_supply_w_id, ol_i_id);
+      ALWAYS_ASSERT(tbl_stock(ol_supply_w_id)->get(txn, Encode(obj_key0, k_s), obj_v));
+      stock::value v_s_temp;
+      const stock::value *v_s = Decode(obj_v, v_s_temp);
+      checker::SanityCheckStock(&k_s, v_s);
+
+      stock::value v_s_new(*v_s);
+      if (v_s_new.s_quantity - ol_quantity >= 10)
+        v_s_new.s_quantity -= ol_quantity;
+      else
+        v_s_new.s_quantity += -int32_t(ol_quantity) + 91;
+      v_s_new.s_ytd += ol_quantity;
+      v_s_new.s_remote_cnt += (ol_supply_w_id == warehouse_id) ? 0 : 1;
+
+      tbl_stock(ol_supply_w_id)->put(txn, Encode(str(), k_s), Encode(str(), v_s_new));
+
+      const order_line::key k_ol(warehouse_id, districtID, k_no.no_o_id, ol_number);
+      order_line::value v_ol;
+      v_ol.ol_i_id = int32_t(ol_i_id);
+      v_ol.ol_delivery_d = 0; // not delivered yet
+      v_ol.ol_amount = float(ol_quantity) * v_i->i_price;
+      v_ol.ol_supply_w_id = int32_t(ol_supply_w_id);
+      v_ol.ol_quantity = int8_t(ol_quantity);
+
+      const size_t order_line_sz = Size(v_ol);
+      tbl_order_line(warehouse_id)->insert(txn, Encode(str(), k_ol), Encode(str(), v_ol));
+      ret += order_line_sz;
+    }
+
+    measure_txn_counters(txn, "txn_new_order");
+    if (likely(db->commit_txn(txn)))
+      return txn_result(true, ret);
+  } catch (abstract_db::abstract_abort_exception &ex) {
+    db->abort_txn(txn);
+  }
+  return txn_result(false, 0);
+}
+
+class new_order_scan_callback : public abstract_ordered_index::scan_callback {
+public:
+  new_order_scan_callback() : k_no(0) {}
+  virtual bool invoke(
+      const char *keyp, size_t keylen,
+      const string &value)
+  {
+    INVARIANT(keylen == sizeof(new_order::key));
+    INVARIANT(value.size() == sizeof(new_order::value));
+    k_no = Decode(keyp, k_no_temp);
+#ifdef CHECK_INVARIANTS
+    new_order::value v_no_temp;
+    const new_order::value *v_no = Decode(value, v_no_temp);
+    checker::SanityCheckNewOrder(k_no, v_no);
+#endif
+    return false;
+  }
+  inline const new_order::key *
+  get_key() const
+  {
+    return k_no;
+  }
+private:
+  new_order::key k_no_temp;
+  const new_order::key *k_no;
+};
+
+STATIC_COUNTER_DECL(scopedperf::tod_ctr, delivery_probe0_tod, delivery_probe0_cg)
+
+tpcc_worker::txn_result
+tpcc_worker::txn_delivery()
+{
+  const uint warehouse_id = PickWarehouseId(r, warehouse_id_start, warehouse_id_end);
+  const uint o_carrier_id = RandomNumber(r, 1, NumDistrictsPerWarehouse());
+  const uint32_t ts = GetCurrentTimeMillis();
+
+  // worst case txn profile:
+  //   10 times:
+  //     1 new_order scan node
+  //     1 oorder get
+  //     2 order_line scan nodes
+  //     15 order_line puts
+  //     1 new_order remove
+  //     1 oorder put
+  //     1 customer get
+  //     1 customer put
+  //
+  // output from counters:
+  //   max_absent_range_set_size : 0
+  //   max_absent_set_size : 0
+  //   max_node_scan_size : 21
+  //   max_read_set_size : 133
+  //   max_write_set_size : 133
+  //   num_txn_contexts : 4
+  void *txn = db->new_txn(txn_flags, arena, txn_buf(), abstract_db::HINT_TPCC_DELIVERY);
+  scoped_str_arena s_arena(arena);
+  scoped_lock_guard<spinlock> slock(
+      g_enable_partition_locks ? &LockForPartition(warehouse_id) : nullptr);
+  try {
+    ssize_t ret = 0;
+    for (uint d = 1; d <= NumDistrictsPerWarehouse(); d++) {
+      const new_order::key k_no_0(warehouse_id, d, last_no_o_ids[d - 1]);
+      const new_order::key k_no_1(warehouse_id, d, numeric_limits<int32_t>::max());
+      new_order_scan_callback new_order_c;
+      {
+        ANON_REGION("DeliverNewOrderScan:", &delivery_probe0_cg);
+        tbl_new_order(warehouse_id)->scan(txn, Encode(obj_key0, k_no_0), &Encode(obj_key1, k_no_1), new_order_c, s_arena.get());
+      }
+
+      const new_order::key *k_no = new_order_c.get_key();
+      if (unlikely(!k_no))
+        continue;
+      last_no_o_ids[d - 1] = k_no->no_o_id + 1; // XXX: update last seen
+
+      const oorder::key k_oo(warehouse_id, d, k_no->no_o_id);
+      if (unlikely(!tbl_oorder(warehouse_id)->get(txn, Encode(obj_key0, k_oo), obj_v))) {
+        // even if we read the new order entry, there's no guarantee
+        // we will read the oorder entry: in this case the txn will abort,
+        // but we're simply bailing out early
+        db->abort_txn(txn);
+        return txn_result(false, 0);
+      }
+      oorder::value v_oo_temp;
+      const oorder::value *v_oo = Decode(obj_v, v_oo_temp);
+      checker::SanityCheckOOrder(&k_oo, v_oo);
+
+      static_limit_callback<15> c(s_arena.get(), false); // never more than 15 order_lines per order
+      const order_line::key k_oo_0(warehouse_id, d, k_no->no_o_id, 0);
+      const order_line::key k_oo_1(warehouse_id, d, k_no->no_o_id, numeric_limits<int32_t>::max());
+
+      // XXX(stephentu): mutable scans would help here
+      tbl_order_line(warehouse_id)->scan(txn, Encode(obj_key0, k_oo_0), &Encode(obj_key1, k_oo_1), c, s_arena.get());
+      float sum = 0.0;
+      for (size_t i = 0; i < c.size(); i++) {
+        order_line::value v_ol_temp;
+        const order_line::value *v_ol = Decode(*c.values[i].second, v_ol_temp);
+
+#ifdef CHECK_INVARIANTS
+        order_line::key k_ol_temp;
+        const order_line::key *k_ol = Decode(*c.values[i].first, k_ol_temp);
+        checker::SanityCheckOrderLine(k_ol, v_ol);
+#endif
+
+        sum += v_ol->ol_amount;
+        order_line::value v_ol_new(*v_ol);
+        v_ol_new.ol_delivery_d = ts;
+        INVARIANT(s_arena.get()->manages(c.values[i].first));
+        tbl_order_line(warehouse_id)->put(txn, *c.values[i].first, Encode(str(), v_ol_new));
+      }
+
+      // delete new order
+      tbl_new_order(warehouse_id)->remove(txn, Encode(str(), *k_no));
+      ret -= 0 /*new_order_c.get_value_size()*/;
+
+      // update oorder
+      oorder::value v_oo_new(*v_oo);
+      v_oo_new.o_carrier_id = o_carrier_id;
+      tbl_oorder(warehouse_id)->put(txn, Encode(str(), k_oo), Encode(str(), v_oo_new));
+
+      const uint c_id = v_oo->o_c_id;
+      const float ol_total = sum;
+
+      // update customer
+      const customer::key k_c(warehouse_id, d, c_id);
+      ALWAYS_ASSERT(tbl_customer(warehouse_id)->get(txn, Encode(obj_key0, k_c), obj_v));
+
+      customer::value v_c_temp;
+      const customer::value *v_c = Decode(obj_v, v_c_temp);
+      customer::value v_c_new(*v_c);
+      v_c_new.c_balance += ol_total;
+      tbl_customer(warehouse_id)->put(txn, Encode(str(), k_c), Encode(str(), v_c_new));
+    }
+    measure_txn_counters(txn, "txn_delivery");
+    if (likely(db->commit_txn(txn)))
+      return txn_result(true, ret);
+  } catch (abstract_db::abstract_abort_exception &ex) {
+    db->abort_txn(txn);
+  }
+  return txn_result(false, 0);
+}
+
+static event_avg_counter evt_avg_cust_name_idx_scan_size("avg_cust_name_idx_scan_size");
+
+tpcc_worker::txn_result
+tpcc_worker::txn_payment()
+{
+  const uint warehouse_id = PickWarehouseId(r, warehouse_id_start, warehouse_id_end);
+  const uint districtID = RandomNumber(r, 1, NumDistrictsPerWarehouse());
+  uint customerDistrictID, customerWarehouseID;
+  if (likely(g_disable_xpartition_txn ||
+             NumWarehouses() == 1 ||
+             RandomNumber(r, 1, 100) <= 85)) {
+    customerDistrictID = districtID;
+    customerWarehouseID = warehouse_id;
+  } else {
+    customerDistrictID = RandomNumber(r, 1, NumDistrictsPerWarehouse());
+    do {
+      customerWarehouseID = RandomNumber(r, 1, NumWarehouses());
+    } while (customerWarehouseID == warehouse_id);
+  }
+  const float paymentAmount = (float) (RandomNumber(r, 100, 500000) / 100.0);
+  const uint32_t ts = GetCurrentTimeMillis();
+  INVARIANT(!g_disable_xpartition_txn || customerWarehouseID == warehouse_id);
+
+  // output from txn counters:
+  //   max_absent_range_set_size : 0
+  //   max_absent_set_size : 0
+  //   max_node_scan_size : 10
+  //   max_read_set_size : 71
+  //   max_write_set_size : 1
+  //   num_txn_contexts : 5
+  void *txn = db->new_txn(txn_flags, arena, txn_buf(), abstract_db::HINT_TPCC_PAYMENT);
+  scoped_str_arena s_arena(arena);
+  scoped_multilock<spinlock> mlock;
+  if (g_enable_partition_locks) {
+    mlock.enq(LockForPartition(warehouse_id));
+    if (PartitionId(customerWarehouseID) != PartitionId(warehouse_id))
+      mlock.enq(LockForPartition(customerWarehouseID));
+    mlock.multilock();
+  }
+  if (customerWarehouseID != warehouse_id)
+    ++evt_tpcc_cross_partition_payment_txns;
+  try {
+    ssize_t ret = 0;
+
+    const warehouse::key k_w(warehouse_id);
+    ALWAYS_ASSERT(tbl_warehouse(warehouse_id)->get(txn, Encode(obj_key0, k_w), obj_v));
+    warehouse::value v_w_temp;
+    const warehouse::value *v_w = Decode(obj_v, v_w_temp);
+    checker::SanityCheckWarehouse(&k_w, v_w);
+
+    warehouse::value v_w_new(*v_w);
+    v_w_new.w_ytd += paymentAmount;
+    tbl_warehouse(warehouse_id)->put(txn, Encode(str(), k_w), Encode(str(), v_w_new));
+
+    const district::key k_d(warehouse_id, districtID);
+    ALWAYS_ASSERT(tbl_district(warehouse_id)->get(txn, Encode(obj_key0, k_d), obj_v));
+    district::value v_d_temp;
+    const district::value *v_d = Decode(obj_v, v_d_temp);
+    checker::SanityCheckDistrict(&k_d, v_d);
+
+    district::value v_d_new(*v_d);
+    v_d_new.d_ytd += paymentAmount;
+    tbl_district(warehouse_id)->put(txn, Encode(str(), k_d), Encode(str(), v_d_new));
+
+    customer::key k_c;
+    customer::value v_c;
+    if (RandomNumber(r, 1, 100) <= 60) {
+      // cust by name
+      uint8_t lastname_buf[CustomerLastNameMaxSize + 1];
+      static_assert(sizeof(lastname_buf) == 16, "xx");
+      NDB_MEMSET(lastname_buf, 0, sizeof(lastname_buf));
+      GetNonUniformCustomerLastNameRun(lastname_buf, r);
+
+      static const string zeros(16, 0);
+      static const string ones(16, 255);
+
+      customer_name_idx::key k_c_idx_0;
+      k_c_idx_0.c_w_id = customerWarehouseID;
+      k_c_idx_0.c_d_id = customerDistrictID;
+      k_c_idx_0.c_last.assign((const char *) lastname_buf, 16);
+      k_c_idx_0.c_first.assign(zeros);
+
+      customer_name_idx::key k_c_idx_1;
+      k_c_idx_1.c_w_id = customerWarehouseID;
+      k_c_idx_1.c_d_id = customerDistrictID;
+      k_c_idx_1.c_last.assign((const char *) lastname_buf, 16);
+      k_c_idx_1.c_first.assign(ones);
+
+      static_limit_callback<NMaxCustomerIdxScanElems> c(s_arena.get(), true); // probably a safe bet for now
+      tbl_customer_name_idx(customerWarehouseID)->scan(txn, Encode(obj_key0, k_c_idx_0), &Encode(obj_key1, k_c_idx_1), c, s_arena.get());
+      ALWAYS_ASSERT(c.size() > 0);
+      INVARIANT(c.size() < NMaxCustomerIdxScanElems); // we should detect this
+      int index = c.size() / 2;
+      if (c.size() % 2 == 0)
+        index--;
+      evt_avg_cust_name_idx_scan_size.offer(c.size());
+
+      customer_name_idx::value v_c_idx_temp;
+      const customer_name_idx::value *v_c_idx = Decode(*c.values[index].second, v_c_idx_temp);
+
+      k_c.c_w_id = customerWarehouseID;
+      k_c.c_d_id = customerDistrictID;
+      k_c.c_id = v_c_idx->c_id;
+      ALWAYS_ASSERT(tbl_customer(customerWarehouseID)->get(txn, Encode(obj_key0, k_c), obj_v));
+      Decode(obj_v, v_c);
+
+    } else {
+      // cust by ID
+      const uint customerID = GetCustomerId(r);
+      k_c.c_w_id = customerWarehouseID;
+      k_c.c_d_id = customerDistrictID;
+      k_c.c_id = customerID;
+      ALWAYS_ASSERT(tbl_customer(customerWarehouseID)->get(txn, Encode(obj_key0, k_c), obj_v));
+      Decode(obj_v, v_c);
+    }
+    checker::SanityCheckCustomer(&k_c, &v_c);
+    customer::value v_c_new(v_c);
+
+    v_c_new.c_balance -= paymentAmount;
+    v_c_new.c_ytd_payment += paymentAmount;
+    v_c_new.c_payment_cnt++;
+    if (strncmp(v_c.c_credit.data(), "BC", 2) == 0) {
+      char buf[501];
+      int n = snprintf(buf, sizeof(buf), "%d %d %d %d %d %f | %s",
+                       k_c.c_id,
+                       k_c.c_d_id,
+                       k_c.c_w_id,
+                       districtID,
+                       warehouse_id,
+                       paymentAmount,
+                       v_c.c_data.c_str());
+      v_c_new.c_data.resize_junk(
+          min(static_cast<size_t>(n), v_c_new.c_data.max_size()));
+      NDB_MEMCPY((void *) v_c_new.c_data.data(), &buf[0], v_c_new.c_data.size());
+    }
+
+    tbl_customer(customerWarehouseID)->put(txn, Encode(str(), k_c), Encode(str(), v_c_new));
+
+    const history::key k_h(k_c.c_d_id, k_c.c_w_id, k_c.c_id, districtID, warehouse_id, ts);
+    history::value v_h;
+    v_h.h_amount = paymentAmount;
+    v_h.h_data.resize_junk(v_h.h_data.max_size());
+    int n = snprintf((char *) v_h.h_data.data(), v_h.h_data.max_size() + 1,
+                     "%.10s    %.10s",
+                     v_w->w_name.c_str(),
+                     v_d->d_name.c_str());
+    v_h.h_data.resize_junk(min(static_cast<size_t>(n), v_h.h_data.max_size()));
+
+    const size_t history_sz = Size(v_h);
+    tbl_history(warehouse_id)->insert(txn, Encode(str(), k_h), Encode(str(), v_h));
+    ret += history_sz;
+
+    measure_txn_counters(txn, "txn_payment");
+    if (likely(db->commit_txn(txn)))
+      return txn_result(true, ret);
+  } catch (abstract_db::abstract_abort_exception &ex) {
+    db->abort_txn(txn);
+  }
+  return txn_result(false, 0);
+}
+
+class order_line_nop_callback : public abstract_ordered_index::scan_callback {
+public:
+  order_line_nop_callback() : n(0) {}
+  virtual bool invoke(
+      const char *keyp, size_t keylen,
+      const string &value)
+  {
+    INVARIANT(keylen == sizeof(order_line::key));
+    order_line::value v_ol_temp;
+    const order_line::value *v_ol UNUSED = Decode(value, v_ol_temp);
+#ifdef CHECK_INVARIANTS
+    order_line::key k_ol_temp;
+    const order_line::key *k_ol = Decode(keyp, k_ol_temp);
+    checker::SanityCheckOrderLine(k_ol, v_ol);
+#endif
+    ++n;
+    return true;
+  }
+  size_t n;
+};
+
+STATIC_COUNTER_DECL(scopedperf::tod_ctr, order_status_probe0_tod, order_status_probe0_cg)
+
+tpcc_worker::txn_result
+tpcc_worker::txn_order_status()
+{
+  const uint warehouse_id = PickWarehouseId(r, warehouse_id_start, warehouse_id_end);
+  const uint districtID = RandomNumber(r, 1, NumDistrictsPerWarehouse());
+
+  // output from txn counters:
+  //   max_absent_range_set_size : 0
+  //   max_absent_set_size : 0
+  //   max_node_scan_size : 13
+  //   max_read_set_size : 81
+  //   max_write_set_size : 0
+  //   num_txn_contexts : 4
+  const uint64_t read_only_mask =
+    g_disable_read_only_scans ? 0 : transaction_base::TXN_FLAG_READ_ONLY;
+  const abstract_db::TxnProfileHint hint =
+    g_disable_read_only_scans ?
+      abstract_db::HINT_TPCC_ORDER_STATUS :
+      abstract_db::HINT_TPCC_ORDER_STATUS_READ_ONLY;
+  void *txn = db->new_txn(txn_flags | read_only_mask, arena, txn_buf(), hint);
+  scoped_str_arena s_arena(arena);
+  // NB: since txn_order_status() is a RO txn, we assume that
+  // locking is un-necessary (since we can just read from some old snapshot)
+  try {
+
+    customer::key k_c;
+    customer::value v_c;
+    if (RandomNumber(r, 1, 100) <= 60) {
+      // cust by name
+      uint8_t lastname_buf[CustomerLastNameMaxSize + 1];
+      static_assert(sizeof(lastname_buf) == 16, "xx");
+      NDB_MEMSET(lastname_buf, 0, sizeof(lastname_buf));
+      GetNonUniformCustomerLastNameRun(lastname_buf, r);
+
+      static const string zeros(16, 0);
+      static const string ones(16, 255);
+
+      customer_name_idx::key k_c_idx_0;
+      k_c_idx_0.c_w_id = warehouse_id;
+      k_c_idx_0.c_d_id = districtID;
+      k_c_idx_0.c_last.assign((const char *) lastname_buf, 16);
+      k_c_idx_0.c_first.assign(zeros);
+
+      customer_name_idx::key k_c_idx_1;
+      k_c_idx_1.c_w_id = warehouse_id;
+      k_c_idx_1.c_d_id = districtID;
+      k_c_idx_1.c_last.assign((const char *) lastname_buf, 16);
+      k_c_idx_1.c_first.assign(ones);
+
+      static_limit_callback<NMaxCustomerIdxScanElems> c(s_arena.get(), true); // probably a safe bet for now
+      tbl_customer_name_idx(warehouse_id)->scan(txn, Encode(obj_key0, k_c_idx_0), &Encode(obj_key1, k_c_idx_1), c, s_arena.get());
+      ALWAYS_ASSERT(c.size() > 0);
+      INVARIANT(c.size() < NMaxCustomerIdxScanElems); // we should detect this
+      int index = c.size() / 2;
+      if (c.size() % 2 == 0)
+        index--;
+      evt_avg_cust_name_idx_scan_size.offer(c.size());
+
+      customer_name_idx::value v_c_idx_temp;
+      const customer_name_idx::value *v_c_idx = Decode(*c.values[index].second, v_c_idx_temp);
+
+      k_c.c_w_id = warehouse_id;
+      k_c.c_d_id = districtID;
+      k_c.c_id = v_c_idx->c_id;
+      ALWAYS_ASSERT(tbl_customer(warehouse_id)->get(txn, Encode(obj_key0, k_c), obj_v));
+      Decode(obj_v, v_c);
+
+    } else {
+      // cust by ID
+      const uint customerID = GetCustomerId(r);
+      k_c.c_w_id = warehouse_id;
+      k_c.c_d_id = districtID;
+      k_c.c_id = customerID;
+      ALWAYS_ASSERT(tbl_customer(warehouse_id)->get(txn, Encode(obj_key0, k_c), obj_v));
+      Decode(obj_v, v_c);
+    }
+    checker::SanityCheckCustomer(&k_c, &v_c);
+
+    string *newest_o_c_id = s_arena.get()->next();
+    if (g_order_status_scan_hack) {
+      // XXX(stephentu): HACK- we bound the # of elems returned by this scan to
+      // 15- this is because we don't have reverse scans. In an ideal system, a
+      // reverse scan would only need to read 1 btree node. We could simulate a
+      // lookup by only reading the first element- but then we would *always*
+      // read the first order by any customer.  To make this more interesting, we
+      // randomly select which elem to pick within the 1st or 2nd btree nodes.
+      // This is obviously a deviation from TPC-C, but it shouldn't make that
+      // much of a difference in terms of performance numbers (in fact we are
+      // making it worse for us)
+      latest_key_callback c_oorder(*newest_o_c_id, (r.next() % 15) + 1);
+      const oorder_c_id_idx::key k_oo_idx_0(warehouse_id, districtID, k_c.c_id, 0);
+      const oorder_c_id_idx::key k_oo_idx_1(warehouse_id, districtID, k_c.c_id, numeric_limits<int32_t>::max());
+      {
+        ANON_REGION("OrderStatusOOrderScan:", &order_status_probe0_cg);
+        tbl_oorder_c_id_idx(warehouse_id)->scan(txn, Encode(obj_key0, k_oo_idx_0), &Encode(obj_key1, k_oo_idx_1), c_oorder, s_arena.get());
+      }
+      ALWAYS_ASSERT(c_oorder.size());
+    } else {
+      latest_key_callback c_oorder(*newest_o_c_id, 1);
+      const oorder_c_id_idx::key k_oo_idx_hi(warehouse_id, districtID, k_c.c_id, numeric_limits<int32_t>::max());
+      tbl_oorder_c_id_idx(warehouse_id)->rscan(txn, Encode(obj_key0, k_oo_idx_hi), nullptr, c_oorder, s_arena.get());
+      ALWAYS_ASSERT(c_oorder.size() == 1);
+    }
+
+    oorder_c_id_idx::key k_oo_idx_temp;
+    const oorder_c_id_idx::key *k_oo_idx = Decode(*newest_o_c_id, k_oo_idx_temp);
+    const uint o_id = k_oo_idx->o_o_id;
+
+    order_line_nop_callback c_order_line;
+    const order_line::key k_ol_0(warehouse_id, districtID, o_id, 0);
+    const order_line::key k_ol_1(warehouse_id, districtID, o_id, numeric_limits<int32_t>::max());
+    tbl_order_line(warehouse_id)->scan(txn, Encode(obj_key0, k_ol_0), &Encode(obj_key1, k_ol_1), c_order_line, s_arena.get());
+    ALWAYS_ASSERT(c_order_line.n >= 5 && c_order_line.n <= 15);
+
+    measure_txn_counters(txn, "txn_order_status");
+    if (likely(db->commit_txn(txn)))
+      return txn_result(true, 0);
+  } catch (abstract_db::abstract_abort_exception &ex) {
+    db->abort_txn(txn);
+  }
+  return txn_result(false, 0);
+}
+
+class order_line_scan_callback : public abstract_ordered_index::scan_callback {
+public:
+  order_line_scan_callback() : n(0) {}
+  virtual bool invoke(
+      const char *keyp, size_t keylen,
+      const string &value)
+  {
+    INVARIANT(keylen == sizeof(order_line::key));
+    order_line::value v_ol_temp;
+    const order_line::value *v_ol = Decode(value, v_ol_temp);
+
+#ifdef CHECK_INVARIANTS
+    order_line::key k_ol_temp;
+    const order_line::key *k_ol = Decode(keyp, k_ol_temp);
+    checker::SanityCheckOrderLine(k_ol, v_ol);
+#endif
+
+    s_i_ids[v_ol->ol_i_id] = 1;
+    n++;
+    return true;
+  }
+  size_t n;
+  small_unordered_map<uint, bool, 512> s_i_ids;
+};
+
+STATIC_COUNTER_DECL(scopedperf::tod_ctr, stock_level_probe0_tod, stock_level_probe0_cg)
+STATIC_COUNTER_DECL(scopedperf::tod_ctr, stock_level_probe1_tod, stock_level_probe1_cg)
+STATIC_COUNTER_DECL(scopedperf::tod_ctr, stock_level_probe2_tod, stock_level_probe2_cg)
+
+static event_avg_counter evt_avg_stock_level_loop_join_lookups("stock_level_loop_join_lookups");
+
+tpcc_worker::txn_result
+tpcc_worker::txn_stock_level()
+{
+  const uint warehouse_id = PickWarehouseId(r, warehouse_id_start, warehouse_id_end);
+  const uint threshold = RandomNumber(r, 10, 20);
+  const uint districtID = RandomNumber(r, 1, NumDistrictsPerWarehouse());
+
+  // output from txn counters:
+  //   max_absent_range_set_size : 0
+  //   max_absent_set_size : 0
+  //   max_node_scan_size : 19
+  //   max_read_set_size : 241
+  //   max_write_set_size : 0
+  //   n_node_scan_large_instances : 1
+  //   n_read_set_large_instances : 2
+  //   num_txn_contexts : 3
+  const uint64_t read_only_mask =
+    g_disable_read_only_scans ? 0 : transaction_base::TXN_FLAG_READ_ONLY;
+  const abstract_db::TxnProfileHint hint =
+    g_disable_read_only_scans ?
+      abstract_db::HINT_TPCC_STOCK_LEVEL :
+      abstract_db::HINT_TPCC_STOCK_LEVEL_READ_ONLY;
+  void *txn = db->new_txn(txn_flags | read_only_mask, arena, txn_buf(), hint);
+  scoped_str_arena s_arena(arena);
+  // NB: since txn_stock_level() is a RO txn, we assume that
+  // locking is un-necessary (since we can just read from some old snapshot)
+  try {
+    const district::key k_d(warehouse_id, districtID);
+    ALWAYS_ASSERT(tbl_district(warehouse_id)->get(txn, Encode(obj_key0, k_d), obj_v));
+    district::value v_d_temp;
+    const district::value *v_d = Decode(obj_v, v_d_temp);
+    checker::SanityCheckDistrict(&k_d, v_d);
+
+    const uint64_t cur_next_o_id = g_new_order_fast_id_gen ?
+      NewOrderIdHolder(warehouse_id, districtID).load(memory_order_acquire) :
+      v_d->d_next_o_id;
+
+    // manual joins are fun!
+    order_line_scan_callback c;
+    const int32_t lower = cur_next_o_id >= 20 ? (cur_next_o_id - 20) : 0;
+    const order_line::key k_ol_0(warehouse_id, districtID, lower, 0);
+    const order_line::key k_ol_1(warehouse_id, districtID, cur_next_o_id, 0);
+    {
+      ANON_REGION("StockLevelOrderLineScan:", &stock_level_probe0_cg);
+      tbl_order_line(warehouse_id)->scan(txn, Encode(obj_key0, k_ol_0), &Encode(obj_key1, k_ol_1), c, s_arena.get());
+    }
+    {
+      small_unordered_map<uint, bool, 512> s_i_ids_distinct;
+      for (auto &p : c.s_i_ids) {
+        ANON_REGION("StockLevelLoopJoinIter:", &stock_level_probe1_cg);
+
+        const size_t nbytesread = serializer<int16_t, true>::max_nbytes();
+
+        const stock::key k_s(warehouse_id, p.first);
+        INVARIANT(p.first >= 1 && p.first <= NumItems());
+        {
+          ANON_REGION("StockLevelLoopJoinGet:", &stock_level_probe2_cg);
+          ALWAYS_ASSERT(tbl_stock(warehouse_id)->get(txn, Encode(obj_key0, k_s), obj_v, nbytesread));
+        }
+        INVARIANT(obj_v.size() <= nbytesread);
+        const uint8_t *ptr = (const uint8_t *) obj_v.data();
+        int16_t i16tmp;
+        ptr = serializer<int16_t, true>::read(ptr, &i16tmp);
+        if (i16tmp < int(threshold))
+          s_i_ids_distinct[p.first] = 1;
+      }
+      evt_avg_stock_level_loop_join_lookups.offer(c.s_i_ids.size());
+      // NB(stephentu): s_i_ids_distinct.size() is the computed result of this txn
+    }
+    measure_txn_counters(txn, "txn_stock_level");
+    if (likely(db->commit_txn(txn)))
+      return txn_result(true, 0);
+  } catch (abstract_db::abstract_abort_exception &ex) {
+    db->abort_txn(txn);
+  }
+  return txn_result(false, 0);
+}
+
+template <typename T>
+static vector<T>
+unique_filter(const vector<T> &v)
+{
+  set<T> seen;
+  vector<T> ret;
+  for (auto &e : v)
+    if (!seen.count(e)) {
+      ret.emplace_back(e);
+      seen.insert(e);
+    }
+  return ret;
+}
+
+class tpcc_bench_runner : public bench_runner {
+private:
+
+  static bool
+  IsTableReadOnly(const char *name)
+  {
+    return strcmp("item", name) == 0;
+  }
+
+  static bool
+  IsTableAppendOnly(const char *name)
+  {
+    return strcmp("history", name) == 0 ||
+           strcmp("oorder_c_id_idx", name) == 0;
+  }
+
+  static vector<abstract_ordered_index *>
+  OpenTablesForTablespace(abstract_db *db, const char *name, size_t expected_size)
+  {
+    const bool is_read_only = IsTableReadOnly(name);
+    const bool is_append_only = IsTableAppendOnly(name);
+    const string s_name(name);
+    vector<abstract_ordered_index *> ret(NumWarehouses());
+    if (g_enable_separate_tree_per_partition && !is_read_only) {
+      if (NumWarehouses() <= nthreads) {
+        for (size_t i = 0; i < NumWarehouses(); i++)
+          ret[i] = db->open_index(s_name + "_" + to_string(i), expected_size, is_append_only);
+      } else {
+        const unsigned nwhse_per_partition = NumWarehouses() / nthreads;
+        for (size_t partid = 0; partid < nthreads; partid++) {
+          const unsigned wstart = partid * nwhse_per_partition;
+          const unsigned wend   = (partid + 1 == nthreads) ?
+            NumWarehouses() : (partid + 1) * nwhse_per_partition;
+          abstract_ordered_index *idx =
+            db->open_index(s_name + "_" + to_string(partid), expected_size, is_append_only);
+          for (size_t i = wstart; i < wend; i++)
+            ret[i] = idx;
+        }
+      }
+    } else {
+      abstract_ordered_index *idx = db->open_index(s_name, expected_size, is_append_only);
+      for (size_t i = 0; i < NumWarehouses(); i++)
+        ret[i] = idx;
+    }
+    return ret;
+  }
+
+public:
+  tpcc_bench_runner(abstract_db *db)
+    : bench_runner(db)
+  {
+
+#define OPEN_TABLESPACE_X(x) \
+    partitions[#x] = OpenTablesForTablespace(db, #x, sizeof(x));
+
+    TPCC_TABLE_LIST(OPEN_TABLESPACE_X);
+
+#undef OPEN_TABLESPACE_X
+
+    for (auto &t : partitions) {
+      auto v = unique_filter(t.second);
+      for (size_t i = 0; i < v.size(); i++)
+        open_tables[t.first + "_" + to_string(i)] = v[i];
+    }
+
+    if (g_enable_partition_locks) {
+      static_assert(sizeof(aligned_padded_elem<spinlock>) == CACHELINE_SIZE, "xx");
+      void * const px = memalign(CACHELINE_SIZE, sizeof(aligned_padded_elem<spinlock>) * nthreads);
+      ALWAYS_ASSERT(px);
+      ALWAYS_ASSERT(reinterpret_cast<uintptr_t>(px) % CACHELINE_SIZE == 0);
+      g_partition_locks = reinterpret_cast<aligned_padded_elem<spinlock> *>(px);
+      for (size_t i = 0; i < nthreads; i++) {
+        new (&g_partition_locks[i]) aligned_padded_elem<spinlock>();
+        ALWAYS_ASSERT(!g_partition_locks[i].elem.is_locked());
+      }
+    }
+
+    if (g_new_order_fast_id_gen) {
+      void * const px =
+        memalign(
+            CACHELINE_SIZE,
+            sizeof(aligned_padded_elem<atomic<uint64_t>>) *
+              NumWarehouses() * NumDistrictsPerWarehouse());
+      g_district_ids = reinterpret_cast<aligned_padded_elem<atomic<uint64_t>> *>(px);
+      for (size_t i = 0; i < NumWarehouses() * NumDistrictsPerWarehouse(); i++)
+        new (&g_district_ids[i]) atomic<uint64_t>(3001);
+    }
+  }
+
+protected:
+  virtual vector<bench_loader *>
+  make_loaders()
+  {
+    vector<bench_loader *> ret;
+    ret.push_back(new tpcc_warehouse_loader(9324, db, open_tables, partitions));
+    ret.push_back(new tpcc_item_loader(235443, db, open_tables, partitions));
+    if (enable_parallel_loading) {
+      fast_random r(89785943);
+      for (uint i = 1; i <= NumWarehouses(); i++)
+        ret.push_back(new tpcc_stock_loader(r.next(), db, open_tables, partitions, i));
+    } else {
+      ret.push_back(new tpcc_stock_loader(89785943, db, open_tables, partitions, -1));
+    }
+    ret.push_back(new tpcc_district_loader(129856349, db, open_tables, partitions));
+    if (enable_parallel_loading) {
+      fast_random r(923587856425);
+      for (uint i = 1; i <= NumWarehouses(); i++)
+        ret.push_back(new tpcc_customer_loader(r.next(), db, open_tables, partitions, i));
+    } else {
+      ret.push_back(new tpcc_customer_loader(923587856425, db, open_tables, partitions, -1));
+    }
+    if (enable_parallel_loading) {
+      fast_random r(2343352);
+      for (uint i = 1; i <= NumWarehouses(); i++)
+        ret.push_back(new tpcc_order_loader(r.next(), db, open_tables, partitions, i));
+    } else {
+      ret.push_back(new tpcc_order_loader(2343352, db, open_tables, partitions, -1));
+    }
+    return ret;
+  }
+
+  virtual vector<bench_worker *>
+  make_workers()
+  {
+    const unsigned alignment = coreid::num_cpus_online();
+    const int blockstart =
+      coreid::allocate_contiguous_aligned_block(nthreads, alignment);
+    ALWAYS_ASSERT(blockstart >= 0);
+    ALWAYS_ASSERT((blockstart % alignment) == 0);
+    fast_random r(23984543);
+    vector<bench_worker *> ret;
+    if (NumWarehouses() <= nthreads) {
+      for (size_t i = 0; i < nthreads; i++)
+        ret.push_back(
+          new tpcc_worker(
+            blockstart + i,
+            r.next(), db, open_tables, partitions,
+            &barrier_a, &barrier_b,
+            (i % NumWarehouses()) + 1, (i % NumWarehouses()) + 2));
+    } else {
+      const unsigned nwhse_per_partition = NumWarehouses() / nthreads;
+      for (size_t i = 0; i < nthreads; i++) {
+        const unsigned wstart = i * nwhse_per_partition;
+        const unsigned wend   = (i + 1 == nthreads) ?
+          NumWarehouses() : (i + 1) * nwhse_per_partition;
+        ret.push_back(
+          new tpcc_worker(
+            blockstart + i,
+            r.next(), db, open_tables, partitions,
+            &barrier_a, &barrier_b, wstart+1, wend+1));
+      }
+    }
+    return ret;
+  }
+
+private:
+  map<string, vector<abstract_ordered_index *>> partitions;
+};
+
+void
+tpcc_do_test(abstract_db *db, int argc, char **argv)
+{
+  // parse options
+  optind = 1;
+  bool did_spec_remote_pct = false;
+  while (1) {
+    static struct option long_options[] =
+    {
+      {"disable-cross-partition-transactions" , no_argument       , &g_disable_xpartition_txn             , 1}   ,
+      {"disable-read-only-snapshots"          , no_argument       , &g_disable_read_only_scans            , 1}   ,
+      {"enable-partition-locks"               , no_argument       , &g_enable_partition_locks             , 1}   ,
+      {"enable-separate-tree-per-partition"   , no_argument       , &g_enable_separate_tree_per_partition , 1}   ,
+      {"new-order-remote-item-pct"            , required_argument , 0                                     , 'r'} ,
+      {"new-order-fast-id-gen"                , no_argument       , &g_new_order_fast_id_gen              , 1}   ,
+      {"uniform-item-dist"                    , no_argument       , &g_uniform_item_dist                  , 1}   ,
+      {"order-status-scan-hack"               , no_argument       , &g_order_status_scan_hack             , 1}   ,
+      {"workload-mix"                         , required_argument , 0                                     , 'w'} ,
+      {0, 0, 0, 0}
+    };
+    int option_index = 0;
+    int c = getopt_long(argc, argv, "r:", long_options, &option_index);
+    if (c == -1)
+      break;
+    switch (c) {
+    case 0:
+      if (long_options[option_index].flag != 0)
+        break;
+      abort();
+      break;
+
+    case 'r':
+      g_new_order_remote_item_pct = strtoul(optarg, NULL, 10);
+      ALWAYS_ASSERT(g_new_order_remote_item_pct >= 0 && g_new_order_remote_item_pct <= 100);
+      did_spec_remote_pct = true;
+      break;
+
+    case 'w':
+      {
+        const vector<string> toks = split(optarg, ',');
+        ALWAYS_ASSERT(toks.size() == ARRAY_NELEMS(g_txn_workload_mix));
+        unsigned s = 0;
+        for (size_t i = 0; i < toks.size(); i++) {
+          unsigned p = strtoul(toks[i].c_str(), nullptr, 10);
+          ALWAYS_ASSERT(p >= 0 && p <= 100);
+          s += p;
+          g_txn_workload_mix[i] = p;
+        }
+        ALWAYS_ASSERT(s == 100);
+      }
+      break;
+
+    case '?':
+      /* getopt_long already printed an error message. */
+      exit(1);
+
+    default:
+      abort();
+    }
+  }
+
+  if (did_spec_remote_pct && g_disable_xpartition_txn) {
+    cerr << "WARNING: --new-order-remote-item-pct given with --disable-cross-partition-transactions" << endl;
+    cerr << "  --new-order-remote-item-pct will have no effect" << endl;
+  }
+
+  if (verbose) {
+    cerr << "tpcc settings:" << endl;
+    cerr << "  cross_partition_transactions : " << !g_disable_xpartition_txn << endl;
+    cerr << "  read_only_snapshots          : " << !g_disable_read_only_scans << endl;
+    cerr << "  partition_locks              : " << g_enable_partition_locks << endl;
+    cerr << "  separate_tree_per_partition  : " << g_enable_separate_tree_per_partition << endl;
+    cerr << "  new_order_remote_item_pct    : " << g_new_order_remote_item_pct << endl;
+    cerr << "  new_order_fast_id_gen        : " << g_new_order_fast_id_gen << endl;
+    cerr << "  uniform_item_dist            : " << g_uniform_item_dist << endl;
+    cerr << "  order_status_scan_hack       : " << g_order_status_scan_hack << endl;
+    cerr << "  workload_mix                 : " <<
+      format_list(g_txn_workload_mix,
+                  g_txn_workload_mix + ARRAY_NELEMS(g_txn_workload_mix)) << endl;
+  }
+
+  tpcc_bench_runner r(db);
+  r.run();
+}