9a12256ffb84020716336bcdcdb2cf02cd3c4962
[folly.git] / folly / experimental / FunctionScheduler.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/experimental/FunctionScheduler.h>
18 #include <folly/ThreadName.h>
19 #include <folly/Conv.h>
20 #include <folly/String.h>
21
22 #ifdef _POSIX_MONOTONIC_CLOCK
23 #define FOLLY_TIME_MONOTONIC_CLOCK CLOCK_MONOTONIC
24 #else
25 #define FOLLY_TIME_MONOTONIC_CLOCK CLOCK_REALTIME
26 #endif
27
28 using namespace std;
29 using std::chrono::seconds;
30 using std::chrono::milliseconds;
31
32 static milliseconds nowInMS() {
33   struct timespec ts /*= void*/;
34   if (clock_gettime(FOLLY_TIME_MONOTONIC_CLOCK, &ts)) {
35     // Only possible failures are EFAULT or EINVAL, both practically
36     // impossible. But an assert can't hurt.
37     assert(false);
38   }
39   return milliseconds(
40     static_cast<int64_t>(ts.tv_sec * 1000.0 + ts.tv_nsec / 1000000.0 + 0.5));
41 }
42
43 namespace folly {
44
45 FunctionScheduler::FunctionScheduler() {
46 }
47
48 FunctionScheduler::~FunctionScheduler() {
49   // make sure to stop the thread (if running)
50   shutdown();
51 }
52
53 void FunctionScheduler::addFunction(const std::function<void()>& cb,
54                                     milliseconds interval,
55                                     StringPiece nameID,
56                                     milliseconds startDelay) {
57   LatencyDistribution latencyDistr(false, 0.0);
58   addFunctionInternal(cb, interval,
59                       latencyDistr, nameID, startDelay);
60 }
61
62 void FunctionScheduler::addFunctionInternal(const std::function<void()>& cb,
63                                     milliseconds interval,
64                                     const LatencyDistribution& latencyDistr,
65                                     StringPiece nameID,
66                                     milliseconds startDelay) {
67   if (interval < milliseconds::zero()) {
68     throw std::invalid_argument("FunctionScheduler: "
69                                 "time interval must be non-negative");
70   }
71   if (startDelay < milliseconds::zero()) {
72     throw std::invalid_argument("FunctionScheduler: "
73                                 "start delay must be non-negative");
74   }
75
76   std::lock_guard<std::mutex> l(mutex_);
77   // check if the nameID is unique
78   for (const auto& f : functions_) {
79     if (f.isValid() && f.name == nameID) {
80       throw std::invalid_argument(to<string>(
81             "FunctionScheduler: a function named \"", nameID,
82             "\" already exists"));
83     }
84   }
85   if (currentFunction_ && currentFunction_->name == nameID) {
86     throw std::invalid_argument(to<string>(
87           "FunctionScheduler: a function named \"", nameID,
88           "\" already exists"));
89   }
90
91   functions_.emplace_back(cb, interval, nameID.str(), startDelay,
92                           latencyDistr.isPoisson, latencyDistr.poissonMean);
93   if (running_) {
94     functions_.back().setNextRunTime(nowInMS() + startDelay);
95     std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
96     // Signal the running thread to wake up and see if it needs to change it's
97     // current scheduling decision.
98     runningCondvar_.notify_one();
99   }
100 }
101
102 bool FunctionScheduler::cancelFunction(StringPiece nameID) {
103   std::unique_lock<std::mutex> l(mutex_);
104
105   if (currentFunction_ && currentFunction_->name == nameID) {
106     // This function is currently being run.  Clear currentFunction_
107     // The running thread will see this and won't reschedule the function.
108     currentFunction_ = nullptr;
109     return true;
110   }
111
112   for (auto it = functions_.begin(); it != functions_.end(); ++it) {
113     if (it->isValid() && it->name == nameID) {
114       cancelFunction(l, it);
115       return true;
116     }
117   }
118   return false;
119 }
120
121 void FunctionScheduler::cancelFunction(const std::unique_lock<std::mutex>& l,
122                                        FunctionHeap::iterator it) {
123   // This function should only be called with mutex_ already locked.
124   DCHECK(l.mutex() == &mutex_);
125   DCHECK(l.owns_lock());
126
127   if (running_) {
128     // Internally gcc has an __adjust_heap() function to fill in a hole in the
129     // heap.  Unfortunately it isn't part of the standard API.
130     //
131     // For now we just leave the RepeatFunc in our heap, but mark it as unused.
132     // When it's nextTimeInterval comes up, the runner thread will pop it from
133     // the heap and simply throw it away.
134     it->cancel();
135   } else {
136     // We're not running, so functions_ doesn't need to be maintained in heap
137     // order.
138     functions_.erase(it);
139   }
140 }
141
142 void FunctionScheduler::cancelAllFunctions() {
143   std::unique_lock<std::mutex> l(mutex_);
144   functions_.clear();
145 }
146
147 bool FunctionScheduler::start() {
148   std::unique_lock<std::mutex> l(mutex_);
149   if (running_) {
150     return false;
151   }
152
153   running_ = true;
154
155   VLOG(1) << "Starting FunctionScheduler with " << functions_.size()
156           << " functions.";
157   milliseconds now(nowInMS());
158   // Reset the next run time. for all functions.
159   // note: this is needed since one can shutdown() and start() again
160   for (auto& f : functions_) {
161     f.setNextRunTime(now + f.startDelay);
162     VLOG(1) << "   - func: "
163             << (f.name.empty() ? "(anon)" : f.name.c_str())
164             << ", period = " << f.timeInterval.count()
165             << "ms, delay = " << f.startDelay.count() << "ms";
166   }
167   std::make_heap(functions_.begin(), functions_.end(), fnCmp_);
168
169   thread_ = std::thread([&] { this->run(); });
170   return true;
171 }
172
173 void FunctionScheduler::shutdown() {
174   {
175     std::lock_guard<std::mutex> g(mutex_);
176     if (!running_) {
177       return;
178     }
179
180     running_ = false;
181     runningCondvar_.notify_one();
182   }
183   thread_.join();
184 }
185
186 void FunctionScheduler::run() {
187   std::unique_lock<std::mutex> lock(mutex_);
188
189   if (!threadName_.empty()) {
190     folly::setThreadName(threadName_);
191   }
192
193   while (running_) {
194     // If we have nothing to run, wait until a function is added or until we
195     // are stopped.
196     if (functions_.empty()) {
197       runningCondvar_.wait(lock);
198       continue;
199     }
200
201     milliseconds now(nowInMS());
202
203     // Move the next function to run to the end of functions_
204     std::pop_heap(functions_.begin(), functions_.end(), fnCmp_);
205
206     // Check to see if the function was cancelled.
207     // If so, just remove it and continue around the loop.
208     if (!functions_.back().isValid()) {
209       functions_.pop_back();
210       continue;
211     }
212
213     auto sleepTime = functions_.back().getNextRunTime() - now;
214     if (sleepTime < milliseconds::zero()) {
215       // We need to run this function now
216       runOneFunction(lock, now);
217     } else {
218       // Re-add the function to the heap, and wait until we actually
219       // need to run it.
220       std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
221       runningCondvar_.wait_for(lock, sleepTime);
222     }
223   }
224 }
225
226 void FunctionScheduler::runOneFunction(std::unique_lock<std::mutex>& lock,
227                                        std::chrono::milliseconds now) {
228   DCHECK(lock.mutex() == &mutex_);
229   DCHECK(lock.owns_lock());
230
231   // The function to run will be at the end of functions_ already.
232   //
233   // Fully remove it from functions_ now.
234   // We need to release mutex_ while we invoke this function, and we need to
235   // maintain the heap property on functions_ while mutex_ is unlocked.
236   RepeatFunc func(std::move(functions_.back()));
237   functions_.pop_back();
238   currentFunction_ = &func;
239
240   // Update the function's run time, and re-insert it into the heap.
241   if (steady_) {
242     // This allows scheduler to catch up
243     func.lastRunTime += func.timeInterval;
244   } else {
245     // Note that we adjust lastRunTime to the current time where we started the
246     // function call, rather than the time when the function finishes.
247     // This ensures that we call the function once every time interval, as
248     // opposed to waiting time interval seconds between calls.  (These can be
249     // different if the function takes a significant amount of time to run.)
250     func.lastRunTime = now;
251   }
252
253   // Release the lock while we invoke the user's function
254   lock.unlock();
255
256   // Invoke the function
257   try {
258     VLOG(5) << "Now running " << func.name;
259     func.cb();
260   } catch (const std::exception& ex) {
261     LOG(ERROR) << "Error running the scheduled function <"
262       << func.name << ">: " << exceptionStr(ex);
263   }
264
265   // Re-acquire the lock
266   lock.lock();
267
268   if (!currentFunction_) {
269     // The function was cancelled while we were running it.
270     // We shouldn't reschedule it;
271     return;
272   }
273   // Clear currentFunction_
274   CHECK_EQ(currentFunction_, &func);
275   currentFunction_ = nullptr;
276
277   // Re-insert the function into our functions_ heap.
278   // We only maintain the heap property while running_ is set.  (running_ may
279   // have been cleared while we were invoking the user's function.)
280   if (func.isPoissonDistr) {
281     func.setTimeIntervalPoissonDistr();
282   }
283   functions_.push_back(std::move(func));
284   if (running_) {
285     std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
286   }
287 }
288
289 void FunctionScheduler::setThreadName(StringPiece threadName) {
290   std::unique_lock<std::mutex> l(mutex_);
291   threadName_ = threadName.str();
292 }
293
294 }