Fix ASAN failure in FutureDAG test
[folly.git] / folly / gen / Base-inl.h
index c654cbcd1de518a9224293a4a3ad398ce8a540f9..8869517ef9b5f4c8d80e1ab2a0169c1268e7d6f2 100644 (file)
@@ -470,7 +470,7 @@ class SingleCopy : public GenImpl<const Value&, SingleCopy<Value>> {
  *
  * This type is usually used through the 'map' or 'mapped' helper function:
  *
- *   auto squares = seq(1, 10) | map(square) | asVector;
+ *   auto squares = seq(1, 10) | map(square) | as<std::vector>();
  */
 template <class Predicate>
 class Map : public Operator<Map<Predicate>> {
@@ -597,7 +597,7 @@ class Filter : public Operator<Filter<Predicate>> {
  *
  *   auto best = from(sortedItems)
  *             | until([](Item& item) { return item.score > 100; })
- *             | asVector;
+ *             | as<std::vector>();
  */
 template <class Predicate>
 class Until : public Operator<Until<Predicate>> {
@@ -703,6 +703,65 @@ class Take : public Operator<Take> {
   }
 };
 
+/**
+ * Visit - For calling a function on each item before passing it down the
+ * pipeline.
+ *
+ * This type is usually used through the 'visit' helper function:
+ *
+ *   auto printedValues = seq(1) | visit(debugPrint);
+ *   // nothing printed yet
+ *   auto results = take(10) | as<std::vector>();
+ *   // results now populated, 10 values printed
+ */
+template <class Visitor>
+class Visit : public Operator<Visit<Visitor>> {
+  Visitor visitor_;
+
+ public:
+  Visit() = default;
+
+  explicit Visit(Visitor visitor) : visitor_(std::move(visitor)) {}
+
+  template <class Value, class Source>
+  class Generator : public GenImpl<Value, Generator<Value, Source>> {
+    Source source_;
+    Visitor visitor_;
+
+   public:
+    explicit Generator(Source source, const Visitor& visitor)
+        : source_(std::move(source)), visitor_(visitor) {}
+
+    template <class Body>
+    void foreach(Body&& body) const {
+      source_.foreach([&](Value value) {
+        visitor_(value); // not forwarding to avoid accidental moves
+        body(std::forward<Value>(value));
+      });
+    }
+
+    template <class Handler>
+    bool apply(Handler&& handler) const {
+      return source_.apply([&](Value value) {
+        visitor_(value); // not forwarding to avoid accidental moves
+        return handler(std::forward<Value>(value));
+      });
+    }
+
+    static constexpr bool infinite = Source::infinite;
+  };
+
+  template <class Source, class Value, class Gen = Generator<Value, Source>>
+  Gen compose(GenImpl<Value, Source>&& source) const {
+    return Gen(std::move(source.self()), visitor_);
+  }
+
+  template <class Source, class Value, class Gen = Generator<Value, Source>>
+  Gen compose(const GenImpl<Value, Source>& source) const {
+    return Gen(source.self(), visitor_);
+  }
+};
+
 /**
  * Stride - For producing every Nth value from a source.
  *
@@ -1307,6 +1366,103 @@ class Batch : public Operator<Batch> {
   }
 };
 
+/**
+ * Window - For overlapping the lifetimes of pipeline values, especially with
+ * Futures.
+ *
+ * This type is usually used through the 'window' helper function:
+ *
+ *   auto responses
+ *     = byLine(STDIN)
+ *     | map(makeRequestFuture)
+ *     | window(1000)
+ *     | map(waitFuture)
+ *     | as<vector>();
+ */
+class Window : public Operator<Window> {
+  size_t windowSize_;
+
+ public:
+  explicit Window(size_t windowSize) : windowSize_(windowSize) {
+    if (windowSize_ == 0) {
+      throw std::invalid_argument("Window size must be non-zero!");
+    }
+  }
+
+  template <
+      class Value,
+      class Source,
+      class StorageType = typename std::decay<Value>::type>
+  class Generator
+      : public GenImpl<StorageType&&, Generator<Value, Source, StorageType>> {
+    Source source_;
+    size_t windowSize_;
+
+   public:
+    explicit Generator(Source source, size_t windowSize)
+        : source_(std::move(source)), windowSize_(windowSize) {}
+
+    template <class Handler>
+    bool apply(Handler&& handler) const {
+      std::vector<StorageType> buffer;
+      buffer.reserve(windowSize_);
+      size_t readIndex = 0;
+      bool shouldContinue = source_.apply([&](Value value) -> bool {
+        if (buffer.size() < windowSize_) {
+          buffer.push_back(std::forward<Value>(value));
+        } else {
+          StorageType& entry = buffer[readIndex++];
+          if (readIndex == windowSize_) {
+            readIndex = 0;
+          }
+          if (!handler(std::move(entry))) {
+            return false;
+          }
+          entry = std::forward<Value>(value);
+        }
+        return true;
+      });
+      if (!shouldContinue) {
+        return false;
+      }
+      if (buffer.size() < windowSize_) {
+        for (StorageType& entry : buffer) {
+          if (!handler(std::move(entry))) {
+            return false;
+          }
+        }
+      } else {
+        for (size_t i = readIndex;;) {
+          StorageType& entry = buffer[i++];
+          if (!handler(std::move(entry))) {
+            return false;
+          }
+          if (i == windowSize_) {
+            i = 0;
+          }
+          if (i == readIndex) {
+            break;
+          }
+        }
+      }
+      return true;
+    }
+
+    // Taking n-tuples of an infinite source is still infinite
+    static constexpr bool infinite = Source::infinite;
+  };
+
+  template <class Source, class Value, class Gen = Generator<Value, Source>>
+  Gen compose(GenImpl<Value, Source>&& source) const {
+    return Gen(std::move(source.self()), windowSize_);
+  }
+
+  template <class Source, class Value, class Gen = Generator<Value, Source>>
+  Gen compose(const GenImpl<Value, Source>& source) const {
+    return Gen(source.self(), windowSize_);
+  }
+};
+
 /**
  * Concat - For flattening generators of generators.
  *
@@ -2176,7 +2332,7 @@ const T& operator|(const Optional<T>& opt, const Unwrap&) {
   return opt.value();
 }
 
-} //::detail
+} // namespace detail
 
 /**
  * VirtualGen<T> - For wrapping template types in simple polymorphic wrapper.
@@ -2298,7 +2454,12 @@ inline detail::Skip skip(size_t count) { return detail::Skip(count); }
 inline detail::Batch batch(size_t batchSize) {
   return detail::Batch(batchSize);
 }
-} // gen
-} // folly
+
+inline detail::Window window(size_t windowSize) {
+  return detail::Window(windowSize);
+}
+
+} // namespace gen
+} // namespace folly
 
 FOLLY_POP_WARNING