Consistently have the namespace closing comment
[folly.git] / folly / executors / ThreadedExecutor.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/executors/ThreadedExecutor.h>
18
19 #include <chrono>
20
21 #include <glog/logging.h>
22
23 #include <folly/executors/thread_factory/NamedThreadFactory.h>
24 #include <folly/system/ThreadName.h>
25
26 namespace folly {
27
28 template <typename F>
29 static auto with_unique_lock(std::mutex& m, F&& f) -> decltype(f()) {
30   std::unique_lock<std::mutex> lock(m);
31   return f();
32 }
33
34 ThreadedExecutor::ThreadedExecutor(std::shared_ptr<ThreadFactory> threadFactory)
35     : threadFactory_(std::move(threadFactory)) {
36   controlt_ = std::thread([this] { control(); });
37 }
38
39 ThreadedExecutor::~ThreadedExecutor() {
40   stopping_.store(true, std::memory_order_release);
41   notify();
42   controlt_.join();
43   CHECK(running_.empty());
44   CHECK(finished_.empty());
45 }
46
47 void ThreadedExecutor::add(Func func) {
48   CHECK(!stopping_.load(std::memory_order_acquire));
49   with_unique_lock(enqueuedm_, [&] { enqueued_.push_back(std::move(func)); });
50   notify();
51 }
52
53 std::shared_ptr<ThreadFactory> ThreadedExecutor::newDefaultThreadFactory() {
54   return std::make_shared<NamedThreadFactory>("Threaded");
55 }
56
57 void ThreadedExecutor::notify() {
58   with_unique_lock(controlm_, [&] { controls_ = true; });
59   controlc_.notify_one();
60 }
61
62 void ThreadedExecutor::control() {
63   folly::setThreadName("ThreadedCtrl");
64   auto looping = true;
65   while (looping) {
66     controlWait();
67     looping = controlPerformAll();
68   }
69 }
70
71 void ThreadedExecutor::controlWait() {
72   constexpr auto kMaxWait = std::chrono::seconds(10);
73   std::unique_lock<std::mutex> lock(controlm_);
74   controlc_.wait_for(lock, kMaxWait, [&] { return controls_; });
75   controls_ = false;
76 }
77
78 void ThreadedExecutor::work(Func& func) {
79   func();
80   auto id = std::this_thread::get_id();
81   with_unique_lock(finishedm_, [&] { finished_.push_back(id); });
82   notify();
83 }
84
85 void ThreadedExecutor::controlJoinFinishedThreads() {
86   std::deque<std::thread::id> finishedt;
87   with_unique_lock(finishedm_, [&] { std::swap(finishedt, finished_); });
88   for (auto id : finishedt) {
89     running_[id].join();
90     running_.erase(id);
91   }
92 }
93
94 void ThreadedExecutor::controlLaunchEnqueuedTasks() {
95   std::deque<Func> enqueuedt;
96   with_unique_lock(enqueuedm_, [&] { std::swap(enqueuedt, enqueued_); });
97   for (auto& f : enqueuedt) {
98     auto th = threadFactory_->newThread(
99         [ this, f = std::move(f) ]() mutable { work(f); });
100     auto id = th.get_id();
101     running_[id] = std::move(th);
102   }
103 }
104
105 bool ThreadedExecutor::controlPerformAll() {
106   auto stopping = stopping_.load(std::memory_order_acquire);
107   controlJoinFinishedThreads();
108   controlLaunchEnqueuedTasks();
109   return !stopping || !running_.empty();
110 }
111 } // namespace folly