1 #ifndef IRIS_BASE_LOG_H
2 #define IRIS_BASE_LOG_H
17 #include "lfringbuffer.h"
18 #include "sslfqueue.h"
21 #include "formatter.h"
// Forward declarations; both structs are defined further down in this header.
25 struct thread_logqueue;
26 struct thread_logqueue_holder;
// The objects below are only declared here (extern); no definition is visible
// in this header.
// ntfer: notifier used further down to send ntf_queue_deletion and ntf_msg
// notifications.
27 extern notifier ntfer;
// Holder through which base_logger::log() looks up (get) and registers
// (assign) the queue it logs into.
// NOTE(review): the usage suggests one holder per thread, but no thread_local
// specifier is visible on this declaration -- confirm against the definition.
28 extern thread_logqueue_holder this_thread_logqueue;
// Sizes handed to the thread_logqueue constructor by base_logger::log(); a
// value of 0 is replaced there by the logger's own default before use.
29 extern size_t thread_queue_size;
30 extern size_t thread_ringbuf_size;
// Holder for a log queue. The constructor initialises the member `q` to
// nullptr; the declaration of `q` is not visible in this view, but it is
// presumably the thread_logqueue pointer handled by assign()/get().
// NOTE(review): the bodies of assign(), get() and the destructor are not
// visible here, so their exact behaviour must be confirmed in the full file.
32 struct thread_logqueue_holder {
36 thread_logqueue_holder():q(nullptr){}
// Receives the queue to hold (presumably stores it in q -- body not visible).
37 inline void assign(thread_logqueue * queue) {
// Returns a thread_logqueue pointer (presumably q -- body not visible).
40 inline thread_logqueue *get() {
// Sends an ntf_queue_deletion notification for the held queue through ntfer.
43 inline void notify_and_quit() {
// The value of q is packed into the notification as a long.
// NOTE(review): on LLP64 targets (e.g. 64-bit Windows) long is 32 bits wide,
// so this cast would truncate a pointer -- confirm the supported platforms.
45 ntfer.notify(notifier::to_ntf_t((long)q, ntf_queue_deletion));
// Destructor; its body is not visible in this view.
49 ~thread_logqueue_holder() {
54 // Every thread which uses logging primitives owns one of these
55 // single-writer-single-reader queues for buffering logs.
56 // These thread_local queues form a singly-linked list which is
57 // polled by a dedicated output thread to collect logs.
58 struct thread_logqueue {
// queue_size and rbuf_size default to 5000 and 102400 in both overloads.
// NOTE(review): base_logger::log() creates each logging thread's queue through
// the overload that takes `head`, which looks inconsistent with the trailing
// comments on the two declarations below -- confirm which overload is meant
// for which kind of thread.
60 thread_logqueue(size_t queue_size = 5000, size_t rbuf_size = 102400); // used only by normal threads
61 thread_logqueue(thread_logqueue *head, size_t queue_size = 5000, size_t rbuf_size = 102400); // used only by output thread
// Queue of loglet_t entries; base_logger::log() publishes into it via offer().
64 sslfqueue<loglet_t> q;
// Atomic pointer to another thread_logqueue (presumably the next node of the
// singly-linked list described above).
66 std::atomic<thread_logqueue *> next;
// Pointer to a thread_logqueue (presumably the list head passed to the
// constructor -- the constructor definition is not visible here).
67 thread_logqueue * head;
68 bool output_thread; // if current thread is the output thread
// Constructor declaration; every parameter has a default, so it also serves as
// the default constructor. The definition is not visible in this header view.
//   pwriter                     - writer to use; defaults to nullptr.
//   scan_interval               - defaults to 100 (unit not visible here;
//                                 TODO confirm in the definition).
//   output_buffer_size          - defaults to 102400.
//   default_thread_ringbuf_size - defaults to 102400; presumably stored in
//                                 m_default_thread_ringbuf_size.
//   default_thread_queue_size   - defaults to 5000; presumably stored in
//                                 m_default_thread_queue_size.
73 base_logger(writer * pwriter = nullptr,
74 size_t scan_interval = 100,
75 size_t output_buffer_size = 102400,
76 size_t default_thread_ringbuf_size = 102400,
77 size_t default_thread_queue_size = 5000);
79 // set the thread local queue size
// Writes `size` to the global thread_queue_size. log() reads that value only
// when it creates a queue, so a queue that already exists is not resized.
// Passing 0 makes log() fall back to m_default_thread_queue_size.
80 void set_thread_queue_size(size_t size) {
81 thread_queue_size = size;
83 // set the thread local ring buffer size
// Writes `size` to the global thread_ringbuf_size. log() reads that value only
// when it creates a queue, so a queue that already exists is not resized.
// Passing 0 makes log() fall back to m_default_thread_ringbuf_size.
84 void set_thread_ringbuf_size(size_t size) {
85 thread_ringbuf_size = size;
// Records one log entry: stores a formatter entry point plus a decayed copy of
// the arguments in the calling thread's ring buffer and publishes the
// resulting loglet on that thread's queue.
//   Formatter - type forwarded to formatter_caller<> to select the formatter.
//   args      - values to log; perfectly forwarded into a std::tuple of their
//               decayed types.
// Does not return until the entry has been accepted by the queue; it yields
// the CPU while the ring buffer or the queue has no room.
88 template<typename Formatter, typename... Args>
89 void log(Args&&... args) {
90 thread_logqueue * this_queue = this_thread_logqueue.get();
// First use through this holder: create the queue lazily.
92 if (iris_unlikely(!this_queue)) {
// A global size of 0 means "not set"; substitute this logger's defaults.
93 if (thread_queue_size == 0)
94 thread_queue_size = m_default_thread_queue_size;
95 if (thread_ringbuf_size == 0)
96 thread_ringbuf_size = m_default_thread_ringbuf_size;
// The new queue is given the address of m_head and is registered with the
// holder so later calls reuse it. No matching delete is visible in this view.
97 this_queue = new thread_logqueue(&m_head, thread_queue_size, thread_ringbuf_size);
98 this_thread_logqueue.assign(this_queue);
// Buffer layout: [formatter_type value][Args_t tuple of decayed arguments].
101 typedef std::tuple<typename std::decay<Args>::type...> Args_t;
102 const size_t buffer_size = sizeof(formatter_type) + sizeof(Args_t);
103 const size_t args_offset = sizeof(formatter_type);
// Retry until acquire() reports a non-zero allocation size, yielding between
// attempts. (`l` is declared on a line not visible in this view.)
106 while ((l.rbuf_alloc_size = this_queue->rbuf.acquire(buffer_size, l.rbuf_ptr)) == 0)
107 std::this_thread::yield();
// Store the address of the formatter_caller instantiation at the start of the
// acquired buffer.
108 *reinterpret_cast<formatter_type*>(l.rbuf_ptr) = &formatter_caller<Formatter, typename std::decay<Args>::type...>;
109 // inplace construct parameter pack
// NOTE(review): the tuple is constructed at rbuf_ptr + sizeof(formatter_type);
// nothing visible here guarantees that address satisfies alignof(Args_t) --
// confirm the alignment rbuf.acquire() provides.
110 new (l.rbuf_ptr + args_offset) Args_t(std::forward<Args>(args)...);
112 // try to publish to output thread
113 if (iris_likely(this_queue->q.offer(l))) {
117 // queue is full, notify the output thread
// NOTE(review): the queue address is passed as a long; on LLP64 targets long
// is narrower than a pointer -- confirm the supported platforms.
118 ntfer.notify(notifier::to_ntf_t(reinterpret_cast<long>(this_queue), ntf_msg));
119 // busy yielding until the log is buffered
121 std::this_thread::yield();
122 } while (!this_queue->q.offer(l));
// Shuts the logger down: if m_stop is not yet set, sends the queue-deletion
// notification for the calling thread's holder, sets m_stop and joins the
// output thread. Once m_stop is set, later calls skip all of that.
// NOTE(review): the m_stop check and the store are separate operations, so two
// threads calling this concurrently could both pass the check and both call
// join() -- confirm this is only ever called from a single thread.
125 void sync_and_close() {
126 if (!m_stop.load(std::memory_order_acquire)) {
127 this_thread_logqueue.notify_and_quit();
128 m_stop.store(true, std::memory_order_release);
129 m_output_thread.join();
// Stop flag; sync_and_close() sets it to true before joining the output thread.
137 std::atomic<bool> m_stop;
// Queue object whose address log() passes as `head` to every queue it creates.
138 thread_logqueue m_head;
// Thread joined by sync_and_close().
139 std::thread m_output_thread;
// Fallback sizes that log() copies into thread_ringbuf_size and
// thread_queue_size when those globals are 0.
140 size_t m_default_thread_ringbuf_size;
141 size_t m_default_thread_queue_size;