adding iris benchmark
[c11concurrency-benchmarks.git] / iris / src / base_logger.cpp
diff --git a/iris/src/base_logger.cpp b/iris/src/base_logger.cpp
new file mode 100644 (file)
index 0000000..a1a43ca
--- /dev/null
@@ -0,0 +1,137 @@
+#include <cstdio>
+#include <cstdlib>
+#include <cmath>
+
+#include <memory>
+#include <base_logger.h>
+#include <stream_writer.h>
+
+using namespace iris;
+
+namespace iris{
+
+static stream_writer stdout_writer(stdout);
+
+thread_logqueue_holder this_thread_logqueue; 
+notifier ntfer;
+size_t thread_queue_size;
+size_t thread_ringbuf_size;
+
+}
+
+thread_logqueue::thread_logqueue(size_t queue_size, size_t rbuf_size): q(queue_size), rbuf(rbuf_size), next(nullptr), output_thread(true)
+    {}
+
+thread_logqueue::thread_logqueue(thread_logqueue *head, size_t queue_size, size_t rbuf_size): q(queue_size), rbuf(rbuf_size), next(nullptr), head(head), output_thread(false){
+    thread_logqueue * p = nullptr;
+
+    do {
+        // insert self into the head of the global linked list lockfreely
+        p = head->next.load(std::memory_order_acquire);
+        this->next.store(p, std::memory_order_release);
+        // head might have been modified or deleted cas until this is inserted
+        if (head->next.compare_exchange_weak(p, this, std::memory_order_release)) {
+            return;
+        }
+    } while (true);
+}
+
+thread_logqueue::~thread_logqueue() {
+    if (output_thread)
+        return;
+    thread_logqueue * p = nullptr, *pnext = nullptr, *q = this;
+
+    // remove self from the global linked list lockfreely
+    p = head;
+    while (p->next.load(std::memory_order_acquire) != q)
+        p = p->next.load(std::memory_order_acquire);
+
+    pnext = this->next.load(std::memory_order_acquire);
+    // mark this as deleted(by setting this->next to nullptr)
+    while (!this->next.compare_exchange_weak(pnext, nullptr, std::memory_order_release)) {
+        next = this->next.load(std::memory_order_acquire);
+    }
+
+    do {
+        if (p->next.compare_exchange_weak(q, pnext, std::memory_order_release)) {
+            return;
+        }
+        // some other nodes have been inserted after p, restart.
+        p = head;
+        while (p->next.load(std::memory_order_acquire) != q)
+            p = p->next.load(std::memory_order_acquire);
+    } while(true);
+}
+
+static void do_format_and_flush(thread_logqueue *p, std::vector<loglet_t> & logs, buffered_writer & w) {
+    for (size_t i = 0; i < logs.size(); ++i) {
+        auto f = *reinterpret_cast<formatter_type*>(logs[i].rbuf_ptr);
+        (*f)(logs[i], w);
+        p->rbuf.release(logs[i].rbuf_alloc_size);
+    }
+    logs.clear();
+}
+
+static void iris_thread(writer * pwriter, std::atomic<bool> * stop, size_t scan_interval,size_t output_buffer_size, thread_logqueue * head) {
+    std::vector<loglet_t> logs;
+    std::vector<ntf_t>    ntfs;
+    buffered_writer bw(*pwriter, output_buffer_size);
+    while(!*stop) {
+        // iterate through the linked list of input queues, collect log entries
+        thread_logqueue * p = head;
+        bool empty = true;
+        while (p) {
+            if (p->q.batch_poll(logs)) {
+                do_format_and_flush(p, logs, bw);
+                empty = false;
+            }
+            p = p->next;
+        }
+
+        if (empty) {
+            ntfs.clear();
+            // wait for notification or 100ms
+            ntfer.wait(scan_interval, ntfs);
+            for (size_t i = 0; i < ntfs.size(); ++i) {
+                ntf_t ntf = ntfs[i];
+                p = reinterpret_cast<thread_logqueue *>(notifier::to_data_t(ntf));
+                switch(notifier::to_ntf_type(ntf)) {
+                    case ntf_msg:
+                        if (p->q.batch_poll(logs))
+                            do_format_and_flush(p, logs, bw);
+                    break;
+                    case ntf_queue_deletion:
+                        if (p->q.batch_poll(logs))
+                            do_format_and_flush(p, logs, bw);
+                        delete p;
+                    break;
+                    default:
+                    break;
+                }
+            }
+        }
+    }
+
+    //collect one more time
+    thread_logqueue * p = head;
+    while (p) {
+        if (p->q.batch_poll(logs))
+            do_format_and_flush(p, logs, bw);
+        p = p->next;
+    }
+    bw.flush();
+}
+
+base_logger::base_logger(writer * pwriter,
+                size_t scan_interval,
+                size_t output_buffer_size,
+                size_t default_thread_ringbuf_size,
+                size_t default_thread_queue_size):
+    m_stop(0),
+    m_default_thread_ringbuf_size(default_thread_ringbuf_size),
+    m_default_thread_queue_size(default_thread_queue_size)
+{
+    if (pwriter == nullptr)
+        pwriter = &stdout_writer;
+    m_output_thread = std::thread(iris_thread, pwriter, &m_stop, scan_interval, output_buffer_size, &m_head);
+}