logging: add a LogHandler::flush() call
authorAdam Simpkins <simpkins@fb.com>
Tue, 20 Jun 2017 18:01:57 +0000 (11:01 -0700)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Tue, 20 Jun 2017 18:06:44 +0000 (11:06 -0700)
Summary:
Add a flush() call to the LogHandler interface.  This is needed to implement
`FB_LOG(FATAL)` so that we can flush all LogHandlers before aborting the
program.

Reviewed By: wez

Differential Revision: D5189499

fbshipit-source-id: 75fa4d7e75ea26de5b7383bf7e8d073fb37e9309

folly/experimental/logging/AsyncFileWriter.h
folly/experimental/logging/ImmediateFileWriter.cpp
folly/experimental/logging/ImmediateFileWriter.h
folly/experimental/logging/LogHandler.h
folly/experimental/logging/LogWriter.h
folly/experimental/logging/StandardLogHandler.cpp
folly/experimental/logging/StandardLogHandler.h
folly/experimental/logging/test/AsyncFileWriterTest.cpp
folly/experimental/logging/test/StandardLogHandlerTest.cpp
folly/experimental/logging/test/TestLogHandler.h

index 890ae80..2fb2e45 100644 (file)
@@ -63,7 +63,7 @@ class AsyncFileWriter : public LogWriter {
    * Block until the I/O thread has finished writing all messages that
    * were already enqueued when flush() was called.
    */
-  void flush();
+  void flush() override;
 
  private:
   /*
index 4135be6..2035efd 100644 (file)
@@ -47,4 +47,6 @@ void ImmediateFileWriter::writeMessage(
         errnoStr(errnum));
   }
 }
+
+void ImmediateFileWriter::flush() {}
 }
index 9703cca..d21c25e 100644 (file)
@@ -48,6 +48,7 @@ class ImmediateFileWriter : public LogWriter {
 
   using LogWriter::writeMessage;
   void writeMessage(folly::StringPiece buffer, uint32_t flags = 0) override;
+  void flush() override;
 
  private:
   ImmediateFileWriter(ImmediateFileWriter const&) = delete;
index 3a77887..60d1db6 100644 (file)
@@ -65,5 +65,19 @@ class LogHandler {
   virtual void handleMessage(
       const LogMessage& message,
       const LogCategory* handlerCategory) = 0;
+
+  /**
+   * Block until all messages that have already been sent to this LogHandler
+   * have been processed.
+   *
+   * For LogHandlers that perform asynchronous processing of log messages,
+   * this ensures that messages already sent to this handler have finished
+   * being processed.
+   *
+   * Other threads may still call handleMessage() while flush() is running.
+   * handleMessage() calls that did not complete before the flush() call
+   * started will not necessarily be processed by the flush call.
+   */
+  virtual void flush() = 0;
 };
 }
index 6d6158c..eba3771 100644 (file)
@@ -62,5 +62,15 @@ class LogWriter {
   virtual void writeMessage(std::string&& buffer, uint32_t flags = 0) {
     writeMessage(folly::StringPiece{buffer}, flags);
   }
+
+  /**
+   * Block until all messages that have already been sent to this LogWriter
+   * have been written.
+   *
+   * Other threads may still call writeMessage() while flush() is running.
+   * writeMessage() calls that did not complete before the flush() call started
+   * will not necessarily be processed by the flush call.
+   */
+  virtual void flush() = 0;
 };
 }
index 4da5505..51debda 100644 (file)
@@ -36,4 +36,8 @@ void StandardLogHandler::handleMessage(
   }
   writer_->writeMessage(formatter_->formatMessage(message, handlerCategory));
 }
+
+void StandardLogHandler::flush() {
+  writer_->flush();
+}
 }
index dfd69c5..7fd0820 100644 (file)
@@ -67,6 +67,8 @@ class StandardLogHandler : public LogHandler {
       const LogMessage& message,
       const LogCategory* handlerCategory) override;
 
+  void flush() override;
+
  private:
   std::atomic<LogLevel> level_{LogLevel::NONE};
   std::shared_ptr<LogFormatter> formatter_;
index 1f98a42..9e4694d 100644 (file)
@@ -13,6 +13,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include <thread>
+
 #include <folly/Conv.h>
 #include <folly/Exception.h>
 #include <folly/File.h>
@@ -21,6 +23,8 @@
 #include <folly/experimental/TestUtil.h>
 #include <folly/experimental/logging/AsyncFileWriter.h>
 #include <folly/experimental/logging/LoggerDB.h>
