Make HHWheelTimer take a TimeoutManager rather than EventBase
[folly.git] / folly / io / async / HHWheelTimer.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed to the Apache Software Foundation (ASF) under one
5  * or more contributor license agreements. See the NOTICE file
6  * distributed with this work for additional information
7  * regarding copyright ownership. The ASF licenses this file
8  * to you under the Apache License, Version 2.0 (the
9  * "License"); you may not use this file except in compliance
10  * with the License. You may obtain a copy of the License at
11  *
12  *   http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing,
15  * software distributed under the License is distributed on an
16  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17  * KIND, either express or implied. See the License for the
18  * specific language governing permissions and limitations
19  * under the License.
20  */
21 #include <folly/io/async/HHWheelTimer.h>
22 #include <folly/io/async/Request.h>
23
24 #include <folly/Optional.h>
25 #include <folly/ScopeGuard.h>
26
27 #include <cassert>
28
29 using std::chrono::milliseconds;
30
31 namespace folly {
32
33 /**
34  * We want to select the default interval carefully.
35  * An interval of 10ms will give us 10ms * WHEEL_SIZE^WHEEL_BUCKETS
36  * for the largest timeout possible, or about 497 days.
37  *
38  * For a lower bound, we want a reasonable limit on local IO, 10ms
39  * seems short enough
40  *
41  * A shorter interval also has CPU implications, less than 1ms might
42  * start showing up in cpu perf.  Also, it might not be possible to set
43  * tick interval less than 10ms on older kernels.
44  */
45 int HHWheelTimer::DEFAULT_TICK_INTERVAL = 10;
46
47 HHWheelTimer::Callback::~Callback() {
48   if (isScheduled()) {
49     cancelTimeout();
50   }
51 }
52
53 void HHWheelTimer::Callback::setScheduled(HHWheelTimer* wheel,
54                                           std::chrono::milliseconds timeout) {
55   assert(wheel_ == nullptr);
56   assert(expiration_ == milliseconds(0));
57
58   wheelGuard_ = DestructorGuard(wheel);
59   wheel_ = wheel;
60
61   // Only update the now_ time if we're not in a timeout expired callback
62   if (wheel_->count_  == 0 && !wheel_->processingCallbacksGuard_) {
63     wheel_->now_ = getCurTime();
64   }
65
66   expiration_ = wheel_->now_ + timeout;
67 }
68
69 void HHWheelTimer::Callback::cancelTimeoutImpl() {
70   if (--wheel_->count_ <= 0) {
71     assert(wheel_->count_ == 0);
72     wheel_->AsyncTimeout::cancelTimeout();
73   }
74   hook_.unlink();
75
76   wheel_ = nullptr;
77   wheelGuard_ = folly::none;
78   expiration_ = milliseconds(0);
79 }
80
81 HHWheelTimer::HHWheelTimer(folly::TimeoutManager* timeoutMananger,
82                            std::chrono::milliseconds intervalMS,
83                            AsyncTimeout::InternalEnum internal,
84                            std::chrono::milliseconds defaultTimeoutMS)
85     : AsyncTimeout(timeoutMananger, internal),
86       interval_(intervalMS),
87       defaultTimeout_(defaultTimeoutMS),
88       nextTick_(1),
89       count_(0),
90       catchupEveryN_(DEFAULT_CATCHUP_EVERY_N),
91       expirationsSinceCatchup_(0),
92       processingCallbacksGuard_(false) {}
93
94 HHWheelTimer::~HHWheelTimer() {
95   CHECK(count_ == 0);
96 }
97
98 void HHWheelTimer::destroy() {
99   assert(count_ == 0);
100   cancelAll();
101   DelayedDestruction::destroy();
102 }
103
104 void HHWheelTimer::scheduleTimeoutImpl(Callback* callback,
105                                        std::chrono::milliseconds timeout) {
106   int64_t due = timeToWheelTicks(timeout) + nextTick_;
107   int64_t diff = due - nextTick_;
108   CallbackList* list;
109
110   if (diff < 0) {
111     list = &buckets_[0][nextTick_ & WHEEL_MASK];
112   } else if (diff < WHEEL_SIZE) {
113     list = &buckets_[0][due & WHEEL_MASK];
114   } else if (diff < 1 << (2 * WHEEL_BITS)) {
115     list = &buckets_[1][(due >> WHEEL_BITS) & WHEEL_MASK];
116   } else if (diff < 1 << (3 * WHEEL_BITS)) {
117     list = &buckets_[2][(due >> 2 * WHEEL_BITS) & WHEEL_MASK];
118   } else {
119     /* in largest slot */
120     if (diff > LARGEST_SLOT) {
121       diff = LARGEST_SLOT;
122       due = diff + nextTick_;
123     }
124     list = &buckets_[3][(due >> 3 * WHEEL_BITS) & WHEEL_MASK];
125   }
126   list->push_back(*callback);
127 }
128
129 void HHWheelTimer::scheduleTimeout(Callback* callback,
130                                    std::chrono::milliseconds timeout) {
131   // Cancel the callback if it happens to be scheduled already.
132   callback->cancelTimeout();
133
134   callback->context_ = RequestContext::saveContext();
135
136   if (count_ == 0 && !processingCallbacksGuard_) {
137     this->AsyncTimeout::scheduleTimeout(interval_.count());
138   }
139
140   callback->setScheduled(this, timeout);
141   scheduleTimeoutImpl(callback, timeout);
142   count_++;
143 }
144
145 void HHWheelTimer::scheduleTimeout(Callback* callback) {
146   CHECK(std::chrono::milliseconds(-1) != defaultTimeout_)
147       << "Default timeout was not initialized";
148   scheduleTimeout(callback, defaultTimeout_);
149 }
150
151 bool HHWheelTimer::cascadeTimers(int bucket, int tick) {
152   CallbackList cbs;
153   cbs.swap(buckets_[bucket][tick]);
154   while (!cbs.empty()) {
155     auto* cb = &cbs.front();
156     cbs.pop_front();
157     scheduleTimeoutImpl(cb, cb->getTimeRemaining(now_));
158   }
159
160   // If tick is zero, timeoutExpired will cascade the next bucket.
161   return tick == 0;
162 }
163
164 void HHWheelTimer::timeoutExpired() noexcept {
165   // If destroy() is called inside timeoutExpired(), delay actual destruction
166   // until timeoutExpired() returns
167   DestructorGuard dg(this);
168   // If scheduleTimeout is called from a callback in this function, it may
169   // cause inconsistencies in the state of this object. As such, we need
170   // to treat these calls slightly differently.
171   processingCallbacksGuard_ = true;
172   auto reEntryGuard = folly::makeGuard([&] {
173     processingCallbacksGuard_ = false;
174   });
175
176   // timeoutExpired() can only be invoked directly from the event base loop.
177   // It should never be invoked recursively.
178   //
179   milliseconds catchup = now_ + interval_;
180   // If catchup is enabled, we may have missed multiple intervals, use
181   // currentTime() to check exactly.
182   if (++expirationsSinceCatchup_ >= catchupEveryN_) {
183     catchup = std::chrono::duration_cast<milliseconds>(
184       std::chrono::steady_clock::now().time_since_epoch());
185     expirationsSinceCatchup_ = 0;
186   }
187   while (now_ < catchup) {
188     now_ += interval_;
189
190     int idx = nextTick_ & WHEEL_MASK;
191     if (0 == idx) {
192       // Cascade timers
193       if (cascadeTimers(1, (nextTick_ >> WHEEL_BITS) & WHEEL_MASK) &&
194           cascadeTimers(2, (nextTick_ >> (2 * WHEEL_BITS)) & WHEEL_MASK)) {
195         cascadeTimers(3, (nextTick_ >> (3 * WHEEL_BITS)) & WHEEL_MASK);
196       }
197     }
198
199     nextTick_++;
200     CallbackList* cbs = &buckets_[0][idx];
201     while (!cbs->empty()) {
202       auto* cb = &cbs->front();
203       cbs->pop_front();
204       count_--;
205       cb->wheel_ = nullptr;
206       cb->expiration_ = milliseconds(0);
207       auto old_ctx =
208         RequestContext::setContext(cb->context_);
209       cb->timeoutExpired();
210       RequestContext::setContext(old_ctx);
211     }
212   }
213   if (count_ > 0) {
214     this->AsyncTimeout::scheduleTimeout(interval_.count());
215   }
216 }
217
218 size_t HHWheelTimer::cancelAll() {
219   size_t count = 0;
220
221   if (count_ != 0) {
222     decltype(buckets_) buckets;
223
224 // Work around std::swap() bug in libc++
225 //
226 // http://llvm.org/bugs/show_bug.cgi?id=22106
227 #if FOLLY_USE_LIBCPP
228     for (size_t i = 0; i < WHEEL_BUCKETS; ++i) {
229       for (size_t ii = 0; ii < WHEEL_SIZE; ++ii) {
230         std::swap(buckets_[i][ii], buckets[i][ii]);
231       }
232     }
233 #else
234     std::swap(buckets, buckets_);
235 #endif
236
237     for (auto& tick : buckets) {
238       for (auto& bucket : tick) {
239         while (!bucket.empty()) {
240           auto& cb = bucket.front();
241           cb.cancelTimeout();
242           cb.callbackCanceled();
243           count++;
244         }
245       }
246     }
247   }
248
249   return count;
250 }
251
252 } // folly