#include <folly/Conv.h>
#include <folly/futures/Future.h>
+#include <folly/futures/ManualExecutor.h>
#include <folly/portability/GTest.h>
#include <vector>
EXPECT_EQ(expect, res);
};
{
- // 2 in-flight at a time
+ SCOPED_TRACE("2 in-flight at a time");
std::vector<int> input = {1, 2, 3};
fn(input, 2, 6);
}
{
- // 4 in-flight at a time
+ SCOPED_TRACE("4 in-flight at a time");
std::vector<int> input = {1, 2, 3};
fn(input, 4, 6);
}
{
- // empty input
+ SCOPED_TRACE("empty input");
std::vector<int> input;
fn(input, 1, 0);
}
barrier.wait();
- for (size_t i = 0; i < ps.size(); i++) {
- ts[i].join();
+ for (auto& t : ts) {
+ t.join();
}
EXPECT_TRUE(f.isReady());
barrier.wait();
- for (size_t i = 0; i < ps.size(); i++) {
- ts[i].join();
+ for (auto& t : ts) {
+ t.join();
}
EXPECT_TRUE(f.isReady());
barrier.wait();
- for (size_t i = 0; i < ps.size(); i++) {
- ts[i].join();
+ for (auto& t : ts) {
+ t.join();
}
EXPECT_TRUE(f.isReady());
}
}
}
+
+TEST(WindowExecutor, basic) {
+ ManualExecutor executor;
+
+ // int -> Future<int>
+ auto fn = [executor_ = &executor](
+ std::vector<int> input, size_t window_size, size_t expect) {
+ auto res = reduce(
+ window(
+ executor_, input, [](int i) { return makeFuture(i); }, window_size),
+ 0,
+ [](int sum, const Try<int>& b) { return sum + *b; });
+ executor_->waitFor(res);
+ EXPECT_EQ(expect, res.get());
+ };
+ {
+ SCOPED_TRACE("2 in-flight at a time");
+ std::vector<int> input = {1, 2, 3};
+ fn(input, 2, 6);
+ }
+ {
+ SCOPED_TRACE("4 in-flight at a time");
+ std::vector<int> input = {1, 2, 3};
+ fn(input, 4, 6);
+ }
+ {
+ SCOPED_TRACE("empty input");
+ std::vector<int> input;
+ fn(input, 1, 0);
+ }
+ {
+ // int -> Future<Unit>
+ auto res = reduce(
+ window(
+ &executor,
+ std::vector<int>({1, 2, 3}),
+ [](int /* i */) { return makeFuture(); },
+ 2),
+ 0,
+ [](int sum, const Try<Unit>& b) {
+ EXPECT_TRUE(b.hasValue());
+ return sum + 1;
+ });
+ executor.waitFor(res);
+ EXPECT_EQ(3, res.get());
+ }
+ {
+ // string -> return Future<int>
+ auto res = reduce(
+ window(
+ &executor,
+ std::vector<std::string>{"1", "2", "3"},
+ [](std::string s) { return makeFuture<int>(folly::to<int>(s)); },
+ 2),
+ 0,
+ [](int sum, const Try<int>& b) { return sum + *b; });
+ executor.waitFor(res);
+ EXPECT_EQ(6, res.get());
+ }
+}
+
+TEST(WindowExecutor, parallel) {
+ ManualExecutor executor;
+
+ std::vector<int> input;
+ std::vector<Promise<int>> ps(10);
+ for (size_t i = 0; i < ps.size(); i++) {
+ input.emplace_back(i);
+ }
+ auto f = collect(
+ window(&executor, input, [&](int i) { return ps[i].getFuture(); }, 3));
+
+ std::vector<std::thread> ts;
+ boost::barrier barrier(ps.size() + 1);
+ for (size_t i = 0; i < ps.size(); i++) {
+ ts.emplace_back([&ps, &barrier, i]() {
+ barrier.wait();
+ ps[i].setValue(i);
+ });
+ }
+
+ barrier.wait();
+
+ for (auto& t : ts) {
+ t.join();
+ }
+
+ executor.waitFor(f);
+ EXPECT_TRUE(f.isReady());
+ for (size_t i = 0; i < ps.size(); i++) {
+ EXPECT_EQ(i, f.value()[i]);
+ }
+}
+
+TEST(WindowExecutor, parallelWithError) {
+ ManualExecutor executor;
+
+ std::vector<int> input;
+ std::vector<Promise<int>> ps(10);
+ for (size_t i = 0; i < ps.size(); i++) {
+ input.emplace_back(i);
+ }
+ auto f = collect(
+ window(&executor, input, [&](int i) { return ps[i].getFuture(); }, 3));
+
+ std::vector<std::thread> ts;
+ boost::barrier barrier(ps.size() + 1);
+ for (size_t i = 0; i < ps.size(); i++) {
+ ts.emplace_back([&ps, &barrier, i]() {
+ barrier.wait();
+ if (i == (ps.size() / 2)) {
+ ps[i].setException(eggs);
+ } else {
+ ps[i].setValue(i);
+ }
+ });
+ }
+
+ barrier.wait();
+
+ for (auto& t : ts) {
+ t.join();
+ }
+
+ executor.waitFor(f);
+ EXPECT_TRUE(f.isReady());
+ EXPECT_THROW(f.value(), eggs_t);
+}
+
+TEST(WindowExecutor, allParallelWithError) {
+ ManualExecutor executor;
+
+ std::vector<int> input;
+ std::vector<Promise<int>> ps(10);
+ for (size_t i = 0; i < ps.size(); i++) {
+ input.emplace_back(i);
+ }
+ auto f = collectAll(
+ window(&executor, input, [&](int i) { return ps[i].getFuture(); }, 3));
+
+ std::vector<std::thread> ts;
+ boost::barrier barrier(ps.size() + 1);
+ for (size_t i = 0; i < ps.size(); i++) {
+ ts.emplace_back([&ps, &barrier, i]() {
+ barrier.wait();
+ if (i == (ps.size() / 2)) {
+ ps[i].setException(eggs);
+ } else {
+ ps[i].setValue(i);
+ }
+ });
+ }
+
+ barrier.wait();
+
+ for (auto& t : ts) {
+ t.join();
+ }
+
+ executor.waitFor(f);
+ EXPECT_TRUE(f.isReady());
+ for (size_t i = 0; i < ps.size(); i++) {
+ if (i == (ps.size() / 2)) {
+ EXPECT_THROW(f.value()[i].value(), eggs_t);
+ } else {
+ EXPECT_TRUE(f.value()[i].hasValue());
+ EXPECT_EQ(i, f.value()[i].value());
+ }
+ }
+}