// silo/txn_proto2_impl.cc (c11concurrency-benchmarks.git)
#include <iostream>
#include <thread>
#include <fcntl.h>
#include <unistd.h>
#include <sys/uio.h>
#include <limits.h>
#include <numa.h>

#include "txn_proto2_impl.h"
#include "counter.h"
#include "util.h"

using namespace std;
using namespace util;

                    /** logger subsystem **/
/*{{{*/
bool txn_logger::g_persist = false;
bool txn_logger::g_call_fsync = true;
bool txn_logger::g_use_compression = false;
bool txn_logger::g_fake_writes = false;
size_t txn_logger::g_nworkers = 0;
txn_logger::epoch_array
  txn_logger::per_thread_sync_epochs_[txn_logger::g_nmax_loggers];
aligned_padded_elem<atomic<uint64_t>>
  txn_logger::system_sync_epoch_(0);
percore<txn_logger::persist_ctx>
  txn_logger::g_persist_ctxs;
percore<txn_logger::persist_stats>
  txn_logger::g_persist_stats;
event_counter
  txn_logger::g_evt_log_buffer_epoch_boundary("log_buffer_epoch_boundary");
event_counter
  txn_logger::g_evt_log_buffer_out_of_space("log_buffer_out_of_space");
event_counter
  txn_logger::g_evt_log_buffer_bytes_before_compress("log_buffer_bytes_before_compress");
event_counter
  txn_logger::g_evt_log_buffer_bytes_after_compress("log_buffer_bytes_after_compress");
event_counter
  txn_logger::g_evt_logger_writev_limit_met("logger_writev_limit_met");
event_counter
  txn_logger::g_evt_logger_max_lag_wait("logger_max_lag_wait");
event_avg_counter
  txn_logger::g_evt_avg_log_buffer_compress_time_us("avg_log_buffer_compress_time_us");
event_avg_counter
  txn_logger::g_evt_avg_log_entry_ntxns("avg_log_entry_ntxns_per_entry");
event_avg_counter
  txn_logger::g_evt_avg_logger_bytes_per_writev("avg_logger_bytes_per_writev");
event_avg_counter
  txn_logger::g_evt_avg_logger_bytes_per_sec("avg_logger_bytes_per_sec");

static event_avg_counter
  evt_avg_log_buffer_iov_len("avg_log_buffer_iov_len");

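// Initializes the logging subsystem: opens (and truncates) one log file per
// logger, spawns a detached writer thread per log file plus a single
// persister thread, and records which worker threads each logger services.
// Meant to be called at most once, before workers start publishing buffers.
//
// A minimal sketch of a call site (the worker count and log path here are
// hypothetical, not taken from this file):
//
//   txn_logger::Init(nthreads, {"/tmp/data.log"}, {}, nullptr,
//                    /*call_fsync=*/true, /*use_compression=*/false,
//                    /*fake_writes=*/false);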
void
txn_logger::Init(
    size_t nworkers,
    const vector<string> &logfiles,
    const vector<vector<unsigned>> &assignments_given,
    vector<vector<unsigned>> *assignments_used,
    bool call_fsync,
    bool use_compression,
    bool fake_writes)
{
  INVARIANT(!g_persist);
  INVARIANT(g_nworkers == 0);
  INVARIANT(nworkers > 0);
  INVARIANT(!logfiles.empty());
  INVARIANT(logfiles.size() <= g_nmax_loggers);
  INVARIANT(!use_compression || g_perthread_buffers > 1); // need 1 as scratch buf
  vector<int> fds;
  for (auto &fname : logfiles) {
    int fd = open(fname.c_str(), O_CREAT|O_WRONLY|O_TRUNC, 0664);
    if (fd == -1) {
      perror("open");
      ALWAYS_ASSERT(false);
    }
    fds.push_back(fd);
  }
  g_persist = true;
  g_call_fsync = call_fsync;
  g_use_compression = use_compression;
  g_fake_writes = fake_writes;
  g_nworkers = nworkers;

  for (size_t i = 0; i < g_nmax_loggers; i++)
    for (size_t j = 0; j < g_nworkers; j++)
      per_thread_sync_epochs_[i].epochs_[j].store(0, memory_order_release);

  vector<thread> writers;
  vector<vector<unsigned>> assignments(assignments_given);

  if (assignments.empty()) {
    // compute assuming homogeneous disks
    if (g_nworkers <= fds.size()) {
      // each thread gets its own logging worker
      for (size_t i = 0; i < g_nworkers; i++)
        assignments.push_back({(unsigned) i});
    } else {
      // XXX: currently we assume each logger is equally fast; we should
      // adjust the ratios accordingly for non-homogeneous loggers
      const size_t threads_per_logger = g_nworkers / fds.size();
      for (size_t i = 0; i < fds.size(); i++) {
        assignments.emplace_back(
            MakeRange<unsigned>(
              i * threads_per_logger,
              ((i + 1) == fds.size()) ?  g_nworkers : (i + 1) * threads_per_logger));
      }
    }
  }

  INVARIANT(AssignmentsValid(assignments, fds.size(), g_nworkers));

  for (size_t i = 0; i < assignments.size(); i++) {
    writers.emplace_back(
        &txn_logger::writer,
        i, fds[i], assignments[i]);
    writers.back().detach();
  }

  thread persist_thread(&txn_logger::persister, assignments);
  persist_thread.detach();

  if (assignments_used)
    *assignments_used = assignments;
}

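// Background loop: wakes up roughly once per ticker epoch (ticker::tick_us)
// and advances the global persistence watermark via
// advance_system_sync_epoch().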
void
txn_logger::persister(
    vector<vector<unsigned>> assignments)
{
  timer loop_timer;
  for (;;) {
    const uint64_t last_loop_usec = loop_timer.lap();
    const uint64_t delay_time_usec = ticker::tick_us;
    if (last_loop_usec < delay_time_usec) {
      const uint64_t sleep_ns = (delay_time_usec - last_loop_usec) * 1000;
      struct timespec t;
      t.tv_sec  = sleep_ns / ONE_SECOND_NS;
      t.tv_nsec = sleep_ns % ONE_SECOND_NS;
      nanosleep(&t, nullptr);
    }
    advance_system_sync_epoch(assignments);
  }
}

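// Recomputes the system-wide persisted epoch as the minimum of the
// per-(logger, core) sync epochs, folds the per-epoch txn counts for the
// newly durable epochs into the aggregate persistence/latency statistics,
// and publishes the new watermark to system_sync_epoch_.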
void
txn_logger::advance_system_sync_epoch(
    const vector<vector<unsigned>> &assignments)
{
  uint64_t min_so_far = numeric_limits<uint64_t>::max();
  const uint64_t best_tick_ex =
    ticker::s_instance.global_current_tick();
  // special case 0
  const uint64_t best_tick_inc =
    best_tick_ex ? (best_tick_ex - 1) : 0;

  for (size_t i = 0; i < assignments.size(); i++)
    for (auto j : assignments[i])
      for (size_t k = j; k < NMAXCORES; k += g_nworkers) {
        persist_ctx &ctx = persist_ctx_for(k, INITMODE_NONE);
        // we need to arbitrarily advance threads which are not "doing
        // anything", so they don't drag down the persistence of the system. if
        // we can see that a thread is NOT in a guarded section AND its
        // core->logger queue is empty, then we can advance its sync epoch up
        // to best_tick_inc, because the next time it performs any action it is
        // guaranteed to be in an epoch > best_tick_inc
        if (!ctx.persist_buffers_.peek()) {
          spinlock &l = ticker::s_instance.lock_for(k);
          if (!l.is_locked()) {
            bool did_lock = false;
            for (size_t c = 0; c < 3; c++) {
              if (l.try_lock()) {
                did_lock = true;
                break;
              }
            }
            if (did_lock) {
              if (!ctx.persist_buffers_.peek()) {
                min_so_far = min(min_so_far, best_tick_inc);
                per_thread_sync_epochs_[i].epochs_[k].store(
                    best_tick_inc, memory_order_release);
                l.unlock();
                continue;
              }
              l.unlock();
            }
          }
        }
        min_so_far = min(
            per_thread_sync_epochs_[i].epochs_[k].load(
              memory_order_acquire),
            min_so_far);
      }

  const uint64_t syssync =
    system_sync_epoch_->load(memory_order_acquire);

  INVARIANT(min_so_far < numeric_limits<uint64_t>::max());
  INVARIANT(syssync <= min_so_far);

  // need to aggregate from [syssync + 1, min_so_far]
  const uint64_t now_us = timer::cur_usec();
  for (size_t i = 0; i < g_persist_stats.size(); i++) {
    auto &ps = g_persist_stats[i];
    for (uint64_t e = syssync + 1; e <= min_so_far; e++) {
        auto &pes = ps.d_[e % g_max_lag_epochs];
        const uint64_t ntxns_in_epoch = pes.ntxns_.load(memory_order_acquire);
        const uint64_t start_us = pes.earliest_start_us_.load(memory_order_acquire);
        INVARIANT(now_us >= start_us);
        non_atomic_fetch_add(ps.ntxns_persisted_, ntxns_in_epoch);
        non_atomic_fetch_add(
            ps.latency_numer_,
            (now_us - start_us) * ntxns_in_epoch);
        pes.ntxns_.store(0, memory_order_release);
        pes.earliest_start_us_.store(0, memory_order_release);
    }
  }

  system_sync_epoch_->store(min_so_far, memory_order_release);
}

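// Logger thread body, one per log file. Each iteration gathers the filled
// per-core buffers for the cores in `assignment`, writes them with a single
// writev() (optionally followed by fdatasync()), publishes the durable epoch
// prefix for each core into per_thread_sync_epochs_[id], and recycles the
// written buffers.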
void
txn_logger::writer(
    unsigned id, int fd,
    vector<unsigned> assignment)
{

  if (g_pin_loggers_to_numa_nodes) {
    ALWAYS_ASSERT(!numa_run_on_node(id % numa_num_configured_nodes()));
    ALWAYS_ASSERT(!sched_yield());
  }

  vector<iovec> iovs(
      min(size_t(IOV_MAX), g_nworkers * g_perthread_buffers));
  vector<pbuffer *> pxs;
  timer loop_timer;

  // XXX: sense is not useful for now, unless we want to
  // fsync in the background...
  bool sense = false; // cur is at sense, prev is at !sense
  uint64_t epoch_prefixes[2][NMAXCORES];

  NDB_MEMSET(&epoch_prefixes[0], 0, sizeof(epoch_prefixes[0]));
  NDB_MEMSET(&epoch_prefixes[1], 0, sizeof(epoch_prefixes[1]));

  // NOTE: a core id in the persistence system really represents
  // all cores in the regular system modulo g_nworkers
  size_t nbufswritten = 0, nbyteswritten = 0;
  for (;;) {

    const uint64_t last_loop_usec = loop_timer.lap();
    const uint64_t delay_time_usec = ticker::tick_us;
    // don't allow this loop to proceed less than an epoch's worth of time,
    // so we can batch IO
    if (last_loop_usec < delay_time_usec && nbufswritten < iovs.size()) {
      const uint64_t sleep_ns = (delay_time_usec - last_loop_usec) * 1000;
      struct timespec t;
      t.tv_sec  = sleep_ns / ONE_SECOND_NS;
      t.tv_nsec = sleep_ns % ONE_SECOND_NS;
      nanosleep(&t, nullptr);
    }

    // we need g_persist_stats[cur_sync_epoch_ex % g_nmax_loggers]
    // to remain untouched (until the syncer can catch up), so we
    // cannot read any buffers with epoch >=
    // (cur_sync_epoch_ex + g_max_lag_epochs)
    const uint64_t cur_sync_epoch_ex =
      system_sync_epoch_->load(memory_order_acquire) + 1;
    nbufswritten = nbyteswritten = 0;
    for (auto idx : assignment) {
      INVARIANT(idx >= 0 && idx < g_nworkers);
      for (size_t k = idx; k < NMAXCORES; k += g_nworkers) {
        persist_ctx &ctx = persist_ctx_for(k, INITMODE_NONE);
        ctx.persist_buffers_.peekall(pxs);
        for (auto px : pxs) {
          INVARIANT(px);
          INVARIANT(!px->io_scheduled_);
          INVARIANT(nbufswritten <= iovs.size());
          INVARIANT(px->header()->nentries_);
          INVARIANT(px->core_id_ == k);
          if (nbufswritten == iovs.size()) {
            ++g_evt_logger_writev_limit_met;
            goto process;
          }
          if (transaction_proto2_static::EpochId(px->header()->last_tid_) >=
              cur_sync_epoch_ex + g_max_lag_epochs) {
            ++g_evt_logger_max_lag_wait;
            break;
          }
          iovs[nbufswritten].iov_base = (void *) &px->buf_start_[0];

#ifdef LOGGER_UNSAFE_REDUCE_BUFFER_SIZE
  #define PXLEN(px) (((px)->curoff_ < 4) ? (px)->curoff_ : ((px)->curoff_ / 4))
#else
  #define PXLEN(px) ((px)->curoff_)
#endif

          const size_t pxlen = PXLEN(px);

          iovs[nbufswritten].iov_len = pxlen;
          evt_avg_log_buffer_iov_len.offer(pxlen);
          px->io_scheduled_ = true;
          nbufswritten++;
          nbyteswritten += pxlen;

#ifdef CHECK_INVARIANTS
          auto last_tid_cid = transaction_proto2_static::CoreId(px->header()->last_tid_);
          auto px_cid = px->core_id_;
          if (last_tid_cid != px_cid) {
            cerr << "header: " << *px->header() << endl;
            cerr << g_proto_version_str(last_tid_cid) << endl;
            cerr << "last_tid_cid: " << last_tid_cid << endl;
            cerr << "px_cid: " << px_cid << endl;
          }
#endif

          const uint64_t px_epoch =
            transaction_proto2_static::EpochId(px->header()->last_tid_);
          INVARIANT(
              transaction_proto2_static::CoreId(px->header()->last_tid_) ==
              px->core_id_);
          INVARIANT(epoch_prefixes[sense][k] <= px_epoch);
          INVARIANT(px_epoch > 0);
          epoch_prefixes[sense][k] = px_epoch - 1;
          auto &pes = g_persist_stats[k].d_[px_epoch % g_max_lag_epochs];
          if (!pes.ntxns_.load(memory_order_acquire))
            pes.earliest_start_us_.store(px->earliest_start_us_, memory_order_release);
          non_atomic_fetch_add(pes.ntxns_, px->header()->nentries_);
          g_evt_avg_log_entry_ntxns.offer(px->header()->nentries_);
        }
      }
    }

  process:
    if (!nbufswritten) {
      // XXX: should probably sleep here
      nop_pause();
      continue;
    }

    const bool dosense = sense;

    if (!g_fake_writes) {
#ifdef ENABLE_EVENT_COUNTERS
      timer write_timer;
#endif
      const ssize_t ret = writev(fd, &iovs[0], nbufswritten);
      if (unlikely(ret == -1)) {
        perror("writev");
        ALWAYS_ASSERT(false);
      }

      if (g_call_fsync) {
        const int fret = fdatasync(fd);
        if (unlikely(fret == -1)) {
          perror("fdatasync");
          ALWAYS_ASSERT(false);
        }
      }

#ifdef ENABLE_EVENT_COUNTERS
      {
        g_evt_avg_logger_bytes_per_writev.offer(nbyteswritten);
        const double bytes_per_sec =
          double(nbyteswritten)/(write_timer.lap_ms() / 1000.0);
        g_evt_avg_logger_bytes_per_sec.offer(bytes_per_sec);
      }
#endif
    }

    // update metadata from previous write
    //
    // return all buffers that have been io_scheduled_ - we can do this as
    // soon as write returns. we take care to return to the proper buffer
    epoch_array &ea = per_thread_sync_epochs_[id];
    for (auto idx: assignment) {
      for (size_t k = idx; k < NMAXCORES; k += g_nworkers) {
        const uint64_t x0 = ea.epochs_[k].load(memory_order_acquire);
        const uint64_t x1 = epoch_prefixes[dosense][k];
        if (x1 > x0)
          ea.epochs_[k].store(x1, memory_order_release);

        persist_ctx &ctx = persist_ctx_for(k, INITMODE_NONE);
        pbuffer *px, *px0;
        while ((px = ctx.persist_buffers_.peek()) && px->io_scheduled_) {
#ifdef LOGGER_STRIDE_OVER_BUFFER
          {
            const size_t pxlen = PXLEN(px);
            const size_t stridelen = 1;
            for (size_t p = 0; p < pxlen; p += stridelen)
              if ((&px->buf_start_[0])[p] & 0xF)
                non_atomic_fetch_add(ea.dummy_work_, 1UL);
          }
#endif
          px0 = ctx.persist_buffers_.deq();
          INVARIANT(px == px0);
          INVARIANT(px->header()->nentries_);
          px0->reset();
          INVARIANT(ctx.init_);
          INVARIANT(px0->core_id_ == k);
          ctx.all_buffers_.enq(px0);
        }
      }
    }

    // bump the sense
    sense = !sense;
  }
}

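// Returns (ntxns persisted, ntxns pushed to the loggers, average persistence
// latency in usec per persisted txn), summed over all cores.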
tuple<uint64_t, uint64_t, double>
txn_logger::compute_ntxns_persisted_statistics()
{
  uint64_t acc = 0, acc1 = 0, acc2 = 0;
  uint64_t num = 0;
  for (size_t i = 0; i < g_persist_stats.size(); i++) {
    acc  += g_persist_stats[i].ntxns_persisted_.load(memory_order_acquire);
    acc1 += g_persist_stats[i].ntxns_pushed_.load(memory_order_acquire);
    acc2 += g_persist_stats[i].ntxns_committed_.load(memory_order_acquire);
    num  += g_persist_stats[i].latency_numer_.load(memory_order_acquire);
  }
  INVARIANT(acc <= acc1);
  INVARIANT(acc1 <= acc2);
  if (acc == 0)
    return make_tuple(0, acc1, 0.0);
  return make_tuple(acc, acc1, double(num)/double(acc));
}

void
txn_logger::clear_ntxns_persisted_statistics()
{
  for (size_t i = 0; i < g_persist_stats.size(); i++) {
    auto &ps = g_persist_stats[i];
    ps.ntxns_persisted_.store(0, memory_order_release);
    ps.ntxns_pushed_.store(0, memory_order_release);
    ps.ntxns_committed_.store(0, memory_order_release);
    ps.latency_numer_.store(0, memory_order_release);
    for (size_t e = 0; e < g_max_lag_epochs; e++) {
      auto &pes = ps.d_[e];
      pes.ntxns_.store(0, memory_order_release);
      pes.earliest_start_us_.store(0, memory_order_release);
    }
  }
}

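// Spins until, for every initialized core, a free log buffer with no pending
// entries is at the head of all_buffers_ and the core->logger queue
// (persist_buffers_) is empty.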
void
txn_logger::wait_for_idle_state()
{
  for (size_t i = 0; i < NMAXCORES; i++) {
    persist_ctx &ctx = persist_ctx_for(i, INITMODE_NONE);
    if (!ctx.init_)
      continue;
    pbuffer *px;
    while (!(px = ctx.all_buffers_.peek()) || px->header()->nentries_)
      nop_pause();
    while (ctx.persist_buffers_.peek())
      nop_pause();
  }
}

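// Blocks until the persistence watermark (system_sync_epoch_) catches up to
// the tick that was current when this function was called.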
void
txn_logger::wait_until_current_point_persisted()
{
  const uint64_t e = ticker::s_instance.global_current_tick();
  cerr << "waiting for system_sync_epoch_="
       << system_sync_epoch_->load(memory_order_acquire)
       << " to reach e=" << e << endl;
  while (system_sync_epoch_->load(memory_order_acquire) < e)
    nop_pause();
}
/*}}}*/

                /** garbage collection subsystem **/

static event_counter evt_local_chain_cleanups("local_chain_cleanups");
static event_counter evt_try_delete_unlinks("try_delete_unlinks");
static event_avg_counter evt_avg_time_inbetween_ro_epochs_usec(
    "avg_time_inbetween_ro_epochs_usec");

void
transaction_proto2_static::InitGC()
{
  g_flags->g_gc_init.store(true, memory_order_release);
}

static void
sleep_ro_epoch()
{
  const uint64_t sleep_ns = transaction_proto2_static::ReadOnlyEpochUsec * 1000;
  struct timespec t;
  t.tv_sec  = sleep_ns / ONE_SECOND_NS;
  t.tv_nsec = sleep_ns % ONE_SECOND_NS;
  nanosleep(&t, nullptr);
}

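// Drains this thread's outstanding GC work: waits (sleeping a read-only
// epoch at a time) until the read-only tick has advanced far enough that
// every queued delete entry is reclaimable, then cleans the whole queue.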
void
transaction_proto2_static::PurgeThreadOutstandingGCTasks()
{
#ifdef PROTO2_CAN_DISABLE_GC
  if (!IsGCEnabled())
    return;
#endif
  INVARIANT(!rcu::s_instance.in_rcu_region());
  threadctx &ctx = g_threadctxs.my();
  uint64_t e;
  if (!ctx.queue_.get_latest_epoch(e))
    return;
  // wait until we can clean up e
  for (;;) {
    const uint64_t last_tick_ex = ticker::s_instance.global_last_tick_exclusive();
    const uint64_t ro_tick_ex = to_read_only_tick(last_tick_ex);
    if (unlikely(!ro_tick_ex)) {
      sleep_ro_epoch();
      continue;
    }
    const uint64_t ro_tick_geq = ro_tick_ex - 1;
    if (ro_tick_geq < e) {
      sleep_ro_epoch();
      continue;
    }
    break;
  }
  clean_up_to_including(ctx, e);
  INVARIANT(ctx.queue_.empty());
}

//#ifdef CHECK_INVARIANTS
//// make sure hidden is blocked by version e, when traversing from start
//static bool
//IsBlocked(dbtuple *start, dbtuple *hidden, uint64_t e)
//{
//  dbtuple *c = start;
//  while (c) {
//    if (c == hidden)
//      return false;
//    if (c->is_not_behind(e))
//      // blocked
//      return true;
//    c = c->next;
//  }
//  ALWAYS_ASSERT(false); // hidden should be found on chain
//}
//#endif

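// Reclaims the delete entries in ctx.queue_ up to and including read-only
// tick ro_tick_geq. Entries without a key flag are released directly (no
// RCU required); entries for logically deleted tuples are also unlinked from
// their btree, with the unlinks batched into short RCU regions of at most
// max_niters_with_rcu iterations each.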
void
transaction_proto2_static::clean_up_to_including(threadctx &ctx, uint64_t ro_tick_geq)
{
  INVARIANT(!rcu::s_instance.in_rcu_region());
  INVARIANT(ctx.last_reaped_epoch_ <= ro_tick_geq);
  INVARIANT(ctx.scratch_.empty());
  if (ctx.last_reaped_epoch_ == ro_tick_geq)
    return;

#ifdef ENABLE_EVENT_COUNTERS
  const uint64_t now = timer::cur_usec();
  if (ctx.last_reaped_timestamp_us_ > 0) {
    const uint64_t diff = now - ctx.last_reaped_timestamp_us_;
    evt_avg_time_inbetween_ro_epochs_usec.offer(diff);
  }
  ctx.last_reaped_timestamp_us_ = now;
#endif
  ctx.last_reaped_epoch_ = ro_tick_geq;

#ifdef CHECK_INVARIANTS
  const uint64_t last_tick_ex = ticker::s_instance.global_last_tick_exclusive();
  INVARIANT(last_tick_ex);
  const uint64_t last_consistent_tid = ComputeReadOnlyTid(last_tick_ex - 1);
  const uint64_t computed_last_tick_ex = ticker::s_instance.compute_global_last_tick_exclusive();
  INVARIANT(last_tick_ex <= computed_last_tick_ex);
  INVARIANT(to_read_only_tick(last_tick_ex) > ro_tick_geq);
#endif

  // XXX: hacky
  char rcu_guard[sizeof(scoped_rcu_base<false>)] = {0};
  const size_t max_niters_with_rcu = 128;
#define ENTER_RCU() \
    do { \
      new (&rcu_guard[0]) scoped_rcu_base<false>(); \
    } while (0)
#define EXIT_RCU() \
    do { \
      scoped_rcu_base<false> *px = (scoped_rcu_base<false> *) &rcu_guard[0]; \
      px->~scoped_rcu_base<false>(); \
    } while (0)

  ctx.scratch_.empty_accept_from(ctx.queue_, ro_tick_geq);
  ctx.scratch_.transfer_freelist(ctx.queue_);
  px_queue &q = ctx.scratch_;
  if (q.empty())
    return;
  bool in_rcu = false;
  size_t niters_with_rcu = 0, n = 0;
  for (auto it = q.begin(); it != q.end(); ++it, ++n, ++niters_with_rcu) {
    auto &delent = *it;
    INVARIANT(delent.tuple()->opaque.load(std::memory_order_acquire) == 1);
    if (!delent.key_.get_flags()) {
      // guaranteed to be gc-able now (even w/o RCU)
#ifdef CHECK_INVARIANTS
      if (delent.trigger_tid_ > last_consistent_tid /*|| !IsBlocked(delent.tuple_ahead_, delent.tuple(), last_consistent_tid) */) {
        cerr << "tuple ahead     : " << g_proto_version_str(delent.tuple_ahead_->version) << endl;
        cerr << "tuple ahead     : " << *delent.tuple_ahead_ << endl;
        cerr << "trigger tid     : " << g_proto_version_str(delent.trigger_tid_) << endl;
        cerr << "tuple           : " << g_proto_version_str(delent.tuple()->version) << endl;
        cerr << "last_consist_tid: " << g_proto_version_str(last_consistent_tid) << endl;
        cerr << "last_tick_ex    : " << last_tick_ex << endl;
        cerr << "ro_tick_geq     : " << ro_tick_geq << endl;
        cerr << "rcu_block_tick  : " << it.tick() << endl;
      }
      INVARIANT(delent.trigger_tid_ <= last_consistent_tid);
      delent.tuple()->opaque.store(0, std::memory_order_release);
#endif
      dbtuple::release_no_rcu(delent.tuple());
    } else {
      INVARIANT(!delent.tuple_ahead_);
      INVARIANT(delent.btr_);
      // check if an element precedes the (deleted) tuple before doing the delete
      ::lock_guard<dbtuple> lg_tuple(delent.tuple(), false);
#ifdef CHECK_INVARIANTS
      if (!delent.tuple()->is_not_behind(last_consistent_tid)) {
        cerr << "trigger tid     : " << g_proto_version_str(delent.trigger_tid_) << endl;
        cerr << "tuple           : " << g_proto_version_str(delent.tuple()->version) << endl;
        cerr << "last_consist_tid: " << g_proto_version_str(last_consistent_tid) << endl;
        cerr << "last_tick_ex    : " << last_tick_ex << endl;
        cerr << "ro_tick_geq     : " << ro_tick_geq << endl;
        cerr << "rcu_block_tick  : " << it.tick() << endl;
      }
      INVARIANT(delent.tuple()->version == delent.trigger_tid_);
      INVARIANT(delent.tuple()->is_not_behind(last_consistent_tid));
      INVARIANT(delent.tuple()->is_deleting());
#endif
      if (unlikely(!delent.tuple()->is_latest())) {
        // requeue it up, except this time as a regular delete
        const uint64_t my_ro_tick = to_read_only_tick(
            ticker::s_instance.global_current_tick());
        ctx.queue_.enqueue(
            delete_entry(
              nullptr,
              MakeTid(CoreMask, NumIdMask >> NumIdShift, (my_ro_tick + 1) * ReadOnlyEpochMultiplier - 1),
              delent.tuple(),
              marked_ptr<string>(),
              nullptr),
            my_ro_tick);
        ++g_evt_proto_gc_delete_requeue;
        // reclaim string ptrs
        string *spx = delent.key_.get();
        if (unlikely(spx))
          ctx.pool_.emplace_back(spx);
        continue;
      }
#ifdef CHECK_INVARIANTS
      delent.tuple()->opaque.store(0, std::memory_order_release);
#endif
      // if delent.key_ is nullptr, then the key is stored in the tuple
      // record storage location, and the size field contains the length of
      // the key
      //
      // otherwise, delent.key_ is a pointer to a string containing the
      // key
      varkey k;
      string *spx = delent.key_.get();
      if (likely(!spx)) {
        k = varkey(delent.tuple()->get_value_start(), delent.tuple()->size);
      } else {
        k = varkey(*spx);
        ctx.pool_.emplace_back(spx);
      }

      if (!in_rcu) {
        ENTER_RCU();
        niters_with_rcu = 0;
        in_rcu = true;
      }
      typename concurrent_btree::value_type removed = 0;
      const bool did_remove = delent.btr_->remove(k, &removed);
      ALWAYS_ASSERT(did_remove);
      INVARIANT(removed == (typename concurrent_btree::value_type) delent.tuple());
      delent.tuple()->clear_latest();
      dbtuple::release(delent.tuple()); // rcu free it
    }

    if (in_rcu && niters_with_rcu >= max_niters_with_rcu) {
      EXIT_RCU();
      niters_with_rcu = 0;
      in_rcu = false;
    }
  }
  q.clear();
  g_evt_avg_proto_gc_queue_len.offer(n);

  if (in_rcu)
    EXIT_RCU();
  INVARIANT(!rcu::s_instance.in_rcu_region());
}

aligned_padded_elem<transaction_proto2_static::hackstruct>
  transaction_proto2_static::g_hack;
aligned_padded_elem<transaction_proto2_static::flags>
  transaction_proto2_static::g_flags;
percore_lazy<transaction_proto2_static::threadctx>
  transaction_proto2_static::g_threadctxs;
event_counter
  transaction_proto2_static::g_evt_worker_thread_wait_log_buffer(
      "worker_thread_wait_log_buffer");
event_counter
  transaction_proto2_static::g_evt_dbtuple_no_space_for_delkey(
      "dbtuple_no_space_for_delkey");
event_counter
  transaction_proto2_static::g_evt_proto_gc_delete_requeue(
      "proto_gc_delete_requeue");
event_avg_counter
  transaction_proto2_static::g_evt_avg_log_entry_size(
      "avg_log_entry_size");
event_avg_counter
  transaction_proto2_static::g_evt_avg_proto_gc_queue_len(
      "avg_proto_gc_queue_len");