From 72045f68e6d1ad46133bdc507b409b676ce64957 Mon Sep 17 00:00:00 2001 From: ahmad Date: Fri, 1 Feb 2019 18:41:31 -0800 Subject: [PATCH] adding iris benchmark --- iris/LICENSE | 22 +++++ iris/Makefile | 78 ++++++++++++++++ iris/README.md | 82 +++++++++++++++++ iris/compile.sh | 5 + iris/include/base_logger.h | 146 ++++++++++++++++++++++++++++++ iris/include/buffered_writer.h | 67 ++++++++++++++ iris/include/define.h | 33 +++++++ iris/include/file_writer.h | 24 +++++ iris/include/formatter.h | 30 ++++++ iris/include/level_logger.h | 83 +++++++++++++++++ iris/include/lfringbuffer.h | 103 +++++++++++++++++++++ iris/include/notifier.h | 41 +++++++++ iris/include/snprintf_formatter.h | 42 +++++++++ iris/include/sslfqueue.h | 103 +++++++++++++++++++++ iris/include/stream_writer.h | 25 +++++ iris/include/utils.h | 26 ++++++ iris/include/writer.h | 18 ++++ iris/run.sh | 6 ++ iris/src/base_logger.cpp | 137 ++++++++++++++++++++++++++++ iris/src/buffered_writer.cpp | 27 ++++++ iris/src/file_writer.cpp | 43 +++++++++ iris/src/level_logger.cpp | 5 + iris/src/main.cpp | 86 ++++++++++++++++++ iris/src/notifier.cpp | 79 ++++++++++++++++ iris/src/stream_writer.cpp | 34 +++++++ iris/src/utils.cpp | 22 +++++ iris/tests/test_lfringbuffer.cpp | 116 ++++++++++++++++++++++++ 27 files changed, 1483 insertions(+) create mode 100644 iris/LICENSE create mode 100644 iris/Makefile create mode 100644 iris/README.md create mode 100755 iris/compile.sh create mode 100644 iris/include/base_logger.h create mode 100644 iris/include/buffered_writer.h create mode 100644 iris/include/define.h create mode 100644 iris/include/file_writer.h create mode 100644 iris/include/formatter.h create mode 100644 iris/include/level_logger.h create mode 100644 iris/include/lfringbuffer.h create mode 100644 iris/include/notifier.h create mode 100644 iris/include/snprintf_formatter.h create mode 100644 iris/include/sslfqueue.h create mode 100644 iris/include/stream_writer.h create mode 100644 iris/include/utils.h create mode 100644 iris/include/writer.h create mode 100755 iris/run.sh create mode 100644 iris/src/base_logger.cpp create mode 100644 iris/src/buffered_writer.cpp create mode 100644 iris/src/file_writer.cpp create mode 100644 iris/src/level_logger.cpp create mode 100644 iris/src/main.cpp create mode 100644 iris/src/notifier.cpp create mode 100644 iris/src/stream_writer.cpp create mode 100644 iris/src/utils.cpp create mode 100644 iris/tests/test_lfringbuffer.cpp diff --git a/iris/LICENSE b/iris/LICENSE new file mode 100644 index 0000000..fc140f3 --- /dev/null +++ b/iris/LICENSE @@ -0,0 +1,22 @@ +The MIT License (MIT) + +Copyright (c) 2015 Xinjing Cho + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + diff --git a/iris/Makefile b/iris/Makefile new file mode 100644 index 0000000..a2ee497 --- /dev/null +++ b/iris/Makefile @@ -0,0 +1,78 @@ +#sources +IRIS_SRC = src/base_logger.cpp src/buffered_writer.cpp src/file_writer.cpp src/level_logger.cpp src/notifier.cpp src/stream_writer.cpp src/utils.cpp +IRIS_TEST_SRC = $(IRIS_SRC) tests/test_lfringbuffer.cpp +IRIS_TEST2_SRC = $(IRIS_SRC) src/main.cpp +#object files +IRIS_OBJS = base_logger.o buffered_writer.o file_writer.o level_logger.o notifier.o stream_writer.o utils.o +IRIS_TEST_OBJS = $(IRIS_OBJS) test_lfringbuffer.o +IRIS_TEST2_OBJS = $(IRIS_OBJS) test2.o +#executable +PROGRAM = libiris.so +#PROGRAM = main +#compiler +CC = g++ + +CACHELINE_SIZE = +OS := $(shell uname) +ifeq ($(OS), Darwin) +CACHELINE_SIZE = $(shell sysctl -n hw.cachelinesize) +else +CACHELINE_SIZE = $(shell getconf LEVEL1_DCACHE_LINESIZE) +endif + +#includes +INCLUDE = -Iinclude +#linker params +LINKPARAMS = -fpic -lpthread -shared +#linker params for tests +LINKPARAMS_TEST = -fpic -lpthread +#options for development +CFLAGS = --std=c++11 -g -O3 -Wall -fpic -DIRIS_CACHELINE_SIZE=$(CACHELINE_SIZE) # removed flags -Werror -Wno-unused-private-field +#options for release +#CFLAGS = --std=c++11 -g -O2 -Wall -Werror -fpic -shared + +$(PROGRAM): $(IRIS_OBJS) + $(CC) -o $(PROGRAM) $(IRIS_OBJS) $(CFLAGS) $(LINKPARAMS) + +base_logger.o: src/base_logger.cpp include/base_logger.h + $(CC) -c src/base_logger.cpp -o base_logger.o $(CFLAGS) $(INCLUDE) + +buffered_writer.o: src/buffered_writer.cpp include/buffered_writer.h + $(CC) -c src/buffered_writer.cpp -o buffered_writer.o $(CFLAGS) $(INCLUDE) + +file_writer.o: src/file_writer.cpp include/file_writer.h + $(CC) -c src/file_writer.cpp -o file_writer.o $(CFLAGS) $(INCLUDE) + +stream_writer.o: src/stream_writer.cpp include/stream_writer.h + $(CC) -c src/stream_writer.cpp -o stream_writer.o $(CFLAGS) $(INCLUDE) + +level_logger.o: src/level_logger.cpp include/level_logger.h + $(CC) -c src/level_logger.cpp -o level_logger.o $(CFLAGS) $(INCLUDE) + +utils.o: src/utils.cpp include/utils.h + $(CC) -c src/utils.cpp -o utils.o $(CFLAGS) $(INCLUDE) + +notifier.o: src/notifier.cpp include/notifier.h + $(CC) -c src/notifier.cpp -o notifier.o $(CFLAGS) $(INCLUDE) + +test: test_lfringbuffer + +test_lfringbuffer: $(IRIS_TEST_OBJS) + $(CC) -o test_lfringbuffer $(IRIS_TEST_OBJS) $(CFLAGS) $(LINKPARAMS_TEST) + $(CC) -o test2 $(IRIS_TEST2_OBJS) $(CFLAGS) $(LINKPARAMS_TEST) + +test_lfringbuffer.o: tests/test_lfringbuffer.cpp src/main.cpp + $(CC) -c tests/test_lfringbuffer.cpp -o test_lfringbuffer.o $(CFLAGS) $(INCLUDE) + $(CC) -c src/main.cpp -o test2.o $(CFLAGS) $(INCLUDE) + + +clean: + rm -rf *.o + rm -rf $(PROGRAM) test_lfringbuffer + rm -rf $(PROGRAM) test2 + +install: + mkdir -p /usr/local/include/iris + cp include/* /usr/local/include/iris + cp $(PROGRAM) /usr/local/lib +.PHONY: clean test diff --git a/iris/README.md b/iris/README.md new file mode 100644 index 0000000..dd1f2b0 --- /dev/null +++ b/iris/README.md @@ -0,0 +1,82 @@ +# Overview +`iris` is asynchronous logging library designed to log messages with minimum overhead to the performance. The cost of logging a message is merely copying the arguments into a lockfree queue. + +# Design +Under the hood, `iris` uses a background thread (logging thread) to offload logging from other threads. +1. For every other threads, they keep a lockfree queue to buffer the messages. +2. The logging thread periodically scans through all these queues to collect messages, formats them and writes them into log file. + +A few highlights of the design of `iris` to achieve low latency: +1. Buffering messages with thread local lockfree queue. +2. Memory is managed by a thread local ringbuffer taking the advantage that logging is FIFO. Because the fact that logging is FIFO. This scales well in multithreaded environment. +3. Minimum context switches. + +# Usage +The supported severity levels are: +```c++ +enum severity_level { + TRACE, + DEBUG, + INFO, + WARN, + ERROR, + FATAL +}; +``` +By default, `iris` logs messages to `stdout` and filters out severity level less than `INFO`. + +```c++ +#include + +// this creates a logging thread, logs messages to stdout +iris::level_logger g_log; + +int main (int argc, char const *argv[]) { + //configure thread level parameters, these should be done before any logging + // queue size + g_log.set_thread_queue_size(1024); + // ring buffer size + g_log.set_thread_ringbuf_size(10240); + + g_log.info("Greetings from %s, bye %d\n", 'iris', 0); + + //this tells logging thread to persist the data into file and waits + g_log.sync_and_close(); + + return 0; +} +``` +Using a `file_writer` to logs all messages into a file: +```c++ +#include +#include + +iris::file_writer writer("./log.txt"); +// this creates a logging thread +iris::level_logger g_log(&writer, iris::TRACE); + +int main (int argc, char const *argv[]) { + //configure thread level parameters, these should be done before any logging + // queue size + g_log.set_thread_queue_size(1024); + // ring buffer size + g_log.set_thread_ringbuf_size(20480); + + g_log.info("Greetings from %s, bye %d\n", 'iris', 0); + + //this tells logging thread to persist the data into file and waits + g_log.sync_and_close(); + + return 0; +} +``` + +# Building & Installation + +To build the library, simply clone the projet, go inside the `iris` direcotry and run following commands. +```shell +make +make test +make install +``` +To integrate `iris` into your program, link with `-liris` options. \ No newline at end of file diff --git a/iris/compile.sh b/iris/compile.sh new file mode 100755 index 0000000..75d763f --- /dev/null +++ b/iris/compile.sh @@ -0,0 +1,5 @@ +#!/bin/bash +make +make test + + diff --git a/iris/include/base_logger.h b/iris/include/base_logger.h new file mode 100644 index 0000000..69cf1ff --- /dev/null +++ b/iris/include/base_logger.h @@ -0,0 +1,146 @@ +#ifndef IRIS_BASE_LOG_H +#define IRIS_BASE_LOG_H + +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "define.h" +#include "writer.h" +#include "lfringbuffer.h" +#include "sslfqueue.h" +#include "notifier.h" +#include "utils.h" +#include "formatter.h" + +namespace iris { + +struct thread_logqueue; +struct thread_logqueue_holder; +extern notifier ntfer; +extern thread_logqueue_holder this_thread_logqueue; +extern size_t thread_queue_size; +extern size_t thread_ringbuf_size; + +struct thread_logqueue_holder { +private: + thread_logqueue * q; +public: + thread_logqueue_holder():q(nullptr){} + inline void assign(thread_logqueue * queue) { + q = queue; + } + inline thread_logqueue *get() { + return q; + } + inline void notify_and_quit() { + if (q) { + ntfer.notify(notifier::to_ntf_t((long)q, ntf_queue_deletion)); + q = nullptr; + } + } + ~thread_logqueue_holder() { + notify_and_quit(); + } +}; + +// Every thread which uses logging primitives owns one of this +// single-wrtiter-single-reader queue for buffering logs. +// These thread_local queues form a singly-linked list which is +// polled by a dedicated output thread to collect logs. +struct thread_logqueue { +public: + thread_logqueue(size_t queue_size = 5000, size_t rbuf_size = 102400); // used only by normal threadss + thread_logqueue(thread_logqueue *head, size_t queue_size = 5000, size_t rbuf_size = 102400); // used only by output thread + ~thread_logqueue(); +public: + sslfqueue q; + lfringbuffer rbuf; + std::atomic next; + thread_logqueue * head; + bool output_thread; // if current thread is the output thread +}; + +class base_logger { +public: + base_logger(writer * pwriter = nullptr, + size_t scan_interval = 100, + size_t output_buffer_size = 102400, + size_t default_thread_ringbuf_size = 102400, + size_t default_thread_queue_size = 5000); + + // set the thread local queue size + void set_thread_queue_size(size_t size) { + thread_queue_size = size; + } + // set the thread local ring buffer size + void set_thread_ringbuf_size(size_t size) { + thread_ringbuf_size = size; + } + + template + void log(Args&&... args) { + thread_logqueue * this_queue = this_thread_logqueue.get(); + + if (iris_unlikely(!this_queue)) { + if (thread_queue_size == 0) + thread_queue_size = m_default_thread_queue_size; + if (thread_ringbuf_size == 0) + thread_ringbuf_size = m_default_thread_ringbuf_size; + this_queue = new thread_logqueue(&m_head, thread_queue_size, thread_ringbuf_size); + this_thread_logqueue.assign(this_queue); + } + + typedef std::tuple::type...> Args_t; + const size_t buffer_size = sizeof(formatter_type) + sizeof(Args_t); + const size_t args_offset = sizeof(formatter_type); + + loglet_t l; + while ((l.rbuf_alloc_size = this_queue->rbuf.acquire(buffer_size, l.rbuf_ptr)) == 0) + std::this_thread::yield(); + *reinterpret_cast(l.rbuf_ptr) = &formatter_caller::type...>; + // inplace construct parameter pack + new (l.rbuf_ptr + args_offset) Args_t(std::forward(args)...); + + // try to publish to output thread + if (iris_likely(this_queue->q.offer(l))) { + return; + } + + // queue is full, notify the output thread + ntfer.notify(notifier::to_ntf_t(reinterpret_cast(this_queue), ntf_msg)); + // busy yielding until the log is buffered + do { + std::this_thread::yield(); + } while (!this_queue->q.offer(l)); + } + + void sync_and_close() { + if (!m_stop.load(std::memory_order_acquire)) { + this_thread_logqueue.notify_and_quit(); + m_stop.store(true, std::memory_order_release); + m_output_thread.join(); + } + } + + ~base_logger() { + sync_and_close(); + } +private: + std::atomic m_stop; + thread_logqueue m_head; + std::thread m_output_thread; + size_t m_default_thread_ringbuf_size; + size_t m_default_thread_queue_size; +}; + +} + +#endif diff --git a/iris/include/buffered_writer.h b/iris/include/buffered_writer.h new file mode 100644 index 0000000..1614c78 --- /dev/null +++ b/iris/include/buffered_writer.h @@ -0,0 +1,67 @@ +#ifndef IRIS_BUFFERED_WRITER_H_ +#define IRIS_BUFFERED_WRITER_H_ +#include +#include + +#include "writer.h" + +namespace iris { + +struct buffered_writer { +private: + buffered_writer(const buffered_writer & rhs)=delete; + struct buffered_writer & operator=(const buffered_writer &rhs)=delete; + writer &m_w; + + char *m_buf; + int m_pos; + int m_capacity; +public: + buffered_writer(writer & w, size_t cap = 0); + + ~buffered_writer(); + + void expand(size_t n); + + inline void reset() { + m_pos = 0; + } + + inline char * reserve(size_t s) { + if (freespace() < s) { + flush(); + if (freespace() < s) + throw std::bad_alloc(); + } + + return write_pointer(); + } + + inline size_t freespace() { + return m_capacity - m_pos; + } + + inline void inc_write_pointer(size_t s) { + m_pos += s; + } + + inline char * write_pointer() { + return m_buf + m_pos; + } + + inline size_t size() { + return m_pos; + } + + inline operator char*() { + return m_buf; + } + + inline void flush() { + m_w.write(m_buf, m_pos); + reset(); + } +}; + +} +#endif diff --git a/iris/include/define.h b/iris/include/define.h new file mode 100644 index 0000000..8ee1cf8 --- /dev/null +++ b/iris/include/define.h @@ -0,0 +1,33 @@ +#ifndef IRIS_DEFINE_H_ +#define IRIS_DEFINE_H_ +#include + +namespace iris { + +#define iris_likely(x) __builtin_expect(!!(long)(x), 1) +#define iris_unlikely(x) __builtin_expect(!!(long)(x), 0) + +struct loglet_t { + // @pbuffer holds formatter function address and arguments + char * rbuf_ptr; + size_t rbuf_alloc_size; + loglet_t(char * ptr = nullptr, size_t alloc_size = 0):rbuf_ptr(ptr), rbuf_alloc_size(alloc_size) {} +}; + +#if defined(IOV_MAX) /* Linux x86 (glibc-2.3.6-3) */ + #define MAX_IOVECS IOV_MAX +#elif defined(MAX_IOVEC) /* Linux ia64 (glibc-2.3.3-98.28) */ + #define MAX_IOVECS MAX_IOVEC +#elif defined(UIO_MAXIOV) /* Linux x86 (glibc-2.2.5-233) */ + #define MAX_IOVECS UIO_MAXIOV +#elif (defined(__FreeBSD__) && __FreeBSD_version < 500000) || defined(__DragonFly__) || defined(__APPLE__) + /* - FreeBSD 4.x + * - MacOS X 10.3.x + * (covered in -DKERNEL) + * */ + #define MAX_IOVECS 1024 +#else + #error "can't deduce the maximum number of iovec in a readv/writev syscall" +#endif +} +#endif diff --git a/iris/include/file_writer.h b/iris/include/file_writer.h new file mode 100644 index 0000000..d59fe8e --- /dev/null +++ b/iris/include/file_writer.h @@ -0,0 +1,24 @@ +#ifndef IRIS_FILE_WRITER_H_ +#define IRIS_FILE_WRITER_H_ + + +#include +#include + +#include "stream_writer.h" + +namespace iris { + +// A log writer that appends logs to a file. +class file_writer: public stream_writer { +public: + file_writer(const char * filename); + ~file_writer(); + virtual void write(const char * buffer, size_t len); +private: + std::string m_filename; +}; + +} + +#endif \ No newline at end of file diff --git a/iris/include/formatter.h b/iris/include/formatter.h new file mode 100644 index 0000000..198a274 --- /dev/null +++ b/iris/include/formatter.h @@ -0,0 +1,30 @@ +#ifndef IRIS_FORMATTER_H_ +#define IRIS_FORMATTER_H_ +#include "define.h" +#include "buffered_writer.h" +#include "utils.h" + +namespace iris { +// fomatter do the real formatting logic, output into @obuf +// return false if there is not enough space in @obuf. +typedef void (*formatter_type)(const loglet_t & l, buffered_writer & w); + +template +static void call_format(buffered_writer & w, const std::tuple & args, seq) { + return Formatter::format(&w, std::move(std::get(args))...); +} + +template +static void formatter_caller(const loglet_t & l, buffered_writer & w) { + const size_t args_offset = sizeof(formatter_type); + + typedef std::tuple Args_t; + Args_t & args = *reinterpret_cast(l.rbuf_ptr + args_offset); + typename make_sequence::type indexes; + call_format(w, args, indexes); + //deconstruct parameter pack + args.~Args_t(); +} + +} +#endif \ No newline at end of file diff --git a/iris/include/level_logger.h b/iris/include/level_logger.h new file mode 100644 index 0000000..59dff46 --- /dev/null +++ b/iris/include/level_logger.h @@ -0,0 +1,83 @@ +#ifndef IRIS_SEVERITY_LOGGER_H_ +#define IRIS_SEVERITY_LOGGER_H_ + +#include "snprintf_formatter.h" +#include "base_logger.h" + +namespace iris { + +enum severity_level { + UNSET, + TRACE, + DEBUG, + INFO, + WARN, + ERROR, + FATAL +}; +extern severity_level thread_severity_level; + +// a level logger provides per-thread severity level filtering . +class level_logger: public base_logger { +public: + level_logger(writer * pwriter = nullptr, + severity_level default_level = INFO, + size_t scan_interval = 100, + size_t output_buffer_size = 102400, + size_t default_thread_ringbuf_size = 102400, + size_t default_thread_queue_size = 5000): + base_logger(pwriter, scan_interval, + output_buffer_size, + default_thread_ringbuf_size, + default_thread_queue_size), + m_default_level(default_level) {} + + // set the severity level for the current thread + void set_severity_level(severity_level level) { + thread_severity_level = level; + } + + template + void log(severity_level level, const char * fmt, Args&&... args) { + if (iris_unlikely(thread_severity_level == UNSET)) + thread_severity_level = m_default_level; + if (thread_severity_level <= level) { + base_logger::log(fmt, std::forward(args)...); + } + } + + template + void trace(const char * fmt, Args&&... args) { + log(TRACE, fmt, std::forward(args)...); + } + + template + void debug(const char * fmt, Args&&... args) { + log(DEBUG, fmt, std::forward(args)...); + } + + template + void info(const char * fmt, Args&&... args) { + log(INFO, fmt, std::forward(args)...); + } + + template + void warn(const char * fmt, Args&&... args) { + log(WARN, fmt, std::forward(args)...); + } + + template + void error(const char * fmt, Args&&... args) { + log(ERROR, fmt, std::forward(args)...); + } + + template + void fatal(const char * fmt, Args&&... args) { + log(FATAL, fmt, std::forward(args)...); + } +private: + severity_level m_default_level; +}; + +} +#endif diff --git a/iris/include/lfringbuffer.h b/iris/include/lfringbuffer.h new file mode 100644 index 0000000..fb6d922 --- /dev/null +++ b/iris/include/lfringbuffer.h @@ -0,0 +1,103 @@ +#ifndef IRIS_LF_RINGBUFFER_H_ +#define IRIS_LF_RINGBUFFER_H_ +#include + +#include "define.h" +#include "utils.h" +#include +#include +namespace iris { + +// A single reader single writer lockfree ring buffer +// this should be used as following: +// A thread calls acquire() method repeatedly +// until the return value is nonzero to allocate @size bytes of memory. +// And release() method should be called in the order of allocation +// with the same @size parameter to declare the release. +class lfringbuffer { +private: + size_t cap; + size_t mask; + char * buffer; + char __pad0__[IRIS_CACHELINE_SIZE - sizeof(size_t) * 2 - sizeof(char*)]; + unsigned long head; + unsigned long tail_cache; + char __pad1__[IRIS_CACHELINE_SIZE - sizeof(unsigned long) * 2]; + std::atomic tail; + char __pad2__[IRIS_CACHELINE_SIZE - sizeof(std::atomic)]; +public: + lfringbuffer(size_t size) { + cap = round_up_to_next_multiple_of_2(size); + mask = cap - 1; + buffer = new char[cap]; + head = 0; + tail = cap; + tail_cache = cap; + } + + ~lfringbuffer() { + delete[] buffer; + buffer = nullptr; + } + + // acquire() tries to allocate @size bytes of memory + // from the buffer, the address of the memory acquired + // is stored in @ptr. + // returns the actual bytes allocated insided the buffer + // which might be larger than the requested size, since + // internally the buffer is implemented by a array and + // there is unusable memory need to be taken into account + // when the request can not be fulfilled using only the memory + // left at the end of the buffer. + // 0 will be returned if there is not enough free space. + size_t acquire(size_t size, char *& ptr) { + assert(size); + size_t total_free = tail_cache - head; + if (iris_unlikely(total_free < size)) { + // load the latest tail + tail_cache = tail.load(std::memory_order_acquire); + total_free = tail_cache - head; + if (total_free < size) + return 0; // no enough freespace + } + + // now check if there is enough memory at the rear + size_t rear_free = cap - (head & mask); + if (iris_likely(rear_free >= size)) { + // ok, take the rear chunk + ptr = buffer + (head & mask); + head += size; + return size; + } + + // now check if there is enough memory at the front + size_t front_free = tail_cache & mask; + if (front_free < size) { + tail_cache = tail.load(std::memory_order_acquire); + front_free = tail_cache & mask; + if (front_free < size) + return 0;// no enough space + } + + ptr = buffer; + + // throw away the rear fragmentation + head += rear_free + size; + + return rear_free + size; + } + + // release() method releases @size bytes of memory allocated from + // acquire() method. @size should be the same as the one retuned from + // acquire(). + void release(size_t size) { + tail.fetch_add(size, std::memory_order_release); + } + + size_t freespace() { + return tail.load(std::memory_order_acquire) - head; + } +}; + +} +#endif \ No newline at end of file diff --git a/iris/include/notifier.h b/iris/include/notifier.h new file mode 100644 index 0000000..89ce3fc --- /dev/null +++ b/iris/include/notifier.h @@ -0,0 +1,41 @@ +#ifndef IRIS_NOTIFIER_H_ +#define IRIS_NOTIFIER_H_ +#include + +namespace iris { + +// Bitmask at the lowest bits of a long type +// representing types of notification. +enum ntf_type{ + ntf_msg = 1, // first bit for message + ntf_queue_deletion = 2, // second bit for queue deletion + ntf_type_mask = 3 +}; + +typedef long ntf_t; + +class notifier { +public: + notifier(); + ~notifier(); + // send a notification to the receiver + void notify(const ntf_t & ntf); + // wait for notifications + // return false if timed out or on error + bool wait(long timeout, std::vector & ntfs); + static inline ntf_t to_ntf_t(long data, ntf_type type) { + return data | (int)type; + } + static inline long to_data_t(ntf_t ntf) { + return ntf & ~ntf_type_mask; + } + static inline ntf_type to_ntf_type(ntf_t ntf) { + return static_cast(ntf & ntf_type_mask); + } +private: + int m_pipe[2]; + long m_poll_time; +}; + +} +#endif \ No newline at end of file diff --git a/iris/include/snprintf_formatter.h b/iris/include/snprintf_formatter.h new file mode 100644 index 0000000..5dfdcbc --- /dev/null +++ b/iris/include/snprintf_formatter.h @@ -0,0 +1,42 @@ +#ifndef IRIS_SNPRINTF_FORMATTER_H_ +#define IRIS_SNPRINTF_FORMATTER_H_ + +#include +#include + +#include + +#include "formatter.h" +#include "utils.h" +#include "define.h" +#include "buffered_writer.h" + +namespace iris { +class snprintf_formatter { +public: + +template +static void format(buffered_writer * bw, const char * fmt, Args&&... args) { + size_t n; + while (true) { + n = std::snprintf(bw->write_pointer(), bw->freespace(), fmt, args...); + if (iris_unlikely(n < 0)) { + return;//skip over this formatting if error occurred + } + if (iris_unlikely(n >= bw->freespace())) { + bw->flush(); // flush buffer and retried + continue; + } + bw->inc_write_pointer(n); + assert(*bw->write_pointer() == 0); + *bw->write_pointer() = '\n';// replace '\0' with '\n' + bw->inc_write_pointer(1); + break; // TODO what to do with failed formatting? + } + +} + + +}; +} +#endif \ No newline at end of file diff --git a/iris/include/sslfqueue.h b/iris/include/sslfqueue.h new file mode 100644 index 0000000..d27fb35 --- /dev/null +++ b/iris/include/sslfqueue.h @@ -0,0 +1,103 @@ +#ifndef IRIS_SSLFQUEUE_H_ +#define IRIS_SSLFQUEUE_H_ +#include +#include +#include +#include "utils.h" + +namespace iris { + +//a single producer single consumer lockfree ring queue. +template +class sslfqueue { +private: + size_t cap; + size_t mask; + T * buffer; + char _pad0_[IRIS_CACHELINE_SIZE - sizeof(T*) - sizeof(int) * 2]; + std::atomic head; + char _pad1_[IRIS_CACHELINE_SIZE - sizeof(std::atomic)]; + long tail_cache; + char _pad2_[IRIS_CACHELINE_SIZE - sizeof(long)]; + std::atomic tail; + char _pad3_[IRIS_CACHELINE_SIZE - sizeof(std::atomic)]; + long head_cache; + char _pad4_[IRIS_CACHELINE_SIZE - sizeof(long)]; + +public: + + sslfqueue(size_t capacity = 5000):head(0), tail_cache(0), tail(0), head_cache(0) + { + cap = round_up_to_next_multiple_of_2(capacity); + mask = cap - 1; + buffer = new T[cap]; + } + + ~sslfqueue(){ + delete []buffer; + } + + bool offer(const T & e) { + const long cur_tail = tail.load(std::memory_order_acquire); + const long len = cur_tail - cap; + if (head_cache <= len) { + head_cache = head.load(std::memory_order_acquire); + if (head_cache <= len) + return false; // queue is full + } + + buffer[cur_tail & mask] = e; + tail.store(cur_tail + 1, std::memory_order_release); + + return true; + } + + bool poll(T & e) { + const long cur_head = head.load(std::memory_order_acquire); + if (cur_head >= tail_cache) { + tail_cache = tail.load(std::memory_order_acquire); + if (cur_head >= tail_cache) + return false; // queue is empty + } + + e = buffer[cur_head & mask]; + head.store(cur_head + 1, std::memory_order_release); + + return true; + } + + bool batch_poll(std::vector & vec) { + long cur_head = head.load(std::memory_order_acquire); + if (cur_head >= tail_cache) { + tail_cache = tail.load(std::memory_order_acquire); + if (cur_head >= tail_cache) + return false; // queue is empty + } + + while (cur_head < tail_cache) { + vec.push_back(buffer[cur_head & mask]); + ++cur_head; + } + + head.store(cur_head, std::memory_order_release); + + return true; + } + + bool empty() { + return tail.load(std::memory_order_acquire) - head.load(std::memory_order_acquire) == 0; + } + + bool full() { + return tail.load(std::memory_order_acquire) - head.load(std::memory_order_acquire) == cap; + } + + size_t size() { + return tail.load(std::memory_order_acquire) - head.load(std::memory_order_acquire); + } +}; + +} +#endif + + diff --git a/iris/include/stream_writer.h b/iris/include/stream_writer.h new file mode 100644 index 0000000..df1888b --- /dev/null +++ b/iris/include/stream_writer.h @@ -0,0 +1,25 @@ +#ifndef IRIS_STREAM_WRITER_H_ +#define IRIS_STREAM_WRITER_H_ + + +#include +#include + +#include "writer.h" + +namespace iris { + +// A log writer that appends logs to a standard stream. +class stream_writer: public writer { +public: + stream_writer(FILE * stream); + virtual void write(const char * msg); + virtual void write(const char * buffer, size_t len); +protected: + FILE * m_stream; + int m_fd; +}; + +} + +#endif \ No newline at end of file diff --git a/iris/include/utils.h b/iris/include/utils.h new file mode 100644 index 0000000..e998c8a --- /dev/null +++ b/iris/include/utils.h @@ -0,0 +1,26 @@ +#ifndef IRIS_UTILS_H_ +#define IRIS_UTILS_H_ +#include + +#include +namespace iris{ + +long long get_current_time_in_us(); + + +int round_up_to_next_multiple_of_2(int n); + +/* utilitis for making sequence for tuple element retrieval */ +template struct seq {}; +template struct seq_helper: seq_helper {}; +template struct seq_helper { + typedef seq type; +}; +template +struct make_sequence { + typedef typename seq_helper<0, N>::type type; +}; + + +} +#endif \ No newline at end of file diff --git a/iris/include/writer.h b/iris/include/writer.h new file mode 100644 index 0000000..d32f20b --- /dev/null +++ b/iris/include/writer.h @@ -0,0 +1,18 @@ +#ifndef IRIS_WRITER_H_ +#define IRIS_WRITER_H_ + +#include + +#include +namespace iris { + +// interface for log writting policy +class writer { +public: + virtual void write(const char * msg) = 0; + virtual void write(const char * buffer, size_t len) = 0; +}; + +} + +#endif \ No newline at end of file diff --git a/iris/run.sh b/iris/run.sh new file mode 100755 index 0000000..ff6f270 --- /dev/null +++ b/iris/run.sh @@ -0,0 +1,6 @@ +#!/bin/bash +./test_lfringbuffer +./test2 +rm log.txt + + diff --git a/iris/src/base_logger.cpp b/iris/src/base_logger.cpp new file mode 100644 index 0000000..a1a43ca --- /dev/null +++ b/iris/src/base_logger.cpp @@ -0,0 +1,137 @@ +#include +#include +#include + +#include +#include +#include + +using namespace iris; + +namespace iris{ + +static stream_writer stdout_writer(stdout); + +thread_logqueue_holder this_thread_logqueue; +notifier ntfer; +size_t thread_queue_size; +size_t thread_ringbuf_size; + +} + +thread_logqueue::thread_logqueue(size_t queue_size, size_t rbuf_size): q(queue_size), rbuf(rbuf_size), next(nullptr), output_thread(true) + {} + +thread_logqueue::thread_logqueue(thread_logqueue *head, size_t queue_size, size_t rbuf_size): q(queue_size), rbuf(rbuf_size), next(nullptr), head(head), output_thread(false){ + thread_logqueue * p = nullptr; + + do { + // insert self into the head of the global linked list lockfreely + p = head->next.load(std::memory_order_acquire); + this->next.store(p, std::memory_order_release); + // head might have been modified or deleted cas until this is inserted + if (head->next.compare_exchange_weak(p, this, std::memory_order_release)) { + return; + } + } while (true); +} + +thread_logqueue::~thread_logqueue() { + if (output_thread) + return; + thread_logqueue * p = nullptr, *pnext = nullptr, *q = this; + + // remove self from the global linked list lockfreely + p = head; + while (p->next.load(std::memory_order_acquire) != q) + p = p->next.load(std::memory_order_acquire); + + pnext = this->next.load(std::memory_order_acquire); + // mark this as deleted(by setting this->next to nullptr) + while (!this->next.compare_exchange_weak(pnext, nullptr, std::memory_order_release)) { + next = this->next.load(std::memory_order_acquire); + } + + do { + if (p->next.compare_exchange_weak(q, pnext, std::memory_order_release)) { + return; + } + // some other nodes have been inserted after p, restart. + p = head; + while (p->next.load(std::memory_order_acquire) != q) + p = p->next.load(std::memory_order_acquire); + } while(true); +} + +static void do_format_and_flush(thread_logqueue *p, std::vector & logs, buffered_writer & w) { + for (size_t i = 0; i < logs.size(); ++i) { + auto f = *reinterpret_cast(logs[i].rbuf_ptr); + (*f)(logs[i], w); + p->rbuf.release(logs[i].rbuf_alloc_size); + } + logs.clear(); +} + +static void iris_thread(writer * pwriter, std::atomic * stop, size_t scan_interval,size_t output_buffer_size, thread_logqueue * head) { + std::vector logs; + std::vector ntfs; + buffered_writer bw(*pwriter, output_buffer_size); + while(!*stop) { + // iterate through the linked list of input queues, collect log entries + thread_logqueue * p = head; + bool empty = true; + while (p) { + if (p->q.batch_poll(logs)) { + do_format_and_flush(p, logs, bw); + empty = false; + } + p = p->next; + } + + if (empty) { + ntfs.clear(); + // wait for notification or 100ms + ntfer.wait(scan_interval, ntfs); + for (size_t i = 0; i < ntfs.size(); ++i) { + ntf_t ntf = ntfs[i]; + p = reinterpret_cast(notifier::to_data_t(ntf)); + switch(notifier::to_ntf_type(ntf)) { + case ntf_msg: + if (p->q.batch_poll(logs)) + do_format_and_flush(p, logs, bw); + break; + case ntf_queue_deletion: + if (p->q.batch_poll(logs)) + do_format_and_flush(p, logs, bw); + delete p; + break; + default: + break; + } + } + } + } + + //collect one more time + thread_logqueue * p = head; + while (p) { + if (p->q.batch_poll(logs)) + do_format_and_flush(p, logs, bw); + p = p->next; + } + bw.flush(); +} + +base_logger::base_logger(writer * pwriter, + size_t scan_interval, + size_t output_buffer_size, + size_t default_thread_ringbuf_size, + size_t default_thread_queue_size): + m_stop(0), + m_default_thread_ringbuf_size(default_thread_ringbuf_size), + m_default_thread_queue_size(default_thread_queue_size) +{ + if (pwriter == nullptr) + pwriter = &stdout_writer; + m_output_thread = std::thread(iris_thread, pwriter, &m_stop, scan_interval, output_buffer_size, &m_head); +} diff --git a/iris/src/buffered_writer.cpp b/iris/src/buffered_writer.cpp new file mode 100644 index 0000000..6713a7e --- /dev/null +++ b/iris/src/buffered_writer.cpp @@ -0,0 +1,27 @@ +#include + +using namespace iris; + +buffered_writer::buffered_writer(writer & wter, size_t cap): m_w(wter), m_buf(nullptr), m_pos(0), m_capacity(cap) { + if (m_capacity) { + m_buf = new char[m_capacity]; + } +} + +buffered_writer::~buffered_writer() { + if (m_buf) { + delete[] m_buf; + m_buf = nullptr; + m_pos = m_capacity = 0; + } +} + +void buffered_writer::expand(size_t n) { + if (n <= m_capacity) + return; + char *tmp_buf = new char[n]; + memcpy(tmp_buf, this->m_buf, this->size()); + delete[] this->m_buf; + this->m_buf = tmp_buf; + this->m_capacity = n; +} \ No newline at end of file diff --git a/iris/src/file_writer.cpp b/iris/src/file_writer.cpp new file mode 100644 index 0000000..b322d5c --- /dev/null +++ b/iris/src/file_writer.cpp @@ -0,0 +1,43 @@ +#include +#include +#include +#include + + +using namespace iris; + +file_writer::file_writer(const char * filename): stream_writer(nullptr) { + FILE *fp = fopen(filename, "a"); + if (fp == nullptr) { + std::string error_string = "[iris] failed to open log file <"; + error_string += filename; + error_string += ">, reason: "; + error_string += strerror(errno); + throw error_string; + } + m_stream = fp; + m_fd = fileno(m_stream); +} + +file_writer::~file_writer() { + if (m_stream == nullptr) + return; + fclose(m_stream); + m_stream = nullptr; + m_fd = -1; +} + +void file_writer::write(const char * buffer, size_t len) { + size_t offset = 0; + while (len) { + size_t written = ::write(m_fd, buffer + offset, len); + if (written <= 0) { + if (errno == EINTR) + continue; + fprintf(stderr, "[iris] error, should write %lu byts, only %lu bytes written, reason: %s.\n", len, written, strerror(errno)); + break; + } + len -= written; + offset += written; + } +} diff --git a/iris/src/level_logger.cpp b/iris/src/level_logger.cpp new file mode 100644 index 0000000..102bfed --- /dev/null +++ b/iris/src/level_logger.cpp @@ -0,0 +1,5 @@ +#include + +namespace iris { +severity_level thread_severity_level; +} \ No newline at end of file diff --git a/iris/src/main.cpp b/iris/src/main.cpp new file mode 100644 index 0000000..3771525 --- /dev/null +++ b/iris/src/main.cpp @@ -0,0 +1,86 @@ +#include + + +#include +#include +#include +#include + +#include +#include + +#include +#include +#include +/* +using log_t = reckless::severity_log< + reckless::indent<4>, // 4 spaces of indent + ' ' // Field separator + >; +reckless::file_writer rwriter("log_reckless.txt"); +log_t r_log(&rwriter, 102400, 65534, 102400); +*/ + +iris::file_writer writer("./log.txt"); +iris::level_logger g_log(&writer, iris::INFO); +int freq_map[50000000]; + +long long g_max_lat; +long long g_min_lat; +#define ITERATIONS 1e3 + +void worker_thread() { + std::hash hasher; + long long max_lat = std::numeric_limits::lowest(), min_lat = std::numeric_limits::max(), avg_lat = 0, sum = 0; + size_t tid = hasher(std::this_thread::get_id()); + g_log.set_thread_queue_size(65534); + g_log.set_thread_ringbuf_size(655340); + auto start = std::chrono::high_resolution_clock::now(); + for (int i = 1; i <= ITERATIONS; ++i) { + auto start = std::chrono::high_resolution_clock::now(); + g_log.info("hello %s, world %d, there %d, here %d, idx: %d", "dsads", 231, 0, 0, i); + auto finish = std::chrono::high_resolution_clock::now(); + long long lat = std::chrono::duration_cast(finish-start).count(); + //sum += lat; + max_lat = std::max(max_lat, lat); + min_lat = std::min(min_lat, lat); + g_max_lat = std::max(max_lat, g_max_lat); + g_min_lat = std::min(min_lat, g_min_lat); + //avg_lat = sum / i; + if (lat < 50000000) + freq_map[lat]++; + //if (i % 10000 == 0) + // usleep(1000); + } + auto finish = std::chrono::high_resolution_clock::now(); + long long lat = std::chrono::duration_cast(finish-start).count(); + printf("thread_id: %lu max_lat: %lldns, min_lat: %lldns, avg_lat: %lldns, latency sum %lldns\n", tid, 0ll, min_lat, (long long)lat / (long long)ITERATIONS, lat); +} +int main(int argc, char const *argv[]) { + g_max_lat = 0; + g_min_lat = std::numeric_limits::max(); + int n = 10; + if (argc > 1) { + n = std::stoi(argv[1]); + } + std::vector workers; + + for (int i = 0; i < n; ++i) { + workers.push_back(std::thread(worker_thread)); + } + for (int i = 0; i < n; ++i) { + workers[i].join(); + } + printf("\nlatency,count\n"); + g_max_lat = std::min(50000000 - 1, (int)g_max_lat); + for (int i = g_min_lat; i <= 400; ++i) { + if (freq_map[i]) + printf("%d,%d\n", i, freq_map[i]); + } + //printf("\nno,latency\n"); + //for (size_t i = 0; i < lats.size(); ++i) { + // printf("%zu,%d\n", i + 1, lats[i]); + //} + //g_log.sync_and_close(); + return 0; +} diff --git a/iris/src/notifier.cpp b/iris/src/notifier.cpp new file mode 100644 index 0000000..5d93185 --- /dev/null +++ b/iris/src/notifier.cpp @@ -0,0 +1,79 @@ +#include +#include +#include +#include +#include + +#include +#include + +#include + +using namespace iris; + +notifier::notifier() { + if (::pipe(m_pipe)) { + std::string error_str = "failed to call pipe(), reason:"; + error_str += strerror(errno); + throw std::system_error(std::error_code(errno, std::system_category()), error_str); + } + if (fcntl(m_pipe[1], F_SETFL, O_NONBLOCK)) { + std::string error_str = "failed to set piep's write side to nonblock mode, reason:"; + error_str += strerror(errno); + throw std::system_error(std::error_code(errno, std::system_category()), error_str); + } +} + +notifier::~notifier() { + close(m_pipe[0]); + close(m_pipe[1]); +} + +void notifier::notify(const ntf_t & ntf) { + size_t written = 0; + while (written < sizeof(ntf_t)) { + int n = write(m_pipe[1], (char *)&ntf, sizeof(ntf_t) - written); + if (n <= 0) { + if (errno == EINTR) + continue; + break; + } + written += n; + } +} + +bool notifier::wait(long timeout, std::vector & ntfs) { + struct pollfd pfd; + pfd.fd = m_pipe[0]; + pfd.events = POLLIN; + + if(poll(&pfd, 1, timeout) == 1) { + int flags = fcntl(m_pipe[0], F_GETFL, 0); + flags |= O_NONBLOCK; + fcntl(m_pipe[0], F_SETFL, flags); + while (true) { + ntf_t ntf; + size_t readn = 0; + while (readn < sizeof(ntf_t)) { + int n = read(m_pipe[0], &ntf, sizeof(ntf_t) - readn); + if (n <= 0) { + if (errno == EINTR) + continue; + break; + } + readn += n; + } + if (readn >= sizeof(ntf_t)) { + ntfs.push_back(ntf); + } else { + break; + } + } + flags &= ~O_NONBLOCK; + fcntl(m_pipe[0], F_SETFL, flags); + + return true; + } + + return false; +} \ No newline at end of file diff --git a/iris/src/stream_writer.cpp b/iris/src/stream_writer.cpp new file mode 100644 index 0000000..1be37d2 --- /dev/null +++ b/iris/src/stream_writer.cpp @@ -0,0 +1,34 @@ +#include +#include +#include +#include + +#include +#include +using namespace iris; + +stream_writer::stream_writer(FILE * stream): m_stream(stream), m_fd(-1) { + if (stream) + m_fd = fileno(m_stream); +} + +void stream_writer::write(const char * msg) { + size_t len = strlen(msg); + this->write(msg, len); +} + +void stream_writer::write(const char * buffer, size_t len){ + size_t written = 0; + while (written < len) { + size_t n = fwrite(buffer + written, 1, len - written, m_stream); + written += n; + if (n < len) { + if (errno == EINTR) { + continue; + } + fprintf(stderr, "[iris] error, should write %lu byts, only %lu bytes written, reason: %s.\n", len, n, strerror(errno)); + break; + } + } + fflush(m_stream); +} diff --git a/iris/src/utils.cpp b/iris/src/utils.cpp new file mode 100644 index 0000000..b152761 --- /dev/null +++ b/iris/src/utils.cpp @@ -0,0 +1,22 @@ +#include + +long long iris::get_current_time_in_us() { + struct timeval v; + gettimeofday(&v, nullptr); + return (long long)v.tv_sec * 1000000 + v.tv_usec; +} + + +int iris::round_up_to_next_multiple_of_2(int n) { + int bits = 0, ones = 0, oldn = n; + while(n) { + if (n & 1) + ++ones; + ++bits; + n >>= 1; + } + + if (ones == 1) // already a multiple of 2 + return oldn; + return 1 << bits; +} \ No newline at end of file diff --git a/iris/tests/test_lfringbuffer.cpp b/iris/tests/test_lfringbuffer.cpp new file mode 100644 index 0000000..16e7f37 --- /dev/null +++ b/iris/tests/test_lfringbuffer.cpp @@ -0,0 +1,116 @@ +#include +#include +#include +#include + +#include +#include +#include + +#include +#include + +iris::file_writer writer("./log.txt"); +// this creates a logging thread +iris::level_logger g_log(&writer, iris::TRACE); + + +using namespace iris; +#define ITERATIONS (int)1e7 +lfringbuffer rbuf(1024); +struct buffer_t { + char * b; + int size; + int alloc_size; + int data; +}; +sslfqueue q; + +void recyle() { + int i = 1; + while (i <= ITERATIONS) { + buffer_t b; + while (!q.poll(b)) + std::this_thread::yield(); + + assert(std::stoi(std::string(b.b, b.b + b.size)) == b.data); + + rbuf.release(b.alloc_size); + ++i; + } +} + +int main(int argc, char const *argv[]) { + + + //configure thread level parameters, these should be done before any logging + // queue size + g_log.set_thread_queue_size(1024); + // ring buffer size + g_log.set_thread_ringbuf_size(20480); + + g_log.info("Greetings from %s, bye %d\n", "iris", 0); + + //this tells logging thread to persist the data into file and waits + g_log.sync_and_close(); + + char *p1; + char *p2; + char *p3; + assert(512 == rbuf.acquire(512, p1)); + assert(p1); + + assert(256 == rbuf.acquire(256, p2)); + assert(p2); + + assert(p2 - p1 == 512); + + assert(0 == rbuf.acquire(512, p1)); + + assert(rbuf.freespace() == 256); + + assert(0 == rbuf.acquire(512, p3)); + + rbuf.release(512); + + assert(768 == rbuf.acquire(512, p3)); + + printf("rbuf.freespace(): %lu\n", rbuf.freespace()); + assert(rbuf.freespace() == 0); + + rbuf.release(256); + printf("rbuf.freespace(): %lu\n", rbuf.freespace()); + assert(256 == rbuf.freespace()); + rbuf.release(768); + printf("rbuf.freespace(): %lu\n", rbuf.freespace()); + assert(1024 == rbuf.freespace()); + + std::thread recyler(recyle); + + int i = 1; + while (i <= ITERATIONS) { + std::string s(std::to_string(i)); + + buffer_t b; + char *ptr; + int size; + while (!(size = rbuf.acquire(s.size(), ptr))) + std::this_thread::yield(); + + b.b = ptr; + memcpy(b.b, s.c_str(), s.size()); + b.size = s.size(); + b.alloc_size = size; + b.data = i; + + while (!q.offer(b)) + std::this_thread::yield(); + ++i; + } + + + + recyler.join(); + printf("passed\n"); + return 0; +} -- 2.34.1