remove constant tick
authorDave Watson <davejwatson@fb.com>
Thu, 18 Aug 2016 15:38:21 +0000 (08:38 -0700)
committerFacebook Github Bot <facebook-github-bot-bot@fb.com>
Thu, 18 Aug 2016 15:53:30 +0000 (08:53 -0700)
Summary:
Preciesly calculate the next tick.  Currently only calculates the tick based on the lowest level of wheel timer, so it will still tick at least every WHEEL_SIZE intervals.

Currently the tick calculation is a linear scan over all the buckets, the next diff will optimize this.

Reviewed By: yfeldblum

Differential Revision: D3637096

fbshipit-source-id: 53dd596a2085c05c657cccbc7efba267bbd086a6

folly/io/async/HHWheelTimer.cpp
folly/io/async/HHWheelTimer.h
folly/io/async/test/HHWheelTimerSlowTests.cpp

index 9adcf2d42726b6c7a66a32f739cfce2af2c1ffaf..5a1cb3643d748eb7df40f91a5e53cb946b0af42c 100644 (file)
@@ -58,12 +58,7 @@ void HHWheelTimer::Callback::setScheduled(HHWheelTimer* wheel,
 
   wheel_ = wheel;
 
-  // Only update the now_ time if we're not in a timeout expired callback
-  if (wheel_->count_  == 0 && !wheel_->processingCallbacksGuard_) {
-    wheel_->now_ = getCurTime();
-  }
-
-  expiration_ = wheel_->now_ + timeout;
+  expiration_ = getCurTime() + timeout;
 }
 
 void HHWheelTimer::Callback::cancelTimeoutImpl() {
@@ -85,8 +80,10 @@ HHWheelTimer::HHWheelTimer(
     : AsyncTimeout(timeoutMananger, internal),
       interval_(intervalMS),
       defaultTimeout_(defaultTimeoutMS),
-      nextTick_(1),
+      lastTick_(1),
+      expireTick_(1),
       count_(0),
+      startTime_(getCurTime()),
       processingCallbacksGuard_(nullptr) {}
 
 HHWheelTimer::~HHWheelTimer() {
@@ -108,12 +105,13 @@ HHWheelTimer::~HHWheelTimer() {
 
 void HHWheelTimer::scheduleTimeoutImpl(Callback* callback,
                                        std::chrono::milliseconds timeout) {
-  int64_t due = timeToWheelTicks(timeout) + nextTick_;
-  int64_t diff = due - nextTick_;
+  auto nextTick = calcNextTick();
+  int64_t due = timeToWheelTicks(timeout) + nextTick;
+  int64_t diff = due - nextTick;
   CallbackList* list;
 
   if (diff < 0) {
-    list = &buckets_[0][nextTick_ & WHEEL_MASK];
+    list = &buckets_[0][nextTick & WHEEL_MASK];
   } else if (diff < WHEEL_SIZE) {
     list = &buckets_[0][due & WHEEL_MASK];
   } else if (diff < 1 << (2 * WHEEL_BITS)) {
@@ -124,7 +122,7 @@ void HHWheelTimer::scheduleTimeoutImpl(Callback* callback,
     /* in largest slot */
     if (diff > LARGEST_SLOT) {
       diff = LARGEST_SLOT;
-      due = diff + nextTick_;
+      due = diff + nextTick;
     }
     list = &buckets_[3][(due >> 3 * WHEEL_BITS) & WHEEL_MASK];
   }
@@ -138,13 +136,18 @@ void HHWheelTimer::scheduleTimeout(Callback* callback,
 
   callback->context_ = RequestContext::saveContext();
 
-  if (count_ == 0 && !processingCallbacksGuard_) {
-    this->AsyncTimeout::scheduleTimeout(interval_.count());
-  }
+  uint64_t prev = count_;
+  count_++;
 
   callback->setScheduled(this, timeout);
   scheduleTimeoutImpl(callback, timeout);
-  count_++;
+
+  /* If we're calling callbacks, timer will be reset after all
+   * callbacks are called.
+   */
+  if (!processingCallbacksGuard_) {
+    scheduleNextTimeout();
+  }
 }
 
 void HHWheelTimer::scheduleTimeout(Callback* callback) {
@@ -159,7 +162,7 @@ bool HHWheelTimer::cascadeTimers(int bucket, int tick) {
   while (!cbs.empty()) {
     auto* cb = &cbs.front();
     cbs.pop_front();
-    scheduleTimeoutImpl(cb, cb->getTimeRemaining(now_));
+    scheduleTimeoutImpl(cb, cb->getTimeRemaining(getCurTime()));
   }
 
   // If tick is zero, timeoutExpired will cascade the next bucket.
@@ -167,6 +170,8 @@ bool HHWheelTimer::cascadeTimers(int bucket, int tick) {
 }
 
 void HHWheelTimer::timeoutExpired() noexcept {
+  auto nextTick = calcNextTick();
+
   // If the last smart pointer for "this" is reset inside the callback's
   // timeoutExpired(), then the guard will detect that it is time to bail from
   // this method.
@@ -185,21 +190,19 @@ void HHWheelTimer::timeoutExpired() noexcept {
   // timeoutExpired() can only be invoked directly from the event base loop.
   // It should never be invoked recursively.
   //
-  milliseconds catchup = std::chrono::duration_cast<milliseconds>(
-      std::chrono::steady_clock::now().time_since_epoch());
-  while (now_ < catchup) {
-    now_ += interval_;
+  lastTick_ = expireTick_;
+  while (lastTick_ < nextTick) {
+    int idx = lastTick_ & WHEEL_MASK;
 
-    int idx = nextTick_ & WHEEL_MASK;
-    if (0 == idx) {
+    if (idx == 0) {
       // Cascade timers
-      if (cascadeTimers(1, (nextTick_ >> WHEEL_BITS) & WHEEL_MASK) &&
-          cascadeTimers(2, (nextTick_ >> (2 * WHEEL_BITS)) & WHEEL_MASK)) {
-        cascadeTimers(3, (nextTick_ >> (3 * WHEEL_BITS)) & WHEEL_MASK);
+      if (cascadeTimers(1, (lastTick_ >> WHEEL_BITS) & WHEEL_MASK) &&
+          cascadeTimers(2, (lastTick_ >> (2 * WHEEL_BITS)) & WHEEL_MASK)) {
+        cascadeTimers(3, (lastTick_ >> (3 * WHEEL_BITS)) & WHEEL_MASK);
       }
     }
 
-    nextTick_++;
+    lastTick_++;
     CallbackList* cbs = &buckets_[0][idx];
     while (!cbs->empty()) {
       auto* cb = &cbs->front();
@@ -223,9 +226,7 @@ void HHWheelTimer::timeoutExpired() noexcept {
       return;
     }
   }
-  if (count_ > 0) {
-    this->AsyncTimeout::scheduleTimeout(interval_.count());
-  }
+  scheduleNextTimeout();
 }
 
 size_t HHWheelTimer::cancelAll() {
@@ -262,4 +263,41 @@ size_t HHWheelTimer::cancelAll() {
   return count;
 }
 
+void HHWheelTimer::scheduleNextTimeout() {
+  auto nextTick = calcNextTick();
+  long tick = 1;
+  if (nextTick & WHEEL_MASK) {
+    for (tick = nextTick & WHEEL_MASK; tick < WHEEL_SIZE; tick++) {
+      if (!buckets_[0][tick].empty()) {
+        break;
+      }
+    }
+    tick -= (nextTick - 1) & WHEEL_MASK;
+  }
+
+  if (count_ > 0) {
+    if (!this->AsyncTimeout::isScheduled() ||
+        (expireTick_ > tick + nextTick - 1)) {
+      this->AsyncTimeout::scheduleTimeout(interval_ * tick);
+      expireTick_ = tick + nextTick - 1;
+    }
+  } else {
+    this->AsyncTimeout::cancelTimeout();
+  }
+}
+
+int64_t HHWheelTimer::calcNextTick() {
+  auto intervals =
+      (getCurTime().count() - startTime_.count()) / interval_.count();
+  // Slow eventbases will have skew between the actual time and the
+  // callback time.  To avoid racing the next scheduleNextTimeout()
+  // call, always schedule new timeouts against the actual
+  // timeoutExpired() time.
+  if (!processingCallbacksGuard_) {
+    return intervals;
+  } else {
+    return lastTick_;
+  }
+}
+
 } // folly
index c8602336e8ff363543b2c7aa4f38a446a69a8beb..b11201c1a51ea19089f40a97f4b83476d45552c1 100644 (file)
@@ -33,14 +33,6 @@ namespace folly {
 /**
  * Hashed Hierarchical Wheel Timer
  *
- * Comparison:
- * AsyncTimeout - a single timeout.
- * HHWheelTimer - a set of efficient timeouts with different interval,
- *    but timeouts are not exact.
- *
- * All of the above are O(1) in insertion, tick update and cancel
-
- * This implementation ticks once every 10ms.
  * We model timers as the number of ticks until the next
  * due event.  We allow 32-bits of space to track this
  * due interval, and break that into 4 regions of 8 bits.
@@ -53,8 +45,11 @@ namespace folly {
  * into a different bucket.
  *
  * This technique results in a very cheap mechanism for
- * maintaining time and timers, provided that we can maintain
- * a consistent rate of ticks.
+ * maintaining time and timers.
+ *
+ * Unlike the original timer wheel paper, this implementation does
+ * *not* tick constantly, and instead calculates the exact next wakeup
+ * time.
  */
 class HHWheelTimer : private folly::AsyncTimeout,
                      public folly::DelayedDestruction {
@@ -292,12 +287,22 @@ class HHWheelTimer : private folly::AsyncTimeout,
   }
 
   bool cascadeTimers(int bucket, int tick);
-  int64_t nextTick_;
+  int64_t lastTick_;
+  int64_t expireTick_;
   uint64_t count_;
-  std::chrono::milliseconds now_;
+  std::chrono::milliseconds startTime_;
+
+  int64_t calcNextTick();
+
+  void scheduleNextTimeout();
 
   bool* processingCallbacksGuard_;
   CallbackList timeouts; // Timeouts queued to run
+
+  std::chrono::milliseconds getCurTime() {
+    return std::chrono::duration_cast<std::chrono::milliseconds>(
+        std::chrono::steady_clock::now().time_since_epoch());
+  }
 };
 
 } // folly
index 74ae6127e340417625ec42c4d8dfee3448a587da..20a40cf65a6093e649cb78e23b9e732e3b03b4da 100644 (file)
@@ -13,6 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include <folly/Random.h>
 #include <folly/io/async/EventBase.h>
 #include <folly/io/async/HHWheelTimer.h>
 #include <folly/io/async/test/UndelayedDestruction.h>
@@ -272,3 +273,52 @@ TEST_F(HHWheelTimerTest, Level1) {
   T_CHECK_TIMEOUT(
       start, t2.timestamps[0], milliseconds(300), milliseconds(256));
 }
+
+TEST_F(HHWheelTimerTest, Stress) {
+  StackWheelTimer t(&eventBase, milliseconds(1));
+
+  long timeoutcount = 10000;
+  TestTimeout timeouts[10000];
+  long runtimeouts = 0;
+  for (long i = 0; i < timeoutcount; i++) {
+    long newtimeout = Random::rand32(1, 10000);
+    if (Random::rand32(3)) {
+      // NOTE: hhwheel timer runs before eventbase runAfterDelay,
+      // so runAfterDelay cancelTimeout() must run  at least one timerwheel
+      // before scheduleTimeout, to ensure it runs first.
+      newtimeout += 256;
+      t.scheduleTimeout(&timeouts[i], std::chrono::milliseconds(newtimeout));
+      eventBase.runAfterDelay(
+          [&, i]() {
+            timeouts[i].fn = nullptr;
+            timeouts[i].cancelTimeout();
+            runtimeouts++;
+            LOG(INFO) << "Ran " << runtimeouts << " timeouts, cancelled";
+          },
+          newtimeout - 256);
+      timeouts[i].fn = [&, i, newtimeout]() {
+        LOG(INFO) << "FAIL:timer " << i << " still fired in " << newtimeout;
+        EXPECT_FALSE(true);
+      };
+    } else {
+      t.scheduleTimeout(&timeouts[i], std::chrono::milliseconds(newtimeout));
+      timeouts[i].fn = [&, i]() {
+        timeoutcount++;
+        long newtimeout = Random::rand32(1, 10000);
+        t.scheduleTimeout(&timeouts[i], std::chrono::milliseconds(newtimeout));
+        runtimeouts++;
+        /* sleep override */ usleep(1000);
+        LOG(INFO) << "Ran " << runtimeouts << " timeouts of " << timeoutcount;
+        timeouts[i].fn = [&, i]() {
+          runtimeouts++;
+          LOG(INFO) << "Ran " << runtimeouts << " timeouts of " << timeoutcount;
+        };
+      };
+    }
+  }
+
+  LOG(INFO) << "RUNNING TEST";
+  eventBase.loop();
+
+  EXPECT_EQ(runtimeouts, timeoutcount);
+}