folly: gen: pmap: parallel version of map
authorLucian Grijincu <lucian@fb.com>
Sun, 13 Apr 2014 06:10:11 +0000 (23:10 -0700)
committerSara Golemon <sgolemon@fb.com>
Fri, 18 Apr 2014 19:04:14 +0000 (12:04 -0700)
Summary:
same as map, but runs it's argument in parallel over a number of threads.

@override-unit-failures

Test Plan: added test

Reviewed By: tjackson@fb.com

FB internal diff: D1258364

folly/Optional.h
folly/gen/ParallelMap-inl.h [new file with mode: 0644]
folly/gen/ParallelMap.h [new file with mode: 0644]
folly/gen/test/ParallelMapBenchmark.cpp [new file with mode: 0644]
folly/gen/test/ParallelMapTest.cpp [new file with mode: 0644]

index f85fbf6b600a2e949d985292848cb824727d16e7..e877738db0a85323e881e52919f0ba2dc943d8bb 100644 (file)
@@ -112,15 +112,17 @@ class Optional {
     }
   }
 
-  /* implicit */ Optional(const None&)
+  /* implicit */ Optional(const None&) noexcept
     : hasValue_(false) {
   }
 
-  /* implicit */ Optional(Value&& newValue) {
+  /* implicit */ Optional(Value&& newValue)
+    noexcept(std::is_nothrow_move_constructible<Value>::value) {
     construct(std::move(newValue));
   }
 
-  /* implicit */ Optional(const Value& newValue) {
+  /* implicit */ Optional(const Value& newValue)
+    noexcept(std::is_nothrow_copy_constructible<Value>::value) {
     construct(newValue);
   }
 
diff --git a/folly/gen/ParallelMap-inl.h b/folly/gen/ParallelMap-inl.h
new file mode 100644 (file)
index 0000000..7cc0321
--- /dev/null
@@ -0,0 +1,254 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FOLLY_GEN_PARALLELMAP_H
+#error This file may only be included from folly/gen/ParallelMap.h
+#endif
+
+#include <atomic>
+#include <cassert>
+#include <thread>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+#include "folly/MPMCPipeline.h"
+#include "folly/experimental/EventCount.h"
+
+namespace folly { namespace gen { namespace detail {
+
+/**
+ * PMap - Map in parallel (using threads). For producing a sequence of
+ * values by passing each value from a source collection through a
+ * predicate while running the predicate in parallel in different
+ * threads.
+ *
+ * This type is usually used through the 'pmap' helper function:
+ *
+ *   auto squares = seq(1, 10) | pmap(4, fibonacci) | sum;
+ */
+template<class Predicate>
+class PMap : public Operator<PMap<Predicate>> {
+  Predicate pred_;
+  size_t nThreads_;
+ public:
+  PMap() {}
+
+  PMap(Predicate pred, size_t nThreads)
+    : pred_(std::move(pred)),
+      nThreads_(nThreads) { }
+
+  template<class Value,
+           class Source,
+           class Input = typename std::decay<Value>::type,
+           class Output = typename std::decay<
+             typename std::result_of<Predicate(Value)>::type
+             >::type>
+  class Generator :
+    public GenImpl<Output, Generator<Value, Source, Input, Output>> {
+    Source source_;
+    Predicate pred_;
+    const size_t nThreads_;
+
+    class ExecutionPipeline {
+      std::vector<std::thread> workers_;
+      std::atomic<bool> done_{false};
+      const Predicate& pred_;
+      MPMCPipeline<Input, Output> pipeline_;
+      EventCount wake_;
+
+     public:
+      ExecutionPipeline(const Predicate& pred, size_t nThreads)
+        : pred_(pred),
+          pipeline_(nThreads, nThreads) {
+        workers_.reserve(nThreads);
+        for (int i = 0; i < nThreads; i++) {
+          workers_.push_back(std::thread([this] { this->predApplier(); }));
+        }
+      }
+
+      ~ExecutionPipeline() {
+        assert(pipeline_.sizeGuess() == 0);
+        assert(done_.load());
+        for (auto& w : workers_) { w.join(); }
+      }
+
+      void stop() {
+        // prevent workers from consuming more than we produce.
+        done_.store(true, std::memory_order_release);
+        wake_.notifyAll();
+      }
+
+      bool write(Value&& value) {
+        bool wrote = pipeline_.write(std::forward<Value>(value));
+        if (wrote) {
+          wake_.notify();
+        }
+        return wrote;
+      }
+
+      void blockingWrite(Value&& value) {
+        pipeline_.blockingWrite(std::forward<Value>(value));
+        wake_.notify();
+      }
+
+      bool read(Output& out) {
+        return pipeline_.read(out);
+      }
+
+      void blockingRead(Output& out) {
+        pipeline_.blockingRead(out);
+      }
+
+     private:
+      void predApplier() {
+        // Each thread takes a value from the pipeline_, runs the
+        // predicate and enqueues the result. The pipeline preserves
+        // ordering. NOTE: don't use blockingReadStage<0> to read from
+        // the pipeline_ as there may not be any: end-of-data is signaled
+        // separately using done_/wake_.
+        Input in;
+        for (;;) {
+          auto key = wake_.prepareWait();
+
+          typename MPMCPipeline<Input, Output>::template Ticket<0> ticket;
+          if (pipeline_.template readStage<0>(ticket, in)) {
+            wake_.cancelWait();
+            Output out = pred_(std::move(in));
+            pipeline_.template blockingWriteStage<0>(ticket,
+                                                     std::move(out));
+            continue;
+          }
+
+          if (done_.load(std::memory_order_acquire)) {
+            wake_.cancelWait();
+            break;
+          }
+
+          // Not done_, but no items in the queue.
+          wake_.wait(key);
+        }
+      }
+    };
+
+  public:
+    Generator(Source source, const Predicate& pred, size_t nThreads)
+      : source_(std::move(source)),
+        pred_(pred),
+        nThreads_(nThreads ?: sysconf(_SC_NPROCESSORS_ONLN)) {
+    }
+
+    template<class Body>
+    void foreach(Body&& body) const {
+      ExecutionPipeline pipeline(pred_, nThreads_);
+
+      size_t wrote = 0;
+      size_t read = 0;
+      source_.foreach([&](Value value) {
+        if (pipeline.write(std::forward<Value>(value))) {
+          // input queue not yet full, saturate it before we process
+          // anything downstream
+          ++wrote;
+          return;
+        }
+
+        // input queue full; drain ready items from the queue
+        Output out;
+        while (pipeline.read(out)) {
+          ++read;
+          body(std::move(out));
+        }
+
+        // write the value we were going to write before we made room.
+        pipeline.blockingWrite(std::forward<Value>(value));
+        ++wrote;
+      });
+
+      pipeline.stop();
+
+      // flush the output queue
+      while (read < wrote) {
+        Output out;
+        pipeline.blockingRead(out);
+        ++read;
+        body(std::move(out));
+      }
+    }
+
+    template<class Handler>
+    bool apply(Handler&& handler) const {
+      ExecutionPipeline pipeline(pred_, nThreads_);
+
+      size_t wrote = 0;
+      size_t read = 0;
+      bool more = true;
+      source_.apply([&](Value value) {
+        if (pipeline.write(std::forward<Value>(value))) {
+          // input queue not yet full, saturate it before we process
+          // anything downstream
+          ++wrote;
+          return true;
+        }
+
+        // input queue full; drain ready items from the queue
+        Output out;
+        while (pipeline.read(out)) {
+          ++read;
+          if (!handler(std::move(out))) {
+            more = false;
+            return false;
+          }
+        }
+
+        // write the value we were going to write before we made room.
+        pipeline.blockingWrite(std::forward<Value>(value));
+        ++wrote;
+        return true;
+      });
+
+      pipeline.stop();
+
+      // flush the output queue
+      while (read < wrote) {
+        Output out;
+        pipeline.blockingRead(out);
+        ++read;
+        if (more) {
+          more = more && handler(std::move(out));
+        }
+      }
+      return more;
+    }
+
+    static constexpr bool infinite = Source::infinite;
+  };
+
+  template<class Source,
+           class Value,
+           class Gen = Generator<Value, Source>>
+  Gen compose(GenImpl<Value, Source>&& source) const {
+    return Gen(std::move(source.self()), pred_, nThreads_);
+  }
+
+  template<class Source,
+           class Value,
+           class Gen = Generator<Value, Source>>
+  Gen compose(const GenImpl<Value, Source>& source) const {
+    return Gen(source.self(), pred_, nThreads_);
+  }
+};
+
+}}}  // namespaces
diff --git a/folly/gen/ParallelMap.h b/folly/gen/ParallelMap.h
new file mode 100644 (file)
index 0000000..196dfd0
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FOLLY_GEN_PARALLELMAP_H
+#define FOLLY_GEN_PARALLELMAP_H
+
+#include "folly/gen/Core.h"
+
+namespace folly { namespace gen {
+
+namespace detail {
+
+template<class Predicate>
+class PMap;
+
+}  // namespace detail
+
+/**
+ * Run `pred` in parallel in nThreads. Results are returned in the
+ * same order in which they were retrieved from the source generator
+ * (similar to map).
+ *
+ * NOTE: Only `pred` is run from separate threads; the source
+ *       generator and the rest of the pipeline is executed in the
+ *       caller thread.
+ */
+template<class Predicate,
+         class PMap = detail::PMap<Predicate>>
+  PMap pmap(Predicate pred = Predicate(), size_t nThreads = 0) {
+  return PMap(std::move(pred), nThreads);
+}
+
+}}  // namespaces
+
+#include "folly/gen/ParallelMap-inl.h"
+
+#endif  // FOLLY_GEN_PARALLELMAP_H
diff --git a/folly/gen/test/ParallelMapBenchmark.cpp b/folly/gen/test/ParallelMapBenchmark.cpp
new file mode 100644 (file)
index 0000000..1793b79
--- /dev/null
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <unistd.h>
+#include <atomic>
+#include <algorithm>
+#include <thread>
+#include <vector>
+
+#include "folly/Benchmark.h"
+#include "folly/gen/Base.h"
+#include "folly/gen/ParallelMap.h"
+
+using namespace folly::gen;
+
+DEFINE_int32(threads,
+             std::max(1, (int32_t) sysconf(_SC_NPROCESSORS_CONF) / 2),
+             "Num threads.");
+
+constexpr int kFib = 35;  // unit of work
+size_t fib(int n) { return n <= 1 ? 1 : fib(n-1) * fib(n-2); }
+
+BENCHMARK(FibSumMap, n) {
+  auto result =
+    seq(1, (int) n)
+    | map([](int) { return fib(kFib); })
+    | sum;
+  folly::doNotOptimizeAway(result);
+}
+
+BENCHMARK_RELATIVE(FibSumPmap, n) {
+  // Schedule more work: enough so that each worker thread does the
+  // same amount as one FibSumMap.
+  const size_t kNumThreads = FLAGS_threads;
+  auto result =
+    seq(1, (int) (n * kNumThreads))
+    | pmap([](int) { return fib(kFib); }, kNumThreads)
+    | sum;
+  folly::doNotOptimizeAway(result);
+}
+
+BENCHMARK_RELATIVE(FibSumThreads, n) {
+  // Schedule kNumThreads to execute the same code as FibSumMap.
+  const size_t kNumThreads = FLAGS_threads;
+  std::vector<std::thread> workers;
+  workers.reserve(kNumThreads);
+  auto fn = [n] {
+    auto result =
+      seq(1, (int) n)
+      | map([](int) { return fib(kFib); })
+      | sum;
+    folly::doNotOptimizeAway(result);
+  };
+  for (int i = 0; i < kNumThreads; i++) {
+    workers.push_back(std::thread(fn));
+  }
+  for (auto& w : workers) { w.join(); }
+}
+
+/*
+  ============================================================================
+  folly/gen/test/ParallelMapBenchmark.cpp         relative  time/iter  iters/s
+  ============================================================================
+  FibSumMap                                                   41.64ms    24.02
+  FibSumPmap                                        98.38%    42.32ms    23.63
+  FibSumThreads                                     94.48%    44.07ms    22.69
+  ============================================================================
+
+  real0m15.595s
+  user2m47.100s
+  sys0m0.016s
+*/
+
+int main(int argc, char *argv[]) {
+  google::ParseCommandLineFlags(&argc, &argv, true);
+  folly::runBenchmarks();
+  return 0;
+}
diff --git a/folly/gen/test/ParallelMapTest.cpp b/folly/gen/test/ParallelMapTest.cpp
new file mode 100644 (file)
index 0000000..7b29f6c
--- /dev/null
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "folly/Memory.h"
+#include "folly/gen/Base.h"
+#include "folly/gen/ParallelMap.h"
+
+using namespace folly;
+using namespace folly::gen;
+
+TEST(Pmap, InfiniteEquivalent) {
+  // apply
+  {
+    auto mapResult
+      = seq(1)
+      | map([](int x) { return x * x; })
+      | until([](int x) { return x > 1000 * 1000; })
+      | as<std::vector<int>>();
+
+    auto pmapResult
+      = seq(1)
+      | pmap([](int x) { return x * x; }, 4)
+      | until([](int x) { return x > 1000 * 1000; })
+      | as<std::vector<int>>();
+
+    EXPECT_EQ(pmapResult, mapResult);
+  }
+
+  // foreach
+  {
+    auto mapResult
+      = seq(1, 10)
+      | map([](int x) { return x * x; })
+      | as<std::vector<int>>();
+
+    auto pmapResult
+      = seq(1, 10)
+      | pmap([](int x) { return x * x; }, 4)
+      | as<std::vector<int>>();
+
+    EXPECT_EQ(pmapResult, mapResult);
+  }
+}
+
+TEST(Pmap, Empty) {
+  // apply
+  {
+    auto mapResult
+      = seq(1)
+      | map([](int x) { return x * x; })
+      | until([](int) { return true; })
+      | as<std::vector<int>>();
+
+    auto pmapResult
+      = seq(1)
+      | pmap([](int x) { return x * x; }, 4)
+      | until([](int) { return true; })
+      | as<std::vector<int>>();
+
+    EXPECT_EQ(mapResult.size(), 0);
+    EXPECT_EQ(pmapResult, mapResult);
+  }
+
+  // foreach
+  {
+    auto mapResult
+      = empty<int>()
+      | map([](int x) { return x * x; })
+      | as<std::vector<int>>();
+
+    auto pmapResult
+      = empty<int>()
+      | pmap([](int x) { return x * x; }, 4)
+      | as<std::vector<int>>();
+
+    EXPECT_EQ(mapResult.size(), 0);
+    EXPECT_EQ(pmapResult, mapResult);
+  }
+}
+
+TEST(Pmap, Rvalues) {
+  // apply
+  {
+    auto mapResult
+      = seq(1)
+      | map([](int x) { return make_unique<int>(x); })
+      | map([](std::unique_ptr<int> x) { return make_unique<int>(*x * *x); })
+      | map([](std::unique_ptr<int> x) { return *x; })
+      | take(1000)
+      | sum;
+
+    auto pmapResult
+      = seq(1)
+      | pmap([](int x) { return make_unique<int>(x); })
+      | pmap([](std::unique_ptr<int> x) { return make_unique<int>(*x * *x); })
+      | pmap([](std::unique_ptr<int> x) { return *x; })
+      | take(1000)
+      | sum;
+
+    EXPECT_EQ(pmapResult, mapResult);
+  }
+
+  // foreach
+  {
+    auto mapResult
+      = seq(1, 1000)
+      | map([](int x) { return make_unique<int>(x); })
+      | map([](std::unique_ptr<int> x) { return make_unique<int>(*x * *x); })
+      | map([](std::unique_ptr<int> x) { return *x; })
+      | sum;
+
+    auto pmapResult
+      = seq(1, 1000)
+      | pmap([](int x) { return make_unique<int>(x); })
+      | pmap([](std::unique_ptr<int> x) { return make_unique<int>(*x * *x); })
+      | pmap([](std::unique_ptr<int> x) { return *x; })
+      | sum;
+
+    EXPECT_EQ(pmapResult, mapResult);
+  }
+}
+
+int main(int argc, char *argv[]) {
+  testing::InitGoogleTest(&argc, argv);
+  google::ParseCommandLineFlags(&argc, &argv, true);
+  return RUN_ALL_TESTS();
+}