From: James Sedgwick Date: Mon, 15 Dec 2014 21:09:47 +0000 (-0800) Subject: global io executor X-Git-Tag: v0.22.0~102 X-Git-Url: http://plrg.eecs.uci.edu/git/?a=commitdiff_plain;h=3490aa71feab139521c48124f8fba360a0dfc88b;p=folly.git global io executor Summary: This is something we've talked about for a while. It's also an alternative to the mechanism in D1714645. If we like it, I'll do something similar for a global cpu executor. That functionality should probably just be baked into Executor itself instead of a separate subclass, which is why the IOExecutor stuff is in Executor.h/.cpp, because it'll be pretty similar. The main exception is that for getCPUExecutor() you could return a default global InlineExecutor instead of exploding as in getIOExecutor() Test Plan: wangle unit, will start plumbing this into the services in #5003045 if we like it Reviewed By: davejwatson@fb.com Subscribers: hannesr, trunkagent, fugalh, alandau, mshneer, folly-diffs@, bmatheny FB internal diff: D1727894 Tasks: 5002442 Signature: t1:1727894:1418344077:9e54088a6acb3f78e53011a32fd1dfe8b3214c1d --- diff --git a/folly/Makefile.am b/folly/Makefile.am index 7f78fa66..2da84595 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -82,6 +82,8 @@ nobase_follyinclude_HEADERS = \ experimental/wangle/concurrent/Codel.h \ experimental/wangle/concurrent/CPUThreadPoolExecutor.h \ experimental/wangle/concurrent/FutureExecutor.h \ + experimental/wangle/concurrent/GlobalExecutor.h \ + experimental/wangle/concurrent/IOExecutor.h \ experimental/wangle/concurrent/IOThreadPoolExecutor.h \ experimental/wangle/concurrent/LifoSemMPMCQueue.h \ experimental/wangle/concurrent/NamedThreadFactory.h \ @@ -328,6 +330,8 @@ libfolly_la_SOURCES = \ experimental/TestUtil.cpp \ experimental/wangle/concurrent/CPUThreadPoolExecutor.cpp \ experimental/wangle/concurrent/Codel.cpp \ + experimental/wangle/concurrent/GlobalExecutor.cpp \ + experimental/wangle/concurrent/IOExecutor.cpp \ experimental/wangle/concurrent/IOThreadPoolExecutor.cpp \ experimental/wangle/concurrent/ThreadPoolExecutor.cpp \ experimental/wangle/ConnectionManager.cpp \ diff --git a/folly/experimental/wangle/concurrent/GlobalExecutor.cpp b/folly/experimental/wangle/concurrent/GlobalExecutor.cpp new file mode 100644 index 00000000..b0efd4f2 --- /dev/null +++ b/folly/experimental/wangle/concurrent/GlobalExecutor.cpp @@ -0,0 +1,55 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +using namespace folly; +using namespace folly::wangle; + +namespace { + +Singleton globalIOThreadPoolSingleton( + "GlobalIOThreadPool", + [](){ + return new IOThreadPoolExecutor( + sysconf(_SC_NPROCESSORS_ONLN), + std::make_shared("GlobalIOThreadPool")); + }); + +} + +namespace folly { namespace wangle { + +IOExecutor* getIOExecutor() { + auto singleton = IOExecutor::getSingleton(); + auto executor = singleton->load(); + while (!executor) { + IOExecutor* nullIOExecutor = nullptr; + singleton->compare_exchange_strong( + nullIOExecutor, + Singleton::get("GlobalIOThreadPool")); + executor = singleton->load(); + } + return executor; +} + +void setIOExecutor(IOExecutor* executor) { + IOExecutor::getSingleton()->store(executor); +} + +}} // folly::wangle diff --git a/folly/experimental/wangle/concurrent/GlobalExecutor.h b/folly/experimental/wangle/concurrent/GlobalExecutor.h new file mode 100644 index 00000000..cac76be8 --- /dev/null +++ b/folly/experimental/wangle/concurrent/GlobalExecutor.h @@ -0,0 +1,32 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +namespace folly { namespace wangle { + +class IOExecutor; + +// Retrieve the global IOExecutor. If there is none, a default +// IOThreadPoolExecutor will be constructed and returned. +IOExecutor* getIOExecutor(); + +// Set an IOExecutor to be the global IOExecutor which will be returned by +// subsequent calls to getIOExecutor(). IOExecutors will uninstall themselves +// as global when they are destructed. +void setIOExecutor(IOExecutor* executor); + +}} diff --git a/folly/experimental/wangle/concurrent/IOExecutor.cpp b/folly/experimental/wangle/concurrent/IOExecutor.cpp new file mode 100644 index 00000000..d3985c99 --- /dev/null +++ b/folly/experimental/wangle/concurrent/IOExecutor.cpp @@ -0,0 +1,50 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include + +#include +#include + +using folly::Singleton; +using folly::wangle::IOExecutor; + +namespace { + +Singleton> globalIOExecutorSingleton( + "GlobalIOExecutor", + [](){ + return new std::atomic(nullptr); + }); + +} + +namespace folly { namespace wangle { + +IOExecutor::~IOExecutor() { + auto thisCopy = this; + try { + getSingleton()->compare_exchange_strong(thisCopy, nullptr); + } catch (const std::runtime_error& e) { + // The global IOExecutor singleton was already destructed so doesn't need to + // be restored. Ignore. + } +} + +std::atomic* IOExecutor::getSingleton() { + return Singleton>::get("GlobalIOExecutor"); +} + +}} // folly::wangle diff --git a/folly/experimental/wangle/concurrent/IOExecutor.h b/folly/experimental/wangle/concurrent/IOExecutor.h new file mode 100644 index 00000000..14eb6643 --- /dev/null +++ b/folly/experimental/wangle/concurrent/IOExecutor.h @@ -0,0 +1,52 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +namespace folly { +class EventBase; +} + +namespace folly { namespace wangle { + +// An IOExecutor is an executor that operates on at least one EventBase. One of +// these EventBases should be accessible via getEventBase(). The event base +// returned by a call to getEventBase() is implementation dependent. +// +// Note that IOExecutors don't necessarily loop on the base themselves - for +// instance, EventBase itself is an IOExecutor but doesn't drive itself. +// +// Implementations of IOExecutor are eligible to become the global IO executor, +// returned on every call to getIOExecutor(), via setIOExecutor(). +// These functions are declared in GlobalExecutor.h +// +// If getIOExecutor is called and none has been set, a default global +// IOThreadPoolExecutor will be created and returned. +class IOExecutor : public virtual Executor { + public: + virtual ~IOExecutor(); + virtual EventBase* getEventBase() = 0; + + private: + static std::atomic* getSingleton(); + friend IOExecutor* getIOExecutor(); + friend void setIOExecutor(IOExecutor* executor); +}; + +}} diff --git a/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.cpp b/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.cpp index d60472fc..73b5c61f 100644 --- a/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.cpp +++ b/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.cpp @@ -88,8 +88,7 @@ void IOThreadPoolExecutor::add( if (threadList_.get().empty()) { throw std::runtime_error("No threads available"); } - auto thread = threadList_.get()[nextThread_++ % threadList_.get().size()]; - auto ioThread = std::static_pointer_cast(thread); + auto ioThread = pickThread(); auto moveTask = folly::makeMoveWrapper( Task(std::move(func), expiration, std::move(expireCallback))); @@ -105,6 +104,19 @@ void IOThreadPoolExecutor::add( } } +std::shared_ptr +IOThreadPoolExecutor::pickThread() { + if (*thisThread_) { + return *thisThread_; + } + auto thread = threadList_.get()[nextThread_++ % threadList_.get().size()]; + return std::static_pointer_cast(thread); +} + +EventBase* IOThreadPoolExecutor::getEventBase() { + return pickThread()->eventBase; +} + std::shared_ptr IOThreadPoolExecutor::makeThread() { return std::make_shared(this); @@ -114,6 +126,7 @@ void IOThreadPoolExecutor::threadRun(ThreadPtr thread) { const auto ioThread = std::static_pointer_cast(thread); ioThread->eventBase = folly::EventBaseManager::get()->getEventBase(); + thisThread_.reset(new std::shared_ptr(ioThread)); auto idler = new MemoryIdlerTimeout(ioThread->eventBase); ioThread->eventBase->runBeforeLoop(idler); diff --git a/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h b/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h index 0dcfd2ef..0fde4a29 100644 --- a/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h +++ b/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h @@ -15,6 +15,8 @@ */ #pragma once + +#include #include #include @@ -22,7 +24,7 @@ namespace folly { namespace wangle { // N.B. For this thread pool, stop() behaves like join() because outstanding // tasks belong to the event base and will be executed upon its destruction. -class IOThreadPoolExecutor : public ThreadPoolExecutor { +class IOThreadPoolExecutor : public ThreadPoolExecutor, public IOExecutor { public: explicit IOThreadPoolExecutor( size_t numThreads, @@ -37,12 +39,9 @@ class IOThreadPoolExecutor : public ThreadPoolExecutor { std::chrono::milliseconds expiration, Func expireCallback = nullptr) override; - private: - ThreadPtr makeThread() override; - void threadRun(ThreadPtr thread) override; - void stopThreads(size_t n) override; - uint64_t getPendingTaskCount() override; + EventBase* getEventBase() override; + private: struct FOLLY_ALIGN_TO_AVOID_FALSE_SHARING IOThread : public Thread { IOThread(IOThreadPoolExecutor* pool) : Thread(pool), @@ -53,7 +52,14 @@ class IOThreadPoolExecutor : public ThreadPoolExecutor { EventBase* eventBase; }; + ThreadPtr makeThread() override; + std::shared_ptr pickThread(); + void threadRun(ThreadPtr thread) override; + void stopThreads(size_t n) override; + uint64_t getPendingTaskCount() override; + size_t nextThread_; + ThreadLocal> thisThread_; }; }} // folly::wangle diff --git a/folly/experimental/wangle/concurrent/ThreadPoolExecutor.h b/folly/experimental/wangle/concurrent/ThreadPoolExecutor.h index 451f164f..88aa1bc7 100644 --- a/folly/experimental/wangle/concurrent/ThreadPoolExecutor.h +++ b/folly/experimental/wangle/concurrent/ThreadPoolExecutor.h @@ -31,7 +31,7 @@ namespace folly { namespace wangle { -class ThreadPoolExecutor : public Executor { +class ThreadPoolExecutor : public virtual Executor { public: explicit ThreadPoolExecutor( size_t numThreads, diff --git a/folly/experimental/wangle/concurrent/test/GlobalExecutorTest.cpp b/folly/experimental/wangle/concurrent/test/GlobalExecutorTest.cpp new file mode 100644 index 00000000..f0f678de --- /dev/null +++ b/folly/experimental/wangle/concurrent/test/GlobalExecutorTest.cpp @@ -0,0 +1,51 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +using namespace folly::wangle; + +TEST(GlobalExecutorTest, GlobalIOExecutor) { + class DummyExecutor : public IOExecutor { + public: + void add(folly::Func f) override { + count++; + } + folly::EventBase* getEventBase() override { + return nullptr; + } + int count{0}; + }; + + auto f = [](){}; + + // Don't explode, we should create the default global IOExecutor lazily here. + getIOExecutor()->add(f); + + { + DummyExecutor dummy; + setIOExecutor(&dummy); + getIOExecutor()->add(f); + // Make sure we were properly installed. + EXPECT_EQ(1, dummy.count); + } + + // Don't explode, we should restore the default global IOExecutor when dummy + // is destructed. + getIOExecutor()->add(f); +}