2 * Copyright 2014 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include "folly/experimental/io/AsyncIO.h"
20 #include <sys/types.h>
31 #include <glog/logging.h>
32 #include <gtest/gtest.h>
34 #include "folly/experimental/io/FsUtil.h"
35 #include "folly/ScopeGuard.h"
36 #include "folly/String.h"
38 namespace fs = folly::fs;
40 using folly::AsyncIOQueue;
// Alignment in bytes shared by the whole test: allocateAligned() passes it to
// posix_memalign(), and most read specs below use offsets and sizes that are
// multiples of it, since the test file is opened with O_DIRECT.
44 constexpr size_t kAlign = 4096; // align reads to 4096 B (for O_DIRECT)
// Blocks in poll() with no timeout, restarting the call whenever it is
// interrupted by a signal, and then requires the reported events to be
// exactly POLLIN.
// NOTE(review): the setup of `pfd` is not visible in this excerpt; presumably
// it watches `fd` for POLLIN -- confirm.
51 void waitUntilReadable(int fd) {
// A timeout of -1 means the only reason to go around the loop again is EINTR.
58 r = poll(&pfd, 1, -1); // wait forever
59 } while (r == -1 && errno == EINTR);
// Equality (not a bit test): any additional flag in revents fails the check.
61 CHECK_EQ(pfd.revents, POLLIN); // no errors etc
// Waits for completed operations on `reader` and returns them as a range of
// Op pointers. Two strategies are visible: calling reader->wait(1) directly,
// or waiting for the reader's poll fd to become readable and then collecting
// results with reader->pollCompleted().
// NOTE(review): the condition selecting between the two is not visible in
// this excerpt; presumably it depends on whether pollFd() returned a usable
// descriptor -- confirm.
64 folly::Range<AsyncIO::Op**> readerWait(AsyncIO* reader) {
65 int fd = reader->pollFd();
// Strategy 1: wait on the reader itself.
67 return reader->wait(1);
// Strategy 2: block on the descriptor first, then fetch what completed.
69 waitUntilReadable(fd);
70 return reader->pollCompleted();
74 // Temporary file that is NOT kept open but is deleted on exit.
75 // Generates random-looking but reproducible data.
// Creates the file. `size` is given in bytes; the constructor checks that it
// is a multiple of sizeof(uint32_t).
78 explicit TemporaryFile(size_t size);
// Returns a copy of the stored path.
// NOTE(review): the top-level const on a by-value return has no benefit for
// callers and prevents moving from the result; consider dropping it.
81 const fs::path path() const { return path_; }
// Picks a unique path under the temp directory, opens it for binary writing
// and writes uint32_t values to it, at most `bufferSize` elements per
// fwrite(). `size` is in bytes and must be a multiple of sizeof(uint32_t).
// Failures of fopen/fwrite/fclose abort via PCHECK.
87 TemporaryFile::TemporaryFile(size_t size)
88 : path_(fs::temp_directory_path() / fs::unique_path()) {
89 CHECK_EQ(size % sizeof(uint32_t), 0);
// From here on `size` counts uint32_t elements, not bytes.
90 size /= sizeof(uint32_t);
// Fixed seed: the generator produces the same sequence on every run.
91 const uint32_t seed = 42;
92 std::mt19937 rnd(seed);
// NOTE(review): 65536 uint32_t values make this a 256 KiB stack array --
// confirm that is acceptable for the stack limits this test runs under.
94 const size_t bufferSize = 1U << 16;
95 uint32_t buffer[bufferSize];
97 FILE* fp = ::fopen(path_.c_str(), "wb");
98 PCHECK(fp != nullptr);
// Elements to write in this step, capped at the buffer capacity.
100 size_t n = std::min(size, bufferSize);
// NOTE(review): the loop body is not visible in this excerpt; presumably it
// fills buffer[i] from `rnd` -- confirm.
101 for (size_t i = 0; i < n; ++i) {
// A short write is treated as fatal.
104 size_t written = ::fwrite(buffer, sizeof(uint32_t), n, fp);
105 PCHECK(written == n);
// fclose() flushes buffered data, so its result is checked as well.
108 PCHECK(::fclose(fp) == 0);
// Destructor. A fs::filesystem_error thrown by the cleanup is caught and
// logged instead of escaping the destructor.
// NOTE(review): the guarded statement is not visible in this excerpt; the log
// prefix suggests it is an fs::remove() of the file -- confirm.
111 TemporaryFile::~TemporaryFile() {
114 } catch (const fs::filesystem_error& e) {
115 LOG(ERROR) << "fs::remove: " << folly::exceptionStr(e);
// Single input file shared by the read tests below; every test opens it by
// path via tempFile.path().
119 TemporaryFile tempFile(6 << 20); // 6MiB
// Owning char buffer whose deleter is a plain function pointer taking void*
// (allocateAligned() supplies free()).
121 typedef std::unique_ptr<char, void(*)(void*)> ManagedBuffer;
// Allocates `size` bytes aligned to kAlign with posix_memalign() and returns
// them as a ManagedBuffer that releases the memory with free(). An allocation
// failure fails the CHECK, with the error text appended to the message.
122 ManagedBuffer allocateAligned(size_t size) {
// posix_memalign() reports errors through its return value rather than errno,
// which is why the message is built from strerror(rc).
124 int rc = posix_memalign(&buf, kAlign, size);
125 CHECK_EQ(rc, 0) << strerror(rc);
126 return ManagedBuffer(reinterpret_cast<char*>(buf), free);
// Runs the reads in `specs` one at a time on an AsyncIO of capacity 1: each
// read is submitted, waited for and verified before the next iteration.
129 void testReadsSerially(const std::vector<TestSpec>& specs,
130 AsyncIO::PollMode pollMode) {
131 AsyncIO aioReader(1, pollMode);
// Opened with O_DIRECT, hence the aligned buffers from allocateAligned().
133 int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
// NOTE(review): `i` is int while specs.size() is unsigned (sign-compare
// warning); a size_t index would avoid the mixed comparison.
139 for (int i = 0; i < specs.size(); i++) {
140 auto buf = allocateAligned(specs[i].size);
141 op.pread(fd, buf.get(), specs[i].size, specs[i].start);
142 aioReader.submit(&op);
143 EXPECT_EQ(aioReader.pending(), 1);
144 auto ops = readerWait(&aioReader);
// Exactly the one submitted op must come back, leaving nothing pending.
145 EXPECT_EQ(1, ops.size());
146 EXPECT_TRUE(ops[0] == &op);
147 EXPECT_EQ(aioReader.pending(), 0);
// A negative result is reported as the negated errno (see errnoStr(-res));
// a successful read must return the full requested size.
148 ssize_t res = op.result();
149 EXPECT_LE(0, res) << folly::errnoStr(-res);
150 EXPECT_EQ(specs[i].size, res);
// Submits all reads in `specs` on an AsyncIO sized to hold every one of them,
// then drains completions until none remain, verifying each result and that
// no op is left marked pending at the end.
// NOTE(review): how `multithreaded` chooses between submitting from spawned
// threads and submitting inline is not visible in this excerpt -- confirm.
155 void testReadsParallel(const std::vector<TestSpec>& specs,
156 AsyncIO::PollMode pollMode,
157 bool multithreaded) {
158 AsyncIO aioReader(specs.size(), pollMode);
// One Op per spec, stored contiguously so that a completed Op* can be mapped
// back to its index by pointer subtraction further down.
159 std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
160 std::vector<ManagedBuffer> bufs;
161 bufs.reserve(specs.size());
163 int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
169 std::vector<std::thread> threads;
171 threads.reserve(specs.size());
// All buffers are allocated before anything is submitted, so `submit` can
// index bufs[i] without the vector changing underneath it.
// NOTE(review): the int-vs-size_t loop comparisons in this function trigger
// sign-compare warnings; size_t indices would avoid them.
173 for (int i = 0; i < specs.size(); i++) {
174 bufs.push_back(allocateAligned(specs[i].size));
// Prepares and submits the i-th read; captures the surrounding state by
// reference.
176 auto submit = [&] (int i) {
177 ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
178 aioReader.submit(&ops[i]);
180 for (int i = 0; i < specs.size(); i++) {
// `i` is captured by value so each thread keeps its own index.
182 threads.emplace_back([&submit, i] { submit(i); });
187 for (auto& t : threads) {
// pending[k] stays true until op k has been seen in a completion batch.
190 std::vector<bool> pending(specs.size(), true);
192 size_t remaining = specs.size();
193 while (remaining != 0) {
194 EXPECT_EQ(remaining, aioReader.pending());
195 auto completed = readerWait(&aioReader);
196 size_t nrRead = completed.size();
197 EXPECT_NE(nrRead, 0);
200 for (int i = 0; i < nrRead; i++) {
// Index of the completed op within the `ops` array.
201 int id = completed[i] - ops.get();
203 EXPECT_LT(id, specs.size());
204 EXPECT_TRUE(pending[id]);
// Negative results carry a negated errno; success must be a full-size read.
206 ssize_t res = ops[id].result();
207 EXPECT_LE(0, res) << folly::errnoStr(-res);
208 EXPECT_EQ(specs[id].size, res);
// After draining: nothing in flight and no op still marked pending.
211 EXPECT_EQ(aioReader.pending(), 0);
212 for (int i = 0; i < pending.size(); i++) {
213 EXPECT_FALSE(pending[i]);
// Submits all reads in `specs` through an AsyncIOQueue layered on an AsyncIO
// whose capacity is only half the number of specs (minimum 1), then drains
// completions while checking how the outstanding work is split between the
// reader (pending) and the queue (queued).
217 void testReadsQueued(const std::vector<TestSpec>& specs,
218 AsyncIO::PollMode pollMode) {
// Deliberately smaller than specs.size() (when there are 2+ specs) so that
// some submissions have to wait in the queue.
219 size_t readerCapacity = std::max(specs.size() / 2, size_t(1));
220 AsyncIO aioReader(readerCapacity, pollMode);
221 AsyncIOQueue aioQueue(&aioReader);
// Contiguous Op array: a completed Op* is mapped back to its index by
// pointer subtraction further down.
222 std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
223 std::vector<ManagedBuffer> bufs;
225 int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
// NOTE(review): the int-vs-size_t loop comparisons in this function trigger
// sign-compare warnings; size_t indices would avoid them.
230 for (int i = 0; i < specs.size(); i++) {
231 bufs.push_back(allocateAligned(specs[i].size));
232 ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
// Submission goes through the queue, not directly to the reader.
233 aioQueue.submit(&ops[i]);
// pending[k] stays true until op k has been seen in a completion batch.
235 std::vector<bool> pending(specs.size(), true);
237 size_t remaining = specs.size();
238 while (remaining != 0) {
// While at least a full reader's worth of work remains, the reader is
// expected to be full and the excess to sit in the queue...
239 if (remaining >= readerCapacity) {
240 EXPECT_EQ(readerCapacity, aioReader.pending());
241 EXPECT_EQ(remaining - readerCapacity, aioQueue.queued());
// ...otherwise everything left is in the reader and the queue is empty.
243 EXPECT_EQ(remaining, aioReader.pending());
244 EXPECT_EQ(0, aioQueue.queued());
246 auto completed = readerWait(&aioReader);
247 size_t nrRead = completed.size();
248 EXPECT_NE(nrRead, 0);
251 for (int i = 0; i < nrRead; i++) {
// Index of the completed op within the `ops` array.
252 int id = completed[i] - ops.get();
254 EXPECT_LT(id, specs.size());
255 EXPECT_TRUE(pending[id]);
// Negative results carry a negated errno; success must be a full-size read.
257 ssize_t res = ops[id].result();
258 EXPECT_LE(0, res) << folly::errnoStr(-res);
259 EXPECT_EQ(specs[id].size, res);
// After draining: reader and queue are both empty, no op still pending.
262 EXPECT_EQ(aioReader.pending(), 0);
263 EXPECT_EQ(aioQueue.queued(), 0);
264 for (int i = 0; i < pending.size(); i++) {
265 EXPECT_FALSE(pending[i]);
// Runs the same `specs` / `pollMode` combination through every read strategy
// in this file: serial, parallel (with multithreaded false, then true) and
// queued.
269 void testReads(const std::vector<TestSpec>& specs,
270 AsyncIO::PollMode pollMode) {
271 testReadsSerially(specs, pollMode);
272 testReadsParallel(specs, pollMode, false);
273 testReadsParallel(specs, pollMode, true);
274 testReadsQueued(specs, pollMode);
277 } // anonymous namespace
// Single {0, 0} spec in NOT_POLLABLE mode.
// NOTE(review): TestSpec is not visible in this excerpt; from its uses
// (specs[i].start / specs[i].size) the pair is presumably {start, size},
// i.e. a zero-length read at offset 0 -- confirm.
279 TEST(AsyncIO, ZeroAsyncDataNotPollable) {
280 testReads({{0, 0}}, AsyncIO::NOT_POLLABLE);
// Single {0, 0} spec in POLLABLE mode.
// NOTE(review): TestSpec is not visible in this excerpt; from its uses
// (specs[i].start / specs[i].size) the pair is presumably {start, size},
// i.e. a zero-length read at offset 0 -- confirm.
283 TEST(AsyncIO, ZeroAsyncDataPollable) {
284 testReads({{0, 0}}, AsyncIO::POLLABLE);
// One {0, kAlign} spec in NOT_POLLABLE mode. The identical call is made
// twice, so the whole serial/parallel/queued sequence runs two times.
287 TEST(AsyncIO, SingleAsyncDataNotPollable) {
288 testReads({{0, kAlign}}, AsyncIO::NOT_POLLABLE);
289 testReads({{0, kAlign}}, AsyncIO::NOT_POLLABLE);
// One {0, kAlign} spec in POLLABLE mode. The identical call is made twice,
// so the whole serial/parallel/queued sequence runs two times.
292 TEST(AsyncIO, SingleAsyncDataPollable) {
293 testReads({{0, kAlign}}, AsyncIO::POLLABLE);
294 testReads({{0, kAlign}}, AsyncIO::POLLABLE);
// Several specs per call, all in NOT_POLLABLE mode.
// NOTE(review): parts of the spec lists are not visible in this excerpt;
// only the entries shown below can be described.
297 TEST(AsyncIO, MultipleAsyncDataNotPollable) {
// Three specs sharing the first value kAlign (two of them identical), with
// second values of 2, 2 and 4 times kAlign; the same list is passed twice.
299 {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
300 AsyncIO::NOT_POLLABLE);
302 {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
303 AsyncIO::NOT_POLLABLE);
// A spec with a 5 MiB second value.
307 {kAlign, 5*1024*1024}
308 }, AsyncIO::NOT_POLLABLE);
316 }, AsyncIO::NOT_POLLABLE);
// Several specs per call; counterpart of MultipleAsyncDataNotPollable.
// NOTE(review): parts of this test are not visible in this excerpt.
319 TEST(AsyncIO, MultipleAsyncDataPollable) {
// Three specs sharing the first value kAlign (two of them identical), with
// second values of 2, 2 and 4 times kAlign; the same list appears twice.
321 {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
324 {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
// A spec with a 5 MiB second value.
329 {kAlign, 5*1024*1024}
// NOTE(review): this test is named "...Pollable", yet both poll-mode
// arguments visible below are NOT_POLLABLE. That looks like a copy-paste
// from the NotPollable test -- confirm whether AsyncIO::POLLABLE was
// intended here.
330 }, AsyncIO::NOT_POLLABLE);
338 }, AsyncIO::NOT_POLLABLE);
// Builds 1000 specs {kAlign * i, kAlign} for i in [0, 1000) and runs them in
// NOT_POLLABLE mode. The largest first value plus kAlign is 1000 * 4096
// bytes, which is below the 6 MiB size given to tempFile.
341 TEST(AsyncIO, ManyAsyncDataNotPollable) {
343 std::vector<TestSpec> v;
344 for (int i = 0; i < 1000; i++) {
345 v.push_back({off_t(kAlign * i), kAlign});
347 testReads(v, AsyncIO::NOT_POLLABLE);
// Builds 1000 specs {kAlign * i, kAlign} for i in [0, 1000) and runs them in
// POLLABLE mode. The largest first value plus kAlign is 1000 * 4096 bytes,
// which is below the 6 MiB size given to tempFile.
351 TEST(AsyncIO, ManyAsyncDataPollable) {
353 std::vector<TestSpec> v;
354 for (int i = 0; i < 1000; i++) {
355 v.push_back({off_t(kAlign * i), kAlign});
357 testReads(v, AsyncIO::POLLABLE);
// Submits one 2 * kAlign read at offset 0 on a NOT_POLLABLE reader of
// capacity 1 and collects it by calling wait(0) repeatedly until a non-empty
// range comes back.
361 TEST(AsyncIO, NonBlockingWait) {
362 AsyncIO aioReader(1, AsyncIO::NOT_POLLABLE);
364 int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
369 size_t size = 2*kAlign;
370 auto buf = allocateAligned(size);
371 op.pread(fd, buf.get(), size, 0);
372 aioReader.submit(&op);
373 EXPECT_EQ(aioReader.pending(), 1);
// Starts out empty, so the loop below runs at least once.
375 folly::Range<AsyncIO::Op**> completed;
376 while (completed.empty()) {
377 // poll without blocking until the read request completes.
378 completed = aioReader.wait(0);
380 EXPECT_EQ(completed.size(), 1);
// The single completion must be the op submitted above; a negative result
// carries a negated errno, success must be a full-size read.
382 EXPECT_TRUE(completed[0] == &op);
383 ssize_t res = op.result();
384 EXPECT_LE(0, res) << folly::errnoStr(-res);
385 EXPECT_EQ(size, res);
386 EXPECT_EQ(aioReader.pending(), 0);