Generalized and polished folly::TokenBucket v2016.08.29.00
authorPhilipp Unterbrunner <philippu@fb.com>
Mon, 29 Aug 2016 13:27:23 +0000 (06:27 -0700)
committerFacebook Github Bot 1 <facebook-github-bot-1-bot@fb.com>
Mon, 29 Aug 2016 13:38:26 +0000 (06:38 -0700)
Summary: Added support for user-defined clock classes, improved comments, and removed part of the std::atomics use that had no effect on thread-safety.

Reviewed By: yfeldblum

Differential Revision: D3708378

fbshipit-source-id: 1a933c3707c12311584a3b33afd773ee91577167

folly/TokenBucket.h
folly/test/TokenBucketTest.cpp

index 363bb7c4f3d906d10627df37a2147b1cfb0ee41b..4cd32561e728fe9842bb86bffebffd3e17d343fe 100644 (file)
 
 #include <algorithm>
 #include <atomic>
-#include <limits>
 #include <chrono>
 
 #include <folly/Likely.h>
+#include <folly/detail/CacheLocality.h>
 
 namespace folly {
 
-/** Threadsafe TokenBucket implementation, based on the idea of
- * converting tokens into time and maintaining state as a timestamp relative to
- * now.  The number of tokens available is represented by the delta between now
- * and the timestamp, and the 'burst' is represented by the maximum delta.
+/**
+ * Default clock class used by ParameterizedDynamicTokenBucket and derived
+ * classes. User-defined clock classes must be steady (monotonic) and define a
+ * static function std::chrono::duration<> timeSinceEpoch().
  */
-class TokenBucket {
- private:
-  std::atomic<double> time_;
-  std::atomic<double> secondsPerToken_;
-  std::atomic<double> secondsPerBurst_;
+struct DefaultTokenBucketClock {
+  static auto timeSinceEpoch() noexcept
+      -> decltype(std::chrono::steady_clock::now().time_since_epoch()) {
+    return std::chrono::steady_clock::now().time_since_epoch();
+  }
+};
 
+/**
+ * Thread-safe (atomic) token bucket implementation.
+ *
+ * A token bucket (http://en.wikipedia.org/wiki/Token_bucket) models a stream
+ * of events with an average rate and some amount of burstiness. The canonical
+ * example is a packet switched network: the network can accept some number of
+ * bytes per second and the bytes come in finite packets (bursts). A token
+ * bucket stores up to a fixed number of tokens (the burst size). Some number
+ * of tokens are removed when an event occurs. The tokens are replenished at a
+ * fixed rate.
+ *
+ * This implementation records the last time it was updated. This allows the
+ * token bucket to add tokens "just in time" when tokens are requested.
+ *
+ * The "dynamic" base variant allows the token generation rate and maximum
+ * burst size to change with every token consumption.
+ *
+ * @tparam ClockT Clock type, must be steady i.e. monotonic.
+ */
+template <typename ClockT = DefaultTokenBucketClock>
+class ParameterizedDynamicTokenBucket {
  public:
-  TokenBucket(double rate, double burst, double nowInSeconds) noexcept
-      : time_(nowInSeconds) {
-    reset(rate, burst, nowInSeconds);
+  /**
+   * Constructor.
+   *
+   * @param zeroTime Initial time at which to consider the token bucket
+   *                 starting to fill. Defaults to 0, so by default token
+   *                 buckets are "full" after construction.
+   */
+  explicit ParameterizedDynamicTokenBucket(double zeroTime = 0) noexcept
+      : zeroTime_(zeroTime) {}
+
+  /**
+   * Copy constructor.
+   *
+   * Thread-safe. (Copy constructors of derived classes may not be thread-safe
+   * however.)
+   */
+  ParameterizedDynamicTokenBucket(
+      const ParameterizedDynamicTokenBucket& other) noexcept
+      : zeroTime_(other.zeroTime_.load()) {}
+
+  /**
+   * Copy-assignment operator.
+   *
+   * Warning: not thread safe for the object being assigned to (including
+   * self-assignment). Thread-safe for the other object.
+   */
+  ParameterizedDynamicTokenBucket& operator=(
+      const ParameterizedDynamicTokenBucket& other) noexcept {
+    zeroTime_ = other.zeroTime_.load();
+    return *this;
   }
 
-  void reset(double rate, double burst, double nowInSeconds) noexcept {
-    double tokens = available(nowInSeconds);
-
-    secondsPerToken_.store(
-        1.0 / rate - std::numeric_limits<double>::epsilon(),
-        std::memory_order_relaxed);
-
-    secondsPerBurst_.store(
-        burst / rate + std::numeric_limits<double>::epsilon(),
-        std::memory_order_relaxed);
-
-    set_capacity(tokens, nowInSeconds);
+  /**
+   * Re-initialize token bucket.
+   *
+   * Thread-safe.
+   *
+   * @param zeroTime Initial time at which to consider the token bucket
+   *                 starting to fill. Defaults to 0, so by default token
+   *                 bucket is reset to "full".
+   */
+  void reset(double zeroTime = 0) noexcept {
+    zeroTime_ = zeroTime;
   }
 
-  void set_capacity(double tokens, double nowInSeconds) noexcept {
-    const double secondsPerToken = std::atomic_load_explicit(
-        &secondsPerToken_, std::memory_order_relaxed);
-
-    const double secondsPerBurst = std::atomic_load_explicit(
-        &secondsPerBurst_, std::memory_order_relaxed);
-
-    double newTime = nowInSeconds - std::min(
-        tokens * secondsPerToken, secondsPerBurst);
-
-    time_.store(newTime, std::memory_order_relaxed);
+  /**
+   * Attempts to consume some number of tokens. Tokens are first added to the
+   * bucket based on the time elapsed since the last attempt to consume tokens.
+   * Note: Attempts to consume more tokens than the burst size will always
+   * fail.
+   *
+   * Thread-safe.
+   *
+   * @param toConsume The number of tokens to consume.
+   * @param rate Number of tokens to generate per second.
+   * @param burstSize Maximum burst size. Must be greater than 0.
+   * @param nowInSeconds Current time in seconds. Should be monotonically
+   *                     increasing from the nowInSeconds specified in
+   *                     this token bucket's constructor.
+   * @return True if the rate limit check passed, false otherwise.
+   */
+  bool consume(
+      double toConsume,
+      double rate,
+      double burstSize,
+      double nowInSeconds = defaultClockNow()) {
+    assert(rate > 0);
+    assert(burstSize > 0);
+
+    return this->consumeImpl(
+        rate, burstSize, nowInSeconds, [toConsume](double& tokens) {
+          if (tokens < toConsume) {
+            return false;
+          }
+          tokens -= toConsume;
+          return true;
+        });
   }
 
-  // If there are `tokens` avilable at `nowInSeconds`, consume them and
-  // return true.  Otherwise, return false.
-  //
-  // This implementation is written in a lock-free manner using a
-  // compare-and-exchange loop, with branch prediction optimized to minimize
-  // time spent in the 'success' case which performs a write.
-  bool consume(double tokens, double nowInSeconds) noexcept {
-    const double secondsNeeded = tokens * std::atomic_load_explicit(
-        &secondsPerToken_, std::memory_order_relaxed);
-
-    const double minTime = nowInSeconds - std::atomic_load_explicit(
-        &secondsPerBurst_, std::memory_order_relaxed);
+  /**
+   * Similar to consume, but always consumes some number of tokens.  If the
+   * bucket contains enough tokens - consumes toConsume tokens.  Otherwise the
+   * bucket is drained.
+   *
+   * Thread-safe.
+   *
+   * @param toConsume The number of tokens to consume.
+   * @param rate Number of tokens to generate per second.
+   * @param burstSize Maximum burst size. Must be greater than 0.
+   * @param nowInSeconds Current time in seconds. Should be monotonically
+   *                     increasing from the nowInSeconds specified in
+   *                     this token bucket's constructor.
+   * @return number of tokens that were consumed.
+   */
+  double consumeOrDrain(
+      double toConsume,
+      double rate,
+      double burstSize,
+      double nowInSeconds = defaultClockNow()) {
+    assert(rate > 0);
+    assert(burstSize > 0);
 
-    double oldTime =
-      std::atomic_load_explicit(&time_, std::memory_order_relaxed);
-    double newTime = oldTime;
+    double consumed;
+    this->consumeImpl(
+        rate, burstSize, nowInSeconds, [&consumed, toConsume](double& tokens) {
+          if (tokens < toConsume) {
+            consumed = tokens;
+            tokens = 0.0;
+          } else {
+            consumed = toConsume;
+            tokens -= toConsume;
+          }
+          return true;
+        });
+    return consumed;
+  }
 
-    // Limit the number of available tokens to 'burst'.  We don't need to do
-    // this inside the loop because if we iterate more than once another
-    // caller will have performed an update that also covered this
-    // calculation.  Also, tell the compiler to optimize branch prediction to
-    // minimize time spent between reads and writes in the success case
-    if (UNLIKELY(minTime > oldTime)) {
-      newTime = minTime;
-    }
+  /**
+   * Returns the number of tokens currently available.
+   *
+   * Thread-safe (but returned value may immediately be outdated).
+   */
+  double available(
+      double rate,
+      double burstSize,
+      double nowInSeconds = defaultClockNow()) const noexcept {
+    assert(rate > 0);
+    assert(burstSize > 0);
+
+    return std::min((nowInSeconds - this->zeroTime_) * rate, burstSize);
+  }
 
-    while (true) {
-      newTime += secondsNeeded;
+  /**
+   * Returns the current time in seconds since Epoch.
+   */
+  static double defaultClockNow() noexcept(noexcept(ClockT::timeSinceEpoch())) {
+    return std::chrono::duration_cast<std::chrono::duration<double>>(
+               ClockT::timeSinceEpoch())
+        .count();
+  }
 
-      // Optimize for the write-contention case, to minimize the impact of
-      // branch misprediction on other threads
-      if (UNLIKELY(newTime > nowInSeconds)) {
+ private:
+  template <typename TCallback>
+  bool consumeImpl(
+      double rate,
+      double burstSize,
+      double nowInSeconds,
+      const TCallback& callback) {
+    auto zeroTimeOld = zeroTime_.load();
+    double zeroTimeNew;
+    do {
+      auto tokens = std::min((nowInSeconds - zeroTimeOld) * rate, burstSize);
+      if (!callback(tokens)) {
         return false;
       }
-
-      // Optimize for the write-contention case, to minimize the impact of
-      // branch misprediction on other threads
-      if (LIKELY(std::atomic_compare_exchange_weak_explicit(
-              &time_, &oldTime, newTime,
-              std::memory_order_relaxed, std::memory_order_relaxed))) {
-        return true;
-      }
-
-      newTime = oldTime;
-    }
+      zeroTimeNew = nowInSeconds - tokens / rate;
+    } while (
+        UNLIKELY(!zeroTime_.compare_exchange_weak(zeroTimeOld, zeroTimeNew)));
 
     return true;
   }
 
-  // Similar to consume, but will always consume some number of tokens.
-  double consumeOrDrain(double tokens, double nowInSeconds) noexcept {
-    const double secondsPerToken = std::atomic_load_explicit(
-        &secondsPerToken_, std::memory_order_relaxed);
-
-    const double secondsNeeded = tokens * secondsPerToken;
-    const double minTime = nowInSeconds - std::atomic_load_explicit(
-        &secondsPerBurst_, std::memory_order_relaxed);
-
-    double oldTime =
-      std::atomic_load_explicit(&time_, std::memory_order_relaxed);
-    double newTime = oldTime;
+  std::atomic<double> zeroTime_ FOLLY_ALIGN_TO_AVOID_FALSE_SHARING;
+};
 
+/**
+ * Specialization of ParameterizedDynamicTokenBucket with a fixed token
+ * generation rate and a fixed maximum burst size.
+ */
+template <typename ClockT = DefaultTokenBucketClock>
+class ParameterizedTokenBucket {
+ private:
+  using Impl = ParameterizedDynamicTokenBucket<ClockT>;
 
-    // Limit the number of available tokens to 'burst'.
-    // Also, tell the compiler to optimize branch prediction to
-    // minimize time spent between reads and writes in the success case
-    if (UNLIKELY(minTime > oldTime)) {
-      newTime = minTime;
-    }
+ public:
+  /**
+   * Construct a token bucket with a specific maximum rate and burst size.
+   *
+   * @param genRate Number of tokens to generate per second.
+   * @param burstSize Maximum burst size. Must be greater than 0.
+   * @param zeroTime Initial time at which to consider the token bucket
+   *                 starting to fill. Defaults to 0, so by default token
+   *                 bucket is "full" after construction.
+   */
+  ParameterizedTokenBucket(
+      double genRate,
+      double burstSize,
+      double zeroTime = 0) noexcept
+      : tokenBucket_(zeroTime), rate_(genRate), burstSize_(burstSize) {
+    assert(rate_ > 0);
+    assert(burstSize_ > 0);
+  }
 
-    double consumed;
+  /**
+   * Copy constructor.
+   *
+   * Warning: not thread safe!
+   */
+  ParameterizedTokenBucket(const ParameterizedTokenBucket& other) noexcept =
+      default;
+
+  /**
+   * Copy-assignment operator.
+   *
+   * Warning: not thread safe!
+   */
+  ParameterizedTokenBucket& operator=(
+      const ParameterizedTokenBucket& other) noexcept = default;
+
+  /**
+   * Change rate and burst size.
+   *
+   * Warning: not thread safe!
+   *
+   * @param genRate Number of tokens to generate per second.
+   * @param burstSize Maximum burst size. Must be greater than 0.
+   * @param nowInSeconds Current time in seconds. Should be monotonically
+   *                     increasing from the nowInSeconds specified in
+   *                     this token bucket's constructor.
+   */
+  void reset(
+      double genRate,
+      double burstSize,
+      double nowInSeconds = defaultClockNow()) noexcept {
+    assert(genRate > 0);
+    assert(burstSize > 0);
+    double availTokens = available(nowInSeconds);
+    rate_ = genRate;
+    burstSize_ = burstSize;
+    setCapacity(availTokens, nowInSeconds);
+  }
 
-    newTime += secondsNeeded;
+  /**
+   * Change number of tokens in bucket.
+   *
+   * Warning: not thread safe!
+   *
+   * @param tokens Desired number of tokens in bucket after the call.
+   * @param nowInSeconds Current time in seconds. Should be monotonically
+   *                     increasing from the nowInSeconds specified in
+   *                     this token bucket's constructor.
+   */
+  void setCapacity(double tokens, double nowInSeconds) noexcept {
+    tokenBucket_.reset(nowInSeconds - tokens / rate_);
+  }
 
-    consumed = (newTime - nowInSeconds) / secondsPerToken;
-    time_.store(newTime, std::memory_order_relaxed);
+  /**
+   * Attempts to consume some number of tokens. Tokens are first added to the
+   * bucket based on the time elapsed since the last attempt to consume tokens.
+   * Note: Attempts to consume more tokens than the burst size will always
+   * fail.
+   *
+   * Thread-safe.
+   *
+   * @param toConsume The number of tokens to consume.
+   * @param nowInSeconds Current time in seconds. Should be monotonically
+   *                     increasing from the nowInSeconds specified in
+   *                     this token bucket's constructor.
+   * @return True if the rate limit check passed, false otherwise.
+   */
+  bool consume(double toConsume, double nowInSeconds = defaultClockNow()) {
+    return tokenBucket_.consume(toConsume, rate_, burstSize_, nowInSeconds);
+  }
 
-    return consumed;
+  /**
+   * Similar to consume, but always consumes some number of tokens.  If the
+   * bucket contains enough tokens - consumes toConsume tokens.  Otherwise the
+   * bucket is drained.
+   *
+   * Thread-safe.
+   *
+   * @param toConsume The number of tokens to consume.
+   * @param nowInSeconds Current time in seconds. Should be monotonically
+   *                     increasing from the nowInSeconds specified in
+   *                     this token bucket's constructor.
+   * @return number of tokens that were consumed.
+   */
+  double consumeOrDrain(
+      double toConsume,
+      double nowInSeconds = defaultClockNow()) {
+    return tokenBucket_.consumeOrDrain(
+        toConsume, rate_, burstSize_, nowInSeconds);
   }
 
-  double available(double nowInSeconds = defaultClockNow()) const noexcept {
-    double time =
-      std::atomic_load_explicit(&time_, std::memory_order_relaxed);
+  /**
+   * Returns the number of tokens currently available.
+   *
+   * Thread-safe (but returned value may immediately be outdated).
+   */
+  double available(double nowInSeconds = defaultClockNow()) const {
+    return tokenBucket_.available(rate_, burstSize_, nowInSeconds);
+  }
 
-    double deltaTime = std::min(
-        std::atomic_load_explicit(&secondsPerBurst_,
-                                  std::memory_order_relaxed),
-        nowInSeconds - time);
+  /**
+   * Returns the number of tokens generated per second.
+   *
+   * Thread-safe (but returned value may immediately be outdated).
+   */
+  double rate() const noexcept {
+    return rate_;
+  }
 
-    return std::max(0.0, deltaTime / std::atomic_load_explicit(
-          &secondsPerToken_, std::memory_order_relaxed));
+  /**
+   * Returns the maximum burst size.
+   *
+   * Thread-safe (but returned value may immediately be outdated).
+   */
+  double burst() const noexcept {
+    return burstSize_;
   }
 
-  static double defaultClockNow() {
-    return std::chrono::duration_cast<std::chrono::microseconds>(
-        std::chrono::steady_clock::now().time_since_epoch()
-      ).count() / 1000000.0;
+  /**
+   * Returns the current time in seconds since Epoch.
+   */
+  static double defaultClockNow() noexcept(noexcept(Impl::defaultClockNow())) {
+    return Impl::defaultClockNow();
   }
+
+ private:
+  Impl tokenBucket_;
+  double rate_;
+  double burstSize_;
 };
 
+using TokenBucket = ParameterizedTokenBucket<>;
+using DynamicTokenBucket = ParameterizedDynamicTokenBucket<>;
 }
index 6b699c80918e4092f5695723ee7da58c38959013..7f0f49533902cff5442a6de2ec699ab35f8e06c2 100644 (file)
@@ -72,3 +72,64 @@ static std::vector<std::pair<double, double> > rateToConsumeSize = {
 INSTANTIATE_TEST_CASE_P(TokenBucket,
                         TokenBucketTest,
                         ::testing::ValuesIn(rateToConsumeSize));
+
+void doTokenBucketTest(double maxQps, double consumeSize) {
+  const double tenMillisecondBurst = maxQps * 0.010;
+  // Select a burst size of 10 milliseconds at the max rate or the consume size
+  // if 10 ms at maxQps is too small.
+  const double burstSize = std::max(consumeSize, tenMillisecondBurst);
+  TokenBucket tokenBucket(maxQps, burstSize, 0);
+  double tokenCounter = 0;
+  double currentTime = 0;
+  // Simulate time advancing 10 seconds
+  for (; currentTime <= 10.0; currentTime += 0.001) {
+    EXPECT_FALSE(tokenBucket.consume(burstSize + 1, currentTime));
+    while (tokenBucket.consume(consumeSize, currentTime)) {
+      tokenCounter += consumeSize;
+    }
+    // Tokens consumed should exceed some lower bound based on maxQps.
+    // Note: The token bucket implementation is not precise, so the lower bound
+    // is somewhat fudged. The upper bound is accurate however.
+    EXPECT_LE(maxQps * currentTime * 0.9 - 1, tokenCounter);
+    // Tokens consumed should not exceed some upper bound based on maxQps.
+    EXPECT_GE(maxQps * currentTime + 1e-6, tokenCounter);
+  }
+}
+
+TEST(TokenBucket, sanity) {
+  doTokenBucketTest(100, 1);
+  doTokenBucketTest(1000, 1);
+  doTokenBucketTest(10000, 1);
+  // Consume more than one at a time.
+  doTokenBucketTest(10000, 5);
+}
+
+TEST(TokenBucket, ReverseTime2) {
+  const double rate = 1000;
+  TokenBucket tokenBucket(rate, rate * 0.01 + 1e-6);
+  size_t count = 0;
+  while (tokenBucket.consume(1, 0.1)) {
+    count += 1;
+  }
+  EXPECT_EQ(10, count);
+  // Going backwards in time has no affect on the toke count (this protects
+  // against different threads providing out of order timestamps).
+  double tokensBefore = tokenBucket.available();
+  EXPECT_FALSE(tokenBucket.consume(1, 0.09999999));
+  EXPECT_EQ(tokensBefore, tokenBucket.available());
+}
+
+TEST(TokenBucket, drainOnFail) {
+  DynamicTokenBucket tokenBucket;
+
+  // Almost empty the bucket
+  EXPECT_TRUE(tokenBucket.consume(9, 10, 10, 1));
+
+  // Request more tokens than available
+  EXPECT_FALSE(tokenBucket.consume(5, 10, 10, 1));
+  EXPECT_DOUBLE_EQ(1.0, tokenBucket.available(10, 10, 1));
+
+  // Again request more tokens than available, but ask to drain
+  EXPECT_DOUBLE_EQ(1.0, tokenBucket.consumeOrDrain(5, 10, 10, 1));
+  EXPECT_DOUBLE_EQ(0.0, tokenBucket.consumeOrDrain(1, 10, 10, 1));
+}