08d9c292c8524fb7f075e68288243c92015666d6
[folly.git] / folly / stats / BucketedTimeSeries-defs.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <algorithm>
20 #include <glog/logging.h>
21 #include <folly/Likely.h>
22 #include <folly/stats/BucketedTimeSeries.h>
23
24 namespace folly {
25
26 template <typename VT, typename CT>
27 BucketedTimeSeries<VT, CT>::BucketedTimeSeries(
28     size_t nBuckets,
29     Duration maxDuration)
30     : firstTime_(Duration(1)), latestTime_(), duration_(maxDuration) {
31   // For tracking all-time data we only use total_, and don't need to bother
32   // with buckets_
33   if (!isAllTime()) {
34     // Round nBuckets down to duration_.count().
35     //
36     // There is no point in having more buckets than our timestamp
37     // granularity: otherwise we would have buckets that could never be used.
38     if (nBuckets > size_t(duration_.count())) {
39       nBuckets = duration_.count();
40     }
41
42     buckets_.resize(nBuckets, Bucket());
43   }
44 }
45
46 template <typename VT, typename CT>
47 bool BucketedTimeSeries<VT, CT>::addValue(TimePoint now, const ValueType& val) {
48   return addValueAggregated(now, val, 1);
49 }
50
51 template <typename VT, typename CT>
52 bool BucketedTimeSeries<VT, CT>::addValue(
53     TimePoint now,
54     const ValueType& val,
55     int64_t times) {
56   return addValueAggregated(now, val * times, times);
57 }
58
59 template <typename VT, typename CT>
60 bool BucketedTimeSeries<VT, CT>::addValueAggregated(
61     TimePoint now,
62     const ValueType& total,
63     int64_t nsamples) {
64   if (isAllTime()) {
65     if (UNLIKELY(empty())) {
66       firstTime_ = now;
67       latestTime_ = now;
68     } else if (now > latestTime_) {
69       latestTime_ = now;
70     } else if (now < firstTime_) {
71       firstTime_ = now;
72     }
73     total_.add(total, nsamples);
74     return true;
75   }
76
77   size_t bucketIdx;
78   if (UNLIKELY(empty())) {
79     // First data point we've ever seen
80     firstTime_ = now;
81     latestTime_ = now;
82     bucketIdx = getBucketIdx(now);
83   } else if (now > latestTime_) {
84     // More recent time.  Need to update the buckets.
85     bucketIdx = updateBuckets(now);
86   } else if (LIKELY(now == latestTime_)) {
87     // Current time.
88     bucketIdx = getBucketIdx(now);
89   } else {
90     // An earlier time in the past.  We need to check if this time still falls
91     // within our window.
92     if (now < getEarliestTimeNonEmpty()) {
93       return false;
94     }
95     bucketIdx = getBucketIdx(now);
96   }
97
98   total_.add(total, nsamples);
99   buckets_[bucketIdx].add(total, nsamples);
100   return true;
101 }
102
103 template <typename VT, typename CT>
104 size_t BucketedTimeSeries<VT, CT>::update(TimePoint now) {
105   if (empty()) {
106     // This is the first data point.
107     firstTime_ = now;
108   }
109
110   // For all-time data, all we need to do is update latestTime_
111   if (isAllTime()) {
112     latestTime_ = std::max(latestTime_, now);
113     return 0;
114   }
115
116   // Make sure time doesn't go backwards.
117   // If the time is less than or equal to the latest time we have already seen,
118   // we don't need to do anything.
119   if (now <= latestTime_) {
120     return getBucketIdx(latestTime_);
121   }
122
123   return updateBuckets(now);
124 }
125
126 template <typename VT, typename CT>
127 size_t BucketedTimeSeries<VT, CT>::updateBuckets(TimePoint now) {
128   // We could cache nextBucketStart as a member variable, so we don't have to
129   // recompute it each time update() is called with a new timestamp value.
130   // This makes things faster when update() (or addValue()) is called once
131   // per second, but slightly slower when update() is called multiple times a
132   // second.  We care more about optimizing the cases where addValue() is being
133   // called frequently.  If addValue() is only being called once every few
134   // seconds, it doesn't matter as much if it is fast.
135
136   // Get info about the bucket that latestTime_ points at
137   size_t currentBucket;
138   TimePoint currentBucketStart;
139   TimePoint nextBucketStart;
140   getBucketInfo(latestTime_, &currentBucket,
141                 &currentBucketStart, &nextBucketStart);
142
143   // Update latestTime_
144   latestTime_ = now;
145
146   if (now < nextBucketStart) {
147     // We're still in the same bucket.
148     // We're done after updating latestTime_.
149     return currentBucket;
150   } else if (now >= currentBucketStart + duration_) {
151     // It's been a while.  We have wrapped, and all of the buckets need to be
152     // cleared.
153     for (Bucket& bucket : buckets_) {
154       bucket.clear();
155     }
156     total_.clear();
157     return getBucketIdx(latestTime_);
158   } else {
159     // clear all the buckets between the last time and current time, meaning
160     // buckets in the range [(currentBucket+1), newBucket]. Note that
161     // the bucket (currentBucket+1) is always the oldest bucket we have. Since
162     // our array is circular, loop when we reach the end.
163     size_t newBucket = getBucketIdx(now);
164     size_t idx = currentBucket;
165     while (idx != newBucket) {
166       ++idx;
167       if (idx >= buckets_.size()) {
168         idx = 0;
169       }
170       total_ -= buckets_[idx];
171       buckets_[idx].clear();
172     }
173     return newBucket;
174   }
175 }
176
177 template <typename VT, typename CT>
178 void BucketedTimeSeries<VT, CT>::clear() {
179   for (Bucket& bucket : buckets_) {
180     bucket.clear();
181   }
182   total_.clear();
183   // Set firstTime_ larger than latestTime_,
184   // to indicate that the timeseries is empty
185   firstTime_ = TimePoint(Duration(1));
186   latestTime_ = TimePoint();
187 }
188
189 template <typename VT, typename CT>
190 typename CT::time_point BucketedTimeSeries<VT, CT>::getEarliestTime() const {
191   if (empty()) {
192     return TimePoint();
193   }
194   if (isAllTime()) {
195     return firstTime_;
196   }
197
198   // Compute the earliest time we can track
199   TimePoint earliestTime = getEarliestTimeNonEmpty();
200
201   // We're never tracking data before firstTime_
202   earliestTime = std::max(earliestTime, firstTime_);
203
204   return earliestTime;
205 }
206
207 template <typename VT, typename CT>
208 typename CT::time_point BucketedTimeSeries<VT, CT>::getEarliestTimeNonEmpty()
209     const {
210   size_t currentBucket;
211   TimePoint currentBucketStart;
212   TimePoint nextBucketStart;
213   getBucketInfo(latestTime_, &currentBucket,
214                 &currentBucketStart, &nextBucketStart);
215
216   // Subtract 1 duration from the start of the next bucket to find the
217   // earliest possible data point we could be tracking.
218   return nextBucketStart - duration_;
219 }
220
221 template <typename VT, typename CT>
222 typename CT::duration BucketedTimeSeries<VT, CT>::elapsed() const {
223   if (empty()) {
224     return Duration(0);
225   }
226
227   // Add 1 since [latestTime_, earliestTime] is an inclusive interval.
228   return latestTime_ - getEarliestTime() + Duration(1);
229 }
230
231 template <typename VT, typename CT>
232 typename CT::duration BucketedTimeSeries<VT, CT>::elapsed(
233     TimePoint start,
234     TimePoint end) const {
235   if (empty()) {
236     return Duration(0);
237   }
238   start = std::max(start, getEarliestTime());
239   end = std::min(end, latestTime_ + Duration(1));
240   end = std::max(start, end);
241   return end - start;
242 }
243
244 template <typename VT, typename CT>
245 VT BucketedTimeSeries<VT, CT>::sum(TimePoint start, TimePoint end) const {
246   ValueType total = ValueType();
247   forEachBucket(
248       start,
249       end,
250       [&](const Bucket& bucket,
251           TimePoint bucketStart,
252           TimePoint nextBucketStart) -> bool {
253         total += this->rangeAdjust(
254             bucketStart, nextBucketStart, start, end, bucket.sum);
255         return true;
256       });
257
258   return total;
259 }
260
261 template <typename VT, typename CT>
262 uint64_t BucketedTimeSeries<VT, CT>::count(TimePoint start, TimePoint end)
263     const {
264   uint64_t sample_count = 0;
265   forEachBucket(
266       start,
267       end,
268       [&](const Bucket& bucket,
269           TimePoint bucketStart,
270           TimePoint nextBucketStart) -> bool {
271         sample_count += this->rangeAdjust(
272             bucketStart, nextBucketStart, start, end, bucket.count);
273         return true;
274       });
275
276   return sample_count;
277 }
278
279 template <typename VT, typename CT>
280 template <typename ReturnType>
281 ReturnType BucketedTimeSeries<VT, CT>::avg(TimePoint start, TimePoint end)
282     const {
283   ValueType total = ValueType();
284   uint64_t sample_count = 0;
285   forEachBucket(
286       start,
287       end,
288       [&](const Bucket& bucket,
289           TimePoint bucketStart,
290           TimePoint nextBucketStart) -> bool {
291         total += this->rangeAdjust(
292             bucketStart, nextBucketStart, start, end, bucket.sum);
293         sample_count += this->rangeAdjust(
294             bucketStart, nextBucketStart, start, end, bucket.count);
295         return true;
296       });
297
298   if (sample_count == 0) {
299     return ReturnType(0);
300   }
301
302   return detail::avgHelper<ReturnType>(total, sample_count);
303 }
304
305 /*
306  * A note about some of the bucket index calculations below:
307  *
308  * buckets_.size() may not divide evenly into duration_.  When this happens,
309  * some buckets will be wider than others.  We still want to spread the data
310  * out as evenly as possible among the buckets (as opposed to just making the
311  * last bucket be significantly wider than all of the others).
312  *
313  * To make the division work out, we pretend that the buckets are each
314  * duration_ wide, so that the overall duration becomes
315  * buckets.size() * duration_.
316  *
317  * To transform a real timestamp into the scale used by our buckets,
318  * we have to multiply by buckets_.size().  To figure out which bucket it goes
319  * into, we then divide by duration_.
320  */
321
322 template <typename VT, typename CT>
323 size_t BucketedTimeSeries<VT, CT>::getBucketIdx(TimePoint time) const {
324   // For all-time data we don't use buckets_.  Everything is tracked in total_.
325   DCHECK(!isAllTime());
326
327   auto timeIntoCurrentCycle = (time.time_since_epoch() % duration_);
328   return timeIntoCurrentCycle.count() * buckets_.size() / duration_.count();
329 }
330
331 /*
332  * Compute the bucket index for the specified time, as well as the earliest
333  * time that falls into this bucket.
334  */
335 template <typename VT, typename CT>
336 void BucketedTimeSeries<VT, CT>::getBucketInfo(
337     TimePoint time,
338     size_t* bucketIdx,
339     TimePoint* bucketStart,
340     TimePoint* nextBucketStart) const {
341   typedef typename Duration::rep TimeInt;
342   DCHECK(!isAllTime());
343
344   // Keep these two lines together.  The compiler should be able to compute
345   // both the division and modulus with a single operation.
346   Duration timeMod = time.time_since_epoch() % duration_;
347   TimeInt numFullDurations = time.time_since_epoch() / duration_;
348
349   TimeInt scaledTime = timeMod.count() * buckets_.size();
350
351   // Keep these two lines together.  The compiler should be able to compute
352   // both the division and modulus with a single operation.
353   *bucketIdx = scaledTime / duration_.count();
354   TimeInt scaledOffsetInBucket = scaledTime % duration_.count();
355
356   TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket;
357   TimeInt scaledNextBucketStart = scaledBucketStart + duration_.count();
358
359   Duration bucketStartMod(
360       (scaledBucketStart + buckets_.size() - 1) / buckets_.size());
361   Duration nextBucketStartMod(
362       (scaledNextBucketStart + buckets_.size() - 1) / buckets_.size());
363
364   TimePoint durationStart(numFullDurations * duration_);
365   *bucketStart = bucketStartMod + durationStart;
366   *nextBucketStart = nextBucketStartMod + durationStart;
367 }
368
369 template <typename VT, typename CT>
370 template <typename Function>
371 void BucketedTimeSeries<VT, CT>::forEachBucket(Function fn) const {
372   if (isAllTime()) {
373     fn(total_, firstTime_, latestTime_ + Duration(1));
374     return;
375   }
376
377   typedef typename Duration::rep TimeInt;
378
379   // Compute durationStart, latestBucketIdx, and scaledNextBucketStart,
380   // the same way as in getBucketInfo().
381   Duration timeMod = latestTime_.time_since_epoch() % duration_;
382   TimeInt numFullDurations = latestTime_.time_since_epoch() / duration_;
383   TimePoint durationStart(numFullDurations * duration_);
384   TimeInt scaledTime = timeMod.count() * buckets_.size();
385   size_t latestBucketIdx = scaledTime / duration_.count();
386   TimeInt scaledOffsetInBucket = scaledTime % duration_.count();
387   TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket;
388   TimeInt scaledNextBucketStart = scaledBucketStart + duration_.count();
389
390   // Walk through the buckets, starting one past the current bucket.
391   // The next bucket is from the previous cycle, so subtract 1 duration
392   // from durationStart.
393   size_t idx = latestBucketIdx;
394   durationStart -= duration_;
395
396   TimePoint nextBucketStart =
397       Duration(
398           (scaledNextBucketStart + buckets_.size() - 1) / buckets_.size()) +
399       durationStart;
400   while (true) {
401     ++idx;
402     if (idx >= buckets_.size()) {
403       idx = 0;
404       durationStart += duration_;
405       scaledNextBucketStart = duration_.count();
406     } else {
407       scaledNextBucketStart += duration_.count();
408     }
409
410     TimePoint bucketStart = nextBucketStart;
411     nextBucketStart =
412         Duration(
413             (scaledNextBucketStart + buckets_.size() - 1) / buckets_.size()) +
414         durationStart;
415
416     // Should we bother skipping buckets where firstTime_ >= nextBucketStart?
417     // For now we go ahead and invoke the function with these buckets.
418     // sum and count should always be 0 in these buckets.
419
420     DCHECK_LE(
421         bucketStart.time_since_epoch().count(),
422         latestTime_.time_since_epoch().count());
423     bool ret = fn(buckets_[idx], bucketStart, nextBucketStart);
424     if (!ret) {
425       break;
426     }
427
428     if (idx == latestBucketIdx) {
429       // all done
430       break;
431     }
432   }
433 }
434
435 /*
436  * Adjust the input value from the specified bucket to only account
437  * for the desired range.
438  *
439  * For example, if the bucket spans time [10, 20), but we only care about the
440  * range [10, 16), this will return 60% of the input value.
441  */
442 template <typename VT, typename CT>
443 VT BucketedTimeSeries<VT, CT>::rangeAdjust(
444     TimePoint bucketStart,
445     TimePoint nextBucketStart,
446     TimePoint start,
447     TimePoint end,
448     ValueType input) const {
449   // If nextBucketStart is greater than latestTime_, treat nextBucketStart as
450   // if it were latestTime_.  This makes us more accurate when someone is
451   // querying for all of the data up to latestTime_.  Even though latestTime_
452   // may only be partially through the bucket, we don't want to adjust
453   // downwards in this case, because the bucket really only has data up to
454   // latestTime_.
455   if (bucketStart <= latestTime_ && nextBucketStart > latestTime_) {
456     nextBucketStart = latestTime_ + Duration(1);
457   }
458
459   if (start <= bucketStart && end >= nextBucketStart) {
460     // The bucket is wholly contained in the [start, end) interval
461     return input;
462   }
463
464   TimePoint intervalStart = std::max(start, bucketStart);
465   TimePoint intervalEnd = std::min(end, nextBucketStart);
466   return input * (intervalEnd - intervalStart) /
467     (nextBucketStart - bucketStart);
468 }
469
470 template <typename VT, typename CT>
471 template <typename Function>
472 void BucketedTimeSeries<VT, CT>::forEachBucket(
473     TimePoint start,
474     TimePoint end,
475     Function fn) const {
476   forEachBucket(
477       [&start, &end, &fn](
478           const Bucket& bucket,
479           TimePoint bucketStart,
480           TimePoint nextBucketStart) -> bool {
481         if (start >= nextBucketStart) {
482           return true;
483         }
484         if (end <= bucketStart) {
485           return false;
486         }
487         bool ret = fn(bucket, bucketStart, nextBucketStart);
488         return ret;
489       });
490 }
491
492 } // folly