race in Future destructor
[folly.git] / folly / stats / BucketedTimeSeries-defs.h
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #ifndef FOLLY_STATS_BUCKETEDTIMESERIES_INL_H_
18 #define FOLLY_STATS_BUCKETEDTIMESERIES_INL_H_
19
20 #include <glog/logging.h>
21
22 namespace folly {
23
24 template <typename VT, typename TT>
25 BucketedTimeSeries<VT, TT>::BucketedTimeSeries(size_t numBuckets,
26                                                TimeType duration)
27   : firstTime_(1),
28     latestTime_(0),
29     duration_(duration) {
30   // For tracking all-time data we only use total_, and don't need to bother
31   // with buckets_
32   if (!isAllTime()) {
33     // Round numBuckets down to duration_.count().
34     //
35     // There is no point in having more buckets than our timestamp
36     // granularity: otherwise we would have buckets that could never be used.
37     if (numBuckets > duration_.count()) {
38       numBuckets = duration_.count();
39     }
40
41     buckets_.resize(numBuckets, Bucket());
42   }
43 }
44
45 template <typename VT, typename TT>
46 void BucketedTimeSeries<VT, TT>::addValue(TimeType now, const ValueType& val) {
47   addValueAggregated(now, val, 1);
48 }
49
50 template <typename VT, typename TT>
51 void BucketedTimeSeries<VT, TT>::addValue(TimeType now,
52                                           const ValueType& val,
53                                           int64_t times) {
54   addValueAggregated(now, val * times, times);
55 }
56
57 template <typename VT, typename TT>
58 void BucketedTimeSeries<VT, TT>::addValueAggregated(TimeType now,
59                                                     const ValueType& sum,
60                                                     int64_t nsamples) {
61   // Make sure time doesn't go backwards
62   now = std::max(now, latestTime_);
63
64   if (isAllTime()) {
65     if (empty()) {
66       firstTime_ = now;
67     }
68     latestTime_ = now;
69     total_.add(sum, nsamples);
70     return;
71   }
72
73   // Update the buckets
74   size_t curBucket = update(now);
75   buckets_[curBucket].add(sum, nsamples);
76
77   // Update the aggregate sum/count
78   total_.add(sum, nsamples);
79 }
80
81 template <typename VT, typename TT>
82 size_t BucketedTimeSeries<VT, TT>::update(TimeType now) {
83   if (empty()) {
84     // This is the first data point.
85     firstTime_ = now;
86   }
87
88   // For all-time data, all we need to do is update latestTime_
89   if (isAllTime()) {
90     latestTime_ = std::max(latestTime_, now);
91     return 0;
92   }
93
94   // Make sure time doesn't go backwards.
95   // If the time is less than or equal to the latest time we have already seen,
96   // we don't need to do anything.
97   if (now <= latestTime_) {
98     return getBucketIdx(latestTime_);
99   }
100
101   // We could cache nextBucketStart as a member variable, so we don't have to
102   // recompute it each time update() is called with a new timestamp value.
103   // This makes things faster when update() (or addValue()) is called once
104   // per second, but slightly slower when update() is called multiple times a
105   // second.  We care more about optimizing the cases where addValue() is being
106   // called frequently.  If addValue() is only being called once every few
107   // seconds, it doesn't matter as much if it is fast.
108
109   // Get info about the bucket that latestTime_ points at
110   size_t currentBucket;
111   TimeType currentBucketStart;
112   TimeType nextBucketStart;
113   getBucketInfo(latestTime_, &currentBucket,
114                 &currentBucketStart, &nextBucketStart);
115
116   // Update latestTime_
117   latestTime_ = now;
118
119   if (now < nextBucketStart) {
120     // We're still in the same bucket.
121     // We're done after updating latestTime_.
122     return currentBucket;
123   } else if (now >= currentBucketStart + duration_) {
124     // It's been a while.  We have wrapped, and all of the buckets need to be
125     // cleared.
126     for (Bucket& bucket : buckets_) {
127       bucket.clear();
128     }
129     total_.clear();
130     return getBucketIdx(latestTime_);
131   } else {
132     // clear all the buckets between the last time and current time, meaning
133     // buckets in the range [(currentBucket+1), newBucket]. Note that
134     // the bucket (currentBucket+1) is always the oldest bucket we have. Since
135     // our array is circular, loop when we reach the end.
136     size_t newBucket = getBucketIdx(now);
137     size_t idx = currentBucket;
138     while (idx != newBucket) {
139       ++idx;
140       if (idx >= buckets_.size()) {
141         idx = 0;
142       }
143       total_ -= buckets_[idx];
144       buckets_[idx].clear();
145     }
146     return newBucket;
147   }
148 }
149
150 template <typename VT, typename TT>
151 void BucketedTimeSeries<VT, TT>::clear() {
152   for (Bucket& bucket : buckets_) {
153     bucket.clear();
154   }
155   total_.clear();
156   // Set firstTime_ larger than latestTime_,
157   // to indicate that the timeseries is empty
158   firstTime_ = TimeType(1);
159   latestTime_ = TimeType(0);
160 }
161
162
163 template <typename VT, typename TT>
164 TT BucketedTimeSeries<VT, TT>::getEarliestTime() const {
165   if (empty()) {
166     return TimeType(0);
167   }
168   if (isAllTime()) {
169     return firstTime_;
170   }
171
172   size_t currentBucket;
173   TimeType currentBucketStart;
174   TimeType nextBucketStart;
175   getBucketInfo(latestTime_, &currentBucket,
176                 &currentBucketStart, &nextBucketStart);
177
178   // Subtract 1 duration from the start of the next bucket to find the
179   // earliest possible data point we could be tracking.
180   TimeType earliestTime = nextBucketStart - duration_;
181
182   // We're never tracking data before firstTime_
183   earliestTime = std::max(earliestTime, firstTime_);
184
185   return earliestTime;
186 }
187
188 template <typename VT, typename TT>
189 TT BucketedTimeSeries<VT, TT>::elapsed() const {
190   if (empty()) {
191     return TimeType(0);
192   }
193
194   // Add 1 since [latestTime_, earliestTime] is an inclusive interval.
195   return latestTime_ - getEarliestTime() + TimeType(1);
196 }
197
198 template <typename VT, typename TT>
199 TT BucketedTimeSeries<VT, TT>::elapsed(TimeType start, TimeType end) const {
200   if (empty()) {
201     return TimeType(0);
202   }
203   start = std::max(start, getEarliestTime());
204   end = std::min(end, latestTime_ + TimeType(1));
205   end = std::max(start, end);
206   return end - start;
207 }
208
209 template <typename VT, typename TT>
210 VT BucketedTimeSeries<VT, TT>::sum(TimeType start, TimeType end) const {
211   ValueType sum = ValueType();
212   forEachBucket(start, end, [&](const Bucket& bucket,
213                                 TimeType bucketStart,
214                                 TimeType nextBucketStart) -> bool {
215     sum += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
216                              bucket.sum);
217     return true;
218   });
219
220   return sum;
221 }
222
223 template <typename VT, typename TT>
224 uint64_t BucketedTimeSeries<VT, TT>::count(TimeType start, TimeType end) const {
225   uint64_t count = 0;
226   forEachBucket(start, end, [&](const Bucket& bucket,
227                                 TimeType bucketStart,
228                                 TimeType nextBucketStart) -> bool {
229     count += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
230                                bucket.count);
231     return true;
232   });
233
234   return count;
235 }
236
237 template <typename VT, typename TT>
238 template <typename ReturnType>
239 ReturnType BucketedTimeSeries<VT, TT>::avg(TimeType start, TimeType end) const {
240   ValueType sum = ValueType();
241   uint64_t count = 0;
242   forEachBucket(start, end, [&](const Bucket& bucket,
243                                 TimeType bucketStart,
244                                 TimeType nextBucketStart) -> bool {
245     sum += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
246                              bucket.sum);
247     count += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
248                                bucket.count);
249     return true;
250   });
251
252   if (count == 0) {
253     return ReturnType(0);
254   }
255
256   return detail::avgHelper<ReturnType>(sum, count);
257 }
258
259 /*
260  * A note about some of the bucket index calculations below:
261  *
262  * buckets_.size() may not divide evenly into duration_.  When this happens,
263  * some buckets will be wider than others.  We still want to spread the data
264  * out as evenly as possible among the buckets (as opposed to just making the
265  * last bucket be significantly wider than all of the others).
266  *
267  * To make the division work out, we pretend that the buckets are each
268  * duration_ wide, so that the overall duration becomes
269  * buckets.size() * duration_.
270  *
271  * To transform a real timestamp into the scale used by our buckets,
272  * we have to multiply by buckets_.size().  To figure out which bucket it goes
273  * into, we then divide by duration_.
274  */
275
276 template <typename VT, typename TT>
277 size_t BucketedTimeSeries<VT, TT>::getBucketIdx(TimeType time) const {
278   // For all-time data we don't use buckets_.  Everything is tracked in total_.
279   DCHECK(!isAllTime());
280
281   time %= duration_;
282   return time.count() * buckets_.size() / duration_.count();
283 }
284
285 /*
286  * Compute the bucket index for the specified time, as well as the earliest
287  * time that falls into this bucket.
288  */
289 template <typename VT, typename TT>
290 void BucketedTimeSeries<VT, TT>::getBucketInfo(
291     TimeType time, size_t *bucketIdx,
292     TimeType* bucketStart, TimeType* nextBucketStart) const {
293   typedef typename TimeType::rep TimeInt;
294   DCHECK(!isAllTime());
295
296   // Keep these two lines together.  The compiler should be able to compute
297   // both the division and modulus with a single operation.
298   TimeType timeMod = time % duration_;
299   TimeInt numFullDurations = time / duration_;
300
301   TimeInt scaledTime = timeMod.count() * buckets_.size();
302
303   // Keep these two lines together.  The compiler should be able to compute
304   // both the division and modulus with a single operation.
305   *bucketIdx = scaledTime / duration_.count();
306   TimeInt scaledOffsetInBucket = scaledTime % duration_.count();
307
308   TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket;
309   TimeInt scaledNextBucketStart = scaledBucketStart + duration_.count();
310
311   TimeType bucketStartMod((scaledBucketStart + buckets_.size() - 1) /
312                           buckets_.size());
313   TimeType nextBucketStartMod((scaledNextBucketStart + buckets_.size() - 1) /
314                               buckets_.size());
315
316   TimeType durationStart(numFullDurations * duration_.count());
317   *bucketStart = bucketStartMod + durationStart;
318   *nextBucketStart = nextBucketStartMod + durationStart;
319 }
320
321 template <typename VT, typename TT>
322 template <typename Function>
323 void BucketedTimeSeries<VT, TT>::forEachBucket(Function fn) const {
324   if (isAllTime()) {
325     fn(total_, firstTime_, latestTime_ + TimeType(1));
326     return;
327   }
328
329   typedef typename TimeType::rep TimeInt;
330
331   // Compute durationStart, latestBucketIdx, and scaledNextBucketStart,
332   // the same way as in getBucketInfo().
333   TimeType timeMod = latestTime_ % duration_;
334   TimeInt numFullDurations = latestTime_ / duration_;
335   TimeType durationStart(numFullDurations * duration_.count());
336   TimeInt scaledTime = timeMod.count() * buckets_.size();
337   size_t latestBucketIdx = scaledTime / duration_.count();
338   TimeInt scaledOffsetInBucket = scaledTime % duration_.count();
339   TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket;
340   TimeInt scaledNextBucketStart = scaledBucketStart + duration_.count();
341
342   // Walk through the buckets, starting one past the current bucket.
343   // The next bucket is from the previous cycle, so subtract 1 duration
344   // from durationStart.
345   size_t idx = latestBucketIdx;
346   durationStart -= duration_;
347
348   TimeType nextBucketStart =
349     TimeType((scaledNextBucketStart + buckets_.size() - 1) / buckets_.size()) +
350     durationStart;
351   while (true) {
352     ++idx;
353     if (idx >= buckets_.size()) {
354       idx = 0;
355       durationStart += duration_;
356       scaledNextBucketStart = duration_.count();
357     } else {
358       scaledNextBucketStart += duration_.count();
359     }
360
361     TimeType bucketStart = nextBucketStart;
362     nextBucketStart = TimeType((scaledNextBucketStart + buckets_.size() - 1) /
363                                buckets_.size()) + durationStart;
364
365     // Should we bother skipping buckets where firstTime_ >= nextBucketStart?
366     // For now we go ahead and invoke the function with these buckets.
367     // sum and count should always be 0 in these buckets.
368
369     DCHECK_LE(bucketStart.count(), latestTime_.count());
370     bool ret = fn(buckets_[idx], bucketStart, nextBucketStart);
371     if (!ret) {
372       break;
373     }
374
375     if (idx == latestBucketIdx) {
376       // all done
377       break;
378     }
379   }
380 }
381
382 /*
383  * Adjust the input value from the specified bucket to only account
384  * for the desired range.
385  *
386  * For example, if the bucket spans time [10, 20), but we only care about the
387  * range [10, 16), this will return 60% of the input value.
388  */
389 template<typename VT, typename TT>
390 VT BucketedTimeSeries<VT, TT>::rangeAdjust(
391     TimeType bucketStart, TimeType nextBucketStart,
392     TimeType start, TimeType end, ValueType input) const {
393   // If nextBucketStart is greater than latestTime_, treat nextBucketStart as
394   // if it were latestTime_.  This makes us more accurate when someone is
395   // querying for all of the data up to latestTime_.  Even though latestTime_
396   // may only be partially through the bucket, we don't want to adjust
397   // downwards in this case, because the bucket really only has data up to
398   // latestTime_.
399   if (bucketStart <= latestTime_ && nextBucketStart > latestTime_) {
400     nextBucketStart = latestTime_ + TimeType(1);
401   }
402
403   if (start <= bucketStart && end >= nextBucketStart) {
404     // The bucket is wholly contained in the [start, end) interval
405     return input;
406   }
407
408   TimeType intervalStart = std::max(start, bucketStart);
409   TimeType intervalEnd = std::min(end, nextBucketStart);
410   return input * (intervalEnd - intervalStart) /
411     (nextBucketStart - bucketStart);
412 }
413
414 template <typename VT, typename TT>
415 template <typename Function>
416 void BucketedTimeSeries<VT, TT>::forEachBucket(TimeType start, TimeType end,
417                                                Function fn) const {
418   forEachBucket([&start, &end, &fn] (const Bucket& bucket, TimeType bucketStart,
419                                      TimeType nextBucketStart) -> bool {
420     if (start >= nextBucketStart) {
421       return true;
422     }
423     if (end <= bucketStart) {
424       return false;
425     }
426     bool ret = fn(bucket, bucketStart, nextBucketStart);
427     return ret;
428   });
429 }
430
431 } // folly
432
433 #endif // FOLLY_STATS_BUCKETEDTIMESERIES_INL_H_