Nuke Future<void> (folly/futures)
[folly.git] / folly / futures / test / WindowTest.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <gtest/gtest.h>
18
19 #include <boost/thread/barrier.hpp>
20
21 #include <folly/futures/Future.h>
22
23 #include <vector>
24
25 using namespace folly;
26
27 typedef FutureException eggs_t;
28 static eggs_t eggs("eggs");
29
30 TEST(Window, basic) {
31   // int -> Future<int>
32   auto fn = [](std::vector<int> input, size_t window_size, size_t expect) {
33     auto res = reduce(
34       window(
35         input,
36         [](int i) { return makeFuture(i); },
37         2),
38       0,
39       [](int sum, const Try<int>& b) {
40         return sum + *b;
41       }).get();
42     EXPECT_EQ(expect, res);
43   };
44   {
45     // 2 in-flight at a time
46     std::vector<int> input = {1, 2, 3};
47     fn(input, 2, 6);
48   }
49   {
50     // 4 in-flight at a time
51     std::vector<int> input = {1, 2, 3};
52     fn(input, 4, 6);
53   }
54   {
55     // empty inpt
56     std::vector<int> input;
57     fn(input, 1, 0);
58   }
59   {
60     // int -> Future<Unit>
61     auto res = reduce(
62       window(
63         std::vector<int>({1, 2, 3}),
64         [](int i) { return makeFuture(); },
65         2),
66       0,
67       [](int sum, const Try<Unit>& b) {
68         EXPECT_TRUE(b.hasValue());
69         return sum + 1;
70       }).get();
71     EXPECT_EQ(3, res);
72   }
73   {
74     // string -> return Future<int>
75     auto res = reduce(
76       window(
77         std::vector<std::string>{"1", "2", "3"},
78         [](std::string s) { return makeFuture<int>(folly::to<int>(s)); },
79         2),
80       0,
81       [](int sum, const Try<int>& b) {
82         return sum + *b;
83       }).get();
84     EXPECT_EQ(6, res);
85   }
86 }
87
88 TEST(Window, parallel) {
89   std::vector<int> input;
90   std::vector<Promise<int>> ps(10);
91   for (size_t i = 0; i < ps.size(); i++) {
92     input.emplace_back(i);
93   }
94   auto f = collect(window(input, [&](int i) {
95     return ps[i].getFuture();
96   }, 3));
97
98   std::vector<std::thread> ts;
99   boost::barrier barrier(ps.size() + 1);
100   for (size_t i = 0; i < ps.size(); i++) {
101     ts.emplace_back([&ps, &barrier, i]() {
102       barrier.wait();
103       ps[i].setValue(i);
104     });
105   }
106
107   barrier.wait();
108
109   for (size_t i = 0; i < ps.size(); i++) {
110     ts[i].join();
111   }
112
113   EXPECT_TRUE(f.isReady());
114   for (size_t i = 0; i < ps.size(); i++) {
115     EXPECT_EQ(i, f.value()[i]);
116   }
117 }
118
119 TEST(Window, parallelWithError) {
120   std::vector<int> input;
121   std::vector<Promise<int>> ps(10);
122   for (size_t i = 0; i < ps.size(); i++) {
123     input.emplace_back(i);
124   }
125   auto f = collect(window(input, [&](int i) {
126     return ps[i].getFuture();
127   }, 3));
128
129   std::vector<std::thread> ts;
130   boost::barrier barrier(ps.size() + 1);
131   for (size_t i = 0; i < ps.size(); i++) {
132     ts.emplace_back([&ps, &barrier, i]() {
133       barrier.wait();
134       if (i == (ps.size()/2)) {
135         ps[i].setException(eggs);
136       } else {
137         ps[i].setValue(i);
138       }
139     });
140   }
141
142   barrier.wait();
143
144   for (size_t i = 0; i < ps.size(); i++) {
145     ts[i].join();
146   }
147
148   EXPECT_TRUE(f.isReady());
149   EXPECT_THROW(f.value(), eggs_t);
150 }
151
152 TEST(Window, allParallelWithError) {
153   std::vector<int> input;
154   std::vector<Promise<int>> ps(10);
155   for (size_t i = 0; i < ps.size(); i++) {
156     input.emplace_back(i);
157   }
158   auto f = collectAll(window(input, [&](int i) {
159     return ps[i].getFuture();
160   }, 3));
161
162   std::vector<std::thread> ts;
163   boost::barrier barrier(ps.size() + 1);
164   for (size_t i = 0; i < ps.size(); i++) {
165     ts.emplace_back([&ps, &barrier, i]() {
166       barrier.wait();
167       if (i == (ps.size()/2)) {
168         ps[i].setException(eggs);
169       } else {
170         ps[i].setValue(i);
171       }
172     });
173   }
174
175   barrier.wait();
176
177   for (size_t i = 0; i < ps.size(); i++) {
178     ts[i].join();
179   }
180
181   EXPECT_TRUE(f.isReady());
182   for (size_t i = 0; i < ps.size(); i++) {
183     if (i == (ps.size()/2)) {
184       EXPECT_THROW(f.value()[i].value(), eggs_t);
185     } else {
186       EXPECT_TRUE(f.value()[i].hasValue());
187       EXPECT_EQ(i, f.value()[i].value());
188     }
189   }
190 }