Consistency in namespace-closing comments
[folly.git] / folly / experimental / io / test / AsyncIOTest.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/experimental/io/AsyncIO.h>
18
19 #include <fcntl.h>
20 #include <sys/stat.h>
21 #include <sys/types.h>
22
23 #include <cstdio>
24 #include <cstdlib>
25 #include <memory>
26 #include <random>
27 #include <thread>
28 #include <vector>
29
30 #include <glog/logging.h>
31
32 #include <folly/ScopeGuard.h>
33 #include <folly/String.h>
34 #include <folly/experimental/io/FsUtil.h>
35 #include <folly/portability/GTest.h>
36 #include <folly/portability/Sockets.h>
37
38 namespace fs = folly::fs;
39
40 using folly::AsyncIO;
41 using folly::AsyncIOOp;
42 using folly::AsyncIOQueue;
43
44 namespace {
45
46 constexpr size_t kAlign = 4096; // align reads to 4096 B (for O_DIRECT)
47
48 struct TestSpec {
49   off_t start;
50   size_t size;
51 };
52
53 void waitUntilReadable(int fd) {
54   pollfd pfd;
55   pfd.fd = fd;
56   pfd.events = POLLIN;
57
58   int r;
59   do {
60     r = poll(&pfd, 1, -1); // wait forever
61   } while (r == -1 && errno == EINTR);
62   PCHECK(r == 1);
63   CHECK_EQ(pfd.revents, POLLIN); // no errors etc
64 }
65
66 folly::Range<AsyncIO::Op**> readerWait(AsyncIO* reader) {
67   int fd = reader->pollFd();
68   if (fd == -1) {
69     return reader->wait(1);
70   } else {
71     waitUntilReadable(fd);
72     return reader->pollCompleted();
73   }
74 }
75
76 // Temporary file that is NOT kept open but is deleted on exit.
77 // Generate random-looking but reproduceable data.
78 class TemporaryFile {
79  public:
80   explicit TemporaryFile(size_t size);
81   ~TemporaryFile();
82
83   const fs::path path() const {
84     return path_;
85   }
86
87  private:
88   fs::path path_;
89 };
90
91 TemporaryFile::TemporaryFile(size_t size)
92     : path_(fs::temp_directory_path() / fs::unique_path()) {
93   CHECK_EQ(size % sizeof(uint32_t), 0);
94   size /= sizeof(uint32_t);
95   const uint32_t seed = 42;
96   std::mt19937 rnd(seed);
97
98   const size_t bufferSize = 1U << 16;
99   uint32_t buffer[bufferSize];
100
101   FILE* fp = ::fopen(path_.c_str(), "wb");
102   PCHECK(fp != nullptr);
103   while (size) {
104     size_t n = std::min(size, bufferSize);
105     for (size_t i = 0; i < n; ++i) {
106       buffer[i] = rnd();
107     }
108     size_t written = ::fwrite(buffer, sizeof(uint32_t), n, fp);
109     PCHECK(written == n);
110     size -= written;
111   }
112   PCHECK(::fclose(fp) == 0);
113 }
114
115 TemporaryFile::~TemporaryFile() {
116   try {
117     fs::remove(path_);
118   } catch (const fs::filesystem_error& e) {
119     LOG(ERROR) << "fs::remove: " << folly::exceptionStr(e);
120   }
121 }
122
123 TemporaryFile tempFile(6 << 20); // 6MiB
124
125 typedef std::unique_ptr<char, void (*)(void*)> ManagedBuffer;
126 ManagedBuffer allocateAligned(size_t size) {
127   void* buf;
128   int rc = posix_memalign(&buf, kAlign, size);
129   CHECK_EQ(rc, 0) << strerror(rc);
130   return ManagedBuffer(reinterpret_cast<char*>(buf), free);
131 }
132
133 void testReadsSerially(
134     const std::vector<TestSpec>& specs,
135     AsyncIO::PollMode pollMode) {
136   AsyncIO aioReader(1, pollMode);
137   AsyncIO::Op op;
138   int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
139   PCHECK(fd != -1);
140   SCOPE_EXIT {
141     ::close(fd);
142   };
143
144   for (size_t i = 0; i < specs.size(); i++) {
145     auto buf = allocateAligned(specs[i].size);
146     op.pread(fd, buf.get(), specs[i].size, specs[i].start);
147     aioReader.submit(&op);
148     EXPECT_EQ((i + 1), aioReader.totalSubmits());
149     EXPECT_EQ(aioReader.pending(), 1);
150     auto ops = readerWait(&aioReader);
151     EXPECT_EQ(1, ops.size());
152     EXPECT_TRUE(ops[0] == &op);
153     EXPECT_EQ(aioReader.pending(), 0);
154     ssize_t res = op.result();
155     EXPECT_LE(0, res) << folly::errnoStr(-res);
156     EXPECT_EQ(specs[i].size, res);
157     op.reset();
158   }
159 }
160
161 void testReadsParallel(
162     const std::vector<TestSpec>& specs,
163     AsyncIO::PollMode pollMode,
164     bool multithreaded) {
165   AsyncIO aioReader(specs.size(), pollMode);
166   std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
167   std::vector<ManagedBuffer> bufs;
168   bufs.reserve(specs.size());
169
170   int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
171   PCHECK(fd != -1);
172   SCOPE_EXIT {
173     ::close(fd);
174   };
175
176   std::vector<std::thread> threads;
177   if (multithreaded) {
178     threads.reserve(specs.size());
179   }
180   for (size_t i = 0; i < specs.size(); i++) {
181     bufs.push_back(allocateAligned(specs[i].size));
182   }
183   auto submit = [&](size_t i) {
184     ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
185     aioReader.submit(&ops[i]);
186   };
187   for (size_t i = 0; i < specs.size(); i++) {
188     if (multithreaded) {
189       threads.emplace_back([&submit, i] { submit(i); });
190     } else {
191       submit(i);
192     }
193   }
194   for (auto& t : threads) {
195     t.join();
196   }
197   std::vector<bool> pending(specs.size(), true);
198
199   size_t remaining = specs.size();
200   while (remaining != 0) {
201     EXPECT_EQ(remaining, aioReader.pending());
202     auto completed = readerWait(&aioReader);
203     size_t nrRead = completed.size();
204     EXPECT_NE(nrRead, 0);
205     remaining -= nrRead;
206
207     for (size_t i = 0; i < nrRead; i++) {
208       int id = completed[i] - ops.get();
209       EXPECT_GE(id, 0);
210       EXPECT_LT(id, specs.size());
211       EXPECT_TRUE(pending[id]);
212       pending[id] = false;
213       ssize_t res = ops[id].result();
214       EXPECT_LE(0, res) << folly::errnoStr(-res);
215       EXPECT_EQ(specs[id].size, res);
216     }
217   }
218   EXPECT_EQ(specs.size(), aioReader.totalSubmits());
219
220   EXPECT_EQ(aioReader.pending(), 0);
221   for (size_t i = 0; i < pending.size(); i++) {
222     EXPECT_FALSE(pending[i]);
223   }
224 }
225
226 void testReadsQueued(
227     const std::vector<TestSpec>& specs,
228     AsyncIO::PollMode pollMode) {
229   size_t readerCapacity = std::max(specs.size() / 2, size_t(1));
230   AsyncIO aioReader(readerCapacity, pollMode);
231   AsyncIOQueue aioQueue(&aioReader);
232   std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
233   std::vector<ManagedBuffer> bufs;
234
235   int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
236   PCHECK(fd != -1);
237   SCOPE_EXIT {
238     ::close(fd);
239   };
240   for (size_t i = 0; i < specs.size(); i++) {
241     bufs.push_back(allocateAligned(specs[i].size));
242     ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
243     aioQueue.submit(&ops[i]);
244   }
245   std::vector<bool> pending(specs.size(), true);
246
247   size_t remaining = specs.size();
248   while (remaining != 0) {
249     if (remaining >= readerCapacity) {
250       EXPECT_EQ(readerCapacity, aioReader.pending());
251       EXPECT_EQ(remaining - readerCapacity, aioQueue.queued());
252     } else {
253       EXPECT_EQ(remaining, aioReader.pending());
254       EXPECT_EQ(0, aioQueue.queued());
255     }
256     auto completed = readerWait(&aioReader);
257     size_t nrRead = completed.size();
258     EXPECT_NE(nrRead, 0);
259     remaining -= nrRead;
260
261     for (size_t i = 0; i < nrRead; i++) {
262       int id = completed[i] - ops.get();
263       EXPECT_GE(id, 0);
264       EXPECT_LT(id, specs.size());
265       EXPECT_TRUE(pending[id]);
266       pending[id] = false;
267       ssize_t res = ops[id].result();
268       EXPECT_LE(0, res) << folly::errnoStr(-res);
269       EXPECT_EQ(specs[id].size, res);
270     }
271   }
272   EXPECT_EQ(specs.size(), aioReader.totalSubmits());
273   EXPECT_EQ(aioReader.pending(), 0);
274   EXPECT_EQ(aioQueue.queued(), 0);
275   for (size_t i = 0; i < pending.size(); i++) {
276     EXPECT_FALSE(pending[i]);
277   }
278 }
279
280 void testReads(const std::vector<TestSpec>& specs, AsyncIO::PollMode pollMode) {
281   testReadsSerially(specs, pollMode);
282   testReadsParallel(specs, pollMode, false);
283   testReadsParallel(specs, pollMode, true);
284   testReadsQueued(specs, pollMode);
285 }
286
287 } // namespace
288
289 TEST(AsyncIO, ZeroAsyncDataNotPollable) {
290   testReads({{0, 0}}, AsyncIO::NOT_POLLABLE);
291 }
292
293 TEST(AsyncIO, ZeroAsyncDataPollable) {
294   testReads({{0, 0}}, AsyncIO::POLLABLE);
295 }
296
297 TEST(AsyncIO, SingleAsyncDataNotPollable) {
298   testReads({{0, kAlign}}, AsyncIO::NOT_POLLABLE);
299   testReads({{0, kAlign}}, AsyncIO::NOT_POLLABLE);
300 }
301
302 TEST(AsyncIO, SingleAsyncDataPollable) {
303   testReads({{0, kAlign}}, AsyncIO::POLLABLE);
304   testReads({{0, kAlign}}, AsyncIO::POLLABLE);
305 }
306
307 TEST(AsyncIO, MultipleAsyncDataNotPollable) {
308   testReads(
309       {{kAlign, 2 * kAlign}, {kAlign, 2 * kAlign}, {kAlign, 4 * kAlign}},
310       AsyncIO::NOT_POLLABLE);
311   testReads(
312       {{kAlign, 2 * kAlign}, {kAlign, 2 * kAlign}, {kAlign, 4 * kAlign}},
313       AsyncIO::NOT_POLLABLE);
314
315   testReads(
316       {{0, 5 * 1024 * 1024}, {kAlign, 5 * 1024 * 1024}}, AsyncIO::NOT_POLLABLE);
317
318   testReads(
319       {
320           {kAlign, 0},
321           {kAlign, kAlign},
322           {kAlign, 2 * kAlign},
323           {kAlign, 20 * kAlign},
324           {kAlign, 1024 * 1024},
325       },
326       AsyncIO::NOT_POLLABLE);
327 }
328
329 TEST(AsyncIO, MultipleAsyncDataPollable) {
330   testReads(
331       {{kAlign, 2 * kAlign}, {kAlign, 2 * kAlign}, {kAlign, 4 * kAlign}},
332       AsyncIO::POLLABLE);
333   testReads(
334       {{kAlign, 2 * kAlign}, {kAlign, 2 * kAlign}, {kAlign, 4 * kAlign}},
335       AsyncIO::POLLABLE);
336
337   testReads(
338       {{0, 5 * 1024 * 1024}, {kAlign, 5 * 1024 * 1024}}, AsyncIO::NOT_POLLABLE);
339
340   testReads(
341       {
342           {kAlign, 0},
343           {kAlign, kAlign},
344           {kAlign, 2 * kAlign},
345           {kAlign, 20 * kAlign},
346           {kAlign, 1024 * 1024},
347       },
348       AsyncIO::NOT_POLLABLE);
349 }
350
351 TEST(AsyncIO, ManyAsyncDataNotPollable) {
352   {
353     std::vector<TestSpec> v;
354     for (int i = 0; i < 1000; i++) {
355       v.push_back({off_t(kAlign * i), kAlign});
356     }
357     testReads(v, AsyncIO::NOT_POLLABLE);
358   }
359 }
360
361 TEST(AsyncIO, ManyAsyncDataPollable) {
362   {
363     std::vector<TestSpec> v;
364     for (int i = 0; i < 1000; i++) {
365       v.push_back({off_t(kAlign * i), kAlign});
366     }
367     testReads(v, AsyncIO::POLLABLE);
368   }
369 }
370
371 TEST(AsyncIO, NonBlockingWait) {
372   AsyncIO aioReader(1, AsyncIO::NOT_POLLABLE);
373   AsyncIO::Op op;
374   int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
375   PCHECK(fd != -1);
376   SCOPE_EXIT {
377     ::close(fd);
378   };
379   size_t size = 2 * kAlign;
380   auto buf = allocateAligned(size);
381   op.pread(fd, buf.get(), size, 0);
382   aioReader.submit(&op);
383   EXPECT_EQ(aioReader.pending(), 1);
384
385   folly::Range<AsyncIO::Op**> completed;
386   while (completed.empty()) {
387     // poll without blocking until the read request completes.
388     completed = aioReader.wait(0);
389   }
390   EXPECT_EQ(completed.size(), 1);
391
392   EXPECT_TRUE(completed[0] == &op);
393   ssize_t res = op.result();
394   EXPECT_LE(0, res) << folly::errnoStr(-res);
395   EXPECT_EQ(size, res);
396   EXPECT_EQ(aioReader.pending(), 0);
397 }
398
399 TEST(AsyncIO, Cancel) {
400   constexpr size_t kNumOpsBatch1 = 10;
401   constexpr size_t kNumOpsBatch2 = 10;
402
403   AsyncIO aioReader(kNumOpsBatch1 + kNumOpsBatch2, AsyncIO::NOT_POLLABLE);
404   int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
405   PCHECK(fd != -1);
406   SCOPE_EXIT {
407     ::close(fd);
408   };
409
410   size_t completed = 0;
411
412   std::vector<std::unique_ptr<AsyncIO::Op>> ops;
413   std::vector<ManagedBuffer> bufs;
414   const auto schedule = [&](size_t n) {
415     for (size_t i = 0; i < n; ++i) {
416       const size_t size = 2 * kAlign;
417       bufs.push_back(allocateAligned(size));
418
419       ops.push_back(std::make_unique<AsyncIO::Op>());
420       auto& op = *ops.back();
421
422       op.setNotificationCallback([&](AsyncIOOp*) { ++completed; });
423       op.pread(fd, bufs.back().get(), size, 0);
424       aioReader.submit(&op);
425     }
426   };
427
428   // Mix completed and canceled operations for this test.
429   // In order to achieve that, schedule in two batches and do partial
430   // wait() after the first one.
431
432   schedule(kNumOpsBatch1);
433   EXPECT_EQ(aioReader.pending(), kNumOpsBatch1);
434   EXPECT_EQ(completed, 0);
435
436   auto result = aioReader.wait(1);
437   EXPECT_GE(result.size(), 1);
438   EXPECT_EQ(completed, result.size());
439   EXPECT_EQ(aioReader.pending(), kNumOpsBatch1 - result.size());
440
441   schedule(kNumOpsBatch2);
442   EXPECT_EQ(aioReader.pending(), ops.size() - result.size());
443   EXPECT_EQ(completed, result.size());
444
445   auto canceled = aioReader.cancel();
446   EXPECT_EQ(canceled.size(), ops.size() - result.size());
447   EXPECT_EQ(aioReader.pending(), 0);
448   EXPECT_EQ(completed, result.size());
449
450   size_t foundCompleted = 0;
451   for (auto& op : ops) {
452     if (op->state() == AsyncIOOp::State::COMPLETED) {
453       ++foundCompleted;
454     } else {
455       EXPECT_TRUE(op->state() == AsyncIOOp::State::CANCELED) << *op;
456     }
457   }
458   EXPECT_EQ(foundCompleted, completed);
459 }