Make folly::AsyncIO thread safe
[folly.git] / folly / experimental / io / test / AsyncIOTest.cpp
index cdd9d2837ef367992ab4f03a38aacfa039405e31..f4ee54b6b91b464d1ad70c8993b2c8a24bacd94a 100644 (file)
@@ -25,6 +25,7 @@
 #include <cstdio>
 #include <memory>
 #include <random>
+#include <thread>
 #include <vector>
 
 #include <glog/logging.h>
@@ -152,20 +153,39 @@ void testReadsSerially(const std::vector<TestSpec>& specs,
 }
 
 void testReadsParallel(const std::vector<TestSpec>& specs,
-                       AsyncIO::PollMode pollMode) {
+                       AsyncIO::PollMode pollMode,
+                       bool multithreaded) {
   AsyncIO aioReader(specs.size(), pollMode);
   std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
   std::vector<ManagedBuffer> bufs;
+  bufs.reserve(specs.size());
 
   int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
   PCHECK(fd != -1);
   SCOPE_EXIT {
     ::close(fd);
   };
+
+  std::vector<std::thread> threads;
+  if (multithreaded) {
+    threads.reserve(specs.size());
+  }
   for (int i = 0; i < specs.size(); i++) {
     bufs.push_back(allocateAligned(specs[i].size));
+  }
+  auto submit = [&] (int i) {
     ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
     aioReader.submit(&ops[i]);
+  };
+  for (int i = 0; i < specs.size(); i++) {
+    if (multithreaded) {
+      threads.emplace_back([&submit, i] { submit(i); });
+    } else {
+      submit(i);
+    }
+  }
+  for (auto& t : threads) {
+    t.join();
   }
   std::vector<bool> pending(specs.size(), true);
 
@@ -249,7 +269,8 @@ void testReadsQueued(const std::vector<TestSpec>& specs,
 void testReads(const std::vector<TestSpec>& specs,
                AsyncIO::PollMode pollMode) {
   testReadsSerially(specs, pollMode);
-  testReadsParallel(specs, pollMode);
+  testReadsParallel(specs, pollMode, false);
+  testReadsParallel(specs, pollMode, true);
   testReadsQueued(specs, pollMode);
 }