Multi-Producer, Multi-Consumer pipeline
author Tudor Bosman <tudorb@fb.com>
Fri, 19 Jul 2013 01:05:51 +0000 (18:05 -0700)
committer Sara Golemon <sgolemon@fb.com>
Wed, 28 Aug 2013 21:30:11 +0000 (14:30 -0700)
Summary:
A bunch of MPMCQueues linked together. Stage i produces exactly Ki (default 1)
outputs for each input. Ordering is preserved, even though stages might
produce (intermediate or final) results in parallel and in any order; we do
this by abusing the enqueueing mechanism in MPMCQueue. (Read the code for
details)

Test Plan: test added, more tests to be written before commit

Reviewed By: ngbronson@fb.com

FB internal diff: D892388

folly/MPMCPipeline.h [new file with mode: 0644]
folly/MPMCQueue.h
folly/detail/MPMCPipelineDetail.h [new file with mode: 0644]
folly/test/MPMCPipelineTest.cpp [new file with mode: 0644]

diff --git a/folly/MPMCPipeline.h b/folly/MPMCPipeline.h
new file mode 100644 (file)
index 0000000..3981e24
--- /dev/null
@@ -0,0 +1,285 @@
+/*
+ * Copyright 2013 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <utility>
+
+#include <glog/logging.h>
+
+#include "folly/detail/MPMCPipelineDetail.h"
+
+namespace folly {
+
+/**
+ * Helper tag template to use amplification > 1: write
+ * MPMCPipelineStage<T, Amp> instead of T in the MPMCPipeline stage list to
+ * declare a stage that produces Amp values of type T for each input.
+ */
+template <class T, size_t Amp> class MPMCPipelineStage;
+
+/**
+ * Multi-Producer, Multi-Consumer pipeline.
+ *
+ * An N-stage pipeline is a combination of N+1 MPMC queues (see MPMCQueue.h).
+ *
+ * At each stage, you may dequeue the results from the previous stage (possibly
+ * from multiple threads) and enqueue results to the next stage. Regardless of
+ * the order of completion, data is delivered to the next stage in the original
+ * order.  Each input is matched with a "ticket" which must be produced
+ * when enqueueing to the next stage.
+ *
+ * A given stage must produce exactly K ("amplification factor", default K=1)
+ * results for every input. This is enforced by requiring that each ticket
+ * is used exactly K times.
+ *
+ * Usage:
+ *
+ * // arguments are queue sizes
+ * MPMCPipeline<int, std::string, int> pipeline(10, 10, 10);
+ *
+ * pipeline.blockingWrite(42);
+ *
+ * {
+ *   int val;
+ *   auto ticket = pipeline.blockingReadStage<0>(val);
+ *   pipeline.blockingWriteStage<0>(ticket, folly::to<std::string>(val));
+ * }
+ *
+ * {
+ *   std::string val;
+ *   auto ticket = pipeline.blockingReadStage<1>(val);
+ *   int ival = 0;
+ *   try {
+ *     ival = folly::to<int>(val);
+ *   } catch (...) {
+ *     // We must produce exactly 1 output even on exception!
+ *   }
+ *   pipeline.blockingWriteStage<1>(ticket, ival);
+ * }
+ *
+ * int result;
+ * pipeline.blockingRead(result);
+ * // result == 42
+ *
+ * To specify amplification factors greater than 1, use
+ * MPMCPipelineStage<T, amplification> instead of T in the declaration:
+ *
+ * MPMCPipeline<int,
+ *              MPMCPipelineStage<std::string, 2>,
+ *              MPMCPipelineStage<int, 4>>
+ *
+ * declares a two-stage pipeline: the first stage produces 2 strings
+ * for each input int, the second stage produces 4 ints for each input string,
+ * so, overall, the pipeline produces 2*4 = 8 ints for each input int.
+ *
+ * Implementation details: we use N+1 MPMCQueue objects; each intermediate
+ * queue connects two adjacent stages.  The MPMCQueue implementation is abused;
+ * instead of using it as a queue, we insert in the output queue at the
+ * position determined by the input queue's popTicket_.  We guarantee that
+ * all slots are filled (and therefore the queue doesn't freeze) because
+ * we require that each step produces exactly K outputs for every input.
+ */
+template <class In, class... Stages> class MPMCPipeline {
+  typedef std::tuple<detail::PipelineStageInfo<Stages>...> StageInfos;
+  typedef std::tuple<
+             detail::MPMCPipelineStageImpl<In>,
+             detail::MPMCPipelineStageImpl<
+                 typename detail::PipelineStageInfo<Stages>::value_type>...>
+    StageTuple;
+  static constexpr size_t kAmplification =
+    detail::AmplificationProduct<StageInfos>::value;
+
+ public:
+  /**
+   * Ticket, returned by blockingReadStage, must be given back to
+   * blockingWriteStage. Tickets are not thread-safe.
+   */
+  template <size_t Stage>
+  class Ticket {
+   public:
+    ~Ticket() noexcept {
+      CHECK_EQ(remainingUses_, 0) << "All tickets must be completely used!";
+    }
+
+#ifndef NDEBUG
+    Ticket() noexcept
+      : owner_(nullptr),
+        remainingUses_(0),
+        value_(0xdeadbeeffaceb00c) {
+    }
+#else
+    Ticket() noexcept : remainingUses_(0) { }
+#endif
+
+    Ticket(Ticket&& other) noexcept
+      :
+#ifndef NDEBUG
+        owner_(other.owner_),
+#endif
+        remainingUses_(other.remainingUses_),
+        value_(other.value_) {
+      other.remainingUses_ = 0;
+#ifndef NDEBUG
+      other.owner_ = nullptr;
+      other.value_ = 0xdeadbeeffaceb00c;
+#endif
+    }
+
+    Ticket& operator=(Ticket&& other) noexcept {
+      if (this != &other) {
+        this->~Ticket();
+        new (this) Ticket(std::move(other));
+      }
+      return *this;
+    }
+
+   private:
+    friend class MPMCPipeline;
+#ifndef NDEBUG
+    MPMCPipeline* owner_;
+#endif
+    size_t remainingUses_;
+    uint64_t value_;
+
+
+    Ticket(MPMCPipeline* owner, size_t amplification, uint64_t value) noexcept
+      :
+#ifndef NDEBUG
+        owner_(owner),
+#endif
+        remainingUses_(amplification),
+        value_(value * amplification) {
+    }
+
+    uint64_t use(MPMCPipeline* owner) {
+      CHECK_GT(remainingUses_--, 0);
+#ifndef NDEBUG
+      CHECK(owner == owner_);
+#endif
+      return value_++;
+    }
+  };
+
+  /**
+   * Default-construct pipeline. Useful to move-assign later,
+   * just like MPMCQueue, see MPMCQueue.h for more details.
+   */
+  MPMCPipeline() { }
+
+  /**
+   * Construct a pipeline with N+1 queue sizes.
+   */
+  template <class... Sizes>
+  explicit MPMCPipeline(Sizes... sizes) : stages_(sizes...) { }
+
+  /**
+   * Push an element into (the first stage of) the pipeline. Blocking.
+   */
+  template <class... Args>
+  void blockingWrite(Args&&... args) {
+    std::get<0>(stages_).blockingWrite(std::forward<Args>(args)...);
+  }
+
+  /**
+   * Try to push an element into (the first stage of) the pipeline.
+   * Non-blocking.
+   */
+  template <class... Args>
+  bool write(Args&&... args) {
+    return std::get<0>(stages_).write(std::forward<Args>(args)...);
+  }
+
+  /**
+   * Read an element for stage Stage and obtain a ticket. Blocking.
+   */
+  template <size_t Stage>
+  Ticket<Stage> blockingReadStage(
+      typename std::tuple_element<Stage, StageTuple>::type::value_type& elem) {
+    return Ticket<Stage>(
+        this,
+        std::tuple_element<Stage, StageInfos>::type::kAmplification,
+        std::get<Stage>(stages_).blockingRead(elem));
+  }
+
+  /**
+   * Try to read an element for stage Stage and obtain a ticket.
+   * Non-blocking.
+   */
+  template <size_t Stage>
+  bool readStage(
+      Ticket<Stage>& ticket,
+      typename std::tuple_element<Stage, StageTuple>::type::value_type& elem) {
+    uint64_t tval;
+    if (!std::get<Stage>(stages_).readAndGetTicket(tval, elem)) {
+      return false;
+    }
+    ticket = Ticket<Stage>(
+        this,
+        std::tuple_element<Stage, StageInfos>::type::kAmplification,
+        tval);
+    return true;
+  }
+
+  /**
+   * Complete an element in stage Stage (pushing it for stage Stage+1).
+   * Blocking.
+   */
+  template <size_t Stage, class... Args>
+  void blockingWriteStage(Ticket<Stage>& ticket, Args&&... args) {
+    std::get<Stage+1>(stages_).blockingWriteWithTicket(
+        ticket.use(this),
+        std::forward<Args>(args)...);
+  }
+
+  /**
+   * Pop an element from (the final stage of) the pipeline. Blocking.
+   */
+  void blockingRead(
+      typename std::tuple_element<
+          sizeof...(Stages),
+          StageTuple>::type::value_type& elem) {
+    std::get<sizeof...(Stages)>(stages_).blockingRead(elem);
+  }
+
+  /**
+   * Try to pop an element from (the final stage of) the pipeline.
+   * Non-blocking.
+   */
+  bool read(
+      typename std::tuple_element<
+          sizeof...(Stages),
+          StageTuple>::type::value_type& elem) {
+    return std::get<sizeof...(Stages)>(stages_).read(elem);
+  }
+
+  /**
+   * Estimate queue size, measured as values from the last stage.
+   * (so if the pipeline has an amplification factor > 1, pushing an element
+   * into the first stage will cause sizeGuess() to be == amplification factor)
+   * Elements "in flight" (currently processed as part of a stage, so not
+   * in any queue) are also counted.
+   */
+  ssize_t sizeGuess() const noexcept {
+    return (std::get<0>(stages_).writeCount() * kAmplification -
+            std::get<sizeof...(Stages)>(stages_).readCount());
+  }
+
+ private:
+  StageTuple stages_;
+};
+
+
+}  // namespaces
+
index 58c1d3cef08f3af9e5af1126f2bb6001c226bd4e..b4cdee186810827c31dd5252cfa82e16fb8a7864 100644 (file)
@@ -37,6 +37,8 @@ namespace detail {
 template<typename T, template<typename> class Atom>
 class SingleElementQueue;
 
+template <typename T> class MPMCPipelineStageImpl;
+
 } // namespace detail
 
 /// MPMCQueue<T> is a high-performance bounded concurrent queue that
