Fix copyright lines
[folly.git] / folly / experimental / io / AsyncIO.cpp
index 2080b72038e8c15de1e2e8e9cc2d4eeedd844786..55883ff91a75da76518005b1578084f4d4569477 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2013 Facebook, Inc.
+ * Copyright 2013-present Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * limitations under the License.
  */
 
-#include "folly/experimental/io/AsyncIO.h"
+#include <folly/experimental/io/AsyncIO.h>
 
+#include <sys/eventfd.h>
 #include <cerrno>
+#include <ostream>
+#include <stdexcept>
+#include <string>
 
+#include <boost/intrusive/parent_from_member.hpp>
 #include <glog/logging.h>
 
-#include "folly/Exception.h"
-#include "folly/Likely.h"
-#include "folly/String.h"
-#include "folly/eventfd.h"
+#include <folly/Exception.h>
+#include <folly/Format.h>
+#include <folly/Likely.h>
+#include <folly/String.h>
+#include <folly/portability/Unistd.h>
 
 namespace folly {
 
-AsyncIO::AsyncIO(size_t capacity, PollMode pollMode)
-  : ctx_(0),
-    pending_(0),
-    capacity_(capacity),
-    pollFd_(-1) {
-  if (UNLIKELY(capacity_ == 0)) {
-    throw std::out_of_range("AsyncIO: capacity must not be 0");
+AsyncIOOp::AsyncIOOp(NotificationCallback cb)
+    : cb_(std::move(cb)), state_(State::UNINITIALIZED), result_(-EINVAL) {
+  memset(&iocb_, 0, sizeof(iocb_));
+}
+
+void AsyncIOOp::reset(NotificationCallback cb) {
+  CHECK_NE(state_, State::PENDING);
+  cb_ = std::move(cb);
+  state_ = State::UNINITIALIZED;
+  result_ = -EINVAL;
+  memset(&iocb_, 0, sizeof(iocb_));
+}
+
+AsyncIOOp::~AsyncIOOp() {
+  CHECK_NE(state_, State::PENDING);
+}
+
+void AsyncIOOp::start() {
+  DCHECK_EQ(state_, State::INITIALIZED);
+  state_ = State::PENDING;
+}
+
+void AsyncIOOp::complete(ssize_t result) {
+  DCHECK_EQ(state_, State::PENDING);
+  state_ = State::COMPLETED;
+  result_ = result;
+  if (cb_) {
+    cb_(this);
   }
+}
+
+void AsyncIOOp::cancel() {
+  DCHECK_EQ(state_, State::PENDING);
+  state_ = State::CANCELED;
+}
+
+ssize_t AsyncIOOp::result() const {
+  CHECK_EQ(state_, State::COMPLETED);
+  return result_;
+}
+
+void AsyncIOOp::pread(int fd, void* buf, size_t size, off_t start) {
+  init();
+  io_prep_pread(&iocb_, fd, buf, size, start);
+}
+
+void AsyncIOOp::pread(int fd, Range<unsigned char*> range, off_t start) {
+  pread(fd, range.begin(), range.size(), start);
+}
+
+void AsyncIOOp::preadv(int fd, const iovec* iov, int iovcnt, off_t start) {
+  init();
+  io_prep_preadv(&iocb_, fd, iov, iovcnt, start);
+}
+
+void AsyncIOOp::pwrite(int fd, const void* buf, size_t size, off_t start) {
+  init();
+  io_prep_pwrite(&iocb_, fd, const_cast<void*>(buf), size, start);
+}
+
+void AsyncIOOp::pwrite(int fd, Range<const unsigned char*> range, off_t start) {
+  pwrite(fd, range.begin(), range.size(), start);
+}
+
+void AsyncIOOp::pwritev(int fd, const iovec* iov, int iovcnt, off_t start) {
+  init();
+  io_prep_pwritev(&iocb_, fd, iov, iovcnt, start);
+}
+
+void AsyncIOOp::init() {
+  CHECK_EQ(state_, State::UNINITIALIZED);
+  state_ = State::INITIALIZED;
+}
+
+AsyncIO::AsyncIO(size_t capacity, PollMode pollMode) : capacity_(capacity) {
+  CHECK_GT(capacity_, 0);
   completed_.reserve(capacity_);
   if (pollMode == POLLABLE) {
     pollFd_ = eventfd(0, EFD_NONBLOCK);
@@ -53,89 +127,82 @@ AsyncIO::~AsyncIO() {
   }
 }
 
-void AsyncIO::pread(Op* op, int fd, void* buf, size_t size, off_t start) {
-  iocb cb;
-  io_prep_pread(&cb, fd, buf, size, start);
-  submit(op, &cb);
-}
-
-void AsyncIO::pread(Op* op, int fd, Range<unsigned char*> range,
-                    off_t start) {
-  pread(op, fd, range.begin(), range.size(), start);
-}
-
-void AsyncIO::preadv(Op* op, int fd, const iovec* iov, int iovcnt,
-                     off_t start) {
-  iocb cb;
-  io_prep_preadv(&cb, fd, iov, iovcnt, start);
-  submit(op, &cb);
-}
-
-void AsyncIO::pwrite(Op* op, int fd, const void* buf, size_t size,
-                     off_t start) {
-  iocb cb;
-  io_prep_pwrite(&cb, fd, const_cast<void*>(buf), size, start);
-  submit(op, &cb);
-}
-
-void AsyncIO::pwrite(Op* op, int fd, Range<const unsigned char*> range,
-                     off_t start) {
-  pwrite(op, fd, range.begin(), range.size(), start);
-}
-
-void AsyncIO::pwritev(Op* op, int fd, const iovec* iov, int iovcnt,
-                      off_t start) {
-  iocb cb;
-  io_prep_pwritev(&cb, fd, iov, iovcnt, start);
-  submit(op, &cb);
+void AsyncIO::decrementPending() {
+  auto p = pending_.fetch_add(-1, std::memory_order_acq_rel);
+  DCHECK_GE(p, 1);
 }
 
 void AsyncIO::initializeContext() {
-  if (!ctx_) {
-    int rc = io_queue_init(capacity_, &ctx_);
-    // returns negative errno
-    checkKernelError(rc, "AsyncIO: io_queue_init failed");
-    DCHECK(ctx_);
+  if (!ctxSet_.load(std::memory_order_acquire)) {
+    std::lock_guard<std::mutex> lock(initMutex_);
+    if (!ctxSet_.load(std::memory_order_relaxed)) {
+      int rc = io_queue_init(capacity_, &ctx_);
+      // returns negative errno
+      if (rc == -EAGAIN) {
+        long aio_nr, aio_max;
+        std::unique_ptr<FILE, int (*)(FILE*)> fp(
+            fopen("/proc/sys/fs/aio-nr", "r"), fclose);
+        PCHECK(fp);
+        CHECK_EQ(fscanf(fp.get(), "%ld", &aio_nr), 1);
+
+        std::unique_ptr<FILE, int (*)(FILE*)> aio_max_fp(
+            fopen("/proc/sys/fs/aio-max-nr", "r"), fclose);
+        PCHECK(aio_max_fp);
+        CHECK_EQ(fscanf(aio_max_fp.get(), "%ld", &aio_max), 1);
+
+        LOG(ERROR) << "No resources for requested capacity of " << capacity_;
+        LOG(ERROR) << "aio_nr " << aio_nr << ", aio_max_nr " << aio_max;
+      }
+
+      checkKernelError(rc, "AsyncIO: io_queue_init failed");
+      DCHECK(ctx_);
+      ctxSet_.store(true, std::memory_order_release);
+    }
   }
 }
 
-void AsyncIO::submit(Op* op, iocb* cb) {
-  if (UNLIKELY(pending_ >= capacity_)) {
-    throw std::out_of_range("AsyncIO: too many pending requests");
-  }
-  if (UNLIKELY(op->state() != Op::UNINITIALIZED)) {
-    throw std::logic_error("AsyncIO: Invalid Op state in submit");
+void AsyncIO::submit(Op* op) {
+  CHECK_EQ(op->state(), Op::State::INITIALIZED);
+  initializeContext(); // on demand
+
+  // We can increment past capacity, but we'll clean up after ourselves.
+  auto p = pending_.fetch_add(1, std::memory_order_acq_rel);
+  if (p >= capacity_) {
+    decrementPending();
+    throw std::range_error("AsyncIO: too many pending requests");
   }
-  initializeContext();  // on demand
-  cb->data = op;
+  iocb* cb = &op->iocb_;
+  cb->data = nullptr; // unused
   if (pollFd_ != -1) {
     io_set_eventfd(cb, pollFd_);
   }
   int rc = io_submit(ctx_, 1, &cb);
-  checkKernelError(rc, "AsyncIO: io_submit failed");
+  if (rc < 0) {
+    decrementPending();
+    throwSystemErrorExplicit(-rc, "AsyncIO: io_submit failed");
+  }
+  submitted_++;
   DCHECK_EQ(rc, 1);
   op->start();
-  ++pending_;
 }
 
 Range<AsyncIO::Op**> AsyncIO::wait(size_t minRequests) {
-  if (UNLIKELY(!ctx_)) {
-    throw std::logic_error("AsyncIO: wait called with no requests");
-  }
-  if (UNLIKELY(pollFd_ != -1)) {
-    throw std::logic_error("AsyncIO: wait not allowed on pollable object");
-  }
-  return doWait(minRequests, pending_);
+  CHECK(ctx_);
+  CHECK_EQ(pollFd_, -1) << "wait() only allowed on non-pollable object";
+  auto p = pending_.load(std::memory_order_acquire);
+  CHECK_LE(minRequests, p);
+  return doWait(WaitType::COMPLETE, minRequests, p, completed_);
+}
+
+Range<AsyncIO::Op**> AsyncIO::cancel() {
+  CHECK(ctx_);
+  auto p = pending_.load(std::memory_order_acquire);
+  return doWait(WaitType::CANCEL, p, p, canceled_);
 }
 
 Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
-  if (UNLIKELY(!ctx_)) {
-    throw std::logic_error("AsyncIO: pollCompleted called with no requests");
-  }
-  if (UNLIKELY(pollFd_ == -1)) {
-    throw std::logic_error(
-        "AsyncIO: pollCompleted not allowed on non-pollable object");
-  }
+  CHECK(ctx_);
+  CHECK_NE(pollFd_, -1) << "pollCompleted() only allowed on pollable object";
   uint64_t numEvents;
   // This sets the eventFd counter to 0, see
   // http://www.kernel.org/doc/man-pages/online/pages/man2/eventfd.2.html
@@ -144,7 +211,7 @@ Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
     rc = ::read(pollFd_, &numEvents, 8);
   } while (rc == -1 && errno == EINTR);
   if (UNLIKELY(rc == -1 && errno == EAGAIN)) {
-    return Range<Op**>();  // nothing completed
+    return Range<Op**>(); // nothing completed
   }
   checkUnixError(rc, "AsyncIO: read from event fd failed");
   DCHECK_EQ(rc, 8);
@@ -153,87 +220,191 @@ Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
   DCHECK_LE(numEvents, pending_);
 
   // Don't reap more than numEvents, as we've just reset the counter to 0.
-  return doWait(numEvents, numEvents);
+  return doWait(WaitType::COMPLETE, numEvents, numEvents, completed_);
 }
 
-Range<AsyncIO::Op**> AsyncIO::doWait(size_t minRequests, size_t maxRequests) {
-  io_event events[pending_];
-  int count;
-  do {
-    // Wait forever
-    count = io_getevents(ctx_, minRequests, maxRequests, events, nullptr);
-  } while (count == -EINTR);
-  checkKernelError(count, "AsyncIO: io_getevents failed");
-  DCHECK_GE(count, minRequests);  // the man page says so
-  DCHECK_LE(count, pending_);
-
-  completed_.clear();
-  if (count == 0) {
-    return folly::Range<Op**>();
-  }
+Range<AsyncIO::Op**> AsyncIO::doWait(
+    WaitType type,
+    size_t minRequests,
+    size_t maxRequests,
+    std::vector<Op*>& result) {
+  io_event events[maxRequests];
 
+  // Unfortunately, Linux AIO doesn't implement io_cancel, so even for
+  // WaitType::CANCEL we have to wait for IO completion.
+  size_t count = 0;
+  do {
+    int ret;
+    do {
+      // GOTCHA: io_getevents() may returns less than min_nr results if
+      // interrupted after some events have been read (if before, -EINTR
+      // is returned).
+      ret = io_getevents(
+          ctx_,
+          minRequests - count,
+          maxRequests - count,
+          events + count,
+          /* timeout */ nullptr); // wait forever
+    } while (ret == -EINTR);
+    // Check as may not be able to recover without leaking events.
+    CHECK_GE(ret, 0) << "AsyncIO: io_getevents failed with error "
+                     << errnoStr(-ret);
+    count += ret;
+  } while (count < minRequests);
+  DCHECK_LE(count, maxRequests);
+
+  result.clear();
   for (size_t i = 0; i < count; ++i) {
-    Op* op = static_cast<Op*>(events[i].data);
-    DCHECK(op);
-    op->complete(events[i].res);
-    completed_.push_back(op);
+    DCHECK(events[i].obj);
+    Op* op = boost::intrusive::get_parent_from_member(
+        events[i].obj, &AsyncIOOp::iocb_);
+    decrementPending();
+    switch (type) {
+      case WaitType::COMPLETE:
+        op->complete(events[i].res);
+        break;
+      case WaitType::CANCEL:
+        op->cancel();
+        break;
+    }
+    result.push_back(op);
   }
-  pending_ -= count;
 
-  return folly::Range<Op**>(&completed_.front(), count);
+  return range(result);
 }
 
-AsyncIO::Op::Op()
-  : state_(UNINITIALIZED),
-    result_(-EINVAL) {
+AsyncIOQueue::AsyncIOQueue(AsyncIO* asyncIO) : asyncIO_(asyncIO) {}
+
+AsyncIOQueue::~AsyncIOQueue() {
+  CHECK_EQ(asyncIO_->pending(), 0);
 }
 
-void AsyncIO::Op::reset() {
-  if (UNLIKELY(state_ == PENDING)) {
-    throw std::logic_error("AsyncIO: invalid state for reset");
-  }
-  state_ = UNINITIALIZED;
-  result_ = -EINVAL;
+void AsyncIOQueue::submit(AsyncIOOp* op) {
+  submit([op]() { return op; });
 }
 
-AsyncIO::Op::~Op() {
-  CHECK_NE(state_, PENDING);
+void AsyncIOQueue::submit(OpFactory op) {
+  queue_.push_back(op);
+  maybeDequeue();
 }
 
-void AsyncIO::Op::start() {
-  DCHECK_EQ(state_, UNINITIALIZED);
-  state_ = PENDING;
+void AsyncIOQueue::onCompleted(AsyncIOOp* /* op */) {
+  maybeDequeue();
 }
 
-void AsyncIO::Op::complete(ssize_t result) {
-  DCHECK_EQ(state_, PENDING);
-  state_ = COMPLETED;
-  result_ = result;
-  onCompleted();
+void AsyncIOQueue::maybeDequeue() {
+  while (!queue_.empty() && asyncIO_->pending() < asyncIO_->capacity()) {
+    auto& opFactory = queue_.front();
+    auto op = opFactory();
+    queue_.pop_front();
+
+    // Interpose our completion callback
+    auto& nextCb = op->notificationCallback();
+    op->setNotificationCallback([this, nextCb](AsyncIOOp* op2) {
+      this->onCompleted(op2);
+      if (nextCb) {
+        nextCb(op2);
+      }
+    });
+
+    asyncIO_->submit(op);
+  }
 }
 
-void AsyncIO::Op::onCompleted() { }  // default: do nothing
+// debugging helpers:
+
+namespace {
+
+#define X(c) \
+  case c:    \
+    return #c
 
-ssize_t AsyncIO::Op::result() const {
-  if (UNLIKELY(state_ != COMPLETED)) {
-    throw std::logic_error("AsyncIO: Invalid Op state in result");
+const char* asyncIoOpStateToString(AsyncIOOp::State state) {
+  switch (state) {
+    X(AsyncIOOp::State::UNINITIALIZED);
+    X(AsyncIOOp::State::INITIALIZED);
+    X(AsyncIOOp::State::PENDING);
+    X(AsyncIOOp::State::COMPLETED);
+    X(AsyncIOOp::State::CANCELED);
   }
-  return result_;
+  return "<INVALID AsyncIOOp::State>";
+}
+
+const char* iocbCmdToString(short int cmd_short) {
+  io_iocb_cmd cmd = static_cast<io_iocb_cmd>(cmd_short);
+  switch (cmd) {
+    X(IO_CMD_PREAD);
+    X(IO_CMD_PWRITE);
+    X(IO_CMD_FSYNC);
+    X(IO_CMD_FDSYNC);
+    X(IO_CMD_POLL);
+    X(IO_CMD_NOOP);
+    X(IO_CMD_PREADV);
+    X(IO_CMD_PWRITEV);
+  };
+  return "<INVALID io_iocb_cmd>";
 }
 
-CallbackOp::CallbackOp(Callback&& callback) : callback_(std::move(callback)) { }
+#undef X
+
+std::string fd2name(int fd) {
+  std::string path = folly::to<std::string>("/proc/self/fd/", fd);
+  char link[PATH_MAX];
+  const ssize_t length =
+      std::max<ssize_t>(readlink(path.c_str(), link, PATH_MAX), 0);
+  return path.assign(link, length);
+}
 
-CallbackOp::~CallbackOp() { }
+std::ostream& operator<<(std::ostream& os, const iocb& cb) {
+  os << folly::format(
+      "data={}, key={}, opcode={}, reqprio={}, fd={}, f={}, ",
+      cb.data,
+      cb.key,
+      iocbCmdToString(cb.aio_lio_opcode),
+      cb.aio_reqprio,
+      cb.aio_fildes,
+      fd2name(cb.aio_fildes));
+
+  switch (cb.aio_lio_opcode) {
+    case IO_CMD_PREAD:
+    case IO_CMD_PWRITE:
+      os << folly::format(
+          "buf={}, offset={}, nbytes={}, ",
+          cb.u.c.buf,
+          cb.u.c.offset,
+          cb.u.c.nbytes);
+      break;
+    default:
+      os << "[TODO: write debug string for "
+         << iocbCmdToString(cb.aio_lio_opcode) << "] ";
+      break;
+  }
 
-CallbackOp* CallbackOp::make(Callback&& callback) {
-  // Ensure created on the heap
-  return new CallbackOp(std::move(callback));
+  return os;
 }
 
-void CallbackOp::onCompleted() {
-  callback_(result());
-  delete this;
+} // namespace
+
+std::ostream& operator<<(std::ostream& os, const AsyncIOOp& op) {
+  os << "{" << op.state_ << ", ";
+
+  if (op.state_ != AsyncIOOp::State::UNINITIALIZED) {
+    os << op.iocb_;
+  }
+
+  if (op.state_ == AsyncIOOp::State::COMPLETED) {
+    os << "result=" << op.result_;
+    if (op.result_ < 0) {
+      os << " (" << errnoStr(-op.result_) << ')';
+    }
+    os << ", ";
+  }
+
+  return os << "}";
 }
 
-}  // namespace folly
+std::ostream& operator<<(std::ostream& os, AsyncIOOp::State state) {
+  return os << asyncIoOpStateToString(state);
+}
 
+} // namespace folly