e9ea7543c1eb9f4ec3263239f9e1925c8da0b673
[folly.git] / folly / experimental / FunctionScheduler.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/experimental/FunctionScheduler.h>
18 #include <folly/ThreadName.h>
19 #include <folly/Conv.h>
20 #include <folly/String.h>
21
22 using namespace std;
23 using std::chrono::milliseconds;
24 using std::chrono::steady_clock;
25
26 namespace folly {
27
28 FunctionScheduler::FunctionScheduler() {
29 }
30
31 FunctionScheduler::~FunctionScheduler() {
32   // make sure to stop the thread (if running)
33   shutdown();
34 }
35
36 void FunctionScheduler::addFunction(const std::function<void()>& cb,
37                                     milliseconds interval,
38                                     StringPiece nameID,
39                                     milliseconds startDelay) {
40   LatencyDistribution latencyDistr(false, 0.0);
41   addFunction(cb, interval, latencyDistr, nameID, startDelay);
42 }
43
44 void FunctionScheduler::addFunction(const std::function<void()>& cb,
45                                     milliseconds interval,
46                                     const LatencyDistribution& latencyDistr,
47                                     StringPiece nameID,
48                                     milliseconds startDelay) {
49   if (interval < milliseconds::zero()) {
50     throw std::invalid_argument("FunctionScheduler: "
51                                 "time interval must be non-negative");
52   }
53   if (startDelay < milliseconds::zero()) {
54     throw std::invalid_argument("FunctionScheduler: "
55                                 "start delay must be non-negative");
56   }
57
58   std::lock_guard<std::mutex> l(mutex_);
59   // check if the nameID is unique
60   for (const auto& f : functions_) {
61     if (f.isValid() && f.name == nameID) {
62       throw std::invalid_argument(to<string>(
63             "FunctionScheduler: a function named \"", nameID,
64             "\" already exists"));
65     }
66   }
67   if (currentFunction_ && currentFunction_->name == nameID) {
68     throw std::invalid_argument(to<string>(
69           "FunctionScheduler: a function named \"", nameID,
70           "\" already exists"));
71   }
72
73   functions_.emplace_back(cb, interval, nameID.str(), startDelay,
74                           latencyDistr.isPoisson, latencyDistr.poissonMean);
75   if (running_) {
76     functions_.back().setNextRunTime(steady_clock::now() + startDelay);
77     std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
78     // Signal the running thread to wake up and see if it needs to change it's
79     // current scheduling decision.
80     runningCondvar_.notify_one();
81   }
82 }
83
84 bool FunctionScheduler::cancelFunction(StringPiece nameID) {
85   std::unique_lock<std::mutex> l(mutex_);
86
87   if (currentFunction_ && currentFunction_->name == nameID) {
88     // This function is currently being run.  Clear currentFunction_
89     // The running thread will see this and won't reschedule the function.
90     currentFunction_ = nullptr;
91     return true;
92   }
93
94   for (auto it = functions_.begin(); it != functions_.end(); ++it) {
95     if (it->isValid() && it->name == nameID) {
96       cancelFunction(l, it);
97       return true;
98     }
99   }
100   return false;
101 }
102
103 void FunctionScheduler::cancelFunction(const std::unique_lock<std::mutex>& l,
104                                        FunctionHeap::iterator it) {
105   // This function should only be called with mutex_ already locked.
106   DCHECK(l.mutex() == &mutex_);
107   DCHECK(l.owns_lock());
108
109   if (running_) {
110     // Internally gcc has an __adjust_heap() function to fill in a hole in the
111     // heap.  Unfortunately it isn't part of the standard API.
112     //
113     // For now we just leave the RepeatFunc in our heap, but mark it as unused.
114     // When it's nextTimeInterval comes up, the runner thread will pop it from
115     // the heap and simply throw it away.
116     it->cancel();
117   } else {
118     // We're not running, so functions_ doesn't need to be maintained in heap
119     // order.
120     functions_.erase(it);
121   }
122 }
123
124 void FunctionScheduler::cancelAllFunctions() {
125   std::unique_lock<std::mutex> l(mutex_);
126   functions_.clear();
127 }
128
129 bool FunctionScheduler::start() {
130   std::unique_lock<std::mutex> l(mutex_);
131   if (running_) {
132     return false;
133   }
134
135   running_ = true;
136
137   VLOG(1) << "Starting FunctionScheduler with " << functions_.size()
138           << " functions.";
139   auto now = steady_clock::now();
140   // Reset the next run time. for all functions.
141   // note: this is needed since one can shutdown() and start() again
142   for (auto& f : functions_) {
143     f.setNextRunTime(now + f.startDelay);
144     VLOG(1) << "   - func: "
145             << (f.name.empty() ? "(anon)" : f.name.c_str())
146             << ", period = " << f.timeInterval.count()
147             << "ms, delay = " << f.startDelay.count() << "ms";
148   }
149   std::make_heap(functions_.begin(), functions_.end(), fnCmp_);
150
151   thread_ = std::thread([&] { this->run(); });
152   return true;
153 }
154
155 void FunctionScheduler::shutdown() {
156   {
157     std::lock_guard<std::mutex> g(mutex_);
158     if (!running_) {
159       return;
160     }
161
162     running_ = false;
163     runningCondvar_.notify_one();
164   }
165   thread_.join();
166 }
167
168 void FunctionScheduler::run() {
169   std::unique_lock<std::mutex> lock(mutex_);
170
171   if (!threadName_.empty()) {
172     folly::setThreadName(threadName_);
173   }
174
175   while (running_) {
176     // If we have nothing to run, wait until a function is added or until we
177     // are stopped.
178     if (functions_.empty()) {
179       runningCondvar_.wait(lock);
180       continue;
181     }
182
183     auto now = steady_clock::now();
184
185     // Move the next function to run to the end of functions_
186     std::pop_heap(functions_.begin(), functions_.end(), fnCmp_);
187
188     // Check to see if the function was cancelled.
189     // If so, just remove it and continue around the loop.
190     if (!functions_.back().isValid()) {
191       functions_.pop_back();
192       continue;
193     }
194
195     auto sleepTime = functions_.back().getNextRunTime() - now;
196     if (sleepTime < milliseconds::zero()) {
197       // We need to run this function now
198       runOneFunction(lock, now);
199     } else {
200       // Re-add the function to the heap, and wait until we actually
201       // need to run it.
202       std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
203       runningCondvar_.wait_for(lock, sleepTime);
204     }
205   }
206 }
207
208 void FunctionScheduler::runOneFunction(std::unique_lock<std::mutex>& lock,
209                                        steady_clock::time_point now) {
210   DCHECK(lock.mutex() == &mutex_);
211   DCHECK(lock.owns_lock());
212
213   // The function to run will be at the end of functions_ already.
214   //
215   // Fully remove it from functions_ now.
216   // We need to release mutex_ while we invoke this function, and we need to
217   // maintain the heap property on functions_ while mutex_ is unlocked.
218   RepeatFunc func(std::move(functions_.back()));
219   functions_.pop_back();
220   currentFunction_ = &func;
221
222   // Update the function's run time, and re-insert it into the heap.
223   if (steady_) {
224     // This allows scheduler to catch up
225     func.lastRunTime += func.timeInterval;
226   } else {
227     // Note that we adjust lastRunTime to the current time where we started the
228     // function call, rather than the time when the function finishes.
229     // This ensures that we call the function once every time interval, as
230     // opposed to waiting time interval seconds between calls.  (These can be
231     // different if the function takes a significant amount of time to run.)
232     func.lastRunTime = now;
233   }
234
235   // Release the lock while we invoke the user's function
236   lock.unlock();
237
238   // Invoke the function
239   try {
240     VLOG(5) << "Now running " << func.name;
241     func.cb();
242   } catch (const std::exception& ex) {
243     LOG(ERROR) << "Error running the scheduled function <"
244       << func.name << ">: " << exceptionStr(ex);
245   }
246
247   // Re-acquire the lock
248   lock.lock();
249
250   if (!currentFunction_) {
251     // The function was cancelled while we were running it.
252     // We shouldn't reschedule it;
253     return;
254   }
255   // Clear currentFunction_
256   CHECK_EQ(currentFunction_, &func);
257   currentFunction_ = nullptr;
258
259   // Re-insert the function into our functions_ heap.
260   // We only maintain the heap property while running_ is set.  (running_ may
261   // have been cleared while we were invoking the user's function.)
262   if (func.isPoissonDistr) {
263     func.setTimeIntervalPoissonDistr();
264   }
265   functions_.push_back(std::move(func));
266   if (running_) {
267     std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
268   }
269 }
270
271 void FunctionScheduler::setThreadName(StringPiece threadName) {
272   std::unique_lock<std::mutex> l(mutex_);
273   threadName_ = threadName.str();
274 }
275
276 }