2 * Copyright 2013 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include "folly/experimental/io/AsyncIO.h"
21 #include <glog/logging.h>
23 #include "folly/Exception.h"
24 #include "folly/Likely.h"
25 #include "folly/String.h"
26 #include "folly/eventfd.h"
// Constructs an AsyncIO instance.
//  - Throws std::out_of_range when capacity_ is 0.
//  - Reserves capacity_ entries in completed_.
//  - When pollMode == POLLABLE, creates an eventfd with an initial counter of
//    0 and the EFD_NONBLOCK flag, stores it in pollFd_, and passes the
//    descriptor to checkUnixError together with a failure message.
// NOTE(review): the member-initializer list and the closing braces are not
// visible in this extract; presumably capacity_ is initialized from
// `capacity` there -- confirm against the full file.
30 AsyncIO::AsyncIO(size_t capacity, PollMode pollMode)
35 if (UNLIKELY(capacity_ == 0)) {
36 throw std::out_of_range("AsyncIO: capacity must not be 0");
38 completed_.reserve(capacity_);
39 if (pollMode == POLLABLE) {
40 pollFd_ = eventfd(0, EFD_NONBLOCK);
41 checkUnixError(pollFd_, "AsyncIO: eventfd creation failed");
// NOTE(review): the enclosing function header is not visible in this extract;
// the teardown below suggests this is the AsyncIO destructor -- confirm.
// Checks (fatally) that pending_ is 0, releases ctx_ via io_queue_release and
// checks that it returned 0 (logging the decoded error otherwise), then closes
// pollFd_ under CHECK_ERR.
// NOTE(review): any guards around the release/close calls fall on lines that
// are missing here.
46 CHECK_EQ(pending_, 0);
48 int rc = io_queue_release(ctx_);
// rc is negated before being passed to errnoStr.
49 CHECK_EQ(rc, 0) << "io_queue_release: " << errnoStr(-rc);
52 CHECK_ERR(close(pollFd_));
// Read request, pointer/size overload: prepares the control block `cb` with
// io_prep_pread for `size` bytes from `fd` at offset `start` into `buf`.
// NOTE(review): the declaration of `cb` and whatever follows the prep call
// (presumably a call to submit with `op`) are not visible in this extract.
56 void AsyncIO::pread(Op* op, int fd, void* buf, size_t size, off_t start) {
58 io_prep_pread(&cb, fd, buf, size, start);
// Read request, Range overload: forwards to the pointer/size pread overload
// using range.begin() as the buffer and range.size() as the length.
// NOTE(review): the rest of the parameter list (where `start` is declared) is
// on a line missing from this extract.
62 void AsyncIO::pread(Op* op, int fd, Range<unsigned char*> range,
64 pread(op, fd, range.begin(), range.size(), start);
// Vectored read request: prepares the control block `cb` with io_prep_preadv
// for the `iovcnt` entries of `iov`, reading from `fd` at offset `start`.
// NOTE(review): the rest of the parameter list, the declaration of `cb` and
// the lines after the prep call are not visible in this extract.
67 void AsyncIO::preadv(Op* op, int fd, const iovec* iov, int iovcnt,
70 io_prep_preadv(&cb, fd, iov, iovcnt, start);
// Write request, pointer/size overload: prepares the control block `cb` with
// io_prep_pwrite for `size` bytes from `buf` to `fd` at offset `start`.
// NOTE(review): the rest of the parameter list, the declaration of `cb` and
// the lines after the prep call are not visible in this extract.
74 void AsyncIO::pwrite(Op* op, int fd, const void* buf, size_t size,
// The const_cast drops constness only to satisfy the callee's signature
// (presumably io_prep_pwrite takes a non-const void* -- confirm).
77 io_prep_pwrite(&cb, fd, const_cast<void*>(buf), size, start);
// Write request, Range overload: forwards to the pointer/size pwrite overload
// using range.begin() as the buffer and range.size() as the length.
// NOTE(review): the rest of the parameter list (where `start` is declared) is
// on a line missing from this extract.
81 void AsyncIO::pwrite(Op* op, int fd, Range<const unsigned char*> range,
83 pwrite(op, fd, range.begin(), range.size(), start);
// Vectored write request: prepares the control block `cb` with
// io_prep_pwritev for the `iovcnt` entries of `iov`, writing to `fd` at
// offset `start`.
// NOTE(review): the rest of the parameter list, the declaration of `cb` and
// the lines after the prep call are not visible in this extract.
86 void AsyncIO::pwritev(Op* op, int fd, const iovec* iov, int iovcnt,
89 io_prep_pwritev(&cb, fd, iov, iovcnt, start);
// Initializes ctx_ by calling io_queue_init with capacity_ and passes the
// return code to checkKernelError with a failure message.
// NOTE(review): a line between the signature and the io_queue_init call is
// missing from this extract (presumably a guard so that an existing context
// is not re-created -- confirm).
93 void AsyncIO::initializeContext() {
95 int rc = io_queue_init(capacity_, &ctx_);
96 // returns negative errno
97 checkKernelError(rc, "AsyncIO: io_queue_init failed");
// Submits one prepared control block `cb` on behalf of `op`.
//  - Throws std::out_of_range when pending_ >= capacity_.
//  - Throws std::logic_error when op->state() is not Op::UNINITIALIZED.
//  - Calls initializeContext(), attaches pollFd_ to `cb` with io_set_eventfd,
//    submits exactly one iocb with io_submit, and passes the return code to
//    checkKernelError.
// NOTE(review): several lines are missing from this extract (around the
// io_set_eventfd call and after checkKernelError); presumably the eventfd is
// only attached for pollable objects and the op/pending_ bookkeeping happens
// after a successful submit -- confirm against the full file.
102 void AsyncIO::submit(Op* op, iocb* cb) {
103 if (UNLIKELY(pending_ >= capacity_)) {
104 throw std::out_of_range("AsyncIO: too many pending requests");
106 if (UNLIKELY(op->state() != Op::UNINITIALIZED)) {
107 throw std::logic_error("AsyncIO: Invalid Op state in submit");
109 initializeContext(); // on demand
112 io_set_eventfd(cb, pollFd_);
// io_submit takes an array of iocb pointers; &cb is a one-element array.
114 int rc = io_submit(ctx_, 1, &cb);
115 checkKernelError(rc, "AsyncIO: io_submit failed");
// Waits for completions on a non-pollable object.
//  - Throws std::logic_error when ctx_ is not set ("no requests").
//  - Throws std::logic_error when pollFd_ != -1 (i.e. the object is pollable).
// Otherwise returns doWait(minRequests, pending_), so at most pending_
// requests are reaped.
121 Range<AsyncIO::Op**> AsyncIO::wait(size_t minRequests) {
122 if (UNLIKELY(!ctx_)) {
123 throw std::logic_error("AsyncIO: wait called with no requests");
125 if (UNLIKELY(pollFd_ != -1)) {
126 throw std::logic_error("AsyncIO: wait not allowed on pollable object");
128 return doWait(minRequests, pending_);
// Reaps completions on a pollable object.
//  - Throws std::logic_error when ctx_ is not set ("no requests").
//  - Throws std::logic_error when pollFd_ == -1 (non-pollable object).
//  - Reads 8 bytes from pollFd_ into numEvents, retrying while the read fails
//    with EINTR; returns an empty Range when the read fails with EAGAIN.
//  - Any other result is passed to checkUnixError, after which
//    doWait(numEvents, numEvents) is returned.
// NOTE(review): the declarations of numEvents and rc, the `do {` opener and
// the closing braces are on lines missing from this extract.
131 Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
132 if (UNLIKELY(!ctx_)) {
133 throw std::logic_error("AsyncIO: pollCompleted called with no requests");
135 if (UNLIKELY(pollFd_ == -1)) {
136 throw std::logic_error(
137 "AsyncIO: pollCompleted not allowed on non-pollable object");
140 // This sets the eventFd counter to 0, see
141 // http://www.kernel.org/doc/man-pages/online/pages/man2/eventfd.2.html
144 rc = ::read(pollFd_, &numEvents, 8);
145 } while (rc == -1 && errno == EINTR);
146 if (UNLIKELY(rc == -1 && errno == EAGAIN)) {
147 return Range<Op**>(); // nothing completed
149 checkUnixError(rc, "AsyncIO: read from event fd failed");
// Debug-only sanity checks on the counter value that was read.
152 DCHECK_GT(numEvents, 0);
153 DCHECK_LE(numEvents, pending_);
155 // Don't reap more than numEvents, as we've just reset the counter to 0.
156 return doWait(numEvents, numEvents);
// Reaps between minRequests and maxRequests completion events from ctx_.
//  - Calls io_getevents with a null timeout, retrying while it returns
//    -EINTR, and passes the final return value to checkKernelError.
//  - For each of the `count` returned events, recovers the Op* stored in
//    events[i].data, calls op->complete(events[i].res) and appends the op to
//    completed_.
//  - Returns a Range of `count` entries starting at &completed_.front(); one
//    visible path returns an empty Range instead.
// NOTE(review): the declaration of `count`, the condition guarding the empty
// return, and any clearing of completed_ / adjustment of pending_ are on
// lines missing from this extract -- confirm against the full file.
159 Range<AsyncIO::Op**> AsyncIO::doWait(size_t minRequests, size_t maxRequests) {
// NOTE(review): array sized by the runtime value pending_; this looks like a
// variable-length array, which is a compiler extension in C++ -- confirm.
160 io_event events[pending_];
164 count = io_getevents(ctx_, minRequests, maxRequests, events, nullptr);
165 } while (count == -EINTR);
166 checkKernelError(count, "AsyncIO: io_getevents failed");
167 DCHECK_GE(count, minRequests); // the man page says so
168 DCHECK_LE(count, pending_);
172 return folly::Range<Op**>();
// NOTE(review): `i` is size_t while `count` is compared against -EINTR above,
// so this is presumably a signed/unsigned comparison -- confirm count's type.
175 for (size_t i = 0; i < count; ++i) {
176 Op* op = static_cast<Op*>(events[i].data);
178 op->complete(events[i].res);
179 completed_.push_back(op);
183 return folly::Range<Op**>(&completed_.front(), count);
// NOTE(review): fragment of a constructor's member-initializer list; the
// constructor header and the remaining initializers are not visible in this
// extract (presumably AsyncIO::Op's constructor -- confirm).
// Starts state_ at UNINITIALIZED.
187 : state_(UNINITIALIZED),
// Returns the Op to the UNINITIALIZED state.
// Throws std::logic_error when state_ is PENDING.
// NOTE(review): lines after the state assignment may be missing from this
// extract (e.g. resetting other members) -- confirm.
191 void AsyncIO::Op::reset() {
192 if (UNLIKELY(state_ == PENDING)) {
193 throw std::logic_error("AsyncIO: invalid state for reset");
195 state_ = UNINITIALIZED;
// NOTE(review): the enclosing function header is not visible in this extract
// (presumably AsyncIO::Op's destructor -- confirm).
// Fatal check that state_ is not PENDING at this point.
200 CHECK_NE(state_, PENDING);
// Called from AsyncIO::submit after the request is handed to io_submit.
// Debug-checks that state_ is UNINITIALIZED on entry.
// NOTE(review): the rest of the body is missing from this extract; presumably
// it moves state_ to PENDING -- confirm.
203 void AsyncIO::Op::start() {
204 DCHECK_EQ(state_, UNINITIALIZED);
// Called from AsyncIO::doWait with the `res` field of the completion event.
// Debug-checks that state_ is PENDING on entry.
// NOTE(review): the rest of the body is missing from this extract; presumably
// it records `result`, moves state_ to COMPLETED and invokes onCompleted() --
// confirm.
208 void AsyncIO::Op::complete(ssize_t result) {
209 DCHECK_EQ(state_, PENDING);
// Completion hook with an empty body. CallbackOp defines its own
// onCompleted(), so this is presumably a virtual default meant to be
// overridden -- confirm against the class declaration.
215 void AsyncIO::Op::onCompleted() { } // default: do nothing
// Accessor for the operation's result.
// Throws std::logic_error unless state_ is COMPLETED.
// NOTE(review): the return statement is on a line missing from this extract;
// which member it returns cannot be confirmed from here.
217 ssize_t AsyncIO::Op::result() const {
218 if (UNLIKELY(state_ != COMPLETED)) {
219 throw std::logic_error("AsyncIO: Invalid Op state in result");
// Constructs a CallbackOp by moving `callback` into callback_.
224 CallbackOp::CallbackOp(Callback&& callback) : callback_(std::move(callback)) { }
// Destructor with an empty body; members are destroyed implicitly.
226 CallbackOp::~CallbackOp() { }
// Factory: allocates a CallbackOp with `new`, moving `callback` into it, and
// returns the raw pointer.
// NOTE(review): who deletes the returned object is not visible in this
// extract (presumably CallbackOp::onCompleted deletes it after invoking the
// callback) -- confirm before relying on it.
228 CallbackOp* CallbackOp::make(Callback&& callback) {
229 // Ensure created on the heap
230 return new CallbackOp(std::move(callback));
233 void CallbackOp::onCompleted() {