@@ -83,6 +85,7 @@ template<typename T,
              std::is_nothrow_constructible<T,T&&>::value ||
              folly::IsRelocatable<T>::value>::type>
 class MPMCQueue : boost::noncopyable {
+  friend class detail::MPMCPipelineStageImpl<T>;
  public:
   typedef T value_type;
 
diff --git a/folly/detail/MPMCPipelineDetail.h b/folly/detail/MPMCPipelineDetail.h
new file mode 100644 (file)
index 0000000..20f725a
--- /dev/null
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2013 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "folly/MPMCQueue.h"
+
+namespace folly {
+
+template <class T, class... Stages> class MPMCPipeline;
+
+// Stage descriptor used in an MPMCPipeline stage list: carries the stage's
+// output type and its amplification factor as compile-time members.
+template <class T, size_t Amp>
+class MPMCPipelineStage {
+ public:
+  static constexpr size_t kAmplification = Amp;
+  using value_type = T;
+};
+
+namespace detail {
+
+/**
+ * Traits giving the value type and amplification factor of a stage
+ * declaration, whether it is written as a plain type or wrapped in
+ * MPMCPipelineStage<>.
+ */
+template <class T>
+struct PipelineStageInfo {
+  // Plain type: the stage's value type is T itself, amplification is 1.
+  using value_type = T;
+  static constexpr size_t kAmplification = 1;
+};
+
+template <class T, size_t Amp>
+struct PipelineStageInfo<MPMCPipelineStage<T, Amp>> {
+  // Tagged stage: unwrap the value type and amplification from the tag.
+  using value_type = T;
+  static constexpr size_t kAmplification = Amp;
+};
+
+/**
+ * Wrapper around MPMCQueue (friend) that keeps track of tickets.
+ */
+template <class T>
+class MPMCPipelineStageImpl {
+ public:
+  typedef T value_type;
+  template <class U, class... Stages> friend class MPMCPipeline;
+
+  // Implicit so that MPMCPipeline construction works
+  /* implicit */ MPMCPipelineStageImpl(size_t capacity) : queue_(capacity) { }
+  MPMCPipelineStageImpl() { }
+
+  // only use on first stage, uses queue_.pushTicket_ instead of existing
+  // ticket
+  template <class... Args>
+  void blockingWrite(Args&&... args) noexcept {
+    queue_.blockingWrite(std::forward<Args>(args)...);
+  }
+
+  template <class... Args>
+  bool write(Args&&... args) noexcept {
+    return queue_.write(std::forward<Args>(args)...);
+  }
+
+  template <class... Args>
+  void blockingWriteWithTicket(uint64_t ticket, Args&&... args) noexcept {
+    queue_.enqueueWithTicket(ticket, std::forward<Args>(args)...);
+  }
+
+  uint64_t blockingRead(T& elem) noexcept {
+    uint64_t ticket = queue_.popTicket_++;
+    queue_.dequeueWithTicket(ticket, elem);
+    return ticket;
+  }
+
+  bool read(T& elem) noexcept {  // only use on last stage, won't track ticket
+    return queue_.read(elem);
+  }
+
+  template <class... Args>
+  bool readAndGetTicket(uint64_t& ticket, T& elem) noexcept {
+    if (queue_.tryObtainReadyPopTicket(ticket)) {
+      queue_.dequeueWithTicket(ticket, elem);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  // See MPMCQueue<T>::writeCount; only works for the first stage
+  uint64_t writeCount() const noexcept {
+    return queue_.writeCount();
+  }
+
+  uint64_t readCount() const noexcept {
+    return queue_.readCount();
+  }
+
+ private:
+  MPMCQueue<T> queue_;
+};
+
+// Compile-time product of the kAmplification members of every element type
+// of a std::tuple (used with tuples of PipelineStageInfo<X>).
+template <class Tuple> struct AmplificationProduct;
+
+// Base case: the product over an empty tuple is 1.
+template <>
+struct AmplificationProduct<std::tuple<>> {
+  static constexpr size_t value = 1;
+};
+
+// Recursive case: the first element's factor times the product of the rest.
+template <class Head, class... Tail>
+struct AmplificationProduct<std::tuple<Head, Tail...>> {
+  static constexpr size_t value =
+    Head::kAmplification * AmplificationProduct<std::tuple<Tail...>>::value;
+};
+
+}}  // namespaces
+
diff --git a/folly/test/MPMCPipelineTest.cpp b/folly/test/MPMCPipelineTest.cpp
new file mode 100644 (file)
index 0000000..eed9905
--- /dev/null
@@ -0,0 +1,166 @@
+/*
+ * Copyright 2013 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "folly/MPMCPipeline.h"
+
+#include <thread>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "folly/Conv.h"
+
+namespace folly { namespace test {
+
+// Single-threaded walk of one value through a one-stage pipeline, checking
+// sizeGuess() after every step.
+TEST(MPMCPipeline, Trivial) {
+  MPMCPipeline<int, std::string> pipeline(2, 2);
+  EXPECT_EQ(0, pipeline.sizeGuess());
+
+  // Feed one input; it counts towards the size right away.
+  pipeline.blockingWrite(42);
+  EXPECT_EQ(1, pipeline.sizeGuess());
+
+  // Stage 0 picks the input up; while in flight it is still counted.
+  int input;
+  auto stageTicket = pipeline.blockingReadStage<0>(input);
+  EXPECT_EQ(42, input);
+  EXPECT_EQ(1, pipeline.sizeGuess());
+
+  // Stage 0 emits its single output.
+  pipeline.blockingWriteStage<0>(stageTicket, "hello world");
+  EXPECT_EQ(1, pipeline.sizeGuess());
+
+  // Draining the final queue brings the size back to zero.
+  std::string output;
+  pipeline.blockingRead(output);
+  EXPECT_EQ("hello world", output);
+  EXPECT_EQ(0, pipeline.sizeGuess());
+}
+
+// Single-threaded walk through a one-stage pipeline whose stage emits two
+// outputs per input; sizeGuess() is measured in final outputs.
+TEST(MPMCPipeline, TrivialAmplification) {
+  MPMCPipeline<int, MPMCPipelineStage<std::string, 2>> pipeline(2, 2);
+  EXPECT_EQ(0, pipeline.sizeGuess());
+
+  // One input already accounts for two future outputs.
+  pipeline.blockingWrite(42);
+  EXPECT_EQ(2, pipeline.sizeGuess());
+
+  int input;
+  auto stageTicket = pipeline.blockingReadStage<0>(input);
+  EXPECT_EQ(42, input);
+  EXPECT_EQ(2, pipeline.sizeGuess());
+
+  // The same ticket is used once per output.
+  pipeline.blockingWriteStage<0>(stageTicket, "hello world");
+  EXPECT_EQ(2, pipeline.sizeGuess());
+  pipeline.blockingWriteStage<0>(stageTicket, "goodbye");
+  EXPECT_EQ(2, pipeline.sizeGuess());
+
+  // Outputs come back in the order they were written for the ticket.
+  std::string output;
+
+  pipeline.blockingRead(output);
+  EXPECT_EQ("hello world", output);
+  EXPECT_EQ(1, pipeline.sizeGuess());
+
+  pipeline.blockingRead(output);
+  EXPECT_EQ("goodbye", output);
+  EXPECT_EQ(0, pipeline.sizeGuess());
+}
+
+TEST(MPMCPipeline, MultiThreaded) {
+  constexpr size_t numThreadsPerStage = 6;
+  MPMCPipeline<int, std::string, std::string> a(5, 5, 5);
+
+  std::vector<std::thread> threads;
+  threads.reserve(numThreadsPerStage * 2 + 1);
+  for (size_t i = 0; i < numThreadsPerStage; ++i) {
+    threads.emplace_back([&a, i] () {
+      for (;;) {
+        int val;
+        auto ticket = a.blockingReadStage<0>(val);
+        if (val == -1) {  // stop
+          // We still need to propagate
+          a.blockingWriteStage<0>(ticket, "");
+          break;
+        }
+        a.blockingWriteStage<0>(
+            ticket, folly::to<std::string>(val, " hello"));
+      }
+    });
+  }
+
+  for (size_t i = 0; i < numThreadsPerStage; ++i) {
+    threads.emplace_back([&a, i] () {
+      for (;;) {
+        std::string val;
+        auto ticket = a.blockingReadStage<1>(val);
+        if (val.empty()) {  // stop
+          // We still need to propagate
+          a.blockingWriteStage<1>(ticket, "");
+          break;
+        }
+        a.blockingWriteStage<1>(
+            ticket, folly::to<std::string>(val, " world"));
+      }
+    });
+  }
+
+  std::vector<std::string> results;
+  threads.emplace_back([&a, &results] () {
+    for (;;) {
+      std::string val;
+      a.blockingRead(val);
+      if (val.empty()) {
+        break;
+      }
+      results.push_back(val);
+    }
+  });
+
+  constexpr size_t numValues = 1000;
+  for (size_t i = 0; i < numValues; ++i) {
+    a.blockingWrite(i);
+  }
+  for (size_t i = 0; i < numThreadsPerStage; ++i) {
+    a.blockingWrite(-1);
+  }
+
+  for (auto& t : threads) {
+    t.join();
+  }
+
+  // The consumer thread dequeued the first empty string, there should be
+  // numThreadsPerStage - 1 left.
+  EXPECT_EQ(numThreadsPerStage - 1, a.sizeGuess());
+  for (size_t i = 0; i < numThreadsPerStage - 1; ++i) {
+    std::string val;
+    a.blockingRead(val);
+    EXPECT_TRUE(val.empty());
+  }
+  {
+    std::string tmp;
+    EXPECT_FALSE(a.read(tmp));
+  }
+  EXPECT_EQ(0, a.sizeGuess());
+
+  EXPECT_EQ(numValues, results.size());
+  for (size_t i = 0; i < results.size(); ++i) {
+    EXPECT_EQ(folly::to<std::string>(i, " hello world"), results[i]);
+  }
+}
+
+}}  // namespaces
+
+// Test entry point: let gtest consume its flags first, then parse the
+// remaining command-line flags, then run every registered test.
+int main(int argc, char** argv) {
+  testing::InitGoogleTest(&argc, argv);
+  google::ParseCommandLineFlags(&argc, &argv, true);
+  const int exitStatus = RUN_ALL_TESTS();
+  return exitStatus;
+}
+