/*
 * Copyright 2014 Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef FOLLY_STATS_BUCKETEDTIMESERIES_INL_H_
#define FOLLY_STATS_BUCKETEDTIMESERIES_INL_H_

#include <algorithm>

#include <glog/logging.h>
24 template <typename VT, typename TT>
25 BucketedTimeSeries<VT, TT>::BucketedTimeSeries(size_t numBuckets,
30 // For tracking all-time data we only use total_, and don't need to bother
33 // Round numBuckets down to duration_.count().
35 // There is no point in having more buckets than our timestamp
36 // granularity: otherwise we would have buckets that could never be used.
37 if (numBuckets > duration_.count()) {
38 numBuckets = duration_.count();
41 buckets_.resize(numBuckets, Bucket());
45 template <typename VT, typename TT>
46 void BucketedTimeSeries<VT, TT>::addValue(TimeType now, const ValueType& val) {
47 addValueAggregated(now, val, 1);
50 template <typename VT, typename TT>
51 void BucketedTimeSeries<VT, TT>::addValue(TimeType now,
54 addValueAggregated(now, val * times, times);
57 template <typename VT, typename TT>
58 void BucketedTimeSeries<VT, TT>::addValueAggregated(TimeType now,
61 // Make sure time doesn't go backwards
62 now = std::max(now, latestTime_);
69 total_.add(sum, nsamples);
74 size_t curBucket = update(now);
75 buckets_[curBucket].add(sum, nsamples);
77 // Update the aggregate sum/count
78 total_.add(sum, nsamples);
81 template <typename VT, typename TT>
82 size_t BucketedTimeSeries<VT, TT>::update(TimeType now) {
84 // This is the first data point.
88 // For all-time data, all we need to do is update latestTime_
90 latestTime_ = std::max(latestTime_, now);
94 // Make sure time doesn't go backwards.
95 // If the time is less than or equal to the latest time we have already seen,
96 // we don't need to do anything.
97 if (now <= latestTime_) {
98 return getBucketIdx(latestTime_);
101 // We could cache nextBucketStart as a member variable, so we don't have to
102 // recompute it each time update() is called with a new timestamp value.
103 // This makes things faster when update() (or addValue()) is called once
104 // per second, but slightly slower when update() is called multiple times a
105 // second. We care more about optimizing the cases where addValue() is being
106 // called frequently. If addValue() is only being called once every few
107 // seconds, it doesn't matter as much if it is fast.
109 // Get info about the bucket that latestTime_ points at
110 size_t currentBucket;
111 TimeType currentBucketStart;
112 TimeType nextBucketStart;
113 getBucketInfo(latestTime_, ¤tBucket,
114 ¤tBucketStart, &nextBucketStart);
116 // Update latestTime_
119 if (now < nextBucketStart) {
120 // We're still in the same bucket.
121 // We're done after updating latestTime_.
122 return currentBucket;
123 } else if (now >= currentBucketStart + duration_) {
124 // It's been a while. We have wrapped, and all of the buckets need to be
126 for (Bucket& bucket : buckets_) {
130 return getBucketIdx(latestTime_);
132 // clear all the buckets between the last time and current time, meaning
133 // buckets in the range [(currentBucket+1), newBucket]. Note that
134 // the bucket (currentBucket+1) is always the oldest bucket we have. Since
135 // our array is circular, loop when we reach the end.
136 size_t newBucket = getBucketIdx(now);
137 size_t idx = currentBucket;
138 while (idx != newBucket) {
140 if (idx >= buckets_.size()) {
143 total_ -= buckets_[idx];
144 buckets_[idx].clear();
150 template <typename VT, typename TT>
151 void BucketedTimeSeries<VT, TT>::clear() {
152 for (Bucket& bucket : buckets_) {
156 // Set firstTime_ larger than latestTime_,
157 // to indicate that the timeseries is empty
158 firstTime_ = TimeType(1);
159 latestTime_ = TimeType(0);
163 template <typename VT, typename TT>
164 TT BucketedTimeSeries<VT, TT>::getEarliestTime() const {
172 size_t currentBucket;
173 TimeType currentBucketStart;
174 TimeType nextBucketStart;
175 getBucketInfo(latestTime_, ¤tBucket,
176 ¤tBucketStart, &nextBucketStart);
178 // Subtract 1 duration from the start of the next bucket to find the
179 // earliest possible data point we could be tracking.
180 TimeType earliestTime = nextBucketStart - duration_;
182 // We're never tracking data before firstTime_
183 earliestTime = std::max(earliestTime, firstTime_);
188 template <typename VT, typename TT>
189 TT BucketedTimeSeries<VT, TT>::elapsed() const {
194 // Add 1 since [latestTime_, earliestTime] is an inclusive interval.
195 return latestTime_ - getEarliestTime() + TimeType(1);
198 template <typename VT, typename TT>
199 TT BucketedTimeSeries<VT, TT>::elapsed(TimeType start, TimeType end) const {
203 start = std::max(start, getEarliestTime());
204 end = std::min(end, latestTime_ + TimeType(1));
205 end = std::max(start, end);
209 template <typename VT, typename TT>
210 VT BucketedTimeSeries<VT, TT>::sum(TimeType start, TimeType end) const {
211 ValueType sum = ValueType();
212 forEachBucket(start, end, [&](const Bucket& bucket,
213 TimeType bucketStart,
214 TimeType nextBucketStart) -> bool {
215 sum += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
223 template <typename VT, typename TT>
224 uint64_t BucketedTimeSeries<VT, TT>::count(TimeType start, TimeType end) const {
226 forEachBucket(start, end, [&](const Bucket& bucket,
227 TimeType bucketStart,
228 TimeType nextBucketStart) -> bool {
229 count += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
237 template <typename VT, typename TT>
238 template <typename ReturnType>
239 ReturnType BucketedTimeSeries<VT, TT>::avg(TimeType start, TimeType end) const {
240 ValueType sum = ValueType();
242 forEachBucket(start, end, [&](const Bucket& bucket,
243 TimeType bucketStart,
244 TimeType nextBucketStart) -> bool {
245 sum += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
247 count += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
253 return ReturnType(0);
256 return detail::avgHelper<ReturnType>(sum, count);
/*
 * A note about some of the bucket index calculations below:
 *
 * buckets_.size() may not divide evenly into duration_.  When this happens,
 * some buckets will be wider than others.  We still want to spread the data
 * out as evenly as possible among the buckets (as opposed to just making the
 * last bucket be significantly wider than all of the others).
 *
 * To make the division work out, we pretend that the buckets are each
 * duration_ wide, so that the overall duration becomes
 * buckets.size() * duration_.
 *
 * To transform a real timestamp into the scale used by our buckets,
 * we have to multiply by buckets_.size().  To figure out which bucket it goes
 * into, we then divide by duration_.
 */
