folly::FunctionScheduler: Adding capability to reset a function's timer
[folly.git] / folly / experimental / FunctionScheduler.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/experimental/FunctionScheduler.h>
18
19 #include <random>
20
21 #include <folly/Conv.h>
22 #include <folly/Random.h>
23 #include <folly/String.h>
24 #include <folly/ThreadName.h>
25
26 using std::chrono::milliseconds;
27 using std::chrono::steady_clock;
28
29 namespace folly {
30
31 namespace {
32
33 struct ConstIntervalFunctor {
34   const milliseconds constInterval;
35
36   explicit ConstIntervalFunctor(milliseconds interval)
37       : constInterval(interval) {
38     if (interval < milliseconds::zero()) {
39       throw std::invalid_argument(
40           "FunctionScheduler: "
41           "time interval must be non-negative");
42     }
43   }
44
45   milliseconds operator()() const { return constInterval; }
46 };
47
48 struct PoissonDistributionFunctor {
49   std::default_random_engine generator;
50   std::poisson_distribution<int> poissonRandom;
51
52   explicit PoissonDistributionFunctor(double meanPoissonMs)
53       : poissonRandom(meanPoissonMs) {
54     if (meanPoissonMs < 0.0) {
55       throw std::invalid_argument(
56           "FunctionScheduler: "
57           "Poisson mean interval must be non-negative");
58     }
59   }
60
61   milliseconds operator()() { return milliseconds(poissonRandom(generator)); }
62 };
63
64 struct UniformDistributionFunctor {
65   std::default_random_engine generator;
66   std::uniform_int_distribution<> dist;
67
68   UniformDistributionFunctor(milliseconds minInterval, milliseconds maxInterval)
69       : generator(Random::rand32()),
70         dist(minInterval.count(), maxInterval.count()) {
71     if (minInterval > maxInterval) {
72       throw std::invalid_argument(
73           "FunctionScheduler: "
74           "min time interval must be less or equal than max interval");
75     }
76     if (minInterval < milliseconds::zero()) {
77       throw std::invalid_argument(
78           "FunctionScheduler: "
79           "time interval must be non-negative");
80     }
81   }
82
83   milliseconds operator()() { return milliseconds(dist(generator)); }
84 };
85
86 } // anonymous namespace
87
88 FunctionScheduler::FunctionScheduler() {}
89
90 FunctionScheduler::~FunctionScheduler() {
91   // make sure to stop the thread (if running)
92   shutdown();
93 }
94
95 void FunctionScheduler::addFunction(const std::function<void()>& cb,
96                                     milliseconds interval,
97                                     StringPiece nameID,
98                                     milliseconds startDelay) {
99   addFunctionGenericDistribution(
100       cb,
101       IntervalDistributionFunc(ConstIntervalFunctor(interval)),
102       nameID.str(),
103       to<std::string>(interval.count(), "ms"),
104       startDelay);
105 }
106
107 void FunctionScheduler::addFunction(const std::function<void()>& cb,
108                                     milliseconds interval,
109                                     const LatencyDistribution& latencyDistr,
110                                     StringPiece nameID,
111                                     milliseconds startDelay) {
112   if (latencyDistr.isPoisson) {
113     addFunctionGenericDistribution(
114         cb,
115         IntervalDistributionFunc(
116             PoissonDistributionFunctor(latencyDistr.poissonMean)),
117         nameID.str(),
118         to<std::string>(latencyDistr.poissonMean, "ms (Poisson mean)"),
119         startDelay);
120   } else {
121     addFunction(cb, interval, nameID, startDelay);
122   }
123 }
124
125 void FunctionScheduler::addFunctionUniformDistribution(
126     const std::function<void()>& cb,
127     milliseconds minInterval,
128     milliseconds maxInterval,
129     StringPiece nameID,
130     milliseconds startDelay) {
131   addFunctionGenericDistribution(
132       cb,
133       IntervalDistributionFunc(
134           UniformDistributionFunctor(minInterval, maxInterval)),
135       nameID.str(),
136       to<std::string>(
137           "[", minInterval.count(), " , ", maxInterval.count(), "] ms"),
138       startDelay);
139 }
140
141 void FunctionScheduler::addFunctionGenericDistribution(
142     const std::function<void()>& cb,
143     const IntervalDistributionFunc& intervalFunc,
144     const std::string& nameID,
145     const std::string& intervalDescr,
146     milliseconds startDelay) {
147   if (!cb) {
148     throw std::invalid_argument(
149         "FunctionScheduler: Scheduled function must be set");
150   }
151   if (!intervalFunc) {
152     throw std::invalid_argument(
153         "FunctionScheduler: interval distribution function must be set");
154   }
155   if (startDelay < milliseconds::zero()) {
156     throw std::invalid_argument(
157         "FunctionScheduler: start delay must be non-negative");
158   }
159
160   std::unique_lock<std::mutex> l(mutex_);
161   // check if the nameID is unique
162   for (const auto& f : functions_) {
163     if (f.isValid() && f.name == nameID) {
164       throw std::invalid_argument(
165           to<std::string>("FunctionScheduler: a function named \"",
166                           nameID,
167                           "\" already exists"));
168     }
169   }
170   if (currentFunction_ && currentFunction_->name == nameID) {
171     throw std::invalid_argument(to<std::string>(
172         "FunctionScheduler: a function named \"", nameID, "\" already exists"));
173   }
174
175   addFunctionToHeap(
176       l, RepeatFunc(cb, intervalFunc, nameID, intervalDescr, startDelay));
177 }
178
179 bool FunctionScheduler::cancelFunction(StringPiece nameID) {
180   std::unique_lock<std::mutex> l(mutex_);
181
182   if (currentFunction_ && currentFunction_->name == nameID) {
183     // This function is currently being run. Clear currentFunction_
184     // The running thread will see this and won't reschedule the function.
185     currentFunction_ = nullptr;
186     return true;
187   }
188
189   for (auto it = functions_.begin(); it != functions_.end(); ++it) {
190     if (it->isValid() && it->name == nameID) {
191       cancelFunction(l, it);
192       return true;
193     }
194   }
195   return false;
196 }
197
198 void FunctionScheduler::cancelFunction(const std::unique_lock<std::mutex>& l,
199                                        FunctionHeap::iterator it) {
200   // This function should only be called with mutex_ already locked.
201   DCHECK(l.mutex() == &mutex_);
202   DCHECK(l.owns_lock());
203
204   if (running_) {
205     // Internally gcc has an __adjust_heap() function to fill in a hole in the
206     // heap.  Unfortunately it isn't part of the standard API.
207     //
208     // For now we just leave the RepeatFunc in our heap, but mark it as unused.
209     // When its nextTimeInterval comes up, the runner thread will pop it from
210     // the heap and simply throw it away.
211     it->cancel();
212   } else {
213     // We're not running, so functions_ doesn't need to be maintained in heap
214     // order.
215     functions_.erase(it);
216   }
217 }
218
219 void FunctionScheduler::cancelAllFunctions() {
220   std::unique_lock<std::mutex> l(mutex_);
221   functions_.clear();
222 }
223
224 bool FunctionScheduler::resetFunctionTimer(StringPiece nameID) {
225   std::unique_lock<std::mutex> l(mutex_);
226   if (currentFunction_ && currentFunction_->name == nameID) {
227     RepeatFunc* funcPtrCopy = currentFunction_;
228     // This function is currently being run. Clear currentFunction_
229     // to avoid rescheduling it, and add the function again to honor the
230     // startDelay.
231     currentFunction_ = nullptr;
232     addFunctionToHeap(l, std::move(*funcPtrCopy));
233     return true;
234   }
235
236   // Since __adjust_heap() isn't a part of the standard API, there's no way to
237   // fix the heap ordering if we adjust the key (nextRunTime) for the existing
238   // RepeatFunc. Instead, we just cancel it and add an identical object.
239   for (auto it = functions_.begin(); it != functions_.end(); ++it) {
240     if (it->isValid() && it->name == nameID) {
241       RepeatFunc funcCopy(std::move(*it));
242       cancelFunction(l, it);
243       addFunctionToHeap(l, std::move(funcCopy));
244       return true;
245     }
246   }
247   return false;
248 }
249
250 bool FunctionScheduler::start() {
251   std::unique_lock<std::mutex> l(mutex_);
252   if (running_) {
253     return false;
254   }
255
256   running_ = true;
257
258   VLOG(1) << "Starting FunctionScheduler with " << functions_.size()
259           << " functions.";
260   auto now = steady_clock::now();
261   // Reset the next run time. for all functions.
262   // note: this is needed since one can shutdown() and start() again
263   for (auto& f : functions_) {
264     f.resetNextRunTime(now);
265     VLOG(1) << "   - func: " << (f.name.empty() ? "(anon)" : f.name.c_str())
266             << ", period = " << f.intervalDescr
267             << ", delay = " << f.startDelay.count() << "ms";
268   }
269   std::make_heap(functions_.begin(), functions_.end(), fnCmp_);
270
271   thread_ = std::thread([&] { this->run(); });
272   return true;
273 }
274
275 void FunctionScheduler::shutdown() {
276   {
277     std::lock_guard<std::mutex> g(mutex_);
278     if (!running_) {
279       return;
280     }
281
282     running_ = false;
283     runningCondvar_.notify_one();
284   }
285   thread_.join();
286 }
287
288 void FunctionScheduler::run() {
289   std::unique_lock<std::mutex> lock(mutex_);
290
291   if (!threadName_.empty()) {
292     folly::setThreadName(threadName_);
293   }
294
295   while (running_) {
296     // If we have nothing to run, wait until a function is added or until we
297     // are stopped.
298     if (functions_.empty()) {
299       runningCondvar_.wait(lock);
300       continue;
301     }
302
303     auto now = steady_clock::now();
304
305     // Move the next function to run to the end of functions_
306     std::pop_heap(functions_.begin(), functions_.end(), fnCmp_);
307
308     // Check to see if the function was cancelled.
309     // If so, just remove it and continue around the loop.
310     if (!functions_.back().isValid()) {
311       functions_.pop_back();
312       continue;
313     }
314
315     auto sleepTime = functions_.back().getNextRunTime() - now;
316     if (sleepTime < milliseconds::zero()) {
317       // We need to run this function now
318       runOneFunction(lock, now);
319     } else {
320       // Re-add the function to the heap, and wait until we actually
321       // need to run it.
322       std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
323       runningCondvar_.wait_for(lock, sleepTime);
324     }
325   }
326 }
327
328 void FunctionScheduler::runOneFunction(std::unique_lock<std::mutex>& lock,
329                                        steady_clock::time_point now) {
330   DCHECK(lock.mutex() == &mutex_);
331   DCHECK(lock.owns_lock());
332
333   // The function to run will be at the end of functions_ already.
334   //
335   // Fully remove it from functions_ now.
336   // We need to release mutex_ while we invoke this function, and we need to
337   // maintain the heap property on functions_ while mutex_ is unlocked.
338   RepeatFunc func(std::move(functions_.back()));
339   functions_.pop_back();
340   if (!func.cb) {
341     VLOG(5) << func.name << "function has been canceled while waiting";
342     return;
343   }
344   currentFunction_ = &func;
345
346   // Update the function's next run time.
347   if (steady_) {
348     // This allows scheduler to catch up
349     func.setNextRunTimeSteady();
350   } else {
351     // Note that we set nextRunTime based on the current time where we started
352     // the function call, rather than the time when the function finishes.
353     // This ensures that we call the function once every time interval, as
354     // opposed to waiting time interval seconds between calls.  (These can be
355     // different if the function takes a significant amount of time to run.)
356     func.setNextRunTimeStrict(now);
357   }
358
359   // Release the lock while we invoke the user's function
360   lock.unlock();
361
362   // Invoke the function
363   try {
364     VLOG(5) << "Now running " << func.name;
365     func.cb();
366   } catch (const std::exception& ex) {
367     LOG(ERROR) << "Error running the scheduled function <"
368       << func.name << ">: " << exceptionStr(ex);
369   }
370
371   // Re-acquire the lock
372   lock.lock();
373
374   if (!currentFunction_) {
375     // The function was cancelled while we were running it.
376     // We shouldn't reschedule it;
377     return;
378   }
379   // Clear currentFunction_
380   CHECK_EQ(currentFunction_, &func);
381   currentFunction_ = nullptr;
382
383   // Re-insert the function into our functions_ heap.
384   // We only maintain the heap property while running_ is set.  (running_ may
385   // have been cleared while we were invoking the user's function.)
386   functions_.push_back(std::move(func));
387   if (running_) {
388     std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
389   }
390 }
391
392 void FunctionScheduler::addFunctionToHeap(
393     const std::unique_lock<std::mutex>& lock,
394     RepeatFunc&& func) {
395   // This function should only be called with mutex_ already locked.
396   DCHECK(lock.mutex() == &mutex_);
397   DCHECK(lock.owns_lock());
398
399   functions_.emplace_back(std::move(func));
400   if (running_) {
401     functions_.back().resetNextRunTime(steady_clock::now());
402     std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
403     // Signal the running thread to wake up and see if it needs to change
404     // its current scheduling decision.
405     runningCondvar_.notify_one();
406   }
407 }
408
409 void FunctionScheduler::setThreadName(StringPiece threadName) {
410   std::unique_lock<std::mutex> l(mutex_);
411   threadName_ = threadName.str();
412 }
413
414 }