return range from AsyncIO::cancel(), fix test
[folly.git] / folly / experimental / io / AsyncIO.h
index 6c9da8d258325d2b1db6c9b71c7631ff476c5b7f..3af437a0b00ab5ef6d6feaee1dbc612ea454d1ea 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2014 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * limitations under the License.
  */
 
-#ifndef FOLLY_IO_ASYNCIO_H_
-#define FOLLY_IO_ASYNCIO_H_
+#pragma once
 
 #include <sys/types.h>
-#include <sys/uio.h>
 #include <libaio.h>
 
 #include <atomic>
 #include <cstdint>
 #include <deque>
 #include <functional>
+#include <iosfwd>
 #include <mutex>
-#include <ostream>
 #include <utility>
 #include <vector>
 
 #include <boost/noncopyable.hpp>
 
-#include "folly/Portability.h"
-#include "folly/Range.h"
+#include <folly/Portability.h>
+#include <folly/Range.h>
+#include <folly/portability/SysUio.h>
 
 namespace folly {
 
@@ -41,25 +40,24 @@ namespace folly {
  * An AsyncIOOp represents a pending operation.  You may set a notification
  * callback or you may use this class's methods directly.
  *
- * The op must remain allocated until completion.
+ * The op must remain allocated until it is completed or canceled.
  */
 class AsyncIOOp : private boost::noncopyable {
   friend class AsyncIO;
   friend std::ostream& operator<<(std::ostream& stream, const AsyncIOOp& o);
+
  public:
   typedef std::function<void(AsyncIOOp*)> NotificationCallback;
 
   explicit AsyncIOOp(NotificationCallback cb = NotificationCallback());
   ~AsyncIOOp();
 
-  // There would be a cancel() method here if Linux AIO actually implemented
-  // it.  But let's not get your hopes up.
-
   enum class State {
     UNINITIALIZED,
     INITIALIZED,
     PENDING,
-    COMPLETED
+    COMPLETED,
+    CANCELED,
   };
 
   /**
@@ -96,8 +94,7 @@ class AsyncIOOp : private boost::noncopyable {
    * conventions).  Use checkKernelError (folly/Exception.h) on the result to
    * throw a std::system_error in case of error instead.
    *
-   * It is an error to call this if the Op hasn't yet started or is still
-   * pending.
+   * It is an error to call this if the Op hasn't completed.
    */
   ssize_t result() const;
 
@@ -105,6 +102,7 @@ class AsyncIOOp : private boost::noncopyable {
   void init();
   void start();
   void complete(ssize_t result);
+  void cancel();
 
   NotificationCallback cb_;
   iocb iocb_;
@@ -124,7 +122,7 @@ class AsyncIO : private boost::noncopyable {
 
   enum PollMode {
     NOT_POLLABLE,
-    POLLABLE
+    POLLABLE,
   };
 
   /**
@@ -142,12 +140,12 @@ class AsyncIO : private boost::noncopyable {
    * file descriptor directly.
    *
    * You may use the same AsyncIO object from multiple threads, as long as
-   * there is only one concurrent caller of wait() / pollCompleted() (perhaps
-   * by always calling it from the same thread, or by providing appropriate
-   * mutual exclusion)  In this case, pending() returns a snapshot
+   * there is only one concurrent caller of wait() / pollCompleted() / cancel()
+   * (perhaps by always calling it from the same thread, or by providing
+   * appropriate mutual exclusion).  In this case, pending() returns a snapshot
    * of the current number of pending requests.
    */
-  explicit AsyncIO(size_t capacity, PollMode pollMode=NOT_POLLABLE);
+  explicit AsyncIO(size_t capacity, PollMode pollMode = NOT_POLLABLE);
   ~AsyncIO();
 
   /**
@@ -157,6 +155,12 @@ class AsyncIO : private boost::noncopyable {
    */
   Range<Op**> wait(size_t minRequests);
 
+  /**
+   * Cancel all pending requests and return them; the returned range is
+   * valid until the next call to cancel().
+   */
+  Range<Op**> cancel();
+
   /**
    * Return the number of pending requests.
    */
@@ -168,6 +172,12 @@ class AsyncIO : private boost::noncopyable {
    */
   size_t capacity() const { return capacity_; }
 
+  /**
+   * Return the accumulative number of submitted I/O, since this object
+   * has been created.
+   */
+  size_t totalSubmits() const { return submitted_; }
+
   /**
    * If POLLABLE, return a file descriptor that can be passed to poll / epoll
    * and will become readable when any async IO operations have completed.
@@ -191,16 +201,23 @@ class AsyncIO : private boost::noncopyable {
   void decrementPending();
   void initializeContext();
 
-  Range<Op**> doWait(size_t minRequests, size_t maxRequests);
+  enum class WaitType { COMPLETE, CANCEL };
+  Range<AsyncIO::Op**> doWait(
+      WaitType type,
+      size_t minRequests,
+      size_t maxRequests,
+      std::vector<Op*>& result);
 
-  io_context_t ctx_;
-  std::atomic<bool> ctxSet_;
+  io_context_t ctx_{nullptr};
+  std::atomic<bool> ctxSet_{false};
   std::mutex initMutex_;
 
-  std::atomic<ssize_t> pending_;
-  const ssize_t capacity_;
-  int pollFd_;
+  std::atomic<size_t> pending_{0};
+  std::atomic<size_t> submitted_{0};
+  const size_t capacity_;
+  int pollFd_{-1};
   std::vector<Op*> completed_;
+  std::vector<Op*> canceled_;
 };
 
 /**
@@ -233,6 +250,7 @@ class AsyncIOQueue {
    */
   typedef std::function<AsyncIOOp*()> OpFactory;
   void submit(OpFactory op);
+
  private:
   void onCompleted(AsyncIOOp* op);
   void maybeDequeue();
@@ -243,6 +261,3 @@ class AsyncIOQueue {
 };
 
 }  // namespace folly
-
-#endif /* FOLLY_IO_ASYNCIO_H_ */
-