Fiber-local context
authorAndrii Grynenko <andrii@fb.com>
Fri, 3 Apr 2015 00:17:46 +0000 (17:17 -0700)
committerViswanath Sivakumar <viswanath@fb.com>
Fri, 10 Apr 2015 03:33:35 +0000 (20:33 -0700)
Summary: This adds fiber-local context, which behaves more like static objects+fork rather than thread-locals.

Test Plan: unit test

Reviewed By: pavlo@fb.com, bwatling@fb.com

Subscribers: rushix, alikhtarov, bwatling

FB internal diff: D1958135

Signature: t1:1958135:1427999426:8e4b89f4af53a1a119b2e5a765fb549dd8442c50

folly/experimental/fibers/Fiber-inl.h
folly/experimental/fibers/Fiber.cpp
folly/experimental/fibers/Fiber.h
folly/experimental/fibers/FiberManager-inl.h
folly/experimental/fibers/FiberManager.h
folly/experimental/fibers/test/FibersTest.cpp

index adb78a491075f4cf1f47a3c0c16da02b38148c21..8823caf6cf5dd43c48c7794ce40c6fa59797c72e 100644 (file)
@@ -45,4 +45,43 @@ void Fiber::setReadyFunction(G&& func) {
   readyFunc_ = std::move(func);
 }
 
+template <typename T>
+T& Fiber::LocalData::get() {
+  if (data_) {
+    assert(*dataType_ == typeid(T));
+    return *reinterpret_cast<T*>(data_);
+  }
+
+  dataSize_ = sizeof(T);
+  dataType_ = &typeid(T);
+  if (sizeof(T) <= kBufferSize) {
+    dataDestructor_ = dataBufferDestructor<T>;
+    data_ = &buffer_;
+  } else {
+    dataDestructor_ = dataHeapDestructor<T>;
+    data_ = allocateHeapBuffer(dataSize_);
+  }
+  dataCopyConstructor_ = dataCopyConstructor<T>;
+
+  new (reinterpret_cast<T*>(data_)) T();
+
+  return *reinterpret_cast<T*>(data_);
+}
+
+template <typename T>
+void Fiber::LocalData::dataCopyConstructor(void* ptr, const void* other) {
+  new (reinterpret_cast<T*>(ptr)) T(*reinterpret_cast<const T*>(other));
+}
+
+template <typename T>
+void Fiber::LocalData::dataBufferDestructor(void* ptr) {
+  reinterpret_cast<T*>(ptr)->~T();
+}
+
+template <typename T>
+void Fiber::LocalData::dataHeapDestructor(void *ptr) {
+  reinterpret_cast<T*>(ptr)->~T();
+  freeHeapBuffer(ptr);
+}
+
 }}  // folly::fibers
index d738f67f6778ad108999f9f754b3d245f2de2aba..f4958c1ee5b96a2981797c7dedcfcd45d82030fe 100644 (file)
@@ -176,4 +176,47 @@ intptr_t Fiber::preempt(State state) {
   return ret;
 }
 
+Fiber::LocalData::LocalData(const LocalData& other) : data_(nullptr) {
+  *this = other;
+}
+
+Fiber::LocalData& Fiber::LocalData::operator=(const LocalData& other) {
+  reset();
+  if (!other.data_) {
+    return *this;
+  }
+
+  dataSize_ = other.dataSize_;
+  dataType_ = other.dataType_;
+  dataDestructor_ = other.dataDestructor_;
+  dataCopyConstructor_ = other.dataCopyConstructor_;
+
+  if (dataSize_ <= kBufferSize) {
+    data_ = &buffer_;
+  } else {
+    data_ = allocateHeapBuffer(dataSize_);
+  }
+
+  dataCopyConstructor_(data_, other.data_);
+
+  return *this;
+}
+
+void Fiber::LocalData::reset() {
+  if (!data_) {
+    return;
+  }
+
+  dataDestructor_(data_);
+  data_ = nullptr;
+}
+
+void* Fiber::LocalData::allocateHeapBuffer(size_t size) {
+  return new char[size];
+}
+
+void Fiber::LocalData::freeHeapBuffer(void* buffer) {
+  delete[] reinterpret_cast<char*>(buffer);
+}
+
 }}
index d18bb78253c319f97fd97eaec768d5e0b86ae5be..e3a38d554fe2e82c0f12dd21cd471b3b4f827431 100644 (file)
@@ -16,6 +16,7 @@
 #pragma once
 
 #include <functional>
+#include <typeinfo>
 
 #include <boost/context/all.hpp>
 #include <boost/version.hpp>
