X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=folly%2Ffutures%2Fdetail%2FCore.h;h=7516885264c2c68deff2e6dd17fb74f6cb9315c1;hb=dda657cc57624ebea01a96a54fed2c0c86b398fe;hp=4bdc5ef4755e05f20ef761420d1510fdb2e966ba;hpb=714633903be94edc55af06571738d3ad0343b925;p=folly.git diff --git a/folly/futures/detail/Core.h b/folly/futures/detail/Core.h index 4bdc5ef4..75168852 100644 --- a/folly/futures/detail/Core.h +++ b/folly/futures/detail/Core.h @@ -22,7 +22,7 @@ #include #include -#include +#include #include #include @@ -68,19 +68,27 @@ enum class State : uint8_t { /// migrate between threads, though this usually happens within the API code. /// For example, an async operation will probably make a Promise, grab its /// Future, then move the Promise into another thread that will eventually -/// fulfil it. With executors and via, this gets slightly more complicated at +/// fulfill it. With executors and via, this gets slightly more complicated at /// first blush, but it's the same principle. In general, as long as the user /// doesn't access a Future or Promise object from more than one thread at a /// time there won't be any problems. template class Core { + static_assert(!std::is_void::value, + "void futures are not supported. Use Unit instead."); public: /// This must be heap-constructed. There's probably a way to enforce that in /// code but since this is just internal detail code and I don't know how /// off-hand, I'm punting. - Core() {} + Core() : result_(), fsm_(State::Start), attached_(2) {} + + explicit Core(Try&& t) + : result_(std::move(t)), + fsm_(State::OnlyResult), + attached_(1) {} + ~Core() { - assert(attached_ == 0); + DCHECK(attached_ == 0); } // not copyable @@ -122,7 +130,8 @@ class Core { template class LambdaBufHelper { public: - explicit LambdaBufHelper(F&& func) : func_(std::forward(func)) {} + template + explicit LambdaBufHelper(FF&& func) : func_(std::forward(func)) {} void operator()(Try&& t) { SCOPE_EXIT { this->~LambdaBufHelper(); }; func_(std::move(t)); @@ -140,7 +149,7 @@ class Core { // Move the lambda into the Core if it fits if (sizeof(LambdaBufHelper) <= lambdaBufSize) { - auto funcLoc = static_cast*>((void*)lambdaBuf_); + auto funcLoc = reinterpret_cast*>(&lambdaBuf_); new (funcLoc) LambdaBufHelper(std::forward(func)); callback_ = std::ref(*funcLoc); } else { @@ -198,7 +207,7 @@ class Core { /// Called by a destructing Future (in the Future thread, by definition) void detachFuture() { - activateNoDeprecatedWarning(); + activate(); detachOne(); } @@ -206,28 +215,39 @@ class Core { void detachPromise() { // detachPromise() and setResult() should never be called in parallel // so we don't need to protect this. - if (!result_) { - setResult(Try(exception_wrapper(BrokenPromise()))); + if (UNLIKELY(!result_)) { + setResult(Try(exception_wrapper(BrokenPromise(typeid(T).name())))); } detachOne(); } /// May call from any thread - void deactivate() DEPRECATED { - active_ = false; + void deactivate() { + active_.store(false, std::memory_order_release); } /// May call from any thread - void activate() DEPRECATED { - activateNoDeprecatedWarning(); + void activate() { + active_.store(true, std::memory_order_release); + maybeCallback(); } /// May call from any thread - bool isActive() { return active_; } + bool isActive() { return active_.load(std::memory_order_acquire); } /// Call only from Future thread - void setExecutor(Executor* x) { + void setExecutor(Executor* x, int8_t priority = Executor::MID_PRI) { + if (!executorLock_.try_lock()) { + executorLock_.lock(); + } + executor_ = x; + priority_ = priority; + executorLock_.unlock(); + } + + void setExecutorNoLock(Executor* x, int8_t priority = Executor::MID_PRI) { executor_ = x; + priority_ = priority; } Executor* getExecutor() { @@ -236,39 +256,57 @@ class Core { /// Call only from Future thread void raise(exception_wrapper e) { - std::lock_guard guard(interruptLock_); + if (!interruptLock_.try_lock()) { + interruptLock_.lock(); + } if (!interrupt_ && !hasResult()) { interrupt_ = folly::make_unique(std::move(e)); if (interruptHandler_) { interruptHandler_(*interrupt_); } } + interruptLock_.unlock(); + } + + std::function getInterruptHandler() { + if (!interruptHandlerSet_.load(std::memory_order_acquire)) { + return nullptr; + } + if (!interruptLock_.try_lock()) { + interruptLock_.lock(); + } + auto handler = interruptHandler_; + interruptLock_.unlock(); + return handler; } /// Call only from Promise thread void setInterruptHandler(std::function fn) { - std::lock_guard guard(interruptLock_); + if (!interruptLock_.try_lock()) { + interruptLock_.lock(); + } if (!hasResult()) { if (interrupt_) { fn(*interrupt_); } else { - interruptHandler_ = std::move(fn); + setInterruptHandlerNoLock(std::move(fn)); } } + interruptLock_.unlock(); } - protected: - void activateNoDeprecatedWarning() { - active_ = true; - maybeCallback(); + void setInterruptHandlerNoLock( + std::function fn) { + interruptHandlerSet_.store(true, std::memory_order_relaxed); + interruptHandler_ = std::move(fn); } + protected: void maybeCallback() { FSM_START(fsm_) case State::Armed: - if (active_) { - FSM_UPDATE2(fsm_, State::Done, []{}, - std::bind(&Core::doCallback, this)); + if (active_.load(std::memory_order_acquire)) { + FSM_UPDATE2(fsm_, State::Done, []{}, [this]{ this->doCallback(); }); } FSM_BREAK @@ -278,19 +316,46 @@ class Core { } void doCallback() { - // TODO(5306911) we should probably try/catch around the callback - - RequestContext::setContext(context_); - - // TODO(6115514) semantic race on reading executor_ and setExecutor() Executor* x = executor_; + int8_t priority; if (x) { - ++attached_; // keep Core alive until executor did its thing - x->add([this]() mutable { - SCOPE_EXIT { detachOne(); }; + if (!executorLock_.try_lock()) { + executorLock_.lock(); + } + x = executor_; + priority = priority_; + executorLock_.unlock(); + } + + if (x) { + // keep Core alive until executor did its thing + ++attached_; + try { + if (LIKELY(x->getNumPriorities() == 1)) { + x->add([this]() mutable { + SCOPE_EXIT { detachOne(); }; + RequestContext::setContext(context_); + SCOPE_EXIT { callback_ = {}; }; + callback_(std::move(*result_)); + }); + } else { + x->addWithPriority([this]() mutable { + SCOPE_EXIT { detachOne(); }; + RequestContext::setContext(context_); + SCOPE_EXIT { callback_ = {}; }; + callback_(std::move(*result_)); + }, priority); + } + } catch (...) { + --attached_; // Account for extra ++attached_ before try + RequestContext::setContext(context_); + result_ = Try(exception_wrapper(std::current_exception())); + SCOPE_EXIT { callback_ = {}; }; callback_(std::move(*result_)); - }); + } } else { + RequestContext::setContext(context_); + SCOPE_EXIT { callback_ = {}; }; callback_(std::move(*result_)); } } @@ -304,75 +369,99 @@ class Core { } } - FSM fsm_ {State::Start}; - std::atomic attached_ {2}; + // lambdaBuf occupies exactly one cache line + static constexpr size_t lambdaBufSize = 8 * sizeof(void*); + typename std::aligned_storage::type lambdaBuf_; + // place result_ next to increase the likelihood that the value will be + // contained entirely in one cache line + folly::Optional> result_; + std::function&&)> callback_ {nullptr}; + FSM fsm_; + std::atomic attached_; std::atomic active_ {true}; + std::atomic interruptHandlerSet_ {false}; folly::MicroSpinLock interruptLock_ {0}; - folly::Optional> result_ {}; - std::function&&)> callback_ {nullptr}; - static constexpr size_t lambdaBufSize = 8 * sizeof(void*); - char lambdaBuf_[lambdaBufSize]; + folly::MicroSpinLock executorLock_ {0}; + int8_t priority_ {-1}; + Executor* executor_ {nullptr}; std::shared_ptr context_ {nullptr}; - std::atomic executor_ {nullptr}; std::unique_ptr interrupt_ {}; std::function interruptHandler_ {nullptr}; }; template -struct VariadicContext { - VariadicContext() : total(0), count(0) {} - Promise... > > p; - std::tuple... > results; - size_t total; - std::atomic count; +struct CollectAllVariadicContext { + CollectAllVariadicContext() {} + template + inline void setPartialResult(Try& t) { + std::get(results) = std::move(t); + } + ~CollectAllVariadicContext() { + p.setValue(std::move(results)); + } + Promise...>> p; + std::tuple...> results; typedef Future...>> type; }; -template -typename std::enable_if::type -whenAllVariadicHelper(VariadicContext *ctx, THead&& head, Fs&&... tail) { - head.setCallback_([ctx](Try&& t) { - std::get(ctx->results) = std::move(t); - if (++ctx->count == ctx->total) { - ctx->p.setValue(std::move(ctx->results)); - delete ctx; +template +struct CollectVariadicContext { + CollectVariadicContext() {} + template + inline void setPartialResult(Try& t) { + if (t.hasException()) { + if (!threw.exchange(true)) { + p.setException(std::move(t.exception())); + } + } else if (!threw) { + std::get(results) = std::move(t); + } + } + ~CollectVariadicContext() { + if (!threw.exchange(true)) { + p.setValue(unwrap(std::move(results))); } - }); + } + Promise> p; + std::tuple...> results; + std::atomic threw {false}; + typedef Future> type; + + private: + template + static std::tuple unwrap(std::tuple...>&& o, + Ts2&&... ts2) { + static_assert(sizeof...(ts2) < + std::tuple_size...>>::value, + "Non-templated unwrap should be used instead"); + assert(std::get(o).hasValue()); + + return unwrap(std::move(o), + std::forward(ts2)..., + std::move(*std::get(o))); + } + + static std::tuple unwrap(std::tuple...>&& /* o */, + Ts&&... ts) { + return std::tuple(std::forward(ts)...); + } +}; + +template