2 * Copyright 2013 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include "folly/experimental/io/AsyncIO.h"
20 #include <sys/types.h>
30 #include <glog/logging.h>
31 #include <gtest/gtest.h>
33 #include "folly/experimental/io/FsUtil.h"
34 #include "folly/ScopeGuard.h"
35 #include "folly/String.h"
37 namespace fs = folly::fs;
39 using folly::AsyncIOQueue;
// Alignment granularity in bytes. Used below both as the alignment passed to
// posix_memalign() and as the unit for the values in the test specs.
43 constexpr size_t kAlign = 4096; // align reads to 4096 B (for O_DIRECT)
// Blocks in poll() until an event is reported, then insists that the event is
// plain readability.
// The pollfd setup and the head of the retry loop are elided from this
// listing; presumably `pfd` is initialised from `fd` with POLLIN requested.
50 void waitUntilReadable(int fd) {
// Timeout of -1: no deadline.
57 r = poll(&pfd, 1, -1); // wait forever
// An interrupted call (EINTR) is not a failure; simply poll again.
58 } while (r == -1 && errno == EINTR);
// Fatal unless POLLIN is the only bit set in revents.
60 CHECK_EQ(pfd.revents, POLLIN); // no errors etc
// Obtains a batch of completed operations from `reader`.
// Two paths are visible: calling reader->wait(1) directly, or waiting for the
// reader's pollFd() to become readable and then calling pollCompleted().
// The condition choosing between them is elided from this listing —
// presumably it tests whether pollFd() returned a usable descriptor; TODO
// confirm against the full file.
63 folly::Range<AsyncIO::Op**> readerWait(AsyncIO* reader) {
64 int fd = reader->pollFd();
66 return reader->wait(1);
68 waitUntilReadable(fd);
69 return reader->pollCompleted();
73 // Temporary file that is NOT kept open but is deleted on exit.
74 // Generate random-looking but reproducible data.
// `size` is the file size in bytes; the constructor checks that it is a
// multiple of sizeof(uint32_t).
77 explicit TemporaryFile(size_t size);
// Returns path_ by value (a copy), not by reference.
80 const fs::path path() const { return path_; }
// Creates a uniquely named file under the system temp directory and writes
// `size` bytes of uint32_t words to it through a stdio stream.
// The enclosing write loop and the statement filling `buffer` are elided from
// this listing; presumably the words come from `rnd` — TODO confirm.
86 TemporaryFile::TemporaryFile(size_t size)
87 : path_(fs::temp_directory_path() / fs::unique_path()) {
// Data is written in whole 32-bit words, so the byte size must divide evenly.
88 CHECK_EQ(size % sizeof(uint32_t), 0);
// From here on `size` is a word count, not a byte count.
89 size /= sizeof(uint32_t);
// Constant seed, so the generator produces the same sequence on every run.
90 const uint32_t seed = 42;
91 std::mt19937 rnd(seed);
// Staging buffer of 2^16 words, declared as a local array.
93 const size_t bufferSize = 1U << 16;
94 uint32_t buffer[bufferSize];
96 FILE* fp = ::fopen(path_.c_str(), "wb");
97 PCHECK(fp != nullptr);
// Never handle more than one buffer's worth of words at a time.
99 size_t n = std::min(size, bufferSize);
100 for (size_t i = 0; i < n; ++i) {
// fwrite returns the number of items written; a short write is fatal.
103 size_t written = ::fwrite(buffer, sizeof(uint32_t), n, fp);
104 PCHECK(written == n);
// A failing fclose is treated as fatal as well.
107 PCHECK(::fclose(fp) == 0);
// Destructor. A filesystem_error raised by the guarded call is caught and
// logged rather than allowed to leave the destructor.
// The guarded call itself is elided from this listing; the log prefix
// suggests it is fs::remove on the file's path — TODO confirm.
110 TemporaryFile::~TemporaryFile() {
113 } catch (const fs::filesystem_error& e) {
114 LOG(ERROR) << "fs::remove: " << folly::exceptionStr(e);
// Single namespace-scope fixture file; the read helpers and NonBlockingWait
// below all open tempFile.path().
118 TemporaryFile tempFile(6 << 20); // 6MiB
// Owning char pointer whose deleter is a plain function pointer taking void*.
120 typedef std::unique_ptr<char, void(*)(void*)> ManagedBuffer;
// Allocates `size` bytes aligned to kAlign and returns them in a ManagedBuffer
// that releases the memory with free(). A failed allocation is fatal.
// The declaration of `buf` is elided from this listing.
121 ManagedBuffer allocateAligned(size_t size) {
// posix_memalign reports failure through its return value, which is why
// that value (not errno) is passed to strerror.
123 int rc = posix_memalign(&buf, kAlign, size);
124 CHECK_EQ(rc, 0) << strerror(rc);
125 return ManagedBuffer(reinterpret_cast<char*>(buf), free);
// Issues the reads in `specs` one at a time: each read is submitted, waited
// for and verified before the next one is started.
// The declaration of `op` and the handling of `fd` after open() are elided
// from this listing.
128 void testReadsSerially(const std::vector<TestSpec>& specs,
129 AsyncIO::PollMode pollMode) {
// Constructed with capacity 1; the test only ever has one op outstanding.
130 AsyncIO aioReader(1, pollMode);
132 int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
// NOTE(review): signed `int i` is compared against the unsigned specs.size().
138 for (int i = 0; i < specs.size(); i++) {
// A fresh kAlign-aligned buffer per read, sized to the request.
139 auto buf = allocateAligned(specs[i].size);
140 op.pread(fd, buf.get(), specs[i].size, specs[i].start);
141 aioReader.submit(&op);
142 EXPECT_EQ(aioReader.pending(), 1);
143 auto ops = readerWait(&aioReader);
// Exactly the one submitted op is expected back, and nothing may remain pending.
144 EXPECT_EQ(1, ops.size());
145 EXPECT_TRUE(ops[0] == &op);
146 EXPECT_EQ(aioReader.pending(), 0);
// The test expects result() to equal the requested size; a negative value is
// reported through errnoStr(-res).
147 ssize_t res = op.result();
148 EXPECT_LE(0, res) << folly::errnoStr(-res);
149 EXPECT_EQ(specs[i].size, res);
// Submits every read in `specs` up front, then drains completion batches
// until none remain, checking each completed op.
// The bookkeeping that updates `remaining` and `pending[id]` is elided from
// this listing.
154 void testReadsParallel(const std::vector<TestSpec>& specs,
155 AsyncIO::PollMode pollMode) {
// Capacity equals the number of specs, so all reads are submitted at once.
156 AsyncIO aioReader(specs.size(), pollMode);
157 std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
// Buffers are collected here so they stay alive after the submit loop ends.
158 std::vector<ManagedBuffer> bufs;
160 int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
// NOTE(review): signed `int i` is compared against the unsigned specs.size().
165 for (int i = 0; i < specs.size(); i++) {
166 bufs.push_back(allocateAligned(specs[i].size));
167 ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
168 aioReader.submit(&ops[i]);
// One flag per op, all initially true; checked per completion below and
// expected to be all false once draining is done.
170 std::vector<bool> pending(specs.size(), true);
172 size_t remaining = specs.size();
173 while (remaining != 0) {
174 EXPECT_EQ(remaining, aioReader.pending());
175 auto completed = readerWait(&aioReader);
176 size_t nrRead = completed.size();
// Every wait is expected to deliver at least one completion.
177 EXPECT_NE(nrRead, 0);
180 for (int i = 0; i < nrRead; i++) {
// Recover the op's index from its position inside the ops array.
181 int id = completed[i] - ops.get();
183 EXPECT_LT(id, specs.size());
// An op that is reported complete must still have been marked pending.
184 EXPECT_TRUE(pending[id]);
186 ssize_t res = ops[id].result();
187 EXPECT_LE(0, res) << folly::errnoStr(-res);
188 EXPECT_EQ(specs[id].size, res);
// Final state: reader idle and no op still flagged as pending.
191 EXPECT_EQ(aioReader.pending(), 0);
192 for (int i = 0; i < pending.size(); i++) {
193 EXPECT_FALSE(pending[i]);
// Like the parallel test, but submits through an AsyncIOQueue placed in front
// of an AsyncIO whose capacity is only half the number of specs (at least 1).
// The bookkeeping that updates `remaining` and `pending[id]`, and the `else`
// line of the occupancy check, are elided from this listing.
197 void testReadsQueued(const std::vector<TestSpec>& specs,
198 AsyncIO::PollMode pollMode) {
// std::max keeps the capacity at 1 when specs has fewer than two entries.
199 size_t readerCapacity = std::max(specs.size() / 2, size_t(1));
200 AsyncIO aioReader(readerCapacity, pollMode);
201 AsyncIOQueue aioQueue(&aioReader);
202 std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
203 std::vector<ManagedBuffer> bufs;
205 int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
210 for (int i = 0; i < specs.size(); i++) {
211 bufs.push_back(allocateAligned(specs[i].size));
212 ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
// Submission goes to the queue, not directly to the reader.
213 aioQueue.submit(&ops[i]);
215 std::vector<bool> pending(specs.size(), true);
217 size_t remaining = specs.size();
218 while (remaining != 0) {
// Expected occupancy: while at least readerCapacity ops remain, the reader
// is full and the excess sits in the queue ...
219 if (remaining >= readerCapacity) {
220 EXPECT_EQ(readerCapacity, aioReader.pending());
221 EXPECT_EQ(remaining - readerCapacity, aioQueue.queued());
// ... otherwise (presumably the else branch) all remaining ops are in the
// reader and the queue is empty.
223 EXPECT_EQ(remaining, aioReader.pending());
224 EXPECT_EQ(0, aioQueue.queued());
226 auto completed = readerWait(&aioReader);
227 size_t nrRead = completed.size();
228 EXPECT_NE(nrRead, 0);
231 for (int i = 0; i < nrRead; i++) {
// Recover the op's index from its position inside the ops array.
232 int id = completed[i] - ops.get();
234 EXPECT_LT(id, specs.size());
235 EXPECT_TRUE(pending[id]);
237 ssize_t res = ops[id].result();
238 EXPECT_LE(0, res) << folly::errnoStr(-res);
239 EXPECT_EQ(specs[id].size, res);
// Final state: reader idle, queue empty, no op still flagged as pending.
242 EXPECT_EQ(aioReader.pending(), 0);
243 EXPECT_EQ(aioQueue.queued(), 0);
244 for (int i = 0; i < pending.size(); i++) {
245 EXPECT_FALSE(pending[i]);
// Runs the same spec list through all three strategies in turn: serial,
// parallel and queued, each with the given poll mode.
249 void testReads(const std::vector<TestSpec>& specs,
250 AsyncIO::PollMode pollMode) {
251 testReadsSerially(specs, pollMode);
252 testReadsParallel(specs, pollMode);
253 testReadsQueued(specs, pollMode);
256 } // anonymous namespace
// A single spec whose fields are both 0, in NOT_POLLABLE mode.
258 TEST(AsyncIO, ZeroAsyncDataNotPollable) {
259 testReads({{0, 0}}, AsyncIO::NOT_POLLABLE);
// A single spec whose fields are both 0, in POLLABLE mode.
262 TEST(AsyncIO, ZeroAsyncDataPollable) {
263 testReads({{0, 0}}, AsyncIO::POLLABLE);
// One {0, kAlign} spec in NOT_POLLABLE mode; the identical call is made twice.
266 TEST(AsyncIO, SingleAsyncDataNotPollable) {
267 testReads({{0, kAlign}}, AsyncIO::NOT_POLLABLE);
268 testReads({{0, kAlign}}, AsyncIO::NOT_POLLABLE);
// One {0, kAlign} spec in POLLABLE mode; the identical call is made twice.
271 TEST(AsyncIO, SingleAsyncDataPollable) {
272 testReads({{0, kAlign}}, AsyncIO::POLLABLE);
273 testReads({{0, kAlign}}, AsyncIO::POLLABLE);
// Several multi-entry spec lists in NOT_POLLABLE mode.
// The testReads( call lines and some spec entries are elided from this
// listing. Entries are presumably {start, size} — TODO confirm against the
// TestSpec definition.
276 TEST(AsyncIO, MultipleAsyncDataNotPollable) {
// All three entries share the same first field (kAlign); the second field varies.
278 {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
279 AsyncIO::NOT_POLLABLE);
281 {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
282 AsyncIO::NOT_POLLABLE);
// 5*1024*1024 is 5 MiB; the shared temp file is created with 6 MiB.
286 {kAlign, 5*1024*1024}
287 }, AsyncIO::NOT_POLLABLE);
295 }, AsyncIO::NOT_POLLABLE);
// Several multi-entry spec lists in POLLABLE mode; counterpart of
// MultipleAsyncDataNotPollable.
// The testReads( call lines, the mode arguments of the first two calls and
// some spec entries are elided from this listing. Entries are presumably
// {start, size} — TODO confirm against the TestSpec definition.
298 TEST(AsyncIO, MultipleAsyncDataPollable) {
300 {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
303 {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
308 {kAlign, 5*1024*1024}
// POLLABLE, so the large-read cases exercise the pollable path in this test;
// NOT_POLLABLE here would only repeat what the NotPollable test already covers.
309 }, AsyncIO::POLLABLE);
317 }, AsyncIO::POLLABLE);
// Builds 1000 specs {kAlign * i, kAlign} for i in [0, 1000) and runs them in
// NOT_POLLABLE mode.
320 TEST(AsyncIO, ManyAsyncDataNotPollable) {
322 std::vector<TestSpec> v;
323 for (int i = 0; i < 1000; i++) {
// First field advances by kAlign per entry; second field is always kAlign.
324 v.push_back({kAlign * i, kAlign});
326 testReads(v, AsyncIO::NOT_POLLABLE);
// Builds 1000 specs {kAlign * i, kAlign} for i in [0, 1000) and runs them in
// POLLABLE mode.
330 TEST(AsyncIO, ManyAsyncDataPollable) {
332 std::vector<TestSpec> v;
333 for (int i = 0; i < 1000; i++) {
// First field advances by kAlign per entry; second field is always kAlign.
334 v.push_back({kAlign * i, kAlign});
336 testReads(v, AsyncIO::POLLABLE);
// Submits one read and polls for its completion with wait(0) in a loop,
// instead of using a blocking wait.
// The declaration of `op` and the handling of `fd` after open() are elided
// from this listing.
340 TEST(AsyncIO, NonBlockingWait) {
341 AsyncIO aioReader(1, AsyncIO::NOT_POLLABLE);
343 int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
// Request 2*kAlign bytes at offset 0 into a kAlign-aligned buffer.
348 size_t size = 2*kAlign;
349 auto buf = allocateAligned(size);
350 op.pread(fd, buf.get(), size, 0);
351 aioReader.submit(&op);
352 EXPECT_EQ(aioReader.pending(), 1);
// Starts out empty, so the loop below runs at least once.
354 folly::Range<AsyncIO::Op**> completed;
355 while (completed.empty()) {
356 // poll without blocking until the read request completes.
357 completed = aioReader.wait(0);
359 EXPECT_EQ(completed.size(), 1);
// The single completion must be the op submitted above, with the full size read.
361 EXPECT_TRUE(completed[0] == &op);
362 ssize_t res = op.result();
363 EXPECT_LE(0, res) << folly::errnoStr(-res);
364 EXPECT_EQ(size, res);
365 EXPECT_EQ(aioReader.pending(), 0);