silo benchmark added
[c11concurrency-benchmarks.git] / silo / persist_test.cc
diff --git a/silo/persist_test.cc b/silo/persist_test.cc
new file mode 100644 (file)
index 0000000..604a739
--- /dev/null
@@ -0,0 +1,1221 @@
+/**
+ * A stand-alone binary that doesn't depend on the rest of the system,
+ * used to test the current persistence strategy.
+ */
+
+#include <cassert>
+#include <iostream>
+#include <cstdint>
+#include <random>
+#include <vector>
+#include <set>
+#include <atomic>
+#include <thread>
+#include <sstream>
+
+#include <unistd.h>
+#include <sys/uio.h>
+#include <sys/types.h>
+#include <fcntl.h>
+#include <getopt.h>
+#include <time.h>
+
+#include <lz4.h>
+
+#include "macros.h"
+#include "circbuf.h"
+#include "amd64.h"
+#include "record/serializer.h"
+#include "util.h"
+
+using namespace std;
+using namespace util;
+
+struct tidhelpers {
+  // copied from txn_proto2_impl.h
+
+  static const uint64_t NBitsNumber = 24;
+
+  static const size_t CoreBits = NMAXCOREBITS; // allow 2^CoreBits distinct threads
+  static const size_t NMaxCores = NMAXCORES;
+
+  static const uint64_t CoreMask = (NMaxCores - 1);
+
+  static const uint64_t NumIdShift = CoreBits;
+  static const uint64_t NumIdMask = ((((uint64_t)1) << NBitsNumber) - 1) << NumIdShift;
+
+  static const uint64_t EpochShift = CoreBits + NBitsNumber;
+  static const uint64_t EpochMask = ((uint64_t)-1) << EpochShift;
+
+  static inline
+  uint64_t CoreId(uint64_t v)
+  {
+    return v & CoreMask;
+  }
+
+  static inline
+  uint64_t NumId(uint64_t v)
+  {
+    return (v & NumIdMask) >> NumIdShift;
+  }
+
+  static inline
+  uint64_t EpochId(uint64_t v)
+  {
+    return (v & EpochMask) >> EpochShift;
+  }
+
+  static inline
+  uint64_t MakeTid(uint64_t core_id, uint64_t num_id, uint64_t epoch_id)
+  {
+    // some sanity checking
+    static_assert((CoreMask | NumIdMask | EpochMask) == ((uint64_t)-1),
+                  "TID masks must cover all 64 bits");
+    static_assert((CoreMask & NumIdMask) == 0, "core/num masks overlap");
+    static_assert((NumIdMask & EpochMask) == 0, "num/epoch masks overlap");
+    return (core_id) | (num_id << NumIdShift) | (epoch_id << EpochShift);
+  }
+
+  static uint64_t
+  vecidmax(uint64_t coremax, const vector<uint64_t> &v)
+  {
+    uint64_t ret = NumId(coremax);
+    for (size_t i = 0; i < v.size(); i++)
+      ret = max(ret, NumId(v[i]));
+    return ret;
+  }
+
+  static string
+  Str(uint64_t v)
+  {
+    ostringstream b;
+    b << "[core=" << CoreId(v) << " | n="
+      << NumId(v) << " | epoch="
+      << EpochId(v) << "]";
+    return b.str();
+  }
+
+};
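+
+// Worked example (a sketch; the exact shift widths depend on the
+// build-time NMAXCOREBITS): a TID packs [epoch | num | core] from the
+// high bits down, so e.g. MakeTid(3, 5, 2) == 3 | (5 << NumIdShift) |
+// (2 << EpochShift), and the accessors recover each field for any
+// values that fit in their bit widths:
+static inline void
+tid_roundtrip_example(uint64_t core, uint64_t num, uint64_t epoch)
+{
+  const uint64_t t = tidhelpers::MakeTid(core, num, epoch);
+  ALWAYS_ASSERT(tidhelpers::CoreId(t) == core);
+  ALWAYS_ASSERT(tidhelpers::NumId(t) == num);
+  ALWAYS_ASSERT(tidhelpers::EpochId(t) == epoch);
+}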
+
+template <typename PRNG>
+static inline void
+fillkey(std::string &s, uint64_t idx, size_t sz, PRNG &prng)
+{
+  // keys are just the serialized record index, zero-padded out to sz;
+  // the serializer writes a full uint64_t, so sz must be at least 8
+  INVARIANT(sz >= sizeof(uint64_t));
+  s.resize(sz);
+  serializer<uint64_t, false> ser;
+  ser.write((uint8_t *) s.data(), idx);
+}
+
+template <typename PRNG>
+static inline void
+fillvalue(std::string &s, uint64_t idx, size_t sz, PRNG &prng)
+{
+  uniform_int_distribution<uint32_t> dist(0, 10000);
+  s.resize(sz);
+  serializer<uint32_t, false> s_uint32_t;
+  // fill as many whole uint32_t's as fit; when sz is not a multiple of
+  // 4, the trailing bytes keep the '\0' fill from resize()
+  for (size_t i = 0; i + sizeof(uint32_t) <= sz; i += sizeof(uint32_t)) {
+    const uint32_t x = dist(prng);
+    s_uint32_t.write((uint8_t *) &s[i], x);
+  }
+}
+
+/** simulate global database state */
+
+static const size_t g_nrecords = 1000000;
+static const size_t g_ntxns_worker = 1000000;
+static const size_t g_nmax_loggers = 16;
+
+static vector<uint64_t> g_database;
+static atomic<uint64_t> g_ntxns_committed(0);
+static atomic<uint64_t> g_ntxns_written(0);
+static atomic<uint64_t> g_bytes_written[g_nmax_loggers];
+
+static size_t g_nworkers = 1;
+static int g_verbose = 0;
+static int g_fsync_background = 0;
+static size_t g_readset = 30;
+static size_t g_writeset = 16;
+static size_t g_keysize = 8; // in bytes
+static size_t g_valuesize = 32; // in bytes
+
+/** simulation framework */
+
+// all simulations are epoch based
+class database_simulation {
+public:
+  static const unsigned long g_epoch_time_ns = 30000000; /* 30ms in ns */
+
+  database_simulation()
+    : keep_going_(true),
+      epoch_thread_(),
+      epoch_number_(1), // start at 1 so 0 can be fully persistent initially
+      system_sync_epoch_(0)
+  {
+    // XXX: depends on g_nworkers being set by now
+    for (size_t i = 0; i < g_nworkers; i++)
+      per_thread_epochs_[i]->store(1, memory_order_release);
+    for (size_t i = 0; i < g_nmax_loggers; i++)
+      for (size_t j = 0; j < g_nworkers; j++)
+        per_thread_sync_epochs_[i].epochs_[j].store(0, memory_order_release);
+  }
+
+  virtual ~database_simulation() {}
+
+  virtual void
+  init()
+  {
+    epoch_thread_ = thread(&database_simulation::epoch_thread, this);
+  }
+
+  virtual void worker(unsigned id) = 0;
+
+  virtual void logger(const vector<int> &fds,
+                      const vector<vector<unsigned>> &assignments) = 0;
+
+  virtual void
+  terminate()
+  {
+    keep_going_->store(false, memory_order_release);
+    epoch_thread_.join();
+  }
+
+  static bool
+  AssignmentsValid(const vector<vector<unsigned>> &assignments,
+                   unsigned nfds,
+                   unsigned nworkers)
+  {
+    // each worker must be assigned exactly once in the assignment
+    // there must be <= nfds assignments
+
+    if (assignments.size() > nfds)
+      return false;
+
+    set<unsigned> seen;
+    for (auto &assignment : assignments)
+      for (auto w : assignment) {
+        if (seen.count(w) || w >= nworkers)
+          return false;
+        seen.insert(w);
+      }
+
+    return seen.size() == nworkers;
+  }
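+
+  // Illustrative usage (a sketch, not called by the benchmark): with 4
+  // workers and 2 log fds, {{0,1},{2,3}} is valid (every worker appears
+  // exactly once), while {{0},{0,1}} is not (worker 0 appears twice and
+  // workers 2 and 3 never appear).
+  static void
+  AssignmentsValidExample()
+  {
+    ALWAYS_ASSERT(AssignmentsValid({{0, 1}, {2, 3}}, 2, 4));
+    ALWAYS_ASSERT(!AssignmentsValid({{0}, {0, 1}}, 2, 4));
+  }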
+
+protected:
+  void
+  epoch_thread()
+  {
+    while (keep_going_->load(memory_order_acquire)) {
+      struct timespec t;
+      t.tv_sec  = g_epoch_time_ns / ONE_SECOND_NS;
+      t.tv_nsec = g_epoch_time_ns % ONE_SECOND_NS;
+      nanosleep(&t, nullptr);
+
+      // make sure all threads are at the current epoch
+      const uint64_t curepoch = epoch_number_->load(memory_order_acquire);
+
+    retry:
+      bool allthere = true;
+      for (size_t i = 0;
+           i < g_nworkers && keep_going_->load(memory_order_acquire);
+           i++) {
+        if (per_thread_epochs_[i]->load(memory_order_acquire) < curepoch) {
+          allthere = false;
+          break;
+        }
+      }
+      if (!keep_going_->load(memory_order_acquire))
+        return;
+      if (!allthere) {
+        nop_pause();
+        goto retry;
+      }
+
+      //cerr << "bumping epoch" << endl;
+      epoch_number_->store(curepoch + 1, memory_order_release); // bump it
+    }
+  }
+
+  aligned_padded_elem<atomic<bool>> keep_going_;
+
+  thread epoch_thread_;
+
+  aligned_padded_elem<atomic<uint64_t>> epoch_number_;
+
+  aligned_padded_elem<atomic<uint64_t>> per_thread_epochs_[NMAXCORES];
+
+  // v = per_thread_sync_epochs_[i].epochs_[j]: logger i has persisted all
+  // transactions on core j up to and including epoch v. since the core =>
+  // logger mapping is static, taking:
+  //   min_{core} max_{logger} per_thread_sync_epochs_[logger].epochs_[core]
+  // yields the entire system's persistent epoch
+  struct {
+    atomic<uint64_t> epochs_[NMAXCORES];
+    CACHE_PADOUT;
+  } per_thread_sync_epochs_[g_nmax_loggers] CACHE_ALIGNED;
+
+  // conservative estimate (<=) for:
+  //   min_{core} max_{logger} per_thread_sync_epochs_[logger].epochs_[core]
+  aligned_padded_elem<atomic<uint64_t>> system_sync_epoch_;
+};
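+
+// Worked example of the rule above (illustrative numbers, not used by
+// the simulation): two loggers and two cores, with logger 0 owning core
+// 0 and logger 1 owning core 1. Per core, take the max over loggers,
+// then the min across cores:
+static inline uint64_t
+system_sync_epoch_example()
+{
+  const uint64_t synced[2][2] = { {5, 0},   // logger 0 persisted core 0 thru epoch 5
+                                  {0, 3} }; // logger 1 persisted core 1 thru epoch 3
+  uint64_t ret = numeric_limits<uint64_t>::max();
+  for (size_t core = 0; core < 2; core++)
+    ret = min(ret, max(synced[0][core], synced[1][core]));
+  return ret; // == 3: all txns in epochs <= 3 are durable
+}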
+
+struct logbuf_header {
+  uint64_t nentries_; // > 0 for all valid log buffers
+  uint64_t last_tid_; // TID of the last commit
+} PACKED;
+
+struct pbuffer {
+  bool io_scheduled_; // has the logger scheduled IO yet?
+  size_t curoff_; // current offset into buf_, either for writing
+                  // or during the dep computation phase
+  size_t remaining_; // number of deps remaining to compute
+  std::string buf_; // the actual buffer, of size g_buffer_size
+
+  inline uint8_t *
+  pointer()
+  {
+    return (uint8_t *) buf_.data() + curoff_;
+  }
+
+  inline logbuf_header *
+  header()
+  {
+    return (logbuf_header *) buf_.data();
+  }
+
+  inline const logbuf_header *
+  header() const
+  {
+    return (const logbuf_header *) buf_.data();
+  }
+};
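+
+// Layout of one pbuffer's buf_ as scheduled for writev() (a sketch):
+//
+//   +-----------+-----------+-----------------------------------+
+//   | nentries_ | last_tid_ | log records (or compressed runs)  |
+//   +-----------+-----------+-----------------------------------+
+//   |<---- logbuf_header -->|<-- curoff_ advances this way ---->|
+//
+// curoff_ starts at sizeof(logbuf_header) and always points one past
+// the last byte written.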
+
+class onecopy_logbased_simulation : public database_simulation {
+public:
+  static const size_t g_perthread_buffers = 64; // 64 outstanding buffers
+  static const size_t g_buffer_size = (1<<20); // in bytes
+  static const size_t g_horizon_size = (1<<16); // in bytes, for compression only
+
+  static circbuf<pbuffer, g_perthread_buffers> g_all_buffers[NMAXCORES];
+  static circbuf<pbuffer, g_perthread_buffers> g_persist_buffers[NMAXCORES];
+
+protected:
+
+  virtual const uint8_t *
+  read_log_entry(const uint8_t *p, uint64_t &tid,
+                 std::function<void(uint64_t)> readfunctor) = 0;
+
+  virtual uint64_t
+  compute_log_record_space() const = 0;
+
+  virtual void
+  write_log_record(uint8_t *p,
+                   uint64_t tidcommit,
+                   const vector<uint64_t> &readset,
+                   const vector<pair<string, string>> &writeset) = 0;
+
+  virtual void
+  logger_on_io_completion() {}
+
+  virtual bool
+  do_compression() const = 0;
+
+  pbuffer *
+  getbuffer(unsigned id)
+  {
+    // block until we get a buf
+    pbuffer *ret = g_all_buffers[id].deq();
+    ret->io_scheduled_ = false;
+    ret->buf_.assign(g_buffer_size, 0);
+    ret->curoff_ = sizeof(logbuf_header);
+    ret->remaining_ = 0;
+    return ret;
+  }
+
+public:
+  void
+  init() OVERRIDE
+  {
+    database_simulation::init();
+    for (size_t i = 0; i < g_nworkers; i++) {
+      for (size_t j = 0; j < g_perthread_buffers; j++) {
+        struct pbuffer *p = new pbuffer;
+        g_all_buffers[i].enq(p);
+      }
+    }
+  }
+
+private:
+  inline size_t
+  inplace_update_persistent_info(
+      vector<pair<uint64_t, uint64_t>> &outstanding_commits,
+      uint64_t cursyncepoch)
+  {
+    size_t ncommits_synced = 0;
+    // can erase all entries with x.first <= cursyncepoch
+    size_t idx = 0;
+    for (; idx < outstanding_commits.size(); idx++) {
+      if (outstanding_commits[idx].first <= cursyncepoch)
+        ncommits_synced += outstanding_commits[idx].second;
+      else
+        break;
+    }
+
+    // erase entries [0, idx)
+    // XXX: slow
+    outstanding_commits.erase(outstanding_commits.begin(),
+        outstanding_commits.begin() + idx);
+
+    return ncommits_synced;
+  }
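+
+  // Worked example for inplace_update_persistent_info (illustrative):
+  // with outstanding_commits = {(1, 10), (2, 5), (3, 7)} and
+  // cursyncepoch = 2, the first two entries are durable, so it returns
+  // 10 + 5 = 15 and leaves {(3, 7)} behind.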
+
+  inline pbuffer *
+  ensure_buffer_with_space(unsigned id, pbuffer *cur, size_t space_needed)
+  {
+    if (!cur) {
+      cur = getbuffer(id);
+    } else if (g_buffer_size - cur->curoff_ < space_needed) {
+      g_persist_buffers[id].enq(cur);
+      cur = getbuffer(id);
+    }
+    INVARIANT(cur);
+    INVARIANT(g_buffer_size - cur->curoff_ >= space_needed);
+    return cur;
+  }
+
+  /**
+   * write the horizon from [p, p+sz) into cur, assuming that cur has enough
+   * space. space needed is at least:
+   *   sizeof(uint32_t) + LZ4_compressBound(sz)
+   *
+   * also updates the buffer's headers and offset to reflect the write
+   *
+   * returns the compressed size of the horizon
+   */
+  inline uint64_t
+  write_horizon(void *lz4ctx,
+                const uint8_t *p, uint64_t sz,
+                uint64_t nentries, uint64_t lasttid,
+                pbuffer *cur)
+  {
+#ifdef CHECK_INVARIANTS
+    const uint64_t needed = sizeof(uint32_t) + LZ4_compressBound(sz);
+    INVARIANT(g_buffer_size - cur->curoff_ >= needed);
+#endif
+
+    const int ret = LZ4_compress_heap(
+        lz4ctx,
+        (const char *) p,
+        (char *) cur->pointer() + sizeof(uint32_t),
+        sz);
+
+    INVARIANT(ret >= 0);
+    serializer<uint32_t, false> s_uint32_t;
+    s_uint32_t.write(cur->pointer(), ret);
+    cur->curoff_ += sizeof(uint32_t) + ret;
+    cur->header()->nentries_ += nentries;
+    cur->header()->last_tid_ = lasttid;
+
+    return ret;
+  }
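+
+  /**
+   * Illustrative inverse of write_horizon (a sketch, not called by the
+   * benchmark): each compressed run is a raw uint32_t giving the
+   * compressed size, followed by that many LZ4 bytes. Assumes the
+   * bundled lz4.h provides LZ4_decompress_safe and that dst can hold
+   * the decompressed run (at most g_horizon_size bytes).
+   */
+  inline const uint8_t *
+  read_horizon(const uint8_t *p, uint8_t *dst, int dstcap)
+  {
+    serializer<uint32_t, false> s_uint32_t;
+    uint32_t compsz;
+    p = s_uint32_t.read(p, &compsz);
+    const int rawsz = LZ4_decompress_safe(
+        (const char *) p, (char *) dst, compsz, dstcap);
+    INVARIANT(rawsz >= 0);
+    return p + compsz;
+  }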
+
+protected:
+  void
+  worker(unsigned id) OVERRIDE
+  {
+    const bool compress = do_compression();
+    uint8_t horizon[g_horizon_size]; // LZ4 matches within a 64KB window
+
+    // where are we in the window, how many elems in this window?
+    size_t horizon_p = 0, horizon_nentries = 0;
+    uint64_t horizon_last_tid = 0; // last committed TID in the horizon
+
+    double cratios = 0.0;
+    unsigned long ncompressions = 0;
+
+    void *lz4ctx = nullptr; // holds a heap-allocated LZ4 hash table
+    if (compress)
+      lz4ctx = LZ4_create();
+
+    mt19937 prng(id);
+
+    // read/write sets are uniform for now
+    uniform_int_distribution<unsigned> dist(0, g_nrecords - 1);
+
+    vector<uint64_t> readset(g_readset);
+    vector<pair<string, string>> writeset(g_writeset);
+    for (auto &pr : writeset) {
+      pr.first.reserve(g_keysize);
+      pr.second.reserve(g_valuesize);
+    }
+
+    struct pbuffer *curbuf = nullptr;
+    uint64_t lasttid = 0,
+             ncommits_currentepoch = 0,
+             ncommits_synced = 0;
+    vector<pair<uint64_t, uint64_t>> outstanding_commits;
+    for (size_t i = 0; i < g_ntxns_worker; i++) {
+
+      // update epoch info
+      const uint64_t lastepoch = per_thread_epochs_[id]->load(memory_order_acquire);
+      const uint64_t curepoch = epoch_number_->load(memory_order_acquire);
+
+      if (lastepoch != curepoch) {
+        // try to sync outstanding commits
+        INVARIANT(curepoch == (lastepoch + 1));
+        const size_t cursyncepoch = system_sync_epoch_->load(memory_order_acquire);
+        ncommits_synced +=
+          inplace_update_persistent_info(outstanding_commits, cursyncepoch);
+
+        // add information about the last epoch
+        outstanding_commits.emplace_back(lastepoch, ncommits_currentepoch);
+        ncommits_currentepoch = 0;
+
+        per_thread_epochs_[id]->store(curepoch, memory_order_release);
+      }
+
+      for (size_t j = 0; j < g_readset; j++)
+        readset[j] = g_database[dist(prng)];
+
+      const uint64_t idmax = tidhelpers::vecidmax(lasttid, readset);
+      // XXX: ignore future epochs for now
+      const uint64_t tidcommit = tidhelpers::MakeTid(id, idmax + 1, curepoch);
+      lasttid = tidcommit;
+
+      for (size_t j = 0; j < g_writeset; j++) {
+        auto idx = dist(prng);
+        g_database[idx] = lasttid;
+        fillkey(writeset[j].first, idx, g_keysize, prng);
+        fillvalue(writeset[j].second, idx, g_valuesize, prng);
+      }
+
+      const uint64_t space_needed = compute_log_record_space();
+      if (compress) {
+        if (horizon_p + space_needed > g_horizon_size) {
+          // need to compress and write horizon
+          curbuf = ensure_buffer_with_space(id, curbuf,
+            sizeof(uint32_t) + LZ4_compressBound(horizon_p));
+
+          const uint64_t compsz =
+            write_horizon(lz4ctx, &horizon[0], horizon_p,
+                          horizon_nentries, horizon_last_tid,
+                          curbuf);
+
+          const double cratio = double(horizon_p) / double(compsz);
+          cratios += cratio;
+          ncompressions++;
+
+          // can reset horizon
+          horizon_p = horizon_nentries = horizon_last_tid = 0;
+        }
+
+        write_log_record(&horizon[0] + horizon_p, tidcommit, readset, writeset);
+        horizon_p += space_needed;
+        horizon_nentries++;
+        horizon_last_tid = tidcommit;
+        ncommits_currentepoch++;
+      } else {
+        curbuf = ensure_buffer_with_space(id, curbuf, space_needed);
+        uint8_t *p = curbuf->pointer();
+        write_log_record(p, tidcommit, readset, writeset);
+        //cerr << "write tidcommit=" << tidhelpers::Str(tidcommit) << endl;
+        curbuf->curoff_ += space_needed;
+        curbuf->header()->nentries_++;
+        curbuf->header()->last_tid_ = tidcommit;
+        ncommits_currentepoch++;
+      }
+    }
+
+    if (compress) {
+      if (horizon_nentries) {
+        curbuf = ensure_buffer_with_space(id, curbuf,
+            sizeof(uint32_t) + LZ4_compressBound(horizon_p));
+
+        const uint64_t compsz =
+          write_horizon(lz4ctx, &horizon[0], horizon_p,
+                        horizon_nentries, horizon_last_tid,
+                        curbuf);
+
+        const double cratio = double(horizon_p) / double(compsz);
+        cratios += cratio;
+        ncompressions++;
+
+        horizon_p = horizon_nentries = horizon_last_tid = 0;
+      }
+      LZ4_free(lz4ctx);
+    }
+
+    if (curbuf) {
+      // XXX: hacky - an agreed-upon future epoch that all threads
+      // converge on when finishing
+      const uint64_t FutureEpoch = 100000;
+      const uint64_t waitfor = tidhelpers::EpochId(
+          curbuf->header()->last_tid_);
+      INVARIANT(per_thread_epochs_[id]->load(memory_order_acquire) == waitfor);
+      ALWAYS_ASSERT(waitfor < FutureEpoch);
+      curbuf->header()->last_tid_ =
+        tidhelpers::MakeTid(id, 0, FutureEpoch);
+      g_persist_buffers[id].enq(curbuf);
+      outstanding_commits.emplace_back(waitfor, ncommits_currentepoch);
+      //cerr << "worker " << id << " waitfor epoch " << waitfor << endl;
+      // get these commits persisted
+      while (system_sync_epoch_->load(memory_order_acquire) < waitfor)
+        nop_pause();
+      ncommits_synced +=
+        inplace_update_persistent_info(outstanding_commits, waitfor);
+      ALWAYS_ASSERT(outstanding_commits.empty());
+    }
+
+    if (g_verbose && compress)
+      cerr << "Average compression ratio: " << cratios / double(ncompressions) << endl;
+
+    g_ntxns_committed.fetch_add(ncommits_synced, memory_order_release);
+  }
+
+private:
+  void
+  fsyncer(unsigned id, int fd, one_way_post<int> &channel)
+  {
+    for (;;) {
+      int ret;
+      channel.peek(ret);
+      if (ret == -1)
+        return;
+      ret = fdatasync(fd);
+      if (ret == -1) {
+        perror("fdatasync");
+        exit(1);
+      }
+      channel.consume(ret);
+    }
+  }
+
+  void
+  writer(unsigned id, int fd, const vector<unsigned> &assignment)
+  {
+    vector<iovec> iovs(g_nworkers * g_perthread_buffers);
+    vector<pbuffer *> pxs;
+    struct timespec last_io_completed;
+    one_way_post<int> *channel =
+      g_fsync_background ? new one_way_post<int> : nullptr;
+    uint64_t total_nbytes_written = 0,
+             total_txns_written = 0;
+
+    bool sense = false; // cur is at sense, prev is at !sense
+    uint64_t nbytes_written[2], txns_written[2];
+    // per-worker persisted-epoch prefixes for the current and previous
+    // batch; vectors, since g_nworkers is only known at runtime (a VLA
+    // here would be a compiler extension, not standard C++)
+    vector<uint64_t> epoch_prefixes[2];
+    epoch_prefixes[0].assign(g_nworkers, 0);
+    epoch_prefixes[1].assign(g_nworkers, 0);
+    memset(&nbytes_written[0], 0, sizeof(nbytes_written));
+    memset(&txns_written[0], 0, sizeof(txns_written));
+
+    clock_gettime(CLOCK_MONOTONIC, &last_io_completed);
+    thread fsync_thread;
+    if (g_fsync_background) {
+      fsync_thread = thread(
+            &onecopy_logbased_simulation::fsyncer, this, id, fd, ref(*channel));
+      fsync_thread.detach();
+    }
+
+    while (keep_going_->load(memory_order_acquire)) {
+
+      // don't allow this loop to proceed less than an epoch's worth of time,
+      // so we can batch IO
+      struct timespec now, diff;
+      clock_gettime(CLOCK_MONOTONIC, &now);
+      timespec_utils::subtract(&now, &last_io_completed, &diff);
+      if (diff.tv_sec == 0 && diff.tv_nsec < long(g_epoch_time_ns)) {
+        // need to sleep it out
+        struct timespec ts;
+        ts.tv_sec = 0;
+        ts.tv_nsec = g_epoch_time_ns - diff.tv_nsec;
+        nanosleep(&ts, nullptr);
+      }
+      clock_gettime(CLOCK_MONOTONIC, &last_io_completed);
+
+      size_t nwritten = 0;
+      nbytes_written[sense] = txns_written[sense] = 0;
+      for (auto idx : assignment) {
+        INVARIANT(idx < g_nworkers);
+        g_persist_buffers[idx].peekall(pxs);
+        for (auto px : pxs) {
+          INVARIANT(px);
+          INVARIANT(!px->io_scheduled_);
+          iovs[nwritten].iov_base = (void *) px->buf_.data();
+          iovs[nwritten].iov_len = px->curoff_;
+          nbytes_written[sense] += px->curoff_;
+          px->io_scheduled_ = true;
+          px->curoff_ = sizeof(logbuf_header);
+          px->remaining_ = px->header()->nentries_;
+          txns_written[sense] += px->header()->nentries_;
+          nwritten++;
+          INVARIANT(tidhelpers::CoreId(px->header()->last_tid_) == idx);
+          INVARIANT(epoch_prefixes[sense][idx] <=
+                    tidhelpers::EpochId(px->header()->last_tid_));
+          INVARIANT(tidhelpers::EpochId(px->header()->last_tid_) > 0);
+          epoch_prefixes[sense][idx] =
+            tidhelpers::EpochId(px->header()->last_tid_) - 1;
+        }
+      }
+
+      if (!nwritten) {
+        // XXX: should probably sleep here
+        nop_pause();
+        if (!g_fsync_background || !channel->can_post()) {
+          //cerr << "writer skipping because no work to do" << endl;
+          continue;
+        }
+      }
+
+      //cerr << "writer " << id << " nwritten " << nwritten << endl;
+
+      const ssize_t ret =
+        nwritten ? writev(fd, &iovs[0], nwritten) : 0;
+      if (ret == -1) {
+        perror("writev");
+        exit(1);
+      }
+
+      bool dosense;
+      if (g_fsync_background) {
+        // wait for fsync from the previous write
+        if (nwritten)
+          channel->post(0, true);
+        else
+          INVARIANT(channel->can_post());
+        dosense = !sense;
+      } else {
+        int ret = fdatasync(fd);
+        if (ret == -1) {
+          perror("fdatasync");
+          exit(1);
+        }
+        dosense = sense;
+      }
+
+      // update metadata from previous write
+      for (size_t i = 0; i < g_nworkers; i++) {
+        const uint64_t x0 =
+          per_thread_sync_epochs_[id].epochs_[i].load(memory_order_acquire);
+        const uint64_t x1 = epoch_prefixes[dosense][i];
+        if (x1 > x0)
+          per_thread_sync_epochs_[id].epochs_[i].store(
+              x1, memory_order_release);
+      }
+      total_nbytes_written += nbytes_written[dosense];
+      total_txns_written += txns_written[dosense];
+
+      // bump the sense
+      sense = !sense;
+
+      // return all buffers that have been io_scheduled_ - we can do this as
+      // soon as write returns
+      for (auto idx : assignment) {
+        pbuffer *px;
+        while ((px = g_persist_buffers[idx].peek()) &&
+               px->io_scheduled_) {
+          g_persist_buffers[idx].deq();
+          g_all_buffers[idx].enq(px);
+        }
+      }
+    }
+
+    g_bytes_written[id].store(total_nbytes_written, memory_order_release);
+    g_ntxns_written.fetch_add(total_txns_written, memory_order_release);
+  }
+
+  inline void
+  advance_system_sync_epoch(const vector<vector<unsigned>> &assignments)
+  {
+    uint64_t min_so_far = numeric_limits<uint64_t>::max();
+    for (size_t i = 0; i < assignments.size(); i++)
+      for (auto j : assignments[i])
+        min_so_far =
+          min(per_thread_sync_epochs_[i].epochs_[j].load(memory_order_acquire), min_so_far);
+
+#ifdef CHECK_INVARIANTS
+    const uint64_t syssync = system_sync_epoch_->load(memory_order_acquire);
+    INVARIANT(syssync <= min_so_far);
+#endif
+    system_sync_epoch_->store(min_so_far, memory_order_release);
+  }
+
+public:
+  void
+  logger(const vector<int> &fds,
+         const vector<vector<unsigned>> &assignments_given) OVERRIDE
+  {
+    // compute thread => logger assignment
+    vector<thread> writers;
+    vector<vector<unsigned>> assignments(assignments_given);
+
+    if (assignments.empty()) {
+      // compute assuming homogeneous disks
+      if (g_nworkers <= fds.size()) {
+        // each thread gets its own logging worker
+        for (size_t i = 0; i < g_nworkers; i++)
+          assignments.push_back({(unsigned) i});
+      } else {
+        // XXX: currently we assume each logger is equally fast - we should
+        // adjust the ratios accordingly for non-homogeneous loggers
+        const size_t threads_per_logger = g_nworkers / fds.size();
+        for (size_t i = 0; i < fds.size(); i++) {
+          assignments.emplace_back(
+            MakeRange<unsigned>(
+                i * threads_per_logger,
+                ((i + 1) == fds.size()) ?
+                  g_nworkers :
+                  (i + 1) * threads_per_logger));
+        }
+      }
+    }
+
+    INVARIANT(AssignmentsValid(assignments, fds.size(), g_nworkers));
+
+    timer tt;
+    for (size_t i = 0; i < assignments.size(); i++)
+      writers.emplace_back(
+        &onecopy_logbased_simulation::writer,
+        this, i, fds[i], ref(assignments[i]));
+    if (g_verbose)
+      cerr << "assignments: " << assignments << endl;
+    while (keep_going_->load(memory_order_acquire)) {
+      // periodically compute which epoch is the persistence epoch,
+      // and update system_sync_epoch_
+
+      struct timespec t;
+      t.tv_sec  = g_epoch_time_ns / ONE_SECOND_NS;
+      t.tv_nsec = g_epoch_time_ns % ONE_SECOND_NS;
+      nanosleep(&t, nullptr);
+
+      advance_system_sync_epoch(assignments);
+    }
+
+    for (auto &t : writers)
+      t.join();
+
+    if (g_verbose) {
+      cerr << "current epoch: " << epoch_number_->load(memory_order_acquire) << endl;
+      cerr << "sync epoch   : " << system_sync_epoch_->load(memory_order_acquire) << endl;
+      const double xsec = tt.lap_ms() / 1000.0;
+      for (size_t i = 0; i < writers.size(); i++)
+        cerr << "writer " << i << " " <<
+          (double(g_bytes_written[i].load(memory_order_acquire)) /
+           double(1UL << 20) /
+           xsec) << " MB/sec" << endl;
+    }
+  }
+
+protected:
+  vector<pbuffer *> pxs_; // just some scratch space
+};
+
+circbuf<pbuffer, onecopy_logbased_simulation::g_perthread_buffers>
+  onecopy_logbased_simulation::g_all_buffers[NMAXCORES];
+circbuf<pbuffer, onecopy_logbased_simulation::g_perthread_buffers>
+  onecopy_logbased_simulation::g_persist_buffers[NMAXCORES];
+
+class explicit_deptracking_simulation : public onecopy_logbased_simulation {
+public:
+
+  /** global state about our persistence calculations */
+
+  // contains the latest TID inclusive, per core, which is (transitively)
+  // persistent. note that the prefix of the DB which is totally persistent is
+  // simply the max of this table.
+  static uint64_t g_persistence_vc[NMAXCORES];
+
+protected:
+
+  bool do_compression() const OVERRIDE { return false; }
+
+  const uint8_t *
+  read_log_entry(const uint8_t *p, uint64_t &tid,
+                 std::function<void(uint64_t)> readfunctor) OVERRIDE
+  {
+    serializer<uint8_t, false> s_uint8_t;
+    serializer<uint64_t, false> s_uint64_t;
+
+    uint8_t readset_sz, writeset_sz, key_sz, value_sz;
+    uint64_t v;
+
+    p = s_uint64_t.read(p, &tid);
+    p = s_uint8_t.read(p, &readset_sz);
+    INVARIANT(size_t(readset_sz) == g_readset);
+    for (size_t i = 0; i < size_t(readset_sz); i++) {
+      p = s_uint64_t.read(p, &v);
+      readfunctor(v);
+    }
+
+    p = s_uint8_t.read(p, &writeset_sz);
+    INVARIANT(size_t(writeset_sz) == g_writeset);
+    for (size_t i = 0; i < size_t(writeset_sz); i++) {
+      p = s_uint8_t.read(p, &key_sz);
+      INVARIANT(size_t(key_sz) == g_keysize);
+      p += size_t(key_sz);
+      p = s_uint8_t.read(p, &value_sz);
+      INVARIANT(size_t(value_sz) == g_valuesize);
+      p += size_t(value_sz);
+    }
+
+    return p;
+  }
+
+  uint64_t
+  compute_log_record_space() const OVERRIDE
+  {
+    // compute how much space we need for this entry
+    uint64_t space_needed = 0;
+
+    // 8 bytes to indicate TID
+    space_needed += sizeof(uint64_t);
+
+    // one byte to indicate # of read deps
+    space_needed += 1;
+
+    // each dep occupies 8 bytes
+    space_needed += g_readset * sizeof(uint64_t);
+
+    // one byte to indicate # of records written
+    space_needed += 1;
+
+    // each record occupies (1 + key_length + 1 + value_length) bytes
+    space_needed += g_writeset * (1 + g_keysize + 1 + g_valuesize);
+
+    return space_needed;
+  }
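+
+  // Worked example for compute_log_record_space with the default
+  // parameters (readset=30, writeset=16, keysize=8, valuesize=32):
+  //   8 + 1 + 30*8 + 1 + 16*(1 + 8 + 1 + 32) = 922 bytes per record.
+  // The epoch-based format below drops the read-dep block (the
+  // 1 + 30*8 term), giving 681 bytes.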
+
+  void
+  write_log_record(uint8_t *p,
+                   uint64_t tidcommit,
+                   const vector<uint64_t> &readset,
+                   const vector<pair<string, string>> &writeset) OVERRIDE
+  {
+    serializer<uint8_t, false> s_uint8_t;
+    serializer<uint64_t, false> s_uint64_t;
+
+    p = s_uint64_t.write(p, tidcommit);
+    p = s_uint8_t.write(p, readset.size());
+    for (auto t : readset)
+      p = s_uint64_t.write(p, t);
+    p = s_uint8_t.write(p, writeset.size());
+    for (auto &pr : writeset) {
+      p = s_uint8_t.write(p, pr.first.size());
+      memcpy(p, pr.first.data(), pr.first.size()); p += pr.first.size();
+      p = s_uint8_t.write(p, pr.second.size());
+      memcpy(p, pr.second.data(), pr.second.size()); p += pr.second.size();
+    }
+  }
+
+  void
+  logger_on_io_completion() OVERRIDE
+  {
+    ALWAYS_ASSERT(false); // currently broken
+    bool changed = true;
+    while (changed) {
+      changed = false;
+      for (size_t i = 0; i < NMAXCORES; i++) {
+        g_persist_buffers[i].peekall(pxs_);
+        for (auto px : pxs_) {
+          INVARIANT(px);
+          if (!px->io_scheduled_)
+            break;
+
+          INVARIANT(px->remaining_ > 0);
+          INVARIANT(px->curoff_ < g_buffer_size);
+
+          const uint8_t *p = px->pointer();
+          uint64_t committid;
+          bool allsat = true;
+
+          //cerr << "processing buffer " << px << " with curoff_=" << px->curoff_ << endl
+          //     << "  p=" << intptr_t(p) << endl;
+
+          while (px->remaining_ && allsat) {
+            allsat = true;
+            const uint8_t *nextp =
+              read_log_entry(p, committid, [&allsat](uint64_t readdep) {
+                if (!allsat)
+                  return;
+                const uint64_t cid = tidhelpers::CoreId(readdep);
+                if (readdep > g_persistence_vc[cid])
+                  allsat = false;
+              });
+            if (allsat) {
+              //cerr << "committid=" << tidhelpers::Str(committid)
+              //     << ", g_persistence_vc=" << tidhelpers::Str(g_persistence_vc[i])
+              //     << endl;
+              INVARIANT(tidhelpers::CoreId(committid) == i);
+              INVARIANT(g_persistence_vc[i] < committid);
+              g_persistence_vc[i] = committid;
+              changed = true;
+              p = nextp;
+              px->remaining_--;
+              px->curoff_ = intptr_t(p) - intptr_t(px->buf_.data());
+              g_ntxns_committed++;
+            } else {
+              // done, no further entries will be satisfied
+            }
+          }
+
+          if (allsat) {
+            INVARIANT(px->remaining_ == 0);
+            // finished entire buffer
+            struct pbuffer *pxcheck = g_persist_buffers[i].deq();
+            if (pxcheck != px)
+              INVARIANT(false);
+            g_all_buffers[i].enq(px);
+            //cerr << "buffer flused at g_persistence_vc=" << tidhelpers::Str(g_persistence_vc[i]) << endl;
+          } else {
+            INVARIANT(px->remaining_ > 0);
+            break; // cannot process core's list any further
+          }
+        }
+      }
+    }
+  }
+
+};
+
+uint64_t explicit_deptracking_simulation::g_persistence_vc[NMAXCORES] = {0};
+
+class epochbased_simulation : public onecopy_logbased_simulation {
+public:
+  epochbased_simulation(bool compress)
+    : compress_(compress)
+  {
+  }
+
+protected:
+  bool do_compression() const OVERRIDE { return compress_; }
+
+protected:
+  const uint8_t *
+  read_log_entry(const uint8_t *p, uint64_t &tid,
+                 std::function<void(uint64_t)> readfunctor) OVERRIDE
+  {
+    serializer<uint8_t, false> s_uint8_t;
+    serializer<uint64_t, false> s_uint64_t;
+
+    uint8_t writeset_sz, key_sz, value_sz;
+
+    p = s_uint64_t.read(p, &tid);
+    p = s_uint8_t.read(p, &writeset_sz);
+    INVARIANT(size_t(writeset_sz) == g_writeset);
+    for (size_t i = 0; i < size_t(writeset_sz); i++) {
+      p = s_uint8_t.read(p, &key_sz);
+      INVARIANT(size_t(key_sz) == g_keysize);
+      p += size_t(key_sz);
+      p = s_uint8_t.read(p, &value_sz);
+      INVARIANT(size_t(value_sz) == g_valuesize);
+      p += size_t(value_sz);
+    }
+
+    return p;
+  }
+
+  uint64_t
+  compute_log_record_space() const OVERRIDE
+  {
+    // compute how much space we need for this entry
+    uint64_t space_needed = 0;
+
+    // 8 bytes to indicate TID
+    space_needed += sizeof(uint64_t);
+
+    // one byte to indicate # of records written
+    space_needed += 1;
+
+    // each record occupies (1 + key_length + 1 + value_length) bytes
+    space_needed += g_writeset * (1 + g_keysize + 1 + g_valuesize);
+
+    return space_needed;
+  }
+
+  void
+  write_log_record(uint8_t *p,
+                   uint64_t tidcommit,
+                   const vector<uint64_t> &readset,
+                   const vector<pair<string, string>> &writeset) OVERRIDE
+  {
+    serializer<uint8_t, false> s_uint8_t;
+    serializer<uint64_t, false> s_uint64_t;
+
+    p = s_uint64_t.write(p, tidcommit);
+    p = s_uint8_t.write(p, writeset.size());
+    for (auto &pr : writeset) {
+      p = s_uint8_t.write(p, pr.first.size());
+      memcpy(p, pr.first.data(), pr.first.size()); p += pr.first.size();
+      p = s_uint8_t.write(p, pr.second.size());
+      memcpy(p, pr.second.data(), pr.second.size()); p += pr.second.size();
+    }
+  }
+
+private:
+  bool compress_;
+};
+
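+/**
+ * Example invocation (illustrative; adjust the binary name and paths to
+ * your build). Each --logfile adds one logger fd; each --assignment is a
+ * range-aware CSV of worker ids handled by the corresponding logger (no
+ * --assignment flags means workers are partitioned evenly):
+ *
+ *   ./persist_test --num-threads 4 --strategy epoch \
+ *       --logfile /tmp/log0 --logfile /tmp/log1 \
+ *       --assignment 0-1 --assignment 2-3 --verbose
+ */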
+int
+main(int argc, char **argv)
+{
+  string strategy = "epoch";
+  vector<string> logfiles;
+  vector<vector<unsigned>> assignments;
+
+  while (1) {
+    static struct option long_options[] =
+    {
+      {"verbose"     , no_argument       , &g_verbose , 1}   ,
+      {"fsync-back"  , no_argument       , &g_fsync_background, 1},
+      {"num-threads" , required_argument , 0          , 't'} ,
+      {"strategy"    , required_argument , 0          , 's'} ,
+      {"readset"     , required_argument , 0          , 'r'} ,
+      {"writeset"    , required_argument , 0          , 'w'} ,
+      {"keysize"     , required_argument , 0          , 'k'} ,
+      {"valuesize"   , required_argument , 0          , 'v'} ,
+      {"logfile"     , required_argument , 0          , 'l'} ,
+      {"assignment"  , required_argument , 0          , 'a'} ,
+      {0, 0, 0, 0}
+    };
+    int option_index = 0;
+    int c = getopt_long(argc, argv, "t:s:r:w:k:v:l:a:", long_options, &option_index);
+    if (c == -1)
+      break;
+
+    switch (c) {
+    case 0:
+      if (long_options[option_index].flag != 0)
+        break;
+      abort();
+      break;
+
+    case 't':
+      g_nworkers = strtoul(optarg, nullptr, 10);
+      break;
+
+    case 's':
+      strategy = optarg;
+      break;
+
+    case 'r':
+      g_readset = strtoul(optarg, nullptr, 10);
+      break;
+
+    case 'w':
+      g_writeset = strtoul(optarg, nullptr, 10);
+      break;
+
+    case 'k':
+      g_keysize = strtoul(optarg, nullptr, 10);
+      break;
+
+    case 'v':
+      g_valuesize = strtoul(optarg, nullptr, 10);
+      break;
+
+    case 'l':
+      logfiles.emplace_back(optarg);
+      break;
+
+    case 'a':
+      assignments.emplace_back(
+          ParseCSVString<unsigned, RangeAwareParser<unsigned>>(optarg));
+      break;
+
+    case '?':
+      /* getopt_long already printed an error message. */
+      exit(1);
+
+    default:
+      abort();
+    }
+  }
+  ALWAYS_ASSERT(g_nworkers >= 1);
+  ALWAYS_ASSERT(g_writeset > 0);
+  // fillkey() serializes a full uint64_t, so keys need at least 8 bytes;
+  // g_readset and g_valuesize are unsigned and may legitimately be zero
+  ALWAYS_ASSERT(g_keysize >= sizeof(uint64_t));
+  ALWAYS_ASSERT(!logfiles.empty());
+  ALWAYS_ASSERT(logfiles.size() <= g_nmax_loggers);
+  ALWAYS_ASSERT(
+      assignments.empty() ||
+      database_simulation::AssignmentsValid(
+        assignments, logfiles.size(), g_nworkers));
+
+  if (g_verbose)
+    cerr << "{nworkers=" << g_nworkers
+         << ", readset=" << g_readset
+         << ", writeset=" << g_writeset
+         << ", keysize=" << g_keysize
+         << ", valuesize=" << g_valuesize
+         << ", logfiles=" << logfiles
+         << ", strategy=" << strategy
+         << ", fsync_background=" << g_fsync_background
+         << ", assignments=" << assignments
+         << "}" << endl;
+
+  if (strategy != "deptracking" &&
+      strategy != "epoch" &&
+      strategy != "epoch-compress")
+    ALWAYS_ASSERT(false);
+
+  g_database.resize(g_nrecords); // all start at TID=0
+
+  vector<int> fds;
+  for (auto &fname : logfiles) {
+    int fd = open(fname.c_str(), O_CREAT|O_WRONLY|O_TRUNC, 0664);
+    if (fd == -1) {
+      perror("open");
+      return 1;
+    }
+    fds.push_back(fd);
+  }
+
+  unique_ptr<database_simulation> sim;
+  if (strategy == "deptracking")
+    sim.reset(new explicit_deptracking_simulation);
+  else if (strategy == "epoch")
+    sim.reset(new epochbased_simulation(false));
+  else if (strategy == "epoch-compress")
+    sim.reset(new epochbased_simulation(true));
+  else
+    ALWAYS_ASSERT(false);
+  sim->init();
+
+  thread logger_thread(
+      &database_simulation::logger, sim.get(), fds, ref(assignments));
+
+  vector<thread> workers;
+  util::timer tt, tt1;
+  for (size_t i = 0; i < g_nworkers; i++)
+    workers.emplace_back(&database_simulation::worker, sim.get(), i);
+  for (auto &p: workers)
+    p.join();
+  sim->terminate();
+  logger_thread.join();
+
+  const double ntxns_committed = g_ntxns_committed.load();
+  const double xsec = tt.lap_ms() / 1000.0;
+  const double rate = double(ntxns_committed) / xsec;
+  if (g_verbose) {
+    cerr << "txns commited rate: " << rate << " txns/sec" << endl;
+    cerr << "  (" << size_t(ntxns_committed) << " in " << xsec << " sec)" << endl;
+
+    const double ntxns_written = g_ntxns_written.load();
+    const double rate1 = double(ntxns_written) / xsec;
+    cerr << "txns written rate: " << rate1 << " txns/sec" << endl;
+    cerr << "  (" << size_t(ntxns_written) << " in " << xsec << " sec)" << endl;
+  } else {
+    cout << rate << endl;
+  }
+
+  return 0;
+}