/*
 * Copyright 2016 Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
20 #include <glog/logging.h>
21 #include <folly/Likely.h>
25 template <typename VT, typename CT>
26 BucketedTimeSeries<VT, CT>::BucketedTimeSeries(
29 : firstTime_(Duration(1)), latestTime_(), duration_(maxDuration) {
30 // For tracking all-time data we only use total_, and don't need to bother
33 // Round nBuckets down to duration_.count().
35 // There is no point in having more buckets than our timestamp
36 // granularity: otherwise we would have buckets that could never be used.
37 if (nBuckets > size_t(duration_.count())) {
38 nBuckets = duration_.count();
41 buckets_.resize(nBuckets, Bucket());
45 template <typename VT, typename CT>
46 bool BucketedTimeSeries<VT, CT>::addValue(TimePoint now, const ValueType& val) {
47 return addValueAggregated(now, val, 1);
50 template <typename VT, typename CT>
51 bool BucketedTimeSeries<VT, CT>::addValue(
55 return addValueAggregated(now, val * times, times);
58 template <typename VT, typename CT>
59 bool BucketedTimeSeries<VT, CT>::addValueAggregated(
61 const ValueType& total,
64 if (UNLIKELY(empty())) {
67 } else if (now > latestTime_) {
69 } else if (now < firstTime_) {
72 total_.add(total, nsamples);
77 if (UNLIKELY(empty())) {
78 // First data point we've ever seen
81 bucketIdx = getBucketIdx(now);
82 } else if (now > latestTime_) {
83 // More recent time. Need to update the buckets.
84 bucketIdx = updateBuckets(now);
85 } else if (LIKELY(now == latestTime_)) {
87 bucketIdx = getBucketIdx(now);
89 // An earlier time in the past. We need to check if this time still falls
91 if (now < getEarliestTimeNonEmpty()) {
94 bucketIdx = getBucketIdx(now);
97 total_.add(total, nsamples);
98 buckets_[bucketIdx].add(total, nsamples);
102 template <typename VT, typename CT>
103 size_t BucketedTimeSeries<VT, CT>::update(TimePoint now) {
105 // This is the first data point.
109 // For all-time data, all we need to do is update latestTime_
111 latestTime_ = std::max(latestTime_, now);
115 // Make sure time doesn't go backwards.
116 // If the time is less than or equal to the latest time we have already seen,
117 // we don't need to do anything.
118 if (now <= latestTime_) {
119 return getBucketIdx(latestTime_);
122 return updateBuckets(now);
125 template <typename VT, typename CT>
126 size_t BucketedTimeSeries<VT, CT>::updateBuckets(TimePoint now) {
127 // We could cache nextBucketStart as a member variable, so we don't have to
128 // recompute it each time update() is called with a new timestamp value.
129 // This makes things faster when update() (or addValue()) is called once
130 // per second, but slightly slower when update() is called multiple times a
131 // second. We care more about optimizing the cases where addValue() is being
132 // called frequently. If addValue() is only being called once every few
133 // seconds, it doesn't matter as much if it is fast.
135 // Get info about the bucket that latestTime_ points at
136 size_t currentBucket;
137 TimePoint currentBucketStart;
138 TimePoint nextBucketStart;
139 getBucketInfo(latestTime_, ¤tBucket,
140 ¤tBucketStart, &nextBucketStart);
142 // Update latestTime_
145 if (now < nextBucketStart) {
146 // We're still in the same bucket.
147 // We're done after updating latestTime_.
148 return currentBucket;
149 } else if (now >= currentBucketStart + duration_) {
150 // It's been a while. We have wrapped, and all of the buckets need to be
152 for (Bucket& bucket : buckets_) {
156 return getBucketIdx(latestTime_);
158 // clear all the buckets between the last time and current time, meaning
159 // buckets in the range [(currentBucket+1), newBucket]. Note that
160 // the bucket (currentBucket+1) is always the oldest bucket we have. Since
161 // our array is circular, loop when we reach the end.
162 size_t newBucket = getBucketIdx(now);
163 size_t idx = currentBucket;
164 while (idx != newBucket) {
166 if (idx >= buckets_.size()) {
169 total_ -= buckets_[idx];
170 buckets_[idx].clear();
176 template <typename VT, typename CT>
177 void BucketedTimeSeries<VT, CT>::clear() {
178 for (Bucket& bucket : buckets_) {
182 // Set firstTime_ larger than latestTime_,
183 // to indicate that the timeseries is empty
184 firstTime_ = TimePoint(Duration(1));
185 latestTime_ = TimePoint();
188 template <typename VT, typename CT>
189 typename CT::time_point BucketedTimeSeries<VT, CT>::getEarliestTime() const {
197 // Compute the earliest time we can track
198 TimePoint earliestTime = getEarliestTimeNonEmpty();
200 // We're never tracking data before firstTime_
201 earliestTime = std::max(earliestTime, firstTime_);
206 template <typename VT, typename CT>
207 typename CT::time_point BucketedTimeSeries<VT, CT>::getEarliestTimeNonEmpty()
209 size_t currentBucket;
210 TimePoint currentBucketStart;
211 TimePoint nextBucketStart;
212 getBucketInfo(latestTime_, ¤tBucket,
213 ¤tBucketStart, &nextBucketStart);
215 // Subtract 1 duration from the start of the next bucket to find the
216 // earliest possible data point we could be tracking.
217 return nextBucketStart - duration_;
220 template <typename VT, typename CT>
221 typename CT::duration BucketedTimeSeries<VT, CT>::elapsed() const {
226 // Add 1 since [latestTime_, earliestTime] is an inclusive interval.
227 return latestTime_ - getEarliestTime() + Duration(1);
230 template <typename VT, typename CT>
231 typename CT::duration BucketedTimeSeries<VT, CT>::elapsed(
233 TimePoint end) const {
237 start = std::max(start, getEarliestTime());
238 end = std::min(end, latestTime_ + Duration(1));
239 end = std::max(start, end);
243 template <typename VT, typename CT>
244 VT BucketedTimeSeries<VT, CT>::sum(TimePoint start, TimePoint end) const {
245 ValueType total = ValueType();
249 [&](const Bucket& bucket,
250 TimePoint bucketStart,
251 TimePoint nextBucketStart) -> bool {
252 total += this->rangeAdjust(
253 bucketStart, nextBucketStart, start, end, bucket.sum);
260 template <typename VT, typename CT>
261 uint64_t BucketedTimeSeries<VT, CT>::count(TimePoint start, TimePoint end)
263 uint64_t sample_count = 0;
267 [&](const Bucket& bucket,
268 TimePoint bucketStart,
269 TimePoint nextBucketStart) -> bool {
270 sample_count += this->rangeAdjust(
271 bucketStart, nextBucketStart, start, end, bucket.count);
278 template <typename VT, typename CT>
279 template <typename ReturnType>
280 ReturnType BucketedTimeSeries<VT, CT>::avg(TimePoint start, TimePoint end)
282 ValueType total = ValueType();
283 uint64_t sample_count = 0;
287 [&](const Bucket& bucket,
288 TimePoint bucketStart,
289 TimePoint nextBucketStart) -> bool {
290 total += this->rangeAdjust(
291 bucketStart, nextBucketStart, start, end, bucket.sum);
292 sample_count += this->rangeAdjust(
293 bucketStart, nextBucketStart, start, end, bucket.count);
297 if (sample_count == 0) {
298 return ReturnType(0);
301 return detail::avgHelper<ReturnType>(total, sample_count);
/*
 * A note about some of the bucket index calculations below:
 *
 * buckets_.size() may not divide evenly into duration_.  When this happens,
 * some buckets will be wider than others.  We still want to spread the data
 * out as evenly as possible among the buckets (as opposed to just making the
 * last bucket be significantly wider than all of the others).
 *
 * To make the division work out, we pretend that the buckets are each
 * duration_ wide, so that the overall duration becomes
 * buckets.size() * duration_.
 *
 * To transform a real timestamp into the scale used by our buckets,
 * we have to multiply by buckets_.size().  To figure out which bucket it goes
 * into, we then divide by duration_.
 */
