folly::FunctionScheduler: replace std::function w/ folly::Function
[folly.git] / folly / experimental / FunctionScheduler.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/experimental/FunctionScheduler.h>
18
19 #include <random>
20
21 #include <folly/Conv.h>
22 #include <folly/Random.h>
23 #include <folly/String.h>
24 #include <folly/ThreadName.h>
25
26 using std::chrono::milliseconds;
27 using std::chrono::steady_clock;
28
29 namespace folly {
30
31 namespace {
32
33 struct ConstIntervalFunctor {
34   const milliseconds constInterval;
35
36   explicit ConstIntervalFunctor(milliseconds interval)
37       : constInterval(interval) {
38     if (interval < milliseconds::zero()) {
39       throw std::invalid_argument(
40           "FunctionScheduler: "
41           "time interval must be non-negative");
42     }
43   }
44
45   milliseconds operator()() const { return constInterval; }
46 };
47
48 struct PoissonDistributionFunctor {
49   std::default_random_engine generator;
50   std::poisson_distribution<int> poissonRandom;
51
52   explicit PoissonDistributionFunctor(double meanPoissonMs)
53       : poissonRandom(meanPoissonMs) {
54     if (meanPoissonMs < 0.0) {
55       throw std::invalid_argument(
56           "FunctionScheduler: "
57           "Poisson mean interval must be non-negative");
58     }
59   }
60
61   milliseconds operator()() { return milliseconds(poissonRandom(generator)); }
62 };
63
64 struct UniformDistributionFunctor {
65   std::default_random_engine generator;
66   std::uniform_int_distribution<> dist;
67
68   UniformDistributionFunctor(milliseconds minInterval, milliseconds maxInterval)
69       : generator(Random::rand32()),
70         dist(minInterval.count(), maxInterval.count()) {
71     if (minInterval > maxInterval) {
72       throw std::invalid_argument(
73           "FunctionScheduler: "
74           "min time interval must be less or equal than max interval");
75     }
76     if (minInterval < milliseconds::zero()) {
77       throw std::invalid_argument(
78           "FunctionScheduler: "
79           "time interval must be non-negative");
80     }
81   }
82
83   milliseconds operator()() { return milliseconds(dist(generator)); }
84 };
85
86 } // anonymous namespace
87
88 FunctionScheduler::FunctionScheduler() {}
89
90 FunctionScheduler::~FunctionScheduler() {
91   // make sure to stop the thread (if running)
92   shutdown();
93 }
94
95 void FunctionScheduler::addFunction(Function<void()>&& cb,
96                                     milliseconds interval,
97                                     StringPiece nameID,
98                                     milliseconds startDelay) {
99   addFunctionGenericDistribution(
100       std::move(cb),
101       ConstIntervalFunctor(interval),
102       nameID.str(),
103       to<std::string>(interval.count(), "ms"),
104       startDelay);
105 }
106
107 void FunctionScheduler::addFunction(Function<void()>&& cb,
108                                     milliseconds interval,
109                                     const LatencyDistribution& latencyDistr,
110                                     StringPiece nameID,
111                                     milliseconds startDelay) {
112   if (latencyDistr.isPoisson) {
113     addFunctionGenericDistribution(
114         std::move(cb),
115         PoissonDistributionFunctor(latencyDistr.poissonMean),
116         nameID.str(),
117         to<std::string>(latencyDistr.poissonMean, "ms (Poisson mean)"),
118         startDelay);
119   } else {
120     addFunction(std::move(cb), interval, nameID, startDelay);
121   }
122 }
123
124 void FunctionScheduler::addFunctionUniformDistribution(
125     Function<void()>&& cb,
126     milliseconds minInterval,
127     milliseconds maxInterval,
128     StringPiece nameID,
129     milliseconds startDelay) {
130   addFunctionGenericDistribution(
131       std::move(cb),
132       UniformDistributionFunctor(minInterval, maxInterval),
133       nameID.str(),
134       to<std::string>(
135           "[", minInterval.count(), " , ", maxInterval.count(), "] ms"),
136       startDelay);
137 }
138
139 void FunctionScheduler::addFunctionGenericDistribution(
140     Function<void()>&& cb,
141     IntervalDistributionFunc&& intervalFunc,
142     const std::string& nameID,
143     const std::string& intervalDescr,
144     milliseconds startDelay) {
145   if (!cb) {
146     throw std::invalid_argument(
147         "FunctionScheduler: Scheduled function must be set");
148   }
149   if (!intervalFunc) {
150     throw std::invalid_argument(
151         "FunctionScheduler: interval distribution function must be set");
152   }
153   if (startDelay < milliseconds::zero()) {
154     throw std::invalid_argument(
155         "FunctionScheduler: start delay must be non-negative");
156   }
157
158   std::unique_lock<std::mutex> l(mutex_);
159   // check if the nameID is unique
160   for (const auto& f : functions_) {
161     if (f.isValid() && f.name == nameID) {
162       throw std::invalid_argument(
163           to<std::string>("FunctionScheduler: a function named \"",
164                           nameID,
165                           "\" already exists"));
166     }
167   }
168   if (currentFunction_ && currentFunction_->name == nameID) {
169     throw std::invalid_argument(to<std::string>(
170         "FunctionScheduler: a function named \"", nameID, "\" already exists"));
171   }
172
173   addFunctionToHeap(
174       l,
175       RepeatFunc(
176           std::move(cb),
177           std::move(intervalFunc),
178           nameID,
179           intervalDescr,
180           startDelay));
181 }
182
183 bool FunctionScheduler::cancelFunction(StringPiece nameID) {
184   std::unique_lock<std::mutex> l(mutex_);
185
186   if (currentFunction_ && currentFunction_->name == nameID) {
187     // This function is currently being run. Clear currentFunction_
188     // The running thread will see this and won't reschedule the function.
189     currentFunction_ = nullptr;
190     return true;
191   }
192
193   for (auto it = functions_.begin(); it != functions_.end(); ++it) {
194     if (it->isValid() && it->name == nameID) {
195       cancelFunction(l, it);
196       return true;
197     }
198   }
199   return false;
200 }
201
202 void FunctionScheduler::cancelFunction(const std::unique_lock<std::mutex>& l,
203                                        FunctionHeap::iterator it) {
204   // This function should only be called with mutex_ already locked.
205   DCHECK(l.mutex() == &mutex_);
206   DCHECK(l.owns_lock());
207
208   if (running_) {
209     // Internally gcc has an __adjust_heap() function to fill in a hole in the
210     // heap.  Unfortunately it isn't part of the standard API.
211     //
212     // For now we just leave the RepeatFunc in our heap, but mark it as unused.
213     // When its nextTimeInterval comes up, the runner thread will pop it from
214     // the heap and simply throw it away.
215     it->cancel();
216   } else {
217     // We're not running, so functions_ doesn't need to be maintained in heap
218     // order.
219     functions_.erase(it);
220   }
221 }
222
223 void FunctionScheduler::cancelAllFunctions() {
224   std::unique_lock<std::mutex> l(mutex_);
225   functions_.clear();
226 }
227
228 bool FunctionScheduler::resetFunctionTimer(StringPiece nameID) {
229   std::unique_lock<std::mutex> l(mutex_);
230   if (currentFunction_ && currentFunction_->name == nameID) {
231     RepeatFunc* funcPtrCopy = currentFunction_;
232     // This function is currently being run. Clear currentFunction_
233     // to avoid rescheduling it, and add the function again to honor the
234     // startDelay.
235     currentFunction_ = nullptr;
236     addFunctionToHeap(l, std::move(*funcPtrCopy));
237     return true;
238   }
239
240   // Since __adjust_heap() isn't a part of the standard API, there's no way to
241   // fix the heap ordering if we adjust the key (nextRunTime) for the existing
242   // RepeatFunc. Instead, we just cancel it and add an identical object.
243   for (auto it = functions_.begin(); it != functions_.end(); ++it) {
244     if (it->isValid() && it->name == nameID) {
245       RepeatFunc funcCopy(std::move(*it));
246       cancelFunction(l, it);
247       addFunctionToHeap(l, std::move(funcCopy));
248       return true;
249     }
250   }
251   return false;
252 }
253
254 bool FunctionScheduler::start() {
255   std::unique_lock<std::mutex> l(mutex_);
256   if (running_) {
257     return false;
258   }
259
260   running_ = true;
261
262   VLOG(1) << "Starting FunctionScheduler with " << functions_.size()
263           << " functions.";
264   auto now = steady_clock::now();
265   // Reset the next run time. for all functions.
266   // note: this is needed since one can shutdown() and start() again
267   for (auto& f : functions_) {
268     f.resetNextRunTime(now);
269     VLOG(1) << "   - func: " << (f.name.empty() ? "(anon)" : f.name.c_str())
270             << ", period = " << f.intervalDescr
271             << ", delay = " << f.startDelay.count() << "ms";
272   }
273   std::make_heap(functions_.begin(), functions_.end(), fnCmp_);
274
275   thread_ = std::thread([&] { this->run(); });
276   return true;
277 }
278
279 void FunctionScheduler::shutdown() {
280   {
281     std::lock_guard<std::mutex> g(mutex_);
282     if (!running_) {
283       return;
284     }
285
286     running_ = false;
287     runningCondvar_.notify_one();
288   }
289   thread_.join();
290 }
291
292 void FunctionScheduler::run() {
293   std::unique_lock<std::mutex> lock(mutex_);
294
295   if (!threadName_.empty()) {
296     folly::setThreadName(threadName_);
297   }
298
299   while (running_) {
300     // If we have nothing to run, wait until a function is added or until we
301     // are stopped.
302     if (functions_.empty()) {
303       runningCondvar_.wait(lock);
304       continue;
305     }
306
307     auto now = steady_clock::now();
308
309     // Move the next function to run to the end of functions_
310     std::pop_heap(functions_.begin(), functions_.end(), fnCmp_);
311
312     // Check to see if the function was cancelled.
313     // If so, just remove it and continue around the loop.
314     if (!functions_.back().isValid()) {
315       functions_.pop_back();
316       continue;
317     }
318
319     auto sleepTime = functions_.back().getNextRunTime() - now;
320     if (sleepTime < milliseconds::zero()) {
321       // We need to run this function now
322       runOneFunction(lock, now);
323     } else {
324       // Re-add the function to the heap, and wait until we actually
325       // need to run it.
326       std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
327       runningCondvar_.wait_for(lock, sleepTime);
328     }
329   }
330 }
331
332 void FunctionScheduler::runOneFunction(std::unique_lock<std::mutex>& lock,
333                                        steady_clock::time_point now) {
334   DCHECK(lock.mutex() == &mutex_);
335   DCHECK(lock.owns_lock());
336
337   // The function to run will be at the end of functions_ already.
338   //
339   // Fully remove it from functions_ now.
340   // We need to release mutex_ while we invoke this function, and we need to
341   // maintain the heap property on functions_ while mutex_ is unlocked.
342   RepeatFunc func(std::move(functions_.back()));
343   functions_.pop_back();
344   if (!func.cb) {
345     VLOG(5) << func.name << "function has been canceled while waiting";
346     return;
347   }
348   currentFunction_ = &func;
349
350   // Update the function's next run time.
351   if (steady_) {
352     // This allows scheduler to catch up
353     func.setNextRunTimeSteady();
354   } else {
355     // Note that we set nextRunTime based on the current time where we started
356     // the function call, rather than the time when the function finishes.
357     // This ensures that we call the function once every time interval, as
358     // opposed to waiting time interval seconds between calls.  (These can be
359     // different if the function takes a significant amount of time to run.)
360     func.setNextRunTimeStrict(now);
361   }
362
363   // Release the lock while we invoke the user's function
364   lock.unlock();
365
366   // Invoke the function
367   try {
368     VLOG(5) << "Now running " << func.name;
369     func.cb();
370   } catch (const std::exception& ex) {
371     LOG(ERROR) << "Error running the scheduled function <"
372       << func.name << ">: " << exceptionStr(ex);
373   }
374
375   // Re-acquire the lock
376   lock.lock();
377
378   if (!currentFunction_) {
379     // The function was cancelled while we were running it.
380     // We shouldn't reschedule it;
381     return;
382   }
383   // Clear currentFunction_
384   CHECK_EQ(currentFunction_, &func);
385   currentFunction_ = nullptr;
386
387   // Re-insert the function into our functions_ heap.
388   // We only maintain the heap property while running_ is set.  (running_ may
389   // have been cleared while we were invoking the user's function.)
390   functions_.push_back(std::move(func));
391   if (running_) {
392     std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
393   }
394 }
395
396 void FunctionScheduler::addFunctionToHeap(
397     const std::unique_lock<std::mutex>& lock,
398     RepeatFunc&& func) {
399   // This function should only be called with mutex_ already locked.
400   DCHECK(lock.mutex() == &mutex_);
401   DCHECK(lock.owns_lock());
402
403   functions_.emplace_back(std::move(func));
404   if (running_) {
405     functions_.back().resetNextRunTime(steady_clock::now());
406     std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
407     // Signal the running thread to wake up and see if it needs to change
408     // its current scheduling decision.
409     runningCondvar_.notify_one();
410   }
411 }
412
413 void FunctionScheduler::setThreadName(StringPiece threadName) {
414   std::unique_lock<std::mutex> l(mutex_);
415   threadName_ = threadName.str();
416 }
417
418 }