/*
 * Copyright 2013 Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "folly/experimental/io/AsyncIO.h"

#include <sys/eventfd.h>
#include <unistd.h>

#include <algorithm>
#include <cerrno>
#include <climits>
#include <cstdint>
#include <cstring>
#include <ostream>
#include <stdexcept>
#include <string>

#include <boost/intrusive/parent_from_member.hpp>
#include <glog/logging.h>

#include "folly/Exception.h"
#include "folly/Format.h"
#include "folly/Likely.h"
#include "folly/String.h"
35 AsyncIOOp::AsyncIOOp(NotificationCallback cb)
37 state_(State::UNINITIALIZED),
39 memset(&iocb_, 0, sizeof(iocb_));
42 void AsyncIOOp::reset(NotificationCallback cb) {
43 CHECK_NE(state_, State::PENDING);
45 state_ = State::UNINITIALIZED;
47 memset(&iocb_, 0, sizeof(iocb_));
50 AsyncIOOp::~AsyncIOOp() {
51 CHECK_NE(state_, State::PENDING);
54 void AsyncIOOp::start() {
55 DCHECK_EQ(state_, State::INITIALIZED);
56 state_ = State::PENDING;
59 void AsyncIOOp::complete(ssize_t result) {
60 DCHECK_EQ(state_, State::PENDING);
61 state_ = State::COMPLETED;
68 ssize_t AsyncIOOp::result() const {
69 CHECK_EQ(state_, State::COMPLETED);
73 void AsyncIOOp::pread(int fd, void* buf, size_t size, off_t start) {
75 io_prep_pread(&iocb_, fd, buf, size, start);
78 void AsyncIOOp::pread(int fd, Range<unsigned char*> range, off_t start) {
79 pread(fd, range.begin(), range.size(), start);
82 void AsyncIOOp::preadv(int fd, const iovec* iov, int iovcnt, off_t start) {
84 io_prep_preadv(&iocb_, fd, iov, iovcnt, start);
87 void AsyncIOOp::pwrite(int fd, const void* buf, size_t size, off_t start) {
89 io_prep_pwrite(&iocb_, fd, const_cast<void*>(buf), size, start);
92 void AsyncIOOp::pwrite(int fd, Range<const unsigned char*> range, off_t start) {
93 pwrite(fd, range.begin(), range.size(), start);
96 void AsyncIOOp::pwritev(int fd, const iovec* iov, int iovcnt, off_t start) {
98 io_prep_pwritev(&iocb_, fd, iov, iovcnt, start);
101 void AsyncIOOp::init() {
102 CHECK_EQ(state_, State::UNINITIALIZED);
103 state_ = State::INITIALIZED;
106 AsyncIO::AsyncIO(size_t capacity, PollMode pollMode)
112 CHECK_GT(capacity_, 0);
113 completed_.reserve(capacity_);
114 if (pollMode == POLLABLE) {
115 pollFd_ = eventfd(0, EFD_NONBLOCK);
116 checkUnixError(pollFd_, "AsyncIO: eventfd creation failed");
120 AsyncIO::~AsyncIO() {
121 CHECK_EQ(pending_, 0);
123 int rc = io_queue_release(ctx_);
124 CHECK_EQ(rc, 0) << "io_queue_release: " << errnoStr(-rc);
127 CHECK_ERR(close(pollFd_));
131 void AsyncIO::decrementPending() {
132 ssize_t p = pending_.fetch_add(-1, std::memory_order_acq_rel);
136 void AsyncIO::initializeContext() {
137 if (!ctxSet_.load(std::memory_order_acquire)) {
138 std::lock_guard<std::mutex> lock(initMutex_);
139 if (!ctxSet_.load(std::memory_order_relaxed)) {
140 int rc = io_queue_init(capacity_, &ctx_);
141 // returns negative errno
142 checkKernelError(rc, "AsyncIO: io_queue_init failed");
144 ctxSet_.store(true, std::memory_order_release);
149 void AsyncIO::submit(Op* op) {
150 CHECK_EQ(op->state(), Op::State::INITIALIZED);
151 initializeContext(); // on demand
153 // We can increment past capacity, but we'll clean up after ourselves.
154 ssize_t p = pending_.fetch_add(1, std::memory_order_acq_rel);
155 if (p >= capacity_) {
157 throw std::range_error("AsyncIO: too many pending requests");
159 iocb* cb = &op->iocb_;
160 cb->data = nullptr; // unused
162 io_set_eventfd(cb, pollFd_);
164 int rc = io_submit(ctx_, 1, &cb);
167 throwSystemErrorExplicit(-rc, "AsyncIO: io_submit failed");
173 Range<AsyncIO::Op**> AsyncIO::wait(size_t minRequests) {
175 CHECK_EQ(pollFd_, -1) << "wait() only allowed on non-pollable object";
176 ssize_t p = pending_.load(std::memory_order_acquire);
177 CHECK_LE(minRequests, p);
178 return doWait(minRequests, p);
181 Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
183 CHECK_NE(pollFd_, -1) << "pollCompleted() only allowed on pollable object";
185 // This sets the eventFd counter to 0, see
186 // http://www.kernel.org/doc/man-pages/online/pages/man2/eventfd.2.html
189 rc = ::read(pollFd_, &numEvents, 8);
190 } while (rc == -1 && errno == EINTR);
191 if (UNLIKELY(rc == -1 && errno == EAGAIN)) {
192 return Range<Op**>(); // nothing completed
194 checkUnixError(rc, "AsyncIO: read from event fd failed");
197 DCHECK_GT(numEvents, 0);
198 DCHECK_LE(numEvents, pending_);
200 // Don't reap more than numEvents, as we've just reset the counter to 0.
201 return doWait(numEvents, numEvents);
204 Range<AsyncIO::Op**> AsyncIO::doWait(size_t minRequests, size_t maxRequests) {
205 io_event events[maxRequests];
209 count = io_getevents(ctx_, minRequests, maxRequests, events, nullptr);
210 } while (count == -EINTR);
211 checkKernelError(count, "AsyncIO: io_getevents failed");
212 DCHECK_GE(count, minRequests); // the man page says so
213 DCHECK_LE(count, maxRequests);
217 return folly::Range<Op**>();
220 for (size_t i = 0; i < count; ++i) {
221 DCHECK(events[i].obj);
222 Op* op = boost::intrusive::get_parent_from_member(
223 events[i].obj, &AsyncIOOp::iocb_);
225 op->complete(events[i].res);
226 completed_.push_back(op);
229 return folly::Range<Op**>(&completed_.front(), count);
232 AsyncIOQueue::AsyncIOQueue(AsyncIO* asyncIO)
233 : asyncIO_(asyncIO) {
236 AsyncIOQueue::~AsyncIOQueue() {
237 CHECK_EQ(asyncIO_->pending(), 0);
240 void AsyncIOQueue::submit(AsyncIOOp* op) {
241 submit([op]() { return op; });
244 void AsyncIOQueue::submit(OpFactory op) {
245 queue_.push_back(op);
249 void AsyncIOQueue::onCompleted(AsyncIOOp* op) {
253 void AsyncIOQueue::maybeDequeue() {
254 while (!queue_.empty() && asyncIO_->pending() < asyncIO_->capacity()) {
255 auto& opFactory = queue_.front();
256 auto op = opFactory();
259 // Interpose our completion callback
260 auto& nextCb = op->notificationCallback();
261 op->setNotificationCallback([this, nextCb](AsyncIOOp* op) {
262 this->onCompleted(op);
263 if (nextCb) nextCb(op);
266 asyncIO_->submit(op);
270 // debugging helpers:
274 #define X(c) case c: return #c
276 const char* asyncIoOpStateToString(AsyncIOOp::State state) {
278 X(AsyncIOOp::State::UNINITIALIZED);
279 X(AsyncIOOp::State::INITIALIZED);
280 X(AsyncIOOp::State::PENDING);
281 X(AsyncIOOp::State::COMPLETED);
283 return "<INVALID AsyncIOOp::State>";
286 const char* iocbCmdToString(short int cmd_short) {
287 io_iocb_cmd cmd = static_cast<io_iocb_cmd>(cmd_short);
298 return "<INVALID io_iocb_cmd>";
303 std::string fd2name(int fd) {
304 std::string path = folly::to<std::string>("/proc/self/fd/", fd);
306 const ssize_t length =
307 std::max<ssize_t>(readlink(path.c_str(), link, PATH_MAX), 0);
308 return path.assign(link, length);
311 std::ostream& operator<<(std::ostream& os, const iocb& cb) {
313 "data={}, key={}, opcode={}, reqprio={}, fd={}, f={}, ",
314 cb.data, cb.key, iocbCmdToString(cb.aio_lio_opcode),
315 cb.aio_reqprio, cb.aio_fildes, fd2name(cb.aio_fildes));
317 switch (cb.aio_lio_opcode) {
320 os << folly::format("buf={}, off={}, size={}, ",
321 cb.u.c.buf, cb.u.c.nbytes, cb.u.c.offset);
323 os << "[TODO: write debug string for "
324 << iocbCmdToString(cb.aio_lio_opcode) << "] ";
330 } // anonymous namespace
332 std::ostream& operator<<(std::ostream& os, const AsyncIOOp& op) {
333 os << "{" << op.state_ << ", ";
335 if (op.state_ != AsyncIOOp::State::UNINITIALIZED) {
339 if (op.state_ == AsyncIOOp::State::COMPLETED) {
340 os << "result=" << op.result_ << ", ";
346 std::ostream& operator<<(std::ostream& os, AsyncIOOp::State state) {
347 return os << asyncIoOpStateToString(state);