adding iris benchmark
authorahmad <ahmad@dw-10.eecs.uci.edu>
Sat, 2 Feb 2019 02:41:31 +0000 (18:41 -0800)
committerahmad <ahmad@dw-10.eecs.uci.edu>
Sat, 2 Feb 2019 02:41:31 +0000 (18:41 -0800)
27 files changed:
iris/LICENSE [new file with mode: 0644]
iris/Makefile [new file with mode: 0644]
iris/README.md [new file with mode: 0644]
iris/compile.sh [new file with mode: 0755]
iris/include/base_logger.h [new file with mode: 0644]
iris/include/buffered_writer.h [new file with mode: 0644]
iris/include/define.h [new file with mode: 0644]
iris/include/file_writer.h [new file with mode: 0644]
iris/include/formatter.h [new file with mode: 0644]
iris/include/level_logger.h [new file with mode: 0644]
iris/include/lfringbuffer.h [new file with mode: 0644]
iris/include/notifier.h [new file with mode: 0644]
iris/include/snprintf_formatter.h [new file with mode: 0644]
iris/include/sslfqueue.h [new file with mode: 0644]
iris/include/stream_writer.h [new file with mode: 0644]
iris/include/utils.h [new file with mode: 0644]
iris/include/writer.h [new file with mode: 0644]
iris/run.sh [new file with mode: 0755]
iris/src/base_logger.cpp [new file with mode: 0644]
iris/src/buffered_writer.cpp [new file with mode: 0644]
iris/src/file_writer.cpp [new file with mode: 0644]
iris/src/level_logger.cpp [new file with mode: 0644]
iris/src/main.cpp [new file with mode: 0644]
iris/src/notifier.cpp [new file with mode: 0644]
iris/src/stream_writer.cpp [new file with mode: 0644]
iris/src/utils.cpp [new file with mode: 0644]
iris/tests/test_lfringbuffer.cpp [new file with mode: 0644]

