a726f2ee54d517a4e2535eaa58d118ca9345a604
[folly.git] / folly / stats / BucketedTimeSeries-defs.h
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <algorithm>
20 #include <glog/logging.h>
21 #include <folly/Likely.h>
22
23 namespace folly {
24
25 template <typename VT, typename TT>
26 BucketedTimeSeries<VT, TT>::BucketedTimeSeries(size_t nBuckets,
27                                                TimeType maxDuration)
28   : firstTime_(1),
29     latestTime_(0),
30     duration_(maxDuration) {
31   // For tracking all-time data we only use total_, and don't need to bother
32   // with buckets_
33   if (!isAllTime()) {
34     // Round nBuckets down to duration_.count().
35     //
36     // There is no point in having more buckets than our timestamp
37     // granularity: otherwise we would have buckets that could never be used.
38     if (nBuckets > size_t(duration_.count())) {
39       nBuckets = duration_.count();
40     }
41
42     buckets_.resize(nBuckets, Bucket());
43   }
44 }
45
46 template <typename VT, typename TT>
47 bool BucketedTimeSeries<VT, TT>::addValue(TimeType now, const ValueType& val) {
48   return addValueAggregated(now, val, 1);
49 }
50
51 template <typename VT, typename TT>
52 bool BucketedTimeSeries<VT, TT>::addValue(TimeType now,
53                                           const ValueType& val,
54                                           int64_t times) {
55   return addValueAggregated(now, val * times, times);
56 }
57
58 template <typename VT, typename TT>
59 bool BucketedTimeSeries<VT, TT>::addValueAggregated(TimeType now,
60                                                     const ValueType& total,
61                                                     int64_t nsamples) {
62   if (isAllTime()) {
63     if (UNLIKELY(empty())) {
64       firstTime_ = now;
65       latestTime_ = now;
66     } else if (now > latestTime_) {
67       latestTime_ = now;
68     } else if (now < firstTime_) {
69       firstTime_ = now;
70     }
71     total_.add(total, nsamples);
72     return true;
73   }
74
75   size_t bucketIdx;
76   if (UNLIKELY(empty())) {
77     // First data point we've ever seen
78     firstTime_ = now;
79     latestTime_ = now;
80     bucketIdx = getBucketIdx(now);
81   } else if (now > latestTime_) {
82     // More recent time.  Need to update the buckets.
83     bucketIdx = updateBuckets(now);
84   } else if (LIKELY(now == latestTime_)) {
85     // Current time.
86     bucketIdx = getBucketIdx(now);
87   } else {
88     // An earlier time in the past.  We need to check if this time still falls
89     // within our window.
90     if (now < getEarliestTimeNonEmpty()) {
91       return false;
92     }
93     bucketIdx = getBucketIdx(now);
94   }
95
96   total_.add(total, nsamples);
97   buckets_[bucketIdx].add(total, nsamples);
98   return true;
99 }
100
101 template <typename VT, typename TT>
102 size_t BucketedTimeSeries<VT, TT>::update(TimeType now) {
103   if (empty()) {
104     // This is the first data point.
105     firstTime_ = now;
106   }
107
108   // For all-time data, all we need to do is update latestTime_
109   if (isAllTime()) {
110     latestTime_ = std::max(latestTime_, now);
111     return 0;
112   }
113
114   // Make sure time doesn't go backwards.
115   // If the time is less than or equal to the latest time we have already seen,
116   // we don't need to do anything.
117   if (now <= latestTime_) {
118     return getBucketIdx(latestTime_);
119   }
120
121   return updateBuckets(now);
122 }
123
124 template <typename VT, typename TT>
125 size_t BucketedTimeSeries<VT, TT>::updateBuckets(TimeType now) {
126   // We could cache nextBucketStart as a member variable, so we don't have to
127   // recompute it each time update() is called with a new timestamp value.
128   // This makes things faster when update() (or addValue()) is called once
129   // per second, but slightly slower when update() is called multiple times a
130   // second.  We care more about optimizing the cases where addValue() is being
131   // called frequently.  If addValue() is only being called once every few
132   // seconds, it doesn't matter as much if it is fast.
133
134   // Get info about the bucket that latestTime_ points at
135   size_t currentBucket;
136   TimeType currentBucketStart;
137   TimeType nextBucketStart;
138   getBucketInfo(latestTime_, &currentBucket,
139                 &currentBucketStart, &nextBucketStart);
140
141   // Update latestTime_
142   latestTime_ = now;
143
144   if (now < nextBucketStart) {
145     // We're still in the same bucket.
146     // We're done after updating latestTime_.
147     return currentBucket;
148   } else if (now >= currentBucketStart + duration_) {
149     // It's been a while.  We have wrapped, and all of the buckets need to be
150     // cleared.
151     for (Bucket& bucket : buckets_) {
152       bucket.clear();
153     }
154     total_.clear();
155     return getBucketIdx(latestTime_);
156   } else {
157     // clear all the buckets between the last time and current time, meaning
158     // buckets in the range [(currentBucket+1), newBucket]. Note that
159     // the bucket (currentBucket+1) is always the oldest bucket we have. Since
160     // our array is circular, loop when we reach the end.
161     size_t newBucket = getBucketIdx(now);
162     size_t idx = currentBucket;
163     while (idx != newBucket) {
164       ++idx;
165       if (idx >= buckets_.size()) {
166         idx = 0;
167       }
168       total_ -= buckets_[idx];
169       buckets_[idx].clear();
170     }
171     return newBucket;
172   }
173 }
174
175 template <typename VT, typename TT>
176 void BucketedTimeSeries<VT, TT>::clear() {
177   for (Bucket& bucket : buckets_) {
178     bucket.clear();
179   }
180   total_.clear();
181   // Set firstTime_ larger than latestTime_,
182   // to indicate that the timeseries is empty
183   firstTime_ = TimeType(1);
184   latestTime_ = TimeType(0);
185 }
186
187
188 template <typename VT, typename TT>
189 TT BucketedTimeSeries<VT, TT>::getEarliestTime() const {
190   if (empty()) {
191     return TimeType(0);
192   }
193   if (isAllTime()) {
194     return firstTime_;
195   }
196
197   // Compute the earliest time we can track
198   TimeType earliestTime = getEarliestTimeNonEmpty();
199
200   // We're never tracking data before firstTime_
201   earliestTime = std::max(earliestTime, firstTime_);
202
203   return earliestTime;
204 }
205
206 template <typename VT, typename TT>
207 TT BucketedTimeSeries<VT, TT>::getEarliestTimeNonEmpty() const {
208   size_t currentBucket;
209   TimeType currentBucketStart;
210   TimeType nextBucketStart;
211   getBucketInfo(latestTime_, &currentBucket,
212                 &currentBucketStart, &nextBucketStart);
213
214   // Subtract 1 duration from the start of the next bucket to find the
215   // earliest possible data point we could be tracking.
216   return nextBucketStart - duration_;
217 }
218
219 template <typename VT, typename TT>
220 TT BucketedTimeSeries<VT, TT>::elapsed() const {
221   if (empty()) {
222     return TimeType(0);
223   }
224
225   // Add 1 since [latestTime_, earliestTime] is an inclusive interval.
226   return latestTime_ - getEarliestTime() + TimeType(1);
227 }
228
229 template <typename VT, typename TT>
230 TT BucketedTimeSeries<VT, TT>::elapsed(TimeType start, TimeType end) const {
231   if (empty()) {
232     return TimeType(0);
233   }
234   start = std::max(start, getEarliestTime());
235   end = std::min(end, latestTime_ + TimeType(1));
236   end = std::max(start, end);
237   return end - start;
238 }
239
240 template <typename VT, typename TT>
241 VT BucketedTimeSeries<VT, TT>::sum(TimeType start, TimeType end) const {
242   ValueType total = ValueType();
243   forEachBucket(start, end, [&](const Bucket& bucket,
244                                 TimeType bucketStart,
245                                 TimeType nextBucketStart) -> bool {
246     total += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
247                              bucket.sum);
248     return true;
249   });
250
251   return total;
252 }
253
254 template <typename VT, typename TT>
255 uint64_t BucketedTimeSeries<VT, TT>::count(TimeType start, TimeType end) const {
256   uint64_t sample_count = 0;
257   forEachBucket(start, end, [&](const Bucket& bucket,
258                                 TimeType bucketStart,
259                                 TimeType nextBucketStart) -> bool {
260     sample_count += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
261                                bucket.count);
262     return true;
263   });
264
265   return sample_count;
266 }
267
268 template <typename VT, typename TT>
269 template <typename ReturnType>
270 ReturnType BucketedTimeSeries<VT, TT>::avg(TimeType start, TimeType end) const {
271   ValueType total = ValueType();
272   uint64_t sample_count = 0;
273   forEachBucket(start, end, [&](const Bucket& bucket,
274                                 TimeType bucketStart,
275                                 TimeType nextBucketStart) -> bool {
276     total += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
277                              bucket.sum);
278     sample_count += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
279                                bucket.count);
280     return true;
281   });
282
283   if (sample_count == 0) {
284     return ReturnType(0);
285   }
286
287   return detail::avgHelper<ReturnType>(total, sample_count);
288 }
289
290 /*
291  * A note about some of the bucket index calculations below:
292  *
293  * buckets_.size() may not divide evenly into duration_.  When this happens,
294  * some buckets will be wider than others.  We still want to spread the data
295  * out as evenly as possible among the buckets (as opposed to just making the
296  * last bucket be significantly wider than all of the others).
297  *
298  * To make the division work out, we pretend that the buckets are each
299  * duration_ wide, so that the overall duration becomes
300  * buckets.size() * duration_.
301  *
302  * To transform a real timestamp into the scale used by our buckets,
303  * we have to multiply by buckets_.size().  To figure out which bucket it goes
304  * into, we then divide by duration_.
305  */
306
307 template <typename VT, typename TT>
308 size_t BucketedTimeSeries<VT, TT>::getBucketIdx(TimeType time) const {
309   // For all-time data we don't use buckets_.  Everything is tracked in total_.
310   DCHECK(!isAllTime());
311
312   time %= duration_;
313   return time.count() * buckets_.size() / duration_.count();
314 }
315
316 /*
317  * Compute the bucket index for the specified time, as well as the earliest
318  * time that falls into this bucket.
319  */
320 template <typename VT, typename TT>
321 void BucketedTimeSeries<VT, TT>::getBucketInfo(
322     TimeType time, size_t *bucketIdx,
323     TimeType* bucketStart, TimeType* nextBucketStart) const {
324   typedef typename TimeType::rep TimeInt;
325   DCHECK(!isAllTime());
326
327   // Keep these two lines together.  The compiler should be able to compute
328   // both the division and modulus with a single operation.
329   TimeType timeMod = time % duration_;
330   TimeInt numFullDurations = time / duration_;
331
332   TimeInt scaledTime = timeMod.count() * buckets_.size();
333
334   // Keep these two lines together.  The compiler should be able to compute
335   // both the division and modulus with a single operation.
336   *bucketIdx = scaledTime / duration_.count();
337   TimeInt scaledOffsetInBucket = scaledTime % duration_.count();
338
339   TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket;
340   TimeInt scaledNextBucketStart = scaledBucketStart + duration_.count();
341
342   TimeType bucketStartMod((scaledBucketStart + buckets_.size() - 1) /
343                           buckets_.size());
344   TimeType nextBucketStartMod((scaledNextBucketStart + buckets_.size() - 1) /
345                               buckets_.size());
346
347   TimeType durationStart(numFullDurations * duration_.count());
348   *bucketStart = bucketStartMod + durationStart;
349   *nextBucketStart = nextBucketStartMod + durationStart;
350 }
351
352 template <typename VT, typename TT>
353 template <typename Function>
354 void BucketedTimeSeries<VT, TT>::forEachBucket(Function fn) const {
355   if (isAllTime()) {
356     fn(total_, firstTime_, latestTime_ + TimeType(1));
357     return;
358   }
359
360   typedef typename TimeType::rep TimeInt;
361
362   // Compute durationStart, latestBucketIdx, and scaledNextBucketStart,
363   // the same way as in getBucketInfo().
364   TimeType timeMod = latestTime_ % duration_;
365   TimeInt numFullDurations = latestTime_ / duration_;
366   TimeType durationStart(numFullDurations * duration_.count());
367   TimeInt scaledTime = timeMod.count() * buckets_.size();
368   size_t latestBucketIdx = scaledTime / duration_.count();
369   TimeInt scaledOffsetInBucket = scaledTime % duration_.count();
370   TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket;
371   TimeInt scaledNextBucketStart = scaledBucketStart + duration_.count();
372
373   // Walk through the buckets, starting one past the current bucket.
374   // The next bucket is from the previous cycle, so subtract 1 duration
375   // from durationStart.
376   size_t idx = latestBucketIdx;
377   durationStart -= duration_;
378
379   TimeType nextBucketStart =
380     TimeType((scaledNextBucketStart + buckets_.size() - 1) / buckets_.size()) +
381     durationStart;
382   while (true) {
383     ++idx;
384     if (idx >= buckets_.size()) {
385       idx = 0;
386       durationStart += duration_;
387       scaledNextBucketStart = duration_.count();
388     } else {
389       scaledNextBucketStart += duration_.count();
390     }
391
392     TimeType bucketStart = nextBucketStart;
393     nextBucketStart = TimeType((scaledNextBucketStart + buckets_.size() - 1) /
394                                buckets_.size()) + durationStart;
395
396     // Should we bother skipping buckets where firstTime_ >= nextBucketStart?
397     // For now we go ahead and invoke the function with these buckets.
398     // sum and count should always be 0 in these buckets.
399
400     DCHECK_LE(bucketStart.count(), latestTime_.count());
401     bool ret = fn(buckets_[idx], bucketStart, nextBucketStart);
402     if (!ret) {
403       break;
404     }
405
406     if (idx == latestBucketIdx) {
407       // all done
408       break;
409     }
410   }
411 }
412
413 /*
414  * Adjust the input value from the specified bucket to only account
415  * for the desired range.
416  *
417  * For example, if the bucket spans time [10, 20), but we only care about the
418  * range [10, 16), this will return 60% of the input value.
419  */
420 template<typename VT, typename TT>
421 VT BucketedTimeSeries<VT, TT>::rangeAdjust(
422     TimeType bucketStart, TimeType nextBucketStart,
423     TimeType start, TimeType end, ValueType input) const {
424   // If nextBucketStart is greater than latestTime_, treat nextBucketStart as
425   // if it were latestTime_.  This makes us more accurate when someone is
426   // querying for all of the data up to latestTime_.  Even though latestTime_
427   // may only be partially through the bucket, we don't want to adjust
428   // downwards in this case, because the bucket really only has data up to
429   // latestTime_.
430   if (bucketStart <= latestTime_ && nextBucketStart > latestTime_) {
431     nextBucketStart = latestTime_ + TimeType(1);
432   }
433
434   if (start <= bucketStart && end >= nextBucketStart) {
435     // The bucket is wholly contained in the [start, end) interval
436     return input;
437   }
438
439   TimeType intervalStart = std::max(start, bucketStart);
440   TimeType intervalEnd = std::min(end, nextBucketStart);
441   return input * (intervalEnd - intervalStart) /
442     (nextBucketStart - bucketStart);
443 }
444
445 template <typename VT, typename TT>
446 template <typename Function>
447 void BucketedTimeSeries<VT, TT>::forEachBucket(TimeType start, TimeType end,
448                                                Function fn) const {
449   forEachBucket([&start, &end, &fn] (const Bucket& bucket, TimeType bucketStart,
450                                      TimeType nextBucketStart) -> bool {
451     if (start >= nextBucketStart) {
452       return true;
453     }
454     if (end <= bucketStart) {
455       return false;
456     }
457     bool ret = fn(bucket, bucketStart, nextBucketStart);
458     return ret;
459   });
460 }
461
462 } // folly