benchmark silo added
[c11concurrency-benchmarks.git] / silo / masstree / kvthread.hh
diff --git a/silo/masstree/kvthread.hh b/silo/masstree/kvthread.hh
new file mode 100644 (file)
index 0000000..63657a3
--- /dev/null
@@ -0,0 +1,477 @@
+/* Masstree
+ * Eddie Kohler, Yandong Mao, Robert Morris
+ * Copyright (c) 2012-2014 President and Fellows of Harvard College
+ * Copyright (c) 2012-2014 Massachusetts Institute of Technology
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, subject to the conditions
+ * listed in the Masstree LICENSE file. These conditions include: you must
+ * preserve this copyright notice, and you cannot mention the copyright
+ * holders in advertising related to the Software without their permission.
+ * The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This
+ * notice is a summary of the Masstree LICENSE file; the license in that file
+ * is legally binding.
+ */
+#ifndef KVTHREAD_HH
+#define KVTHREAD_HH 1
+#include "mtcounters.hh"
+#include "compiler.hh"
+#include "circular_int.hh"
+#include "timestamp.hh"
+#include <assert.h>
+#include <pthread.h>
+#include <sys/mman.h>
+#include <stdlib.h>
+
+class threadinfo;
+class loginfo;
+
+extern volatile uint64_t globalepoch;    // global epoch, updated regularly
+extern volatile bool recovering;
+
+struct memdebug {
+#if HAVE_MEMDEBUG
+    enum {
+        magic_value = 389612313 /* = 0x17390319 */,
+        magic_free_value = 2015593488 /* = 0x78238410 */
+    };
+    int magic;
+    int freetype;
+    size_t size;
+    int after_rcu;
+    int line;
+    const char* file;
+
+    static void* make(void* p, size_t size, int freetype) {
+        if (p) {
+            memdebug *m = reinterpret_cast<memdebug *>(p);
+            m->magic = magic_value;
+            m->freetype = freetype;
+            m->size = size;
+            m->after_rcu = 0;
+            m->line = 0;
+            m->file = 0;
+            return m + 1;
+        } else
+            return p;
+    }
+    static void set_landmark(void* p, const char* file, int line) {
+        if (p) {
+            memdebug* m = reinterpret_cast<memdebug*>(p) - 1;
+            m->file = file;
+            m->line = line;
+        }
+    }
+    static void *check_free(void *p, size_t size, int freetype) {
+        memdebug *m = reinterpret_cast<memdebug *>(p) - 1;
+        free_checks(m, size, freetype, false, "deallocate");
+        m->magic = magic_free_value;
+        return m;
+    }
+    static void check_rcu(void *p, size_t size, int freetype) {
+        memdebug *m = reinterpret_cast<memdebug *>(p) - 1;
+        free_checks(m, size, freetype, false, "deallocate_rcu");
+        m->after_rcu = 1;
+    }
+    static void *check_free_after_rcu(void *p, int freetype) {
+        memdebug *m = reinterpret_cast<memdebug *>(p) - 1;
+        free_checks(m, 0, freetype, true, "free_after_rcu");
+        m->magic = magic_free_value;
+        return m;
+    }
+    static bool check_use(const void *p, int type) {
+        const memdebug *m = reinterpret_cast<const memdebug *>(p) - 1;
+        return m->magic == magic_value && (type == 0 || (m->freetype >> 8) == type);
+    }
+    static bool check_use(const void *p, int type1, int type2) {
+        const memdebug *m = reinterpret_cast<const memdebug *>(p) - 1;
+        return m->magic == magic_value
+            && ((m->freetype >> 8) == type1 || (m->freetype >> 8) == type2);
+    }
+    static void assert_use(const void *p, memtag tag) {
+        if (!check_use(p, tag))
+            hard_assert_use(p, tag, (memtag) -1);
+    }
+    static void assert_use(const void *p, memtag tag1, memtag tag2) {
+        if (!check_use(p, tag1, tag2))
+            hard_assert_use(p, tag1, tag2);
+    }
+  private:
+    static void free_checks(const memdebug *m, size_t size, int freetype,
+                            int after_rcu, const char *op) {
+        if (m->magic != magic_value
+            || m->freetype != freetype
+            || (!after_rcu && m->size != size)
+            || m->after_rcu != after_rcu)
+            hard_free_checks(m, freetype, size, after_rcu, op);
+    }
+    void landmark(char* buf, size_t size) const;
+    static void hard_free_checks(const memdebug* m, size_t size, int freetype,
+                                 int after_rcu, const char* op);
+    static void hard_assert_use(const void* ptr, memtag tag1, memtag tag2);
+#else
+    static void *make(void *p, size_t, int) {
+        return p;
+    }
+    static void set_landmark(void*, const char*, int) {
+    }
+    static void *check_free(void *p, size_t, int) {
+        return p;
+    }
+    static void check_rcu(void *, size_t, int) {
+    }
+    static void *check_free_after_rcu(void *p, int) {
+        return p;
+    }
+    static bool check_use(void *, memtag) {
+        return true;
+    }
+    static bool check_use(void *, memtag, memtag) {
+        return true;
+    }
+    static void assert_use(void *, memtag) {
+    }
+    static void assert_use(void *, memtag, memtag) {
+    }
+#endif
+};
+
+enum {
+#if HAVE_MEMDEBUG
+    memdebug_size = sizeof(memdebug)
+#else
+    memdebug_size = 0
+#endif
+};
+
+struct limbo_element {
+    void *ptr_;
+    int freetype_;
+    uint64_t epoch_;
+};
+
+struct limbo_group {
+    enum { capacity = (4076 - sizeof(limbo_group *)) / sizeof(limbo_element) };
+    int head_;
+    int tail_;
+    limbo_element e_[capacity];
+    limbo_group *next_;
+    limbo_group()
+        : head_(0), tail_(0), next_() {
+    }
+    void push_back(void *ptr, int freetype, uint64_t epoch) {
+        assert(tail_ < capacity);
+        e_[tail_].ptr_ = ptr;
+        e_[tail_].freetype_ = freetype;
+        e_[tail_].epoch_ = epoch;
+        ++tail_;
+    }
+};
+
+template <int N> struct has_threadcounter {
+    static bool test(threadcounter ci) {
+        return unsigned(ci) < unsigned(N);
+    }
+};
+template <> struct has_threadcounter<0> {
+    static bool test(threadcounter) {
+        return false;
+    }
+};
+
+struct rcu_callback {
+    virtual ~rcu_callback() {
+    }
+    virtual void operator()(threadinfo& ti) = 0;
+};
+
+class threadinfo {
+  public:
+    enum {
+        TI_MAIN, TI_PROCESS, TI_LOG, TI_CHECKPOINT
+    };
+
+    static threadinfo *make(int purpose, int index);
+    // XXX destructor
+    static pthread_key_t key;
+
+    // thread information
+    int purpose() const {
+        return purpose_;
+    }
+    int index() const {
+        return index_;
+    }
+    loginfo* logger() const {
+        return logger_;
+    }
+    void set_logger(loginfo* logger) {
+        assert(!logger_ && logger);
+        logger_ = logger;
+    }
+    static threadinfo *allthreads;
+    threadinfo* next() const {
+        return next_;
+    }
+
+    // timestamps
+    kvtimestamp_t operation_timestamp() const {
+        return timestamp();
+    }
+    kvtimestamp_t update_timestamp() const {
+        return ts_;
+    }
+    kvtimestamp_t update_timestamp(kvtimestamp_t x) const {
+        if (circular_int<kvtimestamp_t>::less_equal(ts_, x))
+            // x might be a marker timestamp; ensure result is not
+            ts_ = (x | 1) + 1;
+        return ts_;
+    }
+    kvtimestamp_t update_timestamp(kvtimestamp_t x, kvtimestamp_t y) const {
+        if (circular_int<kvtimestamp_t>::less(x, y))
+            x = y;
+        if (circular_int<kvtimestamp_t>::less_equal(ts_, x))
+            // x might be a marker timestamp; ensure result is not
+            ts_ = (x | 1) + 1;
+        return ts_;
+    }
+    void increment_timestamp() {
+        ts_ += 2;
+    }
+    void advance_timestamp(kvtimestamp_t x) {
+        if (circular_int<kvtimestamp_t>::less(ts_, x))
+            ts_ = x;
+    }
+
+    // event counters
+    void mark(threadcounter ci) {
+        if (has_threadcounter<int(ncounters)>::test(ci))
+            ++counters_[ci];
+    }
+    void mark(threadcounter ci, int64_t delta) {
+        if (has_threadcounter<int(ncounters)>::test(ci))
+            counters_[ci] += delta;
+    }
+    bool has_counter(threadcounter ci) const {
+        return has_threadcounter<int(ncounters)>::test(ci);
+    }
+    uint64_t counter(threadcounter ci) const {
+        return has_threadcounter<int(ncounters)>::test(ci) ? counters_[ci] : 0;
+    }
+
+    struct accounting_relax_fence_function {
+        threadinfo *ti_;
+        threadcounter ci_;
+        accounting_relax_fence_function(threadinfo *ti, threadcounter ci)
+            : ti_(ti), ci_(ci) {
+        }
+        void operator()() {
+            relax_fence();
+            ti_->mark(ci_);
+        }
+    };
+    /** @brief Return a function object that calls mark(ci); relax_fence().
+     *
+     * This function object can be used to count the number of relax_fence()s
+     * executed. */
+    accounting_relax_fence_function accounting_relax_fence(threadcounter ci) {
+        return accounting_relax_fence_function(this, ci);
+    }
+
+    struct stable_accounting_relax_fence_function {
+        threadinfo *ti_;
+        stable_accounting_relax_fence_function(threadinfo *ti)
+            : ti_(ti) {
+        }
+        template <typename V>
+        void operator()(V v) {
+            relax_fence();
+            ti_->mark(threadcounter(tc_stable + (v.isleaf() << 1) + v.splitting()));
+        }
+    };
+    /** @brief Return a function object that calls mark(ci); relax_fence().
+     *
+     * This function object can be used to count the number of relax_fence()s
+     * executed. */
+    stable_accounting_relax_fence_function stable_fence() {
+        return stable_accounting_relax_fence_function(this);
+    }
+
+    accounting_relax_fence_function lock_fence(threadcounter ci) {
+        return accounting_relax_fence_function(this, ci);
+    }
+
+    // memory allocation
+    void* allocate(size_t sz, memtag tag) {
+        void *p = malloc(sz + memdebug_size);
+        p = memdebug::make(p, sz, tag << 8);
+        if (p)
+            mark(threadcounter(tc_alloc + (tag > memtag_value)), sz);
+        return p;
+    }
+    void deallocate(void* p, size_t sz, memtag tag) {
+        // in C++ allocators, 'p' must be nonnull
+        assert(p);
+        p = memdebug::check_free(p, sz, tag << 8);
+        free(p);
+        mark(threadcounter(tc_alloc + (tag > memtag_value)), -sz);
+    }
+    void deallocate_rcu(void *p, size_t sz, memtag tag) {
+        assert(p);
+        memdebug::check_rcu(p, sz, tag << 8);
+        record_rcu(p, tag << 8);
+        mark(threadcounter(tc_alloc + (tag > memtag_value)), -sz);
+    }
+
+    void* pool_allocate(size_t sz, memtag tag) {
+        int nl = (sz + memdebug_size + CACHE_LINE_SIZE - 1) / CACHE_LINE_SIZE;
+        assert(nl <= pool_max_nlines);
+        if (unlikely(!pool_[nl - 1]))
+            refill_pool(nl);
+        void *p = pool_[nl - 1];
+        if (p) {
+            pool_[nl - 1] = *reinterpret_cast<void **>(p);
+            p = memdebug::make(p, sz, (tag << 8) + nl);
+            mark(threadcounter(tc_alloc + (tag > memtag_value)),
+                 nl * CACHE_LINE_SIZE);
+        }
+        return p;
+    }
+    void pool_deallocate(void* p, size_t sz, memtag tag) {
+        int nl = (sz + memdebug_size + CACHE_LINE_SIZE - 1) / CACHE_LINE_SIZE;
+        assert(p && nl <= pool_max_nlines);
+        p = memdebug::check_free(p, sz, (tag << 8) + nl);
+        if (use_pool()) {
+            *reinterpret_cast<void **>(p) = pool_[nl - 1];
+            pool_[nl - 1] = p;
+        } else
+            free(p);
+        mark(threadcounter(tc_alloc + (tag > memtag_value)),
+             -nl * CACHE_LINE_SIZE);
+    }
+    void pool_deallocate_rcu(void* p, size_t sz, memtag tag) {
+        int nl = (sz + memdebug_size + CACHE_LINE_SIZE - 1) / CACHE_LINE_SIZE;
+        assert(p && nl <= pool_max_nlines);
+        memdebug::check_rcu(p, sz, (tag << 8) + nl);
+        record_rcu(p, (tag << 8) + nl);
+        mark(threadcounter(tc_alloc + (tag > memtag_value)),
+             -nl * CACHE_LINE_SIZE);
+    }
+
+    // RCU
+    void rcu_start() {
+        if (gc_epoch_ != globalepoch)
+            gc_epoch_ = globalepoch;
+    }
+    void rcu_stop() {
+        if (limbo_epoch_ && (gc_epoch_ - limbo_epoch_) > 1)
+            hard_rcu_quiesce();
+        gc_epoch_ = 0;
+    }
+    void rcu_quiesce() {
+        rcu_start();
+        if (limbo_epoch_ && (gc_epoch_ - limbo_epoch_) > 2)
+            hard_rcu_quiesce();
+    }
+    typedef ::rcu_callback rcu_callback;
+    void rcu_register(rcu_callback* cb) {
+        record_rcu(cb, -1);
+    }
+
+    // thread management
+    void run();
+    int run(void* (*thread_func)(threadinfo*), void* thread_data = 0);
+    pthread_t threadid() const {
+        return threadid_;
+    }
+    void* thread_data() const {
+        return thread_data_;
+    }
+
+    static threadinfo *current() {
+        return (threadinfo *) pthread_getspecific(key);
+    }
+
+    void report_rcu(void *ptr) const;
+    static void report_rcu_all(void *ptr);
+
+  private:
+    union {
+        struct {
+            uint64_t gc_epoch_;
+            uint64_t limbo_epoch_;
+            loginfo *logger_;
+
+            threadinfo *next_;
+            int purpose_;
+            int index_;         // the index of a udp, logging, tcp,
+                                // checkpoint or recover thread
+
+            pthread_t threadid_;
+        };
+        char padding1[CACHE_LINE_SIZE];
+    };
+
+  private:
+    enum { pool_max_nlines = 20 };
+    void *pool_[pool_max_nlines];
+
+    limbo_group *limbo_head_;
+    limbo_group *limbo_tail_;
+    mutable kvtimestamp_t ts_;
+
+    //enum { ncounters = (int) tc_max };
+    enum { ncounters = 0 };
+    uint64_t counters_[ncounters];
+
+    void* (*thread_func_)(threadinfo*);
+    void* thread_data_;
+
+    void refill_pool(int nl);
+    void refill_rcu();
+
+    void free_rcu(void *p, int freetype) {
+        if ((freetype & 255) == 0) {
+            p = memdebug::check_free_after_rcu(p, freetype);
+            ::free(p);
+        } else if (freetype == -1)
+            (*static_cast<rcu_callback *>(p))(*this);
+        else {
+            p = memdebug::check_free_after_rcu(p, freetype);
+            int nl = freetype & 255;
+            *reinterpret_cast<void **>(p) = pool_[nl - 1];
+            pool_[nl - 1] = p;
+        }
+    }
+
+    void record_rcu(void* ptr, int freetype) {
+        if (recovering && freetype == (memtag_value << 8)) {
+            free_rcu(ptr, freetype);
+            return;
+        }
+        if (limbo_tail_->tail_ == limbo_tail_->capacity)
+            refill_rcu();
+        uint64_t epoch = globalepoch;
+        limbo_tail_->push_back(ptr, freetype, epoch);
+        if (!limbo_epoch_)
+            limbo_epoch_ = epoch;
+    }
+
+#if ENABLE_ASSERTIONS
+    static int no_pool_value;
+    static bool use_pool() {
+        return !no_pool_value;
+    }
+#else
+    static bool use_pool() {
+        return true;
+    }
+#endif
+
+    void hard_rcu_quiesce();
+    static void* thread_trampoline(void*);
+    friend class loginfo;
+};
+
+#endif