logging: improve the AsyncFileWriterTest discard test
authorAdam Simpkins <simpkins@fb.com>
Wed, 21 Jun 2017 02:44:16 +0000 (19:44 -0700)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Wed, 21 Jun 2017 02:50:53 +0000 (19:50 -0700)
Summary:
This improves the test that exercises the AsyncFileWriter message discarding
logic.

Previously each writer thread wrote a fixed number of small messages.  This was
fairly slow, and wasn't always guaranteed to reliably produce discards.
Now each writer thread writes larger messages, which produce discards faster.
The test also automatically stops after 10 separate discard events, so that it
finishes faster (typically a few hundred milliseconds, rather than 5+ seconds).

This also updates the test to use XLOG() internally rather than using fprintf()
to print to stderr, now that the XLOG() diffs have landed.

Reviewed By: wez

Differential Revision: D5261059

fbshipit-source-id: 120224706fee36948ef76efbeb579ccc56400c51

folly/experimental/logging/test/AsyncFileWriterTest.cpp

index 9e4694df375e9fd43016a02be01631d716e8e463..7b03ef5ee229feddddec48791c2844d626ba4d6e 100644 (file)
 #include <folly/File.h>
 #include <folly/FileUtil.h>
 #include <folly/String.h>
+#include <folly/Synchronized.h>
 #include <folly/experimental/TestUtil.h>
 #include <folly/experimental/logging/AsyncFileWriter.h>
+#include <folly/experimental/logging/Init.h>
 #include <folly/experimental/logging/LoggerDB.h>
+#include <folly/experimental/logging/xlog.h>
 #include <folly/futures/Future.h>
 #include <folly/futures/Promise.h>
+#include <folly/init/Init.h>
 #include <folly/portability/GFlags.h>
 #include <folly/portability/GMock.h>
 #include <folly/portability/GTest.h>
 #include <folly/portability/Unistd.h>
 
+DEFINE_string(logging, "", "folly::logging configuration");
 DEFINE_int64(
-    async_discard_num_writer_threads,
-    32,
-    "number of threads to use to generate log messages during "
+    async_discard_num_normal_writers,
+    30,
+    "number of threads to use to generate normal log messages during "
     "the AsyncFileWriter.discard test");
 DEFINE_int64(
-    async_discard_messages_per_writer,
-    200000,
-    "number of messages each writer threads should generate in "
+    async_discard_num_nodiscard_writers,
+    2,
+    "number of threads to use to generate non-discardable log messages during "
     "the AsyncFileWriter.discard test");
 DEFINE_int64(
     async_discard_read_sleep_usec,
     500,
     "how long the read thread should sleep between reads in "
     "the AsyncFileWriter.discard test");
+DEFINE_int64(
+    async_discard_timeout_msec,
+    10000,
+    "A timeout for the AsyncFileWriter.discard test if it cannot generate "
+    "enough discards");
+DEFINE_int64(
+    async_discard_num_events,
+    10,
+    "The number of discard events to wait for in the AsyncFileWriter.discard "
+    "test");
 
 using namespace folly;
 using namespace std::literals::chrono_literals;
 using folly::test::TemporaryFile;
+using std::chrono::steady_clock;
+using std::chrono::milliseconds;
 
 TEST(AsyncFileWriter, noMessages) {
   TemporaryFile tmpFile{"logging_test"};
@@ -167,7 +184,7 @@ size_t fillUpPipe(int fd) {
       totalBytes += bytesWritten;
     }
   }
-  fprintf(stderr, "pipe filled up after %zu bytes\n", totalBytes);
+  XLOG(DBG1, "pipe filled up after ", totalBytes, " bytes");
 
   rc = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
   folly::checkUnixError(rc, "failed to put pipe back in blocking mode");
@@ -217,24 +234,20 @@ TEST(AsyncFileWriter, flush) {
 }
 #endif
 
