folly: AsyncIO: add debuging helper
[folly.git] / folly / experimental / io / AsyncIO.cpp
1 /*
2  * Copyright 2013 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "folly/experimental/io/AsyncIO.h"
18
19 #include <unistd.h>
20 #include <cerrno>
21 #include <string>
22
23 #include <boost/intrusive/parent_from_member.hpp>
24 #include <glog/logging.h>
25
26 #include "folly/Exception.h"
27 #include "folly/Format.h"
28 #include "folly/Likely.h"
29 #include "folly/String.h"
30 #include "folly/eventfd.h"
31
32 namespace folly {
33
34 AsyncIOOp::AsyncIOOp(NotificationCallback cb)
35   : cb_(std::move(cb)),
36     state_(State::UNINITIALIZED),
37     result_(-EINVAL) {
38   memset(&iocb_, 0, sizeof(iocb_));
39 }
40
41 void AsyncIOOp::reset(NotificationCallback cb) {
42   CHECK_NE(state_, State::PENDING);
43   cb_ = std::move(cb);
44   state_ = State::UNINITIALIZED;
45   result_ = -EINVAL;
46   memset(&iocb_, 0, sizeof(iocb_));
47 }
48
49 AsyncIOOp::~AsyncIOOp() {
50   CHECK_NE(state_, State::PENDING);
51 }
52
53 void AsyncIOOp::start() {
54   DCHECK_EQ(state_, State::INITIALIZED);
55   state_ = State::PENDING;
56 }
57
58 void AsyncIOOp::complete(ssize_t result) {
59   DCHECK_EQ(state_, State::PENDING);
60   state_ = State::COMPLETED;
61   result_ = result;
62   if (cb_) {
63     cb_(this);
64   }
65 }
66
67 ssize_t AsyncIOOp::result() const {
68   CHECK_EQ(state_, State::COMPLETED);
69   return result_;
70 }
71
72 void AsyncIOOp::pread(int fd, void* buf, size_t size, off_t start) {
73   init();
74   io_prep_pread(&iocb_, fd, buf, size, start);
75 }
76
77 void AsyncIOOp::pread(int fd, Range<unsigned char*> range, off_t start) {
78   pread(fd, range.begin(), range.size(), start);
79 }
80
81 void AsyncIOOp::preadv(int fd, const iovec* iov, int iovcnt, off_t start) {
82   init();
83   io_prep_preadv(&iocb_, fd, iov, iovcnt, start);
84 }
85
86 void AsyncIOOp::pwrite(int fd, const void* buf, size_t size, off_t start) {
87   init();
88   io_prep_pwrite(&iocb_, fd, const_cast<void*>(buf), size, start);
89 }
90
91 void AsyncIOOp::pwrite(int fd, Range<const unsigned char*> range, off_t start) {
92   pwrite(fd, range.begin(), range.size(), start);
93 }
94
95 void AsyncIOOp::pwritev(int fd, const iovec* iov, int iovcnt, off_t start) {
96   init();
97   io_prep_pwritev(&iocb_, fd, iov, iovcnt, start);
98 }
99
100 void AsyncIOOp::init() {
101   CHECK_EQ(state_, State::UNINITIALIZED);
102   state_ = State::INITIALIZED;
103 }
104
105 AsyncIO::AsyncIO(size_t capacity, PollMode pollMode)
106   : ctx_(0),
107     pending_(0),
108     capacity_(capacity),
109     pollFd_(-1) {
110   CHECK_GT(capacity_, 0);
111   completed_.reserve(capacity_);
112   if (pollMode == POLLABLE) {
113     pollFd_ = eventfd(0, EFD_NONBLOCK);
114     checkUnixError(pollFd_, "AsyncIO: eventfd creation failed");
115   }
116 }
117
118 AsyncIO::~AsyncIO() {
119   CHECK_EQ(pending_, 0);
120   if (ctx_) {
121     int rc = io_queue_release(ctx_);
122     CHECK_EQ(rc, 0) << "io_queue_release: " << errnoStr(-rc);
123   }
124   if (pollFd_ != -1) {
125     CHECK_ERR(close(pollFd_));
126   }
127 }
128
129 void AsyncIO::initializeContext() {
130   if (!ctx_) {
131     int rc = io_queue_init(capacity_, &ctx_);
132     // returns negative errno
133     checkKernelError(rc, "AsyncIO: io_queue_init failed");
134     DCHECK(ctx_);
135   }
136 }
137
138 void AsyncIO::submit(Op* op) {
139   CHECK_EQ(op->state(), Op::State::INITIALIZED);
140   CHECK_LT(pending_, capacity_) << "too many pending requests";
141   initializeContext();  // on demand
142   iocb* cb = &op->iocb_;
143   cb->data = nullptr;  // unused
144   if (pollFd_ != -1) {
145     io_set_eventfd(cb, pollFd_);
146   }
147   int rc = io_submit(ctx_, 1, &cb);
148   checkKernelError(rc, "AsyncIO: io_submit failed");
149   DCHECK_EQ(rc, 1);
150   op->start();
151   ++pending_;
152 }
153
154 Range<AsyncIO::Op**> AsyncIO::wait(size_t minRequests) {
155   CHECK(ctx_);
156   CHECK_EQ(pollFd_, -1) << "wait() only allowed on non-pollable object";
157   return doWait(minRequests, pending_);
158 }
159
160 Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
161   CHECK(ctx_);
162   CHECK_NE(pollFd_, -1) << "pollCompleted() only allowed on pollable object";
163   uint64_t numEvents;
164   // This sets the eventFd counter to 0, see
165   // http://www.kernel.org/doc/man-pages/online/pages/man2/eventfd.2.html
166   ssize_t rc;
167   do {
168     rc = ::read(pollFd_, &numEvents, 8);
169   } while (rc == -1 && errno == EINTR);
170   if (UNLIKELY(rc == -1 && errno == EAGAIN)) {
171     return Range<Op**>();  // nothing completed
172   }
173   checkUnixError(rc, "AsyncIO: read from event fd failed");
174   DCHECK_EQ(rc, 8);
175
176   DCHECK_GT(numEvents, 0);
177   DCHECK_LE(numEvents, pending_);
178
179   // Don't reap more than numEvents, as we've just reset the counter to 0.
180   return doWait(numEvents, numEvents);
181 }
182
183 Range<AsyncIO::Op**> AsyncIO::doWait(size_t minRequests, size_t maxRequests) {
184   io_event events[pending_];
185   int count;
186   do {
187     // Wait forever
188     count = io_getevents(ctx_, minRequests, maxRequests, events, nullptr);
189   } while (count == -EINTR);
190   checkKernelError(count, "AsyncIO: io_getevents failed");
191   DCHECK_GE(count, minRequests);  // the man page says so
192   DCHECK_LE(count, pending_);
193
194   completed_.clear();
195   if (count == 0) {
196     return folly::Range<Op**>();
197   }
198
199   for (size_t i = 0; i < count; ++i) {
200     DCHECK(events[i].obj);
201     Op* op = boost::intrusive::get_parent_from_member(
202         events[i].obj, &AsyncIOOp::iocb_);
203     --pending_;
204     op->complete(events[i].res);
205     completed_.push_back(op);
206   }
207
208   return folly::Range<Op**>(&completed_.front(), count);
209 }
210
211 AsyncIOQueue::AsyncIOQueue(AsyncIO* asyncIO)
212   : asyncIO_(asyncIO) {
213 }
214
215 AsyncIOQueue::~AsyncIOQueue() {
216   CHECK_EQ(asyncIO_->pending(), 0);
217 }
218
219 void AsyncIOQueue::submit(AsyncIOOp* op) {
220   submit([op]() { return op; });
221 }
222
223 void AsyncIOQueue::submit(OpFactory op) {
224   queue_.push_back(op);
225   maybeDequeue();
226 }
227
228 void AsyncIOQueue::onCompleted(AsyncIOOp* op) {
229   maybeDequeue();
230 }
231
232 void AsyncIOQueue::maybeDequeue() {
233   while (!queue_.empty() && asyncIO_->pending() < asyncIO_->capacity()) {
234     auto& opFactory = queue_.front();
235     auto op = opFactory();
236     queue_.pop_front();
237
238     // Interpose our completion callback
239     auto& nextCb = op->notificationCallback();
240     op->setNotificationCallback([this, nextCb](AsyncIOOp* op) {
241       this->onCompleted(op);
242       if (nextCb) nextCb(op);
243     });
244
245     asyncIO_->submit(op);
246   }
247 }
248
249 // debugging helpers:
250
251 namespace {
252
253 #define X(c) case c: return #c
254
255 const char* asyncIoOpStateToString(AsyncIOOp::State state) {
256   switch (state) {
257     X(AsyncIOOp::State::UNINITIALIZED);
258     X(AsyncIOOp::State::INITIALIZED);
259     X(AsyncIOOp::State::PENDING);
260     X(AsyncIOOp::State::COMPLETED);
261   }
262   return "<INVALID AsyncIOOp::State>";
263 }
264
265 const char* iocbCmdToString(short int cmd_short) {
266   io_iocb_cmd cmd = static_cast<io_iocb_cmd>(cmd_short);
267   switch (cmd) {
268     X(IO_CMD_PREAD);
269     X(IO_CMD_PWRITE);
270     X(IO_CMD_FSYNC);
271     X(IO_CMD_FDSYNC);
272     X(IO_CMD_POLL);
273     X(IO_CMD_NOOP);
274     X(IO_CMD_PREADV);
275     X(IO_CMD_PWRITEV);
276   };
277   return "<INVALID io_iocb_cmd>";
278 }
279
280 #undef X
281
282 std::string fd2name(int fd) {
283   std::string path = folly::to<std::string>("/proc/self/fd/", fd);
284   char link[PATH_MAX];
285   const ssize_t length =
286     std::max<ssize_t>(readlink(path.c_str(), link, PATH_MAX), 0);
287   return path.assign(link, length);
288 }
289
290 std::ostream& operator<<(std::ostream& os, const iocb& cb) {
291   os << folly::format(
292     "data={}, key={}, opcode={}, reqprio={}, fd={}, f={}, ",
293     cb.data, cb.key, iocbCmdToString(cb.aio_lio_opcode),
294     cb.aio_reqprio, cb.aio_fildes, fd2name(cb.aio_fildes));
295
296   switch (cb.aio_lio_opcode) {
297     case IO_CMD_PREAD:
298     case IO_CMD_PWRITE:
299       os << folly::format("buf={}, off={}, size={}, ",
300                           cb.u.c.buf, cb.u.c.nbytes, cb.u.c.offset);
301     default:
302       os << "[TODO: write debug string for "
303          << iocbCmdToString(cb.aio_lio_opcode) << "] ";
304   }
305
306   return os;
307 }
308
309 }  // anonymous namespace
310
311 std::ostream& operator<<(std::ostream& os, const AsyncIOOp& op) {
312   os << "{" << op.state_ << ", ";
313
314   if (op.state_ != AsyncIOOp::State::UNINITIALIZED) {
315     os << op.iocb_;
316   }
317
318   if (op.state_ == AsyncIOOp::State::COMPLETED) {
319     os << "result=" << op.result_ << ", ";
320   }
321
322   return os << "}";
323 }
324
325 std::ostream& operator<<(std::ostream& os, AsyncIOOp::State state) {
326   return os << asyncIoOpStateToString(state);
327 }
328
329 }  // namespace folly
330