X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=folly%2Ffutures%2Fdetail%2FCore.h;h=a41059009d48cf87d8b600e530141ad2a2bcfd98;hb=76663af23df01607f74c00c449852f71e5d3f771;hp=27842b55d2e92062359b341d1dea5f8ff6ded81f;hpb=dce6e23d30d9330889bb5fb83c2c4b7383fe21fb;p=folly.git diff --git a/folly/futures/detail/Core.h b/folly/futures/detail/Core.h index 27842b55..a4105900 100644 --- a/folly/futures/detail/Core.h +++ b/folly/futures/detail/Core.h @@ -1,5 +1,5 @@ /* - * Copyright 2015 Facebook, Inc. + * Copyright 2016 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,13 +21,13 @@ #include #include +#include +#include +#include #include -#include - -#include -#include #include -#include +#include +#include #include #include @@ -74,19 +74,21 @@ enum class State : uint8_t { /// time there won't be any problems. template class Core { + static_assert(!std::is_void::value, + "void futures are not supported. Use Unit instead."); public: /// This must be heap-constructed. There's probably a way to enforce that in /// code but since this is just internal detail code and I don't know how /// off-hand, I'm punting. - Core() {} + Core() : result_(), fsm_(State::Start), attached_(2) {} explicit Core(Try&& t) - : fsm_(State::OnlyResult), - attached_(1), - result_(std::move(t)) {} + : result_(std::move(t)), + fsm_(State::OnlyResult), + attached_(1) {} ~Core() { - assert(attached_ == 0); + DCHECK(attached_ == 0); } // not copyable @@ -97,6 +99,26 @@ class Core { Core(Core&&) noexcept = delete; Core& operator=(Core&&) = delete; + // Core is assumed to be convertible only if the type is convertible + // and the size is the same. This is a compromise for the complexity + // of having to make Core truly have a conversion constructor which + // would cause various other problems. + // If we made Core move constructible then we would need to update the + // Promise and Future with the location of the new Core. This is complex + // and may be inefficient. + // Core should only be modified so that for size(T) == size(U), + // sizeof(Core) == size(Core). + // This assumption is used as a proxy to make sure that + // the members of Core and Core line up so that we can use a + // reinterpret cast. + template < + class U, + typename = typename std::enable_if::value && + sizeof(U) == sizeof(T)>::type> + static Core* convert(Core* from) { + return reinterpret_cast*>(from); + } + /// May call from any thread bool hasResult() const { switch (fsm_.getState()) { @@ -125,33 +147,13 @@ class Core { } } - template - class LambdaBufHelper { - public: - explicit LambdaBufHelper(F&& func) : func_(std::forward(func)) {} - void operator()(Try&& t) { - SCOPE_EXIT { this->~LambdaBufHelper(); }; - func_(std::move(t)); - } - private: - F func_; - }; - /// Call only from Future thread. template - void setCallback(F func) { + void setCallback(F&& func) { bool transitionToArmed = false; auto setCallback_ = [&]{ context_ = RequestContext::saveContext(); - - // Move the lambda into the Core if it fits - if (sizeof(LambdaBufHelper) <= lambdaBufSize) { - auto funcLoc = static_cast*>((void*)lambdaBuf_); - new (funcLoc) LambdaBufHelper(std::forward(func)); - callback_ = std::ref(*funcLoc); - } else { - callback_ = std::move(func); - } + callback_ = std::forward(func); }; FSM_START(fsm_) @@ -212,71 +214,97 @@ class Core { void detachPromise() { // detachPromise() and setResult() should never be called in parallel // so we don't need to protect this. - if (!result_) { - setResult(Try(exception_wrapper(BrokenPromise()))); + if (UNLIKELY(!result_)) { + setResult(Try(exception_wrapper(BrokenPromise(typeid(T).name())))); } detachOne(); } /// May call from any thread void deactivate() { - active_ = false; + active_.store(false, std::memory_order_release); } /// May call from any thread void activate() { - active_ = true; + active_.store(true, std::memory_order_release); maybeCallback(); } /// May call from any thread - bool isActive() { return active_; } + bool isActive() { return active_.load(std::memory_order_acquire); } /// Call only from Future thread - void setExecutor(Executor* x, int8_t priority) { - folly::MSLGuard g(executorLock_); + void setExecutor(Executor* x, int8_t priority = Executor::MID_PRI) { + if (!executorLock_.try_lock()) { + executorLock_.lock(); + } + executor_ = x; + priority_ = priority; + executorLock_.unlock(); + } + + void setExecutorNoLock(Executor* x, int8_t priority = Executor::MID_PRI) { executor_ = x; priority_ = priority; } Executor* getExecutor() { - folly::MSLGuard g(executorLock_); return executor_; } /// Call only from Future thread void raise(exception_wrapper e) { - folly::MSLGuard guard(interruptLock_); + if (!interruptLock_.try_lock()) { + interruptLock_.lock(); + } if (!interrupt_ && !hasResult()) { interrupt_ = folly::make_unique(std::move(e)); if (interruptHandler_) { interruptHandler_(*interrupt_); } } + interruptLock_.unlock(); } std::function getInterruptHandler() { - folly::MSLGuard guard(interruptLock_); - return interruptHandler_; + if (!interruptHandlerSet_.load(std::memory_order_acquire)) { + return nullptr; + } + if (!interruptLock_.try_lock()) { + interruptLock_.lock(); + } + auto handler = interruptHandler_; + interruptLock_.unlock(); + return handler; } /// Call only from Promise thread void setInterruptHandler(std::function fn) { - folly::MSLGuard guard(interruptLock_); + if (!interruptLock_.try_lock()) { + interruptLock_.lock(); + } if (!hasResult()) { if (interrupt_) { fn(*interrupt_); } else { - interruptHandler_ = std::move(fn); + setInterruptHandlerNoLock(std::move(fn)); } } + interruptLock_.unlock(); + } + + void setInterruptHandlerNoLock( + std::function fn) { + interruptHandlerSet_.store(true, std::memory_order_relaxed); + interruptHandler_ = std::move(fn); } protected: void maybeCallback() { FSM_START(fsm_) case State::Armed: - if (active_) { + if (active_.load(std::memory_order_acquire)) { FSM_UPDATE2(fsm_, State::Done, []{}, [this]{ this->doCallback(); }); } FSM_BREAK @@ -287,36 +315,48 @@ class Core { } void doCallback() { - RequestContext::setContext(context_); - - // TODO(6115514) semantic race on reading executor_ and setExecutor() - Executor* x; + Executor* x = executor_; int8_t priority; - { - folly::MSLGuard g(executorLock_); + if (x) { + if (!executorLock_.try_lock()) { + executorLock_.lock(); + } x = executor_; priority = priority_; + executorLock_.unlock(); } + // keep Core alive until callback did its thing + ++attached_; + if (x) { - ++attached_; // keep Core alive until executor did its thing try { if (LIKELY(x->getNumPriorities() == 1)) { x->add([this]() mutable { SCOPE_EXIT { detachOne(); }; + RequestContext::setContext(context_); + SCOPE_EXIT { callback_ = {}; }; callback_(std::move(*result_)); }); } else { x->addWithPriority([this]() mutable { SCOPE_EXIT { detachOne(); }; + RequestContext::setContext(context_); + SCOPE_EXIT { callback_ = {}; }; callback_(std::move(*result_)); }, priority); } } catch (...) { + --attached_; // Account for extra ++attached_ before try + RequestContext::setContext(context_); result_ = Try(exception_wrapper(std::current_exception())); + SCOPE_EXIT { callback_ = {}; }; callback_(std::move(*result_)); } } else { + SCOPE_EXIT { detachOne(); }; + RequestContext::setContext(context_); + SCOPE_EXIT { callback_ = {}; }; callback_(std::move(*result_)); } } @@ -330,17 +370,26 @@ class Core { } } - FSM fsm_ {State::Start}; - std::atomic attached_ {2}; + // Core should only be modified so that for size(T) == size(U), + // sizeof(Core) == size(Core). + // See Core::convert for details. + + folly::Function< + void(Try&&), + folly::FunctionMoveCtor::MAY_THROW, + 8 * sizeof(void*)> + callback_; + // place result_ next to increase the likelihood that the value will be + // contained entirely in one cache line + folly::Optional> result_; + FSM fsm_; + std::atomic attached_; std::atomic active_ {true}; + std::atomic interruptHandlerSet_ {false}; folly::MicroSpinLock interruptLock_ {0}; folly::MicroSpinLock executorLock_ {0}; int8_t priority_ {-1}; Executor* executor_ {nullptr}; - folly::Optional> result_ {}; - std::function&&)> callback_ {nullptr}; - static constexpr size_t lambdaBufSize = 8 * sizeof(void*); - char lambdaBuf_[lambdaBufSize]; std::shared_ptr context_ {nullptr}; std::unique_ptr interrupt_ {}; std::function interruptHandler_ {nullptr}; @@ -371,22 +420,41 @@ struct CollectVariadicContext { p.setException(std::move(t.exception())); } } else if (!threw) { - std::get(results) = std::move(t.value()); + std::get(results) = std::move(t); } } ~CollectVariadicContext() { if (!threw.exchange(true)) { - p.setValue(std::move(results)); + p.setValue(unwrap(std::move(results))); } } Promise> p; - std::tuple results; - std::atomic threw; + std::tuple...> results; + std::atomic threw {false}; typedef Future> type; + + private: + template + static std::tuple unwrap(std::tuple...>&& o, + Ts2&&... ts2) { + static_assert(sizeof...(ts2) < + std::tuple_size...>>::value, + "Non-templated unwrap should be used instead"); + assert(std::get(o).hasValue()); + + return unwrap(std::move(o), + std::forward(ts2)..., + std::move(*std::get(o))); + } + + static std::tuple unwrap(std::tuple...>&& /* o */, + Ts&&... ts) { + return std::tuple(std::forward(ts)...); + } }; -template