benchmark silo added
diff --git a/silo/txn_proto2_impl.h b/silo/txn_proto2_impl.h
new file mode 100644 (file)
index 0000000..83021ab
--- /dev/null
@@ -0,0 +1,1265 @@
+#ifndef _NDB_TXN_PROTO2_IMPL_H_
+#define _NDB_TXN_PROTO2_IMPL_H_
+
+#include <iostream>
+#include <atomic>
+#include <vector>
+#include <set>
+
+#include <lz4.h>
+
+#include "txn.h"
+#include "txn_impl.h"
+#include "txn_btree.h"
+#include "macros.h"
+#include "circbuf.h"
+#include "spinbarrier.h"
+#include "record/serializer.h"
+
+// forward decl
+template <typename Traits> class transaction_proto2;
+template <template <typename> class Transaction>
+  class txn_epoch_sync;
+
+// the system has a single logging subsystem (composed of multiple loggers)
+// NOTE: currently, the persistence epoch is tied 1:1 with the ticker's epoch
+class txn_logger {
+  friend class transaction_proto2_static;
+  template <typename T>
+    friend class transaction_proto2;
+  // XXX: should only allow txn_epoch_sync<transaction_proto2> as friend
+  template <template <typename> class T>
+    friend class txn_epoch_sync;
+public:
+
+  static const size_t g_nmax_loggers = 16;
+  static const size_t g_perthread_buffers = 256; // 256 outstanding buffers
+  static const size_t g_buffer_size = (1<<20); // in bytes
+  static const size_t g_horizon_buffer_size = 2 * (1<<16); // in bytes
+  static const size_t g_max_lag_epochs = 128; // cannot lag more than 128 epochs
+  static const bool   g_pin_loggers_to_numa_nodes = false;
+
+  static inline bool
+  IsPersistenceEnabled()
+  {
+    return g_persist;
+  }
+
+  static inline bool
+  IsCompressionEnabled()
+  {
+    return g_use_compression;
+  }
+
+  // init the logging subsystem.
+  //
+  // should only be called ONCE; it is not thread-safe. if assignments_used is
+  // not null, it is filled with a copy of the assignment actually computed
+  static void Init(
+      size_t nworkers,
+      const std::vector<std::string> &logfiles,
+      const std::vector<std::vector<unsigned>> &assignments_given,
+      std::vector<std::vector<unsigned>> *assignments_used = nullptr,
+      bool call_fsync = true,
+      bool use_compression = false,
+      bool fake_writes = false);
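+
+  // illustrative usage sketch (not from this codebase; the file names and
+  // worker count are placeholders): a setup with 4 workers and two log files,
+  // letting Init() compute the worker->logger assignment itself.
+  //
+  //   std::vector<std::string> logfiles = {"/tmp/log0", "/tmp/log1"};
+  //   std::vector<std::vector<unsigned>> assignments_used;
+  //   txn_logger::Init(4, logfiles, {}, &assignments_used,
+  //                    true  /* call_fsync */,
+  //                    false /* use_compression */,
+  //                    false /* fake_writes */);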
+
+  struct logbuf_header {
+    uint64_t nentries_; // > 0 for all valid log buffers
+    uint64_t last_tid_; // TID of the last commit
+  } PACKED;
+
+  struct pbuffer {
+    uint64_t earliest_start_us_; // start time of the earliest txn
+    bool io_scheduled_; // has the logger scheduled IO yet?
+
+    unsigned curoff_; // current offset into buf_ for writing
+
+    const unsigned core_id_; // which core does this pbuffer belong to?
+
+    const unsigned buf_sz_;
+
+    // must be last field
+    uint8_t buf_start_[0];
+
+    // to allocate a pbuffer, use placement new:
+    //    const size_t bufsz = ...;
+    //    char *p = malloc(sizeof(pbuffer) + bufsz);
+    //    pbuffer *pb = new (p) pbuffer(core_id, bufsz);
+    //
+    // NOTE: it is not necessary to call the destructor for pbuffer, since
+    // it only contains PODs
+    pbuffer(unsigned core_id, unsigned buf_sz)
+      : core_id_(core_id), buf_sz_(buf_sz)
+    {
+      INVARIANT(((char *)this) + sizeof(*this) == (char *) &buf_start_[0]);
+      INVARIANT(buf_sz > sizeof(logbuf_header));
+      reset();
+    }
+
+    pbuffer(const pbuffer &) = delete;
+    pbuffer &operator=(const pbuffer &) = delete;
+    pbuffer(pbuffer &&) = delete;
+
+    inline void
+    reset()
+    {
+      earliest_start_us_ = 0;
+      io_scheduled_ = false;
+      curoff_ = sizeof(logbuf_header);
+      NDB_MEMSET(&buf_start_[0], 0, buf_sz_);
+    }
+
+    inline uint8_t *
+    pointer()
+    {
+      INVARIANT(curoff_ >= sizeof(logbuf_header));
+      INVARIANT(curoff_ <= buf_sz_);
+      return &buf_start_[0] + curoff_;
+    }
+
+    inline uint8_t *
+    datastart()
+    {
+      return &buf_start_[0] + sizeof(logbuf_header);
+    }
+
+    inline size_t
+    datasize() const
+    {
+      INVARIANT(curoff_ >= sizeof(logbuf_header));
+      INVARIANT(curoff_ <= buf_sz_);
+      return curoff_ - sizeof(logbuf_header);
+    }
+
+    inline logbuf_header *
+    header()
+    {
+      return reinterpret_cast<logbuf_header *>(&buf_start_[0]);
+    }
+
+    inline const logbuf_header *
+    header() const
+    {
+      return reinterpret_cast<const logbuf_header *>(&buf_start_[0]);
+    }
+
+    inline size_t
+    space_remaining() const
+    {
+      INVARIANT(curoff_ >= sizeof(logbuf_header));
+      INVARIANT(curoff_ <= buf_sz_);
+      return buf_sz_ - curoff_;
+    }
+
+    inline bool
+    can_hold_tid(uint64_t tid) const;
+  } PACKED;
+
+  static bool
+  AssignmentsValid(const std::vector<std::vector<unsigned>> &assignments,
+                   unsigned nfds,
+                   unsigned nworkers)
+  {
+    // each worker must be assigned exactly once in the assignment
+    // there must be <= nfds assignments
+
+    if (assignments.size() > nfds)
+      return false;
+
+    std::set<unsigned> seen;
+    for (auto &assignment : assignments)
+      for (auto w : assignment) {
+        if (seen.count(w) || w >= nworkers)
+          return false;
+        seen.insert(w);
+      }
+
+    return seen.size() == nworkers;
+  }
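+
+  // worked example of the rules above (illustrative values): with
+  // nworkers = 4 and nfds = 2, the assignment {{0, 2}, {1, 3}} is valid,
+  // while {{0, 1}, {1, 2}} is not (worker 1 appears twice and worker 3 is
+  // never assigned).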
+
+  typedef circbuf<pbuffer, g_perthread_buffers> pbuffer_circbuf;
+
+  static std::tuple<uint64_t, uint64_t, double>
+  compute_ntxns_persisted_statistics();
+
+  // purge each thread's counters of the number of persisted txns
+  static void
+  clear_ntxns_persisted_statistics();
+
+  // wait until the logging system appears to be idle.
+  //
+  // note that this isn't a guarantee, just a best effort attempt
+  static void
+  wait_for_idle_state();
+
+  // waits until the epoch at the time of invocation is persisted
+  static void
+  wait_until_current_point_persisted();
+
+private:
+
+  // data structures
+
+  struct epoch_array {
+    // don't use percore<std::atomic<uint64_t>> because we don't want padding
+    std::atomic<uint64_t> epochs_[NMAXCORES];
+    std::atomic<uint64_t> dummy_work_; // so we can do some fake work
+    CACHE_PADOUT;
+  };
+
+  struct persist_ctx {
+    bool init_;
+
+    void *lz4ctx_;     // for compression
+    pbuffer *horizon_; // for compression
+
+    circbuf<pbuffer, g_perthread_buffers> all_buffers_;     // logger pushes to core
+    circbuf<pbuffer, g_perthread_buffers> persist_buffers_; // core pushes to logger
+
+    persist_ctx() : init_(false), lz4ctx_(nullptr), horizon_(nullptr) {}
+  };
+
+  // per-thread persistence statistics, including per-epoch detail for the
+  // last g_max_lag_epochs epochs
+  struct persist_stats {
+    // how many txns this thread has persisted in total
+    std::atomic<uint64_t> ntxns_persisted_;
+
+    // how many txns have been pushed to the logger (but not necessarily persisted)
+    std::atomic<uint64_t> ntxns_pushed_;
+
+    // committed (but not necessarily pushed, nor persisted)
+    std::atomic<uint64_t> ntxns_committed_;
+
+    // sum of all latencies (divide by ntxns_persisted_ to get avg latency in
+    // us) for *persisted* txns (is conservative)
+    std::atomic<uint64_t> latency_numer_;
+
+    // per-epoch information for the last g_max_lag_epochs epochs
+    struct per_epoch_stats {
+      std::atomic<uint64_t> ntxns_;
+      std::atomic<uint64_t> earliest_start_us_;
+
+      per_epoch_stats() : ntxns_(0), earliest_start_us_(0) {}
+    } d_[g_max_lag_epochs];
+
+    persist_stats() :
+      ntxns_persisted_(0), ntxns_pushed_(0),
+      ntxns_committed_(0), latency_numer_(0) {}
+  };
+
+  // helpers
+
+  static void
+  advance_system_sync_epoch(
+      const std::vector<std::vector<unsigned>> &assignments);
+
+  // takes the assignment by value (copy) on purpose
+  static void writer(
+      unsigned id, int fd,
+      std::vector<unsigned> assignment);
+
+  static void persister(
+      std::vector<std::vector<unsigned>> assignments);
+
+  enum InitMode {
+    INITMODE_NONE, // no initialization
+    INITMODE_REG,  // just use malloc() to init buffers
+    INITMODE_RCU,  // try to use the RCU numa aware allocator
+  };
+
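+  // memory layout of the single allocation made in persist_ctx_for() below,
+  // when compression is enabled (with compression disabled, only the
+  // g_perthread_buffers regular buffers are carved out):
+  //
+  //   [ LZ4 state: LZ4_create_size() bytes                   ]
+  //   [ horizon pbuffer header | g_horizon_buffer_size bytes ]
+  //   [ pbuffer header | g_buffer_size bytes ] x g_perthread_buffers
+  //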
+  static inline persist_ctx &
+  persist_ctx_for(uint64_t core_id, InitMode imode)
+  {
+    INVARIANT(core_id < g_persist_ctxs.size());
+    persist_ctx &ctx = g_persist_ctxs[core_id];
+    if (unlikely(!ctx.init_ && imode != INITMODE_NONE)) {
+      size_t needed = g_perthread_buffers * (sizeof(pbuffer) + g_buffer_size);
+      if (IsCompressionEnabled())
+        needed += size_t(LZ4_create_size()) +
+          sizeof(pbuffer) + g_horizon_buffer_size;
+      char *mem =
+        (imode == INITMODE_REG) ?
+          (char *) malloc(needed) :
+          (char *) rcu::s_instance.alloc_static(needed);
+      if (IsCompressionEnabled()) {
+        ctx.lz4ctx_ = mem;
+        mem += LZ4_create_size();
+        ctx.horizon_ = new (mem) pbuffer(core_id, g_horizon_buffer_size);
+        mem += sizeof(pbuffer) + g_horizon_buffer_size;
+      }
+      for (size_t i = 0; i < g_perthread_buffers; i++) {
+        ctx.all_buffers_.enq(new (mem) pbuffer(core_id, g_buffer_size));
+        mem += sizeof(pbuffer) + g_buffer_size;
+      }
+      ctx.init_ = true;
+    }
+    return ctx;
+  }
+
+  // static state
+
+  static bool g_persist; // whether or not logging is enabled
+
+  static bool g_call_fsync; // whether or not fsync() needs to be called
+                            // in order to be considered durable
+
+  static bool g_use_compression; // whether or not to compress log buffers
+
+  static bool g_fake_writes; // whether or not to fake doing writes (to measure
+                             // pure overhead of disk)
+
+  static size_t g_nworkers; // assignments are computed based on g_nworkers
+                            // but a logger responsible for core i is really
+                            // responsible for cores i + k * g_nworkers, for k
+                            // >= 0
+
+  // v = per_thread_sync_epochs_[i].epochs_[j]: logger i has persisted up
+  // through (including) all transactions <= epoch v on core j. since core =>
+  // logger mapping is static, taking:
+  //   min_{core} max_{logger} per_thread_sync_epochs_[logger].epochs_[core]
+  // yields the entire system's persistent epoch
+  static epoch_array
+    per_thread_sync_epochs_[g_nmax_loggers] CACHE_ALIGNED;
+
+  // conservative estimate (<=) for:
+  //   min_{core} max_{logger} per_thread_sync_epochs_[logger].epochs_[core]
+  static util::aligned_padded_elem<std::atomic<uint64_t>>
+    system_sync_epoch_ CACHE_ALIGNED;
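+
+  // illustrative sketch of the formula in the two comments above (the actual
+  // update is done in advance_system_sync_epoch(), not with this exact code):
+  //
+  //   uint64_t system_epoch = std::numeric_limits<uint64_t>::max();
+  //   for (size_t core = 0; core < NMAXCORES; core++) {
+  //     uint64_t best = 0;
+  //     for (size_t logger = 0; logger < g_nmax_loggers; logger++)
+  //       best = std::max<uint64_t>(
+  //           best, per_thread_sync_epochs_[logger].epochs_[core].load());
+  //     system_epoch = std::min(system_epoch, best);
+  //   }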
+
+  static percore<persist_ctx> g_persist_ctxs CACHE_ALIGNED;
+
+  static percore<persist_stats> g_persist_stats CACHE_ALIGNED;
+
+  // counters
+
+  static event_counter g_evt_log_buffer_epoch_boundary;
+  static event_counter g_evt_log_buffer_out_of_space;
+  static event_counter g_evt_log_buffer_bytes_before_compress;
+  static event_counter g_evt_log_buffer_bytes_after_compress;
+  static event_counter g_evt_logger_writev_limit_met;
+  static event_counter g_evt_logger_max_lag_wait;
+  static event_avg_counter g_evt_avg_log_entry_ntxns;
+  static event_avg_counter g_evt_avg_log_buffer_compress_time_us;
+  static event_avg_counter g_evt_avg_logger_bytes_per_writev;
+  static event_avg_counter g_evt_avg_logger_bytes_per_sec;
+};
+
+static inline std::ostream &
+operator<<(std::ostream &o, txn_logger::logbuf_header &hdr)
+{
+  o << "{nentries_=" << hdr.nentries_ << ", last_tid_="
+    << g_proto_version_str(hdr.last_tid_) << "}";
+  return o;
+}
+
+class transaction_proto2_static {
+public:
+
+  // NOTE:
+  // each epoch is tied (1:1) to the ticker subsystem's tick. this is the
+  // speed of the persistence layer.
+  //
+  // however, read-only txns and GC are tied to multiples of the ticker
+  // subsystem's tick
+
+#ifdef CHECK_INVARIANTS
+  static const uint64_t ReadOnlyEpochMultiplier = 10; /* 10 * 1 ms */
+#else
+  static const uint64_t ReadOnlyEpochMultiplier = 25; /* 25 * 40 ms */
+  static_assert(ticker::tick_us * ReadOnlyEpochMultiplier == 1000000, "");
+#endif
+
+  static_assert(ReadOnlyEpochMultiplier >= 1, "XX");
+
+  static const uint64_t ReadOnlyEpochUsec =
+    ticker::tick_us * ReadOnlyEpochMultiplier;
+
+  static inline uint64_t constexpr
+  to_read_only_tick(uint64_t epoch_tick)
+  {
+    return epoch_tick / ReadOnlyEpochMultiplier;
+  }
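+
+  // worked example, assuming the non-CHECK_INVARIANTS settings above
+  // (tick_us = 40000, ReadOnlyEpochMultiplier = 25): ReadOnlyEpochUsec is
+  // 40000 * 25 = 1000000 us (1 s), and epoch ticks 0..24 map to read-only
+  // tick 0, ticks 25..49 to read-only tick 1, and so on.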
+
+  // in this protocol, the version number is laid out as follows
+  // (note that for tid_t values, the top bit is reserved and
+  // *must* be set to zero):
+  //
+  // [ core  | number |  epoch | reserved ]
+  // [ 0..9  | 9..33  | 33..63 |  63..64  ]
+
+  static inline ALWAYS_INLINE
+  uint64_t CoreId(uint64_t v)
+  {
+    return v & CoreMask;
+  }
+
+  static inline ALWAYS_INLINE
+  uint64_t NumId(uint64_t v)
+  {
+    return (v & NumIdMask) >> NumIdShift;
+  }
+
+  static inline ALWAYS_INLINE
+  uint64_t EpochId(uint64_t v)
+  {
+    return (v & EpochMask) >> EpochShift;
+  }
+
+  // XXX(stephentu): HACK
+  static void
+  wait_an_epoch()
+  {
+    INVARIANT(!rcu::s_instance.in_rcu_region());
+    const uint64_t e = to_read_only_tick(
+        ticker::s_instance.global_last_tick_exclusive());
+    if (!e) {
+      std::cerr << "wait_an_epoch(): consistent reads happening in e-1, but e=0 so special case"
+                << std::endl;
+    } else {
+      std::cerr << "wait_an_epoch(): consistent reads happening in e-1: "
+                << (e-1) << std::endl;
+    }
+    while (to_read_only_tick(ticker::s_instance.global_last_tick_exclusive()) == e)
+      nop_pause();
+    COMPILER_MEMORY_FENCE;
+  }
+
+  static uint64_t
+  ComputeReadOnlyTid(uint64_t global_tick_ex)
+  {
+    const uint64_t a = (global_tick_ex / ReadOnlyEpochMultiplier);
+    const uint64_t b = a * ReadOnlyEpochMultiplier;
+
+    // want to read entries <= b-1, special casing for b=0
+    if (!b)
+      return MakeTid(0, 0, 0);
+    else
+      return MakeTid(CoreMask, NumIdMask >> NumIdShift, b - 1);
+  }
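+
+  // worked example (illustrative, assuming ReadOnlyEpochMultiplier = 25):
+  // for global_tick_ex = 60, a = 60 / 25 = 2 and b = 2 * 25 = 50, so the
+  // returned TID is the largest possible TID with epoch b - 1 = 49 (core and
+  // number fields saturated).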
+
+  static const uint64_t NBitsNumber = 24;
+
+  // XXX(stephentu): need to implement core ID recycling
+  static const size_t CoreBits = NMAXCOREBITS; // allow 2^CoreBits distinct threads
+  static const size_t NMaxCores = NMAXCORES;
+
+  static const uint64_t CoreMask = (NMaxCores - 1);
+
+  static const uint64_t NumIdShift = CoreBits;
+  static const uint64_t NumIdMask = ((((uint64_t)1) << NBitsNumber) - 1) << NumIdShift;
+
+  static const uint64_t EpochShift = CoreBits + NBitsNumber;
+  // since the reserved bit is always zero, we don't need a special mask
+  static const uint64_t EpochMask = ((uint64_t)-1) << EpochShift;
+
+  static inline ALWAYS_INLINE
+  uint64_t MakeTid(uint64_t core_id, uint64_t num_id, uint64_t epoch_id)
+  {
+    // some sanity checking
+    static_assert((CoreMask | NumIdMask | EpochMask) == ((uint64_t)-1), "xx");
+    static_assert((CoreMask & NumIdMask) == 0, "xx");
+    static_assert((NumIdMask & EpochMask) == 0, "xx");
+    return (core_id) | (num_id << NumIdShift) | (epoch_id << EpochShift);
+  }
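+
+  // round-trip sketch (illustrative values; they must fit in the field widths
+  // derived from NMAXCOREBITS and NBitsNumber):
+  //
+  //   const uint64_t tid = MakeTid(3 /* core */, 7 /* number */, 42 /* epoch */);
+  //   INVARIANT(CoreId(tid)  == 3);
+  //   INVARIANT(NumId(tid)   == 7);
+  //   INVARIANT(EpochId(tid) == 42);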
+
+  static inline void
+  set_hack_status(bool hack_status)
+  {
+    g_hack->status_ = hack_status;
+  }
+
+  static inline bool
+  get_hack_status()
+  {
+    return g_hack->status_;
+  }
+
+  // thread-safe, can be called many times
+  static void InitGC();
+
+  static void PurgeThreadOutstandingGCTasks();
+
+#ifdef PROTO2_CAN_DISABLE_GC
+  static inline bool
+  IsGCEnabled()
+  {
+    return g_flags->g_gc_init.load(std::memory_order_acquire);
+  }
+#endif
+
+#ifdef PROTO2_CAN_DISABLE_SNAPSHOTS
+  static void
+  DisableSnapshots()
+  {
+    g_flags->g_disable_snapshots.store(true, std::memory_order_release);
+  }
+  static inline bool
+  IsSnapshotsEnabled()
+  {
+    return !g_flags->g_disable_snapshots.load(std::memory_order_acquire);
+  }
+#endif
+
+protected:
+  struct delete_entry {
+#ifdef CHECK_INVARIANTS
+    dbtuple *tuple_ahead_;
+    uint64_t trigger_tid_;
+#endif
+
+    dbtuple *tuple_;
+    marked_ptr<std::string> key_;
+    concurrent_btree *btr_;
+
+    delete_entry()
+      :
+#ifdef CHECK_INVARIANTS
+        tuple_ahead_(nullptr),
+        trigger_tid_(0),
+#endif
+        tuple_(),
+        key_(),
+        btr_(nullptr) {}
+
+    delete_entry(dbtuple *tuple_ahead,
+                 uint64_t trigger_tid,
+                 dbtuple *tuple,
+                 const marked_ptr<std::string> &key,
+                 concurrent_btree *btr)
+      :
+#ifdef CHECK_INVARIANTS
+        tuple_ahead_(tuple_ahead),
+        trigger_tid_(trigger_tid),
+#endif
+        tuple_(tuple),
+        key_(key),
+        btr_(btr) {}
+
+    inline dbtuple *
+    tuple()
+    {
+      return tuple_;
+    }
+  };
+
+  typedef basic_px_queue<delete_entry, 4096> px_queue;
+
+  struct threadctx {
+    uint64_t last_commit_tid_;
+    unsigned last_reaped_epoch_;
+#ifdef ENABLE_EVENT_COUNTERS
+    uint64_t last_reaped_timestamp_us_;
+#endif
+    px_queue queue_;
+    px_queue scratch_;
+    std::deque<std::string *> pool_;
+    threadctx() :
+        last_commit_tid_(0)
+      , last_reaped_epoch_(0)
+#ifdef ENABLE_EVENT_COUNTERS
+      , last_reaped_timestamp_us_(0)
+#endif
+    {
+      ALWAYS_ASSERT(((uintptr_t)this % CACHELINE_SIZE) == 0);
+      queue_.alloc_freelist(rcu::NQueueGroups);
+      scratch_.alloc_freelist(rcu::NQueueGroups);
+    }
+  };
+
+  static void
+  clean_up_to_including(threadctx &ctx, uint64_t ro_tick_geq);
+
+  // helper methods
+  static inline txn_logger::pbuffer *
+  wait_for_head(txn_logger::pbuffer_circbuf &pull_buf)
+  {
+    // XXX(stephentu): spinning for now
+    txn_logger::pbuffer *px;
+    while (unlikely(!(px = pull_buf.peek()))) {
+      nop_pause();
+      ++g_evt_worker_thread_wait_log_buffer;
+    }
+    INVARIANT(!px->io_scheduled_);
+    return px;
+  }
+
+  // pushes horizon to the front entry of pull_buf, pushing
+  // to push_buf if necessary
+  //
+  // horizon is reset after push_horizon_to_buffer() returns
+  //
+  // returns the number of txns pushed from buffer to *logger*
+  // (if doing so was necessary)
+  static inline size_t
+  push_horizon_to_buffer(txn_logger::pbuffer *horizon,
+                         void *lz4ctx,
+                         txn_logger::pbuffer_circbuf &pull_buf,
+                         txn_logger::pbuffer_circbuf &push_buf)
+  {
+    INVARIANT(txn_logger::IsCompressionEnabled());
+    if (unlikely(!horizon->header()->nentries_))
+      return 0;
+    INVARIANT(horizon->datasize());
+
+    size_t ntxns_pushed_to_logger = 0;
+
+    // horizon out of space; try to push horizon to buffer
+    txn_logger::pbuffer *px = wait_for_head(pull_buf);
+    const uint64_t compressed_space_needed =
+      sizeof(uint32_t) + LZ4_compressBound(horizon->datasize());
+
+    bool buffer_cond = false;
+    if (px->space_remaining() < compressed_space_needed ||
+        (buffer_cond = !px->can_hold_tid(horizon->header()->last_tid_))) {
+      // buffer out of space; push buffer to logger
+      INVARIANT(px->header()->nentries_);
+      ntxns_pushed_to_logger = px->header()->nentries_;
+      txn_logger::pbuffer *px1 = pull_buf.deq();
+      INVARIANT(px == px1);
+      push_buf.enq(px1);
+      px = wait_for_head(pull_buf);
+      if (buffer_cond)
+        ++txn_logger::g_evt_log_buffer_epoch_boundary;
+      else
+        ++txn_logger::g_evt_log_buffer_out_of_space;
+    }
+
+    INVARIANT(px->space_remaining() >= compressed_space_needed);
+    if (!px->header()->nentries_)
+      px->earliest_start_us_ = horizon->earliest_start_us_;
+    px->header()->nentries_ += horizon->header()->nentries_;
+    px->header()->last_tid_  = horizon->header()->last_tid_;
+
+#ifdef ENABLE_EVENT_COUNTERS
+    util::timer tt;
+#endif
+    const int ret = LZ4_compress_heap_limitedOutput(
+        lz4ctx,
+        (const char *) horizon->datastart(),
+        (char *) px->pointer() + sizeof(uint32_t),
+        horizon->datasize(),
+        px->space_remaining() - sizeof(uint32_t));
+#ifdef ENABLE_EVENT_COUNTERS
+    txn_logger::g_evt_avg_log_buffer_compress_time_us.offer(tt.lap());
+    txn_logger::g_evt_log_buffer_bytes_before_compress.inc(horizon->datasize());
+    txn_logger::g_evt_log_buffer_bytes_after_compress.inc(ret);
+#endif
+    INVARIANT(ret > 0);
+#if defined(CHECK_INVARIANTS) && defined(PARANOID_CHECKING)
+    {
+      uint8_t decode_buf[txn_logger::g_horizon_buffer_size];
+      const int decode_ret =
+        LZ4_decompress_safe_partial(
+            (const char *) px->pointer() + sizeof(uint32_t),
+            (char *) &decode_buf[0],
+            ret,
+            txn_logger::g_horizon_buffer_size,
+            txn_logger::g_horizon_buffer_size);
+      INVARIANT(decode_ret >= 0);
+      INVARIANT(size_t(decode_ret) == horizon->datasize());
+      INVARIANT(memcmp(horizon->datastart(),
+                       &decode_buf[0], decode_ret) == 0);
+    }
+#endif
+
+    serializer<uint32_t, false> s_uint32_t;
+    s_uint32_t.write(px->pointer(), ret);
+    px->curoff_ += sizeof(uint32_t) + uint32_t(ret);
+    horizon->reset();
+
+    return ntxns_pushed_to_logger;
+  }
+
+  struct hackstruct {
+    std::atomic<bool> status_;
+    std::atomic<uint64_t> global_tid_;
+    constexpr hackstruct() : status_(false), global_tid_(0) {}
+  };
+
+  // used to simulate a global TID, for comparison
+  static util::aligned_padded_elem<hackstruct>
+    g_hack CACHE_ALIGNED;
+
+  struct flags {
+    std::atomic<bool> g_gc_init;
+    std::atomic<bool> g_disable_snapshots;
+    constexpr flags() : g_gc_init(false), g_disable_snapshots(false) {}
+  };
+  static util::aligned_padded_elem<flags> g_flags;
+
+  static percore_lazy<threadctx> g_threadctxs;
+
+  static event_counter g_evt_worker_thread_wait_log_buffer;
+  static event_counter g_evt_dbtuple_no_space_for_delkey;
+  static event_counter g_evt_proto_gc_delete_requeue;
+  static event_avg_counter g_evt_avg_log_entry_size;
+  static event_avg_counter g_evt_avg_proto_gc_queue_len;
+};
+
+bool
+txn_logger::pbuffer::can_hold_tid(uint64_t tid) const
+{
+  return !header()->nentries_ ||
+         (transaction_proto2_static::EpochId(header()->last_tid_) ==
+          transaction_proto2_static::EpochId(tid));
+}
+
+// protocol 2 - no global consistent TIDs
+template <typename Traits>
+class transaction_proto2 : public transaction<transaction_proto2, Traits>,
+                           private transaction_proto2_static {
+
+  friend class transaction<transaction_proto2, Traits>;
+  typedef transaction<transaction_proto2, Traits> super_type;
+
+public:
+
+  typedef Traits traits_type;
+  typedef transaction_base::tid_t tid_t;
+  typedef transaction_base::string_type string_type;
+  typedef typename super_type::dbtuple_write_info dbtuple_write_info;
+  typedef typename super_type::dbtuple_write_info_vec dbtuple_write_info_vec;
+  typedef typename super_type::read_set_map read_set_map;
+  typedef typename super_type::absent_set_map absent_set_map;
+  typedef typename super_type::write_set_map write_set_map;
+  typedef typename super_type::write_set_u32_vec write_set_u32_vec;
+
+  transaction_proto2(uint64_t flags,
+                     typename Traits::StringAllocator &sa)
+    : transaction<transaction_proto2, Traits>(flags, sa)
+  {
+    if (this->get_flags() & transaction_base::TXN_FLAG_READ_ONLY) {
+      const uint64_t global_tick_ex =
+        this->rcu_guard_->guard()->impl().global_last_tick_exclusive();
+      u_.last_consistent_tid = ComputeReadOnlyTid(global_tick_ex);
+    }
+#ifdef TUPLE_LOCK_OWNERSHIP_CHECKING
+    dbtuple::TupleLockRegionBegin();
+#endif
+    INVARIANT(rcu::s_instance.in_rcu_region());
+  }
+
+  ~transaction_proto2()
+  {
+#ifdef TUPLE_LOCK_OWNERSHIP_CHECKING
+    dbtuple::AssertAllTupleLocksReleased();
+#endif
+    INVARIANT(rcu::s_instance.in_rcu_region());
+  }
+
+  inline bool
+  can_overwrite_record_tid(tid_t prev, tid_t cur) const
+  {
+    INVARIANT(prev <= cur);
+
+#ifdef PROTO2_CAN_DISABLE_SNAPSHOTS
+    if (!IsSnapshotsEnabled())
+      return true;
+#endif
+
+    // XXX(stephentu): the !prev check is a *bit* of a hack:
+    // we're assuming that !prev (MIN_TID) corresponds to an
+    // absent (removed) record, so it is safe to overwrite it.
+    //
+    // This is an OK assumption with *no TID wrap around*.
+    return (to_read_only_tick(EpochId(prev)) ==
+            to_read_only_tick(EpochId(cur))) ||
+           !prev;
+  }
+
+  // can only read elements in this epoch or previous epochs
+  inline bool
+  can_read_tid(tid_t t) const
+  {
+    return true;
+  }
+
+  inline void
+  on_tid_finish(tid_t commit_tid)
+  {
+    if (!txn_logger::IsPersistenceEnabled() ||
+        this->state != transaction_base::TXN_COMMITED)
+      return;
+    // need to write into log buffer
+
+    serializer<uint32_t, true> vs_uint32_t;
+
+    // compute how much space is necessary
+    uint64_t space_needed = 0;
+
+    // 8 bytes to indicate TID
+    space_needed += sizeof(uint64_t);
+
+    // variable bytes to indicate # of records written
+#ifdef LOGGER_UNSAFE_FAKE_COMPRESSION
+    const unsigned nwrites = 0;
+#else
+    const unsigned nwrites = this->write_set.size();
+#endif
+
+    space_needed += vs_uint32_t.nbytes(&nwrites);
+
+    // each written record contributes its key size, key bytes, value size, and value bytes
+    write_set_u32_vec value_sizes;
+    for (unsigned idx = 0; idx < nwrites; idx++) {
+      const transaction_base::write_record_t &rec = this->write_set[idx];
+      const uint32_t k_nbytes = rec.get_key().size();
+      space_needed += vs_uint32_t.nbytes(&k_nbytes);
+      space_needed += k_nbytes;
+
+      const uint32_t v_nbytes = rec.get_value() ?
+          rec.get_writer()(
+              dbtuple::TUPLE_WRITER_COMPUTE_DELTA_NEEDED,
+              rec.get_value(), nullptr, 0) : 0;
+      space_needed += vs_uint32_t.nbytes(&v_nbytes);
+      space_needed += v_nbytes;
+
+      value_sizes.push_back(v_nbytes);
+    }
+
+    g_evt_avg_log_entry_size.offer(space_needed);
+    INVARIANT(space_needed <= txn_logger::g_horizon_buffer_size);
+    INVARIANT(space_needed <= txn_logger::g_buffer_size);
+
+    const unsigned long my_core_id = coreid::core_id();
+
+    txn_logger::persist_ctx &ctx =
+      txn_logger::persist_ctx_for(my_core_id, txn_logger::INITMODE_REG);
+    txn_logger::persist_stats &stats =
+      txn_logger::g_persist_stats[my_core_id];
+    txn_logger::pbuffer_circbuf &pull_buf = ctx.all_buffers_;
+    txn_logger::pbuffer_circbuf &push_buf = ctx.persist_buffers_;
+
+    util::non_atomic_fetch_add(stats.ntxns_committed_, 1UL);
+
+    const bool do_compress = txn_logger::IsCompressionEnabled();
+    if (do_compress) {
+      // try placing in horizon
+      bool horizon_cond = false;
+      if (ctx.horizon_->space_remaining() < space_needed ||
+          (horizon_cond = !ctx.horizon_->can_hold_tid(commit_tid))) {
+        if (!ctx.horizon_->datasize()) {
+          std::cerr << "space_needed: " << space_needed << std::endl;
+          std::cerr << "space_remaining: " << ctx.horizon_->space_remaining() << std::endl;
+          std::cerr << "can_hold_tid: " << ctx.horizon_->can_hold_tid(commit_tid) << std::endl;
+        }
+        INVARIANT(ctx.horizon_->datasize());
+        // horizon out of space, so we push it
+        const uint64_t npushed =
+          push_horizon_to_buffer(ctx.horizon_, ctx.lz4ctx_, pull_buf, push_buf);
+        if (npushed)
+          util::non_atomic_fetch_add(stats.ntxns_pushed_, npushed);
+      }
+
+      INVARIANT(ctx.horizon_->space_remaining() >= space_needed);
+      const uint64_t written =
+        write_current_txn_into_buffer(ctx.horizon_, commit_tid, value_sizes);
+      if (written != space_needed)
+        INVARIANT(false);
+
+    } else {
+
+    retry:
+      txn_logger::pbuffer *px = wait_for_head(pull_buf);
+      INVARIANT(px && px->core_id_ == my_core_id);
+      bool cond = false;
+      if (px->space_remaining() < space_needed ||
+          (cond = !px->can_hold_tid(commit_tid))) {
+        INVARIANT(px->header()->nentries_);
+        txn_logger::pbuffer *px0 = pull_buf.deq();
+        INVARIANT(px == px0);
+        INVARIANT(px0->header()->nentries_);
+        util::non_atomic_fetch_add(stats.ntxns_pushed_, px0->header()->nentries_);
+        push_buf.enq(px0);
+        if (cond)
+          ++txn_logger::g_evt_log_buffer_epoch_boundary;
+        else
+          ++txn_logger::g_evt_log_buffer_out_of_space;
+        goto retry;
+      }
+
+      const uint64_t written =
+        write_current_txn_into_buffer(px, commit_tid, value_sizes);
+      if (written != space_needed)
+        INVARIANT(false);
+    }
+  }
+
+private:
+
+  // assumes enough space in px to hold this txn
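+  //
+  // serialized record layout (mirrors the space computation in
+  // on_tid_finish() above):
+  //
+  //   [ commit_tid : 8 bytes ]
+  //   [ nwrites    : variable-length uint32 ]
+  //   then, for each of the nwrites records:
+  //   [ k_nbytes : variable-length uint32 ][ key bytes ]
+  //   [ v_nbytes : variable-length uint32 ][ value bytes, omitted if v_nbytes == 0 ]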
+  inline uint64_t
+  write_current_txn_into_buffer(
+      txn_logger::pbuffer *px,
+      uint64_t commit_tid,
+      const write_set_u32_vec &value_sizes)
+  {
+    INVARIANT(px->can_hold_tid(commit_tid));
+
+    if (unlikely(!px->header()->nentries_))
+      px->earliest_start_us_ = this->rcu_guard_->guard()->start_us();
+
+    uint8_t *p = px->pointer();
+    uint8_t *porig = p;
+
+    serializer<uint32_t, true> vs_uint32_t;
+    serializer<uint64_t, false> s_uint64_t;
+
+#ifdef LOGGER_UNSAFE_FAKE_COMPRESSION
+    const unsigned nwrites = 0;
+#else
+    const unsigned nwrites = this->write_set.size();
+#endif
+
+
+    INVARIANT(nwrites == value_sizes.size());
+
+    p = s_uint64_t.write(p, commit_tid);
+    p = vs_uint32_t.write(p, nwrites);
+
+    for (unsigned idx = 0; idx < nwrites; idx++) {
+      const transaction_base::write_record_t &rec = this->write_set[idx];
+      const uint32_t k_nbytes = rec.get_key().size();
+      p = vs_uint32_t.write(p, k_nbytes);
+      NDB_MEMCPY(p, rec.get_key().data(), k_nbytes);
+      p += k_nbytes;
+      const uint32_t v_nbytes = value_sizes[idx];
+      p = vs_uint32_t.write(p, v_nbytes);
+      if (v_nbytes) {
+        rec.get_writer()(dbtuple::TUPLE_WRITER_DO_DELTA_WRITE, rec.get_value(), p, v_nbytes);
+        p += v_nbytes;
+      }
+    }
+
+    px->curoff_ += (p - porig);
+    px->header()->nentries_++;
+    px->header()->last_tid_ = commit_tid;
+
+    return uint64_t(p - porig);
+  }
+
+public:
+
+  inline ALWAYS_INLINE bool is_snapshot() const {
+    return this->get_flags() & transaction_base::TXN_FLAG_READ_ONLY;
+  }
+
+  inline transaction_base::tid_t
+  snapshot_tid() const
+  {
+#ifdef PROTO2_CAN_DISABLE_SNAPSHOTS
+    if (!IsSnapshotsEnabled())
+      // when snapshots are disabled, but we have a RO txn, we simply allow
+      // it to read all the latest values and treat them as consistent
+      //
+      // it's not correct, but it's there for the factor analysis
+      return dbtuple::MAX_TID;
+#endif
+    return u_.last_consistent_tid;
+  }
+
+  void
+  dump_debug_info() const
+  {
+    transaction<transaction_proto2, Traits>::dump_debug_info();
+    if (this->is_snapshot())
+      std::cerr << "  last_consistent_tid: "
+        << g_proto_version_str(u_.last_consistent_tid) << std::endl;
+  }
+
+  transaction_base::tid_t
+  gen_commit_tid(const dbtuple_write_info_vec &write_tuples)
+  {
+    const size_t my_core_id = this->rcu_guard_->guard()->core();
+    threadctx &ctx = g_threadctxs.get(my_core_id);
+    INVARIANT(!this->is_snapshot());
+
+    COMPILER_MEMORY_FENCE;
+    u_.commit_epoch = ticker::s_instance.global_current_tick();
+    COMPILER_MEMORY_FENCE;
+
+    tid_t ret = ctx.last_commit_tid_;
+    INVARIANT(ret == dbtuple::MIN_TID || CoreId(ret) == my_core_id);
+    if (u_.commit_epoch != EpochId(ret))
+      ret = MakeTid(0, 0, u_.commit_epoch);
+
+    // What is this? Is txn_proto1_impl used?
+    if (g_hack->status_.load(std::memory_order_acquire))
+      g_hack->global_tid_.fetch_add(1, std::memory_order_acq_rel);
+
+    // XXX(stephentu): I believe this is correct, but not 100% sure
+    //const size_t my_core_id = 0;
+    //tid_t ret = 0;
+    {
+      typename read_set_map::const_iterator it     = this->read_set.begin();
+      typename read_set_map::const_iterator it_end = this->read_set.end();
+      for (; it != it_end; ++it) {
+        if (it->get_tid() > ret)
+          ret = it->get_tid();
+      }
+    }
+
+    {
+      typename dbtuple_write_info_vec::const_iterator it     = write_tuples.begin();
+      typename dbtuple_write_info_vec::const_iterator it_end = write_tuples.end();
+      for (; it != it_end; ++it) {
+        INVARIANT(it->tuple->is_locked());
+        INVARIANT(it->tuple->is_lock_owner());
+        INVARIANT(it->tuple->is_write_intent());
+        INVARIANT(!it->tuple->is_modifying());
+        INVARIANT(it->tuple->is_latest());
+        if (it->is_insert())
+          // we inserted this node, so we don't want to do the checks below
+          continue;
+        const tid_t t = it->tuple->version;
+
+        // XXX(stephentu): we are overly conservative for now; technically this
+        // abort isn't necessary (we really should just write the value in the correct
+        // position)
+        //if (EpochId(t) > u_.commit_epoch) {
+        //  std::cerr << "t: " << g_proto_version_str(t) << std::endl;
+        //  std::cerr << "epoch: " << u_.commit_epoch << std::endl;
+        //  this->dump_debug_info();
+        //}
+
+        // t == dbtuple::MAX_TID when a txn does an insert of a new tuple
+        // followed by 1+ writes to the same tuple.
+        INVARIANT(EpochId(t) <= u_.commit_epoch || t == dbtuple::MAX_TID);
+        if (t != dbtuple::MAX_TID && t > ret)
+          ret = t;
+      }
+
+      INVARIANT(EpochId(ret) == u_.commit_epoch);
+      ret = MakeTid(my_core_id, NumId(ret) + 1, u_.commit_epoch);
+    }
+
+    // XXX(stephentu): this txn hasn't actually been committed yet,
+    // and could potentially be aborted - but it's ok to increase this #, since
+    // subsequent txns on this core will read this # anyways
+    return (ctx.last_commit_tid_ = ret);
+  }
+
+  inline ALWAYS_INLINE void
+  on_dbtuple_spill(dbtuple *tuple_ahead, dbtuple *tuple)
+  {
+#ifdef PROTO2_CAN_DISABLE_GC
+    if (!IsGCEnabled())
+      return;
+#endif
+
+    INVARIANT(rcu::s_instance.in_rcu_region());
+    INVARIANT(!tuple->is_latest());
+
+    // >= not > only b/c of the special case of inserting a new tuple +
+    // overwriting the newly inserted record with a longer sequence of bytes in
+    // the *same* txn
+    INVARIANT(tuple_ahead->version >= tuple->version);
+
+    if (tuple->is_deleting()) {
+      INVARIANT(tuple->is_locked());
+      INVARIANT(tuple->is_lock_owner());
+      // already on queue
+      return;
+    }
+
+    const uint64_t ro_tick = to_read_only_tick(this->u_.commit_epoch);
+    INVARIANT(to_read_only_tick(EpochId(tuple->version)) <= ro_tick);
+
+#ifdef CHECK_INVARIANTS
+    uint64_t exp = 0;
+    INVARIANT(tuple->opaque.compare_exchange_strong(exp, 1, std::memory_order_acq_rel));
+#endif
+
+    // once all snapshots are happening at >= the current epoch,
+    // we can safely remove the tuple
+    threadctx &ctx = g_threadctxs.my();
+    ctx.queue_.enqueue(
+        delete_entry(tuple_ahead, tuple_ahead->version,
+          tuple, marked_ptr<std::string>(), nullptr),
+        ro_tick);
+  }
+
+  inline ALWAYS_INLINE void
+  on_logical_delete(dbtuple *tuple, const std::string &key, concurrent_btree *btr)
+  {
+#ifdef PROTO2_CAN_DISABLE_GC
+    if (!IsGCEnabled())
+      return;
+#endif
+
+    INVARIANT(tuple->is_locked());
+    INVARIANT(tuple->is_lock_owner());
+    INVARIANT(tuple->is_write_intent());
+    INVARIANT(tuple->is_latest());
+    INVARIANT(tuple->is_deleting());
+    INVARIANT(!tuple->size);
+    INVARIANT(rcu::s_instance.in_rcu_region());
+
+    const uint64_t ro_tick = to_read_only_tick(this->u_.commit_epoch);
+    threadctx &ctx = g_threadctxs.my();
+
+#ifdef CHECK_INVARIANTS
+    uint64_t exp = 0;
+    INVARIANT(tuple->opaque.compare_exchange_strong(exp, 1, std::memory_order_acq_rel));
+#endif
+
+    if (likely(key.size() <= tuple->alloc_size)) {
+      NDB_MEMCPY(tuple->get_value_start(), key.data(), key.size());
+      tuple->size = key.size();
+
+      // eligible for deletion when all snapshots >= the current epoch
+      marked_ptr<std::string> mpx;
+      mpx.set_flags(0x1);
+
+      ctx.queue_.enqueue(
+          delete_entry(nullptr, tuple->version, tuple, mpx, btr),
+          ro_tick);
+    } else {
+      // this is a rare event
+      ++g_evt_dbtuple_no_space_for_delkey;
+      std::string *spx = nullptr;
+      if (ctx.pool_.empty()) {
+        spx = new std::string(key.data(), key.size()); // XXX: use numa memory?
+      } else {
+        spx = ctx.pool_.front();
+        ctx.pool_.pop_front();
+        spx->assign(key.data(), key.size());
+      }
+      INVARIANT(spx);
+
+      marked_ptr<std::string> mpx(spx);
+      mpx.set_flags(0x1);
+
+      ctx.queue_.enqueue(
+          delete_entry(nullptr, tuple->version, tuple, mpx, btr),
+          ro_tick);
+    }
+  }
+
+  void
+  on_post_rcu_region_completion()
+  {
+#ifdef PROTO2_CAN_DISABLE_GC
+    if (!IsGCEnabled())
+      return;
+#endif
+    const uint64_t last_tick_ex = ticker::s_instance.global_last_tick_exclusive();
+    if (unlikely(!last_tick_ex))
+      return;
+    // we subtract one from the global last tick because, given the way
+    // consistent TIDs are computed, global_last_tick_exclusive() can
+    // increase by at most one tick during a transaction.
+    const uint64_t ro_tick_ex = to_read_only_tick(last_tick_ex - 1);
+    if (unlikely(!ro_tick_ex))
+      // won't have anything to clean
+      return;
+    // all reads happening at >= ro_tick_geq
+    const uint64_t ro_tick_geq = ro_tick_ex - 1;
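+    // worked example (assuming ReadOnlyEpochMultiplier = 25): if
+    // last_tick_ex = 51, then ro_tick_ex = to_read_only_tick(50) = 2 and
+    // ro_tick_geq = 1, i.e. all consistent reads are happening at read-only
+    // tick >= 1.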
+    threadctx &ctx = g_threadctxs.my();
+    clean_up_to_including(ctx, ro_tick_geq);
+  }
+
+private:
+
+  union {
+    // the consistent TID this txn reads at (computed when the txn starts)
+    // -- snapshot txns only
+    uint64_t last_consistent_tid;
+    // the epoch for this txn -- committing non-snapshot txns only
+    uint64_t commit_epoch;
+  } u_;
+};
+
+// txn_btree_handler specialization
+template <>
+struct base_txn_btree_handler<transaction_proto2> {
+  static inline void
+  on_construct()
+  {
+#ifndef PROTO2_CAN_DISABLE_GC
+    transaction_proto2_static::InitGC();
+#endif
+  }
+  static const bool has_background_task = true;
+};
+
+template <>
+struct txn_epoch_sync<transaction_proto2> : public transaction_proto2_static {
+  static void
+  sync()
+  {
+    wait_an_epoch();
+    if (txn_logger::IsPersistenceEnabled())
+      txn_logger::wait_until_current_point_persisted();
+  }
+  static void
+  finish()
+  {
+    if (txn_logger::IsPersistenceEnabled())
+      txn_logger::wait_until_current_point_persisted();
+  }
+  static void
+  thread_init(bool loader)
+  {
+    if (!txn_logger::IsPersistenceEnabled())
+      return;
+    const unsigned long my_core_id = coreid::core_id();
+    // loaders initialize with malloc(); workers try the RCU numa-aware allocator
+    txn_logger::persist_ctx_for(
+        my_core_id,
+        loader ? txn_logger::INITMODE_REG : txn_logger::INITMODE_RCU);
+  }
+  static void
+  thread_end()
+  {
+    if (!txn_logger::IsPersistenceEnabled())
+      return;
+    const unsigned long my_core_id = coreid::core_id();
+    txn_logger::persist_ctx &ctx =
+      txn_logger::persist_ctx_for(my_core_id, txn_logger::INITMODE_NONE);
+    if (unlikely(!ctx.init_))
+      return;
+    txn_logger::persist_stats &stats =
+      txn_logger::g_persist_stats[my_core_id];
+    txn_logger::pbuffer_circbuf &pull_buf = ctx.all_buffers_;
+    txn_logger::pbuffer_circbuf &push_buf = ctx.persist_buffers_;
+    if (txn_logger::IsCompressionEnabled() &&
+        ctx.horizon_->header()->nentries_) {
+      INVARIANT(ctx.horizon_->datasize());
+      const uint64_t npushed =
+        push_horizon_to_buffer(ctx.horizon_, ctx.lz4ctx_, pull_buf, push_buf);
+      if (npushed)
+        util::non_atomic_fetch_add(stats.ntxns_pushed_, npushed);
+    }
+    txn_logger::pbuffer *px = pull_buf.peek();
+    if (!px || !px->header()->nentries_) {
+      //std::cerr << "core " << my_core_id
+      //          << " nothing to push to logger" << std::endl;
+      return;
+    }
+    //std::cerr << "core " << my_core_id
+    //          << " pushing buffer to logger" << std::endl;
+    txn_logger::pbuffer *px0 = pull_buf.deq();
+    util::non_atomic_fetch_add(stats.ntxns_pushed_, px0->header()->nentries_);
+    INVARIANT(px0 == px);
+    push_buf.enq(px0);
+  }
+  static std::tuple<uint64_t, uint64_t, double>
+  compute_ntxn_persisted()
+  {
+    if (!txn_logger::IsPersistenceEnabled())
+      return std::make_tuple(0, 0, 0.0);
+    return txn_logger::compute_ntxns_persisted_statistics();
+  }
+  static void
+  reset_ntxn_persisted()
+  {
+    if (!txn_logger::IsPersistenceEnabled())
+      return;
+    txn_logger::clear_ntxns_persisted_statistics();
+  }
+};
+
+#endif /* _NDB_TXN_PROTO2_IMPL_H_ */