parallel(pipeline)
authorTom Jackson <tjackson@fb.com>
Sat, 5 Apr 2014 00:58:07 +0000 (17:58 -0700)
committerSara Golemon <sgolemon@fb.com>
Fri, 18 Apr 2014 19:04:15 +0000 (12:04 -0700)
Summary:
Adding `... | parallel(my | pipe | line) | ...` for parallelizing a portion of a generator pipeline.

```lang=cpp
auto factored = from(values)
| parallel(filter(isEven) | map(square) | sub(count))
| sum;
```

Work is divided evenly among a fixed number of threads using a `MPMCQueue`.

Test Plan: Unit tests and benchmarks testing for a variety of workloads and performance characteristics, including sub-linear (blocking) workloads, linear (mostly math) workloads, and superlinear (sleeping) workloads to simulate real-world use.

Reviewed By: lucian@fb.com

FB internal diff: D638551

folly/gen/Base-inl.h
folly/gen/Base.h
folly/gen/Core-inl.h
folly/gen/Parallel-inl.h [new file with mode: 0644]
folly/gen/Parallel.h [new file with mode: 0644]
folly/gen/test/Bench.h [new file with mode: 0644]
folly/gen/test/ParallelBenchmark.cpp [new file with mode: 0644]
folly/gen/test/ParallelTest.cpp [new file with mode: 0644]

index d627ed551a6fbc45bef958f3c93fad08b9fa2d4c..1c2da985d2ee06b155fa26bb8a965ac77becb2df 100644 (file)
@@ -142,6 +142,45 @@ public:
   }
 };
 
