folly copyright 2015 -> copyright 2016
[folly.git] / folly / experimental / io / AsyncIO.h
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #ifndef FOLLY_IO_ASYNCIO_H_
18 #define FOLLY_IO_ASYNCIO_H_
19
20 #include <sys/types.h>
21 #include <sys/uio.h>
22 #include <libaio.h>
23
24 #include <atomic>
25 #include <cstdint>
26 #include <deque>
27 #include <functional>
28 #include <iosfwd>
29 #include <mutex>
30 #include <utility>
31 #include <vector>
32
33 #include <boost/noncopyable.hpp>
34
35 #include <folly/Portability.h>
36 #include <folly/Range.h>
37
38 namespace folly {
39
40 /**
41  * An AsyncIOOp represents a pending operation.  You may set a notification
42  * callback or you may use this class's methods directly.
43  *
44  * The op must remain allocated until completion.
45  */
46 class AsyncIOOp : private boost::noncopyable {
47   friend class AsyncIO;
48   friend std::ostream& operator<<(std::ostream& stream, const AsyncIOOp& o);
49  public:
50   typedef std::function<void(AsyncIOOp*)> NotificationCallback;
51
52   explicit AsyncIOOp(NotificationCallback cb = NotificationCallback());
53   ~AsyncIOOp();
54
55   // There would be a cancel() method here if Linux AIO actually implemented
56   // it.  But let's not get your hopes up.
57
58   enum class State {
59     UNINITIALIZED,
60     INITIALIZED,
61     PENDING,
62     COMPLETED
63   };
64
65   /**
66    * Initiate a read request.
67    */
68   void pread(int fd, void* buf, size_t size, off_t start);
69   void pread(int fd, Range<unsigned char*> range, off_t start);
70   void preadv(int fd, const iovec* iov, int iovcnt, off_t start);
71
72   /**
73    * Initiate a write request.
74    */
75   void pwrite(int fd, const void* buf, size_t size, off_t start);
76   void pwrite(int fd, Range<const unsigned char*> range, off_t start);
77   void pwritev(int fd, const iovec* iov, int iovcnt, off_t start);
78
79   /**
80    * Return the current operation state.
81    */
82   State state() const { return state_; }
83
84   /**
85    * Reset the operation for reuse.  It is an error to call reset() on
86    * an Op that is still pending.
87    */
88   void reset(NotificationCallback cb = NotificationCallback());
89
90   void setNotificationCallback(NotificationCallback cb) { cb_ = std::move(cb); }
91   const NotificationCallback& notificationCallback() const { return cb_; }
92
93   /**
94    * Retrieve the result of this operation.  Returns >=0 on success,
95    * -errno on failure (that is, using the Linux kernel error reporting
96    * conventions).  Use checkKernelError (folly/Exception.h) on the result to
97    * throw a std::system_error in case of error instead.
98    *
99    * It is an error to call this if the Op hasn't yet started or is still
100    * pending.
101    */
102   ssize_t result() const;
103
104  private:
105   void init();
106   void start();
107   void complete(ssize_t result);
108
109   NotificationCallback cb_;
110   iocb iocb_;
111   State state_;
112   ssize_t result_;
113 };
114
115 std::ostream& operator<<(std::ostream& stream, const AsyncIOOp& o);
116 std::ostream& operator<<(std::ostream& stream, AsyncIOOp::State state);
117
118 /**
119  * C++ interface around Linux Async IO.
120  */
121 class AsyncIO : private boost::noncopyable {
122  public:
123   typedef AsyncIOOp Op;
124
125   enum PollMode {
126     NOT_POLLABLE,
127     POLLABLE
128   };
129
130   /**
131    * Create an AsyncIO context capable of holding at most 'capacity' pending
132    * requests at the same time.  As requests complete, others can be scheduled,
133    * as long as this limit is not exceeded.
134    *
135    * Note: the maximum number of allowed concurrent requests is controlled
136    * by the fs.aio-max-nr sysctl, the default value is usually 64K.
137    *
138    * If pollMode is POLLABLE, pollFd() will return a file descriptor that
139    * can be passed to poll / epoll / select and will become readable when
140    * any IOs on this AsyncIO have completed.  If you do this, you must use
141    * pollCompleted() instead of wait() -- do not read from the pollFd()
142    * file descriptor directly.
143    *
144    * You may use the same AsyncIO object from multiple threads, as long as
145    * there is only one concurrent caller of wait() / pollCompleted() (perhaps
146    * by always calling it from the same thread, or by providing appropriate
147    * mutual exclusion)  In this case, pending() returns a snapshot
148    * of the current number of pending requests.
149    */
150   explicit AsyncIO(size_t capacity, PollMode pollMode=NOT_POLLABLE);
151   ~AsyncIO();
152
153   /**
154    * Wait for at least minRequests to complete.  Returns the requests that
155    * have completed; the returned range is valid until the next call to
156    * wait().  minRequests may be 0 to not block.
157    */
158   Range<Op**> wait(size_t minRequests);
159
160   /**
161    * Return the number of pending requests.
162    */
163   size_t pending() const { return pending_; }
164
165   /**
166    * Return the maximum number of requests that can be kept outstanding
167    * at any one time.
168    */
169   size_t capacity() const { return capacity_; }
170
171   /**
172    * Return the accumulative number of submitted I/O, since this object
173    * has been created.
174    */
175   size_t totalSubmits() const { return submitted_; }
176
177   /**
178    * If POLLABLE, return a file descriptor that can be passed to poll / epoll
179    * and will become readable when any async IO operations have completed.
180    * If NOT_POLLABLE, return -1.
181    */
182   int pollFd() const { return pollFd_; }
183
184   /**
185    * If POLLABLE, call instead of wait after the file descriptor returned
186    * by pollFd() became readable.  The returned range is valid until the next
187    * call to pollCompleted().
188    */
189   Range<Op**> pollCompleted();
190
191   /**
192    * Submit an op for execution.
193    */
194   void submit(Op* op);
195
196  private:
197   void decrementPending();
198   void initializeContext();
199
200   Range<Op**> doWait(size_t minRequests, size_t maxRequests);
201
202   io_context_t ctx_;
203   std::atomic<bool> ctxSet_;
204   std::mutex initMutex_;
205
206   std::atomic<size_t> pending_;
207   std::atomic<size_t> submitted_;
208   const size_t capacity_;
209   int pollFd_;
210   std::vector<Op*> completed_;
211 };
212
213 /**
214  * Wrapper around AsyncIO that allows you to schedule more requests than
215  * the AsyncIO's object capacity.  Other requests are queued and processed
216  * in a FIFO order.
217  */
218 class AsyncIOQueue {
219  public:
220   /**
221    * Create a queue, using the given AsyncIO object.
222    * The AsyncIO object may not be used by anything else until the
223    * queue is destroyed.
224    */
225   explicit AsyncIOQueue(AsyncIO* asyncIO);
226   ~AsyncIOQueue();
227
228   size_t queued() const { return queue_.size(); }
229
230   /**
231    * Submit an op to the AsyncIO queue.  The op will be queued until
232    * the AsyncIO object has room.
233    */
234   void submit(AsyncIOOp* op);
235
236   /**
237    * Submit a delayed op to the AsyncIO queue; this allows you to postpone
238    * creation of the Op (which may require allocating memory, etc) until
239    * the AsyncIO object has room.
240    */
241   typedef std::function<AsyncIOOp*()> OpFactory;
242   void submit(OpFactory op);
243
244  private:
245   void onCompleted(AsyncIOOp* op);
246   void maybeDequeue();
247
248   AsyncIO* asyncIO_;
249
250   std::deque<OpFactory> queue_;
251 };
252
253 }  // namespace folly
254
255 #endif /* FOLLY_IO_ASYNCIO_H_ */