AsyncIO::cancel
[folly.git] / folly / experimental / io / AsyncIO.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/experimental/io/AsyncIO.h>
18
19 #include <sys/eventfd.h>
20 #include <cerrno>
21 #include <ostream>
22 #include <stdexcept>
23 #include <string>
24
25 #include <boost/intrusive/parent_from_member.hpp>
26 #include <glog/logging.h>
27
28 #include <folly/Exception.h>
29 #include <folly/Format.h>
30 #include <folly/Likely.h>
31 #include <folly/String.h>
32 #include <folly/portability/Unistd.h>
33
34 namespace folly {
35
36 AsyncIOOp::AsyncIOOp(NotificationCallback cb)
37   : cb_(std::move(cb)),
38     state_(State::UNINITIALIZED),
39     result_(-EINVAL) {
40   memset(&iocb_, 0, sizeof(iocb_));
41 }
42
43 void AsyncIOOp::reset(NotificationCallback cb) {
44   CHECK_NE(state_, State::PENDING);
45   cb_ = std::move(cb);
46   state_ = State::UNINITIALIZED;
47   result_ = -EINVAL;
48   memset(&iocb_, 0, sizeof(iocb_));
49 }
50
51 AsyncIOOp::~AsyncIOOp() {
52   CHECK_NE(state_, State::PENDING);
53 }
54
55 void AsyncIOOp::start() {
56   DCHECK_EQ(state_, State::INITIALIZED);
57   state_ = State::PENDING;
58 }
59
60 void AsyncIOOp::complete(ssize_t result) {
61   DCHECK_EQ(state_, State::PENDING);
62   state_ = State::COMPLETED;
63   result_ = result;
64   if (cb_) {
65     cb_(this);
66   }
67 }
68
69 void AsyncIOOp::cancel() {
70   DCHECK_EQ(state_, State::PENDING);
71   state_ = State::CANCELED;
72 }
73
74 ssize_t AsyncIOOp::result() const {
75   CHECK_EQ(state_, State::COMPLETED);
76   return result_;
77 }
78
79 void AsyncIOOp::pread(int fd, void* buf, size_t size, off_t start) {
80   init();
81   io_prep_pread(&iocb_, fd, buf, size, start);
82 }
83
84 void AsyncIOOp::pread(int fd, Range<unsigned char*> range, off_t start) {
85   pread(fd, range.begin(), range.size(), start);
86 }
87
88 void AsyncIOOp::preadv(int fd, const iovec* iov, int iovcnt, off_t start) {
89   init();
90   io_prep_preadv(&iocb_, fd, iov, iovcnt, start);
91 }
92
93 void AsyncIOOp::pwrite(int fd, const void* buf, size_t size, off_t start) {
94   init();
95   io_prep_pwrite(&iocb_, fd, const_cast<void*>(buf), size, start);
96 }
97
98 void AsyncIOOp::pwrite(int fd, Range<const unsigned char*> range, off_t start) {
99   pwrite(fd, range.begin(), range.size(), start);
100 }
101
102 void AsyncIOOp::pwritev(int fd, const iovec* iov, int iovcnt, off_t start) {
103   init();
104   io_prep_pwritev(&iocb_, fd, iov, iovcnt, start);
105 }
106
107 void AsyncIOOp::init() {
108   CHECK_EQ(state_, State::UNINITIALIZED);
109   state_ = State::INITIALIZED;
110 }
111
112 AsyncIO::AsyncIO(size_t capacity, PollMode pollMode) : capacity_(capacity) {
113   CHECK_GT(capacity_, 0);
114   completed_.reserve(capacity_);
115   if (pollMode == POLLABLE) {
116     pollFd_ = eventfd(0, EFD_NONBLOCK);
117     checkUnixError(pollFd_, "AsyncIO: eventfd creation failed");
118   }
119 }
120
121 AsyncIO::~AsyncIO() {
122   CHECK_EQ(pending_, 0);
123   if (ctx_) {
124     int rc = io_queue_release(ctx_);
125     CHECK_EQ(rc, 0) << "io_queue_release: " << errnoStr(-rc);
126   }
127   if (pollFd_ != -1) {
128     CHECK_ERR(close(pollFd_));
129   }
130 }
131
132 void AsyncIO::decrementPending() {
133   auto p = pending_.fetch_add(-1, std::memory_order_acq_rel);
134   DCHECK_GE(p, 1);
135 }
136
137 void AsyncIO::initializeContext() {
138   if (!ctxSet_.load(std::memory_order_acquire)) {
139     std::lock_guard<std::mutex> lock(initMutex_);
140     if (!ctxSet_.load(std::memory_order_relaxed)) {
141       int rc = io_queue_init(capacity_, &ctx_);
142       // returns negative errno
143       if (rc == -EAGAIN) {
144         long aio_nr, aio_max;
145         std::unique_ptr<FILE, int(*)(FILE*)>
146           fp(fopen("/proc/sys/fs/aio-nr", "r"), fclose);
147         PCHECK(fp);
148         CHECK_EQ(fscanf(fp.get(), "%ld", &aio_nr), 1);
149
150         std::unique_ptr<FILE, int(*)(FILE*)>
151           aio_max_fp(fopen("/proc/sys/fs/aio-max-nr", "r"), fclose);
152         PCHECK(aio_max_fp);
153         CHECK_EQ(fscanf(aio_max_fp.get(), "%ld", &aio_max), 1);
154
155         LOG(ERROR) << "No resources for requested capacity of " << capacity_;
156         LOG(ERROR) << "aio_nr " << aio_nr << ", aio_max_nr " << aio_max;
157       }
158
159       checkKernelError(rc, "AsyncIO: io_queue_init failed");
160       DCHECK(ctx_);
161       ctxSet_.store(true, std::memory_order_release);
162     }
163   }
164 }
165
166 void AsyncIO::submit(Op* op) {
167   CHECK_EQ(op->state(), Op::State::INITIALIZED);
168   initializeContext();  // on demand
169
170   // We can increment past capacity, but we'll clean up after ourselves.
171   auto p = pending_.fetch_add(1, std::memory_order_acq_rel);
172   if (p >= capacity_) {
173     decrementPending();
174     throw std::range_error("AsyncIO: too many pending requests");
175   }
176   iocb* cb = &op->iocb_;
177   cb->data = nullptr;  // unused
178   if (pollFd_ != -1) {
179     io_set_eventfd(cb, pollFd_);
180   }
181   int rc = io_submit(ctx_, 1, &cb);
182   if (rc < 0) {
183     decrementPending();
184     throwSystemErrorExplicit(-rc, "AsyncIO: io_submit failed");
185   }
186   submitted_++;
187   DCHECK_EQ(rc, 1);
188   op->start();
189 }
190
191 Range<AsyncIO::Op**> AsyncIO::wait(size_t minRequests) {
192   CHECK(ctx_);
193   CHECK_EQ(pollFd_, -1) << "wait() only allowed on non-pollable object";
194   auto p = pending_.load(std::memory_order_acquire);
195   CHECK_LE(minRequests, p);
196   doWait(WaitType::COMPLETE, minRequests, p, &completed_);
197   return Range<Op**>(completed_.data(), completed_.size());
198 }
199
200 size_t AsyncIO::cancel() {
201   CHECK(ctx_);
202   auto p = pending_.load(std::memory_order_acquire);
203   doWait(WaitType::CANCEL, p, p, nullptr);
204   return p;
205 }
206
207 Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
208   CHECK(ctx_);
209   CHECK_NE(pollFd_, -1) << "pollCompleted() only allowed on pollable object";
210   uint64_t numEvents;
211   // This sets the eventFd counter to 0, see
212   // http://www.kernel.org/doc/man-pages/online/pages/man2/eventfd.2.html
213   ssize_t rc;
214   do {
215     rc = ::read(pollFd_, &numEvents, 8);
216   } while (rc == -1 && errno == EINTR);
217   if (UNLIKELY(rc == -1 && errno == EAGAIN)) {
218     return Range<Op**>();  // nothing completed
219   }
220   checkUnixError(rc, "AsyncIO: read from event fd failed");
221   DCHECK_EQ(rc, 8);
222
223   DCHECK_GT(numEvents, 0);
224   DCHECK_LE(numEvents, pending_);
225
226   // Don't reap more than numEvents, as we've just reset the counter to 0.
227   doWait(WaitType::COMPLETE, numEvents, numEvents, &completed_);
228   return Range<Op**>(completed_.data(), completed_.size());
229 }
230
231 void AsyncIO::doWait(
232     WaitType type,
233     size_t minRequests,
234     size_t maxRequests,
235     std::vector<Op*>* result) {
236   io_event events[maxRequests];
237
238   // Unfortunately, Linux AIO doesn't implement io_cancel, so even for
239   // WaitType::CANCEL we have to wait for IO completion.
240   size_t count = 0;
241   do {
242     int ret;
243     do {
244       // GOTCHA: io_getevents() may returns less than min_nr results if
245       // interrupted after some events have been read (if before, -EINTR
246       // is returned).
247       ret = io_getevents(ctx_,
248                          minRequests - count,
249                          maxRequests - count,
250                          events + count,
251                          /* timeout */ nullptr);  // wait forever
252     } while (ret == -EINTR);
253     // Check as may not be able to recover without leaking events.
254     CHECK_GE(ret, 0) << "AsyncIO: io_getevents failed with error "
255                      << errnoStr(-ret);
256     count += ret;
257   } while (count < minRequests);
258   DCHECK_LE(count, maxRequests);
259
260   if (result != nullptr) {
261     result->clear();
262   }
263   for (size_t i = 0; i < count; ++i) {
264     DCHECK(events[i].obj);
265     Op* op = boost::intrusive::get_parent_from_member(
266         events[i].obj, &AsyncIOOp::iocb_);
267     decrementPending();
268     switch (type) {
269       case WaitType::COMPLETE:
270         op->complete(events[i].res);
271         break;
272       case WaitType::CANCEL:
273         op->cancel();
274         break;
275     }
276     if (result != nullptr) {
277       result->push_back(op);
278     }
279   }
280 }
281
282 AsyncIOQueue::AsyncIOQueue(AsyncIO* asyncIO)
283   : asyncIO_(asyncIO) {
284 }
285
286 AsyncIOQueue::~AsyncIOQueue() {
287   CHECK_EQ(asyncIO_->pending(), 0);
288 }
289
290 void AsyncIOQueue::submit(AsyncIOOp* op) {
291   submit([op]() { return op; });
292 }
293
294 void AsyncIOQueue::submit(OpFactory op) {
295   queue_.push_back(op);
296   maybeDequeue();
297 }
298
299 void AsyncIOQueue::onCompleted(AsyncIOOp* /* op */) { maybeDequeue(); }
300
301 void AsyncIOQueue::maybeDequeue() {
302   while (!queue_.empty() && asyncIO_->pending() < asyncIO_->capacity()) {
303     auto& opFactory = queue_.front();
304     auto op = opFactory();
305     queue_.pop_front();
306
307     // Interpose our completion callback
308     auto& nextCb = op->notificationCallback();
309     op->setNotificationCallback([this, nextCb](AsyncIOOp* op2) {
310       this->onCompleted(op2);
311       if (nextCb) nextCb(op2);
312     });
313
314     asyncIO_->submit(op);
315   }
316 }
317
318 // debugging helpers:
319
320 namespace {
321
322 #define X(c) case c: return #c
323
324 const char* asyncIoOpStateToString(AsyncIOOp::State state) {
325   switch (state) {
326     X(AsyncIOOp::State::UNINITIALIZED);
327     X(AsyncIOOp::State::INITIALIZED);
328     X(AsyncIOOp::State::PENDING);
329     X(AsyncIOOp::State::COMPLETED);
330     X(AsyncIOOp::State::CANCELED);
331   }
332   return "<INVALID AsyncIOOp::State>";
333 }
334
335 const char* iocbCmdToString(short int cmd_short) {
336   io_iocb_cmd cmd = static_cast<io_iocb_cmd>(cmd_short);
337   switch (cmd) {
338     X(IO_CMD_PREAD);
339     X(IO_CMD_PWRITE);
340     X(IO_CMD_FSYNC);
341     X(IO_CMD_FDSYNC);
342     X(IO_CMD_POLL);
343     X(IO_CMD_NOOP);
344     X(IO_CMD_PREADV);
345     X(IO_CMD_PWRITEV);
346   };
347   return "<INVALID io_iocb_cmd>";
348 }
349
350 #undef X
351
352 std::string fd2name(int fd) {
353   std::string path = folly::to<std::string>("/proc/self/fd/", fd);
354   char link[PATH_MAX];
355   const ssize_t length =
356     std::max<ssize_t>(readlink(path.c_str(), link, PATH_MAX), 0);
357   return path.assign(link, length);
358 }
359
360 std::ostream& operator<<(std::ostream& os, const iocb& cb) {
361   os << folly::format(
362     "data={}, key={}, opcode={}, reqprio={}, fd={}, f={}, ",
363     cb.data, cb.key, iocbCmdToString(cb.aio_lio_opcode),
364     cb.aio_reqprio, cb.aio_fildes, fd2name(cb.aio_fildes));
365
366   switch (cb.aio_lio_opcode) {
367     case IO_CMD_PREAD:
368     case IO_CMD_PWRITE:
369       os << folly::format("buf={}, offset={}, nbytes={}, ",
370                           cb.u.c.buf, cb.u.c.offset, cb.u.c.nbytes);
371       break;
372     default:
373       os << "[TODO: write debug string for "
374          << iocbCmdToString(cb.aio_lio_opcode) << "] ";
375       break;
376   }
377
378   return os;
379 }
380
381 }  // anonymous namespace
382
383 std::ostream& operator<<(std::ostream& os, const AsyncIOOp& op) {
384   os << "{" << op.state_ << ", ";
385
386   if (op.state_ != AsyncIOOp::State::UNINITIALIZED) {
387     os << op.iocb_;
388   }
389
390   if (op.state_ == AsyncIOOp::State::COMPLETED) {
391     os << "result=" << op.result_;
392     if (op.result_ < 0) {
393       os << " (" << errnoStr(-op.result_) << ')';
394     }
395     os << ", ";
396   }
397
398   return os << "}";
399 }
400
401 std::ostream& operator<<(std::ostream& os, AsyncIOOp::State state) {
402   return os << asyncIoOpStateToString(state);
403 }
404
405 }  // namespace folly