--- /dev/null
+The MIT License (MIT)
+
+Copyright (c) 2015 Xinjing Cho
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
+
--- /dev/null
+#sources
+IRIS_SRC = src/base_logger.cpp src/buffered_writer.cpp src/file_writer.cpp src/level_logger.cpp src/notifier.cpp src/stream_writer.cpp src/utils.cpp
+IRIS_TEST_SRC = $(IRIS_SRC) tests/test_lfringbuffer.cpp
+IRIS_TEST2_SRC = $(IRIS_SRC) src/main.cpp
+#object files
+IRIS_OBJS = base_logger.o buffered_writer.o file_writer.o level_logger.o notifier.o stream_writer.o utils.o
+IRIS_TEST_OBJS = $(IRIS_OBJS) test_lfringbuffer.o
+IRIS_TEST2_OBJS = $(IRIS_OBJS) test2.o
+#executable
+PROGRAM = libiris.so
+#PROGRAM = main
+#compiler
+CC = g++
+
+CACHELINE_SIZE =
+OS := $(shell uname)
+ifeq ($(OS), Darwin)
+CACHELINE_SIZE = $(shell sysctl -n hw.cachelinesize)
+else
+CACHELINE_SIZE = $(shell getconf LEVEL1_DCACHE_LINESIZE)
+endif
+
+#includes
+INCLUDE = -Iinclude
+#linker params
+LINKPARAMS = -fpic -lpthread -shared
+#linker params for tests
+LINKPARAMS_TEST = -fpic -lpthread
+#options for development
+CFLAGS = --std=c++11 -g -O3 -Wall -fpic -DIRIS_CACHELINE_SIZE=$(CACHELINE_SIZE) # removed flags -Werror -Wno-unused-private-field
+#options for release
+#CFLAGS = --std=c++11 -g -O2 -Wall -Werror -fpic -shared
+
+$(PROGRAM): $(IRIS_OBJS)
+ $(CC) -o $(PROGRAM) $(IRIS_OBJS) $(CFLAGS) $(LINKPARAMS)
+
+base_logger.o: src/base_logger.cpp include/base_logger.h
+ $(CC) -c src/base_logger.cpp -o base_logger.o $(CFLAGS) $(INCLUDE)
+
+buffered_writer.o: src/buffered_writer.cpp include/buffered_writer.h
+ $(CC) -c src/buffered_writer.cpp -o buffered_writer.o $(CFLAGS) $(INCLUDE)
+
+file_writer.o: src/file_writer.cpp include/file_writer.h
+ $(CC) -c src/file_writer.cpp -o file_writer.o $(CFLAGS) $(INCLUDE)
+
+stream_writer.o: src/stream_writer.cpp include/stream_writer.h
+ $(CC) -c src/stream_writer.cpp -o stream_writer.o $(CFLAGS) $(INCLUDE)
+
+level_logger.o: src/level_logger.cpp include/level_logger.h
+ $(CC) -c src/level_logger.cpp -o level_logger.o $(CFLAGS) $(INCLUDE)
+
+utils.o: src/utils.cpp include/utils.h
+ $(CC) -c src/utils.cpp -o utils.o $(CFLAGS) $(INCLUDE)
+
+notifier.o: src/notifier.cpp include/notifier.h
+ $(CC) -c src/notifier.cpp -o notifier.o $(CFLAGS) $(INCLUDE)
+
+test: test_lfringbuffer
+
+test_lfringbuffer: $(IRIS_TEST_OBJS)
+ $(CC) -o test_lfringbuffer $(IRIS_TEST_OBJS) $(CFLAGS) $(LINKPARAMS_TEST)
+ $(CC) -o test2 $(IRIS_TEST2_OBJS) $(CFLAGS) $(LINKPARAMS_TEST)
+
+test_lfringbuffer.o: tests/test_lfringbuffer.cpp src/main.cpp
+ $(CC) -c tests/test_lfringbuffer.cpp -o test_lfringbuffer.o $(CFLAGS) $(INCLUDE)
+ $(CC) -c src/main.cpp -o test2.o $(CFLAGS) $(INCLUDE)
+
+
+clean:
+ rm -rf *.o
+ rm -rf $(PROGRAM) test_lfringbuffer
+ rm -rf $(PROGRAM) test2
+
+install:
+ mkdir -p /usr/local/include/iris
+ cp include/* /usr/local/include/iris
+ cp $(PROGRAM) /usr/local/lib
+.PHONY: clean test
--- /dev/null
+# Overview
+`iris` is asynchronous logging library designed to log messages with minimum overhead to the performance. The cost of logging a message is merely copying the arguments into a lockfree queue.
+
+# Design
+Under the hood, `iris` uses a background thread (logging thread) to offload logging from other threads.
+1. For every other threads, they keep a lockfree queue to buffer the messages.
+2. The logging thread periodically scans through all these queues to collect messages, formats them and writes them into log file.
+
+A few highlights of the design of `iris` to achieve low latency:
+1. Buffering messages with thread local lockfree queue.
+2. Memory is managed by a thread local ringbuffer taking the advantage that logging is FIFO. Because the fact that logging is FIFO. This scales well in multithreaded environment.
+3. Minimum context switches.
+
+# Usage
+The supported severity levels are:
+```c++
+enum severity_level {
+ TRACE,
+ DEBUG,
+ INFO,
+ WARN,
+ ERROR,
+ FATAL
+};
+```
+By default, `iris` logs messages to `stdout` and filters out severity level less than `INFO`.
+
+```c++
+#include <iris/level_logger.h>
+
+// this creates a logging thread, logs messages to stdout
+iris::level_logger g_log;
+
+int main (int argc, char const *argv[]) {
+ //configure thread level parameters, these should be done before any logging
+ // queue size
+ g_log.set_thread_queue_size(1024);
+ // ring buffer size
+ g_log.set_thread_ringbuf_size(10240);
+
+ g_log.info("Greetings from %s, bye %d\n", 'iris', 0);
+
+ //this tells logging thread to persist the data into file and waits
+ g_log.sync_and_close();
+
+ return 0;
+}
+```
+Using a `file_writer` to logs all messages into a file:
+```c++
+#include <iris/level_logger.h>
+#include <iris/file_writer.h>
+
+iris::file_writer writer("./log.txt");
+// this creates a logging thread
+iris::level_logger g_log(&writer, iris::TRACE);
+
+int main (int argc, char const *argv[]) {
+ //configure thread level parameters, these should be done before any logging
+ // queue size
+ g_log.set_thread_queue_size(1024);
+ // ring buffer size
+ g_log.set_thread_ringbuf_size(20480);
+
+ g_log.info("Greetings from %s, bye %d\n", 'iris', 0);
+
+ //this tells logging thread to persist the data into file and waits
+ g_log.sync_and_close();
+
+ return 0;
+}
+```
+
+# Building & Installation
+
+To build the library, simply clone the projet, go inside the `iris` direcotry and run following commands.
+```shell
+make
+make test
+make install
+```
+To integrate `iris` into your program, link with `-liris` options.
\ No newline at end of file
--- /dev/null
+#!/bin/bash
+make
+make test
+
+
--- /dev/null
+#ifndef IRIS_BASE_LOG_H
+#define IRIS_BASE_LOG_H
+
+#include <cmath>
+#include <cstdio>
+
+#include <thread>
+#include <vector>
+#include <memory>
+#include <atomic>
+#include <string>
+#include <utility>
+#include <string.h>
+
+#include "define.h"
+#include "writer.h"
+#include "lfringbuffer.h"
+#include "sslfqueue.h"
+#include "notifier.h"
+#include "utils.h"
+#include "formatter.h"
+
+namespace iris {
+
+struct thread_logqueue;
+struct thread_logqueue_holder;
+extern notifier ntfer;
+extern thread_logqueue_holder this_thread_logqueue;
+extern size_t thread_queue_size;
+extern size_t thread_ringbuf_size;
+
+struct thread_logqueue_holder {
+private:
+ thread_logqueue * q;
+public:
+ thread_logqueue_holder():q(nullptr){}
+ inline void assign(thread_logqueue * queue) {
+ q = queue;
+ }
+ inline thread_logqueue *get() {
+ return q;
+ }
+ inline void notify_and_quit() {
+ if (q) {
+ ntfer.notify(notifier::to_ntf_t((long)q, ntf_queue_deletion));
+ q = nullptr;
+ }
+ }
+ ~thread_logqueue_holder() {
+ notify_and_quit();
+ }
+};
+
+// Every thread which uses logging primitives owns one of this
+// single-wrtiter-single-reader queue for buffering logs.
+// These thread_local queues form a singly-linked list which is
+// polled by a dedicated output thread to collect logs.
+struct thread_logqueue {
+public:
+ thread_logqueue(size_t queue_size = 5000, size_t rbuf_size = 102400); // used only by normal threadss
+ thread_logqueue(thread_logqueue *head, size_t queue_size = 5000, size_t rbuf_size = 102400); // used only by output thread
+ ~thread_logqueue();
+public:
+ sslfqueue<loglet_t> q;
+ lfringbuffer rbuf;
+ std::atomic<thread_logqueue *> next;
+ thread_logqueue * head;
+ bool output_thread; // if current thread is the output thread
+};
+
+class base_logger {
+public:
+ base_logger(writer * pwriter = nullptr,
+ size_t scan_interval = 100,
+ size_t output_buffer_size = 102400,
+ size_t default_thread_ringbuf_size = 102400,
+ size_t default_thread_queue_size = 5000);
+
+ // set the thread local queue size
+ void set_thread_queue_size(size_t size) {
+ thread_queue_size = size;
+ }
+ // set the thread local ring buffer size
+ void set_thread_ringbuf_size(size_t size) {
+ thread_ringbuf_size = size;
+ }
+
+ template<typename Formatter, typename... Args>
+ void log(Args&&... args) {
+ thread_logqueue * this_queue = this_thread_logqueue.get();
+
+ if (iris_unlikely(!this_queue)) {
+ if (thread_queue_size == 0)
+ thread_queue_size = m_default_thread_queue_size;
+ if (thread_ringbuf_size == 0)
+ thread_ringbuf_size = m_default_thread_ringbuf_size;
+ this_queue = new thread_logqueue(&m_head, thread_queue_size, thread_ringbuf_size);
+ this_thread_logqueue.assign(this_queue);
+ }
+
+ typedef std::tuple<typename std::decay<Args>::type...> Args_t;
+ const size_t buffer_size = sizeof(formatter_type) + sizeof(Args_t);
+ const size_t args_offset = sizeof(formatter_type);
+
+ loglet_t l;
+ while ((l.rbuf_alloc_size = this_queue->rbuf.acquire(buffer_size, l.rbuf_ptr)) == 0)
+ std::this_thread::yield();
+ *reinterpret_cast<formatter_type*>(l.rbuf_ptr) = &formatter_caller<Formatter, typename std::decay<Args>::type...>;
+ // inplace construct parameter pack
+ new (l.rbuf_ptr + args_offset) Args_t(std::forward<Args>(args)...);
+
+ // try to publish to output thread
+ if (iris_likely(this_queue->q.offer(l))) {
+ return;
+ }
+
+ // queue is full, notify the output thread
+ ntfer.notify(notifier::to_ntf_t(reinterpret_cast<long>(this_queue), ntf_msg));
+ // busy yielding until the log is buffered
+ do {
+ std::this_thread::yield();
+ } while (!this_queue->q.offer(l));
+ }
+
+ void sync_and_close() {
+ if (!m_stop.load(std::memory_order_acquire)) {
+ this_thread_logqueue.notify_and_quit();
+ m_stop.store(true, std::memory_order_release);
+ m_output_thread.join();
+ }
+ }
+
+ ~base_logger() {
+ sync_and_close();
+ }
+private:
+ std::atomic<bool> m_stop;
+ thread_logqueue m_head;
+ std::thread m_output_thread;
+ size_t m_default_thread_ringbuf_size;
+ size_t m_default_thread_queue_size;
+};
+
+}
+
+#endif
--- /dev/null
+#ifndef IRIS_BUFFERED_WRITER_H_
+#define IRIS_BUFFERED_WRITER_H_
+#include <string.h>
+#include <memory>
+
+#include "writer.h"
+
+namespace iris {
+
+struct buffered_writer {
+private:
+ buffered_writer(const buffered_writer & rhs)=delete;
+ struct buffered_writer & operator=(const buffered_writer &rhs)=delete;
+ writer &m_w;
+
+ char *m_buf;
+ int m_pos;
+ int m_capacity;
+public:
+ buffered_writer(writer & w, size_t cap = 0);
+
+ ~buffered_writer();
+
+ void expand(size_t n);
+
+ inline void reset() {
+ m_pos = 0;
+ }
+
+ inline char * reserve(size_t s) {
+ if (freespace() < s) {
+ flush();
+ if (freespace() < s)
+ throw std::bad_alloc();
+ }
+
+ return write_pointer();
+ }
+
+ inline size_t freespace() {
+ return m_capacity - m_pos;
+ }
+
+ inline void inc_write_pointer(size_t s) {
+ m_pos += s;
+ }
+
+ inline char * write_pointer() {
+ return m_buf + m_pos;
+ }
+
+ inline size_t size() {
+ return m_pos;
+ }
+
+ inline operator char*() {
+ return m_buf;
+ }
+
+ inline void flush() {
+ m_w.write(m_buf, m_pos);
+ reset();
+ }
+};
+
+}
+#endif
--- /dev/null
+#ifndef IRIS_DEFINE_H_
+#define IRIS_DEFINE_H_
+#include <sys/uio.h>
+
+namespace iris {
+
+#define iris_likely(x) __builtin_expect(!!(long)(x), 1)
+#define iris_unlikely(x) __builtin_expect(!!(long)(x), 0)
+
+struct loglet_t {
+ // @pbuffer holds formatter function address and arguments
+ char * rbuf_ptr;
+ size_t rbuf_alloc_size;
+ loglet_t(char * ptr = nullptr, size_t alloc_size = 0):rbuf_ptr(ptr), rbuf_alloc_size(alloc_size) {}
+};
+
+#if defined(IOV_MAX) /* Linux x86 (glibc-2.3.6-3) */
+ #define MAX_IOVECS IOV_MAX
+#elif defined(MAX_IOVEC) /* Linux ia64 (glibc-2.3.3-98.28) */
+ #define MAX_IOVECS MAX_IOVEC
+#elif defined(UIO_MAXIOV) /* Linux x86 (glibc-2.2.5-233) */
+ #define MAX_IOVECS UIO_MAXIOV
+#elif (defined(__FreeBSD__) && __FreeBSD_version < 500000) || defined(__DragonFly__) || defined(__APPLE__)
+ /* - FreeBSD 4.x
+ * - MacOS X 10.3.x
+ * (covered in -DKERNEL)
+ * */
+ #define MAX_IOVECS 1024
+#else
+ #error "can't deduce the maximum number of iovec in a readv/writev syscall"
+#endif
+}
+#endif
--- /dev/null
+#ifndef IRIS_FILE_WRITER_H_
+#define IRIS_FILE_WRITER_H_
+
+
+#include <cstdio>
+#include <string>
+
+#include "stream_writer.h"
+
+namespace iris {
+
+// A log writer that appends logs to a file.
+class file_writer: public stream_writer {
+public:
+ file_writer(const char * filename);
+ ~file_writer();
+ virtual void write(const char * buffer, size_t len);
+private:
+ std::string m_filename;
+};
+
+}
+
+#endif
\ No newline at end of file
--- /dev/null
+#ifndef IRIS_FORMATTER_H_
+#define IRIS_FORMATTER_H_
+#include "define.h"
+#include "buffered_writer.h"
+#include "utils.h"
+
+namespace iris {
+// fomatter do the real formatting logic, output into @obuf
+// return false if there is not enough space in @obuf.
+typedef void (*formatter_type)(const loglet_t & l, buffered_writer & w);
+
+template<typename Formatter, typename...Args, std::size_t...Indexes>
+static void call_format(buffered_writer & w, const std::tuple<Args...> & args, seq<Indexes...>) {
+ return Formatter::format(&w, std::move(std::get<Indexes>(args))...);
+}
+
+template<typename Formatter, typename... Args>
+static void formatter_caller(const loglet_t & l, buffered_writer & w) {
+ const size_t args_offset = sizeof(formatter_type);
+
+ typedef std::tuple<Args...> Args_t;
+ Args_t & args = *reinterpret_cast<Args_t*>(l.rbuf_ptr + args_offset);
+ typename make_sequence<sizeof...(Args)>::type indexes;
+ call_format<Formatter>(w, args, indexes);
+ //deconstruct parameter pack
+ args.~Args_t();
+}
+
+}
+#endif
\ No newline at end of file
--- /dev/null
+#ifndef IRIS_SEVERITY_LOGGER_H_
+#define IRIS_SEVERITY_LOGGER_H_
+
+#include "snprintf_formatter.h"
+#include "base_logger.h"
+
+namespace iris {
+
+enum severity_level {
+ UNSET,
+ TRACE,
+ DEBUG,
+ INFO,
+ WARN,
+ ERROR,
+ FATAL
+};
+extern severity_level thread_severity_level;
+
+// a level logger provides per-thread severity level filtering .
+class level_logger: public base_logger {
+public:
+ level_logger(writer * pwriter = nullptr,
+ severity_level default_level = INFO,
+ size_t scan_interval = 100,
+ size_t output_buffer_size = 102400,
+ size_t default_thread_ringbuf_size = 102400,
+ size_t default_thread_queue_size = 5000):
+ base_logger(pwriter, scan_interval,
+ output_buffer_size,
+ default_thread_ringbuf_size,
+ default_thread_queue_size),
+ m_default_level(default_level) {}
+
+ // set the severity level for the current thread
+ void set_severity_level(severity_level level) {
+ thread_severity_level = level;
+ }
+
+ template<typename... Args>
+ void log(severity_level level, const char * fmt, Args&&... args) {
+ if (iris_unlikely(thread_severity_level == UNSET))
+ thread_severity_level = m_default_level;
+ if (thread_severity_level <= level) {
+ base_logger::log<snprintf_formatter>(fmt, std::forward<Args>(args)...);
+ }
+ }
+
+ template<typename... Args>
+ void trace(const char * fmt, Args&&... args) {
+ log(TRACE, fmt, std::forward<Args>(args)...);
+ }
+
+ template<typename... Args>
+ void debug(const char * fmt, Args&&... args) {
+ log(DEBUG, fmt, std::forward<Args>(args)...);
+ }
+
+ template<typename... Args>
+ void info(const char * fmt, Args&&... args) {
+ log(INFO, fmt, std::forward<Args>(args)...);
+ }
+
+ template<typename... Args>
+ void warn(const char * fmt, Args&&... args) {
+ log(WARN, fmt, std::forward<Args>(args)...);
+ }
+
+ template<typename... Args>
+ void error(const char * fmt, Args&&... args) {
+ log(ERROR, fmt, std::forward<Args>(args)...);
+ }
+
+ template<typename... Args>
+ void fatal(const char * fmt, Args&&... args) {
+ log(FATAL, fmt, std::forward<Args>(args)...);
+ }
+private:
+ severity_level m_default_level;
+};
+
+}
+#endif
--- /dev/null
+#ifndef IRIS_LF_RINGBUFFER_H_
+#define IRIS_LF_RINGBUFFER_H_
+#include <atomic>
+
+#include "define.h"
+#include "utils.h"
+#include <queue>
+#include <assert.h>
+namespace iris {
+
+// A single reader single writer lockfree ring buffer
+// this should be used as following:
+// A thread calls acquire() method repeatedly
+// until the return value is nonzero to allocate @size bytes of memory.
+// And release() method should be called in the order of allocation
+// with the same @size parameter to declare the release.
+class lfringbuffer {
+private:
+ size_t cap;
+ size_t mask;
+ char * buffer;
+ char __pad0__[IRIS_CACHELINE_SIZE - sizeof(size_t) * 2 - sizeof(char*)];
+ unsigned long head;
+ unsigned long tail_cache;
+ char __pad1__[IRIS_CACHELINE_SIZE - sizeof(unsigned long) * 2];
+ std::atomic<unsigned long> tail;
+ char __pad2__[IRIS_CACHELINE_SIZE - sizeof(std::atomic<unsigned long>)];
+public:
+ lfringbuffer(size_t size) {
+ cap = round_up_to_next_multiple_of_2(size);
+ mask = cap - 1;
+ buffer = new char[cap];
+ head = 0;
+ tail = cap;
+ tail_cache = cap;
+ }
+
+ ~lfringbuffer() {
+ delete[] buffer;
+ buffer = nullptr;
+ }
+
+ // acquire() tries to allocate @size bytes of memory
+ // from the buffer, the address of the memory acquired
+ // is stored in @ptr.
+ // returns the actual bytes allocated insided the buffer
+ // which might be larger than the requested size, since
+ // internally the buffer is implemented by a array and
+ // there is unusable memory need to be taken into account
+ // when the request can not be fulfilled using only the memory
+ // left at the end of the buffer.
+ // 0 will be returned if there is not enough free space.
+ size_t acquire(size_t size, char *& ptr) {
+ assert(size);
+ size_t total_free = tail_cache - head;
+ if (iris_unlikely(total_free < size)) {
+ // load the latest tail
+ tail_cache = tail.load(std::memory_order_acquire);
+ total_free = tail_cache - head;
+ if (total_free < size)
+ return 0; // no enough freespace
+ }
+
+ // now check if there is enough memory at the rear
+ size_t rear_free = cap - (head & mask);
+ if (iris_likely(rear_free >= size)) {
+ // ok, take the rear chunk
+ ptr = buffer + (head & mask);
+ head += size;
+ return size;
+ }
+
+ // now check if there is enough memory at the front
+ size_t front_free = tail_cache & mask;
+ if (front_free < size) {
+ tail_cache = tail.load(std::memory_order_acquire);
+ front_free = tail_cache & mask;
+ if (front_free < size)
+ return 0;// no enough space
+ }
+
+ ptr = buffer;
+
+ // throw away the rear fragmentation
+ head += rear_free + size;
+
+ return rear_free + size;
+ }
+
+ // release() method releases @size bytes of memory allocated from
+ // acquire() method. @size should be the same as the one retuned from
+ // acquire().
+ void release(size_t size) {
+ tail.fetch_add(size, std::memory_order_release);
+ }
+
+ size_t freespace() {
+ return tail.load(std::memory_order_acquire) - head;
+ }
+};
+
+}
+#endif
\ No newline at end of file
--- /dev/null
+#ifndef IRIS_NOTIFIER_H_
+#define IRIS_NOTIFIER_H_
+#include <vector>
+
+namespace iris {
+
+// Bitmask at the lowest bits of a long type
+// representing types of notification.
+enum ntf_type{
+ ntf_msg = 1, // first bit for message
+ ntf_queue_deletion = 2, // second bit for queue deletion
+ ntf_type_mask = 3
+};
+
+typedef long ntf_t;
+
+class notifier {
+public:
+ notifier();
+ ~notifier();
+ // send a notification to the receiver
+ void notify(const ntf_t & ntf);
+ // wait for notifications
+ // return false if timed out or on error
+ bool wait(long timeout, std::vector<ntf_t> & ntfs);
+ static inline ntf_t to_ntf_t(long data, ntf_type type) {
+ return data | (int)type;
+ }
+ static inline long to_data_t(ntf_t ntf) {
+ return ntf & ~ntf_type_mask;
+ }
+ static inline ntf_type to_ntf_type(ntf_t ntf) {
+ return static_cast<ntf_type>(ntf & ntf_type_mask);
+ }
+private:
+ int m_pipe[2];
+ long m_poll_time;
+};
+
+}
+#endif
\ No newline at end of file
--- /dev/null
+#ifndef IRIS_SNPRINTF_FORMATTER_H_
+#define IRIS_SNPRINTF_FORMATTER_H_
+
+#include <cstdio>
+#include <utility>
+
+#include <assert.h>
+
+#include "formatter.h"
+#include "utils.h"
+#include "define.h"
+#include "buffered_writer.h"
+
+namespace iris {
+class snprintf_formatter {
+public:
+
+template<typename... Args>
+static void format(buffered_writer * bw, const char * fmt, Args&&... args) {
+ size_t n;
+ while (true) {
+ n = std::snprintf(bw->write_pointer(), bw->freespace(), fmt, args...);
+ if (iris_unlikely(n < 0)) {
+ return;//skip over this formatting if error occurred
+ }
+ if (iris_unlikely(n >= bw->freespace())) {
+ bw->flush(); // flush buffer and retried
+ continue;
+ }
+ bw->inc_write_pointer(n);
+ assert(*bw->write_pointer() == 0);
+ *bw->write_pointer() = '\n';// replace '\0' with '\n'
+ bw->inc_write_pointer(1);
+ break; // TODO what to do with failed formatting?
+ }
+
+}
+
+
+};
+}
+#endif
\ No newline at end of file
--- /dev/null
+#ifndef IRIS_SSLFQUEUE_H_
+#define IRIS_SSLFQUEUE_H_
+#include <stdint.h>
+#include <atomic>
+#include <vector>
+#include "utils.h"
+
+namespace iris {
+
+//a single producer single consumer lockfree ring queue.
+template<typename T>
+class sslfqueue {
+private:
+ size_t cap;
+ size_t mask;
+ T * buffer;
+ char _pad0_[IRIS_CACHELINE_SIZE - sizeof(T*) - sizeof(int) * 2];
+ std::atomic<long> head;
+ char _pad1_[IRIS_CACHELINE_SIZE - sizeof(std::atomic<long>)];
+ long tail_cache;
+ char _pad2_[IRIS_CACHELINE_SIZE - sizeof(long)];
+ std::atomic<long> tail;
+ char _pad3_[IRIS_CACHELINE_SIZE - sizeof(std::atomic<long>)];
+ long head_cache;
+ char _pad4_[IRIS_CACHELINE_SIZE - sizeof(long)];
+
+public:
+
+ sslfqueue(size_t capacity = 5000):head(0), tail_cache(0), tail(0), head_cache(0)
+ {
+ cap = round_up_to_next_multiple_of_2(capacity);
+ mask = cap - 1;
+ buffer = new T[cap];
+ }
+
+ ~sslfqueue(){
+ delete []buffer;
+ }
+
+ bool offer(const T & e) {
+ const long cur_tail = tail.load(std::memory_order_acquire);
+ const long len = cur_tail - cap;
+ if (head_cache <= len) {
+ head_cache = head.load(std::memory_order_acquire);
+ if (head_cache <= len)
+ return false; // queue is full
+ }
+
+ buffer[cur_tail & mask] = e;
+ tail.store(cur_tail + 1, std::memory_order_release);
+
+ return true;
+ }
+
+ bool poll(T & e) {
+ const long cur_head = head.load(std::memory_order_acquire);
+ if (cur_head >= tail_cache) {
+ tail_cache = tail.load(std::memory_order_acquire);
+ if (cur_head >= tail_cache)
+ return false; // queue is empty
+ }
+
+ e = buffer[cur_head & mask];
+ head.store(cur_head + 1, std::memory_order_release);
+
+ return true;
+ }
+
+ bool batch_poll(std::vector<T> & vec) {
+ long cur_head = head.load(std::memory_order_acquire);
+ if (cur_head >= tail_cache) {
+ tail_cache = tail.load(std::memory_order_acquire);
+ if (cur_head >= tail_cache)
+ return false; // queue is empty
+ }
+
+ while (cur_head < tail_cache) {
+ vec.push_back(buffer[cur_head & mask]);
+ ++cur_head;
+ }
+
+ head.store(cur_head, std::memory_order_release);
+
+ return true;
+ }
+
+ bool empty() {
+ return tail.load(std::memory_order_acquire) - head.load(std::memory_order_acquire) == 0;
+ }
+
+ bool full() {
+ return tail.load(std::memory_order_acquire) - head.load(std::memory_order_acquire) == cap;
+ }
+
+ size_t size() {
+ return tail.load(std::memory_order_acquire) - head.load(std::memory_order_acquire);
+ }
+};
+
+}
+#endif
+
+
--- /dev/null
+#ifndef IRIS_STREAM_WRITER_H_
+#define IRIS_STREAM_WRITER_H_
+
+
+#include <cstdio>
+#include <string>
+
+#include "writer.h"
+
+namespace iris {
+
+// A log writer that appends logs to a standard stream.
+class stream_writer: public writer {
+public:
+ stream_writer(FILE * stream);
+ virtual void write(const char * msg);
+ virtual void write(const char * buffer, size_t len);
+protected:
+ FILE * m_stream;
+ int m_fd;
+};
+
+}
+
+#endif
\ No newline at end of file
--- /dev/null
+#ifndef IRIS_UTILS_H_
+#define IRIS_UTILS_H_
+#include <sys/time.h>
+
+#include <cstddef>
+namespace iris{
+
+long long get_current_time_in_us();
+
+
+int round_up_to_next_multiple_of_2(int n);
+
+/* utilitis for making sequence for tuple element retrieval */
+template<size_t ...> struct seq {};
+template<size_t idx, std::size_t N, std::size_t... S> struct seq_helper: seq_helper<idx + 1, N, S..., idx> {};
+template<size_t N, size_t ...S> struct seq_helper<N, N, S...> {
+ typedef seq<S...> type;
+};
+template<size_t N>
+struct make_sequence {
+ typedef typename seq_helper<0, N>::type type;
+};
+
+
+}
+#endif
\ No newline at end of file
--- /dev/null
+#ifndef IRIS_WRITER_H_
+#define IRIS_WRITER_H_
+
+#include <sys/uio.h>
+
+#include <vector>
+namespace iris {
+
+// interface for log writting policy
+class writer {
+public:
+ virtual void write(const char * msg) = 0;
+ virtual void write(const char * buffer, size_t len) = 0;
+};
+
+}
+
+#endif
\ No newline at end of file
--- /dev/null
+#!/bin/bash
+./test_lfringbuffer
+./test2
+rm log.txt
+
+
--- /dev/null
+#include <cstdio>
+#include <cstdlib>
+#include <cmath>
+
+#include <memory>
+#include <base_logger.h>
+#include <stream_writer.h>
+
+using namespace iris;
+
+namespace iris{
+
+static stream_writer stdout_writer(stdout);
+
+thread_logqueue_holder this_thread_logqueue;
+notifier ntfer;
+size_t thread_queue_size;
+size_t thread_ringbuf_size;
+
+}
+
+thread_logqueue::thread_logqueue(size_t queue_size, size_t rbuf_size): q(queue_size), rbuf(rbuf_size), next(nullptr), output_thread(true)
+ {}
+
+thread_logqueue::thread_logqueue(thread_logqueue *head, size_t queue_size, size_t rbuf_size): q(queue_size), rbuf(rbuf_size), next(nullptr), head(head), output_thread(false){
+ thread_logqueue * p = nullptr;
+
+ do {
+ // insert self into the head of the global linked list lockfreely
+ p = head->next.load(std::memory_order_acquire);
+ this->next.store(p, std::memory_order_release);
+ // head might have been modified or deleted cas until this is inserted
+ if (head->next.compare_exchange_weak(p, this, std::memory_order_release)) {
+ return;
+ }
+ } while (true);
+}
+
+thread_logqueue::~thread_logqueue() {
+ if (output_thread)
+ return;
+ thread_logqueue * p = nullptr, *pnext = nullptr, *q = this;
+
+ // remove self from the global linked list lockfreely
+ p = head;
+ while (p->next.load(std::memory_order_acquire) != q)
+ p = p->next.load(std::memory_order_acquire);
+
+ pnext = this->next.load(std::memory_order_acquire);
+ // mark this as deleted(by setting this->next to nullptr)
+ while (!this->next.compare_exchange_weak(pnext, nullptr, std::memory_order_release)) {
+ next = this->next.load(std::memory_order_acquire);
+ }
+
+ do {
+ if (p->next.compare_exchange_weak(q, pnext, std::memory_order_release)) {
+ return;
+ }
+ // some other nodes have been inserted after p, restart.
+ p = head;
+ while (p->next.load(std::memory_order_acquire) != q)
+ p = p->next.load(std::memory_order_acquire);
+ } while(true);
+}
+
+static void do_format_and_flush(thread_logqueue *p, std::vector<loglet_t> & logs, buffered_writer & w) {
+ for (size_t i = 0; i < logs.size(); ++i) {
+ auto f = *reinterpret_cast<formatter_type*>(logs[i].rbuf_ptr);
+ (*f)(logs[i], w);
+ p->rbuf.release(logs[i].rbuf_alloc_size);
+ }
+ logs.clear();
+}
+
+static void iris_thread(writer * pwriter, std::atomic<bool> * stop, size_t scan_interval,size_t output_buffer_size, thread_logqueue * head) {
+ std::vector<loglet_t> logs;
+ std::vector<ntf_t> ntfs;
+ buffered_writer bw(*pwriter, output_buffer_size);
+ while(!*stop) {
+ // iterate through the linked list of input queues, collect log entries
+ thread_logqueue * p = head;
+ bool empty = true;
+ while (p) {
+ if (p->q.batch_poll(logs)) {
+ do_format_and_flush(p, logs, bw);
+ empty = false;
+ }
+ p = p->next;
+ }
+
+ if (empty) {
+ ntfs.clear();
+ // wait for notification or 100ms
+ ntfer.wait(scan_interval, ntfs);
+ for (size_t i = 0; i < ntfs.size(); ++i) {
+ ntf_t ntf = ntfs[i];
+ p = reinterpret_cast<thread_logqueue *>(notifier::to_data_t(ntf));
+ switch(notifier::to_ntf_type(ntf)) {
+ case ntf_msg:
+ if (p->q.batch_poll(logs))
+ do_format_and_flush(p, logs, bw);
+ break;
+ case ntf_queue_deletion:
+ if (p->q.batch_poll(logs))
+ do_format_and_flush(p, logs, bw);
+ delete p;
+ break;
+ default:
+ break;
+ }
+ }
+ }
+ }
+
+ //collect one more time
+ thread_logqueue * p = head;
+ while (p) {
+ if (p->q.batch_poll(logs))
+ do_format_and_flush(p, logs, bw);
+ p = p->next;
+ }
+ bw.flush();
+}
+
+base_logger::base_logger(writer * pwriter,
+ size_t scan_interval,
+ size_t output_buffer_size,
+ size_t default_thread_ringbuf_size,
+ size_t default_thread_queue_size):
+ m_stop(0),
+ m_default_thread_ringbuf_size(default_thread_ringbuf_size),
+ m_default_thread_queue_size(default_thread_queue_size)
+{
+ if (pwriter == nullptr)
+ pwriter = &stdout_writer;
+ m_output_thread = std::thread(iris_thread, pwriter, &m_stop, scan_interval, output_buffer_size, &m_head);
+}
--- /dev/null
+#include <buffered_writer.h>
+
+using namespace iris;
+
+buffered_writer::buffered_writer(writer & wter, size_t cap): m_w(wter), m_buf(nullptr), m_pos(0), m_capacity(cap) {
+ if (m_capacity) {
+ m_buf = new char[m_capacity];
+ }
+}
+
+buffered_writer::~buffered_writer() {
+ if (m_buf) {
+ delete[] m_buf;
+ m_buf = nullptr;
+ m_pos = m_capacity = 0;
+ }
+}
+
+void buffered_writer::expand(size_t n) {
+ if (n <= m_capacity)
+ return;
+ char *tmp_buf = new char[n];
+ memcpy(tmp_buf, this->m_buf, this->size());
+ delete[] this->m_buf;
+ this->m_buf = tmp_buf;
+ this->m_capacity = n;
+}
\ No newline at end of file
--- /dev/null
+#include <file_writer.h>
+#include <errno.h>
+#include <string.h>
+#include <unistd.h>
+
+
+using namespace iris;
+
+file_writer::file_writer(const char * filename): stream_writer(nullptr) {
+ FILE *fp = fopen(filename, "a");
+ if (fp == nullptr) {
+ std::string error_string = "[iris] failed to open log file <";
+ error_string += filename;
+ error_string += ">, reason: ";
+ error_string += strerror(errno);
+ throw error_string;
+ }
+ m_stream = fp;
+ m_fd = fileno(m_stream);
+}
+
+file_writer::~file_writer() {
+ if (m_stream == nullptr)
+ return;
+ fclose(m_stream);
+ m_stream = nullptr;
+ m_fd = -1;
+}
+
+void file_writer::write(const char * buffer, size_t len) {
+ size_t offset = 0;
+ while (len) {
+ size_t written = ::write(m_fd, buffer + offset, len);
+ if (written <= 0) {
+ if (errno == EINTR)
+ continue;
+ fprintf(stderr, "[iris] error, should write %lu byts, only %lu bytes written, reason: %s.\n", len, written, strerror(errno));
+ break;
+ }
+ len -= written;
+ offset += written;
+ }
+}
--- /dev/null
+#include <level_logger.h>
+
+namespace iris {
+severity_level thread_severity_level;
+}
\ No newline at end of file
--- /dev/null
+#include <unistd.h>
+
+
+#include <cstdio>
+#include <chrono>
+#include <limits>
+#include <cmath>
+
+#include <level_logger.h>
+#include <file_writer.h>
+
+#include <map>
+#include <thread>
+#include <vector>
+/*
+using log_t = reckless::severity_log<
+ reckless::indent<4>, // 4 spaces of indent
+ ' ' // Field separator
+ >;
+reckless::file_writer rwriter("log_reckless.txt");
+log_t r_log(&rwriter, 102400, 65534, 102400);
+*/
+
+iris::file_writer writer("./log.txt");
+iris::level_logger g_log(&writer, iris::INFO);
+int freq_map[50000000];
+
+long long g_max_lat;
+long long g_min_lat;
+#define ITERATIONS 1e3
+
+void worker_thread() {
+ std::hash<std::thread::id> hasher;
+ long long max_lat = std::numeric_limits<long long>::lowest(), min_lat = std::numeric_limits<long long>::max(), avg_lat = 0, sum = 0;
+ size_t tid = hasher(std::this_thread::get_id());
+ g_log.set_thread_queue_size(65534);
+ g_log.set_thread_ringbuf_size(655340);
+ auto start = std::chrono::high_resolution_clock::now();
+ for (int i = 1; i <= ITERATIONS; ++i) {
+ auto start = std::chrono::high_resolution_clock::now();
+ g_log.info("hello %s, world %d, there %d, here %d, idx: %d", "dsads", 231, 0, 0, i);
+ auto finish = std::chrono::high_resolution_clock::now();
+ long long lat = std::chrono::duration_cast<std::chrono::nanoseconds>(finish-start).count();
+ //sum += lat;
+ max_lat = std::max(max_lat, lat);
+ min_lat = std::min(min_lat, lat);
+ g_max_lat = std::max(max_lat, g_max_lat);
+ g_min_lat = std::min(min_lat, g_min_lat);
+ //avg_lat = sum / i;
+ if (lat < 50000000)
+ freq_map[lat]++;
+ //if (i % 10000 == 0)
+ // usleep(1000);
+ }
+ auto finish = std::chrono::high_resolution_clock::now();
+ long long lat = std::chrono::duration_cast<std::chrono::nanoseconds>(finish-start).count();
+ printf("thread_id: %lu max_lat: %lldns, min_lat: %lldns, avg_lat: %lldns, latency sum %lldns\n", tid, 0ll, min_lat, (long long)lat / (long long)ITERATIONS, lat);
+}
+int main(int argc, char const *argv[]) {
+ g_max_lat = 0;
+ g_min_lat = std::numeric_limits<int>::max();
+ int n = 10;
+ if (argc > 1) {
+ n = std::stoi(argv[1]);
+ }
+ std::vector<std::thread> workers;
+
+ for (int i = 0; i < n; ++i) {
+ workers.push_back(std::thread(worker_thread));
+ }
+ for (int i = 0; i < n; ++i) {
+ workers[i].join();
+ }
+ printf("\nlatency,count\n");
+ g_max_lat = std::min(50000000 - 1, (int)g_max_lat);
+ for (int i = g_min_lat; i <= 400; ++i) {
+ if (freq_map[i])
+ printf("%d,%d\n", i, freq_map[i]);
+ }
+ //printf("\nno,latency\n");
+ //for (size_t i = 0; i < lats.size(); ++i) {
+ // printf("%zu,%d\n", i + 1, lats[i]);
+ //}
+ //g_log.sync_and_close();
+ return 0;
+}
--- /dev/null
+#include <unistd.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <string.h>
+#include <poll.h>
+
+#include <string>
+#include <system_error>
+
+#include <notifier.h>
+
+using namespace iris;
+
+notifier::notifier() {
+ if (::pipe(m_pipe)) {
+ std::string error_str = "failed to call pipe(), reason:";
+ error_str += strerror(errno);
+ throw std::system_error(std::error_code(errno, std::system_category()), error_str);
+ }
+ if (fcntl(m_pipe[1], F_SETFL, O_NONBLOCK)) {
+ std::string error_str = "failed to set piep's write side to nonblock mode, reason:";
+ error_str += strerror(errno);
+ throw std::system_error(std::error_code(errno, std::system_category()), error_str);
+ }
+}
+
+notifier::~notifier() {
+ close(m_pipe[0]);
+ close(m_pipe[1]);
+}
+
+void notifier::notify(const ntf_t & ntf) {
+ size_t written = 0;
+ while (written < sizeof(ntf_t)) {
+ int n = write(m_pipe[1], (char *)&ntf, sizeof(ntf_t) - written);
+ if (n <= 0) {
+ if (errno == EINTR)
+ continue;
+ break;
+ }
+ written += n;
+ }
+}
+
+bool notifier::wait(long timeout, std::vector<ntf_t> & ntfs) {
+ struct pollfd pfd;
+ pfd.fd = m_pipe[0];
+ pfd.events = POLLIN;
+
+ if(poll(&pfd, 1, timeout) == 1) {
+ int flags = fcntl(m_pipe[0], F_GETFL, 0);
+ flags |= O_NONBLOCK;
+ fcntl(m_pipe[0], F_SETFL, flags);
+ while (true) {
+ ntf_t ntf;
+ size_t readn = 0;
+ while (readn < sizeof(ntf_t)) {
+ int n = read(m_pipe[0], &ntf, sizeof(ntf_t) - readn);
+ if (n <= 0) {
+ if (errno == EINTR)
+ continue;
+ break;
+ }
+ readn += n;
+ }
+ if (readn >= sizeof(ntf_t)) {
+ ntfs.push_back(ntf);
+ } else {
+ break;
+ }
+ }
+ flags &= ~O_NONBLOCK;
+ fcntl(m_pipe[0], F_SETFL, flags);
+
+ return true;
+ }
+
+ return false;
+}
\ No newline at end of file
--- /dev/null
+#include <errno.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <string.h>
+
+#include <define.h>
+#include <stream_writer.h>
+using namespace iris;
+
+stream_writer::stream_writer(FILE * stream): m_stream(stream), m_fd(-1) {
+ if (stream)
+ m_fd = fileno(m_stream);
+}
+
+void stream_writer::write(const char * msg) {
+ size_t len = strlen(msg);
+ this->write(msg, len);
+}
+
+void stream_writer::write(const char * buffer, size_t len){
+ size_t written = 0;
+ while (written < len) {
+ size_t n = fwrite(buffer + written, 1, len - written, m_stream);
+ written += n;
+ if (n < len) {
+ if (errno == EINTR) {
+ continue;
+ }
+ fprintf(stderr, "[iris] error, should write %lu byts, only %lu bytes written, reason: %s.\n", len, n, strerror(errno));
+ break;
+ }
+ }
+ fflush(m_stream);
+}
--- /dev/null
+#include <utils.h>
+
+long long iris::get_current_time_in_us() {
+ struct timeval v;
+ gettimeofday(&v, nullptr);
+ return (long long)v.tv_sec * 1000000 + v.tv_usec;
+}
+
+
+int iris::round_up_to_next_multiple_of_2(int n) {
+ int bits = 0, ones = 0, oldn = n;
+ while(n) {
+ if (n & 1)
+ ++ones;
+ ++bits;
+ n >>= 1;
+ }
+
+ if (ones == 1) // already a multiple of 2
+ return oldn;
+ return 1 << bits;
+}
\ No newline at end of file
--- /dev/null
+#include <thread>
+#include <string>
+#include<cstring>
+#include <assert.h>
+
+#include <lfringbuffer.h>
+#include <sslfqueue.h>
+#include <define.h>
+
+#include <iris/level_logger.h>
+#include <iris/file_writer.h>
+
+iris::file_writer writer("./log.txt");
+// this creates a logging thread
+iris::level_logger g_log(&writer, iris::TRACE);
+
+
+using namespace iris;
+#define ITERATIONS (int)1e7
+lfringbuffer rbuf(1024);
+struct buffer_t {
+ char * b;
+ int size;
+ int alloc_size;
+ int data;
+};
+sslfqueue<buffer_t> q;
+
+void recyle() {
+ int i = 1;
+ while (i <= ITERATIONS) {
+ buffer_t b;
+ while (!q.poll(b))
+ std::this_thread::yield();
+
+ assert(std::stoi(std::string(b.b, b.b + b.size)) == b.data);
+
+ rbuf.release(b.alloc_size);
+ ++i;
+ }
+}
+
+int main(int argc, char const *argv[]) {
+
+
+ //configure thread level parameters, these should be done before any logging
+ // queue size
+ g_log.set_thread_queue_size(1024);
+ // ring buffer size
+ g_log.set_thread_ringbuf_size(20480);
+
+ g_log.info("Greetings from %s, bye %d\n", "iris", 0);
+
+ //this tells logging thread to persist the data into file and waits
+ g_log.sync_and_close();
+
+ char *p1;
+ char *p2;
+ char *p3;
+ assert(512 == rbuf.acquire(512, p1));
+ assert(p1);
+
+ assert(256 == rbuf.acquire(256, p2));
+ assert(p2);
+
+ assert(p2 - p1 == 512);
+
+ assert(0 == rbuf.acquire(512, p1));
+
+ assert(rbuf.freespace() == 256);
+
+ assert(0 == rbuf.acquire(512, p3));
+
+ rbuf.release(512);
+
+ assert(768 == rbuf.acquire(512, p3));
+
+ printf("rbuf.freespace(): %lu\n", rbuf.freespace());
+ assert(rbuf.freespace() == 0);
+
+ rbuf.release(256);
+ printf("rbuf.freespace(): %lu\n", rbuf.freespace());
+ assert(256 == rbuf.freespace());
+ rbuf.release(768);
+ printf("rbuf.freespace(): %lu\n", rbuf.freespace());
+ assert(1024 == rbuf.freespace());
+
+ std::thread recyler(recyle);
+
+ int i = 1;
+ while (i <= ITERATIONS) {
+ std::string s(std::to_string(i));
+
+ buffer_t b;
+ char *ptr;
+ int size;
+ while (!(size = rbuf.acquire(s.size(), ptr)))
+ std::this_thread::yield();
+
+ b.b = ptr;
+ memcpy(b.b, s.c_str(), s.size());
+ b.size = s.size();
+ b.alloc_size = size;
+ b.data = i;
+
+ while (!q.offer(b))
+ std::this_thread::yield();
+ ++i;
+ }
+
+
+
+ recyler.join();
+ printf("passed\n");
+ return 0;
+}