fix race in Future::waitWithSemaphore
[folly.git] / folly / wangle / Future-inl.h
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <chrono>
20 #include <thread>
21
22 #include <folly/wangle/detail/State.h>
23 #include <folly/Baton.h>
24
25 namespace folly { namespace wangle {
26
27 template <typename T>
28 struct isFuture {
29   static const bool value = false;
30 };
31
32 template <typename T>
33 struct isFuture<Future<T> > {
34   static const bool value = true;
35 };
36
37 template <class T>
38 Future<T>::Future(Future<T>&& other) noexcept : state_(nullptr) {
39   *this = std::move(other);
40 }
41
42 template <class T>
43 Future<T>& Future<T>::operator=(Future<T>&& other) {
44   std::swap(state_, other.state_);
45   return *this;
46 }
47
48 template <class T>
49 Future<T>::~Future() {
50   detach();
51 }
52
53 template <class T>
54 void Future<T>::detach() {
55   if (state_) {
56     state_->detachFuture();
57     state_ = nullptr;
58   }
59 }
60
61 template <class T>
62 void Future<T>::throwIfInvalid() const {
63   if (!state_)
64     throw NoState();
65 }
66
67 template <class T>
68 template <class F>
69 void Future<T>::setCallback_(F&& func) {
70   throwIfInvalid();
71   state_->setCallback(std::move(func));
72 }
73
74 template <class T>
75 template <class F>
76 typename std::enable_if<
77   !isFuture<typename std::result_of<F(Try<T>&&)>::type>::value,
78   Future<typename std::result_of<F(Try<T>&&)>::type> >::type
79 Future<T>::then(F&& func) {
80   typedef typename std::result_of<F(Try<T>&&)>::type B;
81
82   throwIfInvalid();
83
84   // wrap these so we can move them into the lambda
85   folly::MoveWrapper<Promise<B>> p;
86   folly::MoveWrapper<F> funcm(std::forward<F>(func));
87
88   // grab the Future now before we lose our handle on the Promise
89   auto f = p->getFuture();
90
91   /* This is a bit tricky.
92
93      We can't just close over *this in case this Future gets moved. So we
94      make a new dummy Future. We could figure out something more
95      sophisticated that avoids making a new Future object when it can, as an
96      optimization. But this is correct.
97
98      state_ can't be moved, it is explicitly disallowed (as is copying). But
99      if there's ever a reason to allow it, this is one place that makes that
100      assumption and would need to be fixed. We use a standard shared pointer
101      for state_ (by copying it in), which means in essence obj holds a shared
102      pointer to itself.  But this shouldn't leak because Promise will not
103      outlive the continuation, because Promise will setException() with a
104      broken Promise if it is destructed before completed. We could use a
105      weak pointer but it would have to be converted to a shared pointer when
106      func is executed (because the Future returned by func may possibly
107      persist beyond the callback, if it gets moved), and so it is an
108      optimization to just make it shared from the get-go.
109
110      We have to move in the Promise and func using the MoveWrapper
111      hack. (func could be copied but it's a big drag on perf).
112
113      Two subtle but important points about this design. detail::State has no
114      back pointers to Future or Promise, so if Future or Promise get moved
115      (and they will be moved in performant code) we don't have to do
116      anything fancy. And because we store the continuation in the
117      detail::State, not in the Future, we can execute the continuation even
118      after the Future has gone out of scope. This is an intentional design
119      decision. It is likely we will want to be able to cancel a continuation
120      in some circumstances, but I think it should be explicit not implicit
121      in the destruction of the Future used to create it.
122      */
123   setCallback_(
124     [p, funcm](Try<T>&& t) mutable {
125       p->fulfil([&]() {
126           return (*funcm)(std::move(t));
127         });
128     });
129
130   return std::move(f);
131 }
132
133 template <class T>
134 template <class F>
135 typename std::enable_if<
136   isFuture<typename std::result_of<F(Try<T>&&)>::type>::value,
137   Future<typename std::result_of<F(Try<T>&&)>::type::value_type> >::type
138 Future<T>::then(F&& func) {
139   typedef typename std::result_of<F(Try<T>&&)>::type::value_type B;
140
141   throwIfInvalid();
142
143   // wrap these so we can move them into the lambda
144   folly::MoveWrapper<Promise<B>> p;
145   folly::MoveWrapper<F> funcm(std::forward<F>(func));
146
147   // grab the Future now before we lose our handle on the Promise
148   auto f = p->getFuture();
149
150   setCallback_(
151     [p, funcm](Try<T>&& t) mutable {
152       try {
153         auto f2 = (*funcm)(std::move(t));
154         // that didn't throw, now we can steal p
155         f2.setCallback_([p](Try<B>&& b) mutable {
156             p->fulfilTry(std::move(b));
157           });
158       } catch (...) {
159         p->setException(std::current_exception());
160       }
161     });
162
163   return std::move(f);
164 }
165
166 template <class T>
167 Future<void> Future<T>::then() {
168   return then([] (Try<T>&& t) {});
169 }
170
171 template <class T>
172 typename std::add_lvalue_reference<T>::type Future<T>::value() {
173   throwIfInvalid();
174
175   return state_->value();
176 }
177
178 template <class T>
179 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
180   throwIfInvalid();
181
182   return state_->value();
183 }
184
185 template <class T>
186 Try<T>& Future<T>::getTry() {
187   throwIfInvalid();
188
189   return state_->getTry();
190 }
191
192 template <class T>
193 template <typename Executor>
194 inline Future<T> Future<T>::via(Executor* executor) {
195   throwIfInvalid();
196   auto f = then([=](Try<T>&& t) {
197     MoveWrapper<Promise<T>> promise;
198     MoveWrapper<Try<T>> tw(std::move(t));
199     auto f2 = promise->getFuture();
200     executor->add([=]() mutable { promise->fulfilTry(std::move(*tw)); });
201     return f2;
202   });
203   f.deactivate();
204   return f;
205 }
206
207 template <class T>
208 bool Future<T>::isReady() const {
209   throwIfInvalid();
210   return state_->ready();
211 }
212
213 // makeFuture
214
215 template <class T>
216 Future<typename std::decay<T>::type> makeFuture(T&& t) {
217   Promise<typename std::decay<T>::type> p;
218   auto f = p.getFuture();
219   p.setValue(std::forward<T>(t));
220   return std::move(f);
221 }
222
223 inline // for multiple translation units
224 Future<void> makeFuture() {
225   Promise<void> p;
226   auto f = p.getFuture();
227   p.setValue();
228   return std::move(f);
229 }
230
231 template <class F>
232 auto makeFutureTry(
233     F&& func,
234     typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
235     -> Future<decltype(func())> {
236   Promise<decltype(func())> p;
237   auto f = p.getFuture();
238   p.fulfil(
239     [&func]() {
240       return (func)();
241     });
242   return std::move(f);
243 }
244
245 template <class F>
246 auto makeFutureTry(F const& func) -> Future<decltype(func())> {
247   F copy = func;
248   return makeFutureTry(std::move(copy));
249 }
250
251 template <class T>
252 Future<T> makeFuture(std::exception_ptr const& e) {
253   Promise<T> p;
254   auto f = p.getFuture();
255   p.setException(e);
256   return std::move(f);
257 }
258
259 template <class T, class E>
260 typename std::enable_if<std::is_base_of<std::exception, E>::value, Future<T>>::type
261 makeFuture(E const& e) {
262   Promise<T> p;
263   auto f = p.getFuture();
264   p.fulfil([&]() -> T { throw e; });
265   return std::move(f);
266 }
267
268 template <class T>
269 Future<T> makeFuture(Try<T>&& t) {
270   try {
271     return makeFuture<T>(std::move(t.value()));
272   } catch (...) {
273     return makeFuture<T>(std::current_exception());
274   }
275 }
276
277 template <>
278 inline Future<void> makeFuture(Try<void>&& t) {
279   try {
280     t.throwIfFailed();
281     return makeFuture();
282   } catch (...) {
283     return makeFuture<void>(std::current_exception());
284   }
285 }
286
287 // when (variadic)
288
289 template <typename... Fs>
290 typename detail::VariadicContext<
291   typename std::decay<Fs>::type::value_type...>::type
292 whenAll(Fs&&... fs)
293 {
294   auto ctx =
295     new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
296   ctx->total = sizeof...(fs);
297   auto f_saved = ctx->p.getFuture();
298   detail::whenAllVariadicHelper(ctx,
299     std::forward<typename std::decay<Fs>::type>(fs)...);
300   return std::move(f_saved);
301 }
302
303 // when (iterator)
304
305 template <class InputIterator>
306 Future<
307   std::vector<
308   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
309 whenAll(InputIterator first, InputIterator last)
310 {
311   typedef
312     typename std::iterator_traits<InputIterator>::value_type::value_type T;
313
314   auto n = std::distance(first, last);
315   if (n == 0) {
316     return makeFuture(std::vector<Try<T>>());
317   }
318
319   auto ctx = new detail::WhenAllContext<T>();
320
321   ctx->total = n;
322   ctx->results.resize(ctx->total);
323
324   auto f_saved = ctx->p.getFuture();
325
326   for (size_t i = 0; first != last; ++first, ++i) {
327      auto& f = *first;
328      f.setCallback_([ctx, i](Try<T>&& t) {
329          ctx->results[i] = std::move(t);
330          if (++ctx->count == ctx->total) {
331            ctx->p.setValue(std::move(ctx->results));
332            delete ctx;
333          }
334        });
335   }
336
337   return std::move(f_saved);
338 }
339
340 template <class InputIterator>
341 Future<
342   std::pair<size_t,
343             Try<
344               typename
345               std::iterator_traits<InputIterator>::value_type::value_type> > >
346 whenAny(InputIterator first, InputIterator last) {
347   typedef
348     typename std::iterator_traits<InputIterator>::value_type::value_type T;
349
350   auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
351   auto f_saved = ctx->p.getFuture();
352
353   for (size_t i = 0; first != last; first++, i++) {
354     auto& f = *first;
355     f.setCallback_([i, ctx](Try<T>&& t) {
356       if (!ctx->done.exchange(true)) {
357         ctx->p.setValue(std::make_pair(i, std::move(t)));
358       }
359       ctx->decref();
360     });
361   }
362
363   return std::move(f_saved);
364 }
365
366 template <class InputIterator>
367 Future<std::vector<std::pair<size_t, Try<typename
368   std::iterator_traits<InputIterator>::value_type::value_type>>>>
369 whenN(InputIterator first, InputIterator last, size_t n) {
370   typedef typename
371     std::iterator_traits<InputIterator>::value_type::value_type T;
372   typedef std::vector<std::pair<size_t, Try<T>>> V;
373
374   struct ctx_t {
375     V v;
376     size_t completed;
377     Promise<V> p;
378   };
379   auto ctx = std::make_shared<ctx_t>();
380   ctx->completed = 0;
381
382   // for each completed Future, increase count and add to vector, until we
383   // have n completed futures at which point we fulfil our Promise with the
384   // vector
385   auto it = first;
386   size_t i = 0;
387   while (it != last) {
388     it->then([ctx, n, i](Try<T>&& t) {
389       auto& v = ctx->v;
390       auto c = ++ctx->completed;
391       if (c <= n) {
392         assert(ctx->v.size() < n);
393         v.push_back(std::make_pair(i, std::move(t)));
394         if (c == n) {
395           ctx->p.fulfilTry(Try<V>(std::move(v)));
396         }
397       }
398     });
399
400     it++;
401     i++;
402   }
403
404   if (i < n) {
405     ctx->p.setException(std::runtime_error("Not enough futures"));
406   }
407
408   return ctx->p.getFuture();
409 }
410
411 template <typename T>
412 Future<T>
413 waitWithSemaphore(Future<T>&& f) {
414   Baton<> baton;
415   auto done = f.then([&](Try<T> &&t) {
416     baton.post();
417     return std::move(t.value());
418   });
419   baton.wait();
420   while (!done.isReady()) {
421     // There's a race here between the return here and the actual finishing of
422     // the future. f is completed, but the setup may not have finished on done
423     // after the baton has posted.
424     std::this_thread::yield();
425   }
426   return done;
427 }
428
429 template<>
430 inline Future<void> waitWithSemaphore<void>(Future<void>&& f) {
431   Baton<> baton;
432   auto done = f.then([&](Try<void> &&t) {
433     baton.post();
434     t.value();
435   });
436   baton.wait();
437   while (!done.isReady()) {
438     // There's a race here between the return here and the actual finishing of
439     // the future. f is completed, but the setup may not have finished on done
440     // after the baton has posted.
441     std::this_thread::yield();
442   }
443   return done;
444 }
445
446 template <typename T, class Duration>
447 Future<T>
448 waitWithSemaphore(Future<T>&& f, Duration timeout) {
449   auto baton = std::make_shared<Baton<>>();
450   auto done = f.then([baton](Try<T> &&t) {
451     baton->post();
452     return std::move(t.value());
453   });
454   baton->timed_wait(std::chrono::system_clock::now() + timeout);
455   return done;
456 }
457
458 template <class Duration>
459 Future<void>
460 waitWithSemaphore(Future<void>&& f, Duration timeout) {
461   auto baton = std::make_shared<Baton<>>();
462   auto done = f.then([baton](Try<void> &&t) {
463     baton->post();
464     t.value();
465   });
466   baton->timed_wait(std::chrono::system_clock::now() + timeout);
467   return done;
468 }
469
470 }}
471
472 // I haven't included a Future<T&> specialization because I don't forsee us
473 // using it, however it is not difficult to add when needed. Refer to
474 // Future<void> for guidance. std::future and boost::future code would also be
475 // instructive.