Fix copyright lines
[folly.git] / folly / experimental / io / test / AsyncIOTest.cpp
index 7025eff5218119da823d6c5d85101f52d43762d4..0d5732bed1bcc12eb9053a3090e56e2fbf65a501 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2014 Facebook, Inc.
+ * Copyright 2013-present Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 #include <folly/experimental/io/AsyncIO.h>
 
+#include <fcntl.h>
 #include <sys/stat.h>
 #include <sys/types.h>
-#include <fcntl.h>
-#include <poll.h>
 
-#include <cstdlib>
 #include <cstdio>
+#include <cstdlib>
 #include <memory>
 #include <random>
 #include <thread>
 #include <vector>
 
 #include <glog/logging.h>
-#include <gtest/gtest.h>
 
-#include <folly/experimental/io/FsUtil.h>
 #include <folly/ScopeGuard.h>
 #include <folly/String.h>
+#include <folly/experimental/io/FsUtil.h>
+#include <folly/portability/GTest.h>
+#include <folly/portability/Sockets.h>
 
 namespace fs = folly::fs;
+
 using folly::AsyncIO;
+using folly::AsyncIOOp;
 using folly::AsyncIOQueue;
 
 namespace {
 
-constexpr size_t kAlign = 4096;  // align reads to 4096 B (for O_DIRECT)
+constexpr size_t kAlign = 4096; // align reads to 4096 B (for O_DIRECT)
 
 struct TestSpec {
   off_t start;
@@ -55,10 +57,10 @@ void waitUntilReadable(int fd) {
 
   int r;
   do {
-    r = poll(&pfd, 1, -1);  // wait forever
+    r = poll(&pfd, 1, -1); // wait forever
   } while (r == -1 && errno == EINTR);
   PCHECK(r == 1);
-  CHECK_EQ(pfd.revents, POLLIN);  // no errors etc
+  CHECK_EQ(pfd.revents, POLLIN); // no errors etc
 }
 
 folly::Range<AsyncIO::Op**> readerWait(AsyncIO* reader) {
@@ -78,14 +80,16 @@ class TemporaryFile {
   explicit TemporaryFile(size_t size);
   ~TemporaryFile();
 
-  const fs::path path() const { return path_; }
+  const fs::path path() const {
+    return path_;
+  }
 
  private:
   fs::path path_;
 };
 
 TemporaryFile::TemporaryFile(size_t size)
-  : path_(fs::temp_directory_path() / fs::unique_path()) {
+    : path_(fs::temp_directory_path() / fs::unique_path()) {
   CHECK_EQ(size % sizeof(uint32_t), 0);
   size /= sizeof(uint32_t);
   const uint32_t seed = 42;
@@ -116,9 +120,9 @@ TemporaryFile::~TemporaryFile() {
   }
 }
 
-TemporaryFile tempFile(6 << 20);  // 6MiB
+TemporaryFile tempFile(6 << 20); // 6MiB
 
-typedef std::unique_ptr<char, void(*)(void*)> ManagedBuffer;
+typedef std::unique_ptr<char, void (*)(void*)> ManagedBuffer;
 ManagedBuffer allocateAligned(size_t size) {
   void* buf;
   int rc = posix_memalign(&buf, kAlign, size);
@@ -126,8 +130,9 @@ ManagedBuffer allocateAligned(size_t size) {
   return ManagedBuffer(reinterpret_cast<char*>(buf), free);
 }
 
-void testReadsSerially(const std::vector<TestSpec>& specs,
-                       AsyncIO::PollMode pollMode) {
+void testReadsSerially(
+    const std::vector<TestSpec>& specs,
+    AsyncIO::PollMode pollMode) {
   AsyncIO aioReader(1, pollMode);
   AsyncIO::Op op;
   int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
@@ -136,10 +141,11 @@ void testReadsSerially(const std::vector<TestSpec>& specs,
     ::close(fd);
   };
 
-  for (int i = 0; i < specs.size(); i++) {
+  for (size_t i = 0; i < specs.size(); i++) {
     auto buf = allocateAligned(specs[i].size);
     op.pread(fd, buf.get(), specs[i].size, specs[i].start);
     aioReader.submit(&op);
+    EXPECT_EQ((i + 1), aioReader.totalSubmits());
     EXPECT_EQ(aioReader.pending(), 1);
     auto ops = readerWait(&aioReader);
     EXPECT_EQ(1, ops.size());
@@ -152,9 +158,10 @@ void testReadsSerially(const std::vector<TestSpec>& specs,
   }
 }
 
-void testReadsParallel(const std::vector<TestSpec>& specs,
-                       AsyncIO::PollMode pollMode,
-                       bool multithreaded) {
+void testReadsParallel(
+    const std::vector<TestSpec>& specs,
+    AsyncIO::PollMode pollMode,
+    bool multithreaded) {
   AsyncIO aioReader(specs.size(), pollMode);
   std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
   std::vector<ManagedBuffer> bufs;
@@ -170,14 +177,14 @@ void testReadsParallel(const std::vector<TestSpec>& specs,
   if (multithreaded) {
     threads.reserve(specs.size());
   }
-  for (int i = 0; i < specs.size(); i++) {
+  for (size_t i = 0; i < specs.size(); i++) {
     bufs.push_back(allocateAligned(specs[i].size));
   }
-  auto submit = [&] (int i) {
+  auto submit = [&](size_t i) {
     ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
     aioReader.submit(&ops[i]);
   };
-  for (int i = 0; i < specs.size(); i++) {
+  for (size_t i = 0; i < specs.size(); i++) {
     if (multithreaded) {
       threads.emplace_back([&submit, i] { submit(i); });
     } else {
@@ -197,7 +204,7 @@ void testReadsParallel(const std::vector<TestSpec>& specs,
     EXPECT_NE(nrRead, 0);
     remaining -= nrRead;
 
-    for (int i = 0; i < nrRead; i++) {
+    for (size_t i = 0; i < nrRead; i++) {
       int id = completed[i] - ops.get();
       EXPECT_GE(id, 0);
       EXPECT_LT(id, specs.size());
@@ -208,14 +215,17 @@ void testReadsParallel(const std::vector<TestSpec>& specs,
       EXPECT_EQ(specs[id].size, res);
     }
   }
+  EXPECT_EQ(specs.size(), aioReader.totalSubmits());
+
   EXPECT_EQ(aioReader.pending(), 0);
-  for (int i = 0; i < pending.size(); i++) {
+  for (size_t i = 0; i < pending.size(); i++) {
     EXPECT_FALSE(pending[i]);
   }
 }
 
-void testReadsQueued(const std::vector<TestSpec>& specs,
-                     AsyncIO::PollMode pollMode) {
+void testReadsQueued(
+    const std::vector<TestSpec>& specs,
+    AsyncIO::PollMode pollMode) {
   size_t readerCapacity = std::max(specs.size() / 2, size_t(1));
   AsyncIO aioReader(readerCapacity, pollMode);
   AsyncIOQueue aioQueue(&aioReader);
@@ -227,7 +237,7 @@ void testReadsQueued(const std::vector<TestSpec>& specs,
   SCOPE_EXIT {
     ::close(fd);
   };
-  for (int i = 0; i < specs.size(); i++) {
+  for (size_t i = 0; i < specs.size(); i++) {
     bufs.push_back(allocateAligned(specs[i].size));
     ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
     aioQueue.submit(&ops[i]);
@@ -248,7 +258,7 @@ void testReadsQueued(const std::vector<TestSpec>& specs,
     EXPECT_NE(nrRead, 0);
     remaining -= nrRead;
 
-    for (int i = 0; i < nrRead; i++) {
+    for (size_t i = 0; i < nrRead; i++) {
       int id = completed[i] - ops.get();
       EXPECT_GE(id, 0);
       EXPECT_LT(id, specs.size());
@@ -259,22 +269,22 @@ void testReadsQueued(const std::vector<TestSpec>& specs,
       EXPECT_EQ(specs[id].size, res);
     }
   }
+  EXPECT_EQ(specs.size(), aioReader.totalSubmits());
   EXPECT_EQ(aioReader.pending(), 0);
   EXPECT_EQ(aioQueue.queued(), 0);
-  for (int i = 0; i < pending.size(); i++) {
+  for (size_t i = 0; i < pending.size(); i++) {
     EXPECT_FALSE(pending[i]);
   }
 }
 
-void testReads(const std::vector<TestSpec>& specs,
-               AsyncIO::PollMode pollMode) {
+void testReads(const std::vector<TestSpec>& specs, AsyncIO::PollMode pollMode) {
   testReadsSerially(specs, pollMode);
   testReadsParallel(specs, pollMode, false);
   testReadsParallel(specs, pollMode, true);
   testReadsQueued(specs, pollMode);
 }
 
-}  // anonymous namespace
+} // namespace
 
 TEST(AsyncIO, ZeroAsyncDataNotPollable) {
   testReads({{0, 0}}, AsyncIO::NOT_POLLABLE);
@@ -296,46 +306,46 @@ TEST(AsyncIO, SingleAsyncDataPollable) {
 
 TEST(AsyncIO, MultipleAsyncDataNotPollable) {
   testReads(
-      {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
+      {{kAlign, 2 * kAlign}, {kAlign, 2 * kAlign}, {kAlign, 4 * kAlign}},
       AsyncIO::NOT_POLLABLE);
   testReads(
-      {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
+      {{kAlign, 2 * kAlign}, {kAlign, 2 * kAlign}, {kAlign, 4 * kAlign}},
       AsyncIO::NOT_POLLABLE);
 
-  testReads({
-    {0, 5*1024*1024},
-    {kAlign, 5*1024*1024}
-  }, AsyncIO::NOT_POLLABLE);
-
-  testReads({
-    {kAlign, 0},
-    {kAlign, kAlign},
-    {kAlign, 2*kAlign},
-    {kAlign, 20*kAlign},
-    {kAlign, 1024*1024},
-  }, AsyncIO::NOT_POLLABLE);
+  testReads(
+      {{0, 5 * 1024 * 1024}, {kAlign, 5 * 1024 * 1024}}, AsyncIO::NOT_POLLABLE);
+
+  testReads(
+      {
+          {kAlign, 0},
+          {kAlign, kAlign},
+          {kAlign, 2 * kAlign},
+          {kAlign, 20 * kAlign},
+          {kAlign, 1024 * 1024},
+      },
+      AsyncIO::NOT_POLLABLE);
 }
 
 TEST(AsyncIO, MultipleAsyncDataPollable) {
   testReads(
-      {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
+      {{kAlign, 2 * kAlign}, {kAlign, 2 * kAlign}, {kAlign, 4 * kAlign}},
       AsyncIO::POLLABLE);
   testReads(
-      {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
+      {{kAlign, 2 * kAlign}, {kAlign, 2 * kAlign}, {kAlign, 4 * kAlign}},
       AsyncIO::POLLABLE);
 
-  testReads({
-    {0, 5*1024*1024},
-    {kAlign, 5*1024*1024}
-  }, AsyncIO::NOT_POLLABLE);
-
-  testReads({
-    {kAlign, 0},
-    {kAlign, kAlign},
-    {kAlign, 2*kAlign},
-    {kAlign, 20*kAlign},
-    {kAlign, 1024*1024},
-  }, AsyncIO::NOT_POLLABLE);
+  testReads(
+      {{0, 5 * 1024 * 1024}, {kAlign, 5 * 1024 * 1024}}, AsyncIO::NOT_POLLABLE);
+
+  testReads(
+      {
+          {kAlign, 0},
+          {kAlign, kAlign},
+          {kAlign, 2 * kAlign},
+          {kAlign, 20 * kAlign},
+          {kAlign, 1024 * 1024},
+      },
+      AsyncIO::NOT_POLLABLE);
 }
 
 TEST(AsyncIO, ManyAsyncDataNotPollable) {
@@ -366,7 +376,7 @@ TEST(AsyncIO, NonBlockingWait) {
   SCOPE_EXIT {
     ::close(fd);
   };
-  size_t size = 2*kAlign;
+  size_t size = 2 * kAlign;
   auto buf = allocateAligned(size);
   op.pread(fd, buf.get(), size, 0);
   aioReader.submit(&op);
@@ -386,4 +396,64 @@ TEST(AsyncIO, NonBlockingWait) {
   EXPECT_EQ(aioReader.pending(), 0);
 }
 
+TEST(AsyncIO, Cancel) {
+  constexpr size_t kNumOpsBatch1 = 10;
+  constexpr size_t kNumOpsBatch2 = 10;
 
+  AsyncIO aioReader(kNumOpsBatch1 + kNumOpsBatch2, AsyncIO::NOT_POLLABLE);
+  int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
+  PCHECK(fd != -1);
+  SCOPE_EXIT {
+    ::close(fd);
+  };
+
+  size_t completed = 0;
+
+  std::vector<std::unique_ptr<AsyncIO::Op>> ops;
+  std::vector<ManagedBuffer> bufs;
+  const auto schedule = [&](size_t n) {
+    for (size_t i = 0; i < n; ++i) {
+      const size_t size = 2 * kAlign;
+      bufs.push_back(allocateAligned(size));
+
+      ops.push_back(std::make_unique<AsyncIO::Op>());
+      auto& op = *ops.back();
+
+      op.setNotificationCallback([&](AsyncIOOp*) { ++completed; });
+      op.pread(fd, bufs.back().get(), size, 0);
+      aioReader.submit(&op);
+    }
+  };
+
+  // Mix completed and canceled operations for this test.
+  // In order to achieve that, schedule in two batches and do partial
+  // wait() after the first one.
+
+  schedule(kNumOpsBatch1);
+  EXPECT_EQ(aioReader.pending(), kNumOpsBatch1);
+  EXPECT_EQ(completed, 0);
+
+  auto result = aioReader.wait(1);
+  EXPECT_GE(result.size(), 1);
+  EXPECT_EQ(completed, result.size());
+  EXPECT_EQ(aioReader.pending(), kNumOpsBatch1 - result.size());
+
+  schedule(kNumOpsBatch2);
+  EXPECT_EQ(aioReader.pending(), ops.size() - result.size());
+  EXPECT_EQ(completed, result.size());
+
+  auto canceled = aioReader.cancel();
+  EXPECT_EQ(canceled.size(), ops.size() - result.size());
+  EXPECT_EQ(aioReader.pending(), 0);
+  EXPECT_EQ(completed, result.size());
+
+  size_t foundCompleted = 0;
+  for (auto& op : ops) {
+    if (op->state() == AsyncIOOp::State::COMPLETED) {
+      ++foundCompleted;
+    } else {
+      EXPECT_TRUE(op->state() == AsyncIOOp::State::CANCELED) << *op;
+    }
+  }
+  EXPECT_EQ(foundCompleted, completed);
+}