+
+class PipeHandler : public EventHandler {
+ public:
+ PipeHandler(EventBase* eventBase, int fd)
+ : EventHandler(eventBase, fd) {}
+
+ void handlerReady(uint16_t /* events */) noexcept override { abort(); }
+};
+
+TEST(EventBaseTest, StopBeforeLoop) {
+ EventBase evb;
+
+ // Give the evb something to do.
+ int p[2];
+ ASSERT_EQ(0, pipe(p));
+ PipeHandler handler(&evb, p[0]);
+ handler.registerHandler(EventHandler::READ);
+
+ // It's definitely not running yet
+ evb.terminateLoopSoon();
+
+ // let it run, it should exit quickly.
+ std::thread t([&] { evb.loop(); });
+ t.join();
+
+ handler.unregisterHandler();
+ close(p[0]);
+ close(p[1]);
+
+ SUCCEED();
+}
+
+TEST(EventBaseTest, RunCallbacksOnDestruction) {
+ bool ran = false;
+
+ {
+ EventBase base;
+ base.runInEventBaseThread([&](){
+ ran = true;
+ });
+ }
+
+ ASSERT_TRUE(ran);
+}
+
+TEST(EventBaseTest, LoopKeepAlive) {
+ EventBase evb;
+
+ bool done = false;
+ std::thread t([&, loopKeepAlive = evb.getKeepAliveToken() ]() mutable {
+ /* sleep override */ std::this_thread::sleep_for(
+ std::chrono::milliseconds(100));
+ evb.runInEventBaseThread(
+ [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
+ });
+
+ evb.loop();
+
+ ASSERT_TRUE(done);
+
+ t.join();
+}
+
+TEST(EventBaseTest, LoopKeepAliveInLoop) {
+ EventBase evb;
+
+ bool done = false;
+ std::thread t;
+
+ evb.runInEventBaseThread([&] {
+ t = std::thread([&, loopKeepAlive = evb.getKeepAliveToken() ]() mutable {
+ /* sleep override */ std::this_thread::sleep_for(
+ std::chrono::milliseconds(100));
+ evb.runInEventBaseThread(
+ [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
+ });
+ });
+
+ evb.loop();
+
+ ASSERT_TRUE(done);
+
+ t.join();
+}
+
+TEST(EventBaseTest, LoopKeepAliveWithLoopForever) {
+ std::unique_ptr<EventBase> evb = std::make_unique<EventBase>();
+
+ bool done = false;
+
+ std::thread evThread([&] {
+ evb->loopForever();
+ evb.reset();
+ done = true;
+ });
+
+ {
+ auto* ev = evb.get();
+ Executor::KeepAlive keepAlive;
+ ev->runInEventBaseThreadAndWait(
+ [&ev, &keepAlive] { keepAlive = ev->getKeepAliveToken(); });
+ ASSERT_FALSE(done) << "Loop finished before we asked it to";
+ ev->terminateLoopSoon();
+ /* sleep override */
+ std::this_thread::sleep_for(std::chrono::milliseconds(30));
+ ASSERT_FALSE(done) << "Loop terminated early";
+ ev->runInEventBaseThread([keepAlive = std::move(keepAlive)]{});
+ }
+
+ evThread.join();
+ ASSERT_TRUE(done);
+}
+
+TEST(EventBaseTest, LoopKeepAliveShutdown) {
+ auto evb = std::make_unique<EventBase>();
+
+ bool done = false;
+
+ std::thread t([
+ &done,
+ loopKeepAlive = evb->getKeepAliveToken(),
+ evbPtr = evb.get()
+ ]() mutable {
+ /* sleep override */ std::this_thread::sleep_for(
+ std::chrono::milliseconds(100));
+ evbPtr->runInEventBaseThread(
+ [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
+ });
+
+ evb.reset();
+
+ ASSERT_TRUE(done);
+
+ t.join();
+}
+
+TEST(EventBaseTest, LoopKeepAliveAtomic) {
+ auto evb = std::make_unique<EventBase>();
+
+ static constexpr size_t kNumThreads = 100;
+ static constexpr size_t kNumTasks = 100;
+
+ std::vector<std::thread> ts;
+ std::vector<std::unique_ptr<Baton<>>> batons;
+ size_t done{0};
+
+ for (size_t i = 0; i < kNumThreads; ++i) {
+ batons.emplace_back(std::make_unique<Baton<>>());
+ }
+
+ for (size_t i = 0; i < kNumThreads; ++i) {
+ ts.emplace_back([ evbPtr = evb.get(), batonPtr = batons[i].get(), &done ] {
+ std::vector<Executor::KeepAlive> keepAlives;
+ for (size_t j = 0; j < kNumTasks; ++j) {
+ keepAlives.emplace_back(evbPtr->getKeepAliveToken());
+ }
+
+ batonPtr->post();
+
+ /* sleep override */ std::this_thread::sleep_for(std::chrono::seconds(1));
+
+ for (auto& keepAlive : keepAlives) {
+ evbPtr->runInEventBaseThread(
+ [&done, keepAlive = std::move(keepAlive) ]() { ++done; });
+ }
+ });
+ }
+
+ for (auto& baton : batons) {
+ baton->wait();
+ }
+
+ evb.reset();
+
+ EXPECT_EQ(kNumThreads * kNumTasks, done);
+
+ for (auto& t : ts) {
+ t.join();
+ }
+}
+
+TEST(EventBaseTest, DrivableExecutorTest) {
+ folly::Promise<bool> p;
+ auto f = p.getFuture();
+ EventBase base;
+ bool finished = false;
+
+ std::thread t([&] {
+ /* sleep override */
+ std::this_thread::sleep_for(std::chrono::microseconds(10));
+ finished = true;
+ base.runInEventBaseThread([&]() { p.setValue(true); });
+ });
+
+ // Ensure drive does not busy wait
+ base.drive(); // TODO: fix notification queue init() extra wakeup
+ base.drive();
+ EXPECT_TRUE(finished);
+
+ folly::Promise<bool> p2;
+ auto f2 = p2.getFuture();
+ // Ensure waitVia gets woken up properly, even from
+ // a separate thread.
+ base.runAfterDelay([&]() { p2.setValue(true); }, 10);
+ f2.waitVia(&base);
+ EXPECT_TRUE(f2.isReady());
+
+ t.join();
+}
+
+TEST(EventBaseTest, RequestContextTest) {
+ EventBase evb;
+ auto defaultCtx = RequestContext::get();
+
+ {
+ RequestContextScopeGuard rctx;
+ auto context = RequestContext::get();
+ EXPECT_NE(defaultCtx, context);
+ evb.runInLoop([context] { EXPECT_EQ(context, RequestContext::get()); });
+ }
+
+ EXPECT_EQ(defaultCtx, RequestContext::get());
+ evb.loop();
+ EXPECT_EQ(defaultCtx, RequestContext::get());
+}