logging: improve the AsyncFileWriterTest discard test
[folly.git] / folly / experimental / logging / test / AsyncFileWriterTest.cpp
index 9e4694df375e9fd43016a02be01631d716e8e463..7b03ef5ee229feddddec48791c2844d626ba4d6e 100644 (file)
 #include <folly/File.h>
 #include <folly/FileUtil.h>
 #include <folly/String.h>
+#include <folly/Synchronized.h>
 #include <folly/experimental/TestUtil.h>
 #include <folly/experimental/logging/AsyncFileWriter.h>
+#include <folly/experimental/logging/Init.h>
 #include <folly/experimental/logging/LoggerDB.h>
+#include <folly/experimental/logging/xlog.h>
 #include <folly/futures/Future.h>
 #include <folly/futures/Promise.h>
+#include <folly/init/Init.h>
 #include <folly/portability/GFlags.h>
 #include <folly/portability/GMock.h>
 #include <folly/portability/GTest.h>
 #include <folly/portability/Unistd.h>
 
+DEFINE_string(logging, "", "folly::logging configuration");
 DEFINE_int64(
-    async_discard_num_writer_threads,
-    32,
-    "number of threads to use to generate log messages during "
+    async_discard_num_normal_writers,
+    30,
+    "number of threads to use to generate normal log messages during "
     "the AsyncFileWriter.discard test");
 DEFINE_int64(
-    async_discard_messages_per_writer,
-    200000,
-    "number of messages each writer threads should generate in "
+    async_discard_num_nodiscard_writers,
+    2,
+    "number of threads to use to generate non-discardable log messages during "
     "the AsyncFileWriter.discard test");
 DEFINE_int64(
     async_discard_read_sleep_usec,
     500,
     "how long the read thread should sleep between reads in "
     "the AsyncFileWriter.discard test");
+DEFINE_int64(
+    async_discard_timeout_msec,
+    10000,
+    "A timeout for the AsyncFileWriter.discard test if it cannot generate "
+    "enough discards");
+DEFINE_int64(
+    async_discard_num_events,
+    10,
+    "The number of discard events to wait for in the AsyncFileWriter.discard "
+    "test");
 
 using namespace folly;
 using namespace std::literals::chrono_literals;
 using folly::test::TemporaryFile;
+using std::chrono::steady_clock;
+using std::chrono::milliseconds;
 
 TEST(AsyncFileWriter, noMessages) {
   TemporaryFile tmpFile{"logging_test"};
@@ -167,7 +184,7 @@ size_t fillUpPipe(int fd) {
       totalBytes += bytesWritten;
     }
   }
-  fprintf(stderr, "pipe filled up after %zu bytes\n", totalBytes);
+  XLOG(DBG1, "pipe filled up after ", totalBytes, " bytes");
 
   rc = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
   folly::checkUnixError(rc, "failed to put pipe back in blocking mode");
@@ -217,24 +234,20 @@ TEST(AsyncFileWriter, flush) {
 }
 #endif
 
