make folly build on OSX
[folly.git] / folly / experimental / io / AsyncIO.h
index 4abd12e4ebbbf398885cfb9ed5a1f259fc627506..83c37f167ae2838905762b70fa0f78371d96ac02 100644 (file)
 #include <sys/uio.h>
 #include <libaio.h>
 
+#include <atomic>
 #include <cstdint>
+#include <deque>
 #include <functional>
+#include <mutex>
+#include <ostream>
 #include <utility>
 #include <vector>
 
 
 namespace folly {
 
+/**
+ * An AsyncIOOp represents a pending operation.  You may set a notification
+ * callback or you may use this class's methods directly.
+ *
+ * The op must remain allocated until completion.
+ */
+class AsyncIOOp : private boost::noncopyable {
+  friend class AsyncIO;
+  friend std::ostream& operator<<(std::ostream& stream, const AsyncIOOp& o);
+ public:
+  typedef std::function<void(AsyncIOOp*)> NotificationCallback;
+
+  explicit AsyncIOOp(NotificationCallback cb = NotificationCallback());
+  ~AsyncIOOp();
+
+  // There would be a cancel() method here if Linux AIO actually implemented
+  // it.  But let's not get your hopes up.
+
+  enum class State {
+    UNINITIALIZED,
+    INITIALIZED,
+    PENDING,
+    COMPLETED
+  };
+
+  /**
+   * Initiate a read request.
+   */
+  void pread(int fd, void* buf, size_t size, off_t start);
+  void pread(int fd, Range<unsigned char*> range, off_t start);
+  void preadv(int fd, const iovec* iov, int iovcnt, off_t start);
+
+  /**
+   * Initiate a write request.
+   */
+  void pwrite(int fd, const void* buf, size_t size, off_t start);
+  void pwrite(int fd, Range<const unsigned char*> range, off_t start);
+  void pwritev(int fd, const iovec* iov, int iovcnt, off_t start);
+
+  /**
+   * Return the current operation state.
+   */
+  State state() const { return state_; }
+
+  /**
+   * Reset the operation for reuse.  It is an error to call reset() on
+   * an Op that is still pending.
+   */
+  void reset(NotificationCallback cb = NotificationCallback());
+
+  void setNotificationCallback(NotificationCallback cb) { cb_ = std::move(cb); }
+  const NotificationCallback& notificationCallback() const { return cb_; }
+
+  /**
+   * Retrieve the result of this operation.  Returns >=0 on success,
+   * -errno on failure (that is, using the Linux kernel error reporting
+   * conventions).  Use checkKernelError (folly/Exception.h) on the result to
+   * throw a std::system_error in case of error instead.
+   *
+   * It is an error to call this if the Op hasn't yet started or is still
+   * pending.
+   */
+  ssize_t result() const;
+
+ private:
+  void init();
+  void start();
+  void complete(ssize_t result);
+
+  NotificationCallback cb_;
+  iocb iocb_;
+  State state_;
+  ssize_t result_;
+};
+
+std::ostream& operator<<(std::ostream& stream, const AsyncIOOp& o);
+std::ostream& operator<<(std::ostream& stream, AsyncIOOp::State state);
+
 /**
  * C++ interface around Linux Async IO.
  */
 class AsyncIO : private boost::noncopyable {
  public:
+  typedef AsyncIOOp Op;
+
   enum PollMode {
     NOT_POLLABLE,
     POLLABLE
@@ -56,78 +140,16 @@ class AsyncIO : private boost::noncopyable {
    * any IOs on this AsyncIO have completed.  If you do this, you must use
    * pollCompleted() instead of wait() -- do not read from the pollFd()
    * file descriptor directly.
+   *
+   * You may use the same AsyncIO object from multiple threads, as long as
+   * there is only one concurrent caller of wait() / pollCompleted() (perhaps
+   * by always calling it from the same thread, or by providing appropriate
+   * mutual exclusion)  In this case, pending() returns a snapshot
+   * of the current number of pending requests.
    */
   explicit AsyncIO(size_t capacity, PollMode pollMode=NOT_POLLABLE);
   ~AsyncIO();
 
-  /**
-   * An Op represents a pending operation.  You may inherit from Op (and
-   * override onCompleted) in order to be notified of completion (see
-   * CallbackOp below for an example), or you may use Op's methods directly.
-   *
-   * The Op must remain allocated until completion.
-   */
-  class Op : private boost::noncopyable {
-    friend class AsyncIO;
-   public:
-    Op();
-    virtual ~Op();
-
-    // There would be a cancel() method here if Linux AIO actually implemented
-    // it.  But let's not get your hopes up.
-
-    enum State {
-      UNINITIALIZED,
-      PENDING,
-      COMPLETED
-    };
-
-    /**
-     * Return the current operation state.
-     */
-    State state() const { return state_; }
-
-    /**
-     * Reset the operation for reuse.  It is an error to call reset() on
-     * an Op that is still pending.
-     */
-    void reset();
-
-    /**
-     * Retrieve the result of this operation.  Returns >=0 on success,
-     * -errno on failure (that is, using the Linux kernel error reporting
-     * conventions).  Use checkKernelError (folly/Exception.h) on the result to
-     * throw a std::system_error in case of error instead.
-     *
-     * It is an error to call this if the Op hasn't yet started or is still
-     * pending.
-     */
-    ssize_t result() const;
-
-   private:
-    void start();
-    void complete(ssize_t result);
-
-    virtual void onCompleted();
-
-    State state_;
-    ssize_t result_;
-  };
-
-  /**
-   * Initiate a read request.
-   */
-  void pread(Op* op, int fd, void* buf, size_t size, off_t start);
-  void pread(Op* op, int fd, Range<unsigned char*> range, off_t start);
-  void preadv(Op* op, int fd, const iovec* iov, int iovcnt, off_t start);
-
-  /**
-   * Initiate a write request.
-   */
-  void pwrite(Op* op, int fd, const void* buf, size_t size, off_t start);
-  void pwrite(Op* op, int fd, Range<const unsigned char*> range, off_t start);
-  void pwritev(Op* op, int fd, const iovec* iov, int iovcnt, off_t start);
-
   /**
    * Wait for at least minRequests to complete.  Returns the requests that
    * have completed; the returned range is valid until the next call to
@@ -160,33 +182,64 @@ class AsyncIO : private boost::noncopyable {
    */
   Range<Op**> pollCompleted();
 
+  /**
+   * Submit an op for execution.
+   */
+  void submit(Op* op);
+
  private:
+  void decrementPending();
   void initializeContext();
-  void submit(Op* op, iocb* cb);
+
   Range<Op**> doWait(size_t minRequests, size_t maxRequests);
 
   io_context_t ctx_;
-  size_t pending_;
-  size_t capacity_;
+  std::atomic<bool> ctxSet_;
+  std::mutex initMutex_;
+
+  std::atomic<ssize_t> pending_;
+  const ssize_t capacity_;
   int pollFd_;
   std::vector<Op*> completed_;
 };
 
 /**
- * Implementation of AsyncIO::Op that calls a callback and then deletes
- * itself.
+ * Wrapper around AsyncIO that allows you to schedule more requests than
+ * the AsyncIO's object capacity.  Other requests are queued and processed
+ * in a FIFO order.
  */
-class CallbackOp : public AsyncIO::Op {
+class AsyncIOQueue {
  public:
-  typedef std::function<void(ssize_t)> Callback;
-  static CallbackOp* make(Callback&& callback);
+  /**
+   * Create a queue, using the given AsyncIO object.
+   * The AsyncIO object may not be used by anything else until the
+   * queue is destroyed.
+   */
+  explicit AsyncIOQueue(AsyncIO* asyncIO);
+  ~AsyncIOQueue();
 
+  size_t queued() const { return queue_.size(); }
+
+  /**
+   * Submit an op to the AsyncIO queue.  The op will be queued until
+   * the AsyncIO object has room.
+   */
+  void submit(AsyncIOOp* op);
+
+  /**
+   * Submit a delayed op to the AsyncIO queue; this allows you to postpone
+   * creation of the Op (which may require allocating memory, etc) until
+   * the AsyncIO object has room.
+   */
+  typedef std::function<AsyncIOOp*()> OpFactory;
+  void submit(OpFactory op);
  private:
-  explicit CallbackOp(Callback&& callback);
-  ~CallbackOp();
-  void onCompleted() FOLLY_OVERRIDE;
+  void onCompleted(AsyncIOOp* op);
+  void maybeDequeue();
+
+  AsyncIO* asyncIO_;
 
-  Callback callback_;
+  std::deque<OpFactory> queue_;
 };
 
 }  // namespace folly