From 83312f92481105336979b6f2d9ab68ba2b0d9c6b Mon Sep 17 00:00:00 2001
From: Tudor Bosman
Date: Thu, 16 Jul 2015 09:50:03 -0700
Subject: [PATCH] folly::Future-istic barrier

Summary: What it says on the tin.

Reviewed By: @fugalh

Differential Revision: D2230390
---
 folly/Makefile.am                  |   2 +
 folly/futures/Barrier.cpp          | 139 ++++++++++++++++++++++
 folly/futures/Barrier.h            | 104 +++++++++++++++++
 folly/futures/test/BarrierTest.cpp | 181 +++++++++++++++++++++++++++++
 4 files changed, 426 insertions(+)
 create mode 100644 folly/futures/Barrier.cpp
 create mode 100644 folly/futures/Barrier.h
 create mode 100644 folly/futures/test/BarrierTest.cpp

diff --git a/folly/Makefile.am b/folly/Makefile.am
index 43f595dd..a516449d 100644
--- a/folly/Makefile.am
+++ b/folly/Makefile.am
@@ -128,6 +128,7 @@ nobase_follyinclude_HEADERS = \
 	FormatTraits.h \
 	Format.h \
 	Format-inl.h \
+	futures/Barrier.h \
 	futures/Deprecated.h \
 	futures/ThreadedExecutor.h \
 	futures/DrivableExecutor.h \
@@ -311,6 +312,7 @@ libfolly_la_SOURCES = \
 	FileUtil.cpp \
 	FingerprintTables.cpp \
 	futures/detail/ThreadWheelTimekeeper.cpp \
+	futures/Barrier.cpp \
 	futures/ThreadedExecutor.cpp \
 	futures/Future.cpp \
 	futures/InlineExecutor.cpp \
