Add 'runnable' callback to ExecutionObserver
authorBrian Watling <bwatling@fb.com>
Tue, 19 May 2015 15:30:03 +0000 (08:30 -0700)
committerViswanath Sivakumar <viswanath@fb.com>
Wed, 20 May 2015 17:57:11 +0000 (10:57 -0700)
Summary: Add a callback when a fiber becomes runnable

Test Plan: unit tests

Reviewed By: andrii@fb.com

Subscribers: alikhtarov, folly-diffs@, yfeldblum, chalfant

FB internal diff: D2081306

Signature: t1:2081306:1432011152:0ee93cb2682eb2a289b99c403e91465e72dd4ee8

folly/experimental/fibers/ExecutionObserver.h
folly/experimental/fibers/Fiber.cpp
folly/experimental/fibers/FiberManager-inl.h
folly/experimental/fibers/FiberManager.cpp

index 5cf1cc00c5fe72dd34e40bf8db47ea5899989f1f..ef5f8fb0bf8414c11d7d3d0bd3d47bea8c793489 100644 (file)
@@ -15,6 +15,8 @@
  */
 #pragma once
 
+#include <cstdint>
+
 namespace folly { namespace fibers {
 
 /**
@@ -26,13 +28,24 @@ class ExecutionObserver {
 
   /**
    * Called when a task is about to start executing.
+   *
+   * @param id Unique id for the fiber which is starting.
+   */
+  virtual void starting(uintptr_t id) noexcept = 0;
+
+  /**
+   * Called when a task is ready to run.
+   *
+   * @param id Unique id for the fiber which is ready to run.
    */
-  virtual void starting() noexcept = 0;
+  virtual void runnable(uintptr_t id) noexcept = 0;
 
   /**
    * Called just after a task stops executing.
+   *
+   * @param id Unique id for the fiber which is stopping.
    */
-  virtual void stopped() noexcept = 0;
+  virtual void stopped(uintptr_t id) noexcept = 0;
 };
 
 }} // namespace folly::fibers
index 40e9464158958c4f3a62e21c3320cb7e9674550e..2a562774728d5d388425e359fa2d54847153c6cf 100644 (file)
@@ -60,6 +60,10 @@ void Fiber::setData(intptr_t data) {
   data_ = data;
   state_ = READY_TO_RUN;
 
+  if (fiberManager_.observer_) {
+    fiberManager_.observer_->runnable(reinterpret_cast<uintptr_t>(this));
+  }
+
   if (LIKELY(threadId_ == localThreadId())) {
     fiberManager_.readyFibers_.push_back(*this);
     fiberManager_.ensureLoopScheduled();
index 97df9cab20b7cdbdf0926c85873d2bac2f9c2f79..c0f0d557def4aeeedec747a9285a3a2cfaeb4012 100644 (file)
@@ -64,7 +64,7 @@ inline void FiberManager::runReadyFiber(Fiber* fiber) {
          fiber->state_ == Fiber::READY_TO_RUN);
   currentFiber_ = fiber;
   if (observer_) {
-    observer_->starting();
+    observer_->starting(reinterpret_cast<uintptr_t>(fiber));
   }
 
   while (fiber->state_ == Fiber::NOT_STARTED ||
@@ -86,7 +86,7 @@ inline void FiberManager::runReadyFiber(Fiber* fiber) {
     awaitFunc_(*fiber);
     awaitFunc_ = nullptr;
     if (observer_) {
-      observer_->stopped();
+      observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
     }
     currentFiber_ = nullptr;
   } else if (fiber->state_ == Fiber::INVALID) {
@@ -107,7 +107,7 @@ inline void FiberManager::runReadyFiber(Fiber* fiber) {
     }
     // Make sure LocalData is not accessible from its destructor
     if (observer_) {
-      observer_->stopped();
+      observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
     }
     currentFiber_ = nullptr;
     fiber->localData_.reset();
@@ -122,7 +122,7 @@ inline void FiberManager::runReadyFiber(Fiber* fiber) {
     }
   } else if (fiber->state_ == Fiber::YIELDED) {
     if (observer_) {
-      observer_->stopped();
+      observer_->stopped(reinterpret_cast<uintptr_t>(fiber));
     }
     currentFiber_ = nullptr;
     fiber->state_ = Fiber::READY_TO_RUN;
@@ -168,12 +168,20 @@ inline bool FiberManager::loopUntilNoReady() {
 
         fiber->setFunction(std::move(task->func));
         fiber->data_ = reinterpret_cast<intptr_t>(fiber);
+        if (observer_) {
+          observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
+        }
         runReadyFiber(fiber);
         hadRemoteFiber = true;
       }
     );
   }
 
+  if (observer_) {
+    for (auto& yielded : yieldedFibers_) {
+      observer_->runnable(reinterpret_cast<uintptr_t>(&yielded));
+    }
+  }
   readyFibers_.splice(readyFibers_.end(), yieldedFibers_);
 
   return fibersActive_ > 0;
@@ -233,6 +241,9 @@ void FiberManager::addTask(F&& func) {
 
   fiber->data_ = reinterpret_cast<intptr_t>(fiber);
   readyFibers_.push_back(*fiber);
+  if (observer_) {
+    observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
+  }
 
   ensureLoopScheduled();
 }
@@ -364,6 +375,9 @@ void FiberManager::addTaskFinally(F&& func, G&& finally) {
 
   fiber->data_ = reinterpret_cast<intptr_t>(fiber);
   readyFibers_.push_back(*fiber);
+  if (observer_) {
+    observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
+  }
 
   ensureLoopScheduled();
 }
index 6ad7174dd91b81a90e14e1ca06910ef17678cd2b..445b65ad6e8f22db569321662e65e15f383e1e5d 100644 (file)
@@ -104,6 +104,9 @@ size_t FiberManager::stackHighWatermark() const {
 }
 
 void FiberManager::remoteReadyInsert(Fiber* fiber) {
+  if (observer_) {
+    observer_->runnable(reinterpret_cast<uintptr_t>(fiber));
+  }
   if (remoteReadyQueue_.insertHead(fiber)) {
     loopController_->scheduleThreadSafe();
   }