update FunctionScheduler to use std::chrono::steady_clock
[folly.git] / folly / experimental / FunctionScheduler.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/experimental/FunctionScheduler.h>
18 #include <folly/ThreadName.h>
19 #include <folly/Conv.h>
20 #include <folly/String.h>
21
22 using namespace std;
23 using std::chrono::milliseconds;
24 using std::chrono::steady_clock;
25
26 namespace folly {
27
28 FunctionScheduler::FunctionScheduler() {
29 }
30
31 FunctionScheduler::~FunctionScheduler() {
32   // make sure to stop the thread (if running)
33   shutdown();
34 }
35
36 void FunctionScheduler::addFunction(const std::function<void()>& cb,
37                                     milliseconds interval,
38                                     StringPiece nameID,
39                                     milliseconds startDelay) {
40   LatencyDistribution latencyDistr(false, 0.0);
41   addFunctionInternal(cb, interval,
42                       latencyDistr, nameID, startDelay);
43 }
44
45 void FunctionScheduler::addFunctionInternal(const std::function<void()>& cb,
46                                     milliseconds interval,
47                                     const LatencyDistribution& latencyDistr,
48                                     StringPiece nameID,
49                                     milliseconds startDelay) {
50   if (interval < milliseconds::zero()) {
51     throw std::invalid_argument("FunctionScheduler: "
52                                 "time interval must be non-negative");
53   }
54   if (startDelay < milliseconds::zero()) {
55     throw std::invalid_argument("FunctionScheduler: "
56                                 "start delay must be non-negative");
57   }
58
59   std::lock_guard<std::mutex> l(mutex_);
60   // check if the nameID is unique
61   for (const auto& f : functions_) {
62     if (f.isValid() && f.name == nameID) {
63       throw std::invalid_argument(to<string>(
64             "FunctionScheduler: a function named \"", nameID,
65             "\" already exists"));
66     }
67   }
68   if (currentFunction_ && currentFunction_->name == nameID) {
69     throw std::invalid_argument(to<string>(
70           "FunctionScheduler: a function named \"", nameID,
71           "\" already exists"));
72   }
73
74   functions_.emplace_back(cb, interval, nameID.str(), startDelay,
75                           latencyDistr.isPoisson, latencyDistr.poissonMean);
76   if (running_) {
77     functions_.back().setNextRunTime(steady_clock::now() + startDelay);
78     std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
79     // Signal the running thread to wake up and see if it needs to change it's
80     // current scheduling decision.
81     runningCondvar_.notify_one();
82   }
83 }
84
85 bool FunctionScheduler::cancelFunction(StringPiece nameID) {
86   std::unique_lock<std::mutex> l(mutex_);
87
88   if (currentFunction_ && currentFunction_->name == nameID) {
89     // This function is currently being run.  Clear currentFunction_
90     // The running thread will see this and won't reschedule the function.
91     currentFunction_ = nullptr;
92     return true;
93   }
94
95   for (auto it = functions_.begin(); it != functions_.end(); ++it) {
96     if (it->isValid() && it->name == nameID) {
97       cancelFunction(l, it);
98       return true;
99     }
100   }
101   return false;
102 }
103
104 void FunctionScheduler::cancelFunction(const std::unique_lock<std::mutex>& l,
105                                        FunctionHeap::iterator it) {
106   // This function should only be called with mutex_ already locked.
107   DCHECK(l.mutex() == &mutex_);
108   DCHECK(l.owns_lock());
109
110   if (running_) {
111     // Internally gcc has an __adjust_heap() function to fill in a hole in the
112     // heap.  Unfortunately it isn't part of the standard API.
113     //
114     // For now we just leave the RepeatFunc in our heap, but mark it as unused.
115     // When it's nextTimeInterval comes up, the runner thread will pop it from
116     // the heap and simply throw it away.
117     it->cancel();
118   } else {
119     // We're not running, so functions_ doesn't need to be maintained in heap
120     // order.
121     functions_.erase(it);
122   }
123 }
124
125 void FunctionScheduler::cancelAllFunctions() {
126   std::unique_lock<std::mutex> l(mutex_);
127   functions_.clear();
128 }
129
130 bool FunctionScheduler::start() {
131   std::unique_lock<std::mutex> l(mutex_);
132   if (running_) {
133     return false;
134   }
135
136   running_ = true;
137
138   VLOG(1) << "Starting FunctionScheduler with " << functions_.size()
139           << " functions.";
140   auto now = steady_clock::now();
141   // Reset the next run time. for all functions.
142   // note: this is needed since one can shutdown() and start() again
143   for (auto& f : functions_) {
144     f.setNextRunTime(now + f.startDelay);
145     VLOG(1) << "   - func: "
146             << (f.name.empty() ? "(anon)" : f.name.c_str())
147             << ", period = " << f.timeInterval.count()
148             << "ms, delay = " << f.startDelay.count() << "ms";
149   }
150   std::make_heap(functions_.begin(), functions_.end(), fnCmp_);
151
152   thread_ = std::thread([&] { this->run(); });
153   return true;
154 }
155
156 void FunctionScheduler::shutdown() {
157   {
158     std::lock_guard<std::mutex> g(mutex_);
159     if (!running_) {
160       return;
161     }
162
163     running_ = false;
164     runningCondvar_.notify_one();
165   }
166   thread_.join();
167 }
168
169 void FunctionScheduler::run() {
170   std::unique_lock<std::mutex> lock(mutex_);
171
172   if (!threadName_.empty()) {
173     folly::setThreadName(threadName_);
174   }
175
176   while (running_) {
177     // If we have nothing to run, wait until a function is added or until we
178     // are stopped.
179     if (functions_.empty()) {
180       runningCondvar_.wait(lock);
181       continue;
182     }
183
184     auto now = steady_clock::now();
185
186     // Move the next function to run to the end of functions_
187     std::pop_heap(functions_.begin(), functions_.end(), fnCmp_);
188
189     // Check to see if the function was cancelled.
190     // If so, just remove it and continue around the loop.
191     if (!functions_.back().isValid()) {
192       functions_.pop_back();
193       continue;
194     }
195
196     auto sleepTime = functions_.back().getNextRunTime() - now;
197     if (sleepTime < milliseconds::zero()) {
198       // We need to run this function now
199       runOneFunction(lock, now);
200     } else {
201       // Re-add the function to the heap, and wait until we actually
202       // need to run it.
203       std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
204       runningCondvar_.wait_for(lock, sleepTime);
205     }
206   }
207 }
208
209 void FunctionScheduler::runOneFunction(std::unique_lock<std::mutex>& lock,
210                                        steady_clock::time_point now) {
211   DCHECK(lock.mutex() == &mutex_);
212   DCHECK(lock.owns_lock());
213
214   // The function to run will be at the end of functions_ already.
215   //
216   // Fully remove it from functions_ now.
217   // We need to release mutex_ while we invoke this function, and we need to
218   // maintain the heap property on functions_ while mutex_ is unlocked.
219   RepeatFunc func(std::move(functions_.back()));
220   functions_.pop_back();
221   currentFunction_ = &func;
222
223   // Update the function's run time, and re-insert it into the heap.
224   if (steady_) {
225     // This allows scheduler to catch up
226     func.lastRunTime += func.timeInterval;
227   } else {
228     // Note that we adjust lastRunTime to the current time where we started the
229     // function call, rather than the time when the function finishes.
230     // This ensures that we call the function once every time interval, as
231     // opposed to waiting time interval seconds between calls.  (These can be
232     // different if the function takes a significant amount of time to run.)
233     func.lastRunTime = now;
234   }
235
236   // Release the lock while we invoke the user's function
237   lock.unlock();
238
239   // Invoke the function
240   try {
241     VLOG(5) << "Now running " << func.name;
242     func.cb();
243   } catch (const std::exception& ex) {
244     LOG(ERROR) << "Error running the scheduled function <"
245       << func.name << ">: " << exceptionStr(ex);
246   }
247
248   // Re-acquire the lock
249   lock.lock();
250
251   if (!currentFunction_) {
252     // The function was cancelled while we were running it.
253     // We shouldn't reschedule it;
254     return;
255   }
256   // Clear currentFunction_
257   CHECK_EQ(currentFunction_, &func);
258   currentFunction_ = nullptr;
259
260   // Re-insert the function into our functions_ heap.
261   // We only maintain the heap property while running_ is set.  (running_ may
262   // have been cleared while we were invoking the user's function.)
263   if (func.isPoissonDistr) {
264     func.setTimeIntervalPoissonDistr();
265   }
266   functions_.push_back(std::move(func));
267   if (running_) {
268     std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
269   }
270 }
271
272 void FunctionScheduler::setThreadName(StringPiece threadName) {
273   std::unique_lock<std::mutex> l(mutex_);
274   threadName_ = threadName.str();
275 }
276
277 }