X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=folly%2Ffutures%2Fdetail%2FCore.h;h=0e66d3309d4ff54d146ce11baffaa7588fcb2283;hb=96791c4516497b4ea2ad08af12e4267bd1c4e796;hp=7516885264c2c68deff2e6dd17fb74f6cb9315c1;hpb=dda657cc57624ebea01a96a54fed2c0c86b398fe;p=folly.git diff --git a/folly/futures/detail/Core.h b/folly/futures/detail/Core.h index 75168852..0e66d330 100644 --- a/folly/futures/detail/Core.h +++ b/folly/futures/detail/Core.h @@ -1,5 +1,5 @@ /* - * Copyright 2015 Facebook, Inc. + * Copyright 2017 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,13 +21,14 @@ #include #include -#include +#include +#include #include - -#include -#include +#include +#include +#include #include -#include +#include #include #include @@ -73,7 +74,7 @@ enum class State : uint8_t { /// doesn't access a Future or Promise object from more than one thread at a /// time there won't be any problems. template -class Core { +class Core final { static_assert(!std::is_void::value, "void futures are not supported. Use Unit instead."); public: @@ -127,34 +128,13 @@ class Core { } } - template - class LambdaBufHelper { - public: - template - explicit LambdaBufHelper(FF&& func) : func_(std::forward(func)) {} - void operator()(Try&& t) { - SCOPE_EXIT { this->~LambdaBufHelper(); }; - func_(std::move(t)); - } - private: - F func_; - }; - /// Call only from Future thread. template - void setCallback(F func) { + void setCallback(F&& func) { bool transitionToArmed = false; auto setCallback_ = [&]{ context_ = RequestContext::saveContext(); - - // Move the lambda into the Core if it fits - if (sizeof(LambdaBufHelper) <= lambdaBufSize) { - auto funcLoc = reinterpret_cast*>(&lambdaBuf_); - new (funcLoc) LambdaBufHelper(std::forward(func)); - callback_ = std::ref(*funcLoc); - } else { - callback_ = std::move(func); - } + callback_ = std::forward(func); }; FSM_START(fsm_) @@ -301,7 +281,36 @@ class Core { interruptHandler_ = std::move(fn); } - protected: + private: + // Helper class that stores a pointer to the `Core` object and calls + // `derefCallback` and `detachOne` in the destructor. + class CoreAndCallbackReference { + public: + explicit CoreAndCallbackReference(Core* core) noexcept : core_(core) {} + + ~CoreAndCallbackReference() { + if (core_) { + core_->derefCallback(); + core_->detachOne(); + } + } + + CoreAndCallbackReference(CoreAndCallbackReference const& o) = delete; + CoreAndCallbackReference& operator=(CoreAndCallbackReference const& o) = + delete; + + CoreAndCallbackReference(CoreAndCallbackReference&& o) noexcept { + std::swap(core_, o.core_); + } + + Core* getCore() const noexcept { + return core_; + } + + private: + Core* core_{nullptr}; + }; + void maybeCallback() { FSM_START(fsm_) case State::Armed: @@ -328,56 +337,80 @@ class Core { } if (x) { - // keep Core alive until executor did its thing - ++attached_; + exception_wrapper ew; + // We need to reset `callback_` after it was executed (which can happen + // through the executor or, if `Executor::add` throws, below). The + // executor might discard the function without executing it (now or + // later), in which case `callback_` also needs to be reset. + // The `Core` has to be kept alive throughout that time, too. Hence we + // increment `attached_` and `callbackReferences_` by two, and construct + // exactly two `CoreAndCallbackReference` objects, which call + // `derefCallback` and `detachOne` in their destructor. One will guard + // this scope, the other one will guard the lambda passed to the executor. + attached_ += 2; + callbackReferences_ += 2; + CoreAndCallbackReference guard_local_scope(this); + CoreAndCallbackReference guard_lambda(this); try { if (LIKELY(x->getNumPriorities() == 1)) { - x->add([this]() mutable { - SCOPE_EXIT { detachOne(); }; - RequestContext::setContext(context_); - SCOPE_EXIT { callback_ = {}; }; - callback_(std::move(*result_)); + x->add([core_ref = std::move(guard_lambda)]() mutable { + auto cr = std::move(core_ref); + Core* const core = cr.getCore(); + RequestContextScopeGuard rctx(core->context_); + core->callback_(std::move(*core->result_)); }); } else { - x->addWithPriority([this]() mutable { - SCOPE_EXIT { detachOne(); }; - RequestContext::setContext(context_); - SCOPE_EXIT { callback_ = {}; }; - callback_(std::move(*result_)); - }, priority); + x->addWithPriority( + [core_ref = std::move(guard_lambda)]() mutable { + auto cr = std::move(core_ref); + Core* const core = cr.getCore(); + RequestContextScopeGuard rctx(core->context_); + core->callback_(std::move(*core->result_)); + }, + priority); } + } catch (const std::exception& e) { + ew = exception_wrapper(std::current_exception(), e); } catch (...) { - --attached_; // Account for extra ++attached_ before try - RequestContext::setContext(context_); - result_ = Try(exception_wrapper(std::current_exception())); - SCOPE_EXIT { callback_ = {}; }; + ew = exception_wrapper(std::current_exception()); + } + if (ew) { + RequestContextScopeGuard rctx(context_); + result_ = Try(std::move(ew)); callback_(std::move(*result_)); } } else { - RequestContext::setContext(context_); - SCOPE_EXIT { callback_ = {}; }; + attached_++; + SCOPE_EXIT { + callback_ = {}; + detachOne(); + }; + RequestContextScopeGuard rctx(context_); callback_(std::move(*result_)); } } void detachOne() { - auto a = --attached_; - assert(a >= 0); - assert(a <= 2); - if (a == 0) { + auto a = attached_--; + assert(a >= 1); + if (a == 1) { delete this; } } - // lambdaBuf occupies exactly one cache line - static constexpr size_t lambdaBufSize = 8 * sizeof(void*); - typename std::aligned_storage::type lambdaBuf_; + void derefCallback() { + if (--callbackReferences_ == 0) { + callback_ = {}; + } + } + + folly::Function&&)> callback_; // place result_ next to increase the likelihood that the value will be // contained entirely in one cache line folly::Optional> result_; - std::function&&)> callback_ {nullptr}; FSM fsm_; std::atomic attached_; + std::atomic callbackReferences_{0}; std::atomic active_ {true}; std::atomic interruptHandlerSet_ {false}; folly::MicroSpinLock interruptLock_ {0}; @@ -417,34 +450,15 @@ struct CollectVariadicContext { std::get(results) = std::move(t); } } - ~CollectVariadicContext() { + ~CollectVariadicContext() noexcept { if (!threw.exchange(true)) { - p.setValue(unwrap(std::move(results))); + p.setValue(unwrapTryTuple(std::move(results))); } } Promise> p; std::tuple...> results; std::atomic threw {false}; typedef Future> type; - - private: - template - static std::tuple unwrap(std::tuple...>&& o, - Ts2&&... ts2) { - static_assert(sizeof...(ts2) < - std::tuple_size...>>::value, - "Non-templated unwrap should be used instead"); - assert(std::get(o).hasValue()); - - return unwrap(std::move(o), - std::forward(ts2)..., - std::move(*std::get(o))); - } - - static std::tuple unwrap(std::tuple...>&& /* o */, - Ts&&... ts) { - return std::tuple(std::forward(ts)...); - } }; template