3c84ebb0c4f2be331669ffb88b379fdfb8fbbfc3
[folly.git] / folly / experimental / io / AsyncIO.h
1 /*
2  * Copyright 2013 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #ifndef FOLLY_IO_ASYNCIO_H_
18 #define FOLLY_IO_ASYNCIO_H_
19
20 #include <sys/types.h>
21 #include <sys/uio.h>
22 #include <libaio.h>
23
24 #include <cstdint>
25 #include <deque>
26 #include <functional>
27 #include <utility>
28 #include <vector>
29
30 #include <boost/noncopyable.hpp>
31
32 #include "folly/Portability.h"
33 #include "folly/Range.h"
34
35 namespace folly {
36
37 /**
38  * An AsyncIOOp represents a pending operation.  You may set a notification
39  * callback or you may use this class's methods directly.
40  *
41  * The op must remain allocated until completion.
42  */
43 class AsyncIOOp : private boost::noncopyable {
44   friend class AsyncIO;
45  public:
46   typedef std::function<void(AsyncIOOp*)> NotificationCallback;
47
48   explicit AsyncIOOp(NotificationCallback cb = NotificationCallback());
49   ~AsyncIOOp();
50
51   // There would be a cancel() method here if Linux AIO actually implemented
52   // it.  But let's not get your hopes up.
53
54   enum State {
55     UNINITIALIZED,
56     INITIALIZED,
57     PENDING,
58     COMPLETED
59   };
60
61   /**
62    * Initiate a read request.
63    */
64   void pread(int fd, void* buf, size_t size, off_t start);
65   void pread(int fd, Range<unsigned char*> range, off_t start);
66   void preadv(int fd, const iovec* iov, int iovcnt, off_t start);
67
68   /**
69    * Initiate a write request.
70    */
71   void pwrite(int fd, const void* buf, size_t size, off_t start);
72   void pwrite(int fd, Range<const unsigned char*> range, off_t start);
73   void pwritev(int fd, const iovec* iov, int iovcnt, off_t start);
74
75   /**
76    * Return the current operation state.
77    */
78   State state() const { return state_; }
79
80   /**
81    * Reset the operation for reuse.  It is an error to call reset() on
82    * an Op that is still pending.
83    */
84   void reset(NotificationCallback cb = NotificationCallback());
85
86   void setNotificationCallback(NotificationCallback cb) { cb_ = std::move(cb); }
87   const NotificationCallback& notificationCallback() const { return cb_; }
88
89   /**
90    * Retrieve the result of this operation.  Returns >=0 on success,
91    * -errno on failure (that is, using the Linux kernel error reporting
92    * conventions).  Use checkKernelError (folly/Exception.h) on the result to
93    * throw a std::system_error in case of error instead.
94    *
95    * It is an error to call this if the Op hasn't yet started or is still
96    * pending.
97    */
98   ssize_t result() const;
99
100  private:
101   void init();
102   void start();
103   void complete(ssize_t result);
104
105   NotificationCallback cb_;
106   iocb iocb_;
107   State state_;
108   ssize_t result_;
109 };
110
111 /**
112  * C++ interface around Linux Async IO.
113  */
114 class AsyncIO : private boost::noncopyable {
115  public:
116   typedef AsyncIOOp Op;
117
118   enum PollMode {
119     NOT_POLLABLE,
120     POLLABLE
121   };
122
123   /**
124    * Create an AsyncIO context capable of holding at most 'capacity' pending
125    * requests at the same time.  As requests complete, others can be scheduled,
126    * as long as this limit is not exceeded.
127    *
128    * Note: the maximum number of allowed concurrent requests is controlled
129    * by the fs.aio-max-nr sysctl, the default value is usually 64K.
130    *
131    * If pollMode is POLLABLE, pollFd() will return a file descriptor that
132    * can be passed to poll / epoll / select and will become readable when
133    * any IOs on this AsyncIO have completed.  If you do this, you must use
134    * pollCompleted() instead of wait() -- do not read from the pollFd()
135    * file descriptor directly.
136    */
137   explicit AsyncIO(size_t capacity, PollMode pollMode=NOT_POLLABLE);
138   ~AsyncIO();
139
140   /**
141    * Wait for at least minRequests to complete.  Returns the requests that
142    * have completed; the returned range is valid until the next call to
143    * wait().  minRequests may be 0 to not block.
144    */
145   Range<Op**> wait(size_t minRequests);
146
147   /**
148    * Return the number of pending requests.
149    */
150   size_t pending() const { return pending_; }
151
152   /**
153    * Return the maximum number of requests that can be kept outstanding
154    * at any one time.
155    */
156   size_t capacity() const { return capacity_; }
157
158   /**
159    * If POLLABLE, return a file descriptor that can be passed to poll / epoll
160    * and will become readable when any async IO operations have completed.
161    * If NOT_POLLABLE, return -1.
162    */
163   int pollFd() const { return pollFd_; }
164
165   /**
166    * If POLLABLE, call instead of wait after the file descriptor returned
167    * by pollFd() became readable.  The returned range is valid until the next
168    * call to pollCompleted().
169    */
170   Range<Op**> pollCompleted();
171
172   /**
173    * Submit an op for execution.
174    */
175   void submit(Op* op);
176
177  private:
178   void initializeContext();
179   Range<Op**> doWait(size_t minRequests, size_t maxRequests);
180
181   io_context_t ctx_;
182   size_t pending_;
183   const size_t capacity_;
184   int pollFd_;
185   std::vector<Op*> completed_;
186 };
187
188 /**
189  * Wrapper around AsyncIO that allows you to schedule more requests than
190  * the AsyncIO's object capacity.  Other requests are queued and processed
191  * in a FIFO order.
192  */
193 class AsyncIOQueue {
194  public:
195   /**
196    * Create a queue, using the given AsyncIO object.
197    * The AsyncIO object may not be used by anything else until the
198    * queue is destroyed.
199    */
200   explicit AsyncIOQueue(AsyncIO* asyncIO);
201   ~AsyncIOQueue();
202
203   size_t queued() const { return queue_.size(); }
204
205   /**
206    * Submit an op to the AsyncIO queue.  The op will be queued until
207    * the AsyncIO object has room.
208    */
209   void submit(AsyncIOOp* op);
210
211   /**
212    * Submit a delayed op to the AsyncIO queue; this allows you to postpone
213    * creation of the Op (which may require allocating memory, etc) until
214    * the AsyncIO object has room.
215    */
216   typedef std::function<AsyncIOOp*()> OpFactory;
217   void submit(OpFactory op);
218  private:
219   void onCompleted(AsyncIOOp* op);
220   void maybeDequeue();
221
222   AsyncIO* asyncIO_;
223
224   std::deque<OpFactory> queue_;
225 };
226
227 }  // namespace folly
228
229 #endif /* FOLLY_IO_ASYNCIO_H_ */
230