Adds writer test case for RCU
[folly.git] / folly / experimental / io / AsyncIO.h
1 /*
2  * Copyright 2013-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <sys/types.h>
20
21 #include <atomic>
22 #include <cstdint>
23 #include <deque>
24 #include <functional>
25 #include <iosfwd>
26 #include <mutex>
27 #include <utility>
28 #include <vector>
29
30 #include <boost/noncopyable.hpp>
31 #include <libaio.h>
32
33 #include <folly/Portability.h>
34 #include <folly/Range.h>
35 #include <folly/portability/SysUio.h>
36
37 namespace folly {
38
39 /**
40  * An AsyncIOOp represents a pending operation.  You may set a notification
41  * callback or you may use this class's methods directly.
42  *
43  * The op must remain allocated until it is completed or canceled.
44  */
45 class AsyncIOOp : private boost::noncopyable {
46   friend class AsyncIO;
47   friend std::ostream& operator<<(std::ostream& stream, const AsyncIOOp& o);
48
49  public:
50   typedef std::function<void(AsyncIOOp*)> NotificationCallback;
51
52   explicit AsyncIOOp(NotificationCallback cb = NotificationCallback());
53   ~AsyncIOOp();
54
55   enum class State {
56     UNINITIALIZED,
57     INITIALIZED,
58     PENDING,
59     COMPLETED,
60     CANCELED,
61   };
62
63   /**
64    * Initiate a read request.
65    */
66   void pread(int fd, void* buf, size_t size, off_t start);
67   void pread(int fd, Range<unsigned char*> range, off_t start);
68   void preadv(int fd, const iovec* iov, int iovcnt, off_t start);
69
70   /**
71    * Initiate a write request.
72    */
73   void pwrite(int fd, const void* buf, size_t size, off_t start);
74   void pwrite(int fd, Range<const unsigned char*> range, off_t start);
75   void pwritev(int fd, const iovec* iov, int iovcnt, off_t start);
76
77   /**
78    * Return the current operation state.
79    */
80   State state() const {
81     return state_;
82   }
83
84   /**
85    * Reset the operation for reuse.  It is an error to call reset() on
86    * an Op that is still pending.
87    */
88   void reset(NotificationCallback cb = NotificationCallback());
89
90   void setNotificationCallback(NotificationCallback cb) {
91     cb_ = std::move(cb);
92   }
93   const NotificationCallback& notificationCallback() const {
94     return cb_;
95   }
96
97   /**
98    * Retrieve the result of this operation.  Returns >=0 on success,
99    * -errno on failure (that is, using the Linux kernel error reporting
100    * conventions).  Use checkKernelError (folly/Exception.h) on the result to
101    * throw a std::system_error in case of error instead.
102    *
103    * It is an error to call this if the Op hasn't completed.
104    */
105   ssize_t result() const;
106
107  private:
108   void init();
109   void start();
110   void complete(ssize_t result);
111   void cancel();
112
113   NotificationCallback cb_;
114   iocb iocb_;
115   State state_;
116   ssize_t result_;
117 };
118
119 std::ostream& operator<<(std::ostream& stream, const AsyncIOOp& o);
120 std::ostream& operator<<(std::ostream& stream, AsyncIOOp::State state);
121
122 /**
123  * C++ interface around Linux Async IO.
124  */
125 class AsyncIO : private boost::noncopyable {
126  public:
127   typedef AsyncIOOp Op;
128
129   enum PollMode {
130     NOT_POLLABLE,
131     POLLABLE,
132   };
133
134   /**
135    * Create an AsyncIO context capable of holding at most 'capacity' pending
136    * requests at the same time.  As requests complete, others can be scheduled,
137    * as long as this limit is not exceeded.
138    *
139    * Note: the maximum number of allowed concurrent requests is controlled
140    * by the fs.aio-max-nr sysctl, the default value is usually 64K.
141    *
142    * If pollMode is POLLABLE, pollFd() will return a file descriptor that
143    * can be passed to poll / epoll / select and will become readable when
144    * any IOs on this AsyncIO have completed.  If you do this, you must use
145    * pollCompleted() instead of wait() -- do not read from the pollFd()
146    * file descriptor directly.
147    *
148    * You may use the same AsyncIO object from multiple threads, as long as
149    * there is only one concurrent caller of wait() / pollCompleted() / cancel()
150    * (perhaps by always calling it from the same thread, or by providing
151    * appropriate mutual exclusion).  In this case, pending() returns a snapshot
152    * of the current number of pending requests.
153    */
154   explicit AsyncIO(size_t capacity, PollMode pollMode = NOT_POLLABLE);
155   ~AsyncIO();
156
157   /**
158    * Wait for at least minRequests to complete.  Returns the requests that
159    * have completed; the returned range is valid until the next call to
160    * wait().  minRequests may be 0 to not block.
161    */
162   Range<Op**> wait(size_t minRequests);
163
164   /**
165    * Cancel all pending requests and return them; the returned range is
166    * valid until the next call to cancel().
167    */
168   Range<Op**> cancel();
169
170   /**
171    * Return the number of pending requests.
172    */
173   size_t pending() const {
174     return pending_;
175   }
176
177   /**
178    * Return the maximum number of requests that can be kept outstanding
179    * at any one time.
180    */
181   size_t capacity() const {
182     return capacity_;
183   }
184
185   /**
186    * Return the accumulative number of submitted I/O, since this object
187    * has been created.
188    */
189   size_t totalSubmits() const {
190     return submitted_;
191   }
192
193   /**
194    * If POLLABLE, return a file descriptor that can be passed to poll / epoll
195    * and will become readable when any async IO operations have completed.
196    * If NOT_POLLABLE, return -1.
197    */
198   int pollFd() const {
199     return pollFd_;
200   }
201
202   /**
203    * If POLLABLE, call instead of wait after the file descriptor returned
204    * by pollFd() became readable.  The returned range is valid until the next
205    * call to pollCompleted().
206    */
207   Range<Op**> pollCompleted();
208
209   /**
210    * Submit an op for execution.
211    */
212   void submit(Op* op);
213
214  private:
215   void decrementPending();
216   void initializeContext();
217
218   enum class WaitType { COMPLETE, CANCEL };
219   Range<AsyncIO::Op**> doWait(
220       WaitType type,
221       size_t minRequests,
222       size_t maxRequests,
223       std::vector<Op*>& result);
224
225   io_context_t ctx_{nullptr};
226   std::atomic<bool> ctxSet_{false};
227   std::mutex initMutex_;
228
229   std::atomic<size_t> pending_{0};
230   std::atomic<size_t> submitted_{0};
231   const size_t capacity_;
232   int pollFd_{-1};
233   std::vector<Op*> completed_;
234   std::vector<Op*> canceled_;
235 };
236
237 /**
238  * Wrapper around AsyncIO that allows you to schedule more requests than
239  * the AsyncIO's object capacity.  Other requests are queued and processed
240  * in a FIFO order.
241  */
242 class AsyncIOQueue {
243  public:
244   /**
245    * Create a queue, using the given AsyncIO object.
246    * The AsyncIO object may not be used by anything else until the
247    * queue is destroyed.
248    */
249   explicit AsyncIOQueue(AsyncIO* asyncIO);
250   ~AsyncIOQueue();
251
252   size_t queued() const {
253     return queue_.size();
254   }
255
256   /**
257    * Submit an op to the AsyncIO queue.  The op will be queued until
258    * the AsyncIO object has room.
259    */
260   void submit(AsyncIOOp* op);
261
262   /**
263    * Submit a delayed op to the AsyncIO queue; this allows you to postpone
264    * creation of the Op (which may require allocating memory, etc) until
265    * the AsyncIO object has room.
266    */
267   typedef std::function<AsyncIOOp*()> OpFactory;
268   void submit(OpFactory op);
269
270  private:
271   void onCompleted(AsyncIOOp* op);
272   void maybeDequeue();
273
274   AsyncIO* asyncIO_;
275
276   std::deque<OpFactory> queue_;
277 };
278
279 } // namespace folly