Merge
[c11concurrency-benchmarks.git] / silo / ticker.h
1 #pragma once
2
3 #include <cstdint>
4 #include <atomic>
5 #include <thread>
6
7 #include "core.h"
8 #include "macros.h"
9 #include "spinlock.h"
10 #include "lockguard.h"
11
12 class ticker {
13 public:
14
15 #ifdef CHECK_INVARIANTS
16   static const uint64_t tick_us = 1 * 1000; /* 1 ms */
17 #else
18   static const uint64_t tick_us = 40 * 1000; /* 40 ms */
19 #endif
20
21   ticker()
22     : current_tick_(1), last_tick_inclusive_(0)
23   {
24     std::thread thd(&ticker::tickerloop, this);
25     thd.detach();
26   }
27
28   inline uint64_t
29   global_current_tick() const
30   {
31     return current_tick_.load(std::memory_order_acquire);
32   }
33
34   inline uint64_t
35   global_last_tick_inclusive() const
36   {
37     return last_tick_inclusive_.load(std::memory_order_acquire);
38   }
39
40   inline uint64_t
41   global_last_tick_exclusive() const
42   {
43     return global_last_tick_inclusive() + 1;
44   }
45
46   // should yield a # >= global_last_tick_exclusive()
47   uint64_t
48   compute_global_last_tick_exclusive() const
49   {
50     uint64_t e = ticks_[0].current_tick_.load(std::memory_order_acquire);
51     for (size_t i = 1; i < ticks_.size(); i++)
52       e = std::min(e, ticks_[i].current_tick_.load(std::memory_order_acquire));
53     return e;
54   }
55
56   // returns true if guard is currently active, along with filling
57   // cur_epoch out
58   inline bool
59   is_locally_guarded(uint64_t &cur_epoch) const
60   {
61     const uint64_t core_id = coreid::core_id();
62     const uint64_t current_tick =
63       ticks_[core_id].current_tick_.load(std::memory_order_acquire);
64     const uint64_t current_depth =
65       ticks_[core_id].depth_.load(std::memory_order_acquire);
66     if (current_depth)
67       cur_epoch = current_tick;
68     return current_depth;
69   }
70
71   inline bool
72   is_locally_guarded() const
73   {
74     uint64_t c;
75     return is_locally_guarded(c);
76   }
77
78   inline spinlock &
79   lock_for(uint64_t core_id)
80   {
81     INVARIANT(core_id < ticks_.size());
82     return ticks_[core_id].lock_;
83   }
84
85   // a guard is re-entrant within a single thread
86   class guard {
87   public:
88
89     guard(ticker &impl)
90       : impl_(&impl), core_(coreid::core_id()), start_us_(0)
91     {
92       tickinfo &ti = impl_->ticks_[core_];
93       // bump the depth first
94       const uint64_t prev_depth = util::non_atomic_fetch_add(ti.depth_, 1UL);
95       // grab the lock
96       if (!prev_depth) {
97         ti.lock_.lock();
98         // read epoch # (try to advance forward)
99         tick_ = impl_->global_current_tick();
100         INVARIANT(ti.current_tick_.load(std::memory_order_acquire) <= tick_);
101         ti.current_tick_.store(tick_, std::memory_order_release);
102         start_us_ = util::timer::cur_usec();
103         ti.start_us_.store(start_us_, std::memory_order_release);
104       } else {
105         tick_ = ti.current_tick_.load(std::memory_order_acquire);
106         start_us_ = ti.start_us_.load(std::memory_order_acquire);
107       }
108       INVARIANT(ti.lock_.is_locked());
109       depth_ = prev_depth + 1;
110     }
111
112     guard(guard &&) = default;
113     guard(const guard &) = delete;
114     guard &operator=(const guard &) = delete;
115
116     ~guard()
117     {
118       if (!impl_)
119         return;
120       INVARIANT(core_ == coreid::core_id());
121       tickinfo &ti = impl_->ticks_[core_];
122       INVARIANT(ti.lock_.is_locked());
123       INVARIANT(tick_ > impl_->global_last_tick_inclusive());
124       const uint64_t prev_depth = util::non_atomic_fetch_sub(ti.depth_, 1UL);
125       INVARIANT(prev_depth);
126       // unlock
127       if (prev_depth == 1) {
128         ti.start_us_.store(0, std::memory_order_release);
129         ti.lock_.unlock();
130       }
131     }
132
133     inline uint64_t
134     tick() const
135     {
136       INVARIANT(impl_);
137       return tick_;
138     }
139
140     inline uint64_t
141     core() const
142     {
143       INVARIANT(impl_);
144       return core_;
145     }
146
147     inline uint64_t
148     depth() const
149     {
150       INVARIANT(impl_);
151       return depth_;
152     }
153
154     inline const ticker &
155     impl() const
156     {
157       INVARIANT(impl_);
158       return *impl_;
159     }
160
161     // refers to the start time of the *outermost* scope
162     inline uint64_t
163     start_us() const
164     {
165       return start_us_;
166     }
167
168   private:
169     ticker *impl_;
170     uint64_t core_;
171     uint64_t tick_;
172     uint64_t depth_;
173     uint64_t start_us_;
174   };
175
176   static ticker s_instance CACHE_ALIGNED; // system wide ticker
177
178 private:
179
180   void
181   tickerloop()
182   {
183     // runs as daemon
184     util::timer loop_timer;
185     struct timespec t;
186     for (;;) {
187
188       const uint64_t last_loop_usec = loop_timer.lap();
189       const uint64_t delay_time_usec = tick_us;
190       if (last_loop_usec < delay_time_usec) {
191         const uint64_t sleep_ns = (delay_time_usec - last_loop_usec) * 1000;
192         t.tv_sec  = sleep_ns / ONE_SECOND_NS;
193         t.tv_nsec = sleep_ns % ONE_SECOND_NS;
194         nanosleep(&t, nullptr);
195         loop_timer.lap(); // since we slept away the lag
196       }
197
198       // bump the current tick
199       // XXX: ignore overflow
200       const uint64_t last_tick = util::non_atomic_fetch_add(current_tick_, 1UL);
201       const uint64_t cur_tick  = last_tick + 1;
202
203       // wait for all threads to finish the last tick
204       for (size_t i = 0; i < ticks_.size(); i++) {
205         tickinfo &ti = ticks_[i];
206         const uint64_t thread_cur_tick =
207           ti.current_tick_.load(std::memory_order_acquire);
208         INVARIANT(thread_cur_tick == last_tick ||
209                   thread_cur_tick == cur_tick);
210         if (thread_cur_tick == cur_tick)
211           continue;
212         lock_guard<spinlock> lg(ti.lock_);
213         ti.current_tick_.store(cur_tick, std::memory_order_release);
214       }
215
216       last_tick_inclusive_.store(last_tick, std::memory_order_release);
217     }
218   }
219
220   struct tickinfo {
221     spinlock lock_; // guards current_tick_ and depth_
222
223     std::atomic<uint64_t> current_tick_; // last RCU epoch this thread has seen
224                                          // (implies completion through current_tick_ - 1)
225     std::atomic<uint64_t> depth_; // 0 if not in RCU section
226     std::atomic<uint64_t> start_us_; // 0 if not in RCU section
227
228     tickinfo()
229       : current_tick_(1), depth_(0), start_us_(0)
230     {
231       ALWAYS_ASSERT(((uintptr_t)this % CACHELINE_SIZE) == 0);
232     }
233   };
234
235   percore<tickinfo> ticks_;
236
237   std::atomic<uint64_t> current_tick_; // which tick are we currenlty on?
238   std::atomic<uint64_t> last_tick_inclusive_;
239     // all threads have *completed* ticks <= last_tick_inclusive_
240     // (< current_tick_)
241 };