Make Observer.Stress test not fail under load
[folly.git] / folly / experimental / io / AsyncIO.cpp
index addcc7e177850323c2e654bf281cddba4968f9c7..c3bde31ec668f0c15fb81a25dfa956508d59b1c6 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2013 Facebook, Inc.
+ * Copyright 2016 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * limitations under the License.
  */
 
-#include "folly/experimental/io/AsyncIO.h"
+#include <folly/experimental/io/AsyncIO.h>
 
-#include <unistd.h>
+#include <sys/eventfd.h>
 #include <cerrno>
+#include <ostream>
+#include <stdexcept>
 #include <string>
 
 #include <boost/intrusive/parent_from_member.hpp>
 #include <glog/logging.h>
 
-#include "folly/Exception.h"
-#include "folly/Format.h"
-#include "folly/Likely.h"
-#include "folly/String.h"
-#include "folly/eventfd.h"
+#include <folly/Exception.h>
+#include <folly/Format.h>
+#include <folly/Likely.h>
+#include <folly/String.h>
+#include <folly/portability/Unistd.h>
 
 namespace folly {
 
@@ -104,7 +106,9 @@ void AsyncIOOp::init() {
 
 AsyncIO::AsyncIO(size_t capacity, PollMode pollMode)
   : ctx_(0),
+    ctxSet_(false),
     pending_(0),
+    submitted_(0),
     capacity_(capacity),
     pollFd_(-1) {
   CHECK_GT(capacity_, 0);
@@ -126,36 +130,71 @@ AsyncIO::~AsyncIO() {
   }
 }
 
+void AsyncIO::decrementPending() {
+  auto p = pending_.fetch_add(-1, std::memory_order_acq_rel);
+  DCHECK_GE(p, 1);
+}
+
 void AsyncIO::initializeContext() {
-  if (!ctx_) {
-    int rc = io_queue_init(capacity_, &ctx_);
-    // returns negative errno
-    checkKernelError(rc, "AsyncIO: io_queue_init failed");
-    DCHECK(ctx_);
+  if (!ctxSet_.load(std::memory_order_acquire)) {
+    std::lock_guard<std::mutex> lock(initMutex_);
+    if (!ctxSet_.load(std::memory_order_relaxed)) {
+      int rc = io_queue_init(capacity_, &ctx_);
+      // returns negative errno
+      if (rc == -EAGAIN) {
+        long aio_nr, aio_max;
+        std::unique_ptr<FILE, int(*)(FILE*)>
+          fp(fopen("/proc/sys/fs/aio-nr", "r"), fclose);
+        PCHECK(fp);
+        CHECK_EQ(fscanf(fp.get(), "%ld", &aio_nr), 1);
+
+        std::unique_ptr<FILE, int(*)(FILE*)>
+          aio_max_fp(fopen("/proc/sys/fs/aio-max-nr", "r"), fclose);
+        PCHECK(aio_max_fp);
+        CHECK_EQ(fscanf(aio_max_fp.get(), "%ld", &aio_max), 1);
+
+        LOG(ERROR) << "No resources for requested capacity of " << capacity_;
+        LOG(ERROR) << "aio_nr " << aio_nr << ", aio_max_nr " << aio_max;
+      }
+
+      checkKernelError(rc, "AsyncIO: io_queue_init failed");
+      DCHECK(ctx_);
+      ctxSet_.store(true, std::memory_order_release);
+    }
   }
 }
 
 void AsyncIO::submit(Op* op) {
   CHECK_EQ(op->state(), Op::State::INITIALIZED);
-  CHECK_LT(pending_, capacity_) << "too many pending requests";
   initializeContext();  // on demand
+
+  // We can increment past capacity, but we'll clean up after ourselves.
+  auto p = pending_.fetch_add(1, std::memory_order_acq_rel);
+  if (p >= capacity_) {
+    decrementPending();
+    throw std::range_error("AsyncIO: too many pending requests");
+  }
   iocb* cb = &op->iocb_;
   cb->data = nullptr;  // unused
   if (pollFd_ != -1) {
     io_set_eventfd(cb, pollFd_);
   }
   int rc = io_submit(ctx_, 1, &cb);
-  checkKernelError(rc, "AsyncIO: io_submit failed");
+  if (rc < 0) {
+    decrementPending();
+    throwSystemErrorExplicit(-rc, "AsyncIO: io_submit failed");
+  }
+  submitted_++;
   DCHECK_EQ(rc, 1);
   op->start();
-  ++pending_;
 }
 
 Range<AsyncIO::Op**> AsyncIO::wait(size_t minRequests) {
   CHECK(ctx_);
   CHECK_EQ(pollFd_, -1) << "wait() only allowed on non-pollable object";
-  CHECK_LE(minRequests, pending_);
-  return doWait(minRequests, pending_);
+  auto p = pending_.load(std::memory_order_acquire);
+  CHECK_LE(minRequests, p);
+  return doWait(minRequests, p);
 }
 
 Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
@@ -182,15 +221,27 @@ Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
 }
 
 Range<AsyncIO::Op**> AsyncIO::doWait(size_t minRequests, size_t maxRequests) {
-  io_event events[pending_];
-  int count;
+  io_event events[maxRequests];
+
+  size_t count = 0;
   do {
-    // Wait forever
-    count = io_getevents(ctx_, minRequests, maxRequests, events, nullptr);
-  } while (count == -EINTR);
-  checkKernelError(count, "AsyncIO: io_getevents failed");
-  DCHECK_GE(count, minRequests);  // the man page says so
-  DCHECK_LE(count, pending_);
+    int ret;
+    do {
+      // GOTCHA: io_getevents() may returns less than min_nr results if
+      // interrupted after some events have been read (if before, -EINTR
+      // is returned).
+      ret = io_getevents(ctx_,
+                         minRequests - count,
+                         maxRequests - count,
+                         events + count,
+                         /* timeout */ nullptr);  // wait forever
+    } while (ret == -EINTR);
+    // Check as may not be able to recover without leaking events.
+    CHECK_GE(ret, 0)
+      << "AsyncIO: io_getevents failed with error " << errnoStr(-ret);
+    count += ret;
+  } while (count < minRequests);
+  DCHECK_LE(count, maxRequests);
 
   completed_.clear();
   if (count == 0) {
@@ -201,7 +252,7 @@ Range<AsyncIO::Op**> AsyncIO::doWait(size_t minRequests, size_t maxRequests) {
     DCHECK(events[i].obj);
     Op* op = boost::intrusive::get_parent_from_member(
         events[i].obj, &AsyncIOOp::iocb_);
-    --pending_;
+    decrementPending();
     op->complete(events[i].res);
     completed_.push_back(op);
   }
@@ -226,9 +277,7 @@ void AsyncIOQueue::submit(OpFactory op) {
   maybeDequeue();
 }
 
-void AsyncIOQueue::onCompleted(AsyncIOOp* op) {
-  maybeDequeue();
-}
+void AsyncIOQueue::onCompleted(AsyncIOOp* /* op */) { maybeDequeue(); }
 
 void AsyncIOQueue::maybeDequeue() {
   while (!queue_.empty() && asyncIO_->pending() < asyncIO_->capacity()) {
@@ -238,9 +287,9 @@ void AsyncIOQueue::maybeDequeue() {
 
     // Interpose our completion callback
     auto& nextCb = op->notificationCallback();
-    op->setNotificationCallback([this, nextCb](AsyncIOOp* op) {
-      this->onCompleted(op);
-      if (nextCb) nextCb(op);
+    op->setNotificationCallback([this, nextCb](AsyncIOOp* op2) {
+      this->onCompleted(op2);
+      if (nextCb) nextCb(op2);
     });
 
     asyncIO_->submit(op);
@@ -297,11 +346,13 @@ std::ostream& operator<<(std::ostream& os, const iocb& cb) {
   switch (cb.aio_lio_opcode) {
     case IO_CMD_PREAD:
     case IO_CMD_PWRITE:
-      os << folly::format("buf={}, off={}, size={}, ",
-                          cb.u.c.buf, cb.u.c.nbytes, cb.u.c.offset);
+      os << folly::format("buf={}, offset={}, nbytes={}, ",
+                          cb.u.c.buf, cb.u.c.offset, cb.u.c.nbytes);
+      break;
     default:
       os << "[TODO: write debug string for "
          << iocbCmdToString(cb.aio_lio_opcode) << "] ";
+      break;
   }
 
   return os;
@@ -317,7 +368,11 @@ std::ostream& operator<<(std::ostream& os, const AsyncIOOp& op) {
   }
 
   if (op.state_ == AsyncIOOp::State::COMPLETED) {
-    os << "result=" << op.result_ << ", ";
+    os << "result=" << op.result_;
+    if (op.result_ < 0) {
+      os << " (" << errnoStr(-op.result_) << ')';
+    }
+    os << ", ";
   }
 
   return os << "}";
@@ -328,4 +383,3 @@ std::ostream& operator<<(std::ostream& os, AsyncIOOp::State state) {
 }
 
 }  // namespace folly
-