return range from AsyncIO::cancel(), fix test
[folly.git] / folly / experimental / io / AsyncIO.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/experimental/io/AsyncIO.h>
18
19 #include <sys/eventfd.h>
20 #include <cerrno>
21 #include <ostream>
22 #include <stdexcept>
23 #include <string>
24
25 #include <boost/intrusive/parent_from_member.hpp>
26 #include <glog/logging.h>
27
28 #include <folly/Exception.h>
29 #include <folly/Format.h>
30 #include <folly/Likely.h>
31 #include <folly/String.h>
32 #include <folly/portability/Unistd.h>
33
34 namespace folly {
35
36 AsyncIOOp::AsyncIOOp(NotificationCallback cb)
37   : cb_(std::move(cb)),
38     state_(State::UNINITIALIZED),
39     result_(-EINVAL) {
40   memset(&iocb_, 0, sizeof(iocb_));
41 }
42
43 void AsyncIOOp::reset(NotificationCallback cb) {
44   CHECK_NE(state_, State::PENDING);
45   cb_ = std::move(cb);
46   state_ = State::UNINITIALIZED;
47   result_ = -EINVAL;
48   memset(&iocb_, 0, sizeof(iocb_));
49 }
50
51 AsyncIOOp::~AsyncIOOp() {
52   CHECK_NE(state_, State::PENDING);
53 }
54
55 void AsyncIOOp::start() {
56   DCHECK_EQ(state_, State::INITIALIZED);
57   state_ = State::PENDING;
58 }
59
60 void AsyncIOOp::complete(ssize_t result) {
61   DCHECK_EQ(state_, State::PENDING);
62   state_ = State::COMPLETED;
63   result_ = result;
64   if (cb_) {
65     cb_(this);
66   }
67 }
68
69 void AsyncIOOp::cancel() {
70   DCHECK_EQ(state_, State::PENDING);
71   state_ = State::CANCELED;
72 }
73
74 ssize_t AsyncIOOp::result() const {
75   CHECK_EQ(state_, State::COMPLETED);
76   return result_;
77 }
78
79 void AsyncIOOp::pread(int fd, void* buf, size_t size, off_t start) {
80   init();
81   io_prep_pread(&iocb_, fd, buf, size, start);
82 }
83
84 void AsyncIOOp::pread(int fd, Range<unsigned char*> range, off_t start) {
85   pread(fd, range.begin(), range.size(), start);
86 }
87
88 void AsyncIOOp::preadv(int fd, const iovec* iov, int iovcnt, off_t start) {
89   init();
90   io_prep_preadv(&iocb_, fd, iov, iovcnt, start);
91 }
92
93 void AsyncIOOp::pwrite(int fd, const void* buf, size_t size, off_t start) {
94   init();
95   io_prep_pwrite(&iocb_, fd, const_cast<void*>(buf), size, start);
96 }
97
98 void AsyncIOOp::pwrite(int fd, Range<const unsigned char*> range, off_t start) {
99   pwrite(fd, range.begin(), range.size(), start);
100 }
101
102 void AsyncIOOp::pwritev(int fd, const iovec* iov, int iovcnt, off_t start) {
103   init();
104   io_prep_pwritev(&iocb_, fd, iov, iovcnt, start);
105 }
106
107 void AsyncIOOp::init() {
108   CHECK_EQ(state_, State::UNINITIALIZED);
109   state_ = State::INITIALIZED;
110 }
111
112 AsyncIO::AsyncIO(size_t capacity, PollMode pollMode) : capacity_(capacity) {
113   CHECK_GT(capacity_, 0);
114   completed_.reserve(capacity_);
115   if (pollMode == POLLABLE) {
116     pollFd_ = eventfd(0, EFD_NONBLOCK);
117     checkUnixError(pollFd_, "AsyncIO: eventfd creation failed");
118   }
119 }
120
121 AsyncIO::~AsyncIO() {
122   CHECK_EQ(pending_, 0);
123   if (ctx_) {
124     int rc = io_queue_release(ctx_);
125     CHECK_EQ(rc, 0) << "io_queue_release: " << errnoStr(-rc);
126   }
127   if (pollFd_ != -1) {
128     CHECK_ERR(close(pollFd_));
129   }
130 }
131
132 void AsyncIO::decrementPending() {
133   auto p = pending_.fetch_add(-1, std::memory_order_acq_rel);
134   DCHECK_GE(p, 1);
135 }
136
137 void AsyncIO::initializeContext() {
138   if (!ctxSet_.load(std::memory_order_acquire)) {
139     std::lock_guard<std::mutex> lock(initMutex_);
140     if (!ctxSet_.load(std::memory_order_relaxed)) {
141       int rc = io_queue_init(capacity_, &ctx_);
142       // returns negative errno
143       if (rc == -EAGAIN) {
144         long aio_nr, aio_max;
145         std::unique_ptr<FILE, int(*)(FILE*)>
146           fp(fopen("/proc/sys/fs/aio-nr", "r"), fclose);
147         PCHECK(fp);
148         CHECK_EQ(fscanf(fp.get(), "%ld", &aio_nr), 1);
149
150         std::unique_ptr<FILE, int(*)(FILE*)>
151           aio_max_fp(fopen("/proc/sys/fs/aio-max-nr", "r"), fclose);
152         PCHECK(aio_max_fp);
153         CHECK_EQ(fscanf(aio_max_fp.get(), "%ld", &aio_max), 1);
154
155         LOG(ERROR) << "No resources for requested capacity of " << capacity_;
156         LOG(ERROR) << "aio_nr " << aio_nr << ", aio_max_nr " << aio_max;
157       }
158
159       checkKernelError(rc, "AsyncIO: io_queue_init failed");
160       DCHECK(ctx_);
161       ctxSet_.store(true, std::memory_order_release);
162     }
163   }
164 }
165
166 void AsyncIO::submit(Op* op) {
167   CHECK_EQ(op->state(), Op::State::INITIALIZED);
168   initializeContext();  // on demand
169
170   // We can increment past capacity, but we'll clean up after ourselves.
171   auto p = pending_.fetch_add(1, std::memory_order_acq_rel);
172   if (p >= capacity_) {
173     decrementPending();
174     throw std::range_error("AsyncIO: too many pending requests");
175   }
176   iocb* cb = &op->iocb_;
177   cb->data = nullptr;  // unused
178   if (pollFd_ != -1) {
179     io_set_eventfd(cb, pollFd_);
180   }
181   int rc = io_submit(ctx_, 1, &cb);
182   if (rc < 0) {
183     decrementPending();
184     throwSystemErrorExplicit(-rc, "AsyncIO: io_submit failed");
185   }
186   submitted_++;
187   DCHECK_EQ(rc, 1);
188   op->start();
189 }
190
191 Range<AsyncIO::Op**> AsyncIO::wait(size_t minRequests) {
192   CHECK(ctx_);
193   CHECK_EQ(pollFd_, -1) << "wait() only allowed on non-pollable object";
194   auto p = pending_.load(std::memory_order_acquire);
195   CHECK_LE(minRequests, p);
196   return doWait(WaitType::COMPLETE, minRequests, p, completed_);
197 }
198
199 Range<AsyncIO::Op**> AsyncIO::cancel() {
200   CHECK(ctx_);
201   auto p = pending_.load(std::memory_order_acquire);
202   return doWait(WaitType::CANCEL, p, p, canceled_);
203 }
204
205 Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
206   CHECK(ctx_);
207   CHECK_NE(pollFd_, -1) << "pollCompleted() only allowed on pollable object";
208   uint64_t numEvents;
209   // This sets the eventFd counter to 0, see
210   // http://www.kernel.org/doc/man-pages/online/pages/man2/eventfd.2.html
211   ssize_t rc;
212   do {
213     rc = ::read(pollFd_, &numEvents, 8);
214   } while (rc == -1 && errno == EINTR);
215   if (UNLIKELY(rc == -1 && errno == EAGAIN)) {
216     return Range<Op**>();  // nothing completed
217   }
218   checkUnixError(rc, "AsyncIO: read from event fd failed");
219   DCHECK_EQ(rc, 8);
220
221   DCHECK_GT(numEvents, 0);
222   DCHECK_LE(numEvents, pending_);
223
224   // Don't reap more than numEvents, as we've just reset the counter to 0.
225   return doWait(WaitType::COMPLETE, numEvents, numEvents, completed_);
226 }
227
228 Range<AsyncIO::Op**> AsyncIO::doWait(
229     WaitType type,
230     size_t minRequests,
231     size_t maxRequests,
232     std::vector<Op*>& result) {
233   io_event events[maxRequests];
234
235   // Unfortunately, Linux AIO doesn't implement io_cancel, so even for
236   // WaitType::CANCEL we have to wait for IO completion.
237   size_t count = 0;
238   do {
239     int ret;
240     do {
241       // GOTCHA: io_getevents() may returns less than min_nr results if
242       // interrupted after some events have been read (if before, -EINTR
243       // is returned).
244       ret = io_getevents(ctx_,
245                          minRequests - count,
246                          maxRequests - count,
247                          events + count,
248                          /* timeout */ nullptr);  // wait forever
249     } while (ret == -EINTR);
250     // Check as may not be able to recover without leaking events.
251     CHECK_GE(ret, 0) << "AsyncIO: io_getevents failed with error "
252                      << errnoStr(-ret);
253     count += ret;
254   } while (count < minRequests);
255   DCHECK_LE(count, maxRequests);
256
257   result.clear();
258   for (size_t i = 0; i < count; ++i) {
259     DCHECK(events[i].obj);
260     Op* op = boost::intrusive::get_parent_from_member(
261         events[i].obj, &AsyncIOOp::iocb_);
262     decrementPending();
263     switch (type) {
264       case WaitType::COMPLETE:
265         op->complete(events[i].res);
266         break;
267       case WaitType::CANCEL:
268         op->cancel();
269         break;
270     }
271     result.push_back(op);
272   }
273
274   return range(result);
275 }
276
277 AsyncIOQueue::AsyncIOQueue(AsyncIO* asyncIO)
278   : asyncIO_(asyncIO) {
279 }
280
281 AsyncIOQueue::~AsyncIOQueue() {
282   CHECK_EQ(asyncIO_->pending(), 0);
283 }
284
285 void AsyncIOQueue::submit(AsyncIOOp* op) {
286   submit([op]() { return op; });
287 }
288
289 void AsyncIOQueue::submit(OpFactory op) {
290   queue_.push_back(op);
291   maybeDequeue();
292 }
293
294 void AsyncIOQueue::onCompleted(AsyncIOOp* /* op */) { maybeDequeue(); }
295
296 void AsyncIOQueue::maybeDequeue() {
297   while (!queue_.empty() && asyncIO_->pending() < asyncIO_->capacity()) {
298     auto& opFactory = queue_.front();
299     auto op = opFactory();
300     queue_.pop_front();
301
302     // Interpose our completion callback
303     auto& nextCb = op->notificationCallback();
304     op->setNotificationCallback([this, nextCb](AsyncIOOp* op2) {
305       this->onCompleted(op2);
306       if (nextCb) nextCb(op2);
307     });
308
309     asyncIO_->submit(op);
310   }
311 }
312
313 // debugging helpers:
314
315 namespace {
316
317 #define X(c) case c: return #c
318
319 const char* asyncIoOpStateToString(AsyncIOOp::State state) {
320   switch (state) {
321     X(AsyncIOOp::State::UNINITIALIZED);
322     X(AsyncIOOp::State::INITIALIZED);
323     X(AsyncIOOp::State::PENDING);
324     X(AsyncIOOp::State::COMPLETED);
325     X(AsyncIOOp::State::CANCELED);
326   }
327   return "<INVALID AsyncIOOp::State>";
328 }
329
330 const char* iocbCmdToString(short int cmd_short) {
331   io_iocb_cmd cmd = static_cast<io_iocb_cmd>(cmd_short);
332   switch (cmd) {
333     X(IO_CMD_PREAD);
334     X(IO_CMD_PWRITE);
335     X(IO_CMD_FSYNC);
336     X(IO_CMD_FDSYNC);
337     X(IO_CMD_POLL);
338     X(IO_CMD_NOOP);
339     X(IO_CMD_PREADV);
340     X(IO_CMD_PWRITEV);
341   };
342   return "<INVALID io_iocb_cmd>";
343 }
344
345 #undef X
346
347 std::string fd2name(int fd) {
348   std::string path = folly::to<std::string>("/proc/self/fd/", fd);
349   char link[PATH_MAX];
350   const ssize_t length =
351     std::max<ssize_t>(readlink(path.c_str(), link, PATH_MAX), 0);
352   return path.assign(link, length);
353 }
354
355 std::ostream& operator<<(std::ostream& os, const iocb& cb) {
356   os << folly::format(
357     "data={}, key={}, opcode={}, reqprio={}, fd={}, f={}, ",
358     cb.data, cb.key, iocbCmdToString(cb.aio_lio_opcode),
359     cb.aio_reqprio, cb.aio_fildes, fd2name(cb.aio_fildes));
360
361   switch (cb.aio_lio_opcode) {
362     case IO_CMD_PREAD:
363     case IO_CMD_PWRITE:
364       os << folly::format("buf={}, offset={}, nbytes={}, ",
365                           cb.u.c.buf, cb.u.c.offset, cb.u.c.nbytes);
366       break;
367     default:
368       os << "[TODO: write debug string for "
369          << iocbCmdToString(cb.aio_lio_opcode) << "] ";
370       break;
371   }
372
373   return os;
374 }
375
376 }  // anonymous namespace
377
378 std::ostream& operator<<(std::ostream& os, const AsyncIOOp& op) {
379   os << "{" << op.state_ << ", ";
380
381   if (op.state_ != AsyncIOOp::State::UNINITIALIZED) {
382     os << op.iocb_;
383   }
384
385   if (op.state_ == AsyncIOOp::State::COMPLETED) {
386     os << "result=" << op.result_;
387     if (op.result_ < 0) {
388       os << " (" << errnoStr(-op.result_) << ')';
389     }
390     os << ", ";
391   }
392
393   return os << "}";
394 }
395
396 std::ostream& operator<<(std::ostream& os, AsyncIOOp::State state) {
397   return os << asyncIoOpStateToString(state);
398 }
399
400 }  // namespace folly