/*
- * Copyright 2013 Facebook, Inc.
+ * Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* limitations under the License.
*/
-#include "folly/experimental/io/AsyncIO.h"
+#include <folly/experimental/io/AsyncIO.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <cstdio>
#include <memory>
#include <random>
+#include <thread>
#include <vector>
#include <glog/logging.h>
#include <gtest/gtest.h>
-#include "folly/experimental/io/FsUtil.h"
-#include "folly/ScopeGuard.h"
-#include "folly/String.h"
+#include <folly/experimental/io/FsUtil.h>
+#include <folly/ScopeGuard.h>
+#include <folly/String.h>
namespace fs = folly::fs;
using folly::AsyncIO;
+using folly::AsyncIOQueue;
namespace {
-constexpr size_t kAlignment = 512; // align reads to 512 B (for O_DIRECT)
+constexpr size_t kAlign = 4096; // align reads to 4096 B (for O_DIRECT)
struct TestSpec {
off_t start;
TemporaryFile tempFile(6 << 20); // 6MiB
+typedef std::unique_ptr<char, void(*)(void*)> ManagedBuffer;
+ManagedBuffer allocateAligned(size_t size) {
+ void* buf;
+ int rc = posix_memalign(&buf, kAlign, size);
+ CHECK_EQ(rc, 0) << strerror(rc);
+ return ManagedBuffer(reinterpret_cast<char*>(buf), free);
+}
+
void testReadsSerially(const std::vector<TestSpec>& specs,
AsyncIO::PollMode pollMode) {
AsyncIO aioReader(1, pollMode);
};
for (int i = 0; i < specs.size(); i++) {
- std::unique_ptr<char[]> buf(new char[specs[i].size]);
- aioReader.pread(&op, fd, buf.get(), specs[i].size, specs[i].start);
+ auto buf = allocateAligned(specs[i].size);
+ op.pread(fd, buf.get(), specs[i].size, specs[i].start);
+ aioReader.submit(&op);
EXPECT_EQ(aioReader.pending(), 1);
auto ops = readerWait(&aioReader);
EXPECT_EQ(1, ops.size());
}
void testReadsParallel(const std::vector<TestSpec>& specs,
- AsyncIO::PollMode pollMode) {
+ AsyncIO::PollMode pollMode,
+ bool multithreaded) {
AsyncIO aioReader(specs.size(), pollMode);
std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
- std::vector<std::unique_ptr<char[]>> bufs(specs.size());
+ std::vector<ManagedBuffer> bufs;
+ bufs.reserve(specs.size());
int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
PCHECK(fd != -1);
SCOPE_EXIT {
::close(fd);
};
+
+ std::vector<std::thread> threads;
+ if (multithreaded) {
+ threads.reserve(specs.size());
+ }
for (int i = 0; i < specs.size(); i++) {
- bufs[i].reset(new char[specs[i].size]);
- aioReader.pread(&ops[i], fd, bufs[i].get(), specs[i].size,
- specs[i].start);
+ bufs.push_back(allocateAligned(specs[i].size));
+ }
+ auto submit = [&] (int i) {
+ ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
+ aioReader.submit(&ops[i]);
+ };
+ for (int i = 0; i < specs.size(); i++) {
+ if (multithreaded) {
+ threads.emplace_back([&submit, i] { submit(i); });
+ } else {
+ submit(i);
+ }
+ }
+ for (auto& t : threads) {
+ t.join();
}
std::vector<bool> pending(specs.size(), true);
}
}
+void testReadsQueued(const std::vector<TestSpec>& specs,
+ AsyncIO::PollMode pollMode) {
+ size_t readerCapacity = std::max(specs.size() / 2, size_t(1));
+ AsyncIO aioReader(readerCapacity, pollMode);
+ AsyncIOQueue aioQueue(&aioReader);
+ std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
+ std::vector<ManagedBuffer> bufs;
+
+ int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
+ PCHECK(fd != -1);
+ SCOPE_EXIT {
+ ::close(fd);
+ };
+ for (int i = 0; i < specs.size(); i++) {
+ bufs.push_back(allocateAligned(specs[i].size));
+ ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
+ aioQueue.submit(&ops[i]);
+ }
+ std::vector<bool> pending(specs.size(), true);
+
+ size_t remaining = specs.size();
+ while (remaining != 0) {
+ if (remaining >= readerCapacity) {
+ EXPECT_EQ(readerCapacity, aioReader.pending());
+ EXPECT_EQ(remaining - readerCapacity, aioQueue.queued());
+ } else {
+ EXPECT_EQ(remaining, aioReader.pending());
+ EXPECT_EQ(0, aioQueue.queued());
+ }
+ auto completed = readerWait(&aioReader);
+ size_t nrRead = completed.size();
+ EXPECT_NE(nrRead, 0);
+ remaining -= nrRead;
+
+ for (int i = 0; i < nrRead; i++) {
+ int id = completed[i] - ops.get();
+ EXPECT_GE(id, 0);
+ EXPECT_LT(id, specs.size());
+ EXPECT_TRUE(pending[id]);
+ pending[id] = false;
+ ssize_t res = ops[id].result();
+ EXPECT_LE(0, res) << folly::errnoStr(-res);
+ EXPECT_EQ(specs[id].size, res);
+ }
+ }
+ EXPECT_EQ(aioReader.pending(), 0);
+ EXPECT_EQ(aioQueue.queued(), 0);
+ for (int i = 0; i < pending.size(); i++) {
+ EXPECT_FALSE(pending[i]);
+ }
+}
+
void testReads(const std::vector<TestSpec>& specs,
AsyncIO::PollMode pollMode) {
testReadsSerially(specs, pollMode);
- testReadsParallel(specs, pollMode);
+ testReadsParallel(specs, pollMode, false);
+ testReadsParallel(specs, pollMode, true);
+ testReadsQueued(specs, pollMode);
}
} // anonymous namespace
}
TEST(AsyncIO, SingleAsyncDataNotPollable) {
- testReads({{0, 512}}, AsyncIO::NOT_POLLABLE);
- testReads({{0, 512}}, AsyncIO::NOT_POLLABLE);
+ testReads({{0, kAlign}}, AsyncIO::NOT_POLLABLE);
+ testReads({{0, kAlign}}, AsyncIO::NOT_POLLABLE);
}
TEST(AsyncIO, SingleAsyncDataPollable) {
- testReads({{0, 512}}, AsyncIO::POLLABLE);
- testReads({{0, 512}}, AsyncIO::POLLABLE);
+ testReads({{0, kAlign}}, AsyncIO::POLLABLE);
+ testReads({{0, kAlign}}, AsyncIO::POLLABLE);
}
TEST(AsyncIO, MultipleAsyncDataNotPollable) {
- testReads({{512, 1024}, {512, 1024}, {512, 2048}}, AsyncIO::NOT_POLLABLE);
- testReads({{512, 1024}, {512, 1024}, {512, 2048}}, AsyncIO::NOT_POLLABLE);
+ testReads(
+ {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
+ AsyncIO::NOT_POLLABLE);
+ testReads(
+ {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
+ AsyncIO::NOT_POLLABLE);
testReads({
{0, 5*1024*1024},
- {512, 5*1024*1024},
+ {kAlign, 5*1024*1024}
}, AsyncIO::NOT_POLLABLE);
testReads({
- {512, 0},
- {512, 512},
- {512, 1024},
- {512, 10*1024},
- {512, 1024*1024},
+ {kAlign, 0},
+ {kAlign, kAlign},
+ {kAlign, 2*kAlign},
+ {kAlign, 20*kAlign},
+ {kAlign, 1024*1024},
}, AsyncIO::NOT_POLLABLE);
}
TEST(AsyncIO, MultipleAsyncDataPollable) {
- testReads({{512, 1024}, {512, 1024}, {512, 2048}}, AsyncIO::POLLABLE);
- testReads({{512, 1024}, {512, 1024}, {512, 2048}}, AsyncIO::POLLABLE);
+ testReads(
+ {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
+ AsyncIO::POLLABLE);
+ testReads(
+ {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
+ AsyncIO::POLLABLE);
testReads({
{0, 5*1024*1024},
- {512, 5*1024*1024},
- }, AsyncIO::POLLABLE);
+ {kAlign, 5*1024*1024}
+ }, AsyncIO::NOT_POLLABLE);
testReads({
- {512, 0},
- {512, 512},
- {512, 1024},
- {512, 10*1024},
- {512, 1024*1024},
- }, AsyncIO::POLLABLE);
+ {kAlign, 0},
+ {kAlign, kAlign},
+ {kAlign, 2*kAlign},
+ {kAlign, 20*kAlign},
+ {kAlign, 1024*1024},
+ }, AsyncIO::NOT_POLLABLE);
}
TEST(AsyncIO, ManyAsyncDataNotPollable) {
{
std::vector<TestSpec> v;
for (int i = 0; i < 1000; i++) {
- v.push_back({512 * i, 512});
+ v.push_back({off_t(kAlign * i), kAlign});
}
testReads(v, AsyncIO::NOT_POLLABLE);
}
{
std::vector<TestSpec> v;
for (int i = 0; i < 1000; i++) {
- v.push_back({512 * i, 512});
+ v.push_back({off_t(kAlign * i), kAlign});
}
testReads(v, AsyncIO::POLLABLE);
}
SCOPE_EXIT {
::close(fd);
};
- size_t size = 1024;
- std::unique_ptr<char[]> buf(new char[size]);
- aioReader.pread(&op, fd, buf.get(), size, 0);
+ size_t size = 2*kAlign;
+ auto buf = allocateAligned(size);
+ op.pread(fd, buf.get(), size, 0);
+ aioReader.submit(&op);
EXPECT_EQ(aioReader.pending(), 1);
folly::Range<AsyncIO::Op**> completed;
EXPECT_EQ(aioReader.pending(), 0);
}
+