/*
- * Copyright 2012 Facebook, Inc.
+ * Copyright 2016 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* limitations under the License.
*/
-#ifndef FOLLY_STATS_BUCKETEDTIMESERIES_INL_H_
-#define FOLLY_STATS_BUCKETEDTIMESERIES_INL_H_
+#pragma once
+#include <algorithm>
#include <glog/logging.h>
+#include <folly/Likely.h>
namespace folly {
-template <typename VT, typename TT>
-BucketedTimeSeries<VT, TT>::BucketedTimeSeries(size_t numBuckets,
- TimeType duration)
- : firstTime_(1),
- latestTime_(0),
- duration_(duration) {
+template <typename VT, typename CT>
+BucketedTimeSeries<VT, CT>::BucketedTimeSeries(
+ size_t nBuckets,
+ Duration maxDuration)
+ : firstTime_(Duration(1)), latestTime_(), duration_(maxDuration) {
// For tracking all-time data we only use total_, and don't need to bother
// with buckets_
if (!isAllTime()) {
- // Round numBuckets down to duration_.count().
+ // Round nBuckets down to duration_.count().
//
// There is no point in having more buckets than our timestamp
// granularity: otherwise we would have buckets that could never be used.
- if (numBuckets > duration_.count()) {
- numBuckets = duration_.count();
+ if (nBuckets > size_t(duration_.count())) {
+ nBuckets = duration_.count();
}
- buckets_.resize(numBuckets, Bucket());
+ buckets_.resize(nBuckets, Bucket());
}
}
-template <typename VT, typename TT>
-void BucketedTimeSeries<VT, TT>::addValue(TimeType now, const ValueType& val) {
- addValueAggregated(now, val, 1);
+template <typename VT, typename CT>
+bool BucketedTimeSeries<VT, CT>::addValue(TimePoint now, const ValueType& val) {
+ return addValueAggregated(now, val, 1);
}
-template <typename VT, typename TT>
-void BucketedTimeSeries<VT, TT>::addValue(TimeType now,
- const ValueType& val,
- int64_t times) {
- addValueAggregated(now, val * times, times);
+template <typename VT, typename CT>
+bool BucketedTimeSeries<VT, CT>::addValue(
+ TimePoint now,
+ const ValueType& val,
+ int64_t times) {
+ return addValueAggregated(now, val * times, times);
}
-template <typename VT, typename TT>
-void BucketedTimeSeries<VT, TT>::addValueAggregated(TimeType now,
- const ValueType& sum,
- int64_t nsamples) {
- // Make sure time doesn't go backwards
- now = std::max(now, latestTime_);
-
+template <typename VT, typename CT>
+bool BucketedTimeSeries<VT, CT>::addValueAggregated(
+ TimePoint now,
+ const ValueType& total,
+ int64_t nsamples) {
if (isAllTime()) {
- if (empty()) {
+ if (UNLIKELY(empty())) {
+ firstTime_ = now;
+ latestTime_ = now;
+ } else if (now > latestTime_) {
+ latestTime_ = now;
+ } else if (now < firstTime_) {
firstTime_ = now;
}
- latestTime_ = now;
- total_.add(sum, nsamples);
- return;
+ total_.add(total, nsamples);
+ return true;
}
- // Update the buckets
- size_t curBucket = update(now);
- buckets_[curBucket].add(sum, nsamples);
+ size_t bucketIdx;
+ if (UNLIKELY(empty())) {
+ // First data point we've ever seen
+ firstTime_ = now;
+ latestTime_ = now;
+ bucketIdx = getBucketIdx(now);
+ } else if (now > latestTime_) {
+ // More recent time. Need to update the buckets.
+ bucketIdx = updateBuckets(now);
+ } else if (LIKELY(now == latestTime_)) {
+ // Current time.
+ bucketIdx = getBucketIdx(now);
+ } else {
+ // An earlier time in the past. We need to check if this time still falls
+ // within our window.
+ if (now < getEarliestTimeNonEmpty()) {
+ return false;
+ }
+ bucketIdx = getBucketIdx(now);
+ }
- // Update the aggregate sum/count
- total_.add(sum, nsamples);
+ total_.add(total, nsamples);
+ buckets_[bucketIdx].add(total, nsamples);
+ return true;
}
-template <typename VT, typename TT>
-size_t BucketedTimeSeries<VT, TT>::update(TimeType now) {
+template <typename VT, typename CT>
+size_t BucketedTimeSeries<VT, CT>::update(TimePoint now) {
if (empty()) {
// This is the first data point.
firstTime_ = now;
return getBucketIdx(latestTime_);
}
+ return updateBuckets(now);
+}
+
+template <typename VT, typename CT>
+size_t BucketedTimeSeries<VT, CT>::updateBuckets(TimePoint now) {
// We could cache nextBucketStart as a member variable, so we don't have to
// recompute it each time update() is called with a new timestamp value.
// This makes things faster when update() (or addValue()) is called once
// Get info about the bucket that latestTime_ points at
size_t currentBucket;
- TimeType currentBucketStart;
- TimeType nextBucketStart;
+ TimePoint currentBucketStart;
+ TimePoint nextBucketStart;
  getBucketInfo(latestTime_, &currentBucket,
                &currentBucketStart, &nextBucketStart);
}
}
-template <typename VT, typename TT>
-void BucketedTimeSeries<VT, TT>::clear() {
+template <typename VT, typename CT>
+void BucketedTimeSeries<VT, CT>::clear() {
for (Bucket& bucket : buckets_) {
bucket.clear();
}
total_.clear();
// Set firstTime_ larger than latestTime_,
// to indicate that the timeseries is empty
- firstTime_ = TimeType(1);
- latestTime_ = TimeType(0);
+ firstTime_ = TimePoint(Duration(1));
+ latestTime_ = TimePoint();
}
-
-template <typename VT, typename TT>
-TT BucketedTimeSeries<VT, TT>::elapsed() const {
+template <typename VT, typename CT>
+typename CT::time_point BucketedTimeSeries<VT, CT>::getEarliestTime() const {
if (empty()) {
- return TimeType(0);
+ return TimePoint();
}
-
if (isAllTime()) {
- return latestTime_ - firstTime_ + TimeType(1);
+ return firstTime_;
}
+ // Compute the earliest time we can track
+ TimePoint earliestTime = getEarliestTimeNonEmpty();
+
+ // We're never tracking data before firstTime_
+ earliestTime = std::max(earliestTime, firstTime_);
+
+ return earliestTime;
+}
+
+template <typename VT, typename CT>
+typename CT::time_point BucketedTimeSeries<VT, CT>::getEarliestTimeNonEmpty()
+ const {
size_t currentBucket;
- TimeType currentBucketStart;
- TimeType nextBucketStart;
+ TimePoint currentBucketStart;
+ TimePoint nextBucketStart;
  getBucketInfo(latestTime_, &currentBucket,
                &currentBucketStart, &nextBucketStart);
// Subtract 1 duration from the start of the next bucket to find the
// earliest possible data point we could be tracking.
- TimeType earliestTime = nextBucketStart - duration_;
+ return nextBucketStart - duration_;
+}
- // We're never tracking data before firstTime_
- earliestTime = std::max(earliestTime, firstTime_);
+template <typename VT, typename CT>
+typename CT::duration BucketedTimeSeries<VT, CT>::elapsed() const {
+ if (empty()) {
+ return Duration(0);
+ }
- return latestTime_ - earliestTime + TimeType(1);
+ // Add 1 since [latestTime_, earliestTime] is an inclusive interval.
+ return latestTime_ - getEarliestTime() + Duration(1);
}
-template <typename VT, typename TT>
-VT BucketedTimeSeries<VT, TT>::sum(TimeType start, TimeType end) const {
- ValueType sum = ValueType();
- forEachBucket(start, end, [&](const Bucket& bucket,
- TimeType bucketStart,
- TimeType nextBucketStart) -> bool {
- sum += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
- bucket.sum);
- return true;
- });
-
- return sum;
+template <typename VT, typename CT>
+typename CT::duration BucketedTimeSeries<VT, CT>::elapsed(
+ TimePoint start,
+ TimePoint end) const {
+ if (empty()) {
+ return Duration(0);
+ }
+ start = std::max(start, getEarliestTime());
+ end = std::min(end, latestTime_ + Duration(1));
+ end = std::max(start, end);
+ return end - start;
}
-template <typename VT, typename TT>
-uint64_t BucketedTimeSeries<VT, TT>::count(TimeType start, TimeType end) const {
- uint64_t count = 0;
- forEachBucket(start, end, [&](const Bucket& bucket,
- TimeType bucketStart,
- TimeType nextBucketStart) -> bool {
- count += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
- bucket.count);
- return true;
- });
+template <typename VT, typename CT>
+VT BucketedTimeSeries<VT, CT>::sum(TimePoint start, TimePoint end) const {
+ ValueType total = ValueType();
+ forEachBucket(
+ start,
+ end,
+ [&](const Bucket& bucket,
+ TimePoint bucketStart,
+ TimePoint nextBucketStart) -> bool {
+ total += this->rangeAdjust(
+ bucketStart, nextBucketStart, start, end, bucket.sum);
+ return true;
+ });
+
+ return total;
+}
- return count;
+template <typename VT, typename CT>
+uint64_t BucketedTimeSeries<VT, CT>::count(TimePoint start, TimePoint end)
+ const {
+ uint64_t sample_count = 0;
+ forEachBucket(
+ start,
+ end,
+ [&](const Bucket& bucket,
+ TimePoint bucketStart,
+ TimePoint nextBucketStart) -> bool {
+ sample_count += this->rangeAdjust(
+ bucketStart, nextBucketStart, start, end, bucket.count);
+ return true;
+ });
+
+ return sample_count;
}
-template <typename VT, typename TT>
+template <typename VT, typename CT>
template <typename ReturnType>
-ReturnType BucketedTimeSeries<VT, TT>::avg(TimeType start, TimeType end) const {
- ValueType sum = ValueType();
- uint64_t count = 0;
- forEachBucket(start, end, [&](const Bucket& bucket,
- TimeType bucketStart,
- TimeType nextBucketStart) -> bool {
- sum += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
- bucket.sum);
- count += this->rangeAdjust(bucketStart, nextBucketStart, start, end,
- bucket.count);
- return true;
- });
-
- if (count == 0) {
+ReturnType BucketedTimeSeries<VT, CT>::avg(TimePoint start, TimePoint end)
+ const {
+ ValueType total = ValueType();
+ uint64_t sample_count = 0;
+ forEachBucket(
+ start,
+ end,
+ [&](const Bucket& bucket,
+ TimePoint bucketStart,
+ TimePoint nextBucketStart) -> bool {
+ total += this->rangeAdjust(
+ bucketStart, nextBucketStart, start, end, bucket.sum);
+ sample_count += this->rangeAdjust(
+ bucketStart, nextBucketStart, start, end, bucket.count);
+ return true;
+ });
+
+ if (sample_count == 0) {
return ReturnType(0);
}
- return detail::avgHelper<ReturnType>(sum, count);
+ return detail::avgHelper<ReturnType>(total, sample_count);
}
/*
* into, we then divide by duration_.
*/
-template <typename VT, typename TT>
-size_t BucketedTimeSeries<VT, TT>::getBucketIdx(TimeType time) const {
+template <typename VT, typename CT>
+size_t BucketedTimeSeries<VT, CT>::getBucketIdx(TimePoint time) const {
// For all-time data we don't use buckets_. Everything is tracked in total_.
DCHECK(!isAllTime());
- time %= duration_;
- return time.count() * buckets_.size() / duration_.count();
+ auto timeIntoCurrentCycle = (time.time_since_epoch() % duration_);
+ return timeIntoCurrentCycle.count() * buckets_.size() / duration_.count();
}
/*
* Compute the bucket index for the specified time, as well as the earliest
* time that falls into this bucket.
*/
-template <typename VT, typename TT>
-void BucketedTimeSeries<VT, TT>::getBucketInfo(
- TimeType time, size_t *bucketIdx,
- TimeType* bucketStart, TimeType* nextBucketStart) const {
- typedef typename TimeType::rep TimeInt;
+template <typename VT, typename CT>
+void BucketedTimeSeries<VT, CT>::getBucketInfo(
+ TimePoint time,
+ size_t* bucketIdx,
+ TimePoint* bucketStart,
+ TimePoint* nextBucketStart) const {
+ typedef typename Duration::rep TimeInt;
DCHECK(!isAllTime());
// Keep these two lines together. The compiler should be able to compute
// both the division and modulus with a single operation.
- TimeType timeMod = time % duration_;
- TimeInt numFullDurations = time / duration_;
+ Duration timeMod = time.time_since_epoch() % duration_;
+ TimeInt numFullDurations = time.time_since_epoch() / duration_;
TimeInt scaledTime = timeMod.count() * buckets_.size();
TimeInt scaledBucketStart = scaledTime - scaledOffsetInBucket;
TimeInt scaledNextBucketStart = scaledBucketStart + duration_.count();
- TimeType bucketStartMod((scaledBucketStart + buckets_.size() - 1) /
- buckets_.size());
- TimeType nextBucketStartMod((scaledNextBucketStart + buckets_.size() - 1) /
- buckets_.size());
+ Duration bucketStartMod(
+ (scaledBucketStart + buckets_.size() - 1) / buckets_.size());
+ Duration nextBucketStartMod(
+ (scaledNextBucketStart + buckets_.size() - 1) / buckets_.size());
- TimeType durationStart(numFullDurations * duration_.count());
+ TimePoint durationStart(numFullDurations * duration_);
*bucketStart = bucketStartMod + durationStart;
*nextBucketStart = nextBucketStartMod + durationStart;
}
-template <typename VT, typename TT>
+template <typename VT, typename CT>
template <typename Function>
-void BucketedTimeSeries<VT, TT>::forEachBucket(Function fn) const {
+void BucketedTimeSeries<VT, CT>::forEachBucket(Function fn) const {
if (isAllTime()) {
- fn(total_, firstTime_, latestTime_ + TimeType(1));
+ fn(total_, firstTime_, latestTime_ + Duration(1));
return;
}
- typedef typename TimeType::rep TimeInt;
+ typedef typename Duration::rep TimeInt;
// Compute durationStart, latestBucketIdx, and scaledNextBucketStart,
// the same way as in getBucketInfo().
- TimeType timeMod = latestTime_ % duration_;
- TimeInt numFullDurations = latestTime_ / duration_;
- TimeType durationStart(numFullDurations * duration_.count());
+ Duration timeMod = latestTime_.time_since_epoch() % duration_;
+ TimeInt numFullDurations = latestTime_.time_since_epoch() / duration_;
+ TimePoint durationStart(numFullDurations * duration_);
TimeInt scaledTime = timeMod.count() * buckets_.size();
size_t latestBucketIdx = scaledTime / duration_.count();
TimeInt scaledOffsetInBucket = scaledTime % duration_.count();
size_t idx = latestBucketIdx;
durationStart -= duration_;
- TimeType nextBucketStart =
- TimeType((scaledNextBucketStart + buckets_.size() - 1) / buckets_.size()) +
- durationStart;
+ TimePoint nextBucketStart =
+ Duration(
+ (scaledNextBucketStart + buckets_.size() - 1) / buckets_.size()) +
+ durationStart;
while (true) {
++idx;
if (idx >= buckets_.size()) {
scaledNextBucketStart += duration_.count();
}
- TimeType bucketStart = nextBucketStart;
- nextBucketStart = TimeType((scaledNextBucketStart + buckets_.size() - 1) /
- buckets_.size()) + durationStart;
+ TimePoint bucketStart = nextBucketStart;
+ nextBucketStart =
+ Duration(
+ (scaledNextBucketStart + buckets_.size() - 1) / buckets_.size()) +
+ durationStart;
// Should we bother skipping buckets where firstTime_ >= nextBucketStart?
// For now we go ahead and invoke the function with these buckets.
// sum and count should always be 0 in these buckets.
- DCHECK_LE(bucketStart.count(), latestTime_.count());
+ DCHECK_LE(
+ bucketStart.time_since_epoch().count(),
+ latestTime_.time_since_epoch().count());
bool ret = fn(buckets_[idx], bucketStart, nextBucketStart);
if (!ret) {
break;
* For example, if the bucket spans time [10, 20), but we only care about the
* range [10, 16), this will return 60% of the input value.
*/
-template<typename VT, typename TT>
-VT BucketedTimeSeries<VT, TT>::rangeAdjust(
- TimeType bucketStart, TimeType nextBucketStart,
- TimeType start, TimeType end, ValueType input) const {
+template <typename VT, typename CT>
+VT BucketedTimeSeries<VT, CT>::rangeAdjust(
+ TimePoint bucketStart,
+ TimePoint nextBucketStart,
+ TimePoint start,
+ TimePoint end,
+ ValueType input) const {
// If nextBucketStart is greater than latestTime_, treat nextBucketStart as
// if it were latestTime_. This makes us more accurate when someone is
// querying for all of the data up to latestTime_. Even though latestTime_
// downwards in this case, because the bucket really only has data up to
// latestTime_.
if (bucketStart <= latestTime_ && nextBucketStart > latestTime_) {
- nextBucketStart = latestTime_ + TimeType(1);
+ nextBucketStart = latestTime_ + Duration(1);
}
if (start <= bucketStart && end >= nextBucketStart) {
return input;
}
- TimeType intervalStart = std::max(start, bucketStart);
- TimeType intervalEnd = std::min(end, nextBucketStart);
+ TimePoint intervalStart = std::max(start, bucketStart);
+ TimePoint intervalEnd = std::min(end, nextBucketStart);
return input * (intervalEnd - intervalStart) /
(nextBucketStart - bucketStart);
}
-template <typename VT, typename TT>
+template <typename VT, typename CT>
template <typename Function>
-void BucketedTimeSeries<VT, TT>::forEachBucket(TimeType start, TimeType end,
- Function fn) const {
- forEachBucket([&start, &end, &fn] (const Bucket& bucket, TimeType bucketStart,
- TimeType nextBucketStart) -> bool {
- if (start >= nextBucketStart) {
- return true;
- }
- if (end <= bucketStart) {
- return false;
- }
- bool ret = fn(bucket, bucketStart, nextBucketStart);
- return ret;
- });
+void BucketedTimeSeries<VT, CT>::forEachBucket(
+ TimePoint start,
+ TimePoint end,
+ Function fn) const {
+ forEachBucket(
+ [&start, &end, &fn](
+ const Bucket& bucket,
+ TimePoint bucketStart,
+ TimePoint nextBucketStart) -> bool {
+ if (start >= nextBucketStart) {
+ return true;
+ }
+ if (end <= bucketStart) {
+ return false;
+ }
+ bool ret = fn(bucket, bucketStart, nextBucketStart);
+ return ret;
+ });
}
} // folly
-
-#endif // FOLLY_STATS_BUCKETEDTIMESERIES_INL_H_