@@ -116,6 +117,40 @@ class Fiber {
   std::function<void()> resultFunc_;
   std::function<void()> finallyFunc_;
 
+  class LocalData {
+   public:
+    LocalData() {}
+    LocalData(const LocalData& other);
+    LocalData& operator=(const LocalData& other);
+
+    template <typename T>
+    T& get();
+
+    void reset();
+
+    //private:
+    static void* allocateHeapBuffer(size_t size);
+    static void freeHeapBuffer(void* buffer);
+
+    template <typename T>
+    static void dataCopyConstructor(void*, const void*);
+    template <typename T>
+    static void dataBufferDestructor(void*);
+    template <typename T>
+    static void dataHeapDestructor(void*);
+
+    static constexpr size_t kBufferSize = 128;
+    std::aligned_storage<kBufferSize>::type buffer_;
+    size_t dataSize_;
+
+    const std::type_info* dataType_;
+    void (*dataDestructor_)(void*);
+    void (*dataCopyConstructor_)(void*, const void*);
+    void* data_{nullptr};
+  };
+
+  LocalData localData_;
+
   folly::IntrusiveListHook listHook_; /**< list hook for different FiberManager
                                            queues */
   pid_t threadId_{0};
index 9bf6eb6b18284bcbd7cfc9a8ddf895fb7fae657b..cf963ee063e9b87805ed1e450d4b4a581dee2f40 100644 (file)
@@ -41,8 +41,9 @@ inline void FiberManager::ensureLoopScheduled() {
 inline void FiberManager::runReadyFiber(Fiber* fiber) {
   assert(fiber->state_ == Fiber::NOT_STARTED ||
          fiber->state_ == Fiber::READY_TO_RUN);
+  currentFiber_ = fiber;
 
-   while (fiber->state_ == Fiber::NOT_STARTED ||
+  while (fiber->state_ == Fiber::NOT_STARTED ||
          fiber->state_ == Fiber::READY_TO_RUN) {
     activeFiber_ = fiber;
     if (fiber->readyFunc_) {
@@ -79,6 +80,7 @@ inline void FiberManager::runReadyFiber(Fiber* fiber) {
       }
       fiber->finallyFunc_ = nullptr;
     }
+    fiber->localData_.reset();
 
     if (fibersPoolSize_ < options_.maxFibersPoolSize) {
       fibersPool_.push_front(*fiber);
@@ -89,6 +91,7 @@ inline void FiberManager::runReadyFiber(Fiber* fiber) {
       --fibersAllocated_;
     }
   }
+  currentFiber_ = nullptr;
 }
 
 inline bool FiberManager::loopUntilNoReady() {
@@ -120,6 +123,10 @@ inline bool FiberManager::loopUntilNoReady() {
       [this, &hadRemoteFiber] (RemoteTask* taskPtr) {
         std::unique_ptr<RemoteTask> task(taskPtr);
         auto fiber = getFiber();
+        if (task->localData) {
+          fiber->localData_ = *task->localData;
+        }
+
         fiber->setFunction(std::move(task->func));
         fiber->data_ = reinterpret_cast<intptr_t>(fiber);
         runReadyFiber(fiber);
@@ -170,6 +177,9 @@ void FiberManager::addTask(F&& func) {
   typedef AddTaskHelper<F> Helper;
 
   auto fiber = getFiber();
+  if (currentFiber_) {
+    fiber->localData_ = currentFiber_->localData_;
+  }
 
   if (Helper::allocateInBuffer) {
     auto funcLoc = static_cast<typename Helper::Func*>(fiber->getUserBuffer());
@@ -191,6 +201,10 @@ void FiberManager::addTask(F&& func) {
 template <typename F, typename G>
 void FiberManager::addTaskReadyFunc(F&& func, G&& readyFunc) {
   auto fiber = getFiber();
+  if (currentFiber_) {
+    fiber->localData_ = currentFiber_->localData_;
+  }
+
   fiber->setFunction(std::forward<F>(func));
   fiber->setReadyFunction(std::forward<G>(readyFunc));
 
@@ -202,7 +216,15 @@ void FiberManager::addTaskReadyFunc(F&& func, G&& readyFunc) {
 
 template <typename F>
 void FiberManager::addTaskRemote(F&& func) {
-  auto task = folly::make_unique<RemoteTask>(std::move(func));
+  auto task = [&]() {
+    auto currentFm = getFiberManagerUnsafe();
+    if (currentFm && currentFm->currentFiber_) {
+      return folly::make_unique<RemoteTask>(
+        std::forward<F>(func),
+        currentFm->currentFiber_->localData_);
+    }
+    return folly::make_unique<RemoteTask>(std::forward<F>(func));
+  }();
   if (remoteTaskQueue_.insertHead(task.release())) {
     loopController_->scheduleThreadSafe();
   }
@@ -294,6 +316,9 @@ void FiberManager::addTaskFinally(F&& func, G&& finally) {
     "finally(Try<T>&&): T must be convertible from func()'s return type");
 
   auto fiber = getFiber();
+  if (currentFiber_) {
+    fiber->localData_ = currentFiber_->localData_;
+  }
 
   typedef AddTaskFinallyHelper<F,G> Helper;
 
@@ -375,6 +400,12 @@ inline bool FiberManager::hasActiveFiber() {
   return activeFiber_ != nullptr;
 }
 
+template <typename T>
+T& FiberManager::local() {
+  assert(getFiberManager().currentFiber_ != nullptr);
+  return currentFiber_->localData_.get<T>();
+}
+
 template <typename F>
 typename FirstArgOf<F>::type::value_type
 inline await(F&& func) {
index aba56173230082b2b5dd7df6eb8105fb7e1882d6..e0fbf71c80765f796b6d87f89ea71e9cb8763f16 100644 (file)
@@ -179,6 +179,16 @@ class FiberManager {
   typename std::result_of<F()>::type
   runInMainContext(F&& func);
 
+  /**
+   * Returns a refference to a fiber-local context for given Fiber. Should be
+   * always called with the same T for each fiber. Fiber-local context is lazily
+   * default-constructed on first request.
+   * When new task is scheduled via addTask / addTaskRemote from a fiber its
+   * fiber-local context is copied into the new fiber.
+   */
+  template <typename T>
+  T& local();
+
   /**
    * @return How many fiber objects (and stacks) has this manager allocated.
    */
@@ -213,14 +223,24 @@ class FiberManager {
 
   struct RemoteTask {
     template <typename F>
-    explicit RemoteTask(F&& f) : func(std::move(f)) {}
+    explicit RemoteTask(F&& f) : func(std::forward<F>(f)) {}
+    template <typename F>
+    RemoteTask(F&& f, const Fiber::LocalData& localData_) :
+        func(std::forward<F>(f)),
+        localData(folly::make_unique<Fiber::LocalData>(localData_)) {}
     std::function<void()> func;
-    folly::AtomicLinkedListHook<RemoteTask> nextRemoteTask;
+    std::unique_ptr<Fiber::LocalData> localData;
+    AtomicLinkedListHook<RemoteTask> nextRemoteTask;
   };
 
   typedef folly::IntrusiveList<Fiber, &Fiber::listHook_> FiberTailQueue;
 
   Fiber* activeFiber_{nullptr}; /**< active fiber, nullptr on main context */
+  /**
+   * Same as active fiber, but also set for functions run from fiber on main
+   * context.
+   */
+  Fiber* currentFiber_{nullptr};
 
   FiberTailQueue readyFibers_;  /**< queue of fibers ready to be executed */
   FiberTailQueue fibersPool_;   /**< pool of unitialized Fiber objects */
@@ -374,6 +394,18 @@ inline runInMainContext(F&& func) {
   return fm->runInMainContext(std::forward<F>(func));
 }
 
+/**
+ * Returns a refference to a fiber-local context for given Fiber. Should be
+ * always called with the same T for each fiber. Fiber-local context is lazily
+ * default-constructed on first request.
+ * When new task is scheduled via addTask / addTaskRemote from a fiber its
+ * fiber-local context is copied into the new fiber.
+ */
+template <typename T>
+T& local() {
+  return FiberManager::getFiberManager().local<T>();
+}
+
 }}
 
 #include "FiberManager-inl.h"
index a0709630da5886d6eb0a4b21843dc12034690a02..0b79cafa94a1e5b669af90a7c09c50f907b907e8 100644 (file)
@@ -1215,6 +1215,71 @@ TEST(FiberManager, remoteHasReadyTasks) {
   EXPECT_EQ(result, 47);
 }
 
+template <typename Data>
+void testFiberLocal() {
+  FiberManager fm(folly::make_unique<SimpleLoopController>());
+
+  fm.addTask([]() {
+      EXPECT_EQ(42, local<Data>().value);
+
+      local<Data>().value = 43;
+
+      addTask([]() {
+          EXPECT_EQ(43, local<Data>().value);
+
+          local<Data>().value = 44;
+
+          addTask([]() {
+              EXPECT_EQ(44, local<Data>().value);
+            });
+        });
+   });
+
+  fm.addTask([&]() {
+      EXPECT_EQ(42, local<Data>().value);
+
+      local<Data>().value = 43;
+
+      fm.addTaskRemote([]() {
+          EXPECT_EQ(43, local<Data>().value);
+        });
+    });
+
+  fm.addTask([]() {
+      EXPECT_EQ(42, local<Data>().value);
+      local<Data>().value = 43;
+
+      auto task = []() {
+        EXPECT_EQ(43, local<Data>().value);
+        local<Data>().value = 44;
+      };
+      std::vector<std::function<void()>> tasks{task};
+      whenAny(tasks.begin(), tasks.end());
+
+      EXPECT_EQ(43, local<Data>().value);
+    });
+
+  fm.loopUntilNoReady();
+  EXPECT_FALSE(fm.hasTasks());
+}
+
+TEST(FiberManager, fiberLocal) {
+  struct SimpleData {
+    int value{42};
+  };
+
+  testFiberLocal<SimpleData>();
+}
+
+TEST(FiberManager, fiberLocalHeap) {
+  struct LargeData {
+    char _[1024*1024];
+    int value{42};
+  };
+
+  testFiberLocal<LargeData>();
+}
+
 static size_t sNumAwaits;
 
 void runBenchmark(size_t numAwaits, size_t toSend) {