Allow a timer with pending callbacks to be destroyed
[folly.git] / folly / io / async / HHWheelTimer.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed to the Apache Software Foundation (ASF) under one
5  * or more contributor license agreements. See the NOTICE file
6  * distributed with this work for additional information
7  * regarding copyright ownership. The ASF licenses this file
8  * to you under the Apache License, Version 2.0 (the
9  * "License"); you may not use this file except in compliance
10  * with the License. You may obtain a copy of the License at
11  *
12  *   http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing,
15  * software distributed under the License is distributed on an
16  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17  * KIND, either express or implied. See the License for the
18  * specific language governing permissions and limitations
19  * under the License.
20  */
21 #include <folly/io/async/HHWheelTimer.h>
22 #include <folly/io/async/Request.h>
23
24 #include <folly/Optional.h>
25 #include <folly/ScopeGuard.h>
26
27 #include <cassert>
28
29 using std::chrono::milliseconds;
30
31 namespace folly {
32
33 /**
34  * We want to select the default interval carefully.
35  * An interval of 10ms will give us 10ms * WHEEL_SIZE^WHEEL_BUCKETS
36  * for the largest timeout possible, or about 497 days.
37  *
38  * For a lower bound, we want a reasonable limit on local IO, 10ms
39  * seems short enough
40  *
41  * A shorter interval also has CPU implications, less than 1ms might
42  * start showing up in cpu perf.  Also, it might not be possible to set
43  * tick interval less than 10ms on older kernels.
44  */
45 int HHWheelTimer::DEFAULT_TICK_INTERVAL = 10;
46
47 HHWheelTimer::Callback::~Callback() {
48   if (isScheduled()) {
49     cancelTimeout();
50   }
51 }
52
53 void HHWheelTimer::Callback::setScheduled(HHWheelTimer* wheel,
54                                           std::chrono::milliseconds timeout) {
55   assert(wheel_ == nullptr);
56   assert(expiration_ == milliseconds(0));
57
58   wheel_ = wheel;
59
60   // Only update the now_ time if we're not in a timeout expired callback
61   if (wheel_->count_  == 0 && !wheel_->processingCallbacksGuard_) {
62     wheel_->now_ = getCurTime();
63   }
64
65   expiration_ = wheel_->now_ + timeout;
66 }
67
68 void HHWheelTimer::Callback::cancelTimeoutImpl() {
69   if (--wheel_->count_ <= 0) {
70     assert(wheel_->count_ == 0);
71     wheel_->AsyncTimeout::cancelTimeout();
72   }
73   hook_.unlink();
74
75   wheel_ = nullptr;
76   expiration_ = milliseconds(0);
77 }
78
79 HHWheelTimer::HHWheelTimer(
80     folly::TimeoutManager* timeoutMananger,
81     std::chrono::milliseconds intervalMS,
82     AsyncTimeout::InternalEnum internal,
83     std::chrono::milliseconds defaultTimeoutMS)
84     : AsyncTimeout(timeoutMananger, internal),
85       interval_(intervalMS),
86       defaultTimeout_(defaultTimeoutMS),
87       nextTick_(1),
88       count_(0),
89       catchupEveryN_(DEFAULT_CATCHUP_EVERY_N),
90       expirationsSinceCatchup_(0),
91       processingCallbacksGuard_(nullptr) {}
92
93 HHWheelTimer::~HHWheelTimer() {
94   // Ensure this gets done, but right before destruction finishes.
95   auto destructionPublisherGuard = folly::makeGuard([&] {
96     // Inform the subscriber that this instance is doomed.
97     if (processingCallbacksGuard_) {
98       *processingCallbacksGuard_ = true;
99     }
100   });
101   cancelAll();
102 }
103
104 void HHWheelTimer::scheduleTimeoutImpl(Callback* callback,
105                                        std::chrono::milliseconds timeout) {
106   int64_t due = timeToWheelTicks(timeout) + nextTick_;
107   int64_t diff = due - nextTick_;
108   CallbackList* list;
109
110   if (diff < 0) {
111     list = &buckets_[0][nextTick_ & WHEEL_MASK];
112   } else if (diff < WHEEL_SIZE) {
113     list = &buckets_[0][due & WHEEL_MASK];
114   } else if (diff < 1 << (2 * WHEEL_BITS)) {
115     list = &buckets_[1][(due >> WHEEL_BITS) & WHEEL_MASK];
116   } else if (diff < 1 << (3 * WHEEL_BITS)) {
117     list = &buckets_[2][(due >> 2 * WHEEL_BITS) & WHEEL_MASK];
118   } else {
119     /* in largest slot */
120     if (diff > LARGEST_SLOT) {
121       diff = LARGEST_SLOT;
122       due = diff + nextTick_;
123     }
124     list = &buckets_[3][(due >> 3 * WHEEL_BITS) & WHEEL_MASK];
125   }
126   list->push_back(*callback);
127 }
128
129 void HHWheelTimer::scheduleTimeout(Callback* callback,
130                                    std::chrono::milliseconds timeout) {
131   // Cancel the callback if it happens to be scheduled already.
132   callback->cancelTimeout();
133
134   callback->context_ = RequestContext::saveContext();
135
136   if (count_ == 0 && !processingCallbacksGuard_) {
137     this->AsyncTimeout::scheduleTimeout(interval_.count());
138   }
139
140   callback->setScheduled(this, timeout);
141   scheduleTimeoutImpl(callback, timeout);
142   count_++;
143 }
144
145 void HHWheelTimer::scheduleTimeout(Callback* callback) {
146   CHECK(std::chrono::milliseconds(-1) != defaultTimeout_)
147       << "Default timeout was not initialized";
148   scheduleTimeout(callback, defaultTimeout_);
149 }
150
151 bool HHWheelTimer::cascadeTimers(int bucket, int tick) {
152   CallbackList cbs;
153   cbs.swap(buckets_[bucket][tick]);
154   while (!cbs.empty()) {
155     auto* cb = &cbs.front();
156     cbs.pop_front();
157     scheduleTimeoutImpl(cb, cb->getTimeRemaining(now_));
158   }
159
160   // If tick is zero, timeoutExpired will cascade the next bucket.
161   return tick == 0;
162 }
163
164 void HHWheelTimer::timeoutExpired() noexcept {
165   // If the last smart pointer for "this" is reset inside the callback's
166   // timeoutExpired(), then the guard will detect that it is time to bail from
167   // this method.
168   auto isDestroyed = false;
169   // If scheduleTimeout is called from a callback in this function, it may
170   // cause inconsistencies in the state of this object. As such, we need
171   // to treat these calls slightly differently.
172   CHECK(!processingCallbacksGuard_);
173   processingCallbacksGuard_ = &isDestroyed;
174   auto reEntryGuard = folly::makeGuard([&] {
175     if (!isDestroyed) {
176       processingCallbacksGuard_ = nullptr;
177     }
178   });
179
180   // timeoutExpired() can only be invoked directly from the event base loop.
181   // It should never be invoked recursively.
182   //
183   milliseconds catchup = now_ + interval_;
184   // If catchup is enabled, we may have missed multiple intervals, use
185   // currentTime() to check exactly.
186   if (++expirationsSinceCatchup_ >= catchupEveryN_) {
187     catchup = std::chrono::duration_cast<milliseconds>(
188       std::chrono::steady_clock::now().time_since_epoch());
189     expirationsSinceCatchup_ = 0;
190   }
191   while (now_ < catchup) {
192     now_ += interval_;
193
194     int idx = nextTick_ & WHEEL_MASK;
195     if (0 == idx) {
196       // Cascade timers
197       if (cascadeTimers(1, (nextTick_ >> WHEEL_BITS) & WHEEL_MASK) &&
198           cascadeTimers(2, (nextTick_ >> (2 * WHEEL_BITS)) & WHEEL_MASK)) {
199         cascadeTimers(3, (nextTick_ >> (3 * WHEEL_BITS)) & WHEEL_MASK);
200       }
201     }
202
203     nextTick_++;
204     CallbackList* cbs = &buckets_[0][idx];
205     while (!cbs->empty()) {
206       auto* cb = &cbs->front();
207       cbs->pop_front();
208       count_--;
209       cb->wheel_ = nullptr;
210       cb->expiration_ = milliseconds(0);
211       RequestContextScopeGuard rctx(cb->context_);
212       cb->timeoutExpired();
213       if (isDestroyed) {
214         // The HHWheelTimer itself has been destroyed. The other callbacks
215         // will have been cancelled from the destructor. Bail before causing
216         // damage.
217         return;
218       }
219     }
220   }
221   if (count_ > 0) {
222     this->AsyncTimeout::scheduleTimeout(interval_.count());
223   }
224 }
225
226 size_t HHWheelTimer::cancelAll() {
227   size_t count = 0;
228
229   if (count_ != 0) {
230     const uint64_t numElements = WHEEL_BUCKETS * WHEEL_SIZE;
231     auto maxBuckets = std::min(numElements, count_);
232     auto buckets = folly::make_unique<CallbackList[]>(maxBuckets);
233     size_t countBuckets = 0;
234     for (auto& tick : buckets_) {
235       for (auto& bucket : tick) {
236         if (bucket.empty()) {
237           continue;
238         }
239         for (auto& cb : bucket) {
240           count++;
241         }
242         std::swap(bucket, buckets[countBuckets++]);
243         if (count >= count_) {
244           break;
245         }
246       }
247     }
248
249     for (size_t i = 0; i < countBuckets; ++i) {
250       auto& bucket = buckets[i];
251       while (!bucket.empty()) {
252         auto& cb = bucket.front();
253         cb.cancelTimeout();
254         cb.callbackCanceled();
255       }
256     }
257   }
258
259   return count;
260 }
261
262 } // folly