6eecedc199ef544d90155c475b04f447acb91ecf
[folly.git] / folly / futures / test / WindowTest.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <gtest/gtest.h>
18
19 #include <boost/thread/barrier.hpp>
20
21 #include <folly/Conv.h>
22 #include <folly/futures/Future.h>
23
24 #include <vector>
25
26 using namespace folly;
27
28 typedef FutureException eggs_t;
29 static eggs_t eggs("eggs");
30
31 TEST(Window, basic) {
32   // int -> Future<int>
33   auto fn = [](std::vector<int> input, size_t window_size, size_t expect) {
34     auto res = reduce(
35       window(
36         input,
37         [](int i) { return makeFuture(i); },
38         window_size),
39       0,
40       [](int sum, const Try<int>& b) {
41         return sum + *b;
42       }).get();
43     EXPECT_EQ(expect, res);
44   };
45   {
46     // 2 in-flight at a time
47     std::vector<int> input = {1, 2, 3};
48     fn(input, 2, 6);
49   }
50   {
51     // 4 in-flight at a time
52     std::vector<int> input = {1, 2, 3};
53     fn(input, 4, 6);
54   }
55   {
56     // empty input
57     std::vector<int> input;
58     fn(input, 1, 0);
59   }
60   {
61     // int -> Future<Unit>
62     auto res = reduce(window(std::vector<int>({1, 2, 3}),
63                              [](int /* i */) { return makeFuture(); },
64                              2),
65                       0,
66                       [](int sum, const Try<Unit>& b) {
67                         EXPECT_TRUE(b.hasValue());
68                         return sum + 1;
69                       }).get();
70     EXPECT_EQ(3, res);
71   }
72   {
73     // string -> return Future<int>
74     auto res = reduce(
75       window(
76         std::vector<std::string>{"1", "2", "3"},
77         [](std::string s) { return makeFuture<int>(folly::to<int>(s)); },
78         2),
79       0,
80       [](int sum, const Try<int>& b) {
81         return sum + *b;
82       }).get();
83     EXPECT_EQ(6, res);
84   }
85 }
86
87 TEST(Window, parallel) {
88   std::vector<int> input;
89   std::vector<Promise<int>> ps(10);
90   for (size_t i = 0; i < ps.size(); i++) {
91     input.emplace_back(i);
92   }
93   auto f = collect(window(input, [&](int i) {
94     return ps[i].getFuture();
95   }, 3));
96
97   std::vector<std::thread> ts;
98   boost::barrier barrier(ps.size() + 1);
99   for (size_t i = 0; i < ps.size(); i++) {
100     ts.emplace_back([&ps, &barrier, i]() {
101       barrier.wait();
102       ps[i].setValue(i);
103     });
104   }
105
106   barrier.wait();
107
108   for (size_t i = 0; i < ps.size(); i++) {
109     ts[i].join();
110   }
111
112   EXPECT_TRUE(f.isReady());
113   for (size_t i = 0; i < ps.size(); i++) {
114     EXPECT_EQ(i, f.value()[i]);
115   }
116 }
117
118 TEST(Window, parallelWithError) {
119   std::vector<int> input;
120   std::vector<Promise<int>> ps(10);
121   for (size_t i = 0; i < ps.size(); i++) {
122     input.emplace_back(i);
123   }
124   auto f = collect(window(input, [&](int i) {
125     return ps[i].getFuture();
126   }, 3));
127
128   std::vector<std::thread> ts;
129   boost::barrier barrier(ps.size() + 1);
130   for (size_t i = 0; i < ps.size(); i++) {
131     ts.emplace_back([&ps, &barrier, i]() {
132       barrier.wait();
133       if (i == (ps.size()/2)) {
134         ps[i].setException(eggs);
135       } else {
136         ps[i].setValue(i);
137       }
138     });
139   }
140
141   barrier.wait();
142
143   for (size_t i = 0; i < ps.size(); i++) {
144     ts[i].join();
145   }
146
147   EXPECT_TRUE(f.isReady());
148   EXPECT_THROW(f.value(), eggs_t);
149 }
150
151 TEST(Window, allParallelWithError) {
152   std::vector<int> input;
153   std::vector<Promise<int>> ps(10);
154   for (size_t i = 0; i < ps.size(); i++) {
155     input.emplace_back(i);
156   }
157   auto f = collectAll(window(input, [&](int i) {
158     return ps[i].getFuture();
159   }, 3));
160
161   std::vector<std::thread> ts;
162   boost::barrier barrier(ps.size() + 1);
163   for (size_t i = 0; i < ps.size(); i++) {
164     ts.emplace_back([&ps, &barrier, i]() {
165       barrier.wait();
166       if (i == (ps.size()/2)) {
167         ps[i].setException(eggs);
168       } else {
169         ps[i].setValue(i);
170       }
171     });
172   }
173
174   barrier.wait();
175
176   for (size_t i = 0; i < ps.size(); i++) {
177     ts[i].join();
178   }
179
180   EXPECT_TRUE(f.isReady());
181   for (size_t i = 0; i < ps.size(); i++) {
182     if (i == (ps.size()/2)) {
183       EXPECT_THROW(f.value()[i].value(), eggs_t);
184     } else {
185       EXPECT_TRUE(f.value()[i].hasValue());
186       EXPECT_EQ(i, f.value()[i].value());
187     }
188   }
189 }