folly: build with -Wunused-parameter
[folly.git] / folly / experimental / io / AsyncIO.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/experimental/io/AsyncIO.h>
18
19 #include <sys/eventfd.h>
20 #include <unistd.h>
21 #include <cerrno>
22 #include <ostream>
23 #include <stdexcept>
24 #include <string>
25
26 #include <boost/intrusive/parent_from_member.hpp>
27 #include <glog/logging.h>
28
29 #include <folly/Exception.h>
30 #include <folly/Format.h>
31 #include <folly/Likely.h>
32 #include <folly/String.h>
33
34 namespace folly {
35
36 AsyncIOOp::AsyncIOOp(NotificationCallback cb)
37   : cb_(std::move(cb)),
38     state_(State::UNINITIALIZED),
39     result_(-EINVAL) {
40   memset(&iocb_, 0, sizeof(iocb_));
41 }
42
43 void AsyncIOOp::reset(NotificationCallback cb) {
44   CHECK_NE(state_, State::PENDING);
45   cb_ = std::move(cb);
46   state_ = State::UNINITIALIZED;
47   result_ = -EINVAL;
48   memset(&iocb_, 0, sizeof(iocb_));
49 }
50
51 AsyncIOOp::~AsyncIOOp() {
52   CHECK_NE(state_, State::PENDING);
53 }
54
55 void AsyncIOOp::start() {
56   DCHECK_EQ(state_, State::INITIALIZED);
57   state_ = State::PENDING;
58 }
59
60 void AsyncIOOp::complete(ssize_t result) {
61   DCHECK_EQ(state_, State::PENDING);
62   state_ = State::COMPLETED;
63   result_ = result;
64   if (cb_) {
65     cb_(this);
66   }
67 }
68
69 ssize_t AsyncIOOp::result() const {
70   CHECK_EQ(state_, State::COMPLETED);
71   return result_;
72 }
73
74 void AsyncIOOp::pread(int fd, void* buf, size_t size, off_t start) {
75   init();
76   io_prep_pread(&iocb_, fd, buf, size, start);
77 }
78
79 void AsyncIOOp::pread(int fd, Range<unsigned char*> range, off_t start) {
80   pread(fd, range.begin(), range.size(), start);
81 }
82
83 void AsyncIOOp::preadv(int fd, const iovec* iov, int iovcnt, off_t start) {
84   init();
85   io_prep_preadv(&iocb_, fd, iov, iovcnt, start);
86 }
87
88 void AsyncIOOp::pwrite(int fd, const void* buf, size_t size, off_t start) {
89   init();
90   io_prep_pwrite(&iocb_, fd, const_cast<void*>(buf), size, start);
91 }
92
93 void AsyncIOOp::pwrite(int fd, Range<const unsigned char*> range, off_t start) {
94   pwrite(fd, range.begin(), range.size(), start);
95 }
96
97 void AsyncIOOp::pwritev(int fd, const iovec* iov, int iovcnt, off_t start) {
98   init();
99   io_prep_pwritev(&iocb_, fd, iov, iovcnt, start);
100 }
101
102 void AsyncIOOp::init() {
103   CHECK_EQ(state_, State::UNINITIALIZED);
104   state_ = State::INITIALIZED;
105 }
106
107 AsyncIO::AsyncIO(size_t capacity, PollMode pollMode)
108   : ctx_(0),
109     ctxSet_(false),
110     pending_(0),
111     submitted_(0),
112     capacity_(capacity),
113     pollFd_(-1) {
114   CHECK_GT(capacity_, 0);
115   completed_.reserve(capacity_);
116   if (pollMode == POLLABLE) {
117     pollFd_ = eventfd(0, EFD_NONBLOCK);
118     checkUnixError(pollFd_, "AsyncIO: eventfd creation failed");
119   }
120 }
121
122 AsyncIO::~AsyncIO() {
123   CHECK_EQ(pending_, 0);
124   if (ctx_) {
125     int rc = io_queue_release(ctx_);
126     CHECK_EQ(rc, 0) << "io_queue_release: " << errnoStr(-rc);
127   }
128   if (pollFd_ != -1) {
129     CHECK_ERR(close(pollFd_));
130   }
131 }
132
133 void AsyncIO::decrementPending() {
134   auto p = pending_.fetch_add(-1, std::memory_order_acq_rel);
135   DCHECK_GE(p, 1);
136 }
137
138 void AsyncIO::initializeContext() {
139   if (!ctxSet_.load(std::memory_order_acquire)) {
140     std::lock_guard<std::mutex> lock(initMutex_);
141     if (!ctxSet_.load(std::memory_order_relaxed)) {
142       int rc = io_queue_init(capacity_, &ctx_);
143       // returns negative errno
144       if (rc == -EAGAIN) {
145         long aio_nr, aio_max;
146         std::unique_ptr<FILE, int(*)(FILE*)>
147           fp(fopen("/proc/sys/fs/aio-nr", "r"), fclose);
148         PCHECK(fp);
149         CHECK_EQ(fscanf(fp.get(), "%ld", &aio_nr), 1);
150
151         std::unique_ptr<FILE, int(*)(FILE*)>
152           aio_max_fp(fopen("/proc/sys/fs/aio-max-nr", "r"), fclose);
153         PCHECK(aio_max_fp);
154         CHECK_EQ(fscanf(aio_max_fp.get(), "%ld", &aio_max), 1);
155
156         LOG(ERROR) << "No resources for requested capacity of " << capacity_;
157         LOG(ERROR) << "aio_nr " << aio_nr << ", aio_max_nr " << aio_max;
158       }
159
160       checkKernelError(rc, "AsyncIO: io_queue_init failed");
161       DCHECK(ctx_);
162       ctxSet_.store(true, std::memory_order_release);
163     }
164   }
165 }
166
167 void AsyncIO::submit(Op* op) {
168   CHECK_EQ(op->state(), Op::State::INITIALIZED);
169   initializeContext();  // on demand
170
171   // We can increment past capacity, but we'll clean up after ourselves.
172   auto p = pending_.fetch_add(1, std::memory_order_acq_rel);
173   if (p >= capacity_) {
174     decrementPending();
175     throw std::range_error("AsyncIO: too many pending requests");
176   }
177   iocb* cb = &op->iocb_;
178   cb->data = nullptr;  // unused
179   if (pollFd_ != -1) {
180     io_set_eventfd(cb, pollFd_);
181   }
182   int rc = io_submit(ctx_, 1, &cb);
183   if (rc < 0) {
184     decrementPending();
185     throwSystemErrorExplicit(-rc, "AsyncIO: io_submit failed");
186   }
187   submitted_++;
188   DCHECK_EQ(rc, 1);
189   op->start();
190 }
191
192 Range<AsyncIO::Op**> AsyncIO::wait(size_t minRequests) {
193   CHECK(ctx_);
194   CHECK_EQ(pollFd_, -1) << "wait() only allowed on non-pollable object";
195   auto p = pending_.load(std::memory_order_acquire);
196   CHECK_LE(minRequests, p);
197   return doWait(minRequests, p);
198 }
199
200 Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
201   CHECK(ctx_);
202   CHECK_NE(pollFd_, -1) << "pollCompleted() only allowed on pollable object";
203   uint64_t numEvents;
204   // This sets the eventFd counter to 0, see
205   // http://www.kernel.org/doc/man-pages/online/pages/man2/eventfd.2.html
206   ssize_t rc;
207   do {
208     rc = ::read(pollFd_, &numEvents, 8);
209   } while (rc == -1 && errno == EINTR);
210   if (UNLIKELY(rc == -1 && errno == EAGAIN)) {
211     return Range<Op**>();  // nothing completed
212   }
213   checkUnixError(rc, "AsyncIO: read from event fd failed");
214   DCHECK_EQ(rc, 8);
215
216   DCHECK_GT(numEvents, 0);
217   DCHECK_LE(numEvents, pending_);
218
219   // Don't reap more than numEvents, as we've just reset the counter to 0.
220   return doWait(numEvents, numEvents);
221 }
222
223 Range<AsyncIO::Op**> AsyncIO::doWait(size_t minRequests, size_t maxRequests) {
224   io_event events[maxRequests];
225
226   size_t count = 0;
227   do {
228     int ret;
229     do {
230       // GOTCHA: io_getevents() may returns less than min_nr results if
231       // interrupted after some events have been read (if before, -EINTR
232       // is returned).
233       ret = io_getevents(ctx_,
234                          minRequests - count,
235                          maxRequests - count,
236                          events + count,
237                          /* timeout */ nullptr);  // wait forever
238     } while (ret == -EINTR);
239     // Check as may not be able to recover without leaking events.
240     CHECK_GE(ret, 0)
241       << "AsyncIO: io_getevents failed with error " << errnoStr(-ret);
242     count += ret;
243   } while (count < minRequests);
244   DCHECK_LE(count, maxRequests);
245
246   completed_.clear();
247   if (count == 0) {
248     return folly::Range<Op**>();
249   }
250
251   for (size_t i = 0; i < count; ++i) {
252     DCHECK(events[i].obj);
253     Op* op = boost::intrusive::get_parent_from_member(
254         events[i].obj, &AsyncIOOp::iocb_);
255     decrementPending();
256     op->complete(events[i].res);
257     completed_.push_back(op);
258   }
259
260   return folly::Range<Op**>(&completed_.front(), count);
261 }
262
263 AsyncIOQueue::AsyncIOQueue(AsyncIO* asyncIO)
264   : asyncIO_(asyncIO) {
265 }
266
267 AsyncIOQueue::~AsyncIOQueue() {
268   CHECK_EQ(asyncIO_->pending(), 0);
269 }
270
271 void AsyncIOQueue::submit(AsyncIOOp* op) {
272   submit([op]() { return op; });
273 }
274
275 void AsyncIOQueue::submit(OpFactory op) {
276   queue_.push_back(op);
277   maybeDequeue();
278 }
279
280 void AsyncIOQueue::onCompleted(AsyncIOOp* /* op */) { maybeDequeue(); }
281
282 void AsyncIOQueue::maybeDequeue() {
283   while (!queue_.empty() && asyncIO_->pending() < asyncIO_->capacity()) {
284     auto& opFactory = queue_.front();
285     auto op = opFactory();
286     queue_.pop_front();
287
288     // Interpose our completion callback
289     auto& nextCb = op->notificationCallback();
290     op->setNotificationCallback([this, nextCb](AsyncIOOp* op) {
291       this->onCompleted(op);
292       if (nextCb) nextCb(op);
293     });
294
295     asyncIO_->submit(op);
296   }
297 }
298
299 // debugging helpers:
300
301 namespace {
302
303 #define X(c) case c: return #c
304
305 const char* asyncIoOpStateToString(AsyncIOOp::State state) {
306   switch (state) {
307     X(AsyncIOOp::State::UNINITIALIZED);
308     X(AsyncIOOp::State::INITIALIZED);
309     X(AsyncIOOp::State::PENDING);
310     X(AsyncIOOp::State::COMPLETED);
311   }
312   return "<INVALID AsyncIOOp::State>";
313 }
314
315 const char* iocbCmdToString(short int cmd_short) {
316   io_iocb_cmd cmd = static_cast<io_iocb_cmd>(cmd_short);
317   switch (cmd) {
318     X(IO_CMD_PREAD);
319     X(IO_CMD_PWRITE);
320     X(IO_CMD_FSYNC);
321     X(IO_CMD_FDSYNC);
322     X(IO_CMD_POLL);
323     X(IO_CMD_NOOP);
324     X(IO_CMD_PREADV);
325     X(IO_CMD_PWRITEV);
326   };
327   return "<INVALID io_iocb_cmd>";
328 }
329
330 #undef X
331
332 std::string fd2name(int fd) {
333   std::string path = folly::to<std::string>("/proc/self/fd/", fd);
334   char link[PATH_MAX];
335   const ssize_t length =
336     std::max<ssize_t>(readlink(path.c_str(), link, PATH_MAX), 0);
337   return path.assign(link, length);
338 }
339
340 std::ostream& operator<<(std::ostream& os, const iocb& cb) {
341   os << folly::format(
342     "data={}, key={}, opcode={}, reqprio={}, fd={}, f={}, ",
343     cb.data, cb.key, iocbCmdToString(cb.aio_lio_opcode),
344     cb.aio_reqprio, cb.aio_fildes, fd2name(cb.aio_fildes));
345
346   switch (cb.aio_lio_opcode) {
347     case IO_CMD_PREAD:
348     case IO_CMD_PWRITE:
349       os << folly::format("buf={}, offset={}, nbytes={}, ",
350                           cb.u.c.buf, cb.u.c.offset, cb.u.c.nbytes);
351       break;
352     default:
353       os << "[TODO: write debug string for "
354          << iocbCmdToString(cb.aio_lio_opcode) << "] ";
355       break;
356   }
357
358   return os;
359 }
360
361 }  // anonymous namespace
362
363 std::ostream& operator<<(std::ostream& os, const AsyncIOOp& op) {
364   os << "{" << op.state_ << ", ";
365
366   if (op.state_ != AsyncIOOp::State::UNINITIALIZED) {
367     os << op.iocb_;
368   }
369
370   if (op.state_ == AsyncIOOp::State::COMPLETED) {
371     os << "result=" << op.result_;
372     if (op.result_ < 0) {
373       os << " (" << errnoStr(-op.result_) << ')';
374     }
375     os << ", ";
376   }
377
378   return os << "}";
379 }
380
381 std::ostream& operator<<(std::ostream& os, AsyncIOOp::State state) {
382   return os << asyncIoOpStateToString(state);
383 }
384
385 }  // namespace folly