2017
[folly.git] / folly / futures / test / WindowTest.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <boost/thread/barrier.hpp>
18
19 #include <folly/Conv.h>
20 #include <folly/futures/Future.h>
21 #include <folly/portability/GTest.h>
22
23 #include <vector>
24
25 using namespace folly;
26
27 typedef FutureException eggs_t;
28 static eggs_t eggs("eggs");
29
30 TEST(Window, basic) {
31   // int -> Future<int>
32   auto fn = [](std::vector<int> input, size_t window_size, size_t expect) {
33     auto res = reduce(
34       window(
35         input,
36         [](int i) { return makeFuture(i); },
37         window_size),
38       0,
39       [](int sum, const Try<int>& b) {
40         return sum + *b;
41       }).get();
42     EXPECT_EQ(expect, res);
43   };
44   {
45     // 2 in-flight at a time
46     std::vector<int> input = {1, 2, 3};
47     fn(input, 2, 6);
48   }
49   {
50     // 4 in-flight at a time
51     std::vector<int> input = {1, 2, 3};
52     fn(input, 4, 6);
53   }
54   {
55     // empty input
56     std::vector<int> input;
57     fn(input, 1, 0);
58   }
59   {
60     // int -> Future<Unit>
61     auto res = reduce(window(std::vector<int>({1, 2, 3}),
62                              [](int /* i */) { return makeFuture(); },
63                              2),
64                       0,
65                       [](int sum, const Try<Unit>& b) {
66                         EXPECT_TRUE(b.hasValue());
67                         return sum + 1;
68                       }).get();
69     EXPECT_EQ(3, res);
70   }
71   {
72     // string -> return Future<int>
73     auto res = reduce(
74       window(
75         std::vector<std::string>{"1", "2", "3"},
76         [](std::string s) { return makeFuture<int>(folly::to<int>(s)); },
77         2),
78       0,
79       [](int sum, const Try<int>& b) {
80         return sum + *b;
81       }).get();
82     EXPECT_EQ(6, res);
83   }
84 }
85
86 TEST(Window, parallel) {
87   std::vector<int> input;
88   std::vector<Promise<int>> ps(10);
89   for (size_t i = 0; i < ps.size(); i++) {
90     input.emplace_back(i);
91   }
92   auto f = collect(window(input, [&](int i) {
93     return ps[i].getFuture();
94   }, 3));
95
96   std::vector<std::thread> ts;
97   boost::barrier barrier(ps.size() + 1);
98   for (size_t i = 0; i < ps.size(); i++) {
99     ts.emplace_back([&ps, &barrier, i]() {
100       barrier.wait();
101       ps[i].setValue(i);
102     });
103   }
104
105   barrier.wait();
106
107   for (size_t i = 0; i < ps.size(); i++) {
108     ts[i].join();
109   }
110
111   EXPECT_TRUE(f.isReady());
112   for (size_t i = 0; i < ps.size(); i++) {
113     EXPECT_EQ(i, f.value()[i]);
114   }
115 }
116
117 TEST(Window, parallelWithError) {
118   std::vector<int> input;
119   std::vector<Promise<int>> ps(10);
120   for (size_t i = 0; i < ps.size(); i++) {
121     input.emplace_back(i);
122   }
123   auto f = collect(window(input, [&](int i) {
124     return ps[i].getFuture();
125   }, 3));
126
127   std::vector<std::thread> ts;
128   boost::barrier barrier(ps.size() + 1);
129   for (size_t i = 0; i < ps.size(); i++) {
130     ts.emplace_back([&ps, &barrier, i]() {
131       barrier.wait();
132       if (i == (ps.size()/2)) {
133         ps[i].setException(eggs);
134       } else {
135         ps[i].setValue(i);
136       }
137     });
138   }
139
140   barrier.wait();
141
142   for (size_t i = 0; i < ps.size(); i++) {
143     ts[i].join();
144   }
145
146   EXPECT_TRUE(f.isReady());
147   EXPECT_THROW(f.value(), eggs_t);
148 }
149
150 TEST(Window, allParallelWithError) {
151   std::vector<int> input;
152   std::vector<Promise<int>> ps(10);
153   for (size_t i = 0; i < ps.size(); i++) {
154     input.emplace_back(i);
155   }
156   auto f = collectAll(window(input, [&](int i) {
157     return ps[i].getFuture();
158   }, 3));
159
160   std::vector<std::thread> ts;
161   boost::barrier barrier(ps.size() + 1);
162   for (size_t i = 0; i < ps.size(); i++) {
163     ts.emplace_back([&ps, &barrier, i]() {
164       barrier.wait();
165       if (i == (ps.size()/2)) {
166         ps[i].setException(eggs);
167       } else {
168         ps[i].setValue(i);
169       }
170     });
171   }
172
173   barrier.wait();
174
175   for (size_t i = 0; i < ps.size(); i++) {
176     ts[i].join();
177   }
178
179   EXPECT_TRUE(f.isReady());
180   for (size_t i = 0; i < ps.size(); i++) {
181     if (i == (ps.size()/2)) {
182       EXPECT_THROW(f.value()[i].value(), eggs_t);
183     } else {
184       EXPECT_TRUE(f.value()[i].hasValue());
185       EXPECT_EQ(i, f.value()[i].value());
186     }
187   }
188 }