6 #include <base_logger.h>
7 #include <stream_writer.h>
13 static stream_writer stdout_writer(stdout);
15 thread_logqueue_holder this_thread_logqueue;
17 size_t thread_queue_size;
18 size_t thread_ringbuf_size;
22 thread_logqueue::thread_logqueue(size_t queue_size, size_t rbuf_size): q(queue_size), rbuf(rbuf_size), next(nullptr), output_thread(true)
25 thread_logqueue::thread_logqueue(thread_logqueue *head, size_t queue_size, size_t rbuf_size): q(queue_size), rbuf(rbuf_size), next(nullptr), head(head), output_thread(false){
26 thread_logqueue * p = nullptr;
29 // insert self into the head of the global linked list lockfreely
30 p = head->next.load(std::memory_order_acquire);
31 this->next.store(p, std::memory_order_release);
32 // head might have been modified or deleted cas until this is inserted
33 // Use seq_cst ordering is a temporary fix. C11tester did not implement pipe (used in notifier.cpp) so that
34 // writing to/reading from a pipe establish a happens-before relation
35 if (head->next.compare_exchange_weak(p, this, std::memory_order_seq_cst /*std::memory_order_release*/)) {
41 thread_logqueue::~thread_logqueue() {
44 thread_logqueue * p = nullptr, *pnext = nullptr, *q = this;
46 // remove self from the global linked list lockfreely
48 while (p->next.load(std::memory_order_acquire) != q)
49 p = p->next.load(std::memory_order_acquire);
51 pnext = this->next.load(std::memory_order_acquire);
52 // mark this as deleted(by setting this->next to nullptr)
53 while (!this->next.compare_exchange_weak(pnext, nullptr, std::memory_order_release)) {
54 next = this->next.load(std::memory_order_acquire);
58 if (p->next.compare_exchange_weak(q, pnext, std::memory_order_release)) {
61 // some other nodes have been inserted after p, restart.
63 while (p->next.load(std::memory_order_acquire) != q)
64 p = p->next.load(std::memory_order_acquire);
68 static void do_format_and_flush(thread_logqueue *p, std::vector<loglet_t> & logs, buffered_writer & w) {
69 for (size_t i = 0; i < logs.size(); ++i) {
70 auto f = *reinterpret_cast<formatter_type*>(logs[i].rbuf_ptr);
72 p->rbuf.release(logs[i].rbuf_alloc_size);
77 static void iris_thread(writer * pwriter, std::atomic<bool> * stop, size_t scan_interval,size_t output_buffer_size, thread_logqueue * head) {
78 std::vector<loglet_t> logs;
79 std::vector<ntf_t> ntfs;
80 buffered_writer bw(*pwriter, output_buffer_size);
82 // iterate through the linked list of input queues, collect log entries
83 thread_logqueue * p = head;
86 if (p->q.batch_poll(logs)) {
87 do_format_and_flush(p, logs, bw);
95 // wait for notification or 100ms
96 ntfer.wait(scan_interval, ntfs);
97 for (size_t i = 0; i < ntfs.size(); ++i) {
99 p = reinterpret_cast<thread_logqueue *>(notifier::to_data_t(ntf));
100 switch(notifier::to_ntf_type(ntf)) {
102 if (p->q.batch_poll(logs))
103 do_format_and_flush(p, logs, bw);
105 case ntf_queue_deletion:
106 if (p->q.batch_poll(logs))
107 do_format_and_flush(p, logs, bw);
117 //collect one more time
118 thread_logqueue * p = head;
120 if (p->q.batch_poll(logs))
121 do_format_and_flush(p, logs, bw);
127 base_logger::base_logger(writer * pwriter,
128 size_t scan_interval,
129 size_t output_buffer_size,
130 size_t default_thread_ringbuf_size,
131 size_t default_thread_queue_size):
133 m_default_thread_ringbuf_size(default_thread_ringbuf_size),
134 m_default_thread_queue_size(default_thread_queue_size)
136 if (pwriter == nullptr)
137 pwriter = &stdout_writer;
138 m_output_thread = std::thread(iris_thread, pwriter, &m_stop, scan_interval, output_buffer_size, &m_head);