global executors with weak_ptr semantics
authorJames Sedgwick <jsedgwick@fb.com>
Wed, 14 Jan 2015 15:08:50 +0000 (07:08 -0800)
committerwoo <woo@fb.com>
Mon, 2 Feb 2015 21:09:41 +0000 (13:09 -0800)
Summary:
unfortunately, can't use atomics now that a weak_ptr is stored instead of a raw ptr, hence the additional singleton locks
might want to make an overload of via() that takes shared_ptr to integrate more easily? or change via() to store a shared_ptr (and make it non-owning if a raw ptr is passed)

Test Plan: unit

Reviewed By: davejwatson@fb.com

Subscribers: jsedgwick, trunkagent, fugalh, folly-diffs@

FB internal diff: D1764359

Tasks: 5002442

Signature: t1:1764359:1420845340:349ea88091d7ca4ee386b54aec599647341fadd4

folly/Executor.h
folly/Makefile.am
folly/wangle/concurrent/GlobalExecutor.cpp
folly/wangle/concurrent/GlobalExecutor.h
folly/wangle/concurrent/IOExecutor.cpp [deleted file]
folly/wangle/concurrent/IOExecutor.h
folly/wangle/concurrent/test/GlobalExecutorTest.cpp

index 5ba4844b5f1c4843b9117b60330b96c4624e696a..4bacba7b2bc017d67860bd0ea710970068d48dda 100644 (file)
@@ -16,6 +16,7 @@
 
 #pragma once
 