-/**
- * writeThread() writes a series of messages to the AsyncFileWriter
- */
-void writeThread(
-    AsyncFileWriter* writer,
-    size_t id,
-    size_t numMessages,
-    uint32_t flags) {
-  for (size_t n = 0; n < numMessages; ++n) {
-    writer->writeMessage(
-        folly::to<std::string>("thread ", id, " message ", n + 1, '\n'), flags);
-  }
-}
+// A large-ish message suffix, just to consume space and help fill up
+// log buffers faster.
+static constexpr StringPiece kMsgSuffix{
+    "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
+    "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
+    "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
+    "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"};
 
 class ReadStats {
  public:
   ReadStats()
-      : readSleepUS_{static_cast<uint64_t>(
+      : deadline_{steady_clock::now() +
+                  milliseconds{FLAGS_async_discard_timeout_msec}},
+        readSleepUS_{static_cast<uint64_t>(
             std::min(0L, FLAGS_async_discard_read_sleep_usec))} {}
 
   void clearSleepDuration() {
@@ -244,36 +257,85 @@ class ReadStats {
     return std::chrono::microseconds{readSleepUS_.load()};
   }
 
-  void check(size_t numThreads, size_t messagesPerThread) {
-    EXPECT_EQ("", trailingData_);
-    EXPECT_EQ(numThreads, writers_.size());
-    size_t totalMessagesReceived = 0;
-    for (const auto& writerData : writers_) {
-      EXPECT_LE(writerData.numMessages, messagesPerThread);
-      EXPECT_LE(writerData.lastId, messagesPerThread);
-      totalMessagesReceived += writerData.numMessages;
+  bool shouldWriterStop() const {
+    // Stop after we have seen the required number of separate discard events.
+    // We stop based on discardEventsSeen_ rather than numDiscarded_ since this
+    // ensures the async writer blocks and then makes progress again multiple
+    // times.
+    if (FLAGS_async_discard_num_events > 0 &&
+        discardEventsSeen_.load() >
+            static_cast<uint64_t>(FLAGS_async_discard_num_events)) {
+      return true;
     }
 
+    // Stop after a timeout, even if we don't hit the number of requested
+    // discards.
+    return steady_clock::now() > deadline_;
+  }
+  void writerFinished(size_t threadID, size_t messagesWritten, uint32_t flags) {
+    auto map = perThreadWriteData_.wlock();
+    assert(map->find(threadID) == map->end());
+    auto& data = (*map)[threadID];
+    data.numMessagesWritten = messagesWritten;
+    data.flags = flags;
+  }
+
+  void check() {
+    auto writeDataMap = perThreadWriteData_.wlock();
+
+    EXPECT_EQ("", trailingData_);
     EXPECT_EQ(0, numUnableToParse_);
     EXPECT_EQ(0, numOutOfOrder_);
-    EXPECT_EQ(
-        numThreads * messagesPerThread, totalMessagesReceived + numDiscarded_);
-  }
 
-  /**
-   * Check that no messages were dropped from the specified thread.
-   */
-  void checkNoDrops(size_t threadIndex, size_t messagesPerThread) {
-    EXPECT_EQ(writers_[threadIndex].numMessages, messagesPerThread);
-    EXPECT_EQ(writers_[threadIndex].lastId, messagesPerThread);
+    // Check messages received from each writer thread
+    size_t readerStatsChecked = 0;
+    size_t totalMessagesWritten = 0;
+    size_t totalMessagesRead = 0;
+    for (const auto& writeEntry : *writeDataMap) {
+      const auto& writeInfo = writeEntry.second;
+      totalMessagesWritten += writeInfo.numMessagesWritten;
+
+      auto iter = perThreadReadData_.find(writeEntry.first);
+      if (iter == perThreadReadData_.end()) {
+        // We never received any messages from this writer thread.
+        // This is okay as long as this is not a NEVER_DISCARD writer.
+        EXPECT_EQ(0, writeInfo.flags);
+        continue;
+      }
+      const auto& readInfo = iter->second;
+      ++readerStatsChecked;
+      totalMessagesRead += readInfo.numMessagesRead;
+      if (writeInfo.flags & LogWriter::NEVER_DISCARD) {
+        // Non-discarding threads should never discard anything
+        EXPECT_EQ(readInfo.numMessagesRead, writeInfo.numMessagesWritten);
+        EXPECT_EQ(readInfo.lastId, writeInfo.numMessagesWritten);
+      } else {
+        // Other threads may have discarded some messages
+        EXPECT_LE(readInfo.numMessagesRead, writeInfo.numMessagesWritten);
+        EXPECT_LE(readInfo.lastId, writeInfo.numMessagesWritten);
+      }
+    }
+
+    EXPECT_EQ(totalMessagesWritten, totalMessagesRead + numDiscarded_);
+    EXPECT_EQ(readerStatsChecked, perThreadReadData_.size());
+
+    // This test is intended to check the discard behavior.
+    // Fail the test if we didn't actually trigger any discards before we timed
+    // out.
+    EXPECT_GT(numDiscarded_, 0);
+
+    XLOG(DBG1) << totalMessagesWritten << " messages written, "
+               << totalMessagesRead << " messages read, " << numDiscarded_
+               << " messages discarded";
   }
 
   void messageReceived(StringPiece msg) {
     if (msg.endsWith(" log messages discarded: "
                      "logging faster than we can write")) {
       auto discardCount = folly::to<size_t>(msg.subpiece(0, msg.find(' ')));
-      fprintf(stderr, "received discard notification: %zu\n", discardCount);
+      XLOG(DBG3, "received discard notification: ", discardCount);
       numDiscarded_ += discardCount;
+      ++discardEventsSeen_;
       return;
     }
 
@@ -283,28 +345,18 @@ class ReadStats {
       parseMessage(msg, &threadID, &messageIndex);
     } catch (const std::exception& ex) {
       ++numUnableToParse_;
-      fprintf(
-          stderr,
-          "unable to parse log message: %s\n",
-          folly::humanify(msg.str()).c_str());
+      XLOG(ERR, "unable to parse log message: ", msg);
       return;
     }
 
-    if (threadID >= writers_.size()) {
-      writers_.resize(threadID + 1);
-    }
-    writers_[threadID].numMessages++;
-    if (messageIndex > writers_[threadID].lastId) {
-      writers_[threadID].lastId = messageIndex;
+    auto& data = perThreadReadData_[threadID];
+    data.numMessagesRead++;
+    if (messageIndex > data.lastId) {
+      data.lastId = messageIndex;
     } else {
       ++numOutOfOrder_;
-      fprintf(
-          stderr,
-          "received out-of-order messages from writer %zu: "
-          "%zu received after %zu\n",
-          threadID,
-          messageIndex,
-          writers_[threadID].lastId);
+      XLOG(ERR) << "received out-of-order messages from writer " << threadID
+                << ": " << messageIndex << " received after " << data.lastId;
     }
   }
 
@@ -313,41 +365,110 @@ class ReadStats {
   }
 
  private:
-  struct WriterStats {
-    size_t numMessages{0};
+  struct ReaderData {
+    size_t numMessagesRead{0};
     size_t lastId{0};
   };
+  struct WriterData {
+    size_t numMessagesWritten{0};
+    int flags{0};
+  };
 
   void parseMessage(StringPiece msg, size_t* threadID, size_t* messageIndex) {
+    // Validate and strip off the message prefix and suffix
     constexpr StringPiece prefix{"thread "};
-    constexpr StringPiece middle{" message "};
     if (!msg.startsWith(prefix)) {
       throw std::runtime_error("bad message prefix");
     }
+    msg.advance(prefix.size());
+    if (!msg.endsWith(kMsgSuffix)) {
+      throw std::runtime_error("bad message suffix");
+    }
+    msg.subtract(kMsgSuffix.size());
 
-    auto idx = prefix.size();
-    auto end = msg.find(' ', idx);
-    if (end == StringPiece::npos) {
+    // Parse then strip off the thread index
+    auto threadIDEnd = msg.find(' ');
+    if (threadIDEnd == StringPiece::npos) {
       throw std::runtime_error("no middle found");
     }
+    *threadID = folly::to<size_t>(msg.subpiece(0, threadIDEnd));
+    msg.advance(threadIDEnd);
 
-    *threadID = folly::to<size_t>(msg.subpiece(idx, end - idx));
-    auto rest = msg.subpiece(end);
-    if (!rest.startsWith(middle)) {
+    // Validate that the middle of the message is what we expect,
+    // then strip it off
+    constexpr StringPiece middle{" message "};
+    if (!msg.startsWith(middle)) {
       throw std::runtime_error("bad message middle");
     }
+    msg.advance(middle.size());
 
-    rest.advance(middle.size());
-    *messageIndex = folly::to<size_t>(rest);
+    // Parse the message index
+    *messageIndex = folly::to<size_t>(msg);
   }
 
-  std::vector<WriterStats> writers_;
+  /**
+   * Data about each writer thread, as recorded by the reader thread.
+   *
+   * At the end of the test we will compare perThreadReadData_ (recorded by the
+   * reader) with perThreadWriteData_ (recorded by the writers) to make sure
+   * the data matches up.
+   *
+   * This is a map from writer_thread_id to ReaderData.
+   * The writer_thread_id is extracted from the received messages.
+   *
+   * This field does not need locking as it is only updated by the single
+   * reader thread.
+   */
+  std::unordered_map<size_t, ReaderData> perThreadReadData_;
+
+  /*
+   * Additional information recorded by the reader thread.
+   */
   std::string trailingData_;
   size_t numUnableToParse_{0};
   size_t numOutOfOrder_{0};
   size_t numDiscarded_{0};
 
-  std::atomic<uint64_t> readSleepUS_;
+  /**
+   * deadline_ is a maximum end time for the test.
+   *
+   * The writer threads quit if the deadline is reached even if they have not
+   * produced the desired number of discard events yet.
+   */
+  const std::chrono::steady_clock::time_point deadline_;
+
+  /**
+   * How long the reader thread should sleep between each read event.
+   *
+   * This is initially set to a non-zero value (read from the
+   * FLAGS_async_discard_read_sleep_usec flag) so that the reader thread reads
+   * slowly, which will fill up the pipe buffer and cause discard events.
+   *
+   * Once we have produce enough discards and are ready to finish the test the
+   * main thread reduces readSleepUS_ to 0, so the reader will finish the
+   * remaining message backlog quickly.
+   */
+  std::atomic<uint64_t> readSleepUS_{0};
+
+  /**
+   * A count of how many discard events have been seen so far.
+   *
+   * The reader increments discardEventsSeen_ each time it sees a discard
+   * notification message.  A "discard event" basically corresponds to a single
+   * group of dropped messages.  Once the reader pulls some messages off out of
+   * the pipe the writers should be able to send more data, but the buffer will
+   * eventually fill up again, producing another discard event.
+   */
+  std::atomic<uint64_t> discardEventsSeen_{0};
+
+  /**
+   * Data about each writer thread, as recorded by the writers.
+   *
+   * When each writer thread finishes it records how many messages it wrote,
+   * plus the flags it used to write the messages.
+   */
+  folly::Synchronized<std::unordered_map<size_t, WriterData>>
+      perThreadWriteData_;
 };
 
 /**
@@ -366,11 +487,11 @@ void readThread(folly::File&& file, ReadStats* stats) {
     auto readResult = folly::readNoInt(
         file.fd(), buffer.data() + bufferIdx, buffer.size() - bufferIdx);
     if (readResult < 0) {
-      fprintf(stderr, "error reading from pipe: %d\n", errno);
+      XLOG(ERR, "error reading from pipe: ", errno);
       return;
     }
     if (readResult == 0) {
-      fprintf(stderr, "read EOF\n");
+      XLOG(DBG2, "read EOF");
       break;
     }
 
@@ -396,6 +517,30 @@ void readThread(folly::File&& file, ReadStats* stats) {
   }
 }
 
+/**
+ * writeThread() writes a series of messages to the AsyncFileWriter
+ */
+void writeThread(
+    AsyncFileWriter* writer,
+    size_t id,
+    uint32_t flags,
+    ReadStats* readStats) {
+  size_t msgID = 0;
+  while (true) {
+    ++msgID;
+    writer->writeMessage(
+        folly::to<std::string>(
+            "thread ", id, " message ", msgID, kMsgSuffix, '\n'),
+        flags);
+
+    // Break out once the reader has seen enough discards
+    if (((msgID & 0xff) == 0) && readStats->shouldWriterStop()) {
+      readStats->writerFinished(id, msgID, flags);
+      break;
+    }
+  }
+}
+
 /*
  * The discard test spawns a number of threads that each write a large number
  * of messages quickly.  The AsyncFileWriter writes to a pipe, an a separate
@@ -414,46 +559,41 @@ TEST(AsyncFileWriter, discard) {
   folly::File readPipe{fds[0], true};
   folly::File writePipe{fds[1], true};
 
-  // This test should always be run with at least 2 writer threads.
-  // The last one will use the NEVER_DISCARD flag, and we want at least
-  // one that does discard messages.
-  ASSERT_GT(FLAGS_async_discard_num_writer_threads, 2);
-
   ReadStats readStats;
   std::thread reader(readThread, std::move(readPipe), &readStats);
   {
     AsyncFileWriter writer{std::move(writePipe)};
 
     std::vector<std::thread> writeThreads;
-    for (int n = 0; n < FLAGS_async_discard_num_writer_threads; ++n) {
+    size_t numThreads = FLAGS_async_discard_num_normal_writers +
+        FLAGS_async_discard_num_nodiscard_writers;
+
+    for (size_t n = 0; n < numThreads; ++n) {
       uint32_t flags = 0;
-      // Configure the last writer thread to never drop messages
-      if (n == (FLAGS_async_discard_num_writer_threads - 1)) {
+      if (n >= static_cast<size_t>(FLAGS_async_discard_num_normal_writers)) {
         flags = LogWriter::NEVER_DISCARD;
       }
+      XLOGF(DBG4, "writer {:4d} flags {:#02x}", n, flags);
 
-      writeThreads.emplace_back(
-          writeThread,
-          &writer,
-          n,
-          FLAGS_async_discard_messages_per_writer,
-          flags);
+      writeThreads.emplace_back(writeThread, &writer, n, flags, &readStats);
     }
 
     for (auto& t : writeThreads) {
       t.join();
     }
-    fprintf(stderr, "writers done\n");
+    XLOG(DBG2, "writers done");
   }
   // Clear the read sleep duration so the reader will finish quickly now
   readStats.clearSleepDuration();
   reader.join();
-  readStats.check(
-      FLAGS_async_discard_num_writer_threads,
-      FLAGS_async_discard_messages_per_writer);
-  // Check that no messages were dropped from the thread using the
-  // NEVER_DISCARD flag.
-  readStats.checkNoDrops(
-      FLAGS_async_discard_num_writer_threads - 1,
-      FLAGS_async_discard_messages_per_writer);
+  readStats.check();
+}
+
+int main(int argc, char* argv[]) {
+  testing::InitGoogleTest(&argc, argv);
+  folly::init(&argc, &argv);
+  // Don't use async logging in the async logging tests :-)
+  folly::initLoggingGlogStyle(FLAGS_logging, LogLevel::INFO, /* async */ false);
+
+  return RUN_ALL_TESTS();
 }