folly: replace old-style header guards with "pragma once"
[folly.git] / folly / experimental / io / AsyncIO.h
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <sys/types.h>
20 #include <sys/uio.h>
21 #include <libaio.h>
22
23 #include <atomic>
24 #include <cstdint>
25 #include <deque>
26 #include <functional>
27 #include <iosfwd>
28 #include <mutex>
29 #include <utility>
30 #include <vector>
31
32 #include <boost/noncopyable.hpp>
33
34 #include <folly/Portability.h>
35 #include <folly/Range.h>
36
37 namespace folly {
38
39 /**
40  * An AsyncIOOp represents a pending operation.  You may set a notification
41  * callback or you may use this class's methods directly.
42  *
43  * The op must remain allocated until completion.
44  */
45 class AsyncIOOp : private boost::noncopyable {
46   friend class AsyncIO;
47   friend std::ostream& operator<<(std::ostream& stream, const AsyncIOOp& o);
48  public:
49   typedef std::function<void(AsyncIOOp*)> NotificationCallback;
50
51   explicit AsyncIOOp(NotificationCallback cb = NotificationCallback());
52   ~AsyncIOOp();
53
54   // There would be a cancel() method here if Linux AIO actually implemented
55   // it.  But let's not get your hopes up.
56
57   enum class State {
58     UNINITIALIZED,
59     INITIALIZED,
60     PENDING,
61     COMPLETED
62   };
63
64   /**
65    * Initiate a read request.
66    */
67   void pread(int fd, void* buf, size_t size, off_t start);
68   void pread(int fd, Range<unsigned char*> range, off_t start);
69   void preadv(int fd, const iovec* iov, int iovcnt, off_t start);
70
71   /**
72    * Initiate a write request.
73    */
74   void pwrite(int fd, const void* buf, size_t size, off_t start);
75   void pwrite(int fd, Range<const unsigned char*> range, off_t start);
76   void pwritev(int fd, const iovec* iov, int iovcnt, off_t start);
77
78   /**
79    * Return the current operation state.
80    */
81   State state() const { return state_; }
82
83   /**
84    * Reset the operation for reuse.  It is an error to call reset() on
85    * an Op that is still pending.
86    */
87   void reset(NotificationCallback cb = NotificationCallback());
88
89   void setNotificationCallback(NotificationCallback cb) { cb_ = std::move(cb); }
90   const NotificationCallback& notificationCallback() const { return cb_; }
91
92   /**
93    * Retrieve the result of this operation.  Returns >=0 on success,
94    * -errno on failure (that is, using the Linux kernel error reporting
95    * conventions).  Use checkKernelError (folly/Exception.h) on the result to
96    * throw a std::system_error in case of error instead.
97    *
98    * It is an error to call this if the Op hasn't yet started or is still
99    * pending.
100    */
101   ssize_t result() const;
102
103  private:
104   void init();
105   void start();
106   void complete(ssize_t result);
107
108   NotificationCallback cb_;
109   iocb iocb_;
110   State state_;
111   ssize_t result_;
112 };
113
114 std::ostream& operator<<(std::ostream& stream, const AsyncIOOp& o);
115 std::ostream& operator<<(std::ostream& stream, AsyncIOOp::State state);
116
117 /**
118  * C++ interface around Linux Async IO.
119  */
120 class AsyncIO : private boost::noncopyable {
121  public:
122   typedef AsyncIOOp Op;
123
124   enum PollMode {
125     NOT_POLLABLE,
126     POLLABLE
127   };
128
129   /**
130    * Create an AsyncIO context capable of holding at most 'capacity' pending
131    * requests at the same time.  As requests complete, others can be scheduled,
132    * as long as this limit is not exceeded.
133    *
134    * Note: the maximum number of allowed concurrent requests is controlled
135    * by the fs.aio-max-nr sysctl, the default value is usually 64K.
136    *
137    * If pollMode is POLLABLE, pollFd() will return a file descriptor that
138    * can be passed to poll / epoll / select and will become readable when
139    * any IOs on this AsyncIO have completed.  If you do this, you must use
140    * pollCompleted() instead of wait() -- do not read from the pollFd()
141    * file descriptor directly.
142    *
143    * You may use the same AsyncIO object from multiple threads, as long as
144    * there is only one concurrent caller of wait() / pollCompleted() (perhaps
145    * by always calling it from the same thread, or by providing appropriate
146    * mutual exclusion)  In this case, pending() returns a snapshot
147    * of the current number of pending requests.
148    */
149   explicit AsyncIO(size_t capacity, PollMode pollMode=NOT_POLLABLE);
150   ~AsyncIO();
151
152   /**
153    * Wait for at least minRequests to complete.  Returns the requests that
154    * have completed; the returned range is valid until the next call to
155    * wait().  minRequests may be 0 to not block.
156    */
157   Range<Op**> wait(size_t minRequests);
158
159   /**
160    * Return the number of pending requests.
161    */
162   size_t pending() const { return pending_; }
163
164   /**
165    * Return the maximum number of requests that can be kept outstanding
166    * at any one time.
167    */
168   size_t capacity() const { return capacity_; }
169
170   /**
171    * Return the accumulative number of submitted I/O, since this object
172    * has been created.
173    */
174   size_t totalSubmits() const { return submitted_; }
175
176   /**
177    * If POLLABLE, return a file descriptor that can be passed to poll / epoll
178    * and will become readable when any async IO operations have completed.
179    * If NOT_POLLABLE, return -1.
180    */
181   int pollFd() const { return pollFd_; }
182
183   /**
184    * If POLLABLE, call instead of wait after the file descriptor returned
185    * by pollFd() became readable.  The returned range is valid until the next
186    * call to pollCompleted().
187    */
188   Range<Op**> pollCompleted();
189
190   /**
191    * Submit an op for execution.
192    */
193   void submit(Op* op);
194
195  private:
196   void decrementPending();
197   void initializeContext();
198
199   Range<Op**> doWait(size_t minRequests, size_t maxRequests);
200
201   io_context_t ctx_;
202   std::atomic<bool> ctxSet_;
203   std::mutex initMutex_;
204
205   std::atomic<size_t> pending_;
206   std::atomic<size_t> submitted_;
207   const size_t capacity_;
208   int pollFd_;
209   std::vector<Op*> completed_;
210 };
211
212 /**
213  * Wrapper around AsyncIO that allows you to schedule more requests than
214  * the AsyncIO's object capacity.  Other requests are queued and processed
215  * in a FIFO order.
216  */
217 class AsyncIOQueue {
218  public:
219   /**
220    * Create a queue, using the given AsyncIO object.
221    * The AsyncIO object may not be used by anything else until the
222    * queue is destroyed.
223    */
224   explicit AsyncIOQueue(AsyncIO* asyncIO);
225   ~AsyncIOQueue();
226
227   size_t queued() const { return queue_.size(); }
228
229   /**
230    * Submit an op to the AsyncIO queue.  The op will be queued until
231    * the AsyncIO object has room.
232    */
233   void submit(AsyncIOOp* op);
234
235   /**
236    * Submit a delayed op to the AsyncIO queue; this allows you to postpone
237    * creation of the Op (which may require allocating memory, etc) until
238    * the AsyncIO object has room.
239    */
240   typedef std::function<AsyncIOOp*()> OpFactory;
241   void submit(OpFactory op);
242
243  private:
244   void onCompleted(AsyncIOOp* op);
245   void maybeDequeue();
246
247   AsyncIO* asyncIO_;
248
249   std::deque<OpFactory> queue_;
250 };
251
252 }  // namespace folly