2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/experimental/io/AsyncIO.h>
20 #include <sys/types.h>
30 #include <glog/logging.h>
32 #include <folly/experimental/io/FsUtil.h>
33 #include <folly/ScopeGuard.h>
34 #include <folly/String.h>
35 #include <folly/portability/GTest.h>
36 #include <folly/portability/Sockets.h>
38 namespace fs = folly::fs;
41 using folly::AsyncIOOp;
42 using folly::AsyncIOQueue;
46 constexpr size_t kAlign = 4096; // align reads to 4096 B (for O_DIRECT)
// Waits in poll() with an infinite timeout, re-issuing the call whenever it
// fails with EINTR, and then insists that POLLIN is the only bit set in
// revents. The setup of `pfd` (presumably wrapping `fd`) and of `r` is in
// lines not shown here.
53 void waitUntilReadable(int fd) {
60 r = poll(&pfd, 1, -1); // wait forever
61 } while (r == -1 && errno == EINTR);
63 CHECK_EQ(pfd.revents, POLLIN); // no errors etc
// Returns the range of completed ops from `reader`. Two strategies are
// visible: block inside AsyncIO via wait(1), or wait on the reader's poll fd
// with waitUntilReadable() and then drain with pollCompleted().
// NOTE(review): the condition selecting between the two is not in the lines
// shown; presumably it depends on whether pollFd() returned a usable fd.
66 folly::Range<AsyncIO::Op**> readerWait(AsyncIO* reader) {
67 int fd = reader->pollFd();
69 return reader->wait(1);
71 waitUntilReadable(fd);
72 return reader->pollCompleted();
76 // Temporary file that is NOT kept open but is deleted on exit.
77 // Generate random-looking but reproducible data.
// `size` is the file size in bytes; the constructor definition checks that it
// is a multiple of sizeof(uint32_t).
80 explicit TemporaryFile(size_t size);
// Returns the stored path by value (a copy of path_).
83 const fs::path path() const { return path_; }
// Creates the file at a unique path under the system temp directory and
// writes it in chunks of uint32_t values. The loop body that fills the
// buffer (presumably from `rnd`) and the outer chunking loop are in lines
// not shown here.
89 TemporaryFile::TemporaryFile(size_t size)
90 : path_(fs::temp_directory_path() / fs::unique_path()) {
// The byte count must be a whole number of 32-bit words; after the division
// `size` counts words rather than bytes.
91 CHECK_EQ(size % sizeof(uint32_t), 0);
92 size /= sizeof(uint32_t);
// Fixed seed, so the engine yields the same sequence on every run.
93 const uint32_t seed = 42;
94 std::mt19937 rnd(seed);
// Staging buffer of 65536 words, declared as a local array.
96 const size_t bufferSize = 1U << 16;
97 uint32_t buffer[bufferSize];
// Open for binary write; failure to open is fatal (PCHECK).
99 FILE* fp = ::fopen(path_.c_str(), "wb");
100 PCHECK(fp != nullptr);
// A chunk is at most one buffer's worth of words.
102 size_t n = std::min(size, bufferSize);
103 for (size_t i = 0; i < n; ++i) {
// fwrite returns the number of items written; a short write is fatal.
106 size_t written = ::fwrite(buffer, sizeof(uint32_t), n, fp);
107 PCHECK(written == n);
// A failing fclose is treated as fatal as well.
110 PCHECK(::fclose(fp) == 0);
// Destructor. The visible handler catches fs::filesystem_error and logs it
// rather than letting it escape. Judging by the log text the guarded call is
// fs::remove (the try body is not in the lines shown here).
113 TemporaryFile::~TemporaryFile() {
116 } catch (const fs::filesystem_error& e) {
117 LOG(ERROR) << "fs::remove: " << folly::exceptionStr(e);
// Data file opened by the read helpers and tests below; 6 << 20 bytes.
121 TemporaryFile tempFile(6 << 20); // 6MiB
// Owning char pointer whose deleter is a plain `void(void*)` function pointer.
123 typedef std::unique_ptr<char, void(*)(void*)> ManagedBuffer;
// Allocates `size` bytes aligned to kAlign with posix_memalign and returns
// them as a ManagedBuffer that releases the memory with free().
// Allocation failure is fatal (CHECK_EQ).
124 ManagedBuffer allocateAligned(size_t size) {
// posix_memalign returns the error number directly, hence strerror(rc).
126 int rc = posix_memalign(&buf, kAlign, size);
127 CHECK_EQ(rc, 0) << strerror(rc);
128 return ManagedBuffer(reinterpret_cast<char*>(buf), free);
// Issues the reads described by `specs` one at a time through an AsyncIO of
// capacity 1, waiting for each op to complete before submitting the next.
131 void testReadsSerially(const std::vector<TestSpec>& specs,
132 AsyncIO::PollMode pollMode) {
133 AsyncIO aioReader(1, pollMode);
// The file is opened with O_DIRECT, which is why read buffers come from
// allocateAligned().
135 int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
141 for (size_t i = 0; i < specs.size(); i++) {
142 auto buf = allocateAligned(specs[i].size);
143 op.pread(fd, buf.get(), specs[i].size, specs[i].start);
144 aioReader.submit(&op);
// totalSubmits() accumulates across iterations; pending() counts only the
// single op currently in flight.
145 EXPECT_EQ((i + 1), aioReader.totalSubmits());
146 EXPECT_EQ(aioReader.pending(), 1);
// Exactly one completion is expected and it must be the op just submitted.
147 auto ops = readerWait(&aioReader);
148 EXPECT_EQ(1, ops.size());
149 EXPECT_TRUE(ops[0] == &op);
150 EXPECT_EQ(aioReader.pending(), 0);
// A negative result is rendered as an errno string in the failure message;
// success means the full requested size was read.
151 ssize_t res = op.result();
152 EXPECT_LE(0, res) << folly::errnoStr(-res);
153 EXPECT_EQ(specs[i].size, res);
// Submits every read in `specs` to an AsyncIO sized to hold all of them and
// then drains completions until none remain.
// NOTE(review): the lines that branch on `multithreaded` are not shown; the
// visible code includes a path that submits each op from its own thread.
158 void testReadsParallel(const std::vector<TestSpec>& specs,
159 AsyncIO::PollMode pollMode,
160 bool multithreaded) {
161 AsyncIO aioReader(specs.size(), pollMode);
162 std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
163 std::vector<ManagedBuffer> bufs;
164 bufs.reserve(specs.size());
166 int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
172 std::vector<std::thread> threads;
174 threads.reserve(specs.size());
// All buffers are allocated up front, before any op is submitted.
176 for (size_t i = 0; i < specs.size(); i++) {
177 bufs.push_back(allocateAligned(specs[i].size));
// Prepares op `i` as a pread into its own buffer and submits it.
179 auto submit = [&] (size_t i) {
180 ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
181 aioReader.submit(&ops[i]);
183 for (size_t i = 0; i < specs.size(); i++) {
// The thread captures `submit` by reference and its index by value.
185 threads.emplace_back([&submit, i] { submit(i); });
190 for (auto& t : threads) {
// One flag per op, initially true; the final loop expects every flag to be
// false (the line clearing a flag is not shown here).
193 std::vector<bool> pending(specs.size(), true);
195 size_t remaining = specs.size();
196 while (remaining != 0) {
197 EXPECT_EQ(remaining, aioReader.pending());
198 auto completed = readerWait(&aioReader);
199 size_t nrRead = completed.size();
200 EXPECT_NE(nrRead, 0);
203 for (size_t i = 0; i < nrRead; i++) {
// The op's index is recovered by pointer subtraction from the array base.
// NOTE(review): the pointer difference is narrowed to int and then compared
// with the unsigned specs.size(); consider ptrdiff_t/size_t.
204 int id = completed[i] - ops.get();
206 EXPECT_LT(id, specs.size());
207 EXPECT_TRUE(pending[id]);
209 ssize_t res = ops[id].result();
210 EXPECT_LE(0, res) << folly::errnoStr(-res);
211 EXPECT_EQ(specs[id].size, res);
// After draining: every spec was submitted once and nothing is in flight.
214 EXPECT_EQ(specs.size(), aioReader.totalSubmits());
216 EXPECT_EQ(aioReader.pending(), 0);
217 for (size_t i = 0; i < pending.size(); i++) {
218 EXPECT_FALSE(pending[i]);
// Submits every read in `specs` through an AsyncIOQueue layered on an AsyncIO
// whose capacity is half the number of specs (at least 1), then checks the
// split between in-flight and queued ops while draining completions.
222 void testReadsQueued(const std::vector<TestSpec>& specs,
223 AsyncIO::PollMode pollMode) {
224 size_t readerCapacity = std::max(specs.size() / 2, size_t(1));
225 AsyncIO aioReader(readerCapacity, pollMode);
226 AsyncIOQueue aioQueue(&aioReader);
227 std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
228 std::vector<ManagedBuffer> bufs;
230 int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
// Submission goes through the queue, not directly to the reader.
235 for (size_t i = 0; i < specs.size(); i++) {
236 bufs.push_back(allocateAligned(specs[i].size));
237 ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
238 aioQueue.submit(&ops[i]);
240 std::vector<bool> pending(specs.size(), true);
242 size_t remaining = specs.size();
243 while (remaining != 0) {
// With at least a full reader's worth outstanding, the reader is expected to
// be at capacity and the rest to sit in the queue.
244 if (remaining >= readerCapacity) {
245 EXPECT_EQ(readerCapacity, aioReader.pending());
246 EXPECT_EQ(remaining - readerCapacity, aioQueue.queued());
// Presumably the else branch (its opening line is not shown): everything
// outstanding is in the reader and the queue is empty.
248 EXPECT_EQ(remaining, aioReader.pending());
249 EXPECT_EQ(0, aioQueue.queued());
251 auto completed = readerWait(&aioReader);
252 size_t nrRead = completed.size();
253 EXPECT_NE(nrRead, 0);
256 for (size_t i = 0; i < nrRead; i++) {
// The op's index is recovered by pointer subtraction from the array base.
// NOTE(review): the pointer difference is narrowed to int and then compared
// with the unsigned specs.size(); consider ptrdiff_t/size_t.
257 int id = completed[i] - ops.get();
259 EXPECT_LT(id, specs.size());
260 EXPECT_TRUE(pending[id]);
262 ssize_t res = ops[id].result();
263 EXPECT_LE(0, res) << folly::errnoStr(-res);
264 EXPECT_EQ(specs[id].size, res);
// After draining: every spec was submitted once, nothing is in flight or
// queued.
267 EXPECT_EQ(specs.size(), aioReader.totalSubmits());
268 EXPECT_EQ(aioReader.pending(), 0);
269 EXPECT_EQ(aioQueue.queued(), 0);
270 for (size_t i = 0; i < pending.size(); i++) {
271 EXPECT_FALSE(pending[i]);
// Runs the same specs through every read strategy in this file: serial,
// parallel with `multithreaded` false, parallel with `multithreaded` true,
// and queued.
275 void testReads(const std::vector<TestSpec>& specs,
276 AsyncIO::PollMode pollMode) {
277 testReadsSerially(specs, pollMode);
278 testReadsParallel(specs, pollMode, false);
279 testReadsParallel(specs, pollMode, true);
280 testReadsQueued(specs, pollMode);
283 } // anonymous namespace
// Single spec {0, 0} in NOT_POLLABLE mode; presumably a zero-length read at
// offset 0 (the TestSpec field order is not in the lines shown here).
285 TEST(AsyncIO, ZeroAsyncDataNotPollable) {
286 testReads({{0, 0}}, AsyncIO::NOT_POLLABLE);
// Single spec {0, 0} in POLLABLE mode; presumably a zero-length read at
// offset 0 (the TestSpec field order is not in the lines shown here).
289 TEST(AsyncIO, ZeroAsyncDataPollable) {
290 testReads({{0, 0}}, AsyncIO::POLLABLE);
// Runs the same single {0, kAlign} spec twice in NOT_POLLABLE mode.
293 TEST(AsyncIO, SingleAsyncDataNotPollable) {
294 testReads({{0, kAlign}}, AsyncIO::NOT_POLLABLE);
295 testReads({{0, kAlign}}, AsyncIO::NOT_POLLABLE);
// Runs the same single {0, kAlign} spec twice in POLLABLE mode.
298 TEST(AsyncIO, SingleAsyncDataPollable) {
299 testReads({{0, kAlign}}, AsyncIO::POLLABLE);
300 testReads({{0, kAlign}}, AsyncIO::POLLABLE);
// Several spec lists in NOT_POLLABLE mode. Only part of each list is visible
// in the lines shown here.
303 TEST(AsyncIO, MultipleAsyncDataNotPollable) {
// Three specs, the first two identical.
305 {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
306 AsyncIO::NOT_POLLABLE);
// Same list again.
308 {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
309 AsyncIO::NOT_POLLABLE);
// A spec using 5*1024*1024 (5 MiB) as its second field.
313 {kAlign, 5*1024*1024}
314 }, AsyncIO::NOT_POLLABLE);
322 }, AsyncIO::NOT_POLLABLE);
// Counterpart of MultipleAsyncDataNotPollable. Only part of each spec list
// and of each call is visible in the lines shown here.
325 TEST(AsyncIO, MultipleAsyncDataPollable) {
327 {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
330 {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
335 {kAlign, 5*1024*1024}
// NOTE(review): this test is named ...Pollable, yet this call and the one
// after it pass AsyncIO::NOT_POLLABLE. Looks like a copy-paste from the
// NotPollable test; confirm whether AsyncIO::POLLABLE was intended.
336 }, AsyncIO::NOT_POLLABLE);
344 }, AsyncIO::NOT_POLLABLE);
// Builds 1000 specs, each with kAlign * i as its first field and kAlign as
// its second, and runs them in NOT_POLLABLE mode. The largest first-field
// value is 999 * kAlign, which is below the 6 MiB size of tempFile.
347 TEST(AsyncIO, ManyAsyncDataNotPollable) {
349 std::vector<TestSpec> v;
350 for (int i = 0; i < 1000; i++) {
351 v.push_back({off_t(kAlign * i), kAlign});
353 testReads(v, AsyncIO::NOT_POLLABLE);
// Builds 1000 specs, each with kAlign * i as its first field and kAlign as
// its second, and runs them in POLLABLE mode. The largest first-field value
// is 999 * kAlign, which is below the 6 MiB size of tempFile.
357 TEST(AsyncIO, ManyAsyncDataPollable) {
359 std::vector<TestSpec> v;
360 for (int i = 0; i < 1000; i++) {
361 v.push_back({off_t(kAlign * i), kAlign});
363 testReads(v, AsyncIO::POLLABLE);
// Submits one 2 * kAlign read at offset 0 and spins on wait(0) until the
// completion shows up, instead of blocking in wait(1).
367 TEST(AsyncIO, NonBlockingWait) {
368 AsyncIO aioReader(1, AsyncIO::NOT_POLLABLE);
370 int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
375 size_t size = 2 * kAlign;
376 auto buf = allocateAligned(size);
377 op.pread(fd, buf.get(), size, 0);
378 aioReader.submit(&op);
379 EXPECT_EQ(aioReader.pending(), 1);
// The range starts out empty; the loop exits on the first non-empty result.
381 folly::Range<AsyncIO::Op**> completed;
382 while (completed.empty()) {
383 // poll without blocking until the read request completes.
384 completed = aioReader.wait(0);
386 EXPECT_EQ(completed.size(), 1);
// The single completion must be the submitted op, with the full size read.
388 EXPECT_TRUE(completed[0] == &op);
389 ssize_t res = op.result();
390 EXPECT_LE(0, res) << folly::errnoStr(-res);
391 EXPECT_EQ(size, res);
392 EXPECT_EQ(aioReader.pending(), 0);
// Submits kNumOps reads, waits for one completion, cancels the rest, and
// checks that the notification callback count stays at 1 across the cancel.
395 TEST(AsyncIO, Cancel) {
396 constexpr size_t kNumOps = 10;
398 AsyncIO aioReader(kNumOps, AsyncIO::NOT_POLLABLE);
399 int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
405 std::vector<AsyncIO::Op> ops(kNumOps);
406 std::vector<ManagedBuffer> bufs;
// Counter bumped by each op's notification callback.
408 size_t completed = 0;
409 for (auto& op : ops) {
410 const size_t size = 2 * kAlign;
411 bufs.push_back(allocateAligned(size));
412 op.setNotificationCallback([&](AsyncIOOp*) { ++completed; });
// Every op reads the same range: `size` bytes at offset 0.
413 op.pread(fd, bufs.back().get(), size, 0);
414 aioReader.submit(&op);
// Nothing has been reaped yet, so no callback has been counted.
417 EXPECT_EQ(aioReader.pending(), kNumOps);
418 EXPECT_EQ(completed, 0);
// Reap one completion: one callback counted, one fewer op pending.
421 auto result = aioReader.wait(1);
422 EXPECT_EQ(result.size(), 1);
424 EXPECT_EQ(completed, 1);
425 EXPECT_EQ(aioReader.pending(), kNumOps - 1);
// cancel() is expected to return the number of remaining ops and leave
// nothing pending; the callback count must not change.
427 EXPECT_EQ(aioReader.cancel(), kNumOps - 1);
428 EXPECT_EQ(aioReader.pending(), 0);
429 EXPECT_EQ(completed, 1);
// Every op that is not COMPLETED is expected to be CANCELED (the handling of
// the COMPLETED case is in lines not shown here).
432 for (auto& op : ops) {
433 if (op.state() == AsyncIOOp::State::COMPLETED) {
436 EXPECT_TRUE(op.state() == AsyncIOOp::State::CANCELED) << op;
439 EXPECT_EQ(completed, 1);