Generalized and polished folly::TokenBucket
[folly.git] / folly / TokenBucket.h
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <algorithm>
20 #include <atomic>
21 #include <chrono>
22
23 #include <folly/Likely.h>
24 #include <folly/detail/CacheLocality.h>
25
26 namespace folly {
27
28 /**
29  * Default clock class used by ParameterizedDynamicTokenBucket and derived
30  * classes. User-defined clock classes must be steady (monotonic) and define a
31  * static function std::chrono::duration<> timeSinceEpoch().
32  */
33 struct DefaultTokenBucketClock {
34   static auto timeSinceEpoch() noexcept
35       -> decltype(std::chrono::steady_clock::now().time_since_epoch()) {
36     return std::chrono::steady_clock::now().time_since_epoch();
37   }
38 };
39
40 /**
41  * Thread-safe (atomic) token bucket implementation.
42  *
43  * A token bucket (http://en.wikipedia.org/wiki/Token_bucket) models a stream
44  * of events with an average rate and some amount of burstiness. The canonical
45  * example is a packet switched network: the network can accept some number of
46  * bytes per second and the bytes come in finite packets (bursts). A token
47  * bucket stores up to a fixed number of tokens (the burst size). Some number
48  * of tokens are removed when an event occurs. The tokens are replenished at a
49  * fixed rate.
50  *
51  * This implementation records the last time it was updated. This allows the
52  * token bucket to add tokens "just in time" when tokens are requested.
53  *
54  * The "dynamic" base variant allows the token generation rate and maximum
55  * burst size to change with every token consumption.
56  *
57  * @tparam ClockT Clock type, must be steady i.e. monotonic.
58  */
59 template <typename ClockT = DefaultTokenBucketClock>
60 class ParameterizedDynamicTokenBucket {
61  public:
62   /**
63    * Constructor.
64    *
65    * @param zeroTime Initial time at which to consider the token bucket
66    *                 starting to fill. Defaults to 0, so by default token
67    *                 buckets are "full" after construction.
68    */
69   explicit ParameterizedDynamicTokenBucket(double zeroTime = 0) noexcept
70       : zeroTime_(zeroTime) {}
71
72   /**
73    * Copy constructor.
74    *
75    * Thread-safe. (Copy constructors of derived classes may not be thread-safe
76    * however.)
77    */
78   ParameterizedDynamicTokenBucket(
79       const ParameterizedDynamicTokenBucket& other) noexcept
80       : zeroTime_(other.zeroTime_.load()) {}
81
82   /**
83    * Copy-assignment operator.
84    *
85    * Warning: not thread safe for the object being assigned to (including
86    * self-assignment). Thread-safe for the other object.
87    */
88   ParameterizedDynamicTokenBucket& operator=(
89       const ParameterizedDynamicTokenBucket& other) noexcept {
90     zeroTime_ = other.zeroTime_.load();
91     return *this;
92   }
93
94   /**
95    * Re-initialize token bucket.
96    *
97    * Thread-safe.
98    *
99    * @param zeroTime Initial time at which to consider the token bucket
100    *                 starting to fill. Defaults to 0, so by default token
101    *                 bucket is reset to "full".
102    */
103   void reset(double zeroTime = 0) noexcept {
104     zeroTime_ = zeroTime;
105   }
106
107   /**
108    * Attempts to consume some number of tokens. Tokens are first added to the
109    * bucket based on the time elapsed since the last attempt to consume tokens.
110    * Note: Attempts to consume more tokens than the burst size will always
111    * fail.
112    *
113    * Thread-safe.
114    *
115    * @param toConsume The number of tokens to consume.
116    * @param rate Number of tokens to generate per second.
117    * @param burstSize Maximum burst size. Must be greater than 0.
118    * @param nowInSeconds Current time in seconds. Should be monotonically
119    *                     increasing from the nowInSeconds specified in
120    *                     this token bucket's constructor.
121    * @return True if the rate limit check passed, false otherwise.
122    */
123   bool consume(
124       double toConsume,
125       double rate,
126       double burstSize,
127       double nowInSeconds = defaultClockNow()) {
128     assert(rate > 0);
129     assert(burstSize > 0);
130
131     return this->consumeImpl(
132         rate, burstSize, nowInSeconds, [toConsume](double& tokens) {
133           if (tokens < toConsume) {
134             return false;
135           }
136           tokens -= toConsume;
137           return true;
138         });
139   }
140
141   /**
142    * Similar to consume, but always consumes some number of tokens.  If the
143    * bucket contains enough tokens - consumes toConsume tokens.  Otherwise the
144    * bucket is drained.
145    *
146    * Thread-safe.
147    *
148    * @param toConsume The number of tokens to consume.
149    * @param rate Number of tokens to generate per second.
150    * @param burstSize Maximum burst size. Must be greater than 0.
151    * @param nowInSeconds Current time in seconds. Should be monotonically
152    *                     increasing from the nowInSeconds specified in
153    *                     this token bucket's constructor.
154    * @return number of tokens that were consumed.
155    */
156   double consumeOrDrain(
157       double toConsume,
158       double rate,
159       double burstSize,
160       double nowInSeconds = defaultClockNow()) {
161     assert(rate > 0);
162     assert(burstSize > 0);
163
164     double consumed;
165     this->consumeImpl(
166         rate, burstSize, nowInSeconds, [&consumed, toConsume](double& tokens) {
167           if (tokens < toConsume) {
168             consumed = tokens;
169             tokens = 0.0;
170           } else {
171             consumed = toConsume;
172             tokens -= toConsume;
173           }
174           return true;
175         });
176     return consumed;
177   }
178
179   /**
180    * Returns the number of tokens currently available.
181    *
182    * Thread-safe (but returned value may immediately be outdated).
183    */
184   double available(
185       double rate,
186       double burstSize,
187       double nowInSeconds = defaultClockNow()) const noexcept {
188     assert(rate > 0);
189     assert(burstSize > 0);
190
191     return std::min((nowInSeconds - this->zeroTime_) * rate, burstSize);
192   }
193
194   /**
195    * Returns the current time in seconds since Epoch.
196    */
197   static double defaultClockNow() noexcept(noexcept(ClockT::timeSinceEpoch())) {
198     return std::chrono::duration_cast<std::chrono::duration<double>>(
199                ClockT::timeSinceEpoch())
200         .count();
201   }
202
203  private:
204   template <typename TCallback>
205   bool consumeImpl(
206       double rate,
207       double burstSize,
208       double nowInSeconds,
209       const TCallback& callback) {
210     auto zeroTimeOld = zeroTime_.load();
211     double zeroTimeNew;
212     do {
213       auto tokens = std::min((nowInSeconds - zeroTimeOld) * rate, burstSize);
214       if (!callback(tokens)) {
215         return false;
216       }
217       zeroTimeNew = nowInSeconds - tokens / rate;
218     } while (
219         UNLIKELY(!zeroTime_.compare_exchange_weak(zeroTimeOld, zeroTimeNew)));
220
221     return true;
222   }
223
224   std::atomic<double> zeroTime_ FOLLY_ALIGN_TO_AVOID_FALSE_SHARING;
225 };
226
227 /**
228  * Specialization of ParameterizedDynamicTokenBucket with a fixed token
229  * generation rate and a fixed maximum burst size.
230  */
231 template <typename ClockT = DefaultTokenBucketClock>
232 class ParameterizedTokenBucket {
233  private:
234   using Impl = ParameterizedDynamicTokenBucket<ClockT>;
235
236  public:
237   /**
238    * Construct a token bucket with a specific maximum rate and burst size.
239    *
240    * @param genRate Number of tokens to generate per second.
241    * @param burstSize Maximum burst size. Must be greater than 0.
242    * @param zeroTime Initial time at which to consider the token bucket
243    *                 starting to fill. Defaults to 0, so by default token
244    *                 bucket is "full" after construction.
245    */
246   ParameterizedTokenBucket(
247       double genRate,
248       double burstSize,
249       double zeroTime = 0) noexcept
250       : tokenBucket_(zeroTime), rate_(genRate), burstSize_(burstSize) {
251     assert(rate_ > 0);
252     assert(burstSize_ > 0);
253   }
254
255   /**
256    * Copy constructor.
257    *
258    * Warning: not thread safe!
259    */
260   ParameterizedTokenBucket(const ParameterizedTokenBucket& other) noexcept =
261       default;
262
263   /**
264    * Copy-assignment operator.
265    *
266    * Warning: not thread safe!
267    */
268   ParameterizedTokenBucket& operator=(
269       const ParameterizedTokenBucket& other) noexcept = default;
270
271   /**
272    * Change rate and burst size.
273    *
274    * Warning: not thread safe!
275    *
276    * @param genRate Number of tokens to generate per second.
277    * @param burstSize Maximum burst size. Must be greater than 0.
278    * @param nowInSeconds Current time in seconds. Should be monotonically
279    *                     increasing from the nowInSeconds specified in
280    *                     this token bucket's constructor.
281    */
282   void reset(
283       double genRate,
284       double burstSize,
285       double nowInSeconds = defaultClockNow()) noexcept {
286     assert(genRate > 0);
287     assert(burstSize > 0);
288     double availTokens = available(nowInSeconds);
289     rate_ = genRate;
290     burstSize_ = burstSize;
291     setCapacity(availTokens, nowInSeconds);
292   }
293
294   /**
295    * Change number of tokens in bucket.
296    *
297    * Warning: not thread safe!
298    *
299    * @param tokens Desired number of tokens in bucket after the call.
300    * @param nowInSeconds Current time in seconds. Should be monotonically
301    *                     increasing from the nowInSeconds specified in
302    *                     this token bucket's constructor.
303    */
304   void setCapacity(double tokens, double nowInSeconds) noexcept {
305     tokenBucket_.reset(nowInSeconds - tokens / rate_);
306   }
307
308   /**
309    * Attempts to consume some number of tokens. Tokens are first added to the
310    * bucket based on the time elapsed since the last attempt to consume tokens.
311    * Note: Attempts to consume more tokens than the burst size will always
312    * fail.
313    *
314    * Thread-safe.
315    *
316    * @param toConsume The number of tokens to consume.
317    * @param nowInSeconds Current time in seconds. Should be monotonically
318    *                     increasing from the nowInSeconds specified in
319    *                     this token bucket's constructor.
320    * @return True if the rate limit check passed, false otherwise.
321    */
322   bool consume(double toConsume, double nowInSeconds = defaultClockNow()) {
323     return tokenBucket_.consume(toConsume, rate_, burstSize_, nowInSeconds);
324   }
325
326   /**
327    * Similar to consume, but always consumes some number of tokens.  If the
328    * bucket contains enough tokens - consumes toConsume tokens.  Otherwise the
329    * bucket is drained.
330    *
331    * Thread-safe.
332    *
333    * @param toConsume The number of tokens to consume.
334    * @param nowInSeconds Current time in seconds. Should be monotonically
335    *                     increasing from the nowInSeconds specified in
336    *                     this token bucket's constructor.
337    * @return number of tokens that were consumed.
338    */
339   double consumeOrDrain(
340       double toConsume,
341       double nowInSeconds = defaultClockNow()) {
342     return tokenBucket_.consumeOrDrain(
343         toConsume, rate_, burstSize_, nowInSeconds);
344   }
345
346   /**
347    * Returns the number of tokens currently available.
348    *
349    * Thread-safe (but returned value may immediately be outdated).
350    */
351   double available(double nowInSeconds = defaultClockNow()) const {
352     return tokenBucket_.available(rate_, burstSize_, nowInSeconds);
353   }
354
355   /**
356    * Returns the number of tokens generated per second.
357    *
358    * Thread-safe (but returned value may immediately be outdated).
359    */
360   double rate() const noexcept {
361     return rate_;
362   }
363
364   /**
365    * Returns the maximum burst size.
366    *
367    * Thread-safe (but returned value may immediately be outdated).
368    */
369   double burst() const noexcept {
370     return burstSize_;
371   }
372
373   /**
374    * Returns the current time in seconds since Epoch.
375    */
376   static double defaultClockNow() noexcept(noexcept(Impl::defaultClockNow())) {
377     return Impl::defaultClockNow();
378   }
379
380  private:
381   Impl tokenBucket_;
382   double rate_;
383   double burstSize_;
384 };
385
386 using TokenBucket = ParameterizedTokenBucket<>;
387 using DynamicTokenBucket = ParameterizedDynamicTokenBucket<>;
388 }