2080b72038e8c15de1e2e8e9cc2d4eeedd844786
[folly.git] / folly / experimental / io / AsyncIO.cpp
1 /*
2  * Copyright 2013 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "folly/experimental/io/AsyncIO.h"
18
19 #include <cerrno>
20
21 #include <glog/logging.h>
22
23 #include "folly/Exception.h"
24 #include "folly/Likely.h"
25 #include "folly/String.h"
26 #include "folly/eventfd.h"
27
28 namespace folly {
29
30 AsyncIO::AsyncIO(size_t capacity, PollMode pollMode)
31   : ctx_(0),
32     pending_(0),
33     capacity_(capacity),
34     pollFd_(-1) {
35   if (UNLIKELY(capacity_ == 0)) {
36     throw std::out_of_range("AsyncIO: capacity must not be 0");
37   }
38   completed_.reserve(capacity_);
39   if (pollMode == POLLABLE) {
40     pollFd_ = eventfd(0, EFD_NONBLOCK);
41     checkUnixError(pollFd_, "AsyncIO: eventfd creation failed");
42   }
43 }
44
45 AsyncIO::~AsyncIO() {
46   CHECK_EQ(pending_, 0);
47   if (ctx_) {
48     int rc = io_queue_release(ctx_);
49     CHECK_EQ(rc, 0) << "io_queue_release: " << errnoStr(-rc);
50   }
51   if (pollFd_ != -1) {
52     CHECK_ERR(close(pollFd_));
53   }
54 }
55
56 void AsyncIO::pread(Op* op, int fd, void* buf, size_t size, off_t start) {
57   iocb cb;
58   io_prep_pread(&cb, fd, buf, size, start);
59   submit(op, &cb);
60 }
61
62 void AsyncIO::pread(Op* op, int fd, Range<unsigned char*> range,
63                     off_t start) {
64   pread(op, fd, range.begin(), range.size(), start);
65 }
66
67 void AsyncIO::preadv(Op* op, int fd, const iovec* iov, int iovcnt,
68                      off_t start) {
69   iocb cb;
70   io_prep_preadv(&cb, fd, iov, iovcnt, start);
71   submit(op, &cb);
72 }
73
74 void AsyncIO::pwrite(Op* op, int fd, const void* buf, size_t size,
75                      off_t start) {
76   iocb cb;
77   io_prep_pwrite(&cb, fd, const_cast<void*>(buf), size, start);
78   submit(op, &cb);
79 }
80
81 void AsyncIO::pwrite(Op* op, int fd, Range<const unsigned char*> range,
82                      off_t start) {
83   pwrite(op, fd, range.begin(), range.size(), start);
84 }
85
86 void AsyncIO::pwritev(Op* op, int fd, const iovec* iov, int iovcnt,
87                       off_t start) {
88   iocb cb;
89   io_prep_pwritev(&cb, fd, iov, iovcnt, start);
90   submit(op, &cb);
91 }
92
93 void AsyncIO::initializeContext() {
94   if (!ctx_) {
95     int rc = io_queue_init(capacity_, &ctx_);
96     // returns negative errno
97     checkKernelError(rc, "AsyncIO: io_queue_init failed");
98     DCHECK(ctx_);
99   }
100 }
101
102 void AsyncIO::submit(Op* op, iocb* cb) {
103   if (UNLIKELY(pending_ >= capacity_)) {
104     throw std::out_of_range("AsyncIO: too many pending requests");
105   }
106   if (UNLIKELY(op->state() != Op::UNINITIALIZED)) {
107     throw std::logic_error("AsyncIO: Invalid Op state in submit");
108   }
109   initializeContext();  // on demand
110   cb->data = op;
111   if (pollFd_ != -1) {
112     io_set_eventfd(cb, pollFd_);
113   }
114   int rc = io_submit(ctx_, 1, &cb);
115   checkKernelError(rc, "AsyncIO: io_submit failed");
116   DCHECK_EQ(rc, 1);
117   op->start();
118   ++pending_;
119 }
120
121 Range<AsyncIO::Op**> AsyncIO::wait(size_t minRequests) {
122   if (UNLIKELY(!ctx_)) {
123     throw std::logic_error("AsyncIO: wait called with no requests");
124   }
125   if (UNLIKELY(pollFd_ != -1)) {
126     throw std::logic_error("AsyncIO: wait not allowed on pollable object");
127   }
128   return doWait(minRequests, pending_);
129 }
130
131 Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
132   if (UNLIKELY(!ctx_)) {
133     throw std::logic_error("AsyncIO: pollCompleted called with no requests");
134   }
135   if (UNLIKELY(pollFd_ == -1)) {
136     throw std::logic_error(
137         "AsyncIO: pollCompleted not allowed on non-pollable object");
138   }
139   uint64_t numEvents;
140   // This sets the eventFd counter to 0, see
141   // http://www.kernel.org/doc/man-pages/online/pages/man2/eventfd.2.html
142   ssize_t rc;
143   do {
144     rc = ::read(pollFd_, &numEvents, 8);
145   } while (rc == -1 && errno == EINTR);
146   if (UNLIKELY(rc == -1 && errno == EAGAIN)) {
147     return Range<Op**>();  // nothing completed
148   }
149   checkUnixError(rc, "AsyncIO: read from event fd failed");
150   DCHECK_EQ(rc, 8);
151
152   DCHECK_GT(numEvents, 0);
153   DCHECK_LE(numEvents, pending_);
154
155   // Don't reap more than numEvents, as we've just reset the counter to 0.
156   return doWait(numEvents, numEvents);
157 }
158
159 Range<AsyncIO::Op**> AsyncIO::doWait(size_t minRequests, size_t maxRequests) {
160   io_event events[pending_];
161   int count;
162   do {
163     // Wait forever
164     count = io_getevents(ctx_, minRequests, maxRequests, events, nullptr);
165   } while (count == -EINTR);
166   checkKernelError(count, "AsyncIO: io_getevents failed");
167   DCHECK_GE(count, minRequests);  // the man page says so
168   DCHECK_LE(count, pending_);
169
170   completed_.clear();
171   if (count == 0) {
172     return folly::Range<Op**>();
173   }
174
175   for (size_t i = 0; i < count; ++i) {
176     Op* op = static_cast<Op*>(events[i].data);
177     DCHECK(op);
178     op->complete(events[i].res);
179     completed_.push_back(op);
180   }
181   pending_ -= count;
182
183   return folly::Range<Op**>(&completed_.front(), count);
184 }
185
186 AsyncIO::Op::Op()
187   : state_(UNINITIALIZED),
188     result_(-EINVAL) {
189 }
190
191 void AsyncIO::Op::reset() {
192   if (UNLIKELY(state_ == PENDING)) {
193     throw std::logic_error("AsyncIO: invalid state for reset");
194   }
195   state_ = UNINITIALIZED;
196   result_ = -EINVAL;
197 }
198
199 AsyncIO::Op::~Op() {
200   CHECK_NE(state_, PENDING);
201 }
202
203 void AsyncIO::Op::start() {
204   DCHECK_EQ(state_, UNINITIALIZED);
205   state_ = PENDING;
206 }
207
208 void AsyncIO::Op::complete(ssize_t result) {
209   DCHECK_EQ(state_, PENDING);
210   state_ = COMPLETED;
211   result_ = result;
212   onCompleted();
213 }
214
215 void AsyncIO::Op::onCompleted() { }  // default: do nothing
216
217 ssize_t AsyncIO::Op::result() const {
218   if (UNLIKELY(state_ != COMPLETED)) {
219     throw std::logic_error("AsyncIO: Invalid Op state in result");
220   }
221   return result_;
222 }
223
224 CallbackOp::CallbackOp(Callback&& callback) : callback_(std::move(callback)) { }
225
226 CallbackOp::~CallbackOp() { }
227
228 CallbackOp* CallbackOp::make(Callback&& callback) {
229   // Ensure created on the heap
230   return new CallbackOp(std::move(callback));
231 }
232
233 void CallbackOp::onCompleted() {
234   callback_(result());
235   delete this;
236 }
237
238 }  // namespace folly
239