+#include <atomic>
 #include <functional>
 
 namespace folly {
index d717d5fd8d311db1059e789df69abdc66f5f8b97..c3b8ca9d38adfcf9d233710601fc6ecfb687ec3c 100644 (file)
@@ -343,7 +343,6 @@ libfolly_la_SOURCES = \
        wangle/acceptor/TransportInfo.cpp \
        wangle/concurrent/CPUThreadPoolExecutor.cpp \
        wangle/concurrent/Codel.cpp \
-       wangle/concurrent/IOExecutor.cpp \
        wangle/concurrent/IOThreadPoolExecutor.cpp \
        wangle/concurrent/GlobalExecutor.cpp \
        wangle/concurrent/ThreadPoolExecutor.cpp \
index 35455921aa7e7b652a07721d4574231b0d0dca0b..eb97f06dde252edffece337c996360fb153bb329 100644 (file)
 #include <folly/experimental/Singleton.h>
 #include <folly/wangle/concurrent/IOExecutor.h>
 #include <folly/wangle/concurrent/IOThreadPoolExecutor.h>
+#include <folly/futures/InlineExecutor.h>
 
 using namespace folly;
 using namespace folly::wangle;
 
 namespace {
 
-Singleton<IOThreadPoolExecutor> globalIOThreadPoolSingleton(
-    [](){
-      return new IOThreadPoolExecutor(
-          sysconf(_SC_NPROCESSORS_ONLN),
-          std::make_shared<NamedThreadFactory>("GlobalIOThreadPool"));
+// lock protecting global CPU executor
+struct CPUExecutorLock {};
+Singleton<RWSpinLock, CPUExecutorLock> globalCPUExecutorLock;
+// global CPU executor
+Singleton<std::weak_ptr<Executor>> globalCPUExecutor;
+// default global CPU executor is an InlineExecutor
+Singleton<std::shared_ptr<InlineExecutor>> globalInlineExecutor(
+    []{
+      return new std::shared_ptr<InlineExecutor>(
+          std::make_shared<InlineExecutor>());
+    });
+
+// lock protecting global IO executor
+struct IOExecutorLock {};
+Singleton<RWSpinLock, IOExecutorLock> globalIOExecutorLock;
+// global IO executor
+Singleton<std::weak_ptr<IOExecutor>> globalIOExecutor;
+// default global IO executor is an IOThreadPoolExecutor
+Singleton<std::shared_ptr<IOThreadPoolExecutor>> globalIOThreadPool(
+    []{
+      return new std::shared_ptr<IOThreadPoolExecutor>(
+          std::make_shared<IOThreadPoolExecutor>(
+              sysconf(_SC_NPROCESSORS_ONLN),
+              std::make_shared<NamedThreadFactory>("GlobalIOThreadPool")));
     });
 
 }
 
 namespace folly { namespace wangle {
 
-IOExecutor* getIOExecutor() {
-  auto singleton = IOExecutor::getSingleton();
-  auto executor = singleton->load();
-  while (!executor) {
-    IOExecutor* nullIOExecutor = nullptr;
-    singleton->compare_exchange_strong(
-        nullIOExecutor,
-        globalIOThreadPoolSingleton.get_fast());
-    executor = singleton->load();
+template <class Exe, class DefaultExe, class LockTag>
+std::shared_ptr<Exe> getExecutor(
+    Singleton<std::weak_ptr<Exe>>& sExecutor,
+    Singleton<std::shared_ptr<DefaultExe>>& sDefaultExecutor,
+    Singleton<RWSpinLock, LockTag>& sExecutorLock) {
+  std::shared_ptr<Exe> executor;
+  auto singleton = sExecutor.get_fast();
+  auto lock = sExecutorLock.get_fast();
+
+  {
+    RWSpinLock::ReadHolder guard(lock);
+    if ((executor = sExecutor->lock())) {
+      return executor;
+    }
+  }
+
+
+  RWSpinLock::WriteHolder guard(lock);
+  executor = singleton->lock();
+  if (!executor) {
+    executor = *sDefaultExecutor.get_fast();
+    *singleton = executor;
   }
   return executor;
 }
 
-void setIOExecutor(IOExecutor* executor) {
-  IOExecutor::getSingleton()->store(executor);
+template <class Exe, class LockTag>
+void setExecutor(
+    std::shared_ptr<Exe> executor,
+    Singleton<std::weak_ptr<Exe>>& sExecutor,
+    Singleton<RWSpinLock, LockTag>& sExecutorLock) {
+  RWSpinLock::WriteHolder guard(sExecutorLock.get_fast());
+  *sExecutor.get_fast() = std::move(executor);
+}
+
+std::shared_ptr<Executor> getCPUExecutor() {
+  return getExecutor(
+      globalCPUExecutor,
+      globalInlineExecutor,
+      globalCPUExecutorLock);
+}
+
+void setCPUExecutor(std::shared_ptr<Executor> executor) {
+  setExecutor(
+      std::move(executor),
+      globalCPUExecutor,
+      globalCPUExecutorLock);
+}
+
+std::shared_ptr<IOExecutor> getIOExecutor() {
+  return getExecutor(
+      globalIOExecutor,
+      globalIOThreadPool,
+      globalIOExecutorLock);
+}
+
+void setIOExecutor(std::shared_ptr<IOExecutor> executor) {
+  setExecutor(
+      std::move(executor),
+      globalIOExecutor,
+      globalIOExecutorLock);
 }
 
 }} // folly::wangle
index cac76be8cabf03534b0a8f6a2e9d8e8dd3c7de99..08df1c4690390b944d4fec618861922b3a9b3109 100644 (file)
 
 #pragma once
 
+#include <memory>
+
+namespace folly {
+class Executor;
+}
+
 namespace folly { namespace wangle {
 
+// Retrieve the global Executor. If there is none, a default InlineExecutor
+// will be constructed and returned. This is named CPUExecutor to distinguish
+// it from IOExecutor below and to hint that it's intended for CPU-bound tasks.
+std::shared_ptr<Executor> getCPUExecutor();
+
+// Set an Executor to be the global Executor which will be returned by
+// subsequent calls to getCPUExecutor(). Takes a non-owning (weak) reference.
+void setCPUExecutor(std::shared_ptr<Executor> executor);
+
+// IOExecutors differ from Executors in that they drive and provide access to
+// one or more EventBases.
 class IOExecutor;
 
 // Retrieve the global IOExecutor. If there is none, a default
 // IOThreadPoolExecutor will be constructed and returned.
-IOExecutor* getIOExecutor();
+std::shared_ptr<IOExecutor> getIOExecutor();
 
 // Set an IOExecutor to be the global IOExecutor which will be returned by
-// subsequent calls to getIOExecutor(). IOExecutors will uninstall themselves
-// as global when they are destructed.
-void setIOExecutor(IOExecutor* executor);
+// subsequent calls to getIOExecutor(). Takes a non-owning (weak) reference.
+void setIOExecutor(std::shared_ptr<IOExecutor> executor);
 
 }}
diff --git a/folly/wangle/concurrent/IOExecutor.cpp b/folly/wangle/concurrent/IOExecutor.cpp
deleted file mode 100644 (file)
index d1b3283..0000000
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright 2014 Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <folly/wangle/concurrent/IOExecutor.h>
-
-#include <folly/experimental/Singleton.h>
-#include <folly/wangle/concurrent/GlobalExecutor.h>
-
-using folly::Singleton;
-using folly::wangle::IOExecutor;
-
-namespace {
-
-Singleton<std::atomic<IOExecutor*>> globalIOExecutorSingleton(
-    [](){
-      return new std::atomic<IOExecutor*>(nullptr);
-    });
-
-}
-
-namespace folly { namespace wangle {
-
-IOExecutor::~IOExecutor() {
-  auto thisCopy = this;
-  try {
-    getSingleton()->compare_exchange_strong(thisCopy, nullptr);
-  } catch (const std::runtime_error& e) {
-    // The global IOExecutor singleton was already destructed so doesn't need to
-    // be restored. Ignore.
-  }
-}
-
-std::atomic<IOExecutor*>* IOExecutor::getSingleton() {
-  return globalIOExecutorSingleton.get_fast();
-}
-
-}} // folly::wangle
index 14eb66435a6024ae1838c5fc2b1f8a0d302964a3..0fc4f5c7890ba5a1b0f356b79f338f77a3448b5c 100644 (file)
@@ -40,13 +40,8 @@ namespace folly { namespace wangle {
 // IOThreadPoolExecutor will be created and returned.
 class IOExecutor : public virtual Executor {
  public:
-  virtual ~IOExecutor();
+  virtual ~IOExecutor() {}
   virtual EventBase* getEventBase() = 0;
-
- private:
-  static std::atomic<IOExecutor*>* getSingleton();
-  friend IOExecutor* getIOExecutor();
-  friend void setIOExecutor(IOExecutor* executor);
 };
 
 }}
index a601b0c191b6f42c97ce175d349e97384b8ac427..4539bd268f972ff99e75cb11921eb610b4bc3c67 100644 (file)
 
 using namespace folly::wangle;
 
+TEST(GlobalExecutorTest, GlobalCPUExecutor) {
+  class DummyExecutor : public folly::Executor {
+   public:
+    void add(folly::Func f) override {
+      f();
+      count++;
+    }
+    int count{0};
+  };
+
+  // The default CPU executor is a synchronous inline executor, lets verify
+  // that work we add is executed
+  auto count = 0;
+  auto f = [&](){ count++; };
+
+  // Don't explode, we should create the default global CPUExecutor lazily here.
+  getCPUExecutor()->add(f);
+  EXPECT_EQ(1, count);
+
+  {
+    auto dummy = std::make_shared<DummyExecutor>();
+    setCPUExecutor(dummy);
+    getCPUExecutor()->add(f);
+    // Make sure we were properly installed.
+    EXPECT_EQ(1, dummy->count);
+    EXPECT_EQ(2, count);
+  }
+
+  // Don't explode, we should restore the default global CPUExecutor because our
+  // weak reference to dummy has expired
+  getCPUExecutor()->add(f);
+  EXPECT_EQ(3, count);
+}
+
 TEST(GlobalExecutorTest, GlobalIOExecutor) {
   class DummyExecutor : public IOExecutor {
    public:
@@ -38,14 +72,14 @@ TEST(GlobalExecutorTest, GlobalIOExecutor) {
   getIOExecutor()->add(f);
 
   {
-    DummyExecutor dummy;
-    setIOExecutor(&dummy);
+    auto dummy = std::make_shared<DummyExecutor>();
+    setIOExecutor(dummy);
     getIOExecutor()->add(f);
     // Make sure we were properly installed.
-    EXPECT_EQ(1, dummy.count);
+    EXPECT_EQ(1, dummy->count);
   }
 
-  // Don't explode, we should restore the default global IOExecutor when dummy
-  // is destructed.
+  // Don't explode, we should restore the default global IOExecutor because our
+  // weak reference to dummy has expired
   getIOExecutor()->add(f);
 }