/*
 * Copyright 2017 Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <folly/experimental/io/AsyncIO.h>

#include <fcntl.h>
#include <poll.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#include <algorithm>
#include <cerrno>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <memory>
#include <random>
#include <thread>
#include <vector>

#include <glog/logging.h>

#include <folly/ScopeGuard.h>
#include <folly/String.h>
#include <folly/experimental/io/FsUtil.h>
#include <folly/portability/GTest.h>
#include <folly/portability/Sockets.h>
38 namespace fs = folly::fs;
41 using folly::AsyncIOOp;
42 using folly::AsyncIOQueue;
46 constexpr size_t kAlign = 4096; // align reads to 4096 B (for O_DIRECT)
53 void waitUntilReadable(int fd) {
60 r = poll(&pfd, 1, -1); // wait forever
61 } while (r == -1 && errno == EINTR);
63 CHECK_EQ(pfd.revents, POLLIN); // no errors etc
66 folly::Range<AsyncIO::Op**> readerWait(AsyncIO* reader) {
67 int fd = reader->pollFd();
69 return reader->wait(1);
71 waitUntilReadable(fd);
72 return reader->pollCompleted();
76 // Temporary file that is NOT kept open but is deleted on exit.
77 // Generate random-looking but reproduceable data.
80 explicit TemporaryFile(size_t size);
83 const fs::path path() const {
91 TemporaryFile::TemporaryFile(size_t size)
92 : path_(fs::temp_directory_path() / fs::unique_path()) {
93 CHECK_EQ(size % sizeof(uint32_t), 0);
94 size /= sizeof(uint32_t);
95 const uint32_t seed = 42;
96 std::mt19937 rnd(seed);
98 const size_t bufferSize = 1U << 16;
99 uint32_t buffer[bufferSize];
101 FILE* fp = ::fopen(path_.c_str(), "wb");
102 PCHECK(fp != nullptr);
104 size_t n = std::min(size, bufferSize);
105 for (size_t i = 0; i < n; ++i) {
108 size_t written = ::fwrite(buffer, sizeof(uint32_t), n, fp);
109 PCHECK(written == n);
112 PCHECK(::fclose(fp) == 0);
115 TemporaryFile::~TemporaryFile() {
118 } catch (const fs::filesystem_error& e) {
119 LOG(ERROR) << "fs::remove: " << folly::exceptionStr(e);
123 TemporaryFile tempFile(6 << 20); // 6MiB
125 typedef std::unique_ptr<char, void (*)(void*)> ManagedBuffer;
126 ManagedBuffer allocateAligned(size_t size) {
128 int rc = posix_memalign(&buf, kAlign, size);
129 CHECK_EQ(rc, 0) << strerror(rc);
130 return ManagedBuffer(reinterpret_cast<char*>(buf), free);
133 void testReadsSerially(
134 const std::vector<TestSpec>& specs,
135 AsyncIO::PollMode pollMode) {
136 AsyncIO aioReader(1, pollMode);
138 int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
144 for (size_t i = 0; i < specs.size(); i++) {
145 auto buf = allocateAligned(specs[i].size);
146 op.pread(fd, buf.get(), specs[i].size, specs[i].start);
147 aioReader.submit(&op);
148 EXPECT_EQ((i + 1), aioReader.totalSubmits());
149 EXPECT_EQ(aioReader.pending(), 1);
150 auto ops = readerWait(&aioReader);
151 EXPECT_EQ(1, ops.size());
152 EXPECT_TRUE(ops[0] == &op);
153 EXPECT_EQ(aioReader.pending(), 0);
154 ssize_t res = op.result();
155 EXPECT_LE(0, res) << folly::errnoStr(-res);
156 EXPECT_EQ(specs[i].size, res);
161 void testReadsParallel(
162 const std::vector<TestSpec>& specs,
163 AsyncIO::PollMode pollMode,
164 bool multithreaded) {
165 AsyncIO aioReader(specs.size(), pollMode);
166 std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
167 std::vector<ManagedBuffer> bufs;
168 bufs.reserve(specs.size());
170 int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
176 std::vector<std::thread> threads;
178 threads.reserve(specs.size());
180 for (size_t i = 0; i < specs.size(); i++) {
181 bufs.push_back(allocateAligned(specs[i].size));
183 auto submit = [&](size_t i) {
184 ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
185 aioReader.submit(&ops[i]);
187 for (size_t i = 0; i < specs.size(); i++) {
189 threads.emplace_back([&submit, i] { submit(i); });
194 for (auto& t : threads) {
197 std::vector<bool> pending(specs.size(), true);
199 size_t remaining = specs.size();
200 while (remaining != 0) {
201 EXPECT_EQ(remaining, aioReader.pending());
202 auto completed = readerWait(&aioReader);
203 size_t nrRead = completed.size();
204 EXPECT_NE(nrRead, 0);
207 for (size_t i = 0; i < nrRead; i++) {
208 int id = completed[i] - ops.get();
210 EXPECT_LT(id, specs.size());
211 EXPECT_TRUE(pending[id]);
213 ssize_t res = ops[id].result();
214 EXPECT_LE(0, res) << folly::errnoStr(-res);
215 EXPECT_EQ(specs[id].size, res);
218 EXPECT_EQ(specs.size(), aioReader.totalSubmits());
220 EXPECT_EQ(aioReader.pending(), 0);
221 for (size_t i = 0; i < pending.size(); i++) {
222 EXPECT_FALSE(pending[i]);
226 void testReadsQueued(
227 const std::vector<TestSpec>& specs,
228 AsyncIO::PollMode pollMode) {
229 size_t readerCapacity = std::max(specs.size() / 2, size_t(1));
230 AsyncIO aioReader(readerCapacity, pollMode);
231 AsyncIOQueue aioQueue(&aioReader);
232 std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
233 std::vector<ManagedBuffer> bufs;
235 int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
240 for (size_t i = 0; i < specs.size(); i++) {
241 bufs.push_back(allocateAligned(specs[i].size));
242 ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
243 aioQueue.submit(&ops[i]);
245 std::vector<bool> pending(specs.size(), true);
247 size_t remaining = specs.size();
248 while (remaining != 0) {
249 if (remaining >= readerCapacity) {
250 EXPECT_EQ(readerCapacity, aioReader.pending());
251 EXPECT_EQ(remaining - readerCapacity, aioQueue.queued());
253 EXPECT_EQ(remaining, aioReader.pending());
254 EXPECT_EQ(0, aioQueue.queued());
256 auto completed = readerWait(&aioReader);
257 size_t nrRead = completed.size();
258 EXPECT_NE(nrRead, 0);
261 for (size_t i = 0; i < nrRead; i++) {
262 int id = completed[i] - ops.get();
264 EXPECT_LT(id, specs.size());
265 EXPECT_TRUE(pending[id]);
267 ssize_t res = ops[id].result();
268 EXPECT_LE(0, res) << folly::errnoStr(-res);
269 EXPECT_EQ(specs[id].size, res);
272 EXPECT_EQ(specs.size(), aioReader.totalSubmits());
273 EXPECT_EQ(aioReader.pending(), 0);
274 EXPECT_EQ(aioQueue.queued(), 0);
275 for (size_t i = 0; i < pending.size(); i++) {
276 EXPECT_FALSE(pending[i]);
280 void testReads(const std::vector<TestSpec>& specs, AsyncIO::PollMode pollMode) {
281 testReadsSerially(specs, pollMode);
282 testReadsParallel(specs, pollMode, false);
283 testReadsParallel(specs, pollMode, true);
284 testReadsQueued(specs, pollMode);
287 } // anonymous namespace
289 TEST(AsyncIO, ZeroAsyncDataNotPollable) {
290 testReads({{0, 0}}, AsyncIO::NOT_POLLABLE);
293 TEST(AsyncIO, ZeroAsyncDataPollable) {
294 testReads({{0, 0}}, AsyncIO::POLLABLE);
297 TEST(AsyncIO, SingleAsyncDataNotPollable) {
298 testReads({{0, kAlign}}, AsyncIO::NOT_POLLABLE);
299 testReads({{0, kAlign}}, AsyncIO::NOT_POLLABLE);
302 TEST(AsyncIO, SingleAsyncDataPollable) {
303 testReads({{0, kAlign}}, AsyncIO::POLLABLE);
304 testReads({{0, kAlign}}, AsyncIO::POLLABLE);
307 TEST(AsyncIO, MultipleAsyncDataNotPollable) {
309 {{kAlign, 2 * kAlign}, {kAlign, 2 * kAlign}, {kAlign, 4 * kAlign}},
310 AsyncIO::NOT_POLLABLE);
312 {{kAlign, 2 * kAlign}, {kAlign, 2 * kAlign}, {kAlign, 4 * kAlign}},
313 AsyncIO::NOT_POLLABLE);
316 {{0, 5 * 1024 * 1024}, {kAlign, 5 * 1024 * 1024}}, AsyncIO::NOT_POLLABLE);
322 {kAlign, 2 * kAlign},
323 {kAlign, 20 * kAlign},
324 {kAlign, 1024 * 1024},
326 AsyncIO::NOT_POLLABLE);
329 TEST(AsyncIO, MultipleAsyncDataPollable) {
331 {{kAlign, 2 * kAlign}, {kAlign, 2 * kAlign}, {kAlign, 4 * kAlign}},
334 {{kAlign, 2 * kAlign}, {kAlign, 2 * kAlign}, {kAlign, 4 * kAlign}},
338 {{0, 5 * 1024 * 1024}, {kAlign, 5 * 1024 * 1024}}, AsyncIO::NOT_POLLABLE);
344 {kAlign, 2 * kAlign},
345 {kAlign, 20 * kAlign},
346 {kAlign, 1024 * 1024},
348 AsyncIO::NOT_POLLABLE);
351 TEST(AsyncIO, ManyAsyncDataNotPollable) {
353 std::vector<TestSpec> v;
354 for (int i = 0; i < 1000; i++) {
355 v.push_back({off_t(kAlign * i), kAlign});
357 testReads(v, AsyncIO::NOT_POLLABLE);
361 TEST(AsyncIO, ManyAsyncDataPollable) {
363 std::vector<TestSpec> v;
364 for (int i = 0; i < 1000; i++) {
365 v.push_back({off_t(kAlign * i), kAlign});
367 testReads(v, AsyncIO::POLLABLE);
371 TEST(AsyncIO, NonBlockingWait) {
372 AsyncIO aioReader(1, AsyncIO::NOT_POLLABLE);
374 int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
379 size_t size = 2 * kAlign;
380 auto buf = allocateAligned(size);
381 op.pread(fd, buf.get(), size, 0);
382 aioReader.submit(&op);
383 EXPECT_EQ(aioReader.pending(), 1);
385 folly::Range<AsyncIO::Op**> completed;
386 while (completed.empty()) {
387 // poll without blocking until the read request completes.
388 completed = aioReader.wait(0);
390 EXPECT_EQ(completed.size(), 1);
392 EXPECT_TRUE(completed[0] == &op);
393 ssize_t res = op.result();
394 EXPECT_LE(0, res) << folly::errnoStr(-res);
395 EXPECT_EQ(size, res);
396 EXPECT_EQ(aioReader.pending(), 0);
399 TEST(AsyncIO, Cancel) {
400 constexpr size_t kNumOpsBatch1 = 10;
401 constexpr size_t kNumOpsBatch2 = 10;
403 AsyncIO aioReader(kNumOpsBatch1 + kNumOpsBatch2, AsyncIO::NOT_POLLABLE);
404 int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
410 size_t completed = 0;
412 std::vector<std::unique_ptr<AsyncIO::Op>> ops;
413 std::vector<ManagedBuffer> bufs;
414 const auto schedule = [&](size_t n) {
415 for (size_t i = 0; i < n; ++i) {
416 const size_t size = 2 * kAlign;
417 bufs.push_back(allocateAligned(size));
419 ops.push_back(std::make_unique<AsyncIO::Op>());
420 auto& op = *ops.back();
422 op.setNotificationCallback([&](AsyncIOOp*) { ++completed; });
423 op.pread(fd, bufs.back().get(), size, 0);
424 aioReader.submit(&op);
428 // Mix completed and canceled operations for this test.
429 // In order to achieve that, schedule in two batches and do partial
430 // wait() after the first one.
432 schedule(kNumOpsBatch1);
433 EXPECT_EQ(aioReader.pending(), kNumOpsBatch1);
434 EXPECT_EQ(completed, 0);
436 auto result = aioReader.wait(1);
437 EXPECT_GE(result.size(), 1);
438 EXPECT_EQ(completed, result.size());
439 EXPECT_EQ(aioReader.pending(), kNumOpsBatch1 - result.size());
441 schedule(kNumOpsBatch2);
442 EXPECT_EQ(aioReader.pending(), ops.size() - result.size());
443 EXPECT_EQ(completed, result.size());
445 auto canceled = aioReader.cancel();
446 EXPECT_EQ(canceled.size(), ops.size() - result.size());
447 EXPECT_EQ(aioReader.pending(), 0);
448 EXPECT_EQ(completed, result.size());
450 size_t foundCompleted = 0;
451 for (auto& op : ops) {
452 if (op->state() == AsyncIOOp::State::COMPLETED) {
455 EXPECT_TRUE(op->state() == AsyncIOOp::State::CANCELED) << *op;
458 EXPECT_EQ(foundCompleted, completed);