benchmark silo added
diff --git a/silo/txn_proto2_impl.cc b/silo/txn_proto2_impl.cc
new file mode 100644 (file)
index 0000000..fad91be
--- /dev/null
@@ -0,0 +1,716 @@
+#include <iostream>
+#include <thread>
+#include <fcntl.h>
+#include <unistd.h>
+#include <sys/uio.h>
+#include <limits.h>
+#include <numa.h>
+
+#include "txn_proto2_impl.h"
+#include "counter.h"
+#include "util.h"
+
+using namespace std;
+using namespace util;
+
+                    /** logger subsystem **/
+/*{{{*/
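+// Overview: worker cores append log records into per-core buffers
+// (persist_ctx); detached writer threads batch those buffers into writev()
+// calls on their assigned log files, and a single persister thread
+// periodically advances system_sync_epoch_ to the lowest epoch every core
+// has durably written, folding the result into per-core latency and
+// throughput statistics.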
+bool txn_logger::g_persist = false;
+bool txn_logger::g_call_fsync = true;
+bool txn_logger::g_use_compression = false;
+bool txn_logger::g_fake_writes = false;
+size_t txn_logger::g_nworkers = 0;
+txn_logger::epoch_array
+  txn_logger::per_thread_sync_epochs_[txn_logger::g_nmax_loggers];
+aligned_padded_elem<atomic<uint64_t>>
+  txn_logger::system_sync_epoch_(0);
+percore<txn_logger::persist_ctx>
+  txn_logger::g_persist_ctxs;
+percore<txn_logger::persist_stats>
+  txn_logger::g_persist_stats;
+event_counter
+  txn_logger::g_evt_log_buffer_epoch_boundary("log_buffer_epoch_boundary");
+event_counter
+  txn_logger::g_evt_log_buffer_out_of_space("log_buffer_out_of_space");
+event_counter
+  txn_logger::g_evt_log_buffer_bytes_before_compress("log_buffer_bytes_before_compress");
+event_counter
+  txn_logger::g_evt_log_buffer_bytes_after_compress("log_buffer_bytes_after_compress");
+event_counter
+  txn_logger::g_evt_logger_writev_limit_met("logger_writev_limit_met");
+event_counter
+  txn_logger::g_evt_logger_max_lag_wait("logger_max_lag_wait");
+event_avg_counter
+  txn_logger::g_evt_avg_log_buffer_compress_time_us("avg_log_buffer_compress_time_us");
+event_avg_counter
+  txn_logger::g_evt_avg_log_entry_ntxns("avg_log_entry_ntxns_per_entry");
+event_avg_counter
+  txn_logger::g_evt_avg_logger_bytes_per_writev("avg_logger_bytes_per_writev");
+event_avg_counter
+  txn_logger::g_evt_avg_logger_bytes_per_sec("avg_logger_bytes_per_sec");
+
+static event_avg_counter
+  evt_avg_log_buffer_iov_len("avg_log_buffer_iov_len");
+
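+// Init: opens one log file per logger, then spawns a detached writer thread
+// per file plus a single detached persister thread. |assignments_given| maps
+// each logger to the set of worker ids it services; when it is empty, workers
+// are partitioned evenly across the available log files (assuming equally
+// fast disks).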
+void
+txn_logger::Init(
+    size_t nworkers,
+    const vector<string> &logfiles,
+    const vector<vector<unsigned>> &assignments_given,
+    vector<vector<unsigned>> *assignments_used,
+    bool call_fsync,
+    bool use_compression,
+    bool fake_writes)
+{
+  INVARIANT(!g_persist);
+  INVARIANT(g_nworkers == 0);
+  INVARIANT(nworkers > 0);
+  INVARIANT(!logfiles.empty());
+  INVARIANT(logfiles.size() <= g_nmax_loggers);
+  INVARIANT(!use_compression || g_perthread_buffers > 1); // need 1 as scratch buf
+  vector<int> fds;
+  for (auto &fname : logfiles) {
+    int fd = open(fname.c_str(), O_CREAT|O_WRONLY|O_TRUNC, 0664);
+    if (fd == -1) {
+      perror("open");
+      ALWAYS_ASSERT(false);
+    }
+    fds.push_back(fd);
+  }
+  g_persist = true;
+  g_call_fsync = call_fsync;
+  g_use_compression = use_compression;
+  g_fake_writes = fake_writes;
+  g_nworkers = nworkers;
+
+  for (size_t i = 0; i < g_nmax_loggers; i++)
+    for (size_t j = 0; j < g_nworkers; j++)
+      per_thread_sync_epochs_[i].epochs_[j].store(0, memory_order_release);
+
+  vector<thread> writers;
+  vector<vector<unsigned>> assignments(assignments_given);
+
+  if (assignments.empty()) {
+    // compute assuming homogeneous disks
+    if (g_nworkers <= fds.size()) {
+      // each thread gets its own logging worker
+      for (size_t i = 0; i < g_nworkers; i++)
+        assignments.push_back({(unsigned) i});
+    } else {
+      // XXX: currently we assume each logger is equally fast; we should
+      // adjust the ratios for non-homogeneous loggers
+      const size_t threads_per_logger = g_nworkers / fds.size();
+      for (size_t i = 0; i < fds.size(); i++) {
+        assignments.emplace_back(
+            MakeRange<unsigned>(
+              i * threads_per_logger,
+              ((i + 1) == fds.size()) ?  g_nworkers : (i + 1) * threads_per_logger));
+      }
+    }
+  }
+
+  INVARIANT(AssignmentsValid(assignments, fds.size(), g_nworkers));
+
+  for (size_t i = 0; i < assignments.size(); i++) {
+    writers.emplace_back(
+        &txn_logger::writer,
+        i, fds[i], assignments[i]);
+    writers.back().detach();
+  }
+
+  thread persist_thread(&txn_logger::persister, assignments);
+  persist_thread.detach();
+
+  if (assignments_used)
+    *assignments_used = assignments;
+}
+
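+// persister: loops forever, sleeping so that each iteration spans at least
+// one ticker epoch (ticker::tick_us) before recomputing the system-wide sync
+// epoch.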
+void
+txn_logger::persister(
+    vector<vector<unsigned>> assignments)
+{
+  timer loop_timer;
+  for (;;) {
+    const uint64_t last_loop_usec = loop_timer.lap();
+    const uint64_t delay_time_usec = ticker::tick_us;
+    if (last_loop_usec < delay_time_usec) {
+      const uint64_t sleep_ns = (delay_time_usec - last_loop_usec) * 1000;
+      struct timespec t;
+      t.tv_sec  = sleep_ns / ONE_SECOND_NS;
+      t.tv_nsec = sleep_ns % ONE_SECOND_NS;
+      nanosleep(&t, nullptr);
+    }
+    advance_system_sync_epoch(assignments);
+  }
+}
+
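+// advance_system_sync_epoch: takes the minimum per-core synced epoch across
+// all logger assignments, opportunistically bumping idle cores (empty
+// core->logger queue and an uncontended ticker guard) up to the current tick
+// so they do not hold the system back, then credits the epochs in
+// (system_sync_epoch_, new minimum] to the per-core persistence statistics
+// before publishing the new value.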
+void
+txn_logger::advance_system_sync_epoch(
+    const vector<vector<unsigned>> &assignments)
+{
+  uint64_t min_so_far = numeric_limits<uint64_t>::max();
+  const uint64_t best_tick_ex =
+    ticker::s_instance.global_current_tick();
+  // special case 0
+  const uint64_t best_tick_inc =
+    best_tick_ex ? (best_tick_ex - 1) : 0;
+
+  for (size_t i = 0; i < assignments.size(); i++)
+    for (auto j : assignments[i])
+      for (size_t k = j; k < NMAXCORES; k += g_nworkers) {
+        persist_ctx &ctx = persist_ctx_for(k, INITMODE_NONE);
+        // we need to arbitrarily advance threads which are not "doing
+        // anything", so they don't drag down the persistence of the system. if
+        // we can see that a thread is NOT in a guarded section AND its
+        // core->logger queue is empty, then we can advance its sync epoch up
+        // to best_tick_inc, because any action it performs next is guaranteed
+        // to be in an epoch > best_tick_inc
+        if (!ctx.persist_buffers_.peek()) {
+          spinlock &l = ticker::s_instance.lock_for(k);
+          if (!l.is_locked()) {
+            bool did_lock = false;
+            for (size_t c = 0; c < 3; c++) {
+              if (l.try_lock()) {
+                did_lock = true;
+                break;
+              }
+            }
+            if (did_lock) {
+              if (!ctx.persist_buffers_.peek()) {
+                min_so_far = min(min_so_far, best_tick_inc);
+                per_thread_sync_epochs_[i].epochs_[k].store(
+                    best_tick_inc, memory_order_release);
+                l.unlock();
+                continue;
+              }
+              l.unlock();
+            }
+          }
+        }
+        min_so_far = min(
+            per_thread_sync_epochs_[i].epochs_[k].load(
+              memory_order_acquire),
+            min_so_far);
+      }
+
+  const uint64_t syssync =
+    system_sync_epoch_->load(memory_order_acquire);
+
+  INVARIANT(min_so_far < numeric_limits<uint64_t>::max());
+  INVARIANT(syssync <= min_so_far);
+
+  // need to aggregate from [syssync + 1, min_so_far]
+  const uint64_t now_us = timer::cur_usec();
+  for (size_t i = 0; i < g_persist_stats.size(); i++) {
+    auto &ps = g_persist_stats[i];
+    for (uint64_t e = syssync + 1; e <= min_so_far; e++) {
+        auto &pes = ps.d_[e % g_max_lag_epochs];
+        const uint64_t ntxns_in_epoch = pes.ntxns_.load(memory_order_acquire);
+        const uint64_t start_us = pes.earliest_start_us_.load(memory_order_acquire);
+        INVARIANT(now_us >= start_us);
+        non_atomic_fetch_add(ps.ntxns_persisted_, ntxns_in_epoch);
+        non_atomic_fetch_add(
+            ps.latency_numer_,
+            (now_us - start_us) * ntxns_in_epoch);
+        pes.ntxns_.store(0, memory_order_release);
+        pes.earliest_start_us_.store(0, memory_order_release);
+    }
+  }
+
+  system_sync_epoch_->store(min_so_far, memory_order_release);
+}
+
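+// writer: logger thread body. Each iteration gathers unscheduled per-core
+// buffers for its assigned workers into an iovec batch (bounded by IOV_MAX
+// and by g_max_lag_epochs of lag), writes them with a single writev()
+// (optionally followed by fdatasync()), publishes the per-core epoch prefixes
+// that the write made durable, and recycles the buffers.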
+void
+txn_logger::writer(
+    unsigned id, int fd,
+    vector<unsigned> assignment)
+{
+
+  if (g_pin_loggers_to_numa_nodes) {
+    ALWAYS_ASSERT(!numa_run_on_node(id % numa_num_configured_nodes()));
+    ALWAYS_ASSERT(!sched_yield());
+  }
+
+  vector<iovec> iovs(
+      min(size_t(IOV_MAX), g_nworkers * g_perthread_buffers));
+  vector<pbuffer *> pxs;
+  timer loop_timer;
+
+  // XXX: sense is not useful for now, unless we want to
+  // fsync in the background...
+  bool sense = false; // cur is at sense, prev is at !sense
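+  // epoch_prefixes[s][k]: while gathering with sense s, the largest epoch
+  // known to be fully contained in the buffers scheduled so far for core k;
+  // it is published to per_thread_sync_epochs_ once the corresponding
+  // writev() (and fdatasync, if enabled) has completed.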
+  uint64_t epoch_prefixes[2][NMAXCORES];
+
+  NDB_MEMSET(&epoch_prefixes[0], 0, sizeof(epoch_prefixes[0]));
+  NDB_MEMSET(&epoch_prefixes[1], 0, sizeof(epoch_prefixes[1]));
+
+  // NOTE: a core id in the persistence system really represents
+  // all regular-system cores congruent to it modulo g_nworkers
+  size_t nbufswritten = 0, nbyteswritten = 0;
+  for (;;) {
+
+    const uint64_t last_loop_usec = loop_timer.lap();
+    const uint64_t delay_time_usec = ticker::tick_us;
+    // don't let an iteration of this loop take less than an epoch's worth
+    // of time, so we can batch IO
+    if (last_loop_usec < delay_time_usec && nbufswritten < iovs.size()) {
+      const uint64_t sleep_ns = (delay_time_usec - last_loop_usec) * 1000;
+      struct timespec t;
+      t.tv_sec  = sleep_ns / ONE_SECOND_NS;
+      t.tv_nsec = sleep_ns % ONE_SECOND_NS;
+      nanosleep(&t, nullptr);
+    }
+
+    // we need g_persist_stats[k].d_[cur_sync_epoch_ex % g_max_lag_epochs]
+    // to remain untouched (until the syncer can catch up), so we
+    // cannot read any buffers with epoch >=
+    // (cur_sync_epoch_ex + g_max_lag_epochs)
+    const uint64_t cur_sync_epoch_ex =
+      system_sync_epoch_->load(memory_order_acquire) + 1;
+    nbufswritten = nbyteswritten = 0;
+    for (auto idx : assignment) {
+      INVARIANT(idx < g_nworkers); // idx is unsigned, so no lower-bound check needed
+      for (size_t k = idx; k < NMAXCORES; k += g_nworkers) {
+        persist_ctx &ctx = persist_ctx_for(k, INITMODE_NONE);
+        ctx.persist_buffers_.peekall(pxs);
+        for (auto px : pxs) {
+          INVARIANT(px);
+          INVARIANT(!px->io_scheduled_);
+          INVARIANT(nbufswritten <= iovs.size());
+          INVARIANT(px->header()->nentries_);
+          INVARIANT(px->core_id_ == k);
+          if (nbufswritten == iovs.size()) {
+            ++g_evt_logger_writev_limit_met;
+            goto process;
+          }
+          if (transaction_proto2_static::EpochId(px->header()->last_tid_) >=
+              cur_sync_epoch_ex + g_max_lag_epochs) {
+            ++g_evt_logger_max_lag_wait;
+            break;
+          }
+          iovs[nbufswritten].iov_base = (void *) &px->buf_start_[0];
+
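+          // PXLEN: number of bytes of px handed to writev(). With
+          // LOGGER_UNSAFE_REDUCE_BUFFER_SIZE defined only ~1/4 of the buffer
+          // is written; as the name suggests, an unsafe knob intended only
+          // for experiments.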
+#ifdef LOGGER_UNSAFE_REDUCE_BUFFER_SIZE
+  #define PXLEN(px) (((px)->curoff_ < 4) ? (px)->curoff_ : ((px)->curoff_ / 4))
+#else
+  #define PXLEN(px) ((px)->curoff_)
+#endif
+
+          const size_t pxlen = PXLEN(px);
+
+          iovs[nbufswritten].iov_len = pxlen;
+          evt_avg_log_buffer_iov_len.offer(pxlen);
+          px->io_scheduled_ = true;
+          nbufswritten++;
+          nbyteswritten += pxlen;
+
+#ifdef CHECK_INVARIANTS
+          auto last_tid_cid = transaction_proto2_static::CoreId(px->header()->last_tid_);
+          auto px_cid = px->core_id_;
+          if (last_tid_cid != px_cid) {
+            cerr << "header: " << *px->header() << endl;
+            cerr << g_proto_version_str(last_tid_cid) << endl;
+            cerr << "last_tid_cid: " << last_tid_cid << endl;
+            cerr << "px_cid: " << px_cid << endl;
+          }
+#endif
+
+          const uint64_t px_epoch =
+            transaction_proto2_static::EpochId(px->header()->last_tid_);
+          INVARIANT(
+              transaction_proto2_static::CoreId(px->header()->last_tid_) ==
+              px->core_id_);
+          INVARIANT(epoch_prefixes[sense][k] <= px_epoch);
+          INVARIANT(px_epoch > 0);
+          epoch_prefixes[sense][k] = px_epoch - 1;
+          auto &pes = g_persist_stats[k].d_[px_epoch % g_max_lag_epochs];
+          if (!pes.ntxns_.load(memory_order_acquire))
+            pes.earliest_start_us_.store(px->earliest_start_us_, memory_order_release);
+          non_atomic_fetch_add(pes.ntxns_, px->header()->nentries_);
+          g_evt_avg_log_entry_ntxns.offer(px->header()->nentries_);
+        }
+      }
+    }
+
+  process:
+    if (!nbufswritten) {
+      // XXX: should probably sleep here
+      nop_pause();
+      continue;
+    }
+
+    const bool dosense = sense;
+
+    if (!g_fake_writes) {
+#ifdef ENABLE_EVENT_COUNTERS
+      timer write_timer;
+#endif
+      const ssize_t ret = writev(fd, &iovs[0], nbufswritten);
+      if (unlikely(ret == -1)) {
+        perror("writev");
+        ALWAYS_ASSERT(false);
+      }
+
+      if (g_call_fsync) {
+        const int fret = fdatasync(fd);
+        if (unlikely(fret == -1)) {
+          perror("fdatasync");
+          ALWAYS_ASSERT(false);
+        }
+      }
+
+#ifdef ENABLE_EVENT_COUNTERS
+      {
+        g_evt_avg_logger_bytes_per_writev.offer(nbyteswritten);
+        const double bytes_per_sec =
+          double(nbyteswritten)/(write_timer.lap_ms() / 1000.0);
+        g_evt_avg_logger_bytes_per_sec.offer(bytes_per_sec);
+      }
+#endif
+    }
+
+    // update metadata from the previous write
+    //
+    // return all buffers that have been io_scheduled_; we can do this as
+    // soon as writev() returns. take care to return each buffer to its
+    // owning core's context
+    epoch_array &ea = per_thread_sync_epochs_[id];
+    for (auto idx: assignment) {
+      for (size_t k = idx; k < NMAXCORES; k += g_nworkers) {
+        const uint64_t x0 = ea.epochs_[k].load(memory_order_acquire);
+        const uint64_t x1 = epoch_prefixes[dosense][k];
+        if (x1 > x0)
+          ea.epochs_[k].store(x1, memory_order_release);
+
+        persist_ctx &ctx = persist_ctx_for(k, INITMODE_NONE);
+        pbuffer *px, *px0;
+        while ((px = ctx.persist_buffers_.peek()) && px->io_scheduled_) {
+#ifdef LOGGER_STRIDE_OVER_BUFFER
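+          // touch every byte of the buffer we just wrote (accumulating into
+          // a dummy counter) so that the cost of reading the logged data is
+          // actually paid; presumably another measurement-only knob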
+          {
+            const size_t pxlen = PXLEN(px);
+            const size_t stridelen = 1;
+            for (size_t p = 0; p < pxlen; p += stridelen)
+              if ((&px->buf_start_[0])[p] & 0xF)
+                non_atomic_fetch_add(ea.dummy_work_, 1UL);
+          }
+#endif
+          px0 = ctx.persist_buffers_.deq();
+          INVARIANT(px == px0);
+          INVARIANT(px->header()->nentries_);
+          px0->reset();
+          INVARIANT(ctx.init_);
+          INVARIANT(px0->core_id_ == k);
+          ctx.all_buffers_.enq(px0);
+        }
+      }
+    }
+
+    // bump the sense
+    sense = !sense;
+  }
+}
+
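+// Returns (#txns persisted, #txns pushed to the logger, average persistence
+// latency in microseconds per persisted txn), summed across all cores.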
+tuple<uint64_t, uint64_t, double>
+txn_logger::compute_ntxns_persisted_statistics()
+{
+  uint64_t acc = 0, acc1 = 0, acc2 = 0;
+  uint64_t num = 0;
+  for (size_t i = 0; i < g_persist_stats.size(); i++) {
+    acc  += g_persist_stats[i].ntxns_persisted_.load(memory_order_acquire);
+    acc1 += g_persist_stats[i].ntxns_pushed_.load(memory_order_acquire);
+    acc2 += g_persist_stats[i].ntxns_committed_.load(memory_order_acquire);
+    num  += g_persist_stats[i].latency_numer_.load(memory_order_acquire);
+  }
+  INVARIANT(acc <= acc1);
+  INVARIANT(acc1 <= acc2);
+  if (acc == 0)
+    return make_tuple(0, acc1, 0.0);
+  return make_tuple(acc, acc1, double(num)/double(acc));
+}
+
+void
+txn_logger::clear_ntxns_persisted_statistics()
+{
+  for (size_t i = 0; i < g_persist_stats.size(); i++) {
+    auto &ps = g_persist_stats[i];
+    ps.ntxns_persisted_.store(0, memory_order_release);
+    ps.ntxns_pushed_.store(0, memory_order_release);
+    ps.ntxns_committed_.store(0, memory_order_release);
+    ps.latency_numer_.store(0, memory_order_release);
+    for (size_t e = 0; e < g_max_lag_epochs; e++) {
+      auto &pes = ps.d_[e];
+      pes.ntxns_.store(0, memory_order_release);
+      pes.earliest_start_us_.store(0, memory_order_release);
+    }
+  }
+}
+
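+// Spins until, for every initialized core, the head of the free-buffer list
+// is present and empty and the core->logger persist queue has drained.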
+void
+txn_logger::wait_for_idle_state()
+{
+  for (size_t i = 0; i < NMAXCORES; i++) {
+    persist_ctx &ctx = persist_ctx_for(i, INITMODE_NONE);
+    if (!ctx.init_)
+      continue;
+    pbuffer *px;
+    while (!(px = ctx.all_buffers_.peek()) || px->header()->nentries_)
+      nop_pause();
+    while (ctx.persist_buffers_.peek())
+      nop_pause();
+  }
+}
+
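+// Blocks until system_sync_epoch_ has advanced to the tick that was current
+// when this call was made.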
+void
+txn_logger::wait_until_current_point_persisted()
+{
+  const uint64_t e = ticker::s_instance.global_current_tick();
+  cerr << "waiting for system_sync_epoch_="
+       << system_sync_epoch_->load(memory_order_acquire)
+       << " to reach e=" << e << endl;
+  while (system_sync_epoch_->load(memory_order_acquire) < e)
+    nop_pause();
+}
+/*}}}*/
+
+                /** garbage collection subsystem **/
+
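+// Deferred reclamation of superseded and deleted dbtuple versions: each
+// worker thread queues delete_entry records per read-only epoch in its
+// threadctx; once a read-only tick is old enough that no transaction can
+// still read it, the queued entries for that tick are drained and their
+// tuples freed (after being unlinked from their btree, if necessary).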
+static event_counter evt_local_chain_cleanups("local_chain_cleanups");
+static event_counter evt_try_delete_unlinks("try_delete_unlinks");
+static event_avg_counter evt_avg_time_inbetween_ro_epochs_usec(
+    "avg_time_inbetween_ro_epochs_usec");
+
+void
+transaction_proto2_static::InitGC()
+{
+  g_flags->g_gc_init.store(true, memory_order_release);
+}
+
+static void
+sleep_ro_epoch()
+{
+  const uint64_t sleep_ns = transaction_proto2_static::ReadOnlyEpochUsec * 1000;
+  struct timespec t;
+  t.tv_sec  = sleep_ns / ONE_SECOND_NS;
+  t.tv_nsec = sleep_ns % ONE_SECOND_NS;
+  nanosleep(&t, nullptr);
+}
+
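+// Drains this thread's entire GC queue: waits until the newest queued epoch
+// has been closed out by a completed read-only tick, then reclaims everything
+// up to and including it.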
+void
+transaction_proto2_static::PurgeThreadOutstandingGCTasks()
+{
+#ifdef PROTO2_CAN_DISABLE_GC
+  if (!IsGCEnabled())
+    return;
+#endif
+  INVARIANT(!rcu::s_instance.in_rcu_region());
+  threadctx &ctx = g_threadctxs.my();
+  uint64_t e;
+  if (!ctx.queue_.get_latest_epoch(e))
+    return;
+  // wait until we can clean up e
+  for (;;) {
+    const uint64_t last_tick_ex = ticker::s_instance.global_last_tick_exclusive();
+    const uint64_t ro_tick_ex = to_read_only_tick(last_tick_ex);
+    if (unlikely(!ro_tick_ex)) {
+      sleep_ro_epoch();
+      continue;
+    }
+    const uint64_t ro_tick_geq = ro_tick_ex - 1;
+    if (ro_tick_geq < e) {
+      sleep_ro_epoch();
+      continue;
+    }
+    break;
+  }
+  clean_up_to_including(ctx, e);
+  INVARIANT(ctx.queue_.empty());
+}
+
+//#ifdef CHECK_INVARIANTS
+//// make sure hidden is blocked by version e, when traversing from start
+//static bool
+//IsBlocked(dbtuple *start, dbtuple *hidden, uint64_t e)
+//{
+//  dbtuple *c = start;
+//  while (c) {
+//    if (c == hidden)
+//      return false;
+//    if (c->is_not_behind(e))
+//      // blocked
+//      return true;
+//    c = c->next;
+//  }
+//  ALWAYS_ASSERT(false); // hidden should be found on chain
+//}
+//#endif
+
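+// Reclaims every queued delete_entry with epoch <= ro_tick_geq. Entries whose
+// key_ carries no flag bits are superseded versions and are freed directly
+// (no RCU needed); flagged entries are deleted records that must first be
+// unlinked from their btree, which happens inside an RCU region that is
+// exited and re-entered every max_niters_with_rcu iterations to bound its
+// length.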
+void
+transaction_proto2_static::clean_up_to_including(threadctx &ctx, uint64_t ro_tick_geq)
+{
+  INVARIANT(!rcu::s_instance.in_rcu_region());
+  INVARIANT(ctx.last_reaped_epoch_ <= ro_tick_geq);
+  INVARIANT(ctx.scratch_.empty());
+  if (ctx.last_reaped_epoch_ == ro_tick_geq)
+    return;
+
+#ifdef ENABLE_EVENT_COUNTERS
+  const uint64_t now = timer::cur_usec();
+  if (ctx.last_reaped_timestamp_us_ > 0) {
+    const uint64_t diff = now - ctx.last_reaped_timestamp_us_;
+    evt_avg_time_inbetween_ro_epochs_usec.offer(diff);
+  }
+  ctx.last_reaped_timestamp_us_ = now;
+#endif
+  ctx.last_reaped_epoch_ = ro_tick_geq;
+
+#ifdef CHECK_INVARIANTS
+  const uint64_t last_tick_ex = ticker::s_instance.global_last_tick_exclusive();
+  INVARIANT(last_tick_ex);
+  const uint64_t last_consistent_tid = ComputeReadOnlyTid(last_tick_ex - 1);
+  const uint64_t computed_last_tick_ex = ticker::s_instance.compute_global_last_tick_exclusive();
+  INVARIANT(last_tick_ex <= computed_last_tick_ex);
+  INVARIANT(to_read_only_tick(last_tick_ex) > ro_tick_geq);
+#endif
+
+  // XXX: hacky
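+  // placement-new / explicit-destructor pair so the RCU region can be
+  // entered and exited at arbitrary points in the loop below instead of
+  // being lexically scoped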
+  char rcu_guard[sizeof(scoped_rcu_base<false>)] = {0};
+  const size_t max_niters_with_rcu = 128;
+#define ENTER_RCU() \
+    do { \
+      new (&rcu_guard[0]) scoped_rcu_base<false>(); \
+    } while (0)
+#define EXIT_RCU() \
+    do { \
+      scoped_rcu_base<false> *px = (scoped_rcu_base<false> *) &rcu_guard[0]; \
+      px->~scoped_rcu_base<false>(); \
+    } while (0)
+
+  ctx.scratch_.empty_accept_from(ctx.queue_, ro_tick_geq);
+  ctx.scratch_.transfer_freelist(ctx.queue_);
+  px_queue &q = ctx.scratch_;
+  if (q.empty())
+    return;
+  bool in_rcu = false;
+  size_t niters_with_rcu = 0, n = 0;
+  for (auto it = q.begin(); it != q.end(); ++it, ++n, ++niters_with_rcu) {
+    auto &delent = *it;
+    INVARIANT(delent.tuple()->opaque.load(std::memory_order_acquire) == 1);
+    if (!delent.key_.get_flags()) {
+      // guaranteed to be gc-able now (even w/o RCU)
+#ifdef CHECK_INVARIANTS
+      if (delent.trigger_tid_ > last_consistent_tid /*|| !IsBlocked(delent.tuple_ahead_, delent.tuple(), last_consistent_tid) */) {
+        cerr << "tuple ahead     : " << g_proto_version_str(delent.tuple_ahead_->version) << endl;
+        cerr << "tuple ahead     : " << *delent.tuple_ahead_ << endl;
+        cerr << "trigger tid     : " << g_proto_version_str(delent.trigger_tid_) << endl;
+        cerr << "tuple           : " << g_proto_version_str(delent.tuple()->version) << endl;
+        cerr << "last_consist_tid: " << g_proto_version_str(last_consistent_tid) << endl;
+        cerr << "last_tick_ex    : " << last_tick_ex << endl;
+        cerr << "ro_tick_geq     : " << ro_tick_geq << endl;
+        cerr << "rcu_block_tick  : " << it.tick() << endl;
+      }
+      INVARIANT(delent.trigger_tid_ <= last_consistent_tid);
+      delent.tuple()->opaque.store(0, std::memory_order_release);
+#endif
+      dbtuple::release_no_rcu(delent.tuple());
+    } else {
+      INVARIANT(!delent.tuple_ahead_);
+      INVARIANT(delent.btr_);
+      // check if an element precedes the (deleted) tuple before doing the delete
+      ::lock_guard<dbtuple> lg_tuple(delent.tuple(), false);
+#ifdef CHECK_INVARIANTS
+      if (!delent.tuple()->is_not_behind(last_consistent_tid)) {
+        cerr << "trigger tid     : " << g_proto_version_str(delent.trigger_tid_) << endl;
+        cerr << "tuple           : " << g_proto_version_str(delent.tuple()->version) << endl;
+        cerr << "last_consist_tid: " << g_proto_version_str(last_consistent_tid) << endl;
+        cerr << "last_tick_ex    : " << last_tick_ex << endl;
+        cerr << "ro_tick_geq     : " << ro_tick_geq << endl;
+        cerr << "rcu_block_tick  : " << it.tick() << endl;
+      }
+      INVARIANT(delent.tuple()->version == delent.trigger_tid_);
+      INVARIANT(delent.tuple()->is_not_behind(last_consistent_tid));
+      INVARIANT(delent.tuple()->is_deleting());
+#endif
+      if (unlikely(!delent.tuple()->is_latest())) {
+        // requeue it up, except this time as a regular delete
+        const uint64_t my_ro_tick = to_read_only_tick(
+            ticker::s_instance.global_current_tick());
+        ctx.queue_.enqueue(
+            delete_entry(
+              nullptr,
+              MakeTid(CoreMask, NumIdMask >> NumIdShift, (my_ro_tick + 1) * ReadOnlyEpochMultiplier - 1),
+              delent.tuple(),
+              marked_ptr<string>(),
+              nullptr),
+            my_ro_tick);
+        ++g_evt_proto_gc_delete_requeue;
+        // reclaim string ptrs
+        string *spx = delent.key_.get();
+        if (unlikely(spx))
+          ctx.pool_.emplace_back(spx);
+        continue;
+      }
+#ifdef CHECK_INVARIANTS
+      delent.tuple()->opaque.store(0, std::memory_order_release);
+#endif
+      // if delent.key_ is nullptr, then the key is stored in the tuple
+      // record storage location, and the size field contains the length of
+      // the key
+      //
+      // otherwise, delent.key_ is a pointer to a string containing the
+      // key
+      varkey k;
+      string *spx = delent.key_.get();
+      if (likely(!spx)) {
+        k = varkey(delent.tuple()->get_value_start(), delent.tuple()->size);
+      } else {
+        k = varkey(*spx);
+        ctx.pool_.emplace_back(spx);
+      }
+
+      if (!in_rcu) {
+        ENTER_RCU();
+        niters_with_rcu = 0;
+        in_rcu = true;
+      }
+      typename concurrent_btree::value_type removed = 0;
+      const bool did_remove = delent.btr_->remove(k, &removed);
+      ALWAYS_ASSERT(did_remove);
+      INVARIANT(removed == (typename concurrent_btree::value_type) delent.tuple());
+      delent.tuple()->clear_latest();
+      dbtuple::release(delent.tuple()); // rcu free it
+    }
+
+    if (in_rcu && niters_with_rcu >= max_niters_with_rcu) {
+      EXIT_RCU();
+      niters_with_rcu = 0;
+      in_rcu = false;
+    }
+  }
+  q.clear();
+  g_evt_avg_proto_gc_queue_len.offer(n);
+
+  if (in_rcu)
+    EXIT_RCU();
+  INVARIANT(!rcu::s_instance.in_rcu_region());
+}
+
+aligned_padded_elem<transaction_proto2_static::hackstruct>
+  transaction_proto2_static::g_hack;
+aligned_padded_elem<transaction_proto2_static::flags>
+  transaction_proto2_static::g_flags;
+percore_lazy<transaction_proto2_static::threadctx>
+  transaction_proto2_static::g_threadctxs;
+event_counter
+  transaction_proto2_static::g_evt_worker_thread_wait_log_buffer(
+      "worker_thread_wait_log_buffer");
+event_counter
+  transaction_proto2_static::g_evt_dbtuple_no_space_for_delkey(
+      "dbtuple_no_space_for_delkey");
+event_counter
+  transaction_proto2_static::g_evt_proto_gc_delete_requeue(
+      "proto_gc_delete_requeue");
+event_avg_counter
+  transaction_proto2_static::g_evt_avg_log_entry_size(
+      "avg_log_entry_size");
+event_avg_counter
+  transaction_proto2_static::g_evt_avg_proto_gc_queue_len(
+      "avg_proto_gc_queue_len");