logging: add a LogHandler::flush() call
[folly.git] / folly / experimental / logging / test / AsyncFileWriterTest.cpp
index 1f98a4219db73298eb4eb37bbde31edd5ab5b4d0..9e4694df375e9fd43016a02be01631d716e8e463 100644 (file)
@@ -13,6 +13,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include <thread>
+
 #include <folly/Conv.h>
 #include <folly/Exception.h>
 #include <folly/File.h>
@@ -21,6 +23,8 @@
 #include <folly/experimental/TestUtil.h>
 #include <folly/experimental/logging/AsyncFileWriter.h>
 #include <folly/experimental/logging/LoggerDB.h>
+#include <folly/futures/Future.h>
+#include <folly/futures/Promise.h>
 #include <folly/portability/GFlags.h>
 #include <folly/portability/GMock.h>
 #include <folly/portability/GTest.h>
@@ -133,6 +137,84 @@ TEST(AsyncFileWriter, ioError) {
   EXPECT_GT(logErrors.size(), 0);
   EXPECT_LE(logErrors.size(), numMessages);
 }
+
+namespace {
+size_t fillUpPipe(int fd) {
+  int flags = fcntl(fd, F_GETFL);
+  folly::checkUnixError(flags, "failed get file descriptor flags");
+  auto rc = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+  folly::checkUnixError(rc, "failed to put pipe in non-blocking mode");
+  std::vector<char> data;
+  data.resize(4000);
+  size_t totalBytes = 0;
+  size_t bytesToWrite = data.size();
+  while (true) {
+    auto bytesWritten = writeNoInt(fd, data.data(), bytesToWrite);
+    if (bytesWritten < 0) {
+      if (errno == EAGAIN || errno == EWOULDBLOCK) {
+        // We blocked.  Keep trying smaller writes, until we get down to a
+        // single byte, just to make sure the logging code really won't be able
+        // to write anything to the pipe.
+        if (bytesToWrite <= 1) {
+          break;
+        } else {
+          bytesToWrite /= 2;
+        }
+      } else {
+        throwSystemError("error writing to pipe");
+      }
+    } else {
+      totalBytes += bytesWritten;
+    }
+  }
+  fprintf(stderr, "pipe filled up after %zu bytes\n", totalBytes);
+
+  rc = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
+  folly::checkUnixError(rc, "failed to put pipe back in blocking mode");
+
+  return totalBytes;
+}
+}
+
+TEST(AsyncFileWriter, flush) {
+  // Set up a pipe(), then write data to the write endpoint until it fills up
+  // and starts blocking.
+  std::array<int, 2> fds;
+  auto rc = pipe(fds.data());
+  folly::checkUnixError(rc, "failed to create pipe");
+  File readPipe{fds[0], true};
+  File writePipe{fds[1], true};
+
+  auto paddingSize = fillUpPipe(writePipe.fd());
+
+  // Now set up an AsyncFileWriter pointing at the write end of the pipe
+  AsyncFileWriter writer{std::move(writePipe)};
+
+  // Write a message
+  writer.writeMessage(std::string{"test message"});
+
+  // Call flush().  Use a separate thread, since this should block until we
+  // consume data from the pipe.
+  Promise<Unit> promise;
+  auto future = promise.getFuture();
+  auto flushFunction = [&] { writer.flush(); };
+  std::thread flushThread{
+      [&]() { promise.setTry(makeTryWith(flushFunction)); }};
+
+  // Sleep briefly, and make sure flush() still hasn't completed.
+  /* sleep override */
+  std::this_thread::sleep_for(10ms);
+  EXPECT_FALSE(future.isReady());
+
+  // Now read from the pipe
+  std::vector<char> buf;
+  buf.resize(paddingSize);
+  readFull(readPipe.fd(), buf.data(), buf.size());
+
+  // Make sure flush completes successfully now
+  future.get(10ms);
+  flushThread.join();
+}
 #endif
 
 /**
@@ -279,7 +361,7 @@ void readThread(folly::File&& file, ReadStats* stats) {
   size_t bufferIdx = 0;
   while (true) {
     /* sleep override */
-    usleep(stats->getSleepUS().count());
+    std::this_thread::sleep_for(stats->getSleepUS());
 
     auto readResult = folly::readNoInt(
         file.fd(), buffer.data() + bufferIdx, buffer.size() - bufferIdx);