}
};
+/**
+ * RangeSource - Generator over the elements of a folly::Range. Handy for
+ * referring to a slice of some container.
+ *
+ * Normally obtained through the 'from' function rather than constructed
+ * directly:
+ *
+ *   auto rangeSource = from(folly::range(v.begin(), v.end()));
+ *   auto total = rangeSource | sum;
+ *
+ * Reminder: the range is only a pair of iterators, so take care not to
+ * invalidate them while the source is in use.
+ */
+template<class Iterator>
+class RangeSource : public GenImpl<typename Range<Iterator>::reference,
+                                   RangeSource<Iterator>> {
+  Range<Iterator> range_;
+ public:
+  RangeSource() {}
+  explicit RangeSource(Range<Iterator> range)
+    : range_(std::move(range))
+  {}
+
+  // Feeds the elements to 'handler' in order. Stops and returns false as soon
+  // as the handler returns false; returns true after the last element.
+  template<class Handler>
+  bool apply(Handler&& handler) const {
+    auto last = range_.end();
+    for (auto it = range_.begin(); it != last; ++it) {
+      auto& value = *it;
+      if (!handler(value)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  // Calls 'body' on every element in order; cannot be stopped early.
+  template<class Body>
+  void foreach(Body&& body) const {
+    auto last = range_.end();
+    for (auto it = range_.begin(); it != last; ++it) {
+      auto& value = *it;
+      body(value);
+    }
+  }
+};
+
/**
* Sequence - For generating values from beginning value, incremented along the
* way with the ++ and += operators. Iteration may continue indefinitely by
template<class Value>
class Empty : public GenImpl<Value, Empty<Value>> {
public:
- template<class Handler>
- bool apply(Handler&&) const { return true; }
+ template <class Handler>
+ bool apply(Handler&&) const {  // no values: the handler is never called
+ return true;  // true = iteration was not stopped by the handler
+ }
+
+ template <class Body>
+ void foreach(Body&&) const {}  // no values: the body is never called
+};
+
+/**
+ * Just - Generator that yields exactly one value. The value is stored inside
+ * the generator and handed to consumers as a const reference.
+ */
+template<class Value>
+class Just : public GenImpl<const Value&, Just<Value>> {
+  static_assert(!std::is_reference<Value>::value,
+                "Just requires non-ref types");
+  const Value value_;
+ public:
+  // Value is never a reference type here (see the static_assert), so moving
+  // the by-value parameter is equivalent to forwarding it.
+  Just(Value value) : value_(std::move(value)) {}
+
+  // Calls 'body' once with the stored value.
+  template <class Body>
+  void foreach(Body&& body) const {
+    body(value_);
+  }
+
+  // Calls 'handler' once with the stored value and passes its verdict on.
+  template <class Handler>
+  bool apply(Handler&& handler) const {
+    const bool keepGoing = handler(value_);
+    return keepGoing;
+  }
};
/*
}
};
+/**
+ * Composer - Helper class for adapting pipelines into functors. Primarily used
+ * for 'mapOp'.
+ *
+ * Stores an operator (or chain of operators); calling the Composer with a
+ * source returns the result of op.compose(source).
+ */
+template<class Operators>
+class Composer {
+ Operators op_;
+ public:
+ explicit Composer(Operators op)
+ : op_(std::move(op)) {}
+
+ template<class Source,
+ class Ret = decltype(std::declval<Operators>()  // Ret: whatever compose() yields for Source
+ .compose(std::declval<Source>()))>
+ Ret operator()(Source&& source) const {
+ return op_.compose(std::forward<Source>(source));  // source is perfectly forwarded
+ }
+};
+
/**
* Batch - For producing fixed-size batches of each value from a source.
*
/*
- * Copyright 2013 Facebook, Inc.
+ * Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
#ifndef FOLLY_GEN_BASE_H
#define FOLLY_GEN_BASE_H
template<class Value>
class Empty;
+template<class Value>
+class Just;
/*
* Operators
template<class Selector>
class Distinct;
+template<class Operators>
+class Composer;
+
template<class Expected>
class TypeAssertion;
return {};
}
+template<class Value>
+detail::Just<Value> just(Value value) {  // factory: builds a detail::Just holding 'value'
+ return detail::Just<Value>(std::move(value));  // 'value' was taken by value, so move it in
+}
+
/*
* Operator Factories
*/
return Map(std::move(pred));
}
+/**
+ * mapOp - Given a generator of generators, maps the application of the given
+ * operator on to each inner gen. Especially useful in aggregating nested data
+ * structures:
+ *
+ * chunked(samples, 256)
+ * | mapOp(filter(sampleTest) | count)
+ * | sum;
+ *
+ * The operator is wrapped in a detail::Composer, and that Composer is the
+ * function handed to detail::Map.
+ */
+template<class Operator,
+ class Map = detail::Map<detail::Composer<Operator>>>
+Map mapOp(Operator op) {
+ return Map(detail::Composer<Operator>(std::move(op)));
+}
+
/*
* member(...) - For extracting a member from each value.
*
protected:
Operator() = default;
- Operator(const Operator&) = default;
Operator(Operator&&) = default;
+ Operator(const Operator&) = default;
+ Operator& operator=(Operator&&) = default;
+ Operator& operator=(const Operator&) = default;
};
/**
protected:
// To prevent slicing
GenImpl() = default;
- GenImpl(const GenImpl&) = default;
GenImpl(GenImpl&&) = default;
+ GenImpl(const GenImpl&) = default;
+ GenImpl& operator=(GenImpl&&) = default;
+ GenImpl& operator=(const GenImpl&) = default;
public:
typedef Value ValueType;
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FOLLY_GEN_PARALLEL_H_
+#error This file may only be included from folly/gen/ParallelGen.h
+#endif
+
+#include "folly/MPMCQueue.h"
+#include "folly/ScopeGuard.h"
+#include "folly/experimental/EventCount.h"
+#include <atomic>
+#include <thread>
+#include <vector>
+
+namespace folly {
+namespace gen {
+namespace detail {
+
+/**
+ * ClosableMPMCQueue - A bounded MPMCQueue paired with counts of the producers
+ * and consumers currently attached to it, so that a blocked party can stop
+ * waiting once the other side has gone away.
+ *
+ * Producers and consumers register with openProducer()/openConsumer() and
+ * unregister with closeInputProducer()/closeOutputConsumer(). The last party
+ * to unregister on one side wakes everybody waiting on the other side.
+ *
+ * The blocking calls follow the EventCount waiter protocol: after
+ * prepareWait() the queue is tried once more, and every path that does not
+ * end in wait() calls cancelWait().
+ */
+template <typename T>
+class ClosableMPMCQueue {
+  MPMCQueue<T> queue_;
+  std::atomic<size_t> producers_{0};
+  std::atomic<size_t> consumers_{0};
+  folly::EventCount wakeProducer_;
+  folly::EventCount wakeConsumer_;
+
+ public:
+  explicit ClosableMPMCQueue(size_t capacity) : queue_(capacity) {}
+
+  // Every producer and consumer must have closed before destruction.
+  ~ClosableMPMCQueue() {
+    CHECK(!producers());
+    CHECK(!consumers());
+  }
+
+  void openProducer() { ++producers_; }
+  void openConsumer() { ++consumers_; }
+
+  // Unregisters one producer; the last one wakes all waiting consumers so
+  // they can observe that the queue is closed.
+  void closeInputProducer() {
+    const size_t before = producers_--;
+    CHECK(before);
+    if (before == 1) { // last producer
+      wakeConsumer_.notifyAll();
+    }
+  }
+
+  // Unregisters one consumer; the last one wakes all waiting producers so
+  // they can give up writing.
+  void closeOutputConsumer() {
+    const size_t before = consumers_--;
+    CHECK(before);
+    if (before == 1) { // last consumer
+      wakeProducer_.notifyAll();
+    }
+  }
+
+  size_t producers() const {
+    return producers_.load(std::memory_order_acquire);
+  }
+
+  size_t consumers() const {
+    return consumers_.load(std::memory_order_acquire);
+  }
+
+  // Non-blocking write. Returns false (leaving args untouched by the queue)
+  // when the write did not succeed.
+  template <typename... Args>
+  bool writeUnlessFull(Args&&... args) noexcept {
+    if (queue_.write(std::forward<Args>(args)...)) {
+      // wake consumers to pick up new value
+      wakeConsumer_.notify();
+      return true;
+    }
+    return false;
+  }
+
+  // Blocking write. Waits for room in the queue; returns false without
+  // writing once no consumers are left. As in the loop condition, args are
+  // forwarded again on a retry: a failed writeIfNotFull() is relied upon not
+  // to consume them.
+  template <typename... Args>
+  bool writeUnlessClosed(Args&&... args) {
+    // write if there's room
+    while (!queue_.writeIfNotFull(std::forward<Args>(args)...)) {
+      auto key = wakeProducer_.prepareWait();
+      if (!consumers()) {
+        // no consumers left; bail out
+        wakeProducer_.cancelWait();
+        return false;
+      }
+      // A consumer may have freed a slot (and notified) between the failed
+      // write above and prepareWait(); wait() would sleep through that
+      // notification, so try again now that we are registered as a waiter.
+      if (queue_.writeIfNotFull(std::forward<Args>(args)...)) {
+        wakeProducer_.cancelWait();
+        break;
+      }
+      wakeProducer_.wait(key);
+    }
+    // wake consumers to pick up new value
+    wakeConsumer_.notify();
+    return true;
+  }
+
+  // Non-blocking read. Returns false when nothing was dequeued.
+  bool readUnlessEmpty(T& out) {
+    if (queue_.read(out)) {
+      // wake producers to fill empty space
+      wakeProducer_.notify();
+      return true;
+    }
+    return false;
+  }
+
+  // Blocking read. Waits for a value; returns false only when the queue is
+  // empty and every producer has closed.
+  bool readUnlessClosed(T& out) {
+    while (!queue_.readIfNotEmpty(out)) {
+      auto key = wakeConsumer_.prepareWait();
+      // Sample the producer count BEFORE the retry below: if it is already
+      // zero, everything the producers wrote is visible to that retry, so an
+      // empty result really means "drained and closed". Checking in the other
+      // order could drop a value written just before the last producer closed.
+      const bool closed = !producers();
+      if (queue_.readIfNotEmpty(out)) {
+        wakeConsumer_.cancelWait();
+        break;
+      }
+      if (closed) {
+        wakeConsumer_.cancelWait();
+        // nudge any writer that is still parked on the queue
+        wakeProducer_.notify();
+        return false;
+      }
+      wakeConsumer_.wait(key);
+    }
+    // wake writers blocked by full queue
+    wakeProducer_.notify();
+    return true;
+  }
+};
+
+/**
+ * Sub - Operator that applies a sink to the whole source and wraps the single
+ * result in a Just generator, so the result can keep flowing through a
+ * pipeline.
+ */
+template <class Sink>
+class Sub : public Operator<Sub<Sink>> {
+  Sink sink_;
+
+ public:
+  // 'sink' is already a by-value copy; move it into place instead of copying
+  // a second time (which would also reject move-only sinks).
+  explicit Sub(Sink sink) : sink_(std::move(sink)) {}
+
+  // Evaluates 'source | sink_' immediately and returns the result wrapped in
+  // a Just of the decayed result type.
+  template <class Value,
+            class Source,
+            class Result =
+                decltype(std::declval<Sink>().compose(std::declval<Source>())),
+            class Just = Just<typename std::decay<Result>::type>>
+  Just compose(const GenImpl<Value, Source>& source) const {
+    return Just(source | sink_);
+  }
+};
+
+template <class Ops>
+class Parallel : public Operator<Parallel<Ops>> {
+ Ops ops_;
+ size_t threads_;
+
+ public:
+ Parallel(Ops ops, size_t threads) : ops_(std::move(ops)), threads_(threads) {}
+
+ template <class Input,
+ class Source,
+ class InputDecayed = typename std::decay<Input>::type,
+ class Composed =
+ decltype(std::declval<Ops>().compose(Empty<InputDecayed&&>())),
+ class Output = typename Composed::ValueType,
+ class OutputDecayed = typename std::decay<Output>::type>
+ class Generator : public GenImpl<OutputDecayed&&,
+ Generator<Input,
+ Source,
+ InputDecayed,
+ Composed,
+ Output,
+ OutputDecayed>> {
+ const Source source_;
+ const Ops ops_;
+ const size_t threads_;
+ typedef ClosableMPMCQueue<InputDecayed> InQueue;
+ typedef ClosableMPMCQueue<OutputDecayed> OutQueue;
+
+ // Puller - Source generator that drains the input queue. Each read blocks
+ // until a value arrives or the queue reports that it is closed.
+ class Puller : public GenImpl<InputDecayed&&, Puller> {
+   InQueue* queue_;
+
+  public:
+   explicit Puller(InQueue* queue) : queue_(queue) {}
+
+   // Hands each dequeued input to 'handler'. Returns false if the handler
+   // asked to stop, true once readUnlessClosed() reports the end.
+   template <class Handler>
+   bool apply(Handler&& handler) const {
+     InputDecayed item;
+     for (;;) {
+       if (!queue_->readUnlessClosed(item)) {
+         return true;
+       }
+       if (!handler(std::move(item))) {
+         return false;
+       }
+     }
+   }
+
+   // Hands every dequeued input to 'body' until the queue reports the end.
+   template <class Body>
+   void foreach(Body&& body) const {
+     for (InputDecayed item; queue_->readUnlessClosed(item);) {
+       body(std::move(item));
+     }
+   }
+ };
+
+ // Pusher - Terminal operator that writes every value of the composed source
+ // into the output queue.
+ //   all == false: uses apply(), so the first failed write stops the source.
+ //   all == true:  uses foreach(), so every value is visited and the result
+ //                 of each write is ignored.
+ template <bool all = false>
+ class Pusher : public Operator<Pusher<all>> {
+   OutQueue* queue_;
+
+  public:
+   explicit Pusher(OutQueue* queue) : queue_(queue) {}
+
+   template <class Value, class InnerSource>
+   void compose(const GenImpl<Value, InnerSource>& source) const {
+     if (!all) {
+       source.self().apply([&](Value value) {
+         return queue_->writeUnlessClosed(std::forward<Value>(value));
+       });
+       return;
+     }
+     source.self().foreach([&](Value value) {
+       queue_->writeUnlessClosed(std::forward<Value>(value));
+     });
+   }
+ };
+
+ // Executor - Owns the input queue, the output queue and the worker threads
+ // for one run of the pipeline. 'all' selects the Pusher flavour used by the
+ // workers.
+ //
+ // The constructing thread is registered as the producer of the input queue
+ // and the consumer of the output queue; every worker is registered as a
+ // consumer of the input queue and a producer of the output queue.
+ template <bool all = false>
+ class Executor {
+   InQueue inQueue_;
+   OutQueue outQueue_;
+   Puller puller_;
+   Pusher<all> pusher_;
+   std::vector<std::thread> workers_;
+   const Ops* ops_;
+
+   // Body of each worker thread: pull inputs, run the operators, push outputs.
+   void work() {
+     puller_ | *ops_ | pusher_;
+   }
+
+  public:
+   // Starts 'threads' workers. Both queues get a capacity of threads * 4.
+   // 'ops' is not owned and must outlive the Executor.
+   Executor(size_t threads, const Ops* ops)
+       : inQueue_(threads * 4),
+         outQueue_(threads * 4),
+         puller_(&inQueue_),
+         pusher_(&outQueue_),
+         ops_(ops) {
+     inQueue_.openProducer();
+     outQueue_.openConsumer();
+     // size_t index: 'threads' is unsigned, so an int counter would be a
+     // signed/unsigned comparison.
+     for (size_t t = 0; t < threads; ++t) {
+       // Register the worker before its thread starts so the counts are
+       // never observed as zero while workers are still being launched.
+       inQueue_.openConsumer();
+       outQueue_.openProducer();
+       workers_.emplace_back([this] {
+         SCOPE_EXIT {
+           inQueue_.closeOutputConsumer();
+           outQueue_.closeInputProducer();
+         };
+         this->work();
+       });
+     }
+   }
+
+   // Closes whichever of the caller's endpoints is still open, which lets
+   // blocked workers finish, then joins them all.
+   ~Executor() {
+     if (inQueue_.producers()) {
+       inQueue_.closeInputProducer();
+     }
+     if (outQueue_.consumers()) {
+       outQueue_.closeOutputConsumer();
+     }
+     while (!workers_.empty()) {
+       workers_.back().join();
+       workers_.pop_back();
+     }
+     CHECK(!inQueue_.consumers());
+     CHECK(!outQueue_.producers());
+   }
+
+   // Signals that the caller will write no more inputs.
+   void closeInputProducer() { inQueue_.closeInputProducer(); }
+
+   // Signals that the caller will read no more outputs.
+   void closeOutputConsumer() { outQueue_.closeOutputConsumer(); }
+
+   // Blocking write to the input queue; false once no worker is consuming.
+   bool writeUnlessClosed(Input&& input) {
+     return inQueue_.writeUnlessClosed(std::forward<Input>(input));
+   }
+
+   // Non-blocking write to the input queue; false if it did not succeed.
+   bool writeUnlessFull(Input&& input) {
+     return inQueue_.writeUnlessFull(std::forward<Input>(input));
+   }
+
+   // Blocking read from the output queue; false once it is drained and no
+   // worker is producing.
+   bool readUnlessClosed(OutputDecayed& output) {
+     return outQueue_.readUnlessClosed(output);
+   }
+
+   // Non-blocking read from the output queue; false if nothing was read.
+   bool readUnlessEmpty(OutputDecayed& output) {
+     return outQueue_.readUnlessEmpty(output);
+   }
+ };
+
+ public:
+ // Stores the source and operators. A 'threads' value of 0 selects one
+ // worker per configured processor. sysconf() returns a long and reports
+ // failure as -1, so the clamp to at least 1 is done in signed arithmetic
+ // before converting to size_t (clamping as size_t would turn -1 into a
+ // huge thread count).
+ Generator(Source source, Ops ops, size_t threads)
+     : source_(std::move(source)),
+       ops_(std::move(ops)),
+       threads_(threads != 0
+                    ? threads
+                    : static_cast<size_t>(
+                          std::max<long>(1, sysconf(_SC_NPROCESSORS_CONF)))) {}
+
+ template <class Handler>
+ bool apply(Handler&& handler) const {  // returns false iff the handler stopped the iteration
+ Executor<false> executor(threads_, &ops_);  // starts the worker threads for this run
+ bool more = true;  // cleared as soon as the handler returns false
+ source_.apply([&](Input input) {
+ if (executor.writeUnlessFull(std::forward<Input>(input))) {  // fast path: non-blocking hand-off
+ return true;
+ }
+ OutputDecayed output;
+ while (executor.readUnlessEmpty(output)) {  // write failed: drain ready outputs before blocking
+ if (!handler(std::move(output))) {
+ more = false;
+ return false;
+ }
+ }
+ if (!executor.writeUnlessClosed(std::forward<Input>(input))) {  // blocking write; false = no workers left
+ return false;
+ }
+ return true;
+ });
+ executor.closeInputProducer();  // no more inputs will be written
+
+ if (more) {
+ OutputDecayed output;
+ while (executor.readUnlessClosed(output)) {  // blocks until workers are done and the queue is drained
+ if (!handler(std::move(output))) {
+ more = false;
+ break;
+ }
+ }
+ }
+ executor.closeOutputConsumer();  // no more outputs will be read
+
+ return more;
+ }
+
+ template <class Body>
+ void foreach(Body&& body) const {  // same flow as apply(), but can never be stopped early
+ Executor<true> executor(threads_, &ops_);  // 'true': workers push every output
+ source_.foreach([&](Input input) {
+ if (executor.writeUnlessFull(std::forward<Input>(input))) {  // fast path: non-blocking hand-off
+ return;
+ }
+ OutputDecayed output;
+ while (executor.readUnlessEmpty(output)) {  // write failed: drain ready outputs before blocking
+ body(std::move(output));
+ }
+ CHECK(executor.writeUnlessClosed(std::forward<Input>(input)));  // blocking write; must succeed
+ });
+ executor.closeInputProducer();  // no more inputs will be written
+
+ OutputDecayed output;
+ while (executor.readUnlessClosed(output)) {  // blocks until workers are done and the queue is drained
+ body(std::move(output));
+ }
+ executor.closeOutputConsumer();  // no more outputs will be read
+ }
+ };
+
+ template <class Value, class Source>
+ Generator<Value, Source> compose(const GenImpl<Value, Source>& source) const {
+ return Generator<Value, Source>(source.self(), ops_, threads_);  // lvalue source: passed by copy
+ }
+
+ template <class Value, class Source>
+ Generator<Value, Source> compose(GenImpl<Value, Source>&& source) const {
+ return Generator<Value, Source>(std::move(source.self()), ops_, threads_);  // rvalue source: moved in
+ }
+};
+
+/**
+ * ChunkedRangeSource - For slicing up ranges into a sequence of chunks given a
+ * maximum chunk size. Each chunk is produced as a RangeSource over that slice.
+ *
+ * Usually used through the 'chunked' helper, like:
+ *
+ *   auto n
+ *     = chunked(values)
+ *     | parallel(concat            // each thread processes whole chunks
+ *              | filter(isPrime)   // but still sees values one at a time
+ *              | sub(count))
+ *     | sum;
+ *
+ * A chunk size that is not positive yields the whole range as one chunk.
+ */
+template <class Iterator>
+class ChunkedRangeSource
+    : public GenImpl<RangeSource<Iterator>&&, ChunkedRangeSource<Iterator>> {
+  int chunkSize_ = 0; // never left indeterminate by the default constructor
+  Range<Iterator> range_;
+
+ public:
+  ChunkedRangeSource() {}
+  ChunkedRangeSource(int chunkSize, Range<Iterator> range)
+      : chunkSize_(chunkSize), range_(std::move(range)) {}
+
+  // Passes consecutive chunks to 'handler'; stops and returns false as soon
+  // as the handler returns false, otherwise returns true.
+  template <class Handler>
+  bool apply(Handler&& handler) const {
+    auto remaining = range_;
+    while (!remaining.empty()) {
+      // A zero chunk size would produce empty chunks and never advance, so
+      // any non-positive size takes everything that is left. Negative sizes
+      // already behaved that way after conversion to an unsigned length.
+      const size_t step = chunkSize_ > 0 ? static_cast<size_t>(chunkSize_)
+                                         : remaining.size();
+      auto chunk = remaining.subpiece(0, step);
+      remaining.advance(chunk.size());
+      auto gen = RangeSource<Iterator>(chunk);
+      if (!handler(std::move(gen))) {
+        return false;
+      }
+    }
+    return true;
+  }
+};
+
+} // namespace detail
+
+} // namespace gen
+} // namespace folly
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FOLLY_GEN_PARALLEL_H_
+#define FOLLY_GEN_PARALLEL_H_
+
+#include <mutex>
+
+#include "folly/gen/Base.h"
+
+namespace folly { namespace gen {
+namespace detail {
+
+template <class Ops>
+class Parallel;
+
+template <class Sink>
+class Sub;
+
+template <class Iterator>
+class ChunkedRangeSource;
+
+}
+
+/**
+ * chunked() - For producing values from a container in slices.
+ *
+ * Especially for use with 'parallel()', chunked can be used to process values
+ * from a persistent container in chunks larger than one value at a time. The
+ * values produced are generators for slices of the input container.
+ *
+ * 'chunkSize' is the maximum chunk size (256 by default). The chunks are built
+ * from the container's begin()/end() iterators, so the container must stay
+ * alive and unmodified while they are in use. This overload takes a const
+ * container and slices it with const_iterator. */
+template <class Container,
+ class Iterator = typename Container::const_iterator,
+ class Chunked = detail::ChunkedRangeSource<Iterator>>
+Chunked chunked(const Container& container, int chunkSize = 256) {
+ return Chunked(chunkSize, folly::range(container.begin(), container.end()));
+}
+
+template <class Container,
+ class Iterator = typename Container::iterator,  // non-const overload: chunks use mutable iterators
+ class Chunked = detail::ChunkedRangeSource<Iterator>>
+Chunked chunked(Container& container, int chunkSize = 256) {  // chunkSize: maximum chunk size
+ return Chunked(chunkSize, folly::range(container.begin(), container.end()));
+}
+
+
+/**
+ * parallel - A parallelization operator.
+ *
+ * 'parallel(ops)' can be used with any generator to process a segment
+ * of the pipeline in parallel. Multiple threads are used to apply the
+ * operations ('ops') to the input sequence, with the resulting sequence
+ * interleaved to be processed on the client thread.
+ *
+ * auto scoredResults
+ * = from(ids)
+ * | parallel(map(fetchObj) | filter(isValid) | map(scoreObj))
+ * | as<vector>();
+ *
+ * Operators specified for parallel execution must yield sequences, not just
+ * individual values. If a sink function such as 'count' is desired, it must be
+ * wrapped in 'sub' to produce a subcount, since any such aggregation must be
+ * re-aggregated.
+ *
+ * auto matches
+ * = from(docs)
+ * | parallel(filter(expensiveTest) | sub(count))
+ * | sum;
+ *
+ * Here, each thread counts its portion of the result, then the sub-counts are
+ * summed up to produce the total count.
+ *
+ * 'threads' is the number of worker threads. The default of 0 means one
+ * worker per configured processor (sysconf(_SC_NPROCESSORS_CONF)).
+ */
+template <class Ops, class Parallel = detail::Parallel<Ops>>
+Parallel parallel(Ops ops, size_t threads = 0) {
+ return Parallel(std::move(ops), threads);
+}
+
+/**
+ * sub - For sub-summarization of a sequence.
+ *
+ * 'sub' can be used to apply a sink function to a generator, but wrap the
+ * single value in another generator. Note that the sink is eagerly evaluated on
+ * the input sequence.
+ *
+ * auto n = from(list) | sub(count) | first;
+ *
+ * This is primarily used with 'parallel', where each worker thread produces
+ * one sub-result that is then re-aggregated downstream.
+ */
+template <class Sink, class Sub = detail::Sub<Sink>>
+Sub sub(Sink sink) {
+ return Sub(std::move(sink));
+}
+
+}} // !namespace folly::gen
+
+#include "folly/gen/Parallel-inl.h"
+
+#endif /* FOLLY_GEN_PARALLEL_H_ */
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FOLLY_GEN_BENCH_H_
+#define FOLLY_GEN_BENCH_H_
+
+#include "folly/Benchmark.h"
+
+#define BENCH_GEN_IMPL(gen, prefix) /* registers a benchmark named after the expression 'gen' that evaluates it once per iteration */ \
+static bool FB_ANONYMOUS_VARIABLE(benchGen) = ( \
+ ::folly::addBenchmark(__FILE__, prefix FB_STRINGIZE(gen), \
+ [](unsigned iters){ \
+ while (iters--) { \
+ folly::doNotOptimizeAway(gen); \
+ } \
+ }), true)
+#define BENCH_GEN(gen) BENCH_GEN_IMPL(gen, "")  // plain benchmark, no name prefix
+#define BENCH_GEN_REL(gen) BENCH_GEN_IMPL(gen, "%")  // "%" name prefix; presumably reported relative to the preceding baseline
+
+#endif
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <glog/logging.h>
+#include <iostream>
+#include <array>
+#include <vector>
+#include <future>
+
+#include "folly/gen/Base.h"
+#include "folly/gen/Parallel.h"
+#include "folly/gen/test/Bench.h"
+
+
+DEFINE_int32(threads,
+ std::max(1, (int32_t) sysconf(_SC_NPROCESSORS_CONF) / 2),
+ "Num threads.");
+
+using namespace folly::gen;
+using std::vector;
+
+
+constexpr int kFib = 28; // unit of work
+// Deliberately naive doubly-recursive Fibonacci, with fib(0) == fib(1) == 1.
+// Serves purely as a CPU-bound unit of work for the benchmarks.
+size_t fib(int n) {
+  if (n <= 1) {
+    return 1;
+  }
+  return fib(n - 1) + fib(n - 2);
+}
+
+static auto add = [](int a, int b) { return a + b; };  // binary sum helper
+static auto mod7 = [](int i) { return i % 7; };  // remainder of i divided by 7
+
+// Primality test by trial division with odd divisors up to sqrt(n).
+// Even numbers are settled up front: the divisor loop only tries odd values,
+// so without this check every even n > 2 would be reported as prime.
+static auto isPrimeSlow = [](int n) {
+  if (n < 2) {
+    return false;
+  }
+  if (n % 2 == 0) {
+    return n == 2;
+  }
+  // 'd <= n / d' is 'd * d <= n' without the risk of overflowing int.
+  for (int d = 3; d <= n / d; d += 2) {
+    if (0 == n % d) {
+      return false;
+    }
+  }
+  return true;
+};
+
+static auto primes =
+ seq(1, 1 << 20) | filter(isPrimeSlow) | as<vector>();
+
+static auto isPrime = [](int n) {  // true when no entry of 'primes' with d * d <= n divides n
+ return !(from(primes)
+ | until([&](int d) { return d * d > n; })  // stop once candidates exceed sqrt(n)
+ | filter([&](int d) { return 0 == n % d; })  // keep only divisors of n
+ | any);
+};
+
+static auto factors = [](int n) {  // counts entries of 'primes' with d * d <= n that divide n
+ return from(primes)
+ | until([&](int d) { return d * d > n; })  // stop once candidates exceed sqrt(n)
+ | filter([&](int d) { return 0 == n % d; })
+ | count;
+};
+
+static auto factorsSlow = [](int n) {  // like 'factors' but scans all of 'primes' (no sqrt cut-off)
+ return from(primes)
+ | filter([&](int d) { return 0 == n % d; })
+ | count;
+};
+
+// Simulates latency-bound work: sleeps for 100 microseconds, then returns its
+// argument unchanged.
+static auto sleepyWork = [](int i) {
+  std::this_thread::sleep_for(std::chrono::microseconds(100));
+  return i;
+};
+
+static auto sleepAndWork = [](int i) {  // combines CPU work (factorsSlow) with a short sleep (sleepyWork)
+ return factorsSlow(i) + sleepyWork(i);
+};
+
+std::mutex block;  // serializes the sleeping part of workAndBlock
+static auto workAndBlock = [](int i) {
+ int r = factorsSlow(i);  // CPU work is done outside the lock
+ {
+ std::lock_guard<std::mutex> lock(block);  // held for the duration of the sleep
+ return sleepyWork(i) + r;
+ }
+};
+
+auto start = 1 << 20;
+auto v = seq(start) | take(1 << 20) | as<vector>();
+auto small = from(v) | take(1 << 12);
+auto medium = from(v) | take(1 << 14);
+auto large = from(v) | take(1 << 18);
+auto huge = from(v);
+auto chunks = chunked(v);
+
+BENCH_GEN(small | map(factorsSlow) | sum);
+BENCH_GEN_REL(small | parallel(map(factorsSlow)) | sum);
+BENCHMARK_DRAW_LINE();
+
+BENCH_GEN(small | map(factors) | sum);
+BENCH_GEN_REL(small | parallel(map(factors)) | sum);
+BENCHMARK_DRAW_LINE();
+
+BENCH_GEN(large | map(factors) | sum);
+BENCH_GEN_REL(large | parallel(map(factors)) | sum);
+BENCHMARK_DRAW_LINE();
+
+auto ch = chunks;
+auto cat = concat;
+BENCH_GEN(huge | filter(isPrime) | count);
+BENCH_GEN_REL(ch | cat | filter(isPrime) | count);
+BENCH_GEN_REL(ch | parallel(cat | filter(isPrime)) | count);
+BENCH_GEN_REL(ch | parallel(cat | filter(isPrime) | sub(count)) | sum);
+BENCHMARK_DRAW_LINE();
+
+BENCH_GEN(small | map(sleepAndWork) | sum);
+BENCH_GEN_REL(small | parallel(map(sleepAndWork)) | sum);
+BENCHMARK_DRAW_LINE();
+
+const int fibs = 1000;
+BENCH_GEN(seq(1, fibs) | map([](int) { return fib(kFib); }) | sum);
+BENCH_GEN_REL(seq(1, fibs) |
+ parallel(map([](int) { return fib(kFib); }) | sub(sum)) | sum);
+BENCH_GEN_REL([] {
+ auto threads = seq(1, int(FLAGS_threads))
+ | map([](int i) {
+ return std::thread([=] {
+ return range((i + 0) * fibs / FLAGS_threads,
+ (i + 1) * fibs / FLAGS_threads) |
+ map([](int) { return fib(kFib); }) | sum;
+ });
+ })
+ | as<vector>();
+ from(threads) | [](std::thread &thread) { thread.join(); };
+ return 1;
+}());
+BENCHMARK_DRAW_LINE();
+
+#if 0
+============================================================================
+folly/gen/test/ParallelBenchmark.cpp relative time/iter iters/s
+============================================================================
+small | map(factorsSlow) | sum 4.59s 217.87m
+small | parallel(map(factorsSlow)) | sum 1588.86% 288.88ms 3.46
+----------------------------------------------------------------------------
+small | map(factors) | sum 9.62ms 103.94
+small | parallel(map(factors)) | sum 89.15% 10.79ms 92.66
+----------------------------------------------------------------------------
+large | map(factors) | sum 650.52ms 1.54
+large | parallel(map(factors)) | sum 53.82% 1.21s 827.41m
+----------------------------------------------------------------------------
+huge | filter(isPrime) | count 295.93ms 3.38
+ch | cat | filter(isPrime) | count 99.76% 296.64ms 3.37
+ch | parallel(cat | filter(isPrime)) | count 142.75% 207.31ms 4.82
+ch | parallel(cat | filter(isPrime) | sub(count 1538.50% 19.24ms 51.99
+----------------------------------------------------------------------------
+small | map(sleepAndWork) | sum 5.37s 186.18m
+small | parallel(map(sleepAndWork)) | sum 1840.38% 291.85ms 3.43
+----------------------------------------------------------------------------
+seq(1, fibs) | map([](int) { return fib(kFib); 1.49s 669.53m
+seq(1, fibs) | parallel(map([](int) { return fi 1698.07% 87.96ms 11.37
+[] { auto threads = seq(1, int(FLAGS_threads)) 1571.16% 95.06ms 10.52
+----------------------------------------------------------------------------
+============================================================================
+#endif
+int main(int argc, char *argv[]) {  // benchmark entry point
+ google::ParseCommandLineFlags(&argc, &argv, true);  // parses flags such as --threads
+ folly::runBenchmarks();  // runs everything registered through BENCH_GEN / BENCH_GEN_REL
+ return 0;
+}
--- /dev/null
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+#include <iostream>
+#include <array>
+#include <vector>
+#include "folly/gen/Base.h"
+#include "folly/gen/Parallel.h"
+
+using namespace folly;
+using namespace folly::gen;
+using std::vector;
+
+const auto square = [](int i) { return i * i; };  // i squared
+const auto even = [](int i) { return 0 == i % 2; };  // true for even i
+static auto sleepyWork = [](int i) {  // sleeps briefly, then returns i unchanged
+ const auto sleepyTime = std::chrono::microseconds(100);
+ std::this_thread::sleep_for(sleepyTime);
+ return i;
+};
+
+// Primality test by trial division with odd divisors up to sqrt(n).
+// Even numbers are settled up front: the divisor loop only tries odd values,
+// so without this check every even n > 2 would be reported as prime.
+static auto isPrime = [](int n) {
+  if (n < 2) {
+    return false;
+  }
+  if (n % 2 == 0) {
+    return n == 2;
+  }
+  // 'd <= n / d' is 'd * d <= n' without the risk of overflowing int.
+  for (int d = 3; d <= n / d; d += 2) {
+    if (0 == n % d) {
+      return false;
+    }
+  }
+  return true;
+};
+
+struct {  // function object: moves its argument into a new heap object owned by a unique_ptr
+ template<class T>
+ std::unique_ptr<T> operator()(T t) const {
+ return std::unique_ptr<T>(new T(std::move(t)));
+ }
+} makeUnique;
+
+static auto primes = seq(1, 1 << 14)
+ | filter(isPrime)
+ | as<vector<size_t>>();
+
+static auto primeFactors = [](int n) {  // counts the entries of 'primes' that divide n
+ return from(primes)
+ | filter([&](int d) { return 0 == n % d; })
+ | count;
+};
+
+TEST(ParallelTest, Serial) {  // parallel(ops) must give the same sum as applying ops serially
+ EXPECT_EQ(
+ seq(1,10) | map(square) | filter(even) | sum,
+ seq(1,10) | parallel(map(square) | filter(even)) | sum);
+}
+
+auto heavyWork = map(primeFactors);
+
+TEST(ParallelTest, ComputeBound64) {  // same comparison with size_t values and CPU-heavy work
+ int length = 1 << 10;
+ EXPECT_EQ(seq<size_t>(1, length) | heavyWork | sum,
+ seq<size_t>(1, length) | parallel(heavyWork) | sum);
+}
+
+TEST(ParallelTest, Take) {  // stopping early downstream of parallel() must still yield exactly 'limit' values
+ int length = 1 << 18;
+ int limit = 1 << 14;
+ EXPECT_EQ(seq(1, length) | take(limit) | count,
+ seq(1, length) | parallel(heavyWork) | take(limit) | count);
+}
+
+
+TEST(ParallelTest, Unique) {  // move-only values (unique_ptr) through two chained parallel stages
+ auto uniqued = from(primes) | map(makeUnique) | as<vector>();  // NOTE(review): not referenced again in this test
+ EXPECT_EQ(primes.size(),
+ from(primes) | parallel(map(makeUnique)) |
+ parallel(dereference | map(makeUnique)) | dereference | count);
+ EXPECT_EQ(2,  // early termination through take(2)
+ from(primes) | parallel(map(makeUnique)) |
+ parallel(dereference | map(makeUnique)) | dereference |
+ take(2) | count);
+}
+
+TEST(ParallelTest, PSum) {  // per-thread partial sums via sub(sum) must add up to the serial sum
+ EXPECT_EQ(from(primes) | map(sleepyWork) | sum,
+ from(primes) | parallel(map(sleepyWork) | sub(sum)) | sum);
+}
+
+int main(int argc, char *argv[]) {  // test entry point
+ testing::InitGoogleTest(&argc, argv);  // gtest sees the arguments first
+ google::ParseCommandLineFlags(&argc, &argv, true);  // then the flags library parses them
+ return RUN_ALL_TESTS();
+}