waitFor race workaround
[folly.git] / folly / wangle / Future-inl.h
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <chrono>
20 #include <thread>
21
22 #include <folly/wangle/detail/Core.h>
23 #include <folly/Baton.h>
24
25 namespace folly { namespace wangle {
26
27 template <typename T>
28 struct isFuture {
29   static const bool value = false;
30 };
31
32 template <typename T>
33 struct isFuture<Future<T> > {
34   static const bool value = true;
35 };
36
37 template <class T>
38 Future<T>::Future(Future<T>&& other) noexcept : core_(nullptr) {
39   *this = std::move(other);
40 }
41
42 template <class T>
43 Future<T>& Future<T>::operator=(Future<T>&& other) {
44   std::swap(core_, other.core_);
45   return *this;
46 }
47
48 template <class T>
49 Future<T>::~Future() {
50   detach();
51 }
52
53 template <class T>
54 void Future<T>::detach() {
55   if (core_) {
56     core_->detachFuture();
57     core_ = nullptr;
58   }
59 }
60
61 template <class T>
62 void Future<T>::throwIfInvalid() const {
63   if (!core_)
64     throw NoState();
65 }
66
67 template <class T>
68 template <class F>
69 void Future<T>::setCallback_(F&& func) {
70   throwIfInvalid();
71   core_->setCallback(std::move(func));
72 }
73
74 template <class T>
75 template <class F>
76 typename std::enable_if<
77   !isFuture<typename std::result_of<F(Try<T>&&)>::type>::value,
78   Future<typename std::result_of<F(Try<T>&&)>::type> >::type
79 Future<T>::then(F&& func) {
80   typedef typename std::result_of<F(Try<T>&&)>::type B;
81
82   throwIfInvalid();
83
84   // wrap these so we can move them into the lambda
85   folly::MoveWrapper<Promise<B>> p;
86   folly::MoveWrapper<F> funcm(std::forward<F>(func));
87
88   // grab the Future now before we lose our handle on the Promise
89   auto f = p->getFuture();
90
91   /* This is a bit tricky.
92
93      We can't just close over *this in case this Future gets moved. So we
94      make a new dummy Future. We could figure out something more
95      sophisticated that avoids making a new Future object when it can, as an
96      optimization. But this is correct.
97
98      core_ can't be moved, it is explicitly disallowed (as is copying). But
99      if there's ever a reason to allow it, this is one place that makes that
100      assumption and would need to be fixed. We use a standard shared pointer
101      for core_ (by copying it in), which means in essence obj holds a shared
102      pointer to itself.  But this shouldn't leak because Promise will not
103      outlive the continuation, because Promise will setException() with a
104      broken Promise if it is destructed before completed. We could use a
105      weak pointer but it would have to be converted to a shared pointer when
106      func is executed (because the Future returned by func may possibly
107      persist beyond the callback, if it gets moved), and so it is an
108      optimization to just make it shared from the get-go.
109
110      We have to move in the Promise and func using the MoveWrapper
111      hack. (func could be copied but it's a big drag on perf).
112
113      Two subtle but important points about this design. detail::Core has no
114      back pointers to Future or Promise, so if Future or Promise get moved
115      (and they will be moved in performant code) we don't have to do
116      anything fancy. And because we store the continuation in the
117      detail::Core, not in the Future, we can execute the continuation even
118      after the Future has gone out of scope. This is an intentional design
119      decision. It is likely we will want to be able to cancel a continuation
120      in some circumstances, but I think it should be explicit not implicit
121      in the destruction of the Future used to create it.
122      */
123   setCallback_(
124     [p, funcm](Try<T>&& t) mutable {
125       p->fulfil([&]() {
126           return (*funcm)(std::move(t));
127         });
128     });
129
130   return std::move(f);
131 }
132
133 template <class T>
134 template <class F>
135 typename std::enable_if<
136   isFuture<typename std::result_of<F(Try<T>&&)>::type>::value,
137   Future<typename std::result_of<F(Try<T>&&)>::type::value_type> >::type
138 Future<T>::then(F&& func) {
139   typedef typename std::result_of<F(Try<T>&&)>::type::value_type B;
140
141   throwIfInvalid();
142
143   // wrap these so we can move them into the lambda
144   folly::MoveWrapper<Promise<B>> p;
145   folly::MoveWrapper<F> funcm(std::forward<F>(func));
146
147   // grab the Future now before we lose our handle on the Promise
148   auto f = p->getFuture();
149
150   setCallback_(
151     [p, funcm](Try<T>&& t) mutable {
152       try {
153         auto f2 = (*funcm)(std::move(t));
154         // that didn't throw, now we can steal p
155         f2.setCallback_([p](Try<B>&& b) mutable {
156             p->fulfilTry(std::move(b));
157           });
158       } catch (...) {
159         p->setException(std::current_exception());
160       }
161     });
162
163   return std::move(f);
164 }
165
166 template <class T>
167 Future<void> Future<T>::then() {
168   return then([] (Try<T>&& t) {});
169 }
170
171 template <class T>
172 typename std::add_lvalue_reference<T>::type Future<T>::value() {
173   throwIfInvalid();
174
175   return core_->getTry().value();
176 }
177
178 template <class T>
179 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
180   throwIfInvalid();
181
182   return core_->value();
183 }
184
185 template <class T>
186 Try<T>& Future<T>::getTry() {
187   throwIfInvalid();
188
189   return core_->getTry();
190 }
191
192 template <class T>
193 template <typename Executor>
194 inline Future<T> Future<T>::via(Executor* executor) {
195   throwIfInvalid();
196
197   this->deactivate();
198   core_->setExecutor(executor);
199
200   return std::move(*this);
201 }
202
203 template <class T>
204 bool Future<T>::isReady() const {
205   throwIfInvalid();
206   return core_->ready();
207 }
208
209 template <class T>
210 void Future<T>::raise(std::exception_ptr exception) {
211   core_->raise(exception);
212 }
213
214 // makeFuture
215
216 template <class T>
217 Future<typename std::decay<T>::type> makeFuture(T&& t) {
218   Promise<typename std::decay<T>::type> p;
219   auto f = p.getFuture();
220   p.setValue(std::forward<T>(t));
221   return std::move(f);
222 }
223
224 inline // for multiple translation units
225 Future<void> makeFuture() {
226   Promise<void> p;
227   auto f = p.getFuture();
228   p.setValue();
229   return std::move(f);
230 }
231
232 template <class F>
233 auto makeFutureTry(
234     F&& func,
235     typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
236     -> Future<decltype(func())> {
237   Promise<decltype(func())> p;
238   auto f = p.getFuture();
239   p.fulfil(
240     [&func]() {
241       return (func)();
242     });
243   return std::move(f);
244 }
245
246 template <class F>
247 auto makeFutureTry(F const& func) -> Future<decltype(func())> {
248   F copy = func;
249   return makeFutureTry(std::move(copy));
250 }
251
252 template <class T>
253 Future<T> makeFuture(std::exception_ptr const& e) {
254   Promise<T> p;
255   auto f = p.getFuture();
256   p.setException(e);
257   return std::move(f);
258 }
259
260 template <class T, class E>
261 typename std::enable_if<std::is_base_of<std::exception, E>::value,
262                         Future<T>>::type
263 makeFuture(E const& e) {
264   Promise<T> p;
265   auto f = p.getFuture();
266   p.fulfil([&]() -> T { throw e; });
267   return std::move(f);
268 }
269
270 template <class T>
271 Future<T> makeFuture(Try<T>&& t) {
272   try {
273     return makeFuture<T>(std::move(t.value()));
274   } catch (...) {
275     return makeFuture<T>(std::current_exception());
276   }
277 }
278
279 template <>
280 inline Future<void> makeFuture(Try<void>&& t) {
281   try {
282     t.throwIfFailed();
283     return makeFuture();
284   } catch (...) {
285     return makeFuture<void>(std::current_exception());
286   }
287 }
288
289 // when (variadic)
290
291 template <typename... Fs>
292 typename detail::VariadicContext<
293   typename std::decay<Fs>::type::value_type...>::type
294 whenAll(Fs&&... fs)
295 {
296   auto ctx =
297     new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
298   ctx->total = sizeof...(fs);
299   auto f_saved = ctx->p.getFuture();
300   detail::whenAllVariadicHelper(ctx,
301     std::forward<typename std::decay<Fs>::type>(fs)...);
302   return std::move(f_saved);
303 }
304
305 // when (iterator)
306
307 template <class InputIterator>
308 Future<
309   std::vector<
310   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
311 whenAll(InputIterator first, InputIterator last)
312 {
313   typedef
314     typename std::iterator_traits<InputIterator>::value_type::value_type T;
315
316   auto n = std::distance(first, last);
317   if (n == 0) {
318     return makeFuture(std::vector<Try<T>>());
319   }
320
321   auto ctx = new detail::WhenAllContext<T>();
322
323   ctx->total = n;
324   ctx->results.resize(ctx->total);
325
326   auto f_saved = ctx->p.getFuture();
327
328   for (size_t i = 0; first != last; ++first, ++i) {
329      auto& f = *first;
330      f.setCallback_([ctx, i](Try<T>&& t) {
331          ctx->results[i] = std::move(t);
332          if (++ctx->count == ctx->total) {
333            ctx->p.setValue(std::move(ctx->results));
334            delete ctx;
335          }
336        });
337   }
338
339   return std::move(f_saved);
340 }
341
342 template <class InputIterator>
343 Future<
344   std::pair<size_t,
345             Try<
346               typename
347               std::iterator_traits<InputIterator>::value_type::value_type> > >
348 whenAny(InputIterator first, InputIterator last) {
349   typedef
350     typename std::iterator_traits<InputIterator>::value_type::value_type T;
351
352   auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
353   auto f_saved = ctx->p.getFuture();
354
355   for (size_t i = 0; first != last; first++, i++) {
356     auto& f = *first;
357     f.setCallback_([i, ctx](Try<T>&& t) {
358       if (!ctx->done.exchange(true)) {
359         ctx->p.setValue(std::make_pair(i, std::move(t)));
360       }
361       ctx->decref();
362     });
363   }
364
365   return std::move(f_saved);
366 }
367
368 template <class InputIterator>
369 Future<std::vector<std::pair<size_t, Try<typename
370   std::iterator_traits<InputIterator>::value_type::value_type>>>>
371 whenN(InputIterator first, InputIterator last, size_t n) {
372   typedef typename
373     std::iterator_traits<InputIterator>::value_type::value_type T;
374   typedef std::vector<std::pair<size_t, Try<T>>> V;
375
376   struct ctx_t {
377     V v;
378     size_t completed;
379     Promise<V> p;
380   };
381   auto ctx = std::make_shared<ctx_t>();
382   ctx->completed = 0;
383
384   // for each completed Future, increase count and add to vector, until we
385   // have n completed futures at which point we fulfil our Promise with the
386   // vector
387   auto it = first;
388   size_t i = 0;
389   while (it != last) {
390     it->then([ctx, n, i](Try<T>&& t) {
391       auto& v = ctx->v;
392       auto c = ++ctx->completed;
393       if (c <= n) {
394         assert(ctx->v.size() < n);
395         v.push_back(std::make_pair(i, std::move(t)));
396         if (c == n) {
397           ctx->p.fulfilTry(Try<V>(std::move(v)));
398         }
399       }
400     });
401
402     it++;
403     i++;
404   }
405
406   if (i < n) {
407     ctx->p.setException(std::runtime_error("Not enough futures"));
408   }
409
410   return ctx->p.getFuture();
411 }
412
413 template <typename T>
414 Future<T>
415 waitWithSemaphore(Future<T>&& f) {
416   Baton<> baton;
417   auto done = f.then([&](Try<T> &&t) {
418     baton.post();
419     return std::move(t.value());
420   });
421   baton.wait();
422   while (!done.isReady()) {
423     // There's a race here between the return here and the actual finishing of
424     // the future. f is completed, but the setup may not have finished on done
425     // after the baton has posted.
426     std::this_thread::yield();
427   }
428   return done;
429 }
430
431 template<>
432 inline Future<void> waitWithSemaphore<void>(Future<void>&& f) {
433   Baton<> baton;
434   auto done = f.then([&](Try<void> &&t) {
435     baton.post();
436     t.value();
437   });
438   baton.wait();
439   while (!done.isReady()) {
440     // There's a race here between the return here and the actual finishing of
441     // the future. f is completed, but the setup may not have finished on done
442     // after the baton has posted.
443     std::this_thread::yield();
444   }
445   return done;
446 }
447
448 template <typename T, class Duration>
449 Future<T>
450 waitWithSemaphore(Future<T>&& f, Duration timeout) {
451   auto baton = std::make_shared<Baton<>>();
452   auto done = f.then([baton](Try<T> &&t) {
453     baton->post();
454     return std::move(t.value());
455   });
456   baton->timed_wait(std::chrono::system_clock::now() + timeout);
457   return done;
458 }
459
460 template <class Duration>
461 Future<void>
462 waitWithSemaphore(Future<void>&& f, Duration timeout) {
463   auto baton = std::make_shared<Baton<>>();
464   auto done = f.then([baton](Try<void> &&t) {
465     baton->post();
466     t.value();
467   });
468   baton->timed_wait(std::chrono::system_clock::now() + timeout);
469   return done;
470 }
471
472 }}
473
474 // I haven't included a Future<T&> specialization because I don't forsee us
475 // using it, however it is not difficult to add when needed. Refer to
476 // Future<void> for guidance. std::future and boost::future code would also be
477 // instructive.