global io executor
authorJames Sedgwick <jsedgwick@fb.com>
Mon, 15 Dec 2014 21:09:47 +0000 (13:09 -0800)
committerJoelMarcey <joelm@fb.com>
Thu, 18 Dec 2014 20:29:40 +0000 (12:29 -0800)
Summary:
This is something we've talked about for a while. It's also an alternative to the mechanism in D1714645.
If we like it, I'll do something similar for a global cpu executor. That functionality should probably just be
baked into Executor itself instead of a separate subclass, which is why the IOExecutor stuff is in Executor.h/.cpp,
because it'll be pretty similar. The main exception is that for getCPUExecutor() you could return a default global
InlineExecutor instead of exploding as in getIOExecutor()

Test Plan: wangle unit, will start plumbing this into the services in #5003045 if we like it

Reviewed By: davejwatson@fb.com

Subscribers: hannesr, trunkagent, fugalh, alandau, mshneer, folly-diffs@, bmatheny

FB internal diff: D1727894

Tasks: 5002442

Signature: t1:1727894:1418344077:9e54088a6acb3f78e53011a32fd1dfe8b3214c1d

folly/Makefile.am
folly/experimental/wangle/concurrent/GlobalExecutor.cpp [new file with mode: 0644]
folly/experimental/wangle/concurrent/GlobalExecutor.h [new file with mode: 0644]
folly/experimental/wangle/concurrent/IOExecutor.cpp [new file with mode: 0644]
folly/experimental/wangle/concurrent/IOExecutor.h [new file with mode: 0644]
folly/experimental/wangle/concurrent/IOThreadPoolExecutor.cpp
folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h
folly/experimental/wangle/concurrent/ThreadPoolExecutor.h
folly/experimental/wangle/concurrent/test/GlobalExecutorTest.cpp [new file with mode: 0644]

index 7f78fa6610a114c3050886fb20e54890ab34680a..2da845952bd288112432588cec82dda033941642 100644 (file)
@@ -82,6 +82,8 @@ nobase_follyinclude_HEADERS = \
        experimental/wangle/concurrent/Codel.h \
        experimental/wangle/concurrent/CPUThreadPoolExecutor.h \
        experimental/wangle/concurrent/FutureExecutor.h \
+       experimental/wangle/concurrent/GlobalExecutor.h \
+       experimental/wangle/concurrent/IOExecutor.h \
        experimental/wangle/concurrent/IOThreadPoolExecutor.h \
        experimental/wangle/concurrent/LifoSemMPMCQueue.h \
        experimental/wangle/concurrent/NamedThreadFactory.h \
@@ -328,6 +330,8 @@ libfolly_la_SOURCES = \
        experimental/TestUtil.cpp \
        experimental/wangle/concurrent/CPUThreadPoolExecutor.cpp \
        experimental/wangle/concurrent/Codel.cpp \
+       experimental/wangle/concurrent/GlobalExecutor.cpp \
+       experimental/wangle/concurrent/IOExecutor.cpp \
        experimental/wangle/concurrent/IOThreadPoolExecutor.cpp \
        experimental/wangle/concurrent/ThreadPoolExecutor.cpp \
        experimental/wangle/ConnectionManager.cpp \