-/**
- * writeThread() writes a series of messages to the AsyncFileWriter
- */
-void writeThread(
-    AsyncFileWriter* writer,
-    size_t id,
-    size_t numMessages,
-    uint32_t flags) {
-  for (size_t n = 0; n < numMessages; ++n) {
-    writer->writeMessage(
-        folly::to<std::string>("thread ", id, " message ", n + 1, '\n'), flags);
-  }
-}
+// A large-ish message suffix, just to consume space and help fill up
+// log buffers faster.
+static constexpr StringPiece kMsgSuffix{
+    "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
+    "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
+    "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
+    "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"};
 
 class ReadStats {
  public:
   ReadStats()
-      : readSleepUS_{static_cast<uint64_t>(
+      : deadline_{steady_clock::now() +
+                  milliseconds{FLAGS_async_discard_timeout_msec}},
+        readSleepUS_{static_cast<uint64_t>(
             std::min(0L, FLAGS_async_discard_read_sleep_usec))} {}
 
   void clearSleepDuration() {
@@ -244,36 +257,85 @@ class ReadStats {
     return std::chrono::microseconds{readSleepUS_.load()};
   }
 
-  void check(size_t numThreads, size_t messagesPerThread) {
-    EXPECT_EQ("", trailingData_);
-    EXPECT_EQ(numThreads, writers_.size());
-    size_t totalMessagesReceived = 0;
-    for (const auto& writerData : writers_) {
-      EXPECT_LE(writerData.numMessages, messagesPerThread);
-      EXPECT_LE(writerData.lastId, messagesPerThread);
-      totalMessagesReceived += writerData.numMessages;
+  bool shouldWriterStop() const {
+    // Stop after we have seen the required number of separate discard events.
+    // We stop based on discardEventsSeen_ rather than numDiscarded_ since this
+    // ensures the async writer blocks and then makes progress again multiple
+    // times.
+    if (FLAGS_async_discard_num_events > 0 &&
+        discardEventsSeen_.load() >
+            static_cast<uint64_t>(FLAGS_async_discard_num_events)) {
+      return true;
     }
 
+    // Stop after a timeout, even if we don't hit the number of requested
+    // discards.
+    return steady_clock::now() > deadline_;
+  }
+  void writerFinished(size_t threadID, size_t messagesWritten, uint32_t flags) {
+    auto map = perThreadWriteData_.wlock();
+    assert(map->find(threadID) == map->end());
+    auto& data = (*map)[threadID];
+    data.numMessagesWritten = messagesWritten;
+    data.flags = flags;
+  }
+
+  void check() {
+    auto writeDataMap = perThreadWriteData_.wlock();
+
+    EXPECT_EQ("", trailingData_);
     EXPECT_EQ(0, numUnableToParse_);
     EXPECT_EQ(0, numOutOfOrder_);
-    EXPECT_EQ(
-        numThreads * messagesPerThread, totalMessagesReceived + numDiscarded_);
-  }
 
-  /**
-   * Check that no messages were dropped from the specified thread.
-   */
-  void checkNoDrops(size_t threadIndex, size_t messagesPerThread) {
-    EXPECT_EQ(writers_[threadIndex].numMessages, messagesPerThread);
-    EXPECT_EQ(writers_[threadIndex].lastId, messagesPerThread);
+    // Check messages received from each writer thread
+    size_t readerStatsChecked = 0;
+    size_t totalMessagesWritten = 0;
+    size_t totalMessagesRead = 0;
+    for (const auto& writeEntry : *writeDataMap) {
+      const auto& writeInfo = writeEntry.second;
+      totalMessagesWritten += writeInfo.numMessagesWritten;
+
+      auto iter = perThreadReadData_.find(writeEntry.first);
+      if (iter == perThreadReadData_.end()) {
+        // We never received any messages from this writer thread.
+        // This is okay as long as this is not a NEVER_DISCARD writer.
+        EXPECT_EQ(0, writeInfo.flags);
+        continue;
+      }
+      const auto& readInfo = iter->second;
+      ++readerStatsChecked;
+      totalMessagesRead += readInfo.numMessagesRead;
+      if (writeInfo.flags & LogWriter::NEVER_DISCARD) {
+        // Non-discarding threads should never discard anything
+        EXPECT_EQ(readInfo.numMessagesRead, writeInfo.numMessagesWritten);
+        EXPECT_EQ(readInfo.lastId, writeInfo.numMessagesWritten);
+      } else {
+        // Other threads may have discarded some messages
+        EXPECT_LE(readInfo.numMessagesRead, writeInfo.numMessagesWritten);
+        EXPECT_LE(readInfo.lastId, writeInfo.numMessagesWritten);
+      }
+    }
+
+    EXPECT_EQ(totalMessagesWritten, totalMessagesRead + numDiscarded_);
+    EXPECT_EQ(readerStatsChecked, perThreadReadData_.size());
+
+    // This test is intended to check the discard behavior.
+    // Fail the test if we didn't actually trigger any discards before we timed
+    // out.
+    EXPECT_GT(numDiscarded_, 0);
+
+    XLOG(DBG1) << totalMessagesWritten << " messages written, "
+               << totalMessagesRead << " messages read, " << numDiscarded_
+               << " messages discarded";
   }
 
   void messageReceived(StringPiece msg) {
     if (msg.endsWith(" log messages discarded: "
                      "logging faster than we can write")) {
       auto discardCount = folly::to<size_t>(msg.subpiece(0, msg.find(' ')));
-      fprintf(stderr, "received discard notification: %zu\n", discardCount);
+      XLOG(DBG3, "received discard notification: ", discardCount);
       numDiscarded_ += discardCount;
+      ++discardEventsSeen_;
       return;
     }
 
@@ -283,28 +345,18 @@ class ReadStats {
       parseMessage(msg, &threadID, &messageIndex);
     } catch (const std::exception& ex) {
       ++numUnableToParse_;
-      fprintf(
-          stderr,
-          "unable to parse log message: %s\n",
-          folly::humanify(msg.str()).c_str());
+      XLOG(ERR, "unable to parse log message: ", msg);
       return;
     }
 
-    if (threadID >= writers_.size()) {
-      writers_.resize(threadID + 1);
-    }
-    writers_[threadID].numMessages++;
-    if (messageIndex > writers_[threadID].lastId) {
-      writers_[threadID].lastId = messageIndex;
+    auto& data = perThreadReadData_[threadID];
+    data.numMessagesRead++;
+    if (messageIndex > data.lastId) {
+      data.lastId = messageIndex;
     } else {
       ++numOutOfOrder_;
-      fprintf(
-          stderr,
-          "received out-of-order messages from writer %zu: "
-          "%zu received after %zu\n",
-          threadID,
-          messageIndex,
-          writers_[threadID].lastId);
+      XLOG(ERR) << "received out-of-order messages from writer " << threadID
+                << ": " << messageIndex << " received after " << data.lastId;
     }
   }
 
@@ -313,41 +365,110 @@ class ReadStats {
   }
 
  private:
-  struct WriterStats {
-    size_t numMessages{0};
+  struct ReaderData {
+    size_t numMessagesRead{0};
     size_t lastId{0};
   };
+  struct WriterData {
+    size_t numMessagesWritten{0};
+    int flags{0};
+  };
 
   void parseMessage(StringPiece msg, size_t* threadID, size_t* messageIndex) {
+    // Validate and strip off the message prefix and suffix
     constexpr StringPiece prefix{"thread "};
-    constexpr StringPiece middle{" message "};
     if (!msg.startsWith(prefix)) {
       throw std::runtime_error("bad message prefix");
     }
+    msg.advance(prefix.size());
+    if (!msg.endsWith(kMsgSuffix)) {
+      throw std::runtime_error("bad message suffix");
+    }
+    msg.subtract(kMsgSuffix.size());
 
-    auto idx = prefix.size();
-    auto end = msg.find(' ', idx);
-    if (end == StringPiece::npos) {
+    // Parse then strip off the thread index
+    auto threadIDEnd = msg.find(' ');
+    if (threadIDEnd == StringPiece::npos) {
       throw std::runtime_error("no middle found");
     }
+    *threadID = folly::to<size_t>(msg.subpiece(0, threadIDEnd));
+    msg.advance(threadIDEnd);
 
-    *threadID = folly::to<size_t>(msg.subpiece(idx, end - idx));
-    auto rest = msg.subpiece(end);
-    if (!rest.startsWith(middle)) {
+    // Validate that the middle of the message is what we expect,
+    // then strip it off
+    constexpr StringPiece middle{" message "};
+    if (!msg.startsWith(middle)) {
       throw std::runtime_error("bad message middle");
     }
+    msg.advance(middle.size());
 
-    rest.advance(middle.size());
-    *messageIndex = folly::to<size_t>(rest);
+    // Parse the message index
+    *messageIndex = folly::to<size_t>(msg);
   }
 
-  std::vector<WriterStats> writers_;
+  /**
+   * Data about each writer thread, as recorded by the reader thread.
+   *
+   * At the end of the test we will compare perThreadReadData_ (recorded by the
+   * reader) with perThreadWriteData_ (recorded by the writers) to make sure
+   * the data matches up.
+   *
+   * This is a map from writer_thread_id to ReaderData.
+   * The writer_thread_id is extracted from the received messages.
+   *
+   * This field does not need locking as it is only updated by the single
+   * reader thread.
+   */
+  std::unordered_map<size_t, ReaderData> perThreadReadData_;
+
+  /*
+   * Additional information recorded by the reader thread.
+   */
   std::string trailingData_;
   size_t numUnableToParse_{0};
   size_t numOutOfOrder_{0};
   size_t numDiscarded_{0};
 
-  std::atomic<uint64_t> readSleepUS_;
+  /**
+   * deadline_ is a maximum end time for the test.
+   *
+   * The writer threads quit if the deadline is reached even if they have not
+   * produced the desired number of discard events yet.
+   */
+  const std::chrono::steady_clock::time_point deadline_;
+
+  /**
+   * How long the reader thread should sleep between each read event.
+   *
+   * This is initially set to a non-zero value (read from the
+   * FLAGS_async_discard_read_sleep_usec flag) so that the reader thread reads
+   * slowly, which will fill up the pipe buffer and cause discard events.
+   *
+   * Once we have produce enough discards and are ready to finish the test the
+   * main thread reduces readSleepUS_ to 0, so the reader will finish the
+   * remaining message backlog quickly.
+   */
+  std::atomic<uint64_t> readSleepUS_{0};
+
+  /**
+   * A count of how many discard events have been seen so far.
+   *
+   * The reader increments discardEventsSeen_ each time it sees a discard
+   * notification message.  A "discard event" basically corresponds to a single
+   * group of dropped messages.  Once the reader pulls some messages off out of
+   * the pipe the writers should be able to send more data, but the buffer will
+   * eventually fill up again, producing another discard event.
+   */
+  std::atomic<uint64_t> discardEventsSeen_{0};
+
+  /**
+   * Data about each writer thread, as recorded by the writers.
+   *
+   * When each writer thread finishes it records how many messages it wrote,
+   * plus the flags it used to write the messages.
+   */
+  folly::Synchronized<std::unordered_map<size_t, WriterData>>
+      perThreadWriteData_;
 };
 
 /**
@@ -366,11 +487,11 @@ void readThread(folly::File&& file, ReadStats* stats) {
     auto readResult = folly::readNoInt(
         file.fd(), buffer.data() + bufferIdx, buffer.size() - bufferIdx);
     if (readResult < 0) {
-      fprintf(stderr, "error reading from pipe: %d\n", errno);
+      XLOG(ERR, "error reading from pipe: ", errno);
       return;
     }
     if (readResult == 0) {
-      fprintf(stderr, "read EOF\n");
+      XLOG(DBG2, "read EOF");
       break;
     }
 
@@ -396,6 +517,30 @@ void readThread(folly::File&& file, ReadStats* stats) {
   }
 }
 
+/**
+ * writeThread() writes a series of messages to the AsyncFileWriter
+ */
+void writeThread(
+    AsyncFileWriter* writer,
+    size_t id,
+    uint32_t flags,
+    ReadStats* readStats) {
+  size_t msgID = 0;
+  while (true) {
+    ++msgID;
+    writer->writeMessage(
+        folly::to<std::string>(
+            "thread ", id, " message ", msgID, kMsgSuffix, '\n'),
+        flags);
+
+    // Break out once the reader has seen enough discards
+    if (((msgID & 0xff) == 0) && readStats->shouldWriterStop()) {
+      readStats->writerFinished(id, msgID, flags);
+      break;
+    }
+  }
+}
+
 /*
  * The discard test spawns a number of threads that each write a large number
  * of messages quickly.  The AsyncFileWriter writes to a pipe, an a separate
@@ -414,46 +559,41 @@ TEST(AsyncFileWriter, discard) {
   folly::File readPipe{fds[0], true};
   folly::File writePipe{fds[1], true};
 
-  // This test should always be run with at least 2 writer threads.
-  // The last one will use the NEVER_DISCARD flag, and we want at least
-  // one that does discard messages.
-  ASSERT_GT(FLAGS_async_discard_num_writer_threads, 2);
-
   ReadStats readStats;
   std::thread reader(readThread, std::move(readPipe), &readStats);
   {
     AsyncFileWriter writer{std::move(writePipe)};
 
     std::vector<std::thread> writeThreads;
-    for (int n = 0; n < FLAGS_async_discard_num_writer_threads; ++n) {
+    size_t numThreads = FLAGS_async_discard_num_normal_writers +
+        FLAGS_async_discard_num_nodiscard_writers;
+
+    for (size_t n = 0; n < numThreads; ++n) {
       uint32_t flags = 0;
-      // Configure the last writer thread to never drop messages
-      if (n == (FLAGS_async_discard_num_writer_threads - 1)) {
+      if (n >= static_cast<size_t>(FLAGS_async_discard_num_normal_writers)) {
         flags = LogWriter::NEVER_DISCARD;
       }
+      XLOGF(DBG4, "writer {:4d} flags {:#02x}", n, flags);
 
-      writeThreads.emplace_back(
-          writeThread,
-          &writer,
-          n,
-          FLAGS_async_discard_messages_per_writer,
-          flags);
+      writeThreads.emplace_back(writeThread, &writer, n, flags, &readStats);
     }
 
     for (auto& t : writeThreads) {
       t.join();
     }
-    fprintf(stderr, "writers done\n");
+    XLOG(DBG2, "writers done");
   }
   // Clear the read sleep duration so the reader will finish quickly now
   readStats.clearSleepDuration();
   reader.join();
-  readStats.check(
-      FLAGS_async_discard_num_writer_threads,
-      FLAGS_async_discard_messages_per_writer);
-  // Check that no messages were dropped from the thread using the
-  // NEVER_DISCARD flag.
-  readStats.checkNoDrops(
-      FLAGS_async_discard_num_writer_threads - 1,
-      FLAGS_async_discard_messages_per_writer);
+  readStats.check();
+}
+
+int main(int argc, char* argv[]) {
+  testing::InitGoogleTest(&argc, argv);
+  folly::init(&argc, &argv);
+  // Don't use async logging in the async logging tests :-)
+  folly::initLoggingGlogStyle(FLAGS_logging, LogLevel::INFO, /* async */ false);
+
+  return RUN_ALL_TESTS();
 }