Properly std::chrono'ize HHWheelTimer
[folly.git] / folly / io / async / test / EventBaseTest.cpp
index c0abf9b0135e77db6370e234bc2bdd1ae909455a..454ece3c55dd7d697a79c8d9d8c48a829bec854a 100644 (file)
  * specific language governing permissions and limitations
  * under the License.
  */
+
 #include <folly/Memory.h>
+#include <folly/ScopeGuard.h>
 
 #include <folly/io/async/AsyncTimeout.h>
 #include <folly/io/async/EventBase.h>
 #include <folly/io/async/EventHandler.h>
 #include <folly/io/async/test/SocketPair.h>
 #include <folly/io/async/test/Util.h>
+#include <folly/portability/Unistd.h>
+
+#include <folly/futures/Promise.h>
 
 #include <atomic>
 #include <iostream>
-#include <unistd.h>
 #include <memory>
 #include <thread>
 
@@ -43,6 +47,8 @@ using std::chrono::milliseconds;
 using std::chrono::microseconds;
 using std::chrono::duration_cast;
 
+using namespace std::chrono_literals;
+
 using namespace folly;
 
 ///////////////////////////////////////////////////////////////////////////
@@ -53,9 +59,10 @@ enum { BUF_SIZE = 4096 };
 
 ssize_t writeToFD(int fd, size_t length) {
   // write an arbitrary amount of data to the fd
-  char buf[length];
-  memset(buf, 'a', sizeof(buf));
-  ssize_t rc = write(fd, buf, sizeof(buf));
+  auto bufv = vector<char>(length);
+  auto buf = bufv.data();
+  memset(buf, 'a', length);
+  ssize_t rc = write(fd, buf, length);
   CHECK_EQ(rc, length);
   return rc;
 }
