e9a749a7b43f21152b6f3d6d9bfaf2f757f7e501
[folly.git] / folly / stats / BucketedTimeSeries-defs.h
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #ifndef FOLLY_STATS_BUCKETEDTIMESERIES_INL_H_
18 #define FOLLY_STATS_BUCKETEDTIMESERIES_INL_H_
19
20 #include <algorithm>
21 #include <glog/logging.h>
22 #include <folly/Likely.h>
23
24 namespace folly {
25
26 template <typename VT, typename TT>
27 BucketedTimeSeries<VT, TT>::BucketedTimeSeries(size_t nBuckets,
28                                                TimeType maxDuration)
29   : firstTime_(1),
30     latestTime_(0),
31     duration_(maxDuration) {
32   // For tracking all-time data we only use total_, and don't need to bother
33   // with buckets_
34   if (!isAllTime()) {
35     // Round nBuckets down to duration_.count().
36     //
37     // There is no point in having more buckets than our timestamp
38     // granularity: otherwise we would have buckets that could never be used.
39     if (nBuckets > size_t(duration_.count())) {
40       nBuckets = duration_.count();
41     }
42
43     buckets_.resize(nBuckets, Bucket());
44   }
45 }
46
47 template <typename VT, typename TT>
48 bool BucketedTimeSeries<VT, TT>::addValue(TimeType now, const ValueType& val) {
49   return addValueAggregated(now, val, 1);
50 }
51
52 template <typename VT, typename TT>
53 bool BucketedTimeSeries<VT, TT>::addValue(TimeType now,
54                                           const ValueType& val,
55                                           int64_t times) {
56   return addValueAggregated(now, val * times, times);
57 }
58
59 template <typename VT, typename TT>
60 bool BucketedTimeSeries<VT, TT>::addValueAggregated(TimeType now,
61                                                     const ValueType& total,
62                                                     int64_t nsamples) {
63   if (isAllTime()) {
64     if (UNLIKELY(empty())) {
65       firstTime_ = now;
66       latestTime_ = now;
67     } else if (now > latestTime_) {
68       latestTime_ = now;
69     } else if (now < firstTime_) {
70       firstTime_ = now;
71     }
72     total_.add(total, nsamples);
73     return true;
74   }
75
76   size_t bucketIdx;
77   if (UNLIKELY(empty())) {
78     // First data point we've ever seen
79     firstTime_ = now;
80     latestTime_ = now;
81     bucketIdx = getBucketIdx(now);
82   } else if (now > latestTime_) {
83     // More recent time.  Need to update the buckets.
84     bucketIdx = updateBuckets(now);
85   } else if (LIKELY(now == latestTime_)) {
86     // Current time.
87     bucketIdx = getBucketIdx(now);
88   } else {
89     // An earlier time in the past.  We need to check if this time still falls
90     // within our window.
91     if (now < getEarliestTimeNonEmpty()) {
92       return false;
93     }
94     bucketIdx = getBucketIdx(now);
95   }
96
97   total_.add(total, nsamples);
98   buckets_[bucketIdx].add(total, nsamples);
99   return true;
100 }
101
102 template <typename VT, typename TT>
103 size_t BucketedTimeSeries<VT, TT>::update(TimeType now) {
104   if (empty()) {
105     // This is the first data point.
106     firstTime_ = now;
107   }
108
109   // For all-time data, all we need to do is update latestTime_
110   if (isAllTime()) {
111     latestTime_ = std::max(latestTime_, now);
112     return 0;
113   }
114
115   // Make sure time doesn't go backwards.
116   // If the time is less than or equal to the latest time we have already seen,
117   // we don't need to do anything.
118   if (now <= latestTime_) {
119     return getBucketIdx(latestTime_);
120   }
121
122   return updateBuckets(now);
123 }
124
125 template <typename VT, typename TT>
126 size_t BucketedTimeSeries<VT, TT>::updateBuckets(TimeType now) {
127   // We could cache nextBucketStart as a member variable, so we don't have to
128   // recompute it each time update() is called with a new timestamp value.
129   // This makes things faster when update() (or addValue()) is called once
130   // per second, but slightly slower when update() is called multiple times a
131   // second.  We care more about optimizing the cases where addValue() is being
132   // called frequently.  If addValue() is only being called once every few
133   // seconds, it doesn't matter as much if it is fast.
134
135   // Get info about the bucket that latestTime_ points at
136   size_t currentBucket;
137   TimeType currentBucketStart;
138   TimeType nextBucketStart;
139   getBucketInfo(latestTime_, &currentBucket,
140                 &currentBucketStart, &nextBucketStart);
141
142   // Update latestTime_
143   latestTime_ = now;
144
145   if (now < nextBucketStart) {
146     // We're still in the same bucket.
147     // We're done after updating latestTime_.
148     return currentBucket;
149   } else if (now >= currentBucketStart + duration_) {
150     // It's been a while.  We have wrapped, and all of the buckets need to be
151     // cleared.
152     for (Bucket& bucket : buckets_) {
153       bucket.clear();
154     }
155     total_.clear();
156     return getBucketIdx(latestTime_);
157   } else {
158     // clear all the buckets between the last time and current time, meaning
159     // buckets in the range [(currentBucket+1), newBucket]. Note that
160     // the bucket (currentBucket+1) is always the oldest bucket we have. Since
161     // our array is circular, loop when we reach the end.
162     size_t newBucket = getBucketIdx(now);
163     size_t idx = currentBucket;
164     while (idx != newBucket) {
165       ++idx;
166       if (idx >= buckets_.size()) {
167         idx = 0;
168       }
169       total_ -= buckets_[idx];
170       buckets_[idx].clear();
171     }
172     return newBucket;
173   }
174 }
175
176 template <typename VT, typename TT>
177 void BucketedTimeSeries<VT, TT>::clear() {
178   for (Bucket& bucket : buckets_) {
179     bucket.clear();
180   }
181   total_.clear();
182   // Set firstTime_ larger than latestTime_,
183   // to indicate that the timeseries is empty
184   firstTime_ = TimeType(1);
185   latestTime_ = TimeType(0);
186 }
187
188
189 template <typename VT, typename TT>
190 TT BucketedTimeSeries<VT, TT>::getEarliestTime() const {
191   if (empty()) {
192     return TimeType(0);
193   }
194   if (isAllTime()) {
195     return firstTime_;
196   }
197
198   // Compute the earliest time we can track
199   TimeType earliestTime = getEarliestTimeNonEmpty();
200
201   // We're never tracking data before firstTime_
202   earliestTime = std::max(earliestTime, firstTime_);
203
204   return earliestTime;
205 }
206
207 template <typename VT, typename TT>
208 TT BucketedTimeSeries<VT, TT>::getEarliestTimeNonEmpty() const {
209   size_t currentBucket;
210   TimeType currentBucketStart;
211   TimeType nextBucketStart;
212   getBucketInfo(latestTime_, &currentBucket,
213                 &currentBucketStart, &nextBucketStart);
214
215   // Subtract 1 duration from the start of the next bucket to find the
216   // earliest possible data point we could be tracking.
217   return nextBucketStart - duration_;
218 }
219
220 template <typename VT, typename TT>
221 TT BucketedTimeSeries<VT, TT>::elapsed() const {
222   if (empty()) {
223     return TimeType(0);
224   }
225
226   // Add 1 since [latestTime_, earliestTime] is an inclusive interval.
227   return latestTime_ - getEarliestTime() + TimeType(1);
228 }
229
230 template <typename VT, typename TT>
231 TT BucketedTimeSeries<VT, TT>::elapsed(TimeType start, TimeType end) const {
232   if (empty()) {
233     return TimeType(0);
234   }
235   start = std::max(start, getEarliestTime());
236   end = std::min(end, latestTime_ + TimeType(1));
237   end = std::max(start, end);
238   return end - start;
239 }
240
241 template <typename VT, typename TT>
242 VT BucketedTimeSeries<VT, TT>::sum(TimeType start, TimeType end) const {
243   ValueType total = ValueType();
244   forEachBucket(start, end, [&](const Bucket& bucket,
245                                 TimeType bucketStart,
246                                 TimeType nextBucketStart) -> bool {
247     total += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
248                              bucket.sum);
249     return true;
250   });
251
252   return total;
253 }
254
255 template <typename VT, typename TT>
256 uint64_t BucketedTimeSeries<VT, TT>::count(TimeType start, TimeType end) const {
257   uint64_t sample_count = 0;
258   forEachBucket(start, end, [&](const Bucket& bucket,
259                                 TimeType bucketStart,
260                                 TimeType nextBucketStart) -> bool {
261     sample_count += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
262                                bucket.count);
263     return true;
264   });
265
266   return sample_count;
267 }
268
269 template <typename VT, typename TT>
270 template <typename ReturnType>
271 ReturnType BucketedTimeSeries<VT, TT>::avg(TimeType start, TimeType end) const {
272   ValueType total = ValueType();
273   uint64_t sample_count = 0;
274   forEachBucket(start, end, [&](const Bucket& bucket,
275                                 TimeType bucketStart,
276                                 TimeType nextBucketStart) -> bool {
277     total += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
278                              bucket.sum);
279     sample_count += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
280                                bucket.count);
281     return true;
282   });
283
284   if (sample_count == 0) {
285     return ReturnType(0);
286   }
287
288   return detail::avgHelper<ReturnType>(total, sample_count);
289 }
290
291 /*
292  * A note about some of the bucket index calculations below:
293  *
294  * buckets_.size() may not divide evenly into duration_.  When this happens,
295  * some buckets will be wider than others.  We still want to spread the data
296  * out as evenly as possible among the buckets (as opposed to just making the
297  * last bucket be significantly wider than all of the others).
298  *
299  * To make the division work out, we pretend that the buckets are each
300  * duration_ wide, so that the overall duration becomes
301  * buckets.size() * duration_.
302  *
303  * To transform a real timestamp into the scale used by our buckets,
304  * we have to multiply by buckets_.size().  To figure out which bucket it goes
305  * into, we then divide by duration_.
306  */
307
308 template <typename VT, typename TT>
309 size_t BucketedTimeSeries<VT, TT>::getBucketIdx(TimeType time) const {
310   // For all-time data we don't use buckets_.  Everything is tracked in total_.
311   DCHECK(!isAllTime());
312
313   time %= duration_;
314   return time.count() * buckets_.size() / duration_.count();
315 }
316
317 /*
318  * Compute the bucket index for the specified time, as well as the earliest
319  * time that falls into this bucket.
320  */
321 template <typename VT, typename TT>
322 void BucketedTimeSeries<VT, TT>::getBucketInfo(
323     TimeType time, size_t *bucketIdx,
324     TimeType* bucketStart, TimeType* nextBucketStart) const {
325   typedef typename TimeType::rep TimeInt;
326   DCHECK(!isAllTime());
327
328   // Keep these two lines together.  The compiler should be able to compute
329   // both the division and modulus with a single operation.
330   TimeType timeMod = time % duration_;
331   TimeInt numFullDurations = time / duration_;
332
333   TimeInt scaledTime = timeMod.count() * buckets_.size();
334
335   // Keep these two lines together.  The compiler should be able to compute
336   // both the division and modulus with a single operation.
337   *bucketIdx = scaledTime / duration_.count();
338   TimeInt scaledOffsetInBucket = scaledTime % duration_.count();
339
340   TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket;
341   TimeInt scaledNextBucketStart = scaledBucketStart + duration_.count();
342
343   TimeType bucketStartMod((scaledBucketStart + buckets_.size() - 1) /
344                           buckets_.size());
345   TimeType nextBucketStartMod((scaledNextBucketStart + buckets_.size() - 1) /
346                               buckets_.size());
347
348   TimeType durationStart(numFullDurations * duration_.count());
349   *bucketStart = bucketStartMod + durationStart;
350   *nextBucketStart = nextBucketStartMod + durationStart;
351 }
352
353 template <typename VT, typename TT>
354 template <typename Function>
355 void BucketedTimeSeries<VT, TT>::forEachBucket(Function fn) const {
356   if (isAllTime()) {
357     fn(total_, firstTime_, latestTime_ + TimeType(1));
358     return;
359   }
360
361   typedef typename TimeType::rep TimeInt;
362
363   // Compute durationStart, latestBucketIdx, and scaledNextBucketStart,
364   // the same way as in getBucketInfo().
365   TimeType timeMod = latestTime_ % duration_;
366   TimeInt numFullDurations = latestTime_ / duration_;
367   TimeType durationStart(numFullDurations * duration_.count());
368   TimeInt scaledTime = timeMod.count() * buckets_.size();
369   size_t latestBucketIdx = scaledTime / duration_.count();
370   TimeInt scaledOffsetInBucket = scaledTime % duration_.count();
371   TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket;
372   TimeInt scaledNextBucketStart = scaledBucketStart + duration_.count();
373
374   // Walk through the buckets, starting one past the current bucket.
375   // The next bucket is from the previous cycle, so subtract 1 duration
376   // from durationStart.
377   size_t idx = latestBucketIdx;
378   durationStart -= duration_;
379
380   TimeType nextBucketStart =
381     TimeType((scaledNextBucketStart + buckets_.size() - 1) / buckets_.size()) +
382     durationStart;
383   while (true) {
384     ++idx;
385     if (idx >= buckets_.size()) {
386       idx = 0;
387       durationStart += duration_;
388       scaledNextBucketStart = duration_.count();
389     } else {
390       scaledNextBucketStart += duration_.count();
391     }
392
393     TimeType bucketStart = nextBucketStart;
394     nextBucketStart = TimeType((scaledNextBucketStart + buckets_.size() - 1) /
395                                buckets_.size()) + durationStart;
396
397     // Should we bother skipping buckets where firstTime_ >= nextBucketStart?
398     // For now we go ahead and invoke the function with these buckets.
399     // sum and count should always be 0 in these buckets.
400
401     DCHECK_LE(bucketStart.count(), latestTime_.count());
402     bool ret = fn(buckets_[idx], bucketStart, nextBucketStart);
403     if (!ret) {
404       break;
405     }
406
407     if (idx == latestBucketIdx) {
408       // all done
409       break;
410     }
411   }
412 }
413
414 /*
415  * Adjust the input value from the specified bucket to only account
416  * for the desired range.
417  *
418  * For example, if the bucket spans time [10, 20), but we only care about the
419  * range [10, 16), this will return 60% of the input value.
420  */
421 template<typename VT, typename TT>
422 VT BucketedTimeSeries<VT, TT>::rangeAdjust(
423     TimeType bucketStart, TimeType nextBucketStart,
424     TimeType start, TimeType end, ValueType input) const {
425   // If nextBucketStart is greater than latestTime_, treat nextBucketStart as
426   // if it were latestTime_.  This makes us more accurate when someone is
427   // querying for all of the data up to latestTime_.  Even though latestTime_
428   // may only be partially through the bucket, we don't want to adjust
429   // downwards in this case, because the bucket really only has data up to
430   // latestTime_.
431   if (bucketStart <= latestTime_ && nextBucketStart > latestTime_) {
432     nextBucketStart = latestTime_ + TimeType(1);
433   }
434
435   if (start <= bucketStart && end >= nextBucketStart) {
436     // The bucket is wholly contained in the [start, end) interval
437     return input;
438   }
439
440   TimeType intervalStart = std::max(start, bucketStart);
441   TimeType intervalEnd = std::min(end, nextBucketStart);
442   return input * (intervalEnd - intervalStart) /
443     (nextBucketStart - bucketStart);
444 }
445
446 template <typename VT, typename TT>
447 template <typename Function>
448 void BucketedTimeSeries<VT, TT>::forEachBucket(TimeType start, TimeType end,
449                                                Function fn) const {
450   forEachBucket([&start, &end, &fn] (const Bucket& bucket, TimeType bucketStart,
451                                      TimeType nextBucketStart) -> bool {
452     if (start >= nextBucketStart) {
453       return true;
454     }
455     if (end <= bucketStart) {
456       return false;
457     }
458     bool ret = fn(bucket, bucketStart, nextBucketStart);
459     return ret;
460   });
461 }
462
463 } // folly
464
465 #endif // FOLLY_STATS_BUCKETEDTIMESERIES_INL_H_