59dc866c2d0dbe66f2585674fdd4368b0d8c4c31
[folly.git] / folly / experimental / io / AsyncIO.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/experimental/io/AsyncIO.h>
18
19 #include <sys/eventfd.h>
20 #include <cerrno>
21 #include <ostream>
22 #include <stdexcept>
23 #include <string>
24
25 #include <boost/intrusive/parent_from_member.hpp>
26 #include <glog/logging.h>
27
28 #include <folly/Exception.h>
29 #include <folly/Format.h>
30 #include <folly/Likely.h>
31 #include <folly/String.h>
32 #include <folly/portability/Unistd.h>
33
34 namespace folly {
35
36 AsyncIOOp::AsyncIOOp(NotificationCallback cb)
37     : cb_(std::move(cb)), state_(State::UNINITIALIZED), result_(-EINVAL) {
38   memset(&iocb_, 0, sizeof(iocb_));
39 }
40
41 void AsyncIOOp::reset(NotificationCallback cb) {
42   CHECK_NE(state_, State::PENDING);
43   cb_ = std::move(cb);
44   state_ = State::UNINITIALIZED;
45   result_ = -EINVAL;
46   memset(&iocb_, 0, sizeof(iocb_));
47 }
48
49 AsyncIOOp::~AsyncIOOp() {
50   CHECK_NE(state_, State::PENDING);
51 }
52
53 void AsyncIOOp::start() {
54   DCHECK_EQ(state_, State::INITIALIZED);
55   state_ = State::PENDING;
56 }
57
58 void AsyncIOOp::complete(ssize_t result) {
59   DCHECK_EQ(state_, State::PENDING);
60   state_ = State::COMPLETED;
61   result_ = result;
62   if (cb_) {
63     cb_(this);
64   }
65 }
66
67 void AsyncIOOp::cancel() {
68   DCHECK_EQ(state_, State::PENDING);
69   state_ = State::CANCELED;
70 }
71
72 ssize_t AsyncIOOp::result() const {
73   CHECK_EQ(state_, State::COMPLETED);
74   return result_;
75 }
76
77 void AsyncIOOp::pread(int fd, void* buf, size_t size, off_t start) {
78   init();
79   io_prep_pread(&iocb_, fd, buf, size, start);
80 }
81
82 void AsyncIOOp::pread(int fd, Range<unsigned char*> range, off_t start) {
83   pread(fd, range.begin(), range.size(), start);
84 }
85
86 void AsyncIOOp::preadv(int fd, const iovec* iov, int iovcnt, off_t start) {
87   init();
88   io_prep_preadv(&iocb_, fd, iov, iovcnt, start);
89 }
90
91 void AsyncIOOp::pwrite(int fd, const void* buf, size_t size, off_t start) {
92   init();
93   io_prep_pwrite(&iocb_, fd, const_cast<void*>(buf), size, start);
94 }
95
96 void AsyncIOOp::pwrite(int fd, Range<const unsigned char*> range, off_t start) {
97   pwrite(fd, range.begin(), range.size(), start);
98 }
99
100 void AsyncIOOp::pwritev(int fd, const iovec* iov, int iovcnt, off_t start) {
101   init();
102   io_prep_pwritev(&iocb_, fd, iov, iovcnt, start);
103 }
104
105 void AsyncIOOp::init() {
106   CHECK_EQ(state_, State::UNINITIALIZED);
107   state_ = State::INITIALIZED;
108 }
109
110 AsyncIO::AsyncIO(size_t capacity, PollMode pollMode) : capacity_(capacity) {
111   CHECK_GT(capacity_, 0);
112   completed_.reserve(capacity_);
113   if (pollMode == POLLABLE) {
114     pollFd_ = eventfd(0, EFD_NONBLOCK);
115     checkUnixError(pollFd_, "AsyncIO: eventfd creation failed");
116   }
117 }
118
119 AsyncIO::~AsyncIO() {
120   CHECK_EQ(pending_, 0);
121   if (ctx_) {
122     int rc = io_queue_release(ctx_);
123     CHECK_EQ(rc, 0) << "io_queue_release: " << errnoStr(-rc);
124   }
125   if (pollFd_ != -1) {
126     CHECK_ERR(close(pollFd_));
127   }
128 }
129
130 void AsyncIO::decrementPending() {
131   auto p = pending_.fetch_add(-1, std::memory_order_acq_rel);
132   DCHECK_GE(p, 1);
133 }
134
135 void AsyncIO::initializeContext() {
136   if (!ctxSet_.load(std::memory_order_acquire)) {
137     std::lock_guard<std::mutex> lock(initMutex_);
138     if (!ctxSet_.load(std::memory_order_relaxed)) {
139       int rc = io_queue_init(capacity_, &ctx_);
140       // returns negative errno
141       if (rc == -EAGAIN) {
142         long aio_nr, aio_max;
143         std::unique_ptr<FILE, int (*)(FILE*)> fp(
144             fopen("/proc/sys/fs/aio-nr", "r"), fclose);
145         PCHECK(fp);
146         CHECK_EQ(fscanf(fp.get(), "%ld", &aio_nr), 1);
147
148         std::unique_ptr<FILE, int (*)(FILE*)> aio_max_fp(
149             fopen("/proc/sys/fs/aio-max-nr", "r"), fclose);
150         PCHECK(aio_max_fp);
151         CHECK_EQ(fscanf(aio_max_fp.get(), "%ld", &aio_max), 1);
152
153         LOG(ERROR) << "No resources for requested capacity of " << capacity_;
154         LOG(ERROR) << "aio_nr " << aio_nr << ", aio_max_nr " << aio_max;
155       }
156
157       checkKernelError(rc, "AsyncIO: io_queue_init failed");
158       DCHECK(ctx_);
159       ctxSet_.store(true, std::memory_order_release);
160     }
161   }
162 }
163
164 void AsyncIO::submit(Op* op) {
165   CHECK_EQ(op->state(), Op::State::INITIALIZED);
166   initializeContext(); // on demand
167
168   // We can increment past capacity, but we'll clean up after ourselves.
169   auto p = pending_.fetch_add(1, std::memory_order_acq_rel);
170   if (p >= capacity_) {
171     decrementPending();
172     throw std::range_error("AsyncIO: too many pending requests");
173   }
174   iocb* cb = &op->iocb_;
175   cb->data = nullptr; // unused
176   if (pollFd_ != -1) {
177     io_set_eventfd(cb, pollFd_);
178   }
179   int rc = io_submit(ctx_, 1, &cb);
180   if (rc < 0) {
181     decrementPending();
182     throwSystemErrorExplicit(-rc, "AsyncIO: io_submit failed");
183   }
184   submitted_++;
185   DCHECK_EQ(rc, 1);
186   op->start();
187 }
188
189 Range<AsyncIO::Op**> AsyncIO::wait(size_t minRequests) {
190   CHECK(ctx_);
191   CHECK_EQ(pollFd_, -1) << "wait() only allowed on non-pollable object";
192   auto p = pending_.load(std::memory_order_acquire);
193   CHECK_LE(minRequests, p);
194   return doWait(WaitType::COMPLETE, minRequests, p, completed_);
195 }
196
197 Range<AsyncIO::Op**> AsyncIO::cancel() {
198   CHECK(ctx_);
199   auto p = pending_.load(std::memory_order_acquire);
200   return doWait(WaitType::CANCEL, p, p, canceled_);
201 }
202
203 Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
204   CHECK(ctx_);
205   CHECK_NE(pollFd_, -1) << "pollCompleted() only allowed on pollable object";
206   uint64_t numEvents;
207   // This sets the eventFd counter to 0, see
208   // http://www.kernel.org/doc/man-pages/online/pages/man2/eventfd.2.html
209   ssize_t rc;
210   do {
211     rc = ::read(pollFd_, &numEvents, 8);
212   } while (rc == -1 && errno == EINTR);
213   if (UNLIKELY(rc == -1 && errno == EAGAIN)) {
214     return Range<Op**>(); // nothing completed
215   }
216   checkUnixError(rc, "AsyncIO: read from event fd failed");
217   DCHECK_EQ(rc, 8);
218
219   DCHECK_GT(numEvents, 0);
220   DCHECK_LE(numEvents, pending_);
221
222   // Don't reap more than numEvents, as we've just reset the counter to 0.
223   return doWait(WaitType::COMPLETE, numEvents, numEvents, completed_);
224 }
225
226 Range<AsyncIO::Op**> AsyncIO::doWait(
227     WaitType type,
228     size_t minRequests,
229     size_t maxRequests,
230     std::vector<Op*>& result) {
231   io_event events[maxRequests];
232
233   // Unfortunately, Linux AIO doesn't implement io_cancel, so even for
234   // WaitType::CANCEL we have to wait for IO completion.
235   size_t count = 0;
236   do {
237     int ret;
238     do {
239       // GOTCHA: io_getevents() may returns less than min_nr results if
240       // interrupted after some events have been read (if before, -EINTR
241       // is returned).
242       ret = io_getevents(
243           ctx_,
244           minRequests - count,
245           maxRequests - count,
246           events + count,
247           /* timeout */ nullptr); // wait forever
248     } while (ret == -EINTR);
249     // Check as may not be able to recover without leaking events.
250     CHECK_GE(ret, 0) << "AsyncIO: io_getevents failed with error "
251                      << errnoStr(-ret);
252     count += ret;
253   } while (count < minRequests);
254   DCHECK_LE(count, maxRequests);
255
256   result.clear();
257   for (size_t i = 0; i < count; ++i) {
258     DCHECK(events[i].obj);
259     Op* op = boost::intrusive::get_parent_from_member(
260         events[i].obj, &AsyncIOOp::iocb_);
261     decrementPending();
262     switch (type) {
263       case WaitType::COMPLETE:
264         op->complete(events[i].res);
265         break;
266       case WaitType::CANCEL:
267         op->cancel();
268         break;
269     }
270     result.push_back(op);
271   }
272
273   return range(result);
274 }
275
276 AsyncIOQueue::AsyncIOQueue(AsyncIO* asyncIO) : asyncIO_(asyncIO) {}
277
278 AsyncIOQueue::~AsyncIOQueue() {
279   CHECK_EQ(asyncIO_->pending(), 0);
280 }
281
282 void AsyncIOQueue::submit(AsyncIOOp* op) {
283   submit([op]() { return op; });
284 }
285
286 void AsyncIOQueue::submit(OpFactory op) {
287   queue_.push_back(op);
288   maybeDequeue();
289 }
290
291 void AsyncIOQueue::onCompleted(AsyncIOOp* /* op */) {
292   maybeDequeue();
293 }
294
295 void AsyncIOQueue::maybeDequeue() {
296   while (!queue_.empty() && asyncIO_->pending() < asyncIO_->capacity()) {
297     auto& opFactory = queue_.front();
298     auto op = opFactory();
299     queue_.pop_front();
300
301     // Interpose our completion callback
302     auto& nextCb = op->notificationCallback();
303     op->setNotificationCallback([this, nextCb](AsyncIOOp* op2) {
304       this->onCompleted(op2);
305       if (nextCb) {
306         nextCb(op2);
307       }
308     });
309
310     asyncIO_->submit(op);
311   }
312 }
313
314 // debugging helpers:
315
316 namespace {
317
318 #define X(c) \
319   case c:    \
320     return #c
321
322 const char* asyncIoOpStateToString(AsyncIOOp::State state) {
323   switch (state) {
324     X(AsyncIOOp::State::UNINITIALIZED);
325     X(AsyncIOOp::State::INITIALIZED);
326     X(AsyncIOOp::State::PENDING);
327     X(AsyncIOOp::State::COMPLETED);
328     X(AsyncIOOp::State::CANCELED);
329   }
330   return "<INVALID AsyncIOOp::State>";
331 }
332
333 const char* iocbCmdToString(short int cmd_short) {
334   io_iocb_cmd cmd = static_cast<io_iocb_cmd>(cmd_short);
335   switch (cmd) {
336     X(IO_CMD_PREAD);
337     X(IO_CMD_PWRITE);
338     X(IO_CMD_FSYNC);
339     X(IO_CMD_FDSYNC);
340     X(IO_CMD_POLL);
341     X(IO_CMD_NOOP);
342     X(IO_CMD_PREADV);
343     X(IO_CMD_PWRITEV);
344   };
345   return "<INVALID io_iocb_cmd>";
346 }
347
348 #undef X
349
350 std::string fd2name(int fd) {
351   std::string path = folly::to<std::string>("/proc/self/fd/", fd);
352   char link[PATH_MAX];
353   const ssize_t length =
354       std::max<ssize_t>(readlink(path.c_str(), link, PATH_MAX), 0);
355   return path.assign(link, length);
356 }
357
358 std::ostream& operator<<(std::ostream& os, const iocb& cb) {
359   os << folly::format(
360       "data={}, key={}, opcode={}, reqprio={}, fd={}, f={}, ",
361       cb.data,
362       cb.key,
363       iocbCmdToString(cb.aio_lio_opcode),
364       cb.aio_reqprio,
365       cb.aio_fildes,
366       fd2name(cb.aio_fildes));
367
368   switch (cb.aio_lio_opcode) {
369     case IO_CMD_PREAD:
370     case IO_CMD_PWRITE:
371       os << folly::format(
372           "buf={}, offset={}, nbytes={}, ",
373           cb.u.c.buf,
374           cb.u.c.offset,
375           cb.u.c.nbytes);
376       break;
377     default:
378       os << "[TODO: write debug string for "
379          << iocbCmdToString(cb.aio_lio_opcode) << "] ";
380       break;
381   }
382
383   return os;
384 }
385
386 } // anonymous namespace
387
388 std::ostream& operator<<(std::ostream& os, const AsyncIOOp& op) {
389   os << "{" << op.state_ << ", ";
390
391   if (op.state_ != AsyncIOOp::State::UNINITIALIZED) {
392     os << op.iocb_;
393   }
394
395   if (op.state_ == AsyncIOOp::State::COMPLETED) {
396     os << "result=" << op.result_;
397     if (op.result_ < 0) {
398       os << " (" << errnoStr(-op.result_) << ')';
399     }
400     os << ", ";
401   }
402
403   return os << "}";
404 }
405
406 std::ostream& operator<<(std::ostream& os, AsyncIOOp::State state) {
407   return os << asyncIoOpStateToString(state);
408 }
409
410 } // namespace folly