--- /dev/null
+#ifndef _NDB_TXN_PROTO2_IMPL_H_
+#define _NDB_TXN_PROTO2_IMPL_H_
+
+#include <iostream>
+#include <atomic>
+#include <vector>
+#include <set>
+
+#include <lz4.h>
+
+#include "txn.h"
+#include "txn_impl.h"
+#include "txn_btree.h"
+#include "macros.h"
+#include "circbuf.h"
+#include "spinbarrier.h"
+#include "record/serializer.h"
+
+// forward decl
+template <typename Traits> class transaction_proto2;
+template <template <typename> class Transaction>
+ class txn_epoch_sync;
+
+// the system has a single logging subsystem (composed of multiple loggers)
+// NOTE: currently, the persistence epoch is tied 1:1 with the ticker's epoch
+class txn_logger {
+ friend class transaction_proto2_static;
+ template <typename T>
+ friend class transaction_proto2;
+ // XXX: should only allow txn_epoch_sync<transaction_proto2> as friend
+ template <template <typename> class T>
+ friend class txn_epoch_sync;
+public:
+
+ static const size_t g_nmax_loggers = 16;
+ static const size_t g_perthread_buffers = 256; // 256 outstanding buffers
+ static const size_t g_buffer_size = (1<<20); // in bytes
+ static const size_t g_horizon_buffer_size = 2 * (1<<16); // in bytes
+ static const size_t g_max_lag_epochs = 128; // cannot lag more than 128 epochs
+ static const bool g_pin_loggers_to_numa_nodes = false;
+
+ static inline bool
+ IsPersistenceEnabled()
+ {
+ return g_persist;
+ }
+
+ static inline bool
+ IsCompressionEnabled()
+ {
+ return g_use_compression;
+ }
+
+ // init the logging subsystem.
+ //
+  // should only be called ONCE; it is not thread-safe. if assignments_used is
+  // not null, it is filled with a copy of the assignment actually computed
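+  //
+  // a minimal usage sketch (hypothetical file path and values, not taken
+  // from this codebase): two workers logging to a single file, with the
+  // worker-to-file assignment given explicitly and the computed assignment
+  // echoed back:
+  //
+  //   std::vector<std::vector<unsigned>> used;
+  //   txn_logger::Init(2, {"/tmp/ndb.log"}, {{0, 1}}, &used);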
+ static void Init(
+ size_t nworkers,
+ const std::vector<std::string> &logfiles,
+ const std::vector<std::vector<unsigned>> &assignments_given,
+ std::vector<std::vector<unsigned>> *assignments_used = nullptr,
+ bool call_fsync = true,
+ bool use_compression = false,
+ bool fake_writes = false);
+
+ struct logbuf_header {
+ uint64_t nentries_; // > 0 for all valid log buffers
+ uint64_t last_tid_; // TID of the last commit
+ } PACKED;
+
+ struct pbuffer {
+ uint64_t earliest_start_us_; // start time of the earliest txn
+ bool io_scheduled_; // has the logger scheduled IO yet?
+
+ unsigned curoff_; // current offset into buf_ for writing
+
+ const unsigned core_id_; // which core does this pbuffer belong to?
+
+ const unsigned buf_sz_;
+
+ // must be last field
+ uint8_t buf_start_[0];
+
+ // to allocate a pbuffer, use placement new:
+ // const size_t bufsz = ...;
+    //     char *p = (char *) malloc(sizeof(pbuffer) + bufsz);
+ // pbuffer *pb = new (p) pbuffer(core_id, bufsz);
+ //
+ // NOTE: it is not necessary to call the destructor for pbuffer, since
+ // it only contains PODs
+ pbuffer(unsigned core_id, unsigned buf_sz)
+ : core_id_(core_id), buf_sz_(buf_sz)
+ {
+ INVARIANT(((char *)this) + sizeof(*this) == (char *) &buf_start_[0]);
+ INVARIANT(buf_sz > sizeof(logbuf_header));
+ reset();
+ }
+
+ pbuffer(const pbuffer &) = delete;
+ pbuffer &operator=(const pbuffer &) = delete;
+ pbuffer(pbuffer &&) = delete;
+
+ inline void
+ reset()
+ {
+ earliest_start_us_ = 0;
+ io_scheduled_ = false;
+ curoff_ = sizeof(logbuf_header);
+ NDB_MEMSET(&buf_start_[0], 0, buf_sz_);
+ }
+
+ inline uint8_t *
+ pointer()
+ {
+ INVARIANT(curoff_ >= sizeof(logbuf_header));
+ INVARIANT(curoff_ <= buf_sz_);
+ return &buf_start_[0] + curoff_;
+ }
+
+ inline uint8_t *
+ datastart()
+ {
+ return &buf_start_[0] + sizeof(logbuf_header);
+ }
+
+ inline size_t
+ datasize() const
+ {
+ INVARIANT(curoff_ >= sizeof(logbuf_header));
+ INVARIANT(curoff_ <= buf_sz_);
+ return curoff_ - sizeof(logbuf_header);
+ }
+
+ inline logbuf_header *
+ header()
+ {
+ return reinterpret_cast<logbuf_header *>(&buf_start_[0]);
+ }
+
+ inline const logbuf_header *
+ header() const
+ {
+ return reinterpret_cast<const logbuf_header *>(&buf_start_[0]);
+ }
+
+ inline size_t
+ space_remaining() const
+ {
+ INVARIANT(curoff_ >= sizeof(logbuf_header));
+ INVARIANT(curoff_ <= buf_sz_);
+ return buf_sz_ - curoff_;
+ }
+
+ inline bool
+ can_hold_tid(uint64_t tid) const;
+ } PACKED;
+
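+  // sanity-checks an assignment of workers to log file descriptors.
+  // for example (hypothetical values): with nfds = 2 and nworkers = 4,
+  // {{0, 2}, {1, 3}} is a valid assignment, whereas {{0}, {1, 1}} is not
+  // (worker 1 appears twice and workers 2 and 3 are never assigned).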
+ static bool
+ AssignmentsValid(const std::vector<std::vector<unsigned>> &assignments,
+ unsigned nfds,
+ unsigned nworkers)
+ {
+ // each worker must be assigned exactly once in the assignment
+ // there must be <= nfds assignments
+
+ if (assignments.size() > nfds)
+ return false;
+
+ std::set<unsigned> seen;
+ for (auto &assignment : assignments)
+ for (auto w : assignment) {
+ if (seen.count(w) || w >= nworkers)
+ return false;
+ seen.insert(w);
+ }
+
+ return seen.size() == nworkers;
+ }
+
+ typedef circbuf<pbuffer, g_perthread_buffers> pbuffer_circbuf;
+
+ static std::tuple<uint64_t, uint64_t, double>
+ compute_ntxns_persisted_statistics();
+
+  // purges each thread's counters of the number of persisted txns
+ static void
+ clear_ntxns_persisted_statistics();
+
+ // wait until the logging system appears to be idle.
+ //
+ // note that this isn't a guarantee, just a best effort attempt
+ static void
+ wait_for_idle_state();
+
+  // waits until the epoch at the time of invocation is persisted
+ static void
+ wait_until_current_point_persisted();
+
+private:
+
+ // data structures
+
+ struct epoch_array {
+ // don't use percore<std::atomic<uint64_t>> because we don't want padding
+ std::atomic<uint64_t> epochs_[NMAXCORES];
+ std::atomic<uint64_t> dummy_work_; // so we can do some fake work
+ CACHE_PADOUT;
+ };
+
+ struct persist_ctx {
+ bool init_;
+
+ void *lz4ctx_; // for compression
+ pbuffer *horizon_; // for compression
+
+ circbuf<pbuffer, g_perthread_buffers> all_buffers_; // logger pushes to core
+ circbuf<pbuffer, g_perthread_buffers> persist_buffers_; // core pushes to logger
+
+ persist_ctx() : init_(false), lz4ctx_(nullptr), horizon_(nullptr) {}
+ };
+
+  // per-thread persistence statistics, with per-epoch detail for recent epochs
+ struct persist_stats {
+ // how many txns this thread has persisted in total
+ std::atomic<uint64_t> ntxns_persisted_;
+
+ // how many txns have been pushed to the logger (but not necessarily persisted)
+ std::atomic<uint64_t> ntxns_pushed_;
+
+ // committed (but not necessarily pushed, nor persisted)
+ std::atomic<uint64_t> ntxns_committed_;
+
+    // sum of all latencies (divide by ntxns_persisted_ to get avg latency in
+    // us) for *persisted* txns (this is a conservative estimate)
+ std::atomic<uint64_t> latency_numer_;
+
+    // per-epoch information, kept for the last g_max_lag_epochs epochs
+ struct per_epoch_stats {
+ std::atomic<uint64_t> ntxns_;
+ std::atomic<uint64_t> earliest_start_us_;
+
+ per_epoch_stats() : ntxns_(0), earliest_start_us_(0) {}
+ } d_[g_max_lag_epochs];
+
+ persist_stats() :
+ ntxns_persisted_(0), ntxns_pushed_(0),
+ ntxns_committed_(0), latency_numer_(0) {}
+ };
+
+ // helpers
+
+ static void
+ advance_system_sync_epoch(
+ const std::vector<std::vector<unsigned>> &assignments);
+
+  // takes the assignment by value (makes a copy) on purpose
+ static void writer(
+ unsigned id, int fd,
+ std::vector<unsigned> assignment);
+
+ static void persister(
+ std::vector<std::vector<unsigned>> assignments);
+
+ enum InitMode {
+ INITMODE_NONE, // no initialization
+ INITMODE_REG, // just use malloc() to init buffers
+ INITMODE_RCU, // try to use the RCU numa aware allocator
+ };
+
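+  // lazily initializes (when imode != INITMODE_NONE) the given core's
+  // persist_ctx: a single contiguous slab is allocated which holds, if
+  // compression is enabled, the LZ4 context plus the horizon buffer,
+  // followed by g_perthread_buffers log buffers that are enqueued onto
+  // all_buffers_.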
+ static inline persist_ctx &
+ persist_ctx_for(uint64_t core_id, InitMode imode)
+ {
+ INVARIANT(core_id < g_persist_ctxs.size());
+ persist_ctx &ctx = g_persist_ctxs[core_id];
+ if (unlikely(!ctx.init_ && imode != INITMODE_NONE)) {
+ size_t needed = g_perthread_buffers * (sizeof(pbuffer) + g_buffer_size);
+ if (IsCompressionEnabled())
+ needed += size_t(LZ4_create_size()) +
+ sizeof(pbuffer) + g_horizon_buffer_size;
+ char *mem =
+ (imode == INITMODE_REG) ?
+ (char *) malloc(needed) :
+ (char *) rcu::s_instance.alloc_static(needed);
+ if (IsCompressionEnabled()) {
+ ctx.lz4ctx_ = mem;
+ mem += LZ4_create_size();
+ ctx.horizon_ = new (mem) pbuffer(core_id, g_horizon_buffer_size);
+ mem += sizeof(pbuffer) + g_horizon_buffer_size;
+ }
+ for (size_t i = 0; i < g_perthread_buffers; i++) {
+ ctx.all_buffers_.enq(new (mem) pbuffer(core_id, g_buffer_size));
+ mem += sizeof(pbuffer) + g_buffer_size;
+ }
+ ctx.init_ = true;
+ }
+ return ctx;
+ }
+
+ // static state
+
+ static bool g_persist; // whether or not logging is enabled
+
+ static bool g_call_fsync; // whether or not fsync() needs to be called
+ // in order to be considered durable
+
+ static bool g_use_compression; // whether or not to compress log buffers
+
+ static bool g_fake_writes; // whether or not to fake doing writes (to measure
+ // pure overhead of disk)
+
+ static size_t g_nworkers; // assignments are computed based on g_nworkers
+ // but a logger responsible for core i is really
+ // responsible for cores i + k * g_nworkers, for k
+ // >= 0
+
+ // v = per_thread_sync_epochs_[i].epochs_[j]: logger i has persisted up
+ // through (including) all transactions <= epoch v on core j. since core =>
+ // logger mapping is static, taking:
+ // min_{core} max_{logger} per_thread_sync_epochs_[logger].epochs_[core]
+ // yields the entire system's persistent epoch
+ static epoch_array
+ per_thread_sync_epochs_[g_nmax_loggers] CACHE_ALIGNED;
+
+ // conservative estimate (<=) for:
+ // min_{core} max_{logger} per_thread_sync_epochs_[logger].epochs_[core]
+ static util::aligned_padded_elem<std::atomic<uint64_t>>
+ system_sync_epoch_ CACHE_ALIGNED;
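+
+  // illustrative sketch only (not part of this class; the value is
+  // presumably maintained by advance_system_sync_epoch()): the quantity
+  // above could be computed roughly as
+  //
+  //   uint64_t e = std::numeric_limits<uint64_t>::max();
+  //   for (size_t core = 0; core < NMAXCORES; core++) {
+  //     uint64_t mx = 0;
+  //     for (size_t logger = 0; logger < g_nmax_loggers; logger++)
+  //       mx = std::max(mx, per_thread_sync_epochs_[logger].epochs_[core].load());
+  //     e = std::min(e, mx);
+  //   }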
+
+ static percore<persist_ctx> g_persist_ctxs CACHE_ALIGNED;
+
+ static percore<persist_stats> g_persist_stats CACHE_ALIGNED;
+
+ // counters
+
+ static event_counter g_evt_log_buffer_epoch_boundary;
+ static event_counter g_evt_log_buffer_out_of_space;
+ static event_counter g_evt_log_buffer_bytes_before_compress;
+ static event_counter g_evt_log_buffer_bytes_after_compress;
+ static event_counter g_evt_logger_writev_limit_met;
+ static event_counter g_evt_logger_max_lag_wait;
+ static event_avg_counter g_evt_avg_log_entry_ntxns;
+ static event_avg_counter g_evt_avg_log_buffer_compress_time_us;
+ static event_avg_counter g_evt_avg_logger_bytes_per_writev;
+ static event_avg_counter g_evt_avg_logger_bytes_per_sec;
+};
+
+static inline std::ostream &
+operator<<(std::ostream &o, txn_logger::logbuf_header &hdr)
+{
+ o << "{nentries_=" << hdr.nentries_ << ", last_tid_="
+ << g_proto_version_str(hdr.last_tid_) << "}";
+ return o;
+}
+
+class transaction_proto2_static {
+public:
+
+ // NOTE:
+ // each epoch is tied (1:1) to the ticker subsystem's tick. this is the
+ // speed of the persistence layer.
+ //
+ // however, read only txns and GC are tied to multiples of the ticker
+ // subsystem's tick
+
+#ifdef CHECK_INVARIANTS
+ static const uint64_t ReadOnlyEpochMultiplier = 10; /* 10 * 1 ms */
+#else
+ static const uint64_t ReadOnlyEpochMultiplier = 25; /* 25 * 40 ms */
+ static_assert(ticker::tick_us * ReadOnlyEpochMultiplier == 1000000, "");
+#endif
+
+ static_assert(ReadOnlyEpochMultiplier >= 1, "XX");
+
+ static const uint64_t ReadOnlyEpochUsec =
+ ticker::tick_us * ReadOnlyEpochMultiplier;
+
+ static inline uint64_t constexpr
+ to_read_only_tick(uint64_t epoch_tick)
+ {
+ return epoch_tick / ReadOnlyEpochMultiplier;
+ }
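+
+  // e.g. with ReadOnlyEpochMultiplier == 25, epoch ticks 0..24 all map to
+  // read-only tick 0, ticks 25..49 map to read-only tick 1, and so on.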
+
+ // in this protocol, the version number is:
+ // (note that for tid_t's, the top bit is reserved and
+  //    *must* be set to zero)
+ //
+ // [ core | number | epoch | reserved ]
+ // [ 0..9 | 9..33 | 33..63 | 63..64 ]
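+  //
+  // for example (assuming NMAXCOREBITS == 9, matching the 0..9 core range
+  // above): MakeTid(3, 5, 7) == (7 << 33) | (5 << 9) | 3, and CoreId(),
+  // NumId() and EpochId() below recover 3, 5 and 7 respectively.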
+
+ static inline ALWAYS_INLINE
+ uint64_t CoreId(uint64_t v)
+ {
+ return v & CoreMask;
+ }
+
+ static inline ALWAYS_INLINE
+ uint64_t NumId(uint64_t v)
+ {
+ return (v & NumIdMask) >> NumIdShift;
+ }
+
+ static inline ALWAYS_INLINE
+ uint64_t EpochId(uint64_t v)
+ {
+ return (v & EpochMask) >> EpochShift;
+ }
+
+ // XXX(stephentu): HACK
+ static void
+ wait_an_epoch()
+ {
+ INVARIANT(!rcu::s_instance.in_rcu_region());
+ const uint64_t e = to_read_only_tick(
+ ticker::s_instance.global_last_tick_exclusive());
+ if (!e) {
+ std::cerr << "wait_an_epoch(): consistent reads happening in e-1, but e=0 so special case"
+ << std::endl;
+ } else {
+ std::cerr << "wait_an_epoch(): consistent reads happening in e-1: "
+ << (e-1) << std::endl;
+ }
+ while (to_read_only_tick(ticker::s_instance.global_last_tick_exclusive()) == e)
+ nop_pause();
+ COMPILER_MEMORY_FENCE;
+ }
+
+ static uint64_t
+ ComputeReadOnlyTid(uint64_t global_tick_ex)
+ {
+ const uint64_t a = (global_tick_ex / ReadOnlyEpochMultiplier);
+ const uint64_t b = a * ReadOnlyEpochMultiplier;
+
+ // want to read entries <= b-1, special casing for b=0
+ if (!b)
+ return MakeTid(0, 0, 0);
+ else
+ return MakeTid(CoreMask, NumIdMask >> NumIdShift, b - 1);
+ }
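+
+  // e.g. with ReadOnlyEpochMultiplier == 25 and global_tick_ex == 103:
+  // a == 4 and b == 100, so the returned TID permits reading all entries
+  // with epoch <= 99.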
+
+ static const uint64_t NBitsNumber = 24;
+
+ // XXX(stephentu): need to implement core ID recycling
+  static const size_t CoreBits = NMAXCOREBITS; // allow 2^CoreBits distinct threads
+ static const size_t NMaxCores = NMAXCORES;
+
+ static const uint64_t CoreMask = (NMaxCores - 1);
+
+ static const uint64_t NumIdShift = CoreBits;
+ static const uint64_t NumIdMask = ((((uint64_t)1) << NBitsNumber) - 1) << NumIdShift;
+
+ static const uint64_t EpochShift = CoreBits + NBitsNumber;
+  // since the reserved bit is always zero, we don't need a special mask
+ static const uint64_t EpochMask = ((uint64_t)-1) << EpochShift;
+
+ static inline ALWAYS_INLINE
+ uint64_t MakeTid(uint64_t core_id, uint64_t num_id, uint64_t epoch_id)
+ {
+ // some sanity checking
+ static_assert((CoreMask | NumIdMask | EpochMask) == ((uint64_t)-1), "xx");
+ static_assert((CoreMask & NumIdMask) == 0, "xx");
+ static_assert((NumIdMask & EpochMask) == 0, "xx");
+ return (core_id) | (num_id << NumIdShift) | (epoch_id << EpochShift);
+ }
+
+ static inline void
+ set_hack_status(bool hack_status)
+ {
+ g_hack->status_ = hack_status;
+ }
+
+ static inline bool
+ get_hack_status()
+ {
+ return g_hack->status_;
+ }
+
+ // thread-safe, can be called many times
+ static void InitGC();
+
+ static void PurgeThreadOutstandingGCTasks();
+
+#ifdef PROTO2_CAN_DISABLE_GC
+ static inline bool
+ IsGCEnabled()
+ {
+ return g_flags->g_gc_init.load(std::memory_order_acquire);
+ }
+#endif
+
+#ifdef PROTO2_CAN_DISABLE_SNAPSHOTS
+ static void
+ DisableSnapshots()
+ {
+ g_flags->g_disable_snapshots.store(true, std::memory_order_release);
+ }
+ static inline bool
+ IsSnapshotsEnabled()
+ {
+ return !g_flags->g_disable_snapshots.load(std::memory_order_acquire);
+ }
+#endif
+
+protected:
+ struct delete_entry {
+#ifdef CHECK_INVARIANTS
+ dbtuple *tuple_ahead_;
+ uint64_t trigger_tid_;
+#endif
+
+ dbtuple *tuple_;
+ marked_ptr<std::string> key_;
+ concurrent_btree *btr_;
+
+ delete_entry()
+ :
+#ifdef CHECK_INVARIANTS
+ tuple_ahead_(nullptr),
+ trigger_tid_(0),
+#endif
+ tuple_(),
+ key_(),
+ btr_(nullptr) {}
+
+ delete_entry(dbtuple *tuple_ahead,
+ uint64_t trigger_tid,
+ dbtuple *tuple,
+ const marked_ptr<std::string> &key,
+ concurrent_btree *btr)
+ :
+#ifdef CHECK_INVARIANTS
+ tuple_ahead_(tuple_ahead),
+ trigger_tid_(trigger_tid),
+#endif
+ tuple_(tuple),
+ key_(key),
+ btr_(btr) {}
+
+ inline dbtuple *
+ tuple()
+ {
+ return tuple_;
+ }
+ };
+
+ typedef basic_px_queue<delete_entry, 4096> px_queue;
+
+ struct threadctx {
+ uint64_t last_commit_tid_;
+ unsigned last_reaped_epoch_;
+#ifdef ENABLE_EVENT_COUNTERS
+ uint64_t last_reaped_timestamp_us_;
+#endif
+ px_queue queue_;
+ px_queue scratch_;
+ std::deque<std::string *> pool_;
+ threadctx() :
+ last_commit_tid_(0)
+ , last_reaped_epoch_(0)
+#ifdef ENABLE_EVENT_COUNTERS
+ , last_reaped_timestamp_us_(0)
+#endif
+ {
+ ALWAYS_ASSERT(((uintptr_t)this % CACHELINE_SIZE) == 0);
+ queue_.alloc_freelist(rcu::NQueueGroups);
+ scratch_.alloc_freelist(rcu::NQueueGroups);
+ }
+ };
+
+ static void
+ clean_up_to_including(threadctx &ctx, uint64_t ro_tick_geq);
+
+ // helper methods
+ static inline txn_logger::pbuffer *
+ wait_for_head(txn_logger::pbuffer_circbuf &pull_buf)
+ {
+ // XXX(stephentu): spinning for now
+ txn_logger::pbuffer *px;
+ while (unlikely(!(px = pull_buf.peek()))) {
+ nop_pause();
+ ++g_evt_worker_thread_wait_log_buffer;
+ }
+ INVARIANT(!px->io_scheduled_);
+ return px;
+ }
+
+ // pushes horizon to the front entry of pull_buf, pushing
+ // to push_buf if necessary
+ //
+ // horizon is reset after push_horizon_to_buffer() returns
+ //
+ // returns the number of txns pushed from buffer to *logger*
+ // (if doing so was necessary)
+ static inline size_t
+ push_horizon_to_buffer(txn_logger::pbuffer *horizon,
+ void *lz4ctx,
+ txn_logger::pbuffer_circbuf &pull_buf,
+ txn_logger::pbuffer_circbuf &push_buf)
+ {
+ INVARIANT(txn_logger::IsCompressionEnabled());
+ if (unlikely(!horizon->header()->nentries_))
+ return 0;
+ INVARIANT(horizon->datasize());
+
+ size_t ntxns_pushed_to_logger = 0;
+
+ // horizon out of space- try to push horizon to buffer
+ txn_logger::pbuffer *px = wait_for_head(pull_buf);
+ const uint64_t compressed_space_needed =
+ sizeof(uint32_t) + LZ4_compressBound(horizon->datasize());
+
+ bool buffer_cond = false;
+ if (px->space_remaining() < compressed_space_needed ||
+ (buffer_cond = !px->can_hold_tid(horizon->header()->last_tid_))) {
+ // buffer out of space- push buffer to logger
+ INVARIANT(px->header()->nentries_);
+ ntxns_pushed_to_logger = px->header()->nentries_;
+ txn_logger::pbuffer *px1 = pull_buf.deq();
+ INVARIANT(px == px1);
+ push_buf.enq(px1);
+ px = wait_for_head(pull_buf);
+ if (buffer_cond)
+ ++txn_logger::g_evt_log_buffer_epoch_boundary;
+ else
+ ++txn_logger::g_evt_log_buffer_out_of_space;
+ }
+
+ INVARIANT(px->space_remaining() >= compressed_space_needed);
+ if (!px->header()->nentries_)
+ px->earliest_start_us_ = horizon->earliest_start_us_;
+ px->header()->nentries_ += horizon->header()->nentries_;
+ px->header()->last_tid_ = horizon->header()->last_tid_;
+
+#ifdef ENABLE_EVENT_COUNTERS
+ util::timer tt;
+#endif
+ const int ret = LZ4_compress_heap_limitedOutput(
+ lz4ctx,
+ (const char *) horizon->datastart(),
+ (char *) px->pointer() + sizeof(uint32_t),
+ horizon->datasize(),
+ px->space_remaining() - sizeof(uint32_t));
+#ifdef ENABLE_EVENT_COUNTERS
+ txn_logger::g_evt_avg_log_buffer_compress_time_us.offer(tt.lap());
+ txn_logger::g_evt_log_buffer_bytes_before_compress.inc(horizon->datasize());
+ txn_logger::g_evt_log_buffer_bytes_after_compress.inc(ret);
+#endif
+ INVARIANT(ret > 0);
+#if defined(CHECK_INVARIANTS) && defined(PARANOID_CHECKING)
+ {
+ uint8_t decode_buf[txn_logger::g_horizon_buffer_size];
+ const int decode_ret =
+ LZ4_decompress_safe_partial(
+ (const char *) px->pointer() + sizeof(uint32_t),
+ (char *) &decode_buf[0],
+ ret,
+ txn_logger::g_horizon_buffer_size,
+ txn_logger::g_horizon_buffer_size);
+ INVARIANT(decode_ret >= 0);
+ INVARIANT(size_t(decode_ret) == horizon->datasize());
+ INVARIANT(memcmp(horizon->datastart(),
+ &decode_buf[0], decode_ret) == 0);
+ }
+#endif
+
+ serializer<uint32_t, false> s_uint32_t;
+ s_uint32_t.write(px->pointer(), ret);
+ px->curoff_ += sizeof(uint32_t) + uint32_t(ret);
+ horizon->reset();
+
+ return ntxns_pushed_to_logger;
+ }
+
+ struct hackstruct {
+ std::atomic<bool> status_;
+ std::atomic<uint64_t> global_tid_;
+ constexpr hackstruct() : status_(false), global_tid_(0) {}
+ };
+
+  // used to simulate a global TID counter, for comparison purposes
+ static util::aligned_padded_elem<hackstruct>
+ g_hack CACHE_ALIGNED;
+
+ struct flags {
+ std::atomic<bool> g_gc_init;
+ std::atomic<bool> g_disable_snapshots;
+ constexpr flags() : g_gc_init(false), g_disable_snapshots(false) {}
+ };
+ static util::aligned_padded_elem<flags> g_flags;
+
+ static percore_lazy<threadctx> g_threadctxs;
+
+ static event_counter g_evt_worker_thread_wait_log_buffer;
+ static event_counter g_evt_dbtuple_no_space_for_delkey;
+ static event_counter g_evt_proto_gc_delete_requeue;
+ static event_avg_counter g_evt_avg_log_entry_size;
+ static event_avg_counter g_evt_avg_proto_gc_queue_len;
+};
+
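+// a pbuffer only batches transactions from a single epoch: a TID can be
+// appended iff the buffer is empty or the TID's epoch matches the epoch of
+// the last TID written into it.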
+bool
+txn_logger::pbuffer::can_hold_tid(uint64_t tid) const
+{
+ return !header()->nentries_ ||
+ (transaction_proto2_static::EpochId(header()->last_tid_) ==
+ transaction_proto2_static::EpochId(tid));
+}
+
+// protocol 2 - no global consistent TIDs
+template <typename Traits>
+class transaction_proto2 : public transaction<transaction_proto2, Traits>,
+ private transaction_proto2_static {
+
+ friend class transaction<transaction_proto2, Traits>;
+ typedef transaction<transaction_proto2, Traits> super_type;
+
+public:
+
+ typedef Traits traits_type;
+ typedef transaction_base::tid_t tid_t;
+ typedef transaction_base::string_type string_type;
+ typedef typename super_type::dbtuple_write_info dbtuple_write_info;
+ typedef typename super_type::dbtuple_write_info_vec dbtuple_write_info_vec;
+ typedef typename super_type::read_set_map read_set_map;
+ typedef typename super_type::absent_set_map absent_set_map;
+ typedef typename super_type::write_set_map write_set_map;
+ typedef typename super_type::write_set_u32_vec write_set_u32_vec;
+
+ transaction_proto2(uint64_t flags,
+ typename Traits::StringAllocator &sa)
+ : transaction<transaction_proto2, Traits>(flags, sa)
+ {
+ if (this->get_flags() & transaction_base::TXN_FLAG_READ_ONLY) {
+ const uint64_t global_tick_ex =
+ this->rcu_guard_->guard()->impl().global_last_tick_exclusive();
+ u_.last_consistent_tid = ComputeReadOnlyTid(global_tick_ex);
+ }
+#ifdef TUPLE_LOCK_OWNERSHIP_CHECKING
+ dbtuple::TupleLockRegionBegin();
+#endif
+ INVARIANT(rcu::s_instance.in_rcu_region());
+ }
+
+ ~transaction_proto2()
+ {
+#ifdef TUPLE_LOCK_OWNERSHIP_CHECKING
+ dbtuple::AssertAllTupleLocksReleased();
+#endif
+ INVARIANT(rcu::s_instance.in_rcu_region());
+ }
+
+ inline bool
+ can_overwrite_record_tid(tid_t prev, tid_t cur) const
+ {
+ INVARIANT(prev <= cur);
+
+#ifdef PROTO2_CAN_DISABLE_SNAPSHOTS
+ if (!IsSnapshotsEnabled())
+ return true;
+#endif
+
+ // XXX(stephentu): the !prev check is a *bit* of a hack-
+ // we're assuming that !prev (MIN_TID) corresponds to an
+    // absent (removed) record, so it is safe to overwrite it.
+ //
+ // This is an OK assumption with *no TID wrap around*.
+ return (to_read_only_tick(EpochId(prev)) ==
+ to_read_only_tick(EpochId(cur))) ||
+ !prev;
+ }
+
+ // can only read elements in this epoch or previous epochs
+ inline bool
+ can_read_tid(tid_t t) const
+ {
+ return true;
+ }
+
+ inline void
+ on_tid_finish(tid_t commit_tid)
+ {
+ if (!txn_logger::IsPersistenceEnabled() ||
+ this->state != transaction_base::TXN_COMMITED)
+ return;
+ // need to write into log buffer
+
+ serializer<uint32_t, true> vs_uint32_t;
+
+ // compute how much space is necessary
+ uint64_t space_needed = 0;
+
+ // 8 bytes to indicate TID
+ space_needed += sizeof(uint64_t);
+
+ // variable bytes to indicate # of records written
+#ifdef LOGGER_UNSAFE_FAKE_COMPRESSION
+ const unsigned nwrites = 0;
+#else
+ const unsigned nwrites = this->write_set.size();
+#endif
+
+ space_needed += vs_uint32_t.nbytes(&nwrites);
+
+ // each record needs to be recorded
+ write_set_u32_vec value_sizes;
+ for (unsigned idx = 0; idx < nwrites; idx++) {
+ const transaction_base::write_record_t &rec = this->write_set[idx];
+ const uint32_t k_nbytes = rec.get_key().size();
+ space_needed += vs_uint32_t.nbytes(&k_nbytes);
+ space_needed += k_nbytes;
+
+ const uint32_t v_nbytes = rec.get_value() ?
+ rec.get_writer()(
+ dbtuple::TUPLE_WRITER_COMPUTE_DELTA_NEEDED,
+ rec.get_value(), nullptr, 0) : 0;
+ space_needed += vs_uint32_t.nbytes(&v_nbytes);
+ space_needed += v_nbytes;
+
+ value_sizes.push_back(v_nbytes);
+ }
+
+ g_evt_avg_log_entry_size.offer(space_needed);
+ INVARIANT(space_needed <= txn_logger::g_horizon_buffer_size);
+ INVARIANT(space_needed <= txn_logger::g_buffer_size);
+
+ const unsigned long my_core_id = coreid::core_id();
+
+ txn_logger::persist_ctx &ctx =
+ txn_logger::persist_ctx_for(my_core_id, txn_logger::INITMODE_REG);
+ txn_logger::persist_stats &stats =
+ txn_logger::g_persist_stats[my_core_id];
+ txn_logger::pbuffer_circbuf &pull_buf = ctx.all_buffers_;
+ txn_logger::pbuffer_circbuf &push_buf = ctx.persist_buffers_;
+
+ util::non_atomic_fetch_add(stats.ntxns_committed_, 1UL);
+
+ const bool do_compress = txn_logger::IsCompressionEnabled();
+ if (do_compress) {
+ // try placing in horizon
+ bool horizon_cond = false;
+ if (ctx.horizon_->space_remaining() < space_needed ||
+ (horizon_cond = !ctx.horizon_->can_hold_tid(commit_tid))) {
+ if (!ctx.horizon_->datasize()) {
+ std::cerr << "space_needed: " << space_needed << std::endl;
+ std::cerr << "space_remaining: " << ctx.horizon_->space_remaining() << std::endl;
+ std::cerr << "can_hold_tid: " << ctx.horizon_->can_hold_tid(commit_tid) << std::endl;
+ }
+ INVARIANT(ctx.horizon_->datasize());
+ // horizon out of space, so we push it
+ const uint64_t npushed =
+ push_horizon_to_buffer(ctx.horizon_, ctx.lz4ctx_, pull_buf, push_buf);
+ if (npushed)
+ util::non_atomic_fetch_add(stats.ntxns_pushed_, npushed);
+ }
+
+ INVARIANT(ctx.horizon_->space_remaining() >= space_needed);
+ const uint64_t written =
+ write_current_txn_into_buffer(ctx.horizon_, commit_tid, value_sizes);
+ if (written != space_needed)
+ INVARIANT(false);
+
+ } else {
+
+ retry:
+ txn_logger::pbuffer *px = wait_for_head(pull_buf);
+ INVARIANT(px && px->core_id_ == my_core_id);
+ bool cond = false;
+ if (px->space_remaining() < space_needed ||
+ (cond = !px->can_hold_tid(commit_tid))) {
+ INVARIANT(px->header()->nentries_);
+ txn_logger::pbuffer *px0 = pull_buf.deq();
+ INVARIANT(px == px0);
+ INVARIANT(px0->header()->nentries_);
+ util::non_atomic_fetch_add(stats.ntxns_pushed_, px0->header()->nentries_);
+ push_buf.enq(px0);
+ if (cond)
+ ++txn_logger::g_evt_log_buffer_epoch_boundary;
+ else
+ ++txn_logger::g_evt_log_buffer_out_of_space;
+ goto retry;
+ }
+
+ const uint64_t written =
+ write_current_txn_into_buffer(px, commit_tid, value_sizes);
+ if (written != space_needed)
+ INVARIANT(false);
+ }
+ }
+
+private:
+
+ // assumes enough space in px to hold this txn
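+  //
+  // the log record written below is laid out as:
+  //   [8-byte commit TID]
+  //   [variable-length uint32: number of records]
+  //   then, per record:
+  //   [variable-length uint32: key length][key bytes]
+  //   [variable-length uint32: value length][value bytes, delta-encoded by
+  //    the tuple writer; the value bytes are omitted when the length is 0]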
+ inline uint64_t
+ write_current_txn_into_buffer(
+ txn_logger::pbuffer *px,
+ uint64_t commit_tid,
+ const write_set_u32_vec &value_sizes)
+ {
+ INVARIANT(px->can_hold_tid(commit_tid));
+
+ if (unlikely(!px->header()->nentries_))
+ px->earliest_start_us_ = this->rcu_guard_->guard()->start_us();
+
+ uint8_t *p = px->pointer();
+ uint8_t *porig = p;
+
+ serializer<uint32_t, true> vs_uint32_t;
+ serializer<uint64_t, false> s_uint64_t;
+
+#ifdef LOGGER_UNSAFE_FAKE_COMPRESSION
+ const unsigned nwrites = 0;
+#else
+ const unsigned nwrites = this->write_set.size();
+#endif
+
+
+ INVARIANT(nwrites == value_sizes.size());
+
+ p = s_uint64_t.write(p, commit_tid);
+ p = vs_uint32_t.write(p, nwrites);
+
+ for (unsigned idx = 0; idx < nwrites; idx++) {
+ const transaction_base::write_record_t &rec = this->write_set[idx];
+ const uint32_t k_nbytes = rec.get_key().size();
+ p = vs_uint32_t.write(p, k_nbytes);
+ NDB_MEMCPY(p, rec.get_key().data(), k_nbytes);
+ p += k_nbytes;
+ const uint32_t v_nbytes = value_sizes[idx];
+ p = vs_uint32_t.write(p, v_nbytes);
+ if (v_nbytes) {
+ rec.get_writer()(dbtuple::TUPLE_WRITER_DO_DELTA_WRITE, rec.get_value(), p, v_nbytes);
+ p += v_nbytes;
+ }
+ }
+
+ px->curoff_ += (p - porig);
+ px->header()->nentries_++;
+ px->header()->last_tid_ = commit_tid;
+
+ return uint64_t(p - porig);
+ }
+
+public:
+
+ inline ALWAYS_INLINE bool is_snapshot() const {
+ return this->get_flags() & transaction_base::TXN_FLAG_READ_ONLY;
+ }
+
+ inline transaction_base::tid_t
+ snapshot_tid() const
+ {
+#ifdef PROTO2_CAN_DISABLE_SNAPSHOTS
+ if (!IsSnapshotsEnabled())
+ // when snapshots are disabled, but we have a RO txn, we simply allow
+ // it to read all the latest values and treat them as consistent
+ //
+      // it's not correct, but it's for the factor analysis
+ return dbtuple::MAX_TID;
+#endif
+ return u_.last_consistent_tid;
+ }
+
+ void
+ dump_debug_info() const
+ {
+ transaction<transaction_proto2, Traits>::dump_debug_info();
+ if (this->is_snapshot())
+ std::cerr << " last_consistent_tid: "
+ << g_proto_version_str(u_.last_consistent_tid) << std::endl;
+ }
+
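+  // generates this transaction's commit TID: take the maximum of this
+  // core's last commit TID, every TID observed in the read set, and every
+  // (non-inserted) write tuple's version, bump the number field, and stamp
+  // the result with this core's id and the current commit epoch.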
+ transaction_base::tid_t
+ gen_commit_tid(const dbtuple_write_info_vec &write_tuples)
+ {
+ const size_t my_core_id = this->rcu_guard_->guard()->core();
+ threadctx &ctx = g_threadctxs.get(my_core_id);
+ INVARIANT(!this->is_snapshot());
+
+ COMPILER_MEMORY_FENCE;
+ u_.commit_epoch = ticker::s_instance.global_current_tick();
+ COMPILER_MEMORY_FENCE;
+
+ tid_t ret = ctx.last_commit_tid_;
+ INVARIANT(ret == dbtuple::MIN_TID || CoreId(ret) == my_core_id);
+ if (u_.commit_epoch != EpochId(ret))
+ ret = MakeTid(0, 0, u_.commit_epoch);
+
+ // What is this? Is txn_proto1_impl used?
+ if (g_hack->status_.load(std::memory_order_acquire))
+ g_hack->global_tid_.fetch_add(1, std::memory_order_acq_rel);
+
+ // XXX(stephentu): I believe this is correct, but not 100% sure
+ //const size_t my_core_id = 0;
+ //tid_t ret = 0;
+ {
+ typename read_set_map::const_iterator it = this->read_set.begin();
+ typename read_set_map::const_iterator it_end = this->read_set.end();
+ for (; it != it_end; ++it) {
+ if (it->get_tid() > ret)
+ ret = it->get_tid();
+ }
+ }
+
+ {
+ typename dbtuple_write_info_vec::const_iterator it = write_tuples.begin();
+ typename dbtuple_write_info_vec::const_iterator it_end = write_tuples.end();
+ for (; it != it_end; ++it) {
+ INVARIANT(it->tuple->is_locked());
+ INVARIANT(it->tuple->is_lock_owner());
+ INVARIANT(it->tuple->is_write_intent());
+ INVARIANT(!it->tuple->is_modifying());
+ INVARIANT(it->tuple->is_latest());
+ if (it->is_insert())
+ // we inserted this node, so we don't want to do the checks below
+ continue;
+ const tid_t t = it->tuple->version;
+
+ // XXX(stephentu): we are overly conservative for now- technically this
+ // abort isn't necessary (we really should just write the value in the correct
+ // position)
+ //if (EpochId(t) > u_.commit_epoch) {
+ // std::cerr << "t: " << g_proto_version_str(t) << std::endl;
+ // std::cerr << "epoch: " << u_.commit_epoch << std::endl;
+ // this->dump_debug_info();
+ //}
+
+ // t == dbtuple::MAX_TID when a txn does an insert of a new tuple
+ // followed by 1+ writes to the same tuple.
+ INVARIANT(EpochId(t) <= u_.commit_epoch || t == dbtuple::MAX_TID);
+ if (t != dbtuple::MAX_TID && t > ret)
+ ret = t;
+ }
+
+ INVARIANT(EpochId(ret) == u_.commit_epoch);
+ ret = MakeTid(my_core_id, NumId(ret) + 1, u_.commit_epoch);
+ }
+
+    // XXX(stephentu): this txn hasn't actually been committed yet,
+ // and could potentially be aborted - but it's ok to increase this #, since
+ // subsequent txns on this core will read this # anyways
+ return (ctx.last_commit_tid_ = ret);
+ }
+
+ inline ALWAYS_INLINE void
+ on_dbtuple_spill(dbtuple *tuple_ahead, dbtuple *tuple)
+ {
+#ifdef PROTO2_CAN_DISABLE_GC
+ if (!IsGCEnabled())
+ return;
+#endif
+
+ INVARIANT(rcu::s_instance.in_rcu_region());
+ INVARIANT(!tuple->is_latest());
+
+ // >= not > only b/c of the special case of inserting a new tuple +
+ // overwriting the newly inserted record with a longer sequence of bytes in
+ // the *same* txn
+ INVARIANT(tuple_ahead->version >= tuple->version);
+
+ if (tuple->is_deleting()) {
+ INVARIANT(tuple->is_locked());
+ INVARIANT(tuple->is_lock_owner());
+ // already on queue
+ return;
+ }
+
+ const uint64_t ro_tick = to_read_only_tick(this->u_.commit_epoch);
+ INVARIANT(to_read_only_tick(EpochId(tuple->version)) <= ro_tick);
+
+#ifdef CHECK_INVARIANTS
+ uint64_t exp = 0;
+ INVARIANT(tuple->opaque.compare_exchange_strong(exp, 1, std::memory_order_acq_rel));
+#endif
+
+    // once all snapshots are happening at >= the current epoch,
+    // we can safely remove the tuple
+ threadctx &ctx = g_threadctxs.my();
+ ctx.queue_.enqueue(
+ delete_entry(tuple_ahead, tuple_ahead->version,
+ tuple, marked_ptr<std::string>(), nullptr),
+ ro_tick);
+ }
+
+ inline ALWAYS_INLINE void
+ on_logical_delete(dbtuple *tuple, const std::string &key, concurrent_btree *btr)
+ {
+#ifdef PROTO2_CAN_DISABLE_GC
+ if (!IsGCEnabled())
+ return;
+#endif
+
+ INVARIANT(tuple->is_locked());
+ INVARIANT(tuple->is_lock_owner());
+ INVARIANT(tuple->is_write_intent());
+ INVARIANT(tuple->is_latest());
+ INVARIANT(tuple->is_deleting());
+ INVARIANT(!tuple->size);
+ INVARIANT(rcu::s_instance.in_rcu_region());
+
+ const uint64_t ro_tick = to_read_only_tick(this->u_.commit_epoch);
+ threadctx &ctx = g_threadctxs.my();
+
+#ifdef CHECK_INVARIANTS
+ uint64_t exp = 0;
+ INVARIANT(tuple->opaque.compare_exchange_strong(exp, 1, std::memory_order_acq_rel));
+#endif
+
+ if (likely(key.size() <= tuple->alloc_size)) {
+ NDB_MEMCPY(tuple->get_value_start(), key.data(), key.size());
+ tuple->size = key.size();
+
+ // eligible for deletion when all snapshots >= the current epoch
+ marked_ptr<std::string> mpx;
+ mpx.set_flags(0x1);
+
+ ctx.queue_.enqueue(
+ delete_entry(nullptr, tuple->version, tuple, mpx, btr),
+ ro_tick);
+ } else {
+ // this is a rare event
+ ++g_evt_dbtuple_no_space_for_delkey;
+ std::string *spx = nullptr;
+ if (ctx.pool_.empty()) {
+ spx = new std::string(key.data(), key.size()); // XXX: use numa memory?
+ } else {
+ spx = ctx.pool_.front();
+ ctx.pool_.pop_front();
+ spx->assign(key.data(), key.size());
+ }
+ INVARIANT(spx);
+
+ marked_ptr<std::string> mpx(spx);
+ mpx.set_flags(0x1);
+
+ ctx.queue_.enqueue(
+ delete_entry(nullptr, tuple->version, tuple, mpx, btr),
+ ro_tick);
+ }
+ }
+
+ void
+ on_post_rcu_region_completion()
+ {
+#ifdef PROTO2_CAN_DISABLE_GC
+ if (!IsGCEnabled())
+ return;
+#endif
+ const uint64_t last_tick_ex = ticker::s_instance.global_last_tick_exclusive();
+ if (unlikely(!last_tick_ex))
+ return;
+    // we subtract one from the global last tick because, given the way
+    // consistent TIDs are computed, global_last_tick_exclusive() can
+    // increase by at most one tick during a transaction.
+ const uint64_t ro_tick_ex = to_read_only_tick(last_tick_ex - 1);
+ if (unlikely(!ro_tick_ex))
+ // won't have anything to clean
+ return;
+ // all reads happening at >= ro_tick_geq
+ const uint64_t ro_tick_geq = ro_tick_ex - 1;
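+    // e.g. (assuming ReadOnlyEpochMultiplier == 25) with last_tick_ex == 60:
+    // ro_tick_ex == to_read_only_tick(59) == 2, so ro_tick_geq == 1.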
+ threadctx &ctx = g_threadctxs.my();
+ clean_up_to_including(ctx, ro_tick_geq);
+ }
+
+private:
+
+ union {
+ // the global epoch this txn is running in (this # is read when it starts)
+ // -- snapshot txns only
+ uint64_t last_consistent_tid;
+ // the epoch for this txn -- committing non-snapshot txns only
+ uint64_t commit_epoch;
+ } u_;
+};
+
+// txn_btree_handler specialization
+template <>
+struct base_txn_btree_handler<transaction_proto2> {
+ static inline void
+ on_construct()
+ {
+#ifndef PROTO2_CAN_DISABLE_GC
+ transaction_proto2_static::InitGC();
+#endif
+ }
+ static const bool has_background_task = true;
+};
+
+template <>
+struct txn_epoch_sync<transaction_proto2> : public transaction_proto2_static {
+ static void
+ sync()
+ {
+ wait_an_epoch();
+ if (txn_logger::IsPersistenceEnabled())
+ txn_logger::wait_until_current_point_persisted();
+ }
+ static void
+ finish()
+ {
+ if (txn_logger::IsPersistenceEnabled())
+ txn_logger::wait_until_current_point_persisted();
+ }
+ static void
+ thread_init(bool loader)
+ {
+ if (!txn_logger::IsPersistenceEnabled())
+ return;
+ const unsigned long my_core_id = coreid::core_id();
+ // try to initialize using numa allocator
+ txn_logger::persist_ctx_for(
+ my_core_id,
+ loader ? txn_logger::INITMODE_REG : txn_logger::INITMODE_RCU);
+ }
+ static void
+ thread_end()
+ {
+ if (!txn_logger::IsPersistenceEnabled())
+ return;
+ const unsigned long my_core_id = coreid::core_id();
+ txn_logger::persist_ctx &ctx =
+ txn_logger::persist_ctx_for(my_core_id, txn_logger::INITMODE_NONE);
+ if (unlikely(!ctx.init_))
+ return;
+ txn_logger::persist_stats &stats =
+ txn_logger::g_persist_stats[my_core_id];
+ txn_logger::pbuffer_circbuf &pull_buf = ctx.all_buffers_;
+ txn_logger::pbuffer_circbuf &push_buf = ctx.persist_buffers_;
+ if (txn_logger::IsCompressionEnabled() &&
+ ctx.horizon_->header()->nentries_) {
+ INVARIANT(ctx.horizon_->datasize());
+ const uint64_t npushed =
+ push_horizon_to_buffer(ctx.horizon_, ctx.lz4ctx_, pull_buf, push_buf);
+ if (npushed)
+ util::non_atomic_fetch_add(stats.ntxns_pushed_, npushed);
+ }
+ txn_logger::pbuffer *px = pull_buf.peek();
+ if (!px || !px->header()->nentries_) {
+ //std::cerr << "core " << my_core_id
+ // << " nothing to push to logger" << std::endl;
+ return;
+ }
+ //std::cerr << "core " << my_core_id
+ // << " pushing buffer to logger" << std::endl;
+ txn_logger::pbuffer *px0 = pull_buf.deq();
+ util::non_atomic_fetch_add(stats.ntxns_pushed_, px0->header()->nentries_);
+ INVARIANT(px0 == px);
+ push_buf.enq(px0);
+ }
+ static std::tuple<uint64_t, uint64_t, double>
+ compute_ntxn_persisted()
+ {
+ if (!txn_logger::IsPersistenceEnabled())
+ return std::make_tuple(0, 0, 0.0);
+ return txn_logger::compute_ntxns_persisted_statistics();
+ }
+ static void
+ reset_ntxn_persisted()
+ {
+ if (!txn_logger::IsPersistenceEnabled())
+ return;
+ txn_logger::clear_ntxns_persisted_statistics();
+ }
+};
+
+#endif /* _NDB_TXN_PROTO2_IMPL_H_ */