Migrate FunctionScheduler from common/concurrency/ to folly/experimental/
[folly.git] / folly / experimental / FunctionScheduler.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/experimental/FunctionScheduler.h>
18 #include <folly/ThreadName.h>
19 #include <folly/Conv.h>
20 #include <folly/String.h>
21
22 #ifdef _POSIX_MONOTONIC_CLOCK
23 #define FOLLY_TIME_MONOTONIC_CLOCK CLOCK_MONOTONIC
24 #else
25 #define FOLLY_TIME_MONOTONIC_CLOCK CLOCK_REALTIME
26 #endif
27
28 using namespace std;
29 using std::chrono::seconds;
30 using std::chrono::milliseconds;
31
32 static milliseconds nowInMS() {
33   struct timespec ts /*= void*/;
34   if (clock_gettime(FOLLY_TIME_MONOTONIC_CLOCK, &ts)) {
35     // Only possible failures are EFAULT or EINVAL, both practically
36     // impossible. But an assert can't hurt.
37     assert(false);
38   }
39   return milliseconds(
40     static_cast<int64_t>(ts.tv_sec * 1000.0 + ts.tv_nsec / 1000000.0 + 0.5));
41 }
42
43 namespace folly {
44
45 FunctionScheduler::FunctionScheduler() {
46 }
47
48 FunctionScheduler::~FunctionScheduler() {
49   // make sure to stop the thread (if running)
50   shutdown();
51 }
52
53 void FunctionScheduler::addFunction(const std::function<void()>& cb,
54                                     milliseconds interval,
55                                     StringPiece nameID,
56                                     milliseconds startDelay) {
57   LatencyDistribution latencyDistr(false, 0.0);
58   addFunctionInternal(cb, interval,
59                       latencyDistr, nameID, startDelay);
60 }
61
62 void FunctionScheduler::addFunctionInternal(const std::function<void()>& cb,
63                                     milliseconds interval,
64                                     const LatencyDistribution& latencyDistr,
65                                     StringPiece nameID,
66                                     milliseconds startDelay) {
67   if (interval < milliseconds::zero()) {
68     throw std::invalid_argument("FunctionScheduler: "
69                                 "time interval must be non-negative");
70   }
71   if (startDelay < milliseconds::zero()) {
72     throw std::invalid_argument("FunctionScheduler: "
73                                 "start delay must be non-negative");
74   }
75
76   std::lock_guard<std::mutex> l(mutex_);
77   // check if the nameID is unique
78   for (const auto& f : functions_) {
79     if (f.isValid() && f.name == nameID) {
80       throw std::invalid_argument(to<string>(
81             "FunctionScheduler: a function named \"", nameID,
82             "\" already exists"));
83     }
84   }
85   if (currentFunction_ && currentFunction_->name == nameID) {
86     throw std::invalid_argument(to<string>(
87           "FunctionScheduler: a function named \"", nameID,
88           "\" already exists"));
89   }
90
91   functions_.emplace_back(cb, interval, nameID.str(), startDelay,
92                           latencyDistr.isPoisson, latencyDistr.poissonMean);
93   if (running_) {
94     functions_.back().setNextRunTime(nowInMS() + startDelay);
95     std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
96     // Signal the running thread to wake up and see if it needs to change it's
97     // current scheduling decision.
98     runningCondvar_.notify_one();
99   }
100 }
101
102 bool FunctionScheduler::cancelFunction(StringPiece nameID) {
103   bool retValue = false;
104   std::unique_lock<std::mutex> l(mutex_);
105
106   if (currentFunction_ && currentFunction_->name == nameID) {
107     // This function is currently being run.  Clear currentFunction_
108     // The running thread will see this and won't reschedule the function.
109     currentFunction_ = nullptr;
110     return true;
111   }
112
113   for (auto it = functions_.begin(); it != functions_.end(); ++it) {
114     if (it->isValid() && it->name == nameID) {
115       cancelFunction(l, it);
116       return true;
117     }
118   }
119   return false;
120 }
121
122 void FunctionScheduler::cancelFunction(const std::unique_lock<std::mutex>& l,
123                                        FunctionHeap::iterator it) {
124   // This function should only be called with mutex_ already locked.
125   DCHECK(l.mutex() == &mutex_);
126   DCHECK(l.owns_lock());
127
128   if (running_) {
129     // Internally gcc has an __adjust_heap() function to fill in a hole in the
130     // heap.  Unfortunately it isn't part of the standard API.
131     //
132     // For now we just leave the RepeatFunc in our heap, but mark it as unused.
133     // When it's nextTimeInterval comes up, the runner thread will pop it from
134     // the heap and simply throw it away.
135     it->cancel();
136   } else {
137     // We're not running, so functions_ doesn't need to be maintained in heap
138     // order.
139     functions_.erase(it);
140   }
141 }
142
143 void FunctionScheduler::cancelAllFunctions() {
144   std::unique_lock<std::mutex> l(mutex_);
145   functions_.clear();
146 }
147
148 bool FunctionScheduler::start() {
149   std::unique_lock<std::mutex> l(mutex_);
150   if (running_) {
151     return false;
152   }
153
154   running_ = true;
155
156   VLOG(1) << "Starting FunctionScheduler with " << functions_.size()
157           << " functions.";
158   milliseconds now(nowInMS());
159   // Reset the next run time. for all functions.
160   // note: this is needed since one can shutdown() and start() again
161   for (auto& f : functions_) {
162     f.setNextRunTime(now + f.startDelay);
163     VLOG(1) << "   - func: "
164             << (f.name.empty() ? "(anon)" : f.name.c_str())
165             << ", period = " << f.timeInterval.count()
166             << "ms, delay = " << f.startDelay.count() << "ms";
167   }
168   std::make_heap(functions_.begin(), functions_.end(), fnCmp_);
169
170   thread_ = std::thread([&] { this->run(); });
171   return true;
172 }
173
174 void FunctionScheduler::shutdown() {
175   {
176     std::lock_guard<std::mutex> g(mutex_);
177     if (!running_) {
178       return;
179     }
180
181     running_ = false;
182     runningCondvar_.notify_one();
183   }
184   thread_.join();
185 }
186
187 void FunctionScheduler::run() {
188   std::unique_lock<std::mutex> lock(mutex_);
189
190   if (!threadName_.empty()) {
191     folly::setThreadName(threadName_);
192     google::setThreadName(threadName_);
193   }
194
195   while (running_) {
196     // If we have nothing to run, wait until a function is added or until we
197     // are stopped.
198     if (functions_.empty()) {
199       runningCondvar_.wait(lock);
200       continue;
201     }
202
203     milliseconds now(nowInMS());
204
205     // Move the next function to run to the end of functions_
206     std::pop_heap(functions_.begin(), functions_.end(), fnCmp_);
207
208     // Check to see if the function was cancelled.
209     // If so, just remove it and continue around the loop.
210     if (!functions_.back().isValid()) {
211       functions_.pop_back();
212       continue;
213     }
214
215     auto sleepTime = functions_.back().getNextRunTime() - now;
216     if (sleepTime < milliseconds::zero()) {
217       // We need to run this function now
218       runOneFunction(lock, now);
219     } else {
220       // Re-add the function to the heap, and wait until we actually
221       // need to run it.
222       std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
223       runningCondvar_.wait_for(lock, sleepTime);
224     }
225   }
226 }
227
228 void FunctionScheduler::runOneFunction(std::unique_lock<std::mutex>& lock,
229                                        std::chrono::milliseconds now) {
230   DCHECK(lock.mutex() == &mutex_);
231   DCHECK(lock.owns_lock());
232
233   // The function to run will be at the end of functions_ already.
234   //
235   // Fully remove it from functions_ now.
236   // We need to release mutex_ while we invoke this function, and we need to
237   // maintain the heap property on functions_ while mutex_ is unlocked.
238   RepeatFunc func(std::move(functions_.back()));
239   functions_.pop_back();
240   currentFunction_ = &func;
241
242   // Update the function's run time, and re-insert it into the heap.
243   if (steady_) {
244     // This allows scheduler to catch up
245     func.lastRunTime += func.timeInterval;
246   } else {
247     // Note that we adjust lastRunTime to the current time where we started the
248     // function call, rather than the time when the function finishes.
249     // This ensures that we call the function once every time interval, as
250     // opposed to waiting time interval seconds between calls.  (These can be
251     // different if the function takes a significant amount of time to run.)
252     func.lastRunTime = now;
253   }
254
255   // Release the lock while we invoke the user's function
256   lock.unlock();
257
258   // Invoke the function
259   try {
260     VLOG(5) << "Now running " << func.name;
261     func.cb();
262   } catch (const std::exception& ex) {
263     LOG(ERROR) << "Error running the scheduled function <"
264       << func.name << ">: " << exceptionStr(ex);
265   }
266
267   // Re-acquire the lock
268   lock.lock();
269
270   if (!currentFunction_) {
271     // The function was cancelled while we were running it.
272     // We shouldn't reschedule it;
273     return;
274   }
275   // Clear currentFunction_
276   CHECK_EQ(currentFunction_, &func);
277   currentFunction_ = nullptr;
278
279   // Re-insert the function into our functions_ heap.
280   // We only maintain the heap property while running_ is set.  (running_ may
281   // have been cleared while we were invoking the user's function.)
282   if (func.isPoissonDistr) {
283     func.setTimeIntervalPoissonDistr();
284   }
285   functions_.push_back(std::move(func));
286   if (running_) {
287     std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
288   }
289 }
290
291 void FunctionScheduler::setThreadName(StringPiece threadName) {
292   std::unique_lock<std::mutex> l(mutex_);
293   threadName_ = threadName.str();
294 }
295
296 }