#include <folly/io/async/EventBase.h>
+#include <folly/Baton.h>
#include <folly/ThreadName.h>
#include <folly/io/async/NotificationQueue.h>
return true;
}
+bool EventBase::runInEventBaseThreadAndWait(void (*fn)(void*), void* arg) {
+ if (inRunningEventBaseThread()) {
+ LOG(ERROR) << "EventBase " << this << ": Waiting in the event loop is not "
+ << "allowed";
+ return false;
+ }
+
+ Baton<> ready;
+ runInEventBaseThread([&] {
+ fn(arg);
+ ready.post();
+ });
+ ready.wait();
+
+ return true;
+}
+
+bool EventBase::runInEventBaseThreadAndWait(const Cob& fn) {
+ if (inRunningEventBaseThread()) {
+ LOG(ERROR) << "EventBase " << this << ": Waiting in the event loop is not "
+ << "allowed";
+ return false;
+ }
+
+ Baton<> ready;
+ runInEventBaseThread([&] {
+ fn();
+ ready.post();
+ });
+ ready.wait();
+
+ return true;
+}
+
bool EventBase::runAfterDelay(const Cob& cob,
int milliseconds,
TimeoutManager::InternalEnum in) {
*/
bool runInEventBaseThread(const Cob& fn);
+ /*
+ * Like runInEventBaseThread, but the caller waits for the callback to be
+ * executed.
+ */
+ template<typename T>
+ bool runInEventBaseThreadAndWait(void (*fn)(T*), T* arg) {
+ return runInEventBaseThreadAndWait(reinterpret_cast<void (*)(void*)>(fn),
+ reinterpret_cast<void*>(arg));
+ }
+
+ /*
+ * Like runInEventBaseThread, but the caller waits for the callback to be
+ * executed.
+ */
+ bool runInEventBaseThreadAndWait(void (*fn)(void*), void* arg);
+
+ /*
+ * Like runInEventBaseThread, but the caller waits for the callback to be
+ * executed.
+ */
+ bool runInEventBaseThreadAndWait(const Cob& fn);
+
/**
* Runs the given Cob at some time after the specified number of
* milliseconds. (No guarantees exactly when.)
#include <folly/io/async/test/SocketPair.h>
#include <folly/io/async/test/Util.h>
+#include <atomic>
#include <iostream>
#include <unistd.h>
#include <memory>
#include <thread>
+using std::atomic;
using std::deque;
using std::pair;
using std::vector;
+using std::thread;
using std::make_pair;
using std::cerr;
using std::endl;
}
}
+// This test simulates some calls, and verifies that the waiting happens by
+// triggering what otherwise would be race conditions, and trying to detect
+// whether any of the race conditions happened.
+TEST(EventBaseTest, RunInEventLoopThreadAndWait) {
+ const size_t c = 256;
+ vector<atomic<size_t>> atoms(c);
+ for (size_t i = 0; i < c; ++i) {
+ auto& atom = atoms.at(i);
+ atom = 0;
+ }
+ vector<thread> threads(c);
+ for (size_t i = 0; i < c; ++i) {
+ auto& atom = atoms.at(i);
+ auto& th = threads.at(i);
+ th = thread([&atom] {
+ EventBase eb;
+ auto ebth = thread([&]{ eb.loopForever(); });
+ eb.waitUntilRunning();
+ eb.runInEventBaseThreadAndWait([&] {
+ size_t x = 0;
+ atom.compare_exchange_weak(
+ x, 1, std::memory_order_release, std::memory_order_relaxed);
+ });
+ size_t x = 0;
+ atom.compare_exchange_weak(
+ x, 2, std::memory_order_release, std::memory_order_relaxed);
+ eb.terminateLoopSoon();
+ ebth.join();
+ });
+ }
+ for (size_t i = 0; i < c; ++i) {
+ auto& th = threads.at(i);
+ th.join();
+ }
+ size_t sum = 0;
+ for (auto& atom : atoms) sum += atom;
+ EXPECT_EQ(c, sum);
+}
+
///////////////////////////////////////////////////////////////////////////
// Tests for runInLoop()
///////////////////////////////////////////////////////////////////////////