edit
[c11concurrency-benchmarks.git] / src / base_logger.cpp
1 #include <cstdio>
2 #include <cstdlib>
3 #include <cmath>
4
5 #include <memory>
6 #include <base_logger.h>
7 #include <stream_writer.h>
8
9 using namespace iris;
10
11 namespace iris{
12
13 static stream_writer stdout_writer(stdout);
14
15 thread_logqueue_holder this_thread_logqueue; 
16 notifier ntfer;
17 size_t thread_queue_size;
18 size_t thread_ringbuf_size;
19
20 }
21
22 thread_logqueue::thread_logqueue(size_t queue_size, size_t rbuf_size): q(queue_size), rbuf(rbuf_size), next(nullptr), output_thread(true)
23     {}
24
25 thread_logqueue::thread_logqueue(thread_logqueue *head, size_t queue_size, size_t rbuf_size): q(queue_size), rbuf(rbuf_size), next(nullptr), head(head), output_thread(false){
26     thread_logqueue * p = nullptr;
27
28     do {
29         // insert self into the head of the global linked list lockfreely
30         p = head->next.load(std::memory_order_acquire);
31         this->next.store(p, std::memory_order_release);
32         // head might have been modified or deleted cas until this is inserted
33         // Use seq_cst ordering is a temporary fix.  C11tester did not implement pipe (used in notifier.cpp) so that
34         // writing to/reading from a pipe establish a happens-before relation
35         if (head->next.compare_exchange_weak(p, this, std::memory_order_seq_cst /*std::memory_order_release*/)) {
36             return;
37         }
38     } while (true);
39 }
40
41 thread_logqueue::~thread_logqueue() {
42     if (output_thread)
43         return;
44     thread_logqueue * p = nullptr, *pnext = nullptr, *q = this;
45
46     // remove self from the global linked list lockfreely
47     p = head;
48     while (p->next.load(std::memory_order_acquire) != q)
49         p = p->next.load(std::memory_order_acquire);
50
51     pnext = this->next.load(std::memory_order_acquire);
52     // mark this as deleted(by setting this->next to nullptr)
53     while (!this->next.compare_exchange_weak(pnext, nullptr, std::memory_order_release)) {
54         next = this->next.load(std::memory_order_acquire);
55     }
56
57     do {
58         if (p->next.compare_exchange_weak(q, pnext, std::memory_order_release)) {
59             return;
60         }
61         // some other nodes have been inserted after p, restart.
62         p = head;
63         while (p->next.load(std::memory_order_acquire) != q)
64             p = p->next.load(std::memory_order_acquire);
65     } while(true);
66 }
67
68 static void do_format_and_flush(thread_logqueue *p, std::vector<loglet_t> & logs, buffered_writer & w) {
69     for (size_t i = 0; i < logs.size(); ++i) {
70         auto f = *reinterpret_cast<formatter_type*>(logs[i].rbuf_ptr);
71         (*f)(logs[i], w);
72         p->rbuf.release(logs[i].rbuf_alloc_size);
73     }
74     logs.clear();
75 }
76
77 static void iris_thread(writer * pwriter, std::atomic<bool> * stop, size_t scan_interval,size_t output_buffer_size, thread_logqueue * head) {
78     std::vector<loglet_t> logs;
79     std::vector<ntf_t>    ntfs;
80     buffered_writer bw(*pwriter, output_buffer_size);
81     while(!*stop) {
82         // iterate through the linked list of input queues, collect log entries
83         thread_logqueue * p = head;
84         bool empty = true;
85         while (p) {
86             if (p->q.batch_poll(logs)) {
87                 do_format_and_flush(p, logs, bw);
88                 empty = false;
89             }
90             p = p->next;
91         }
92
93         if (empty) {
94             ntfs.clear();
95             // wait for notification or 100ms
96             ntfer.wait(scan_interval, ntfs);
97             for (size_t i = 0; i < ntfs.size(); ++i) {
98                 ntf_t ntf = ntfs[i];
99                 p = reinterpret_cast<thread_logqueue *>(notifier::to_data_t(ntf));
100                 switch(notifier::to_ntf_type(ntf)) {
101                     case ntf_msg:
102                         if (p->q.batch_poll(logs))
103                             do_format_and_flush(p, logs, bw);
104                     break;
105                     case ntf_queue_deletion:
106                         if (p->q.batch_poll(logs))
107                             do_format_and_flush(p, logs, bw);
108                         delete p;
109                     break;
110                     default:
111                     break;
112                 }
113             }
114         }
115     }
116
117     //collect one more time
118     thread_logqueue * p = head;
119     while (p) {
120         if (p->q.batch_poll(logs))
121             do_format_and_flush(p, logs, bw);
122         p = p->next;
123     }
124     bw.flush();
125 }
126
127 base_logger::base_logger(writer * pwriter,
128                 size_t scan_interval,
129                 size_t output_buffer_size,
130                 size_t default_thread_ringbuf_size,
131                 size_t default_thread_queue_size):
132     m_stop(0),
133     m_default_thread_ringbuf_size(default_thread_ringbuf_size),
134     m_default_thread_queue_size(default_thread_queue_size)
135 {
136     if (pwriter == nullptr)
137         pwriter = &stdout_writer;
138     m_output_thread = std::thread(iris_thread, pwriter, &m_stop, scan_interval, output_buffer_size, &m_head);
139 }