Make folly::AsyncIO thread safe
[folly.git] / folly / experimental / io / AsyncIO.cpp
1 /*
2  * Copyright 2013 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "folly/experimental/io/AsyncIO.h"
18
19 #include <sys/eventfd.h>
20 #include <unistd.h>
21 #include <cerrno>
22 #include <stdexcept>
23 #include <string>
24
25 #include <boost/intrusive/parent_from_member.hpp>
26 #include <glog/logging.h>
27
28 #include "folly/Exception.h"
29 #include "folly/Format.h"
30 #include "folly/Likely.h"
31 #include "folly/String.h"
32
33 namespace folly {
34
35 AsyncIOOp::AsyncIOOp(NotificationCallback cb)
36   : cb_(std::move(cb)),
37     state_(State::UNINITIALIZED),
38     result_(-EINVAL) {
39   memset(&iocb_, 0, sizeof(iocb_));
40 }
41
42 void AsyncIOOp::reset(NotificationCallback cb) {
43   CHECK_NE(state_, State::PENDING);
44   cb_ = std::move(cb);
45   state_ = State::UNINITIALIZED;
46   result_ = -EINVAL;
47   memset(&iocb_, 0, sizeof(iocb_));
48 }
49
50 AsyncIOOp::~AsyncIOOp() {
51   CHECK_NE(state_, State::PENDING);
52 }
53
54 void AsyncIOOp::start() {
55   DCHECK_EQ(state_, State::INITIALIZED);
56   state_ = State::PENDING;
57 }
58
59 void AsyncIOOp::complete(ssize_t result) {
60   DCHECK_EQ(state_, State::PENDING);
61   state_ = State::COMPLETED;
62   result_ = result;
63   if (cb_) {
64     cb_(this);
65   }
66 }
67
68 ssize_t AsyncIOOp::result() const {
69   CHECK_EQ(state_, State::COMPLETED);
70   return result_;
71 }
72
73 void AsyncIOOp::pread(int fd, void* buf, size_t size, off_t start) {
74   init();
75   io_prep_pread(&iocb_, fd, buf, size, start);
76 }
77
78 void AsyncIOOp::pread(int fd, Range<unsigned char*> range, off_t start) {
79   pread(fd, range.begin(), range.size(), start);
80 }
81
82 void AsyncIOOp::preadv(int fd, const iovec* iov, int iovcnt, off_t start) {
83   init();
84   io_prep_preadv(&iocb_, fd, iov, iovcnt, start);
85 }
86
87 void AsyncIOOp::pwrite(int fd, const void* buf, size_t size, off_t start) {
88   init();
89   io_prep_pwrite(&iocb_, fd, const_cast<void*>(buf), size, start);
90 }
91
92 void AsyncIOOp::pwrite(int fd, Range<const unsigned char*> range, off_t start) {
93   pwrite(fd, range.begin(), range.size(), start);
94 }
95
96 void AsyncIOOp::pwritev(int fd, const iovec* iov, int iovcnt, off_t start) {
97   init();
98   io_prep_pwritev(&iocb_, fd, iov, iovcnt, start);
99 }
100
101 void AsyncIOOp::init() {
102   CHECK_EQ(state_, State::UNINITIALIZED);
103   state_ = State::INITIALIZED;
104 }
105
106 AsyncIO::AsyncIO(size_t capacity, PollMode pollMode)
107   : ctx_(0),
108     ctxSet_(false),
109     pending_(0),
110     capacity_(capacity),
111     pollFd_(-1) {
112   CHECK_GT(capacity_, 0);
113   completed_.reserve(capacity_);
114   if (pollMode == POLLABLE) {
115     pollFd_ = eventfd(0, EFD_NONBLOCK);
116     checkUnixError(pollFd_, "AsyncIO: eventfd creation failed");
117   }
118 }
119
120 AsyncIO::~AsyncIO() {
121   CHECK_EQ(pending_, 0);
122   if (ctx_) {
123     int rc = io_queue_release(ctx_);
124     CHECK_EQ(rc, 0) << "io_queue_release: " << errnoStr(-rc);
125   }
126   if (pollFd_ != -1) {
127     CHECK_ERR(close(pollFd_));
128   }
129 }
130
131 void AsyncIO::decrementPending() {
132   ssize_t p = pending_.fetch_add(-1, std::memory_order_acq_rel);
133   DCHECK_GE(p, 1);
134 }
135
136 void AsyncIO::initializeContext() {
137   if (!ctxSet_.load(std::memory_order_acquire)) {
138     std::lock_guard<std::mutex> lock(initMutex_);
139     if (!ctxSet_.load(std::memory_order_relaxed)) {
140       int rc = io_queue_init(capacity_, &ctx_);
141       // returns negative errno
142       checkKernelError(rc, "AsyncIO: io_queue_init failed");
143       DCHECK(ctx_);
144       ctxSet_.store(true, std::memory_order_release);
145     }
146   }
147 }
148
149 void AsyncIO::submit(Op* op) {
150   CHECK_EQ(op->state(), Op::State::INITIALIZED);
151   initializeContext();  // on demand
152
153   // We can increment past capacity, but we'll clean up after ourselves.
154   ssize_t p = pending_.fetch_add(1, std::memory_order_acq_rel);
155   if (p >= capacity_) {
156     decrementPending();
157     throw std::range_error("AsyncIO: too many pending requests");
158   }
159   iocb* cb = &op->iocb_;
160   cb->data = nullptr;  // unused
161   if (pollFd_ != -1) {
162     io_set_eventfd(cb, pollFd_);
163   }
164   int rc = io_submit(ctx_, 1, &cb);
165   if (rc < 0) {
166     decrementPending();
167     throwSystemErrorExplicit(-rc, "AsyncIO: io_submit failed");
168   }
169   DCHECK_EQ(rc, 1);
170   op->start();
171 }
172
173 Range<AsyncIO::Op**> AsyncIO::wait(size_t minRequests) {
174   CHECK(ctx_);
175   CHECK_EQ(pollFd_, -1) << "wait() only allowed on non-pollable object";
176   ssize_t p = pending_.load(std::memory_order_acquire);
177   CHECK_LE(minRequests, p);
178   return doWait(minRequests, p);
179 }
180
181 Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
182   CHECK(ctx_);
183   CHECK_NE(pollFd_, -1) << "pollCompleted() only allowed on pollable object";
184   uint64_t numEvents;
185   // This sets the eventFd counter to 0, see
186   // http://www.kernel.org/doc/man-pages/online/pages/man2/eventfd.2.html
187   ssize_t rc;
188   do {
189     rc = ::read(pollFd_, &numEvents, 8);
190   } while (rc == -1 && errno == EINTR);
191   if (UNLIKELY(rc == -1 && errno == EAGAIN)) {
192     return Range<Op**>();  // nothing completed
193   }
194   checkUnixError(rc, "AsyncIO: read from event fd failed");
195   DCHECK_EQ(rc, 8);
196
197   DCHECK_GT(numEvents, 0);
198   DCHECK_LE(numEvents, pending_);
199
200   // Don't reap more than numEvents, as we've just reset the counter to 0.
201   return doWait(numEvents, numEvents);
202 }
203
204 Range<AsyncIO::Op**> AsyncIO::doWait(size_t minRequests, size_t maxRequests) {
205   io_event events[maxRequests];
206   int count;
207   do {
208     // Wait forever
209     count = io_getevents(ctx_, minRequests, maxRequests, events, nullptr);
210   } while (count == -EINTR);
211   checkKernelError(count, "AsyncIO: io_getevents failed");
212   DCHECK_GE(count, minRequests);  // the man page says so
213   DCHECK_LE(count, maxRequests);
214
215   completed_.clear();
216   if (count == 0) {
217     return folly::Range<Op**>();
218   }
219
220   for (size_t i = 0; i < count; ++i) {
221     DCHECK(events[i].obj);
222     Op* op = boost::intrusive::get_parent_from_member(
223         events[i].obj, &AsyncIOOp::iocb_);
224     decrementPending();
225     op->complete(events[i].res);
226     completed_.push_back(op);
227   }
228
229   return folly::Range<Op**>(&completed_.front(), count);
230 }
231
232 AsyncIOQueue::AsyncIOQueue(AsyncIO* asyncIO)
233   : asyncIO_(asyncIO) {
234 }
235
236 AsyncIOQueue::~AsyncIOQueue() {
237   CHECK_EQ(asyncIO_->pending(), 0);
238 }
239
240 void AsyncIOQueue::submit(AsyncIOOp* op) {
241   submit([op]() { return op; });
242 }
243
244 void AsyncIOQueue::submit(OpFactory op) {
245   queue_.push_back(op);
246   maybeDequeue();
247 }
248
249 void AsyncIOQueue::onCompleted(AsyncIOOp* op) {
250   maybeDequeue();
251 }
252
253 void AsyncIOQueue::maybeDequeue() {
254   while (!queue_.empty() && asyncIO_->pending() < asyncIO_->capacity()) {
255     auto& opFactory = queue_.front();
256     auto op = opFactory();
257     queue_.pop_front();
258
259     // Interpose our completion callback
260     auto& nextCb = op->notificationCallback();
261     op->setNotificationCallback([this, nextCb](AsyncIOOp* op) {
262       this->onCompleted(op);
263       if (nextCb) nextCb(op);
264     });
265
266     asyncIO_->submit(op);
267   }
268 }
269
270 // debugging helpers:
271
272 namespace {
273
274 #define X(c) case c: return #c
275
276 const char* asyncIoOpStateToString(AsyncIOOp::State state) {
277   switch (state) {
278     X(AsyncIOOp::State::UNINITIALIZED);
279     X(AsyncIOOp::State::INITIALIZED);
280     X(AsyncIOOp::State::PENDING);
281     X(AsyncIOOp::State::COMPLETED);
282   }
283   return "<INVALID AsyncIOOp::State>";
284 }
285
286 const char* iocbCmdToString(short int cmd_short) {
287   io_iocb_cmd cmd = static_cast<io_iocb_cmd>(cmd_short);
288   switch (cmd) {
289     X(IO_CMD_PREAD);
290     X(IO_CMD_PWRITE);
291     X(IO_CMD_FSYNC);
292     X(IO_CMD_FDSYNC);
293     X(IO_CMD_POLL);
294     X(IO_CMD_NOOP);
295     X(IO_CMD_PREADV);
296     X(IO_CMD_PWRITEV);
297   };
298   return "<INVALID io_iocb_cmd>";
299 }
300
301 #undef X
302
303 std::string fd2name(int fd) {
304   std::string path = folly::to<std::string>("/proc/self/fd/", fd);
305   char link[PATH_MAX];
306   const ssize_t length =
307     std::max<ssize_t>(readlink(path.c_str(), link, PATH_MAX), 0);
308   return path.assign(link, length);
309 }
310
311 std::ostream& operator<<(std::ostream& os, const iocb& cb) {
312   os << folly::format(
313     "data={}, key={}, opcode={}, reqprio={}, fd={}, f={}, ",
314     cb.data, cb.key, iocbCmdToString(cb.aio_lio_opcode),
315     cb.aio_reqprio, cb.aio_fildes, fd2name(cb.aio_fildes));
316
317   switch (cb.aio_lio_opcode) {
318     case IO_CMD_PREAD:
319     case IO_CMD_PWRITE:
320       os << folly::format("buf={}, off={}, size={}, ",
321                           cb.u.c.buf, cb.u.c.nbytes, cb.u.c.offset);
322     default:
323       os << "[TODO: write debug string for "
324          << iocbCmdToString(cb.aio_lio_opcode) << "] ";
325   }
326
327   return os;
328 }
329
330 }  // anonymous namespace
331
332 std::ostream& operator<<(std::ostream& os, const AsyncIOOp& op) {
333   os << "{" << op.state_ << ", ";
334
335   if (op.state_ != AsyncIOOp::State::UNINITIALIZED) {
336     os << op.iocb_;
337   }
338
339   if (op.state_ == AsyncIOOp::State::COMPLETED) {
340     os << "result=" << op.result_ << ", ";
341   }
342
343   return os << "}";
344 }
345
346 std::ostream& operator<<(std::ostream& os, AsyncIOOp::State state) {
347   return os << asyncIoOpStateToString(state);
348 }
349
350 }  // namespace folly
351