Fix -Wsign-compare
[folly.git] / folly / experimental / io / test / AsyncIOTest.cpp
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/experimental/io/AsyncIO.h>
18
19 #include <sys/stat.h>
20 #include <sys/types.h>
21 #include <fcntl.h>
22 #include <poll.h>
23
24 #include <cstdlib>
25 #include <cstdio>
26 #include <memory>
27 #include <random>
28 #include <thread>
29 #include <vector>
30
31 #include <glog/logging.h>
32 #include <gtest/gtest.h>
33
34 #include <folly/experimental/io/FsUtil.h>
35 #include <folly/ScopeGuard.h>
36 #include <folly/String.h>
37
38 namespace fs = folly::fs;
39 using folly::AsyncIO;
40 using folly::AsyncIOQueue;
41
42 namespace {
43
44 constexpr size_t kAlign = 4096;  // align reads to 4096 B (for O_DIRECT)
45
46 struct TestSpec {
47   off_t start;
48   size_t size;
49 };
50
51 void waitUntilReadable(int fd) {
52   pollfd pfd;
53   pfd.fd = fd;
54   pfd.events = POLLIN;
55
56   int r;
57   do {
58     r = poll(&pfd, 1, -1);  // wait forever
59   } while (r == -1 && errno == EINTR);
60   PCHECK(r == 1);
61   CHECK_EQ(pfd.revents, POLLIN);  // no errors etc
62 }
63
64 folly::Range<AsyncIO::Op**> readerWait(AsyncIO* reader) {
65   int fd = reader->pollFd();
66   if (fd == -1) {
67     return reader->wait(1);
68   } else {
69     waitUntilReadable(fd);
70     return reader->pollCompleted();
71   }
72 }
73
74 // Temporary file that is NOT kept open but is deleted on exit.
75 // Generate random-looking but reproduceable data.
76 class TemporaryFile {
77  public:
78   explicit TemporaryFile(size_t size);
79   ~TemporaryFile();
80
81   const fs::path path() const { return path_; }
82
83  private:
84   fs::path path_;
85 };
86
87 TemporaryFile::TemporaryFile(size_t size)
88   : path_(fs::temp_directory_path() / fs::unique_path()) {
89   CHECK_EQ(size % sizeof(uint32_t), 0);
90   size /= sizeof(uint32_t);
91   const uint32_t seed = 42;
92   std::mt19937 rnd(seed);
93
94   const size_t bufferSize = 1U << 16;
95   uint32_t buffer[bufferSize];
96
97   FILE* fp = ::fopen(path_.c_str(), "wb");
98   PCHECK(fp != nullptr);
99   while (size) {
100     size_t n = std::min(size, bufferSize);
101     for (size_t i = 0; i < n; ++i) {
102       buffer[i] = rnd();
103     }
104     size_t written = ::fwrite(buffer, sizeof(uint32_t), n, fp);
105     PCHECK(written == n);
106     size -= written;
107   }
108   PCHECK(::fclose(fp) == 0);
109 }
110
111 TemporaryFile::~TemporaryFile() {
112   try {
113     fs::remove(path_);
114   } catch (const fs::filesystem_error& e) {
115     LOG(ERROR) << "fs::remove: " << folly::exceptionStr(e);
116   }
117 }
118
119 TemporaryFile tempFile(6 << 20);  // 6MiB
120
121 typedef std::unique_ptr<char, void(*)(void*)> ManagedBuffer;
122 ManagedBuffer allocateAligned(size_t size) {
123   void* buf;
124   int rc = posix_memalign(&buf, kAlign, size);
125   CHECK_EQ(rc, 0) << strerror(rc);
126   return ManagedBuffer(reinterpret_cast<char*>(buf), free);
127 }
128
129 void testReadsSerially(const std::vector<TestSpec>& specs,
130                        AsyncIO::PollMode pollMode) {
131   AsyncIO aioReader(1, pollMode);
132   AsyncIO::Op op;
133   int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
134   PCHECK(fd != -1);
135   SCOPE_EXIT {
136     ::close(fd);
137   };
138
139   for (size_t i = 0; i < specs.size(); i++) {
140     auto buf = allocateAligned(specs[i].size);
141     op.pread(fd, buf.get(), specs[i].size, specs[i].start);
142     aioReader.submit(&op);
143     EXPECT_EQ((i + 1), aioReader.totalSubmits());
144     EXPECT_EQ(aioReader.pending(), 1);
145     auto ops = readerWait(&aioReader);
146     EXPECT_EQ(1, ops.size());
147     EXPECT_TRUE(ops[0] == &op);
148     EXPECT_EQ(aioReader.pending(), 0);
149     ssize_t res = op.result();
150     EXPECT_LE(0, res) << folly::errnoStr(-res);
151     EXPECT_EQ(specs[i].size, res);
152     op.reset();
153   }
154 }
155
156 void testReadsParallel(const std::vector<TestSpec>& specs,
157                        AsyncIO::PollMode pollMode,
158                        bool multithreaded) {
159   AsyncIO aioReader(specs.size(), pollMode);
160   std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
161   std::vector<ManagedBuffer> bufs;
162   bufs.reserve(specs.size());
163
164   int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
165   PCHECK(fd != -1);
166   SCOPE_EXIT {
167     ::close(fd);
168   };
169
170   std::vector<std::thread> threads;
171   if (multithreaded) {
172     threads.reserve(specs.size());
173   }
174   for (size_t i = 0; i < specs.size(); i++) {
175     bufs.push_back(allocateAligned(specs[i].size));
176   }
177   auto submit = [&] (size_t i) {
178     ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
179     aioReader.submit(&ops[i]);
180   };
181   for (size_t i = 0; i < specs.size(); i++) {
182     if (multithreaded) {
183       threads.emplace_back([&submit, i] { submit(i); });
184     } else {
185       submit(i);
186     }
187   }
188   for (auto& t : threads) {
189     t.join();
190   }
191   std::vector<bool> pending(specs.size(), true);
192
193   size_t remaining = specs.size();
194   while (remaining != 0) {
195     EXPECT_EQ(remaining, aioReader.pending());
196     auto completed = readerWait(&aioReader);
197     size_t nrRead = completed.size();
198     EXPECT_NE(nrRead, 0);
199     remaining -= nrRead;
200
201     for (size_t i = 0; i < nrRead; i++) {
202       int id = completed[i] - ops.get();
203       EXPECT_GE(id, 0);
204       EXPECT_LT(id, specs.size());
205       EXPECT_TRUE(pending[id]);
206       pending[id] = false;
207       ssize_t res = ops[id].result();
208       EXPECT_LE(0, res) << folly::errnoStr(-res);
209       EXPECT_EQ(specs[id].size, res);
210     }
211   }
212   EXPECT_EQ(specs.size(), aioReader.totalSubmits());
213
214   EXPECT_EQ(aioReader.pending(), 0);
215   for (size_t i = 0; i < pending.size(); i++) {
216     EXPECT_FALSE(pending[i]);
217   }
218 }
219
220 void testReadsQueued(const std::vector<TestSpec>& specs,
221                      AsyncIO::PollMode pollMode) {
222   size_t readerCapacity = std::max(specs.size() / 2, size_t(1));
223   AsyncIO aioReader(readerCapacity, pollMode);
224   AsyncIOQueue aioQueue(&aioReader);
225   std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
226   std::vector<ManagedBuffer> bufs;
227
228   int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
229   PCHECK(fd != -1);
230   SCOPE_EXIT {
231     ::close(fd);
232   };
233   for (size_t i = 0; i < specs.size(); i++) {
234     bufs.push_back(allocateAligned(specs[i].size));
235     ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
236     aioQueue.submit(&ops[i]);
237   }
238   std::vector<bool> pending(specs.size(), true);
239
240   size_t remaining = specs.size();
241   while (remaining != 0) {
242     if (remaining >= readerCapacity) {
243       EXPECT_EQ(readerCapacity, aioReader.pending());
244       EXPECT_EQ(remaining - readerCapacity, aioQueue.queued());
245     } else {
246       EXPECT_EQ(remaining, aioReader.pending());
247       EXPECT_EQ(0, aioQueue.queued());
248     }
249     auto completed = readerWait(&aioReader);
250     size_t nrRead = completed.size();
251     EXPECT_NE(nrRead, 0);
252     remaining -= nrRead;
253
254     for (size_t i = 0; i < nrRead; i++) {
255       int id = completed[i] - ops.get();
256       EXPECT_GE(id, 0);
257       EXPECT_LT(id, specs.size());
258       EXPECT_TRUE(pending[id]);
259       pending[id] = false;
260       ssize_t res = ops[id].result();
261       EXPECT_LE(0, res) << folly::errnoStr(-res);
262       EXPECT_EQ(specs[id].size, res);
263     }
264   }
265   EXPECT_EQ(specs.size(), aioReader.totalSubmits());
266   EXPECT_EQ(aioReader.pending(), 0);
267   EXPECT_EQ(aioQueue.queued(), 0);
268   for (size_t i = 0; i < pending.size(); i++) {
269     EXPECT_FALSE(pending[i]);
270   }
271 }
272
273 void testReads(const std::vector<TestSpec>& specs,
274                AsyncIO::PollMode pollMode) {
275   testReadsSerially(specs, pollMode);
276   testReadsParallel(specs, pollMode, false);
277   testReadsParallel(specs, pollMode, true);
278   testReadsQueued(specs, pollMode);
279 }
280
281 }  // anonymous namespace
282
283 TEST(AsyncIO, ZeroAsyncDataNotPollable) {
284   testReads({{0, 0}}, AsyncIO::NOT_POLLABLE);
285 }
286
287 TEST(AsyncIO, ZeroAsyncDataPollable) {
288   testReads({{0, 0}}, AsyncIO::POLLABLE);
289 }
290
291 TEST(AsyncIO, SingleAsyncDataNotPollable) {
292   testReads({{0, kAlign}}, AsyncIO::NOT_POLLABLE);
293   testReads({{0, kAlign}}, AsyncIO::NOT_POLLABLE);
294 }
295
296 TEST(AsyncIO, SingleAsyncDataPollable) {
297   testReads({{0, kAlign}}, AsyncIO::POLLABLE);
298   testReads({{0, kAlign}}, AsyncIO::POLLABLE);
299 }
300
301 TEST(AsyncIO, MultipleAsyncDataNotPollable) {
302   testReads(
303       {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
304       AsyncIO::NOT_POLLABLE);
305   testReads(
306       {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
307       AsyncIO::NOT_POLLABLE);
308
309   testReads({
310     {0, 5*1024*1024},
311     {kAlign, 5*1024*1024}
312   }, AsyncIO::NOT_POLLABLE);
313
314   testReads({
315     {kAlign, 0},
316     {kAlign, kAlign},
317     {kAlign, 2*kAlign},
318     {kAlign, 20*kAlign},
319     {kAlign, 1024*1024},
320   }, AsyncIO::NOT_POLLABLE);
321 }
322
323 TEST(AsyncIO, MultipleAsyncDataPollable) {
324   testReads(
325       {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
326       AsyncIO::POLLABLE);
327   testReads(
328       {{kAlign, 2*kAlign}, {kAlign, 2*kAlign}, {kAlign, 4*kAlign}},
329       AsyncIO::POLLABLE);
330
331   testReads({
332     {0, 5*1024*1024},
333     {kAlign, 5*1024*1024}
334   }, AsyncIO::NOT_POLLABLE);
335
336   testReads({
337     {kAlign, 0},
338     {kAlign, kAlign},
339     {kAlign, 2*kAlign},
340     {kAlign, 20*kAlign},
341     {kAlign, 1024*1024},
342   }, AsyncIO::NOT_POLLABLE);
343 }
344
345 TEST(AsyncIO, ManyAsyncDataNotPollable) {
346   {
347     std::vector<TestSpec> v;
348     for (int i = 0; i < 1000; i++) {
349       v.push_back({off_t(kAlign * i), kAlign});
350     }
351     testReads(v, AsyncIO::NOT_POLLABLE);
352   }
353 }
354
355 TEST(AsyncIO, ManyAsyncDataPollable) {
356   {
357     std::vector<TestSpec> v;
358     for (int i = 0; i < 1000; i++) {
359       v.push_back({off_t(kAlign * i), kAlign});
360     }
361     testReads(v, AsyncIO::POLLABLE);
362   }
363 }
364
365 TEST(AsyncIO, NonBlockingWait) {
366   AsyncIO aioReader(1, AsyncIO::NOT_POLLABLE);
367   AsyncIO::Op op;
368   int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
369   PCHECK(fd != -1);
370   SCOPE_EXIT {
371     ::close(fd);
372   };
373   size_t size = 2*kAlign;
374   auto buf = allocateAligned(size);
375   op.pread(fd, buf.get(), size, 0);
376   aioReader.submit(&op);
377   EXPECT_EQ(aioReader.pending(), 1);
378
379   folly::Range<AsyncIO::Op**> completed;
380   while (completed.empty()) {
381     // poll without blocking until the read request completes.
382     completed = aioReader.wait(0);
383   }
384   EXPECT_EQ(completed.size(), 1);
385
386   EXPECT_TRUE(completed[0] == &op);
387   ssize_t res = op.result();
388   EXPECT_LE(0, res) << folly::errnoStr(-res);
389   EXPECT_EQ(size, res);
390   EXPECT_EQ(aioReader.pending(), 0);
391 }