--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <folly/io/async/HHWheelTimer.h>
+#include <folly/io/async/Request.h>
+
+#include <folly/ScopeGuard.h>
+
+#include <cassert>
+
+using std::chrono::milliseconds;
+
+namespace folly {
+
+/**
+ * We want to select the default interval carefully.
+ * An interval of 10ms will give us 10ms * WHEEL_SIZE^WHEEL_BUCKETS
+ * for the largest timeout possible, or about 497 days.
+ *
+ * For a lower bound, we want a reasonable limit on local IO, 10ms
+ * seems short enough
+ *
+ * A shorter interval also has CPU implications, less than 1ms might
+ * start showing up in cpu perf. Also, it might not be possible to set
+ * tick interval less than 10ms on older kernels.
+ */
+int HHWheelTimer::DEFAULT_TICK_INTERVAL = 10;
+
+HHWheelTimer::Callback::~Callback() {
+ if (isScheduled()) {
+ cancelTimeout();
+ }
+}
+
+void HHWheelTimer::Callback::setScheduled(HHWheelTimer* wheel,
+ std::chrono::milliseconds timeout) {
+ assert(wheel_ == nullptr);
+ assert(expiration_ == milliseconds(0));
+
+ wheel_ = wheel;
+
+ if (wheel_->count_ == 0) {
+ wheel_->now_ = std::chrono::duration_cast<milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch());
+ }
+
+ expiration_ = wheel_->now_ + timeout;
+}
+
+void HHWheelTimer::Callback::cancelTimeoutImpl() {
+ if (--wheel_->count_ <= 0) {
+ assert(wheel_->count_ == 0);
+ wheel_->AsyncTimeout::cancelTimeout();
+ }
+ hook_.unlink();
+
+ wheel_ = nullptr;
+ expiration_ = milliseconds(0);
+}
+
+HHWheelTimer::HHWheelTimer(folly::EventBase* eventBase,
+ std::chrono::milliseconds intervalMS)
+ : AsyncTimeout(eventBase)
+ , interval_(intervalMS)
+ , nextTick_(1)
+ , count_(0)
+ , catchupEveryN_(DEFAULT_CATCHUP_EVERY_N)
+ , expirationsSinceCatchup_(0)
+{
+}
+
+HHWheelTimer::~HHWheelTimer() {
+}
+
+void HHWheelTimer::destroy() {
+ assert(count_ == 0);
+ DelayedDestruction::destroy();
+}
+
+void HHWheelTimer::scheduleTimeoutImpl(Callback* callback,
+ std::chrono::milliseconds timeout) {
+ uint32_t due = timeToWheelTicks(timeout) + nextTick_;
+ int64_t diff = due - nextTick_;
+ CallbackList* list;
+
+ if (diff < WHEEL_SIZE) {
+ list = &buckets_[0][due & WHEEL_MASK];
+ } else if (diff < 1 << (2 * WHEEL_BITS)) {
+ list = &buckets_[1][(due >> WHEEL_BITS) & WHEEL_MASK];
+ } else if (diff < 1 << (3 * WHEEL_BITS)) {
+ list = &buckets_[2][(due >> 2 * WHEEL_BITS) & WHEEL_MASK];
+ } else if (diff < 0) {
+ list = &buckets_[0][nextTick_ & WHEEL_MASK];
+ } else {
+ /* in largest slot */
+ if (diff > LARGEST_SLOT) {
+ diff = LARGEST_SLOT;
+ due = diff + nextTick_;
+ }
+ list = &buckets_[3][(due >> 3 * WHEEL_BITS) & WHEEL_MASK];
+ }
+ list->push_back(*callback);
+}
+
+void HHWheelTimer::scheduleTimeout(Callback* callback,
+ std::chrono::milliseconds timeout) {
+ // Cancel the callback if it happens to be scheduled already.
+ callback->cancelTimeout();
+
+ callback->context_ = RequestContext::saveContext();
+
+ if (count_ == 0) {
+ this->AsyncTimeout::scheduleTimeout(interval_.count());
+ }
+
+ callback->setScheduled(this, timeout);
+ scheduleTimeoutImpl(callback, timeout);
+ count_++;
+}
+
+bool HHWheelTimer::cascadeTimers(int bucket, int tick) {
+ CallbackList cbs;
+ cbs.swap(buckets_[bucket][tick]);
+ while (!cbs.empty()) {
+ auto* cb = &cbs.front();
+ cbs.pop_front();
+ scheduleTimeoutImpl(cb, cb->getTimeRemaining(now_));
+ }
+
+ // If tick is zero, timeoutExpired will cascade the next bucket.
+ return tick == 0;
+}
+
+void HHWheelTimer::timeoutExpired() noexcept {
+ // If destroy() is called inside timeoutExpired(), delay actual destruction
+ // until timeoutExpired() returns
+ DestructorGuard dg(this);
+
+ // timeoutExpired() can only be invoked directly from the event base loop.
+ // It should never be invoked recursively.
+ //
+ milliseconds catchup = now_ + interval_;
+ // If catchup is enabled, we may have missed multiple intervals, use
+ // currentTime() to check exactly.
+ if (++expirationsSinceCatchup_ >= catchupEveryN_) {
+ catchup = std::chrono::duration_cast<milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch());
+ expirationsSinceCatchup_ = 0;
+ }
+ while (now_ < catchup) {
+ now_ += interval_;
+
+ int idx = nextTick_ & WHEEL_MASK;
+ if (0 == idx) {
+ // Cascade timers
+ if (cascadeTimers(1, (nextTick_ >> WHEEL_BITS) & WHEEL_MASK) &&
+ cascadeTimers(2, (nextTick_ >> (2 * WHEEL_BITS)) & WHEEL_MASK)) {
+ cascadeTimers(3, (nextTick_ >> (3 * WHEEL_BITS)) & WHEEL_MASK);
+ }
+ }
+
+ nextTick_++;
+ CallbackList* cbs = &buckets_[0][idx];
+ while (!cbs->empty()) {
+ auto* cb = &cbs->front();
+ cbs->pop_front();
+ count_--;
+ cb->wheel_ = nullptr;
+ cb->expiration_ = milliseconds(0);
+ auto old_ctx =
+ RequestContext::setContext(cb->context_);
+ cb->timeoutExpired();
+ RequestContext::setContext(old_ctx);
+ }
+ }
+ if (count_ > 0) {
+ this->AsyncTimeout::scheduleTimeout(interval_.count());
+ }
+}
+
+} // folly
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <folly/io/async/AsyncTimeout.h>
+#include <folly/io/async/DelayedDestruction.h>
+
+#include <boost/intrusive/list.hpp>
+
+#include <chrono>
+#include <cstddef>
+#include <memory>
+#include <list>
+
+namespace folly {
+
+/**
+ * Hashed Hierarchical Wheel Timer
+ *
+ * Comparison:
+ * TAsyncTimeout - a single timeout.
+ * HHWheelTimer - a set of efficient timeouts with different interval,
+ * but timeouts are not exact.
+ *
+ * All of the above are O(1) in insertion, tick update and cancel
+
+ * This implementation ticks once every 10ms.
+ * We model timers as the number of ticks until the next
+ * due event. We allow 32-bits of space to track this
+ * due interval, and break that into 4 regions of 8 bits.
+ * Each region indexes into a bucket of 256 lists.
+ *
+ * Bucket 0 represents those events that are due the soonest.
+ * Each tick causes us to look at the next list in a bucket.
+ * The 0th list in a bucket is special; it means that it is time to
+ * flush the timers from the next higher bucket and schedule them
+ * into a different bucket.
+ *
+ * This technique results in a very cheap mechanism for
+ * maintaining time and timers, provided that we can maintain
+ * a consistent rate of ticks.
+ */
+class HHWheelTimer : protected folly::AsyncTimeout,
+ public folly::DelayedDestruction {
+ public:
+ typedef std::unique_ptr<HHWheelTimer, Destructor> UniquePtr;
+
+ /**
+ * A callback to be notified when a timeout has expired.
+ */
+ class Callback {
+ public:
+ Callback()
+ : wheel_(nullptr)
+ , expiration_(0) {}
+
+ virtual ~Callback();
+
+ /**
+ * timeoutExpired() is invoked when the timeout has expired.
+ */
+ virtual void timeoutExpired() noexcept = 0;
+
+ /**
+ * Cancel the timeout, if it is running.
+ *
+ * If the timeout is not scheduled, cancelTimeout() does nothing.
+ */
+ void cancelTimeout() {
+ if (wheel_ == nullptr) {
+ // We're not scheduled, so there's nothing to do.
+ return;
+ }
+ cancelTimeoutImpl();
+ }
+
+ /**
+ * Return true if this timeout is currently scheduled, and false otherwise.
+ */
+ bool isScheduled() const {
+ return wheel_ != nullptr;
+ }
+
+ private:
+ // Get the time remaining until this timeout expires
+ std::chrono::milliseconds getTimeRemaining(
+ std::chrono::milliseconds now) const {
+ if (now >= expiration_) {
+ return std::chrono::milliseconds(0);
+ }
+ return expiration_ - now;
+ }
+
+ void setScheduled(HHWheelTimer* wheel,
+ std::chrono::milliseconds);
+ void cancelTimeoutImpl();
+
+ HHWheelTimer* wheel_;
+ std::chrono::milliseconds expiration_;
+
+ typedef boost::intrusive::list_member_hook<
+ boost::intrusive::link_mode<boost::intrusive::auto_unlink> > ListHook;
+
+ ListHook hook_;
+
+ typedef boost::intrusive::list<
+ Callback,
+ boost::intrusive::member_hook<Callback, ListHook, &Callback::hook_>,
+ boost::intrusive::constant_time_size<false> > List;
+
+ std::shared_ptr<RequestContext> context_;
+
+ // Give HHWheelTimer direct access to our members so it can take care
+ // of scheduling/cancelling.
+ friend class HHWheelTimer;
+ };
+
+ /**
+ * Create a new HHWheelTimer with the specified interval.
+ */
+ static int DEFAULT_TICK_INTERVAL;
+ explicit HHWheelTimer(folly::EventBase* eventBase,
+ std::chrono::milliseconds intervalMS =
+ std::chrono::milliseconds(DEFAULT_TICK_INTERVAL));
+
+ /**
+ * Destroy the HHWheelTimer.
+ *
+ * A HHWheelTimer should only be destroyed when there are no more
+ * callbacks pending in the set.
+ */
+ virtual void destroy();
+
+ /**
+ * Get the tick interval for this HHWheelTimer.
+ *
+ * Returns the tick interval in milliseconds.
+ */
+ std::chrono::milliseconds getTickInterval() const {
+ return interval_;
+ }
+
+ /**
+ * Schedule the specified Callback to be invoked after the
+ * specified timeout interval.
+ *
+ * If the callback is already scheduled, this cancels the existing timeout
+ * before scheduling the new timeout.
+ */
+ void scheduleTimeout(Callback* callback,
+ std::chrono::milliseconds timeout);
+ void scheduleTimeoutImpl(Callback* callback,
+ std::chrono::milliseconds timeout);
+
+ /**
+ * Return the number of currently pending timeouts
+ */
+ uint64_t count() const {
+ return count_;
+ }
+
+ /**
+ * This turns on more exact timing. By default the wheel timer
+ * increments its cached time only once everyN (default) ticks.
+ *
+ * With catchupEveryN at 1, timeouts will only be delayed until the
+ * next tick, at which point all overdue timeouts are called. The
+ * wheel timer is approximately 2x slower with this set to 1.
+ *
+ * Load testing in opt mode showed skew was about 1% with no catchup.
+ */
+ void setCatchupEveryN(uint32_t everyN) {
+ catchupEveryN_ = everyN;
+ }
+
+ using folly::AsyncTimeout::attachEventBase;
+ using folly::AsyncTimeout::detachEventBase;
+ using folly::AsyncTimeout::getTimeoutManager;
+
+ protected:
+ /**
+ * Protected destructor.
+ *
+ * Use destroy() instead. See the comments in DelayedDestruction for more
+ * details.
+ */
+ virtual ~HHWheelTimer();
+
+ private:
+ // Forbidden copy constructor and assignment operator
+ HHWheelTimer(HHWheelTimer const &) = delete;
+ HHWheelTimer& operator=(HHWheelTimer const &) = delete;
+
+ // Methods inherited from TAsyncTimeout
+ virtual void timeoutExpired() noexcept;
+
+ std::chrono::milliseconds interval_;
+
+ static constexpr int WHEEL_BUCKETS = 4;
+ static constexpr int WHEEL_BITS = 8;
+ static constexpr unsigned int WHEEL_SIZE = (1 << WHEEL_BITS);
+ static constexpr unsigned int WHEEL_MASK = (WHEEL_SIZE - 1);
+ static constexpr uint32_t LARGEST_SLOT = 0xffffffffUL;
+
+ typedef Callback::List CallbackList;
+ CallbackList buckets_[WHEEL_BUCKETS][WHEEL_SIZE];
+
+ uint32_t timeToWheelTicks(std::chrono::milliseconds t) {
+ return t.count() / interval_.count();
+ }
+
+ bool cascadeTimers(int bucket, int tick);
+ int64_t nextTick_;
+ uint64_t count_;
+ std::chrono::milliseconds now_;
+
+ static constexpr uint32_t DEFAULT_CATCHUP_EVERY_N = 100;
+
+ uint32_t catchupEveryN_;
+ uint32_t expirationsSinceCatchup_;
+};
+
+} // folly