321 template <typename VT, typename CT>
322 size_t BucketedTimeSeries<VT, CT>::getBucketIdx(TimePoint time) const {
323 // For all-time data we don't use buckets_. Everything is tracked in total_.
324 DCHECK(!isAllTime());
326 auto timeIntoCurrentCycle = (time.time_since_epoch() % duration_);
327 return timeIntoCurrentCycle.count() * buckets_.size() / duration_.count();
331 * Compute the bucket index for the specified time, as well as the earliest
332 * time that falls into this bucket.
334 template <typename VT, typename CT>
335 void BucketedTimeSeries<VT, CT>::getBucketInfo(
338 TimePoint* bucketStart,
339 TimePoint* nextBucketStart) const {
340 typedef typename Duration::rep TimeInt;
341 DCHECK(!isAllTime());
343 // Keep these two lines together. The compiler should be able to compute
344 // both the division and modulus with a single operation.
345 Duration timeMod = time.time_since_epoch() % duration_;
346 TimeInt numFullDurations = time.time_since_epoch() / duration_;
348 TimeInt scaledTime = timeMod.count() * buckets_.size();
350 // Keep these two lines together. The compiler should be able to compute
351 // both the division and modulus with a single operation.
352 *bucketIdx = scaledTime / duration_.count();
353 TimeInt scaledOffsetInBucket = scaledTime % duration_.count();
355 TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket;
356 TimeInt scaledNextBucketStart = scaledBucketStart + duration_.count();
358 Duration bucketStartMod(
359 (scaledBucketStart + buckets_.size() - 1) / buckets_.size());
360 Duration nextBucketStartMod(
361 (scaledNextBucketStart + buckets_.size() - 1) / buckets_.size());
363 TimePoint durationStart(numFullDurations * duration_);
364 *bucketStart = bucketStartMod + durationStart;
365 *nextBucketStart = nextBucketStartMod + durationStart;
368 template <typename VT, typename CT>
369 template <typename Function>
370 void BucketedTimeSeries<VT, CT>::forEachBucket(Function fn) const {
372 fn(total_, firstTime_, latestTime_ + Duration(1));
376 typedef typename Duration::rep TimeInt;
378 // Compute durationStart, latestBucketIdx, and scaledNextBucketStart,
379 // the same way as in getBucketInfo().
380 Duration timeMod = latestTime_.time_since_epoch() % duration_;
381 TimeInt numFullDurations = latestTime_.time_since_epoch() / duration_;
382 TimePoint durationStart(numFullDurations * duration_);
383 TimeInt scaledTime = timeMod.count() * buckets_.size();
384 size_t latestBucketIdx = scaledTime / duration_.count();
385 TimeInt scaledOffsetInBucket = scaledTime % duration_.count();
386 TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket;
387 TimeInt scaledNextBucketStart = scaledBucketStart + duration_.count();
389 // Walk through the buckets, starting one past the current bucket.
390 // The next bucket is from the previous cycle, so subtract 1 duration
391 // from durationStart.
392 size_t idx = latestBucketIdx;
393 durationStart -= duration_;
395 TimePoint nextBucketStart =
397 (scaledNextBucketStart + buckets_.size() - 1) / buckets_.size()) +
401 if (idx >= buckets_.size()) {
403 durationStart += duration_;
404 scaledNextBucketStart = duration_.count();
406 scaledNextBucketStart += duration_.count();
409 TimePoint bucketStart = nextBucketStart;
412 (scaledNextBucketStart + buckets_.size() - 1) / buckets_.size()) +
415 // Should we bother skipping buckets where firstTime_ >= nextBucketStart?
416 // For now we go ahead and invoke the function with these buckets.
417 // sum and count should always be 0 in these buckets.
420 bucketStart.time_since_epoch().count(),
421 latestTime_.time_since_epoch().count());
422 bool ret = fn(buckets_[idx], bucketStart, nextBucketStart);
427 if (idx == latestBucketIdx) {
435 * Adjust the input value from the specified bucket to only account
436 * for the desired range.
438 * For example, if the bucket spans time [10, 20), but we only care about the
439 * range [10, 16), this will return 60% of the input value.
441 template <typename VT, typename CT>
442 VT BucketedTimeSeries<VT, CT>::rangeAdjust(
443 TimePoint bucketStart,
444 TimePoint nextBucketStart,
447 ValueType input) const {
448 // If nextBucketStart is greater than latestTime_, treat nextBucketStart as
449 // if it were latestTime_. This makes us more accurate when someone is
450 // querying for all of the data up to latestTime_. Even though latestTime_
451 // may only be partially through the bucket, we don't want to adjust
452 // downwards in this case, because the bucket really only has data up to
454 if (bucketStart <= latestTime_ && nextBucketStart > latestTime_) {
455 nextBucketStart = latestTime_ + Duration(1);
458 if (start <= bucketStart && end >= nextBucketStart) {
459 // The bucket is wholly contained in the [start, end) interval
463 TimePoint intervalStart = std::max(start, bucketStart);
464 TimePoint intervalEnd = std::min(end, nextBucketStart);
465 return input * (intervalEnd - intervalStart) /
466 (nextBucketStart - bucketStart);
469 template <typename VT, typename CT>
470 template <typename Function>
471 void BucketedTimeSeries<VT, CT>::forEachBucket(
477 const Bucket& bucket,
478 TimePoint bucketStart,
479 TimePoint nextBucketStart) -> bool {
480 if (start >= nextBucketStart) {
483 if (end <= bucketStart) {
486 bool ret = fn(bucket, bucketStart, nextBucketStart);