Summary:
Existing scheduleThreadSafe implementation had 2 potential races on destruction:
1. (very unlikely) insertHead is complete, but fiber loop is already running on another thread, so it finishes processing all of the fibers, destroys FiberManager or EventBase or both. By the time we get to scheduleThreadSafe EventBaseLoopController is already destoyed
2. (more likely) scheduleThreadSafe is complete, but FiberManager loop which is already running, picks complete fiber, finishes the processing. After that FiberManager may be destoyed. So when EventBase actually executes the callback FiberManager is already dead.
This solution fixes both races. Holding the alive shared_ptr when completing sheduleThreadSafe assures EventBase can't be destoyed until its completed (or it won't try to schedule anything after EventBase was destroyed). Locking alive weak_ptr in the EventBase loop callback ensures FiberManager and thus EventBaseLoopController were not destroyed yet (they can be destoyed only by the same thread which is running EventBase loop).
Reviewed By: spalamarchuk
Differential Revision:
D2763206
fb-gh-sync-id:
1972d6c0c11aa931747ebdaed4029a209130f69c
#include <folly/Memory.h>
#include <folly/experimental/fibers/EventBaseLoopController.h>
#include <folly/experimental/fibers/FiberManager.h>
-#include <folly/io/async/EventBase.h>
namespace folly { namespace fibers {
-class EventBaseLoopController::ControllerCallback :
- public folly::EventBase::LoopCallback {
- public:
- explicit ControllerCallback(EventBaseLoopController& controller)
- : controller_(controller) {}
-
- void runLoopCallback() noexcept override {
- controller_.runLoop();
- }
- private:
- EventBaseLoopController& controller_;
-};
-
inline EventBaseLoopController::EventBaseLoopController()
- : callback_(folly::make_unique<ControllerCallback>(*this)) {
-}
+ : callback_(*this), aliveWeak_(destructionCallback_.getWeak()) {}
inline EventBaseLoopController::~EventBaseLoopController() {
- callback_->cancelLoopCallback();
+ callback_.cancelLoopCallback();
}
inline void EventBaseLoopController::attachEventBase(
}
eventBase_ = &eventBase;
+ eventBase_->runOnDestruction(&destructionCallback_);
eventBaseAttached_ = true;
awaitingScheduling_ = true;
} else {
// Schedule it to run in current iteration.
- eventBase_->runInLoop(callback_.get(), true);
+ eventBase_->runInLoop(&callback_, true);
awaitingScheduling_ = false;
}
}
inline void EventBaseLoopController::cancel() {
- callback_->cancelLoopCallback();
+ callback_.cancelLoopCallback();
}
inline void EventBaseLoopController::runLoop() {
fm_->loopUntilNoReady();
}
-inline void EventBaseLoopController::scheduleThreadSafe() {
+inline void EventBaseLoopController::scheduleThreadSafe(
+ std::function<bool()> func) {
/* The only way we could end up here is if
1) Fiber thread creates a fiber that awaits (which means we must
have already attached, fiber thread wouldn't be running).
2) We move the promise to another thread (this move is a memory fence)
3) We fulfill the promise from the other thread. */
assert(eventBaseAttached_);
- eventBase_->runInEventBaseThread([this] () { runLoop(); });
+
+ auto alive = aliveWeak_.lock();
+
+ if (func() && alive) {
+ auto aliveWeak = aliveWeak_;
+ eventBase_->runInEventBaseThread([this, aliveWeak]() {
+ if (!aliveWeak.expired()) {
+ runLoop();
+ }
+ });
+ }
}
inline void EventBaseLoopController::timedSchedule(std::function<void()> func,
#include <memory>
#include <atomic>
#include <folly/experimental/fibers/LoopController.h>
+#include <folly/io/async/EventBase.h>
namespace folly {
class EventBase;
}
private:
- class ControllerCallback;
+ class ControllerCallback : public folly::EventBase::LoopCallback {
+ public:
+ explicit ControllerCallback(EventBaseLoopController& controller)
+ : controller_(controller) {}
+
+ void runLoopCallback() noexcept override { controller_.runLoop(); }
+
+ private:
+ EventBaseLoopController& controller_;
+ };
+
+ class DestructionCallback : public folly::EventBase::LoopCallback {
+ public:
+ DestructionCallback() : alive_(new int(42)) {}
+ ~DestructionCallback() { reset(); }
+
+ void runLoopCallback() noexcept override { reset(); }
+
+ std::weak_ptr<void> getWeak() { return {alive_}; }
+
+ private:
+ void reset() {
+ std::weak_ptr<void> aliveWeak(alive_);
+ alive_.reset();
+
+ while (!aliveWeak.expired()) {
+ // Spin until all operations requiring EventBaseLoopController to be
+ // alive are complete.
+ }
+ }
+
+ std::shared_ptr<void> alive_;
+ };
bool awaitingScheduling_{false};
folly::EventBase* eventBase_{nullptr};
- std::unique_ptr<ControllerCallback> callback_;
+ ControllerCallback callback_;
+ DestructionCallback destructionCallback_;
FiberManager* fm_{nullptr};
std::atomic<bool> eventBaseAttached_{false};
+ std::weak_ptr<void> aliveWeak_;
/* LoopController interface */
void schedule() override;
void cancel() override;
void runLoop();
- void scheduleThreadSafe() override;
+ void scheduleThreadSafe(std::function<bool()> func) override;
void timedSchedule(std::function<void()> func, TimePoint time) override;
friend class FiberManager;
}
return folly::make_unique<RemoteTask>(std::forward<F>(func));
}();
- if (remoteTaskQueue_.insertHead(task.release())) {
- loopController_->scheduleThreadSafe();
- }
+ auto insertHead =
+ [&]() { return remoteTaskQueue_.insertHead(task.release()); };
+ loopController_->scheduleThreadSafe(std::ref(insertHead));
}
template <typename X>
if (observer_) {
observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
}
- if (remoteReadyQueue_.insertHead(fiber)) {
- loopController_->scheduleThreadSafe();
- }
+ auto insertHead = [&]() { return remoteReadyQueue_.insertHead(fiber); };
+ loopController_->scheduleThreadSafe(std::ref(insertHead));
}
void FiberManager::setObserver(ExecutionObserver* observer) {
/**
* Same as schedule(), but safe to call from any thread.
+ * Runs func and only schedules if func returned true.
*/
- virtual void scheduleThreadSafe() = 0;
+ virtual void scheduleThreadSafe(std::function<bool()> func) = 0;
/**
* Called by FiberManager to cancel a previously scheduled
scheduled_ = false;
}
- void scheduleThreadSafe() override {
- ++remoteScheduleCalled_;
- scheduled_ = true;
+ void scheduleThreadSafe(std::function<bool()> func) override {
+ if (func()) {
+ ++remoteScheduleCalled_;
+ scheduled_ = true;
+ }
}
friend class FiberManager;