X-Git-Url: http://plrg.eecs.uci.edu/git/?p=folly.git;a=blobdiff_plain;f=folly%2Ffutures%2Fdetail%2FCore.h;h=b196b6649ee54b02233019af16cae891477ff55b;hp=27842b55d2e92062359b341d1dea5f8ff6ded81f;hb=8401bbaffc06e05b14ea7057842b7ed2804375c0;hpb=dce6e23d30d9330889bb5fb83c2c4b7383fe21fb diff --git a/folly/futures/detail/Core.h b/folly/futures/detail/Core.h index 27842b55..b196b664 100644 --- a/folly/futures/detail/Core.h +++ b/folly/futures/detail/Core.h @@ -1,5 +1,5 @@ /* - * Copyright 2015 Facebook, Inc. + * Copyright 2017 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,13 +21,14 @@ #include #include +#include +#include +#include #include -#include - -#include -#include +#include +#include #include -#include +#include #include #include @@ -73,20 +74,22 @@ enum class State : uint8_t { /// doesn't access a Future or Promise object from more than one thread at a /// time there won't be any problems. template -class Core { +class Core final { + static_assert(!std::is_void::value, + "void futures are not supported. Use Unit instead."); public: /// This must be heap-constructed. There's probably a way to enforce that in /// code but since this is just internal detail code and I don't know how /// off-hand, I'm punting. - Core() {} + Core() : result_(), fsm_(State::Start), attached_(2) {} explicit Core(Try&& t) - : fsm_(State::OnlyResult), - attached_(1), - result_(std::move(t)) {} + : result_(std::move(t)), + fsm_(State::OnlyResult), + attached_(1) {} ~Core() { - assert(attached_ == 0); + DCHECK(attached_ == 0); } // not copyable @@ -98,7 +101,7 @@ class Core { Core& operator=(Core&&) = delete; /// May call from any thread - bool hasResult() const { + bool hasResult() const noexcept { switch (fsm_.getState()) { case State::OnlyResult: case State::Armed: @@ -112,7 +115,7 @@ class Core { } /// May call from any thread - bool ready() const { + bool ready() const noexcept { return hasResult(); } @@ -125,33 +128,13 @@ class Core { } } - template - class LambdaBufHelper { - public: - explicit LambdaBufHelper(F&& func) : func_(std::forward(func)) {} - void operator()(Try&& t) { - SCOPE_EXIT { this->~LambdaBufHelper(); }; - func_(std::move(t)); - } - private: - F func_; - }; - /// Call only from Future thread. template - void setCallback(F func) { + void setCallback(F&& func) { bool transitionToArmed = false; auto setCallback_ = [&]{ context_ = RequestContext::saveContext(); - - // Move the lambda into the Core if it fits - if (sizeof(LambdaBufHelper) <= lambdaBufSize) { - auto funcLoc = static_cast*>((void*)lambdaBuf_); - new (funcLoc) LambdaBufHelper(std::forward(func)); - callback_ = std::ref(*funcLoc); - } else { - callback_ = std::move(func); - } + callback_ = std::forward(func); }; FSM_START(fsm_) @@ -212,71 +195,126 @@ class Core { void detachPromise() { // detachPromise() and setResult() should never be called in parallel // so we don't need to protect this. - if (!result_) { - setResult(Try(exception_wrapper(BrokenPromise()))); + if (UNLIKELY(!result_)) { + setResult(Try(exception_wrapper(BrokenPromise(typeid(T).name())))); } detachOne(); } /// May call from any thread void deactivate() { - active_ = false; + active_.store(false, std::memory_order_release); } /// May call from any thread void activate() { - active_ = true; + active_.store(true, std::memory_order_release); maybeCallback(); } /// May call from any thread - bool isActive() { return active_; } + bool isActive() { return active_.load(std::memory_order_acquire); } /// Call only from Future thread - void setExecutor(Executor* x, int8_t priority) { - folly::MSLGuard g(executorLock_); + void setExecutor(Executor* x, int8_t priority = Executor::MID_PRI) { + if (!executorLock_.try_lock()) { + executorLock_.lock(); + } + executor_ = x; + priority_ = priority; + executorLock_.unlock(); + } + + void setExecutorNoLock(Executor* x, int8_t priority = Executor::MID_PRI) { executor_ = x; priority_ = priority; } Executor* getExecutor() { - folly::MSLGuard g(executorLock_); return executor_; } /// Call only from Future thread void raise(exception_wrapper e) { - folly::MSLGuard guard(interruptLock_); + if (!interruptLock_.try_lock()) { + interruptLock_.lock(); + } if (!interrupt_ && !hasResult()) { - interrupt_ = folly::make_unique(std::move(e)); + interrupt_ = std::make_unique(std::move(e)); if (interruptHandler_) { interruptHandler_(*interrupt_); } } + interruptLock_.unlock(); } std::function getInterruptHandler() { - folly::MSLGuard guard(interruptLock_); - return interruptHandler_; + if (!interruptHandlerSet_.load(std::memory_order_acquire)) { + return nullptr; + } + if (!interruptLock_.try_lock()) { + interruptLock_.lock(); + } + auto handler = interruptHandler_; + interruptLock_.unlock(); + return handler; } /// Call only from Promise thread void setInterruptHandler(std::function fn) { - folly::MSLGuard guard(interruptLock_); + if (!interruptLock_.try_lock()) { + interruptLock_.lock(); + } if (!hasResult()) { if (interrupt_) { fn(*interrupt_); } else { - interruptHandler_ = std::move(fn); + setInterruptHandlerNoLock(std::move(fn)); } } + interruptLock_.unlock(); } - protected: + void setInterruptHandlerNoLock( + std::function fn) { + interruptHandlerSet_.store(true, std::memory_order_relaxed); + interruptHandler_ = std::move(fn); + } + + private: + // Helper class that stores a pointer to the `Core` object and calls + // `derefCallback` and `detachOne` in the destructor. + class CoreAndCallbackReference { + public: + explicit CoreAndCallbackReference(Core* core) noexcept : core_(core) {} + + ~CoreAndCallbackReference() { + if (core_) { + core_->derefCallback(); + core_->detachOne(); + } + } + + CoreAndCallbackReference(CoreAndCallbackReference const& o) = delete; + CoreAndCallbackReference& operator=(CoreAndCallbackReference const& o) = + delete; + + CoreAndCallbackReference(CoreAndCallbackReference&& o) noexcept { + std::swap(core_, o.core_); + } + + Core* getCore() const noexcept { + return core_; + } + + private: + Core* core_{nullptr}; + }; + void maybeCallback() { FSM_START(fsm_) case State::Armed: - if (active_) { + if (active_.load(std::memory_order_acquire)) { FSM_UPDATE2(fsm_, State::Done, []{}, [this]{ this->doCallback(); }); } FSM_BREAK @@ -287,60 +325,98 @@ class Core { } void doCallback() { - RequestContext::setContext(context_); - - // TODO(6115514) semantic race on reading executor_ and setExecutor() - Executor* x; + Executor* x = executor_; int8_t priority; - { - folly::MSLGuard g(executorLock_); + if (x) { + if (!executorLock_.try_lock()) { + executorLock_.lock(); + } x = executor_; priority = priority_; + executorLock_.unlock(); } if (x) { - ++attached_; // keep Core alive until executor did its thing + exception_wrapper ew; + // We need to reset `callback_` after it was executed (which can happen + // through the executor or, if `Executor::add` throws, below). The + // executor might discard the function without executing it (now or + // later), in which case `callback_` also needs to be reset. + // The `Core` has to be kept alive throughout that time, too. Hence we + // increment `attached_` and `callbackReferences_` by two, and construct + // exactly two `CoreAndCallbackReference` objects, which call + // `derefCallback` and `detachOne` in their destructor. One will guard + // this scope, the other one will guard the lambda passed to the executor. + attached_ += 2; + callbackReferences_ += 2; + CoreAndCallbackReference guard_local_scope(this); + CoreAndCallbackReference guard_lambda(this); try { if (LIKELY(x->getNumPriorities() == 1)) { - x->add([this]() mutable { - SCOPE_EXIT { detachOne(); }; - callback_(std::move(*result_)); + x->add([core_ref = std::move(guard_lambda)]() mutable { + auto cr = std::move(core_ref); + Core* const core = cr.getCore(); + RequestContextScopeGuard rctx(core->context_); + core->callback_(std::move(*core->result_)); }); } else { - x->addWithPriority([this]() mutable { - SCOPE_EXIT { detachOne(); }; - callback_(std::move(*result_)); - }, priority); + x->addWithPriority( + [core_ref = std::move(guard_lambda)]() mutable { + auto cr = std::move(core_ref); + Core* const core = cr.getCore(); + RequestContextScopeGuard rctx(core->context_); + core->callback_(std::move(*core->result_)); + }, + priority); } + } catch (const std::exception& e) { + ew = exception_wrapper(std::current_exception(), e); } catch (...) { - result_ = Try(exception_wrapper(std::current_exception())); + ew = exception_wrapper(std::current_exception()); + } + if (ew) { + RequestContextScopeGuard rctx(context_); + result_ = Try(std::move(ew)); callback_(std::move(*result_)); } } else { + attached_++; + SCOPE_EXIT { + callback_ = {}; + detachOne(); + }; + RequestContextScopeGuard rctx(context_); callback_(std::move(*result_)); } } void detachOne() { - auto a = --attached_; - assert(a >= 0); - assert(a <= 2); - if (a == 0) { + auto a = attached_--; + assert(a >= 1); + if (a == 1) { delete this; } } - FSM fsm_ {State::Start}; - std::atomic attached_ {2}; + void derefCallback() { + if (--callbackReferences_ == 0) { + callback_ = {}; + } + } + + folly::Function&&)> callback_; + // place result_ next to increase the likelihood that the value will be + // contained entirely in one cache line + folly::Optional> result_; + FSM fsm_; + std::atomic attached_; + std::atomic callbackReferences_{0}; std::atomic active_ {true}; + std::atomic interruptHandlerSet_ {false}; folly::MicroSpinLock interruptLock_ {0}; folly::MicroSpinLock executorLock_ {0}; int8_t priority_ {-1}; Executor* executor_ {nullptr}; - folly::Optional> result_ {}; - std::function&&)> callback_ {nullptr}; - static constexpr size_t lambdaBufSize = 8 * sizeof(void*); - char lambdaBuf_[lambdaBufSize]; std::shared_ptr context_ {nullptr}; std::unique_ptr interrupt_ {}; std::function interruptHandler_ {nullptr}; @@ -371,22 +447,22 @@ struct CollectVariadicContext { p.setException(std::move(t.exception())); } } else if (!threw) { - std::get(results) = std::move(t.value()); + std::get(results) = std::move(t); } } - ~CollectVariadicContext() { + ~CollectVariadicContext() noexcept { if (!threw.exchange(true)) { - p.setValue(std::move(results)); + p.setValue(unwrapTryTuple(std::move(results))); } } Promise> p; - std::tuple results; - std::atomic threw; + std::tuple...> results; + std::atomic threw {false}; typedef Future> type; }; -template