+#include <folly/futures/Future.h>
+#include <folly/futures/Promise.h>
 #include <folly/portability/GFlags.h>
 #include <folly/portability/GMock.h>
 #include <folly/portability/GTest.h>
@@ -133,6 +137,84 @@ TEST(AsyncFileWriter, ioError) {
   EXPECT_GT(logErrors.size(), 0);
   EXPECT_LE(logErrors.size(), numMessages);
 }
+
+namespace {
+size_t fillUpPipe(int fd) {
+  int flags = fcntl(fd, F_GETFL);
+  folly::checkUnixError(flags, "failed get file descriptor flags");
+  auto rc = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+  folly::checkUnixError(rc, "failed to put pipe in non-blocking mode");
+  std::vector<char> data;
+  data.resize(4000);
+  size_t totalBytes = 0;
+  size_t bytesToWrite = data.size();
+  while (true) {
+    auto bytesWritten = writeNoInt(fd, data.data(), bytesToWrite);
+    if (bytesWritten < 0) {
+      if (errno == EAGAIN || errno == EWOULDBLOCK) {
+        // We blocked.  Keep trying smaller writes, until we get down to a
+        // single byte, just to make sure the logging code really won't be able
+        // to write anything to the pipe.
+        if (bytesToWrite <= 1) {
+          break;
+        } else {
+          bytesToWrite /= 2;
+        }
+      } else {
+        throwSystemError("error writing to pipe");
+      }
+    } else {
+      totalBytes += bytesWritten;
+    }
+  }
+  fprintf(stderr, "pipe filled up after %zu bytes\n", totalBytes);
+
+  rc = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
+  folly::checkUnixError(rc, "failed to put pipe back in blocking mode");
+
+  return totalBytes;
+}
+}
+
+TEST(AsyncFileWriter, flush) {
+  // Set up a pipe(), then write data to the write endpoint until it fills up
+  // and starts blocking.
+  std::array<int, 2> fds;
+  auto rc = pipe(fds.data());
+  folly::checkUnixError(rc, "failed to create pipe");
+  File readPipe{fds[0], true};
+  File writePipe{fds[1], true};
+
+  auto paddingSize = fillUpPipe(writePipe.fd());
+
+  // Now set up an AsyncFileWriter pointing at the write end of the pipe
+  AsyncFileWriter writer{std::move(writePipe)};
+
+  // Write a message
+  writer.writeMessage(std::string{"test message"});
+
+  // Call flush().  Use a separate thread, since this should block until we
+  // consume data from the pipe.
+  Promise<Unit> promise;
+  auto future = promise.getFuture();
+  auto flushFunction = [&] { writer.flush(); };
+  std::thread flushThread{
+      [&]() { promise.setTry(makeTryWith(flushFunction)); }};
+
+  // Sleep briefly, and make sure flush() still hasn't completed.
+  /* sleep override */
+  std::this_thread::sleep_for(10ms);
+  EXPECT_FALSE(future.isReady());
+
+  // Now read from the pipe
+  std::vector<char> buf;
+  buf.resize(paddingSize);
+  readFull(readPipe.fd(), buf.data(), buf.size());
+
+  // Make sure flush completes successfully now
+  future.get(10ms);
+  flushThread.join();
+}
 #endif
 
 /**
@@ -279,7 +361,7 @@ void readThread(folly::File&& file, ReadStats* stats) {
   size_t bufferIdx = 0;
   while (true) {
     /* sleep override */
-    usleep(stats->getSleepUS().count());
+    std::this_thread::sleep_for(stats->getSleepUS());
 
     auto readResult = folly::readNoInt(
         file.fd(), buffer.data() + bufferIdx, buffer.size() - bufferIdx);
index 8ea4efe..6a1f78d 100644 (file)
@@ -53,6 +53,7 @@ class TestLogWriter : public LogWriter {
       override {
     messages_.emplace_back(buffer.str());
   }
+  void flush() override {}
 
   std::vector<std::string>& getMessages() {
     return messages_;
index b35c47a..ae74313 100644 (file)
@@ -38,6 +38,8 @@ class TestLogHandler : public LogHandler {
     messages_.emplace_back(message, handlerCategory);
   }
 
+  void flush() override {}
+
  private:
   std::vector<std::pair<LogMessage, const LogCategory*>> messages_;
 };