/*
- * Copyright 2013 Facebook, Inc.
+ * Copyright 2016 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* limitations under the License.
*/
-#include "folly/experimental/io/AsyncIO.h"
+#include <folly/experimental/io/AsyncIO.h>
-#include <unistd.h>
+#include <sys/eventfd.h>
#include <cerrno>
+#include <ostream>
+#include <stdexcept>
#include <string>
#include <boost/intrusive/parent_from_member.hpp>
#include <glog/logging.h>
-#include "folly/Exception.h"
-#include "folly/Format.h"
-#include "folly/Likely.h"
-#include "folly/String.h"
-#include "folly/eventfd.h"
+#include <folly/Exception.h>
+#include <folly/Format.h>
+#include <folly/Likely.h>
+#include <folly/String.h>
+#include <folly/portability/Unistd.h>
namespace folly {
// Constructor (diff hunk): the patch adds two members to the init list —
// ctxSet_(false) for the lazy-init flag and submitted_(0) for a submit counter.
// ctx_ stays 0 until initializeContext() runs on first submit().
AsyncIO::AsyncIO(size_t capacity, PollMode pollMode)
: ctx_(0),
+ ctxSet_(false),
pending_(0),
+ submitted_(0),
capacity_(capacity),
pollFd_(-1) {
// A zero-capacity object could never submit anything; fail fast.
CHECK_GT(capacity_, 0);
}
// NOTE(review): stray closing brace below — the context lines of this hunk
// (presumably the pollMode/pollFd_ setup branch) appear elided; confirm
// against the full file before applying.
}
// New helper (all-added hunk): atomically drops the pending-op count.
// Uses fetch_add(-1) so the *previous* value is returned, letting the
// DCHECK verify we never decrement below zero (p must have been >= 1).
+void AsyncIO::decrementPending() {
+ auto p = pending_.fetch_add(-1, std::memory_order_acq_rel);
+ DCHECK_GE(p, 1);
+}
+
// Lazily create the kernel AIO context on first use.
// The patch replaces the unsynchronized `if (!ctx_)` check with classic
// double-checked locking: acquire-load of ctxSet_, then initMutex_, then a
// relaxed re-check under the lock, and a release-store once ctx_ is valid.
void AsyncIO::initializeContext() {
- if (!ctx_) {
- int rc = io_queue_init(capacity_, &ctx_);
- // returns negative errno
- checkKernelError(rc, "AsyncIO: io_queue_init failed");
- DCHECK(ctx_);
+ if (!ctxSet_.load(std::memory_order_acquire)) {
+ std::lock_guard<std::mutex> lock(initMutex_);
+ if (!ctxSet_.load(std::memory_order_relaxed)) {
+ int rc = io_queue_init(capacity_, &ctx_);
+ // returns negative errno
// -EAGAIN from io_queue_init means the system-wide AIO event limit is
// exhausted; dump /proc/sys/fs/aio-nr vs aio-max-nr to make the failure
// diagnosable before checkKernelError() below aborts/throws.
+ if (rc == -EAGAIN) {
+ long aio_nr, aio_max;
// unique_ptr with fclose as deleter: RAII for the C FILE handles.
+ std::unique_ptr<FILE, int(*)(FILE*)>
+ fp(fopen("/proc/sys/fs/aio-nr", "r"), fclose);
+ PCHECK(fp);
+ CHECK_EQ(fscanf(fp.get(), "%ld", &aio_nr), 1);
+
+ std::unique_ptr<FILE, int(*)(FILE*)>
+ aio_max_fp(fopen("/proc/sys/fs/aio-max-nr", "r"), fclose);
+ PCHECK(aio_max_fp);
+ CHECK_EQ(fscanf(aio_max_fp.get(), "%ld", &aio_max), 1);
+
+ LOG(ERROR) << "No resources for requested capacity of " << capacity_;
+ LOG(ERROR) << "aio_nr " << aio_nr << ", aio_max_nr " << aio_max;
+ }
+
+ checkKernelError(rc, "AsyncIO: io_queue_init failed");
+ DCHECK(ctx_);
// Publish: readers doing the acquire-load above now see a valid ctx_.
+ ctxSet_.store(true, std::memory_order_release);
+ }
}
}
// Submit one initialized Op to the kernel.
// The patch replaces the racy CHECK_LT(pending_, capacity_) with an
// optimistic atomic increment that is rolled back (decrementPending) on
// either over-capacity or io_submit failure, and turns the hard CHECK into
// a recoverable std::range_error.
void AsyncIO::submit(Op* op) {
CHECK_EQ(op->state(), Op::State::INITIALIZED);
- CHECK_LT(pending_, capacity_) << "too many pending requests";
initializeContext(); // on demand
+
+ // We can increment past capacity, but we'll clean up after ourselves.
+ auto p = pending_.fetch_add(1, std::memory_order_acq_rel);
+ if (p >= capacity_) {
+ decrementPending();
+ throw std::range_error("AsyncIO: too many pending requests");
+ }
iocb* cb = &op->iocb_;
cb->data = nullptr; // unused
// Pollable mode: ask the kernel to signal completions via our eventfd.
if (pollFd_ != -1) {
io_set_eventfd(cb, pollFd_);
}
// io_submit returns the number of iocbs queued (1 here) or a negative errno.
int rc = io_submit(ctx_, 1, &cb);
- checkKernelError(rc, "AsyncIO: io_submit failed");
+ if (rc < 0) {
// Roll back the optimistic pending_ increment before reporting the error.
+ decrementPending();
+ throwSystemErrorExplicit(-rc, "AsyncIO: io_submit failed");
+ }
// NOTE(review): plain ++ on submitted_ — confirm submitted_ is atomic or
// that concurrent submit() calls are externally serialized.
+ submitted_++;
DCHECK_EQ(rc, 1);
op->start();
- ++pending_;
}
// Block until at least minRequests ops complete; only valid in non-pollable
// mode (pollFd_ == -1). The patch snapshots pending_ once with an acquire
// load instead of reading the (now atomic) counter twice.
Range<AsyncIO::Op**> AsyncIO::wait(size_t minRequests) {
CHECK(ctx_);
CHECK_EQ(pollFd_, -1) << "wait() only allowed on non-pollable object";
- CHECK_LE(minRequests, pending_);
- return doWait(minRequests, pending_);
+ auto p = pending_.load(std::memory_order_acquire);
+ CHECK_LE(minRequests, p);
+ return doWait(minRequests, p);
}
Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
// NOTE(review): function body elided in this chunk — no return statement is
// visible. Consult the full file; do not treat this as an empty definition.
}
// Reap between minRequests and maxRequests completions from the kernel.
// The patch fixes the retry logic: io_getevents() may legitimately return
// fewer than min_nr events when interrupted mid-read, so the outer loop
// accumulates into `count` until minRequests is reached, instead of only
// retrying on a bare -EINTR.
Range<AsyncIO::Op**> AsyncIO::doWait(size_t minRequests, size_t maxRequests) {
// NOTE(review): variable-length array — a GCC/Clang extension, not standard
// C++. Sized by the caller-bounded maxRequests rather than pending_.
- io_event events[pending_];
- int count;
+ io_event events[maxRequests];
+
+ size_t count = 0;
do {
- // Wait forever
- count = io_getevents(ctx_, minRequests, maxRequests, events, nullptr);
- } while (count == -EINTR);
- checkKernelError(count, "AsyncIO: io_getevents failed");
- DCHECK_GE(count, minRequests); // the man page says so
- DCHECK_LE(count, pending_);
+ int ret;
+ do {
+ // GOTCHA: io_getevents() may returns less than min_nr results if
+ // interrupted after some events have been read (if before, -EINTR
+ // is returned).
+ ret = io_getevents(ctx_,
+ minRequests - count,
+ maxRequests - count,
+ events + count,
+ /* timeout */ nullptr); // wait forever
+ } while (ret == -EINTR);
+ // Check as may not be able to recover without leaking events.
+ CHECK_GE(ret, 0)
+ << "AsyncIO: io_getevents failed with error " << errnoStr(-ret);
+ count += ret;
+ } while (count < minRequests);
+ DCHECK_LE(count, maxRequests);
completed_.clear();
if (count == 0) {
// NOTE(review): context lines elided here — the early `return` for the
// count == 0 branch and the `for (size_t i = 0; ...)` loop header over
// `events` are missing from this hunk; confirm against the full file.
DCHECK(events[i].obj);
// Recover the owning Op from the embedded iocb_ member (intrusive layout).
Op* op = boost::intrusive::get_parent_from_member(
events[i].obj, &AsyncIOOp::iocb_);
- --pending_;
+ decrementPending();
op->complete(events[i].res);
completed_.push_back(op);
}
maybeDequeue();
}
// Completion hook interposed on every queued op (see maybeDequeue): tries to
// push more queued work now that a slot freed up. The patch collapses the
// body to one line and comments out the unused parameter name to silence
// -Wunused-parameter.
-void AsyncIOQueue::onCompleted(AsyncIOOp* op) {
- maybeDequeue();
-}
+void AsyncIOQueue::onCompleted(AsyncIOOp* /* op */) { maybeDequeue(); }
// Drain the software queue into the AsyncIO object while it has capacity.
// NOTE(review): lines elided in this hunk — the extraction of `op` from
// queue_ (between the while header and the callback interposition) and the
// loop/function closing braces are not visible here.
void AsyncIOQueue::maybeDequeue() {
while (!queue_.empty() && asyncIO_->pending() < asyncIO_->capacity()) {
// Interpose our completion callback
auto& nextCb = op->notificationCallback();
// The patch renames the lambda parameter op -> op2 so it no longer shadows
// the enclosing loop's `op`; behavior is otherwise identical (this callback
// runs onCompleted, then chains to the op's original callback if any).
- op->setNotificationCallback([this, nextCb](AsyncIOOp* op) {
- this->onCompleted(op);
- if (nextCb) nextCb(op);
+ op->setNotificationCallback([this, nextCb](AsyncIOOp* op2) {
+ this->onCompleted(op2);
+ if (nextCb) nextCb(op2);
});
asyncIO_->submit(op);
// NOTE(review): interior of a stream-output operator for `iocb` — the
// function signature is elided above this hunk. The patch fixes the debug
// string: the old code printed nbytes under the "off=" label and offset
// under "size=" (swapped), and both case groups were missing `break`,
// falling through into `default`.
switch (cb.aio_lio_opcode) {
case IO_CMD_PREAD:
case IO_CMD_PWRITE:
- os << folly::format("buf={}, off={}, size={}, ",
- cb.u.c.buf, cb.u.c.nbytes, cb.u.c.offset);
+ os << folly::format("buf={}, offset={}, nbytes={}, ",
+ cb.u.c.buf, cb.u.c.offset, cb.u.c.nbytes);
+ break;
default:
os << "[TODO: write debug string for "
<< iocbCmdToString(cb.aio_lio_opcode) << "] ";
+ break;
}
return os;
}
// NOTE(review): interior of a stream-output operator for AsyncIOOp — the
// signature is elided above this hunk. The patch enriches the completed-op
// output: a negative result_ is additionally rendered as its errno string
// (e.g. "result=-2 (No such file or directory)").
if (op.state_ == AsyncIOOp::State::COMPLETED) {
- os << "result=" << op.result_ << ", ";
+ os << "result=" << op.result_;
+ if (op.result_ < 0) {
+ os << " (" << errnoStr(-op.result_) << ')';
+ }
+ os << ", ";
}
return os << "}";
}
} // namespace folly
-