diff --git a/folly/experimental/wangle/concurrent/GlobalExecutor.cpp b/folly/experimental/wangle/concurrent/GlobalExecutor.cpp
new file mode 100644 (file)
index 0000000..b0efd4f
--- /dev/null
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/experimental/Singleton.h>
+#include <folly/experimental/wangle/concurrent/IOExecutor.h>
+#include <folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h>
+
+using namespace folly;
+using namespace folly::wangle;
+
+namespace {
+
+Singleton<IOThreadPoolExecutor> globalIOThreadPoolSingleton(
+    "GlobalIOThreadPool",
+    [](){
+      return new IOThreadPoolExecutor(
+          sysconf(_SC_NPROCESSORS_ONLN),
+          std::make_shared<NamedThreadFactory>("GlobalIOThreadPool"));
+    });
+
+}
+
+namespace folly { namespace wangle {
+
+IOExecutor* getIOExecutor() {
+  auto singleton = IOExecutor::getSingleton();
+  auto executor = singleton->load();
+  while (!executor) {
+    IOExecutor* nullIOExecutor = nullptr;
+    singleton->compare_exchange_strong(
+        nullIOExecutor,
+        Singleton<IOThreadPoolExecutor>::get("GlobalIOThreadPool"));
+    executor = singleton->load();
+  }
+  return executor;
+}
+
+void setIOExecutor(IOExecutor* executor) {
+  IOExecutor::getSingleton()->store(executor);
+}
+
+}} // folly::wangle
diff --git a/folly/experimental/wangle/concurrent/GlobalExecutor.h b/folly/experimental/wangle/concurrent/GlobalExecutor.h
new file mode 100644 (file)
index 0000000..cac76be
--- /dev/null
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+namespace folly { namespace wangle {
+
+class IOExecutor;
+
+// Retrieve the global IOExecutor. If there is none, a default
+// IOThreadPoolExecutor will be constructed and returned.
+IOExecutor* getIOExecutor();
+
+// Set an IOExecutor to be the global IOExecutor which will be returned by
+// subsequent calls to getIOExecutor(). IOExecutors will uninstall themselves
+// as global when they are destructed.
+void setIOExecutor(IOExecutor* executor);
+
+}}
diff --git a/folly/experimental/wangle/concurrent/IOExecutor.cpp b/folly/experimental/wangle/concurrent/IOExecutor.cpp
new file mode 100644 (file)
index 0000000..d3985c9
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <folly/experimental/wangle/concurrent/IOExecutor.h>
+
+#include <folly/experimental/Singleton.h>
+#include <folly/experimental/wangle/concurrent/GlobalExecutor.h>
+
+using folly::Singleton;
+using folly::wangle::IOExecutor;
+
+namespace {
+
+Singleton<std::atomic<IOExecutor*>> globalIOExecutorSingleton(
+    "GlobalIOExecutor",
+    [](){
+      return new std::atomic<IOExecutor*>(nullptr);
+    });
+
+}
+
+namespace folly { namespace wangle {
+
+IOExecutor::~IOExecutor() {
+  auto thisCopy = this;
+  try {
+    getSingleton()->compare_exchange_strong(thisCopy, nullptr);
+  } catch (const std::runtime_error& e) {
+    // The global IOExecutor singleton was already destructed so doesn't need to
+    // be restored. Ignore.
+  }
+}
+
+std::atomic<IOExecutor*>* IOExecutor::getSingleton() {
+  return Singleton<std::atomic<IOExecutor*>>::get("GlobalIOExecutor");
+}
+
+}} // folly::wangle
diff --git a/folly/experimental/wangle/concurrent/IOExecutor.h b/folly/experimental/wangle/concurrent/IOExecutor.h
new file mode 100644 (file)
index 0000000..14eb664
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <atomic>
+#include <folly/Executor.h>
+
+namespace folly {
+class EventBase;
+}
+
+namespace folly { namespace wangle {
+
+// An IOExecutor is an executor that operates on at least one EventBase.  One of
+// these EventBases should be accessible via getEventBase(). The event base
+// returned by a call to getEventBase() is implementation dependent.
+//
+// Note that IOExecutors don't necessarily loop on the base themselves - for
+// instance, EventBase itself is an IOExecutor but doesn't drive itself.
+//
+// Implementations of IOExecutor are eligible to become the global IO executor,
+// returned on every call to getIOExecutor(), via setIOExecutor().
+// These functions are declared in GlobalExecutor.h
+//
+// If getIOExecutor is called and none has been set, a default global
+// IOThreadPoolExecutor will be created and returned.
+class IOExecutor : public virtual Executor {
+ public:
+  virtual ~IOExecutor();
+  virtual EventBase* getEventBase() = 0;
+
+ private:
+  static std::atomic<IOExecutor*>* getSingleton();
+  friend IOExecutor* getIOExecutor();
+  friend void setIOExecutor(IOExecutor* executor);
+};
+
+}}
index d60472fca183ded126e8be3ee00b7736f0f2cac7..73b5c61f26377c3139107fb4d10e24a946e4cfb2 100644 (file)
@@ -88,8 +88,7 @@ void IOThreadPoolExecutor::add(
   if (threadList_.get().empty()) {
     throw std::runtime_error("No threads available");
   }
-  auto thread = threadList_.get()[nextThread_++ % threadList_.get().size()];
-  auto ioThread = std::static_pointer_cast<IOThread>(thread);
+  auto ioThread = pickThread();
 
   auto moveTask = folly::makeMoveWrapper(
       Task(std::move(func), expiration, std::move(expireCallback)));
@@ -105,6 +104,19 @@ void IOThreadPoolExecutor::add(
   }
 }
 
