Fix ASAN failure in FutureDAG test
[folly.git] / folly / gen / Base-inl.h
index e7a8e415426851fb2af0f62f65281bbe939baec9..8869517ef9b5f4c8d80e1ab2a0169c1268e7d6f2 100644 (file)
@@ -1366,6 +1366,103 @@ class Batch : public Operator<Batch> {
   }
 };
 
+/**
+ * Window - For overlapping the lifetimes of pipeline values, especially with
+ * Futures.
+ *
+ * This type is usually used through the 'window' helper function:
+ *
+ *   auto responses
+ *     = byLine(STDIN)
+ *     | map(makeRequestFuture)
+ *     | window(1000)
+ *     | map(waitFuture)
+ *     | as<vector>();
+ */
+class Window : public Operator<Window> {
+  size_t windowSize_;
+
+ public:
+  explicit Window(size_t windowSize) : windowSize_(windowSize) {
+    if (windowSize_ == 0) {
+      throw std::invalid_argument("Window size must be non-zero!");
+    }
+  }
+
+  template <
+      class Value,
+      class Source,
+      class StorageType = typename std::decay<Value>::type>
+  class Generator
+      : public GenImpl<StorageType&&, Generator<Value, Source, StorageType>> {
+    Source source_;
+    size_t windowSize_;
+
+   public:
+    explicit Generator(Source source, size_t windowSize)
+        : source_(std::move(source)), windowSize_(windowSize) {}
+
+    template <class Handler>
+    bool apply(Handler&& handler) const {
+      std::vector<StorageType> buffer;
+      buffer.reserve(windowSize_);
+      size_t readIndex = 0;
+      bool shouldContinue = source_.apply([&](Value value) -> bool {
+        if (buffer.size() < windowSize_) {
+          buffer.push_back(std::forward<Value>(value));
+        } else {
+          StorageType& entry = buffer[readIndex++];
+          if (readIndex == windowSize_) {
+            readIndex = 0;
+          }
+          if (!handler(std::move(entry))) {
+            return false;
+          }
+          entry = std::forward<Value>(value);
+        }
+        return true;
+      });
+      if (!shouldContinue) {
+        return false;
+      }
+      if (buffer.size() < windowSize_) {
+        for (StorageType& entry : buffer) {
+          if (!handler(std::move(entry))) {
+            return false;
+          }
+        }
+      } else {
+        for (size_t i = readIndex;;) {
+          StorageType& entry = buffer[i++];
+          if (!handler(std::move(entry))) {
+            return false;
+          }
+          if (i == windowSize_) {
+            i = 0;
+          }
+          if (i == readIndex) {
+            break;
+          }
+        }
+      }
+      return true;
+    }
+
+    // Taking n-tuples of an infinite source is still infinite
+    static constexpr bool infinite = Source::infinite;
+  };
+
+  template <class Source, class Value, class Gen = Generator<Value, Source>>
+  Gen compose(GenImpl<Value, Source>&& source) const {
+    return Gen(std::move(source.self()), windowSize_);
+  }
+
+  template <class Source, class Value, class Gen = Generator<Value, Source>>
+  Gen compose(const GenImpl<Value, Source>& source) const {
+    return Gen(source.self(), windowSize_);
+  }
+};
+
 /**
  * Concat - For flattening generators of generators.
  *
@@ -2235,7 +2332,7 @@ const T& operator|(const Optional<T>& opt, const Unwrap&) {
   return opt.value();
 }
 
-} //::detail
+} // namespace detail
 
 /**
  * VirtualGen<T> - For wrapping template types in simple polymorphic wrapper.
@@ -2357,6 +2454,11 @@ inline detail::Skip skip(size_t count) { return detail::Skip(count); }
 inline detail::Batch batch(size_t batchSize) {
   return detail::Batch(batchSize);
 }
+
+inline detail::Window window(size_t windowSize) {
+  return detail::Window(windowSize);
+}
+
 } // namespace gen
 } // namespace folly