X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=folly%2Ffutures%2Fdetail%2FCore.h;h=0e66d3309d4ff54d146ce11baffaa7588fcb2283;hb=96791c4516497b4ea2ad08af12e4267bd1c4e796;hp=a41059009d48cf87d8b600e530141ad2a2bcfd98;hpb=76663af23df01607f74c00c449852f71e5d3f771;p=folly.git diff --git a/folly/futures/detail/Core.h b/folly/futures/detail/Core.h index a4105900..0e66d330 100644 --- a/folly/futures/detail/Core.h +++ b/folly/futures/detail/Core.h @@ -1,5 +1,5 @@ /* - * Copyright 2016 Facebook, Inc. + * Copyright 2017 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,9 +25,10 @@ #include #include #include +#include +#include #include #include -#include #include #include @@ -73,7 +74,7 @@ enum class State : uint8_t { /// doesn't access a Future or Promise object from more than one thread at a /// time there won't be any problems. template -class Core { +class Core final { static_assert(!std::is_void::value, "void futures are not supported. Use Unit instead."); public: @@ -99,26 +100,6 @@ class Core { Core(Core&&) noexcept = delete; Core& operator=(Core&&) = delete; - // Core is assumed to be convertible only if the type is convertible - // and the size is the same. This is a compromise for the complexity - // of having to make Core truly have a conversion constructor which - // would cause various other problems. - // If we made Core move constructible then we would need to update the - // Promise and Future with the location of the new Core. This is complex - // and may be inefficient. - // Core should only be modified so that for size(T) == size(U), - // sizeof(Core) == size(Core). - // This assumption is used as a proxy to make sure that - // the members of Core and Core line up so that we can use a - // reinterpret cast. - template < - class U, - typename = typename std::enable_if::value && - sizeof(U) == sizeof(T)>::type> - static Core* convert(Core* from) { - return reinterpret_cast*>(from); - } - /// May call from any thread bool hasResult() const { switch (fsm_.getState()) { @@ -300,7 +281,36 @@ class Core { interruptHandler_ = std::move(fn); } - protected: + private: + // Helper class that stores a pointer to the `Core` object and calls + // `derefCallback` and `detachOne` in the destructor. + class CoreAndCallbackReference { + public: + explicit CoreAndCallbackReference(Core* core) noexcept : core_(core) {} + + ~CoreAndCallbackReference() { + if (core_) { + core_->derefCallback(); + core_->detachOne(); + } + } + + CoreAndCallbackReference(CoreAndCallbackReference const& o) = delete; + CoreAndCallbackReference& operator=(CoreAndCallbackReference const& o) = + delete; + + CoreAndCallbackReference(CoreAndCallbackReference&& o) noexcept { + std::swap(core_, o.core_); + } + + Core* getCore() const noexcept { + return core_; + } + + private: + Core* core_{nullptr}; + }; + void maybeCallback() { FSM_START(fsm_) case State::Armed: @@ -326,64 +336,81 @@ class Core { executorLock_.unlock(); } - // keep Core alive until callback did its thing - ++attached_; - if (x) { + exception_wrapper ew; + // We need to reset `callback_` after it was executed (which can happen + // through the executor or, if `Executor::add` throws, below). The + // executor might discard the function without executing it (now or + // later), in which case `callback_` also needs to be reset. + // The `Core` has to be kept alive throughout that time, too. Hence we + // increment `attached_` and `callbackReferences_` by two, and construct + // exactly two `CoreAndCallbackReference` objects, which call + // `derefCallback` and `detachOne` in their destructor. One will guard + // this scope, the other one will guard the lambda passed to the executor. + attached_ += 2; + callbackReferences_ += 2; + CoreAndCallbackReference guard_local_scope(this); + CoreAndCallbackReference guard_lambda(this); try { if (LIKELY(x->getNumPriorities() == 1)) { - x->add([this]() mutable { - SCOPE_EXIT { detachOne(); }; - RequestContext::setContext(context_); - SCOPE_EXIT { callback_ = {}; }; - callback_(std::move(*result_)); + x->add([core_ref = std::move(guard_lambda)]() mutable { + auto cr = std::move(core_ref); + Core* const core = cr.getCore(); + RequestContextScopeGuard rctx(core->context_); + core->callback_(std::move(*core->result_)); }); } else { - x->addWithPriority([this]() mutable { - SCOPE_EXIT { detachOne(); }; - RequestContext::setContext(context_); - SCOPE_EXIT { callback_ = {}; }; - callback_(std::move(*result_)); - }, priority); + x->addWithPriority( + [core_ref = std::move(guard_lambda)]() mutable { + auto cr = std::move(core_ref); + Core* const core = cr.getCore(); + RequestContextScopeGuard rctx(core->context_); + core->callback_(std::move(*core->result_)); + }, + priority); } + } catch (const std::exception& e) { + ew = exception_wrapper(std::current_exception(), e); } catch (...) { - --attached_; // Account for extra ++attached_ before try - RequestContext::setContext(context_); - result_ = Try(exception_wrapper(std::current_exception())); - SCOPE_EXIT { callback_ = {}; }; + ew = exception_wrapper(std::current_exception()); + } + if (ew) { + RequestContextScopeGuard rctx(context_); + result_ = Try(std::move(ew)); callback_(std::move(*result_)); } } else { - SCOPE_EXIT { detachOne(); }; - RequestContext::setContext(context_); - SCOPE_EXIT { callback_ = {}; }; + attached_++; + SCOPE_EXIT { + callback_ = {}; + detachOne(); + }; + RequestContextScopeGuard rctx(context_); callback_(std::move(*result_)); } } void detachOne() { - auto a = --attached_; - assert(a >= 0); - assert(a <= 2); - if (a == 0) { + auto a = attached_--; + assert(a >= 1); + if (a == 1) { delete this; } } - // Core should only be modified so that for size(T) == size(U), - // sizeof(Core) == size(Core). - // See Core::convert for details. + void derefCallback() { + if (--callbackReferences_ == 0) { + callback_ = {}; + } + } - folly::Function< - void(Try&&), - folly::FunctionMoveCtor::MAY_THROW, - 8 * sizeof(void*)> - callback_; + folly::Function&&)> callback_; // place result_ next to increase the likelihood that the value will be // contained entirely in one cache line folly::Optional> result_; FSM fsm_; std::atomic attached_; + std::atomic callbackReferences_{0}; std::atomic active_ {true}; std::atomic interruptHandlerSet_ {false}; folly::MicroSpinLock interruptLock_ {0}; @@ -423,34 +450,15 @@ struct CollectVariadicContext { std::get(results) = std::move(t); } } - ~CollectVariadicContext() { + ~CollectVariadicContext() noexcept { if (!threw.exchange(true)) { - p.setValue(unwrap(std::move(results))); + p.setValue(unwrapTryTuple(std::move(results))); } } Promise> p; std::tuple...> results; std::atomic threw {false}; typedef Future> type; - - private: - template - static std::tuple unwrap(std::tuple...>&& o, - Ts2&&... ts2) { - static_assert(sizeof...(ts2) < - std::tuple_size...>>::value, - "Non-templated unwrap should be used instead"); - assert(std::get(o).hasValue()); - - return unwrap(std::move(o), - std::forward(ts2)..., - std::move(*std::get(o))); - } - - static std::tuple unwrap(std::tuple...>&& /* o */, - Ts&&... ts) { - return std::tuple(std::forward(ts)...); - } }; template