AsyncIO in folly
authorTudor Bosman <tudorb@fb.com>
Tue, 5 Feb 2013 02:38:14 +0000 (18:38 -0800)
committerJordan DeLong <jdelong@fb.com>
Tue, 19 Mar 2013 00:05:03 +0000 (17:05 -0700)
Summary:
Interface extended and cleaned up.  Also, now
actually allows you to retrieve IO errors.  Also moved some useful functions
out of Subprocess.cpp into a separate header file.

Test Plan: async_io_test, subprocess_test

Reviewed By: philipp@fb.com

FB internal diff: D698412

folly/Exception.h [new file with mode: 0644]
folly/Subprocess.cpp
folly/experimental/io/AsyncIO.cpp [new file with mode: 0644]
folly/experimental/io/AsyncIO.h [new file with mode: 0644]
folly/experimental/io/test/AsyncIOTest.cpp [new file with mode: 0644]

diff --git a/folly/Exception.h b/folly/Exception.h
new file mode 100644 (file)
index 0000000..5820f68
--- /dev/null
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2013 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FOLLY_EXCEPTION_H_
+#define FOLLY_EXCEPTION_H_
+
+#include <errno.h>
+
+#include <stdexcept>
+#include <system_error>
+
+#include "folly/Likely.h"
+
+namespace folly {
+
+// Helper to throw std::system_error
+void throwSystemError(int err, const char* msg) __attribute__((noreturn));
+inline void throwSystemError(int err, const char* msg) {
+  throw std::system_error(err, std::system_category(), msg);
+}
+
+// Helper to throw std::system_error from errno
+void throwSystemError(const char* msg) __attribute__((noreturn));
+inline void throwSystemError(const char* msg) {
+  throwSystemError(errno, msg);
+}
+
+// Check a Posix return code (0 on success, error number on error), throw
+// on error.
+inline void checkPosixError(int err, const char* msg) {
+  if (UNLIKELY(err != 0)) {
+    throwSystemError(err, msg);
+  }
+}
+
+// Check a Linux kernel-style return code (>= 0 on success, negative error
+// number on error), throw on error.
+inline void checkKernelError(ssize_t ret, const char* msg) {
+  if (UNLIKELY(ret < 0)) {
+    throwSystemError(-ret, msg);
+  }
+}
+
+// Check a traditional Uinx return code (-1 and sets errno on error), throw
+// on error.
+inline void checkUnixError(ssize_t ret, const char* msg) {
+  if (UNLIKELY(ret == -1)) {
+    throwSystemError(msg);
+  }
+}
+inline void checkUnixError(ssize_t ret, int savedErrno, const char* msg) {
+  if (UNLIKELY(ret == -1)) {
+    throwSystemError(savedErrno, msg);
+  }
+}
+
+}  // namespace folly
+
+#endif /* FOLLY_EXCEPTION_H_ */
+
index 1470a7066384bb66c1f81732fcce7c7385e2e0aa..e2d5f7bd818a9ac55b12ee14e68b0c03408c7385 100644 (file)
@@ -31,6 +31,7 @@
 #include <glog/logging.h>
 
 #include "folly/Conv.h"
+#include "folly/Exception.h"
 #include "folly/ScopeGuard.h"
 #include "folly/String.h"
 #include "folly/io/Cursor.h"
@@ -101,39 +102,6 @@ std::unique_ptr<const char*[]> cloneStrings(const std::vector<std::string>& s) {
   return d;
 }
 
