adding iris benchmark
[c11concurrency-benchmarks.git] / iris / src / base_logger.cpp
1 #include <cstdio>
2 #include <cstdlib>
3 #include <cmath>
4
5 #include <memory>
6 #include <base_logger.h>
7 #include <stream_writer.h>
8
9 using namespace iris;
10
11 namespace iris{
12
13 static stream_writer stdout_writer(stdout);
14
15 thread_logqueue_holder this_thread_logqueue; 
16 notifier ntfer;
17 size_t thread_queue_size;
18 size_t thread_ringbuf_size;
19
20 }
21
22 thread_logqueue::thread_logqueue(size_t queue_size, size_t rbuf_size): q(queue_size), rbuf(rbuf_size), next(nullptr), output_thread(true)
23     {}
24
25 thread_logqueue::thread_logqueue(thread_logqueue *head, size_t queue_size, size_t rbuf_size): q(queue_size), rbuf(rbuf_size), next(nullptr), head(head), output_thread(false){
26     thread_logqueue * p = nullptr;
27
28     do {
29         // insert self into the head of the global linked list lockfreely
30         p = head->next.load(std::memory_order_acquire);
31         this->next.store(p, std::memory_order_release);
32         // head might have been modified or deleted cas until this is inserted
33         if (head->next.compare_exchange_weak(p, this, std::memory_order_release)) {
34             return;
35         }
36     } while (true);
37 }
38
39 thread_logqueue::~thread_logqueue() {
40     if (output_thread)
41         return;
42     thread_logqueue * p = nullptr, *pnext = nullptr, *q = this;
43
44     // remove self from the global linked list lockfreely
45     p = head;
46     while (p->next.load(std::memory_order_acquire) != q)
47         p = p->next.load(std::memory_order_acquire);
48
49     pnext = this->next.load(std::memory_order_acquire);
50     // mark this as deleted(by setting this->next to nullptr)
51     while (!this->next.compare_exchange_weak(pnext, nullptr, std::memory_order_release)) {
52         next = this->next.load(std::memory_order_acquire);
53     }
54
55     do {
56         if (p->next.compare_exchange_weak(q, pnext, std::memory_order_release)) {
57             return;
58         }
59         // some other nodes have been inserted after p, restart.
60         p = head;
61         while (p->next.load(std::memory_order_acquire) != q)
62             p = p->next.load(std::memory_order_acquire);
63     } while(true);
64 }
65
66 static void do_format_and_flush(thread_logqueue *p, std::vector<loglet_t> & logs, buffered_writer & w) {
67     for (size_t i = 0; i < logs.size(); ++i) {
68         auto f = *reinterpret_cast<formatter_type*>(logs[i].rbuf_ptr);
69         (*f)(logs[i], w);
70         p->rbuf.release(logs[i].rbuf_alloc_size);
71     }
72     logs.clear();
73 }
74
75 static void iris_thread(writer * pwriter, std::atomic<bool> * stop, size_t scan_interval,size_t output_buffer_size, thread_logqueue * head) {
76     std::vector<loglet_t> logs;
77     std::vector<ntf_t>    ntfs;
78     buffered_writer bw(*pwriter, output_buffer_size);
79     while(!*stop) {
80         // iterate through the linked list of input queues, collect log entries
81         thread_logqueue * p = head;
82         bool empty = true;
83         while (p) {
84             if (p->q.batch_poll(logs)) {
85                 do_format_and_flush(p, logs, bw);
86                 empty = false;
87             }
88             p = p->next;
89         }
90
91         if (empty) {
92             ntfs.clear();
93             // wait for notification or 100ms
94             ntfer.wait(scan_interval, ntfs);
95             for (size_t i = 0; i < ntfs.size(); ++i) {
96                 ntf_t ntf = ntfs[i];
97                 p = reinterpret_cast<thread_logqueue *>(notifier::to_data_t(ntf));
98                 switch(notifier::to_ntf_type(ntf)) {
99                     case ntf_msg:
100                         if (p->q.batch_poll(logs))
101                             do_format_and_flush(p, logs, bw);
102                     break;
103                     case ntf_queue_deletion:
104                         if (p->q.batch_poll(logs))
105                             do_format_and_flush(p, logs, bw);
106                         delete p;
107                     break;
108                     default:
109                     break;
110                 }
111             }
112         }
113     }
114
115     //collect one more time
116     thread_logqueue * p = head;
117     while (p) {
118         if (p->q.batch_poll(logs))
119             do_format_and_flush(p, logs, bw);
120         p = p->next;
121     }
122     bw.flush();
123 }
124
125 base_logger::base_logger(writer * pwriter,
126                 size_t scan_interval,
127                 size_t output_buffer_size,
128                 size_t default_thread_ringbuf_size,
129                 size_t default_thread_queue_size):
130     m_stop(0),
131     m_default_thread_ringbuf_size(default_thread_ringbuf_size),
132     m_default_thread_queue_size(default_thread_queue_size)
133 {
134     if (pwriter == nullptr)
135         pwriter = &stdout_writer;
136     m_output_thread = std::thread(iris_thread, pwriter, &m_stop, scan_interval, output_buffer_size, &m_head);
137 }