Fix stop_ thread race
[folly.git] / folly / io / async / EventBase.cpp
index daf4049b81efa9091a76a73f5821f652b0f3d336..9280f001513551a65a0929584129675b7a22c710 100644 (file)
@@ -44,7 +44,7 @@ class FunctionLoopCallback : public EventBase::LoopCallback {
   explicit FunctionLoopCallback(const Cob& function)
       : function_(function) {}
 
-  virtual void runLoopCallback() noexcept {
+  void runLoopCallback() noexcept override {
     function_();
     delete this;
   }
@@ -66,7 +66,7 @@ const int kNoFD = -1;
 class EventBase::FunctionRunner
     : public NotificationQueue<std::pair<void (*)(void*), void*>>::Consumer {
  public:
-  void messageAvailable(std::pair<void (*)(void*), void*>&& msg) {
+  void messageAvailable(std::pair<void (*)(void*), void*>&& msg) override {
 
     // In libevent2, internal events do not break the loop.
     // Most users would expect loop(), followed by runInEventBaseThread(),
@@ -155,7 +155,8 @@ EventBase::EventBase(bool enableTimeMeasurement)
   , latestLoopCnt_(nextLoopCnt_)
   , startWork_(0)
   , observer_(nullptr)
-  , observerSampleCount_(0) {
+  , observerSampleCount_(0)
+  , executionObserver_(nullptr) {
   {
     std::lock_guard<std::mutex> lock(libevent_mutex_);
 
@@ -174,7 +175,7 @@ EventBase::EventBase(bool enableTimeMeasurement)
   }
   VLOG(5) << "EventBase(): Created.";
   initNotificationQueue();
-  RequestContext::getStaticContext();
+  RequestContext::saveContext();
 }
 
 // takes ownership of the event_base
@@ -193,13 +194,14 @@ EventBase::EventBase(event_base* evb, bool enableTimeMeasurement)
   , latestLoopCnt_(nextLoopCnt_)
   , startWork_(0)
   , observer_(nullptr)
-  , observerSampleCount_(0) {
+  , observerSampleCount_(0)
+  , executionObserver_(nullptr) {
   if (UNLIKELY(evb_ == nullptr)) {
     LOG(ERROR) << "EventBase(): Pass nullptr as event base.";
     throw std::invalid_argument("EventBase(): event base cannot be nullptr");
   }
   initNotificationQueue();
-  RequestContext::getStaticContext();
+  RequestContext::saveContext();
 }
 
 EventBase::~EventBase() {
@@ -313,8 +315,7 @@ bool EventBase::loopBody(int flags) {
       std::chrono::steady_clock::now().time_since_epoch()).count();
   }
 
-  // TODO: Read stop_ atomically with an acquire barrier.
-  while (!stop_) {
+  while (!stop_.load(std::memory_order_acquire)) {
     ++nextLoopCnt_;
 
     // Run the before loop callbacks
@@ -519,7 +520,7 @@ void EventBase::runInLoop(Cob&& cob, bool thisIteration) {
 }
 
 void EventBase::runOnDestruction(LoopCallback* callback) {
-  DCHECK(isInEventBaseThread());
+  std::lock_guard<std::mutex> lg(onDestructionCallbacksMutex_);
   callback->cancelLoopCallback();
   onDestructionCallbacks_.push_back(*callback);
 }