Make folly::AsyncIO thread safe
[folly.git] / folly / experimental / io / test / AsyncIOTest.cpp
1 /*
2  * Copyright 2013 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "folly/experimental/io/AsyncIO.h"
18
19 #include <sys/stat.h>
20 #include <sys/types.h>
21 #include <fcntl.h>
22 #include <poll.h>
23
24 #include <cstdlib>
25 #include <cstdio>
26 #include <memory>
27 #include <random>
28 #include <thread>
29 #include <vector>
30
31 #include <glog/logging.h>
32 #include <gtest/gtest.h>
33
34 #include "folly/experimental/io/FsUtil.h"
35 #include "folly/ScopeGuard.h"
36 #include "folly/String.h"
37
38 namespace fs = folly::fs;
39 using folly::AsyncIO;
40 using folly::AsyncIOQueue;
41
42 namespace {
43
44 constexpr size_t kAlign = 4096;  // align reads to 4096 B (for O_DIRECT)
45
46 struct TestSpec {
47   off_t start;
48   size_t size;
49 };
50
51 void waitUntilReadable(int fd) {
52   pollfd pfd;
53   pfd.fd = fd;
54   pfd.events = POLLIN;
55
56   int r;
57   do {
58     r = poll(&pfd, 1, -1);  // wait forever
59   } while (r == -1 && errno == EINTR);
60   PCHECK(r == 1);
61   CHECK_EQ(pfd.revents, POLLIN);  // no errors etc
62 }
63
64 folly::Range<AsyncIO::Op**> readerWait(AsyncIO* reader) {
65   int fd = reader->pollFd();
66   if (fd == -1) {
67     return reader->wait(1);
68   } else {
69     waitUntilReadable(fd);
70     return reader->pollCompleted();
71   }
72 }
73
74 // Temporary file that is NOT kept open but is deleted on exit.
75 // Generate random-looking but reproduceable data.
76 class TemporaryFile {
77  public:
78   explicit TemporaryFile(size_t size);
79   ~TemporaryFile();
80
81   const fs::path path() const { return path_; }
82
83  private:
84   fs::path path_;
85 };
86
87 TemporaryFile::TemporaryFile(size_t size)
88   : path_(fs::temp_directory_path() / fs::unique_path()) {
89   CHECK_EQ(size % sizeof(uint32_t), 0);
90   size /= sizeof(uint32_t);
91   const uint32_t seed = 42;
92   std::mt19937 rnd(seed);
93
94   const size_t bufferSize = 1U << 16;
95   uint32_t buffer[bufferSize];
96
97   FILE* fp = ::fopen(path_.c_str(), "wb");
98   PCHECK(fp != nullptr);
99   while (size) {
100     size_t n = std::min(size, bufferSize);
101     for (size_t i = 0; i < n; ++i) {
102       buffer[i] = rnd();
103     }
104     size_t written = ::fwrite(buffer, sizeof(uint32_t), n, fp);
105     PCHECK(written == n);
106     size -= written;
107   }
108   PCHECK(::fclose(fp) == 0);
109 }
110
111 TemporaryFile::~TemporaryFile() {
112   try {
113     fs::remove(path_);
114   } catch (const fs::filesystem_error& e) {
115     LOG(ERROR) << "fs::remove: " << folly::exceptionStr(e);
116   }
117 }
118
119 TemporaryFile tempFile(6 << 20);  // 6MiB
120
121 typedef std::unique_ptr<char, void(*)(void*)> ManagedBuffer;
122 ManagedBuffer allocateAligned(size_t size) {
123   void* buf;
124   int rc = posix_memalign(&buf, kAlign, size);
125   CHECK_EQ(rc, 0) << strerror(rc);
126   return ManagedBuffer(reinterpret_cast<char*>(buf), free);
127 }
128
129 void testReadsSerially(const std::vector<TestSpec>& specs,
130                        AsyncIO::PollMode pollMode) {
131   AsyncIO aioReader(1, pollMode);
132   AsyncIO::Op op;
133   int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
134   PCHECK(fd != -1);
135   SCOPE_EXIT {
136     ::close(fd);
137   };
138
139   for (int i = 0; i < specs.size(); i++) {
140     auto buf = allocateAligned(specs[i].size);
141     op.pread(fd, buf.get(), specs[i].size, specs[i].start);
142     aioReader.submit(&op);
143     EXPECT_EQ(aioReader.pending(), 1);
144     auto ops = readerWait(&aioReader);
145     EXPECT_EQ(1, ops.size());
146     EXPECT_TRUE(ops[0] == &op);
147     EXPECT_EQ(aioReader.pending(), 0);
148     ssize_t res = op.result();
149     EXPECT_LE(0, res) << folly::errnoStr(-res);
150     EXPECT_EQ(specs[i].size, res);
151     op.reset();
152   }
153 }
154
155 void testReadsParallel(const std::vector<TestSpec>& specs,
156                        AsyncIO::PollMode pollMode,
157                        bool multithreaded) {
158   AsyncIO aioReader(specs.size(), pollMode);
159   std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
160   std::vector<ManagedBuffer> bufs;
161   bufs.reserve(specs.size());
162
163   int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
164   PCHECK(fd != -1);
165   SCOPE_EXIT {
166     ::close(fd);
167   };
168
169   std::vector<std::thread> threads;
170   if (multithreaded) {
171     threads.reserve(specs.size());
172   }
173   for (int i = 0; i < specs.size(); i++) {
174     bufs.push_back(allocateAligned(specs[i].size));
175   }
176   auto submit = [&] (int i) {
177     ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
178     aioReader.submit(&ops[i]);
179   };
180   for (int i = 0; i < specs.size(); i++) {
181     if (multithreaded) {
182       threads.emplace_back([&submit, i] { submit(i); });
183     } else {
184       submit(i);
185     }
186   }
187   for (auto& t : threads) {
188     t.join();
189   }
190   std::vector<bool> pending(specs.size(), true);
191
192   size_t remaining = specs.size();
193   while (remaining != 0) {
194     EXPECT_EQ(remaining, aioReader.pending());
195     auto completed = readerWait(&aioReader);
196     size_t nrRead = completed.size();
197     EXPECT_NE(nrRead, 0);
198     remaining -= nrRead;
199
200     for (int i = 0; i < nrRead; i++) {
201       int id = completed[i] - ops.get();
202       EXPECT_GE(id, 0);
203       EXPECT_LT(id, specs.size());
204       EXPECT_TRUE(pending[id]);
205       pending[id] = false;
206       ssize_t res = ops[id].result();
207       EXPECT_LE(0, res) << folly::errnoStr(-res);
208       EXPECT_EQ(specs[id].size, res);
209     }
210   }
211   EXPECT_EQ(aioReader.pending(), 0);
212   for (int i = 0; i < pending.size(); i++) {
213     EXPECT_FALSE(pending[i]);
214   }
215 }
216
217 void testReadsQueued(const std::vector<TestSpec>& specs,
218                      AsyncIO::PollMode pollMode) {
219   size_t readerCapacity = std::max(specs.size() / 2, size_t(1));
220   AsyncIO aioReader(readerCapacity, pollMode);
221   AsyncIOQueue aioQueue(&aioReader);
222   std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
223   std::vector<ManagedBuffer> bufs;
224
225   int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
226   PCHECK(fd != -1);
227   SCOPE_EXIT {
228     ::close(fd);
229   };
230   for (int i = 0; i < specs.size(); i++) {
231     bufs.push_back(allocateAligned(specs[i].size));
232     ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
233     aioQueue.submit(&ops[i]);
234   }
235   std::vector<bool> pending(specs.size(), true);
236
237   size_t remaining = specs.size();
238   while (remaining != 0) {
239     if (remaining >= readerCapacity) {
240       EXPECT_EQ(readerCapacity, aioReader.pending());
241       EXPECT_EQ(remaining - readerCapacity, aioQueue.queued());
242     } else {
243       EXPECT_EQ(remaining, aioReader.pending());
244       EXPECT_EQ(0, aioQueue.queued());
245     }
246     auto completed = readerWait(&aioReader);
247     size_t nrRead = completed.size();
248     EXPECT_NE(nrRead, 0);
249     remaining -= nrRead;
250
251     for (int i = 0; i < nrRead; i++) {
252       int id = completed[i] - ops.get();
253       EXPECT_GE(id, 0);
254       EXPECT_LT(id, specs.size());
255       EXPECT_TRUE(pending[id]);
256       pending[id] = false;
257       ssize_t res = ops[id].result();
258       EXPECT_LE(0, res) << folly::errnoStr(-res);
259       EXPECT_EQ(specs[id].size, res);
260     }
261   }
262   EXPECT_EQ(aioReader.pending(), 0);
263   EXPECT_EQ(aioQueue.queued(), 0);
264   for (int i = 0; i < pending.size(); i++) {
265     EXPECT_FALSE(pending[i]);
266   }
267 }
268
269 void testReads(const std::vector<TestSpec>& specs,
270                AsyncIO::PollMode pollMode) {
271   testReadsSerially(specs, pollMode);
272   testReadsParallel(specs, pollMode, false);
273   testReadsParallel(specs, pollMode, true);
274   testReadsQueued(specs, pollMode);
275 }
276
277 }  // anonymous namespace
278
279 TEST(AsyncIO, ZeroAsyncDataNotPollable) {
280   testReads({{0, 0}}, AsyncIO::NOT_POLLABLE);
281 }
282
283 TEST(AsyncIO, ZeroAsyncDataPollable) {
284   testReads({{0, 0}}, AsyncIO::POLLABLE);
285 }
286
287 TEST(AsyncIO, SingleAsyncDataNotPollable) {
288   testReads({{0, kAlign}}, AsyncIO::NOT_POLLABLE);
289   testReads({{0, kAlign}}, AsyncIO::NOT_POLLABLE);
290 }
291
292 TEST(AsyncIO, SingleAsyncDataPollable) {
293   testReads({{0, kAlign}}, AsyncIO::POLLABLE);
294   testReads({{0, kAlign}}, AsyncIO::POLLABLE);
295 }
296
297 TEST(AsyncIO, MultipleAsyncDataNotPollable) {
298   testReads(
299       {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
300       AsyncIO::NOT_POLLABLE);
301   testReads(
302       {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
303       AsyncIO::NOT_POLLABLE);
304
305   testReads({
306     {0, 5*1024*1024},
307     {kAlign, 5*1024*1024}
308   }, AsyncIO::NOT_POLLABLE);
309
310   testReads({
311     {kAlign, 0},
312     {kAlign, kAlign},
313     {kAlign, 2*kAlign},
314     {kAlign, 20*kAlign},
315     {kAlign, 1024*1024},
316   }, AsyncIO::NOT_POLLABLE);
317 }
318
319 TEST(AsyncIO, MultipleAsyncDataPollable) {
320   testReads(
321       {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
322       AsyncIO::POLLABLE);
323   testReads(
324       {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
325       AsyncIO::POLLABLE);
326
327   testReads({
328     {0, 5*1024*1024},
329     {kAlign, 5*1024*1024}
330   }, AsyncIO::NOT_POLLABLE);
331
332   testReads({
333     {kAlign, 0},
334     {kAlign, kAlign},
335     {kAlign, 2*kAlign},
336     {kAlign, 20*kAlign},
337     {kAlign, 1024*1024},
338   }, AsyncIO::NOT_POLLABLE);
339 }
340
341 TEST(AsyncIO, ManyAsyncDataNotPollable) {
342   {
343     std::vector<TestSpec> v;
344     for (int i = 0; i < 1000; i++) {
345       v.push_back({off_t(kAlign * i), kAlign});
346     }
347     testReads(v, AsyncIO::NOT_POLLABLE);
348   }
349 }
350
351 TEST(AsyncIO, ManyAsyncDataPollable) {
352   {
353     std::vector<TestSpec> v;
354     for (int i = 0; i < 1000; i++) {
355       v.push_back({off_t(kAlign * i), kAlign});
356     }
357     testReads(v, AsyncIO::POLLABLE);
358   }
359 }
360
361 TEST(AsyncIO, NonBlockingWait) {
362   AsyncIO aioReader(1, AsyncIO::NOT_POLLABLE);
363   AsyncIO::Op op;
364   int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
365   PCHECK(fd != -1);
366   SCOPE_EXIT {
367     ::close(fd);
368   };
369   size_t size = 2*kAlign;
370   auto buf = allocateAligned(size);
371   op.pread(fd, buf.get(), size, 0);
372   aioReader.submit(&op);
373   EXPECT_EQ(aioReader.pending(), 1);
374
375   folly::Range<AsyncIO::Op**> completed;
376   while (completed.empty()) {
377     // poll without blocking until the read request completes.
378     completed = aioReader.wait(0);
379   }
380   EXPECT_EQ(completed.size(), 1);
381
382   EXPECT_TRUE(completed[0] == &op);
383   ssize_t res = op.result();
384   EXPECT_LE(0, res) << folly::errnoStr(-res);
385   EXPECT_EQ(size, res);
386   EXPECT_EQ(aioReader.pending(), 0);
387 }
388
389