6 #include <base_logger.h>
7 #include <stream_writer.h>
13 static stream_writer stdout_writer(stdout);
15 thread_logqueue_holder this_thread_logqueue;
17 size_t thread_queue_size;
18 size_t thread_ringbuf_size;
22 thread_logqueue::thread_logqueue(size_t queue_size, size_t rbuf_size): q(queue_size), rbuf(rbuf_size), next(nullptr), output_thread(true)
25 thread_logqueue::thread_logqueue(thread_logqueue *head, size_t queue_size, size_t rbuf_size): q(queue_size), rbuf(rbuf_size), next(nullptr), head(head), output_thread(false){
26 thread_logqueue * p = nullptr;
29 // insert self into the head of the global linked list lockfreely
30 p = head->next.load(std::memory_order_acquire);
31 this->next.store(p, std::memory_order_release);
32 // head might have been modified or deleted cas until this is inserted
33 if (head->next.compare_exchange_weak(p, this, std::memory_order_release)) {
39 thread_logqueue::~thread_logqueue() {
42 thread_logqueue * p = nullptr, *pnext = nullptr, *q = this;
44 // remove self from the global linked list lockfreely
46 while (p->next.load(std::memory_order_acquire) != q)
47 p = p->next.load(std::memory_order_acquire);
49 pnext = this->next.load(std::memory_order_acquire);
50 // mark this as deleted(by setting this->next to nullptr)
51 while (!this->next.compare_exchange_weak(pnext, nullptr, std::memory_order_release)) {
52 next = this->next.load(std::memory_order_acquire);
56 if (p->next.compare_exchange_weak(q, pnext, std::memory_order_release)) {
59 // some other nodes have been inserted after p, restart.
61 while (p->next.load(std::memory_order_acquire) != q)
62 p = p->next.load(std::memory_order_acquire);
66 static void do_format_and_flush(thread_logqueue *p, std::vector<loglet_t> & logs, buffered_writer & w) {
67 for (size_t i = 0; i < logs.size(); ++i) {
68 auto f = *reinterpret_cast<formatter_type*>(logs[i].rbuf_ptr);
70 p->rbuf.release(logs[i].rbuf_alloc_size);
75 static void iris_thread(writer * pwriter, std::atomic<bool> * stop, size_t scan_interval,size_t output_buffer_size, thread_logqueue * head) {
76 std::vector<loglet_t> logs;
77 std::vector<ntf_t> ntfs;
78 buffered_writer bw(*pwriter, output_buffer_size);
80 // iterate through the linked list of input queues, collect log entries
81 thread_logqueue * p = head;
84 if (p->q.batch_poll(logs)) {
85 do_format_and_flush(p, logs, bw);
93 // wait for notification or 100ms
94 ntfer.wait(scan_interval, ntfs);
95 for (size_t i = 0; i < ntfs.size(); ++i) {
97 p = reinterpret_cast<thread_logqueue *>(notifier::to_data_t(ntf));
98 switch(notifier::to_ntf_type(ntf)) {
100 if (p->q.batch_poll(logs))
101 do_format_and_flush(p, logs, bw);
103 case ntf_queue_deletion:
104 if (p->q.batch_poll(logs))
105 do_format_and_flush(p, logs, bw);
115 //collect one more time
116 thread_logqueue * p = head;
118 if (p->q.batch_poll(logs))
119 do_format_and_flush(p, logs, bw);
125 base_logger::base_logger(writer * pwriter,
126 size_t scan_interval,
127 size_t output_buffer_size,
128 size_t default_thread_ringbuf_size,
129 size_t default_thread_queue_size):
131 m_default_thread_ringbuf_size(default_thread_ringbuf_size),
132 m_default_thread_queue_size(default_thread_queue_size)
134 if (pwriter == nullptr)
135 pwriter = &stdout_writer;
136 m_output_thread = std::thread(iris_thread, pwriter, &m_stop, scan_interval, output_buffer_size, &m_head);