make BucketedTimeSeries::addValue() honor old timestamps
authorAdam Simpkins <simpkins@fb.com>
Wed, 14 May 2014 20:40:33 +0000 (13:40 -0700)
committerAnton Likhtarov <alikhtarov@fb.com>
Mon, 9 Jun 2014 22:33:57 +0000 (15:33 -0700)
Summary:
Previously BucketedTimeSeries()::addValue() documented that it required
time to move forwards.  If it was ever called with a timestamp older
than the most recent one it had seen, it would just use latestTime_ as
the time, and add the value to the most recent bucket.

This changes addValue() so that it always uses the timestamp passed in
by the caller.  If this time value refers to an old bucket that is still
being tracked, the data point will be added to that bucket.  If the time
value is older than the oldest currently tracked bucket, the data point
will be ignored, and false will be returned.

I did consider leaving the current addValue() behavior as-is, and
requiring a separate addHistoricalValue() for when users intentionally
want to try adding old data points.  However, it seems nicer to build
this into the existing addValue() function.  The old behavior of just
replacing the supplied time value seems potentially surprising to users.

This does change the behavior of addValue(), and therefore could affect
the behavior of some programs.  However, up until now no-one should have
been calling addValue() with old time values, as it wouldn't have done
what they want anyway.  I did a brief search through our code base, and
all call sites I saw always called addValue() with the current time.
(Most of the callers use wall clock time, so this change might affect
program behavior if the system time changes after the program starts.
We should ideally change our programs to use monotonic clocks instead.)

Test Plan:
Included a new unit test.

Also compared the timeseries_benchmark results before and after this
change.  Overall this new logic seems to be faster.  For the "all time"
case, the new code is over 2x as fast.  For the normal, non-all-time
case the new code is around 5% faster.

Reviewed By: hans@fb.com

Subscribers: doug, folly@lists, net-systems@, exa

FB internal diff: D1338466

folly/stats/BucketedTimeSeries-defs.h
folly/stats/BucketedTimeSeries.h
folly/test/TimeseriesTest.cpp

index 3474258ddc6dfa1825430a1f41cb8d4328af3392..d37539a72d3a3d62459aeef3846f7a50b057ab75 100644 (file)
@@ -18,6 +18,7 @@
 #define FOLLY_STATS_BUCKETEDTIMESERIES_INL_H_
 
 #include <glog/logging.h>