+std::shared_ptr<IOThreadPoolExecutor::IOThread>
+IOThreadPoolExecutor::pickThread() {
+  if (*thisThread_) {
+    return *thisThread_;
+  }
+  auto thread = threadList_.get()[nextThread_++ % threadList_.get().size()];
+  return std::static_pointer_cast<IOThread>(thread);
+}
+
+EventBase* IOThreadPoolExecutor::getEventBase() {
+  return pickThread()->eventBase;
+}
+
 std::shared_ptr<ThreadPoolExecutor::Thread>
 IOThreadPoolExecutor::makeThread() {
   return std::make_shared<IOThread>(this);
@@ -114,6 +126,7 @@ void IOThreadPoolExecutor::threadRun(ThreadPtr thread) {
   const auto ioThread = std::static_pointer_cast<IOThread>(thread);
   ioThread->eventBase =
     folly::EventBaseManager::get()->getEventBase();
+  thisThread_.reset(new std::shared_ptr<IOThread>(ioThread));
 
   auto idler = new MemoryIdlerTimeout(ioThread->eventBase);
   ioThread->eventBase->runBeforeLoop(idler);
index 0dcfd2eff071bdd71cfac072156238fcb28545f2..0fde4a2953974f4cb06cba32cd9c77accbba0952 100644 (file)
@@ -15,6 +15,8 @@
  */
 
 #pragma once
+
+#include <folly/experimental/wangle/concurrent/IOExecutor.h>
 #include <folly/experimental/wangle/concurrent/ThreadPoolExecutor.h>
 #include <folly/io/async/EventBase.h>
 
@@ -22,7 +24,7 @@ namespace folly { namespace wangle {
 
 // N.B. For this thread pool, stop() behaves like join() because outstanding
 // tasks belong to the event base and will be executed upon its destruction.
-class IOThreadPoolExecutor : public ThreadPoolExecutor {
+class IOThreadPoolExecutor : public ThreadPoolExecutor, public IOExecutor {
  public:
   explicit IOThreadPoolExecutor(
       size_t numThreads,
@@ -37,12 +39,9 @@ class IOThreadPoolExecutor : public ThreadPoolExecutor {
       std::chrono::milliseconds expiration,
       Func expireCallback = nullptr) override;
 
- private:
-  ThreadPtr makeThread() override;
-  void threadRun(ThreadPtr thread) override;
-  void stopThreads(size_t n) override;
-  uint64_t getPendingTaskCount() override;
+  EventBase* getEventBase() override;
 
+ private:
   struct FOLLY_ALIGN_TO_AVOID_FALSE_SHARING IOThread : public Thread {
     IOThread(IOThreadPoolExecutor* pool)
       : Thread(pool),
@@ -53,7 +52,14 @@ class IOThreadPoolExecutor : public ThreadPoolExecutor {
     EventBase* eventBase;
   };
 
+  ThreadPtr makeThread() override;
+  std::shared_ptr<IOThread> pickThread();
+  void threadRun(ThreadPtr thread) override;
+  void stopThreads(size_t n) override;
+  uint64_t getPendingTaskCount() override;
+
   size_t nextThread_;
+  ThreadLocal<std::shared_ptr<IOThread>> thisThread_;
 };
 
 }} // folly::wangle
index 451f164f54497d2f528a3e7b45b3f84269b0398b..88aa1bc70a22d02609263bd4a61bf1f1d6b12177 100644 (file)
@@ -31,7 +31,7 @@
 
 namespace folly { namespace wangle {
 
-class ThreadPoolExecutor : public Executor {
+class ThreadPoolExecutor : public virtual Executor {
  public:
   explicit ThreadPoolExecutor(
       size_t numThreads,
diff --git a/folly/experimental/wangle/concurrent/test/GlobalExecutorTest.cpp b/folly/experimental/wangle/concurrent/test/GlobalExecutorTest.cpp
new file mode 100644 (file)
index 0000000..f0f678d
--- /dev/null
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+#include <folly/experimental/wangle/concurrent/GlobalExecutor.h>
+#include <folly/experimental/wangle/concurrent/IOExecutor.h>
+
+using namespace folly::wangle;
+
+TEST(GlobalExecutorTest, GlobalIOExecutor) {
+  class DummyExecutor : public IOExecutor {
+   public:
+    void add(folly::Func f) override {
+      count++;
+    }
+    folly::EventBase* getEventBase() override {
+      return nullptr;
+    }
+    int count{0};
+  };
+
+  auto f = [](){};
+
+  // Don't explode, we should create the default global IOExecutor lazily here.
+  getIOExecutor()->add(f);
+
+  {
+    DummyExecutor dummy;
+    setIOExecutor(&dummy);
+    getIOExecutor()->add(f);
+    // Make sure we were properly installed.
+    EXPECT_EQ(1, dummy.count);
+  }
+
+  // Don't explode, we should restore the default global IOExecutor when dummy
+  // is destructed.
+  getIOExecutor()->add(f);
+}