From: James Sedgwick Date: Sat, 21 Oct 2017 22:28:58 +0000 (-0700) Subject: move InlineExecutor, ManualExecutor, and GlobalThreadPoolList to X-Git-Tag: v2017.10.23.00~1 X-Git-Url: http://plrg.eecs.uci.edu/git/?p=folly.git;a=commitdiff_plain;h=1719bfce91a3cca41abc566049eb4f4b8b2a565a move InlineExecutor, ManualExecutor, and GlobalThreadPoolList to Summary: That's everything that's going in executors/ except for Executor.h itself, which is included in hphp so will have to wait Reviewed By: mzlee Differential Revision: D6100274 fbshipit-source-id: 6be37892b1ad7f46828acfa6b2951e51b157a86a --- diff --git a/folly/Makefile.am b/folly/Makefile.am index cbc950b2..eac08817 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -59,7 +59,6 @@ nobase_follyinclude_HEADERS = \ concurrency/CacheLocality.h \ concurrency/ConcurrentHashMap.h \ concurrency/CoreCachedSharedPtr.h \ - concurrency/GlobalThreadPoolList.h \ concurrency/detail/ConcurrentHashMap-detail.h \ ConstexprMath.h \ detail/AtomicHashUtils.h \ @@ -92,6 +91,8 @@ nobase_follyinclude_HEADERS = \ executors/FiberIOExecutor.h \ executors/FutureExecutor.h \ executors/GlobalExecutor.h \ + executors/GlobalThreadPoolList.h \ + executors/InlineExecutor.h \ executors/IOExecutor.h \ executors/IOObjectCache.h \ executors/IOThreadPoolExecutor.h \ @@ -216,8 +217,6 @@ nobase_follyinclude_HEADERS = \ futures/Future-inl.h \ futures/FutureException.h \ futures/FutureSplitter.h \ - futures/InlineExecutor.h \ - futures/ManualExecutor.h \ futures/Promise-inl.h \ futures/Promise.h \ futures/SharedPromise.h \ @@ -491,7 +490,6 @@ libfolly_la_SOURCES = \ compression/Compression.cpp \ compression/Zlib.cpp \ concurrency/CacheLocality.cpp \ - concurrency/GlobalThreadPoolList.cpp \ detail/Futex.cpp \ detail/IPAddress.cpp \ detail/StaticSingletonManager.cpp \ @@ -505,14 +503,15 @@ libfolly_la_SOURCES = \ futures/Barrier.cpp \ futures/Future.cpp \ futures/FutureException.cpp \ - futures/InlineExecutor.cpp \ - futures/ManualExecutor.cpp \ futures/ThreadWheelTimekeeper.cpp \ futures/test/TestExecutor.cpp \ executors/CPUThreadPoolExecutor.cpp \ executors/Codel.cpp \ executors/GlobalExecutor.cpp \ + executors/GlobalThreadPoolList.cpp \ executors/IOThreadPoolExecutor.cpp \ + executors/InlineExecutor.cpp \ + executors/ManualExecutor.cpp \ executors/SerialExecutor.cpp \ executors/ThreadPoolExecutor.cpp \ executors/ThreadedExecutor.cpp \ diff --git a/folly/concurrency/GlobalThreadPoolList.cpp b/folly/concurrency/GlobalThreadPoolList.cpp deleted file mode 100644 index 680193d2..00000000 --- a/folly/concurrency/GlobalThreadPoolList.cpp +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Copyright 2017 Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include - -#include -#include -#include - -#include -#include -#include -#include - -namespace folly { - -namespace { - -class ThreadListHook { - public: - ThreadListHook(ThreadPoolListHook* poolId, std::thread::id threadId); - ~ThreadListHook(); - - private: - ThreadListHook() {} - ThreadPoolListHook* poolId_; - std::thread::id threadId_; -}; - -class GlobalThreadPoolListImpl { - public: - GlobalThreadPoolListImpl() {} - - void registerThreadPool(ThreadPoolListHook* threadPoolId, std::string name); - - void unregisterThreadPool(ThreadPoolListHook* threadPoolId); - - void registerThreadPoolThread( - ThreadPoolListHook* threadPoolId, - std::thread::id threadId); - - void unregisterThreadPoolThread( - ThreadPoolListHook* threadPoolId, - std::thread::id threadId); - - private: - struct PoolInfo { - ThreadPoolListHook* addr; - std::string name; - std::vector threads; - }; - - struct Pools { - // Just a vector since ease of access from gdb is the most important - // property - std::vector poolsInfo_; - - std::vector* FOLLY_NULLABLE - getThreadVector(void* threadPoolId) { - for (auto& elem : vector()) { - if (elem.addr == threadPoolId) { - return &elem.threads; - } - } - - return nullptr; - } - - std::vector& vector() { - return poolsInfo_; - } - }; - - Pools pools_; -}; - -class GlobalThreadPoolList { - public: - GlobalThreadPoolList() {} - - static GlobalThreadPoolList& instance(); - - void registerThreadPool(ThreadPoolListHook* threadPoolId, std::string name); - - void unregisterThreadPool(ThreadPoolListHook* threadPoolId); - - void registerThreadPoolThread( - ThreadPoolListHook* threadPoolId, - std::thread::id threadId); - - void unregisterThreadPoolThread( - ThreadPoolListHook* threadPoolId, - std::thread::id threadId); - - GlobalThreadPoolList(GlobalThreadPoolList const&) = delete; - void operator=(GlobalThreadPoolList const&) = delete; - - private: - folly::Synchronized globalListImpl_; - folly::ThreadLocalPtr threadHook_; -}; - -} // namespace - -GlobalThreadPoolList& GlobalThreadPoolList::instance() { - static folly::Indestructible ret; - return *ret; -} - -void GlobalThreadPoolList::registerThreadPool( - ThreadPoolListHook* threadPoolId, - std::string name) { - globalListImpl_->registerThreadPool(threadPoolId, name); -} - -void GlobalThreadPoolList::unregisterThreadPool( - ThreadPoolListHook* threadPoolId) { - globalListImpl_->unregisterThreadPool(threadPoolId); -} - -void GlobalThreadPoolList::registerThreadPoolThread( - ThreadPoolListHook* threadPoolId, - std::thread::id threadId) { - DCHECK(!threadHook_); - threadHook_.reset(make_unique(threadPoolId, threadId)); - - globalListImpl_->registerThreadPoolThread(threadPoolId, threadId); -} - -void GlobalThreadPoolList::unregisterThreadPoolThread( - ThreadPoolListHook* threadPoolId, - std::thread::id threadId) { - (void)threadPoolId; - (void)threadId; - globalListImpl_->unregisterThreadPoolThread(threadPoolId, threadId); -} - -void GlobalThreadPoolListImpl::registerThreadPool( - ThreadPoolListHook* threadPoolId, - std::string name) { - PoolInfo info; - info.name = name; - info.addr = threadPoolId; - pools_.vector().push_back(info); -} - -void GlobalThreadPoolListImpl::unregisterThreadPool( - ThreadPoolListHook* threadPoolId) { - auto& vector = pools_.vector(); - vector.erase( - std::remove_if( - vector.begin(), - vector.end(), - [=](PoolInfo& i) { return i.addr == threadPoolId; }), - vector.end()); -} - -void GlobalThreadPoolListImpl::registerThreadPoolThread( - ThreadPoolListHook* threadPoolId, - std::thread::id threadId) { - auto vec = pools_.getThreadVector(threadPoolId); - if (vec == nullptr) { - return; - } - - vec->push_back(threadId); -} - -void GlobalThreadPoolListImpl::unregisterThreadPoolThread( - ThreadPoolListHook* threadPoolId, - std::thread::id threadId) { - auto vec = pools_.getThreadVector(threadPoolId); - if (vec == nullptr) { - return; - } - - vec->erase(std::remove(vec->begin(), vec->end(), threadId), vec->end()); -} - -ThreadListHook::ThreadListHook( - ThreadPoolListHook* poolId, - std::thread::id threadId) { - poolId_ = poolId; - threadId_ = threadId; -} - -ThreadListHook::~ThreadListHook() { - GlobalThreadPoolList::instance().unregisterThreadPoolThread( - poolId_, threadId_); -} - -ThreadPoolListHook::ThreadPoolListHook(std::string name) { - GlobalThreadPoolList::instance().registerThreadPool(this, name); -} - -ThreadPoolListHook::~ThreadPoolListHook() { - GlobalThreadPoolList::instance().unregisterThreadPool(this); -} - -void ThreadPoolListHook::registerThread() { - GlobalThreadPoolList::instance().registerThreadPoolThread( - this, std::this_thread::get_id()); -} - -} // folly diff --git a/folly/concurrency/GlobalThreadPoolList.h b/folly/concurrency/GlobalThreadPoolList.h deleted file mode 100644 index 8e13df08..00000000 --- a/folly/concurrency/GlobalThreadPoolList.h +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright 2017 Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include -#include - -#include -#include -#include - -namespace folly { - -/** - * A hook for tracking which threads belong to which thread pools. - * This is used only by a gdb extension to aid in debugging. You won't be able - * to see any useful information from within C++ code. - * - * An instance of ThreadPoolListHook should be created in the thread pool class - * that you want to keep track of. Then, to register a thread you call - * registerThread() on your instance of ThreadPoolListHook from that thread. - * - * When a thread exits it will be removed from the list - * When the thread pool is destroyed, it will be removed from the list - */ -class ThreadPoolListHook { - public: - /** - * Name is used to identify the thread pool when listing threads. - */ - explicit ThreadPoolListHook(std::string name); - ~ThreadPoolListHook(); - - /** - * Call this from any new thread that the thread pool creates. - */ - void registerThread(); - - ThreadPoolListHook(const ThreadPoolListHook& other) = delete; - ThreadPoolListHook& operator=(const ThreadPoolListHook&) = delete; - - private: - ThreadPoolListHook(); -}; - -} // folly diff --git a/folly/executors/GlobalExecutor.cpp b/folly/executors/GlobalExecutor.cpp index d2de51d7..ef853751 100644 --- a/folly/executors/GlobalExecutor.cpp +++ b/folly/executors/GlobalExecutor.cpp @@ -17,7 +17,7 @@ #include #include #include -#include +#include using namespace folly; diff --git a/folly/executors/GlobalThreadPoolList.cpp b/folly/executors/GlobalThreadPoolList.cpp new file mode 100644 index 00000000..a9e5e532 --- /dev/null +++ b/folly/executors/GlobalThreadPoolList.cpp @@ -0,0 +1,218 @@ +/* + * Copyright 2017 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include +#include + +#include +#include +#include +#include + +namespace folly { + +namespace { + +class ThreadListHook { + public: + ThreadListHook(ThreadPoolListHook* poolId, std::thread::id threadId); + ~ThreadListHook(); + + private: + ThreadListHook() {} + ThreadPoolListHook* poolId_; + std::thread::id threadId_; +}; + +class GlobalThreadPoolListImpl { + public: + GlobalThreadPoolListImpl() {} + + void registerThreadPool(ThreadPoolListHook* threadPoolId, std::string name); + + void unregisterThreadPool(ThreadPoolListHook* threadPoolId); + + void registerThreadPoolThread( + ThreadPoolListHook* threadPoolId, + std::thread::id threadId); + + void unregisterThreadPoolThread( + ThreadPoolListHook* threadPoolId, + std::thread::id threadId); + + private: + struct PoolInfo { + ThreadPoolListHook* addr; + std::string name; + std::vector threads; + }; + + struct Pools { + // Just a vector since ease of access from gdb is the most important + // property + std::vector poolsInfo_; + + std::vector* FOLLY_NULLABLE + getThreadVector(void* threadPoolId) { + for (auto& elem : vector()) { + if (elem.addr == threadPoolId) { + return &elem.threads; + } + } + + return nullptr; + } + + std::vector& vector() { + return poolsInfo_; + } + }; + + Pools pools_; +}; + +class GlobalThreadPoolList { + public: + GlobalThreadPoolList() {} + + static GlobalThreadPoolList& instance(); + + void registerThreadPool(ThreadPoolListHook* threadPoolId, std::string name); + + void unregisterThreadPool(ThreadPoolListHook* threadPoolId); + + void registerThreadPoolThread( + ThreadPoolListHook* threadPoolId, + std::thread::id threadId); + + void unregisterThreadPoolThread( + ThreadPoolListHook* threadPoolId, + std::thread::id threadId); + + GlobalThreadPoolList(GlobalThreadPoolList const&) = delete; + void operator=(GlobalThreadPoolList const&) = delete; + + private: + folly::Synchronized globalListImpl_; + folly::ThreadLocalPtr threadHook_; +}; + +} // namespace + +GlobalThreadPoolList& GlobalThreadPoolList::instance() { + static folly::Indestructible ret; + return *ret; +} + +void GlobalThreadPoolList::registerThreadPool( + ThreadPoolListHook* threadPoolId, + std::string name) { + globalListImpl_->registerThreadPool(threadPoolId, name); +} + +void GlobalThreadPoolList::unregisterThreadPool( + ThreadPoolListHook* threadPoolId) { + globalListImpl_->unregisterThreadPool(threadPoolId); +} + +void GlobalThreadPoolList::registerThreadPoolThread( + ThreadPoolListHook* threadPoolId, + std::thread::id threadId) { + DCHECK(!threadHook_); + threadHook_.reset(make_unique(threadPoolId, threadId)); + + globalListImpl_->registerThreadPoolThread(threadPoolId, threadId); +} + +void GlobalThreadPoolList::unregisterThreadPoolThread( + ThreadPoolListHook* threadPoolId, + std::thread::id threadId) { + (void)threadPoolId; + (void)threadId; + globalListImpl_->unregisterThreadPoolThread(threadPoolId, threadId); +} + +void GlobalThreadPoolListImpl::registerThreadPool( + ThreadPoolListHook* threadPoolId, + std::string name) { + PoolInfo info; + info.name = name; + info.addr = threadPoolId; + pools_.vector().push_back(info); +} + +void GlobalThreadPoolListImpl::unregisterThreadPool( + ThreadPoolListHook* threadPoolId) { + auto& vector = pools_.vector(); + vector.erase( + std::remove_if( + vector.begin(), + vector.end(), + [=](PoolInfo& i) { return i.addr == threadPoolId; }), + vector.end()); +} + +void GlobalThreadPoolListImpl::registerThreadPoolThread( + ThreadPoolListHook* threadPoolId, + std::thread::id threadId) { + auto vec = pools_.getThreadVector(threadPoolId); + if (vec == nullptr) { + return; + } + + vec->push_back(threadId); +} + +void GlobalThreadPoolListImpl::unregisterThreadPoolThread( + ThreadPoolListHook* threadPoolId, + std::thread::id threadId) { + auto vec = pools_.getThreadVector(threadPoolId); + if (vec == nullptr) { + return; + } + + vec->erase(std::remove(vec->begin(), vec->end(), threadId), vec->end()); +} + +ThreadListHook::ThreadListHook( + ThreadPoolListHook* poolId, + std::thread::id threadId) { + poolId_ = poolId; + threadId_ = threadId; +} + +ThreadListHook::~ThreadListHook() { + GlobalThreadPoolList::instance().unregisterThreadPoolThread( + poolId_, threadId_); +} + +ThreadPoolListHook::ThreadPoolListHook(std::string name) { + GlobalThreadPoolList::instance().registerThreadPool(this, name); +} + +ThreadPoolListHook::~ThreadPoolListHook() { + GlobalThreadPoolList::instance().unregisterThreadPool(this); +} + +void ThreadPoolListHook::registerThread() { + GlobalThreadPoolList::instance().registerThreadPoolThread( + this, std::this_thread::get_id()); +} + +} // folly diff --git a/folly/executors/GlobalThreadPoolList.h b/folly/executors/GlobalThreadPoolList.h new file mode 100644 index 00000000..8e13df08 --- /dev/null +++ b/folly/executors/GlobalThreadPoolList.h @@ -0,0 +1,61 @@ +/* + * Copyright 2017 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include +#include +#include + +namespace folly { + +/** + * A hook for tracking which threads belong to which thread pools. + * This is used only by a gdb extension to aid in debugging. You won't be able + * to see any useful information from within C++ code. + * + * An instance of ThreadPoolListHook should be created in the thread pool class + * that you want to keep track of. Then, to register a thread you call + * registerThread() on your instance of ThreadPoolListHook from that thread. + * + * When a thread exits it will be removed from the list + * When the thread pool is destroyed, it will be removed from the list + */ +class ThreadPoolListHook { + public: + /** + * Name is used to identify the thread pool when listing threads. + */ + explicit ThreadPoolListHook(std::string name); + ~ThreadPoolListHook(); + + /** + * Call this from any new thread that the thread pool creates. + */ + void registerThread(); + + ThreadPoolListHook(const ThreadPoolListHook& other) = delete; + ThreadPoolListHook& operator=(const ThreadPoolListHook&) = delete; + + private: + ThreadPoolListHook(); +}; + +} // folly diff --git a/folly/executors/InlineExecutor.cpp b/folly/executors/InlineExecutor.cpp new file mode 100644 index 00000000..d51ae7bb --- /dev/null +++ b/folly/executors/InlineExecutor.cpp @@ -0,0 +1,28 @@ +/* + * Copyright 2017 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include + +namespace folly { + +InlineExecutor& InlineExecutor::instance() { + static auto instance = Indestructible{}; + return *instance; +} + +} // namespace folly diff --git a/folly/executors/InlineExecutor.h b/folly/executors/InlineExecutor.h new file mode 100644 index 00000000..644ff7bf --- /dev/null +++ b/folly/executors/InlineExecutor.h @@ -0,0 +1,34 @@ +/* + * Copyright 2017 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include + +namespace folly { + +/// When work is "queued", execute it immediately inline. +/// Usually when you think you want this, you actually want a +/// QueuedImmediateExecutor. +class InlineExecutor : public Executor { + public: + static InlineExecutor& instance(); + + void add(Func f) override { + f(); + } +}; + +} // namespace folly diff --git a/folly/executors/ManualExecutor.cpp b/folly/executors/ManualExecutor.cpp new file mode 100644 index 00000000..51ff9c8a --- /dev/null +++ b/folly/executors/ManualExecutor.cpp @@ -0,0 +1,92 @@ +/* + * Copyright 2017 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include +#include + +namespace folly { + +void ManualExecutor::add(Func callback) { + std::lock_guard lock(lock_); + funcs_.emplace(std::move(callback)); + sem_.post(); +} + +size_t ManualExecutor::run() { + size_t count; + size_t n; + Func func; + + { + std::lock_guard lock(lock_); + + while (!scheduledFuncs_.empty()) { + auto& sf = scheduledFuncs_.top(); + if (sf.time > now_) { + break; + } + funcs_.emplace(sf.moveOutFunc()); + scheduledFuncs_.pop(); + } + + n = funcs_.size(); + } + + for (count = 0; count < n; count++) { + { + std::lock_guard lock(lock_); + if (funcs_.empty()) { + break; + } + + // Balance the semaphore so it doesn't grow without bound + // if nobody is calling wait(). + // This may fail (with EAGAIN), that's fine. + sem_.tryWait(); + + func = std::move(funcs_.front()); + funcs_.pop(); + } + func(); + } + + return count; +} + +void ManualExecutor::wait() { + while (true) { + { + std::lock_guard lock(lock_); + if (!funcs_.empty()) { + break; + } + } + + sem_.wait(); + } +} + +void ManualExecutor::advanceTo(TimePoint const& t) { + if (t > now_) { + now_ = t; + } + run(); +} + +} // namespace folly diff --git a/folly/executors/ManualExecutor.h b/folly/executors/ManualExecutor.h new file mode 100644 index 00000000..028a4ce6 --- /dev/null +++ b/folly/executors/ManualExecutor.h @@ -0,0 +1,149 @@ +/* + * Copyright 2017 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include +#include +#include + +namespace folly { + /// A ManualExecutor only does work when you turn the crank, by calling + /// run() or indirectly with makeProgress() or waitFor(). + /// + /// The clock for a manual executor starts at 0 and advances only when you + /// ask it to. i.e. time is also under manual control. + /// + /// NB No attempt has been made to make anything other than add and schedule + /// threadsafe. + class ManualExecutor : public DrivableExecutor, + public ScheduledExecutor { + public: + void add(Func) override; + + /// Do work. Returns the number of functions that were executed (maybe 0). + /// Non-blocking, in the sense that we don't wait for work (we can't + /// control whether one of the functions blocks). + /// This is stable, it will not chase an ever-increasing tail of work. + /// This also means, there may be more work available to perform at the + /// moment that this returns. + size_t run(); + + /// Wait for work to do. + void wait(); + + /// Wait for work to do, and do it. + void makeProgress() { + wait(); + run(); + } + + /// Implements DrivableExecutor + void drive() override { + makeProgress(); + } + + /// makeProgress until this Future is ready. + template void waitFor(F const& f) { + // TODO(5427828) +#if 0 + while (!f.isReady()) + makeProgress(); +#else + while (!f.isReady()) { + run(); + } +#endif + + } + + void scheduleAt(Func&& f, TimePoint const& t) override { + std::lock_guard lock(lock_); + scheduledFuncs_.emplace(t, std::move(f)); + sem_.post(); + } + + /// Advance the clock. The clock never advances on its own. + /// Advancing the clock causes some work to be done, if work is available + /// to do (perhaps newly available because of the advanced clock). + /// If dur is <= 0 this is a noop. + void advance(Duration const& dur) { + advanceTo(now_ + dur); + } + + /// Advance the clock to this absolute time. If t is <= now(), + /// this is a noop. + void advanceTo(TimePoint const& t); + + TimePoint now() override { return now_; } + + /// Flush the function queue. Destroys all stored functions without + /// executing them. Returns number of removed functions. + std::size_t clear() { + std::queue funcs; + std::priority_queue scheduled_funcs; + + { + std::lock_guard lock(lock_); + funcs_.swap(funcs); + scheduledFuncs_.swap(scheduled_funcs); + } + + return funcs.size() + scheduled_funcs.size(); + } + + private: + std::mutex lock_; + std::queue funcs_; + LifoSem sem_; + + // helper class to enable ordering of scheduled events in the priority + // queue + struct ScheduledFunc { + TimePoint time; + size_t ordinal; + Func mutable func; + + ScheduledFunc(TimePoint const& t, Func&& f) + : time(t), func(std::move(f)) + { + static size_t seq = 0; + ordinal = seq++; + } + + bool operator<(ScheduledFunc const& b) const { + // Earlier-scheduled things must be *higher* priority + // in the max-based std::priority_queue + if (time == b.time) { + return ordinal > b.ordinal; + } + return time > b.time; + } + + Func&& moveOutFunc() const { + return std::move(func); + } + }; + std::priority_queue scheduledFuncs_; + TimePoint now_ = TimePoint::min(); + }; + +} diff --git a/folly/executors/ThreadPoolExecutor.cpp b/folly/executors/ThreadPoolExecutor.cpp index 227f2b1d..84394db0 100644 --- a/folly/executors/ThreadPoolExecutor.cpp +++ b/folly/executors/ThreadPoolExecutor.cpp @@ -16,7 +16,7 @@ #include -#include +#include namespace folly { diff --git a/folly/executors/ThreadPoolExecutor.h b/folly/executors/ThreadPoolExecutor.h index 09ff14fd..a92726ef 100644 --- a/folly/executors/ThreadPoolExecutor.h +++ b/folly/executors/ThreadPoolExecutor.h @@ -19,7 +19,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/folly/executors/test/AsyncTest.cpp b/folly/executors/test/AsyncTest.cpp index 0622a8a6..4a7f0cf0 100644 --- a/folly/executors/test/AsyncTest.cpp +++ b/folly/executors/test/AsyncTest.cpp @@ -15,7 +15,7 @@ */ #include -#include +#include #include using namespace folly; diff --git a/folly/executors/test/SerialExecutorTest.cpp b/folly/executors/test/SerialExecutorTest.cpp index 285fd747..ae487591 100644 --- a/folly/executors/test/SerialExecutorTest.cpp +++ b/folly/executors/test/SerialExecutorTest.cpp @@ -18,8 +18,8 @@ #include #include +#include #include -#include #include using namespace std::chrono; diff --git a/folly/futures/Future-inl.h b/folly/futures/Future-inl.h index c7ddeb0a..b903a644 100644 --- a/folly/futures/Future-inl.h +++ b/folly/futures/Future-inl.h @@ -23,7 +23,7 @@ #include #include -#include +#include #include #include diff --git a/folly/futures/InlineExecutor.cpp b/folly/futures/InlineExecutor.cpp deleted file mode 100644 index 848e7f26..00000000 --- a/folly/futures/InlineExecutor.cpp +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright 2017 Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include - -#include - -namespace folly { - -InlineExecutor& InlineExecutor::instance() { - static auto instance = Indestructible{}; - return *instance; -} - -} // namespace folly diff --git a/folly/futures/InlineExecutor.h b/folly/futures/InlineExecutor.h deleted file mode 100644 index 644ff7bf..00000000 --- a/folly/futures/InlineExecutor.h +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright 2017 Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once -#include - -namespace folly { - -/// When work is "queued", execute it immediately inline. -/// Usually when you think you want this, you actually want a -/// QueuedImmediateExecutor. -class InlineExecutor : public Executor { - public: - static InlineExecutor& instance(); - - void add(Func f) override { - f(); - } -}; - -} // namespace folly diff --git a/folly/futures/ManualExecutor.cpp b/folly/futures/ManualExecutor.cpp deleted file mode 100644 index 50941d02..00000000 --- a/folly/futures/ManualExecutor.cpp +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright 2017 Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include - -#include -#include -#include - -namespace folly { - -void ManualExecutor::add(Func callback) { - std::lock_guard lock(lock_); - funcs_.emplace(std::move(callback)); - sem_.post(); -} - -size_t ManualExecutor::run() { - size_t count; - size_t n; - Func func; - - { - std::lock_guard lock(lock_); - - while (!scheduledFuncs_.empty()) { - auto& sf = scheduledFuncs_.top(); - if (sf.time > now_) { - break; - } - funcs_.emplace(sf.moveOutFunc()); - scheduledFuncs_.pop(); - } - - n = funcs_.size(); - } - - for (count = 0; count < n; count++) { - { - std::lock_guard lock(lock_); - if (funcs_.empty()) { - break; - } - - // Balance the semaphore so it doesn't grow without bound - // if nobody is calling wait(). - // This may fail (with EAGAIN), that's fine. - sem_.tryWait(); - - func = std::move(funcs_.front()); - funcs_.pop(); - } - func(); - } - - return count; -} - -void ManualExecutor::wait() { - while (true) { - { - std::lock_guard lock(lock_); - if (!funcs_.empty()) { - break; - } - } - - sem_.wait(); - } -} - -void ManualExecutor::advanceTo(TimePoint const& t) { - if (t > now_) { - now_ = t; - } - run(); -} - -} // namespace folly diff --git a/folly/futures/ManualExecutor.h b/folly/futures/ManualExecutor.h deleted file mode 100644 index 028a4ce6..00000000 --- a/folly/futures/ManualExecutor.h +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Copyright 2017 Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include -#include -#include - -#include -#include -#include - -namespace folly { - /// A ManualExecutor only does work when you turn the crank, by calling - /// run() or indirectly with makeProgress() or waitFor(). - /// - /// The clock for a manual executor starts at 0 and advances only when you - /// ask it to. i.e. time is also under manual control. - /// - /// NB No attempt has been made to make anything other than add and schedule - /// threadsafe. - class ManualExecutor : public DrivableExecutor, - public ScheduledExecutor { - public: - void add(Func) override; - - /// Do work. Returns the number of functions that were executed (maybe 0). - /// Non-blocking, in the sense that we don't wait for work (we can't - /// control whether one of the functions blocks). - /// This is stable, it will not chase an ever-increasing tail of work. - /// This also means, there may be more work available to perform at the - /// moment that this returns. - size_t run(); - - /// Wait for work to do. - void wait(); - - /// Wait for work to do, and do it. - void makeProgress() { - wait(); - run(); - } - - /// Implements DrivableExecutor - void drive() override { - makeProgress(); - } - - /// makeProgress until this Future is ready. - template void waitFor(F const& f) { - // TODO(5427828) -#if 0 - while (!f.isReady()) - makeProgress(); -#else - while (!f.isReady()) { - run(); - } -#endif - - } - - void scheduleAt(Func&& f, TimePoint const& t) override { - std::lock_guard lock(lock_); - scheduledFuncs_.emplace(t, std::move(f)); - sem_.post(); - } - - /// Advance the clock. The clock never advances on its own. - /// Advancing the clock causes some work to be done, if work is available - /// to do (perhaps newly available because of the advanced clock). - /// If dur is <= 0 this is a noop. - void advance(Duration const& dur) { - advanceTo(now_ + dur); - } - - /// Advance the clock to this absolute time. If t is <= now(), - /// this is a noop. - void advanceTo(TimePoint const& t); - - TimePoint now() override { return now_; } - - /// Flush the function queue. Destroys all stored functions without - /// executing them. Returns number of removed functions. - std::size_t clear() { - std::queue funcs; - std::priority_queue scheduled_funcs; - - { - std::lock_guard lock(lock_); - funcs_.swap(funcs); - scheduledFuncs_.swap(scheduled_funcs); - } - - return funcs.size() + scheduled_funcs.size(); - } - - private: - std::mutex lock_; - std::queue funcs_; - LifoSem sem_; - - // helper class to enable ordering of scheduled events in the priority - // queue - struct ScheduledFunc { - TimePoint time; - size_t ordinal; - Func mutable func; - - ScheduledFunc(TimePoint const& t, Func&& f) - : time(t), func(std::move(f)) - { - static size_t seq = 0; - ordinal = seq++; - } - - bool operator<(ScheduledFunc const& b) const { - // Earlier-scheduled things must be *higher* priority - // in the max-based std::priority_queue - if (time == b.time) { - return ordinal > b.ordinal; - } - return time > b.time; - } - - Func&& moveOutFunc() const { - return std::move(func); - } - }; - std::priority_queue scheduledFuncs_; - TimePoint now_ = TimePoint::min(); - }; - -} diff --git a/folly/futures/test/Benchmark.cpp b/folly/futures/test/Benchmark.cpp index 56f4f712..a9c743de 100644 --- a/folly/futures/test/Benchmark.cpp +++ b/folly/futures/test/Benchmark.cpp @@ -16,8 +16,8 @@ #include #include +#include #include -#include #include #include #include diff --git a/folly/futures/test/ExecutorTest.cpp b/folly/futures/test/ExecutorTest.cpp index 2c201d0a..1755a2fc 100644 --- a/folly/futures/test/ExecutorTest.cpp +++ b/folly/futures/test/ExecutorTest.cpp @@ -15,10 +15,10 @@ */ #include +#include +#include #include #include -#include -#include #include // TODO(jsedgwick) move this test to executors/test/ once the tested executors diff --git a/folly/futures/test/SelfDestructTest.cpp b/folly/futures/test/SelfDestructTest.cpp index 41b56bf5..fb2bc3f2 100644 --- a/folly/futures/test/SelfDestructTest.cpp +++ b/folly/futures/test/SelfDestructTest.cpp @@ -14,8 +14,8 @@ * limitations under the License. */ +#include #include -#include #include using namespace folly; diff --git a/folly/futures/test/ViaTest.cpp b/folly/futures/test/ViaTest.cpp index 79e6c9bf..fb2008a8 100644 --- a/folly/futures/test/ViaTest.cpp +++ b/folly/futures/test/ViaTest.cpp @@ -19,9 +19,9 @@ #include #include #include +#include +#include #include -#include -#include #include using namespace folly; diff --git a/folly/futures/test/WindowTest.cpp b/folly/futures/test/WindowTest.cpp index 098fc7a8..90158d47 100644 --- a/folly/futures/test/WindowTest.cpp +++ b/folly/futures/test/WindowTest.cpp @@ -17,8 +17,8 @@ #include #include +#include #include -#include #include #include