2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/experimental/FunctionScheduler.h>
18 #include <folly/ThreadName.h>
19 #include <folly/Conv.h>
20 #include <folly/String.h>
23 using std::chrono::milliseconds;
24 using std::chrono::steady_clock;
28 FunctionScheduler::FunctionScheduler() {
31 FunctionScheduler::~FunctionScheduler() {
32 // make sure to stop the thread (if running)
36 void FunctionScheduler::addFunction(const std::function<void()>& cb,
37 milliseconds interval,
39 milliseconds startDelay) {
40 LatencyDistribution latencyDistr(false, 0.0);
41 addFunction(cb, interval, latencyDistr, nameID, startDelay);
44 void FunctionScheduler::addFunction(const std::function<void()>& cb,
45 milliseconds interval,
46 const LatencyDistribution& latencyDistr,
48 milliseconds startDelay) {
49 if (interval < milliseconds::zero()) {
50 throw std::invalid_argument("FunctionScheduler: "
51 "time interval must be non-negative");
53 if (startDelay < milliseconds::zero()) {
54 throw std::invalid_argument("FunctionScheduler: "
55 "start delay must be non-negative");
58 std::lock_guard<std::mutex> l(mutex_);
59 // check if the nameID is unique
60 for (const auto& f : functions_) {
61 if (f.isValid() && f.name == nameID) {
62 throw std::invalid_argument(to<string>(
63 "FunctionScheduler: a function named \"", nameID,
64 "\" already exists"));
67 if (currentFunction_ && currentFunction_->name == nameID) {
68 throw std::invalid_argument(to<string>(
69 "FunctionScheduler: a function named \"", nameID,
70 "\" already exists"));
73 functions_.emplace_back(cb, interval, nameID.str(), startDelay,
74 latencyDistr.isPoisson, latencyDistr.poissonMean);
76 functions_.back().setNextRunTime(steady_clock::now() + startDelay);
77 std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
78 // Signal the running thread to wake up and see if it needs to change it's
79 // current scheduling decision.
80 runningCondvar_.notify_one();
84 bool FunctionScheduler::cancelFunction(StringPiece nameID) {
85 std::unique_lock<std::mutex> l(mutex_);
87 if (currentFunction_ && currentFunction_->name == nameID) {
88 // This function is currently being run. Clear currentFunction_
89 // The running thread will see this and won't reschedule the function.
90 currentFunction_ = nullptr;
94 for (auto it = functions_.begin(); it != functions_.end(); ++it) {
95 if (it->isValid() && it->name == nameID) {
96 cancelFunction(l, it);
103 void FunctionScheduler::cancelFunction(const std::unique_lock<std::mutex>& l,
104 FunctionHeap::iterator it) {
105 // This function should only be called with mutex_ already locked.
106 DCHECK(l.mutex() == &mutex_);
107 DCHECK(l.owns_lock());
110 // Internally gcc has an __adjust_heap() function to fill in a hole in the
111 // heap. Unfortunately it isn't part of the standard API.
113 // For now we just leave the RepeatFunc in our heap, but mark it as unused.
114 // When it's nextTimeInterval comes up, the runner thread will pop it from
115 // the heap and simply throw it away.
118 // We're not running, so functions_ doesn't need to be maintained in heap
120 functions_.erase(it);
124 void FunctionScheduler::cancelAllFunctions() {
125 std::unique_lock<std::mutex> l(mutex_);
129 bool FunctionScheduler::start() {
130 std::unique_lock<std::mutex> l(mutex_);
137 VLOG(1) << "Starting FunctionScheduler with " << functions_.size()
139 auto now = steady_clock::now();
140 // Reset the next run time. for all functions.
141 // note: this is needed since one can shutdown() and start() again
142 for (auto& f : functions_) {
143 f.setNextRunTime(now + f.startDelay);
144 VLOG(1) << " - func: "
145 << (f.name.empty() ? "(anon)" : f.name.c_str())
146 << ", period = " << f.timeInterval.count()
147 << "ms, delay = " << f.startDelay.count() << "ms";
149 std::make_heap(functions_.begin(), functions_.end(), fnCmp_);
151 thread_ = std::thread([&] { this->run(); });
155 void FunctionScheduler::shutdown() {
157 std::lock_guard<std::mutex> g(mutex_);
163 runningCondvar_.notify_one();
168 void FunctionScheduler::run() {
169 std::unique_lock<std::mutex> lock(mutex_);
171 if (!threadName_.empty()) {
172 folly::setThreadName(threadName_);
176 // If we have nothing to run, wait until a function is added or until we
178 if (functions_.empty()) {
179 runningCondvar_.wait(lock);
183 auto now = steady_clock::now();
185 // Move the next function to run to the end of functions_
186 std::pop_heap(functions_.begin(), functions_.end(), fnCmp_);
188 // Check to see if the function was cancelled.
189 // If so, just remove it and continue around the loop.
190 if (!functions_.back().isValid()) {
191 functions_.pop_back();
195 auto sleepTime = functions_.back().getNextRunTime() - now;
196 if (sleepTime < milliseconds::zero()) {
197 // We need to run this function now
198 runOneFunction(lock, now);
200 // Re-add the function to the heap, and wait until we actually
202 std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
203 runningCondvar_.wait_for(lock, sleepTime);
208 void FunctionScheduler::runOneFunction(std::unique_lock<std::mutex>& lock,
209 steady_clock::time_point now) {
210 DCHECK(lock.mutex() == &mutex_);
211 DCHECK(lock.owns_lock());
213 // The function to run will be at the end of functions_ already.
215 // Fully remove it from functions_ now.
216 // We need to release mutex_ while we invoke this function, and we need to
217 // maintain the heap property on functions_ while mutex_ is unlocked.
218 RepeatFunc func(std::move(functions_.back()));
219 functions_.pop_back();
220 currentFunction_ = &func;
222 // Update the function's run time, and re-insert it into the heap.
224 // This allows scheduler to catch up
225 func.lastRunTime += func.timeInterval;
227 // Note that we adjust lastRunTime to the current time where we started the
228 // function call, rather than the time when the function finishes.
229 // This ensures that we call the function once every time interval, as
230 // opposed to waiting time interval seconds between calls. (These can be
231 // different if the function takes a significant amount of time to run.)
232 func.lastRunTime = now;
235 // Release the lock while we invoke the user's function
238 // Invoke the function
240 VLOG(5) << "Now running " << func.name;
242 } catch (const std::exception& ex) {
243 LOG(ERROR) << "Error running the scheduled function <"
244 << func.name << ">: " << exceptionStr(ex);
247 // Re-acquire the lock
250 if (!currentFunction_) {
251 // The function was cancelled while we were running it.
252 // We shouldn't reschedule it;
255 // Clear currentFunction_
256 CHECK_EQ(currentFunction_, &func);
257 currentFunction_ = nullptr;
259 // Re-insert the function into our functions_ heap.
260 // We only maintain the heap property while running_ is set. (running_ may
261 // have been cleared while we were invoking the user's function.)
262 if (func.isPoissonDistr) {
263 func.setTimeIntervalPoissonDistr();
265 functions_.push_back(std::move(func));
267 std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
271 void FunctionScheduler::setThreadName(StringPiece threadName) {
272 std::unique_lock<std::mutex> l(mutex_);
273 threadName_ = threadName.str();