+#include "folly/Likely.h"
 
 namespace folly {
 
@@ -43,39 +44,58 @@ BucketedTimeSeries<VT, TT>::BucketedTimeSeries(size_t numBuckets,
 }
 
 template <typename VT, typename TT>
-void BucketedTimeSeries<VT, TT>::addValue(TimeType now, const ValueType& val) {
-  addValueAggregated(now, val, 1);
+bool BucketedTimeSeries<VT, TT>::addValue(TimeType now, const ValueType& val) {
+  return addValueAggregated(now, val, 1);
 }
 
 template <typename VT, typename TT>
-void BucketedTimeSeries<VT, TT>::addValue(TimeType now,
+bool BucketedTimeSeries<VT, TT>::addValue(TimeType now,
                                           const ValueType& val,
                                           int64_t times) {
-  addValueAggregated(now, val * times, times);
+  return addValueAggregated(now, val * times, times);
 }
 
 template <typename VT, typename TT>
-void BucketedTimeSeries<VT, TT>::addValueAggregated(TimeType now,
+bool BucketedTimeSeries<VT, TT>::addValueAggregated(TimeType now,
                                                     const ValueType& sum,
                                                     int64_t nsamples) {
-  // Make sure time doesn't go backwards
-  now = std::max(now, latestTime_);
-
   if (isAllTime()) {
-    if (empty()) {
+    if (UNLIKELY(empty())) {
+      firstTime_ = now;
+      latestTime_ = now;
+    } else if (now > latestTime_) {
+      latestTime_ = now;
+    } else if (now < firstTime_) {
       firstTime_ = now;
     }
-    latestTime_ = now;
     total_.add(sum, nsamples);
-    return;
+    return true;
   }
 
-  // Update the buckets
-  size_t curBucket = update(now);
-  buckets_[curBucket].add(sum, nsamples);
+  size_t bucketIdx;
+  if (UNLIKELY(empty())) {
+    // First data point we've ever seen
+    firstTime_ = now;
+    latestTime_ = now;
+    bucketIdx = getBucketIdx(now);
+  } else if (now > latestTime_) {
+    // More recent time.  Need to update the buckets.
+    bucketIdx = updateBuckets(now);
+  } else if (LIKELY(now == latestTime_)) {
+    // Current time.
+    bucketIdx = getBucketIdx(now);
+  } else {
+    // An earlier time in the past.  We need to check if this time still falls
+    // within our window.
+    if (now < getEarliestTimeNonEmpty()) {
+      return false;
+    }
+    bucketIdx = getBucketIdx(now);
+  }
 
-  // Update the aggregate sum/count
   total_.add(sum, nsamples);
+  buckets_[bucketIdx].add(sum, nsamples);
+  return true;
 }
 
 template <typename VT, typename TT>
@@ -98,6 +118,11 @@ size_t BucketedTimeSeries<VT, TT>::update(TimeType now) {
     return getBucketIdx(latestTime_);
   }
 
+  return updateBuckets(now);
+}
+
+template <typename VT, typename TT>
+size_t BucketedTimeSeries<VT, TT>::updateBuckets(TimeType now) {
   // We could cache nextBucketStart as a member variable, so we don't have to
   // recompute it each time update() is called with a new timestamp value.
   // This makes things faster when update() (or addValue()) is called once
@@ -169,6 +194,17 @@ TT BucketedTimeSeries<VT, TT>::getEarliestTime() const {
     return firstTime_;
   }
 
+  // Compute the earliest time we can track
+  TimeType earliestTime = getEarliestTimeNonEmpty();
+
+  // We're never tracking data before firstTime_
+  earliestTime = std::max(earliestTime, firstTime_);
+
+  return earliestTime;
+}
+
+template <typename VT, typename TT>
+TT BucketedTimeSeries<VT, TT>::getEarliestTimeNonEmpty() const {
   size_t currentBucket;
   TimeType currentBucketStart;
   TimeType nextBucketStart;
@@ -177,12 +213,7 @@ TT BucketedTimeSeries<VT, TT>::getEarliestTime() const {
 
   // Subtract 1 duration from the start of the next bucket to find the
   // earliest possible data point we could be tracking.
-  TimeType earliestTime = nextBucketStart - duration_;
-
-  // We're never tracking data before firstTime_
-  earliestTime = std::max(earliestTime, firstTime_);
-
-  return earliestTime;
+  return nextBucketStart - duration_;
 }
 
 template <typename VT, typename TT>
index c6c4ad8596a07eac93b1c520def62b623cfa35d0..8336c7d9ddb3713a868528eb0e7c6310f316c73a 100644 (file)
@@ -36,10 +36,12 @@ namespace folly {
  * be discarded and new data will go into the newly opened bucket.  Internally,
  * it uses a circular array of buckets that it reuses as time advances.
  *
- * The class assumes that time advances forward --  you can't retroactively add
- * values for events in the past -- the 'now' argument is provided for better
- * efficiency and ease of unittesting.
- *
+ * This class assumes that time advances forwards.  The window of time tracked
+ * by the timeseries will advance forwards whenever a more recent timestamp is
+ * passed to addValue().  While it is possible to pass old time values to
+ * addValue(), this will never move the time window backwards.  If the old time
+ * value falls outside the tracked window of time, the data point will be
+ * ignored.
  *
  * This class is not thread-safe -- use your own synchronization!
  */
@@ -65,23 +67,32 @@ class BucketedTimeSeries {
   /*
    * Adds the value 'val' at time 'now'
    *
-   * This function expects time to always move forwards: it cannot be used to
-   * add historical data points that have occurred in the past.  If now is
-   * older than the another timestamp that has already been passed to
-   * addValue() or update(), now will be ignored and the latest timestamp will
-   * be used.
+   * This function expects time to generally move forwards.  The window of time
+   * tracked by this time series will move forwards with time.  If 'now' is
+   * more recent than any time previously seen, addValue() will automatically
+   * call update(now) to advance the time window tracked by this data
+   * structure.
+   *
+   * Values in the recent past may be added to the data structure by passing in
+   * a slightly older value of 'now', as long as this time point still falls
+   * within the tracked duration.  If 'now' is older than the tracked duration
+   * of time, the data point value will be ignored, and addValue() will return
+   * false without doing anything else.
+   *
+   * Returns true on success, or false if now was older than the tracked time
+   * window.
    */
-  void addValue(TimeType now, const ValueType& val);
+  bool addValue(TimeType now, const ValueType& val);
 
   /*
    * Adds the value 'val' the given number of 'times' at time 'now'
    */
-  void addValue(TimeType now, const ValueType& val, int64_t times);
+  bool addValue(TimeType now, const ValueType& val, int64_t times);
 
   /*
    * Adds the value 'sum' as the sum of 'nsamples' samples
    */
-  void addValueAggregated(TimeType now, const ValueType& sum, int64_t nsamples);
+  bool addValueAggregated(TimeType now, const ValueType& sum, int64_t nsamples);
 
   /*
    * Updates the container to the specified time, doing all the necessary
@@ -376,6 +387,9 @@ class BucketedTimeSeries {
                                                               elapsed);
   }
 
+  TimeType getEarliestTimeNonEmpty() const;
+  size_t updateBuckets(TimeType now);
+
   ValueType rangeAdjust(TimeType bucketStart, TimeType nextBucketStart,
                         TimeType start, TimeType end,
                         ValueType input) const;
index 638330d0224408396e1cb74bc6c3aa957927506b..0d97f2756492838ed79157c62d4c0f7da602ef60 100644 (file)
@@ -714,6 +714,52 @@ TEST(BucketedTimeSeries, rateByInterval) {
   EXPECT_EQ(1.0, b.countRate(seconds(0), kDuration * 10));
 }
 
+TEST(BucketedTimeSeries, addHistorical) {
+  const int kNumBuckets = 5;
+  const seconds kDuration(10);
+  BucketedTimeSeries<double> b(kNumBuckets, kDuration);
+
+  // Initially fill with a constant rate of data
+  for (seconds i = seconds(0); i < seconds(10); ++i) {
+    b.addValue(i, 10.0);
+  }
+
+  EXPECT_EQ(10.0, b.rate());
+  EXPECT_EQ(10.0, b.avg());
+  EXPECT_EQ(10, b.count());
+
+  // Add some more data points to the middle bucket
+  b.addValue(seconds(4), 40.0);
+  b.addValue(seconds(5), 40.0);
+  EXPECT_EQ(15.0, b.avg());
+  EXPECT_EQ(18.0, b.rate());
+  EXPECT_EQ(12, b.count());
+
+  // Now start adding more current data points, until we are about to roll over
+  // the bucket where we added the extra historical data.
+  for (seconds i = seconds(10); i < seconds(14); ++i) {
+    b.addValue(i, 10.0);
+  }
+  EXPECT_EQ(15.0, b.avg());
+  EXPECT_EQ(18.0, b.rate());
+  EXPECT_EQ(12, b.count());
+
+  // Now roll over the middle bucket
+  b.addValue(seconds(14), 10.0);
+  b.addValue(seconds(15), 10.0);
+  EXPECT_EQ(10.0, b.avg());
+  EXPECT_EQ(10.0, b.rate());
+  EXPECT_EQ(10, b.count());
+
+  // Add more historical values past the bucket window.
+  // These should be ignored.
+  EXPECT_FALSE(b.addValue(seconds(4), 40.0));
+  EXPECT_FALSE(b.addValue(seconds(5), 40.0));
+  EXPECT_EQ(10.0, b.avg());
+  EXPECT_EQ(10.0, b.rate());
+  EXPECT_EQ(10, b.count());
+}
+
 namespace IntMHTS {
   enum Levels {
     MINUTE,