276 template <typename VT, typename TT>
277 size_t BucketedTimeSeries<VT, TT>::getBucketIdx(TimeType time) const {
278 // For all-time data we don't use buckets_. Everything is tracked in total_.
279 DCHECK(!isAllTime());
282 return time.count() * buckets_.size() / duration_.count();
286 * Compute the bucket index for the specified time, as well as the earliest
287 * time that falls into this bucket.
289 template <typename VT, typename TT>
290 void BucketedTimeSeries<VT, TT>::getBucketInfo(
291 TimeType time, size_t *bucketIdx,
292 TimeType* bucketStart, TimeType* nextBucketStart) const {
293 typedef typename TimeType::rep TimeInt;
294 DCHECK(!isAllTime());
296 // Keep these two lines together. The compiler should be able to compute
297 // both the division and modulus with a single operation.
298 TimeType timeMod = time % duration_;
299 TimeInt numFullDurations = time / duration_;
301 TimeInt scaledTime = timeMod.count() * buckets_.size();
303 // Keep these two lines together. The compiler should be able to compute
304 // both the division and modulus with a single operation.
305 *bucketIdx = scaledTime / duration_.count();
306 TimeInt scaledOffsetInBucket = scaledTime % duration_.count();
308 TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket;
309 TimeInt scaledNextBucketStart = scaledBucketStart + duration_.count();
311 TimeType bucketStartMod((scaledBucketStart + buckets_.size() - 1) /
313 TimeType nextBucketStartMod((scaledNextBucketStart + buckets_.size() - 1) /
316 TimeType durationStart(numFullDurations * duration_.count());
317 *bucketStart = bucketStartMod + durationStart;
318 *nextBucketStart = nextBucketStartMod + durationStart;
321 template <typename VT, typename TT>
322 template <typename Function>
323 void BucketedTimeSeries<VT, TT>::forEachBucket(Function fn) const {
325 fn(total_, firstTime_, latestTime_ + TimeType(1));
329 typedef typename TimeType::rep TimeInt;
331 // Compute durationStart, latestBucketIdx, and scaledNextBucketStart,
332 // the same way as in getBucketInfo().
333 TimeType timeMod = latestTime_ % duration_;
334 TimeInt numFullDurations = latestTime_ / duration_;
335 TimeType durationStart(numFullDurations * duration_.count());
336 TimeInt scaledTime = timeMod.count() * buckets_.size();
337 size_t latestBucketIdx = scaledTime / duration_.count();
338 TimeInt scaledOffsetInBucket = scaledTime % duration_.count();
339 TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket;
340 TimeInt scaledNextBucketStart = scaledBucketStart + duration_.count();
342 // Walk through the buckets, starting one past the current bucket.
343 // The next bucket is from the previous cycle, so subtract 1 duration
344 // from durationStart.
345 size_t idx = latestBucketIdx;
346 durationStart -= duration_;
348 TimeType nextBucketStart =
349 TimeType((scaledNextBucketStart + buckets_.size() - 1) / buckets_.size()) +
353 if (idx >= buckets_.size()) {
355 durationStart += duration_;
356 scaledNextBucketStart = duration_.count();
358 scaledNextBucketStart += duration_.count();
361 TimeType bucketStart = nextBucketStart;
362 nextBucketStart = TimeType((scaledNextBucketStart + buckets_.size() - 1) /
363 buckets_.size()) + durationStart;
365 // Should we bother skipping buckets where firstTime_ >= nextBucketStart?
366 // For now we go ahead and invoke the function with these buckets.
367 // sum and count should always be 0 in these buckets.
369 DCHECK_LE(bucketStart.count(), latestTime_.count());
370 bool ret = fn(buckets_[idx], bucketStart, nextBucketStart);
375 if (idx == latestBucketIdx) {
383 * Adjust the input value from the specified bucket to only account
384 * for the desired range.
386 * For example, if the bucket spans time [10, 20), but we only care about the
387 * range [10, 16), this will return 60% of the input value.
389 template<typename VT, typename TT>
390 VT BucketedTimeSeries<VT, TT>::rangeAdjust(
391 TimeType bucketStart, TimeType nextBucketStart,
392 TimeType start, TimeType end, ValueType input) const {
393 // If nextBucketStart is greater than latestTime_, treat nextBucketStart as
394 // if it were latestTime_. This makes us more accurate when someone is
395 // querying for all of the data up to latestTime_. Even though latestTime_
396 // may only be partially through the bucket, we don't want to adjust
397 // downwards in this case, because the bucket really only has data up to
399 if (bucketStart <= latestTime_ && nextBucketStart > latestTime_) {
400 nextBucketStart = latestTime_ + TimeType(1);
403 if (start <= bucketStart && end >= nextBucketStart) {
404 // The bucket is wholly contained in the [start, end) interval
408 TimeType intervalStart = std::max(start, bucketStart);
409 TimeType intervalEnd = std::min(end, nextBucketStart);
410 return input * (intervalEnd - intervalStart) /
411 (nextBucketStart - bucketStart);
414 template <typename VT, typename TT>
415 template <typename Function>
416 void BucketedTimeSeries<VT, TT>::forEachBucket(TimeType start, TimeType end,
418 forEachBucket([&start, &end, &fn] (const Bucket& bucket, TimeType bucketStart,
419 TimeType nextBucketStart) -> bool {
420 if (start >= nextBucketStart) {
423 if (end <= bucketStart) {
426 bool ret = fn(bucket, bucketStart, nextBucketStart);
433 #endif // FOLLY_STATS_BUCKETEDTIMESERIES_INL_H_