Window, mainly for futures
authorTom Jackson <tjackson@fb.com>
Thu, 14 Sep 2017 02:01:02 +0000 (19:01 -0700)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Thu, 14 Sep 2017 02:05:22 +0000 (19:05 -0700)
Summary: Just a circular buffer in the middle of a pipeline.

Reviewed By: yfeldblum

Differential Revision: D5791551

fbshipit-source-id: 2808a53df9b8cd2a402da0678a6b4ed0e6ca6e00

folly/gen/Base-inl.h
folly/gen/Base.h
folly/gen/test/BaseTest.cpp

index e7a8e415426851fb2af0f62f65281bbe939baec9..6e39e03f3ae539d4559b4ca672879c3f5a94a702 100644 (file)
@@ -1366,6 +1366,103 @@ class Batch : public Operator<Batch> {
   }
 };
 
+/**
+ * Window - For overlapping the lifetimes of pipeline values, especially with
+ * Futures.
+ *
+ * This type is usually used through the 'window' helper function:
+ *
+ *   auto responses
+ *     = byLine(STDIN)
+ *     | map(makeRequestFuture)
+ *     | window(1000)
+ *     | map(waitFuture)
+ *     | as<vector>();
+ */
+class Window : public Operator<Window> {
+  size_t windowSize_;
+
+ public:
+  explicit Window(size_t windowSize) : windowSize_(windowSize) {
+    if (windowSize_ == 0) {
+      throw std::invalid_argument("Window size must be non-zero!");
+    }
+  }
+
+  template <
+      class Value,
+      class Source,
+      class StorageType = typename std::decay<Value>::type>
+  class Generator
+      : public GenImpl<StorageType&&, Generator<Value, Source, StorageType>> {
+    Source source_;
+    size_t windowSize_;
+
+   public:
+    explicit Generator(Source source, size_t windowSize)
+        : source_(std::move(source)), windowSize_(windowSize) {}
+
+    template <class Handler>
+    bool apply(Handler&& handler) const {
+      std::vector<StorageType> buffer;
+      buffer.reserve(windowSize_);
+      size_t readIndex = 0;
+      bool shouldContinue = source_.apply([&](Value value) -> bool {
+        if (buffer.size() < windowSize_) {
+          buffer.push_back(std::forward<Value>(value));
+        } else {
+          StorageType& entry = buffer[readIndex++];
+          if (readIndex == windowSize_) {
+            readIndex = 0;
+          }
+          if (!handler(std::move(entry))) {
+            return false;
+          }
+          entry = std::forward<Value>(value);
+        }
+        return true;
+      });
+      if (!shouldContinue) {
+        return false;
+      }
+      if (buffer.size() < windowSize_) {
+        for (StorageType& entry : buffer) {
+          if (!handler(std::move(entry))) {
+            return false;
+          }
+        }
+      } else {
+        for (size_t i = readIndex;;) {
+          StorageType& entry = buffer[i++];
+          if (!handler(std::move(entry))) {
+            return false;
+          }
+          if (i == windowSize_) {
+            i = 0;
+          }
+          if (i == readIndex) {
+            break;
+          }
+        }
+      }
+      return true;
+    }
+
+    // Taking n-tuples of an infinite source is still infinite
+    static constexpr bool infinite = Source::infinite;
+  };
+
+  template <class Source, class Value, class Gen = Generator<Value, Source>>
+  Gen compose(GenImpl<Value, Source>&& source) const {
+    return Gen(std::move(source.self()), windowSize_);
+  }
+
+  template <class Source, class Value, class Gen = Generator<Value, Source>>
+  Gen compose(const GenImpl<Value, Source>& source) const {
+    return Gen(source.self(), windowSize_);
+  }
+};
+
 /**
  * Concat - For flattening generators of generators.
  *
@@ -2357,6 +2454,11 @@ inline detail::Skip skip(size_t count) { return detail::Skip(count); }
 inline detail::Batch batch(size_t batchSize) {
   return detail::Batch(batchSize);
 }
+
+inline detail::Window window(size_t windowSize) {
+  return detail::Window(windowSize);
+}
+
 } // namespace gen
 } // namespace folly
 
index 1f8a59ec29acd459d8af501002de181655208197..b760aef3c562603b951dbb14dd6655a3f2372ba8 100644 (file)
@@ -357,6 +357,8 @@ class Cycle;
 
 class Batch;
 
+class Window;
+
 class Dereference;
 
 class Indirect;
index 8e0358c2d4375d44f266291fcded775c051e1037..5d9075bda463e95632cfccb05b186812422c97cc 100644 (file)
@@ -1249,6 +1249,31 @@ TEST(Gen, BatchMove) {
   EXPECT_EQ(expected, actual);
 }
 
+TEST(Gen, Window) {
+  auto expected = seq(0, 10) | as<std::vector>();
+  for (size_t windowSize = 1; windowSize <= 20; ++windowSize) {
+    // no early stop
+    auto actual = seq(0, 10) |
+        mapped([](int i) { return std::unique_ptr<int>(new int(i)); }) |
+        window(4) | dereference | as<std::vector>();
+    EXPECT_EQ(expected, actual) << windowSize;
+  }
+  for (size_t windowSize = 1; windowSize <= 20; ++windowSize) {
+    // pre-window take
+    auto actual = seq(0) |
+        mapped([](int i) { return std::unique_ptr<int>(new int(i)); }) |
+        take(11) | window(4) | dereference | as<std::vector>();
+    EXPECT_EQ(expected, actual) << windowSize;
+  }
+  for (size_t windowSize = 1; windowSize <= 20; ++windowSize) {
+    // post-window take
+    auto actual = seq(0) |
+        mapped([](int i) { return std::unique_ptr<int>(new int(i)); }) |
+        window(4) | take(11) | dereference | as<std::vector>();
+    EXPECT_EQ(expected, actual) << windowSize;
+  }
+}
+
 TEST(Gen, Just) {
   {
     int x = 3;