diff --git a/folly/futures/Barrier.cpp b/folly/futures/Barrier.cpp
new file mode 100644
index 00000000..e9f5ee86
--- /dev/null
+++ b/folly/futures/Barrier.cpp
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/futures/Barrier.h>
+
+#include <cstdlib>
+#include <new>
+#include <stdexcept>
+
+#include <glog/logging.h>
+
+namespace folly { namespace futures {
+
+// Creates a barrier whose epochs complete after n calls to wait(), and
+// allocates the control block for the first epoch.
+Barrier::Barrier(uint32_t n)
+  : size_(n),
+    controlBlock_(allocateControlBlock()) { }
+
+// Fails every future already handed out in the current, incomplete epoch
+// with a "Barrier destroyed" error, then releases that epoch's control block.
+Barrier::~Barrier() {
+  auto block = controlBlock_.load(std::memory_order_relaxed);
+  auto prev = block->valueAndReaderCount.load(std::memory_order_relaxed);
+  // The reader count must be zero: no thread may still be inside wait().
+  DCHECK_EQ(prev >> kReaderShift, 0);
+  auto val = prev & kValueMask;
+  auto p = promises(block);
+
+  // Only the first val promises have had their future taken by wait().
+  for (uint32_t i = 0; i < val; ++i) {
+    p[i].setException(
+        folly::make_exception_wrapper<std::runtime_error>("Barrier destroyed"));
+  }
+
+  freeControlBlock(block);
+}
+
+// Allocates a single malloc() block holding a ControlBlock followed by size_
+// default-constructed promises; value and reader count both start at zero.
+// Throws std::bad_alloc if malloc() fails and rethrows whatever a promise
+// constructor throws; the storage is released again on that path.
+auto Barrier::allocateControlBlock() -> ControlBlock* {
+  auto storage = malloc(controlBlockSize(size_));
+  if (!storage) {
+    throw std::bad_alloc();
+  }
+  // Start the lifetime of the atomic in the raw storage before storing to it.
+  auto block = new (storage) ControlBlock();
+  block->valueAndReaderCount = 0;
+
+  auto p = promises(block);
+  uint32_t i = 0;
+  try {
+    for (i = 0; i < size_; ++i) {
+      new (p + i) BoolPromise();
+    }
+  } catch (...) {
+    // Unwind the promises built so far, then hand the storage back.
+    for (; i != 0; --i) {
+      p[i - 1].~BoolPromise();
+    }
+    block->~ControlBlock();
+    free(storage);
+    throw;
+  }
+
+  return block;
+}
+
+// Destroys the size_ promises stored after block in reverse order, then
+// releases the storage obtained by allocateControlBlock().
+void Barrier::freeControlBlock(ControlBlock* block) {
+  auto p = promises(block);
+  for (uint32_t i = size_; i != 0; --i) {
+    p[i - 1].~BoolPromise();
+  }
+  block->~ControlBlock();
+  free(block);
+}
+
+// Records one arrival in the current epoch and returns the future of the
+// promise assigned to it. The arrival that brings the value to size_ installs
+// a fresh control block and then fulfills the finished epoch's promises:
+// promise 0 with true, all others with false.
+folly::Future<bool> Barrier::wait() {
+  // Load the current control block first. As we know there is at least
+  // one thread in the current epoch (us), this means that the value is
+  // < size_, so controlBlock_ can't change until we bump the value below.
+  auto block = controlBlock_.load(std::memory_order_acquire);
+  auto p = promises(block);
+
+  // Bump the value and record ourselves as reader.
+  // This ensures that block stays allocated, as the reader count is > 0.
+  auto prev = block->valueAndReaderCount.fetch_add(kReader + 1,
+                                                   std::memory_order_acquire);
+
+  auto prevValue = static_cast<uint32_t>(prev & kValueMask);
+  DCHECK_LT(prevValue, size_);
+  auto future = p[prevValue].getFuture();
+
+  if (prevValue + 1 == size_) {
+    // Need to reset the barrier before fulfilling any futures. This is
+    // when the epoch is flipped to the next.
+    // NOTE(review): if allocateControlBlock() throws here, the reader count
+    // taken above is never released and this epoch's promises are never
+    // fulfilled -- confirm whether that case needs handling.
+    controlBlock_.store(allocateControlBlock(), std::memory_order_release);
+
+    p[0].setValue(true);
+    for (uint32_t i = 1; i < size_; ++i) {
+      p[i].setValue(false);
+    }
+  }
+
+  // Free the control block if we're the last reader at max value.
+  prev = block->valueAndReaderCount.fetch_sub(kReader,
+                                              std::memory_order_acq_rel);
+  if (prev == (kReader | uint64_t(size_))) {
+    freeControlBlock(block);
+  }
+
+  return future;
+}
+
+}}  // namespaces
diff --git a/folly/futures/Barrier.h b/folly/futures/Barrier.h
new file mode 100644
index 00000000..64746de3
--- /dev/null
+++ b/folly/futures/Barrier.h
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+
+#include <folly/futures/Future.h>
+#include <folly/futures/Promise.h>
+
+namespace folly { namespace futures {
+
+// A folly::Future-istic Barrier synchronization primitive
+//
+// The barrier is initialized with a count N.
+//
+// The first N-1 calls to wait() return uncompleted futures.
+//
+// The Nth call to wait() completes the previous N-1 futures successfully,
+// returns a future that is already completed successfully, and resets the
+// barrier; the barrier may be reused immediately, as soon as at least one
+// of the future completions has been observed.
+//
+// Of these N futures, exactly one is completed with true, while the others are
+// completed with false; it is unspecified which future completes with true.
+// (This may be used to elect a "leader" among a group of threads.)
+//
+// If the barrier is destroyed, any futures already returned by wait() will
+// complete with an error.
+class Barrier {
+ public:
+  // n is the number of wait() calls that make up one epoch.
+  // NOTE(review): n == 0 looks unsupported, since the first wait() call
+  // uses promise 0 -- confirm and consider rejecting it here.
+  explicit Barrier(uint32_t n);
+  ~Barrier();
+
+  // Returns the future for this arrival; the class comment describes when
+  // and with which value it completes.
+  folly::Future<bool> wait();
+
+ private:
+  typedef folly::Promise<bool> BoolPromise;
+
+  static constexpr uint64_t kReaderShift = 32;
+  static constexpr uint64_t kReader = uint64_t(1) << kReaderShift;
+  static constexpr uint64_t kValueMask = kReader - 1;
+
+  // For each "epoch" that the barrier is active, we have a different
+  // ControlBlock. The ControlBlock contains the current barrier value
+  // and the number of readers (currently inside wait()) packed into a
+  // 64-bit value.
+  //
+  // The ControlBlock is allocated as long as either:
+  // - there are threads currently inside wait() (reader count > 0), or
+  // - the value has not yet reached size_ (value < size_)
+  //
+  // The array of size_ Promise objects is allocated immediately following
+  // valueAndReaderCount.
+
+  struct ControlBlock {
+    // Reader count in most significant 32 bits
+    // Value in least significant 32 bits
+    std::atomic<uint64_t> valueAndReaderCount;
+  };
+
+  struct ControlBlockAndPromise {
+    ControlBlock cb;
+    BoolPromise promises[1];
+  };
+
+  // Address of the first promise stored after cb in the same allocation.
+  static BoolPromise* promises(ControlBlock* cb) {
+    return reinterpret_cast<ControlBlockAndPromise*>(cb)->promises;
+  }
+
+  // Bytes needed for one ControlBlock followed by n promises.
+  static size_t controlBlockSize(size_t n) {
+    return offsetof(ControlBlockAndPromise, promises) + n * sizeof(BoolPromise);
+  }
+
+  ControlBlock* allocateControlBlock();
+  void freeControlBlock(ControlBlock* b);
+
+  uint32_t size_;
+  std::atomic<ControlBlock*> controlBlock_;
+};
+
+}}  // namespaces
diff --git a/folly/futures/test/BarrierTest.cpp b/folly/futures/test/BarrierTest.cpp
new file mode 100644
index 00000000..52faf8e5
--- /dev/null
+++ b/folly/futures/test/BarrierTest.cpp
@@ -0,0 +1,181 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/futures/Barrier.h>
+
+#include <algorithm>
+#include <atomic>
+#include <chrono>
+#include <condition_variable>
+#include <mutex>
+#include <thread>
+#include <vector>
+
+#include <folly/Random.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+DEFINE_int32(seed, 0, "Random seed");
+
+namespace folly { namespace futures { namespace test {
+
+// Ten threads pass the barrier twice while the main thread supplies the
+// final arrival of each epoch; checks that nothing passes early and that
+// exactly one future per epoch completes with true.
+TEST(BarrierTest, Simple) {
+  constexpr uint32_t numThreads = 10;
+
+  std::mutex mutex;
+  std::condition_variable b1DoneCond;
+  std::condition_variable b2DoneCond;
+  std::atomic<uint32_t> b1TrueSeen(0);
+  std::atomic<uint32_t> b1Passed(0);
+  std::atomic<uint32_t> b2TrueSeen(0);
+  std::atomic<uint32_t> b2Passed(0);
+
+  Barrier barrier(numThreads + 1);
+
+  std::vector<std::thread> threads;
+  threads.reserve(numThreads);
+  for (uint32_t i = 0; i < numThreads; ++i) {
+    threads.emplace_back([&] () {
+      barrier.wait()
+        .then(
+            [&] (bool v) {
+              std::unique_lock<std::mutex> lock(mutex);
+              b1TrueSeen += uint32_t(v);
+              if (++b1Passed == numThreads) {
+                b1DoneCond.notify_one();
+              }
+              return barrier.wait();
+            })
+        .then(
+            [&] (bool v) {
+              std::unique_lock<std::mutex> lock(mutex);
+              b2TrueSeen += uint32_t(v);
+              if (++b2Passed == numThreads) {
+                b2DoneCond.notify_one();
+              }
+            })
+        .get();
+    });
+  }
+
+  /* sleep override */
+  std::this_thread::sleep_for(std::chrono::milliseconds(50));
+  EXPECT_EQ(0, b1Passed);
+  EXPECT_EQ(0, b1TrueSeen);
+
+  b1TrueSeen += barrier.wait().get();
+
+  {
+    std::unique_lock<std::mutex> lock(mutex);
+    while (b1Passed != numThreads) {
+      b1DoneCond.wait(lock);
+    }
+    EXPECT_EQ(1, b1TrueSeen);
+  }
+
+  /* sleep override */
+  std::this_thread::sleep_for(std::chrono::milliseconds(50));
+  EXPECT_EQ(0, b2Passed);
+  EXPECT_EQ(0, b2TrueSeen);
+
+  b2TrueSeen += barrier.wait().get();
+
+  {
+    std::unique_lock<std::mutex> lock(mutex);
+    while (b2Passed != numThreads) {
+      b2DoneCond.wait(lock);
+    }
+    EXPECT_EQ(1, b2TrueSeen);
+  }
+
+  for (auto& t : threads) {
+    t.join();
+  }
+}
+
+TEST(BarrierTest, Random) {
+  // Create numThreads threads.
+  //
+  // Each thread repeats the following numIterations times:
+  //   - grab a randomly chosen number of futures from the barrier, waiting
+  //     for a short random time between each
+  //   - wait for all futures to complete
+  //   - record whether the one future returning true was seen among them
+  //
+  // At the end, we verify that exactly one future returning true was seen
+  // for each iteration.
+  constexpr uint32_t numIterations = 1;
+  auto numThreads = folly::Random::rand32(30, 91);
+
+  struct ThreadInfo {
+    ThreadInfo() { }
+    std::thread thread;
+    uint32_t iteration = 0;
+    uint32_t numFutures;
+    std::vector<uint32_t> trueSeen;
+  };
+
+  std::vector<ThreadInfo> threads;
+  threads.resize(numThreads);
+
+  uint32_t totalFutures = 0;
+  for (auto& tinfo : threads) {
+    tinfo.numFutures = folly::Random::rand32(100);
+    tinfo.trueSeen.resize(numIterations);
+    totalFutures += tinfo.numFutures;
+  }
+
+  Barrier barrier(totalFutures);
+
+  for (auto& tinfo : threads) {
+    auto pinfo = &tinfo;
+    tinfo.thread = std::thread(
+        [numIterations, pinfo, &barrier] () {
+          std::vector<folly::Future<bool>> futures;
+          futures.reserve(pinfo->numFutures);
+          for (uint32_t i = 0; i < numIterations; ++i, ++pinfo->iteration) {
+            futures.clear();
+            for (uint32_t j = 0; j < pinfo->numFutures; ++j) {
+              futures.push_back(barrier.wait());
+              auto nanos = folly::Random::rand32(10 * 1000 * 1000);
+              /* sleep override */
+              std::this_thread::sleep_for(std::chrono::nanoseconds(nanos));
+            }
+            auto results = folly::collect(futures).get();
+            pinfo->trueSeen[i] =
+              std::count(results.begin(), results.end(), true);
+          }
+        });
+  }
+
+  for (auto& tinfo : threads) {
+    tinfo.thread.join();
+    EXPECT_EQ(numIterations, tinfo.iteration);
+  }
+
+  for (uint32_t i = 0; i < numIterations; ++i) {
+    uint32_t trueCount = 0;
+    for (auto& tinfo : threads) {
+      trueCount += tinfo.trueSeen[i];
+    }
+    EXPECT_EQ(1, trueCount);
+  }
+}
+
+}}}  // namespaces
-- 
2.34.1