benchmark silo added
[c11concurrency-benchmarks.git] / silo / masstree / mtclient.cc
diff --git a/silo/masstree/mtclient.cc b/silo/masstree/mtclient.cc
new file mode 100644 (file)
index 0000000..a072164
--- /dev/null
@@ -0,0 +1,1702 @@
+/* Masstree
+ * Eddie Kohler, Yandong Mao, Robert Morris
+ * Copyright (c) 2012-2014 President and Fellows of Harvard College
+ * Copyright (c) 2012-2014 Massachusetts Institute of Technology
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, subject to the conditions
+ * listed in the Masstree LICENSE file. These conditions include: you must
+ * preserve this copyright notice, and you cannot mention the copyright
+ * holders in advertising related to the Software without their permission.
+ * The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This
+ * notice is a summary of the Masstree LICENSE file; the license in that file
+ * is legally binding.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdarg.h>
+#include <unistd.h>
+#include <limits.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <sys/select.h>
+#include <sys/wait.h>
+#include <assert.h>
+#include <string.h>
+#include <pthread.h>
+#include <ctype.h>
+#include <sys/types.h>
+#include <sys/time.h>
+#include <sys/select.h>
+#include <arpa/inet.h>
+#include <math.h>
+#include <fcntl.h>
+#include "kvstats.hh"
+#include "kvio.hh"
+#include "json.hh"
+#include "kvtest.hh"
+#include "mtclient.hh"
+#include "kvrandom.hh"
+#include "clp.h"
+
+const char *serverip = "127.0.0.1";
+
+typedef void (*get_async_cb)(struct child *c, struct async *a,
+                             bool has_val, const Str &val);
+typedef void (*put_async_cb)(struct child *c, struct async *a,
+                             int status);
+typedef void (*remove_async_cb)(struct child *c, struct async *a,
+                                int status);
+
+struct async {
+    int cmd; // Cmd_ constant
+    unsigned seq;
+    union {
+        get_async_cb get_fn;
+        put_async_cb put_fn;
+        remove_async_cb remove_fn;
+    };
+    char key[16]; // just first 16 bytes
+    char wanted[16]; // just first 16 bytes
+    int wantedlen;
+    int acked;
+};
+#define MAXWINDOW 512
+unsigned window = MAXWINDOW;
+
+struct child {
+    int s;
+    int udp; // 1 -> udp, 0 -> tcp
+    KVConn *conn;
+
+    struct async a[MAXWINDOW];
+
+    unsigned seq0_;
+    unsigned seq1_;
+    unsigned long long nsent_;
+    int childno;
+
+    inline void check_flush();
+};
+
+void checkasync(struct child *c, int force);
+
+inline void child::check_flush() {
+    if ((seq1_ & ((window - 1) >> 1)) == 0)
+        conn->flush();
+    while (seq1_ - seq0_ >= window)
+        checkasync(this, 1);
+}
+
+void aget(struct child *, const Str &key, const Str &wanted, get_async_cb fn);
+void aget(struct child *c, long ikey, long iwanted, get_async_cb fn);
+void aget_col(struct child *c, const Str& key, int col, const Str& wanted,
+              get_async_cb fn);
+int get(struct child *c, const Str &key, char *val, int max);
+
+void asyncgetcb(struct child *, struct async *a, bool, const Str &val);
+void asyncgetcb_int(struct child *, struct async *a, bool, const Str &val);
+
+void aput(struct child *c, const Str &key, const Str &val,
+          put_async_cb fn = 0, const Str &wanted = Str());
+void aput_col(struct child *c, const Str &key, int col, const Str &val,
+              put_async_cb fn = 0, const Str &wanted = Str());
+int put(struct child *c, const Str &key, const Str &val);
+void asyncputcb(struct child *, struct async *a, int status);
+
+void aremove(struct child *c, const Str &key, remove_async_cb fn);
+bool remove(struct child *c, const Str &key);
+
+void udp1(struct child *);
+void w1b(struct child *);
+void u1(struct child *);
+void over1(struct child *);
+void over2(struct child *);
+void rec1(struct child *);
+void rec2(struct child *);
+void cpa(struct child *);
+void cpb(struct child *);
+void cpc(struct child *);
+void cpd(struct child *);
+void volt1a(struct child *);
+void volt1b(struct child *);
+void volt2a(struct child *);
+void volt2b(struct child *);
+void scantest(struct child *);
+
+static int children = 1;
+static uint64_t nkeys = 0;
+static int prefixLen = 0;
+static int keylen = 0;
+static uint64_t limit = ~uint64_t(0);
+double duration = 10;
+double duration2 = 0;
+int udpflag = 0;
+int quiet = 0;
+int first_server_port = 2117;
+// Should all child processes connects to the same UDP PORT on server
+bool share_server_port = false;
+volatile bool timeout[2] = {false, false};
+int first_local_port = 0;
+const char *input = NULL;
+static int rsinit_part = 0;
+int kvtest_first_seed = 0;
+static int rscale_partsz = 0;
+static int getratio = -1;
+static int minkeyletter = '0';
+static int maxkeyletter = '9';
+
+
+struct kvtest_client {
+    kvtest_client(struct child& c)
+        : c_(&c) {
+    }
+    struct child* child() const {
+        return c_;
+    }
+    int id() const {
+        return c_->childno;
+    }
+    int nthreads() const {
+        return ::children;
+    }
+    char minkeyletter() const {
+        return ::minkeyletter;
+    }
+    char maxkeyletter() const {
+        return ::maxkeyletter;
+    }
+    void register_timeouts(int n) {
+        (void) n;
+    }
+    bool timeout(int which) const {
+        return ::timeout[which];
+    }
+    uint64_t limit() const {
+        return ::limit;
+    }
+    int getratio() const {
+        assert(::getratio >= 0);
+        return ::getratio;
+    }
+    uint64_t nkeys() const {
+        return ::nkeys;
+    }
+    int keylen() const {
+        return ::keylen;
+    }
+    int prefixLen() const {
+        return ::prefixLen;
+    }
+    double now() const {
+        return ::now();
+    }
+
+    void get(long ikey, Str *value) {
+        quick_istr key(ikey);
+        aget(c_, key.string(),
+             Str(reinterpret_cast<const char *>(&value), sizeof(value)),
+             asyncgetcb);
+    }
+    void get(const Str &key, int *ivalue) {
+        aget(c_, key,
+             Str(reinterpret_cast<const char *>(&ivalue), sizeof(ivalue)),
+             asyncgetcb_int);
+    }
+    bool get_sync(long ikey) {
+        char got[512];
+        quick_istr key(ikey);
+        return ::get(c_, key.string(), got, sizeof(got)) >= 0;
+    }
+    void get_check(long ikey, long iexpected) {
+        aget(c_, ikey, iexpected, 0);
+    }
+    void get_check(const char *key, const char *val) {
+        aget(c_, Str(key), Str(val), 0);
+    }
+    void get_check(const Str &key, const Str &val) {
+        aget(c_, key, val, 0);
+    }
+    void get_check_key8(long ikey, long iexpected) {
+        quick_istr key(ikey, 8), expected(iexpected);
+        aget(c_, key.string(), expected.string(), 0);
+    }
+    void get_check_key10(long ikey, long iexpected) {
+        quick_istr key(ikey, 10), expected(iexpected);
+        aget(c_, key.string(), expected.string(), 0);
+    }
+    void many_get_check(int, long [], long []) {
+        assert(0);
+    }
+    void get_col_check(const Str &key, int col, const Str &value) {
+        aget_col(c_, key, col, value, 0);
+    }
+    void get_col_check(long ikey, int col, long ivalue) {
+        quick_istr key(ikey), value(ivalue);
+        get_col_check(key.string(), col, value.string());
+    }
+    void get_col_check_key10(long ikey, int col, long ivalue) {
+        quick_istr key(ikey, 10), value(ivalue);
+        get_col_check(key.string(), col, value.string());
+    }
+    void get_check_sync(long ikey, long iexpected) {
+        char key[512], val[512], got[512];
+        sprintf(key, "%010ld", ikey);
+        sprintf(val, "%ld", iexpected);
+        memset(got, 0, sizeof(got));
+        ::get(c_, Str(key), got, sizeof(got));
+        if (strcmp(val, got)) {
+            fprintf(stderr, "key %s, expected %s, got %s\n", key, val, got);
+            always_assert(0);
+        }
+    }
+
+    void put(const Str &key, const Str &value) {
+        aput(c_, key, value);
+    }
+    void put(const Str &key, const Str &value, int *status) {
+        aput(c_, key, value,
+             asyncputcb,
+             Str(reinterpret_cast<const char *>(&status), sizeof(status)));
+    }
+    void put(const char *key, const char *value) {
+        aput(c_, Str(key), Str(value));
+    }
+    void put(const Str &key, long ivalue) {
+        quick_istr value(ivalue);
+        aput(c_, key, value.string());
+    }
+    void put(long ikey, long ivalue) {
+        quick_istr key(ikey), value(ivalue);
+        aput(c_, key.string(), value.string());
+    }
+    void put_key8(long ikey, long ivalue) {
+        quick_istr key(ikey, 8), value(ivalue);
+        aput(c_, key.string(), value.string());
+    }
+    void put_key10(long ikey, long ivalue) {
+        quick_istr key(ikey, 10), value(ivalue);
+        aput(c_, key.string(), value.string());
+    }
+    void put_col(const Str &key, int col, const Str &value) {
+        aput_col(c_, key, col, value);
+    }
+    void put_col(long ikey, int col, long ivalue) {
+        quick_istr key(ikey), value(ivalue);
+        put_col(key.string(), col, value.string());
+    }
+    void put_col_key10(long ikey, int col, long ivalue) {
+        quick_istr key(ikey, 10), value(ivalue);
+        put_col(key.string(), col, value.string());
+    }
+    void put_sync(long ikey, long ivalue) {
+        quick_istr key(ikey, 10), value(ivalue);
+        ::put(c_, key.string(), value.string());
+    }
+
+    void remove(const Str &key) {
+        aremove(c_, key, 0);
+    }
+    void remove(long ikey) {
+        quick_istr key(ikey);
+        remove(key.string());
+    }
+    bool remove_sync(long ikey) {
+        quick_istr key(ikey);
+        return ::remove(c_, key.string());
+    }
+
+    int ruscale_partsz() const {
+        return ::rscale_partsz;
+    }
+    int ruscale_init_part_no() const {
+        return ::rsinit_part;
+    }
+    long nseqkeys() const {
+        return 16 * ::rscale_partsz;
+    }
+    void wait_all() {
+        checkasync(c_, 2);
+    }
+    void puts_done() {
+    }
+    void rcu_quiesce() {
+    }
+    void notice(String s) {
+        if (!quiet) {
+            if (!s.empty() && s.back() == '\n')
+                s = s.substr(0, -1);
+            if (s.empty() || isspace((unsigned char) s[0]))
+                fprintf(stderr, "%d%.*s\n", c_->childno, s.length(), s.data());
+            else
+                fprintf(stderr, "%d %.*s\n", c_->childno, s.length(), s.data());
+        }
+    }
+    void notice(const char *fmt, ...) {
+        if (!quiet) {
+            va_list val;
+            va_start(val, fmt);
+            String x;
+            if (!*fmt || isspace((unsigned char) *fmt))
+                x = String(c_->childno) + fmt;
+            else
+                x = String(c_->childno) + String(" ") + fmt;
+            vfprintf(stderr, x.c_str(), val);
+            va_end(val);
+        }
+    }
+    const Json& report(const Json& x) {
+        return report_.merge(x);
+    }
+    void finish() {
+        if (!quiet) {
+            lcdf::StringAccum sa;
+            double dv;
+            if (report_.count("puts"))
+                sa << " total " << report_.get("puts");
+            if (report_.get("puts_per_sec", dv))
+                sa.snprintf(100, " %.0f put/s", dv);
+            if (report_.get("gets_per_sec", dv))
+                sa.snprintf(100, " %.0f get/s", dv);
+            if (!sa.empty())
+                notice(sa.take_string());
+        }
+        printf("%s\n", report_.unparse().c_str());
+    }
+    kvrandom_random rand;
+    struct child *c_;
+    Json report_;
+};
+
+
+#define TESTRUNNER_CLIENT_TYPE kvtest_client&
+#include "testrunner.hh"
+
+MAKE_TESTRUNNER(rw1, kvtest_rw1(client));
+MAKE_TESTRUNNER(rw2, kvtest_rw2(client));
+MAKE_TESTRUNNER(rw3, kvtest_rw3(client));
+MAKE_TESTRUNNER(rw4, kvtest_rw4(client));
+MAKE_TESTRUNNER(rw1fixed, kvtest_rw1fixed(client));
+MAKE_TESTRUNNER(rw16, kvtest_rw16(client));
+MAKE_TESTRUNNER(sync_rw1, kvtest_sync_rw1(client));
+MAKE_TESTRUNNER(r1, kvtest_r1_seed(client, kvtest_first_seed + client.id()));
+MAKE_TESTRUNNER(w1, kvtest_w1_seed(client, kvtest_first_seed + client.id()));
+MAKE_TESTRUNNER(w1b, w1b(client.child()));
+MAKE_TESTRUNNER(u1, u1(client.child()));
+MAKE_TESTRUNNER(wd1, kvtest_wd1(10000000, 1, client));
+MAKE_TESTRUNNER(wd1m1, kvtest_wd1(100000000, 1, client));
+MAKE_TESTRUNNER(wd1m2, kvtest_wd1(1000000000, 4, client));
+MAKE_TESTRUNNER(wd1check, kvtest_wd1_check(10000000, 1, client));
+MAKE_TESTRUNNER(wd1m1check, kvtest_wd1_check(100000000, 1, client));
+MAKE_TESTRUNNER(wd1m2check, kvtest_wd1_check(1000000000, 4, client));
+MAKE_TESTRUNNER(wd2, kvtest_wd2(client));
+MAKE_TESTRUNNER(wd2check, kvtest_wd2_check(client));
+MAKE_TESTRUNNER(tri1, kvtest_tri1(10000000, 1, client));
+MAKE_TESTRUNNER(tri1check, kvtest_tri1_check(10000000, 1, client));
+MAKE_TESTRUNNER(same, kvtest_same(client));
+MAKE_TESTRUNNER(wcol1, kvtest_wcol1at(client, client.id() % 24, kvtest_first_seed + client.id() % 48, 5000000));
+MAKE_TESTRUNNER(rcol1, kvtest_rcol1at(client, client.id() % 24, kvtest_first_seed + client.id() % 48, 5000000));
+MAKE_TESTRUNNER(wcol1o1, kvtest_wcol1at(client, (client.id() + 1) % 24, kvtest_first_seed + client.id() % 48, 5000000));
+MAKE_TESTRUNNER(rcol1o1, kvtest_rcol1at(client, (client.id() + 1) % 24, kvtest_first_seed + client.id() % 48, 5000000));
+MAKE_TESTRUNNER(wcol1o2, kvtest_wcol1at(client, (client.id() + 2) % 24, kvtest_first_seed + client.id() % 48, 5000000));
+MAKE_TESTRUNNER(rcol1o2, kvtest_rcol1at(client, (client.id() + 2) % 24, kvtest_first_seed + client.id() % 48, 5000000));
+MAKE_TESTRUNNER(over1, over1(client.child()));
+MAKE_TESTRUNNER(over2, over2(client.child()));
+MAKE_TESTRUNNER(rec1, rec1(client.child()));
+MAKE_TESTRUNNER(rec2, rec2(client.child()));
+MAKE_TESTRUNNER(cpa, cpa(client.child()));
+MAKE_TESTRUNNER(cpb, cpb(client.child()));
+MAKE_TESTRUNNER(cpc, cpc(client.child()));
+MAKE_TESTRUNNER(cpd, cpd(client.child()));
+MAKE_TESTRUNNER(volt1a, volt1a(client.child()));
+MAKE_TESTRUNNER(volt1b, volt1b(client.child()));
+MAKE_TESTRUNNER(volt2a, volt2a(client.child()));
+MAKE_TESTRUNNER(volt2b, volt2b(client.child()));
+MAKE_TESTRUNNER(scantest, scantest(client.child()));
+MAKE_TESTRUNNER(wscale, kvtest_wscale(client));
+MAKE_TESTRUNNER(ruscale_init, kvtest_ruscale_init(client));
+MAKE_TESTRUNNER(rscale, kvtest_rscale(client));
+MAKE_TESTRUNNER(uscale, kvtest_uscale(client));
+MAKE_TESTRUNNER(long_init, kvtest_long_init(client));
+MAKE_TESTRUNNER(long_go, kvtest_long_go(client));
+MAKE_TESTRUNNER(udp1, kvtest_udp1(client));
+
+void run_child(testrunner*, int childno);
+
+
+void
+usage()
+{
+  fprintf(stderr, "Usage: mtclient [-s serverip] [-w window] [--udp] "\
+          "[-j nchildren] [-d duration] [--ssp] [--flp first_local_port] "\
+          "[--fsp first_server_port] [-i json_input]\nTests:\n");
+  testrunner::print_names(stderr, 5);
+  exit(1);
+}
+
+void
+settimeout(int)
+{
+  if (!timeout[0]) {
+    timeout[0] = true;
+    if (duration2)
+        alarm((int) ceil(duration2));
+  } else
+    timeout[1] = true;
+}
+
+enum { clp_val_suffixdouble = Clp_ValFirstUser };
+enum { opt_threads = 1, opt_threads_deprecated, opt_duration, opt_duration2,
+       opt_window, opt_server, opt_first_server_port, opt_quiet, opt_udp,
+       opt_first_local_port, opt_share_server_port, opt_input,
+       opt_rsinit_part, opt_first_seed, opt_rscale_partsz, opt_keylen,
+       opt_limit, opt_prefix_len, opt_nkeys, opt_get_ratio, opt_minkeyletter,
+       opt_maxkeyletter, opt_nofork };
+static const Clp_Option options[] = {
+    { "threads", 'j', opt_threads, Clp_ValInt, 0 },
+    { 0, 'n', opt_threads_deprecated, Clp_ValInt, 0 },
+    { "duration", 'd', opt_duration, Clp_ValDouble, 0 },
+    { "duration2", 0, opt_duration2, Clp_ValDouble, 0 },
+    { "d2", 0, opt_duration2, Clp_ValDouble, 0 },
+    { "window", 'w', opt_window, Clp_ValUnsigned, 0 },
+    { "server-ip", 's', opt_server, Clp_ValString, 0 },
+    { "first-server-port", 0, opt_first_server_port, Clp_ValInt, 0 },
+    { "fsp", 0, opt_first_server_port, Clp_ValInt, 0 },
+    { "quiet", 'q', opt_quiet, 0, Clp_Negate },
+    { "udp", 'u', opt_udp, 0, Clp_Negate },
+    { "first-local-port", 0, opt_first_local_port, Clp_ValInt, 0 },
+    { "flp", 0, opt_first_local_port, Clp_ValInt, 0 },
+    { "share-server-port", 0, opt_share_server_port, 0, Clp_Negate },
+    { "ssp", 0, opt_share_server_port, 0, Clp_Negate },
+    { "input", 'i', opt_input, Clp_ValString, 0 },
+    { "rsinit_part", 0, opt_rsinit_part, Clp_ValInt, 0 },
+    { "first_seed", 0, opt_first_seed, Clp_ValInt, 0 },
+    { "rscale_partsz", 0, opt_rscale_partsz, Clp_ValInt, 0 },
+    { "keylen", 0, opt_keylen, Clp_ValInt, 0 },
+    { "limit", 'l', opt_limit, clp_val_suffixdouble, 0 },
+    { "prefixLen", 0, opt_prefix_len, Clp_ValInt, 0 },
+    { "nkeys", 0, opt_nkeys, Clp_ValInt, 0 },
+    { "getratio", 0, opt_get_ratio, Clp_ValInt, 0 },
+    { "minkeyletter", 0, opt_minkeyletter, Clp_ValString, 0 },
+    { "maxkeyletter", 0, opt_maxkeyletter, Clp_ValString, 0 },
+    { "no-fork", 0, opt_nofork, 0, 0 }
+};
+
+int
+main(int argc, char *argv[])
+{
+  int i, pid, status;
+  testrunner* test = 0;
+  int pipes[512];
+  int dofork = 1;
+
+  Clp_Parser *clp = Clp_NewParser(argc, argv, (int) arraysize(options), options);
+  Clp_AddType(clp, clp_val_suffixdouble, Clp_DisallowOptions, clp_parse_suffixdouble, 0);
+  int opt;
+  while ((opt = Clp_Next(clp)) != Clp_Done) {
+      switch (opt) {
+      case opt_threads:
+          children = clp->val.i;
+          break;
+      case opt_threads_deprecated:
+          Clp_OptionError(clp, "%<%O%> is deprecated, use %<-j%>");
+          children = clp->val.i;
+          break;
+      case opt_duration:
+          duration = clp->val.d;
+          break;
+      case opt_duration2:
+          duration2 = clp->val.d;
+          break;
+      case opt_window:
+          window = clp->val.u;
+          always_assert(window <= MAXWINDOW);
+          always_assert((window & (window - 1)) == 0); // power of 2
+          break;
+      case opt_server:
+          serverip = clp->vstr;
+          break;
+      case opt_first_server_port:
+          first_server_port = clp->val.i;
+          break;
+      case opt_quiet:
+          quiet = !clp->negated;
+          break;
+      case opt_udp:
+          udpflag = !clp->negated;
+          break;
+      case opt_first_local_port:
+          first_local_port = clp->val.i;
+          break;
+      case opt_share_server_port:
+          share_server_port = !clp->negated;
+          break;
+      case opt_input:
+          input = clp->vstr;
+          break;
+      case opt_rsinit_part:
+          rsinit_part = clp->val.i;
+          break;
+      case opt_first_seed:
+          kvtest_first_seed = clp->val.i;
+          break;
+      case opt_rscale_partsz:
+          rscale_partsz = clp->val.i;
+          break;
+      case opt_keylen:
+          keylen = clp->val.i;
+          break;
+      case opt_limit:
+          limit = (uint64_t) clp->val.d;
+          break;
+      case opt_prefix_len:
+          prefixLen = clp->val.i;
+          break;
+      case opt_nkeys:
+          nkeys = clp->val.i;
+          break;
+      case opt_get_ratio:
+          getratio = clp->val.i;
+          break;
+      case opt_minkeyletter:
+          minkeyletter = clp->vstr[0];
+          break;
+      case opt_maxkeyletter:
+          maxkeyletter = clp->vstr[0];
+          break;
+      case opt_nofork:
+          dofork = !clp->negated;
+          break;
+      case Clp_NotOption:
+          test = testrunner::find(clp->vstr);
+          if (!test)
+              usage();
+          break;
+      case Clp_BadOption:
+          usage();
+          break;
+      }
+  }
+  if(children < 1 || (children != 1 && !dofork))
+    usage();
+  if (!test)
+      test = testrunner::first();
+
+  printf("%s, w %d, test %s, children %d\n",
+         udpflag ? "udp" : "tcp", window,
+         test->name().c_str(), children);
+
+  fflush(stdout);
+
+  if (dofork) {
+      for(i = 0; i < children; i++){
+          int ptmp[2];
+          int r = pipe(ptmp);
+          always_assert(r == 0);
+          pid = fork();
+          if(pid < 0){
+              perror("fork");
+              exit(1);
+          }
+          if(pid == 0){
+              close(ptmp[0]);
+              dup2(ptmp[1], 1);
+              close(ptmp[1]);
+              signal(SIGALRM, settimeout);
+              alarm((int) ceil(duration));
+              run_child(test, i);
+              exit(0);
+          }
+          pipes[i] = ptmp[0];
+          close(ptmp[1]);
+      }
+      for(i = 0; i < children; i++){
+          if(wait(&status) <= 0){
+              perror("wait");
+              exit(1);
+          }
+          if (WIFSIGNALED(status))
+              fprintf(stderr, "child %d died by signal %d\n", i, WTERMSIG(status));
+      }
+  } else {
+      int ptmp[2];
+      int r = pipe(ptmp);
+      always_assert(r == 0);
+      pipes[0] = ptmp[0];
+      int stdout_fd = dup(STDOUT_FILENO);
+      always_assert(stdout_fd > 0);
+      r = dup2(ptmp[1], STDOUT_FILENO);
+      always_assert(r >= 0);
+      close(ptmp[1]);
+      signal(SIGALRM, settimeout);
+      alarm((int) ceil(duration));
+      run_child(test, 0);
+      fflush(stdout);
+      r = dup2(stdout_fd, STDOUT_FILENO);
+      always_assert(r >= 0);
+      close(stdout_fd);
+  }
+
+  long long total = 0;
+  kvstats puts, gets, scans, puts_per_sec, gets_per_sec, scans_per_sec;
+  for(i = 0; i < children; i++){
+    char buf[2048];
+    int cc = read(pipes[i], buf, sizeof(buf)-1);
+    assert(cc > 0);
+    buf[cc] = 0;
+    printf("%s", buf);
+    Json bufj = Json::parse(buf, buf + cc);
+    long long iv;
+    double dv;
+    if (bufj.to_i(iv))
+        total += iv;
+    else if (bufj.is_object()) {
+        if (bufj.get("ops", iv)
+            || bufj.get("total", iv)
+            || bufj.get("count", iv))
+            total += iv;
+        if (bufj.get("puts", iv))
+            puts.add(iv);
+        if (bufj.get("gets", iv))
+            gets.add(iv);
+        if (bufj.get("scans", iv))
+            scans.add(iv);
+        if (bufj.get("puts_per_sec", dv))
+            puts_per_sec.add(dv);
+        if (bufj.get("gets_per_sec", dv))
+            gets_per_sec.add(dv);
+        if (bufj.get("scans_per_sec", dv))
+            scans_per_sec.add(dv);
+    }
+  }
+
+  printf("total %lld\n", total);
+  puts.print_report("puts");
+  gets.print_report("gets");
+  scans.print_report("scans");
+  puts_per_sec.print_report("puts/s");
+  gets_per_sec.print_report("gets/s");
+  scans_per_sec.print_report("scans/s");
+
+  exit(0);
+}
+
+void
+run_child(testrunner* test, int childno)
+{
+  struct sockaddr_in sin;
+  int ret, yes = 1;
+  struct child c;
+
+  bzero(&c, sizeof(c));
+  c.childno = childno;
+
+  if(udpflag){
+    c.udp = 1;
+    c.s = socket(AF_INET, SOCK_DGRAM, 0);
+  } else {
+    c.s = socket(AF_INET, SOCK_STREAM, 0);
+  }
+  if (first_local_port) {
+    bzero(&sin, sizeof(sin));
+    sin.sin_family = AF_INET;
+    sin.sin_port = htons(first_local_port + (childno % 48));
+    ret = ::bind(c.s, (struct sockaddr *) &sin, sizeof(sin));
+    if (ret < 0) {
+      perror("bind");
+      exit(1);
+    }
+  }
+
+  assert(c.s >= 0);
+  setsockopt(c.s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
+
+  bzero(&sin, sizeof(sin));
+  sin.sin_family = AF_INET;
+  if (udpflag && !share_server_port)
+    sin.sin_port = htons(first_server_port + (childno % 48));
+  else
+    sin.sin_port = htons(first_server_port);
+  sin.sin_addr.s_addr = inet_addr(serverip);
+  ret = connect(c.s, (struct sockaddr *) &sin, sizeof(sin));
+  if(ret < 0){
+    perror("connect");
+    exit(1);
+  }
+
+  c.conn = new KVConn(c.s, !udpflag);
+  kvtest_client client(c);
+
+  test->run(client);
+
+  checkasync(&c, 2);
+
+  delete c.conn;
+  close(c.s);
+}
+
+void KVConn::hard_check(int tryhard) {
+    masstree_precondition(inbufpos_ == inbuflen_);
+    if (parser_.empty()) {
+        inbufpos_ = inbuflen_ = 0;
+        for (auto x : oldinbuf_)
+            delete[] x;
+        oldinbuf_.clear();
+    } else if (inbufpos_ == inbufsz) {
+        oldinbuf_.push_back(inbuf_);
+        inbuf_ = new char[inbufsz];
+        inbufpos_ = inbuflen_ = 0;
+    }
+    if (tryhard == 1) {
+        fd_set rfds;
+        FD_ZERO(&rfds);
+        FD_SET(infd_, &rfds);
+        struct timeval tv = {0, 0};
+        if (select(infd_ + 1, &rfds, NULL, NULL, &tv) <= 0)
+            return;
+    } else
+        kvflush(out_);
+
+    ssize_t r = read(infd_, inbuf_ + inbufpos_, inbufsz - inbufpos_);
+    if (r != -1)
+        inbuflen_ += r;
+}
+
+int
+get(struct child *c, const Str &key, char *val, int max)
+{
+    assert(c->seq0_ == c->seq1_);
+
+    unsigned sseq = c->seq1_;
+    ++c->seq1_;
+    ++c->nsent_;
+
+    c->conn->sendgetwhole(key, sseq);
+    c->conn->flush();
+
+    const Json& result = c->conn->receive();
+    always_assert(result && result[0] == sseq);
+    ++c->seq0_;
+    if (!result[2])
+        return -1;
+    always_assert(result.size() == 3 && result[2].is_s()
+                  && result[2].as_s().length() <= max);
+    memcpy(val, result[2].as_s().data(), result[2].as_s().length());
+    return result[2].as_s().length();
+}
+
+// builtin aget callback: no check
+void
+nocheck(struct child *, struct async *, bool, const Str &)
+{
+}
+
+// builtin aget callback: store string
+void
+asyncgetcb(struct child *, struct async *a, bool, const Str &val)
+{
+    Str *sptr;
+    assert(a->wantedlen == sizeof(Str *));
+    memcpy(&sptr, a->wanted, sizeof(Str *));
+    sptr->len = std::min(sptr->len, val.len);
+    memcpy(const_cast<char *>(sptr->s), val.s, sptr->len);
+}
+
+// builtin aget callback: store string
+void
+asyncgetcb_int(struct child *, struct async *a, bool, const Str &val)
+{
+    int *vptr;
+    assert(a->wantedlen == sizeof(int *));
+    memcpy(&vptr, a->wanted, sizeof(int *));
+    long x = 0;
+    if (val.len <= 0)
+        x = -1;
+    else
+        for (int i = 0; i < val.len; ++i)
+            if (val.s[i] >= '0' && val.s[i] <= '9')
+                x = (x * 10) + (val.s[i] - '0');
+            else {
+                x = -1;
+                break;
+            }
+    *vptr = x;
+}
+
+// default aget callback: check val against wanted
+void
+defaultget(struct child *, struct async *a, bool have_val, const Str &val)
+{
+    // check that we got the expected value
+    int wanted_avail = std::min(a->wantedlen, int(sizeof(a->wanted)));
+    if (!have_val
+        || a->wantedlen != val.len
+        || memcmp(val.s, a->wanted, wanted_avail) != 0)
+        fprintf(stderr, "oops wanted %.*s(%d) got %.*s(%d)\n",
+                wanted_avail, a->wanted, a->wantedlen, val.len, val.s, val.len);
+    else {
+        always_assert(a->wantedlen == val.len);
+        always_assert(memcmp(val.s, a->wanted, wanted_avail) == 0);
+    }
+}
+
+// builtin aput/aremove callback: store status
+void
+asyncputcb(struct child *, struct async *a, int status)
+{
+    int *sptr;
+    assert(a->wantedlen == sizeof(int *));
+    memcpy(&sptr, a->wanted, sizeof(int *));
+    *sptr = status;
+}
+
+// process any waiting replies to aget() and aput().
+// force=0 means non-blocking check if anything waiting on socket.
+// force=1 means wait for at least one reply.
+// force=2 means wait for all pending (nw-nr) replies.
+void
+checkasync(struct child *c, int force)
+{
+    while (c->seq0_ != c->seq1_) {
+        if (force)
+            c->conn->flush();
+        if (c->conn->check(force ? 2 : 1) > 0) {
+            const Json& result = c->conn->receive();
+            always_assert(result);
+
+            // is rseq in the nr..nw window?
+            // replies might arrive out of order if UDP
+            unsigned rseq = result[0].as_i();
+            always_assert(rseq - c->seq0_ < c->seq1_ - c->seq0_);
+            struct async *a = &c->a[rseq & (window - 1)];
+            always_assert(a->seq == rseq);
+
+            // advance the nr..nw window
+            always_assert(a->acked == 0);
+            a->acked = 1;
+            while (c->seq0_ != c->seq1_ && c->a[c->seq0_ & (window - 1)].acked)
+                ++c->seq0_;
+
+            // might have been the last free slot,
+            // don't want to re-use it underfoot.
+            struct async tmpa = *a;
+
+            if(tmpa.cmd == Cmd_Get){
+                // this is a reply to a get
+                String s = result.size() > 2 ? result[2].as_s() : String();
+                if (tmpa.get_fn)
+                    (tmpa.get_fn)(c, &tmpa, result.size() > 2, s);
+            } else if (tmpa.cmd == Cmd_Put || tmpa.cmd == Cmd_Replace) {
+                // this is a reply to a put
+                if (tmpa.put_fn)
+                    (tmpa.put_fn)(c, &tmpa, result[2].as_i());
+            } else if(tmpa.cmd == Cmd_Scan){
+                // this is a reply to a scan
+                always_assert((result.size() - 2) / 2 <= tmpa.wantedlen);
+            } else if (tmpa.cmd == Cmd_Remove) {
+                // this is a reply to a remove
+                if (tmpa.remove_fn)
+                    (tmpa.remove_fn)(c, &tmpa, result[2].as_i());
+            } else {
+                always_assert(0);
+            }
+
+            if (force < 2)
+                force = 0;
+        } else if (!force)
+            break;
+    }
+}
+
+// async get, checkasync() will eventually check reply
+// against wanted.
+void
+aget(struct child *c, const Str &key, const Str &wanted, get_async_cb fn)
+{
+    c->check_flush();
+
+    c->conn->sendgetwhole(key, c->seq1_);
+    if (c->udp)
+        c->conn->flush();
+
+    struct async *a = &c->a[c->seq1_ & (window - 1)];
+    a->cmd = Cmd_Get;
+    a->seq = c->seq1_;
+    a->get_fn = (fn ? fn : defaultget);
+    assert(key.len < int(sizeof(a->key)));
+    memcpy(a->key, key.s, key.len);
+    a->key[key.len] = 0;
+    a->wantedlen = wanted.len;
+    int wantedavail = std::min(wanted.len, int(sizeof(a->wanted)));
+    memcpy(a->wanted, wanted.s, wantedavail);
+    a->acked = 0;
+
+    ++c->seq1_;
+    ++c->nsent_;
+}
+
+void aget_col(struct child *c, const Str& key, int col, const Str& wanted,
+              get_async_cb fn)
+{
+    c->check_flush();
+
+    c->conn->sendgetcol(key, col, c->seq1_);
+    if (c->udp)
+        c->conn->flush();
+
+    struct async *a = &c->a[c->seq1_ & (window - 1)];
+    a->cmd = Cmd_Get;
+    a->seq = c->seq1_;
+    a->get_fn = (fn ? fn : defaultget);
+    assert(key.len < int(sizeof(a->key)));
+    memcpy(a->key, key.s, key.len);
+    a->key[key.len] = 0;
+    a->wantedlen = wanted.len;
+    int wantedavail = std::min(wanted.len, int(sizeof(a->wanted)));
+    memcpy(a->wanted, wanted.s, wantedavail);
+    a->acked = 0;
+
+    ++c->seq1_;
+    ++c->nsent_;
+}
+
+void
+aget(struct child *c, long ikey, long iwanted, get_async_cb fn)
+{
+    quick_istr key(ikey), wanted(iwanted);
+    aget(c, key.string(), wanted.string(), fn);
+}
+
+int
+put(struct child *c, const Str &key, const Str &val)
+{
+    always_assert(c->seq0_ == c->seq1_);
+
+    unsigned int sseq = c->seq1_;
+    c->conn->sendputwhole(key, val, sseq);
+    c->conn->flush();
+
+    const Json& result = c->conn->receive();
+    always_assert(result && result[0] == sseq);
+
+    ++c->seq0_;
+    ++c->seq1_;
+    ++c->nsent_;
+    return 0;
+}
+
+void
+aput(struct child *c, const Str &key, const Str &val,
+     put_async_cb fn, const Str &wanted)
+{
+    c->check_flush();
+
+    c->conn->sendputwhole(key, val, c->seq1_);
+    if (c->udp)
+        c->conn->flush();
+
+    struct async *a = &c->a[c->seq1_ & (window - 1)];
+    a->cmd = Cmd_Put;
+    a->seq = c->seq1_;
+    assert(key.len < int(sizeof(a->key)) - 1);
+    memcpy(a->key, key.s, key.len);
+    a->key[key.len] = 0;
+    a->put_fn = fn;
+    if (fn) {
+        assert(wanted.len <= int(sizeof(a->wanted)));
+        a->wantedlen = wanted.len;
+        memcpy(a->wanted, wanted.s, wanted.len);
+    } else {
+        a->wantedlen = -1;
+        a->wanted[0] = 0;
+    }
+    a->acked = 0;
+
+    ++c->seq1_;
+    ++c->nsent_;
+}
+
+void aput_col(struct child *c, const Str &key, int col, const Str &val,
+              put_async_cb fn, const Str &wanted)
+{
+    c->check_flush();
+
+    c->conn->sendputcol(key, col, val, c->seq1_);
+    if (c->udp)
+        c->conn->flush();
+
+    struct async *a = &c->a[c->seq1_ & (window - 1)];
+    a->cmd = Cmd_Put;
+    a->seq = c->seq1_;
+    assert(key.len < int(sizeof(a->key)) - 1);
+    memcpy(a->key, key.s, key.len);
+    a->key[key.len] = 0;
+    a->put_fn = fn;
+    if (fn) {
+        assert(wanted.len <= int(sizeof(a->wanted)));
+        a->wantedlen = wanted.len;
+        memcpy(a->wanted, wanted.s, wanted.len);
+    } else {
+        a->wantedlen = -1;
+        a->wanted[0] = 0;
+    }
+    a->acked = 0;
+
+    ++c->seq1_;
+    ++c->nsent_;
+}
+
+bool remove(struct child *c, const Str &key)
+{
+    always_assert(c->seq0_ == c->seq1_);
+
+    unsigned int sseq = c->seq1_;
+    c->conn->sendremove(key, sseq);
+    c->conn->flush();
+
+    const Json& result = c->conn->receive();
+    always_assert(result && result[0] == sseq);
+
+    ++c->seq0_;
+    ++c->seq1_;
+    ++c->nsent_;
+    return result[2].to_b();
+}
+
+void
+aremove(struct child *c, const Str &key, remove_async_cb fn)
+{
+    c->check_flush();
+
+    c->conn->sendremove(key, c->seq1_);
+    if (c->udp)
+        c->conn->flush();
+
+    struct async *a = &c->a[c->seq1_ & (window - 1)];
+    a->cmd = Cmd_Remove;
+    a->seq = c->seq1_;
+    assert(key.len < int(sizeof(a->key)) - 1);
+    memcpy(a->key, key.s, key.len);
+    a->key[key.len] = 0;
+    a->acked = 0;
+    a->remove_fn = fn;
+
+    ++c->seq1_;
+    ++c->nsent_;
+}
+
+int
+xcompar(const void *xa, const void *xb)
+{
+  long *a = (long *) xa;
+  long *b = (long *) xb;
+  if(*a == *b)
+    return 0;
+  if(*a < *b)
+    return -1;
+  return 1;
+}
+
+// like w1, but in a binary-tree-like order that
+// produces a balanced 3-wide tree.
+void
+w1b(struct child *c)
+{
+  int n;
+  if (limit == ~(uint64_t) 0)
+      n = 4000000;
+  else
+      n = std::min(limit, (uint64_t) INT_MAX);
+  long *a = (long *) malloc(sizeof(long) * n);
+  always_assert(a);
+  char *done = (char *) malloc(n);
+
+  srandom(kvtest_first_seed + c->childno);
+
+  // insert in an order which causes 3-wide
+  // to be balanced
+
+  for(int i = 0; i < n; i++){
+    a[i] = random();
+    done[i] = 0;
+  }
+
+  qsort(a, n, sizeof(a[0]), xcompar);
+  always_assert(a[0] <= a[1] && a[1] <= a[2] && a[2] <= a[3]);
+
+  double t0 = now(), t1;
+
+  for(int stride = n / 2; stride > 0; stride /= 2){
+    for(int i = stride; i < n; i += stride){
+      if(done[i] == 0){
+        done[i] = 1;
+        char key[512], val[512];
+        sprintf(key, "%010ld", a[i]);
+        sprintf(val, "%ld", a[i] + 1);
+        aput(c, Str(key), Str(val));
+      }
+    }
+  }
+  for(int i = 0; i < n; i++){
+    if(done[i] == 0){
+      done[i] = 1;
+      char key[512], val[512];
+      sprintf(key, "%010ld", a[i]);
+      sprintf(val, "%ld", a[i] + 1);
+      aput(c, Str(key), Str(val));
+    }
+  }
+
+  checkasync(c, 2);
+  t1 = now();
+
+  free(done);
+  free(a);
+  Json result = Json().set("total", (long) (n / (t1 - t0)))
+    .set("puts", n)
+    .set("puts_per_sec", n / (t1 - t0));
+  printf("%s\n", result.unparse().c_str());
+}
+
+// update random keys from a set of 10 million.
+// maybe best to run it twice, first time to
+// populate the database.
+void
+u1(struct child *c)
+{
+  int i, n;
+  double t0 = now();
+
+  srandom(kvtest_first_seed + c->childno);
+
+  for(i = 0; i < 10000000; i++){
+    char key[512], val[512];
+    long x = random() % 10000000;
+    sprintf(key, "%ld", x);
+    sprintf(val, "%ld", x + 1);
+    aput(c, Str(key), Str(val));
+  }
+  n = i;
+
+  checkasync(c, 2);
+
+  double t1 = now();
+  Json result = Json().set("total", (long) (n / (t1 - t0)))
+    .set("puts", n)
+    .set("puts_per_sec", n / (t1 - t0));
+  printf("%s\n", result.unparse().c_str());
+}
+
+#define CPN 10000000
+
+void
+cpa(struct child *c)
+{
+  int i, n;
+  double t0 = now();
+
+  srandom(kvtest_first_seed + c->childno);
+
+  for(i = 0; i < CPN; i++){
+    char key[512], val[512];
+    long x = random();
+    sprintf(key, "%ld", x);
+    sprintf(val, "%ld", x + 1);
+    aput(c, Str(key), Str(val));
+  }
+  n = i;
+
+  checkasync(c, 2);
+
+  double t1 = now();
+  Json result = Json().set("total", (long) (n / (t1 - t0)))
+    .set("puts", n)
+    .set("puts_per_sec", n / (t1 - t0));
+  printf("%s\n", result.unparse().c_str());
+
+}
+
+void
+cpc(struct child *c)
+{
+  int i, n;
+  double t0 = now();
+
+  srandom(kvtest_first_seed + c->childno);
+
+  for(i = 0; !timeout[0]; i++){
+    char key[512], val[512];
+    if (i % CPN == 0)
+      srandom(kvtest_first_seed + c->childno);
+    long x = random();
+    sprintf(key, "%ld", x);
+    sprintf(val, "%ld", x + 1);
+    aget(c, Str(key), Str(val), NULL);
+  }
+  n = i;
+
+  checkasync(c, 2);
+
+  double t1 = now();
+  Json result = Json().set("total", (long) (n / (t1 - t0)))
+    .set("gets", n)
+    .set("gets_per_sec", n / (t1 - t0));
+  printf("%s\n", result.unparse().c_str());
+}
+
+void
+cpd(struct child *c)
+{
+  int i, n;
+  double t0 = now();
+
+  srandom(kvtest_first_seed + c->childno);
+
+  for(i = 0; !timeout[0]; i++){
+    char key[512], val[512];
+    if (i % CPN == 0)
+      srandom(kvtest_first_seed + c->childno);
+    long x = random();
+    sprintf(key, "%ld", x);
+    sprintf(val, "%ld", x + 1);
+    aput(c, Str(key), Str(val));
+  }
+  n = i;
+
+  checkasync(c, 2);
+
+  double t1 = now();
+  Json result = Json().set("total", (long) (n / (t1 - t0)))
+    .set("puts", n)
+    .set("puts_per_sec", n / (t1 - t0));
+  printf("%s\n", result.unparse().c_str());
+}
+
+// multiple threads simultaneously update the same key.
+// keep track of the winning value.
+// use over2 to make sure it's the same after a crash/restart.
+void
+over1(struct child *c)
+{
+  int ret, iter;
+
+  srandom(kvtest_first_seed + c->childno);
+
+  iter = 0;
+  while(!timeout[0]){
+    char key1[64], key2[64], val1[64], val2[64];
+    time_t xt = time(0);
+    while(xt == time(0))
+      ;
+    sprintf(key1, "%d", iter);
+    sprintf(val1, "%ld", random());
+    put(c, Str(key1), Str(val1));
+    napms(500);
+    ret = get(c, Str(key1), val2, sizeof(val2));
+    always_assert(ret > 0);
+    sprintf(key2, "%d-%d", iter, c->childno);
+    put(c, Str(key2), Str(val2));
+    if(c->childno == 0)
+      printf("%d: %s\n", iter, val2);
+    iter++;
+  }
+  checkasync(c, 2);
+  printf("0\n");
+}
+
+// check each round of over1()
+void
+over2(struct child *c)
+{
+  int iter;
+
+  for(iter = 0; ; iter++){
+    char key1[64], key2[64], val1[64], val2[64];
+    int ret;
+    sprintf(key1, "%d", iter);
+    ret = get(c, Str(key1), val1, sizeof(val1));
+    if(ret == -1)
+      break;
+    sprintf(key2, "%d-%d", iter, c->childno);
+    ret = get(c, Str(key2), val2, sizeof(val2));
+    if(ret == -1)
+      break;
+    if(c->childno == 0)
+      printf("%d: %s\n", iter, val2);
+    always_assert(strcmp(val1, val2) == 0);
+  }
+
+  checkasync(c, 2);
+  fprintf(stderr, "child %d checked %d\n", c->childno, iter);
+  printf("0\n");
+}
+
+// do a bunch of inserts to distinct keys.
+// rec2() checks that a prefix of those inserts are present.
+// meant to be interrupted by a crash/restart.
+void
+rec1(struct child *c)
+{
+  int i;
+  double t0 = now(), t1;
+
+  srandom(kvtest_first_seed + c->childno);
+
+  for(i = 0; !timeout[0]; i++){
+    char key[512], val[512];
+    long x = random();
+    sprintf(key, "%ld-%d-%d", x, i, c->childno);
+    sprintf(val, "%ld", x);
+    aput(c, Str(key), Str(val));
+  }
+  checkasync(c, 2);
+  t1 = now();
+
+  fprintf(stderr, "child %d: done %d %.0f put/s\n",
+          c->childno,
+          i,
+          i / (t1 - t0));
+  printf("%.0f\n", i / (t1 - t0));
+}
+
+void
+rec2(struct child *c)
+{
+  int i;
+
+  srandom(kvtest_first_seed + c->childno);
+
+  for(i = 0; ; i++){
+    char key[512], val[512], wanted[512];
+    long x = random();
+    sprintf(key, "%ld-%d-%d", x, i, c->childno);
+    sprintf(wanted, "%ld", x);
+    int ret = get(c, Str(key), val, sizeof(val));
+    if(ret == -1)
+      break;
+    val[ret] = 0;
+    if(strcmp(val, wanted) != 0){
+      fprintf(stderr, "oops key %s got %s wanted %s\n", key, val, wanted);
+      exit(1);
+    }
+  }
+
+  int i0 = i; // first missing record
+  for(i = i0+1; i < i0 + 10000; i++){
+    char key[512], val[512];
+    long x = random();
+    sprintf(key, "%ld-%d-%d", x, i, c->childno);
+    val[0] = 0;
+    int ret = get(c, Str(key), val, sizeof(val));
+    if(ret != -1){
+      printf("child %d: oops first missing %d but %d present\n",
+             c->childno, i0, i);
+      exit(1);
+    }
+  }
+  checkasync(c, 2);
+
+  fprintf(stderr, "correct prefix of %d records\n", i0);
+  printf("0\n");
+}
+
+// ask server to checkpoint
+void
+cpb(struct child *c)
+{
+    if (c->childno == 0)
+        c->conn->checkpoint(c->childno);
+    checkasync(c, 2);
+}
+
+// mimic the first benchmark from the VoltDB blog:
+//   https://voltdb.com/blog/key-value-benchmarking
+//   https://voltdb.com/blog/key-value-benchmark-faq
+//   http://community.voltdb.com/kvbenchdetails
+//   svn checkout http://svnmirror.voltdb.com/projects/kvbench/trunk
+// 500,000 items: 50-byte key, 12 KB value
+// volt1a creates the DB.
+// volt1b runs the benchmark.
+#define VOLT1N 500000
+#define VOLT1SIZE (12*1024)
+void
+volt1a(struct child *c)
+{
+  int i, j;
+  double t0 = now(), t1;
+  char *val = (char *) malloc(VOLT1SIZE + 1);
+  always_assert(val);
+
+  srandom(kvtest_first_seed + c->childno);
+
+  for(i = 0; i < VOLT1SIZE; i++)
+    val[i] = 'a' + (i % 26);
+  val[VOLT1SIZE] = '\0';
+
+  // XXX insert the keys in a random order to maintain
+  // tree balance.
+  int *keys = (int *) malloc(sizeof(int) * VOLT1N);
+  always_assert(keys);
+  for(i = 0; i < VOLT1N; i++)
+    keys[i] = i;
+  for(i = 0; i < VOLT1N; i++){
+    int x = random() % VOLT1N;
+    int tmp = keys[i];
+    keys[i] = keys[x];
+    keys[x] = tmp;
+  }
+
+  for(i = 0; i < VOLT1N; i++){
+    char key[100];
+    sprintf(key, "%-50d", keys[i]);
+    for(j = 0; j < 20; j++)
+      val[j] = 'a' + (j % 26);
+    sprintf(val, ">%d", keys[i]);
+    int j = strlen(val);
+    val[j] = '<';
+    always_assert(strlen(val) == VOLT1SIZE);
+    always_assert(strlen(key) == 50);
+    always_assert(isdigit(key[0]));
+    aput(c, Str(key), Str(val));
+  }
+  checkasync(c, 2);
+  t1 = now();
+
+  free(val);
+  free(keys);
+
+  Json result = Json().set("total", (long) (i / (t1 - t0)));
+  printf("%s\n", result.unparse().c_str());
+}
+
+// the actual volt1 benchmark.
+// get or update with equal probability.
+// their client pipelines many requests.
+// they use 8 client threads.
+// blog post says, for one server, VoltDB 17000, Cassandra 9740
+// this benchmark ends up being network or disk limited,
+// due to the huge values.
+void
+volt1b(struct child *c)
+{
+  int i, n, j;
+  double t0 = now(), t1;
+  char *wanted = (char *) malloc(VOLT1SIZE + 1);
+  always_assert(wanted);
+
+  for(i = 0; i < VOLT1SIZE; i++)
+    wanted[i] = 'a' + (i % 26);
+  wanted[VOLT1SIZE] = '\0';
+
+  srandom(kvtest_first_seed + c->childno);
+
+  for(i = 0; !timeout[0]; i++){
+    char key[100];
+    int x = random() % VOLT1N;
+    sprintf(key, "%-50d", x);
+    for(j = 0; j < 20; j++)
+      wanted[j] = 'a' + (j % 26);
+    sprintf(wanted, ">%d", x);
+    int j = strlen(wanted);
+    wanted[j] = '<';
+    if(i > 1)
+      checkasync(c, 1); // try to avoid deadlock, only 2 reqs outstanding
+    if((random() % 2) == 0)
+        aget(c, Str(key, 50), Str(wanted, VOLT1SIZE), 0);
+    else
+        aput(c, Str(key, 50), Str(wanted, VOLT1SIZE));
+  }
+  n = i;
+
+  checkasync(c, 2);
+  t1 = now();
+
+  Json result = Json().set("total", (long) (n / (t1 - t0)));
+  printf("%s\n", result.unparse().c_str());
+}
+
+// second VoltDB benchmark.
+// 500,000 pairs, 50-byte key, value is 50 32-bit ints.
+// pick a key, read one int, if odd, write a different int (same key).
+// i'm simulating columns by embedding column name in key: rowname-colname.
+// also the read/modify/write is not atomic.
+// volt2a creates the DB.
+// volt2b runs the benchmark.
+#define VOLT2N 500000
+#define VOLT2INTS 50
+void
+volt2a(struct child *c)
+{
+  int i, j, n = 0;
+  double t0 = now(), t1;
+
+  srandom(kvtest_first_seed + c->childno);
+
+  // XXX insert the keys in a random order to maintain
+  // tree balance.
+  int *keys = (int *) malloc(sizeof(int) * VOLT2N);
+  always_assert(keys);
+  for(i = 0; i < VOLT2N; i++)
+    keys[i] = i;
+  for(i = 0; i < VOLT2N; i++){
+    int x = random() % VOLT2N;
+    int tmp = keys[i];
+    keys[i] = keys[x];
+    keys[x] = tmp;
+  }
+
+  int subkeys[VOLT2INTS];
+  for(i = 0; i < VOLT2INTS; i++)
+    subkeys[i] = i;
+  for(i = 0; i < VOLT2INTS; i++){
+    int x = random() % VOLT2INTS;
+    int tmp = subkeys[i];
+    subkeys[i] = subkeys[x];
+    subkeys[x] = tmp;
+  }
+
+  for(i = 0; i < VOLT2N; i++){
+    for(j = 0; j < VOLT2INTS; j++){
+      char val[32], key[100];
+      int k;
+      sprintf(key, "%d-%d", keys[i], subkeys[j]);
+      for(k = strlen(key); k < 50; k++)
+        key[k] = ' ';
+      key[50] = '\0';
+      sprintf(val, "%ld", random());
+      aput(c, Str(key), Str(val));
+      n++;
+    }
+  }
+  checkasync(c, 2);
+  t1 = now();
+
+  free(keys);
+  Json result = Json().set("total", (long) (n / (t1 - t0)));
+  printf("%s\n", result.unparse().c_str());
+}
+
+// get callback
+void
+volt2b1(struct child *c, struct async *a, bool, const Str &val)
+{
+  int k = atoi(a->key);
+  int v = atoi(val.s);
+  if((v % 2) == 1){
+    char key[100], val[100];
+    sprintf(key, "%d-%ld", k, random() % VOLT2INTS);
+    for (int i = strlen(key); i < 50; i++)
+      key[i] = ' ';
+    sprintf(val, "%ld", random());
+    aput(c, Str(key, 50), Str(val));
+  }
+}
+
+void
+volt2b(struct child *c)
+{
+  int i, n;
+  double t0 = now(), t1;
+  srandom(kvtest_first_seed + c->childno);
+  for(i = 0; !timeout[0]; i++){
+    char key[100];
+    int x = random() % VOLT2N;
+    int y = random() % VOLT2INTS;
+    sprintf(key, "%d-%d", x, y);
+    int j;
+    for(j = strlen(key); j < 50; j++)
+      key[j] = ' ';
+    aget(c, Str(key, 50), Str(), volt2b1);
+  }
+  n = i;
+
+  checkasync(c, 2);
+  t1 = now();
+
+  Json result = Json().set("total", (long) (n / (t1 - t0)));
+  printf("%s\n", result.unparse().c_str());
+}
+
+using std::vector;
+using std::string;
+
+void
+scantest(struct child *c)
+{
+  int i;
+
+  srandom(kvtest_first_seed + c->childno);
+
+  for(i = 100; i < 200; i++){
+    char key[32], val[32];
+    int kl = sprintf(key, "k%04d", i);
+    sprintf(val, "v%04d", i);
+    aput(c, Str(key, kl), Str(val));
+  }
+
+  checkasync(c, 2);
+
+  for(i = 90; i < 210; i++){
+    char key[32];
+    sprintf(key, "k%04d", i);
+    int wanted = random() % 10;
+    c->conn->sendscanwhole(key, wanted, 1);
+    c->conn->flush();
+
+    {
+        const Json& result = c->conn->receive();
+        always_assert(result && result[0] == 1);
+        int n = (result.size() - 2) / 2;
+        if(i <= 200 - wanted){
+            always_assert(n == wanted);
+        } else if(i <= 200){
+            always_assert(n == 200 - i);
+        } else {
+            always_assert(n == 0);
+        }
+        int k0 = (i < 100 ? 100 : i);
+        int j, ki, off = 2;
+        for(j = k0, ki = 0; j < k0 + wanted && j < 200; j++, ki++, off += 2){
+            char xkey[32], xval[32];
+            sprintf(xkey, "k%04d", j);
+            sprintf(xval, "v%04d", j);
+            if (!result[off].as_s().equals(xkey)) {
+                fprintf(stderr, "Assertion failed @%d: strcmp(%s, %s) == 0\n", ki, result[off].as_s().c_str(), xkey);
+                always_assert(0);
+            }
+            always_assert(result[off + 1].as_s().equals(xval));
+        }
+    }
+
+    {
+        sprintf(key, "k%04d-a", i);
+        c->conn->sendscanwhole(key, 1, 1);
+        c->conn->flush();
+
+        const Json& result = c->conn->receive();
+        always_assert(result && result[0] == 1);
+        int n = (result.size() - 2) / 2;
+        if(i >= 100 && i < 199){
+            always_assert(n == 1);
+            sprintf(key, "k%04d", i+1);
+            always_assert(result[2].as_s().equals(key));
+        }
+    }
+  }
+
+  c->conn->sendscanwhole("k015", 10, 1);
+  c->conn->flush();
+
+  const Json& result = c->conn->receive();
+  always_assert(result && result[0] == 1);
+  int n = (result.size() - 2) / 2;
+  always_assert(n == 10);
+  always_assert(result[2].as_s().equals("k0150"));
+  always_assert(result[3].as_s().equals("v0150"));
+
+  fprintf(stderr, "scantest OK\n");
+  printf("0\n");
+}