update stats APIs to use TimePoint vs Duration correctly
[folly.git] / folly / stats / BucketedTimeSeries-defs.h
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <algorithm>
20 #include <glog/logging.h>
21 #include <folly/Likely.h>
22
23 namespace folly {
24
25 template <typename VT, typename CT>
26 BucketedTimeSeries<VT, CT>::BucketedTimeSeries(
27     size_t nBuckets,
28     Duration maxDuration)
29     : firstTime_(Duration(1)), latestTime_(), duration_(maxDuration) {
30   // For tracking all-time data we only use total_, and don't need to bother
31   // with buckets_
32   if (!isAllTime()) {
33     // Round nBuckets down to duration_.count().
34     //
35     // There is no point in having more buckets than our timestamp
36     // granularity: otherwise we would have buckets that could never be used.
37     if (nBuckets > size_t(duration_.count())) {
38       nBuckets = duration_.count();
39     }
40
41     buckets_.resize(nBuckets, Bucket());
42   }
43 }
44
45 template <typename VT, typename CT>
46 bool BucketedTimeSeries<VT, CT>::addValue(TimePoint now, const ValueType& val) {
47   return addValueAggregated(now, val, 1);
48 }
49
50 template <typename VT, typename CT>
51 bool BucketedTimeSeries<VT, CT>::addValue(
52     TimePoint now,
53     const ValueType& val,
54     int64_t times) {
55   return addValueAggregated(now, val * times, times);
56 }
57
58 template <typename VT, typename CT>
59 bool BucketedTimeSeries<VT, CT>::addValueAggregated(
60     TimePoint now,
61     const ValueType& total,
62     int64_t nsamples) {
63   if (isAllTime()) {
64     if (UNLIKELY(empty())) {
65       firstTime_ = now;
66       latestTime_ = now;
67     } else if (now > latestTime_) {
68       latestTime_ = now;
69     } else if (now < firstTime_) {
70       firstTime_ = now;
71     }
72     total_.add(total, nsamples);
73     return true;
74   }
75
76   size_t bucketIdx;
77   if (UNLIKELY(empty())) {
78     // First data point we've ever seen
79     firstTime_ = now;
80     latestTime_ = now;
81     bucketIdx = getBucketIdx(now);
82   } else if (now > latestTime_) {
83     // More recent time.  Need to update the buckets.
84     bucketIdx = updateBuckets(now);
85   } else if (LIKELY(now == latestTime_)) {
86     // Current time.
87     bucketIdx = getBucketIdx(now);
88   } else {
89     // An earlier time in the past.  We need to check if this time still falls
90     // within our window.
91     if (now < getEarliestTimeNonEmpty()) {
92       return false;
93     }
94     bucketIdx = getBucketIdx(now);
95   }
96
97   total_.add(total, nsamples);
98   buckets_[bucketIdx].add(total, nsamples);
99   return true;
100 }
101
102 template <typename VT, typename CT>
103 size_t BucketedTimeSeries<VT, CT>::update(TimePoint now) {
104   if (empty()) {
105     // This is the first data point.
106     firstTime_ = now;
107   }
108
109   // For all-time data, all we need to do is update latestTime_
110   if (isAllTime()) {
111     latestTime_ = std::max(latestTime_, now);
112     return 0;
113   }
114
115   // Make sure time doesn't go backwards.
116   // If the time is less than or equal to the latest time we have already seen,
117   // we don't need to do anything.
118   if (now <= latestTime_) {
119     return getBucketIdx(latestTime_);
120   }
121
122   return updateBuckets(now);
123 }
124
125 template <typename VT, typename CT>
126 size_t BucketedTimeSeries<VT, CT>::updateBuckets(TimePoint now) {
127   // We could cache nextBucketStart as a member variable, so we don't have to
128   // recompute it each time update() is called with a new timestamp value.
129   // This makes things faster when update() (or addValue()) is called once
130   // per second, but slightly slower when update() is called multiple times a
131   // second.  We care more about optimizing the cases where addValue() is being
132   // called frequently.  If addValue() is only being called once every few
133   // seconds, it doesn't matter as much if it is fast.
134
135   // Get info about the bucket that latestTime_ points at
136   size_t currentBucket;
137   TimePoint currentBucketStart;
138   TimePoint nextBucketStart;
139   getBucketInfo(latestTime_, &currentBucket,
140                 &currentBucketStart, &nextBucketStart);
141
142   // Update latestTime_
143   latestTime_ = now;
144
145   if (now < nextBucketStart) {
146     // We're still in the same bucket.
147     // We're done after updating latestTime_.
148     return currentBucket;
149   } else if (now >= currentBucketStart + duration_) {
150     // It's been a while.  We have wrapped, and all of the buckets need to be
151     // cleared.
152     for (Bucket& bucket : buckets_) {
153       bucket.clear();
154     }
155     total_.clear();
156     return getBucketIdx(latestTime_);
157   } else {
158     // clear all the buckets between the last time and current time, meaning
159     // buckets in the range [(currentBucket+1), newBucket]. Note that
160     // the bucket (currentBucket+1) is always the oldest bucket we have. Since
161     // our array is circular, loop when we reach the end.
162     size_t newBucket = getBucketIdx(now);
163     size_t idx = currentBucket;
164     while (idx != newBucket) {
165       ++idx;
166       if (idx >= buckets_.size()) {
167         idx = 0;
168       }
169       total_ -= buckets_[idx];
170       buckets_[idx].clear();
171     }
172     return newBucket;
173   }
174 }
175
176 template <typename VT, typename CT>
177 void BucketedTimeSeries<VT, CT>::clear() {
178   for (Bucket& bucket : buckets_) {
179     bucket.clear();
180   }
181   total_.clear();
182   // Set firstTime_ larger than latestTime_,
183   // to indicate that the timeseries is empty
184   firstTime_ = TimePoint(Duration(1));
185   latestTime_ = TimePoint();
186 }
187
188 template <typename VT, typename CT>
189 typename CT::time_point BucketedTimeSeries<VT, CT>::getEarliestTime() const {
190   if (empty()) {
191     return TimePoint();
192   }
193   if (isAllTime()) {
194     return firstTime_;
195   }
196
197   // Compute the earliest time we can track
198   TimePoint earliestTime = getEarliestTimeNonEmpty();
199
200   // We're never tracking data before firstTime_
201   earliestTime = std::max(earliestTime, firstTime_);
202
203   return earliestTime;
204 }
205
206 template <typename VT, typename CT>
207 typename CT::time_point BucketedTimeSeries<VT, CT>::getEarliestTimeNonEmpty()
208     const {
209   size_t currentBucket;
210   TimePoint currentBucketStart;
211   TimePoint nextBucketStart;
212   getBucketInfo(latestTime_, &currentBucket,
213                 &currentBucketStart, &nextBucketStart);
214
215   // Subtract 1 duration from the start of the next bucket to find the
216   // earliest possible data point we could be tracking.
217   return nextBucketStart - duration_;
218 }
219
220 template <typename VT, typename CT>
221 typename CT::duration BucketedTimeSeries<VT, CT>::elapsed() const {
222   if (empty()) {
223     return Duration(0);
224   }
225
226   // Add 1 since [latestTime_, earliestTime] is an inclusive interval.
227   return latestTime_ - getEarliestTime() + Duration(1);
228 }
229
230 template <typename VT, typename CT>
231 typename CT::duration BucketedTimeSeries<VT, CT>::elapsed(
232     TimePoint start,
233     TimePoint end) const {
234   if (empty()) {
235     return Duration(0);
236   }
237   start = std::max(start, getEarliestTime());
238   end = std::min(end, latestTime_ + Duration(1));
239   end = std::max(start, end);
240   return end - start;
241 }
242
243 template <typename VT, typename CT>
244 VT BucketedTimeSeries<VT, CT>::sum(TimePoint start, TimePoint end) const {
245   ValueType total = ValueType();
246   forEachBucket(
247       start,
248       end,
249       [&](const Bucket& bucket,
250           TimePoint bucketStart,
251           TimePoint nextBucketStart) -> bool {
252         total += this->rangeAdjust(
253             bucketStart, nextBucketStart, start, end, bucket.sum);
254         return true;
255       });
256
257   return total;
258 }
259
260 template <typename VT, typename CT>
261 uint64_t BucketedTimeSeries<VT, CT>::count(TimePoint start, TimePoint end)
262     const {
263   uint64_t sample_count = 0;
264   forEachBucket(
265       start,
266       end,
267       [&](const Bucket& bucket,
268           TimePoint bucketStart,
269           TimePoint nextBucketStart) -> bool {
270         sample_count += this->rangeAdjust(
271             bucketStart, nextBucketStart, start, end, bucket.count);
272         return true;
273       });
274
275   return sample_count;
276 }
277
278 template <typename VT, typename CT>
279 template <typename ReturnType>
280 ReturnType BucketedTimeSeries<VT, CT>::avg(TimePoint start, TimePoint end)
281     const {
282   ValueType total = ValueType();
283   uint64_t sample_count = 0;
284   forEachBucket(
285       start,
286       end,
287       [&](const Bucket& bucket,
288           TimePoint bucketStart,
289           TimePoint nextBucketStart) -> bool {
290         total += this->rangeAdjust(
291             bucketStart, nextBucketStart, start, end, bucket.sum);
292         sample_count += this->rangeAdjust(
293             bucketStart, nextBucketStart, start, end, bucket.count);
294         return true;
295       });
296
297   if (sample_count == 0) {
298     return ReturnType(0);
299   }
300
301   return detail::avgHelper<ReturnType>(total, sample_count);
302 }
303
304 /*
305  * A note about some of the bucket index calculations below:
306  *
307  * buckets_.size() may not divide evenly into duration_.  When this happens,
308  * some buckets will be wider than others.  We still want to spread the data
309  * out as evenly as possible among the buckets (as opposed to just making the
310  * last bucket be significantly wider than all of the others).
311  *
312  * To make the division work out, we pretend that the buckets are each
313  * duration_ wide, so that the overall duration becomes
314  * buckets.size() * duration_.
315  *
316  * To transform a real timestamp into the scale used by our buckets,
317  * we have to multiply by buckets_.size().  To figure out which bucket it goes
318  * into, we then divide by duration_.
319  */
320
321 template <typename VT, typename CT>
322 size_t BucketedTimeSeries<VT, CT>::getBucketIdx(TimePoint time) const {
323   // For all-time data we don't use buckets_.  Everything is tracked in total_.
324   DCHECK(!isAllTime());
325
326   auto timeIntoCurrentCycle = (time.time_since_epoch() % duration_);
327   return timeIntoCurrentCycle.count() * buckets_.size() / duration_.count();
328 }
329
330 /*
331  * Compute the bucket index for the specified time, as well as the earliest
332  * time that falls into this bucket.
333  */
334 template <typename VT, typename CT>
335 void BucketedTimeSeries<VT, CT>::getBucketInfo(
336     TimePoint time,
337     size_t* bucketIdx,
338     TimePoint* bucketStart,
339     TimePoint* nextBucketStart) const {
340   typedef typename Duration::rep TimeInt;
341   DCHECK(!isAllTime());
342
343   // Keep these two lines together.  The compiler should be able to compute
344   // both the division and modulus with a single operation.
345   Duration timeMod = time.time_since_epoch() % duration_;
346   TimeInt numFullDurations = time.time_since_epoch() / duration_;
347
348   TimeInt scaledTime = timeMod.count() * buckets_.size();
349
350   // Keep these two lines together.  The compiler should be able to compute
351   // both the division and modulus with a single operation.
352   *bucketIdx = scaledTime / duration_.count();
353   TimeInt scaledOffsetInBucket = scaledTime % duration_.count();
354
355   TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket;
356   TimeInt scaledNextBucketStart = scaledBucketStart + duration_.count();
357
358   Duration bucketStartMod(
359       (scaledBucketStart + buckets_.size() - 1) / buckets_.size());
360   Duration nextBucketStartMod(
361       (scaledNextBucketStart + buckets_.size() - 1) / buckets_.size());
362
363   TimePoint durationStart(numFullDurations * duration_);
364   *bucketStart = bucketStartMod + durationStart;
365   *nextBucketStart = nextBucketStartMod + durationStart;
366 }
367
368 template <typename VT, typename CT>
369 template <typename Function>
370 void BucketedTimeSeries<VT, CT>::forEachBucket(Function fn) const {
371   if (isAllTime()) {
372     fn(total_, firstTime_, latestTime_ + Duration(1));
373     return;
374   }
375
376   typedef typename Duration::rep TimeInt;
377
378   // Compute durationStart, latestBucketIdx, and scaledNextBucketStart,
379   // the same way as in getBucketInfo().
380   Duration timeMod = latestTime_.time_since_epoch() % duration_;
381   TimeInt numFullDurations = latestTime_.time_since_epoch() / duration_;
382   TimePoint durationStart(numFullDurations * duration_);
383   TimeInt scaledTime = timeMod.count() * buckets_.size();
384   size_t latestBucketIdx = scaledTime / duration_.count();
385   TimeInt scaledOffsetInBucket = scaledTime % duration_.count();
386   TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket;
387   TimeInt scaledNextBucketStart = scaledBucketStart + duration_.count();
388
389   // Walk through the buckets, starting one past the current bucket.
390   // The next bucket is from the previous cycle, so subtract 1 duration
391   // from durationStart.
392   size_t idx = latestBucketIdx;
393   durationStart -= duration_;
394
395   TimePoint nextBucketStart =
396       Duration(
397           (scaledNextBucketStart + buckets_.size() - 1) / buckets_.size()) +
398       durationStart;
399   while (true) {
400     ++idx;
401     if (idx >= buckets_.size()) {
402       idx = 0;
403       durationStart += duration_;
404       scaledNextBucketStart = duration_.count();
405     } else {
406       scaledNextBucketStart += duration_.count();
407     }
408
409     TimePoint bucketStart = nextBucketStart;
410     nextBucketStart =
411         Duration(
412             (scaledNextBucketStart + buckets_.size() - 1) / buckets_.size()) +
413         durationStart;
414
415     // Should we bother skipping buckets where firstTime_ >= nextBucketStart?
416     // For now we go ahead and invoke the function with these buckets.
417     // sum and count should always be 0 in these buckets.
418
419     DCHECK_LE(
420         bucketStart.time_since_epoch().count(),
421         latestTime_.time_since_epoch().count());
422     bool ret = fn(buckets_[idx], bucketStart, nextBucketStart);
423     if (!ret) {
424       break;
425     }
426
427     if (idx == latestBucketIdx) {
428       // all done
429       break;
430     }
431   }
432 }
433
434 /*
435  * Adjust the input value from the specified bucket to only account
436  * for the desired range.
437  *
438  * For example, if the bucket spans time [10, 20), but we only care about the
439  * range [10, 16), this will return 60% of the input value.
440  */
441 template <typename VT, typename CT>
442 VT BucketedTimeSeries<VT, CT>::rangeAdjust(
443     TimePoint bucketStart,
444     TimePoint nextBucketStart,
445     TimePoint start,
446     TimePoint end,
447     ValueType input) const {
448   // If nextBucketStart is greater than latestTime_, treat nextBucketStart as
449   // if it were latestTime_.  This makes us more accurate when someone is
450   // querying for all of the data up to latestTime_.  Even though latestTime_
451   // may only be partially through the bucket, we don't want to adjust
452   // downwards in this case, because the bucket really only has data up to
453   // latestTime_.
454   if (bucketStart <= latestTime_ && nextBucketStart > latestTime_) {
455     nextBucketStart = latestTime_ + Duration(1);
456   }
457
458   if (start <= bucketStart && end >= nextBucketStart) {
459     // The bucket is wholly contained in the [start, end) interval
460     return input;
461   }
462
463   TimePoint intervalStart = std::max(start, bucketStart);
464   TimePoint intervalEnd = std::min(end, nextBucketStart);
465   return input * (intervalEnd - intervalStart) /
466     (nextBucketStart - bucketStart);
467 }
468
469 template <typename VT, typename CT>
470 template <typename Function>
471 void BucketedTimeSeries<VT, CT>::forEachBucket(
472     TimePoint start,
473     TimePoint end,
474     Function fn) const {
475   forEachBucket(
476       [&start, &end, &fn](
477           const Bucket& bucket,
478           TimePoint bucketStart,
479           TimePoint nextBucketStart) -> bool {
480         if (start >= nextBucketStart) {
481           return true;
482         }
483         if (end <= bucketStart) {
484           return false;
485         }
486         bool ret = fn(bucket, bucketStart, nextBucketStart);
487         return ret;
488       });
489 }
490
491 } // folly