7c776ef05b7f094cde60c704affe88280dd4e9d5
[folly.git] / folly / experimental / io / test / AsyncIOTest.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/experimental/io/AsyncIO.h>
18
19 #include <sys/stat.h>
20 #include <sys/types.h>
21 #include <fcntl.h>
22
23 #include <cstdlib>
24 #include <cstdio>
25 #include <memory>
26 #include <random>
27 #include <thread>
28 #include <vector>
29
30 #include <glog/logging.h>
31
32 #include <folly/experimental/io/FsUtil.h>
33 #include <folly/ScopeGuard.h>
34 #include <folly/String.h>
35 #include <folly/portability/GTest.h>
36 #include <folly/portability/Sockets.h>
37
38 namespace fs = folly::fs;
39
40 using folly::AsyncIO;
41 using folly::AsyncIOOp;
42 using folly::AsyncIOQueue;
43
44 namespace {
45
46 constexpr size_t kAlign = 4096;  // align reads to 4096 B (for O_DIRECT)
47
48 struct TestSpec {
49   off_t start;
50   size_t size;
51 };
52
53 void waitUntilReadable(int fd) {
54   pollfd pfd;
55   pfd.fd = fd;
56   pfd.events = POLLIN;
57
58   int r;
59   do {
60     r = poll(&pfd, 1, -1);  // wait forever
61   } while (r == -1 && errno == EINTR);
62   PCHECK(r == 1);
63   CHECK_EQ(pfd.revents, POLLIN);  // no errors etc
64 }
65
66 folly::Range<AsyncIO::Op**> readerWait(AsyncIO* reader) {
67   int fd = reader->pollFd();
68   if (fd == -1) {
69     return reader->wait(1);
70   } else {
71     waitUntilReadable(fd);
72     return reader->pollCompleted();
73   }
74 }
75
76 // Temporary file that is NOT kept open but is deleted on exit.
77 // Generate random-looking but reproduceable data.
78 class TemporaryFile {
79  public:
80   explicit TemporaryFile(size_t size);
81   ~TemporaryFile();
82
83   const fs::path path() const { return path_; }
84
85  private:
86   fs::path path_;
87 };
88
89 TemporaryFile::TemporaryFile(size_t size)
90     : path_(fs::temp_directory_path() / fs::unique_path()) {
91   CHECK_EQ(size % sizeof(uint32_t), 0);
92   size /= sizeof(uint32_t);
93   const uint32_t seed = 42;
94   std::mt19937 rnd(seed);
95
96   const size_t bufferSize = 1U << 16;
97   uint32_t buffer[bufferSize];
98
99   FILE* fp = ::fopen(path_.c_str(), "wb");
100   PCHECK(fp != nullptr);
101   while (size) {
102     size_t n = std::min(size, bufferSize);
103     for (size_t i = 0; i < n; ++i) {
104       buffer[i] = rnd();
105     }
106     size_t written = ::fwrite(buffer, sizeof(uint32_t), n, fp);
107     PCHECK(written == n);
108     size -= written;
109   }
110   PCHECK(::fclose(fp) == 0);
111 }
112
113 TemporaryFile::~TemporaryFile() {
114   try {
115     fs::remove(path_);
116   } catch (const fs::filesystem_error& e) {
117     LOG(ERROR) << "fs::remove: " << folly::exceptionStr(e);
118   }
119 }
120
121 TemporaryFile tempFile(6 << 20);  // 6MiB
122
123 typedef std::unique_ptr<char, void(*)(void*)> ManagedBuffer;
124 ManagedBuffer allocateAligned(size_t size) {
125   void* buf;
126   int rc = posix_memalign(&buf, kAlign, size);
127   CHECK_EQ(rc, 0) << strerror(rc);
128   return ManagedBuffer(reinterpret_cast<char*>(buf), free);
129 }
130
131 void testReadsSerially(const std::vector<TestSpec>& specs,
132                        AsyncIO::PollMode pollMode) {
133   AsyncIO aioReader(1, pollMode);
134   AsyncIO::Op op;
135   int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
136   PCHECK(fd != -1);
137   SCOPE_EXIT {
138     ::close(fd);
139   };
140
141   for (size_t i = 0; i < specs.size(); i++) {
142     auto buf = allocateAligned(specs[i].size);
143     op.pread(fd, buf.get(), specs[i].size, specs[i].start);
144     aioReader.submit(&op);
145     EXPECT_EQ((i + 1), aioReader.totalSubmits());
146     EXPECT_EQ(aioReader.pending(), 1);
147     auto ops = readerWait(&aioReader);
148     EXPECT_EQ(1, ops.size());
149     EXPECT_TRUE(ops[0] == &op);
150     EXPECT_EQ(aioReader.pending(), 0);
151     ssize_t res = op.result();
152     EXPECT_LE(0, res) << folly::errnoStr(-res);
153     EXPECT_EQ(specs[i].size, res);
154     op.reset();
155   }
156 }
157
158 void testReadsParallel(const std::vector<TestSpec>& specs,
159                        AsyncIO::PollMode pollMode,
160                        bool multithreaded) {
161   AsyncIO aioReader(specs.size(), pollMode);
162   std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
163   std::vector<ManagedBuffer> bufs;
164   bufs.reserve(specs.size());
165
166   int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
167   PCHECK(fd != -1);
168   SCOPE_EXIT {
169     ::close(fd);
170   };
171
172   std::vector<std::thread> threads;
173   if (multithreaded) {
174     threads.reserve(specs.size());
175   }
176   for (size_t i = 0; i < specs.size(); i++) {
177     bufs.push_back(allocateAligned(specs[i].size));
178   }
179   auto submit = [&] (size_t i) {
180     ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
181     aioReader.submit(&ops[i]);
182   };
183   for (size_t i = 0; i < specs.size(); i++) {
184     if (multithreaded) {
185       threads.emplace_back([&submit, i] { submit(i); });
186     } else {
187       submit(i);
188     }
189   }
190   for (auto& t : threads) {
191     t.join();
192   }
193   std::vector<bool> pending(specs.size(), true);
194
195   size_t remaining = specs.size();
196   while (remaining != 0) {
197     EXPECT_EQ(remaining, aioReader.pending());
198     auto completed = readerWait(&aioReader);
199     size_t nrRead = completed.size();
200     EXPECT_NE(nrRead, 0);
201     remaining -= nrRead;
202
203     for (size_t i = 0; i < nrRead; i++) {
204       int id = completed[i] - ops.get();
205       EXPECT_GE(id, 0);
206       EXPECT_LT(id, specs.size());
207       EXPECT_TRUE(pending[id]);
208       pending[id] = false;
209       ssize_t res = ops[id].result();
210       EXPECT_LE(0, res) << folly::errnoStr(-res);
211       EXPECT_EQ(specs[id].size, res);
212     }
213   }
214   EXPECT_EQ(specs.size(), aioReader.totalSubmits());
215
216   EXPECT_EQ(aioReader.pending(), 0);
217   for (size_t i = 0; i < pending.size(); i++) {
218     EXPECT_FALSE(pending[i]);
219   }
220 }
221
222 void testReadsQueued(const std::vector<TestSpec>& specs,
223                      AsyncIO::PollMode pollMode) {
224   size_t readerCapacity = std::max(specs.size() / 2, size_t(1));
225   AsyncIO aioReader(readerCapacity, pollMode);
226   AsyncIOQueue aioQueue(&aioReader);
227   std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
228   std::vector<ManagedBuffer> bufs;
229
230   int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
231   PCHECK(fd != -1);
232   SCOPE_EXIT {
233     ::close(fd);
234   };
235   for (size_t i = 0; i < specs.size(); i++) {
236     bufs.push_back(allocateAligned(specs[i].size));
237     ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
238     aioQueue.submit(&ops[i]);
239   }
240   std::vector<bool> pending(specs.size(), true);
241
242   size_t remaining = specs.size();
243   while (remaining != 0) {
244     if (remaining >= readerCapacity) {
245       EXPECT_EQ(readerCapacity, aioReader.pending());
246       EXPECT_EQ(remaining - readerCapacity, aioQueue.queued());
247     } else {
248       EXPECT_EQ(remaining, aioReader.pending());
249       EXPECT_EQ(0, aioQueue.queued());
250     }
251     auto completed = readerWait(&aioReader);
252     size_t nrRead = completed.size();
253     EXPECT_NE(nrRead, 0);
254     remaining -= nrRead;
255
256     for (size_t i = 0; i < nrRead; i++) {
257       int id = completed[i] - ops.get();
258       EXPECT_GE(id, 0);
259       EXPECT_LT(id, specs.size());
260       EXPECT_TRUE(pending[id]);
261       pending[id] = false;
262       ssize_t res = ops[id].result();
263       EXPECT_LE(0, res) << folly::errnoStr(-res);
264       EXPECT_EQ(specs[id].size, res);
265     }
266   }
267   EXPECT_EQ(specs.size(), aioReader.totalSubmits());
268   EXPECT_EQ(aioReader.pending(), 0);
269   EXPECT_EQ(aioQueue.queued(), 0);
270   for (size_t i = 0; i < pending.size(); i++) {
271     EXPECT_FALSE(pending[i]);
272   }
273 }
274
275 void testReads(const std::vector<TestSpec>& specs,
276                AsyncIO::PollMode pollMode) {
277   testReadsSerially(specs, pollMode);
278   testReadsParallel(specs, pollMode, false);
279   testReadsParallel(specs, pollMode, true);
280   testReadsQueued(specs, pollMode);
281 }
282
283 }  // anonymous namespace
284
285 TEST(AsyncIO, ZeroAsyncDataNotPollable) {
286   testReads({{0, 0}}, AsyncIO::NOT_POLLABLE);
287 }
288
289 TEST(AsyncIO, ZeroAsyncDataPollable) {
290   testReads({{0, 0}}, AsyncIO::POLLABLE);
291 }
292
293 TEST(AsyncIO, SingleAsyncDataNotPollable) {
294   testReads({{0, kAlign}}, AsyncIO::NOT_POLLABLE);
295   testReads({{0, kAlign}}, AsyncIO::NOT_POLLABLE);
296 }
297
298 TEST(AsyncIO, SingleAsyncDataPollable) {
299   testReads({{0, kAlign}}, AsyncIO::POLLABLE);
300   testReads({{0, kAlign}}, AsyncIO::POLLABLE);
301 }
302
303 TEST(AsyncIO, MultipleAsyncDataNotPollable) {
304   testReads(
305       {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
306       AsyncIO::NOT_POLLABLE);
307   testReads(
308       {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
309       AsyncIO::NOT_POLLABLE);
310
311   testReads({
312     {0, 5*1024*1024},
313     {kAlign, 5*1024*1024}
314   }, AsyncIO::NOT_POLLABLE);
315
316   testReads({
317     {kAlign, 0},
318     {kAlign, kAlign},
319     {kAlign, 2*kAlign},
320     {kAlign, 20*kAlign},
321     {kAlign, 1024*1024},
322   }, AsyncIO::NOT_POLLABLE);
323 }
324
325 TEST(AsyncIO, MultipleAsyncDataPollable) {
326   testReads(
327       {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
328       AsyncIO::POLLABLE);
329   testReads(
330       {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
331       AsyncIO::POLLABLE);
332
333   testReads({
334     {0, 5*1024*1024},
335     {kAlign, 5*1024*1024}
336   }, AsyncIO::NOT_POLLABLE);
337
338   testReads({
339     {kAlign, 0},
340     {kAlign, kAlign},
341     {kAlign, 2*kAlign},
342     {kAlign, 20*kAlign},
343     {kAlign, 1024*1024},
344   }, AsyncIO::NOT_POLLABLE);
345 }
346
347 TEST(AsyncIO, ManyAsyncDataNotPollable) {
348   {
349     std::vector<TestSpec> v;
350     for (int i = 0; i < 1000; i++) {
351       v.push_back({off_t(kAlign * i), kAlign});
352     }
353     testReads(v, AsyncIO::NOT_POLLABLE);
354   }
355 }
356
357 TEST(AsyncIO, ManyAsyncDataPollable) {
358   {
359     std::vector<TestSpec> v;
360     for (int i = 0; i < 1000; i++) {
361       v.push_back({off_t(kAlign * i), kAlign});
362     }
363     testReads(v, AsyncIO::POLLABLE);
364   }
365 }
366
367 TEST(AsyncIO, NonBlockingWait) {
368   AsyncIO aioReader(1, AsyncIO::NOT_POLLABLE);
369   AsyncIO::Op op;
370   int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
371   PCHECK(fd != -1);
372   SCOPE_EXIT {
373     ::close(fd);
374   };
375   size_t size = 2 * kAlign;
376   auto buf = allocateAligned(size);
377   op.pread(fd, buf.get(), size, 0);
378   aioReader.submit(&op);
379   EXPECT_EQ(aioReader.pending(), 1);
380
381   folly::Range<AsyncIO::Op**> completed;
382   while (completed.empty()) {
383     // poll without blocking until the read request completes.
384     completed = aioReader.wait(0);
385   }
386   EXPECT_EQ(completed.size(), 1);
387
388   EXPECT_TRUE(completed[0] == &op);
389   ssize_t res = op.result();
390   EXPECT_LE(0, res) << folly::errnoStr(-res);
391   EXPECT_EQ(size, res);
392   EXPECT_EQ(aioReader.pending(), 0);
393 }
394
395 TEST(AsyncIO, Cancel) {
396   constexpr size_t kNumOps = 10;
397
398   AsyncIO aioReader(kNumOps, AsyncIO::NOT_POLLABLE);
399   int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
400   PCHECK(fd != -1);
401   SCOPE_EXIT {
402     ::close(fd);
403   };
404
405   std::vector<AsyncIO::Op> ops(kNumOps);
406   std::vector<ManagedBuffer> bufs;
407
408   size_t completed = 0;
409   for (auto& op : ops) {
410     const size_t size = 2 * kAlign;
411     bufs.push_back(allocateAligned(size));
412     op.setNotificationCallback([&](AsyncIOOp*) { ++completed; });
413     op.pread(fd, bufs.back().get(), size, 0);
414     aioReader.submit(&op);
415   }
416
417   EXPECT_EQ(aioReader.pending(), kNumOps);
418   EXPECT_EQ(completed, 0);
419
420   {
421     auto result = aioReader.wait(1);
422     EXPECT_EQ(result.size(), 1);
423   }
424   EXPECT_EQ(completed, 1);
425   EXPECT_EQ(aioReader.pending(), kNumOps - 1);
426
427   EXPECT_EQ(aioReader.cancel(), kNumOps - 1);
428   EXPECT_EQ(aioReader.pending(), 0);
429   EXPECT_EQ(completed, 1);
430
431   completed = 0;
432   for (auto& op : ops) {
433     if (op.state() == AsyncIOOp::State::COMPLETED) {
434       ++completed;
435     } else {
436       EXPECT_TRUE(op.state() == AsyncIOOp::State::CANCELED) << op;
437     }
438   }
439   EXPECT_EQ(completed, 1);
440 }