2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/experimental/FunctionScheduler.h>
18 #include <folly/ThreadName.h>
19 #include <folly/Conv.h>
20 #include <folly/String.h>
22 #ifdef _POSIX_MONOTONIC_CLOCK
23 #define FOLLY_TIME_MONOTONIC_CLOCK CLOCK_MONOTONIC
25 #define FOLLY_TIME_MONOTONIC_CLOCK CLOCK_REALTIME
29 using std::chrono::seconds;
30 using std::chrono::milliseconds;
32 static milliseconds nowInMS() {
33 struct timespec ts /*= void*/;
34 if (clock_gettime(FOLLY_TIME_MONOTONIC_CLOCK, &ts)) {
35 // Only possible failures are EFAULT or EINVAL, both practically
36 // impossible. But an assert can't hurt.
40 static_cast<int64_t>(ts.tv_sec * 1000.0 + ts.tv_nsec / 1000000.0 + 0.5));
45 FunctionScheduler::FunctionScheduler() {
48 FunctionScheduler::~FunctionScheduler() {
49 // make sure to stop the thread (if running)
53 void FunctionScheduler::addFunction(const std::function<void()>& cb,
54 milliseconds interval,
56 milliseconds startDelay) {
57 LatencyDistribution latencyDistr(false, 0.0);
58 addFunctionInternal(cb, interval,
59 latencyDistr, nameID, startDelay);
62 void FunctionScheduler::addFunctionInternal(const std::function<void()>& cb,
63 milliseconds interval,
64 const LatencyDistribution& latencyDistr,
66 milliseconds startDelay) {
67 if (interval < milliseconds::zero()) {
68 throw std::invalid_argument("FunctionScheduler: "
69 "time interval must be non-negative");
71 if (startDelay < milliseconds::zero()) {
72 throw std::invalid_argument("FunctionScheduler: "
73 "start delay must be non-negative");
76 std::lock_guard<std::mutex> l(mutex_);
77 // check if the nameID is unique
78 for (const auto& f : functions_) {
79 if (f.isValid() && f.name == nameID) {
80 throw std::invalid_argument(to<string>(
81 "FunctionScheduler: a function named \"", nameID,
82 "\" already exists"));
85 if (currentFunction_ && currentFunction_->name == nameID) {
86 throw std::invalid_argument(to<string>(
87 "FunctionScheduler: a function named \"", nameID,
88 "\" already exists"));
91 functions_.emplace_back(cb, interval, nameID.str(), startDelay,
92 latencyDistr.isPoisson, latencyDistr.poissonMean);
94 functions_.back().setNextRunTime(nowInMS() + startDelay);
95 std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
96 // Signal the running thread to wake up and see if it needs to change it's
97 // current scheduling decision.
98 runningCondvar_.notify_one();
102 bool FunctionScheduler::cancelFunction(StringPiece nameID) {
103 bool retValue = false;
104 std::unique_lock<std::mutex> l(mutex_);
106 if (currentFunction_ && currentFunction_->name == nameID) {
107 // This function is currently being run. Clear currentFunction_
108 // The running thread will see this and won't reschedule the function.
109 currentFunction_ = nullptr;
113 for (auto it = functions_.begin(); it != functions_.end(); ++it) {
114 if (it->isValid() && it->name == nameID) {
115 cancelFunction(l, it);
122 void FunctionScheduler::cancelFunction(const std::unique_lock<std::mutex>& l,
123 FunctionHeap::iterator it) {
124 // This function should only be called with mutex_ already locked.
125 DCHECK(l.mutex() == &mutex_);
126 DCHECK(l.owns_lock());
129 // Internally gcc has an __adjust_heap() function to fill in a hole in the
130 // heap. Unfortunately it isn't part of the standard API.
132 // For now we just leave the RepeatFunc in our heap, but mark it as unused.
133 // When it's nextTimeInterval comes up, the runner thread will pop it from
134 // the heap and simply throw it away.
137 // We're not running, so functions_ doesn't need to be maintained in heap
139 functions_.erase(it);
143 void FunctionScheduler::cancelAllFunctions() {
144 std::unique_lock<std::mutex> l(mutex_);
148 bool FunctionScheduler::start() {
149 std::unique_lock<std::mutex> l(mutex_);
156 VLOG(1) << "Starting FunctionScheduler with " << functions_.size()
158 milliseconds now(nowInMS());
159 // Reset the next run time. for all functions.
160 // note: this is needed since one can shutdown() and start() again
161 for (auto& f : functions_) {
162 f.setNextRunTime(now + f.startDelay);
163 VLOG(1) << " - func: "
164 << (f.name.empty() ? "(anon)" : f.name.c_str())
165 << ", period = " << f.timeInterval.count()
166 << "ms, delay = " << f.startDelay.count() << "ms";
168 std::make_heap(functions_.begin(), functions_.end(), fnCmp_);
170 thread_ = std::thread([&] { this->run(); });
174 void FunctionScheduler::shutdown() {
176 std::lock_guard<std::mutex> g(mutex_);
182 runningCondvar_.notify_one();
187 void FunctionScheduler::run() {
188 std::unique_lock<std::mutex> lock(mutex_);
190 if (!threadName_.empty()) {
191 folly::setThreadName(threadName_);
192 google::setThreadName(threadName_);
196 // If we have nothing to run, wait until a function is added or until we
198 if (functions_.empty()) {
199 runningCondvar_.wait(lock);
203 milliseconds now(nowInMS());
205 // Move the next function to run to the end of functions_
206 std::pop_heap(functions_.begin(), functions_.end(), fnCmp_);
208 // Check to see if the function was cancelled.
209 // If so, just remove it and continue around the loop.
210 if (!functions_.back().isValid()) {
211 functions_.pop_back();
215 auto sleepTime = functions_.back().getNextRunTime() - now;
216 if (sleepTime < milliseconds::zero()) {
217 // We need to run this function now
218 runOneFunction(lock, now);
220 // Re-add the function to the heap, and wait until we actually
222 std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
223 runningCondvar_.wait_for(lock, sleepTime);
228 void FunctionScheduler::runOneFunction(std::unique_lock<std::mutex>& lock,
229 std::chrono::milliseconds now) {
230 DCHECK(lock.mutex() == &mutex_);
231 DCHECK(lock.owns_lock());
233 // The function to run will be at the end of functions_ already.
235 // Fully remove it from functions_ now.
236 // We need to release mutex_ while we invoke this function, and we need to
237 // maintain the heap property on functions_ while mutex_ is unlocked.
238 RepeatFunc func(std::move(functions_.back()));
239 functions_.pop_back();
240 currentFunction_ = &func;
242 // Update the function's run time, and re-insert it into the heap.
244 // This allows scheduler to catch up
245 func.lastRunTime += func.timeInterval;
247 // Note that we adjust lastRunTime to the current time where we started the
248 // function call, rather than the time when the function finishes.
249 // This ensures that we call the function once every time interval, as
250 // opposed to waiting time interval seconds between calls. (These can be
251 // different if the function takes a significant amount of time to run.)
252 func.lastRunTime = now;
255 // Release the lock while we invoke the user's function
258 // Invoke the function
260 VLOG(5) << "Now running " << func.name;
262 } catch (const std::exception& ex) {
263 LOG(ERROR) << "Error running the scheduled function <"
264 << func.name << ">: " << exceptionStr(ex);
267 // Re-acquire the lock
270 if (!currentFunction_) {
271 // The function was cancelled while we were running it.
272 // We shouldn't reschedule it;
275 // Clear currentFunction_
276 CHECK_EQ(currentFunction_, &func);
277 currentFunction_ = nullptr;
279 // Re-insert the function into our functions_ heap.
280 // We only maintain the heap property while running_ is set. (running_ may
281 // have been cleared while we were invoking the user's function.)
282 if (func.isPoissonDistr) {
283 func.setTimeIntervalPoissonDistr();
285 functions_.push_back(std::move(func));
287 std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
291 void FunctionScheduler::setThreadName(StringPiece threadName) {
292 std::unique_lock<std::mutex> l(mutex_);
293 threadName_ = threadName.str();