+/**
+ * RangeSource - For producing values from a folly::Range. Useful for referring
+ * to a slice of some container.
+ *
+ * This type is primarily used through the 'from' function, like:
+ *
+ *   auto rangeSource = from(folly::range(v.begin(), v.end()));
+ *   auto sum = rangeSource | sum;
+ *
+ * Reminder: Be careful not to invalidate iterators when using ranges like this.
+ */
+template<class Iterator>
+class RangeSource : public GenImpl<typename Range<Iterator>::reference,
+                                   RangeSource<Iterator>> {
+  Range<Iterator> range_;
+ public:
+  RangeSource() {}
+  explicit RangeSource(Range<Iterator> range)
+    : range_(std::move(range))
+  {}
+
+  template<class Handler>
+  bool apply(Handler&& handler) const {
+    for (auto& value : range_) {
+      if (!handler(value)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  template<class Body>
+  void foreach(Body&& body) const {
+    for (auto& value : range_) {
+      body(value);
+    }
+  }
+};
+
 /**
  * Sequence - For generating values from beginning value, incremented along the
  * way with the ++ and += operators. Iteration may continue indefinitely by
@@ -256,8 +295,32 @@ class Yield : public GenImpl<Value, Yield<Value, Source>> {
 template<class Value>
 class Empty : public GenImpl<Value, Empty<Value>> {
  public:
-  template<class Handler>
-  bool apply(Handler&&) const { return true; }
+  template <class Handler>
+  bool apply(Handler&&) const {
+    return true;
+  }
+
+  template <class Body>
+  void foreach(Body&&) const {}
+};
+
+template<class Value>
+class Just : public GenImpl<const Value&, Just<Value>> {
+  static_assert(!std::is_reference<Value>::value,
+                "Just requires non-ref types");
+  const Value value_;
+ public:
+  Just(Value value) : value_(std::forward<Value>(value)) {}
+
+  template <class Handler>
+  bool apply(Handler&& handler) const {
+    return handler(value_);
+  }
+
+  template <class Body>
+  void foreach(Body&& body) const {
+    body(value_);
+  }
 };
 
 /*
@@ -879,6 +942,25 @@ class Distinct : public Operator<Distinct<Selector>> {
   }
 };
 
+/**
+ * Composer - Helper class for adapting pipelines into functors. Primarily used
+ * for 'mapOp'.
+ */
+template<class Operators>
+class Composer {
+  Operators op_;
+ public:
+  explicit Composer(Operators op)
+    : op_(std::move(op)) {}
+
+  template<class Source,
+           class Ret = decltype(std::declval<Operators>()
+                                  .compose(std::declval<Source>()))>
+  Ret operator()(Source&& source) const {
+    return op_.compose(std::forward<Source>(source));
+  }
+};
+
 /**
  * Batch - For producing fixed-size batches of each value from a source.
  *
index 3b6cb7cac22470b1fbdc4fcfa71981b5d20bea4b..06cc0e3c89e1f786e9987a67b17e8f1b6d25b3a7 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2013 Facebook, Inc.
+ * Copyright 2014 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -13,6 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 #ifndef FOLLY_GEN_BASE_H
 #define FOLLY_GEN_BASE_H
 
@@ -264,6 +265,8 @@ class Yield;
 template<class Value>
 class Empty;
 
+template<class Value>
+class Just;
 
 /*
  * Operators
@@ -290,6 +293,9 @@ class Order;
 template<class Selector>
 class Distinct;
 
+template<class Operators>
+class Composer;
+
 template<class Expected>
 class TypeAssertion;
 
@@ -431,6 +437,11 @@ detail::Empty<Value> empty() {
   return {};
 }
 
+template<class Value>
+detail::Just<Value> just(Value value) {
+  return detail::Just<Value>(std::move(value));
+}
+
 /*
  * Operator Factories
  */
@@ -446,6 +457,21 @@ Map map(Predicate pred = Predicate()) {
   return Map(std::move(pred));
 }
 
+/**
+ * mapOp - Given a generator of generators, maps the application of the given
+ * operator on to each inner gen. Especially useful in aggregating nested data
+ * structures:
+ *
+ *   chunked(samples, 256)
+ *     | mapOp(filter(sampleTest) | count)
+ *     | sum;
+ */
+template<class Operator,
+         class Map = detail::Map<detail::Composer<Operator>>>
+Map mapOp(Operator op) {
+  return Map(detail::Composer<Operator>(std::move(op)));
+}
+
 /*
  * member(...) - For extracting a member from each value.
  *
index 9808487f8565069c571c78e3524553aeb8bce314..973928a601225abe2424e94a4441d412ade3f76f 100644 (file)
@@ -90,8 +90,10 @@ class Operator : public FBounded<Self> {
 
  protected:
   Operator() = default;
-  Operator(const Operator&) = default;
   Operator(Operator&&) = default;
+  Operator(const Operator&) = default;
+  Operator& operator=(Operator&&) = default;
+  Operator& operator=(const Operator&) = default;
 };
 
 /**
@@ -142,8 +144,10 @@ class GenImpl : public FBounded<Self> {
  protected:
   // To prevent slicing
   GenImpl() = default;
-  GenImpl(const GenImpl&) = default;
   GenImpl(GenImpl&&) = default;
+  GenImpl(const GenImpl&) = default;
+  GenImpl& operator=(GenImpl&&) = default;
+  GenImpl& operator=(const GenImpl&) = default;
 
  public:
   typedef Value ValueType;
diff --git a/folly/gen/Parallel-inl.h b/folly/gen/Parallel-inl.h
new file mode 100644 (file)
index 0000000..b4866a1
--- /dev/null
@@ -0,0 +1,410 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FOLLY_GEN_PARALLEL_H_
+#error This file may only be included from folly/gen/ParallelGen.h
+#endif
+
+#include "folly/MPMCQueue.h"
+#include "folly/ScopeGuard.h"
+#include "folly/experimental/EventCount.h"
+#include <atomic>
+#include <thread>
+#include <vector>
+
+namespace folly {
+namespace gen {
+namespace detail {
+
+template <typename T>
+class ClosableMPMCQueue {
+  MPMCQueue<T> queue_;
+  std::atomic<size_t> producers_{0};
+  std::atomic<size_t> consumers_{0};
+  folly::EventCount wakeProducer_;
+  folly::EventCount wakeConsumer_;
+
+ public:
+  explicit ClosableMPMCQueue(size_t capacity) : queue_(capacity) {}
+
+  ~ClosableMPMCQueue() {
+    CHECK(!producers());
+    CHECK(!consumers());
+  }
+
+  void openProducer() { ++producers_; }
+  void openConsumer() { ++consumers_; }
+
+  void closeInputProducer() {
+    int64_t producers = producers_--;
+    CHECK(producers);
+    if (producers == 1) { // last producer
+      wakeConsumer_.notifyAll();
+    }
+  }
+
+  void closeOutputConsumer() {
+    int64_t consumers = consumers_--;
+    CHECK(consumers);
+    if (consumers == 1) { // last consumer
+      wakeProducer_.notifyAll();
+    }
+  }
+
+  size_t producers() const {
+    return producers_.load(std::memory_order_acquire);
+  }
+
+  size_t consumers() const {
+    return consumers_.load(std::memory_order_acquire);
+  }
+
+  template <typename... Args>
+  bool writeUnlessFull(Args&&... args) noexcept {
+    if (queue_.write(std::forward<Args>(args)...)) {
+      // wake consumers to pick up new value
+      wakeConsumer_.notify();
+      return true;
+    }
+    return false;
+  }
+
+  template <typename... Args>
+  bool writeUnlessClosed(Args&&... args) {
+    // write if there's room
+    while (!queue_.writeIfNotFull(std::forward<Args>(args)...)) {
+      // if write fails, check if there are still consumers listening
+      auto key = wakeProducer_.prepareWait();
+      if (!consumers()) {
+        // no consumers left; bail out
+        wakeProducer_.cancelWait();
+        return false;
+      }
+      wakeProducer_.wait(key);
+    }
+    // wake consumers to pick up new value
+    wakeConsumer_.notify();
+    return true;
+  }
+
+  bool readUnlessEmpty(T& out) {
+    if (queue_.read(out)) {
+      // wake producers to fill empty space
+      wakeProducer_.notify();
+      return true;
+    }
+    return false;
+  }
+
+  bool readUnlessClosed(T& out) {
+    while (!queue_.readIfNotEmpty(out)) {
+      auto key = wakeConsumer_.prepareWait();
+      if (!producers()) {
+        // wake producers to fill empty space
+        wakeProducer_.notify();
+        return false;
+      }
+      wakeConsumer_.wait(key);
+    }
+    // wake writers blocked by full queue
+    wakeProducer_.notify();
+    return true;
+  }
+};
+
+template <class Sink>
+class Sub : public Operator<Sub<Sink>> {
+  Sink sink_;
+
+ public:
+  explicit Sub(Sink sink) : sink_(sink) {}
+
+  template <class Value,
+            class Source,
+            class Result =
+                decltype(std::declval<Sink>().compose(std::declval<Source>())),
+            class Just = Just<typename std::decay<Result>::type>>
+  Just compose(const GenImpl<Value, Source>& source) const {
+    return Just(source | sink_);
+  }
+};
+
+template <class Ops>
+class Parallel : public Operator<Parallel<Ops>> {
+  Ops ops_;
+  size_t threads_;
+
+ public:
+  Parallel(Ops ops, size_t threads) : ops_(std::move(ops)), threads_(threads) {}
+
+  template <class Input,
+            class Source,
+            class InputDecayed = typename std::decay<Input>::type,
+            class Composed =
+                decltype(std::declval<Ops>().compose(Empty<InputDecayed&&>())),
+            class Output = typename Composed::ValueType,
+            class OutputDecayed = typename std::decay<Output>::type>
+  class Generator : public GenImpl<OutputDecayed&&,
+                                   Generator<Input,
+                                             Source,
+                                             InputDecayed,
+                                             Composed,
+                                             Output,
+                                             OutputDecayed>> {
+    const Source source_;
+    const Ops ops_;
+    const size_t threads_;
+    typedef ClosableMPMCQueue<InputDecayed> InQueue;
+    typedef ClosableMPMCQueue<OutputDecayed> OutQueue;
+
+    class Puller : public GenImpl<InputDecayed&&, Puller> {
+      InQueue* queue_;
+
+     public:
+      explicit Puller(InQueue* queue) : queue_(queue) {}
+
+      template <class Handler>
+      bool apply(Handler&& handler) const {
+        InputDecayed input;
+        while (queue_->readUnlessClosed(input)) {
+          if (!handler(std::move(input))) {
+            return false;
+          }
+        }
+        return true;
+      }
+
+      template <class Body>
+      void foreach(Body&& body) const {
+        InputDecayed input;
+        while (queue_->readUnlessClosed(input)) {
+          body(std::move(input));
+        }
+      }
+    };
+
+    template <bool all = false>
+    class Pusher : public Operator<Pusher<all>> {
+      OutQueue* queue_;
+
+     public:
+      explicit Pusher(OutQueue* queue) : queue_(queue) {}
+
+      template <class Value, class InnerSource>
+      void compose(const GenImpl<Value, InnerSource>& source) const {
+        if (all) {
+          source.self().foreach([&](Value value) {
+            queue_->writeUnlessClosed(std::forward<Value>(value));
+          });
+        } else {
+          source.self().apply([&](Value value) {
+            return queue_->writeUnlessClosed(std::forward<Value>(value));
+          });
+        }
+      }
+    };
+
+    template <bool all = false>
+    class Executor {
+      InQueue inQueue_;
+      OutQueue outQueue_;
+      Puller puller_;
+      Pusher<all> pusher_;
+      std::vector<std::thread> workers_;
+      const Ops* ops_;
+
+      void work() {
+        puller_ | *ops_ | pusher_;
+      };
+
+     public:
+      Executor(size_t threads, const Ops* ops)
+          : inQueue_(threads * 4),
+            outQueue_(threads * 4),
+            puller_(&inQueue_),
+            pusher_(&outQueue_),
+            ops_(ops) {
+        inQueue_.openProducer();
+        outQueue_.openConsumer();
+        for (int t = 0; t < threads; ++t) {
+          inQueue_.openConsumer();
+          outQueue_.openProducer();
+          workers_.emplace_back([this] {
+            SCOPE_EXIT {
+              inQueue_.closeOutputConsumer();
+              outQueue_.closeInputProducer();
+            };
+            this->work();
+          });
+        }
+      }
+
+      ~Executor() {
+        if (inQueue_.producers()) {
+          inQueue_.closeInputProducer();
+        }
+        if (outQueue_.consumers()) {
+          outQueue_.closeOutputConsumer();
+        }
+        while (!workers_.empty()) {
+          workers_.back().join();
+          workers_.pop_back();
+        }
+        CHECK(!inQueue_.consumers());
+        CHECK(!outQueue_.producers());
+      }
+
+      void closeInputProducer() { inQueue_.closeInputProducer(); }
+
+      void closeOutputConsumer() { outQueue_.closeOutputConsumer(); }
+
+      bool writeUnlessClosed(Input&& input) {
+        return inQueue_.writeUnlessClosed(std::forward<Input>(input));
+      }
+
+      bool writeUnlessFull(Input&& input) {
+        return inQueue_.writeUnlessFull(std::forward<Input>(input));
+      }
+
+      bool readUnlessClosed(OutputDecayed& output) {
+        return outQueue_.readUnlessClosed(output);
+      }
+
+      bool readUnlessEmpty(OutputDecayed& output) {
+        return outQueue_.readUnlessEmpty(output);
+      }
+    };
+
+   public:
+    Generator(Source source, Ops ops, size_t threads)
+        : source_(std::move(source)),
+          ops_(std::move(ops)),
+          threads_(threads
+                       ?: std::max<size_t>(1, sysconf(_SC_NPROCESSORS_CONF))) {}
+
+    template <class Handler>
+    bool apply(Handler&& handler) const {
+      Executor<false> executor(threads_, &ops_);
+      bool more = true;
+      source_.apply([&](Input input) {
+        if (executor.writeUnlessFull(std::forward<Input>(input))) {
+          return true;
+        }
+        OutputDecayed output;
+        while (executor.readUnlessEmpty(output)) {
+          if (!handler(std::move(output))) {
+            more = false;
+            return false;
+          }
+        }
+        if (!executor.writeUnlessClosed(std::forward<Input>(input))) {
+          return false;
+        }
+        return true;
+      });
+      executor.closeInputProducer();
+
+      if (more) {
+        OutputDecayed output;
+        while (executor.readUnlessClosed(output)) {
+          if (!handler(std::move(output))) {
+            more = false;
+            break;
+          }
+        }
+      }
+      executor.closeOutputConsumer();
+
+      return more;
+    }
+
+    template <class Body>
+    void foreach(Body&& body) const {
+      Executor<true> executor(threads_, &ops_);
+      source_.foreach([&](Input input) {
+        if (executor.writeUnlessFull(std::forward<Input>(input))) {
+          return;
+        }
+        OutputDecayed output;
+        while (executor.readUnlessEmpty(output)) {
+          body(std::move(output));
+        }
+        CHECK(executor.writeUnlessClosed(std::forward<Input>(input)));
+      });
+      executor.closeInputProducer();
+
+      OutputDecayed output;
+      while (executor.readUnlessClosed(output)) {
+        body(std::move(output));
+      }
+      executor.closeOutputConsumer();
+    }
+  };
+
+  template <class Value, class Source>
+  Generator<Value, Source> compose(const GenImpl<Value, Source>& source) const {
+    return Generator<Value, Source>(source.self(), ops_, threads_);
+  }
+
+  template <class Value, class Source>
+  Generator<Value, Source> compose(GenImpl<Value, Source>&& source) const {
+    return Generator<Value, Source>(std::move(source.self()), ops_, threads_);
+  }
+};
+
+/**
+ * ChunkedRangeSource - For slicing up ranges into a sequence of chunks given a
+ * maximum chunk size.
+ *
+ * Usually used through the 'chunked' helper, like:
+ *
+ *   int n
+ *     = chunked(values)
+ *     | parallel  // each thread processes a chunk
+ *     | concat   // but can still process values one at a time
+ *     | filter(isPrime)
+ *     | atomic_count;
+ */
+template <class Iterator>
+class ChunkedRangeSource
+    : public GenImpl<RangeSource<Iterator>&&, ChunkedRangeSource<Iterator>> {
+  int chunkSize_;
+  Range<Iterator> range_;
+
+ public:
+  ChunkedRangeSource() {}
+  ChunkedRangeSource(int chunkSize, Range<Iterator> range)
+      : chunkSize_(chunkSize), range_(std::move(range)) {}
+
+  template <class Handler>
+  bool apply(Handler&& handler) const {
+    auto remaining = range_;
+    while (!remaining.empty()) {
+      auto chunk = remaining.subpiece(0, chunkSize_);
+      remaining.advance(chunk.size());
+      auto gen = RangeSource<Iterator>(chunk);
+      if (!handler(std::move(gen))) {
+        return false;
+      }
+    }
+    return true;
+  }
+};
+
+} // namespace detail
+
+} // namespace gen
+} // namespace folly
diff --git a/folly/gen/Parallel.h b/folly/gen/Parallel.h
new file mode 100644 (file)
index 0000000..d1b9979
--- /dev/null
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FOLLY_GEN_PARALLEL_H_
+#define FOLLY_GEN_PARALLEL_H_
+
+#include <mutex>
+
+#include "folly/gen/Base.h"
+
+namespace folly { namespace gen {
+namespace detail {
+
+template <class Ops>
+class Parallel;
+
+template <class Sink>
+class Sub;
+
+template <class Iterator>
+class ChunkedRangeSource;
+
+}
+
+/**
+ * chunked() - For producing values from a container in slices.
+ *
+ * Especially for use with 'parallel()', chunked can be used to process values
+ * from a persistent container in chunks larger than one value at a time. The
+ * values produced are generators for slices of the input container. */
+template <class Container,
+          class Iterator = typename Container::const_iterator,
+          class Chunked = detail::ChunkedRangeSource<Iterator>>
+Chunked chunked(const Container& container, int chunkSize = 256) {
+  return Chunked(chunkSize, folly::range(container.begin(), container.end()));
+}
+
+template <class Container,
+          class Iterator = typename Container::iterator,
+          class Chunked = detail::ChunkedRangeSource<Iterator>>
+Chunked chunked(Container& container, int chunkSize = 256) {
+  return Chunked(chunkSize, folly::range(container.begin(), container.end()));
+}
+
+
+/**
+ * parallel - A parallelization operator.
+ *
+ * 'parallel(ops)' can be used with any generator to process a segment
+ * of the pipeline in parallel. Multiple threads are used to apply the
+ * operations ('ops') to the input sequence, with the resulting sequence
+ * interleaved to be processed on the client thread.
+ *
+ *   auto scoredResults
+ *     = from(ids)
+ *     | parallel(map(fetchObj) | filter(isValid) | map(scoreObj))
+ *     | as<vector>();
+ *
+ * Operators specified for parallel execution must yield sequences, not just
+ * individual values. If a sink function such as 'count' is desired, it must be
+ * wrapped in 'sub' to produce a subcount, since any such aggregation must be
+ * re-aggregated.
+ *
+ *   auto matches
+ *     = from(docs)
+ *     | parallel(filter(expensiveTest) | sub(count))
+ *     | sum;
+ *
+ * Here, each thread counts its portion of the result, then the sub-counts are
+ * summed up to produce the total count.
+ */
+template <class Ops, class Parallel = detail::Parallel<Ops>>
+Parallel parallel(Ops ops, size_t threads = 0) {
+  return Parallel(std::move(ops), threads);
+}
+
+/**
+ * sub - For sub-summarization of a sequence.
+ *
+ * 'sub' can be used to apply a sink function to a generator, but wrap the
+ * single value in another generator. Note that the sink is eagerly evaluated on
+ * the input sequence.
+ *
+ *   auto sum = from(list) | sub(count) | first;
+ *
+ * This is primarily used with 'parallel', as noted above.
+ */
+template <class Sink, class Sub = detail::Sub<Sink>>
+Sub sub(Sink sink) {
+  return Sub(std::move(sink));
+}
+
+}} // !namespace folly::gen
+
+#include "folly/gen/Parallel-inl.h"
+
+#endif /* FOLLY_GEN_PARALLEL_H_ */
diff --git a/folly/gen/test/Bench.h b/folly/gen/test/Bench.h
new file mode 100644 (file)
index 0000000..be91357
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FOLLY_GEN_BENCH_H_
+#define FOLLY_GEN_BENCH_H_
+
+#include "folly/Benchmark.h"
+
+#define BENCH_GEN_IMPL(gen, prefix)                         \
+static bool FB_ANONYMOUS_VARIABLE(benchGen) = (             \
+  ::folly::addBenchmark(__FILE__, prefix FB_STRINGIZE(gen), \
+    [](unsigned iters){                                     \
+      while (iters--) {                                     \
+        folly::doNotOptimizeAway(gen);                      \
+      }                                                     \
+    }), true)
+#define BENCH_GEN(gen) BENCH_GEN_IMPL(gen, "")
+#define BENCH_GEN_REL(gen) BENCH_GEN_IMPL(gen, "%")
+
+#endif
diff --git a/folly/gen/test/ParallelBenchmark.cpp b/folly/gen/test/ParallelBenchmark.cpp
new file mode 100644 (file)
index 0000000..afebbc8
--- /dev/null
@@ -0,0 +1,179 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <glog/logging.h>
+#include <iostream>
+#include <array>
+#include <vector>
+#include <future>
+
+#include "folly/gen/Base.h"
+#include "folly/gen/Parallel.h"
+#include "folly/gen/test/Bench.h"
+
+
+DEFINE_int32(threads,
+             std::max(1, (int32_t) sysconf(_SC_NPROCESSORS_CONF) / 2),
+             "Num threads.");
+
+using namespace folly::gen;
+using std::vector;
+
+
+constexpr int kFib = 28;  // unit of work
+size_t fib(int n) { return n <= 1 ? 1 : fib(n - 1) + fib(n - 2); }
+
+static auto add = [](int a, int b) { return a + b; };
+static auto mod7 = [](int i) { return i % 7; };
+
+static auto isPrimeSlow = [](int n) {
+  if (n < 2) {
+    return false;
+  } else if (n > 2) {
+    for (int d = 3; d * d <= n; d += 2) {
+      if (0 == n % d) {
+        return false;
+      }
+    }
+  }
+  return true;
+};
+
+static auto primes =
+    seq(1, 1 << 20) | filter(isPrimeSlow) | as<vector>();
+
+static auto isPrime = [](int n) {
+  return !(from(primes)
+         | until([&](int d) { return d * d > n; })
+         | filter([&](int d) { return 0 == n % d; })
+         | any);
+};
+
+static auto factors = [](int n) {
+  return from(primes)
+       | until([&](int d) { return d * d > n; })
+       | filter([&](int d) { return 0 == n % d; })
+       | count;
+};
+
+static auto factorsSlow = [](int n) {
+  return from(primes)
+       | filter([&](int d) { return 0 == n % d; })
+       | count;
+};
+
+static auto sleepyWork = [](int i) {
+  const auto sleepyTime = std::chrono::microseconds(100);
+  std::this_thread::sleep_for(sleepyTime);
+  return i;
+};
+
+static auto sleepAndWork = [](int i) {
+  return factorsSlow(i) + sleepyWork(i);
+};
+
+std::mutex block;
+static auto workAndBlock = [](int i) {
+  int r = factorsSlow(i);
+  {
+    std::lock_guard<std::mutex> lock(block);
+    return sleepyWork(i) + r;
+  }
+};
+
+auto start = 1 << 20;
+auto v = seq(start) | take(1 << 20) | as<vector>();
+auto small = from(v) | take(1 << 12);
+auto medium = from(v) | take(1 << 14);
+auto large = from(v) | take(1 << 18);
+auto huge = from(v);
+auto chunks = chunked(v);
+
+BENCH_GEN(small | map(factorsSlow) | sum);
+BENCH_GEN_REL(small | parallel(map(factorsSlow)) | sum);
+BENCHMARK_DRAW_LINE();
+
+BENCH_GEN(small | map(factors) | sum);
+BENCH_GEN_REL(small | parallel(map(factors)) | sum);
+BENCHMARK_DRAW_LINE();
+
+BENCH_GEN(large | map(factors) | sum);
+BENCH_GEN_REL(large | parallel(map(factors)) | sum);
+BENCHMARK_DRAW_LINE();
+
+auto ch = chunks;
+auto cat = concat;
+BENCH_GEN(huge | filter(isPrime) | count);
+BENCH_GEN_REL(ch | cat | filter(isPrime) | count);
+BENCH_GEN_REL(ch | parallel(cat | filter(isPrime)) | count);
+BENCH_GEN_REL(ch | parallel(cat | filter(isPrime) | sub(count)) | sum);
+BENCHMARK_DRAW_LINE();
+
+BENCH_GEN(small | map(sleepAndWork) | sum);
+BENCH_GEN_REL(small | parallel(map(sleepAndWork)) | sum);
+BENCHMARK_DRAW_LINE();
+
+const int fibs = 1000;
+BENCH_GEN(seq(1, fibs) | map([](int) { return fib(kFib); }) | sum);
+BENCH_GEN_REL(seq(1, fibs) |
+              parallel(map([](int) { return fib(kFib); }) | sub(sum)) | sum);
+BENCH_GEN_REL([] {
+  auto threads = seq(1, int(FLAGS_threads))
+               | map([](int i) {
+                   return std::thread([=] {
+                     return range((i + 0) * fibs / FLAGS_threads,
+                                  (i + 1) * fibs / FLAGS_threads) |
+                            map([](int) { return fib(kFib); }) | sum;
+                   });
+                 })
+               | as<vector>();
+  from(threads) | [](std::thread &thread) { thread.join(); };
+  return 1;
+}());
+BENCHMARK_DRAW_LINE();
+
+#if 0
+============================================================================
+folly/gen/test/ParallelBenchmark.cpp            relative  time/iter  iters/s
+============================================================================
+small | map(factorsSlow) | sum                                4.59s  217.87m
+small | parallel(map(factorsSlow)) | sum        1588.86%   288.88ms     3.46
+----------------------------------------------------------------------------
+small | map(factors) | sum                                   9.62ms   103.94
+small | parallel(map(factors)) | sum              89.15%    10.79ms    92.66
+----------------------------------------------------------------------------
+large | map(factors) | sum                                 650.52ms     1.54
+large | parallel(map(factors)) | sum              53.82%      1.21s  827.41m
+----------------------------------------------------------------------------
+huge | filter(isPrime) | count                             295.93ms     3.38
+ch | cat | filter(isPrime) | count                99.76%   296.64ms     3.37
+ch | parallel(cat | filter(isPrime)) | count     142.75%   207.31ms     4.82
+ch | parallel(cat | filter(isPrime) | sub(count 1538.50%    19.24ms    51.99
+----------------------------------------------------------------------------
+small | map(sleepAndWork) | sum                               5.37s  186.18m
+small | parallel(map(sleepAndWork)) | sum       1840.38%   291.85ms     3.43
+----------------------------------------------------------------------------
+seq(1, fibs) | map([](int) { return fib(kFib);                1.49s  669.53m
+seq(1, fibs) | parallel(map([](int) { return fi 1698.07%    87.96ms    11.37
+[] { auto threads = seq(1, int(FLAGS_threads))  1571.16%    95.06ms    10.52
+----------------------------------------------------------------------------
+============================================================================
+#endif
+int main(int argc, char *argv[]) {
+  google::ParseCommandLineFlags(&argc, &argv, true);
+  folly::runBenchmarks();
+  return 0;
+}
diff --git a/folly/gen/test/ParallelTest.cpp b/folly/gen/test/ParallelTest.cpp
new file mode 100644 (file)
index 0000000..7eabb31
--- /dev/null
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+#include <iostream>
+#include <array>
+#include <vector>
+#include "folly/gen/Base.h"
+#include "folly/gen/Parallel.h"
+
+using namespace folly;
+using namespace folly::gen;
+using std::vector;
+
+const auto square = [](int i) { return i * i; };
+const auto even = [](int i) { return 0 == i % 2; };
+static auto sleepyWork = [](int i) {
+  const auto sleepyTime = std::chrono::microseconds(100);
+  std::this_thread::sleep_for(sleepyTime);
+  return i;
+};
+
+static auto isPrime = [](int n) {
+  if (n < 2) {
+    return false;
+  } else if (n > 2) {
+    for (int d = 3; d * d <= n; d += 2) {
+      if (0 == n % d) {
+        return false;
+      }
+    }
+  }
+  return true;
+};
+
+struct {
+  template<class T>
+  std::unique_ptr<T> operator()(T t) const {
+    return std::unique_ptr<T>(new T(std::move(t)));
+  }
+} makeUnique;
+
+static auto primes = seq(1, 1 << 14)
+                   | filter(isPrime)
+                   | as<vector<size_t>>();
+
+static auto primeFactors = [](int n) {
+  return from(primes)
+       | filter([&](int d) { return 0 == n % d; })
+       | count;
+};
+
+TEST(ParallelTest, Serial) {
+  EXPECT_EQ(
+            seq(1,10) | map(square) | filter(even) | sum,
+            seq(1,10) | parallel(map(square) | filter(even)) | sum);
+}
+
+auto heavyWork = map(primeFactors);
+
+TEST(ParallelTest, ComputeBound64) {
+  int length = 1 << 10;
+  EXPECT_EQ(seq<size_t>(1, length) | heavyWork | sum,
+            seq<size_t>(1, length) | parallel(heavyWork) | sum);
+}
+
+TEST(ParallelTest, Take) {
+  int length = 1 << 18;
+  int limit = 1 << 14;
+  EXPECT_EQ(seq(1, length) | take(limit) | count,
+            seq(1, length) | parallel(heavyWork) | take(limit) | count);
+}
+
+
+TEST(ParallelTest, Unique) {
+  auto uniqued = from(primes) | map(makeUnique) | as<vector>();
+  EXPECT_EQ(primes.size(),
+            from(primes) | parallel(map(makeUnique)) |
+                parallel(dereference | map(makeUnique)) | dereference | count);
+  EXPECT_EQ(2,
+            from(primes) | parallel(map(makeUnique)) |
+                parallel(dereference | map(makeUnique)) | dereference |
+                take(2) | count);
+}
+
+TEST(ParallelTest, PSum) {
+  EXPECT_EQ(from(primes) | map(sleepyWork) | sum,
+            from(primes) | parallel(map(sleepyWork) | sub(sum)) | sum);
+}
+
+int main(int argc, char *argv[]) {
+  testing::InitGoogleTest(&argc, argv);
+  google::ParseCommandLineFlags(&argc, &argv, true);
+  return RUN_ALL_TESTS();
+}