Add window overload that takes an executor to prevent stack overflow
[folly.git] / folly / futures / test / WindowTest.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <boost/thread/barrier.hpp>
18
19 #include <folly/Conv.h>
20 #include <folly/futures/Future.h>
21 #include <folly/futures/ManualExecutor.h>
22 #include <folly/portability/GTest.h>
23
24 #include <vector>
25
26 using namespace folly;
27
28 typedef FutureException eggs_t;
29 static eggs_t eggs("eggs");
30
31 TEST(Window, basic) {
32   // int -> Future<int>
33   auto fn = [](std::vector<int> input, size_t window_size, size_t expect) {
34     auto res = reduce(
35       window(
36         input,
37         [](int i) { return makeFuture(i); },
38         window_size),
39       0,
40       [](int sum, const Try<int>& b) {
41         return sum + *b;
42       }).get();
43     EXPECT_EQ(expect, res);
44   };
45   {
46     SCOPED_TRACE("2 in-flight at a time");
47     std::vector<int> input = {1, 2, 3};
48     fn(input, 2, 6);
49   }
50   {
51     SCOPED_TRACE("4 in-flight at a time");
52     std::vector<int> input = {1, 2, 3};
53     fn(input, 4, 6);
54   }
55   {
56     SCOPED_TRACE("empty input");
57     std::vector<int> input;
58     fn(input, 1, 0);
59   }
60   {
61     // int -> Future<Unit>
62     auto res = reduce(window(std::vector<int>({1, 2, 3}),
63                              [](int /* i */) { return makeFuture(); },
64                              2),
65                       0,
66                       [](int sum, const Try<Unit>& b) {
67                         EXPECT_TRUE(b.hasValue());
68                         return sum + 1;
69                       }).get();
70     EXPECT_EQ(3, res);
71   }
72   {
73     // string -> return Future<int>
74     auto res = reduce(
75       window(
76         std::vector<std::string>{"1", "2", "3"},
77         [](std::string s) { return makeFuture<int>(folly::to<int>(s)); },
78         2),
79       0,
80       [](int sum, const Try<int>& b) {
81         return sum + *b;
82       }).get();
83     EXPECT_EQ(6, res);
84   }
85 }
86
87 TEST(Window, parallel) {
88   std::vector<int> input;
89   std::vector<Promise<int>> ps(10);
90   for (size_t i = 0; i < ps.size(); i++) {
91     input.emplace_back(i);
92   }
93   auto f = collect(window(input, [&](int i) {
94     return ps[i].getFuture();
95   }, 3));
96
97   std::vector<std::thread> ts;
98   boost::barrier barrier(ps.size() + 1);
99   for (size_t i = 0; i < ps.size(); i++) {
100     ts.emplace_back([&ps, &barrier, i]() {
101       barrier.wait();
102       ps[i].setValue(i);
103     });
104   }
105
106   barrier.wait();
107
108   for (auto& t : ts) {
109     t.join();
110   }
111
112   EXPECT_TRUE(f.isReady());
113   for (size_t i = 0; i < ps.size(); i++) {
114     EXPECT_EQ(i, f.value()[i]);
115   }
116 }
117
118 TEST(Window, parallelWithError) {
119   std::vector<int> input;
120   std::vector<Promise<int>> ps(10);
121   for (size_t i = 0; i < ps.size(); i++) {
122     input.emplace_back(i);
123   }
124   auto f = collect(window(input, [&](int i) {
125     return ps[i].getFuture();
126   }, 3));
127
128   std::vector<std::thread> ts;
129   boost::barrier barrier(ps.size() + 1);
130   for (size_t i = 0; i < ps.size(); i++) {
131     ts.emplace_back([&ps, &barrier, i]() {
132       barrier.wait();
133       if (i == (ps.size()/2)) {
134         ps[i].setException(eggs);
135       } else {
136         ps[i].setValue(i);
137       }
138     });
139   }
140
141   barrier.wait();
142
143   for (auto& t : ts) {
144     t.join();
145   }
146
147   EXPECT_TRUE(f.isReady());
148   EXPECT_THROW(f.value(), eggs_t);
149 }
150
151 TEST(Window, allParallelWithError) {
152   std::vector<int> input;
153   std::vector<Promise<int>> ps(10);
154   for (size_t i = 0; i < ps.size(); i++) {
155     input.emplace_back(i);
156   }
157   auto f = collectAll(window(input, [&](int i) {
158     return ps[i].getFuture();
159   }, 3));
160
161   std::vector<std::thread> ts;
162   boost::barrier barrier(ps.size() + 1);
163   for (size_t i = 0; i < ps.size(); i++) {
164     ts.emplace_back([&ps, &barrier, i]() {
165       barrier.wait();
166       if (i == (ps.size()/2)) {
167         ps[i].setException(eggs);
168       } else {
169         ps[i].setValue(i);
170       }
171     });
172   }
173
174   barrier.wait();
175
176   for (auto& t : ts) {
177     t.join();
178   }
179
180   EXPECT_TRUE(f.isReady());
181   for (size_t i = 0; i < ps.size(); i++) {
182     if (i == (ps.size()/2)) {
183       EXPECT_THROW(f.value()[i].value(), eggs_t);
184     } else {
185       EXPECT_TRUE(f.value()[i].hasValue());
186       EXPECT_EQ(i, f.value()[i].value());
187     }
188   }
189 }
190
191 TEST(WindowExecutor, basic) {
192   ManualExecutor executor;
193
194   // int -> Future<int>
195   auto fn = [executor_ = &executor](
196                 std::vector<int> input, size_t window_size, size_t expect) {
197     auto res = reduce(
198         window(
199             executor_, input, [](int i) { return makeFuture(i); }, window_size),
200         0,
201         [](int sum, const Try<int>& b) { return sum + *b; });
202     executor_->waitFor(res);
203     EXPECT_EQ(expect, res.get());
204   };
205   {
206     SCOPED_TRACE("2 in-flight at a time");
207     std::vector<int> input = {1, 2, 3};
208     fn(input, 2, 6);
209   }
210   {
211     SCOPED_TRACE("4 in-flight at a time");
212     std::vector<int> input = {1, 2, 3};
213     fn(input, 4, 6);
214   }
215   {
216     SCOPED_TRACE("empty input");
217     std::vector<int> input;
218     fn(input, 1, 0);
219   }
220   {
221     // int -> Future<Unit>
222     auto res = reduce(
223         window(
224             &executor,
225             std::vector<int>({1, 2, 3}),
226             [](int /* i */) { return makeFuture(); },
227             2),
228         0,
229         [](int sum, const Try<Unit>& b) {
230           EXPECT_TRUE(b.hasValue());
231           return sum + 1;
232         });
233     executor.waitFor(res);
234     EXPECT_EQ(3, res.get());
235   }
236   {
237     // string -> return Future<int>
238     auto res = reduce(
239         window(
240             &executor,
241             std::vector<std::string>{"1", "2", "3"},
242             [](std::string s) { return makeFuture<int>(folly::to<int>(s)); },
243             2),
244         0,
245         [](int sum, const Try<int>& b) { return sum + *b; });
246     executor.waitFor(res);
247     EXPECT_EQ(6, res.get());
248   }
249 }
250
251 TEST(WindowExecutor, parallel) {
252   ManualExecutor executor;
253
254   std::vector<int> input;
255   std::vector<Promise<int>> ps(10);
256   for (size_t i = 0; i < ps.size(); i++) {
257     input.emplace_back(i);
258   }
259   auto f = collect(
260       window(&executor, input, [&](int i) { return ps[i].getFuture(); }, 3));
261
262   std::vector<std::thread> ts;
263   boost::barrier barrier(ps.size() + 1);
264   for (size_t i = 0; i < ps.size(); i++) {
265     ts.emplace_back([&ps, &barrier, i]() {
266       barrier.wait();
267       ps[i].setValue(i);
268     });
269   }
270
271   barrier.wait();
272
273   for (auto& t : ts) {
274     t.join();
275   }
276
277   executor.waitFor(f);
278   EXPECT_TRUE(f.isReady());
279   for (size_t i = 0; i < ps.size(); i++) {
280     EXPECT_EQ(i, f.value()[i]);
281   }
282 }
283
284 TEST(WindowExecutor, parallelWithError) {
285   ManualExecutor executor;
286
287   std::vector<int> input;
288   std::vector<Promise<int>> ps(10);
289   for (size_t i = 0; i < ps.size(); i++) {
290     input.emplace_back(i);
291   }
292   auto f = collect(
293       window(&executor, input, [&](int i) { return ps[i].getFuture(); }, 3));
294
295   std::vector<std::thread> ts;
296   boost::barrier barrier(ps.size() + 1);
297   for (size_t i = 0; i < ps.size(); i++) {
298     ts.emplace_back([&ps, &barrier, i]() {
299       barrier.wait();
300       if (i == (ps.size() / 2)) {
301         ps[i].setException(eggs);
302       } else {
303         ps[i].setValue(i);
304       }
305     });
306   }
307
308   barrier.wait();
309
310   for (auto& t : ts) {
311     t.join();
312   }
313
314   executor.waitFor(f);
315   EXPECT_TRUE(f.isReady());
316   EXPECT_THROW(f.value(), eggs_t);
317 }
318
319 TEST(WindowExecutor, allParallelWithError) {
320   ManualExecutor executor;
321
322   std::vector<int> input;
323   std::vector<Promise<int>> ps(10);
324   for (size_t i = 0; i < ps.size(); i++) {
325     input.emplace_back(i);
326   }
327   auto f = collectAll(
328       window(&executor, input, [&](int i) { return ps[i].getFuture(); }, 3));
329
330   std::vector<std::thread> ts;
331   boost::barrier barrier(ps.size() + 1);
332   for (size_t i = 0; i < ps.size(); i++) {
333     ts.emplace_back([&ps, &barrier, i]() {
334       barrier.wait();
335       if (i == (ps.size() / 2)) {
336         ps[i].setException(eggs);
337       } else {
338         ps[i].setValue(i);
339       }
340     });
341   }
342
343   barrier.wait();
344
345   for (auto& t : ts) {
346     t.join();
347   }
348
349   executor.waitFor(f);
350   EXPECT_TRUE(f.isReady());
351   for (size_t i = 0; i < ps.size(); i++) {
352     if (i == (ps.size() / 2)) {
353       EXPECT_THROW(f.value()[i].value(), eggs_t);
354     } else {
355       EXPECT_TRUE(f.value()[i].hasValue());
356       EXPECT_EQ(i, f.value()[i].value());
357     }
358   }
359 }