diff --git a/iris/LICENSE b/iris/LICENSE
new file mode 100644 (file)
index 0000000..fc140f3
--- /dev/null
@@ -0,0 +1,22 @@
+The MIT License (MIT)
+
+Copyright (c) 2015 Xinjing Cho
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
+
diff --git a/iris/Makefile b/iris/Makefile
new file mode 100644 (file)
index 0000000..a2ee497
--- /dev/null
@@ -0,0 +1,78 @@
+#sources
+IRIS_SRC = src/base_logger.cpp src/buffered_writer.cpp src/file_writer.cpp src/level_logger.cpp src/notifier.cpp src/stream_writer.cpp src/utils.cpp
+IRIS_TEST_SRC = $(IRIS_SRC) tests/test_lfringbuffer.cpp
+IRIS_TEST2_SRC = $(IRIS_SRC) src/main.cpp
+#object files
+IRIS_OBJS = base_logger.o buffered_writer.o file_writer.o level_logger.o notifier.o stream_writer.o utils.o
+IRIS_TEST_OBJS = $(IRIS_OBJS) test_lfringbuffer.o
+IRIS_TEST2_OBJS = $(IRIS_OBJS) test2.o
+#executable
+PROGRAM = libiris.so
+#PROGRAM = main
+#compiler
+CC = g++
+
+CACHELINE_SIZE =
+OS := $(shell uname)
+ifeq ($(OS), Darwin)
+CACHELINE_SIZE = $(shell sysctl -n hw.cachelinesize)
+else
+CACHELINE_SIZE = $(shell getconf LEVEL1_DCACHE_LINESIZE)
+endif
+
+#includes
+INCLUDE = -Iinclude
+#linker params
+LINKPARAMS = -fpic -lpthread -shared
+#linker params for tests
+LINKPARAMS_TEST = -fpic -lpthread
+#options for development
+CFLAGS = --std=c++11 -g -O3 -Wall  -fpic -DIRIS_CACHELINE_SIZE=$(CACHELINE_SIZE)  # removed flags -Werror -Wno-unused-private-field
+#options for release
+#CFLAGS = --std=c++11 -g -O2 -Wall -Werror -fpic -shared
+
+$(PROGRAM): $(IRIS_OBJS)
+       $(CC) -o $(PROGRAM) $(IRIS_OBJS) $(CFLAGS) $(LINKPARAMS)
+
+base_logger.o: src/base_logger.cpp include/base_logger.h
+       $(CC) -c src/base_logger.cpp -o base_logger.o $(CFLAGS) $(INCLUDE)
+
+buffered_writer.o: src/buffered_writer.cpp include/buffered_writer.h
+       $(CC) -c src/buffered_writer.cpp -o buffered_writer.o $(CFLAGS) $(INCLUDE)
+
+file_writer.o: src/file_writer.cpp include/file_writer.h
+       $(CC) -c src/file_writer.cpp -o file_writer.o $(CFLAGS) $(INCLUDE)
+
+stream_writer.o: src/stream_writer.cpp include/stream_writer.h
+       $(CC) -c src/stream_writer.cpp -o stream_writer.o $(CFLAGS) $(INCLUDE)
+
+level_logger.o: src/level_logger.cpp include/level_logger.h
+       $(CC) -c src/level_logger.cpp -o level_logger.o $(CFLAGS) $(INCLUDE)
+
+utils.o: src/utils.cpp include/utils.h
+       $(CC) -c src/utils.cpp -o utils.o $(CFLAGS) $(INCLUDE)
+
+notifier.o: src/notifier.cpp include/notifier.h
+       $(CC) -c src/notifier.cpp -o notifier.o $(CFLAGS) $(INCLUDE)
+
+test: test_lfringbuffer
+
+test_lfringbuffer: $(IRIS_TEST_OBJS)
+       $(CC) -o test_lfringbuffer $(IRIS_TEST_OBJS) $(CFLAGS) $(LINKPARAMS_TEST)
+       $(CC) -o test2 $(IRIS_TEST2_OBJS) $(CFLAGS) $(LINKPARAMS_TEST)
+
+test_lfringbuffer.o: tests/test_lfringbuffer.cpp   src/main.cpp
+       $(CC) -c tests/test_lfringbuffer.cpp -o test_lfringbuffer.o $(CFLAGS) $(INCLUDE)
+       $(CC) -c src/main.cpp -o test2.o $(CFLAGS) $(INCLUDE)
+
+
+clean:
+       rm -rf *.o
+       rm -rf $(PROGRAM) test_lfringbuffer
+       rm -rf $(PROGRAM) test2
+
+install:
+       mkdir -p /usr/local/include/iris
+       cp include/* /usr/local/include/iris
+       cp $(PROGRAM) /usr/local/lib
+.PHONY: clean test
diff --git a/iris/README.md b/iris/README.md
new file mode 100644 (file)
index 0000000..dd1f2b0
--- /dev/null
@@ -0,0 +1,82 @@
+# Overview
+`iris` is asynchronous logging library designed to log messages with minimum overhead to the performance. The cost of logging a message is merely copying the arguments into a lockfree queue.
+
+# Design
+Under the hood, `iris` uses a background thread (logging thread) to offload logging from other threads.
+1. For every other threads, they keep a lockfree queue to buffer the messages.
+2. The logging thread periodically scans through all these queues to collect messages, formats them and writes them into log file.  
+
+A few highlights of the design of `iris` to achieve low latency:
+1. Buffering messages with thread local lockfree queue.
+2. Memory is managed by a thread local ringbuffer taking the advantage that logging is FIFO. Because the fact that logging is FIFO. This scales well in multithreaded environment. 
+3. Minimum context switches.
+
+# Usage
+The supported severity levels are:
+```c++
+enum severity_level {
+    TRACE,
+    DEBUG,
+    INFO,
+    WARN,
+    ERROR,
+    FATAL
+};
+```
+By default, `iris` logs messages to `stdout` and filters out severity level less than `INFO`.
+
+```c++
+#include <iris/level_logger.h>
+
+// this creates a logging thread, logs messages to stdout
+iris::level_logger g_log;
+
+int main (int argc, char const *argv[]) {
+    //configure thread level parameters, these should be done before any logging
+    // queue size
+    g_log.set_thread_queue_size(1024);
+    // ring buffer size
+    g_log.set_thread_ringbuf_size(10240);
+
+    g_log.info("Greetings from %s, bye %d\n", 'iris', 0);
+
+    //this tells logging thread to persist the data into file and waits
+    g_log.sync_and_close();
+
+    return 0;
+}
+```
+Using a `file_writer` to logs all messages into a file:
+```c++
+#include <iris/level_logger.h>
+#include <iris/file_writer.h>
+
+iris::file_writer writer("./log.txt");
+// this creates a logging thread
+iris::level_logger g_log(&writer, iris::TRACE);
+
+int main (int argc, char const *argv[]) {
+    //configure thread level parameters, these should be done before any logging
+    // queue size
+    g_log.set_thread_queue_size(1024);
+    // ring buffer size
+    g_log.set_thread_ringbuf_size(20480);
+    
+    g_log.info("Greetings from %s, bye %d\n", 'iris', 0);
+    
+    //this tells logging thread to persist the data into file and waits
+    g_log.sync_and_close();
+
+    return 0;
+}
+```
+
+# Building & Installation
+
+To build the library, simply clone the projet, go inside the `iris` direcotry and  run following commands.
+```shell
+make
+make test
+make install
+```
+To integrate `iris` into your program, link with `-liris` options.
\ No newline at end of file
diff --git a/iris/compile.sh b/iris/compile.sh
new file mode 100755 (executable)
index 0000000..75d763f
--- /dev/null
@@ -0,0 +1,5 @@
+#!/bin/bash
+make
+make test
+
+
diff --git a/iris/include/base_logger.h b/iris/include/base_logger.h
new file mode 100644 (file)
index 0000000..69cf1ff
--- /dev/null
@@ -0,0 +1,146 @@
+#ifndef IRIS_BASE_LOG_H
+#define IRIS_BASE_LOG_H
+
+#include <cmath>
+#include <cstdio>
+
+#include <thread>
+#include <vector>
+#include <memory>
+#include <atomic>
+#include <string>
+#include <utility>
+#include <string.h>
+
+#include "define.h"
+#include "writer.h"
+#include "lfringbuffer.h"
+#include "sslfqueue.h"
+#include "notifier.h"
+#include "utils.h"
+#include "formatter.h"
+
+namespace iris {
+
+struct thread_logqueue;
+struct thread_logqueue_holder;
+extern notifier ntfer;
+extern thread_logqueue_holder this_thread_logqueue;
+extern size_t thread_queue_size;
+extern size_t thread_ringbuf_size;
+
+struct thread_logqueue_holder {
+private:
+    thread_logqueue * q;
+public:
+    thread_logqueue_holder():q(nullptr){}
+    inline void assign(thread_logqueue * queue) {
+        q = queue;
+    }
+    inline thread_logqueue *get() {
+        return q;
+    }
+    inline void notify_and_quit() {
+        if (q) {
+            ntfer.notify(notifier::to_ntf_t((long)q, ntf_queue_deletion));
+            q = nullptr;
+        }
+    }
+    ~thread_logqueue_holder() {
+        notify_and_quit();
+    }
+};
+
+// Every thread which uses logging primitives owns one of this 
+// single-wrtiter-single-reader queue for buffering logs.
+// These thread_local queues form a singly-linked list which is
+// polled by a dedicated output thread to collect logs.
+struct thread_logqueue {
+public:
+    thread_logqueue(size_t queue_size = 5000, size_t rbuf_size = 102400); // used only by normal threadss
+    thread_logqueue(thread_logqueue *head, size_t queue_size = 5000, size_t rbuf_size = 102400); // used only by output thread
+    ~thread_logqueue();
+public:
+    sslfqueue<loglet_t>             q;
+    lfringbuffer                    rbuf;
+    std::atomic<thread_logqueue *>  next;
+    thread_logqueue               * head;
+    bool                            output_thread; // if current thread is the output thread
+};
+
+class base_logger {
+public:
+    base_logger(writer * pwriter = nullptr,
+                size_t scan_interval = 100,
+                size_t output_buffer_size = 102400,
+                size_t default_thread_ringbuf_size = 102400,
+                size_t default_thread_queue_size = 5000);
+
+    // set the thread local queue size
+    void set_thread_queue_size(size_t size) {
+        thread_queue_size = size;
+    }
+    // set the thread local ring buffer size
+    void set_thread_ringbuf_size(size_t size) {
+        thread_ringbuf_size = size;
+    }
+
+    template<typename Formatter, typename... Args>
+    void log(Args&&... args) {
+        thread_logqueue * this_queue = this_thread_logqueue.get();
+
+        if (iris_unlikely(!this_queue)) {
+            if (thread_queue_size == 0)
+                thread_queue_size = m_default_thread_queue_size;
+            if (thread_ringbuf_size == 0)
+                thread_ringbuf_size = m_default_thread_ringbuf_size;
+            this_queue = new thread_logqueue(&m_head, thread_queue_size, thread_ringbuf_size);
+            this_thread_logqueue.assign(this_queue);
+        }
+
+        typedef std::tuple<typename std::decay<Args>::type...> Args_t;
+        const size_t buffer_size = sizeof(formatter_type) + sizeof(Args_t); 
+        const size_t args_offset = sizeof(formatter_type);
+
+        loglet_t l;
+        while ((l.rbuf_alloc_size = this_queue->rbuf.acquire(buffer_size, l.rbuf_ptr)) == 0)
+            std::this_thread::yield();
+        *reinterpret_cast<formatter_type*>(l.rbuf_ptr) = &formatter_caller<Formatter, typename std::decay<Args>::type...>;
+        // inplace construct parameter pack
+        new (l.rbuf_ptr + args_offset) Args_t(std::forward<Args>(args)...);
+
+        // try to publish to output thread
+        if (iris_likely(this_queue->q.offer(l))) {
+            return;
+        }
+
+        // queue is full, notify the output thread
+        ntfer.notify(notifier::to_ntf_t(reinterpret_cast<long>(this_queue), ntf_msg));
+        // busy yielding until the log is buffered
+        do {
+            std::this_thread::yield();
+        } while (!this_queue->q.offer(l));
+    }
+
+    void sync_and_close() {
+        if (!m_stop.load(std::memory_order_acquire)) {
+            this_thread_logqueue.notify_and_quit();
+            m_stop.store(true, std::memory_order_release);
+            m_output_thread.join();
+        }
+    }
+
+    ~base_logger() {
+        sync_and_close();
+    }
+private:
+    std::atomic<bool> m_stop;
+    thread_logqueue m_head;
+    std::thread m_output_thread;
+    size_t m_default_thread_ringbuf_size;
+    size_t m_default_thread_queue_size;
+};
+
+}
+
+#endif
diff --git a/iris/include/buffered_writer.h b/iris/include/buffered_writer.h
new file mode 100644 (file)
index 0000000..1614c78
--- /dev/null
@@ -0,0 +1,67 @@
+#ifndef IRIS_BUFFERED_WRITER_H_
+#define IRIS_BUFFERED_WRITER_H_
+#include <string.h>
+#include <memory>
+
+#include "writer.h"
+
+namespace iris {
+
+struct buffered_writer {
+private:
+    buffered_writer(const buffered_writer & rhs)=delete;
+    struct buffered_writer & operator=(const buffered_writer &rhs)=delete;
+    writer     &m_w;
+
+    char       *m_buf;
+    int         m_pos;
+    int         m_capacity;
+public:
+    buffered_writer(writer & w, size_t cap = 0);
+
+    ~buffered_writer();
+
+    void expand(size_t n);
+
+    inline void reset() {
+        m_pos = 0;
+    }
+
+    inline char * reserve(size_t s) {
+        if (freespace() < s) {
+            flush();
+            if (freespace() < s)
+                throw std::bad_alloc();
+        }
+
+        return write_pointer();
+    }
+
+    inline size_t freespace() {
+        return m_capacity - m_pos;
+    }
+
+    inline void inc_write_pointer(size_t s) {
+        m_pos += s;
+    }
+
+    inline char * write_pointer() {
+        return m_buf + m_pos;
+    }
+
+    inline size_t size() {
+        return m_pos;
+    }
+
+    inline operator char*() {
+        return m_buf;
+    }
+
+    inline void flush() {
+        m_w.write(m_buf, m_pos);
+        reset();
+    }
+};
+
+}
+#endif
diff --git a/iris/include/define.h b/iris/include/define.h
new file mode 100644 (file)
index 0000000..8ee1cf8
--- /dev/null
@@ -0,0 +1,33 @@
+#ifndef IRIS_DEFINE_H_
+#define IRIS_DEFINE_H_
+#include <sys/uio.h>
+
+namespace iris {
+
+#define iris_likely(x) __builtin_expect(!!(long)(x), 1)
+#define iris_unlikely(x) __builtin_expect(!!(long)(x), 0)
+
+struct loglet_t {
+    // @pbuffer holds formatter function address and arguments
+    char                                  * rbuf_ptr;
+    size_t                                  rbuf_alloc_size;
+    loglet_t(char * ptr = nullptr, size_t alloc_size = 0):rbuf_ptr(ptr), rbuf_alloc_size(alloc_size) {}
+};
+
+#if defined(IOV_MAX) /* Linux x86 (glibc-2.3.6-3) */
+    #define MAX_IOVECS IOV_MAX
+#elif defined(MAX_IOVEC) /* Linux ia64 (glibc-2.3.3-98.28) */
+    #define MAX_IOVECS MAX_IOVEC
+#elif defined(UIO_MAXIOV) /* Linux x86 (glibc-2.2.5-233) */
+    #define MAX_IOVECS UIO_MAXIOV
+#elif (defined(__FreeBSD__) && __FreeBSD_version < 500000) || defined(__DragonFly__) || defined(__APPLE__) 
+    /* - FreeBSD 4.x
+     * - MacOS X 10.3.x
+     *   (covered in -DKERNEL)
+     *  */
+    #define MAX_IOVECS 1024
+#else
+    #error "can't deduce the maximum number of iovec in a readv/writev syscall"
+#endif
+}
+#endif
diff --git a/iris/include/file_writer.h b/iris/include/file_writer.h
new file mode 100644 (file)
index 0000000..d59fe8e
--- /dev/null
@@ -0,0 +1,24 @@
+#ifndef IRIS_FILE_WRITER_H_
+#define IRIS_FILE_WRITER_H_
+
+
+#include <cstdio>
+#include <string>
+
+#include "stream_writer.h"
+
+namespace iris {
+
+// A log writer that appends logs to a file.
+class file_writer: public stream_writer {
+public:
+    file_writer(const char * filename);
+    ~file_writer();
+    virtual void write(const char * buffer, size_t len);
+private:
+    std::string m_filename;
+};
+
+}
+
+#endif
\ No newline at end of file
diff --git a/iris/include/formatter.h b/iris/include/formatter.h
new file mode 100644 (file)
index 0000000..198a274
--- /dev/null
@@ -0,0 +1,30 @@
+#ifndef IRIS_FORMATTER_H_
+#define IRIS_FORMATTER_H_
+#include "define.h"
+#include "buffered_writer.h"
+#include "utils.h"
+
+namespace iris {
+// fomatter do the real formatting logic, output into @obuf
+// return false if there is not enough space in @obuf.
+typedef void (*formatter_type)(const loglet_t & l, buffered_writer & w);
+
+template<typename Formatter, typename...Args, std::size_t...Indexes>
+static void call_format(buffered_writer & w, const std::tuple<Args...> & args, seq<Indexes...>) {
+    return Formatter::format(&w, std::move(std::get<Indexes>(args))...);
+}
+
+template<typename Formatter, typename... Args>
+static void formatter_caller(const loglet_t & l, buffered_writer & w) {
+    const size_t args_offset = sizeof(formatter_type);
+
+    typedef std::tuple<Args...> Args_t;
+    Args_t & args = *reinterpret_cast<Args_t*>(l.rbuf_ptr + args_offset);
+    typename make_sequence<sizeof...(Args)>::type indexes;
+    call_format<Formatter>(w, args, indexes);
+    //deconstruct parameter pack
+    args.~Args_t();
+}
+
+}
+#endif
\ No newline at end of file
diff --git a/iris/include/level_logger.h b/iris/include/level_logger.h
new file mode 100644 (file)
index 0000000..59dff46
--- /dev/null
@@ -0,0 +1,83 @@
+#ifndef IRIS_SEVERITY_LOGGER_H_
+#define IRIS_SEVERITY_LOGGER_H_
+
+#include "snprintf_formatter.h"
+#include "base_logger.h"
+
+namespace iris {
+
+enum severity_level {
+    UNSET,
+    TRACE,
+    DEBUG,
+    INFO,
+    WARN,
+    ERROR,
+    FATAL
+};
+extern severity_level thread_severity_level;
+
+// a level logger provides per-thread severity level filtering .
+class level_logger: public base_logger {
+public:
+    level_logger(writer * pwriter = nullptr,
+                severity_level default_level = INFO,
+                size_t scan_interval = 100,
+                size_t output_buffer_size = 102400,
+                size_t default_thread_ringbuf_size = 102400,
+                size_t default_thread_queue_size = 5000):
+        base_logger(pwriter, scan_interval, 
+                    output_buffer_size, 
+                    default_thread_ringbuf_size,
+                    default_thread_queue_size),
+        m_default_level(default_level) {}
+
+    // set the severity level for the current thread
+    void set_severity_level(severity_level level) {
+        thread_severity_level = level;
+    }
+
+    template<typename... Args>
+    void log(severity_level level, const char * fmt, Args&&... args) {
+        if (iris_unlikely(thread_severity_level == UNSET))
+            thread_severity_level = m_default_level;
+        if (thread_severity_level <= level) {
+            base_logger::log<snprintf_formatter>(fmt, std::forward<Args>(args)...);    
+        }
+    }
+
+    template<typename... Args>
+    void trace(const char * fmt, Args&&... args) {
+        log(TRACE, fmt, std::forward<Args>(args)...);
+    }
+
+    template<typename... Args>
+    void debug(const char * fmt, Args&&... args) {
+        log(DEBUG, fmt, std::forward<Args>(args)...);
+    }
+
+    template<typename... Args>
+    void info(const char * fmt, Args&&... args) {
+        log(INFO, fmt, std::forward<Args>(args)...);
+    }
+
+    template<typename... Args>
+    void warn(const char * fmt, Args&&... args) {
+        log(WARN, fmt, std::forward<Args>(args)...);
+    }
+
+    template<typename... Args>
+    void error(const char * fmt, Args&&... args) {
+        log(ERROR, fmt, std::forward<Args>(args)...);
+    }
+
+    template<typename... Args>
+    void fatal(const char * fmt, Args&&... args) {
+        log(FATAL, fmt, std::forward<Args>(args)...);
+    }
+private:
+    severity_level m_default_level;
+};
+
+}
+#endif
diff --git a/iris/include/lfringbuffer.h b/iris/include/lfringbuffer.h
new file mode 100644 (file)
index 0000000..fb6d922
--- /dev/null
@@ -0,0 +1,103 @@
+#ifndef IRIS_LF_RINGBUFFER_H_
+#define IRIS_LF_RINGBUFFER_H_
+#include <atomic>
+
+#include "define.h"
+#include "utils.h"
+#include <queue>
+#include <assert.h>
+namespace iris {
+
+// A single reader single writer lockfree ring buffer
+// this should be used as following:
+// A thread calls acquire() method repeatedly 
+// until the return value is nonzero to allocate @size bytes of memory.
+// And release() method should be called in the order of allocation 
+// with the same @size parameter to declare the release.
+class lfringbuffer {
+private:
+    size_t                      cap;
+    size_t                      mask;
+    char                     *  buffer;
+    char                        __pad0__[IRIS_CACHELINE_SIZE - sizeof(size_t) * 2 - sizeof(char*)];
+    unsigned long               head;
+    unsigned long               tail_cache;
+    char                        __pad1__[IRIS_CACHELINE_SIZE - sizeof(unsigned long) * 2];
+    std::atomic<unsigned long>  tail;
+    char                        __pad2__[IRIS_CACHELINE_SIZE - sizeof(std::atomic<unsigned long>)];
+public:
+    lfringbuffer(size_t size) {
+        cap = round_up_to_next_multiple_of_2(size);
+        mask = cap - 1;
+        buffer = new char[cap];
+        head = 0;
+        tail = cap;
+        tail_cache = cap;
+    }
+
+    ~lfringbuffer() {
+        delete[] buffer;
+        buffer = nullptr;
+    }
+
+    // acquire() tries to allocate @size bytes of memory
+    // from the buffer, the address of the memory acquired
+    // is stored in @ptr.
+    // returns the actual bytes allocated insided the buffer 
+    // which might be larger than the requested size, since
+    // internally the buffer is implemented by a array and 
+    // there is unusable memory need to be taken into account
+    // when the request can not be fulfilled using only the memory
+    // left at the end of the buffer.
+    // 0 will be returned if there is not enough free space.
+    size_t acquire(size_t size, char *& ptr) {
+        assert(size);
+        size_t total_free = tail_cache - head;
+        if (iris_unlikely(total_free < size)) {
+            // load the latest tail
+            tail_cache = tail.load(std::memory_order_acquire);
+            total_free = tail_cache - head;
+            if (total_free < size)
+                return 0; // no enough freespace
+        }
+
+        // now check if there is enough memory at the rear
+        size_t rear_free = cap - (head & mask);
+        if (iris_likely(rear_free >= size)) {
+            // ok, take the rear chunk
+            ptr = buffer + (head & mask);
+            head += size;
+            return size;
+        }
+
+        // now check if there is enough memory at the front
+        size_t front_free = tail_cache & mask;
+        if (front_free < size) {
+            tail_cache = tail.load(std::memory_order_acquire);
+            front_free = tail_cache & mask;
+            if (front_free < size)
+                return 0;// no enough space
+        }
+
+        ptr = buffer;
+
+        // throw away the rear fragmentation
+        head += rear_free + size;
+
+        return rear_free + size;
+    }
+
+    // release() method releases @size bytes of memory allocated from
+    // acquire() method. @size should be the same as the one retuned from
+    // acquire().
+    void   release(size_t size) {
+        tail.fetch_add(size, std::memory_order_release);
+    }
+
+    size_t freespace() {
+        return tail.load(std::memory_order_acquire) - head;
+    }
+};
+
+}
+#endif
\ No newline at end of file
diff --git a/iris/include/notifier.h b/iris/include/notifier.h
new file mode 100644 (file)
index 0000000..89ce3fc
--- /dev/null
@@ -0,0 +1,41 @@
+#ifndef IRIS_NOTIFIER_H_
+#define IRIS_NOTIFIER_H_
+#include <vector>
+
+namespace iris {
+
+// Bitmask at the lowest bits of a long type
+// representing types of notification.
+enum ntf_type{
+    ntf_msg = 1,           // first bit for message
+    ntf_queue_deletion = 2, // second bit for queue deletion
+    ntf_type_mask = 3
+};
+
+typedef long ntf_t;
+
+class notifier {
+public:
+    notifier();
+    ~notifier();
+    // send a notification to the receiver
+    void notify(const ntf_t & ntf);
+    // wait for notifications
+    // return false if timed out or on error
+    bool wait(long timeout, std::vector<ntf_t> & ntfs);
+    static inline ntf_t to_ntf_t(long data, ntf_type type) {
+        return data | (int)type;
+    }
+    static inline long to_data_t(ntf_t ntf) {
+        return ntf & ~ntf_type_mask;
+    }
+    static inline ntf_type to_ntf_type(ntf_t ntf) {
+        return static_cast<ntf_type>(ntf & ntf_type_mask);
+    }
+private:
+    int  m_pipe[2];
+    long m_poll_time;
+};
+
+}
+#endif
\ No newline at end of file
diff --git a/iris/include/snprintf_formatter.h b/iris/include/snprintf_formatter.h
new file mode 100644 (file)
index 0000000..5dfdcbc
--- /dev/null
@@ -0,0 +1,42 @@
+#ifndef IRIS_SNPRINTF_FORMATTER_H_
+#define IRIS_SNPRINTF_FORMATTER_H_
+
+#include <cstdio>
+#include <utility>
+
+#include <assert.h>
+
+#include "formatter.h"
+#include "utils.h"
+#include "define.h"
+#include "buffered_writer.h"
+
+namespace iris {
+class snprintf_formatter {
+public:
+
+template<typename... Args>
+static void format(buffered_writer * bw, const char * fmt, Args&&... args) {
+    size_t n;
+    while (true) {
+        n = std::snprintf(bw->write_pointer(), bw->freespace(), fmt, args...);
+        if (iris_unlikely(n < 0)) {
+            return;//skip over this formatting if error occurred
+        }
+        if (iris_unlikely(n >= bw->freespace())) {
+            bw->flush(); // flush buffer and retried
+            continue;
+        }
+        bw->inc_write_pointer(n);
+        assert(*bw->write_pointer() == 0);
+        *bw->write_pointer() = '\n';// replace '\0' with '\n'
+        bw->inc_write_pointer(1);
+        break; // TODO what to do with failed formatting?
+    }
+
+}
+
+    
+};
+}
+#endif
\ No newline at end of file
diff --git a/iris/include/sslfqueue.h b/iris/include/sslfqueue.h
new file mode 100644 (file)
index 0000000..d27fb35
--- /dev/null
@@ -0,0 +1,103 @@
+#ifndef IRIS_SSLFQUEUE_H_
+#define IRIS_SSLFQUEUE_H_
+#include <stdint.h>
+#include <atomic>
+#include <vector>
+#include "utils.h"
+
+namespace iris {
+
+//a single producer single consumer lockfree ring queue.
+template<typename T>
+class sslfqueue {
+private:
+       size_t              cap;
+       size_t              mask;
+    T *                 buffer;
+       char                _pad0_[IRIS_CACHELINE_SIZE - sizeof(T*) - sizeof(int) * 2];
+    std::atomic<long>   head;
+       char                _pad1_[IRIS_CACHELINE_SIZE - sizeof(std::atomic<long>)];
+       long                    tail_cache;
+       char                _pad2_[IRIS_CACHELINE_SIZE - sizeof(long)];
+    std::atomic<long>   tail;
+       char                _pad3_[IRIS_CACHELINE_SIZE - sizeof(std::atomic<long>)];
+       long                    head_cache;
+       char                _pad4_[IRIS_CACHELINE_SIZE - sizeof(long)];
+
+public:
+
+    sslfqueue(size_t capacity = 5000):head(0), tail_cache(0), tail(0), head_cache(0)
+    {
+       cap = round_up_to_next_multiple_of_2(capacity);
+       mask = cap - 1;
+               buffer = new T[cap];
+       }
+
+       ~sslfqueue(){
+               delete []buffer;
+       }
+
+    bool offer(const T & e) {
+        const long cur_tail = tail.load(std::memory_order_acquire);
+        const long len = cur_tail - cap;
+        if (head_cache <= len) {
+                       head_cache = head.load(std::memory_order_acquire);
+                       if (head_cache <= len)
+                               return false; // queue is full
+        }
+
+        buffer[cur_tail & mask] = e;
+        tail.store(cur_tail + 1, std::memory_order_release);
+
+        return true;
+    }
+
+    bool poll(T & e) {
+        const long cur_head = head.load(std::memory_order_acquire);
+        if (cur_head >= tail_cache) {
+                       tail_cache = tail.load(std::memory_order_acquire);
+                       if (cur_head >= tail_cache)
+                               return false; // queue is empty
+        }
+
+        e = buffer[cur_head & mask];
+        head.store(cur_head + 1, std::memory_order_release);
+
+        return true;
+    }
+
+    bool batch_poll(std::vector<T> & vec) {
+        long cur_head = head.load(std::memory_order_acquire);
+        if (cur_head >= tail_cache) {
+            tail_cache = tail.load(std::memory_order_acquire);
+            if (cur_head >= tail_cache)
+                return false; // queue is empty
+        }
+
+        while (cur_head < tail_cache) {
+            vec.push_back(buffer[cur_head & mask]);
+            ++cur_head;
+        }
+
+        head.store(cur_head, std::memory_order_release);
+
+        return true;
+    }
+
+    bool empty() {
+        return tail.load(std::memory_order_acquire) - head.load(std::memory_order_acquire) == 0;
+    }
+
+    bool full() {
+        return tail.load(std::memory_order_acquire) - head.load(std::memory_order_acquire) == cap;
+    }
+
+    size_t size() {
+        return tail.load(std::memory_order_acquire) - head.load(std::memory_order_acquire);
+    }
+};
+
+}
+#endif
+
+
diff --git a/iris/include/stream_writer.h b/iris/include/stream_writer.h
new file mode 100644 (file)
index 0000000..df1888b
--- /dev/null
@@ -0,0 +1,25 @@
+#ifndef IRIS_STREAM_WRITER_H_
+#define IRIS_STREAM_WRITER_H_
+
+
+#include <cstdio>
+#include <string>
+
+#include "writer.h"
+
+namespace iris {
+
+// A log writer that appends logs to a standard stream.
+class stream_writer: public writer {
+public:
+    stream_writer(FILE * stream);
+    virtual void write(const char * msg);
+    virtual void write(const char * buffer, size_t len);
+protected:
+    FILE *      m_stream;
+    int         m_fd;
+};
+
+}
+
+#endif
\ No newline at end of file
diff --git a/iris/include/utils.h b/iris/include/utils.h
new file mode 100644 (file)
index 0000000..e998c8a
--- /dev/null
@@ -0,0 +1,26 @@
+#ifndef IRIS_UTILS_H_
+#define IRIS_UTILS_H_
+#include <sys/time.h>
+
+#include <cstddef>
+namespace iris{
+
+long long get_current_time_in_us();
+
+
+int round_up_to_next_multiple_of_2(int n);
+
+/* utilitis for making sequence for tuple element retrieval */
+template<size_t ...> struct seq {};
+template<size_t idx, std::size_t N, std::size_t... S> struct seq_helper: seq_helper<idx + 1, N, S..., idx> {};
+template<size_t N, size_t ...S> struct seq_helper<N, N, S...> {
+    typedef seq<S...> type;
+};
+template<size_t N>
+struct make_sequence {
+    typedef typename seq_helper<0, N>::type type;
+};
+
+
+}
+#endif
\ No newline at end of file
diff --git a/iris/include/writer.h b/iris/include/writer.h
new file mode 100644 (file)
index 0000000..d32f20b
--- /dev/null
@@ -0,0 +1,18 @@
+#ifndef IRIS_WRITER_H_
+#define IRIS_WRITER_H_
+
+#include <sys/uio.h>
+
+#include <vector>
+namespace iris {
+
+// interface for log writting policy
+class writer {
+public:
+    virtual void write(const char * msg) = 0;
+    virtual void write(const char * buffer, size_t len) = 0;
+};
+
+}
+
+#endif
\ No newline at end of file
diff --git a/iris/run.sh b/iris/run.sh
new file mode 100755 (executable)
index 0000000..ff6f270
--- /dev/null
@@ -0,0 +1,6 @@
+#!/bin/bash
+./test_lfringbuffer
+./test2
+rm log.txt
+
+
diff --git a/iris/src/base_logger.cpp b/iris/src/base_logger.cpp
new file mode 100644 (file)
index 0000000..a1a43ca
--- /dev/null
@@ -0,0 +1,137 @@
+#include <cstdio>
+#include <cstdlib>
+#include <cmath>
+
+#include <memory>
+#include <base_logger.h>
+#include <stream_writer.h>
+
+using namespace iris;
+
+namespace iris{
+
+static stream_writer stdout_writer(stdout);
+
+thread_logqueue_holder this_thread_logqueue; 
+notifier ntfer;
+size_t thread_queue_size;
+size_t thread_ringbuf_size;
+
+}
+
+thread_logqueue::thread_logqueue(size_t queue_size, size_t rbuf_size): q(queue_size), rbuf(rbuf_size), next(nullptr), output_thread(true)
+    {}
+
+thread_logqueue::thread_logqueue(thread_logqueue *head, size_t queue_size, size_t rbuf_size): q(queue_size), rbuf(rbuf_size), next(nullptr), head(head), output_thread(false){
+    thread_logqueue * p = nullptr;
+
+    do {
+        // insert self into the head of the global linked list lockfreely
+        p = head->next.load(std::memory_order_acquire);
+        this->next.store(p, std::memory_order_release);
+        // head might have been modified or deleted cas until this is inserted
+        if (head->next.compare_exchange_weak(p, this, std::memory_order_release)) {
+            return;
+        }
+    } while (true);
+}
+
+thread_logqueue::~thread_logqueue() {
+    if (output_thread)
+        return;
+    thread_logqueue * p = nullptr, *pnext = nullptr, *q = this;
+
+    // remove self from the global linked list lockfreely
+    p = head;
+    while (p->next.load(std::memory_order_acquire) != q)
+        p = p->next.load(std::memory_order_acquire);
+
+    pnext = this->next.load(std::memory_order_acquire);
+    // mark this as deleted(by setting this->next to nullptr)
+    while (!this->next.compare_exchange_weak(pnext, nullptr, std::memory_order_release)) {
+        next = this->next.load(std::memory_order_acquire);
+    }
+
+    do {
+        if (p->next.compare_exchange_weak(q, pnext, std::memory_order_release)) {
+            return;
+        }
+        // some other nodes have been inserted after p, restart.
+        p = head;
+        while (p->next.load(std::memory_order_acquire) != q)
+            p = p->next.load(std::memory_order_acquire);
+    } while(true);
+}
+
+static void do_format_and_flush(thread_logqueue *p, std::vector<loglet_t> & logs, buffered_writer & w) {
+    for (size_t i = 0; i < logs.size(); ++i) {
+        auto f = *reinterpret_cast<formatter_type*>(logs[i].rbuf_ptr);
+        (*f)(logs[i], w);
+        p->rbuf.release(logs[i].rbuf_alloc_size);
+    }
+    logs.clear();
+}
+
+static void iris_thread(writer * pwriter, std::atomic<bool> * stop, size_t scan_interval,size_t output_buffer_size, thread_logqueue * head) {
+    std::vector<loglet_t> logs;
+    std::vector<ntf_t>    ntfs;
+    buffered_writer bw(*pwriter, output_buffer_size);
+    while(!*stop) {
+        // iterate through the linked list of input queues, collect log entries
+        thread_logqueue * p = head;
+        bool empty = true;
+        while (p) {
+            if (p->q.batch_poll(logs)) {
+                do_format_and_flush(p, logs, bw);
+                empty = false;
+            }
+            p = p->next;
+        }
+
+        if (empty) {
+            ntfs.clear();
+            // wait for notification or 100ms
+            ntfer.wait(scan_interval, ntfs);
+            for (size_t i = 0; i < ntfs.size(); ++i) {
+                ntf_t ntf = ntfs[i];
+                p = reinterpret_cast<thread_logqueue *>(notifier::to_data_t(ntf));
+                switch(notifier::to_ntf_type(ntf)) {
+                    case ntf_msg:
+                        if (p->q.batch_poll(logs))
+                            do_format_and_flush(p, logs, bw);
+                    break;
+                    case ntf_queue_deletion:
+                        if (p->q.batch_poll(logs))
+                            do_format_and_flush(p, logs, bw);
+                        delete p;
+                    break;
+                    default:
+                    break;
+                }
+            }
+        }
+    }
+
+    //collect one more time
+    thread_logqueue * p = head;
+    while (p) {
+        if (p->q.batch_poll(logs))
+            do_format_and_flush(p, logs, bw);
+        p = p->next;
+    }
+    bw.flush();
+}
+
+base_logger::base_logger(writer * pwriter,
+                size_t scan_interval,
+                size_t output_buffer_size,
+                size_t default_thread_ringbuf_size,
+                size_t default_thread_queue_size):
+    m_stop(0),
+    m_default_thread_ringbuf_size(default_thread_ringbuf_size),
+    m_default_thread_queue_size(default_thread_queue_size)
+{
+    if (pwriter == nullptr)
+        pwriter = &stdout_writer;
+    m_output_thread = std::thread(iris_thread, pwriter, &m_stop, scan_interval, output_buffer_size, &m_head);
+}
diff --git a/iris/src/buffered_writer.cpp b/iris/src/buffered_writer.cpp
new file mode 100644 (file)
index 0000000..6713a7e
--- /dev/null
@@ -0,0 +1,27 @@
+#include <buffered_writer.h>
+
+using namespace iris;
+
+buffered_writer::buffered_writer(writer & wter, size_t cap): m_w(wter), m_buf(nullptr), m_pos(0), m_capacity(cap) {
+    if (m_capacity) {
+        m_buf = new char[m_capacity];
+    }
+}
+
+buffered_writer::~buffered_writer() {
+    if (m_buf) {
+        delete[] m_buf;
+        m_buf = nullptr;
+        m_pos = m_capacity = 0;
+    }
+}
+
+void buffered_writer::expand(size_t n) {
+    if (n <= m_capacity) 
+        return;
+    char *tmp_buf = new char[n];
+    memcpy(tmp_buf, this->m_buf, this->size());
+    delete[] this->m_buf;
+    this->m_buf = tmp_buf;
+    this->m_capacity = n;
+}
\ No newline at end of file
diff --git a/iris/src/file_writer.cpp b/iris/src/file_writer.cpp
new file mode 100644 (file)
index 0000000..b322d5c
--- /dev/null
@@ -0,0 +1,43 @@
+#include <file_writer.h>
+#include <errno.h>
+#include <string.h>
+#include <unistd.h>
+
+
+using namespace iris;
+
+file_writer::file_writer(const char * filename): stream_writer(nullptr) {
+    FILE *fp = fopen(filename, "a");
+    if (fp == nullptr) {
+        std::string error_string = "[iris] failed to open log file <";
+        error_string += filename;
+        error_string += ">, reason: ";
+        error_string += strerror(errno);
+        throw error_string;
+    }
+    m_stream = fp;
+    m_fd = fileno(m_stream);
+}
+
+file_writer::~file_writer() {
+    if (m_stream == nullptr)
+        return;
+    fclose(m_stream);
+    m_stream = nullptr;
+    m_fd = -1;
+}
+
+void file_writer::write(const char * buffer, size_t len) {
+    size_t offset = 0;
+    while (len) {
+        size_t written = ::write(m_fd, buffer + offset, len);
+        if (written <= 0) {
+            if (errno == EINTR)
+                continue;
+            fprintf(stderr, "[iris] error, should write %lu byts, only %lu bytes written, reason: %s.\n", len, written, strerror(errno));
+            break;
+        }   
+        len -= written;
+        offset += written;
+    }   
+}
diff --git a/iris/src/level_logger.cpp b/iris/src/level_logger.cpp
new file mode 100644 (file)
index 0000000..102bfed
--- /dev/null
@@ -0,0 +1,5 @@
+#include <level_logger.h>
+
+namespace iris {
+severity_level thread_severity_level;
+}
\ No newline at end of file
diff --git a/iris/src/main.cpp b/iris/src/main.cpp
new file mode 100644 (file)
index 0000000..3771525
--- /dev/null
@@ -0,0 +1,86 @@
+#include <unistd.h>
+
+
+#include <cstdio>
+#include <chrono>
+#include <limits>
+#include <cmath>
+
+#include <level_logger.h>
+#include <file_writer.h>
+
+#include <map>
+#include <thread>
+#include <vector>
+/*
+using log_t = reckless::severity_log<
+    reckless::indent<4>,       // 4 spaces of indent
+    ' '                       // Field separator
+    >;
+reckless::file_writer rwriter("log_reckless.txt");
+log_t r_log(&rwriter, 102400, 65534, 102400);
+*/
+
+iris::file_writer writer("./log.txt");
+iris::level_logger g_log(&writer, iris::INFO);
+int freq_map[50000000];
+
+long long g_max_lat;
+long long g_min_lat;
+#define ITERATIONS 1e3
+
+void worker_thread() {
+    std::hash<std::thread::id> hasher;
+    long long max_lat = std::numeric_limits<long long>::lowest(), min_lat = std::numeric_limits<long long>::max(), avg_lat = 0, sum = 0;
+    size_t tid = hasher(std::this_thread::get_id());
+    g_log.set_thread_queue_size(65534);
+    g_log.set_thread_ringbuf_size(655340);
+    auto start = std::chrono::high_resolution_clock::now();
+    for (int i = 1; i <= ITERATIONS; ++i) {
+        auto start = std::chrono::high_resolution_clock::now();
+        g_log.info("hello %s, world %d, there %d, here %d, idx: %d", "dsads", 231, 0, 0, i);
+        auto finish = std::chrono::high_resolution_clock::now();
+        long long lat = std::chrono::duration_cast<std::chrono::nanoseconds>(finish-start).count();
+        //sum += lat;
+        max_lat = std::max(max_lat, lat);
+        min_lat = std::min(min_lat, lat);
+        g_max_lat = std::max(max_lat, g_max_lat);
+        g_min_lat = std::min(min_lat, g_min_lat);
+        //avg_lat = sum / i;
+        if (lat < 50000000)
+            freq_map[lat]++;
+        //if (i % 10000 == 0)
+        //    usleep(1000);
+    }
+    auto finish = std::chrono::high_resolution_clock::now();
+    long long lat = std::chrono::duration_cast<std::chrono::nanoseconds>(finish-start).count();
+    printf("thread_id: %lu max_lat: %lldns, min_lat: %lldns, avg_lat: %lldns, latency sum %lldns\n", tid, 0ll, min_lat, (long long)lat / (long long)ITERATIONS, lat);
+}
+int main(int argc, char const *argv[]) {
+    g_max_lat = 0;
+    g_min_lat = std::numeric_limits<int>::max();
+    int n = 10;
+    if (argc > 1) {
+        n = std::stoi(argv[1]);
+    }
+    std::vector<std::thread> workers;
+
+    for (int i = 0; i < n; ++i) {
+        workers.push_back(std::thread(worker_thread));
+    }
+    for (int i = 0; i < n; ++i) {
+        workers[i].join();
+    }
+    printf("\nlatency,count\n");
+    g_max_lat = std::min(50000000 - 1, (int)g_max_lat);
+    for (int i = g_min_lat; i <= 400; ++i) {
+        if (freq_map[i])
+            printf("%d,%d\n", i, freq_map[i]);
+    }
+    //printf("\nno,latency\n");
+    //for (size_t i = 0; i < lats.size(); ++i) {
+    //    printf("%zu,%d\n", i + 1, lats[i]);
+    //}
+    //g_log.sync_and_close();
+    return 0;
+}
diff --git a/iris/src/notifier.cpp b/iris/src/notifier.cpp
new file mode 100644 (file)
index 0000000..5d93185
--- /dev/null
@@ -0,0 +1,79 @@
+#include <unistd.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <string.h>
+#include <poll.h>
+
+#include <string>
+#include <system_error>
+
+#include <notifier.h>
+
+using namespace iris;
+
+notifier::notifier() {
+    if (::pipe(m_pipe)) {
+        std::string error_str = "failed to call pipe(), reason:";
+        error_str += strerror(errno);
+        throw std::system_error(std::error_code(errno, std::system_category()), error_str);
+    }   
+    if (fcntl(m_pipe[1], F_SETFL, O_NONBLOCK)) {
+        std::string error_str = "failed to set piep's write side to nonblock mode, reason:";
+        error_str += strerror(errno);
+        throw std::system_error(std::error_code(errno, std::system_category()), error_str);
+    }
+}
+
+notifier::~notifier() {
+    close(m_pipe[0]);
+    close(m_pipe[1]);
+}
+
+void notifier::notify(const ntf_t & ntf) {
+    size_t written = 0;
+    while (written < sizeof(ntf_t)) {
+        int n = write(m_pipe[1], (char *)&ntf, sizeof(ntf_t) - written);
+        if (n <= 0) {
+            if (errno == EINTR)
+                continue;
+            break;
+        }
+        written += n;
+    }
+}
+
+bool notifier::wait(long timeout, std::vector<ntf_t> & ntfs) {
+    struct pollfd pfd;
+    pfd.fd = m_pipe[0];
+    pfd.events = POLLIN;
+
+    if(poll(&pfd, 1, timeout) == 1) {
+        int flags = fcntl(m_pipe[0], F_GETFL, 0);
+        flags |= O_NONBLOCK;
+        fcntl(m_pipe[0], F_SETFL, flags);
+        while (true) {
+            ntf_t ntf;
+            size_t readn = 0;
+            while (readn < sizeof(ntf_t)) {
+                int n = read(m_pipe[0], &ntf, sizeof(ntf_t) - readn);
+                if (n <= 0) {
+                    if (errno == EINTR)
+                        continue;
+                    break;
+                }
+                readn += n;
+            }
+            if (readn >= sizeof(ntf_t)) {
+                ntfs.push_back(ntf);
+            } else {
+                break;
+            }
+        }
+        flags &= ~O_NONBLOCK;
+        fcntl(m_pipe[0], F_SETFL, flags);
+
+        return true;
+    }
+
+    return false;
+}
\ No newline at end of file
diff --git a/iris/src/stream_writer.cpp b/iris/src/stream_writer.cpp
new file mode 100644 (file)
index 0000000..1be37d2
--- /dev/null
@@ -0,0 +1,34 @@
+#include <errno.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <string.h>
+
+#include <define.h>
+#include <stream_writer.h>
+using namespace iris;
+
+stream_writer::stream_writer(FILE * stream): m_stream(stream), m_fd(-1) {
+    if (stream)
+        m_fd = fileno(m_stream);
+}
+
+void stream_writer::write(const char * msg) {
+    size_t len = strlen(msg);
+    this->write(msg, len);
+}
+
+void stream_writer::write(const char * buffer, size_t len){
+    size_t written = 0;
+    while (written < len) {
+        size_t n = fwrite(buffer + written, 1, len - written, m_stream);
+        written += n;
+        if (n < len) {
+            if (errno == EINTR) {
+                continue;
+            }
+            fprintf(stderr, "[iris] error, should write %lu byts, only %lu bytes written, reason: %s.\n", len, n, strerror(errno));
+            break;
+        }
+    }
+    fflush(m_stream);
+}
diff --git a/iris/src/utils.cpp b/iris/src/utils.cpp
new file mode 100644 (file)
index 0000000..b152761
--- /dev/null
@@ -0,0 +1,22 @@
+#include <utils.h>
+
+long long iris::get_current_time_in_us() {
+    struct timeval v;
+    gettimeofday(&v, nullptr);
+    return (long long)v.tv_sec * 1000000 + v.tv_usec;
+}
+
+
+int iris::round_up_to_next_multiple_of_2(int n) {
+    int bits = 0, ones = 0, oldn = n;
+    while(n) {
+        if (n & 1)
+            ++ones;
+        ++bits;
+        n >>= 1;
+    }
+
+    if (ones == 1) // already a multiple of 2
+        return oldn;
+    return 1 << bits;
+}
\ No newline at end of file
diff --git a/iris/tests/test_lfringbuffer.cpp b/iris/tests/test_lfringbuffer.cpp
new file mode 100644 (file)
index 0000000..16e7f37
--- /dev/null
@@ -0,0 +1,116 @@
+#include <thread>
+#include <string>
+#include<cstring>
+#include <assert.h>
+
+#include <lfringbuffer.h>
+#include <sslfqueue.h>
+#include <define.h>
+
+#include <iris/level_logger.h>
+#include <iris/file_writer.h>
+
+iris::file_writer writer("./log.txt");
+// this creates a logging thread
+iris::level_logger g_log(&writer, iris::TRACE);
+
+
+using namespace iris;
+#define ITERATIONS (int)1e7
+lfringbuffer rbuf(1024);
+struct buffer_t {
+    char * b;
+    int    size;
+    int    alloc_size;
+    int    data;
+};
+sslfqueue<buffer_t> q;
+
+void recyle() {
+    int i = 1;
+    while (i <= ITERATIONS) {
+        buffer_t b;
+        while (!q.poll(b))
+            std::this_thread::yield();
+
+        assert(std::stoi(std::string(b.b, b.b + b.size)) == b.data);
+
+        rbuf.release(b.alloc_size);
+        ++i;
+    }
+}
+
+int main(int argc, char const *argv[]) {
+
+
+    //configure thread level parameters, these should be done before any logging
+    // queue size
+    g_log.set_thread_queue_size(1024);
+    // ring buffer size
+    g_log.set_thread_ringbuf_size(20480);
+    
+    g_log.info("Greetings from %s, bye %d\n", "iris", 0);
+    
+    //this tells logging thread to persist the data into file and waits
+    g_log.sync_and_close();
+
+    char *p1;
+    char *p2;
+    char *p3;
+    assert(512 == rbuf.acquire(512, p1));
+    assert(p1);
+
+    assert(256 == rbuf.acquire(256, p2));
+    assert(p2);
+
+    assert(p2 - p1 == 512);
+
+    assert(0 == rbuf.acquire(512, p1));
+
+    assert(rbuf.freespace() == 256);
+
+    assert(0 == rbuf.acquire(512, p3));
+
+    rbuf.release(512);
+
+    assert(768 == rbuf.acquire(512, p3));
+
+    printf("rbuf.freespace(): %lu\n", rbuf.freespace());
+    assert(rbuf.freespace() == 0);
+
+    rbuf.release(256);
+    printf("rbuf.freespace(): %lu\n", rbuf.freespace());
+    assert(256 == rbuf.freespace());
+    rbuf.release(768);
+    printf("rbuf.freespace(): %lu\n", rbuf.freespace());
+    assert(1024 == rbuf.freespace());
+
+    std::thread recyler(recyle);
+
+    int i = 1;
+    while (i <= ITERATIONS) {
+        std::string s(std::to_string(i));
+
+        buffer_t b;
+        char *ptr;
+        int size;
+        while (!(size = rbuf.acquire(s.size(), ptr)))
+            std::this_thread::yield();
+
+        b.b = ptr;
+        memcpy(b.b, s.c_str(), s.size());
+        b.size = s.size();
+        b.alloc_size = size;
+        b.data = i;
+
+        while (!q.offer(b))
+            std::this_thread::yield();
+        ++i;
+    }
+
+
+
+    recyler.join();
+    printf("passed\n");
+    return 0;
+}