#pragma once
#include <algorithm>
+#include <cassert>
#include <chrono>
#include <random>
#include <thread>
-
-#include <folly/experimental/fibers/Baton.h>
+#include <folly/Baton.h>
#include <folly/Optional.h>
#include <folly/Random.h>
#include <folly/Traits.h>
#include <folly/futures/detail/Core.h>
#include <folly/futures/Timekeeper.h>
+#if defined(__ANDROID__) || defined(__APPLE__)
+#define FOLLY_FUTURE_USING_FIBER 0
+#else
+#define FOLLY_FUTURE_USING_FIBER 1
+#include <folly/experimental/fibers/Baton.h>
+#endif
+
namespace folly {
class Timekeeper;
namespace detail {
- Timekeeper* getTimekeeperSingleton();
+#if FOLLY_FUTURE_USING_FIBER
+typedef folly::fibers::Baton FutureBatonType;
+#else
+typedef folly::Baton<> FutureBatonType;
+#endif
+}
+
+namespace detail {
+ std::shared_ptr<Timekeeper> getTimekeeperSingleton();
}
template <class T>
template <typename T>
struct CollectContext {
- struct Nothing { explicit Nothing(int n) {} };
+ struct Nothing {
+ explicit Nothing(int /* n */) {}
+ };
using Result = typename std::conditional<
std::is_void<T>::value,
auto ctx = std::make_shared<UnorderedReduceContext>(
std::move(initial), std::move(func), std::distance(first, last));
- mapSetCallback<ItT>(first, last, [ctx](size_t i, Try<ItT>&& t) {
- folly::MoveWrapper<Try<ItT>> mt(std::move(t));
- // Futures can be completed in any order, simultaneously.
- // To make this non-blocking, we create a new Future chain in
- // the order of completion to reduce the values.
- // The spinlock just protects chaining a new Future, not actually
- // executing the reduce, which should be really fast.
- folly::MSLGuard lock(ctx->lock_);
- ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable {
- // Either return a ItT&& or a Try<ItT>&& depending
- // on the type of the argument of func.
- return ctx->func_(std::move(v), mt->template get<IsTry::value, Arg&&>());
- });
- if (++ctx->numThens_ == ctx->numFutures_) {
- // After reducing the value of the last Future, fulfill the Promise
- ctx->memo_.setCallback_([ctx](Try<T>&& t2) {
- ctx->promise_.setValue(std::move(t2));
+ mapSetCallback<ItT>(
+ first,
+ last,
+ [ctx](size_t /* i */, Try<ItT>&& t) {
+ folly::MoveWrapper<Try<ItT>> mt(std::move(t));
+ // Futures can be completed in any order, simultaneously.
+ // To make this non-blocking, we create a new Future chain in
+ // the order of completion to reduce the values.
+ // The spinlock just protects chaining a new Future, not actually
+ // executing the reduce, which should be really fast.
+ folly::MSLGuard lock(ctx->lock_);
+ ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable {
+ // Either return a ItT&& or a Try<ItT>&& depending
+ // on the type of the argument of func.
+ return ctx->func_(std::move(v),
+ mt->template get<IsTry::value, Arg&&>());
+ });
+ if (++ctx->numThens_ == ctx->numFutures_) {
+ // After reducing the value of the last Future, fulfill the Promise
+ ctx->memo_.setCallback_(
+ [ctx](Try<T>&& t2) { ctx->promise_.setValue(std::move(t2)); });
+ }
});
- }
- });
return ctx->promise_.getFuture();
}
std::atomic<bool> token {false};
};
+ std::shared_ptr<Timekeeper> tks;
if (!tk) {
- tk = folly::detail::getTimekeeperSingleton();
+ tks = folly::detail::getTimekeeperSingleton();
+ tk = DCHECK_NOTNULL(tks.get());
}
auto ctx = std::make_shared<Context>(std::move(e));
// short-circuit if there's nothing to do
if (f.isReady()) return;
- folly::fibers::Baton baton;
- f = f.then([&](Try<T> t) {
- baton.post();
- return makeFuture(std::move(t));
- });
+ FutureBatonType baton;
+ f.setCallback_([&](const Try<T>& /* t */) { baton.post(); });
baton.wait();
-
- // There's a race here between the return here and the actual finishing of
- // the future. f is completed, but the setup may not have finished on done
- // after the baton has posted.
- while (!f.isReady()) {
- std::this_thread::yield();
- }
+ assert(f.isReady());
}
template <class T>
// short-circuit if there's nothing to do
if (f.isReady()) return;
- auto baton = std::make_shared<folly::fibers::Baton>();
- f = f.then([baton](Try<T> t) {
+ folly::MoveWrapper<Promise<T>> promise;
+ auto ret = promise->getFuture();
+ auto baton = std::make_shared<FutureBatonType>();
+ f.setCallback_([baton, promise](Try<T>&& t) mutable {
+ promise->setTry(std::move(t));
baton->post();
- return makeFuture(std::move(t));
});
-
- // Let's preserve the invariant that if we did not timeout (timed_wait returns
- // true), then the returned Future is complete when it is returned to the
- // caller. We need to wait out the race for that Future to complete.
+ f = std::move(ret);
if (baton->timed_wait(dur)) {
- while (!f.isReady()) {
- std::this_thread::yield();
- }
+ assert(f.isReady());
}
}