adding iris benchmark
[c11concurrency-benchmarks.git] / iris / include / base_logger.h
1 #ifndef IRIS_BASE_LOG_H
2 #define IRIS_BASE_LOG_H
3
4 #include <cmath>
5 #include <cstdio>
6
7 #include <thread>
8 #include <vector>
9 #include <memory>
10 #include <atomic>
11 #include <string>
12 #include <utility>
13 #include <string.h>
14
15 #include "define.h"
16 #include "writer.h"
17 #include "lfringbuffer.h"
18 #include "sslfqueue.h"
19 #include "notifier.h"
20 #include "utils.h"
21 #include "formatter.h"
22
23 namespace iris {
24
25 struct thread_logqueue;
26 struct thread_logqueue_holder;
27 extern notifier ntfer;
28 extern thread_logqueue_holder this_thread_logqueue;
29 extern size_t thread_queue_size;
30 extern size_t thread_ringbuf_size;
31
32 struct thread_logqueue_holder {
33 private:
34     thread_logqueue * q;
35 public:
36     thread_logqueue_holder():q(nullptr){}
37     inline void assign(thread_logqueue * queue) {
38         q = queue;
39     }
40     inline thread_logqueue *get() {
41         return q;
42     }
43     inline void notify_and_quit() {
44         if (q) {
45             ntfer.notify(notifier::to_ntf_t((long)q, ntf_queue_deletion));
46             q = nullptr;
47         }
48     }
49     ~thread_logqueue_holder() {
50         notify_and_quit();
51     }
52 };
53
54 // Every thread which uses logging primitives owns one of this 
55 // single-wrtiter-single-reader queue for buffering logs.
56 // These thread_local queues form a singly-linked list which is
57 // polled by a dedicated output thread to collect logs.
58 struct thread_logqueue {
59 public:
60     thread_logqueue(size_t queue_size = 5000, size_t rbuf_size = 102400); // used only by normal threadss
61     thread_logqueue(thread_logqueue *head, size_t queue_size = 5000, size_t rbuf_size = 102400); // used only by output thread
62     ~thread_logqueue();
63 public:
64     sslfqueue<loglet_t>             q;
65     lfringbuffer                    rbuf;
66     std::atomic<thread_logqueue *>  next;
67     thread_logqueue               * head;
68     bool                            output_thread; // if current thread is the output thread
69 };
70
71 class base_logger {
72 public:
73     base_logger(writer * pwriter = nullptr,
74                 size_t scan_interval = 100,
75                 size_t output_buffer_size = 102400,
76                 size_t default_thread_ringbuf_size = 102400,
77                 size_t default_thread_queue_size = 5000);
78
79     // set the thread local queue size
80     void set_thread_queue_size(size_t size) {
81         thread_queue_size = size;
82     }
83     // set the thread local ring buffer size
84     void set_thread_ringbuf_size(size_t size) {
85         thread_ringbuf_size = size;
86     }
87
88     template<typename Formatter, typename... Args>
89     void log(Args&&... args) {
90         thread_logqueue * this_queue = this_thread_logqueue.get();
91
92         if (iris_unlikely(!this_queue)) {
93             if (thread_queue_size == 0)
94                 thread_queue_size = m_default_thread_queue_size;
95             if (thread_ringbuf_size == 0)
96                 thread_ringbuf_size = m_default_thread_ringbuf_size;
97             this_queue = new thread_logqueue(&m_head, thread_queue_size, thread_ringbuf_size);
98             this_thread_logqueue.assign(this_queue);
99         }
100
101         typedef std::tuple<typename std::decay<Args>::type...> Args_t;
102         const size_t buffer_size = sizeof(formatter_type) + sizeof(Args_t); 
103         const size_t args_offset = sizeof(formatter_type);
104
105         loglet_t l;
106         while ((l.rbuf_alloc_size = this_queue->rbuf.acquire(buffer_size, l.rbuf_ptr)) == 0)
107             std::this_thread::yield();
108         *reinterpret_cast<formatter_type*>(l.rbuf_ptr) = &formatter_caller<Formatter, typename std::decay<Args>::type...>;
109         // inplace construct parameter pack
110         new (l.rbuf_ptr + args_offset) Args_t(std::forward<Args>(args)...);
111
112         // try to publish to output thread
113         if (iris_likely(this_queue->q.offer(l))) {
114             return;
115         }
116
117         // queue is full, notify the output thread
118         ntfer.notify(notifier::to_ntf_t(reinterpret_cast<long>(this_queue), ntf_msg));
119         // busy yielding until the log is buffered
120         do {
121             std::this_thread::yield();
122         } while (!this_queue->q.offer(l));
123     }
124
125     void sync_and_close() {
126         if (!m_stop.load(std::memory_order_acquire)) {
127             this_thread_logqueue.notify_and_quit();
128             m_stop.store(true, std::memory_order_release);
129             m_output_thread.join();
130         }
131     }
132
133     ~base_logger() {
134         sync_and_close();
135     }
136 private:
137     std::atomic<bool> m_stop;
138     thread_logqueue m_head;
139     std::thread m_output_thread;
140     size_t m_default_thread_ringbuf_size;
141     size_t m_default_thread_queue_size;
142 };
143
144 }
145
146 #endif