From: Adam Simpkins Date: Thu, 15 Jun 2017 18:03:57 +0000 (-0700) Subject: logging: add AsyncFileWriter X-Git-Tag: v2017.06.19.00~16 X-Git-Url: http://plrg.eecs.uci.edu/git/?p=folly.git;a=commitdiff_plain;h=82b71ca37d76487b5ee0b0e517576f6111d1d3ea logging: add AsyncFileWriter Summary: Add an AsyncFileWriter class that implements the LogWriter interface using a separate I/O thread to write the log messages to a file descriptor. This LogWriter implementation ensures that normal process threads will never block due to logging I/O. By default it will buffer up to 1MB of data. If log messages are generated faster than they can be written to the file, log messages will be discarded once the buffer limit is exceeded. The LogWriter will emit a message into the file recording how many messages were discarded where the dropped messages should have been. The downside of this class is that unwritten log messages still in the buffer will be lost when the program crashes. Reviewed By: wez Differential Revision: D5083107 fbshipit-source-id: c67226f4d0726675d480b03eae83a29c5c3431b2 --- diff --git a/CMakeLists.txt b/CMakeLists.txt index 97f8e002..1db5d952 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -320,6 +320,7 @@ if (BUILD_TESTS) XlogHeader1.h XlogHeader2.h SOURCES + AsyncFileWriterTest.cpp ImmediateFileWriterTest.cpp LogCategoryTest.cpp LoggerDBTest.cpp diff --git a/folly/Makefile.am b/folly/Makefile.am index 5aefef35..aa1a473c 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -121,6 +121,7 @@ nobase_follyinclude_HEADERS = \ experimental/JemallocNodumpAllocator.h \ experimental/JSONSchema.h \ experimental/LockFreeRingBuffer.h \ + experimental/logging/AsyncFileWriter.h \ experimental/logging/ImmediateFileWriter.h \ experimental/logging/LogCategory.h \ experimental/logging/LogFormatter.h \ diff --git a/folly/experimental/logging/AsyncFileWriter.cpp b/folly/experimental/logging/AsyncFileWriter.cpp new file mode 100644 index 00000000..3aab81ad --- /dev/null +++ b/folly/experimental/logging/AsyncFileWriter.cpp @@ -0,0 +1,172 @@ +/* + * Copyright 2004-present Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include + +#include +#include +#include + +using folly::File; +using folly::StringPiece; + +namespace folly { + +AsyncFileWriter::AsyncFileWriter(StringPiece path) + : AsyncFileWriter{File{path.str(), O_WRONLY | O_APPEND | O_CREAT}} {} + +AsyncFileWriter::AsyncFileWriter(folly::File&& file) + : file_{std::move(file)}, ioThread_([this] { ioThread(); }) {} + +AsyncFileWriter::~AsyncFileWriter() { + data_->stop = true; + messageReady_.notify_one(); + ioThread_.join(); +} + +void AsyncFileWriter::writeMessage(StringPiece buffer) { + return writeMessage(buffer.str()); +} + +void AsyncFileWriter::writeMessage(std::string&& buffer) { + auto data = data_.lock(); + if (data->currentBufferSize >= data->maxBufferBytes) { + ++data->numDiscarded; + return; + } + + data->currentBufferSize += buffer.size(); + auto* queue = data->getCurrentQueue(); + queue->emplace_back(std::move(buffer)); + messageReady_.notify_one(); +} + +void AsyncFileWriter::flush() { + auto data = data_.lock(); + auto start = data->ioThreadCounter; + + // Wait until ioThreadCounter increments by at least two. + // Waiting for a single increment is not sufficient, as this happens after + // the I/O thread has swapped the queues, which is before it has actually + // done the I/O. + while (data->ioThreadCounter < start + 2) { + if (data->ioThreadDone) { + return; + } + + // Enqueue an empty string and wake the I/O thread. + // The empty string ensures that the I/O thread will break out of its wait + // loop and increment the ioThreadCounter, even if there is no other work + // to do. + data->getCurrentQueue()->emplace_back(); + messageReady_.notify_one(); + + // Wait for notification from the I/O thread that it has done work. + ioCV_.wait(data.getUniqueLock()); + } +} + +void AsyncFileWriter::ioThread() { + while (true) { + // With the lock held, grab a pointer to the current queue, then increment + // the ioThreadCounter index so that other threads will write into the + // other queue as we process this one. + std::vector* ioQueue; + size_t numDiscarded; + bool stop; + { + auto data = data_.lock(); + ioQueue = data->getCurrentQueue(); + while (ioQueue->empty() && !data->stop) { + messageReady_.wait(data.getUniqueLock()); + } + + ++data->ioThreadCounter; + numDiscarded = data->numDiscarded; + data->numDiscarded = 0; + data->currentBufferSize = 0; + stop = data->stop; + } + ioCV_.notify_all(); + + // Write the log messages now that we have released the lock + try { + performIO(ioQueue); + } catch (const std::exception& ex) { + onIoError(ex); + } + + // clear() empties the vector, but the allocated capacity remains so we can + // just reuse it without having to re-allocate in most cases. + ioQueue->clear(); + + if (numDiscarded > 0) { + auto msg = getNumDiscardedMsg(numDiscarded); + if (!msg.empty()) { + auto ret = folly::writeFull(file_.fd(), msg.data(), msg.size()); + // We currently ignore errors from writeFull() here. + // There's not much we can really do. + (void)ret; + } + } + + if (stop) { + data_->ioThreadDone = true; + break; + } + } +} + +void AsyncFileWriter::performIO(std::vector* ioQueue) { + // kNumIovecs controls the maximum number of strings we write at once in a + // single writev() call. + constexpr int kNumIovecs = 64; + std::array iovecs; + + size_t idx = 0; + while (idx < ioQueue->size()) { + int numIovecs = 0; + while (numIovecs < kNumIovecs && idx < ioQueue->size()) { + const auto& str = (*ioQueue)[idx]; + iovecs[numIovecs].iov_base = const_cast(str.data()); + iovecs[numIovecs].iov_len = str.size(); + ++numIovecs; + ++idx; + } + + auto ret = folly::writevFull(file_.fd(), iovecs.data(), numIovecs); + folly::checkUnixError(ret, "writeFull() failed"); + } +} + +void AsyncFileWriter::onIoError(const std::exception& ex) { + LoggerDB::internalWarning( + __FILE__, + __LINE__, + "error writing to log file ", + file_.fd(), + " in AsyncFileWriter: ", + folly::exceptionStr(ex)); +} + +std::string AsyncFileWriter::getNumDiscardedMsg(size_t numDiscarded) { + // We may want to make this customizable in the future (e.g., to allow it to + // conform to the LogFormatter style being used). + // For now just return a simple fixed message. + return folly::to( + numDiscarded, + " log messages discarded: logging faster than we can write\n"); +} +} diff --git a/folly/experimental/logging/AsyncFileWriter.h b/folly/experimental/logging/AsyncFileWriter.h new file mode 100644 index 00000000..0d05ccd3 --- /dev/null +++ b/folly/experimental/logging/AsyncFileWriter.h @@ -0,0 +1,118 @@ +/* + * Copyright 2004-present Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include + +#include +#include +#include +#include + +namespace folly { + +/** + * A LogWriter implementation that asynchronously writes to a file descriptor. + * + * This class performs the log I/O in a separarate thread. + * + * The advantage of this class over ImmediateFileWriter is that logging I/O can + * never slow down or block your normal program operation. If log messages are + * generated faster than they can be written, messages will be dropped (and an + * indication of how many messages were dropped will be written to the log file + * when we are able to catch up a bit.) + * + * However, one downside is that if your program crashes, not all log messages + * may have been written, so you may lose messages generated immediately before + * the crash. + */ +class AsyncFileWriter : public LogWriter { + public: + /** + * Construct an AsyncFileWriter that appends to the file at the specified + * path. + */ + explicit AsyncFileWriter(folly::StringPiece path); + + /** + * Construct an AsyncFileWriter that writes to the specified File object. + */ + explicit AsyncFileWriter(folly::File&& file); + + ~AsyncFileWriter(); + + void writeMessage(folly::StringPiece buffer) override; + void writeMessage(std::string&& buffer) override; + + /** + * Block until the I/O thread has finished writing all messages that + * were already enqueued when flush() was called. + */ + void flush(); + + private: + /* + * A simple implementation using two queues. + * All writer threads enqueue into one queue while the I/O thread is + * processing the other. + * + * We could potentially also provide an implementation using folly::MPMCQueue + * in the future, which may improve contention under very high write loads. + */ + struct Data { + std::array, 2> queues; + bool stop{false}; + bool ioThreadDone{false}; + uint64_t ioThreadCounter{0}; + size_t maxBufferBytes{1024 * 1024}; + size_t currentBufferSize{0}; + size_t numDiscarded{0}; + + std::vector* getCurrentQueue() { + return &queues[ioThreadCounter & 0x1]; + } + }; + + void ioThread(); + void performIO(std::vector* ioQueue); + + void onIoError(const std::exception& ex); + std::string getNumDiscardedMsg(size_t numDiscarded); + + folly::File file_; + folly::Synchronized data_; + /** + * messageReady_ is signaled by writer threads whenever they add a new + * message to the current queue. + */ + std::condition_variable messageReady_; + /** + * ioCV_ is signaled by the I/O thread each time it increments + * the ioThreadCounter (once each time around its loop). + */ + std::condition_variable ioCV_; + + /** + * The I/O thread. + * + * This should come last, since all other member variables need to be + * constructed before the I/O thread starts. + */ + std::thread ioThread_; +}; +} diff --git a/folly/experimental/logging/Makefile.am b/folly/experimental/logging/Makefile.am index c3750a4d..69adaf92 100644 --- a/folly/experimental/logging/Makefile.am +++ b/folly/experimental/logging/Makefile.am @@ -3,6 +3,7 @@ SUBDIRS = . lib_LTLIBRARIES = libfollylogging.la libfollylogging_la_SOURCES = \ + AsyncFileWriter.cpp \ ImmediateFileWriter.cpp \ LogCategory.cpp \ Logger.cpp \ diff --git a/folly/experimental/logging/test/AsyncFileWriterTest.cpp b/folly/experimental/logging/test/AsyncFileWriterTest.cpp new file mode 100644 index 00000000..d1541dc5 --- /dev/null +++ b/folly/experimental/logging/test/AsyncFileWriterTest.cpp @@ -0,0 +1,329 @@ +/* + * Copyright 2004-present Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +DEFINE_int64( + async_discard_num_writer_threads, + 32, + "number of threads to use to generate log messages during " + "the AsyncFileWriter.discard test"); +DEFINE_int64( + async_discard_messages_per_writer, + 200000, + "number of messages each writer threads should generate in " + "the AsyncFileWriter.discard test"); +DEFINE_int64( + async_discard_read_sleep_usec, + 500, + "how long the read thread should sleep between reads in " + "the AsyncFileWriter.discard test"); + +using namespace folly; +using folly::test::TemporaryFile; + +TEST(AsyncFileWriter, noMessages) { + TemporaryFile tmpFile{"logging_test"}; + + // Test the simple construction and destruction of an AsyncFileWriter + // without ever writing any messages. This still exercises the I/O + // thread start-up and shutdown code. + AsyncFileWriter writer{folly::File{tmpFile.fd(), false}}; +} + +TEST(AsyncFileWriter, simpleMessages) { + TemporaryFile tmpFile{"logging_test"}; + + { + AsyncFileWriter writer{folly::File{tmpFile.fd(), false}}; + for (int n = 0; n < 10; ++n) { + writer.writeMessage(folly::to("message ", n, "\n")); + sched_yield(); + } + } + + std::string data; + auto ret = folly::readFile(tmpFile.path().native().c_str(), data); + ASSERT_TRUE(ret); + + std::string expected = + "message 0\n" + "message 1\n" + "message 2\n" + "message 3\n" + "message 4\n" + "message 5\n" + "message 6\n" + "message 7\n" + "message 8\n" + "message 9\n"; + EXPECT_EQ(expected, data); +} + +#ifndef _WIN32 +namespace { +static std::vector* internalWarnings; + +void handleLoggingError( + StringPiece /* file */, + int /* lineNumber */, + std::string&& msg) { + internalWarnings->emplace_back(std::move(msg)); +} +} + +TEST(AsyncFileWriter, ioError) { + // Set the LoggerDB internal warning handler so we can record the messages + std::vector logErrors; + internalWarnings = &logErrors; + LoggerDB::setInternalWarningHandler(handleLoggingError); + + // Create an AsyncFileWriter that refers to a pipe whose read end is closed + std::array fds; + auto rc = pipe(fds.data()); + folly::checkUnixError(rc, "failed to create pipe"); + signal(SIGPIPE, SIG_IGN); + ::close(fds[0]); + + // Log a bunch of messages to the writer + size_t numMessages = 100; + { + AsyncFileWriter writer{folly::File{fds[1], true}}; + for (size_t n = 0; n < numMessages; ++n) { + writer.writeMessage(folly::to("message ", n, "\n")); + sched_yield(); + } + } + + LoggerDB::setInternalWarningHandler(nullptr); + + // AsyncFileWriter should have some internal warning messages about the + // log failures. This will generally be many fewer than the number of + // messages we wrote, though, since it performs write batching. + for (const auto& msg : logErrors) { + EXPECT_THAT( + msg, + testing::ContainsRegex( + "error writing to log file .* in AsyncFileWriter.*: Broken pipe")); + } + EXPECT_GT(logErrors.size(), 0); + EXPECT_LE(logErrors.size(), numMessages); +} +#endif + +/** + * writeThread() writes a series of messages to the AsyncFileWriter + */ +void writeThread(AsyncFileWriter* writer, size_t id, size_t numMessages) { + for (size_t n = 0; n < numMessages; ++n) { + writer->writeMessage( + folly::to("thread ", id, " message ", n + 1, '\n')); + } +} + +class ReadStats { + public: + void check(size_t numThreads, size_t messagesPerThread) { + EXPECT_EQ("", trailingData_); + EXPECT_EQ(numThreads, writers_.size()); + size_t totalMessagesReceived = 0; + for (const auto& writerData : writers_) { + EXPECT_LE(writerData.numMessages, messagesPerThread); + EXPECT_LE(writerData.lastId, messagesPerThread); + totalMessagesReceived += writerData.numMessages; + } + + EXPECT_EQ(0, numUnableToParse_); + EXPECT_EQ(0, numOutOfOrder_); + EXPECT_EQ( + numThreads * messagesPerThread, totalMessagesReceived + numDiscarded_); + } + + void messageReceived(StringPiece msg) { + if (msg.endsWith(" log messages discarded: " + "logging faster than we can write")) { + auto discardCount = folly::to(msg.subpiece(0, msg.find(' '))); + fprintf(stderr, "received discard notification: %zu\n", discardCount); + numDiscarded_ += discardCount; + return; + } + + size_t threadID = 0; + size_t messageIndex = 0; + try { + parseMessage(msg, &threadID, &messageIndex); + } catch (const std::exception& ex) { + ++numUnableToParse_; + fprintf( + stderr, + "unable to parse log message: %s\n", + folly::humanify(msg.str()).c_str()); + return; + } + + if (threadID >= writers_.size()) { + writers_.resize(threadID + 1); + } + writers_[threadID].numMessages++; + if (messageIndex > writers_[threadID].lastId) { + writers_[threadID].lastId = messageIndex; + } else { + ++numOutOfOrder_; + fprintf( + stderr, + "received out-of-order messages from writer %zu: " + "%zu received after %zu\n", + threadID, + messageIndex, + writers_[threadID].lastId); + } + } + + void trailingData(StringPiece data) { + trailingData_ = data.str(); + } + + private: + struct WriterStats { + size_t numMessages{0}; + size_t lastId{0}; + }; + + void parseMessage(StringPiece msg, size_t* threadID, size_t* messageIndex) { + constexpr StringPiece prefix{"thread "}; + constexpr StringPiece middle{" message "}; + if (!msg.startsWith(prefix)) { + throw std::runtime_error("bad message prefix"); + } + + auto idx = prefix.size(); + auto end = msg.find(' ', idx); + if (end == StringPiece::npos) { + throw std::runtime_error("no middle found"); + } + + *threadID = folly::to(msg.subpiece(idx, end - idx)); + auto rest = msg.subpiece(end); + if (!rest.startsWith(middle)) { + throw std::runtime_error("bad message middle"); + } + + rest.advance(middle.size()); + *messageIndex = folly::to(rest); + } + + std::vector writers_; + std::string trailingData_; + size_t numUnableToParse_{0}; + size_t numOutOfOrder_{0}; + size_t numDiscarded_{0}; +}; + +/** + * readThread() reads messages slowly from a pipe. This helps test the + * AsyncFileWriter behavior when I/O is slow. + */ +void readThread(folly::File&& file, ReadStats* stats) { + std::vector buffer; + buffer.resize(1024); + + size_t bufferIdx = 0; + while (true) { + /* sleep override */ + usleep(FLAGS_async_discard_read_sleep_usec); + + auto readResult = folly::readNoInt( + file.fd(), buffer.data() + bufferIdx, buffer.size() - bufferIdx); + if (readResult < 0) { + fprintf(stderr, "error reading from pipe: %d\n", errno); + return; + } + if (readResult == 0) { + fprintf(stderr, "read EOF\n"); + break; + } + + auto logDataLen = bufferIdx + readResult; + StringPiece logData{buffer.data(), logDataLen}; + auto idx = 0; + while (true) { + auto end = logData.find('\n', idx); + if (end == StringPiece::npos) { + bufferIdx = logDataLen - idx; + memmove(buffer.data(), buffer.data() + idx, bufferIdx); + break; + } + + StringPiece logMsg{logData.data() + idx, end - idx}; + stats->messageReceived(logMsg); + idx = end + 1; + } + } + + if (bufferIdx != 0) { + stats->trailingData(StringPiece{buffer.data(), bufferIdx}); + } +} + +/* + * The discard test spawns a number of threads that each write a large number + * of messages quickly. The AsyncFileWriter writes to a pipe, an a separate + * thread reads from it slowly, causing a backlog to build up. + * + * The test then checks that: + * - The read thread always receives full messages (no partial log messages) + * - Messages that are received are received in order + * - The number of messages received plus the number reported in discard + * notifications matches the number of messages sent. + */ +TEST(AsyncFileWriter, discard) { + std::array fds; + auto pipeResult = pipe(fds.data()); + folly::checkUnixError(pipeResult, "pipe failed"); + folly::File readPipe{fds[0], true}; + folly::File writePipe{fds[1], true}; + + ReadStats readStats; + std::thread reader(readThread, std::move(readPipe), &readStats); + { + AsyncFileWriter writer{std::move(writePipe)}; + + std::vector writeThreads; + for (int n = 0; n < FLAGS_async_discard_num_writer_threads; ++n) { + writeThreads.emplace_back( + writeThread, &writer, n, FLAGS_async_discard_messages_per_writer); + } + + for (auto& t : writeThreads) { + t.join(); + } + fprintf(stderr, "writers done\n"); + } + reader.join(); + readStats.check( + FLAGS_async_discard_num_writer_threads, + FLAGS_async_discard_messages_per_writer); +}