X-Git-Url: http://plrg.eecs.uci.edu/git/?p=folly.git;a=blobdiff_plain;f=folly%2Fexperimental%2Flogging%2Ftest%2FAsyncFileWriterTest.cpp;h=3333c0657b4f45131ae2408fd8184cb20266e361;hp=1f98a4219db73298eb4eb37bbde31edd5ab5b4d0;hb=00ff5917775f9b58a04f74835585cbb32e306289;hpb=84458670bb24056e0bb65d50b1653dcf0636f776 diff --git a/folly/experimental/logging/test/AsyncFileWriterTest.cpp b/folly/experimental/logging/test/AsyncFileWriterTest.cpp index 1f98a421..3333c065 100644 --- a/folly/experimental/logging/test/AsyncFileWriterTest.cpp +++ b/folly/experimental/logging/test/AsyncFileWriterTest.cpp @@ -13,38 +13,59 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include + #include #include #include #include #include +#include #include #include +#include #include +#include +#include +#include +#include #include #include #include #include +DEFINE_string(logging, "", "folly::logging configuration"); DEFINE_int64( - async_discard_num_writer_threads, - 32, - "number of threads to use to generate log messages during " + async_discard_num_normal_writers, + 30, + "number of threads to use to generate normal log messages during " "the AsyncFileWriter.discard test"); DEFINE_int64( - async_discard_messages_per_writer, - 200000, - "number of messages each writer threads should generate in " + async_discard_num_nodiscard_writers, + 2, + "number of threads to use to generate non-discardable log messages during " "the AsyncFileWriter.discard test"); DEFINE_int64( async_discard_read_sleep_usec, 500, "how long the read thread should sleep between reads in " "the AsyncFileWriter.discard test"); +DEFINE_int64( + async_discard_timeout_msec, + 10000, + "A timeout for the AsyncFileWriter.discard test if it cannot generate " + "enough discards"); +DEFINE_int64( + async_discard_num_events, + 10, + "The number of discard events to wait for in the AsyncFileWriter.discard " + "test"); using namespace folly; using namespace std::literals::chrono_literals; using folly::test::TemporaryFile; +using std::chrono::steady_clock; +using std::chrono::milliseconds; TEST(AsyncFileWriter, noMessages) { TemporaryFile tmpFile{"logging_test"}; @@ -133,26 +154,107 @@ TEST(AsyncFileWriter, ioError) { EXPECT_GT(logErrors.size(), 0); EXPECT_LE(logErrors.size(), numMessages); } -#endif -/** - * writeThread() writes a series of messages to the AsyncFileWriter - */ -void writeThread( - AsyncFileWriter* writer, - size_t id, - size_t numMessages, - uint32_t flags) { - for (size_t n = 0; n < numMessages; ++n) { - writer->writeMessage( - folly::to("thread ", id, " message ", n + 1, '\n'), flags); +namespace { +size_t fillUpPipe(int fd) { + int flags = fcntl(fd, F_GETFL); + folly::checkUnixError(flags, "failed get file descriptor flags"); + auto rc = fcntl(fd, F_SETFL, flags | O_NONBLOCK); + folly::checkUnixError(rc, "failed to put pipe in non-blocking mode"); + std::vector data; + data.resize(4000); + size_t totalBytes = 0; + size_t bytesToWrite = data.size(); + while (true) { + auto bytesWritten = writeNoInt(fd, data.data(), bytesToWrite); + if (bytesWritten < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + // We blocked. Keep trying smaller writes, until we get down to a + // single byte, just to make sure the logging code really won't be able + // to write anything to the pipe. + if (bytesToWrite <= 1) { + break; + } else { + bytesToWrite /= 2; + } + } else { + throwSystemError("error writing to pipe"); + } + } else { + totalBytes += bytesWritten; + } } + XLOG(DBG1, "pipe filled up after ", totalBytes, " bytes"); + + rc = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK); + folly::checkUnixError(rc, "failed to put pipe back in blocking mode"); + + return totalBytes; +} +} + +TEST(AsyncFileWriter, flush) { + // Set up a pipe(), then write data to the write endpoint until it fills up + // and starts blocking. + std::array fds; + auto rc = pipe(fds.data()); + folly::checkUnixError(rc, "failed to create pipe"); + File readPipe{fds[0], true}; + File writePipe{fds[1], true}; + + auto paddingSize = fillUpPipe(writePipe.fd()); + + // Now set up an AsyncFileWriter pointing at the write end of the pipe + AsyncFileWriter writer{std::move(writePipe)}; + + // Write a message + writer.writeMessage("test message: " + std::string(200, 'x')); + + // Call flush(). Use a separate thread, since this should block until we + // consume data from the pipe. + Promise promise; + auto future = promise.getFuture(); + auto flushFunction = [&] { writer.flush(); }; + std::thread flushThread{ + [&]() { promise.setTry(makeTryWith(flushFunction)); }}; + // Detach the flush thread now rather than joining it at the end of the + // function. This way if something goes wrong during the test we will fail + // with the real error, rather than crashing due to the std::thread + // destructor running on a still-joinable thread. + flushThread.detach(); + + // Sleep briefly, and make sure flush() still hasn't completed. + // If it has completed this doesn't necessarily indicate a bug in + // AsyncFileWriter, but instead indicates that our test code failed to + // successfully cause a blocking write. + /* sleep override */ + std::this_thread::sleep_for(10ms); + EXPECT_FALSE(future.isReady()); + + // Now read from the pipe + std::vector buf; + buf.resize(paddingSize); + readFull(readPipe.fd(), buf.data(), buf.size()); + + // Make sure flush completes successfully now + future.get(10ms); } +#endif + +// A large-ish message suffix, just to consume space and help fill up +// log buffers faster. +static constexpr StringPiece kMsgSuffix{ + "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"}; class ReadStats { public: ReadStats() - : readSleepUS_{static_cast( + : deadline_{steady_clock::now() + + milliseconds{FLAGS_async_discard_timeout_msec}}, + readSleepUS_{static_cast( std::min(0L, FLAGS_async_discard_read_sleep_usec))} {} void clearSleepDuration() { @@ -162,36 +264,85 @@ class ReadStats { return std::chrono::microseconds{readSleepUS_.load()}; } - void check(size_t numThreads, size_t messagesPerThread) { - EXPECT_EQ("", trailingData_); - EXPECT_EQ(numThreads, writers_.size()); - size_t totalMessagesReceived = 0; - for (const auto& writerData : writers_) { - EXPECT_LE(writerData.numMessages, messagesPerThread); - EXPECT_LE(writerData.lastId, messagesPerThread); - totalMessagesReceived += writerData.numMessages; + bool shouldWriterStop() const { + // Stop after we have seen the required number of separate discard events. + // We stop based on discardEventsSeen_ rather than numDiscarded_ since this + // ensures the async writer blocks and then makes progress again multiple + // times. + if (FLAGS_async_discard_num_events > 0 && + discardEventsSeen_.load() > + static_cast(FLAGS_async_discard_num_events)) { + return true; } + // Stop after a timeout, even if we don't hit the number of requested + // discards. + return steady_clock::now() > deadline_; + } + void writerFinished(size_t threadID, size_t messagesWritten, uint32_t flags) { + auto map = perThreadWriteData_.wlock(); + assert(map->find(threadID) == map->end()); + auto& data = (*map)[threadID]; + data.numMessagesWritten = messagesWritten; + data.flags = flags; + } + + void check() { + auto writeDataMap = perThreadWriteData_.wlock(); + + EXPECT_EQ("", trailingData_); EXPECT_EQ(0, numUnableToParse_); EXPECT_EQ(0, numOutOfOrder_); - EXPECT_EQ( - numThreads * messagesPerThread, totalMessagesReceived + numDiscarded_); - } - /** - * Check that no messages were dropped from the specified thread. - */ - void checkNoDrops(size_t threadIndex, size_t messagesPerThread) { - EXPECT_EQ(writers_[threadIndex].numMessages, messagesPerThread); - EXPECT_EQ(writers_[threadIndex].lastId, messagesPerThread); + // Check messages received from each writer thread + size_t readerStatsChecked = 0; + size_t totalMessagesWritten = 0; + size_t totalMessagesRead = 0; + for (const auto& writeEntry : *writeDataMap) { + const auto& writeInfo = writeEntry.second; + totalMessagesWritten += writeInfo.numMessagesWritten; + + auto iter = perThreadReadData_.find(writeEntry.first); + if (iter == perThreadReadData_.end()) { + // We never received any messages from this writer thread. + // This is okay as long as this is not a NEVER_DISCARD writer. + EXPECT_EQ(0, writeInfo.flags); + continue; + } + const auto& readInfo = iter->second; + ++readerStatsChecked; + totalMessagesRead += readInfo.numMessagesRead; + if (writeInfo.flags & LogWriter::NEVER_DISCARD) { + // Non-discarding threads should never discard anything + EXPECT_EQ(readInfo.numMessagesRead, writeInfo.numMessagesWritten); + EXPECT_EQ(readInfo.lastId, writeInfo.numMessagesWritten); + } else { + // Other threads may have discarded some messages + EXPECT_LE(readInfo.numMessagesRead, writeInfo.numMessagesWritten); + EXPECT_LE(readInfo.lastId, writeInfo.numMessagesWritten); + } + } + + EXPECT_EQ(totalMessagesWritten, totalMessagesRead + numDiscarded_); + EXPECT_EQ(readerStatsChecked, perThreadReadData_.size()); + + // This test is intended to check the discard behavior. + // Fail the test if we didn't actually trigger any discards before we timed + // out. + EXPECT_GT(numDiscarded_, 0); + + XLOG(DBG1) << totalMessagesWritten << " messages written, " + << totalMessagesRead << " messages read, " << numDiscarded_ + << " messages discarded"; } void messageReceived(StringPiece msg) { if (msg.endsWith(" log messages discarded: " "logging faster than we can write")) { auto discardCount = folly::to(msg.subpiece(0, msg.find(' '))); - fprintf(stderr, "received discard notification: %zu\n", discardCount); + XLOG(DBG3, "received discard notification: ", discardCount); numDiscarded_ += discardCount; + ++discardEventsSeen_; return; } @@ -201,28 +352,18 @@ class ReadStats { parseMessage(msg, &threadID, &messageIndex); } catch (const std::exception& ex) { ++numUnableToParse_; - fprintf( - stderr, - "unable to parse log message: %s\n", - folly::humanify(msg.str()).c_str()); + XLOG(ERR, "unable to parse log message: ", msg); return; } - if (threadID >= writers_.size()) { - writers_.resize(threadID + 1); - } - writers_[threadID].numMessages++; - if (messageIndex > writers_[threadID].lastId) { - writers_[threadID].lastId = messageIndex; + auto& data = perThreadReadData_[threadID]; + data.numMessagesRead++; + if (messageIndex > data.lastId) { + data.lastId = messageIndex; } else { ++numOutOfOrder_; - fprintf( - stderr, - "received out-of-order messages from writer %zu: " - "%zu received after %zu\n", - threadID, - messageIndex, - writers_[threadID].lastId); + XLOG(ERR) << "received out-of-order messages from writer " << threadID + << ": " << messageIndex << " received after " << data.lastId; } } @@ -231,41 +372,110 @@ class ReadStats { } private: - struct WriterStats { - size_t numMessages{0}; + struct ReaderData { + size_t numMessagesRead{0}; size_t lastId{0}; }; + struct WriterData { + size_t numMessagesWritten{0}; + int flags{0}; + }; void parseMessage(StringPiece msg, size_t* threadID, size_t* messageIndex) { + // Validate and strip off the message prefix and suffix constexpr StringPiece prefix{"thread "}; - constexpr StringPiece middle{" message "}; if (!msg.startsWith(prefix)) { throw std::runtime_error("bad message prefix"); } + msg.advance(prefix.size()); + if (!msg.endsWith(kMsgSuffix)) { + throw std::runtime_error("bad message suffix"); + } + msg.subtract(kMsgSuffix.size()); - auto idx = prefix.size(); - auto end = msg.find(' ', idx); - if (end == StringPiece::npos) { + // Parse then strip off the thread index + auto threadIDEnd = msg.find(' '); + if (threadIDEnd == StringPiece::npos) { throw std::runtime_error("no middle found"); } + *threadID = folly::to(msg.subpiece(0, threadIDEnd)); + msg.advance(threadIDEnd); - *threadID = folly::to(msg.subpiece(idx, end - idx)); - auto rest = msg.subpiece(end); - if (!rest.startsWith(middle)) { + // Validate that the middle of the message is what we expect, + // then strip it off + constexpr StringPiece middle{" message "}; + if (!msg.startsWith(middle)) { throw std::runtime_error("bad message middle"); } + msg.advance(middle.size()); - rest.advance(middle.size()); - *messageIndex = folly::to(rest); + // Parse the message index + *messageIndex = folly::to(msg); } - std::vector writers_; + /** + * Data about each writer thread, as recorded by the reader thread. + * + * At the end of the test we will compare perThreadReadData_ (recorded by the + * reader) with perThreadWriteData_ (recorded by the writers) to make sure + * the data matches up. + * + * This is a map from writer_thread_id to ReaderData. + * The writer_thread_id is extracted from the received messages. + * + * This field does not need locking as it is only updated by the single + * reader thread. + */ + std::unordered_map perThreadReadData_; + + /* + * Additional information recorded by the reader thread. + */ std::string trailingData_; size_t numUnableToParse_{0}; size_t numOutOfOrder_{0}; size_t numDiscarded_{0}; - std::atomic readSleepUS_; + /** + * deadline_ is a maximum end time for the test. + * + * The writer threads quit if the deadline is reached even if they have not + * produced the desired number of discard events yet. + */ + const std::chrono::steady_clock::time_point deadline_; + + /** + * How long the reader thread should sleep between each read event. + * + * This is initially set to a non-zero value (read from the + * FLAGS_async_discard_read_sleep_usec flag) so that the reader thread reads + * slowly, which will fill up the pipe buffer and cause discard events. + * + * Once we have produce enough discards and are ready to finish the test the + * main thread reduces readSleepUS_ to 0, so the reader will finish the + * remaining message backlog quickly. + */ + std::atomic readSleepUS_{0}; + + /** + * A count of how many discard events have been seen so far. + * + * The reader increments discardEventsSeen_ each time it sees a discard + * notification message. A "discard event" basically corresponds to a single + * group of dropped messages. Once the reader pulls some messages off out of + * the pipe the writers should be able to send more data, but the buffer will + * eventually fill up again, producing another discard event. + */ + std::atomic discardEventsSeen_{0}; + + /** + * Data about each writer thread, as recorded by the writers. + * + * When each writer thread finishes it records how many messages it wrote, + * plus the flags it used to write the messages. + */ + folly::Synchronized> + perThreadWriteData_; }; /** @@ -279,16 +489,16 @@ void readThread(folly::File&& file, ReadStats* stats) { size_t bufferIdx = 0; while (true) { /* sleep override */ - usleep(stats->getSleepUS().count()); + std::this_thread::sleep_for(stats->getSleepUS()); auto readResult = folly::readNoInt( file.fd(), buffer.data() + bufferIdx, buffer.size() - bufferIdx); if (readResult < 0) { - fprintf(stderr, "error reading from pipe: %d\n", errno); + XLOG(ERR, "error reading from pipe: ", errno); return; } if (readResult == 0) { - fprintf(stderr, "read EOF\n"); + XLOG(DBG2, "read EOF"); break; } @@ -314,6 +524,30 @@ void readThread(folly::File&& file, ReadStats* stats) { } } +/** + * writeThread() writes a series of messages to the AsyncFileWriter + */ +void writeThread( + AsyncFileWriter* writer, + size_t id, + uint32_t flags, + ReadStats* readStats) { + size_t msgID = 0; + while (true) { + ++msgID; + writer->writeMessage( + folly::to( + "thread ", id, " message ", msgID, kMsgSuffix, '\n'), + flags); + + // Break out once the reader has seen enough discards + if (((msgID & 0xff) == 0) && readStats->shouldWriterStop()) { + readStats->writerFinished(id, msgID, flags); + break; + } + } +} + /* * The discard test spawns a number of threads that each write a large number * of messages quickly. The AsyncFileWriter writes to a pipe, an a separate @@ -332,46 +566,41 @@ TEST(AsyncFileWriter, discard) { folly::File readPipe{fds[0], true}; folly::File writePipe{fds[1], true}; - // This test should always be run with at least 2 writer threads. - // The last one will use the NEVER_DISCARD flag, and we want at least - // one that does discard messages. - ASSERT_GT(FLAGS_async_discard_num_writer_threads, 2); - ReadStats readStats; std::thread reader(readThread, std::move(readPipe), &readStats); { AsyncFileWriter writer{std::move(writePipe)}; std::vector writeThreads; - for (int n = 0; n < FLAGS_async_discard_num_writer_threads; ++n) { + size_t numThreads = FLAGS_async_discard_num_normal_writers + + FLAGS_async_discard_num_nodiscard_writers; + + for (size_t n = 0; n < numThreads; ++n) { uint32_t flags = 0; - // Configure the last writer thread to never drop messages - if (n == (FLAGS_async_discard_num_writer_threads - 1)) { + if (n >= static_cast(FLAGS_async_discard_num_normal_writers)) { flags = LogWriter::NEVER_DISCARD; } + XLOGF(DBG4, "writer {:4d} flags {:#02x}", n, flags); - writeThreads.emplace_back( - writeThread, - &writer, - n, - FLAGS_async_discard_messages_per_writer, - flags); + writeThreads.emplace_back(writeThread, &writer, n, flags, &readStats); } for (auto& t : writeThreads) { t.join(); } - fprintf(stderr, "writers done\n"); + XLOG(DBG2, "writers done"); } // Clear the read sleep duration so the reader will finish quickly now readStats.clearSleepDuration(); reader.join(); - readStats.check( - FLAGS_async_discard_num_writer_threads, - FLAGS_async_discard_messages_per_writer); - // Check that no messages were dropped from the thread using the - // NEVER_DISCARD flag. - readStats.checkNoDrops( - FLAGS_async_discard_num_writer_threads - 1, - FLAGS_async_discard_messages_per_writer); + readStats.check(); +} + +int main(int argc, char* argv[]) { + testing::InitGoogleTest(&argc, argv); + folly::init(&argc, &argv); + // Don't use async logging in the async logging tests :-) + folly::initLoggingGlogStyle(FLAGS_logging, LogLevel::INFO, /* async */ false); + + return RUN_ALL_TESTS(); }