2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <gtest/gtest.h>
19 #include <boost/thread/barrier.hpp>
21 #include <folly/Conv.h>
22 #include <folly/futures/Future.h>
26 using namespace folly;
28 typedef FutureException eggs_t;
29 static eggs_t eggs("eggs");
33 auto fn = [](std::vector<int> input, size_t window_size, size_t expect) {
37 [](int i) { return makeFuture(i); },
40 [](int sum, const Try<int>& b) {
43 EXPECT_EQ(expect, res);
46 // 2 in-flight at a time
47 std::vector<int> input = {1, 2, 3};
51 // 4 in-flight at a time
52 std::vector<int> input = {1, 2, 3};
57 std::vector<int> input;
61 // int -> Future<Unit>
62 auto res = reduce(window(std::vector<int>({1, 2, 3}),
63 [](int /* i */) { return makeFuture(); },
66 [](int sum, const Try<Unit>& b) {
67 EXPECT_TRUE(b.hasValue());
73 // string -> return Future<int>
76 std::vector<std::string>{"1", "2", "3"},
77 [](std::string s) { return makeFuture<int>(folly::to<int>(s)); },
80 [](int sum, const Try<int>& b) {
87 TEST(Window, parallel) {
88 std::vector<int> input;
89 std::vector<Promise<int>> ps(10);
90 for (size_t i = 0; i < ps.size(); i++) {
91 input.emplace_back(i);
93 auto f = collect(window(input, [&](int i) {
94 return ps[i].getFuture();
97 std::vector<std::thread> ts;
98 boost::barrier barrier(ps.size() + 1);
99 for (size_t i = 0; i < ps.size(); i++) {
100 ts.emplace_back([&ps, &barrier, i]() {
108 for (size_t i = 0; i < ps.size(); i++) {
112 EXPECT_TRUE(f.isReady());
113 for (size_t i = 0; i < ps.size(); i++) {
114 EXPECT_EQ(i, f.value()[i]);
118 TEST(Window, parallelWithError) {
119 std::vector<int> input;
120 std::vector<Promise<int>> ps(10);
121 for (size_t i = 0; i < ps.size(); i++) {
122 input.emplace_back(i);
124 auto f = collect(window(input, [&](int i) {
125 return ps[i].getFuture();
128 std::vector<std::thread> ts;
129 boost::barrier barrier(ps.size() + 1);
130 for (size_t i = 0; i < ps.size(); i++) {
131 ts.emplace_back([&ps, &barrier, i]() {
133 if (i == (ps.size()/2)) {
134 ps[i].setException(eggs);
143 for (size_t i = 0; i < ps.size(); i++) {
147 EXPECT_TRUE(f.isReady());
148 EXPECT_THROW(f.value(), eggs_t);
151 TEST(Window, allParallelWithError) {
152 std::vector<int> input;
153 std::vector<Promise<int>> ps(10);
154 for (size_t i = 0; i < ps.size(); i++) {
155 input.emplace_back(i);
157 auto f = collectAll(window(input, [&](int i) {
158 return ps[i].getFuture();
161 std::vector<std::thread> ts;
162 boost::barrier barrier(ps.size() + 1);
163 for (size_t i = 0; i < ps.size(); i++) {
164 ts.emplace_back([&ps, &barrier, i]() {
166 if (i == (ps.size()/2)) {
167 ps[i].setException(eggs);
176 for (size_t i = 0; i < ps.size(); i++) {
180 EXPECT_TRUE(f.isReady());
181 for (size_t i = 0; i < ps.size(); i++) {
182 if (i == (ps.size()/2)) {
183 EXPECT_THROW(f.value()[i].value(), eggs_t);
185 EXPECT_TRUE(f.value()[i].hasValue());
186 EXPECT_EQ(i, f.value()[i].value());