fix bootstrap on osx
[folly.git] / folly / TokenBucket.h
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <algorithm>
20 #include <atomic>
21 #include <chrono>
22
23 #include <folly/Likely.h>
24 #include <folly/detail/CacheLocality.h>
25
26 namespace folly {
27
28 /**
29  * Default clock class used by ParameterizedDynamicTokenBucket and derived
30  * classes. User-defined clock classes must be steady (monotonic) and define a
31  * static function std::chrono::duration<> timeSinceEpoch().
32  */
33 struct DefaultTokenBucketClock {
34   static auto timeSinceEpoch() noexcept
35       -> decltype(std::chrono::steady_clock::now().time_since_epoch()) {
36     return std::chrono::steady_clock::now().time_since_epoch();
37   }
38 };
39
40 /**
41  * Thread-safe (atomic) token bucket implementation.
42  *
43  * A token bucket (http://en.wikipedia.org/wiki/Token_bucket) models a stream
44  * of events with an average rate and some amount of burstiness. The canonical
45  * example is a packet switched network: the network can accept some number of
46  * bytes per second and the bytes come in finite packets (bursts). A token
47  * bucket stores up to a fixed number of tokens (the burst size). Some number
48  * of tokens are removed when an event occurs. The tokens are replenished at a
49  * fixed rate.
50  *
51  * This implementation records the last time it was updated. This allows the
52  * token bucket to add tokens "just in time" when tokens are requested.
53  *
54  * The "dynamic" base variant allows the token generation rate and maximum
55  * burst size to change with every token consumption.
56  *
57  * @tparam ClockT Clock type, must be steady i.e. monotonic.
58  */
59 template <typename ClockT = DefaultTokenBucketClock>
60 class ParameterizedDynamicTokenBucket {
61  public:
62   /**
63    * Constructor.
64    *
65    * @param zeroTime Initial time at which to consider the token bucket
66    *                 starting to fill. Defaults to 0, so by default token
67    *                 buckets are "full" after construction.
68    */
69   explicit ParameterizedDynamicTokenBucket(double zeroTime = 0) noexcept
70       : zeroTime_(zeroTime) {}
71
72   /**
73    * Copy constructor.
74    *
75    * Thread-safe. (Copy constructors of derived classes may not be thread-safe
76    * however.)
77    */
78   ParameterizedDynamicTokenBucket(
79       const ParameterizedDynamicTokenBucket& other) noexcept
80       : zeroTime_(other.zeroTime_.load()) {}
81
82   /**
83    * Copy-assignment operator.
84    *
85    * Warning: not thread safe for the object being assigned to (including
86    * self-assignment). Thread-safe for the other object.
87    */
88   ParameterizedDynamicTokenBucket& operator=(
89       const ParameterizedDynamicTokenBucket& other) noexcept {
90     zeroTime_ = other.zeroTime_.load();
91     return *this;
92   }
93
94   /**
95    * Re-initialize token bucket.
96    *
97    * Thread-safe.
98    *
99    * @param zeroTime Initial time at which to consider the token bucket
100    *                 starting to fill. Defaults to 0, so by default token
101    *                 bucket is reset to "full".
102    */
103   void reset(double zeroTime = 0) noexcept {
104     zeroTime_ = zeroTime;
105   }
106
107   /**
108    * Returns the current time in seconds since Epoch.
109    */
110   static double defaultClockNow() noexcept(noexcept(ClockT::timeSinceEpoch())) {
111     return std::chrono::duration_cast<std::chrono::duration<double>>(
112                ClockT::timeSinceEpoch())
113         .count();
114   }
115
116   /**
117    * Attempts to consume some number of tokens. Tokens are first added to the
118    * bucket based on the time elapsed since the last attempt to consume tokens.
119    * Note: Attempts to consume more tokens than the burst size will always
120    * fail.
121    *
122    * Thread-safe.
123    *
124    * @param toConsume The number of tokens to consume.
125    * @param rate Number of tokens to generate per second.
126    * @param burstSize Maximum burst size. Must be greater than 0.
127    * @param nowInSeconds Current time in seconds. Should be monotonically
128    *                     increasing from the nowInSeconds specified in
129    *                     this token bucket's constructor.
130    * @return True if the rate limit check passed, false otherwise.
131    */
132   bool consume(
133       double toConsume,
134       double rate,
135       double burstSize,
136       double nowInSeconds = defaultClockNow()) {
137     assert(rate > 0);
138     assert(burstSize > 0);
139
140     return this->consumeImpl(
141         rate, burstSize, nowInSeconds, [toConsume](double& tokens) {
142           if (tokens < toConsume) {
143             return false;
144           }
145           tokens -= toConsume;
146           return true;
147         });
148   }
149
150   /**
151    * Similar to consume, but always consumes some number of tokens.  If the
152    * bucket contains enough tokens - consumes toConsume tokens.  Otherwise the
153    * bucket is drained.
154    *
155    * Thread-safe.
156    *
157    * @param toConsume The number of tokens to consume.
158    * @param rate Number of tokens to generate per second.
159    * @param burstSize Maximum burst size. Must be greater than 0.
160    * @param nowInSeconds Current time in seconds. Should be monotonically
161    *                     increasing from the nowInSeconds specified in
162    *                     this token bucket's constructor.
163    * @return number of tokens that were consumed.
164    */
165   double consumeOrDrain(
166       double toConsume,
167       double rate,
168       double burstSize,
169       double nowInSeconds = defaultClockNow()) {
170     assert(rate > 0);
171     assert(burstSize > 0);
172
173     double consumed;
174     this->consumeImpl(
175         rate, burstSize, nowInSeconds, [&consumed, toConsume](double& tokens) {
176           if (tokens < toConsume) {
177             consumed = tokens;
178             tokens = 0.0;
179           } else {
180             consumed = toConsume;
181             tokens -= toConsume;
182           }
183           return true;
184         });
185     return consumed;
186   }
187
188   /**
189    * Returns the number of tokens currently available.
190    *
191    * Thread-safe (but returned value may immediately be outdated).
192    */
193   double available(
194       double rate,
195       double burstSize,
196       double nowInSeconds = defaultClockNow()) const noexcept {
197     assert(rate > 0);
198     assert(burstSize > 0);
199
200     return std::min((nowInSeconds - this->zeroTime_) * rate, burstSize);
201   }
202
203  private:
204   template <typename TCallback>
205   bool consumeImpl(
206       double rate,
207       double burstSize,
208       double nowInSeconds,
209       const TCallback& callback) {
210     auto zeroTimeOld = zeroTime_.load();
211     double zeroTimeNew;
212     do {
213       auto tokens = std::min((nowInSeconds - zeroTimeOld) * rate, burstSize);
214       if (!callback(tokens)) {
215         return false;
216       }
217       zeroTimeNew = nowInSeconds - tokens / rate;
218     } while (
219         UNLIKELY(!zeroTime_.compare_exchange_weak(zeroTimeOld, zeroTimeNew)));
220
221     return true;
222   }
223
224   FOLLY_ALIGN_TO_AVOID_FALSE_SHARING std::atomic<double> zeroTime_;
225 };
226
227 /**
228  * Specialization of ParameterizedDynamicTokenBucket with a fixed token
229  * generation rate and a fixed maximum burst size.
230  */
231 template <typename ClockT = DefaultTokenBucketClock>
232 class ParameterizedTokenBucket {
233  private:
234   using Impl = ParameterizedDynamicTokenBucket<ClockT>;
235
236  public:
237   /**
238    * Construct a token bucket with a specific maximum rate and burst size.
239    *
240    * @param genRate Number of tokens to generate per second.
241    * @param burstSize Maximum burst size. Must be greater than 0.
242    * @param zeroTime Initial time at which to consider the token bucket
243    *                 starting to fill. Defaults to 0, so by default token
244    *                 bucket is "full" after construction.
245    */
246   ParameterizedTokenBucket(
247       double genRate,
248       double burstSize,
249       double zeroTime = 0) noexcept
250       : tokenBucket_(zeroTime), rate_(genRate), burstSize_(burstSize) {
251     assert(rate_ > 0);
252     assert(burstSize_ > 0);
253   }
254
255   /**
256    * Copy constructor.
257    *
258    * Warning: not thread safe!
259    */
260   ParameterizedTokenBucket(const ParameterizedTokenBucket& other) noexcept =
261       default;
262
263   /**
264    * Copy-assignment operator.
265    *
266    * Warning: not thread safe!
267    */
268   ParameterizedTokenBucket& operator=(
269       const ParameterizedTokenBucket& other) noexcept = default;
270
271   /**
272    * Returns the current time in seconds since Epoch.
273    */
274   static double defaultClockNow() noexcept(noexcept(Impl::defaultClockNow())) {
275     return Impl::defaultClockNow();
276   }
277
278   /**
279    * Change rate and burst size.
280    *
281    * Warning: not thread safe!
282    *
283    * @param genRate Number of tokens to generate per second.
284    * @param burstSize Maximum burst size. Must be greater than 0.
285    * @param nowInSeconds Current time in seconds. Should be monotonically
286    *                     increasing from the nowInSeconds specified in
287    *                     this token bucket's constructor.
288    */
289   void reset(
290       double genRate,
291       double burstSize,
292       double nowInSeconds = defaultClockNow()) noexcept {
293     assert(genRate > 0);
294     assert(burstSize > 0);
295     double availTokens = available(nowInSeconds);
296     rate_ = genRate;
297     burstSize_ = burstSize;
298     setCapacity(availTokens, nowInSeconds);
299   }
300
301   /**
302    * Change number of tokens in bucket.
303    *
304    * Warning: not thread safe!
305    *
306    * @param tokens Desired number of tokens in bucket after the call.
307    * @param nowInSeconds Current time in seconds. Should be monotonically
308    *                     increasing from the nowInSeconds specified in
309    *                     this token bucket's constructor.
310    */
311   void setCapacity(double tokens, double nowInSeconds) noexcept {
312     tokenBucket_.reset(nowInSeconds - tokens / rate_);
313   }
314
315   /**
316    * Attempts to consume some number of tokens. Tokens are first added to the
317    * bucket based on the time elapsed since the last attempt to consume tokens.
318    * Note: Attempts to consume more tokens than the burst size will always
319    * fail.
320    *
321    * Thread-safe.
322    *
323    * @param toConsume The number of tokens to consume.
324    * @param nowInSeconds Current time in seconds. Should be monotonically
325    *                     increasing from the nowInSeconds specified in
326    *                     this token bucket's constructor.
327    * @return True if the rate limit check passed, false otherwise.
328    */
329   bool consume(double toConsume, double nowInSeconds = defaultClockNow()) {
330     return tokenBucket_.consume(toConsume, rate_, burstSize_, nowInSeconds);
331   }
332
333   /**
334    * Similar to consume, but always consumes some number of tokens.  If the
335    * bucket contains enough tokens - consumes toConsume tokens.  Otherwise the
336    * bucket is drained.
337    *
338    * Thread-safe.
339    *
340    * @param toConsume The number of tokens to consume.
341    * @param nowInSeconds Current time in seconds. Should be monotonically
342    *                     increasing from the nowInSeconds specified in
343    *                     this token bucket's constructor.
344    * @return number of tokens that were consumed.
345    */
346   double consumeOrDrain(
347       double toConsume,
348       double nowInSeconds = defaultClockNow()) {
349     return tokenBucket_.consumeOrDrain(
350         toConsume, rate_, burstSize_, nowInSeconds);
351   }
352
353   /**
354    * Returns the number of tokens currently available.
355    *
356    * Thread-safe (but returned value may immediately be outdated).
357    */
358   double available(double nowInSeconds = defaultClockNow()) const {
359     return tokenBucket_.available(rate_, burstSize_, nowInSeconds);
360   }
361
362   /**
363    * Returns the number of tokens generated per second.
364    *
365    * Thread-safe (but returned value may immediately be outdated).
366    */
367   double rate() const noexcept {
368     return rate_;
369   }
370
371   /**
372    * Returns the maximum burst size.
373    *
374    * Thread-safe (but returned value may immediately be outdated).
375    */
376   double burst() const noexcept {
377     return burstSize_;
378   }
379
380  private:
381   Impl tokenBucket_;
382   double rate_;
383   double burstSize_;
384 };
385
386 using TokenBucket = ParameterizedTokenBucket<>;
387 using DynamicTokenBucket = ParameterizedDynamicTokenBucket<>;
388 }