*
* This type is usually used through the 'map' or 'mapped' helper function:
*
- * auto squares = seq(1, 10) | map(square) | asVector;
+ * auto squares = seq(1, 10) | map(square) | as<std::vector>();
*/
template <class Predicate>
class Map : public Operator<Map<Predicate>> {
*
* auto best = from(sortedItems)
* | until([](Item& item) { return item.score > 100; })
- * | asVector;
+ * | as<std::vector>();
*/
template <class Predicate>
class Until : public Operator<Until<Predicate>> {
}
};
+/**
+ * Visit - For calling a function on each item before passing it down the
+ * pipeline.
+ *
+ * This type is usually used through the 'visit' helper function:
+ *
+ * auto printedValues = seq(1) | visit(debugPrint);
+ * // nothing printed yet
+ * auto results = take(10) | as<std::vector>();
+ * // results now populated, 10 values printed
+ */
+template <class Visitor>
+class Visit : public Operator<Visit<Visitor>> {
+ Visitor visitor_;
+
+ public:
+ Visit() = default;
+
+ explicit Visit(Visitor visitor) : visitor_(std::move(visitor)) {}
+
+ template <class Value, class Source>
+ class Generator : public GenImpl<Value, Generator<Value, Source>> {
+ Source source_;
+ Visitor visitor_;
+
+ public:
+ explicit Generator(Source source, const Visitor& visitor)
+ : source_(std::move(source)), visitor_(visitor) {}
+
+ template <class Body>
+ void foreach(Body&& body) const {
+ source_.foreach([&](Value value) {
+ visitor_(value); // not forwarding to avoid accidental moves
+ body(std::forward<Value>(value));
+ });
+ }
+
+ template <class Handler>
+ bool apply(Handler&& handler) const {
+ return source_.apply([&](Value value) {
+ visitor_(value); // not forwarding to avoid accidental moves
+ return handler(std::forward<Value>(value));
+ });
+ }
+
+ static constexpr bool infinite = Source::infinite;
+ };
+
+ template <class Source, class Value, class Gen = Generator<Value, Source>>
+ Gen compose(GenImpl<Value, Source>&& source) const {
+ return Gen(std::move(source.self()), visitor_);
+ }
+
+ template <class Source, class Value, class Gen = Generator<Value, Source>>
+ Gen compose(const GenImpl<Value, Source>& source) const {
+ return Gen(source.self(), visitor_);
+ }
+};
+
/**
* Stride - For producing every Nth value from a source.
*
}
};
+/**
+ * Window - For overlapping the lifetimes of pipeline values, especially with
+ * Futures.
+ *
+ * This type is usually used through the 'window' helper function:
+ *
+ * auto responses
+ * = byLine(STDIN)
+ * | map(makeRequestFuture)
+ * | window(1000)
+ * | map(waitFuture)
+ * | as<vector>();
+ */
+class Window : public Operator<Window> {
+ size_t windowSize_;
+
+ public:
+ explicit Window(size_t windowSize) : windowSize_(windowSize) {
+ if (windowSize_ == 0) {
+ throw std::invalid_argument("Window size must be non-zero!");
+ }
+ }
+
+ template <
+ class Value,
+ class Source,
+ class StorageType = typename std::decay<Value>::type>
+ class Generator
+ : public GenImpl<StorageType&&, Generator<Value, Source, StorageType>> {
+ Source source_;
+ size_t windowSize_;
+
+ public:
+ explicit Generator(Source source, size_t windowSize)
+ : source_(std::move(source)), windowSize_(windowSize) {}
+
+ template <class Handler>
+ bool apply(Handler&& handler) const {
+ std::vector<StorageType> buffer;
+ buffer.reserve(windowSize_);
+ size_t readIndex = 0;
+ bool shouldContinue = source_.apply([&](Value value) -> bool {
+ if (buffer.size() < windowSize_) {
+ buffer.push_back(std::forward<Value>(value));
+ } else {
+ StorageType& entry = buffer[readIndex++];
+ if (readIndex == windowSize_) {
+ readIndex = 0;
+ }
+ if (!handler(std::move(entry))) {
+ return false;
+ }
+ entry = std::forward<Value>(value);
+ }
+ return true;
+ });
+ if (!shouldContinue) {
+ return false;
+ }
+ if (buffer.size() < windowSize_) {
+ for (StorageType& entry : buffer) {
+ if (!handler(std::move(entry))) {
+ return false;
+ }
+ }
+ } else {
+ for (size_t i = readIndex;;) {
+ StorageType& entry = buffer[i++];
+ if (!handler(std::move(entry))) {
+ return false;
+ }
+ if (i == windowSize_) {
+ i = 0;
+ }
+ if (i == readIndex) {
+ break;
+ }
+ }
+ }
+ return true;
+ }
+
+ // Taking n-tuples of an infinite source is still infinite
+ static constexpr bool infinite = Source::infinite;
+ };
+
+ template <class Source, class Value, class Gen = Generator<Value, Source>>
+ Gen compose(GenImpl<Value, Source>&& source) const {
+ return Gen(std::move(source.self()), windowSize_);
+ }
+
+ template <class Source, class Value, class Gen = Generator<Value, Source>>
+ Gen compose(const GenImpl<Value, Source>& source) const {
+ return Gen(source.self(), windowSize_);
+ }
+};
+
/**
* Concat - For flattening generators of generators.
*
return opt.value();
}
-} //::detail
+} // namespace detail
/**
* VirtualGen<T> - For wrapping template types in simple polymorphic wrapper.
inline detail::Batch batch(size_t batchSize) {
return detail::Batch(batchSize);
}
-} // gen
-} // folly
+
+inline detail::Window window(size_t windowSize) {
+ return detail::Window(windowSize);
+}
+
+} // namespace gen
+} // namespace folly
FOLLY_POP_WARNING