add FOLLY_SANITIZE macro
[folly.git] / folly / stats / BucketedTimeSeries-defs.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/Likely.h>
20 #include <folly/stats/BucketedTimeSeries.h>
21 #include <glog/logging.h>
22 #include <algorithm>
23 #include <stdexcept>
24
25 namespace folly {
26
27 template <typename VT, typename CT>
28 BucketedTimeSeries<VT, CT>::BucketedTimeSeries(
29     size_t nBuckets,
30     Duration maxDuration)
31     : firstTime_(Duration(1)), latestTime_(), duration_(maxDuration) {
32   // For tracking all-time data we only use total_, and don't need to bother
33   // with buckets_
34   if (!isAllTime()) {
35     // Round nBuckets down to duration_.count().
36     //
37     // There is no point in having more buckets than our timestamp
38     // granularity: otherwise we would have buckets that could never be used.
39     if (nBuckets > size_t(duration_.count())) {
40       nBuckets = size_t(duration_.count());
41     }
42
43     buckets_.resize(nBuckets, Bucket());
44   }
45 }
46
47 template <typename VT, typename CT>
48 BucketedTimeSeries<VT, CT>::BucketedTimeSeries(
49     TimePoint theFirstTime,
50     TimePoint theLatestTime,
51     Duration maxDuration,
52     const std::vector<Bucket>& bucketsList)
53     : firstTime_(theFirstTime),
54       latestTime_(theLatestTime),
55       duration_(maxDuration),
56       buckets_(bucketsList) {
57   // Come up with the total_ from buckets_ being passed in
58   for (auto const& bucket : buckets_) {
59     total_.add(bucket.sum, bucket.count);
60   }
61
62   // Verify the integrity of the data
63
64   // If firstTime is greater than latestTime, the total count should be 0.
65   // (firstTime being greater than latestTime means that no data points have
66   // ever been added to the time series.)
67   if (firstTime_ > latestTime_ && (total_.sum != 0 || total_.count != 0)) {
68     throw std::invalid_argument(
69         "The total should have been 0 "
70         "if firstTime is greater than lastestTime");
71   }
72
73   // If firstTime is less than or equal to latestTime,
74   // latestTime - firstTime should be less than or equal to the duration.
75   if (firstTime_ <= latestTime_ && latestTime_ - firstTime_ > duration_) {
76     throw std::invalid_argument(
77         "The difference between firstTime and latestTime "
78         "should be less than or equal to the duration");
79   }
80 }
81
82 template <typename VT, typename CT>
83 bool BucketedTimeSeries<VT, CT>::addValue(TimePoint now, const ValueType& val) {
84   return addValueAggregated(now, val, 1);
85 }
86
87 template <typename VT, typename CT>
88 bool BucketedTimeSeries<VT, CT>::addValue(
89     TimePoint now,
90     const ValueType& val,
91     uint64_t times) {
92   return addValueAggregated(now, val * ValueType(times), times);
93 }
94
95 template <typename VT, typename CT>
96 bool BucketedTimeSeries<VT, CT>::addValueAggregated(
97     TimePoint now,
98     const ValueType& total,
99     uint64_t nsamples) {
100   if (isAllTime()) {
101     if (UNLIKELY(empty())) {
102       firstTime_ = now;
103       latestTime_ = now;
104     } else if (now > latestTime_) {
105       latestTime_ = now;
106     } else if (now < firstTime_) {
107       firstTime_ = now;
108     }
109     total_.add(total, nsamples);
110     return true;
111   }
112
113   size_t bucketIdx;
114   if (UNLIKELY(empty())) {
115     // First data point we've ever seen
116     firstTime_ = now;
117     latestTime_ = now;
118     bucketIdx = getBucketIdx(now);
119   } else if (now > latestTime_) {
120     // More recent time.  Need to update the buckets.
121     bucketIdx = updateBuckets(now);
122   } else if (LIKELY(now == latestTime_)) {
123     // Current time.
124     bucketIdx = getBucketIdx(now);
125   } else {
126     // An earlier time in the past.  We need to check if this time still falls
127     // within our window.
128     if (now < getEarliestTimeNonEmpty()) {
129       return false;
130     }
131     bucketIdx = getBucketIdx(now);
132   }
133
134   total_.add(total, nsamples);
135   buckets_[bucketIdx].add(total, nsamples);
136   return true;
137 }
138
139 template <typename VT, typename CT>
140 size_t BucketedTimeSeries<VT, CT>::update(TimePoint now) {
141   if (empty()) {
142     // This is the first data point.
143     firstTime_ = now;
144   }
145
146   // For all-time data, all we need to do is update latestTime_
147   if (isAllTime()) {
148     latestTime_ = std::max(latestTime_, now);
149     return 0;
150   }
151
152   // Make sure time doesn't go backwards.
153   // If the time is less than or equal to the latest time we have already seen,
154   // we don't need to do anything.
155   if (now <= latestTime_) {
156     return getBucketIdx(latestTime_);
157   }
158
159   return updateBuckets(now);
160 }
161
162 template <typename VT, typename CT>
163 size_t BucketedTimeSeries<VT, CT>::updateBuckets(TimePoint now) {
164   // We could cache nextBucketStart as a member variable, so we don't have to
165   // recompute it each time update() is called with a new timestamp value.
166   // This makes things faster when update() (or addValue()) is called once
167   // per second, but slightly slower when update() is called multiple times a
168   // second.  We care more about optimizing the cases where addValue() is being
169   // called frequently.  If addValue() is only being called once every few
170   // seconds, it doesn't matter as much if it is fast.
171
172   // Get info about the bucket that latestTime_ points at
173   size_t currentBucket;
174   TimePoint currentBucketStart;
175   TimePoint nextBucketStart;
176   getBucketInfo(
177       latestTime_, &currentBucket, &currentBucketStart, &nextBucketStart);
178
179   // Update latestTime_
180   latestTime_ = now;
181
182   if (now < nextBucketStart) {
183     // We're still in the same bucket.
184     // We're done after updating latestTime_.
185     return currentBucket;
186   } else if (now >= currentBucketStart + duration_) {
187     // It's been a while.  We have wrapped, and all of the buckets need to be
188     // cleared.
189     for (Bucket& bucket : buckets_) {
190       bucket.clear();
191     }
192     total_.clear();
193     return getBucketIdx(latestTime_);
194   } else {
195     // clear all the buckets between the last time and current time, meaning
196     // buckets in the range [(currentBucket+1), newBucket]. Note that
197     // the bucket (currentBucket+1) is always the oldest bucket we have. Since
198     // our array is circular, loop when we reach the end.
199     size_t newBucket = getBucketIdx(now);
200     size_t idx = currentBucket;
201     while (idx != newBucket) {
202       ++idx;
203       if (idx >= buckets_.size()) {
204         idx = 0;
205       }
206       total_ -= buckets_[idx];
207       buckets_[idx].clear();
208     }
209     return newBucket;
210   }
211 }
212
213 template <typename VT, typename CT>
214 void BucketedTimeSeries<VT, CT>::clear() {
215   for (Bucket& bucket : buckets_) {
216     bucket.clear();
217   }
218   total_.clear();
219   // Set firstTime_ larger than latestTime_,
220   // to indicate that the timeseries is empty
221   firstTime_ = TimePoint(Duration(1));
222   latestTime_ = TimePoint();
223 }
224
225 template <typename VT, typename CT>
226 typename CT::time_point BucketedTimeSeries<VT, CT>::getEarliestTime() const {
227   if (empty()) {
228     return TimePoint();
229   }
230   if (isAllTime()) {
231     return firstTime_;
232   }
233
234   // Compute the earliest time we can track
235   TimePoint earliestTime = getEarliestTimeNonEmpty();
236
237   // We're never tracking data before firstTime_
238   earliestTime = std::max(earliestTime, firstTime_);
239
240   return earliestTime;
241 }
242
243 template <typename VT, typename CT>
244 typename CT::time_point BucketedTimeSeries<VT, CT>::getEarliestTimeNonEmpty()
245     const {
246   size_t currentBucket;
247   TimePoint currentBucketStart;
248   TimePoint nextBucketStart;
249   getBucketInfo(
250       latestTime_, &currentBucket, &currentBucketStart, &nextBucketStart);
251
252   // Subtract 1 duration from the start of the next bucket to find the
253   // earliest possible data point we could be tracking.
254   return nextBucketStart - duration_;
255 }
256
257 template <typename VT, typename CT>
258 typename CT::duration BucketedTimeSeries<VT, CT>::elapsed() const {
259   if (empty()) {
260     return Duration(0);
261   }
262
263   // Add 1 since [latestTime_, earliestTime] is an inclusive interval.
264   return latestTime_ - getEarliestTime() + Duration(1);
265 }
266
267 template <typename VT, typename CT>
268 typename CT::duration BucketedTimeSeries<VT, CT>::elapsed(
269     TimePoint start,
270     TimePoint end) const {
271   if (empty()) {
272     return Duration(0);
273   }
274   start = std::max(start, getEarliestTime());
275   end = std::min(end, latestTime_ + Duration(1));
276   end = std::max(start, end);
277   return end - start;
278 }
279
280 template <typename VT, typename CT>
281 VT BucketedTimeSeries<VT, CT>::sum(TimePoint start, TimePoint end) const {
282   ValueType total = ValueType();
283   forEachBucket(
284       start,
285       end,
286       [&](const Bucket& bucket,
287           TimePoint bucketStart,
288           TimePoint nextBucketStart) -> bool {
289         total += this->rangeAdjust(
290             bucketStart, nextBucketStart, start, end, bucket.sum);
291         return true;
292       });
293
294   return total;
295 }
296
297 template <typename VT, typename CT>
298 uint64_t BucketedTimeSeries<VT, CT>::count(TimePoint start, TimePoint end)
299     const {
300   uint64_t sample_count = 0;
301   forEachBucket(
302       start,
303       end,
304       [&](const Bucket& bucket,
305           TimePoint bucketStart,
306           TimePoint nextBucketStart) -> bool {
307         sample_count += this->rangeAdjust(
308             bucketStart, nextBucketStart, start, end, ValueType(bucket.count));
309         return true;
310       });
311
312   return sample_count;
313 }
314
315 template <typename VT, typename CT>
316 template <typename ReturnType>
317 ReturnType BucketedTimeSeries<VT, CT>::avg(TimePoint start, TimePoint end)
318     const {
319   ValueType total = ValueType();
320   uint64_t sample_count = 0;
321   forEachBucket(
322       start,
323       end,
324       [&](const Bucket& bucket,
325           TimePoint bucketStart,
326           TimePoint nextBucketStart) -> bool {
327         total += this->rangeAdjust(
328             bucketStart, nextBucketStart, start, end, bucket.sum);
329         sample_count += this->rangeAdjust(
330             bucketStart, nextBucketStart, start, end, ValueType(bucket.count));
331         return true;
332       });
333
334   if (sample_count == 0) {
335     return ReturnType(0);
336   }
337
338   return detail::avgHelper<ReturnType>(total, sample_count);
339 }
340
341 /*
342  * A note about some of the bucket index calculations below:
343  *
344  * buckets_.size() may not divide evenly into duration_.  When this happens,
345  * some buckets will be wider than others.  We still want to spread the data
346  * out as evenly as possible among the buckets (as opposed to just making the
347  * last bucket be significantly wider than all of the others).
348  *
349  * To make the division work out, we pretend that the buckets are each
350  * duration_ wide, so that the overall duration becomes
351  * buckets.size() * duration_.
352  *
353  * To transform a real timestamp into the scale used by our buckets,
354  * we have to multiply by buckets_.size().  To figure out which bucket it goes
355  * into, we then divide by duration_.
356  */
357
358 template <typename VT, typename CT>
359 size_t BucketedTimeSeries<VT, CT>::getBucketIdx(TimePoint time) const {
360   // For all-time data we don't use buckets_.  Everything is tracked in total_.
361   DCHECK(!isAllTime());
362
363   auto timeIntoCurrentCycle = (time.time_since_epoch() % duration_);
364   return timeIntoCurrentCycle.count() * buckets_.size() / duration_.count();
365 }
366
367 /*
368  * Compute the bucket index for the specified time, as well as the earliest
369  * time that falls into this bucket.
370  */
371 template <typename VT, typename CT>
372 void BucketedTimeSeries<VT, CT>::getBucketInfo(
373     TimePoint time,
374     size_t* bucketIdx,
375     TimePoint* bucketStart,
376     TimePoint* nextBucketStart) const {
377   typedef typename Duration::rep TimeInt;
378   DCHECK(!isAllTime());
379
380   // Keep these two lines together.  The compiler should be able to compute
381   // both the division and modulus with a single operation.
382   Duration timeMod = time.time_since_epoch() % duration_;
383   TimeInt numFullDurations = time.time_since_epoch() / duration_;
384
385   TimeInt scaledTime = timeMod.count() * TimeInt(buckets_.size());
386
387   // Keep these two lines together.  The compiler should be able to compute
388   // both the division and modulus with a single operation.
389   *bucketIdx = size_t(scaledTime / duration_.count());
390   TimeInt scaledOffsetInBucket = scaledTime % duration_.count();
391
392   TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket;
393   TimeInt scaledNextBucketStart = scaledBucketStart + duration_.count();
394
395   Duration bucketStartMod(
396       (scaledBucketStart + buckets_.size() - 1) / buckets_.size());
397   Duration nextBucketStartMod(
398       (scaledNextBucketStart + buckets_.size() - 1) / buckets_.size());
399
400   TimePoint durationStart(numFullDurations * duration_);
401   *bucketStart = bucketStartMod + durationStart;
402   *nextBucketStart = nextBucketStartMod + durationStart;
403 }
404
405 template <typename VT, typename CT>
406 template <typename Function>
407 void BucketedTimeSeries<VT, CT>::forEachBucket(Function fn) const {
408   if (isAllTime()) {
409     fn(total_, firstTime_, latestTime_ + Duration(1));
410     return;
411   }
412
413   typedef typename Duration::rep TimeInt;
414
415   // Compute durationStart, latestBucketIdx, and scaledNextBucketStart,
416   // the same way as in getBucketInfo().
417   Duration timeMod = latestTime_.time_since_epoch() % duration_;
418   TimeInt numFullDurations = latestTime_.time_since_epoch() / duration_;
419   TimePoint durationStart(numFullDurations * duration_);
420   TimeInt scaledTime = timeMod.count() * TimeInt(buckets_.size());
421   size_t latestBucketIdx = size_t(scaledTime / duration_.count());
422   TimeInt scaledOffsetInBucket = scaledTime % duration_.count();
423   TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket;
424   TimeInt scaledNextBucketStart = scaledBucketStart + duration_.count();
425
426   // Walk through the buckets, starting one past the current bucket.
427   // The next bucket is from the previous cycle, so subtract 1 duration
428   // from durationStart.
429   size_t idx = latestBucketIdx;
430   durationStart -= duration_;
431
432   TimePoint nextBucketStart =
433       Duration(
434           (scaledNextBucketStart + buckets_.size() - 1) / buckets_.size()) +
435       durationStart;
436   while (true) {
437     ++idx;
438     if (idx >= buckets_.size()) {
439       idx = 0;
440       durationStart += duration_;
441       scaledNextBucketStart = duration_.count();
442     } else {
443       scaledNextBucketStart += duration_.count();
444     }
445
446     TimePoint bucketStart = nextBucketStart;
447     nextBucketStart =
448         Duration(
449             (scaledNextBucketStart + buckets_.size() - 1) / buckets_.size()) +
450         durationStart;
451
452     // Should we bother skipping buckets where firstTime_ >= nextBucketStart?
453     // For now we go ahead and invoke the function with these buckets.
454     // sum and count should always be 0 in these buckets.
455
456     DCHECK_LE(
457         bucketStart.time_since_epoch().count(),
458         latestTime_.time_since_epoch().count());
459     bool ret = fn(buckets_[idx], bucketStart, nextBucketStart);
460     if (!ret) {
461       break;
462     }
463
464     if (idx == latestBucketIdx) {
465       // all done
466       break;
467     }
468   }
469 }
470
471 /*
472  * Adjust the input value from the specified bucket to only account
473  * for the desired range.
474  *
475  * For example, if the bucket spans time [10, 20), but we only care about the
476  * range [10, 16), this will return 60% of the input value.
477  */
478 template <typename VT, typename CT>
479 VT BucketedTimeSeries<VT, CT>::rangeAdjust(
480     TimePoint bucketStart,
481     TimePoint nextBucketStart,
482     TimePoint start,
483     TimePoint end,
484     ValueType input) const {
485   // If nextBucketStart is greater than latestTime_, treat nextBucketStart as
486   // if it were latestTime_.  This makes us more accurate when someone is
487   // querying for all of the data up to latestTime_.  Even though latestTime_
488   // may only be partially through the bucket, we don't want to adjust
489   // downwards in this case, because the bucket really only has data up to
490   // latestTime_.
491   if (bucketStart <= latestTime_ && nextBucketStart > latestTime_) {
492     nextBucketStart = latestTime_ + Duration(1);
493   }
494
495   if (start <= bucketStart && end >= nextBucketStart) {
496     // The bucket is wholly contained in the [start, end) interval
497     return input;
498   }
499
500   TimePoint intervalStart = std::max(start, bucketStart);
501   TimePoint intervalEnd = std::min(end, nextBucketStart);
502   return input * (intervalEnd - intervalStart) /
503       (nextBucketStart - bucketStart);
504 }
505
506 template <typename VT, typename CT>
507 template <typename Function>
508 void BucketedTimeSeries<VT, CT>::forEachBucket(
509     TimePoint start,
510     TimePoint end,
511     Function fn) const {
512   forEachBucket(
513       [&start, &end, &fn](
514           const Bucket& bucket,
515           TimePoint bucketStart,
516           TimePoint nextBucketStart) -> bool {
517         if (start >= nextBucketStart) {
518           return true;
519         }
520         if (end <= bucketStart) {
521           return false;
522         }
523         bool ret = fn(bucket, bucketStart, nextBucketStart);
524         return ret;
525       });
526 }
527
528 } // namespace folly