/*
 * Copyright 2017 Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <folly/experimental/io/AsyncIO.h>

#include <sys/eventfd.h>

#include <algorithm>
#include <cerrno>
#include <climits>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <memory>
#include <mutex>
#include <ostream>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

#include <boost/intrusive/parent_from_member.hpp>
#include <glog/logging.h>

#include <folly/Exception.h>
#include <folly/Format.h>
#include <folly/Likely.h>
#include <folly/String.h>
#include <folly/portability/Unistd.h>
36 AsyncIOOp::AsyncIOOp(NotificationCallback cb)
38 state_(State::UNINITIALIZED),
40 memset(&iocb_, 0, sizeof(iocb_));
43 void AsyncIOOp::reset(NotificationCallback cb) {
44 CHECK_NE(state_, State::PENDING);
46 state_ = State::UNINITIALIZED;
48 memset(&iocb_, 0, sizeof(iocb_));
51 AsyncIOOp::~AsyncIOOp() {
52 CHECK_NE(state_, State::PENDING);
55 void AsyncIOOp::start() {
56 DCHECK_EQ(state_, State::INITIALIZED);
57 state_ = State::PENDING;
60 void AsyncIOOp::complete(ssize_t result) {
61 DCHECK_EQ(state_, State::PENDING);
62 state_ = State::COMPLETED;
69 void AsyncIOOp::cancel() {
70 DCHECK_EQ(state_, State::PENDING);
71 state_ = State::CANCELED;
74 ssize_t AsyncIOOp::result() const {
75 CHECK_EQ(state_, State::COMPLETED);
79 void AsyncIOOp::pread(int fd, void* buf, size_t size, off_t start) {
81 io_prep_pread(&iocb_, fd, buf, size, start);
84 void AsyncIOOp::pread(int fd, Range<unsigned char*> range, off_t start) {
85 pread(fd, range.begin(), range.size(), start);
88 void AsyncIOOp::preadv(int fd, const iovec* iov, int iovcnt, off_t start) {
90 io_prep_preadv(&iocb_, fd, iov, iovcnt, start);
93 void AsyncIOOp::pwrite(int fd, const void* buf, size_t size, off_t start) {
95 io_prep_pwrite(&iocb_, fd, const_cast<void*>(buf), size, start);
98 void AsyncIOOp::pwrite(int fd, Range<const unsigned char*> range, off_t start) {
99 pwrite(fd, range.begin(), range.size(), start);
102 void AsyncIOOp::pwritev(int fd, const iovec* iov, int iovcnt, off_t start) {
104 io_prep_pwritev(&iocb_, fd, iov, iovcnt, start);
107 void AsyncIOOp::init() {
108 CHECK_EQ(state_, State::UNINITIALIZED);
109 state_ = State::INITIALIZED;
112 AsyncIO::AsyncIO(size_t capacity, PollMode pollMode) : capacity_(capacity) {
113 CHECK_GT(capacity_, 0);
114 completed_.reserve(capacity_);
115 if (pollMode == POLLABLE) {
116 pollFd_ = eventfd(0, EFD_NONBLOCK);
117 checkUnixError(pollFd_, "AsyncIO: eventfd creation failed");
121 AsyncIO::~AsyncIO() {
122 CHECK_EQ(pending_, 0);
124 int rc = io_queue_release(ctx_);
125 CHECK_EQ(rc, 0) << "io_queue_release: " << errnoStr(-rc);
128 CHECK_ERR(close(pollFd_));
132 void AsyncIO::decrementPending() {
133 auto p = pending_.fetch_add(-1, std::memory_order_acq_rel);
137 void AsyncIO::initializeContext() {
138 if (!ctxSet_.load(std::memory_order_acquire)) {
139 std::lock_guard<std::mutex> lock(initMutex_);
140 if (!ctxSet_.load(std::memory_order_relaxed)) {
141 int rc = io_queue_init(capacity_, &ctx_);
142 // returns negative errno
144 long aio_nr, aio_max;
145 std::unique_ptr<FILE, int(*)(FILE*)>
146 fp(fopen("/proc/sys/fs/aio-nr", "r"), fclose);
148 CHECK_EQ(fscanf(fp.get(), "%ld", &aio_nr), 1);
150 std::unique_ptr<FILE, int(*)(FILE*)>
151 aio_max_fp(fopen("/proc/sys/fs/aio-max-nr", "r"), fclose);
153 CHECK_EQ(fscanf(aio_max_fp.get(), "%ld", &aio_max), 1);
155 LOG(ERROR) << "No resources for requested capacity of " << capacity_;
156 LOG(ERROR) << "aio_nr " << aio_nr << ", aio_max_nr " << aio_max;
159 checkKernelError(rc, "AsyncIO: io_queue_init failed");
161 ctxSet_.store(true, std::memory_order_release);
166 void AsyncIO::submit(Op* op) {
167 CHECK_EQ(op->state(), Op::State::INITIALIZED);
168 initializeContext(); // on demand
170 // We can increment past capacity, but we'll clean up after ourselves.
171 auto p = pending_.fetch_add(1, std::memory_order_acq_rel);
172 if (p >= capacity_) {
174 throw std::range_error("AsyncIO: too many pending requests");
176 iocb* cb = &op->iocb_;
177 cb->data = nullptr; // unused
179 io_set_eventfd(cb, pollFd_);
181 int rc = io_submit(ctx_, 1, &cb);
184 throwSystemErrorExplicit(-rc, "AsyncIO: io_submit failed");
191 Range<AsyncIO::Op**> AsyncIO::wait(size_t minRequests) {
193 CHECK_EQ(pollFd_, -1) << "wait() only allowed on non-pollable object";
194 auto p = pending_.load(std::memory_order_acquire);
195 CHECK_LE(minRequests, p);
196 return doWait(WaitType::COMPLETE, minRequests, p, completed_);
199 Range<AsyncIO::Op**> AsyncIO::cancel() {
201 auto p = pending_.load(std::memory_order_acquire);
202 return doWait(WaitType::CANCEL, p, p, canceled_);
205 Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
207 CHECK_NE(pollFd_, -1) << "pollCompleted() only allowed on pollable object";
209 // This sets the eventFd counter to 0, see
210 // http://www.kernel.org/doc/man-pages/online/pages/man2/eventfd.2.html
213 rc = ::read(pollFd_, &numEvents, 8);
214 } while (rc == -1 && errno == EINTR);
215 if (UNLIKELY(rc == -1 && errno == EAGAIN)) {
216 return Range<Op**>(); // nothing completed
218 checkUnixError(rc, "AsyncIO: read from event fd failed");
221 DCHECK_GT(numEvents, 0);
222 DCHECK_LE(numEvents, pending_);
224 // Don't reap more than numEvents, as we've just reset the counter to 0.
225 return doWait(WaitType::COMPLETE, numEvents, numEvents, completed_);
228 Range<AsyncIO::Op**> AsyncIO::doWait(
232 std::vector<Op*>& result) {
233 io_event events[maxRequests];
235 // Unfortunately, Linux AIO doesn't implement io_cancel, so even for
236 // WaitType::CANCEL we have to wait for IO completion.
241 // GOTCHA: io_getevents() may returns less than min_nr results if
242 // interrupted after some events have been read (if before, -EINTR
244 ret = io_getevents(ctx_,
248 /* timeout */ nullptr); // wait forever
249 } while (ret == -EINTR);
250 // Check as may not be able to recover without leaking events.
251 CHECK_GE(ret, 0) << "AsyncIO: io_getevents failed with error "
254 } while (count < minRequests);
255 DCHECK_LE(count, maxRequests);
258 for (size_t i = 0; i < count; ++i) {
259 DCHECK(events[i].obj);
260 Op* op = boost::intrusive::get_parent_from_member(
261 events[i].obj, &AsyncIOOp::iocb_);
264 case WaitType::COMPLETE:
265 op->complete(events[i].res);
267 case WaitType::CANCEL:
271 result.push_back(op);
274 return range(result);
277 AsyncIOQueue::AsyncIOQueue(AsyncIO* asyncIO)
278 : asyncIO_(asyncIO) {
281 AsyncIOQueue::~AsyncIOQueue() {
282 CHECK_EQ(asyncIO_->pending(), 0);
285 void AsyncIOQueue::submit(AsyncIOOp* op) {
286 submit([op]() { return op; });
289 void AsyncIOQueue::submit(OpFactory op) {
290 queue_.push_back(op);
294 void AsyncIOQueue::onCompleted(AsyncIOOp* /* op */) { maybeDequeue(); }
296 void AsyncIOQueue::maybeDequeue() {
297 while (!queue_.empty() && asyncIO_->pending() < asyncIO_->capacity()) {
298 auto& opFactory = queue_.front();
299 auto op = opFactory();
302 // Interpose our completion callback
303 auto& nextCb = op->notificationCallback();
304 op->setNotificationCallback([this, nextCb](AsyncIOOp* op2) {
305 this->onCompleted(op2);
306 if (nextCb) nextCb(op2);
309 asyncIO_->submit(op);
313 // debugging helpers:
317 #define X(c) case c: return #c
319 const char* asyncIoOpStateToString(AsyncIOOp::State state) {
321 X(AsyncIOOp::State::UNINITIALIZED);
322 X(AsyncIOOp::State::INITIALIZED);
323 X(AsyncIOOp::State::PENDING);
324 X(AsyncIOOp::State::COMPLETED);
325 X(AsyncIOOp::State::CANCELED);
327 return "<INVALID AsyncIOOp::State>";
330 const char* iocbCmdToString(short int cmd_short) {
331 io_iocb_cmd cmd = static_cast<io_iocb_cmd>(cmd_short);
342 return "<INVALID io_iocb_cmd>";
347 std::string fd2name(int fd) {
348 std::string path = folly::to<std::string>("/proc/self/fd/", fd);
350 const ssize_t length =
351 std::max<ssize_t>(readlink(path.c_str(), link, PATH_MAX), 0);
352 return path.assign(link, length);
355 std::ostream& operator<<(std::ostream& os, const iocb& cb) {
357 "data={}, key={}, opcode={}, reqprio={}, fd={}, f={}, ",
358 cb.data, cb.key, iocbCmdToString(cb.aio_lio_opcode),
359 cb.aio_reqprio, cb.aio_fildes, fd2name(cb.aio_fildes));
361 switch (cb.aio_lio_opcode) {
364 os << folly::format("buf={}, offset={}, nbytes={}, ",
365 cb.u.c.buf, cb.u.c.offset, cb.u.c.nbytes);
368 os << "[TODO: write debug string for "
369 << iocbCmdToString(cb.aio_lio_opcode) << "] ";
376 } // anonymous namespace
378 std::ostream& operator<<(std::ostream& os, const AsyncIOOp& op) {
379 os << "{" << op.state_ << ", ";
381 if (op.state_ != AsyncIOOp::State::UNINITIALIZED) {
385 if (op.state_ == AsyncIOOp::State::COMPLETED) {
386 os << "result=" << op.result_;
387 if (op.result_ < 0) {
388 os << " (" << errnoStr(-op.result_) << ')';
396 std::ostream& operator<<(std::ostream& os, AsyncIOOp::State state) {
397 return os << asyncIoOpStateToString(state);