Codemod: use #include angle brackets in folly and thrift
[folly.git] / folly / experimental / io / AsyncIO.cpp
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/experimental/io/AsyncIO.h>
18
19 #include <sys/eventfd.h>
20 #include <unistd.h>
21 #include <cerrno>
22 #include <ostream>
23 #include <stdexcept>
24 #include <string>
25
26 #include <boost/intrusive/parent_from_member.hpp>
27 #include <glog/logging.h>
28
29 #include <folly/Exception.h>
30 #include <folly/Format.h>
31 #include <folly/Likely.h>
32 #include <folly/String.h>
33
34 namespace folly {
35
36 AsyncIOOp::AsyncIOOp(NotificationCallback cb)
37   : cb_(std::move(cb)),
38     state_(State::UNINITIALIZED),
39     result_(-EINVAL) {
40   memset(&iocb_, 0, sizeof(iocb_));
41 }
42
43 void AsyncIOOp::reset(NotificationCallback cb) {
44   CHECK_NE(state_, State::PENDING);
45   cb_ = std::move(cb);
46   state_ = State::UNINITIALIZED;
47   result_ = -EINVAL;
48   memset(&iocb_, 0, sizeof(iocb_));
49 }
50
51 AsyncIOOp::~AsyncIOOp() {
52   CHECK_NE(state_, State::PENDING);
53 }
54
55 void AsyncIOOp::start() {
56   DCHECK_EQ(state_, State::INITIALIZED);
57   state_ = State::PENDING;
58 }
59
60 void AsyncIOOp::complete(ssize_t result) {
61   DCHECK_EQ(state_, State::PENDING);
62   state_ = State::COMPLETED;
63   result_ = result;
64   if (cb_) {
65     cb_(this);
66   }
67 }
68
69 ssize_t AsyncIOOp::result() const {
70   CHECK_EQ(state_, State::COMPLETED);
71   return result_;
72 }
73
74 void AsyncIOOp::pread(int fd, void* buf, size_t size, off_t start) {
75   init();
76   io_prep_pread(&iocb_, fd, buf, size, start);
77 }
78
79 void AsyncIOOp::pread(int fd, Range<unsigned char*> range, off_t start) {
80   pread(fd, range.begin(), range.size(), start);
81 }
82
83 void AsyncIOOp::preadv(int fd, const iovec* iov, int iovcnt, off_t start) {
84   init();
85   io_prep_preadv(&iocb_, fd, iov, iovcnt, start);
86 }
87
88 void AsyncIOOp::pwrite(int fd, const void* buf, size_t size, off_t start) {
89   init();
90   io_prep_pwrite(&iocb_, fd, const_cast<void*>(buf), size, start);
91 }
92
93 void AsyncIOOp::pwrite(int fd, Range<const unsigned char*> range, off_t start) {
94   pwrite(fd, range.begin(), range.size(), start);
95 }
96
97 void AsyncIOOp::pwritev(int fd, const iovec* iov, int iovcnt, off_t start) {
98   init();
99   io_prep_pwritev(&iocb_, fd, iov, iovcnt, start);
100 }
101
102 void AsyncIOOp::init() {
103   CHECK_EQ(state_, State::UNINITIALIZED);
104   state_ = State::INITIALIZED;
105 }
106
107 AsyncIO::AsyncIO(size_t capacity, PollMode pollMode)
108   : ctx_(0),
109     ctxSet_(false),
110     pending_(0),
111     capacity_(capacity),
112     pollFd_(-1) {
113   CHECK_GT(capacity_, 0);
114   completed_.reserve(capacity_);
115   if (pollMode == POLLABLE) {
116     pollFd_ = eventfd(0, EFD_NONBLOCK);
117     checkUnixError(pollFd_, "AsyncIO: eventfd creation failed");
118   }
119 }
120
121 AsyncIO::~AsyncIO() {
122   CHECK_EQ(pending_, 0);
123   if (ctx_) {
124     int rc = io_queue_release(ctx_);
125     CHECK_EQ(rc, 0) << "io_queue_release: " << errnoStr(-rc);
126   }
127   if (pollFd_ != -1) {
128     CHECK_ERR(close(pollFd_));
129   }
130 }
131
132 void AsyncIO::decrementPending() {
133   ssize_t p = pending_.fetch_add(-1, std::memory_order_acq_rel);
134   DCHECK_GE(p, 1);
135 }
136
137 void AsyncIO::initializeContext() {
138   if (!ctxSet_.load(std::memory_order_acquire)) {
139     std::lock_guard<std::mutex> lock(initMutex_);
140     if (!ctxSet_.load(std::memory_order_relaxed)) {
141       int rc = io_queue_init(capacity_, &ctx_);
142       // returns negative errno
143       if (rc == -EAGAIN) {
144         long aio_nr, aio_max;
145         std::unique_ptr<FILE, int(*)(FILE*)>
146           fp(fopen("/proc/sys/fs/aio-nr", "r"), fclose);
147         PCHECK(fp);
148         CHECK_EQ(fscanf(fp.get(), "%ld", &aio_nr), 1);
149
150         std::unique_ptr<FILE, int(*)(FILE*)>
151           aio_max_fp(fopen("/proc/sys/fs/aio-max-nr", "r"), fclose);
152         PCHECK(aio_max_fp);
153         CHECK_EQ(fscanf(aio_max_fp.get(), "%ld", &aio_max), 1);
154
155         LOG(ERROR) << "No resources for requested capacity of " << capacity_;
156         LOG(ERROR) << "aio_nr " << aio_nr << ", aio_max_nr " << aio_max;
157       }
158
159       checkKernelError(rc, "AsyncIO: io_queue_init failed");
160       DCHECK(ctx_);
161       ctxSet_.store(true, std::memory_order_release);
162     }
163   }
164 }
165
166 void AsyncIO::submit(Op* op) {
167   CHECK_EQ(op->state(), Op::State::INITIALIZED);
168   initializeContext();  // on demand
169
170   // We can increment past capacity, but we'll clean up after ourselves.
171   ssize_t p = pending_.fetch_add(1, std::memory_order_acq_rel);
172   if (p >= capacity_) {
173     decrementPending();
174     throw std::range_error("AsyncIO: too many pending requests");
175   }
176   iocb* cb = &op->iocb_;
177   cb->data = nullptr;  // unused
178   if (pollFd_ != -1) {
179     io_set_eventfd(cb, pollFd_);
180   }
181   int rc = io_submit(ctx_, 1, &cb);
182   if (rc < 0) {
183     decrementPending();
184     throwSystemErrorExplicit(-rc, "AsyncIO: io_submit failed");
185   }
186   DCHECK_EQ(rc, 1);
187   op->start();
188 }
189
190 Range<AsyncIO::Op**> AsyncIO::wait(size_t minRequests) {
191   CHECK(ctx_);
192   CHECK_EQ(pollFd_, -1) << "wait() only allowed on non-pollable object";
193   ssize_t p = pending_.load(std::memory_order_acquire);
194   CHECK_LE(minRequests, p);
195   return doWait(minRequests, p);
196 }
197
198 Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
199   CHECK(ctx_);
200   CHECK_NE(pollFd_, -1) << "pollCompleted() only allowed on pollable object";
201   uint64_t numEvents;
202   // This sets the eventFd counter to 0, see
203   // http://www.kernel.org/doc/man-pages/online/pages/man2/eventfd.2.html
204   ssize_t rc;
205   do {
206     rc = ::read(pollFd_, &numEvents, 8);
207   } while (rc == -1 && errno == EINTR);
208   if (UNLIKELY(rc == -1 && errno == EAGAIN)) {
209     return Range<Op**>();  // nothing completed
210   }
211   checkUnixError(rc, "AsyncIO: read from event fd failed");
212   DCHECK_EQ(rc, 8);
213
214   DCHECK_GT(numEvents, 0);
215   DCHECK_LE(numEvents, pending_);
216
217   // Don't reap more than numEvents, as we've just reset the counter to 0.
218   return doWait(numEvents, numEvents);
219 }
220
221 Range<AsyncIO::Op**> AsyncIO::doWait(size_t minRequests, size_t maxRequests) {
222   io_event events[maxRequests];
223
224   size_t count = 0;
225   do {
226     int ret;
227     do {
228       // GOTCHA: io_getevents() may returns less than min_nr results if
229       // interrupted after some events have been read (if before, -EINTR
230       // is returned).
231       ret = io_getevents(ctx_,
232                          minRequests - count,
233                          maxRequests - count,
234                          events + count,
235                          /* timeout */ nullptr);  // wait forever
236     } while (ret == -EINTR);
237     // Check as may not be able to recover without leaking events.
238     CHECK_GE(ret, 0)
239       << "AsyncIO: io_getevents failed with error " << errnoStr(-ret);
240     count += ret;
241   } while (count < minRequests);
242   DCHECK_LE(count, maxRequests);
243
244   completed_.clear();
245   if (count == 0) {
246     return folly::Range<Op**>();
247   }
248
249   for (size_t i = 0; i < count; ++i) {
250     DCHECK(events[i].obj);
251     Op* op = boost::intrusive::get_parent_from_member(
252         events[i].obj, &AsyncIOOp::iocb_);
253     decrementPending();
254     op->complete(events[i].res);
255     completed_.push_back(op);
256   }
257
258   return folly::Range<Op**>(&completed_.front(), count);
259 }
260
261 AsyncIOQueue::AsyncIOQueue(AsyncIO* asyncIO)
262   : asyncIO_(asyncIO) {
263 }
264
265 AsyncIOQueue::~AsyncIOQueue() {
266   CHECK_EQ(asyncIO_->pending(), 0);
267 }
268
269 void AsyncIOQueue::submit(AsyncIOOp* op) {
270   submit([op]() { return op; });
271 }
272
273 void AsyncIOQueue::submit(OpFactory op) {
274   queue_.push_back(op);
275   maybeDequeue();
276 }
277
278 void AsyncIOQueue::onCompleted(AsyncIOOp* op) {
279   maybeDequeue();
280 }
281
282 void AsyncIOQueue::maybeDequeue() {
283   while (!queue_.empty() && asyncIO_->pending() < asyncIO_->capacity()) {
284     auto& opFactory = queue_.front();
285     auto op = opFactory();
286     queue_.pop_front();
287
288     // Interpose our completion callback
289     auto& nextCb = op->notificationCallback();
290     op->setNotificationCallback([this, nextCb](AsyncIOOp* op) {
291       this->onCompleted(op);
292       if (nextCb) nextCb(op);
293     });
294
295     asyncIO_->submit(op);
296   }
297 }
298
299 // debugging helpers:
300
301 namespace {
302
303 #define X(c) case c: return #c
304
305 const char* asyncIoOpStateToString(AsyncIOOp::State state) {
306   switch (state) {
307     X(AsyncIOOp::State::UNINITIALIZED);
308     X(AsyncIOOp::State::INITIALIZED);
309     X(AsyncIOOp::State::PENDING);
310     X(AsyncIOOp::State::COMPLETED);
311   }
312   return "<INVALID AsyncIOOp::State>";
313 }
314
315 const char* iocbCmdToString(short int cmd_short) {
316   io_iocb_cmd cmd = static_cast<io_iocb_cmd>(cmd_short);
317   switch (cmd) {
318     X(IO_CMD_PREAD);
319     X(IO_CMD_PWRITE);
320     X(IO_CMD_FSYNC);
321     X(IO_CMD_FDSYNC);
322     X(IO_CMD_POLL);
323     X(IO_CMD_NOOP);
324     X(IO_CMD_PREADV);
325     X(IO_CMD_PWRITEV);
326   };
327   return "<INVALID io_iocb_cmd>";
328 }
329
330 #undef X
331
332 std::string fd2name(int fd) {
333   std::string path = folly::to<std::string>("/proc/self/fd/", fd);
334   char link[PATH_MAX];
335   const ssize_t length =
336     std::max<ssize_t>(readlink(path.c_str(), link, PATH_MAX), 0);
337   return path.assign(link, length);
338 }
339
340 std::ostream& operator<<(std::ostream& os, const iocb& cb) {
341   os << folly::format(
342     "data={}, key={}, opcode={}, reqprio={}, fd={}, f={}, ",
343     cb.data, cb.key, iocbCmdToString(cb.aio_lio_opcode),
344     cb.aio_reqprio, cb.aio_fildes, fd2name(cb.aio_fildes));
345
346   switch (cb.aio_lio_opcode) {
347     case IO_CMD_PREAD:
348     case IO_CMD_PWRITE:
349       os << folly::format("buf={}, off={}, size={}, ",
350                           cb.u.c.buf, cb.u.c.nbytes, cb.u.c.offset);
351     default:
352       os << "[TODO: write debug string for "
353          << iocbCmdToString(cb.aio_lio_opcode) << "] ";
354   }
355
356   return os;
357 }
358
359 }  // anonymous namespace
360
361 std::ostream& operator<<(std::ostream& os, const AsyncIOOp& op) {
362   os << "{" << op.state_ << ", ";
363
364   if (op.state_ != AsyncIOOp::State::UNINITIALIZED) {
365     os << op.iocb_;
366   }
367
368   if (op.state_ == AsyncIOOp::State::COMPLETED) {
369     os << "result=" << op.result_ << ", ";
370   }
371
372   return os << "}";
373 }
374
375 std::ostream& operator<<(std::ostream& os, AsyncIOOp::State state) {
376   return os << asyncIoOpStateToString(state);
377 }
378
379 }  // namespace folly
380