folly::Future-istic barrier
authorTudor Bosman <tudorb@fb.com>
Thu, 16 Jul 2015 16:50:03 +0000 (09:50 -0700)
committerSara Golemon <sgolemon@fb.com>
Mon, 20 Jul 2015 19:26:30 +0000 (12:26 -0700)
Summary: What it says on the tin.

Reviewed By: @fugalh

Differential Revision: D2230390

folly/Makefile.am
folly/futures/Barrier.cpp [new file with mode: 0644]
folly/futures/Barrier.h [new file with mode: 0644]
folly/futures/test/BarrierTest.cpp [new file with mode: 0644]

index 43f595dd129a6918924f109569e5c560ee9be43e..a516449d5e7d33ce58af6fe4b3e1a0b437de5571 100644 (file)
@@ -128,6 +128,7 @@ nobase_follyinclude_HEADERS = \
        FormatTraits.h \
        Format.h \
        Format-inl.h \
+       futures/Barrier.h \
        futures/Deprecated.h \
        futures/ThreadedExecutor.h \
        futures/DrivableExecutor.h \
@@ -311,6 +312,7 @@ libfolly_la_SOURCES = \
        FileUtil.cpp \
        FingerprintTables.cpp \
        futures/detail/ThreadWheelTimekeeper.cpp \
+       futures/Barrier.cpp \
        futures/ThreadedExecutor.cpp \
        futures/Future.cpp \
        futures/InlineExecutor.cpp \
