#include <folly/futures/ManualExecutor.h>
#include <folly/futures/DrivableExecutor.h>
#include <folly/dynamic.h>
+#include <folly/Baton.h>
#include <folly/MPMCQueue.h>
#include <folly/io/async/EventBase.h>
EXPECT_EQ(42, makeFuture<float>(42).value());
auto fun = [] { return 42; };
- EXPECT_TYPE(makeFutureTry(fun), Future<int>);
- EXPECT_EQ(42, makeFutureTry(fun).value());
+ EXPECT_TYPE(makeFutureWith(fun), Future<int>);
+ EXPECT_EQ(42, makeFutureWith(fun).value());
auto failfun = []() -> int { throw eggs; };
- EXPECT_TYPE(makeFutureTry(failfun), Future<int>);
- EXPECT_THROW(makeFutureTry(failfun).value(), eggs_t);
+ EXPECT_TYPE(makeFutureWith(failfun), Future<int>);
+ EXPECT_THROW(makeFutureWith(failfun).value(), eggs_t);
EXPECT_TYPE(makeFuture(), Future<void>);
}
EXPECT_EQ(7, f.value());
}
+TEST(Future, stream) {
+ auto fn = [](vector<int> input, size_t window_size, size_t expect) {
+ auto res = reduce(
+ window(
+ input,
+ [](int i) { return makeFuture(i); },
+ 2),
+ 0,
+ [](int sum, const Try<int>& b) {
+ return sum + *b;
+ }).get();
+ EXPECT_EQ(expect, res);
+ };
+ {
+ // streaming 2 at a time
+ vector<int> input = {1, 2, 3};
+ fn(input, 2, 6);
+ }
+ {
+ // streaming 4 at a time
+ vector<int> input = {1, 2, 3};
+ fn(input, 4, 6);
+ }
+ {
+ // empty inpt
+ vector<int> input;
+ fn(input, 1, 0);
+ }
+}
+
TEST(Future, collectAll) {
// returns a vector variant
{
for (auto& p : promises)
futures.push_back(p.getFuture());
- auto allf = collectAll(futures.begin(), futures.end());
+ auto allf = collectAll(futures);
random_shuffle(promises.begin(), promises.end());
for (auto& p : promises) {
for (auto& p : promises)
futures.push_back(p.getFuture());
- auto allf = collectAll(futures.begin(), futures.end());
+ auto allf = collectAll(futures);
promises[0].setValue(42);
for (auto& p : promises)
futures.push_back(p.getFuture());
- auto allf = collectAll(futures.begin(), futures.end())
+ auto allf = collectAll(futures)
.then([](Try<vector<Try<void>>>&& ts) {
for (auto& f : ts.value())
f.value();
for (auto& p : promises)
futures.push_back(p.getFuture());
- auto allf = collect(futures.begin(), futures.end());
+ auto allf = collect(futures);
random_shuffle(promises.begin(), promises.end());
for (auto& p : promises) {
for (auto& p : promises)
futures.push_back(p.getFuture());
- auto allf = collect(futures.begin(), futures.end());
+ auto allf = collect(futures);
random_shuffle(promises.begin(), promises.end());
for (int i = 0; i < 10; i++) {
for (auto& p : promises)
futures.push_back(p.getFuture());
- auto allf = collect(futures.begin(), futures.end());
+ auto allf = collect(futures);
random_shuffle(promises.begin(), promises.end());
for (auto& p : promises) {
for (auto& p : promises)
futures.push_back(p.getFuture());
- auto allf = collect(futures.begin(), futures.end());
+ auto allf = collect(futures);
random_shuffle(promises.begin(), promises.end());
for (int i = 0; i < 10; i++) {
for (auto& p : promises)
futures.push_back(p.getFuture());
- collect(futures.begin(), futures.end());
+ collect(futures);
}
}
for (auto& p : promises)
futures.push_back(p.getFuture());
- auto allf = collect(futures.begin(), futures.end());
+ auto allf = collect(futures);
for (auto i : indices) {
EXPECT_FALSE(allf.isReady());
EXPECT_FALSE(f.isReady());
}
- auto anyf = collectAny(futures.begin(), futures.end());
+ auto anyf = collectAny(futures);
/* futures were moved in, so these are invalid now */
EXPECT_FALSE(anyf.isReady());
EXPECT_FALSE(f.isReady());
}
- auto anyf = collectAny(futures.begin(), futures.end());
+ auto anyf = collectAny(futures);
EXPECT_FALSE(anyf.isReady());
for (auto& p : promises)
futures.push_back(p.getFuture());
- auto anyf = collectAny(futures.begin(), futures.end())
+ auto anyf = collectAny(futures)
.then([](pair<size_t, Try<int>> p) {
EXPECT_EQ(42, p.second.value());
});
for (int i = 0; i < 10; i++)
fs.push_back(makeFuture());
- collectAll(fs.begin(), fs.end())
+ collectAll(fs)
.then([&](vector<Try<void>> ts) {
EXPECT_EQ(fs.size(), ts.size());
});
for (int i = 0; i < 10; i++)
fs.push_back(makeFuture(i));
- collectAny(fs.begin(), fs.end())
+ collectAny(fs)
.then([&](pair<size_t, Try<int>> p) {
EXPECT_EQ(p.first, p.second.value());
});
bool flag = false;
size_t n = 3;
- collectN(futures.begin(), futures.end(), n)
+ collectN(futures, n)
.then([&](vector<pair<size_t, Try<void>>> v) {
flag = true;
EXPECT_EQ(n, v.size());
for (int i = 0; i < 10; i++)
futures.push_back(makeFuture());
- auto anyf = collectAny(futures.begin(), futures.end());
+ auto anyf = collectAny(futures);
}
{
for (int i = 0; i < 10; i++)
futures.push_back(makeFuture());
- auto allf = collectAll(futures.begin(), futures.end());
+ auto allf = collectAll(futures);
}
}
TEST(Future, collectAll_none) {
vector<Future<int>> fs;
- auto f = collectAll(fs.begin(), fs.end());
+ auto f = collectAll(fs);
EXPECT_TRUE(f.isReady());
}
vector<Future<void>> v_f;
v_f.push_back(makeFuture());
v_f.push_back(makeFuture());
- auto done_v_f = collectAll(v_f.begin(), v_f.end()).wait().value();
+ auto done_v_f = collectAll(v_f).wait().value();
EXPECT_EQ(2, done_v_f.size());
vector<Future<bool>> v_fb;
v_fb.push_back(makeFuture(true));
v_fb.push_back(makeFuture(false));
- auto fut = collectAll(v_fb.begin(), v_fb.end());
+ auto fut = collectAll(v_fb);
auto done_v_fb = std::move(fut.wait().value());
EXPECT_EQ(2, done_v_fb.size());
}
vector<Future<bool>> v_fb;
v_fb.push_back(makeFuture(true));
v_fb.push_back(makeFuture(false));
- auto f = collectAll(v_fb.begin(), v_fb.end());
+ auto f = collectAll(v_fb);
f.wait(milliseconds(1));
EXPECT_TRUE(f.isReady());
EXPECT_EQ(2, f.value().size());
Promise<bool> p2;
v_fb.push_back(p1.getFuture());
v_fb.push_back(p2.getFuture());
- auto f = collectAll(v_fb.begin(), v_fb.end());
+ auto f = collectAll(v_fb);
f.wait(milliseconds(1));
EXPECT_FALSE(f.isReady());
p1.setValue(true);
for (auto& p : *promises) p.setValue();
});
- return collectAll(futures.begin(), futures.end());
+ return collectAll(futures);
};
fn().wait();
{
auto fs = makeFutures(0);
- Future<double> f1 = reduce(fs.begin(), fs.end(), 1.2,
+ Future<double> f1 = reduce(fs, 1.2,
[](double a, Try<int>&& b){
return a + *b + 0.1;
});
{
auto fs = makeFutures(1);
- Future<double> f1 = reduce(fs.begin(), fs.end(), 0.0,
+ Future<double> f1 = reduce(fs, 0.0,
[](double a, Try<int>&& b){
return a + *b + 0.1;
});
{
auto fs = makeFutures(3);
- Future<double> f1 = reduce(fs.begin(), fs.end(), 0.0,
+ Future<double> f1 = reduce(fs, 0.0,
[](double a, Try<int>&& b){
return a + *b + 0.1;
});
{
auto fs = makeFutures(3);
- Future<double> f1 = reduce(fs.begin(), fs.end(), 0.0,
+ Future<double> f1 = reduce(fs, 0.0,
[](double a, int&& b){
return a + b + 0.1;
});
{
auto fs = makeFutures(3);
- Future<double> f2 = reduce(fs.begin(), fs.end(), 0.0,
+ Future<double> f2 = reduce(fs, 0.0,
[](double a, Try<int>&& b){
return makeFuture<double>(a + *b + 0.1);
});
{
auto fs = makeFutures(3);
- Future<double> f2 = reduce(fs.begin(), fs.end(), 0.0,
+ Future<double> f2 = reduce(fs, 0.0,
[](double a, int&& b){
return makeFuture<double>(a + b + 0.1);
});
}
}
+TEST(Reduce, Chain) {
+ auto makeFutures = [](int count) {
+ std::vector<Future<int>> fs;
+ for (int i = 1; i <= count; ++i) {
+ fs.emplace_back(makeFuture(i));
+ }
+ return fs;
+ };
+
+ {
+ auto f = collectAll(makeFutures(3)).reduce(0, [](int a, Try<int>&& b){
+ return a + *b;
+ });
+ EXPECT_EQ(6, f.get());
+ }
+ {
+ auto f = collect(makeFutures(3)).reduce(0, [](int a, int&& b){
+ return a + b;
+ });
+ EXPECT_EQ(6, f.get());
+ }
+}
+
+TEST(Reduce, Streaming) {
+ {
+ std::vector<Future<int>> fs;
+ fs.push_back(makeFuture(1));
+ fs.push_back(makeFuture(2));
+ fs.push_back(makeFuture(3));
+
+ Future<double> f = unorderedReduce(fs.begin(), fs.end(), 0.0,
+ [](double a, int&& b){
+ return double(b);
+ });
+ EXPECT_EQ(3.0, f.get());
+ }
+ {
+ Promise<int> p1;
+ Promise<int> p2;
+ Promise<int> p3;
+
+ std::vector<Future<int>> fs;
+ fs.push_back(p1.getFuture());
+ fs.push_back(p2.getFuture());
+ fs.push_back(p3.getFuture());
+
+ Future<double> f = unorderedReduce(fs.begin(), fs.end(), 0.0,
+ [](double a, int&& b){
+ return double(b);
+ });
+ p3.setValue(3);
+ p2.setValue(2);
+ p1.setValue(1);
+ EXPECT_EQ(1.0, f.get());
+ }
+}
+
+TEST(Reduce, StreamingException) {
+ Promise<int> p1;
+ Promise<int> p2;
+ Promise<int> p3;
+
+ std::vector<Future<int>> fs;
+ fs.push_back(p1.getFuture());
+ fs.push_back(p2.getFuture());
+ fs.push_back(p3.getFuture());
+
+ Future<double> f = unorderedReduce(fs.begin(), fs.end(), 0.0,
+ [](double a, int&& b){
+ return b + 0.0;
+ });
+ p3.setValue(3);
+ p2.setException(exception_wrapper(std::runtime_error("blah")));
+ p1.setValue(1);
+ EXPECT_THROW(f.get(), std::runtime_error);
+}
+
TEST(Map, Basic) {
Promise<int> p1;
Promise<int> p2;
fs.push_back(p3.getFuture());
int c = 0;
- auto fs2 = futures::map(fs.begin(), fs.end(), [&](int i){
+ std::vector<Future<void>> fs2 = futures::map(fs, [&](int i){
c += i;
});
p1.setValue(1);
EXPECT_EQ(3, c);
- EXPECT_TRUE(collect(fs2.begin(), fs2.end()).isReady());
+ EXPECT_TRUE(collect(fs2).isReady());
}