Fix async_io_test to work with larger block sizes
[folly.git] / folly / experimental / io / test / AsyncIOTest.cpp
1 /*
2  * Copyright 2013 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "folly/experimental/io/AsyncIO.h"
18
19 #include <sys/stat.h>
20 #include <sys/types.h>
21 #include <fcntl.h>
22 #include <poll.h>
23
24 #include <cstdlib>
25 #include <cstdio>
26 #include <memory>
27 #include <random>
28 #include <vector>
29
30 #include <glog/logging.h>
31 #include <gtest/gtest.h>
32
33 #include "folly/experimental/io/FsUtil.h"
34 #include "folly/ScopeGuard.h"
35 #include "folly/String.h"
36
37 namespace fs = folly::fs;
38 using folly::AsyncIO;
39 using folly::AsyncIOQueue;
40
41 namespace {
42
43 constexpr size_t kAlign = 4096;  // align reads to 4096 B (for O_DIRECT)
44
45 struct TestSpec {
46   off_t start;
47   size_t size;
48 };
49
50 void waitUntilReadable(int fd) {
51   pollfd pfd;
52   pfd.fd = fd;
53   pfd.events = POLLIN;
54
55   int r;
56   do {
57     r = poll(&pfd, 1, -1);  // wait forever
58   } while (r == -1 && errno == EINTR);
59   PCHECK(r == 1);
60   CHECK_EQ(pfd.revents, POLLIN);  // no errors etc
61 }
62
63 folly::Range<AsyncIO::Op**> readerWait(AsyncIO* reader) {
64   int fd = reader->pollFd();
65   if (fd == -1) {
66     return reader->wait(1);
67   } else {
68     waitUntilReadable(fd);
69     return reader->pollCompleted();
70   }
71 }
72
73 // Temporary file that is NOT kept open but is deleted on exit.
74 // Generate random-looking but reproduceable data.
75 class TemporaryFile {
76  public:
77   explicit TemporaryFile(size_t size);
78   ~TemporaryFile();
79
80   const fs::path path() const { return path_; }
81
82  private:
83   fs::path path_;
84 };
85
86 TemporaryFile::TemporaryFile(size_t size)
87   : path_(fs::temp_directory_path() / fs::unique_path()) {
88   CHECK_EQ(size % sizeof(uint32_t), 0);
89   size /= sizeof(uint32_t);
90   const uint32_t seed = 42;
91   std::mt19937 rnd(seed);
92
93   const size_t bufferSize = 1U << 16;
94   uint32_t buffer[bufferSize];
95
96   FILE* fp = ::fopen(path_.c_str(), "wb");
97   PCHECK(fp != nullptr);
98   while (size) {
99     size_t n = std::min(size, bufferSize);
100     for (size_t i = 0; i < n; ++i) {
101       buffer[i] = rnd();
102     }
103     size_t written = ::fwrite(buffer, sizeof(uint32_t), n, fp);
104     PCHECK(written == n);
105     size -= written;
106   }
107   PCHECK(::fclose(fp) == 0);
108 }
109
110 TemporaryFile::~TemporaryFile() {
111   try {
112     fs::remove(path_);
113   } catch (const fs::filesystem_error& e) {
114     LOG(ERROR) << "fs::remove: " << folly::exceptionStr(e);
115   }
116 }
117
118 TemporaryFile tempFile(6 << 20);  // 6MiB
119
120 typedef std::unique_ptr<char, void(*)(void*)> ManagedBuffer;
121 ManagedBuffer allocateAligned(size_t size) {
122   void* buf;
123   int rc = posix_memalign(&buf, kAlign, size);
124   CHECK_EQ(rc, 0) << strerror(rc);
125   return ManagedBuffer(reinterpret_cast<char*>(buf), free);
126 }
127
128 void testReadsSerially(const std::vector<TestSpec>& specs,
129                        AsyncIO::PollMode pollMode) {
130   AsyncIO aioReader(1, pollMode);
131   AsyncIO::Op op;
132   int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
133   PCHECK(fd != -1);
134   SCOPE_EXIT {
135     ::close(fd);
136   };
137
138   for (int i = 0; i < specs.size(); i++) {
139     auto buf = allocateAligned(specs[i].size);
140     op.pread(fd, buf.get(), specs[i].size, specs[i].start);
141     aioReader.submit(&op);
142     EXPECT_EQ(aioReader.pending(), 1);
143     auto ops = readerWait(&aioReader);
144     EXPECT_EQ(1, ops.size());
145     EXPECT_TRUE(ops[0] == &op);
146     EXPECT_EQ(aioReader.pending(), 0);
147     ssize_t res = op.result();
148     EXPECT_LE(0, res) << folly::errnoStr(-res);
149     EXPECT_EQ(specs[i].size, res);
150     op.reset();
151   }
152 }
153
154 void testReadsParallel(const std::vector<TestSpec>& specs,
155                        AsyncIO::PollMode pollMode) {
156   AsyncIO aioReader(specs.size(), pollMode);
157   std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
158   std::vector<ManagedBuffer> bufs;
159
160   int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
161   PCHECK(fd != -1);
162   SCOPE_EXIT {
163     ::close(fd);
164   };
165   for (int i = 0; i < specs.size(); i++) {
166     bufs.push_back(allocateAligned(specs[i].size));
167     ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
168     aioReader.submit(&ops[i]);
169   }
170   std::vector<bool> pending(specs.size(), true);
171
172   size_t remaining = specs.size();
173   while (remaining != 0) {
174     EXPECT_EQ(remaining, aioReader.pending());
175     auto completed = readerWait(&aioReader);
176     size_t nrRead = completed.size();
177     EXPECT_NE(nrRead, 0);
178     remaining -= nrRead;
179
180     for (int i = 0; i < nrRead; i++) {
181       int id = completed[i] - ops.get();
182       EXPECT_GE(id, 0);
183       EXPECT_LT(id, specs.size());
184       EXPECT_TRUE(pending[id]);
185       pending[id] = false;
186       ssize_t res = ops[id].result();
187       EXPECT_LE(0, res) << folly::errnoStr(-res);
188       EXPECT_EQ(specs[id].size, res);
189     }
190   }
191   EXPECT_EQ(aioReader.pending(), 0);
192   for (int i = 0; i < pending.size(); i++) {
193     EXPECT_FALSE(pending[i]);
194   }
195 }
196
197 void testReadsQueued(const std::vector<TestSpec>& specs,
198                      AsyncIO::PollMode pollMode) {
199   size_t readerCapacity = std::max(specs.size() / 2, size_t(1));
200   AsyncIO aioReader(readerCapacity, pollMode);
201   AsyncIOQueue aioQueue(&aioReader);
202   std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
203   std::vector<ManagedBuffer> bufs;
204
205   int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
206   PCHECK(fd != -1);
207   SCOPE_EXIT {
208     ::close(fd);
209   };
210   for (int i = 0; i < specs.size(); i++) {
211     bufs.push_back(allocateAligned(specs[i].size));
212     ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
213     aioQueue.submit(&ops[i]);
214   }
215   std::vector<bool> pending(specs.size(), true);
216
217   size_t remaining = specs.size();
218   while (remaining != 0) {
219     if (remaining >= readerCapacity) {
220       EXPECT_EQ(readerCapacity, aioReader.pending());
221       EXPECT_EQ(remaining - readerCapacity, aioQueue.queued());
222     } else {
223       EXPECT_EQ(remaining, aioReader.pending());
224       EXPECT_EQ(0, aioQueue.queued());
225     }
226     auto completed = readerWait(&aioReader);
227     size_t nrRead = completed.size();
228     EXPECT_NE(nrRead, 0);
229     remaining -= nrRead;
230
231     for (int i = 0; i < nrRead; i++) {
232       int id = completed[i] - ops.get();
233       EXPECT_GE(id, 0);
234       EXPECT_LT(id, specs.size());
235       EXPECT_TRUE(pending[id]);
236       pending[id] = false;
237       ssize_t res = ops[id].result();
238       EXPECT_LE(0, res) << folly::errnoStr(-res);
239       EXPECT_EQ(specs[id].size, res);
240     }
241   }
242   EXPECT_EQ(aioReader.pending(), 0);
243   EXPECT_EQ(aioQueue.queued(), 0);
244   for (int i = 0; i < pending.size(); i++) {
245     EXPECT_FALSE(pending[i]);
246   }
247 }
248
249 void testReads(const std::vector<TestSpec>& specs,
250                AsyncIO::PollMode pollMode) {
251   testReadsSerially(specs, pollMode);
252   testReadsParallel(specs, pollMode);
253   testReadsQueued(specs, pollMode);
254 }
255
256 }  // anonymous namespace
257
258 TEST(AsyncIO, ZeroAsyncDataNotPollable) {
259   testReads({{0, 0}}, AsyncIO::NOT_POLLABLE);
260 }
261
262 TEST(AsyncIO, ZeroAsyncDataPollable) {
263   testReads({{0, 0}}, AsyncIO::POLLABLE);
264 }
265
266 TEST(AsyncIO, SingleAsyncDataNotPollable) {
267   testReads({{0, kAlign}}, AsyncIO::NOT_POLLABLE);
268   testReads({{0, kAlign}}, AsyncIO::NOT_POLLABLE);
269 }
270
271 TEST(AsyncIO, SingleAsyncDataPollable) {
272   testReads({{0, kAlign}}, AsyncIO::POLLABLE);
273   testReads({{0, kAlign}}, AsyncIO::POLLABLE);
274 }
275
276 TEST(AsyncIO, MultipleAsyncDataNotPollable) {
277   testReads(
278       {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
279       AsyncIO::NOT_POLLABLE);
280   testReads(
281       {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
282       AsyncIO::NOT_POLLABLE);
283
284   testReads({
285     {0, 5*1024*1024},
286     {kAlign, 5*1024*1024}
287   }, AsyncIO::NOT_POLLABLE);
288
289   testReads({
290     {kAlign, 0},
291     {kAlign, kAlign},
292     {kAlign, 2*kAlign},
293     {kAlign, 20*kAlign},
294     {kAlign, 1024*1024},
295   }, AsyncIO::NOT_POLLABLE);
296 }
297
298 TEST(AsyncIO, MultipleAsyncDataPollable) {
299   testReads(
300       {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
301       AsyncIO::POLLABLE);
302   testReads(
303       {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
304       AsyncIO::POLLABLE);
305
306   testReads({
307     {0, 5*1024*1024},
308     {kAlign, 5*1024*1024}
309   }, AsyncIO::NOT_POLLABLE);
310
311   testReads({
312     {kAlign, 0},
313     {kAlign, kAlign},
314     {kAlign, 2*kAlign},
315     {kAlign, 20*kAlign},
316     {kAlign, 1024*1024},
317   }, AsyncIO::NOT_POLLABLE);
318 }
319
320 TEST(AsyncIO, ManyAsyncDataNotPollable) {
321   {
322     std::vector<TestSpec> v;
323     for (int i = 0; i < 1000; i++) {
324       v.push_back({kAlign * i, kAlign});
325     }
326     testReads(v, AsyncIO::NOT_POLLABLE);
327   }
328 }
329
330 TEST(AsyncIO, ManyAsyncDataPollable) {
331   {
332     std::vector<TestSpec> v;
333     for (int i = 0; i < 1000; i++) {
334       v.push_back({kAlign * i, kAlign});
335     }
336     testReads(v, AsyncIO::POLLABLE);
337   }
338 }
339
340 TEST(AsyncIO, NonBlockingWait) {
341   AsyncIO aioReader(1, AsyncIO::NOT_POLLABLE);
342   AsyncIO::Op op;
343   int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
344   PCHECK(fd != -1);
345   SCOPE_EXIT {
346     ::close(fd);
347   };
348   size_t size = 2*kAlign;
349   auto buf = allocateAligned(size);
350   op.pread(fd, buf.get(), size, 0);
351   aioReader.submit(&op);
352   EXPECT_EQ(aioReader.pending(), 1);
353
354   folly::Range<AsyncIO::Op**> completed;
355   while (completed.empty()) {
356     // poll without blocking until the read request completes.
357     completed = aioReader.wait(0);
358   }
359   EXPECT_EQ(completed.size(), 1);
360
361   EXPECT_TRUE(completed[0] == &op);
362   ssize_t res = op.result();
363   EXPECT_LE(0, res) << folly::errnoStr(-res);
364   EXPECT_EQ(size, res);
365   EXPECT_EQ(aioReader.pending(), 0);
366 }
367
368