Copyright 2012 -> 2013
[folly.git] / folly / stats / BucketedTimeSeries-defs.h
1 /*
2  * Copyright 2013 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #ifndef FOLLY_STATS_BUCKETEDTIMESERIES_INL_H_
18 #define FOLLY_STATS_BUCKETEDTIMESERIES_INL_H_
19
20 #include <glog/logging.h>
21
22 namespace folly {
23
24 template <typename VT, typename TT>
25 BucketedTimeSeries<VT, TT>::BucketedTimeSeries(size_t numBuckets,
26                                                TimeType duration)
27   : firstTime_(1),
28     latestTime_(0),
29     duration_(duration) {
30   // For tracking all-time data we only use total_, and don't need to bother
31   // with buckets_
32   if (!isAllTime()) {
33     // Round numBuckets down to duration_.count().
34     //
35     // There is no point in having more buckets than our timestamp
36     // granularity: otherwise we would have buckets that could never be used.
37     if (numBuckets > duration_.count()) {
38       numBuckets = duration_.count();
39     }
40
41     buckets_.resize(numBuckets, Bucket());
42   }
43 }
44
45 template <typename VT, typename TT>
46 void BucketedTimeSeries<VT, TT>::addValue(TimeType now, const ValueType& val) {
47   addValueAggregated(now, val, 1);
48 }
49
50 template <typename VT, typename TT>
51 void BucketedTimeSeries<VT, TT>::addValue(TimeType now,
52                                           const ValueType& val,
53                                           int64_t times) {
54   addValueAggregated(now, val * times, times);
55 }
56
57 template <typename VT, typename TT>
58 void BucketedTimeSeries<VT, TT>::addValueAggregated(TimeType now,
59                                                     const ValueType& sum,
60                                                     int64_t nsamples) {
61   // Make sure time doesn't go backwards
62   now = std::max(now, latestTime_);
63
64   if (isAllTime()) {
65     if (empty()) {
66       firstTime_ = now;
67     }
68     latestTime_ = now;
69     total_.add(sum, nsamples);
70     return;
71   }
72
73   // Update the buckets
74   size_t curBucket = update(now);
75   buckets_[curBucket].add(sum, nsamples);
76
77   // Update the aggregate sum/count
78   total_.add(sum, nsamples);
79 }
80
81 template <typename VT, typename TT>
82 size_t BucketedTimeSeries<VT, TT>::update(TimeType now) {
83   if (empty()) {
84     // This is the first data point.
85     firstTime_ = now;
86   }
87
88   // For all-time data, all we need to do is update latestTime_
89   if (isAllTime()) {
90     latestTime_ = std::max(latestTime_, now);
91     return 0;
92   }
93
94   // Make sure time doesn't go backwards.
95   // If the time is less than or equal to the latest time we have already seen,
96   // we don't need to do anything.
97   if (now <= latestTime_) {
98     return getBucketIdx(latestTime_);
99   }
100
101   // We could cache nextBucketStart as a member variable, so we don't have to
102   // recompute it each time update() is called with a new timestamp value.
103   // This makes things faster when update() (or addValue()) is called once
104   // per second, but slightly slower when update() is called multiple times a
105   // second.  We care more about optimizing the cases where addValue() is being
106   // called frequently.  If addValue() is only being called once every few
107   // seconds, it doesn't matter as much if it is fast.
108
109   // Get info about the bucket that latestTime_ points at
110   size_t currentBucket;
111   TimeType currentBucketStart;
112   TimeType nextBucketStart;
113   getBucketInfo(latestTime_, &currentBucket,
114                 &currentBucketStart, &nextBucketStart);
115
116   // Update latestTime_
117   latestTime_ = now;
118
119   if (now < nextBucketStart) {
120     // We're still in the same bucket.
121     // We're done after updating latestTime_.
122     return currentBucket;
123   } else if (now >= currentBucketStart + duration_) {
124     // It's been a while.  We have wrapped, and all of the buckets need to be
125     // cleared.
126     for (Bucket& bucket : buckets_) {
127       bucket.clear();
128     }
129     total_.clear();
130     return getBucketIdx(latestTime_);
131   } else {
132     // clear all the buckets between the last time and current time, meaning
133     // buckets in the range [(currentBucket+1), newBucket]. Note that
134     // the bucket (currentBucket+1) is always the oldest bucket we have. Since
135     // our array is circular, loop when we reach the end.
136     size_t newBucket = getBucketIdx(now);
137     size_t idx = currentBucket;
138     while (idx != newBucket) {
139       ++idx;
140       if (idx >= buckets_.size()) {
141         idx = 0;
142       }
143       total_ -= buckets_[idx];
144       buckets_[idx].clear();
145     }
146     return newBucket;
147   }
148 }
149
150 template <typename VT, typename TT>
151 void BucketedTimeSeries<VT, TT>::clear() {
152   for (Bucket& bucket : buckets_) {
153     bucket.clear();
154   }
155   total_.clear();
156   // Set firstTime_ larger than latestTime_,
157   // to indicate that the timeseries is empty
158   firstTime_ = TimeType(1);
159   latestTime_ = TimeType(0);
160 }
161
162
163 template <typename VT, typename TT>
164 TT BucketedTimeSeries<VT, TT>::elapsed() const {
165   if (empty()) {
166     return TimeType(0);
167   }
168
169   if (isAllTime()) {
170     return latestTime_ - firstTime_ + TimeType(1);
171   }
172
173   size_t currentBucket;
174   TimeType currentBucketStart;
175   TimeType nextBucketStart;
176   getBucketInfo(latestTime_, &currentBucket,
177                 &currentBucketStart, &nextBucketStart);
178
179   // Subtract 1 duration from the start of the next bucket to find the
180   // earliest possible data point we could be tracking.
181   TimeType earliestTime = nextBucketStart - duration_;
182
183   // We're never tracking data before firstTime_
184   earliestTime = std::max(earliestTime, firstTime_);
185
186   return latestTime_ - earliestTime + TimeType(1);
187 }
188
189 template <typename VT, typename TT>
190 VT BucketedTimeSeries<VT, TT>::sum(TimeType start, TimeType end) const {
191   ValueType sum = ValueType();
192   forEachBucket(start, end, [&](const Bucket& bucket,
193                                 TimeType bucketStart,
194                                 TimeType nextBucketStart) -> bool {
195     sum += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
196                              bucket.sum);
197     return true;
198   });
199
200   return sum;
201 }
202
203 template <typename VT, typename TT>
204 uint64_t BucketedTimeSeries<VT, TT>::count(TimeType start, TimeType end) const {
205   uint64_t count = 0;
206   forEachBucket(start, end, [&](const Bucket& bucket,
207                                 TimeType bucketStart,
208                                 TimeType nextBucketStart) -> bool {
209     count += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
210                                bucket.count);
211     return true;
212   });
213
214   return count;
215 }
216
217 template <typename VT, typename TT>
218 template <typename ReturnType>
219 ReturnType BucketedTimeSeries<VT, TT>::avg(TimeType start, TimeType end) const {
220   ValueType sum = ValueType();
221   uint64_t count = 0;
222   forEachBucket(start, end, [&](const Bucket& bucket,
223                                 TimeType bucketStart,
224                                 TimeType nextBucketStart) -> bool {
225     sum += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
226                              bucket.sum);
227     count += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
228                                bucket.count);
229     return true;
230   });
231
232   if (count == 0) {
233     return ReturnType(0);
234   }
235
236   return detail::avgHelper<ReturnType>(sum, count);
237 }
238
239 /*
240  * A note about some of the bucket index calculations below:
241  *
242  * buckets_.size() may not divide evenly into duration_.  When this happens,
243  * some buckets will be wider than others.  We still want to spread the data
244  * out as evenly as possible among the buckets (as opposed to just making the
245  * last bucket be significantly wider than all of the others).
246  *
247  * To make the division work out, we pretend that the buckets are each
248  * duration_ wide, so that the overall duration becomes
249  * buckets.size() * duration_.
250  *
251  * To transform a real timestamp into the scale used by our buckets,
252  * we have to multiply by buckets_.size().  To figure out which bucket it goes
253  * into, we then divide by duration_.
254  */
255
256 template <typename VT, typename TT>
257 size_t BucketedTimeSeries<VT, TT>::getBucketIdx(TimeType time) const {
258   // For all-time data we don't use buckets_.  Everything is tracked in total_.
259   DCHECK(!isAllTime());
260
261   time %= duration_;
262   return time.count() * buckets_.size() / duration_.count();
263 }
264
265 /*
266  * Compute the bucket index for the specified time, as well as the earliest
267  * time that falls into this bucket.
268  */
269 template <typename VT, typename TT>
270 void BucketedTimeSeries<VT, TT>::getBucketInfo(
271     TimeType time, size_t *bucketIdx,
272     TimeType* bucketStart, TimeType* nextBucketStart) const {
273   typedef typename TimeType::rep TimeInt;
274   DCHECK(!isAllTime());
275
276   // Keep these two lines together.  The compiler should be able to compute
277   // both the division and modulus with a single operation.
278   TimeType timeMod = time % duration_;
279   TimeInt numFullDurations = time / duration_;
280
281   TimeInt scaledTime = timeMod.count() * buckets_.size();
282
283   // Keep these two lines together.  The compiler should be able to compute
284   // both the division and modulus with a single operation.
285   *bucketIdx = scaledTime / duration_.count();
286   TimeInt scaledOffsetInBucket = scaledTime % duration_.count();
287
288   TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket;
289   TimeInt scaledNextBucketStart = scaledBucketStart + duration_.count();
290
291   TimeType bucketStartMod((scaledBucketStart + buckets_.size() - 1) /
292                           buckets_.size());
293   TimeType nextBucketStartMod((scaledNextBucketStart + buckets_.size() - 1) /
294                               buckets_.size());
295
296   TimeType durationStart(numFullDurations * duration_.count());
297   *bucketStart = bucketStartMod + durationStart;
298   *nextBucketStart = nextBucketStartMod + durationStart;
299 }
300
301 template <typename VT, typename TT>
302 template <typename Function>
303 void BucketedTimeSeries<VT, TT>::forEachBucket(Function fn) const {
304   if (isAllTime()) {
305     fn(total_, firstTime_, latestTime_ + TimeType(1));
306     return;
307   }
308
309   typedef typename TimeType::rep TimeInt;
310
311   // Compute durationStart, latestBucketIdx, and scaledNextBucketStart,
312   // the same way as in getBucketInfo().
313   TimeType timeMod = latestTime_ % duration_;
314   TimeInt numFullDurations = latestTime_ / duration_;
315   TimeType durationStart(numFullDurations * duration_.count());
316   TimeInt scaledTime = timeMod.count() * buckets_.size();
317   size_t latestBucketIdx = scaledTime / duration_.count();
318   TimeInt scaledOffsetInBucket = scaledTime % duration_.count();
319   TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket;
320   TimeInt scaledNextBucketStart = scaledBucketStart + duration_.count();
321
322   // Walk through the buckets, starting one past the current bucket.
323   // The next bucket is from the previous cycle, so subtract 1 duration
324   // from durationStart.
325   size_t idx = latestBucketIdx;
326   durationStart -= duration_;
327
328   TimeType nextBucketStart =
329     TimeType((scaledNextBucketStart + buckets_.size() - 1) / buckets_.size()) +
330     durationStart;
331   while (true) {
332     ++idx;
333     if (idx >= buckets_.size()) {
334       idx = 0;
335       durationStart += duration_;
336       scaledNextBucketStart = duration_.count();
337     } else {
338       scaledNextBucketStart += duration_.count();
339     }
340
341     TimeType bucketStart = nextBucketStart;
342     nextBucketStart = TimeType((scaledNextBucketStart + buckets_.size() - 1) /
343                                buckets_.size()) + durationStart;
344
345     // Should we bother skipping buckets where firstTime_ >= nextBucketStart?
346     // For now we go ahead and invoke the function with these buckets.
347     // sum and count should always be 0 in these buckets.
348
349     DCHECK_LE(bucketStart.count(), latestTime_.count());
350     bool ret = fn(buckets_[idx], bucketStart, nextBucketStart);
351     if (!ret) {
352       break;
353     }
354
355     if (idx == latestBucketIdx) {
356       // all done
357       break;
358     }
359   }
360 }
361
362 /*
363  * Adjust the input value from the specified bucket to only account
364  * for the desired range.
365  *
366  * For example, if the bucket spans time [10, 20), but we only care about the
367  * range [10, 16), this will return 60% of the input value.
368  */
369 template<typename VT, typename TT>
370 VT BucketedTimeSeries<VT, TT>::rangeAdjust(
371     TimeType bucketStart, TimeType nextBucketStart,
372     TimeType start, TimeType end, ValueType input) const {
373   // If nextBucketStart is greater than latestTime_, treat nextBucketStart as
374   // if it were latestTime_.  This makes us more accurate when someone is
375   // querying for all of the data up to latestTime_.  Even though latestTime_
376   // may only be partially through the bucket, we don't want to adjust
377   // downwards in this case, because the bucket really only has data up to
378   // latestTime_.
379   if (bucketStart <= latestTime_ && nextBucketStart > latestTime_) {
380     nextBucketStart = latestTime_ + TimeType(1);
381   }
382
383   if (start <= bucketStart && end >= nextBucketStart) {
384     // The bucket is wholly contained in the [start, end) interval
385     return input;
386   }
387
388   TimeType intervalStart = std::max(start, bucketStart);
389   TimeType intervalEnd = std::min(end, nextBucketStart);
390   return input * (intervalEnd - intervalStart) /
391     (nextBucketStart - bucketStart);
392 }
393
394 template <typename VT, typename TT>
395 template <typename Function>
396 void BucketedTimeSeries<VT, TT>::forEachBucket(TimeType start, TimeType end,
397                                                Function fn) const {
398   forEachBucket([&start, &end, &fn] (const Bucket& bucket, TimeType bucketStart,
399                                      TimeType nextBucketStart) -> bool {
400     if (start >= nextBucketStart) {
401       return true;
402     }
403     if (end <= bucketStart) {
404       return false;
405     }
406     bool ret = fn(bucket, bucketStart, nextBucketStart);
407     return ret;
408   });
409 }
410
411 } // folly
412
413 #endif // FOLLY_STATS_BUCKETEDTIMESERIES_INL_H_