update readme
[c11concurrency-benchmarks.git] / silo / rcu.cc
1 #include <unistd.h>
2 #include <time.h>
3 #include <string.h>
4 #include <numa.h>
5 #include <sched.h>
6 #include <iostream>
7 #include <thread>
8 #include <atomic>
9
10 #include "rcu.h"
11 #include "macros.h"
12 #include "util.h"
13 #include "thread.h"
14 #include "counter.h"
15 #include "lockguard.h"
16
17 using namespace std;
18 using namespace util;
19
20 rcu rcu::s_instance;
21
22 static event_counter evt_rcu_deletes("rcu_deletes");
23 static event_counter evt_rcu_frees("rcu_frees");
24 static event_counter evt_rcu_local_reaps("rcu_local_reaps");
25 static event_counter evt_rcu_incomplete_local_reaps("rcu_incomplete_local_reaps");
26 static event_counter evt_rcu_loop_reaps("rcu_loop_reaps");
27 static event_counter *evt_allocator_arena_allocations[::allocator::MAX_ARENAS] = {nullptr};
28 static event_counter *evt_allocator_arena_deallocations[::allocator::MAX_ARENAS] = {nullptr};
29 static event_counter evt_allocator_large_allocation("allocator_large_allocation");
30
31 static event_avg_counter evt_avg_gc_reaper_queue_len("avg_gc_reaper_queue_len");
32 static event_avg_counter evt_avg_rcu_delete_queue_len("avg_rcu_delete_queue_len");
33 static event_avg_counter evt_avg_rcu_local_delete_queue_len("avg_rcu_local_delete_queue_len");
34 static event_avg_counter evt_avg_rcu_sync_try_release("avg_rcu_sync_try_release");
35 static event_avg_counter evt_avg_time_inbetween_rcu_epochs_usec(
36     "avg_time_inbetween_rcu_epochs_usec");
37 static event_avg_counter evt_avg_time_inbetween_allocator_releases_usec(
38     "avg_time_inbetween_allocator_releases_usec");
39
40 #ifdef MEMCHECK_MAGIC
41 static void
42 report_error_and_die(
43     const void *p, size_t alloc_size, const char *msg,
44     const string &prefix="", unsigned recurse=3, bool first=true)
45 {
46   // print the entire allocation block, for debugging reference
47   static_assert(::allocator::AllocAlignment % 8 == 0, "xx");
48   const void *pnext = *((const void **) p);
49   cerr << prefix << "Address " << p << " error found! (next ptr " << pnext << ")" << endl;
50   if (pnext) {
51     const ::allocator::pgmetadata *pmd = ::allocator::PointerToPgMetadata(pnext);
52     if (!pmd) {
53       cerr << prefix << "Error: could not get pgmetadata for next ptr" << endl;
54       cerr << prefix << "Allocator managed next ptr? " << ::allocator::ManagesPointer(pnext) << endl;
55     } else {
56       cerr << prefix << "Next ptr allocation size: " << pmd->unit_ << endl;
57       if (((uintptr_t)pnext % pmd->unit_) == 0) {
58         if (recurse)
59           report_error_and_die(
60               pnext, pmd->unit_, "", prefix + "    ", recurse - 1, false);
61         else
62           cerr << prefix << "recursion stopped" << endl;
63       } else {
64         cerr << prefix << "Next ptr not properly aligned" << endl;
65         if (recurse)
66           report_error_and_die(
67               (const void *) slow_round_down((uintptr_t)pnext, (uintptr_t)pmd->unit_),
68               pmd->unit_, "", prefix + "    ", recurse - 1, false);
69         else
70           cerr << prefix << "recursion stopped" << endl;
71       }
72     }
73   }
74   cerr << prefix << "Msg: " << msg << endl;
75   cerr << prefix << "Allocation size: " << alloc_size << endl;
76   cerr << prefix << "Ptr aligned properly? " << (((uintptr_t)p % alloc_size) == 0) << endl;
77   for (const char *buf = (const char *) p;
78       buf < (const char *) p + alloc_size;
79       buf += 8) {
80     cerr << prefix << hexify_buf(buf, 8) << endl;
81   }
82   if (first)
83     ALWAYS_ASSERT(false);
84 }
85
86 static void
87 check_pointer_or_die(void *p, size_t alloc_size)
88 {
89   ALWAYS_ASSERT(p);
90   if (unlikely(((uintptr_t)p % alloc_size) != 0))
91     report_error_and_die(p, alloc_size, "pointer not properly aligned");
92   for (size_t off = sizeof(void **); off < alloc_size; off++)
93     if (unlikely(
94          (unsigned char) *((const char *) p + off) !=
95          (unsigned char) MEMCHECK_MAGIC ) )
96       report_error_and_die(p, alloc_size, "memory magic not found");
97   void *pnext = *((void **) p);
98   if (unlikely(((uintptr_t)pnext % alloc_size) != 0))
99     report_error_and_die(p, alloc_size, "next pointer not properly aligned");
100 }
101 #endif
102
103 void *
104 rcu::sync::alloc(size_t sz)
105 {
106   if (pin_cpu_ == -1)
107     // fallback to regular allocator
108     return malloc(sz);
109   auto sizes = ::allocator::ArenaSize(sz);
110   auto arena = sizes.second;
111   if (arena >= ::allocator::MAX_ARENAS) {
112     // fallback to regular allocator
113     ++evt_allocator_large_allocation;
114     return malloc(sz);
115   }
116   ensure_arena(arena);
117   void *p = arenas_[arena];
118   INVARIANT(p);
119 #ifdef MEMCHECK_MAGIC
120   const size_t alloc_size = (arena + 1) * ::allocator::AllocAlignment;
121   check_pointer_or_die(p, alloc_size);
122 #endif
123   arenas_[arena] = *reinterpret_cast<void **>(p);
124   evt_allocator_arena_allocations[arena]->inc();
125   return p;
126 }
127
128 void *
129 rcu::sync::alloc_static(size_t sz)
130 {
131   if (pin_cpu_ == -1)
132     return malloc(sz);
133   // round up to hugepagesize
134   static const size_t hugepgsize = ::allocator::GetHugepageSize();
135   sz = slow_round_up(sz, hugepgsize);
136   INVARIANT((sz % hugepgsize) == 0);
137   return ::allocator::AllocateUnmanaged(pin_cpu_, sz / hugepgsize);
138 }
139
140 void
141 rcu::sync::dealloc(void *p, size_t sz)
142 {
143   if (!::allocator::ManagesPointer(p)) {
144     ::free(p);
145     return;
146   }
147   auto sizes = ::allocator::ArenaSize(sz);
148   auto arena = sizes.second;
149   ALWAYS_ASSERT(arena < ::allocator::MAX_ARENAS);
150   *reinterpret_cast<void **>(p) = arenas_[arena];
151 #ifdef MEMCHECK_MAGIC
152   const size_t alloc_size = (arena + 1) * ::allocator::AllocAlignment;
153   ALWAYS_ASSERT( ((uintptr_t)p % alloc_size) == 0 );
154   NDB_MEMSET(
155       (char *) p + sizeof(void **),
156       MEMCHECK_MAGIC, alloc_size - sizeof(void **));
157   ALWAYS_ASSERT(*((void **) p) == arenas_[arena]);
158   check_pointer_or_die(p, alloc_size);
159 #endif
160   arenas_[arena] = p;
161   evt_allocator_arena_deallocations[arena]->inc();
162   deallocs_[arena]++;
163 }
164
165 bool
166 rcu::sync::try_release()
167 {
168   // XXX: tune
169   static const size_t threshold = 10000;
170   // only release if there are > threshold segments to release (over all arenas)
171   size_t acc = 0;
172   for (size_t i = 0; i < ::allocator::MAX_ARENAS; i++)
173     acc += deallocs_[i];
174   if (acc > threshold) {
175     do_release();
176     evt_avg_rcu_sync_try_release.offer(acc);
177     return true;
178   }
179   return false;
180 }
181
182 void
183 rcu::sync::do_release()
184 {
185 #ifdef MEMCHECK_MAGIC
186   for (size_t i = 0; i < ::allocator::MAX_ARENAS; i++) {
187     const size_t alloc_size = (i + 1) * ::allocator::AllocAlignment;
188     void *p = arenas_[i];
189     while (p) {
190       check_pointer_or_die(p, alloc_size);
191       p = *((void **) p);
192     }
193   }
194 #endif
195   ::allocator::ReleaseArenas(&arenas_[0]);
196   NDB_MEMSET(&arenas_[0], 0, sizeof(arenas_));
197   NDB_MEMSET(&deallocs_[0], 0, sizeof(deallocs_));
198 }
199
200 void
201 rcu::sync::do_cleanup()
202 {
203   // compute cleaner epoch
204   const uint64_t clean_tick_exclusive = impl_->cleaning_rcu_tick_exclusive();
205   if (!clean_tick_exclusive)
206     return;
207   const uint64_t clean_tick = clean_tick_exclusive - 1;
208
209   INVARIANT(last_reaped_epoch_ <= clean_tick);
210   INVARIANT(scratch_.empty());
211   if (last_reaped_epoch_ == clean_tick)
212     return;
213
214 #ifdef ENABLE_EVENT_COUNTERS
215   const uint64_t now = timer::cur_usec();
216   if (last_reaped_timestamp_us_ > 0) {
217     const uint64_t diff = now - last_reaped_timestamp_us_;
218     evt_avg_time_inbetween_rcu_epochs_usec.offer(diff);
219   }
220   last_reaped_timestamp_us_ = now;
221 #endif
222   last_reaped_epoch_ = clean_tick;
223
224   scratch_.empty_accept_from(queue_, clean_tick);
225   scratch_.transfer_freelist(queue_);
226   rcu::px_queue &q = scratch_;
227   if (q.empty())
228     return;
229   scoped_rcu_region guard;
230   size_t n = 0;
231   for (auto it = q.begin(); it != q.end(); ++it, ++n) {
232     try {
233       it->run(*this);
234     } catch (...) {
235       cerr << "rcu::region_end: uncaught exception in free routine" << endl;
236     }
237   }
238   q.clear();
239   evt_rcu_deletes += n;
240   evt_avg_rcu_local_delete_queue_len.offer(n);
241
242   // try to release memory from allocator slabs back
243   if (try_release()) {
244 #ifdef ENABLE_EVENT_COUNTERS
245     const uint64_t now = timer::cur_usec();
246     if (last_release_timestamp_us_ > 0) {
247       const uint64_t diff = now - last_release_timestamp_us_;
248       evt_avg_time_inbetween_allocator_releases_usec.offer(diff);
249     }
250     last_release_timestamp_us_ = now;
251 #endif
252   }
253 }
254
255 void
256 rcu::free_with_fn(void *p, deleter_t fn)
257 {
258   sync &s = mysync();
259   uint64_t cur_tick = 0; // ticker units
260   const bool is_guarded = ticker::s_instance.is_locally_guarded(cur_tick);
261   if (!is_guarded)
262     INVARIANT(false);
263   INVARIANT(s.depth());
264   // all threads are either at cur_tick or cur_tick + 1, so we must wait for
265   // the system to move beyond cur_tick + 1
266   s.queue_.enqueue(delete_entry(p, fn), to_rcu_ticks(cur_tick + 1));
267   ++evt_rcu_frees;
268 }
269
270 void
271 rcu::dealloc_rcu(void *p, size_t sz)
272 {
273   sync &s = mysync();
274   uint64_t cur_tick = 0; // ticker units
275   const bool is_guarded = ticker::s_instance.is_locally_guarded(cur_tick);
276   if (!is_guarded)
277     INVARIANT(false);
278   INVARIANT(s.depth());
279   // all threads are either at cur_tick or cur_tick + 1, so we must wait for
280   // the system to move beyond cur_tick + 1
281   s.queue_.enqueue(delete_entry(p, sz), to_rcu_ticks(cur_tick + 1));
282   ++evt_rcu_frees;
283 }
284
285 void
286 rcu::pin_current_thread(size_t cpu)
287 {
288   sync &s = mysync();
289   s.set_pin_cpu(cpu);
290   auto node = numa_node_of_cpu(cpu);
291   // pin to node
292   ALWAYS_ASSERT(!numa_run_on_node(node));
293   // is numa_run_on_node() guaranteed to take effect immediately?
294   ALWAYS_ASSERT(!sched_yield());
295   // release current thread-local cache back to allocator
296   s.do_release();
297 }
298
299 void
300 rcu::fault_region()
301 {
302   sync &s = mysync();
303   if (s.get_pin_cpu() == -1)
304     return;
305   ::allocator::FaultRegion(s.get_pin_cpu());
306 }
307
308 rcu::rcu()
309   : syncs_()
310 {
311   // XXX: these should really be instance members of RCU
312   // we are assuming only one rcu object is ever created
313   for (size_t i = 0; i < ::allocator::MAX_ARENAS; i++) {
314     evt_allocator_arena_allocations[i] =
315       new event_counter("allocator_arena" + to_string(i) + "_allocation");
316     evt_allocator_arena_deallocations[i] =
317       new event_counter("allocator_arena" + to_string(i) + "_deallocation");
318   }
319 }
320
321 struct rcu_stress_test_rec {
322   uint64_t magic_;
323   uint64_t counter_;
324 } PACKED;
325
326 static const uint64_t rcu_stress_test_magic = 0xABCDDEAD01234567UL;
327 static const size_t rcu_stress_test_nthreads = 28;
328 static atomic<rcu_stress_test_rec *> rcu_stress_test_array[rcu_stress_test_nthreads];
329 static atomic<bool> rcu_stress_test_keep_going(true);
330
331 static void
332 rcu_stress_test_deleter_fn(void *px)
333 {
334   ALWAYS_ASSERT( ((rcu_stress_test_rec *) px)->magic_ == rcu_stress_test_magic );
335   rcu::s_instance.dealloc(px, sizeof(rcu_stress_test_rec));
336 }
337
338 static void
339 rcu_stress_test_worker(unsigned id)
340 {
341   rcu::s_instance.pin_current_thread(id);
342   rcu_stress_test_rec *mypx =
343     (rcu_stress_test_rec *) rcu::s_instance.alloc(sizeof(rcu_stress_test_rec));
344   mypx->magic_ = rcu_stress_test_magic;
345   mypx->counter_ = 0;
346   rcu_stress_test_array[id].store(mypx, memory_order_release);
347   while (rcu_stress_test_keep_going.load(memory_order_acquire)) {
348     scoped_rcu_region rcu;
349     for (size_t i = 0; i < rcu_stress_test_nthreads; i++) {
350       rcu_stress_test_rec *p = rcu_stress_test_array[i].load(memory_order_acquire);
351       if (!p)
352         continue;
353       ALWAYS_ASSERT(p->magic_ == rcu_stress_test_magic);
354       p->counter_++; // let it be racy, doesn't matter
355     }
356     // swap it out
357     mypx = rcu_stress_test_array[id].load(memory_order_acquire);
358     if (mypx) {
359       rcu_stress_test_array[id].store(nullptr, memory_order_release);
360       rcu::s_instance.free_with_fn(mypx, rcu_stress_test_deleter_fn);
361     } else {
362       mypx = (rcu_stress_test_rec *)
363         rcu::s_instance.alloc(sizeof(rcu_stress_test_rec));
364       mypx->magic_ = rcu_stress_test_magic;
365       mypx->counter_ = 0;
366       rcu_stress_test_array[id].store(mypx, memory_order_release);
367     }
368   }
369 }
370
371 static void
372 rcu_stress_test()
373 {
374   for (size_t i = 0; i < rcu_stress_test_nthreads; i++)
375     rcu_stress_test_array[i].store(nullptr, memory_order_release);
376   vector<thread> workers;
377   for (size_t i = 0; i < rcu_stress_test_nthreads; i++)
378     workers.emplace_back(rcu_stress_test_worker, i);
379   sleep(120);
380   rcu_stress_test_keep_going.store(false, memory_order_release);
381   for (auto &t : workers)
382     t.join();
383   cerr << "rcu stress test completed" << endl;
384 }
385
386 void
387 rcu::Test()
388 {
389   rcu_stress_test();
390 }