2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <boost/thread/barrier.hpp>
19 #include <folly/Conv.h>
20 #include <folly/futures/Future.h>
21 #include <folly/portability/GTest.h>
25 using namespace folly;
27 typedef FutureException eggs_t;
28 static eggs_t eggs("eggs");
// NOTE(review): the header of the enclosing test and several statements of its
// body are not present in this excerpt; the comments below describe only the
// lines that are visible. Confirm against the complete file.
//
// Helper lambda: takes an input vector, a window size and an expected value.
32 auto fn = [](std::vector<int> input, size_t window_size, size_t expect) {
// Per-element callback: returns the future produced by makeFuture(i).
36 [](int i) { return makeFuture(i); },
// Reduction callback: receives the running sum and the next Try<int>.
39 [](int sum, const Try<int>& b) {
// The result must equal the caller-supplied expectation.
// NOTE(review): `res` is assigned on a line not visible here.
42 EXPECT_EQ(expect, res);
45 // 2 in-flight at a time
46 std::vector<int> input = {1, 2, 3};
50 // 4 in-flight at a time
51 std::vector<int> input = {1, 2, 3};
// Empty input vector case.
56 std::vector<int> input;
60 // int -> Future<Unit>
61 auto res = reduce(window(std::vector<int>({1, 2, 3}),
// The element is ignored; a value-less future is returned for each input.
62 [](int /* i */) { return makeFuture(); },
65 [](int sum, const Try<Unit>& b) {
// Every Try handed to the reducer is expected to hold a value.
66 EXPECT_TRUE(b.hasValue());
72 // string -> return Future<int>
75 std::vector<std::string>{"1", "2", "3"},
// Each string is converted with folly::to<int> and wrapped in a future.
76 [](std::string s) { return makeFuture<int>(folly::to<int>(s)); },
79 [](int sum, const Try<int>& b) {
// Window.parallel: windows over ten promises, collects the resulting futures,
// and checks that element i of the collected result equals i.
// NOTE(review): the window-size argument, the thread bodies, the body of the
// loop at original line 107 and the closing braces are not present in this
// excerpt; confirm details against the complete file.
86 TEST(Window, parallel) {
87 std::vector<int> input;
88 std::vector<Promise<int>> ps(10);
// Fill input with the indices 0 .. ps.size()-1, one per promise.
89 for (size_t i = 0; i < ps.size(); i++) {
90 input.emplace_back(i);
// Each input value is used as an index into ps; the callback returns that
// promise's future.
92 auto f = collect(window(input, [&](int i) {
93 return ps[i].getFuture();
96 std::vector<std::thread> ts;
// The barrier count is one more than the number of threads created below
// (presumably the extra participant is the test thread; the wait calls are
// not visible here).
97 boost::barrier barrier(ps.size() + 1);
98 for (size_t i = 0; i < ps.size(); i++) {
// One thread per promise; the index is captured by value, ps and the
// barrier by reference.
99 ts.emplace_back([&ps, &barrier, i]() {
// NOTE(review): loop body not visible; presumably joins the threads.
107 for (size_t i = 0; i < ps.size(); i++) {
// The collected future must be ready, and result[i] must equal i.
111 EXPECT_TRUE(f.isReady());
112 for (size_t i = 0; i < ps.size(); i++) {
// NOTE(review): compares a size_t index with what looks like an int element;
// this may trigger a sign-compare warning. Confirm the element type.
113 EXPECT_EQ(i, f.value()[i]);
// Window.parallelWithError: same setup as the parallel test, but the thread
// whose index is ps.size()/2 sets the `eggs` exception on its promise; the
// collected future is then expected to throw eggs_t from value().
// NOTE(review): the window-size argument, the barrier waits, the non-error
// branch of the thread body, the body of the loop at original line 142 and
// the closing braces are not present in this excerpt; confirm against the
// complete file.
117 TEST(Window, parallelWithError) {
118 std::vector<int> input;
119 std::vector<Promise<int>> ps(10);
// Fill input with the indices 0 .. ps.size()-1, one per promise.
120 for (size_t i = 0; i < ps.size(); i++) {
121 input.emplace_back(i);
// Each input value indexes into ps; the callback returns that promise's future.
123 auto f = collect(window(input, [&](int i) {
124 return ps[i].getFuture();
127 std::vector<std::thread> ts;
// Barrier count is one more than the number of threads created below.
128 boost::barrier barrier(ps.size() + 1);
129 for (size_t i = 0; i < ps.size(); i++) {
130 ts.emplace_back([&ps, &barrier, i]() {
// Only the middle index fails its promise with the shared exception.
132 if (i == (ps.size()/2)) {
133 ps[i].setException(eggs);
// NOTE(review): loop body not visible; presumably joins the threads.
142 for (size_t i = 0; i < ps.size(); i++) {
// The collected future must be ready, and reading its value must throw eggs_t.
146 EXPECT_TRUE(f.isReady());
147 EXPECT_THROW(f.value(), eggs_t);
// Window.allParallelWithError: same setup as parallelWithError, but the
// futures are gathered with collectAll and each per-index result is inspected
// individually.
// NOTE(review): the window-size argument, the barrier waits, the non-error
// branch of the thread body, the body of the loop at original line 175, the
// `else` between original lines 182 and 184, and the closing braces are not
// present in this excerpt; confirm against the complete file.
150 TEST(Window, allParallelWithError) {
151 std::vector<int> input;
152 std::vector<Promise<int>> ps(10);
// Fill input with the indices 0 .. ps.size()-1, one per promise.
153 for (size_t i = 0; i < ps.size(); i++) {
154 input.emplace_back(i);
// Each input value indexes into ps; the callback returns that promise's future.
156 auto f = collectAll(window(input, [&](int i) {
157 return ps[i].getFuture();
160 std::vector<std::thread> ts;
// Barrier count is one more than the number of threads created below.
161 boost::barrier barrier(ps.size() + 1);
162 for (size_t i = 0; i < ps.size(); i++) {
163 ts.emplace_back([&ps, &barrier, i]() {
// Only the middle index fails its promise with the shared exception.
165 if (i == (ps.size()/2)) {
166 ps[i].setException(eggs);
// NOTE(review): loop body not visible; presumably joins the threads.
175 for (size_t i = 0; i < ps.size(); i++) {
// The aggregate future must be ready; each entry is then checked on its own.
179 EXPECT_TRUE(f.isReady());
180 for (size_t i = 0; i < ps.size(); i++) {
// The middle entry must throw eggs_t when its value is read.
181 if (i == (ps.size()/2)) {
182 EXPECT_THROW(f.value()[i].value(), eggs_t);
// The remaining checks expect a value equal to the index (presumably these
// sit in the else branch for all other indices).
184 EXPECT_TRUE(f.value()[i].hasValue());
185 EXPECT_EQ(i, f.value()[i].value());