Removing call to google::setThreadName from FunctionScheduler
[folly.git] / folly / experimental / FunctionScheduler.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/experimental/FunctionScheduler.h>
18 #include <folly/ThreadName.h>
19 #include <folly/Conv.h>
20 #include <folly/String.h>
21
22 #ifdef _POSIX_MONOTONIC_CLOCK
23 #define FOLLY_TIME_MONOTONIC_CLOCK CLOCK_MONOTONIC
24 #else
25 #define FOLLY_TIME_MONOTONIC_CLOCK CLOCK_REALTIME
26 #endif
27
28 using namespace std;
29 using std::chrono::seconds;
30 using std::chrono::milliseconds;
31
32 static milliseconds nowInMS() {
33   struct timespec ts /*= void*/;
34   if (clock_gettime(FOLLY_TIME_MONOTONIC_CLOCK, &ts)) {
35     // Only possible failures are EFAULT or EINVAL, both practically
36     // impossible. But an assert can't hurt.
37     assert(false);
38   }
39   return milliseconds(
40     static_cast<int64_t>(ts.tv_sec * 1000.0 + ts.tv_nsec / 1000000.0 + 0.5));
41 }
42
43 namespace folly {
44
45 FunctionScheduler::FunctionScheduler() {
46 }
47
48 FunctionScheduler::~FunctionScheduler() {
49   // make sure to stop the thread (if running)
50   shutdown();
51 }
52
53 void FunctionScheduler::addFunction(const std::function<void()>& cb,
54                                     milliseconds interval,
55                                     StringPiece nameID,
56                                     milliseconds startDelay) {
57   LatencyDistribution latencyDistr(false, 0.0);
58   addFunctionInternal(cb, interval,
59                       latencyDistr, nameID, startDelay);
60 }
61
62 void FunctionScheduler::addFunctionInternal(const std::function<void()>& cb,
63                                     milliseconds interval,
64                                     const LatencyDistribution& latencyDistr,
65                                     StringPiece nameID,
66                                     milliseconds startDelay) {
67   if (interval < milliseconds::zero()) {
68     throw std::invalid_argument("FunctionScheduler: "
69                                 "time interval must be non-negative");
70   }
71   if (startDelay < milliseconds::zero()) {
72     throw std::invalid_argument("FunctionScheduler: "
73                                 "start delay must be non-negative");
74   }
75
76   std::lock_guard<std::mutex> l(mutex_);
77   // check if the nameID is unique
78   for (const auto& f : functions_) {
79     if (f.isValid() && f.name == nameID) {
80       throw std::invalid_argument(to<string>(
81             "FunctionScheduler: a function named \"", nameID,
82             "\" already exists"));
83     }
84   }
85   if (currentFunction_ && currentFunction_->name == nameID) {
86     throw std::invalid_argument(to<string>(
87           "FunctionScheduler: a function named \"", nameID,
88           "\" already exists"));
89   }
90
91   functions_.emplace_back(cb, interval, nameID.str(), startDelay,
92                           latencyDistr.isPoisson, latencyDistr.poissonMean);
93   if (running_) {
94     functions_.back().setNextRunTime(nowInMS() + startDelay);
95     std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
96     // Signal the running thread to wake up and see if it needs to change it's
97     // current scheduling decision.
98     runningCondvar_.notify_one();
99   }
100 }
101
102 bool FunctionScheduler::cancelFunction(StringPiece nameID) {
103   bool retValue = false;
104   std::unique_lock<std::mutex> l(mutex_);
105
106   if (currentFunction_ && currentFunction_->name == nameID) {
107     // This function is currently being run.  Clear currentFunction_
108     // The running thread will see this and won't reschedule the function.
109     currentFunction_ = nullptr;
110     return true;
111   }
112
113   for (auto it = functions_.begin(); it != functions_.end(); ++it) {
114     if (it->isValid() && it->name == nameID) {
115       cancelFunction(l, it);
116       return true;
117     }
118   }
119   return false;
120 }
121
122 void FunctionScheduler::cancelFunction(const std::unique_lock<std::mutex>& l,
123                                        FunctionHeap::iterator it) {
124   // This function should only be called with mutex_ already locked.
125   DCHECK(l.mutex() == &mutex_);
126   DCHECK(l.owns_lock());
127
128   if (running_) {
129     // Internally gcc has an __adjust_heap() function to fill in a hole in the
130     // heap.  Unfortunately it isn't part of the standard API.
131     //
132     // For now we just leave the RepeatFunc in our heap, but mark it as unused.
133     // When it's nextTimeInterval comes up, the runner thread will pop it from
134     // the heap and simply throw it away.
135     it->cancel();
136   } else {
137     // We're not running, so functions_ doesn't need to be maintained in heap
138     // order.
139     functions_.erase(it);
140   }
141 }
142
143 void FunctionScheduler::cancelAllFunctions() {
144   std::unique_lock<std::mutex> l(mutex_);
145   functions_.clear();
146 }
147
148 bool FunctionScheduler::start() {
149   std::unique_lock<std::mutex> l(mutex_);
150   if (running_) {
151     return false;
152   }
153
154   running_ = true;
155
156   VLOG(1) << "Starting FunctionScheduler with " << functions_.size()
157           << " functions.";
158   milliseconds now(nowInMS());
159   // Reset the next run time. for all functions.
160   // note: this is needed since one can shutdown() and start() again
161   for (auto& f : functions_) {
162     f.setNextRunTime(now + f.startDelay);
163     VLOG(1) << "   - func: "
164             << (f.name.empty() ? "(anon)" : f.name.c_str())
165             << ", period = " << f.timeInterval.count()
166             << "ms, delay = " << f.startDelay.count() << "ms";
167   }
168   std::make_heap(functions_.begin(), functions_.end(), fnCmp_);
169
170   thread_ = std::thread([&] { this->run(); });
171   return true;
172 }
173
174 void FunctionScheduler::shutdown() {
175   {
176     std::lock_guard<std::mutex> g(mutex_);
177     if (!running_) {
178       return;
179     }
180
181     running_ = false;
182     runningCondvar_.notify_one();
183   }
184   thread_.join();
185 }
186
187 void FunctionScheduler::run() {
188   std::unique_lock<std::mutex> lock(mutex_);
189
190   if (!threadName_.empty()) {
191     folly::setThreadName(threadName_);
192   }
193
194   while (running_) {
195     // If we have nothing to run, wait until a function is added or until we
196     // are stopped.
197     if (functions_.empty()) {
198       runningCondvar_.wait(lock);
199       continue;
200     }
201
202     milliseconds now(nowInMS());
203
204     // Move the next function to run to the end of functions_
205     std::pop_heap(functions_.begin(), functions_.end(), fnCmp_);
206
207     // Check to see if the function was cancelled.
208     // If so, just remove it and continue around the loop.
209     if (!functions_.back().isValid()) {
210       functions_.pop_back();
211       continue;
212     }
213
214     auto sleepTime = functions_.back().getNextRunTime() - now;
215     if (sleepTime < milliseconds::zero()) {
216       // We need to run this function now
217       runOneFunction(lock, now);
218     } else {
219       // Re-add the function to the heap, and wait until we actually
220       // need to run it.
221       std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
222       runningCondvar_.wait_for(lock, sleepTime);
223     }
224   }
225 }
226
227 void FunctionScheduler::runOneFunction(std::unique_lock<std::mutex>& lock,
228                                        std::chrono::milliseconds now) {
229   DCHECK(lock.mutex() == &mutex_);
230   DCHECK(lock.owns_lock());
231
232   // The function to run will be at the end of functions_ already.
233   //
234   // Fully remove it from functions_ now.
235   // We need to release mutex_ while we invoke this function, and we need to
236   // maintain the heap property on functions_ while mutex_ is unlocked.
237   RepeatFunc func(std::move(functions_.back()));
238   functions_.pop_back();
239   currentFunction_ = &func;
240
241   // Update the function's run time, and re-insert it into the heap.
242   if (steady_) {
243     // This allows scheduler to catch up
244     func.lastRunTime += func.timeInterval;
245   } else {
246     // Note that we adjust lastRunTime to the current time where we started the
247     // function call, rather than the time when the function finishes.
248     // This ensures that we call the function once every time interval, as
249     // opposed to waiting time interval seconds between calls.  (These can be
250     // different if the function takes a significant amount of time to run.)
251     func.lastRunTime = now;
252   }
253
254   // Release the lock while we invoke the user's function
255   lock.unlock();
256
257   // Invoke the function
258   try {
259     VLOG(5) << "Now running " << func.name;
260     func.cb();
261   } catch (const std::exception& ex) {
262     LOG(ERROR) << "Error running the scheduled function <"
263       << func.name << ">: " << exceptionStr(ex);
264   }
265
266   // Re-acquire the lock
267   lock.lock();
268
269   if (!currentFunction_) {
270     // The function was cancelled while we were running it.
271     // We shouldn't reschedule it;
272     return;
273   }
274   // Clear currentFunction_
275   CHECK_EQ(currentFunction_, &func);
276   currentFunction_ = nullptr;
277
278   // Re-insert the function into our functions_ heap.
279   // We only maintain the heap property while running_ is set.  (running_ may
280   // have been cleared while we were invoking the user's function.)
281   if (func.isPoissonDistr) {
282     func.setTimeIntervalPoissonDistr();
283   }
284   functions_.push_back(std::move(func));
285   if (running_) {
286     std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
287   }
288 }
289
290 void FunctionScheduler::setThreadName(StringPiece threadName) {
291   std::unique_lock<std::mutex> l(mutex_);
292   threadName_ = threadName.str();
293 }
294
295 }