-// Helper to throw std::system_error
-void throwSystemError(int err, const char* msg) __attribute__((noreturn));
-void throwSystemError(int err, const char* msg) {
-  throw std::system_error(err, std::system_category(), msg);
-}
-
-// Helper to throw std::system_error from errno
-void throwSystemError(const char* msg) __attribute__((noreturn));
-void throwSystemError(const char* msg) {
-  throwSystemError(errno, msg);
-}
-
-// Check a Posix return code (0 on success, error number on error), throw
-// on error.
-void checkPosixError(int err, const char* msg) {
-  if (err != 0) {
-    throwSystemError(err, msg);
-  }
-}
-
-// Check a traditional Uinx return code (-1 and sets errno on error), throw
-// on error.
-void checkUnixError(ssize_t ret, const char* msg) {
-  if (ret == -1) {
-    throwSystemError(msg);
-  }
-}
-void checkUnixError(ssize_t ret, int savedErrno, const char* msg) {
-  if (ret == -1) {
-    throwSystemError(savedErrno, msg);
-  }
-}
-
 // Check a wait() status, throw on non-successful
 void checkStatus(ProcessReturnCode returnCode) {
   if (returnCode.state() != ProcessReturnCode::EXITED ||
diff --git a/folly/experimental/io/AsyncIO.cpp b/folly/experimental/io/AsyncIO.cpp
new file mode 100644 (file)
index 0000000..2080b72
--- /dev/null
@@ -0,0 +1,239 @@
+/*
+ * Copyright 2013 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "folly/experimental/io/AsyncIO.h"
+
+#include <cerrno>
+
+#include <glog/logging.h>
+
+#include "folly/Exception.h"
+#include "folly/Likely.h"
+#include "folly/String.h"
+#include "folly/eventfd.h"
+
+namespace folly {
+
+AsyncIO::AsyncIO(size_t capacity, PollMode pollMode)
+  : ctx_(0),
+    pending_(0),
+    capacity_(capacity),
+    pollFd_(-1) {
+  if (UNLIKELY(capacity_ == 0)) {
+    throw std::out_of_range("AsyncIO: capacity must not be 0");
+  }
+  completed_.reserve(capacity_);
+  if (pollMode == POLLABLE) {
+    pollFd_ = eventfd(0, EFD_NONBLOCK);
+    checkUnixError(pollFd_, "AsyncIO: eventfd creation failed");
+  }
+}
+
+AsyncIO::~AsyncIO() {
+  CHECK_EQ(pending_, 0);
+  if (ctx_) {
+    int rc = io_queue_release(ctx_);
+    CHECK_EQ(rc, 0) << "io_queue_release: " << errnoStr(-rc);
+  }
+  if (pollFd_ != -1) {
+    CHECK_ERR(close(pollFd_));
+  }
+}
+
+void AsyncIO::pread(Op* op, int fd, void* buf, size_t size, off_t start) {
+  iocb cb;
+  io_prep_pread(&cb, fd, buf, size, start);
+  submit(op, &cb);
+}
+
+void AsyncIO::pread(Op* op, int fd, Range<unsigned char*> range,
+                    off_t start) {
+  pread(op, fd, range.begin(), range.size(), start);
+}
+
+void AsyncIO::preadv(Op* op, int fd, const iovec* iov, int iovcnt,
+                     off_t start) {
+  iocb cb;
+  io_prep_preadv(&cb, fd, iov, iovcnt, start);
+  submit(op, &cb);
+}
+
+void AsyncIO::pwrite(Op* op, int fd, const void* buf, size_t size,
+                     off_t start) {
+  iocb cb;
+  io_prep_pwrite(&cb, fd, const_cast<void*>(buf), size, start);
+  submit(op, &cb);
+}
+
+void AsyncIO::pwrite(Op* op, int fd, Range<const unsigned char*> range,
+                     off_t start) {
+  pwrite(op, fd, range.begin(), range.size(), start);
+}
+
+void AsyncIO::pwritev(Op* op, int fd, const iovec* iov, int iovcnt,
+                      off_t start) {
+  iocb cb;
+  io_prep_pwritev(&cb, fd, iov, iovcnt, start);
+  submit(op, &cb);
+}
+
+void AsyncIO::initializeContext() {
+  if (!ctx_) {
+    int rc = io_queue_init(capacity_, &ctx_);
+    // returns negative errno
+    checkKernelError(rc, "AsyncIO: io_queue_init failed");
+    DCHECK(ctx_);
+  }
+}
+
+void AsyncIO::submit(Op* op, iocb* cb) {
+  if (UNLIKELY(pending_ >= capacity_)) {
+    throw std::out_of_range("AsyncIO: too many pending requests");
+  }
+  if (UNLIKELY(op->state() != Op::UNINITIALIZED)) {
+    throw std::logic_error("AsyncIO: Invalid Op state in submit");
+  }
+  initializeContext();  // on demand
+  cb->data = op;
+  if (pollFd_ != -1) {
+    io_set_eventfd(cb, pollFd_);
+  }
+  int rc = io_submit(ctx_, 1, &cb);
+  checkKernelError(rc, "AsyncIO: io_submit failed");
+  DCHECK_EQ(rc, 1);
+  op->start();
+  ++pending_;
+}
+
+Range<AsyncIO::Op**> AsyncIO::wait(size_t minRequests) {
+  if (UNLIKELY(!ctx_)) {
+    throw std::logic_error("AsyncIO: wait called with no requests");
+  }
+  if (UNLIKELY(pollFd_ != -1)) {
+    throw std::logic_error("AsyncIO: wait not allowed on pollable object");
+  }
+  return doWait(minRequests, pending_);
+}
+
+Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
+  if (UNLIKELY(!ctx_)) {
+    throw std::logic_error("AsyncIO: pollCompleted called with no requests");
+  }
+  if (UNLIKELY(pollFd_ == -1)) {
+    throw std::logic_error(
+        "AsyncIO: pollCompleted not allowed on non-pollable object");
+  }
+  uint64_t numEvents;
+  // This sets the eventFd counter to 0, see
+  // http://www.kernel.org/doc/man-pages/online/pages/man2/eventfd.2.html
+  ssize_t rc;
+  do {
+    rc = ::read(pollFd_, &numEvents, 8);
+  } while (rc == -1 && errno == EINTR);
+  if (UNLIKELY(rc == -1 && errno == EAGAIN)) {
+    return Range<Op**>();  // nothing completed
+  }
+  checkUnixError(rc, "AsyncIO: read from event fd failed");
+  DCHECK_EQ(rc, 8);
+
+  DCHECK_GT(numEvents, 0);
+  DCHECK_LE(numEvents, pending_);
+
+  // Don't reap more than numEvents, as we've just reset the counter to 0.
+  return doWait(numEvents, numEvents);
+}
+
+Range<AsyncIO::Op**> AsyncIO::doWait(size_t minRequests, size_t maxRequests) {
+  io_event events[pending_];
+  int count;
+  do {
+    // Wait forever
+    count = io_getevents(ctx_, minRequests, maxRequests, events, nullptr);
+  } while (count == -EINTR);
+  checkKernelError(count, "AsyncIO: io_getevents failed");
+  DCHECK_GE(count, minRequests);  // the man page says so
+  DCHECK_LE(count, pending_);
+
+  completed_.clear();
+  if (count == 0) {
+    return folly::Range<Op**>();
+  }
+
+  for (size_t i = 0; i < count; ++i) {
+    Op* op = static_cast<Op*>(events[i].data);
+    DCHECK(op);
+    op->complete(events[i].res);
+    completed_.push_back(op);
+  }
+  pending_ -= count;
+
+  return folly::Range<Op**>(&completed_.front(), count);
+}
+
+AsyncIO::Op::Op()
+  : state_(UNINITIALIZED),
+    result_(-EINVAL) {
+}
+
+void AsyncIO::Op::reset() {
+  if (UNLIKELY(state_ == PENDING)) {
+    throw std::logic_error("AsyncIO: invalid state for reset");
+  }
+  state_ = UNINITIALIZED;
+  result_ = -EINVAL;
+}
+
+AsyncIO::Op::~Op() {
+  CHECK_NE(state_, PENDING);
+}
+
+void AsyncIO::Op::start() {
+  DCHECK_EQ(state_, UNINITIALIZED);
+  state_ = PENDING;
+}
+
+void AsyncIO::Op::complete(ssize_t result) {
+  DCHECK_EQ(state_, PENDING);
+  state_ = COMPLETED;
+  result_ = result;
+  onCompleted();
+}
+
+void AsyncIO::Op::onCompleted() { }  // default: do nothing
+
+ssize_t AsyncIO::Op::result() const {
+  if (UNLIKELY(state_ != COMPLETED)) {
+    throw std::logic_error("AsyncIO: Invalid Op state in result");
+  }
+  return result_;
+}
+
+CallbackOp::CallbackOp(Callback&& callback) : callback_(std::move(callback)) { }
+
+CallbackOp::~CallbackOp() { }
+
+CallbackOp* CallbackOp::make(Callback&& callback) {
+  // Ensure created on the heap
+  return new CallbackOp(std::move(callback));
+}
+
+void CallbackOp::onCompleted() {
+  callback_(result());
+  delete this;
+}
+
+}  // namespace folly
+
diff --git a/folly/experimental/io/AsyncIO.h b/folly/experimental/io/AsyncIO.h
new file mode 100644 (file)
index 0000000..81ed94d
--- /dev/null
@@ -0,0 +1,195 @@
+/*
+ * Copyright 2013 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FOLLY_IO_ASYNCIO_H_
+#define FOLLY_IO_ASYNCIO_H_
+
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <libaio.h>
+
+#include <cstdint>
+#include <functional>
+#include <utility>
+#include <vector>
+
+#include <boost/noncopyable.hpp>
+
+#include "folly/Portability.h"
+#include "folly/Range.h"
+
+namespace folly {
+
+/**
+ * C++ interface around Linux Async IO.
+ */
+class AsyncIO : private boost::noncopyable {
+ public:
+  enum PollMode {
+    NOT_POLLABLE,
+    POLLABLE
+  };
+
+  /**
+   * Create an AsyncIO context capacble of holding at most 'capacity' pending
+   * requests at the same time.  As requests complete, others can be scheduled,
+   * as long as this limit is not exceeded.
+   *
+   * Note: the maximum number of allowed concurrent requests is controlled
+   * by the fs.aio-max-nr sysctl, the default value is usually 64K.
+   *
+   * If pollMode is POLLABLE, pollFd() will return a file descriptor that
+   * can be passed to poll / epoll / select and will become readable when
+   * any IOs on this AioReader have completed.  If you do this, you must use
+   * pollCompleted() instead of wait() -- do not read from the pollFd()
+   * file descriptor directly.
+   */
+  explicit AsyncIO(size_t capacity, PollMode pollMode=NOT_POLLABLE);
+  ~AsyncIO();
+
+  /**
+   * An Op represents a pending operation.  You may inherit from Op (and
+   * override onCompleted) in order to be notified of completion (see
+   * CallbackOp below for an example), or you may use Op's methods directly.
+   *
+   * The Op must remain allocated until completion.
+   */
+  class Op : private boost::noncopyable {
+    friend class AsyncIO;
+   public:
+    Op();
+    virtual ~Op();
+
+    // There would be a cancel() method here if Linux AIO actually implemented
+    // it.  But let's not get your hopes up.
+
+    enum State {
+      UNINITIALIZED,
+      PENDING,
+      COMPLETED
+    };
+
+    /**
+     * Return the current operation state.
+     */
+    State state() const { return state_; }
+
+    /**
+     * Reset the operation for reuse.  It is an error to call reset() on
+     * an Op that is still pending.
+     */
+    void reset();
+
+    /**
+     * Retrieve the result of this operation.  Returns >=0 on success,
+     * -errno on failure (that is, using the Linux kernel error reporting
+     * conventions).  Use checkKernelError (folly/Exception.h) on the result to
+     * throw a std::system_error in case of error instead.
+     *
+     * It is an error to call this if the Op hasn't yet started or is still
+     * pending.
+     */
+    ssize_t result() const;
+
+   private:
+    void start();
+    void complete(ssize_t result);
+
+    virtual void onCompleted();
+
+    State state_;
+    ssize_t result_;
+  };
+
+  /**
+   * Initiate a read request.
+   */
+  void pread(Op* op, int fd, void* buf, size_t size, off_t start);
+  void pread(Op* op, int fd, Range<unsigned char*> range, off_t start);
+  void preadv(Op* op, int fd, const iovec* iov, int iovcnt, off_t start);
+
+  /**
+   * Initiate a write request.
+   */
+  void pwrite(Op* op, int fd, const void* buf, size_t size, off_t start);
+  void pwrite(Op* op, int fd, Range<const unsigned char*> range, off_t start);
+  void pwritev(Op* op, int fd, const iovec* iov, int iovcnt, off_t start);
+
+  /**
+   * Wait for at least minRequests to complete.  Returns the requests that
+   * have completed; the returned range is valid until the next call to
+   * wait().  minRequests may be 0 to not block.
+   */
+  Range<Op**> wait(size_t minRequests);
+
+  /**
+   * Return the number of pending requests.
+   */
+  size_t pending() const { return pending_; }
+
+  /**
+   * Return the maximum number of requests that can be kept outstanding
+   * at any one time.
+   */
+  size_t capacity() const { return capacity_; }
+
+  /**
+   * If POLLABLE, return a file descriptor that can be passed to poll / epoll
+   * and will become readable when any async IO operations have completed.
+   * If NOT_POLLABLE, return -1.
+   */
+  int pollFd() const { return pollFd_; }
+
+  /**
+   * If POLLABLE, call instead of wait after the file descriptor returned
+   * by pollFd() became readable.  The returned range is valid until the next
+   * call to pollCompleted().
+   */
+  Range<Op**> pollCompleted();
+
+ private:
+  void initializeContext();
+  void submit(Op* op, iocb* cb);
+  Range<Op**> doWait(size_t minRequests, size_t maxRequests);
+
+  io_context_t ctx_;
+  size_t pending_;
+  size_t capacity_;
+  int pollFd_;
+  std::vector<Op*> completed_;
+};
+
+/**
+ * Implementation of AsyncIO::Op that calls a callback and then deletes
+ * itself.
+ */
+class CallbackOp : public AsyncIO::Op {
+ public:
+  typedef std::function<void(ssize_t)> Callback;
+  static CallbackOp* make(Callback&& callback);
+
+ private:
+  explicit CallbackOp(Callback&& callback);
+  ~CallbackOp();
+  void onCompleted() FOLLY_OVERRIDE;
+
+  Callback callback_;
+};
+
+}  // namespace folly
+
+#endif /* FOLLY_IO_ASYNCIO_H_ */
+
diff --git a/folly/experimental/io/test/AsyncIOTest.cpp b/folly/experimental/io/test/AsyncIOTest.cpp
new file mode 100644 (file)
index 0000000..77595c6
--- /dev/null
@@ -0,0 +1,295 @@
+/*
+ * Copyright 2013 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "folly/experimental/io/AsyncIO.h"
+
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <fcntl.h>
+#include <poll.h>
+
+#include <cstdlib>
+#include <cstdio>
+#include <memory>
+#include <random>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "folly/experimental/io/FsUtil.h"
+#include "folly/ScopeGuard.h"
+#include "folly/String.h"
+
+namespace fs = folly::fs;
+using folly::AsyncIO;
+
+namespace {
+
+constexpr size_t kAlignment = 512;  // align reads to 512 B (for O_DIRECT)
+
+struct TestSpec {
+  off_t start;
+  size_t size;
+};
+
+void waitUntilReadable(int fd) {
+  pollfd pfd;
+  pfd.fd = fd;
+  pfd.events = POLLIN;
+
+  int r;
+  do {
+    r = poll(&pfd, 1, -1);  // wait forever
+  } while (r == -1 && errno == EINTR);
+  PCHECK(r == 1);
+  CHECK_EQ(pfd.revents, POLLIN);  // no errors etc
+}
+
+folly::Range<AsyncIO::Op**> readerWait(AsyncIO* reader) {
+  int fd = reader->pollFd();
+  if (fd == -1) {
+    return reader->wait(1);
+  } else {
+    waitUntilReadable(fd);
+    return reader->pollCompleted();
+  }
+}
+
+// Temporary file that is NOT kept open but is deleted on exit.
+// Generate random-looking but reproduceable data.
+class TemporaryFile {
+ public:
+  explicit TemporaryFile(size_t size);
+  ~TemporaryFile();
+
+  const fs::path path() const { return path_; }
+
+ private:
+  fs::path path_;
+};
+
+TemporaryFile::TemporaryFile(size_t size)
+  : path_(fs::temp_directory_path() / fs::unique_path()) {
+  CHECK_EQ(size % sizeof(uint32_t), 0);
+  size /= sizeof(uint32_t);
+  const uint32_t seed = 42;
+  std::mt19937 rnd(seed);
+
+  const size_t bufferSize = 1U << 16;
+  uint32_t buffer[bufferSize];
+
+  FILE* fp = ::fopen(path_.c_str(), "wb");
+  PCHECK(fp != nullptr);
+  while (size) {
+    size_t n = std::min(size, bufferSize);
+    for (size_t i = 0; i < n; ++i) {
+      buffer[i] = rnd();
+    }
+    size_t written = ::fwrite(buffer, sizeof(uint32_t), n, fp);
+    PCHECK(written == n);
+    size -= written;
+  }
+  PCHECK(::fclose(fp) == 0);
+}
+
+TemporaryFile::~TemporaryFile() {
+  try {
+    fs::remove(path_);
+  } catch (const fs::filesystem_error& e) {
+    LOG(ERROR) << "fs::remove: " << folly::exceptionStr(e);
+  }
+}
+
+TemporaryFile thisBinary(6 << 20);  // 6MiB
+
+void testReadsSerially(const std::vector<TestSpec>& specs,
+                       AsyncIO::PollMode pollMode) {
+  AsyncIO aioReader(1, pollMode);
+  AsyncIO::Op op;
+  int fd = ::open(thisBinary.path().c_str(), O_DIRECT | O_RDONLY);
+  PCHECK(fd != -1);
+  SCOPE_EXIT {
+    ::close(fd);
+  };
+
+  for (int i = 0; i < specs.size(); i++) {
+    std::unique_ptr<char[]> buf(new char[specs[i].size]);
+    aioReader.pread(&op, fd, buf.get(), specs[i].size, specs[i].start);
+    EXPECT_EQ(aioReader.pending(), 1);
+    auto ops = readerWait(&aioReader);
+    EXPECT_EQ(1, ops.size());
+    EXPECT_TRUE(ops[0] == &op);
+    EXPECT_EQ(aioReader.pending(), 0);
+    ssize_t res = op.result();
+    EXPECT_LE(0, res) << folly::errnoStr(-res);
+    EXPECT_EQ(specs[i].size, res);
+    op.reset();
+  }
+}
+
+void testReadsParallel(const std::vector<TestSpec>& specs,
+                       AsyncIO::PollMode pollMode) {
+  AsyncIO aioReader(specs.size(), pollMode);
+  std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
+  std::vector<std::unique_ptr<char[]>> bufs(specs.size());
+
+  int fd = ::open(thisBinary.path().c_str(), O_DIRECT | O_RDONLY);
+  PCHECK(fd != -1);
+  SCOPE_EXIT {
+    ::close(fd);
+  };
+  for (int i = 0; i < specs.size(); i++) {
+    bufs[i].reset(new char[specs[i].size]);
+    aioReader.pread(&ops[i], fd, bufs[i].get(), specs[i].size,
+                    specs[i].start);
+  }
+  std::vector<bool> pending(specs.size(), true);
+
+  size_t remaining = specs.size();
+  while (remaining != 0) {
+    EXPECT_EQ(remaining, aioReader.pending());
+    auto completed = readerWait(&aioReader);
+    size_t nrRead = completed.size();
+    EXPECT_NE(nrRead, 0);
+    remaining -= nrRead;
+
+    for (int i = 0; i < nrRead; i++) {
+      int id = completed[i] - ops.get();
+      EXPECT_GE(id, 0);
+      EXPECT_LT(id, specs.size());
+      EXPECT_TRUE(pending[id]);
+      pending[id] = false;
+      ssize_t res = ops[id].result();
+      EXPECT_LE(0, res) << folly::errnoStr(-res);
+      EXPECT_EQ(specs[id].size, res);
+    }
+  }
+  EXPECT_EQ(aioReader.pending(), 0);
+  for (int i = 0; i < pending.size(); i++) {
+    EXPECT_FALSE(pending[i]);
+  }
+}
+
+void testReads(const std::vector<TestSpec>& specs,
+               AsyncIO::PollMode pollMode) {
+  testReadsSerially(specs, pollMode);
+  testReadsParallel(specs, pollMode);
+}
+
+}  // anonymous namespace
+
+TEST(AsyncIO, ZeroAsyncDataNotPollable) {
+  testReads({{0, 0}}, AsyncIO::NOT_POLLABLE);
+}
+
+TEST(AsyncIO, ZeroAsyncDataPollable) {
+  testReads({{0, 0}}, AsyncIO::POLLABLE);
+}
+
+TEST(AsyncIO, SingleAsyncDataNotPollable) {
+  testReads({{0, 512}}, AsyncIO::NOT_POLLABLE);
+  testReads({{0, 512}}, AsyncIO::NOT_POLLABLE);
+}
+
+TEST(AsyncIO, SingleAsyncDataPollable) {
+  testReads({{0, 512}}, AsyncIO::POLLABLE);
+  testReads({{0, 512}}, AsyncIO::POLLABLE);
+}
+
+TEST(AsyncIO, MultipleAsyncDataNotPollable) {
+  testReads({{512, 1024}, {512, 1024}, {512, 2048}}, AsyncIO::NOT_POLLABLE);
+  testReads({{512, 1024}, {512, 1024}, {512, 2048}}, AsyncIO::NOT_POLLABLE);
+
+  testReads({
+    {0, 5*1024*1024},
+    {512, 5*1024*1024},
+  }, AsyncIO::NOT_POLLABLE);
+
+  testReads({
+    {512, 0},
+    {512, 512},
+    {512, 1024},
+    {512, 10*1024},
+    {512, 1024*1024},
+  }, AsyncIO::NOT_POLLABLE);
+}
+
+TEST(AsyncIO, MultipleAsyncDataPollable) {
+  testReads({{512, 1024}, {512, 1024}, {512, 2048}}, AsyncIO::POLLABLE);
+  testReads({{512, 1024}, {512, 1024}, {512, 2048}}, AsyncIO::POLLABLE);
+
+  testReads({
+    {0, 5*1024*1024},
+    {512, 5*1024*1024},
+  }, AsyncIO::POLLABLE);
+
+  testReads({
+    {512, 0},
+    {512, 512},
+    {512, 1024},
+    {512, 10*1024},
+    {512, 1024*1024},
+  }, AsyncIO::POLLABLE);
+}
+
+TEST(AsyncIO, ManyAsyncDataNotPollable) {
+  {
+    std::vector<TestSpec> v;
+    for (int i = 0; i < 1000; i++) {
+      v.push_back({512 * i, 512});
+    }
+    testReads(v, AsyncIO::NOT_POLLABLE);
+  }
+}
+
+TEST(AsyncIO, ManyAsyncDataPollable) {
+  {
+    std::vector<TestSpec> v;
+    for (int i = 0; i < 1000; i++) {
+      v.push_back({512 * i, 512});
+    }
+    testReads(v, AsyncIO::POLLABLE);
+  }
+}
+
+TEST(AsyncIO, NonBlockingWait) {
+  AsyncIO aioReader(1, AsyncIO::NOT_POLLABLE);
+  AsyncIO::Op op;
+  int fd = ::open(thisBinary.path().c_str(), O_DIRECT | O_RDONLY);
+  PCHECK(fd != -1);
+  SCOPE_EXIT {
+    ::close(fd);
+  };
+  size_t size = 1024;
+  std::unique_ptr<char[]> buf(new char[size]);
+  aioReader.pread(&op, fd, buf.get(), size, 0);
+  EXPECT_EQ(aioReader.pending(), 1);
+
+  folly::Range<AsyncIO::Op**> completed;
+  while (completed.empty()) {
+    // poll without blocking until the read request completes.
+    completed = aioReader.wait(0);
+  }
+  EXPECT_EQ(completed.size(), 1);
+
+  EXPECT_TRUE(completed[0] == &op);
+  ssize_t res = op.result();
+  EXPECT_LE(0, res) << folly::errnoStr(-res);
+  EXPECT_EQ(size, res);
+  EXPECT_EQ(aioReader.pending(), 0);
+}
+