2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/experimental/FunctionScheduler.h>
18 #include <folly/ThreadName.h>
19 #include <folly/Conv.h>
20 #include <folly/String.h>
23 using std::chrono::milliseconds;
24 using std::chrono::steady_clock;
// Constructor taking no arguments. Its body is not visible in this excerpt.
28 FunctionScheduler::FunctionScheduler() {
// Destructor.
// NOTE(review): the statement that actually stops the thread is not visible in
// this excerpt; only the comment describing it is.
31 FunctionScheduler::~FunctionScheduler() {
32 // Make sure to stop the thread (if running).
// Thin wrapper around addFunctionInternal(): builds a
// LatencyDistribution(false, 0.0) and forwards cb, interval, nameID and
// startDelay together with it.
//
// Argument validation, and the std::invalid_argument exceptions it raises,
// happen inside addFunctionInternal().
//
// NOTE(review): nameID is used below, but the parameter's declaration falls on
// a line that is not part of this excerpt.
// NOTE(review): the two constructor arguments presumably correspond to the
// isPoisson / poissonMean fields that addFunctionInternal() reads -- confirm
// against the LatencyDistribution declaration.
36 void FunctionScheduler::addFunction(const std::function<void()>& cb,
37 milliseconds interval,
39 milliseconds startDelay) {
40 LatencyDistribution latencyDistr(false, 0.0);
41 addFunctionInternal(cb, interval,
42 latencyDistr, nameID, startDelay);
// Validates the arguments, verifies that nameID is not already taken, and
// appends a new entry for cb to functions_ while holding mutex_.
//
// Throws std::invalid_argument when:
//   - interval is negative;
//   - startDelay is negative;
//   - a valid entry in functions_ already has the name nameID;
//   - the entry currentFunction_ points to has the name nameID.
//
// latencyDistr.isPoisson and latencyDistr.poissonMean are passed through to
// the constructor of the new entry.
//
// NOTE(review): nameID is used below, but the parameter's declaration falls on
// a line that is not part of this excerpt.
45 void FunctionScheduler::addFunctionInternal(const std::function<void()>& cb,
46 milliseconds interval,
47 const LatencyDistribution& latencyDistr,
49 milliseconds startDelay) {
// Reject negative durations before taking the lock.
50 if (interval < milliseconds::zero()) {
51 throw std::invalid_argument("FunctionScheduler: "
52 "time interval must be non-negative");
54 if (startDelay < milliseconds::zero()) {
55 throw std::invalid_argument("FunctionScheduler: "
56 "start delay must be non-negative");
// mutex_ is held for the uniqueness checks and the insertion below.
59 std::lock_guard<std::mutex> l(mutex_);
60 // Check that the nameID is unique among the valid entries in functions_.
61 for (const auto& f : functions_) {
62 if (f.isValid() && f.name == nameID) {
63 throw std::invalid_argument(to<string>(
64 "FunctionScheduler: a function named \"", nameID,
65 "\" already exists"));
// Also compare against the entry referenced by currentFunction_, if any.
68 if (currentFunction_ && currentFunction_->name == nameID) {
69 throw std::invalid_argument(to<string>(
70 "FunctionScheduler: a function named \"", nameID,
71 "\" already exists"));
74 functions_.emplace_back(cb, interval, nameID.str(), startDelay,
75 latencyDistr.isPoisson, latencyDistr.poissonMean);
// NOTE(review): a line between the emplace_back above and the statements below
// is not visible in this excerpt. A comment in runOneFunction() says the heap
// property is only maintained while running_ is set, so the next-run-time
// update and push_heap are presumably guarded by that flag -- confirm.
77 functions_.back().setNextRunTime(steady_clock::now() + startDelay);
78 std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
79 // Signal the running thread to wake up and see if it needs to change its
80 // current scheduling decision.
81 runningCondvar_.notify_one();
// Cancels the function registered under nameID.
//
// With mutex_ held, two places are checked:
//   - currentFunction_: if it names the function, the pointer is cleared so
//     the running thread will not reschedule it;
//   - functions_: scanned linearly for a valid entry with that name, which is
//     handed to the (lock, iterator) overload of cancelFunction().
//
// Returns bool.
// NOTE(review): the return statements are on lines not visible in this
// excerpt, so the exact return value on each path cannot be confirmed here.
85 bool FunctionScheduler::cancelFunction(StringPiece nameID) {
86 std::unique_lock<std::mutex> l(mutex_);
88 if (currentFunction_ && currentFunction_->name == nameID) {
89 // This function is currently being run. Clear currentFunction_.
90 // The running thread will see this and won't reschedule the function.
91 currentFunction_ = nullptr;
// Otherwise look for a still-valid entry with this name in functions_.
95 for (auto it = functions_.begin(); it != functions_.end(); ++it) {
96 if (it->isValid() && it->name == nameID) {
97 cancelFunction(l, it);
// Cancels the entry that `it` refers to inside functions_.
//
// l:  must be a lock on mutex_ that is currently owned; both conditions are
//     DCHECKed below.
// it: iterator into functions_ identifying the entry to cancel.
//
// NOTE(review): the branch structure of this function is on lines not visible
// in this excerpt. The comments describe two strategies -- leaving the entry in
// the heap but marking it unused, and erasing it outright when the scheduler
// is not running -- but only the erase() call is visible here.
104 void FunctionScheduler::cancelFunction(const std::unique_lock<std::mutex>& l,
105 FunctionHeap::iterator it) {
106 // This function should only be called with mutex_ already locked.
107 DCHECK(l.mutex() == &mutex_);
108 DCHECK(l.owns_lock());
111 // Internally gcc has an __adjust_heap() function to fill in a hole in the
112 // heap. Unfortunately it isn't part of the standard API.
114 // For now we just leave the RepeatFunc in our heap, but mark it as unused.
115 // When its nextTimeInterval comes up, the runner thread will pop it from
116 // the heap and simply throw it away.
119 // We're not running, so functions_ doesn't need to be maintained in heap
121 functions_.erase(it);
// Cancels every registered function.
// Only the acquisition of mutex_ is visible in this excerpt.
// NOTE(review): the statements executed under the lock are on lines not shown.
125 void FunctionScheduler::cancelAllFunctions() {
126 std::unique_lock<std::mutex> l(mutex_);
// Starts the scheduler thread.
//
// With mutex_ held, this:
//   - sets every entry's next run time to (now + its startDelay);
//   - logs each entry's name, period and delay at VLOG level 1;
//   - rebuilds the heap over functions_ using fnCmp_;
//   - launches a std::thread executing run() and stores it in thread_.
//
// Returns bool.
// NOTE(review): the return statements, and any early-exit check before the
// VLOG, are on lines not visible in this excerpt.
130 bool FunctionScheduler::start() {
131 std::unique_lock<std::mutex> l(mutex_);
138 VLOG(1) << "Starting FunctionScheduler with " << functions_.size()
// A single timestamp is used so all entries are scheduled against one origin.
140 auto now = steady_clock::now();
141 // Reset the next run time for all functions.
142 // Note: this is needed since one can shutdown() and start() again.
143 for (auto& f : functions_) {
144 f.setNextRunTime(now + f.startDelay);
145 VLOG(1) << " - func: "
146 << (f.name.empty() ? "(anon)" : f.name.c_str())
147 << ", period = " << f.timeInterval.count()
148 << "ms, delay = " << f.startDelay.count() << "ms";
// The next run times were just rewritten, so the heap order must be rebuilt.
150 std::make_heap(functions_.begin(), functions_.end(), fnCmp_);
// The lambda only uses `this`; run() takes mutex_ itself as its first step.
152 thread_ = std::thread([&] { this->run(); });
// Stops the scheduler.
// Visible here: mutex_ is taken with a lock_guard, and runningCondvar_ is
// notified, which wakes run() if it is blocked in one of its waits.
// NOTE(review): the state change made under the lock and the handling of
// thread_ are presumably on lines not visible in this excerpt.
156 void FunctionScheduler::shutdown() {
158 std::lock_guard<std::mutex> g(mutex_);
164 runningCondvar_.notify_one();
// Body of the scheduler thread; start() launches it through std::thread.
//
// Takes mutex_, applies threadName_ to the thread if one was set, and then
// works through the functions_ heap: the entry at the top is either executed
// via runOneFunction() or, if it is not due yet, pushed back while the thread
// sleeps on runningCondvar_.
//
// NOTE(review): the loop header, its exit condition, and the continue/else
// lines are not visible in this excerpt.
169 void FunctionScheduler::run() {
170 std::unique_lock<std::mutex> lock(mutex_);
172 if (!threadName_.empty()) {
173 folly::setThreadName(threadName_);
177 // If we have nothing to run, wait until a function is added or until we
179 if (functions_.empty()) {
180 runningCondvar_.wait(lock);
184 auto now = steady_clock::now();
186 // Move the next function to run to the end of functions_
187 std::pop_heap(functions_.begin(), functions_.end(), fnCmp_);
189 // Check to see if the function was cancelled.
190 // If so, just remove it and continue around the loop.
191 if (!functions_.back().isValid()) {
192 functions_.pop_back();
// A negative sleepTime means the entry's next run time is already in the past.
// NOTE(review): a sleepTime of exactly zero does not satisfy the test below.
196 auto sleepTime = functions_.back().getNextRunTime() - now;
197 if (sleepTime < milliseconds::zero()) {
198 // We need to run this function now
199 runOneFunction(lock, now);
201 // Re-add the function to the heap, and wait until we actually
203 std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
// Timed wait: notify_one() from addFunctionInternal() or shutdown() can end it
// early, so the schedule is re-evaluated on wake-up.
204 runningCondvar_.wait_for(lock, sleepTime);
// Runs the entry at the back of functions_ and, unless it was cancelled while
// running, puts it back into functions_.
//
// lock: must be a lock on mutex_ that is currently owned (DCHECKed below).
// now:  time point that is stored into func.lastRunTime on one of the paths
//       below.
//
// NOTE(review): several lines are not visible in this excerpt -- the branch
// conditions, the unlock/lock calls, the try keyword and the invocation of the
// user's function. Comments below mark where they would sit.
209 void FunctionScheduler::runOneFunction(std::unique_lock<std::mutex>& lock,
210 steady_clock::time_point now) {
211 DCHECK(lock.mutex() == &mutex_);
212 DCHECK(lock.owns_lock());
214 // The function to run will be at the end of functions_ already.
216 // Fully remove it from functions_ now.
217 // We need to release mutex_ while we invoke this function, and we need to
218 // maintain the heap property on functions_ while mutex_ is unlocked.
219 RepeatFunc func(std::move(functions_.back()));
220 functions_.pop_back();
// Publish the in-flight entry so cancelFunction() and addFunctionInternal()
// can see it. func is a local, so the pointer is cleared again further down.
221 currentFunction_ = &func;
223 // Update the function's run time, and re-insert it into the heap.
// NOTE(review): the two assignments to lastRunTime below are presumably
// alternative branches; the condition selecting between them is not visible.
225 // This allows scheduler to catch up
226 func.lastRunTime += func.timeInterval;
228 // Note that we adjust lastRunTime to the current time where we started the
229 // function call, rather than the time when the function finishes.
230 // This ensures that we call the function once every time interval, as
231 // opposed to waiting time interval seconds between calls. (These can be
232 // different if the function takes a significant amount of time to run.)
233 func.lastRunTime = now;
236 // Release the lock while we invoke the user's function
239 // Invoke the function
241 VLOG(5) << "Now running " << func.name;
// Only exceptions derived from std::exception are caught by this clause; they
// are logged and not rethrown here.
243 } catch (const std::exception& ex) {
244 LOG(ERROR) << "Error running the scheduled function <"
245 << func.name << ">: " << exceptionStr(ex);
248 // Re-acquire the lock
// cancelFunction() sets currentFunction_ to nullptr to signal a cancellation
// that happened while the lock was released.
251 if (!currentFunction_) {
252 // The function was cancelled while we were running it.
253 // We shouldn't reschedule it.
256 // Clear currentFunction_
// Not cancelled, so the pointer must still refer to our local entry.
257 CHECK_EQ(currentFunction_, &func);
258 currentFunction_ = nullptr;
260 // Re-insert the function into our functions_ heap.
261 // We only maintain the heap property while running_ is set. (running_ may
262 // have been cleared while we were invoking the user's function.)
// NOTE(review): setTimeIntervalPoissonDistr() presumably picks a fresh interval
// for the next run -- confirm against the RepeatFunc declaration.
263 if (func.isPoissonDistr) {
264 func.setTimeIntervalPoissonDistr();
266 functions_.push_back(std::move(func));
// NOTE(review): per the comment above, this push_heap is presumably guarded by
// a running_ check on a line not visible in this excerpt.
268 std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
// Stores a copy of threadName in threadName_ while holding mutex_.
// run() reads threadName_ and, when it is non-empty, passes it to
// folly::setThreadName().
272 void FunctionScheduler::setThreadName(StringPiece threadName) {
273 std::unique_lock<std::mutex> l(mutex_);
274 threadName_ = threadName.str();