X-Git-Url: http://plrg.eecs.uci.edu/git/?p=folly.git;a=blobdiff_plain;f=folly%2FTokenBucket.h;h=2abbfe379e71dabd50c7258945fdf82f61fd6ba4;hp=363bb7c4f3d906d10627df37a2147b1cfb0ee41b;hb=b71a1b76b3dd7d63bc1d27ed292ddb604fdd9388;hpb=321542683a01c3f334047531e9b487f047129775 diff --git a/folly/TokenBucket.h b/folly/TokenBucket.h index 363bb7c4..2abbfe37 100644 --- a/folly/TokenBucket.h +++ b/folly/TokenBucket.h @@ -1,5 +1,5 @@ /* - * Copyright 2016 Facebook, Inc. + * Copyright 2017 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,155 +18,371 @@ #include #include -#include #include #include +#include namespace folly { -/** Threadsafe TokenBucket implementation, based on the idea of - * converting tokens into time and maintaining state as a timestamp relative to - * now. The number of tokens available is represented by the delta between now - * and the timestamp, and the 'burst' is represented by the maximum delta. +/** + * Default clock class used by ParameterizedDynamicTokenBucket and derived + * classes. User-defined clock classes must be steady (monotonic) and define a + * static function std::chrono::duration<> timeSinceEpoch(). */ -class TokenBucket { - private: - std::atomic time_; - std::atomic secondsPerToken_; - std::atomic secondsPerBurst_; +struct DefaultTokenBucketClock { + static auto timeSinceEpoch() noexcept + -> decltype(std::chrono::steady_clock::now().time_since_epoch()) { + return std::chrono::steady_clock::now().time_since_epoch(); + } +}; +/** + * Thread-safe (atomic) token bucket implementation. + * + * A token bucket (http://en.wikipedia.org/wiki/Token_bucket) models a stream + * of events with an average rate and some amount of burstiness. The canonical + * example is a packet switched network: the network can accept some number of + * bytes per second and the bytes come in finite packets (bursts). A token + * bucket stores up to a fixed number of tokens (the burst size). Some number + * of tokens are removed when an event occurs. The tokens are replenished at a + * fixed rate. + * + * This implementation records the last time it was updated. This allows the + * token bucket to add tokens "just in time" when tokens are requested. + * + * The "dynamic" base variant allows the token generation rate and maximum + * burst size to change with every token consumption. + * + * @tparam ClockT Clock type, must be steady i.e. monotonic. + */ +template +class ParameterizedDynamicTokenBucket { public: - TokenBucket(double rate, double burst, double nowInSeconds) noexcept - : time_(nowInSeconds) { - reset(rate, burst, nowInSeconds); + /** + * Constructor. + * + * @param zeroTime Initial time at which to consider the token bucket + * starting to fill. Defaults to 0, so by default token + * buckets are "full" after construction. + */ + explicit ParameterizedDynamicTokenBucket(double zeroTime = 0) noexcept + : zeroTime_(zeroTime) {} + + /** + * Copy constructor. + * + * Thread-safe. (Copy constructors of derived classes may not be thread-safe + * however.) + */ + ParameterizedDynamicTokenBucket( + const ParameterizedDynamicTokenBucket& other) noexcept + : zeroTime_(other.zeroTime_.load()) {} + + /** + * Copy-assignment operator. + * + * Warning: not thread safe for the object being assigned to (including + * self-assignment). Thread-safe for the other object. + */ + ParameterizedDynamicTokenBucket& operator=( + const ParameterizedDynamicTokenBucket& other) noexcept { + zeroTime_ = other.zeroTime_.load(); + return *this; } - void reset(double rate, double burst, double nowInSeconds) noexcept { - double tokens = available(nowInSeconds); - - secondsPerToken_.store( - 1.0 / rate - std::numeric_limits::epsilon(), - std::memory_order_relaxed); - - secondsPerBurst_.store( - burst / rate + std::numeric_limits::epsilon(), - std::memory_order_relaxed); - - set_capacity(tokens, nowInSeconds); + /** + * Re-initialize token bucket. + * + * Thread-safe. + * + * @param zeroTime Initial time at which to consider the token bucket + * starting to fill. Defaults to 0, so by default token + * bucket is reset to "full". + */ + void reset(double zeroTime = 0) noexcept { + zeroTime_ = zeroTime; } - void set_capacity(double tokens, double nowInSeconds) noexcept { - const double secondsPerToken = std::atomic_load_explicit( - &secondsPerToken_, std::memory_order_relaxed); - - const double secondsPerBurst = std::atomic_load_explicit( - &secondsPerBurst_, std::memory_order_relaxed); - - double newTime = nowInSeconds - std::min( - tokens * secondsPerToken, secondsPerBurst); - - time_.store(newTime, std::memory_order_relaxed); + /** + * Returns the current time in seconds since Epoch. + */ + static double defaultClockNow() noexcept(noexcept(ClockT::timeSinceEpoch())) { + return std::chrono::duration_cast>( + ClockT::timeSinceEpoch()) + .count(); } - // If there are `tokens` avilable at `nowInSeconds`, consume them and - // return true. Otherwise, return false. - // - // This implementation is written in a lock-free manner using a - // compare-and-exchange loop, with branch prediction optimized to minimize - // time spent in the 'success' case which performs a write. - bool consume(double tokens, double nowInSeconds) noexcept { - const double secondsNeeded = tokens * std::atomic_load_explicit( - &secondsPerToken_, std::memory_order_relaxed); - - const double minTime = nowInSeconds - std::atomic_load_explicit( - &secondsPerBurst_, std::memory_order_relaxed); + /** + * Attempts to consume some number of tokens. Tokens are first added to the + * bucket based on the time elapsed since the last attempt to consume tokens. + * Note: Attempts to consume more tokens than the burst size will always + * fail. + * + * Thread-safe. + * + * @param toConsume The number of tokens to consume. + * @param rate Number of tokens to generate per second. + * @param burstSize Maximum burst size. Must be greater than 0. + * @param nowInSeconds Current time in seconds. Should be monotonically + * increasing from the nowInSeconds specified in + * this token bucket's constructor. + * @return True if the rate limit check passed, false otherwise. + */ + bool consume( + double toConsume, + double rate, + double burstSize, + double nowInSeconds = defaultClockNow()) { + assert(rate > 0); + assert(burstSize > 0); + + return this->consumeImpl( + rate, burstSize, nowInSeconds, [toConsume](double& tokens) { + if (tokens < toConsume) { + return false; + } + tokens -= toConsume; + return true; + }); + } - double oldTime = - std::atomic_load_explicit(&time_, std::memory_order_relaxed); - double newTime = oldTime; + /** + * Similar to consume, but always consumes some number of tokens. If the + * bucket contains enough tokens - consumes toConsume tokens. Otherwise the + * bucket is drained. + * + * Thread-safe. + * + * @param toConsume The number of tokens to consume. + * @param rate Number of tokens to generate per second. + * @param burstSize Maximum burst size. Must be greater than 0. + * @param nowInSeconds Current time in seconds. Should be monotonically + * increasing from the nowInSeconds specified in + * this token bucket's constructor. + * @return number of tokens that were consumed. + */ + double consumeOrDrain( + double toConsume, + double rate, + double burstSize, + double nowInSeconds = defaultClockNow()) { + assert(rate > 0); + assert(burstSize > 0); - // Limit the number of available tokens to 'burst'. We don't need to do - // this inside the loop because if we iterate more than once another - // caller will have performed an update that also covered this - // calculation. Also, tell the compiler to optimize branch prediction to - // minimize time spent between reads and writes in the success case - if (UNLIKELY(minTime > oldTime)) { - newTime = minTime; - } + double consumed; + this->consumeImpl( + rate, burstSize, nowInSeconds, [&consumed, toConsume](double& tokens) { + if (tokens < toConsume) { + consumed = tokens; + tokens = 0.0; + } else { + consumed = toConsume; + tokens -= toConsume; + } + return true; + }); + return consumed; + } - while (true) { - newTime += secondsNeeded; + /** + * Returns the number of tokens currently available. + * + * Thread-safe (but returned value may immediately be outdated). + */ + double available( + double rate, + double burstSize, + double nowInSeconds = defaultClockNow()) const noexcept { + assert(rate > 0); + assert(burstSize > 0); + + return std::min((nowInSeconds - this->zeroTime_) * rate, burstSize); + } - // Optimize for the write-contention case, to minimize the impact of - // branch misprediction on other threads - if (UNLIKELY(newTime > nowInSeconds)) { + private: + template + bool consumeImpl( + double rate, + double burstSize, + double nowInSeconds, + const TCallback& callback) { + auto zeroTimeOld = zeroTime_.load(); + double zeroTimeNew; + do { + auto tokens = std::min((nowInSeconds - zeroTimeOld) * rate, burstSize); + if (!callback(tokens)) { return false; } - - // Optimize for the write-contention case, to minimize the impact of - // branch misprediction on other threads - if (LIKELY(std::atomic_compare_exchange_weak_explicit( - &time_, &oldTime, newTime, - std::memory_order_relaxed, std::memory_order_relaxed))) { - return true; - } - - newTime = oldTime; - } + zeroTimeNew = nowInSeconds - tokens / rate; + } while ( + UNLIKELY(!zeroTime_.compare_exchange_weak(zeroTimeOld, zeroTimeNew))); return true; } - // Similar to consume, but will always consume some number of tokens. - double consumeOrDrain(double tokens, double nowInSeconds) noexcept { - const double secondsPerToken = std::atomic_load_explicit( - &secondsPerToken_, std::memory_order_relaxed); - - const double secondsNeeded = tokens * secondsPerToken; - const double minTime = nowInSeconds - std::atomic_load_explicit( - &secondsPerBurst_, std::memory_order_relaxed); - - double oldTime = - std::atomic_load_explicit(&time_, std::memory_order_relaxed); - double newTime = oldTime; + FOLLY_ALIGN_TO_AVOID_FALSE_SHARING std::atomic zeroTime_; +}; +/** + * Specialization of ParameterizedDynamicTokenBucket with a fixed token + * generation rate and a fixed maximum burst size. + */ +template +class ParameterizedTokenBucket { + private: + using Impl = ParameterizedDynamicTokenBucket; - // Limit the number of available tokens to 'burst'. - // Also, tell the compiler to optimize branch prediction to - // minimize time spent between reads and writes in the success case - if (UNLIKELY(minTime > oldTime)) { - newTime = minTime; - } + public: + /** + * Construct a token bucket with a specific maximum rate and burst size. + * + * @param genRate Number of tokens to generate per second. + * @param burstSize Maximum burst size. Must be greater than 0. + * @param zeroTime Initial time at which to consider the token bucket + * starting to fill. Defaults to 0, so by default token + * bucket is "full" after construction. + */ + ParameterizedTokenBucket( + double genRate, + double burstSize, + double zeroTime = 0) noexcept + : tokenBucket_(zeroTime), rate_(genRate), burstSize_(burstSize) { + assert(rate_ > 0); + assert(burstSize_ > 0); + } - double consumed; + /** + * Copy constructor. + * + * Warning: not thread safe! + */ + ParameterizedTokenBucket(const ParameterizedTokenBucket& other) noexcept = + default; + + /** + * Copy-assignment operator. + * + * Warning: not thread safe! + */ + ParameterizedTokenBucket& operator=( + const ParameterizedTokenBucket& other) noexcept = default; + + /** + * Returns the current time in seconds since Epoch. + */ + static double defaultClockNow() noexcept(noexcept(Impl::defaultClockNow())) { + return Impl::defaultClockNow(); + } - newTime += secondsNeeded; + /** + * Change rate and burst size. + * + * Warning: not thread safe! + * + * @param genRate Number of tokens to generate per second. + * @param burstSize Maximum burst size. Must be greater than 0. + * @param nowInSeconds Current time in seconds. Should be monotonically + * increasing from the nowInSeconds specified in + * this token bucket's constructor. + */ + void reset( + double genRate, + double burstSize, + double nowInSeconds = defaultClockNow()) noexcept { + assert(genRate > 0); + assert(burstSize > 0); + double availTokens = available(nowInSeconds); + rate_ = genRate; + burstSize_ = burstSize; + setCapacity(availTokens, nowInSeconds); + } - consumed = (newTime - nowInSeconds) / secondsPerToken; - time_.store(newTime, std::memory_order_relaxed); + /** + * Change number of tokens in bucket. + * + * Warning: not thread safe! + * + * @param tokens Desired number of tokens in bucket after the call. + * @param nowInSeconds Current time in seconds. Should be monotonically + * increasing from the nowInSeconds specified in + * this token bucket's constructor. + */ + void setCapacity(double tokens, double nowInSeconds) noexcept { + tokenBucket_.reset(nowInSeconds - tokens / rate_); + } - return consumed; + /** + * Attempts to consume some number of tokens. Tokens are first added to the + * bucket based on the time elapsed since the last attempt to consume tokens. + * Note: Attempts to consume more tokens than the burst size will always + * fail. + * + * Thread-safe. + * + * @param toConsume The number of tokens to consume. + * @param nowInSeconds Current time in seconds. Should be monotonically + * increasing from the nowInSeconds specified in + * this token bucket's constructor. + * @return True if the rate limit check passed, false otherwise. + */ + bool consume(double toConsume, double nowInSeconds = defaultClockNow()) { + return tokenBucket_.consume(toConsume, rate_, burstSize_, nowInSeconds); } - double available(double nowInSeconds = defaultClockNow()) const noexcept { - double time = - std::atomic_load_explicit(&time_, std::memory_order_relaxed); + /** + * Similar to consume, but always consumes some number of tokens. If the + * bucket contains enough tokens - consumes toConsume tokens. Otherwise the + * bucket is drained. + * + * Thread-safe. + * + * @param toConsume The number of tokens to consume. + * @param nowInSeconds Current time in seconds. Should be monotonically + * increasing from the nowInSeconds specified in + * this token bucket's constructor. + * @return number of tokens that were consumed. + */ + double consumeOrDrain( + double toConsume, + double nowInSeconds = defaultClockNow()) { + return tokenBucket_.consumeOrDrain( + toConsume, rate_, burstSize_, nowInSeconds); + } - double deltaTime = std::min( - std::atomic_load_explicit(&secondsPerBurst_, - std::memory_order_relaxed), - nowInSeconds - time); + /** + * Returns the number of tokens currently available. + * + * Thread-safe (but returned value may immediately be outdated). + */ + double available(double nowInSeconds = defaultClockNow()) const { + return tokenBucket_.available(rate_, burstSize_, nowInSeconds); + } - return std::max(0.0, deltaTime / std::atomic_load_explicit( - &secondsPerToken_, std::memory_order_relaxed)); + /** + * Returns the number of tokens generated per second. + * + * Thread-safe (but returned value may immediately be outdated). + */ + double rate() const noexcept { + return rate_; } - static double defaultClockNow() { - return std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch() - ).count() / 1000000.0; + /** + * Returns the maximum burst size. + * + * Thread-safe (but returned value may immediately be outdated). + */ + double burst() const noexcept { + return burstSize_; } + + private: + Impl tokenBucket_; + double rate_; + double burstSize_; }; -} +using TokenBucket = ParameterizedTokenBucket<>; +using DynamicTokenBucket = ParameterizedDynamicTokenBucket<>; +} // namespace folly