EventBase::runInEventLoopThreadAndWait.
authorYedidya Feldblum <yfeldblum@fb.com>
Mon, 2 Feb 2015 19:37:23 +0000 (11:37 -0800)
committerAndrew Cox <andrewcox@fb.com>
Wed, 4 Feb 2015 20:58:31 +0000 (12:58 -0800)
Summary:
[Folly] EventBase::runInEventLoopThreadAndWait.

Useful for when some code needs to be run in the event loop thread, but another thread needs to trigger the code and then wait for it to be done.

Test Plan:
Unit tests:
* `folly/io/async/test/EventBaseTest.cpp`

Reviewed By: davejwatson@fb.com

Subscribers: trunkagent, folly-diffs@, brettp, dougw

FB internal diff: D1810764

Signature: t1:1810764:1422900654:7ff0aa7feb2792266f620b344cf8a1110a09f7ef

folly/io/async/EventBase.cpp
folly/io/async/EventBase.h
folly/io/async/test/EventBaseTest.cpp

index e956ca1881623e245369394c6e297243072956ae..00aa9d80d0a0b7ea63ba31453de1afda67c4bb1f 100644 (file)
@@ -20,6 +20,7 @@
 
 #include <folly/io/async/EventBase.h>
 
+#include <folly/Baton.h>
 #include <folly/ThreadName.h>
 #include <folly/io/async/NotificationQueue.h>
 
@@ -562,6 +563,40 @@ bool EventBase::runInEventBaseThread(const Cob& fn) {
   return true;
 }
 
+bool EventBase::runInEventBaseThreadAndWait(void (*fn)(void*), void* arg) {
+  if (inRunningEventBaseThread()) {
+    LOG(ERROR) << "EventBase " << this << ": Waiting in the event loop is not "
+               << "allowed";
+    return false;
+  }
+
+  Baton<> ready;
+  runInEventBaseThread([&] {
+      fn(arg);
+      ready.post();
+  });
+  ready.wait();
+
+  return true;
+}
+
+bool EventBase::runInEventBaseThreadAndWait(const Cob& fn) {
+  if (inRunningEventBaseThread()) {
+    LOG(ERROR) << "EventBase " << this << ": Waiting in the event loop is not "
+               << "allowed";
+    return false;
+  }
+
+  Baton<> ready;
+  runInEventBaseThread([&] {
+      fn();
+      ready.post();
+  });
+  ready.wait();
+
+  return true;
+}
+
 bool EventBase::runAfterDelay(const Cob& cob,
                                int milliseconds,
                                TimeoutManager::InternalEnum in) {
index 484977b13563c9298ba57cf2328cdb3a06c48c28..3cdfd2ddc1d15426515e04737dae104568319999 100644 (file)
@@ -346,6 +346,28 @@ class EventBase : private boost::noncopyable,
    */
   bool runInEventBaseThread(const Cob& fn);
 
+  /*
+   * Like runInEventBaseThread, but the caller waits for the callback to be
+   * executed.
+   */
+  template<typename T>
+  bool runInEventBaseThreadAndWait(void (*fn)(T*), T* arg) {
+    return runInEventBaseThreadAndWait(reinterpret_cast<void (*)(void*)>(fn),
+                                       reinterpret_cast<void*>(arg));
+  }
+
+  /*
+   * Like runInEventBaseThread, but the caller waits for the callback to be
+   * executed.
+   */
+  bool runInEventBaseThreadAndWait(void (*fn)(void*), void* arg);
+
+  /*
+   * Like runInEventBaseThread, but the caller waits for the callback to be
+   * executed.
+   */
+  bool runInEventBaseThreadAndWait(const Cob& fn);
+
   /**
    * Runs the given Cob at some time after the specified number of
    * milliseconds.  (No guarantees exactly when.)
index cacf18feb8a40bbacf7504d001ba9d5e907fc622..255054998223b82dfac0b7d5bf309a54bf72ec18 100644 (file)
 #include <folly/io/async/test/SocketPair.h>
 #include <folly/io/async/test/Util.h>
 
+#include <atomic>
 #include <iostream>
 #include <unistd.h>
 #include <memory>
 #include <thread>
 
+using std::atomic;
 using std::deque;
 using std::pair;
 using std::vector;
+using std::thread;
 using std::make_pair;
 using std::cerr;
 using std::endl;
@@ -1171,6 +1174,45 @@ TEST(EventBaseTest, RunInThread) {
   }
 }
 
+//  This test simulates some calls, and verifies that the waiting happens by
+//  triggering what otherwise would be race conditions, and trying to detect
+//  whether any of the race conditions happened.
+TEST(EventBaseTest, RunInEventLoopThreadAndWait) {
+  const size_t c = 256;
+  vector<atomic<size_t>> atoms(c);
+  for (size_t i = 0; i < c; ++i) {
+    auto& atom = atoms.at(i);
+    atom = 0;
+  }
+  vector<thread> threads(c);
+  for (size_t i = 0; i < c; ++i) {
+    auto& atom = atoms.at(i);
+    auto& th = threads.at(i);
+    th = thread([&atom] {
+        EventBase eb;
+        auto ebth = thread([&]{ eb.loopForever(); });
+        eb.waitUntilRunning();
+        eb.runInEventBaseThreadAndWait([&] {
+          size_t x = 0;
+          atom.compare_exchange_weak(
+              x, 1, std::memory_order_release, std::memory_order_relaxed);
+        });
+        size_t x = 0;
+        atom.compare_exchange_weak(
+            x, 2, std::memory_order_release, std::memory_order_relaxed);
+        eb.terminateLoopSoon();
+        ebth.join();
+    });
+  }
+  for (size_t i = 0; i < c; ++i) {
+    auto& th = threads.at(i);
+    th.join();
+  }
+  size_t sum = 0;
+  for (auto& atom : atoms) sum += atom;
+  EXPECT_EQ(c, sum);
+}
+
 ///////////////////////////////////////////////////////////////////////////
 // Tests for runInLoop()
 ///////////////////////////////////////////////////////////////////////////