modify build setup
[c11concurrency-benchmarks.git] / silo / txn_impl.h
1 #ifndef _NDB_TXN_IMPL_H_
2 #define _NDB_TXN_IMPL_H_
3
4 #include "txn.h"
5 #include "lockguard.h"
6
7 // base definitions
8
9 template <template <typename> class Protocol, typename Traits>
10 transaction<Protocol, Traits>::transaction(uint64_t flags, string_allocator_type &sa)
11   : transaction_base(flags), sa(&sa)
12 {
13   INVARIANT(rcu::s_instance.in_rcu_region());
14 #ifdef BTREE_LOCK_OWNERSHIP_CHECKING
15   concurrent_btree::NodeLockRegionBegin();
16 #endif
17 }
18
19 template <template <typename> class Protocol, typename Traits>
20 transaction<Protocol, Traits>::~transaction()
21 {
22   // transaction shouldn't fall out of scope w/o resolution
23   // resolution means TXN_EMBRYO, TXN_COMMITED, and TXN_ABRT
24   INVARIANT(state != TXN_ACTIVE);
25   INVARIANT(rcu::s_instance.in_rcu_region());
26   const unsigned cur_depth = rcu_guard_->sync()->depth();
27   rcu_guard_.destroy();
28   if (cur_depth == 1) {
29     INVARIANT(!rcu::s_instance.in_rcu_region());
30     cast()->on_post_rcu_region_completion();
31   }
32 #ifdef BTREE_LOCK_OWNERSHIP_CHECKING
33   concurrent_btree::AssertAllNodeLocksReleased();
34 #endif
35 }
36
37 template <template <typename> class Protocol, typename Traits>
38 void
39 transaction<Protocol, Traits>::clear()
40 {
41   // it's actually *more* efficient to not call clear explicitly on the
42   // read/write/absent sets, and let the destructors do the clearing- this is
43   // because the destructors can take shortcuts since it knows the obj doesn't
44   // have to end in a valid state
45 }
46
47 template <template <typename> class Protocol, typename Traits>
48 void
49 transaction<Protocol, Traits>::abort_impl(abort_reason reason)
50 {
51   abort_trap(reason);
52   switch (state) {
53   case TXN_EMBRYO:
54   case TXN_ACTIVE:
55     break;
56   case TXN_ABRT:
57     return;
58   case TXN_COMMITED:
59     throw transaction_unusable_exception();
60   }
61   state = TXN_ABRT;
62   this->reason = reason;
63
64   // on abort, we need to go over all insert nodes and
65   // release the locks
66   typename write_set_map::iterator it     = write_set.begin();
67   typename write_set_map::iterator it_end = write_set.end();
68   for (; it != it_end; ++it) {
69     dbtuple * const tuple = it->get_tuple();
70     if (it->is_insert()) {
71       INVARIANT(tuple->is_locked());
72       this->cleanup_inserted_tuple_marker(tuple, it->get_key(), it->get_btree());
73       tuple->unlock();
74     }
75   }
76
77   clear();
78 }
79
80 template <template <typename> class Protocol, typename Traits>
81 void
82 transaction<Protocol, Traits>::cleanup_inserted_tuple_marker(
83     dbtuple *marker, const std::string &key, concurrent_btree *btr)
84 {
85   // XXX: this code should really live in txn_proto2_impl.h
86   INVARIANT(marker->version == dbtuple::MAX_TID);
87   INVARIANT(marker->is_locked());
88   INVARIANT(marker->is_lock_owner());
89   typename concurrent_btree::value_type removed = 0;
90   const bool did_remove = btr->remove(varkey(key), &removed);
91   if (unlikely(!did_remove)) {
92 #ifdef CHECK_INVARIANTS
93     std::cerr << " *** could not remove key: " << util::hexify(key)  << std::endl;
94 #ifdef TUPLE_CHECK_KEY
95     std::cerr << " *** original key        : " << util::hexify(marker->key) << std::endl;
96 #endif
97 #endif
98     ALWAYS_ASSERT(false);
99   }
100   INVARIANT(removed == (typename concurrent_btree::value_type) marker);
101   INVARIANT(marker->is_latest());
102   marker->clear_latest();
103   dbtuple::release(marker); // rcu free
104 }
105
106 namespace {
107   inline const char *
108   transaction_state_to_cstr(transaction_base::txn_state state)
109   {
110     switch (state) {
111     case transaction_base::TXN_EMBRYO: return "TXN_EMBRYO";
112     case transaction_base::TXN_ACTIVE: return "TXN_ACTIVE";
113     case transaction_base::TXN_ABRT: return "TXN_ABRT";
114     case transaction_base::TXN_COMMITED: return "TXN_COMMITED";
115     }
116     ALWAYS_ASSERT(false);
117     return 0;
118   }
119
120   inline std::string
121   transaction_flags_to_str(uint64_t flags)
122   {
123     bool first = true;
124     std::ostringstream oss;
125     if (flags & transaction_base::TXN_FLAG_LOW_LEVEL_SCAN) {
126       oss << "TXN_FLAG_LOW_LEVEL_SCAN";
127       first = false;
128     }
129     if (flags & transaction_base::TXN_FLAG_READ_ONLY) {
130       if (first)
131         oss << "TXN_FLAG_READ_ONLY";
132       else
133         oss << " | TXN_FLAG_READ_ONLY";
134       first = false;
135     }
136     return oss.str();
137   }
138 }
139
140 template <template <typename> class Protocol, typename Traits>
141 void
142 transaction<Protocol, Traits>::dump_debug_info() const
143 {
144   std::cerr << "Transaction (obj=" << util::hexify(this) << ") -- state "
145        << transaction_state_to_cstr(state) << std::endl;
146   std::cerr << "  Abort Reason: " << AbortReasonStr(reason) << std::endl;
147   std::cerr << "  Flags: " << transaction_flags_to_str(flags) << std::endl;
148   std::cerr << "  Read/Write sets:" << std::endl;
149
150   std::cerr << "      === Read Set ===" << std::endl;
151   // read-set
152   for (typename read_set_map::const_iterator rs_it = read_set.begin();
153        rs_it != read_set.end(); ++rs_it)
154     std::cerr << *rs_it << std::endl;
155
156   std::cerr << "      === Write Set ===" << std::endl;
157   // write-set
158   for (typename write_set_map::const_iterator ws_it = write_set.begin();
159        ws_it != write_set.end(); ++ws_it)
160     std::cerr << *ws_it << std::endl;
161
162   std::cerr << "      === Absent Set ===" << std::endl;
163   // absent-set
164   for (typename absent_set_map::const_iterator as_it = absent_set.begin();
165        as_it != absent_set.end(); ++as_it)
166     std::cerr << "      B-tree Node " << util::hexify(as_it->first)
167               << " : " << as_it->second << std::endl;
168
169 }
170
171 template <template <typename> class Protocol, typename Traits>
172 std::map<std::string, uint64_t>
173 transaction<Protocol, Traits>::get_txn_counters() const
174 {
175   std::map<std::string, uint64_t> ret;
176
177   // max_read_set_size
178   ret["read_set_size"] = read_set.size();;
179   ret["read_set_is_large?"] = !read_set.is_small_type();
180
181   // max_absent_set_size
182   ret["absent_set_size"] = absent_set.size();
183   ret["absent_set_is_large?"] = !absent_set.is_small_type();
184
185   // max_write_set_size
186   ret["write_set_size"] = write_set.size();
187   ret["write_set_is_large?"] = !write_set.is_small_type();
188
189   return ret;
190 }
191
192 template <template <typename> class Protocol, typename Traits>
193 bool
194 transaction<Protocol, Traits>::handle_last_tuple_in_group(
195     dbtuple_write_info &last,
196     bool did_group_insert)
197 {
198   if (did_group_insert) {
199     // don't need to lock
200     if (!last.is_insert())
201       // we inserted the last run, and then we did 1+ more overwrites
202       // to it, so we do NOT need to lock the node (again), but we DO
203       // need to apply the latest write
204       last.entry->set_do_write();
205   } else {
206     dbtuple *tuple = last.get_tuple();
207     if (unlikely(tuple->version == dbtuple::MAX_TID)) {
208       // if we race to put/insert w/ another txn which has inserted a new
209       // record, we *must* abort b/c the other txn could try to put/insert
210       // into a new record which we hold the lock on, so we must abort
211       //
212       // other ideas:
213       // we could *not* abort if this txn did not insert any new records.
214       // we could also release our insert locks and try to acquire them
215       // again in sorted order
216       return false; // signal abort
217     }
218     const dbtuple::version_t v = tuple->lock(true); // lock for write
219     INVARIANT(dbtuple::IsLatest(v) == tuple->is_latest());
220     last.mark_locked();
221     if (unlikely(!dbtuple::IsLatest(v) ||
222                  !cast()->can_read_tid(tuple->version))) {
223       // XXX(stephentu): overly conservative (with the can_read_tid() check)
224       return false; // signal abort
225     }
226     last.entry->set_do_write();
227   }
228   return true;
229 }
230
231 template <template <typename> class Protocol, typename Traits>
232 bool
233 transaction<Protocol, Traits>::commit(bool doThrow)
234 {
235 #ifdef TUPLE_MAGIC
236   try {
237 #endif
238
239   PERF_DECL(
240       static std::string probe0_name(
241         std::string(__PRETTY_FUNCTION__) + std::string(":total:")));
242   ANON_REGION(probe0_name.c_str(), &transaction_base::g_txn_commit_probe0_cg);
243
244   switch (state) {
245   case TXN_EMBRYO:
246   case TXN_ACTIVE:
247     break;
248   case TXN_COMMITED:
249     return true;
250   case TXN_ABRT:
251     if (doThrow)
252       throw transaction_abort_exception(reason);
253     return false;
254   }
255
256   dbtuple_write_info_vec write_dbtuples;
257   std::pair<bool, tid_t> commit_tid(false, 0);
258
259   // copy write tuples to vector for sorting
260   if (!write_set.empty()) {
261     PERF_DECL(
262         static std::string probe1_name(
263           std::string(__PRETTY_FUNCTION__) + std::string(":lock_write_nodes:")));
264     ANON_REGION(probe1_name.c_str(), &transaction_base::g_txn_commit_probe1_cg);
265     INVARIANT(!is_snapshot());
266     typename write_set_map::iterator it     = write_set.begin();
267     typename write_set_map::iterator it_end = write_set.end();
268     for (size_t pos = 0; it != it_end; ++it, ++pos) {
269       INVARIANT(!it->is_insert() || it->get_tuple()->is_locked());
270       write_dbtuples.emplace_back(it->get_tuple(), &(*it), it->is_insert(), pos);
271     }
272   }
273
274   // read_only txns require consistent snapshots
275   INVARIANT(!is_snapshot() || read_set.empty());
276   INVARIANT(!is_snapshot() || write_set.empty());
277   INVARIANT(!is_snapshot() || absent_set.empty());
278   if (!is_snapshot()) {
279     // we don't have consistent tids, or not a read-only txn
280
281     // lock write nodes
282     if (!write_dbtuples.empty()) {
283       PERF_DECL(
284           static std::string probe2_name(
285             std::string(__PRETTY_FUNCTION__) + std::string(":lock_write_nodes:")));
286       ANON_REGION(probe2_name.c_str(), &transaction_base::g_txn_commit_probe2_cg);
287       // lock the logical nodes in sort order
288       {
289         PERF_DECL(
290             static std::string probe6_name(
291               std::string(__PRETTY_FUNCTION__) + std::string(":sort_write_nodes:")));
292         ANON_REGION(probe6_name.c_str(), &transaction_base::g_txn_commit_probe6_cg);
293         write_dbtuples.sort(); // in-place
294       }
295       typename dbtuple_write_info_vec::iterator it     = write_dbtuples.begin();
296       typename dbtuple_write_info_vec::iterator it_end = write_dbtuples.end();
297       dbtuple_write_info *last_px = nullptr;
298       bool inserted_last_run = false;
299       for (; it != it_end; last_px = &(*it), ++it) {
300         if (likely(last_px && last_px->tuple != it->tuple)) {
301           // on boundary
302           if (unlikely(!handle_last_tuple_in_group(*last_px, inserted_last_run))) {
303             abort_trap((reason = ABORT_REASON_WRITE_NODE_INTERFERENCE));
304             goto do_abort;
305           }
306           inserted_last_run = false;
307         }
308         if (it->is_insert()) {
309           INVARIANT(!last_px || last_px->tuple != it->tuple);
310           INVARIANT(it->is_locked());
311           INVARIANT(it->get_tuple()->is_locked());
312           INVARIANT(it->get_tuple()->is_lock_owner());
313           it->entry->set_do_write(); // all inserts are marked do-write
314           inserted_last_run = true;
315         } else {
316           INVARIANT(!it->is_locked());
317         }
318       }
319       if (likely(last_px) &&
320           unlikely(!handle_last_tuple_in_group(*last_px, inserted_last_run))) {
321         abort_trap((reason = ABORT_REASON_WRITE_NODE_INTERFERENCE));
322         goto do_abort;
323       }
324       commit_tid.first = true;
325       PERF_DECL(
326           static std::string probe5_name(
327             std::string(__PRETTY_FUNCTION__) + std::string(":gen_commit_tid:")));
328       ANON_REGION(probe5_name.c_str(), &transaction_base::g_txn_commit_probe5_cg);
329       commit_tid.second = cast()->gen_commit_tid(write_dbtuples);
330       VERBOSE(std::cerr << "commit tid: " << g_proto_version_str(commit_tid.second) << std::endl);
331     } else {
332       VERBOSE(std::cerr << "commit tid: <read-only>" << std::endl);
333     }
334
335     // do read validation
336     {
337       PERF_DECL(
338           static std::string probe3_name(
339             std::string(__PRETTY_FUNCTION__) + std::string(":read_validation:")));
340       ANON_REGION(probe3_name.c_str(), &transaction_base::g_txn_commit_probe3_cg);
341
342       // check the nodes we actually read are still the latest version
343       if (!read_set.empty()) {
344         typename read_set_map::iterator it     = read_set.begin();
345         typename read_set_map::iterator it_end = read_set.end();
346         for (; it != it_end; ++it) {
347           VERBOSE(std::cerr << "validating dbtuple " << util::hexify(it->get_tuple())
348                             << " at snapshot_tid "
349                             << g_proto_version_str(cast()->snapshot_tid())
350                             << std::endl);
351           const bool found = sorted_dbtuples_contains(
352               write_dbtuples, it->get_tuple());
353           if (likely(found ?
354                 it->get_tuple()->is_latest_version(it->get_tid()) :
355                 it->get_tuple()->stable_is_latest_version(it->get_tid())))
356             continue;
357
358           VERBOSE(std::cerr << "validating dbtuple " << util::hexify(it->get_tuple()) << " at snapshot_tid "
359                             << g_proto_version_str(cast()->snapshot_tid()) << " FAILED" << std::endl
360                             << "  txn read version: " << g_proto_version_str(it->get_tid()) << std::endl
361                             << "  tuple=" << *it->get_tuple() << std::endl);
362
363           //std::cerr << "failed tuple: " << *it->get_tuple() << std::endl;
364
365           abort_trap((reason = ABORT_REASON_READ_NODE_INTEREFERENCE));
366           goto do_abort;
367         }
368       }
369
370       // check btree versions have not changed
371       if (!absent_set.empty()) {
372         typename absent_set_map::iterator it     = absent_set.begin();
373         typename absent_set_map::iterator it_end = absent_set.end();
374         for (; it != it_end; ++it) {
375           const uint64_t v = concurrent_btree::ExtractVersionNumber(it->first);
376           if (unlikely(v != it->second.version)) {
377             VERBOSE(std::cerr << "expected node " << util::hexify(it->first) << " at v="
378                               << it->second.version << ", got v=" << v << std::endl);
379             abort_trap((reason = ABORT_REASON_NODE_SCAN_READ_VERSION_CHANGED));
380             goto do_abort;
381           }
382         }
383       }
384     }
385
386     // commit actual records
387     if (!write_dbtuples.empty()) {
388       PERF_DECL(
389           static std::string probe4_name(
390             std::string(__PRETTY_FUNCTION__) + std::string(":write_records:")));
391       ANON_REGION(probe4_name.c_str(), &transaction_base::g_txn_commit_probe4_cg);
392       typename write_set_map::iterator it     = write_set.begin();
393       typename write_set_map::iterator it_end = write_set.end();
394       for (; it != it_end; ++it) {
395         if (unlikely(!it->do_write()))
396           continue;
397         dbtuple * const tuple = it->get_tuple();
398         INVARIANT(tuple->is_locked());
399         VERBOSE(std::cerr << "writing dbtuple " << util::hexify(tuple)
400                           << " at commit_tid " << g_proto_version_str(commit_tid.second)
401                           << std::endl);
402         if (it->is_insert()) {
403           INVARIANT(tuple->version == dbtuple::MAX_TID);
404           tuple->version = commit_tid.second; // allows write_record_ret() to succeed
405                                               // w/o creating a new chain
406         } else {
407           tuple->prefetch();
408           const dbtuple::write_record_ret ret =
409             tuple->write_record_at(
410                 cast(), commit_tid.second,
411                 it->get_value(), it->get_writer());
412           bool unlock_head = false;
413           if (unlikely(ret.head_ != tuple)) {
414             // tuple was replaced by ret.head_
415             INVARIANT(ret.rest_ == tuple);
416             // XXX: write_record_at() should acquire this lock
417             ret.head_->lock(true);
418             unlock_head = true;
419             // need to unlink tuple from underlying btree, replacing
420             // with ret.rest_ (atomically)
421             typename concurrent_btree::value_type old_v = 0;
422             if (it->get_btree()->insert(
423                   varkey(it->get_key()), (typename concurrent_btree::value_type) ret.head_, &old_v, NULL))
424               // should already exist in tree
425               INVARIANT(false);
426             INVARIANT(old_v == (typename concurrent_btree::value_type) tuple);
427             // we don't RCU free this, because it is now part of the chain
428             // (the cleaners will take care of this)
429             ++evt_dbtuple_latest_replacement;
430           }
431           if (unlikely(ret.rest_))
432             // spill happened: schedule GC task
433             cast()->on_dbtuple_spill(ret.head_, ret.rest_);
434           if (!it->get_value())
435             // logical delete happened: schedule GC task
436             cast()->on_logical_delete(ret.head_, it->get_key(), it->get_btree());
437           if (unlikely(unlock_head))
438             ret.head_->unlock();
439         }
440         VERBOSE(std::cerr << "dbtuple " << util::hexify(tuple) << " is_locked? " << tuple->is_locked() << std::endl);
441       }
442       // unlock
443       // NB: we can no longer un-lock after doing the writes above
444       for (typename dbtuple_write_info_vec::iterator it = write_dbtuples.begin();
445            it != write_dbtuples.end(); ++it) {
446         if (it->is_locked())
447           it->tuple->unlock();
448         else
449           INVARIANT(!it->is_insert());
450       }
451     }
452   }
453   state = TXN_COMMITED;
454   if (commit_tid.first)
455     cast()->on_tid_finish(commit_tid.second);
456   clear();
457   return true;
458
459 do_abort:
460   // XXX: these values are possibly un-initialized
461   if (this->is_snapshot())
462     VERBOSE(std::cerr << "aborting txn @ snapshot_tid " << cast()->snapshot_tid() << std::endl);
463   else
464     VERBOSE(std::cerr << "aborting txn" << std::endl);
465
466   for (typename dbtuple_write_info_vec::iterator it = write_dbtuples.begin();
467        it != write_dbtuples.end(); ++it) {
468     if (it->is_locked()) {
469       if (it->is_insert()) {
470         INVARIANT(it->entry);
471         this->cleanup_inserted_tuple_marker(
472             it->tuple.get(), it->entry->get_key(), it->entry->get_btree());
473       }
474       // XXX: potential optimization: on unlock() for abort, we don't
475       // technically need to change the version number
476       it->tuple->unlock();
477     } else {
478       INVARIANT(!it->is_insert());
479     }
480   }
481
482   state = TXN_ABRT;
483   if (commit_tid.first)
484     cast()->on_tid_finish(commit_tid.second);
485   clear();
486   if (doThrow)
487     throw transaction_abort_exception(reason);
488   return false;
489
490 #ifdef TUPLE_MAGIC
491   } catch (dbtuple::magic_failed_exception &) {
492     dump_debug_info();
493     ALWAYS_ASSERT(false);
494   }
495 #endif
496 }
497
498 template <template <typename> class Protocol, typename Traits>
499 std::pair< dbtuple *, bool >
500 transaction<Protocol, Traits>::try_insert_new_tuple(
501     concurrent_btree &btr,
502     const std::string *key,
503     const void *value,
504     dbtuple::tuple_writer_t writer)
505 {
506   INVARIANT(key);
507   const size_t sz =
508     value ? writer(dbtuple::TUPLE_WRITER_COMPUTE_NEEDED,
509       value, nullptr, 0) : 0;
510
511   // perf: ~900 tsc/alloc on istc11.csail.mit.edu
512   dbtuple * const tuple = dbtuple::alloc_first(sz, true);
513   if (value)
514     writer(dbtuple::TUPLE_WRITER_DO_WRITE,
515         value, tuple->get_value_start(), 0);
516   INVARIANT(find_read_set(tuple) == read_set.end());
517   INVARIANT(tuple->is_latest());
518   INVARIANT(tuple->version == dbtuple::MAX_TID);
519   INVARIANT(tuple->is_locked());
520   INVARIANT(tuple->is_write_intent());
521 #ifdef TUPLE_CHECK_KEY
522   tuple->key.assign(key->data(), key->size());
523   tuple->tree = (void *) &btr;
524 #endif
525
526   // XXX: underlying btree api should return the existing value if insert
527   // fails- this would allow us to avoid having to do another search
528   typename concurrent_btree::insert_info_t insert_info;
529   if (unlikely(!btr.insert_if_absent(
530           varkey(*key), (typename concurrent_btree::value_type) tuple, &insert_info))) {
531     VERBOSE(std::cerr << "insert_if_absent failed for key: " << util::hexify(key) << std::endl);
532     tuple->clear_latest();
533     tuple->unlock();
534     dbtuple::release_no_rcu(tuple);
535     ++transaction_base::g_evt_dbtuple_write_insert_failed;
536     return std::pair< dbtuple *, bool >(nullptr, false);
537   }
538   VERBOSE(std::cerr << "insert_if_absent suceeded for key: " << util::hexify(key) << std::endl
539                     << "  new dbtuple is " << util::hexify(tuple) << std::endl);
540   // update write_set
541   // too expensive to be practical
542   // INVARIANT(find_write_set(tuple) == write_set.end());
543   write_set.emplace_back(tuple, key, value, writer, &btr, true);
544
545   // update node #s
546   INVARIANT(insert_info.node);
547   if (!absent_set.empty()) {
548     auto it = absent_set.find(insert_info.node);
549     if (it != absent_set.end()) {
550       if (unlikely(it->second.version != insert_info.old_version)) {
551         abort_trap((reason = ABORT_REASON_WRITE_NODE_INTERFERENCE));
552         return std::make_pair(tuple, true);
553       }
554       VERBOSE(std::cerr << "bump node=" << util::hexify(it->first) << " from v=" << insert_info.old_version
555                         << " -> v=" << insert_info.new_version << std::endl);
556       // otherwise, bump the version
557       it->second.version = insert_info.new_version;
558       SINGLE_THREADED_INVARIANT(concurrent_btree::ExtractVersionNumber(it->first) == it->second);
559     }
560   }
561   return std::make_pair(tuple, false);
562 }
563
564 template <template <typename> class Protocol, typename Traits>
565 template <typename ValueReader>
566 bool
567 transaction<Protocol, Traits>::do_tuple_read(
568     const dbtuple *tuple, ValueReader &value_reader)
569 {
570   INVARIANT(tuple);
571   ++evt_local_search_lookups;
572
573   const bool is_snapshot_txn = is_snapshot();
574   const transaction_base::tid_t snapshot_tid = is_snapshot_txn ?
575     cast()->snapshot_tid() : static_cast<transaction_base::tid_t>(dbtuple::MAX_TID);
576   transaction_base::tid_t start_t = 0;
577
578   if (Traits::read_own_writes) {
579     // this is why read_own_writes is not performant, because we have
580     // to do linear scan
581     auto write_set_it = find_write_set(const_cast<dbtuple *>(tuple));
582     if (unlikely(write_set_it != write_set.end())) {
583       ++evt_local_search_write_set_hits;
584       if (!write_set_it->get_value())
585         return false;
586       const typename ValueReader::value_type * const px =
587         reinterpret_cast<const typename ValueReader::value_type *>(
588             write_set_it->get_value());
589       value_reader.dup(*px, this->string_allocator());
590       return true;
591     }
592   }
593
594   // do the actual tuple read
595   dbtuple::ReadStatus stat;
596   {
597     PERF_DECL(static std::string probe0_name(std::string(__PRETTY_FUNCTION__) + std::string(":do_read:")));
598     ANON_REGION(probe0_name.c_str(), &private_::txn_btree_search_probe0_cg);
599     tuple->prefetch();
600     stat = tuple->stable_read(snapshot_tid, start_t, value_reader, this->string_allocator(), is_snapshot_txn);
601     if (unlikely(stat == dbtuple::READ_FAILED)) {
602       const transaction_base::abort_reason r = transaction_base::ABORT_REASON_UNSTABLE_READ;
603       abort_impl(r);
604       throw transaction_abort_exception(r);
605     }
606   }
607   if (unlikely(!cast()->can_read_tid(start_t))) {
608     const transaction_base::abort_reason r = transaction_base::ABORT_REASON_FUTURE_TID_READ;
609     abort_impl(r);
610     throw transaction_abort_exception(r);
611   }
612   INVARIANT(stat == dbtuple::READ_EMPTY ||
613             stat == dbtuple::READ_RECORD);
614   const bool v_empty = (stat == dbtuple::READ_EMPTY);
615   if (v_empty)
616     ++transaction_base::g_evt_read_logical_deleted_node_search;
617   if (!is_snapshot_txn)
618     // read-only txns do not need read-set tracking
619     // (b/c we know the values are consistent)
620     read_set.emplace_back(tuple, start_t);
621   return !v_empty;
622 }
623
624 template <template <typename> class Protocol, typename Traits>
625 void
626 transaction<Protocol, Traits>::do_node_read(
627     const typename concurrent_btree::node_opaque_t *n, uint64_t v)
628 {
629   INVARIANT(n);
630   if (is_snapshot())
631     return;
632   auto it = absent_set.find(n);
633   if (it == absent_set.end()) {
634     absent_set[n].version = v;
635   } else if (it->second.version != v) {
636     const transaction_base::abort_reason r =
637       transaction_base::ABORT_REASON_NODE_SCAN_READ_VERSION_CHANGED;
638     abort_impl(r);
639     throw transaction_abort_exception(r);
640   }
641 }
642
643 #endif /* _NDB_TXN_IMPL_H_ */