X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=folly%2Fio%2Fasync%2FHHWheelTimer.cpp;h=96ad8afc4a627bca3fc07e6b3984817fc929d302;hb=b669462b65cacda010d6dca11bc56f9aee768ebc;hp=d762224891d565875918a9872caf3b7fe78cc9ca;hpb=062bc87d14e1135bff6ee43edc5f3a40d9ee5943;p=folly.git diff --git a/folly/io/async/HHWheelTimer.cpp b/folly/io/async/HHWheelTimer.cpp index d7622248..96ad8afc 100644 --- a/folly/io/async/HHWheelTimer.cpp +++ b/folly/io/async/HHWheelTimer.cpp @@ -1,28 +1,28 @@ /* - * Copyright 2014 Facebook, Inc. + * Copyright 2004-present Facebook, Inc. * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ + #include #include +#include +#include #include +#include + #include using std::chrono::milliseconds; @@ -52,16 +52,11 @@ HHWheelTimer::Callback::~Callback() { void HHWheelTimer::Callback::setScheduled(HHWheelTimer* wheel, std::chrono::milliseconds timeout) { assert(wheel_ == nullptr); - assert(expiration_ == milliseconds(0)); + assert(expiration_ == decltype(expiration_){}); wheel_ = wheel; - // Only update the now_ time if we're not in a timeout expired callback - if (wheel_->count_ == 0 && !wheel_->processingCallbacksGuard_) { - wheel_->now_ = getCurTime(); - } - - expiration_ = wheel_->now_ + timeout; + expiration_ = getCurTime() + timeout; } void HHWheelTimer::Callback::cancelTimeoutImpl() { @@ -69,42 +64,66 @@ void HHWheelTimer::Callback::cancelTimeoutImpl() { assert(wheel_->count_ == 0); wheel_->AsyncTimeout::cancelTimeout(); } - hook_.unlink(); + unlink(); + if ((-1 != bucket_) && (wheel_->buckets_[0][bucket_].empty())) { + auto bi = makeBitIterator(wheel_->bitmap_.begin()); + *(bi + bucket_) = false; + } wheel_ = nullptr; - expiration_ = milliseconds(0); + expiration_ = {}; } -HHWheelTimer::HHWheelTimer(folly::EventBase* eventBase, - std::chrono::milliseconds intervalMS) - : AsyncTimeout(eventBase) - , interval_(intervalMS) - , nextTick_(1) - , count_(0) - , catchupEveryN_(DEFAULT_CATCHUP_EVERY_N) - , expirationsSinceCatchup_(0) - , processingCallbacksGuard_(false) -{ +HHWheelTimer::HHWheelTimer( + folly::TimeoutManager* timeoutMananger, + std::chrono::milliseconds intervalMS, + AsyncTimeout::InternalEnum internal, + std::chrono::milliseconds defaultTimeoutMS) + : AsyncTimeout(timeoutMananger, internal), + interval_(intervalMS), + defaultTimeout_(defaultTimeoutMS), + lastTick_(1), + expireTick_(1), + count_(0), + startTime_(getCurTime()), + processingCallbacksGuard_(nullptr) { + bitmap_.resize((WHEEL_SIZE / sizeof(uint64_t)) / 8, 0); } HHWheelTimer::~HHWheelTimer() { -} - -void HHWheelTimer::destroy() { - assert(count_ == 0); - DelayedDestruction::destroy(); + // Ensure this gets done, but right before destruction finishes. + auto destructionPublisherGuard = folly::makeGuard([&] { + // Inform the subscriber that this instance is doomed. + if (processingCallbacksGuard_) { + *processingCallbacksGuard_ = true; + } + }); + while (!timeouts.empty()) { + auto* cb = &timeouts.front(); + timeouts.pop_front(); + cb->cancelTimeout(); + cb->callbackCanceled(); + } + cancelAll(); } void HHWheelTimer::scheduleTimeoutImpl(Callback* callback, std::chrono::milliseconds timeout) { - int64_t due = timeToWheelTicks(timeout) + nextTick_; - int64_t diff = due - nextTick_; + auto nextTick = calcNextTick(); + int64_t due = timeToWheelTicks(timeout) + nextTick; + int64_t diff = due - nextTick; CallbackList* list; + auto bi = makeBitIterator(bitmap_.begin()); + if (diff < 0) { - list = &buckets_[0][nextTick_ & WHEEL_MASK]; + list = &buckets_[0][nextTick & WHEEL_MASK]; + *(bi + (nextTick & WHEEL_MASK)) = true; + callback->bucket_ = nextTick & WHEEL_MASK; } else if (diff < WHEEL_SIZE) { list = &buckets_[0][due & WHEEL_MASK]; + *(bi + (due & WHEEL_MASK)) = true; + callback->bucket_ = due & WHEEL_MASK; } else if (diff < 1 << (2 * WHEEL_BITS)) { list = &buckets_[1][(due >> WHEEL_BITS) & WHEEL_MASK]; } else if (diff < 1 << (3 * WHEEL_BITS)) { @@ -113,7 +132,7 @@ void HHWheelTimer::scheduleTimeoutImpl(Callback* callback, /* in largest slot */ if (diff > LARGEST_SLOT) { diff = LARGEST_SLOT; - due = diff + nextTick_; + due = diff + nextTick; } list = &buckets_[3][(due >> 3 * WHEEL_BITS) & WHEEL_MASK]; } @@ -127,13 +146,23 @@ void HHWheelTimer::scheduleTimeout(Callback* callback, callback->context_ = RequestContext::saveContext(); - if (count_ == 0 && !processingCallbacksGuard_) { - this->AsyncTimeout::scheduleTimeout(interval_.count()); - } + count_++; callback->setScheduled(this, timeout); scheduleTimeoutImpl(callback, timeout); - count_++; + + /* If we're calling callbacks, timer will be reset after all + * callbacks are called. + */ + if (!processingCallbacksGuard_) { + scheduleNextTimeout(); + } +} + +void HHWheelTimer::scheduleTimeout(Callback* callback) { + CHECK(std::chrono::milliseconds(-1) != defaultTimeout_) + << "Default timeout was not initialized"; + scheduleTimeout(callback, defaultTimeout_); } bool HHWheelTimer::cascadeTimers(int bucket, int tick) { @@ -142,7 +171,7 @@ bool HHWheelTimer::cascadeTimers(int bucket, int tick) { while (!cbs.empty()) { auto* cb = &cbs.front(); cbs.pop_front(); - scheduleTimeoutImpl(cb, cb->getTimeRemaining(now_)); + scheduleTimeoutImpl(cb, cb->getTimeRemaining(getCurTime())); } // If tick is zero, timeoutExpired will cascade the next bucket. @@ -150,71 +179,95 @@ bool HHWheelTimer::cascadeTimers(int bucket, int tick) { } void HHWheelTimer::timeoutExpired() noexcept { - // If destroy() is called inside timeoutExpired(), delay actual destruction - // until timeoutExpired() returns - DestructorGuard dg(this); + auto nextTick = calcNextTick(); + + // If the last smart pointer for "this" is reset inside the callback's + // timeoutExpired(), then the guard will detect that it is time to bail from + // this method. + auto isDestroyed = false; // If scheduleTimeout is called from a callback in this function, it may // cause inconsistencies in the state of this object. As such, we need // to treat these calls slightly differently. - processingCallbacksGuard_ = true; + CHECK(!processingCallbacksGuard_); + processingCallbacksGuard_ = &isDestroyed; auto reEntryGuard = folly::makeGuard([&] { - processingCallbacksGuard_ = false; + if (!isDestroyed) { + processingCallbacksGuard_ = nullptr; + } }); // timeoutExpired() can only be invoked directly from the event base loop. // It should never be invoked recursively. // - milliseconds catchup = now_ + interval_; - // If catchup is enabled, we may have missed multiple intervals, use - // currentTime() to check exactly. - if (++expirationsSinceCatchup_ >= catchupEveryN_) { - catchup = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()); - expirationsSinceCatchup_ = 0; - } - while (now_ < catchup) { - now_ += interval_; + lastTick_ = expireTick_; + while (lastTick_ < nextTick) { + int idx = lastTick_ & WHEEL_MASK; - int idx = nextTick_ & WHEEL_MASK; - if (0 == idx) { - // Cascade timers - if (cascadeTimers(1, (nextTick_ >> WHEEL_BITS) & WHEEL_MASK) && - cascadeTimers(2, (nextTick_ >> (2 * WHEEL_BITS)) & WHEEL_MASK)) { - cascadeTimers(3, (nextTick_ >> (3 * WHEEL_BITS)) & WHEEL_MASK); - } - } + auto bi = makeBitIterator(bitmap_.begin()); + *(bi + idx) = false; - nextTick_++; + lastTick_++; CallbackList* cbs = &buckets_[0][idx]; while (!cbs->empty()) { auto* cb = &cbs->front(); cbs->pop_front(); - count_--; - cb->wheel_ = nullptr; - cb->expiration_ = milliseconds(0); - auto old_ctx = - RequestContext::setContext(cb->context_); - cb->timeoutExpired(); - RequestContext::setContext(old_ctx); + timeouts.push_back(*cb); + } + + if (idx == 0) { + // Cascade timers + if (cascadeTimers(1, (lastTick_ >> WHEEL_BITS) & WHEEL_MASK) && + cascadeTimers(2, (lastTick_ >> (2 * WHEEL_BITS)) & WHEEL_MASK)) { + cascadeTimers(3, (lastTick_ >> (3 * WHEEL_BITS)) & WHEEL_MASK); + } } } - if (count_ > 0) { - this->AsyncTimeout::scheduleTimeout(interval_.count()); + + while (!timeouts.empty()) { + auto* cb = &timeouts.front(); + timeouts.pop_front(); + count_--; + cb->wheel_ = nullptr; + cb->expiration_ = {}; + RequestContextScopeGuard rctx(cb->context_); + cb->timeoutExpired(); + if (isDestroyed) { + // The HHWheelTimer itself has been destroyed. The other callbacks + // will have been cancelled from the destructor. Bail before causing + // damage. + return; + } } + scheduleNextTimeout(); } size_t HHWheelTimer::cancelAll() { - decltype(buckets_) buckets; - std::swap(buckets, buckets_); size_t count = 0; - for (auto& tick : buckets) { - for (auto& bucket : tick) { + if (count_ != 0) { + const uint64_t numElements = WHEEL_BUCKETS * WHEEL_SIZE; + auto maxBuckets = std::min(numElements, count_); + auto buckets = std::make_unique(maxBuckets); + size_t countBuckets = 0; + for (auto& tick : buckets_) { + for (auto& bucket : tick) { + if (bucket.empty()) { + continue; + } + count += bucket.size(); + std::swap(bucket, buckets[countBuckets++]); + if (count >= count_) { + break; + } + } + } + + for (size_t i = 0; i < countBuckets; ++i) { + auto& bucket = buckets[i]; while (!bucket.empty()) { auto& cb = bucket.front(); cb.cancelTimeout(); cb.callbackCanceled(); - count++; } } } @@ -222,4 +275,43 @@ size_t HHWheelTimer::cancelAll() { return count; } -} // folly +void HHWheelTimer::scheduleNextTimeout() { + auto nextTick = calcNextTick(); + int64_t tick = 1; + + if (nextTick & WHEEL_MASK) { + auto bi = makeBitIterator(bitmap_.begin()); + auto bi_end = makeBitIterator(bitmap_.end()); + auto it = folly::findFirstSet(bi + (nextTick & WHEEL_MASK), bi_end); + if (it == bi_end) { + tick = WHEEL_SIZE - ((nextTick - 1) & WHEEL_MASK); + } else { + tick = std::distance(bi + (nextTick & WHEEL_MASK), it) + 1; + } + } + + if (count_ > 0) { + if (!this->AsyncTimeout::isScheduled() || + (expireTick_ > tick + nextTick - 1)) { + this->AsyncTimeout::scheduleTimeout(interval_ * tick); + expireTick_ = tick + nextTick - 1; + } + } else { + this->AsyncTimeout::cancelTimeout(); + } +} + +int64_t HHWheelTimer::calcNextTick() { + auto intervals = (getCurTime() - startTime_) / interval_; + // Slow eventbases will have skew between the actual time and the + // callback time. To avoid racing the next scheduleNextTimeout() + // call, always schedule new timeouts against the actual + // timeoutExpired() time. + if (!processingCallbacksGuard_) { + return intervals; + } else { + return lastTick_; + } +} + +} // namespace folly