diff --git a/folly/futures/Barrier.cpp b/folly/futures/Barrier.cpp
new file mode 100644 (file)
index 0000000..e9f5ee8
--- /dev/null
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/futures/Barrier.h>
+
+namespace folly { namespace futures {
+
+Barrier::Barrier(uint32_t n)
+  : size_(n),
+    controlBlock_(allocateControlBlock()) { }
+
+Barrier::~Barrier() {
+  auto block = controlBlock_.load(std::memory_order_relaxed);
+  auto prev = block->valueAndReaderCount.load(std::memory_order_relaxed);
+  DCHECK_EQ(prev >> kReaderShift, 0);
+  auto val = prev & kValueMask;
+  auto p = promises(block);
+
+  for (uint32_t i = 0; i < val; ++i) {
+    p[i].setException(
+        folly::make_exception_wrapper<std::runtime_error>("Barrier destroyed"));
+  }
+
+  freeControlBlock(controlBlock_);
+}
+
+auto Barrier::allocateControlBlock() -> ControlBlock* {
+  auto block = static_cast<ControlBlock*>(malloc(controlBlockSize(size_)));
+  if (!block) {
+    throw std::bad_alloc();
+  }
+  block->valueAndReaderCount = 0;
+
+  auto p = promises(block);
+  uint32_t i = 0;
+  try {
+    for (i = 0; i < size_; ++i) {
+      new (p + i) BoolPromise();
+    }
+  } catch (...) {
+    for (; i != 0; --i) {
+      p[i - 1].~BoolPromise();
+    }
+    throw;
+  }
+
+  return block;
+}
+
+void Barrier::freeControlBlock(ControlBlock* block) {
+  auto p = promises(block);
+  for (uint32_t i = size_; i != 0; --i) {
+    p[i - 1].~BoolPromise();
+  }
+  free(block);
+}
+
+folly::Future<bool> Barrier::wait() {
+  // Load the current control block first. As we know there is at least
+  // one thread in the current epoch (us), this means that the value is
+  // < size_, so controlBlock_ can't change until we bump the value below.
+  auto block = controlBlock_.load(std::memory_order_acquire);
+  auto p = promises(block);
+
+  // Bump the value and record ourselves as reader.
+  // This ensures that block stays allocated, as the reader count is > 0.
+  auto prev = block->valueAndReaderCount.fetch_add(kReader + 1,
+                                                   std::memory_order_acquire);
+
+  auto prevValue = static_cast<uint32_t>(prev & kValueMask);
+  DCHECK_LT(prevValue, size_);
+  auto future = p[prevValue].getFuture();
+
+  if (prevValue + 1 == size_) {
+    // Need to reset the barrier before fulfilling any futures. This is
+    // when the epoch is flipped to the next.
+    controlBlock_.store(allocateControlBlock(), std::memory_order_release);
+
+    p[0].setValue(true);
+    for (uint32_t i = 1; i < size_; ++i) {
+      p[i].setValue(false);
+    }
+  }
+
+  // Free the control block if we're the last reader at max value.
+  prev = block->valueAndReaderCount.fetch_sub(kReader,
+                                              std::memory_order_acq_rel);
+  if (prev == (kReader | uint64_t(size_))) {
+    freeControlBlock(block);
+  }
+
+  return future;
+}
+
+}}  // namespaces
diff --git a/folly/futures/Barrier.h b/folly/futures/Barrier.h
new file mode 100644 (file)
index 0000000..64746de
--- /dev/null
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <atomic>
+#include <cstdint>
+
+#include <folly/futures/Future.h>
+#include <folly/futures/Promise.h>
+
+namespace folly { namespace futures {
+
+// A folly::Future-istic Barrier synchronization primitive
+//
+// The barrier is initialized with a count N.
+//
+// The first N-1 calls to wait() return uncompleted futures.
+//
+// The Nth call to wait() completes the previous N-1 futures successfully,
+// returns a future that is already completed successfully, and resets the
+// barrier; the barrier may be reused immediately, as soon as at least one
+// of the future completions has been observed.
+//
+// Of these N futures, exactly one is completed with true, while the others are
+// completed with false; it is unspecified which future completes with true.
+// (This may be used to elect a "leader" among a group of threads.)
+//
+// If the barrier is destroyed, any futures already returned by wait() will
+// complete with an error.
+class Barrier {
+ public:
+  explicit Barrier(uint32_t n);
+  ~Barrier();
+
+  folly::Future<bool> wait();
+
+ private:
+  typedef folly::Promise<bool> BoolPromise;
+
+  static constexpr uint64_t kReaderShift = 32;
+  static constexpr uint64_t kReader = uint64_t(1) << kReaderShift;
+  static constexpr uint64_t kValueMask = kReader - 1;
+
+  // For each "epoch" that the barrier is active, we have a different
+  // ControlBlock. The ControlBlock contains the current barrier value
+  // and the number of readers (currently inside wait()) packed into a
+  // 64-bit value.
+  //
+  // The ControlBlock is allocated as long as either:
+  // - there are threads currently inside wait() (reader count > 0), or
+  // - the value has not yet reached size_ (value < size_)
+  //
+  // The array of size_ Promise objects is allocated immediately following
+  // valueAndReaderCount.
+
+  struct ControlBlock {
+    // Reader count in most significant 32 bits
+    // Value in least significant 32 bits
+    std::atomic<uint64_t> valueAndReaderCount;
+  };
+
+  struct ControlBlockAndPromise {
+    ControlBlock cb;
+    BoolPromise promises[1];
+  };
+
+  static BoolPromise* promises(ControlBlock* cb) {
+    return reinterpret_cast<ControlBlockAndPromise*>(cb)->promises;
+  }
+
+  static size_t controlBlockSize(size_t n) {
+    return offsetof(ControlBlockAndPromise, promises) + n * sizeof(BoolPromise);
+  }
+
+  ControlBlock* allocateControlBlock();
+  void freeControlBlock(ControlBlock* b);
+
+  uint32_t size_;
+  std::atomic<ControlBlock*> controlBlock_;
+};
+
+}}  // namespaces
diff --git a/folly/futures/test/BarrierTest.cpp b/folly/futures/test/BarrierTest.cpp
new file mode 100644 (file)
index 0000000..52faf8e
--- /dev/null
@@ -0,0 +1,174 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/futures/Barrier.h>
+
+#include <atomic>
+#include <condition_variable>
+#include <mutex>
+
+#include <folly/Random.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+DEFINE_int32(seed, 0, "Random seed");
+
+namespace folly { namespace futures { namespace test {
+
+TEST(BarrierTest, Simple) {
+  constexpr uint32_t numThreads = 10;
+
+  std::mutex mutex;
+  std::condition_variable b1DoneCond;
+  std::condition_variable b2DoneCond;
+  std::atomic<uint32_t> b1TrueSeen(0);
+  std::atomic<uint32_t> b1Passed(0);
+  std::atomic<uint32_t> b2TrueSeen(0);
+  std::atomic<uint32_t> b2Passed(0);
+
+  Barrier barrier(numThreads + 1);
+
+  std::vector<std::thread> threads;
+  threads.reserve(numThreads);
+  for (uint32_t i = 0; i < numThreads; ++i) {
+    threads.emplace_back([&] () {
+      barrier.wait()
+        .then(
+            [&] (bool v) {
+              std::unique_lock<std::mutex> lock(mutex);
+              b1TrueSeen += uint32_t(v);
+              if (++b1Passed == numThreads) {
+                b1DoneCond.notify_one();
+              }
+              return barrier.wait();
+            })
+        .then(
+            [&] (bool v) {
+              std::unique_lock<std::mutex> lock(mutex);
+              b2TrueSeen += uint32_t(v);
+              if (++b2Passed == numThreads) {
+                b2DoneCond.notify_one();
+              }
+            })
+        .get();
+    });
+  }
+
+  /* sleep override */
+  std::this_thread::sleep_for(std::chrono::milliseconds(50));
+  EXPECT_EQ(0, b1Passed);
+  EXPECT_EQ(0, b1TrueSeen);
+
+  b1TrueSeen += barrier.wait().get();
+
+  {
+    std::unique_lock<std::mutex> lock(mutex);
+    while (b1Passed != numThreads) {
+      b1DoneCond.wait(lock);
+    }
+    EXPECT_EQ(1, b1TrueSeen);
+  }
+
+  /* sleep override */
+  std::this_thread::sleep_for(std::chrono::milliseconds(50));
+  EXPECT_EQ(0, b2Passed);
+  EXPECT_EQ(0, b2TrueSeen);
+
+  b2TrueSeen += barrier.wait().get();
+
+  {
+    std::unique_lock<std::mutex> lock(mutex);
+    while (b2Passed != numThreads) {
+      b2DoneCond.wait(lock);
+    }
+    EXPECT_EQ(1, b2TrueSeen);
+  }
+
+  for (auto& t : threads) {
+    t.join();
+  }
+}
+
+TEST(BarrierTest, Random) {
+  // Create numThreads threads.
+  //
+  // Each thread repeats the following numIterations times:
+  //   - grab a randomly chosen number of futures from the barrier, waiting
+  //     for a short random time between each
+  //   - wait for all futures to complete
+  //   - record whether the one future returning true was seen among them
+  //
+  // At the end, we verify that exactly one future returning true was seen
+  // for each iteration.
+  constexpr uint32_t numIterations = 1;
+  auto numThreads = folly::Random::rand32(30, 91);
+
+  struct ThreadInfo {
+    ThreadInfo() { }
+    std::thread thread;
+    uint32_t iteration = 0;
+    uint32_t numFutures;
+    std::vector<uint32_t> trueSeen;
+  };
+
+  std::vector<ThreadInfo> threads;
+  threads.resize(numThreads);
+
+  uint32_t totalFutures = 0;
+  for (auto& tinfo : threads) {
+    tinfo.numFutures = folly::Random::rand32(100);
+    tinfo.trueSeen.resize(numIterations);
+    totalFutures += tinfo.numFutures;
+  }
+
+  Barrier barrier(totalFutures);
+
+  for (auto& tinfo : threads) {
+    auto pinfo = &tinfo;
+    tinfo.thread = std::thread(
+        [numIterations, pinfo, &barrier] () {
+          std::vector<folly::Future<bool>> futures;
+          futures.reserve(pinfo->numFutures);
+          for (uint32_t i = 0; i < numIterations; ++i, ++pinfo->iteration) {
+            futures.clear();
+            for (uint32_t j = 0; j < pinfo->numFutures; ++j) {
+              futures.push_back(barrier.wait());
+              auto nanos = folly::Random::rand32(10 * 1000 * 1000);
+              /* sleep override */
+              std::this_thread::sleep_for(std::chrono::nanoseconds(nanos));
+            }
+            auto results = folly::collect(futures).get();
+            pinfo->trueSeen[i] =
+              std::count(results.begin(), results.end(), true);
+          }
+        });
+  }
+
+  for (auto& tinfo : threads) {
+    tinfo.thread.join();
+    EXPECT_EQ(numIterations, tinfo.iteration);
+  }
+
+  for (uint32_t i = 0; i < numIterations; ++i) {
+    uint32_t trueCount = 0;
+    for (auto& tinfo : threads) {
+      trueCount += tinfo.trueSeen[i];
+    }
+    EXPECT_EQ(1, trueCount);
+  }
+}
+
+}}}  // namespaces