/*
- * Copyright 2014 Facebook, Inc.
+ * Copyright 2013-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
#include <folly/experimental/io/AsyncIO.h>
+#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
-#include <fcntl.h>
-#include <poll.h>
-#include <cstdlib>
#include <cstdio>
+#include <cstdlib>
#include <memory>
#include <random>
#include <thread>
#include <vector>
#include <glog/logging.h>
-#include <gtest/gtest.h>
-#include <folly/experimental/io/FsUtil.h>
#include <folly/ScopeGuard.h>
#include <folly/String.h>
+#include <folly/experimental/io/FsUtil.h>
+#include <folly/portability/GTest.h>
+#include <folly/portability/Sockets.h>
namespace fs = folly::fs;
+
using folly::AsyncIO;
+using folly::AsyncIOOp;
using folly::AsyncIOQueue;
namespace {
-constexpr size_t kAlign = 4096; // align reads to 4096 B (for O_DIRECT)
+constexpr size_t kAlign = 4096; // align reads to 4096 B (for O_DIRECT)
struct TestSpec {
off_t start;
int r;
do {
- r = poll(&pfd, 1, -1); // wait forever
+ r = poll(&pfd, 1, -1); // wait forever
} while (r == -1 && errno == EINTR);
PCHECK(r == 1);
- CHECK_EQ(pfd.revents, POLLIN); // no errors etc
+ CHECK_EQ(pfd.revents, POLLIN); // no errors etc
}
folly::Range<AsyncIO::Op**> readerWait(AsyncIO* reader) {
explicit TemporaryFile(size_t size);
~TemporaryFile();
- const fs::path path() const { return path_; }
+ const fs::path path() const {
+ return path_;
+ }
private:
fs::path path_;
};
TemporaryFile::TemporaryFile(size_t size)
- : path_(fs::temp_directory_path() / fs::unique_path()) {
+ : path_(fs::temp_directory_path() / fs::unique_path()) {
CHECK_EQ(size % sizeof(uint32_t), 0);
size /= sizeof(uint32_t);
const uint32_t seed = 42;
}
}
-TemporaryFile tempFile(6 << 20); // 6MiB
+TemporaryFile tempFile(6 << 20); // 6MiB
-typedef std::unique_ptr<char, void(*)(void*)> ManagedBuffer;
+typedef std::unique_ptr<char, void (*)(void*)> ManagedBuffer;
ManagedBuffer allocateAligned(size_t size) {
void* buf;
int rc = posix_memalign(&buf, kAlign, size);
return ManagedBuffer(reinterpret_cast<char*>(buf), free);
}
-void testReadsSerially(const std::vector<TestSpec>& specs,
- AsyncIO::PollMode pollMode) {
+void testReadsSerially(
+ const std::vector<TestSpec>& specs,
+ AsyncIO::PollMode pollMode) {
AsyncIO aioReader(1, pollMode);
AsyncIO::Op op;
int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
::close(fd);
};
- for (int i = 0; i < specs.size(); i++) {
+ for (size_t i = 0; i < specs.size(); i++) {
auto buf = allocateAligned(specs[i].size);
op.pread(fd, buf.get(), specs[i].size, specs[i].start);
aioReader.submit(&op);
+ EXPECT_EQ((i + 1), aioReader.totalSubmits());
EXPECT_EQ(aioReader.pending(), 1);
auto ops = readerWait(&aioReader);
EXPECT_EQ(1, ops.size());
}
}
-void testReadsParallel(const std::vector<TestSpec>& specs,
- AsyncIO::PollMode pollMode,
- bool multithreaded) {
+void testReadsParallel(
+ const std::vector<TestSpec>& specs,
+ AsyncIO::PollMode pollMode,
+ bool multithreaded) {
AsyncIO aioReader(specs.size(), pollMode);
std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
std::vector<ManagedBuffer> bufs;
if (multithreaded) {
threads.reserve(specs.size());
}
- for (int i = 0; i < specs.size(); i++) {
+ for (size_t i = 0; i < specs.size(); i++) {
bufs.push_back(allocateAligned(specs[i].size));
}
- auto submit = [&] (int i) {
+ auto submit = [&](size_t i) {
ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
aioReader.submit(&ops[i]);
};
- for (int i = 0; i < specs.size(); i++) {
+ for (size_t i = 0; i < specs.size(); i++) {
if (multithreaded) {
threads.emplace_back([&submit, i] { submit(i); });
} else {
EXPECT_NE(nrRead, 0);
remaining -= nrRead;
- for (int i = 0; i < nrRead; i++) {
+ for (size_t i = 0; i < nrRead; i++) {
int id = completed[i] - ops.get();
EXPECT_GE(id, 0);
EXPECT_LT(id, specs.size());
EXPECT_EQ(specs[id].size, res);
}
}
+ EXPECT_EQ(specs.size(), aioReader.totalSubmits());
+
EXPECT_EQ(aioReader.pending(), 0);
- for (int i = 0; i < pending.size(); i++) {
+ for (size_t i = 0; i < pending.size(); i++) {
EXPECT_FALSE(pending[i]);
}
}
-void testReadsQueued(const std::vector<TestSpec>& specs,
- AsyncIO::PollMode pollMode) {
+void testReadsQueued(
+ const std::vector<TestSpec>& specs,
+ AsyncIO::PollMode pollMode) {
size_t readerCapacity = std::max(specs.size() / 2, size_t(1));
AsyncIO aioReader(readerCapacity, pollMode);
AsyncIOQueue aioQueue(&aioReader);
SCOPE_EXIT {
::close(fd);
};
- for (int i = 0; i < specs.size(); i++) {
+ for (size_t i = 0; i < specs.size(); i++) {
bufs.push_back(allocateAligned(specs[i].size));
ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
aioQueue.submit(&ops[i]);
EXPECT_NE(nrRead, 0);
remaining -= nrRead;
- for (int i = 0; i < nrRead; i++) {
+ for (size_t i = 0; i < nrRead; i++) {
int id = completed[i] - ops.get();
EXPECT_GE(id, 0);
EXPECT_LT(id, specs.size());
EXPECT_EQ(specs[id].size, res);
}
}
+ EXPECT_EQ(specs.size(), aioReader.totalSubmits());
EXPECT_EQ(aioReader.pending(), 0);
EXPECT_EQ(aioQueue.queued(), 0);
- for (int i = 0; i < pending.size(); i++) {
+ for (size_t i = 0; i < pending.size(); i++) {
EXPECT_FALSE(pending[i]);
}
}
-void testReads(const std::vector<TestSpec>& specs,
- AsyncIO::PollMode pollMode) {
+void testReads(const std::vector<TestSpec>& specs, AsyncIO::PollMode pollMode) {
testReadsSerially(specs, pollMode);
testReadsParallel(specs, pollMode, false);
testReadsParallel(specs, pollMode, true);
testReadsQueued(specs, pollMode);
}
-} // anonymous namespace
+} // namespace
TEST(AsyncIO, ZeroAsyncDataNotPollable) {
testReads({{0, 0}}, AsyncIO::NOT_POLLABLE);
TEST(AsyncIO, MultipleAsyncDataNotPollable) {
testReads(
- {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
+ {{kAlign, 2 * kAlign}, {kAlign, 2 * kAlign}, {kAlign, 4 * kAlign}},
AsyncIO::NOT_POLLABLE);
testReads(
- {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
+ {{kAlign, 2 * kAlign}, {kAlign, 2 * kAlign}, {kAlign, 4 * kAlign}},
AsyncIO::NOT_POLLABLE);
- testReads({
- {0, 5*1024*1024},
- {kAlign, 5*1024*1024}
- }, AsyncIO::NOT_POLLABLE);
-
- testReads({
- {kAlign, 0},
- {kAlign, kAlign},
- {kAlign, 2*kAlign},
- {kAlign, 20*kAlign},
- {kAlign, 1024*1024},
- }, AsyncIO::NOT_POLLABLE);
+ testReads(
+ {{0, 5 * 1024 * 1024}, {kAlign, 5 * 1024 * 1024}}, AsyncIO::NOT_POLLABLE);
+
+ testReads(
+ {
+ {kAlign, 0},
+ {kAlign, kAlign},
+ {kAlign, 2 * kAlign},
+ {kAlign, 20 * kAlign},
+ {kAlign, 1024 * 1024},
+ },
+ AsyncIO::NOT_POLLABLE);
}
TEST(AsyncIO, MultipleAsyncDataPollable) {
testReads(
- {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
+ {{kAlign, 2 * kAlign}, {kAlign, 2 * kAlign}, {kAlign, 4 * kAlign}},
AsyncIO::POLLABLE);
testReads(
- {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
+ {{kAlign, 2 * kAlign}, {kAlign, 2 * kAlign}, {kAlign, 4 * kAlign}},
AsyncIO::POLLABLE);
- testReads({
- {0, 5*1024*1024},
- {kAlign, 5*1024*1024}
- }, AsyncIO::NOT_POLLABLE);
-
- testReads({
- {kAlign, 0},
- {kAlign, kAlign},
- {kAlign, 2*kAlign},
- {kAlign, 20*kAlign},
- {kAlign, 1024*1024},
- }, AsyncIO::NOT_POLLABLE);
+ testReads(
+ {{0, 5 * 1024 * 1024}, {kAlign, 5 * 1024 * 1024}}, AsyncIO::NOT_POLLABLE);
+
+ testReads(
+ {
+ {kAlign, 0},
+ {kAlign, kAlign},
+ {kAlign, 2 * kAlign},
+ {kAlign, 20 * kAlign},
+ {kAlign, 1024 * 1024},
+ },
+ AsyncIO::NOT_POLLABLE);
}
TEST(AsyncIO, ManyAsyncDataNotPollable) {
SCOPE_EXIT {
::close(fd);
};
- size_t size = 2*kAlign;
+ size_t size = 2 * kAlign;
auto buf = allocateAligned(size);
op.pread(fd, buf.get(), size, 0);
aioReader.submit(&op);
EXPECT_EQ(aioReader.pending(), 0);
}
+TEST(AsyncIO, Cancel) {
+ constexpr size_t kNumOpsBatch1 = 10;
+ constexpr size_t kNumOpsBatch2 = 10;
+ AsyncIO aioReader(kNumOpsBatch1 + kNumOpsBatch2, AsyncIO::NOT_POLLABLE);
+ int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
+ PCHECK(fd != -1);
+ SCOPE_EXIT {
+ ::close(fd);
+ };
+
+ size_t completed = 0;
+
+ std::vector<std::unique_ptr<AsyncIO::Op>> ops;
+ std::vector<ManagedBuffer> bufs;
+ const auto schedule = [&](size_t n) {
+ for (size_t i = 0; i < n; ++i) {
+ const size_t size = 2 * kAlign;
+ bufs.push_back(allocateAligned(size));
+
+ ops.push_back(std::make_unique<AsyncIO::Op>());
+ auto& op = *ops.back();
+
+ op.setNotificationCallback([&](AsyncIOOp*) { ++completed; });
+ op.pread(fd, bufs.back().get(), size, 0);
+ aioReader.submit(&op);
+ }
+ };
+
+ // Mix completed and canceled operations for this test.
+ // In order to achieve that, schedule in two batches and do partial
+ // wait() after the first one.
+
+ schedule(kNumOpsBatch1);
+ EXPECT_EQ(aioReader.pending(), kNumOpsBatch1);
+ EXPECT_EQ(completed, 0);
+
+ auto result = aioReader.wait(1);
+ EXPECT_GE(result.size(), 1);
+ EXPECT_EQ(completed, result.size());
+ EXPECT_EQ(aioReader.pending(), kNumOpsBatch1 - result.size());
+
+ schedule(kNumOpsBatch2);
+ EXPECT_EQ(aioReader.pending(), ops.size() - result.size());
+ EXPECT_EQ(completed, result.size());
+
+ auto canceled = aioReader.cancel();
+ EXPECT_EQ(canceled.size(), ops.size() - result.size());
+ EXPECT_EQ(aioReader.pending(), 0);
+ EXPECT_EQ(completed, result.size());
+
+ size_t foundCompleted = 0;
+ for (auto& op : ops) {
+ if (op->state() == AsyncIOOp::State::COMPLETED) {
+ ++foundCompleted;
+ } else {
+ EXPECT_TRUE(op->state() == AsyncIOOp::State::CANCELED) << *op;
+ }
+ }
+ EXPECT_EQ(foundCompleted, completed);
+}