#include <folly/futures/Future.h>
#include <folly/futures/ManualExecutor.h>
#include <folly/futures/DrivableExecutor.h>
+#include <folly/dynamic.h>
+#include <folly/Baton.h>
#include <folly/MPMCQueue.h>
#include <folly/io/async/EventBase.h>
.onError([&] (eggs_t& e) { throw e; return makeFuture<int>(-1); });
EXPECT_THROW(f.value(), eggs_t);
}
+
+ // exception_wrapper, return Future<T>
+ {
+ auto f = makeFuture()
+ .then([] { throw eggs; })
+ .onError([&] (exception_wrapper e) { flag(); return makeFuture(); });
+ EXPECT_FLAG();
+ EXPECT_NO_THROW(f.value());
+ }
+
+ // exception_wrapper, return Future<T> but throw
+ {
+ auto f = makeFuture()
+ .then([]{ throw eggs; return 0; })
+ .onError([&] (exception_wrapper e) {
+ flag();
+ throw eggs;
+ return makeFuture<int>(-1);
+ });
+ EXPECT_FLAG();
+ EXPECT_THROW(f.value(), eggs_t);
+ }
+
+ // exception_wrapper, return T
+ {
+ auto f = makeFuture()
+ .then([]{ throw eggs; return 0; })
+ .onError([&] (exception_wrapper e) {
+ flag();
+ return -1;
+ });
+ EXPECT_FLAG();
+ EXPECT_EQ(-1, f.value());
+ }
+
+ // exception_wrapper, return T but throw
+ {
+ auto f = makeFuture()
+ .then([]{ throw eggs; return 0; })
+ .onError([&] (exception_wrapper e) {
+ flag();
+ throw eggs;
+ return -1;
+ });
+ EXPECT_FLAG();
+ EXPECT_THROW(f.value(), eggs_t);
+ }
+
+ // const exception_wrapper&
+ {
+ auto f = makeFuture()
+ .then([] { throw eggs; })
+ .onError([&] (const exception_wrapper& e) {
+ flag();
+ return makeFuture();
+ });
+ EXPECT_FLAG();
+ EXPECT_NO_THROW(f.value());
+ }
+
}
TEST(Future, try) {
EXPECT_EQ(42, makeFuture<float>(42).value());
auto fun = [] { return 42; };
- EXPECT_TYPE(makeFutureTry(fun), Future<int>);
- EXPECT_EQ(42, makeFutureTry(fun).value());
+ EXPECT_TYPE(makeFutureWith(fun), Future<int>);
+ EXPECT_EQ(42, makeFutureWith(fun).value());
auto failfun = []() -> int { throw eggs; };
- EXPECT_TYPE(makeFutureTry(failfun), Future<int>);
- EXPECT_THROW(makeFutureTry(failfun).value(), eggs_t);
+ EXPECT_TYPE(makeFutureWith(failfun), Future<int>);
+ EXPECT_THROW(makeFutureWith(failfun).value(), eggs_t);
EXPECT_TYPE(makeFuture(), Future<void>);
}
}
}
-TEST(Promise, fulfil) {
+TEST(Promise, setWith) {
{
Promise<int> p;
auto f = p.getFuture();
- p.fulfil([] { return 42; });
+ p.setWith([] { return 42; });
EXPECT_EQ(42, f.value());
}
{
Promise<int> p;
auto f = p.getFuture();
- p.fulfil([]() -> int { throw eggs; });
+ p.setWith([]() -> int { throw eggs; });
EXPECT_THROW(f.value(), eggs_t);
}
}
EXPECT_EQ(7, f.value());
}
-TEST(Future, whenAll) {
+TEST(Future, stream) {
+ auto fn = [](vector<int> input, size_t window_size, size_t expect) {
+ auto res = reduce(
+ window(
+ input,
+ [](int i) { return makeFuture(i); },
+ 2),
+ 0,
+ [](int sum, const Try<int>& b) {
+ return sum + *b;
+ }).get();
+ EXPECT_EQ(expect, res);
+ };
+ {
+ // streaming 2 at a time
+ vector<int> input = {1, 2, 3};
+ fn(input, 2, 6);
+ }
+ {
+ // streaming 4 at a time
+ vector<int> input = {1, 2, 3};
+ fn(input, 4, 6);
+ }
+ {
+ // empty inpt
+ vector<int> input;
+ fn(input, 1, 0);
+ }
+}
+
+TEST(Future, collectAll) {
// returns a vector variant
{
vector<Promise<int>> promises(10);
for (auto& p : promises)
futures.push_back(p.getFuture());
- auto allf = whenAll(futures.begin(), futures.end());
+ auto allf = collectAll(futures);
random_shuffle(promises.begin(), promises.end());
for (auto& p : promises) {
for (auto& p : promises)
futures.push_back(p.getFuture());
- auto allf = whenAll(futures.begin(), futures.end());
+ auto allf = collectAll(futures);
promises[0].setValue(42);
for (auto& p : promises)
futures.push_back(p.getFuture());
- auto allf = whenAll(futures.begin(), futures.end())
+ auto allf = collectAll(futures)
.then([](Try<vector<Try<void>>>&& ts) {
for (auto& f : ts.value())
f.value();
}
}
+TEST(Future, collect) {
+ // success case
+ {
+ vector<Promise<int>> promises(10);
+ vector<Future<int>> futures;
+
+ for (auto& p : promises)
+ futures.push_back(p.getFuture());
-TEST(Future, whenAny) {
+ auto allf = collect(futures);
+
+ random_shuffle(promises.begin(), promises.end());
+ for (auto& p : promises) {
+ EXPECT_FALSE(allf.isReady());
+ p.setValue(42);
+ }
+
+ EXPECT_TRUE(allf.isReady());
+ for (auto i : allf.value()) {
+ EXPECT_EQ(42, i);
+ }
+ }
+
+ // failure case
+ {
+ vector<Promise<int>> promises(10);
+ vector<Future<int>> futures;
+
+ for (auto& p : promises)
+ futures.push_back(p.getFuture());
+
+ auto allf = collect(futures);
+
+ random_shuffle(promises.begin(), promises.end());
+ for (int i = 0; i < 10; i++) {
+ if (i < 5) {
+ // everthing goes well so far...
+ EXPECT_FALSE(allf.isReady());
+ promises[i].setValue(42);
+ } else if (i == 5) {
+ // short circuit with an exception
+ EXPECT_FALSE(allf.isReady());
+ promises[i].setException(eggs);
+ EXPECT_TRUE(allf.isReady());
+ } else if (i < 8) {
+ // don't blow up on further values
+ EXPECT_TRUE(allf.isReady());
+ promises[i].setValue(42);
+ } else {
+ // don't blow up on further exceptions
+ EXPECT_TRUE(allf.isReady());
+ promises[i].setException(eggs);
+ }
+ }
+
+ EXPECT_THROW(allf.value(), eggs_t);
+ }
+
+ // void futures success case
+ {
+ vector<Promise<void>> promises(10);
+ vector<Future<void>> futures;
+
+ for (auto& p : promises)
+ futures.push_back(p.getFuture());
+
+ auto allf = collect(futures);
+
+ random_shuffle(promises.begin(), promises.end());
+ for (auto& p : promises) {
+ EXPECT_FALSE(allf.isReady());
+ p.setValue();
+ }
+
+ EXPECT_TRUE(allf.isReady());
+ }
+
+ // void futures failure case
+ {
+ vector<Promise<void>> promises(10);
+ vector<Future<void>> futures;
+
+ for (auto& p : promises)
+ futures.push_back(p.getFuture());
+
+ auto allf = collect(futures);
+
+ random_shuffle(promises.begin(), promises.end());
+ for (int i = 0; i < 10; i++) {
+ if (i < 5) {
+ // everthing goes well so far...
+ EXPECT_FALSE(allf.isReady());
+ promises[i].setValue();
+ } else if (i == 5) {
+ // short circuit with an exception
+ EXPECT_FALSE(allf.isReady());
+ promises[i].setException(eggs);
+ EXPECT_TRUE(allf.isReady());
+ } else if (i < 8) {
+ // don't blow up on further values
+ EXPECT_TRUE(allf.isReady());
+ promises[i].setValue();
+ } else {
+ // don't blow up on further exceptions
+ EXPECT_TRUE(allf.isReady());
+ promises[i].setException(eggs);
+ }
+ }
+
+ EXPECT_THROW(allf.value(), eggs_t);
+ }
+
+ // move only compiles
+ {
+ vector<Promise<unique_ptr<int>>> promises(10);
+ vector<Future<unique_ptr<int>>> futures;
+
+ for (auto& p : promises)
+ futures.push_back(p.getFuture());
+
+ collect(futures);
+ }
+
+}
+
+struct NotDefaultConstructible {
+ NotDefaultConstructible() = delete;
+ NotDefaultConstructible(int arg) : i(arg) {}
+ int i;
+};
+
+// We have a specialized implementation for non-default-constructible objects
+// Ensure that it works and preserves order
+TEST(Future, collectNotDefaultConstructible) {
+ vector<Promise<NotDefaultConstructible>> promises(10);
+ vector<Future<NotDefaultConstructible>> futures;
+ vector<int> indices(10);
+ std::iota(indices.begin(), indices.end(), 0);
+ random_shuffle(indices.begin(), indices.end());
+
+ for (auto& p : promises)
+ futures.push_back(p.getFuture());
+
+ auto allf = collect(futures);
+
+ for (auto i : indices) {
+ EXPECT_FALSE(allf.isReady());
+ promises[i].setValue(NotDefaultConstructible(i));
+ }
+
+ EXPECT_TRUE(allf.isReady());
+ int i = 0;
+ for (auto val : allf.value()) {
+ EXPECT_EQ(i, val.i);
+ i++;
+ }
+}
+
+TEST(Future, collectAny) {
{
vector<Promise<int>> promises(10);
vector<Future<int>> futures;
EXPECT_FALSE(f.isReady());
}
- auto anyf = whenAny(futures.begin(), futures.end());
+ auto anyf = collectAny(futures);
/* futures were moved in, so these are invalid now */
EXPECT_FALSE(anyf.isReady());
EXPECT_FALSE(f.isReady());
}
- auto anyf = whenAny(futures.begin(), futures.end());
+ auto anyf = collectAny(futures);
EXPECT_FALSE(anyf.isReady());
for (auto& p : promises)
futures.push_back(p.getFuture());
- auto anyf = whenAny(futures.begin(), futures.end())
+ auto anyf = collectAny(futures)
.then([](pair<size_t, Try<int>> p) {
EXPECT_EQ(42, p.second.value());
});
for (int i = 0; i < 10; i++)
fs.push_back(makeFuture());
- whenAll(fs.begin(), fs.end())
+ collectAll(fs)
.then([&](vector<Try<void>> ts) {
EXPECT_EQ(fs.size(), ts.size());
});
for (int i = 0; i < 10; i++)
fs.push_back(makeFuture(i));
- whenAny(fs.begin(), fs.end())
+ collectAny(fs)
.then([&](pair<size_t, Try<int>> p) {
EXPECT_EQ(p.first, p.second.value());
});
}
}
-TEST(when, whenN) {
+TEST(when, collectN) {
vector<Promise<void>> promises(10);
vector<Future<void>> futures;
bool flag = false;
size_t n = 3;
- whenN(futures.begin(), futures.end(), n)
+ collectN(futures, n)
.then([&](vector<pair<size_t, Try<void>>> v) {
flag = true;
EXPECT_EQ(n, v.size());
for (int i = 0; i < 10; i++)
futures.push_back(makeFuture());
- auto anyf = whenAny(futures.begin(), futures.end());
+ auto anyf = collectAny(futures);
}
{
for (int i = 0; i < 10; i++)
futures.push_back(makeFuture());
- auto allf = whenAll(futures.begin(), futures.end());
+ auto allf = collectAll(futures);
}
}
-TEST(Future, whenAllVariadic) {
+TEST(Future, collectAllVariadic) {
Promise<bool> pb;
Promise<int> pi;
Future<bool> fb = pb.getFuture();
Future<int> fi = pi.getFuture();
bool flag = false;
- whenAll(std::move(fb), std::move(fi))
+ collectAll(std::move(fb), std::move(fi))
.then([&](std::tuple<Try<bool>, Try<int>> tup) {
flag = true;
EXPECT_TRUE(std::get<0>(tup).hasValue());
EXPECT_TRUE(flag);
}
-TEST(Future, whenAllVariadicReferences) {
+TEST(Future, collectAllVariadicReferences) {
Promise<bool> pb;
Promise<int> pi;
Future<bool> fb = pb.getFuture();
Future<int> fi = pi.getFuture();
bool flag = false;
- whenAll(fb, fi)
+ collectAll(fb, fi)
.then([&](std::tuple<Try<bool>, Try<int>> tup) {
flag = true;
EXPECT_TRUE(std::get<0>(tup).hasValue());
EXPECT_TRUE(flag);
}
-TEST(Future, whenAll_none) {
+TEST(Future, collectAll_none) {
vector<Future<int>> fs;
- auto f = whenAll(fs.begin(), fs.end());
+ auto f = collectAll(fs);
EXPECT_TRUE(f.isReady());
}
vector<Future<void>> v_f;
v_f.push_back(makeFuture());
v_f.push_back(makeFuture());
- auto done_v_f = whenAll(v_f.begin(), v_f.end()).wait().value();
+ auto done_v_f = collectAll(v_f).wait().value();
EXPECT_EQ(2, done_v_f.size());
vector<Future<bool>> v_fb;
v_fb.push_back(makeFuture(true));
v_fb.push_back(makeFuture(false));
- auto fut = whenAll(v_fb.begin(), v_fb.end());
+ auto fut = collectAll(v_fb);
auto done_v_fb = std::move(fut.wait().value());
EXPECT_EQ(2, done_v_fb.size());
}
vector<Future<bool>> v_fb;
v_fb.push_back(makeFuture(true));
v_fb.push_back(makeFuture(false));
- auto f = whenAll(v_fb.begin(), v_fb.end());
+ auto f = collectAll(v_fb);
f.wait(milliseconds(1));
EXPECT_TRUE(f.isReady());
EXPECT_EQ(2, f.value().size());
Promise<bool> p2;
v_fb.push_back(p1.getFuture());
v_fb.push_back(p2.getFuture());
- auto f = whenAll(v_fb.begin(), v_fb.end());
+ auto f = collectAll(v_fb);
f.wait(milliseconds(1));
EXPECT_FALSE(f.isReady());
p1.setValue(true);
TEST(Future, getFuture_after_setException) {
Promise<void> p;
- p.fulfil([]() -> void { throw std::logic_error("foo"); });
+ p.setWith([]() -> void { throw std::logic_error("foo"); });
EXPECT_THROW(p.getFuture().value(), std::logic_error);
}
EXPECT_EQ(nullptr, RequestContext::get()->getContextData("test"));
- // Fulfil the promise
+ // Fulfill the promise
p.setValue();
}
for (auto& p : *promises) p.setValue();
});
- return whenAll(futures.begin(), futures.end());
+ return collectAll(futures);
};
fn().wait();
ptr.reset();
- promise.fulfil([]{return 1l;});
+ promise.setWith([]{return 1l;});
}
TEST(Future, Constructor) {
//auto f2 = []() -> Future<void> { }();
}
+TEST(Future, thenDynamic) {
+ // folly::dynamic has a constructor that takes any T, this test makes
+ // sure that we call the then lambda with folly::dynamic and not
+ // Try<folly::dynamic> because that then fails to compile
+ Promise<folly::dynamic> p;
+ Future<folly::dynamic> f = p.getFuture().then(
+ [](const folly::dynamic& d) {
+ return folly::dynamic(d.asInt() + 3);
+ }
+ );
+ p.setValue(2);
+ EXPECT_EQ(f.get(), 5);
+}
+
TEST(Future, via_then_get_was_racy) {
ThreadExecutor x;
std::unique_ptr<int> val = folly::via(&x)
{
auto fs = makeFutures(0);
- Future<double> f1 = reduce(fs.begin(), fs.end(), 1.2,
+ Future<double> f1 = reduce(fs, 1.2,
[](double a, Try<int>&& b){
return a + *b + 0.1;
});
{
auto fs = makeFutures(1);
- Future<double> f1 = reduce(fs.begin(), fs.end(), 0.0,
+ Future<double> f1 = reduce(fs, 0.0,
[](double a, Try<int>&& b){
return a + *b + 0.1;
});
{
auto fs = makeFutures(3);
- Future<double> f1 = reduce(fs.begin(), fs.end(), 0.0,
+ Future<double> f1 = reduce(fs, 0.0,
[](double a, Try<int>&& b){
return a + *b + 0.1;
});
{
auto fs = makeFutures(3);
- Future<double> f1 = reduce(fs.begin(), fs.end(), 0.0,
+ Future<double> f1 = reduce(fs, 0.0,
[](double a, int&& b){
return a + b + 0.1;
});
{
auto fs = makeFutures(3);
- Future<double> f2 = reduce(fs.begin(), fs.end(), 0.0,
+ Future<double> f2 = reduce(fs, 0.0,
[](double a, Try<int>&& b){
return makeFuture<double>(a + *b + 0.1);
});
{
auto fs = makeFutures(3);
- Future<double> f2 = reduce(fs.begin(), fs.end(), 0.0,
+ Future<double> f2 = reduce(fs, 0.0,
[](double a, int&& b){
return makeFuture<double>(a + b + 0.1);
});
EXPECT_EQ(6.3, f2.get());
}
}
+
+TEST(Reduce, Chain) {
+ auto makeFutures = [](int count) {
+ std::vector<Future<int>> fs;
+ for (int i = 1; i <= count; ++i) {
+ fs.emplace_back(makeFuture(i));
+ }
+ return fs;
+ };
+
+ {
+ auto f = collectAll(makeFutures(3)).reduce(0, [](int a, Try<int>&& b){
+ return a + *b;
+ });
+ EXPECT_EQ(6, f.get());
+ }
+ {
+ auto f = collect(makeFutures(3)).reduce(0, [](int a, int&& b){
+ return a + b;
+ });
+ EXPECT_EQ(6, f.get());
+ }
+}
+
+TEST(Reduce, Streaming) {
+ {
+ std::vector<Future<int>> fs;
+ fs.push_back(makeFuture(1));
+ fs.push_back(makeFuture(2));
+ fs.push_back(makeFuture(3));
+
+ Future<double> f = unorderedReduce(fs.begin(), fs.end(), 0.0,
+ [](double a, int&& b){
+ return double(b);
+ });
+ EXPECT_EQ(3.0, f.get());
+ }
+ {
+ Promise<int> p1;
+ Promise<int> p2;
+ Promise<int> p3;
+
+ std::vector<Future<int>> fs;
+ fs.push_back(p1.getFuture());
+ fs.push_back(p2.getFuture());
+ fs.push_back(p3.getFuture());
+
+ Future<double> f = unorderedReduce(fs.begin(), fs.end(), 0.0,
+ [](double a, int&& b){
+ return double(b);
+ });
+ p3.setValue(3);
+ p2.setValue(2);
+ p1.setValue(1);
+ EXPECT_EQ(1.0, f.get());
+ }
+}
+
+TEST(Reduce, StreamingException) {
+ Promise<int> p1;
+ Promise<int> p2;
+ Promise<int> p3;
+
+ std::vector<Future<int>> fs;
+ fs.push_back(p1.getFuture());
+ fs.push_back(p2.getFuture());
+ fs.push_back(p3.getFuture());
+
+ Future<double> f = unorderedReduce(fs.begin(), fs.end(), 0.0,
+ [](double a, int&& b){
+ return b + 0.0;
+ });
+ p3.setValue(3);
+ p2.setException(exception_wrapper(std::runtime_error("blah")));
+ p1.setValue(1);
+ EXPECT_THROW(f.get(), std::runtime_error);
+}
+
+TEST(Map, Basic) {
+ Promise<int> p1;
+ Promise<int> p2;
+ Promise<int> p3;
+
+ std::vector<Future<int>> fs;
+ fs.push_back(p1.getFuture());
+ fs.push_back(p2.getFuture());
+ fs.push_back(p3.getFuture());
+
+ int c = 0;
+ std::vector<Future<void>> fs2 = futures::map(fs, [&](int i){
+ c += i;
+ });
+
+ // Ensure we call the callbacks as the futures complete regardless of order
+ p2.setValue(1);
+ EXPECT_EQ(1, c);
+ p3.setValue(1);
+ EXPECT_EQ(2, c);
+ p1.setValue(1);
+ EXPECT_EQ(3, c);
+
+ EXPECT_TRUE(collect(fs2).isReady());
+}