2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/experimental/io/AsyncIO.h>
20 #include <sys/types.h>
30 #include <glog/logging.h>
31 #include <gtest/gtest.h>
33 #include <folly/experimental/io/FsUtil.h>
34 #include <folly/ScopeGuard.h>
35 #include <folly/String.h>
36 #include <folly/portability/Sockets.h>
// Short alias for folly's filesystem namespace, used for temp-file paths.
38 namespace fs = folly::fs;
40 using folly::AsyncIOQueue;
// Alignment in bytes shared by the read specs in the tests and by
// allocateAligned() when it allocates read buffers.
44 constexpr size_t kAlign = 4096; // align reads to 4096 B (for O_DIRECT)
// Blocks in poll() with no timeout, restarting the call whenever it is
// interrupted by a signal (EINTR), and then CHECKs that the only event
// reported is POLLIN.
51 void waitUntilReadable(int fd) {
58 r = poll(&pfd, 1, -1); // wait forever
// Retry only on EINTR; any other outcome leaves the loop.
59 } while (r == -1 && errno == EINTR);
// Exact equality: any additional flag in revents fails the CHECK.
61 CHECK_EQ(pfd.revents, POLLIN); // no errors etc
// Waits for completed operations on `reader` and returns them as a range of
// Op pointers. Two strategies are visible below: a direct reader->wait(1),
// or waiting for the reader's poll fd to become readable and then
// collecting results with pollCompleted().
// NOTE(review): the condition selecting between the two is not visible
// here; presumably it depends on whether pollFd() returned a usable
// descriptor -- confirm.
64 folly::Range<AsyncIO::Op**> readerWait(AsyncIO* reader) {
65 int fd = reader->pollFd();
// Path 1: wait on the reader itself, asking for at least one completion.
67 return reader->wait(1);
// Path 2: wait on the poll fd first, then fetch what has completed.
69 waitUntilReadable(fd);
70 return reader->pollCompleted();
74 // Temporary file that is NOT kept open but is deleted on exit.
75 // Generate random-looking but reproducible data.
// Constructs the temporary file. `size` is in bytes; the constructor CHECKs
// that it is a multiple of sizeof(uint32_t).
78 explicit TemporaryFile(size_t size);
// Returns a copy of the file's path.
81 const fs::path path() const { return path_; }
// Chooses a unique path under the system temp directory and writes the file
// contents through stdio, closing the file before returning.
//
// `size` is the requested length in bytes. It must be a multiple of
// sizeof(uint32_t) (CHECKed) and is converted to a count of 32-bit words
// before any data is written.
87 TemporaryFile::TemporaryFile(size_t size)
88 : path_(fs::temp_directory_path() / fs::unique_path()) {
89 CHECK_EQ(size % sizeof(uint32_t), 0);
// From here on `size` counts uint32_t words, not bytes.
90 size /= sizeof(uint32_t);
// Fixed seed, so the generator yields the same sequence on every run.
91 const uint32_t seed = 42;
92 std::mt19937 rnd(seed);
// Staging buffer of 1U << 16 (65536) words; bounds the size of each write.
94 const size_t bufferSize = 1U << 16;
95 uint32_t buffer[bufferSize];
97 FILE* fp = ::fopen(path_.c_str(), "wb");
98 PCHECK(fp != nullptr);
// Number of words in this chunk, capped at the staging buffer's capacity.
100 size_t n = std::min(size, bufferSize);
101 for (size_t i = 0; i < n; ++i) {
// fwrite returns the number of items written; anything short of n is fatal.
104 size_t written = ::fwrite(buffer, sizeof(uint32_t), n, fp);
105 PCHECK(written == n);
// fclose is checked as well: stdio may defer the actual write until close,
// so an I/O error can first surface here.
108 PCHECK(::fclose(fp) == 0);
// Destructor. A fs::filesystem_error raised during cleanup is caught and
// logged at ERROR level, so it does not escape the destructor.
// NOTE(review): the guarded statement is not visible here; judging by the
// log prefix it is presumably an fs::remove of the file -- confirm.
111 TemporaryFile::~TemporaryFile() {
114 } catch (const fs::filesystem_error& e) {
115 LOG(ERROR) << "fs::remove: " << folly::exceptionStr(e);
// Data file shared by the tests in this file; each test helper opens it by
// path with O_DIRECT | O_RDONLY.
119 TemporaryFile tempFile(6 << 20); // 6MiB
// Owning pointer to a char buffer whose deleter is a plain function taking
// void* (allocateAligned below supplies free).
121 typedef std::unique_ptr<char, void(*)(void*)> ManagedBuffer;
// Allocates `size` bytes aligned to kAlign and returns them in a
// ManagedBuffer that releases the memory with free(). A failed allocation
// is fatal (CHECK).
122 ManagedBuffer allocateAligned(size_t size) {
// posix_memalign reports failure through its return value rather than
// errno, which is why rc itself is passed to strerror below.
124 int rc = posix_memalign(&buf, kAlign, size);
125 CHECK_EQ(rc, 0) << strerror(rc);
126 return ManagedBuffer(reinterpret_cast<char*>(buf), free);
// Issues the reads in `specs` one at a time through an AsyncIO of capacity
// 1: each read is submitted, waited for and verified before the next one is
// submitted.
//
// specs    - reads to perform; each entry supplies `start` and `size`.
// pollMode - passed through to the AsyncIO constructor.
129 void testReadsSerially(const std::vector<TestSpec>& specs,
130 AsyncIO::PollMode pollMode) {
131 AsyncIO aioReader(1, pollMode);
// The file is opened with O_DIRECT, hence the kAlign-aligned buffers below.
133 int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
139 for (size_t i = 0; i < specs.size(); i++) {
140 auto buf = allocateAligned(specs[i].size);
141 op.pread(fd, buf.get(), specs[i].size, specs[i].start);
142 aioReader.submit(&op);
// Submits are cumulative across iterations, but only one is ever in flight.
143 EXPECT_EQ((i + 1), aioReader.totalSubmits());
144 EXPECT_EQ(aioReader.pending(), 1);
145 auto ops = readerWait(&aioReader);
// Exactly one completion is expected, and it must be the op just submitted.
146 EXPECT_EQ(1, ops.size());
147 EXPECT_TRUE(ops[0] == &op);
148 EXPECT_EQ(aioReader.pending(), 0);
// A negative result is reported as the errno string for -res; a successful
// read must return exactly the requested number of bytes.
149 ssize_t res = op.result();
150 EXPECT_LE(0, res) << folly::errnoStr(-res);
151 EXPECT_EQ(specs[i].size, res);
// Submits every read in `specs` to an AsyncIO sized to hold all of them,
// then drains completions until none remain, verifying each result.
//
// specs         - reads to perform; each entry supplies `start` and `size`.
// pollMode      - passed through to the AsyncIO constructor.
// multithreaded - NOTE(review): its use is not visible here; presumably it
//                 selects submitting each op from its own std::thread
//                 instead of inline -- confirm.
156 void testReadsParallel(const std::vector<TestSpec>& specs,
157 AsyncIO::PollMode pollMode,
158 bool multithreaded) {
159 AsyncIO aioReader(specs.size(), pollMode);
// One Op per spec, stored contiguously so that a completed Op* can be mapped
// back to its index by pointer subtraction (see `id` below).
160 std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
161 std::vector<ManagedBuffer> bufs;
162 bufs.reserve(specs.size());
164 int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
170 std::vector<std::thread> threads;
172 threads.reserve(specs.size());
// All buffers are allocated up front, before any submit runs, so the submit
// lambda only ever reads from `bufs`.
174 for (size_t i = 0; i < specs.size(); i++) {
175 bufs.push_back(allocateAligned(specs[i].size));
// Prepares op i as a pread into its buffer and hands it to the reader.
177 auto submit = [&] (size_t i) {
178 ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
179 aioReader.submit(&ops[i]);
181 for (size_t i = 0; i < specs.size(); i++) {
// The index is captured by value so each thread submits its own op.
183 threads.emplace_back([&submit, i] { submit(i); });
// NOTE(review): loop body not visible; presumably joins each thread.
188 for (auto& t : threads) {
// pending[i] starts true for every spec; a completed op is expected to still
// be marked pending when it is seen.
191 std::vector<bool> pending(specs.size(), true);
193 size_t remaining = specs.size();
194 while (remaining != 0) {
// The reader's in-flight count must match our own bookkeeping.
195 EXPECT_EQ(remaining, aioReader.pending());
196 auto completed = readerWait(&aioReader);
197 size_t nrRead = completed.size();
198 EXPECT_NE(nrRead, 0);
201 for (size_t i = 0; i < nrRead; i++) {
// Index of the completed op within `ops`.
// NOTE(review): `id` is an int narrowed from a pointer difference and is
// then compared with specs.size() (size_t), a signed/unsigned comparison;
// a ptrdiff_t or size_t would avoid that.
202 int id = completed[i] - ops.get();
204 EXPECT_LT(id, specs.size());
205 EXPECT_TRUE(pending[id]);
// A negative result is reported as the errno string for -res; a successful
// read must return exactly the requested number of bytes.
207 ssize_t res = ops[id].result();
208 EXPECT_LE(0, res) << folly::errnoStr(-res);
209 EXPECT_EQ(specs[id].size, res);
// Final state: every spec submitted exactly once, nothing in flight, and no
// entry still flagged as pending.
212 EXPECT_EQ(specs.size(), aioReader.totalSubmits());
214 EXPECT_EQ(aioReader.pending(), 0);
215 for (size_t i = 0; i < pending.size(); i++) {
216 EXPECT_FALSE(pending[i]);
// Exercises AsyncIOQueue on top of an AsyncIO whose capacity is only half
// the number of specs (but at least 1), so submissions beyond that capacity
// have to wait in the queue.
//
// specs    - reads to perform; each entry supplies `start` and `size`.
// pollMode - passed through to the AsyncIO constructor.
220 void testReadsQueued(const std::vector<TestSpec>& specs,
221 AsyncIO::PollMode pollMode) {
// max(..., 1) keeps the capacity non-zero when there is a single spec.
222 size_t readerCapacity = std::max(specs.size() / 2, size_t(1));
223 AsyncIO aioReader(readerCapacity, pollMode);
224 AsyncIOQueue aioQueue(&aioReader);
225 std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
// No reserve() here: if the vector grows, the unique_ptrs are moved but the
// buffers they own stay at the same address, so pointers already handed to
// pread remain valid.
226 std::vector<ManagedBuffer> bufs;
228 int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
// Every op goes through the queue rather than directly to the reader.
233 for (size_t i = 0; i < specs.size(); i++) {
234 bufs.push_back(allocateAligned(specs[i].size));
235 ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
236 aioQueue.submit(&ops[i]);
238 std::vector<bool> pending(specs.size(), true);
240 size_t remaining = specs.size();
241 while (remaining != 0) {
// While at least a full reader's worth of work remains, the reader is
// expected to be full and the surplus to be sitting in the queue ...
242 if (remaining >= readerCapacity) {
243 EXPECT_EQ(readerCapacity, aioReader.pending());
244 EXPECT_EQ(remaining - readerCapacity, aioQueue.queued());
// ... otherwise everything left is in the reader and the queue is empty.
246 EXPECT_EQ(remaining, aioReader.pending());
247 EXPECT_EQ(0, aioQueue.queued());
249 auto completed = readerWait(&aioReader);
250 size_t nrRead = completed.size();
251 EXPECT_NE(nrRead, 0);
254 for (size_t i = 0; i < nrRead; i++) {
// Index of the completed op within `ops`.
// NOTE(review): `id` is an int narrowed from a pointer difference and is
// then compared with specs.size() (size_t), a signed/unsigned comparison.
255 int id = completed[i] - ops.get();
257 EXPECT_LT(id, specs.size());
258 EXPECT_TRUE(pending[id]);
// A negative result is reported as the errno string for -res; a successful
// read must return exactly the requested number of bytes.
260 ssize_t res = ops[id].result();
261 EXPECT_LE(0, res) << folly::errnoStr(-res);
262 EXPECT_EQ(specs[id].size, res);
// Final state: every spec reached the reader, and both the reader and the
// queue are empty.
265 EXPECT_EQ(specs.size(), aioReader.totalSubmits());
266 EXPECT_EQ(aioReader.pending(), 0);
267 EXPECT_EQ(aioQueue.queued(), 0);
268 for (size_t i = 0; i < pending.size(); i++) {
269 EXPECT_FALSE(pending[i]);
// Runs the same `specs` and `pollMode` through every strategy in turn:
// serial, parallel with multithreaded = false, parallel with
// multithreaded = true, and queued.
273 void testReads(const std::vector<TestSpec>& specs,
274 AsyncIO::PollMode pollMode) {
275 testReadsSerially(specs, pollMode);
276 testReadsParallel(specs, pollMode, false);
277 testReadsParallel(specs, pollMode, true);
278 testReadsQueued(specs, pollMode);
281 } // anonymous namespace
// Single spec with both fields zero (a zero-length read at offset 0), in
// NOT_POLLABLE mode.
283 TEST(AsyncIO, ZeroAsyncDataNotPollable) {
284 testReads({{0, 0}}, AsyncIO::NOT_POLLABLE);
// Single spec with both fields zero (a zero-length read at offset 0), in
// POLLABLE mode.
287 TEST(AsyncIO, ZeroAsyncDataPollable) {
288 testReads({{0, 0}}, AsyncIO::POLLABLE);
// One spec, {0, kAlign} (presumably {start, size}: a kAlign-byte read at
// offset 0), in NOT_POLLABLE mode. The identical call is made twice.
291 TEST(AsyncIO, SingleAsyncDataNotPollable) {
292 testReads({{0, kAlign}}, AsyncIO::NOT_POLLABLE);
293 testReads({{0, kAlign}}, AsyncIO::NOT_POLLABLE);
// One spec, {0, kAlign} (presumably {start, size}: a kAlign-byte read at
// offset 0), in POLLABLE mode. The identical call is made twice.
296 TEST(AsyncIO, SingleAsyncDataPollable) {
297 testReads({{0, kAlign}}, AsyncIO::POLLABLE);
298 testReads({{0, kAlign}}, AsyncIO::POLLABLE);
// Several reads per testReads call, all in NOT_POLLABLE mode. Specs are
// presumably {start, size}; the first two batches are identical and their
// entries all share the same first field (kAlign).
301 TEST(AsyncIO, MultipleAsyncDataNotPollable) {
303 {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
304 AsyncIO::NOT_POLLABLE);
306 {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
307 AsyncIO::NOT_POLLABLE);
// 5*1024*1024 is 5 MiB; read from kAlign it still fits in the 6 MiB file.
311 {kAlign, 5*1024*1024}
312 }, AsyncIO::NOT_POLLABLE);
320 }, AsyncIO::NOT_POLLABLE);
// Several reads per testReads call; specs are presumably {start, size}.
// NOTE(review): both mode arguments visible below are AsyncIO::NOT_POLLABLE
// even though this test is named ...Pollable. That looks like a copy-paste
// from MultipleAsyncDataNotPollable, which would leave these batches
// untested in POLLABLE mode -- confirm the intent and switch them to
// AsyncIO::POLLABLE if so.
323 TEST(AsyncIO, MultipleAsyncDataPollable) {
325 {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
328 {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
// 5*1024*1024 is 5 MiB; read from kAlign it still fits in the 6 MiB file.
333 {kAlign, 5*1024*1024}
334 }, AsyncIO::NOT_POLLABLE);
342 }, AsyncIO::NOT_POLLABLE);
// Builds 1000 specs, the i-th being {kAlign * i, kAlign}, and runs them in
// NOT_POLLABLE mode. The largest value involved, 1000 * kAlign = 4,096,000,
// is below the 6 MiB size of the shared file.
345 TEST(AsyncIO, ManyAsyncDataNotPollable) {
347 std::vector<TestSpec> v;
348 for (int i = 0; i < 1000; i++) {
349 v.push_back({off_t(kAlign * i), kAlign});
351 testReads(v, AsyncIO::NOT_POLLABLE);
// Builds 1000 specs, the i-th being {kAlign * i, kAlign}, and runs them in
// POLLABLE mode. The largest value involved, 1000 * kAlign = 4,096,000, is
// below the 6 MiB size of the shared file.
355 TEST(AsyncIO, ManyAsyncDataPollable) {
357 std::vector<TestSpec> v;
358 for (int i = 0; i < 1000; i++) {
359 v.push_back({off_t(kAlign * i), kAlign});
361 testReads(v, AsyncIO::POLLABLE);
// Submits a single 2*kAlign-byte pread (last argument 0) to a NOT_POLLABLE
// AsyncIO of capacity 1, then calls wait(0) in a loop until a completion is
// returned, and verifies that completion.
365 TEST(AsyncIO, NonBlockingWait) {
366 AsyncIO aioReader(1, AsyncIO::NOT_POLLABLE);
368 int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
373 size_t size = 2*kAlign;
374 auto buf = allocateAligned(size);
375 op.pread(fd, buf.get(), size, 0);
376 aioReader.submit(&op);
377 EXPECT_EQ(aioReader.pending(), 1);
// Starts out empty, so the loop below runs at least once.
379 folly::Range<AsyncIO::Op**> completed;
380 while (completed.empty()) {
381 // poll without blocking until the read request completes.
382 completed = aioReader.wait(0);
384 EXPECT_EQ(completed.size(), 1);
// The one completion must be the op submitted above, with a full-length
// read and nothing left in flight.
386 EXPECT_TRUE(completed[0] == &op);
387 ssize_t res = op.result();
388 EXPECT_LE(0, res) << folly::errnoStr(-res);
389 EXPECT_EQ(size, res);
390 EXPECT_EQ(aioReader.pending(), 0);