(wangle) Use MicroSpinLock
[folly.git] / folly / wangle / detail / Core.h
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <atomic>
20 #include <mutex>
21 #include <stdexcept>
22 #include <vector>
23
24 #include <folly/Optional.h>
25 #include <folly/SmallLocks.h>
26
27 #include <folly/wangle/Try.h>
28 #include <folly/wangle/Promise.h>
29 #include <folly/wangle/Future.h>
30 #include <folly/wangle/Executor.h>
31
32 namespace folly { namespace wangle { namespace detail {
33
34 // As of GCC 4.8.1, the std::function in libstdc++ optimizes only for pointers
35 // to functions, using a helper avoids a call to malloc.
36 template<typename T>
37 void empty_callback(Try<T>&&) { }
38
39 /** The shared state object for Future and Promise. */
40 template<typename T>
41 class Core {
42  public:
43   // This must be heap-constructed. There's probably a way to enforce that in
44   // code but since this is just internal detail code and I don't know how
45   // off-hand, I'm punting.
46   Core() = default;
47   ~Core() {
48     assert(calledBack_);
49     assert(detached_ == 2);
50   }
51
52   // not copyable
53   Core(Core const&) = delete;
54   Core& operator=(Core const&) = delete;
55
56   // not movable (see comment in the implementation of Future::then)
57   Core(Core&&) noexcept = delete;
58   Core& operator=(Core&&) = delete;
59
60   Try<T>& getTry() {
61     return *value_;
62   }
63
64   template <typename F>
65   void setCallback(F func) {
66     {
67       std::lock_guard<decltype(mutex_)> lock(mutex_);
68
69       if (callback_) {
70         throw std::logic_error("setCallback called twice");
71       }
72
73       callback_ = std::move(func);
74     }
75
76     maybeCallback();
77   }
78
79   void fulfil(Try<T>&& t) {
80     {
81       std::lock_guard<decltype(mutex_)> lock(mutex_);
82
83       if (ready()) {
84         throw std::logic_error("fulfil called twice");
85       }
86
87       value_ = std::move(t);
88       assert(ready());
89     }
90
91     maybeCallback();
92   }
93
94   void setException(std::exception_ptr const& e) {
95     fulfil(Try<T>(e));
96   }
97
98   template <class E> void setException(E const& e) {
99     fulfil(Try<T>(std::make_exception_ptr<E>(e)));
100   }
101
102   bool ready() const {
103     return value_.hasValue();
104   }
105
106   typename std::add_lvalue_reference<T>::type value() {
107     if (ready()) {
108       return value_->value();
109     } else {
110       throw FutureNotReady();
111     }
112   }
113
114   // Called by a destructing Future
115   void detachFuture() {
116     if (!callback_) {
117       setCallback(empty_callback<T>);
118     }
119     activate();
120     detachOne();
121   }
122
123   // Called by a destructing Promise
124   void detachPromise() {
125     if (!ready()) {
126       setException(BrokenPromise());
127     }
128     detachOne();
129   }
130
131   void deactivate() {
132     std::lock_guard<decltype(mutex_)> lock(mutex_);
133     active_ = false;
134   }
135
136   void activate() {
137     {
138       std::lock_guard<decltype(mutex_)> lock(mutex_);
139       active_ = true;
140     }
141     maybeCallback();
142   }
143
144   bool isActive() { return active_; }
145
146   void setExecutor(Executor* x) {
147     std::lock_guard<decltype(mutex_)> lock(mutex_);
148     executor_ = x;
149   }
150
151  private:
152   void maybeCallback() {
153     std::unique_lock<decltype(mutex_)> lock(mutex_);
154     if (!calledBack_ &&
155         value_ && callback_ && isActive()) {
156       // TODO(5306911) we should probably try/catch here
157       if (executor_) {
158         MoveWrapper<folly::Optional<Try<T>>> val(std::move(value_));
159         MoveWrapper<std::function<void(Try<T>&&)>> cb(std::move(callback_));
160         executor_->add([cb, val]() mutable { (*cb)(std::move(**val)); });
161         calledBack_ = true;
162       } else {
163         calledBack_ = true;
164         lock.unlock();
165         callback_(std::move(*value_));
166       }
167     }
168   }
169
170   void detachOne() {
171     bool shouldDelete;
172     {
173       std::lock_guard<decltype(mutex_)> lock(mutex_);
174       detached_++;
175       assert(detached_ == 1 || detached_ == 2);
176       shouldDelete = (detached_ == 2);
177     }
178
179     if (shouldDelete) {
180       // we should have already executed the callback with the value
181       assert(calledBack_);
182       delete this;
183     }
184   }
185
186   folly::Optional<Try<T>> value_;
187   std::function<void(Try<T>&&)> callback_;
188   bool calledBack_ = false;
189   unsigned char detached_ = 0;
190   bool active_ = true;
191   Executor* executor_ = nullptr;
192
193   // this lock isn't meant to protect all accesses to members, only the ones
194   // that need to be threadsafe: the act of setting value_ and callback_, and
195   // seeing if they are set and whether we should then continue.
196   folly::MicroSpinLock mutex_ {0};
197 };
198
199 template <typename... Ts>
200 struct VariadicContext {
201   VariadicContext() : total(0), count(0) {}
202   Promise<std::tuple<Try<Ts>... > > p;
203   std::tuple<Try<Ts>... > results;
204   size_t total;
205   std::atomic<size_t> count;
206   typedef Future<std::tuple<Try<Ts>...>> type;
207 };
208
209 template <typename... Ts, typename THead, typename... Fs>
210 typename std::enable_if<sizeof...(Fs) == 0, void>::type
211 whenAllVariadicHelper(VariadicContext<Ts...> *ctx, THead&& head, Fs&&... tail) {
212   head.setCallback_([ctx](Try<typename THead::value_type>&& t) {
213     std::get<sizeof...(Ts) - sizeof...(Fs) - 1>(ctx->results) = std::move(t);
214     if (++ctx->count == ctx->total) {
215       ctx->p.setValue(std::move(ctx->results));
216       delete ctx;
217     }
218   });
219 }
220
221 template <typename... Ts, typename THead, typename... Fs>
222 typename std::enable_if<sizeof...(Fs) != 0, void>::type
223 whenAllVariadicHelper(VariadicContext<Ts...> *ctx, THead&& head, Fs&&... tail) {
224   head.setCallback_([ctx](Try<typename THead::value_type>&& t) {
225     std::get<sizeof...(Ts) - sizeof...(Fs) - 1>(ctx->results) = std::move(t);
226     if (++ctx->count == ctx->total) {
227       ctx->p.setValue(std::move(ctx->results));
228       delete ctx;
229     }
230   });
231   // template tail-recursion
232   whenAllVariadicHelper(ctx, std::forward<Fs>(tail)...);
233 }
234
235 template <typename T>
236 struct WhenAllContext {
237   explicit WhenAllContext() : count(0), total(0) {}
238   Promise<std::vector<Try<T> > > p;
239   std::vector<Try<T> > results;
240   std::atomic<size_t> count;
241   size_t total;
242 };
243
244 template <typename T>
245 struct WhenAnyContext {
246   explicit WhenAnyContext(size_t n) : done(false), ref_count(n) {};
247   Promise<std::pair<size_t, Try<T>>> p;
248   std::atomic<bool> done;
249   std::atomic<size_t> ref_count;
250   void decref() {
251     if (--ref_count == 0) {
252       delete this;
253     }
254   }
255 };
256
257 template <typename T>
258 struct WhenAllLaterContext {
259   explicit WhenAllLaterContext() : count(0), total(0) {}
260   std::function<void(std::vector<Try<T>>&&)> fn;
261   std::vector<Try<T> > results;
262   std::atomic<size_t> count;
263   size_t total;
264 };
265
266 }}} // namespace