@@ -79,8 +86,8 @@ size_t writeUntilFull(int fd) {
 
 ssize_t readFromFD(int fd, size_t length) {
   // write an arbitrary amount of data to the fd
-  char buf[length];
-  return read(fd, buf, sizeof(buf));
+  auto buf = vector<char>(length);
+  return read(fd, buf.data(), length);
 }
 
 size_t readUntilEmpty(int fd) {
@@ -141,7 +148,7 @@ class TestHandler : public EventHandler {
   TestHandler(EventBase* eventBase, int fd)
     : EventHandler(eventBase, fd), fd_(fd) {}
 
-  virtual void handlerReady(uint16_t events) noexcept {
+  void handlerReady(uint16_t events) noexcept override {
     ssize_t bytesRead = 0;
     ssize_t bytesWritten = 0;
     if (events & READ) {
@@ -155,7 +162,7 @@ class TestHandler : public EventHandler {
       bytesWritten = writeUntilFull(fd_);
     }
 
-    log.push_back(EventRecord(events, bytesRead, bytesWritten));
+    log.emplace_back(events, bytesRead, bytesWritten);
   }
 
   struct EventRecord {
@@ -190,9 +197,9 @@ TEST(EventBaseTest, ReadEvent) {
 
   // Register timeouts to perform two write events
   ScheduledEvent events[] = {
-    { 10, EventHandler::WRITE, 2345 },
-    { 160, EventHandler::WRITE, 99 },
-    { 0, 0, 0 },
+    { 10, EventHandler::WRITE, 2345, 0 },
+    { 160, EventHandler::WRITE, 99, 0 },
+    { 0, 0, 0, 0 },
   };
   scheduleEvents(&eb, sp[1], events);
 
@@ -231,11 +238,11 @@ TEST(EventBaseTest, ReadPersist) {
 
   // Register several timeouts to perform writes
   ScheduledEvent events[] = {
-    { 10,  EventHandler::WRITE, 1024 },
-    { 20,  EventHandler::WRITE, 2211 },
-    { 30,  EventHandler::WRITE, 4096 },
-    { 100, EventHandler::WRITE, 100 },
-    { 0, 0 },
+    { 10,  EventHandler::WRITE, 1024, 0 },
+    { 20,  EventHandler::WRITE, 2211, 0 },
+    { 30,  EventHandler::WRITE, 4096, 0 },
+    { 100, EventHandler::WRITE, 100,  0 },
+    { 0, 0, 0, 0 },
   };
   scheduleEvents(&eb, sp[1], events);
 
@@ -282,8 +289,8 @@ TEST(EventBaseTest, ReadImmediate) {
 
   // Register a timeout to perform another write
   ScheduledEvent events[] = {
-    { 10, EventHandler::WRITE, 2345 },
-    { 0, 0, 0 },
+    { 10, EventHandler::WRITE, 2345, 0 },
+    { 0, 0, 0, 0 },
   };
   scheduleEvents(&eb, sp[1], events);
 
@@ -329,9 +336,9 @@ TEST(EventBaseTest, WriteEvent) {
 
   // Register timeouts to perform two reads
   ScheduledEvent events[] = {
-    { 10, EventHandler::READ, 0 },
-    { 60, EventHandler::READ, 0 },
-    { 0, 0, 0 },
+    { 10, EventHandler::READ, 0, 0 },
+    { 60, EventHandler::READ, 0, 0 },
+    { 0, 0, 0, 0 },
   };
   scheduleEvents(&eb, sp[1], events);
 
@@ -370,11 +377,11 @@ TEST(EventBaseTest, WritePersist) {
 
   // Register several timeouts to read from the socket at several intervals
   ScheduledEvent events[] = {
-    { 10,  EventHandler::READ, 0 },
-    { 40,  EventHandler::READ, 0 },
-    { 70,  EventHandler::READ, 0 },
-    { 100, EventHandler::READ, 0 },
-    { 0, 0 },
+    { 10,  EventHandler::READ, 0, 0 },
+    { 40,  EventHandler::READ, 0, 0 },
+    { 70,  EventHandler::READ, 0, 0 },
+    { 100, EventHandler::READ, 0, 0 },
+    { 0, 0, 0, 0 },
   };
   scheduleEvents(&eb, sp[1], events);
 
@@ -414,8 +421,8 @@ TEST(EventBaseTest, WriteImmediate) {
 
   // Register a timeout to perform a read
   ScheduledEvent events[] = {
-    { 10, EventHandler::READ, 0 },
-    { 0, 0, 0 },
+    { 10, EventHandler::READ, 0, 0 },
+    { 0, 0, 0, 0 },
   };
   scheduleEvents(&eb, sp[1], events);
 
@@ -464,9 +471,9 @@ TEST(EventBaseTest, ReadWrite) {
 
   // Register timeouts to perform a write then a read.
   ScheduledEvent events[] = {
-    { 10, EventHandler::WRITE, 2345 },
-    { 40, EventHandler::READ, 0 },
-    { 0, 0, 0 },
+    { 10, EventHandler::WRITE, 2345, 0 },
+    { 40, EventHandler::READ, 0, 0 },
+    { 0, 0, 0, 0 },
   };
   scheduleEvents(&eb, sp[1], events);
 
@@ -505,9 +512,9 @@ TEST(EventBaseTest, WriteRead) {
   // Register timeouts to perform a read then a write.
   size_t sock1WriteLength = 2345;
   ScheduledEvent events[] = {
-    { 10, EventHandler::READ, 0 },
-    { 40, EventHandler::WRITE, sock1WriteLength },
-    { 0, 0, 0 },
+    { 10, EventHandler::READ, 0, 0 },
+    { 40, EventHandler::WRITE, sock1WriteLength, 0 },
+    { 0, 0, 0, 0 },
   };
   scheduleEvents(&eb, sp[1], events);
 
@@ -551,8 +558,8 @@ TEST(EventBaseTest, ReadWriteSimultaneous) {
 
   // Register a timeout to perform a read and write together
   ScheduledEvent events[] = {
-    { 10, EventHandler::READ | EventHandler::WRITE, 0 },
-    { 0, 0, 0 },
+    { 10, EventHandler::READ | EventHandler::WRITE, 0, 0 },
+    { 0, 0, 0, 0 },
   };
   scheduleEvents(&eb, sp[1], events);
 
@@ -590,13 +597,13 @@ TEST(EventBaseTest, ReadWritePersist) {
 
   // Register timeouts to perform several reads and writes
   ScheduledEvent events[] = {
-    { 10, EventHandler::WRITE, 2345 },
-    { 20, EventHandler::READ, 0 },
-    { 35, EventHandler::WRITE, 200 },
-    { 45, EventHandler::WRITE, 15 },
-    { 55, EventHandler::READ, 0 },
-    { 120, EventHandler::WRITE, 2345 },
-    { 0, 0, 0 },
+    { 10, EventHandler::WRITE, 2345, 0 },
+    { 20, EventHandler::READ, 0, 0 },
+    { 35, EventHandler::WRITE, 200, 0 },
+    { 45, EventHandler::WRITE, 15, 0 },
+    { 55, EventHandler::READ, 0, 0 },
+    { 120, EventHandler::WRITE, 2345, 0 },
+    { 0, 0, 0, 0 },
   };
   scheduleEvents(&eb, sp[1], events);
 
@@ -645,10 +652,10 @@ class PartialReadHandler : public TestHandler {
   PartialReadHandler(EventBase* eventBase, int fd, size_t readLength)
     : TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {}
 
-  virtual void handlerReady(uint16_t events) noexcept {
+  void handlerReady(uint16_t events) noexcept override {
     assert(events == EventHandler::READ);
     ssize_t bytesRead = readFromFD(fd_, readLength_);
-    log.push_back(EventRecord(events, bytesRead, 0));
+    log.emplace_back(events, bytesRead, 0);
   }
 
  private:
@@ -673,8 +680,8 @@ TEST(EventBaseTest, ReadPartial) {
   // Register a timeout to perform a single write,
   // with more data than PartialReadHandler will read at once
   ScheduledEvent events[] = {
-    { 10, EventHandler::WRITE, (3*readLength) + (readLength / 2) },
-    { 0, 0, 0 },
+    { 10, EventHandler::WRITE, (3*readLength) + (readLength / 2), 0 },
+    { 0, 0, 0, 0 },
   };
   scheduleEvents(&eb, sp[1], events);
 
@@ -710,10 +717,10 @@ class PartialWriteHandler : public TestHandler {
   PartialWriteHandler(EventBase* eventBase, int fd, size_t writeLength)
     : TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {}
 
-  virtual void handlerReady(uint16_t events) noexcept {
+  void handlerReady(uint16_t events) noexcept override {
     assert(events == EventHandler::WRITE);
     ssize_t bytesWritten = writeToFD(fd_, writeLength_);
-    log.push_back(EventRecord(events, 0, bytesWritten));
+    log.emplace_back(events, 0, bytesWritten);
   }
 
  private:
@@ -740,8 +747,8 @@ TEST(EventBaseTest, WritePartial) {
 
   // Register a timeout to read, so that more data can be written
   ScheduledEvent events[] = {
-    { 10, EventHandler::READ, 0 },
-    { 0, 0, 0 },
+    { 10, EventHandler::READ, 0, 0 },
+    { 0, 0, 0, 0 },
   };
   scheduleEvents(&eb, sp[1], events);
 
@@ -780,9 +787,7 @@ TEST(EventBaseTest, DestroyHandler) {
       : AsyncTimeout(eb)
       , handler_(h) {}
 
-    virtual void timeoutExpired() noexcept {
-      delete handler_;
-    }
+    void timeoutExpired() noexcept override { delete handler_; }
 
    private:
     EventHandler* handler_;
@@ -895,9 +900,7 @@ class TestTimeout : public AsyncTimeout {
     : AsyncTimeout(eventBase)
     , timestamp(false) {}
 
-  virtual void timeoutExpired() noexcept {
-    timestamp.reset();
-  }
+  void timeoutExpired() noexcept override { timestamp.reset(); }
 
   TimePoint timestamp;
 };
@@ -933,8 +936,8 @@ class ReschedulingTimeout : public AsyncTimeout {
     reschedule();
   }
 
-  virtual void timeoutExpired() noexcept {
-    timestamps.push_back(TimePoint());
+  void timeoutExpired() noexcept override {
+    timestamps.emplace_back();
     reschedule();
   }
 
@@ -1052,9 +1055,7 @@ TEST(EventBaseTest, DestroyTimeout) {
       : AsyncTimeout(eb)
       , timeout_(t) {}
 
-    virtual void timeoutExpired() noexcept {
-      delete timeout_;
-    }
+    void timeoutExpired() noexcept override { delete timeout_; }
 
    private:
     AsyncTimeout* timeout_;
@@ -1117,11 +1118,18 @@ void runInThreadTestFunc(RunInThreadArg* arg) {
 }
 
 TEST(EventBaseTest, RunInThread) {
-  uint32_t numThreads = 50;
-  uint32_t opsPerThread = 100;
+  constexpr uint32_t numThreads = 50;
+  constexpr uint32_t opsPerThread = 100;
   RunInThreadData data(numThreads, opsPerThread);
 
   deque<std::thread> threads;
+  SCOPE_EXIT {
+    // Wait on all of the threads.
+    for (auto& thread : threads) {
+      thread.join();
+    }
+  };
+
   for (uint32_t i = 0; i < numThreads; ++i) {
     threads.emplace_back([i, &data] {
         for (int n = 0; n < data.opsPerThread; ++n) {
@@ -1170,11 +1178,6 @@ TEST(EventBaseTest, RunInThread) {
   for (uint32_t n = 0; n < numThreads; ++n) {
     ASSERT_EQ(expectedValues[n], opsPerThread);
   }
-
-  // Wait on all of the threads.
-  for (auto& thread: threads) {
-    thread.join();
-  }
 }
 
 //  This test simulates some calls, and verifies that the waiting happens by
@@ -1187,24 +1190,23 @@ TEST(EventBaseTest, RunInEventBaseThreadAndWait) {
     auto& atom = atoms.at(i);
     atom = make_unique<atomic<size_t>>(0);
   }
-  vector<thread> threads(c);
+  vector<thread> threads;
   for (size_t i = 0; i < c; ++i) {
-    auto& atom = *atoms.at(i);
-    auto& th = threads.at(i);
-    th = thread([&atom] {
-        EventBase eb;
-        auto ebth = thread([&]{ eb.loopForever(); });
-        eb.waitUntilRunning();
-        eb.runInEventBaseThreadAndWait([&] {
-          size_t x = 0;
-          atom.compare_exchange_weak(
-              x, 1, std::memory_order_release, std::memory_order_relaxed);
-        });
+    threads.emplace_back([&atoms, i] {
+      EventBase eb;
+      auto& atom = *atoms.at(i);
+      auto ebth = thread([&] { eb.loopForever(); });
+      eb.waitUntilRunning();
+      eb.runInEventBaseThreadAndWait([&] {
         size_t x = 0;
         atom.compare_exchange_weak(
-            x, 2, std::memory_order_release, std::memory_order_relaxed);
-        eb.terminateLoopSoon();
-        ebth.join();
+            x, 1, std::memory_order_release, std::memory_order_relaxed);
+      });
+      size_t x = 0;
+      atom.compare_exchange_weak(
+          x, 2, std::memory_order_release, std::memory_order_relaxed);
+      eb.terminateLoopSoon();
+      ebth.join();
     });
   }
   for (size_t i = 0; i < c; ++i) {
@@ -1269,7 +1271,7 @@ class CountedLoopCallback : public EventBase::LoopCallback {
     , count_(count)
     , action_(action) {}
 
-  virtual void runLoopCallback() noexcept {
+  void runLoopCallback() noexcept override {
     --count_;
     if (count_ > 0) {
       eventBase_->runInLoop(this);
@@ -1437,7 +1439,7 @@ class TerminateTestCallback : public EventBase::LoopCallback,
     unregisterHandler();
   }
 
-  virtual void handlerReady(uint16_t events) noexcept {
+  void handlerReady(uint16_t /* events */) noexcept override {
     // We didn't register with PERSIST, so we will have been automatically
     // unregistered already.
     ASSERT_FALSE(isHandlerRegistered());
@@ -1449,7 +1451,7 @@ class TerminateTestCallback : public EventBase::LoopCallback,
 
     eventBase_->runInLoop(this);
   }
-  virtual void runLoopCallback() noexcept {
+  void runLoopCallback() noexcept override {
     ++loopInvocations_;
     if (loopInvocations_ >= maxLoopInvocations_) {
       return;
@@ -1526,9 +1528,9 @@ class IdleTimeTimeoutSeries : public AsyncTimeout {
       scheduleTimeout(1);
     }
 
-  virtual ~IdleTimeTimeoutSeries() {}
+    ~IdleTimeTimeoutSeries() override {}
 
-  void timeoutExpired() noexcept {
+    void timeoutExpired() noexcept override {
     ++timeouts_;
 
     if(timeout_.empty()){
@@ -1563,7 +1565,7 @@ class IdleTimeTimeoutSeries : public AsyncTimeout {
  */
 TEST(EventBaseTest, IdleTime) {
   EventBase eventBase;
-  eventBase.setLoadAvgMsec(1000);
+  eventBase.setLoadAvgMsec(1000ms);
   eventBase.resetLoadAvg(5900.0);
   std::deque<uint64_t> timeouts0(4, 8080);
   timeouts0.push_front(8000);
@@ -1576,30 +1578,25 @@ TEST(EventBaseTest, IdleTime) {
   bool hostOverloaded = false;
 
   int latencyCallbacks = 0;
-  eventBase.setMaxLatency(6000, [&]() {
+  eventBase.setMaxLatency(6000us, [&]() {
     ++latencyCallbacks;
-
-    switch (latencyCallbacks) {
-    case 1:
-      if (tos0.getTimeouts() < 6) {
-        // This could only happen if the host this test is running
-        // on is heavily loaded.
-        int64_t maxLatencyReached = duration_cast<microseconds>(
-            std::chrono::steady_clock::now().time_since_epoch()).count();
-        ASSERT_LE(43800, maxLatencyReached - testStart);
-        hostOverloaded = true;
-        break;
-      }
-      ASSERT_EQ(6, tos0.getTimeouts());
-      ASSERT_GE(6100, eventBase.getAvgLoopTime() - 1200);
-      ASSERT_LE(6100, eventBase.getAvgLoopTime() + 1200);
-      tos.reset(new IdleTimeTimeoutSeries(&eventBase, timeouts));
-      break;
-
-    default:
+    if (latencyCallbacks != 1) {
       FAIL() << "Unexpected latency callback";
-      break;
     }
+
+    if (tos0.getTimeouts() < 6) {
+      // This could only happen if the host this test is running
+      // on is heavily loaded.
+      int64_t maxLatencyReached = duration_cast<microseconds>(
+          std::chrono::steady_clock::now().time_since_epoch()).count();
+      ASSERT_LE(43800, maxLatencyReached - testStart);
+      hostOverloaded = true;
+      return;
+    }
+    ASSERT_EQ(6, tos0.getTimeouts());
+    ASSERT_GE(6100, eventBase.getAvgLoopTime() - 1200);
+    ASSERT_LE(6100, eventBase.getAvgLoopTime() + 1200);
+    tos.reset(new IdleTimeTimeoutSeries(&eventBase, timeouts));
   });
 
   // Kick things off with an "immedite" timeout
@@ -1696,9 +1693,7 @@ public:
   PipeHandler(EventBase* eventBase, int fd)
     : EventHandler(eventBase, fd) {}
 
-  void handlerReady(uint16_t events) noexcept {
-    abort();
-  }
+  void handlerReady(uint16_t /* events */) noexcept override { abort(); }
 };
 
 TEST(EventBaseTest, StopBeforeLoop) {
@@ -1736,3 +1731,184 @@ TEST(EventBaseTest, RunCallbacksOnDestruction) {
 
   ASSERT_TRUE(ran);
 }
+
+TEST(EventBaseTest, LoopKeepAlive) {
+  EventBase evb;
+
+  bool done = false;
+  std::thread t([&, loopKeepAlive = evb.loopKeepAlive() ]() mutable {
+    /* sleep override */ std::this_thread::sleep_for(
+        std::chrono::milliseconds(100));
+    evb.runInEventBaseThread(
+        [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
+  });
+
+  evb.loop();
+
+  ASSERT_TRUE(done);
+
+  t.join();
+}
+
+TEST(EventBaseTest, LoopKeepAliveInLoop) {
+  EventBase evb;
+
+  bool done = false;
+  std::thread t;
+
+  evb.runInEventBaseThread([&] {
+    t = std::thread([&, loopKeepAlive = evb.loopKeepAlive() ]() mutable {
+      /* sleep override */ std::this_thread::sleep_for(
+          std::chrono::milliseconds(100));
+      evb.runInEventBaseThread(
+          [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
+    });
+  });
+
+  evb.loop();
+
+  ASSERT_TRUE(done);
+
+  t.join();
+}
+
+TEST(EventBaseTest, LoopKeepAliveWithLoopForever) {
+  std::unique_ptr<EventBase> evb = folly::make_unique<EventBase>();
+
+  bool done = false;
+
+  std::thread evThread([&] {
+    evb->loopForever();
+    evb.reset();
+    done = true;
+  });
+
+  {
+    auto* ev = evb.get();
+    EventBase::LoopKeepAlive keepAlive;
+    ev->runInEventBaseThreadAndWait(
+        [&ev, &keepAlive] { keepAlive = ev->loopKeepAlive(); });
+    ASSERT_FALSE(done) << "Loop finished before we asked it to";
+    ev->terminateLoopSoon();
+    /* sleep override */
+    std::this_thread::sleep_for(std::chrono::milliseconds(30));
+    ASSERT_FALSE(done) << "Loop terminated early";
+    ev->runInEventBaseThread([&ev, keepAlive = std::move(keepAlive) ]{});
+  }
+
+  evThread.join();
+  ASSERT_TRUE(done);
+}
+
+TEST(EventBaseTest, LoopKeepAliveShutdown) {
+  auto evb = folly::make_unique<EventBase>();
+
+  bool done = false;
+
+  std::thread t([
+    &done,
+    loopKeepAlive = evb->loopKeepAlive(),
+    evbPtr = evb.get()
+  ]() mutable {
+    /* sleep override */ std::this_thread::sleep_for(
+        std::chrono::milliseconds(100));
+    evbPtr->runInEventBaseThread(
+        [&done, loopKeepAlive = std::move(loopKeepAlive) ] { done = true; });
+  });
+
+  evb.reset();
+
+  ASSERT_TRUE(done);
+
+  t.join();
+}
+
+TEST(EventBaseTest, LoopKeepAliveAtomic) {
+  auto evb = folly::make_unique<EventBase>();
+
+  static constexpr size_t kNumThreads = 100;
+  static constexpr size_t kNumTasks = 100;
+
+  std::vector<std::thread> ts;
+  std::vector<std::unique_ptr<Baton<>>> batons;
+  size_t done{0};
+
+  for (size_t i = 0; i < kNumThreads; ++i) {
+    batons.emplace_back(std::make_unique<Baton<>>());
+  }
+
+  for (size_t i = 0; i < kNumThreads; ++i) {
+    ts.emplace_back([ evbPtr = evb.get(), batonPtr = batons[i].get(), &done ] {
+      std::vector<EventBase::LoopKeepAlive> keepAlives;
+      for (size_t j = 0; j < kNumTasks; ++j) {
+        keepAlives.emplace_back(evbPtr->loopKeepAliveAtomic());
+      }
+
+      batonPtr->post();
+
+      /* sleep override */ std::this_thread::sleep_for(std::chrono::seconds(1));
+
+      for (auto& keepAlive : keepAlives) {
+        evbPtr->runInEventBaseThread(
+            [&done, keepAlive = std::move(keepAlive) ]() { ++done; });
+      }
+    });
+  }
+
+  for (auto& baton : batons) {
+    baton->wait();
+  }
+
+  evb.reset();
+
+  EXPECT_EQ(kNumThreads * kNumTasks, done);
+
+  for (auto& t : ts) {
+    t.join();
+  }
+}
+
+TEST(EventBaseTest, DrivableExecutorTest) {
+  folly::Promise<bool> p;
+  auto f = p.getFuture();
+  EventBase base;
+  bool finished = false;
+
+  std::thread t([&] {
+    /* sleep override */
+    std::this_thread::sleep_for(std::chrono::microseconds(10));
+    finished = true;
+    base.runInEventBaseThread([&]() { p.setValue(true); });
+  });
+
+  // Ensure drive does not busy wait
+  base.drive(); // TODO: fix notification queue init() extra wakeup
+  base.drive();
+  EXPECT_TRUE(finished);
+
+  folly::Promise<bool> p2;
+  auto f2 = p2.getFuture();
+  // Ensure waitVia gets woken up properly, even from
+  // a separate thread.
+  base.runAfterDelay([&]() { p2.setValue(true); }, 10);
+  f2.waitVia(&base);
+  EXPECT_TRUE(f2.isReady());
+
+  t.join();
+}
+
+TEST(EventBaseTest, RequestContextTest) {
+  EventBase evb;
+  auto defaultCtx = RequestContext::get();
+
+  {
+    RequestContextScopeGuard rctx;
+    auto context = RequestContext::get();
+    EXPECT_NE(defaultCtx, context);
+    evb.runInLoop([context] { EXPECT_EQ(context, RequestContext::get()); });
+  }
+
+  EXPECT_EQ(defaultCtx, RequestContext::get());
+  evb.loop();
+  EXPECT_EQ(defaultCtx, RequestContext::get());
+}