dbe94dbb11816f975e1b63827208715ae5a0cbce
[folly.git] / folly / futures / test / CollectTest.cpp
1 /*
2  * Copyright 2015-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <numeric>
18
19 #include <boost/thread/barrier.hpp>
20
21 #include <folly/Random.h>
22 #include <folly/futures/Future.h>
23 #include <folly/portability/GTest.h>
24 #include <folly/small_vector.h>
25
26 using namespace folly;
27
28 typedef FutureException eggs_t;
29 static eggs_t eggs("eggs");
30
31 auto rng = std::mt19937(folly::randomNumberSeed());
32
33 TEST(Collect, collectAll) {
34   // returns a vector variant
35   {
36     std::vector<Promise<int>> promises(10);
37     std::vector<Future<int>> futures;
38
39     for (auto& p : promises) {
40       futures.push_back(p.getFuture());
41     }
42
43     auto allf = collectAll(futures);
44
45     std::shuffle(promises.begin(), promises.end(), rng);
46     for (auto& p : promises) {
47       EXPECT_FALSE(allf.isReady());
48       p.setValue(42);
49     }
50
51     EXPECT_TRUE(allf.isReady());
52     auto& results = allf.value();
53     for (auto& t : results) {
54       EXPECT_EQ(42, t.value());
55     }
56   }
57
58   // check error semantics
59   {
60     std::vector<Promise<int>> promises(4);
61     std::vector<Future<int>> futures;
62
63     for (auto& p : promises) {
64       futures.push_back(p.getFuture());
65     }
66
67     auto allf = collectAll(futures);
68
69
70     promises[0].setValue(42);
71     promises[1].setException(eggs);
72
73     EXPECT_FALSE(allf.isReady());
74
75     promises[2].setValue(42);
76
77     EXPECT_FALSE(allf.isReady());
78
79     promises[3].setException(eggs);
80
81     EXPECT_TRUE(allf.isReady());
82     EXPECT_FALSE(allf.getTry().hasException());
83
84     auto& results = allf.value();
85     EXPECT_EQ(42, results[0].value());
86     EXPECT_TRUE(results[1].hasException());
87     EXPECT_EQ(42, results[2].value());
88     EXPECT_TRUE(results[3].hasException());
89   }
90
91   // check that futures are ready in then()
92   {
93     std::vector<Promise<Unit>> promises(10);
94     std::vector<Future<Unit>> futures;
95
96     for (auto& p : promises) {
97       futures.push_back(p.getFuture());
98     }
99
100     auto allf = collectAll(futures).then([](Try<std::vector<Try<Unit>>>&& ts) {
101       for (auto& f : ts.value()) {
102         f.value();
103       }
104     });
105
106     std::shuffle(promises.begin(), promises.end(), rng);
107     for (auto& p : promises) {
108       p.setValue();
109     }
110     EXPECT_TRUE(allf.isReady());
111   }
112 }
113
114 TEST(Collect, collect) {
115   // success case
116   {
117     std::vector<Promise<int>> promises(10);
118     std::vector<Future<int>> futures;
119
120     for (auto& p : promises) {
121       futures.push_back(p.getFuture());
122     }
123
124     auto allf = collect(futures);
125
126     std::shuffle(promises.begin(), promises.end(), rng);
127     for (auto& p : promises) {
128       EXPECT_FALSE(allf.isReady());
129       p.setValue(42);
130     }
131
132     EXPECT_TRUE(allf.isReady());
133     for (auto i : allf.value()) {
134       EXPECT_EQ(42, i);
135     }
136   }
137
138   // failure case
139   {
140     std::vector<Promise<int>> promises(10);
141     std::vector<Future<int>> futures;
142
143     for (auto& p : promises) {
144       futures.push_back(p.getFuture());
145     }
146
147     auto allf = collect(futures);
148
149     std::shuffle(promises.begin(), promises.end(), rng);
150     for (int i = 0; i < 10; i++) {
151       if (i < 5) {
152         // everthing goes well so far...
153         EXPECT_FALSE(allf.isReady());
154         promises[i].setValue(42);
155       } else if (i == 5) {
156         // short circuit with an exception
157         EXPECT_FALSE(allf.isReady());
158         promises[i].setException(eggs);
159         EXPECT_TRUE(allf.isReady());
160       } else if (i < 8) {
161         // don't blow up on further values
162         EXPECT_TRUE(allf.isReady());
163         promises[i].setValue(42);
164       } else {
165         // don't blow up on further exceptions
166         EXPECT_TRUE(allf.isReady());
167         promises[i].setException(eggs);
168       }
169     }
170
171     EXPECT_THROW(allf.value(), eggs_t);
172   }
173
174   // void futures success case
175   {
176     std::vector<Promise<Unit>> promises(10);
177     std::vector<Future<Unit>> futures;
178
179     for (auto& p : promises) {
180       futures.push_back(p.getFuture());
181     }
182
183     auto allf = collect(futures);
184
185     std::shuffle(promises.begin(), promises.end(), rng);
186     for (auto& p : promises) {
187       EXPECT_FALSE(allf.isReady());
188       p.setValue();
189     }
190
191     EXPECT_TRUE(allf.isReady());
192   }
193
194   // void futures failure case
195   {
196     std::vector<Promise<Unit>> promises(10);
197     std::vector<Future<Unit>> futures;
198
199     for (auto& p : promises) {
200       futures.push_back(p.getFuture());
201     }
202
203     auto allf = collect(futures);
204
205     std::shuffle(promises.begin(), promises.end(), rng);
206     for (int i = 0; i < 10; i++) {
207       if (i < 5) {
208         // everthing goes well so far...
209         EXPECT_FALSE(allf.isReady());
210         promises[i].setValue();
211       } else if (i == 5) {
212         // short circuit with an exception
213         EXPECT_FALSE(allf.isReady());
214         promises[i].setException(eggs);
215         EXPECT_TRUE(allf.isReady());
216       } else if (i < 8) {
217         // don't blow up on further values
218         EXPECT_TRUE(allf.isReady());
219         promises[i].setValue();
220       } else {
221         // don't blow up on further exceptions
222         EXPECT_TRUE(allf.isReady());
223         promises[i].setException(eggs);
224       }
225     }
226
227     EXPECT_THROW(allf.value(), eggs_t);
228   }
229
230   // move only compiles
231   {
232     std::vector<Promise<std::unique_ptr<int>>> promises(10);
233     std::vector<Future<std::unique_ptr<int>>> futures;
234
235     for (auto& p : promises) {
236       futures.push_back(p.getFuture());
237     }
238
239     collect(futures);
240   }
241
242 }
243
244 struct NotDefaultConstructible {
245   NotDefaultConstructible() = delete;
246   explicit NotDefaultConstructible(int arg) : i(arg) {}
247   int i;
248 };
249
250 // We have a specialized implementation for non-default-constructible objects
251 // Ensure that it works and preserves order
252 TEST(Collect, collectNotDefaultConstructible) {
253   std::vector<Promise<NotDefaultConstructible>> promises(10);
254   std::vector<Future<NotDefaultConstructible>> futures;
255   std::vector<int> indices(10);
256   std::iota(indices.begin(), indices.end(), 0);
257   std::shuffle(indices.begin(), indices.end(), rng);
258
259   for (auto& p : promises) {
260     futures.push_back(p.getFuture());
261   }
262
263   auto allf = collect(futures);
264
265   for (auto i : indices) {
266     EXPECT_FALSE(allf.isReady());
267     promises[i].setValue(NotDefaultConstructible(i));
268   }
269
270   EXPECT_TRUE(allf.isReady());
271   int i = 0;
272   for (auto val : allf.value()) {
273     EXPECT_EQ(i, val.i);
274     i++;
275   }
276 }
277
278 TEST(Collect, collectAny) {
279   {
280     std::vector<Promise<int>> promises(10);
281     std::vector<Future<int>> futures;
282
283     for (auto& p : promises) {
284       futures.push_back(p.getFuture());
285     }
286
287     for (auto& f : futures) {
288       EXPECT_FALSE(f.isReady());
289     }
290
291     auto anyf = collectAny(futures);
292
293     /* futures were moved in, so these are invalid now */
294     EXPECT_FALSE(anyf.isReady());
295
296     promises[7].setValue(42);
297     EXPECT_TRUE(anyf.isReady());
298     auto& idx_fut = anyf.value();
299
300     auto i = idx_fut.first;
301     EXPECT_EQ(7, i);
302
303     auto& f = idx_fut.second;
304     EXPECT_EQ(42, f.value());
305   }
306
307   // error
308   {
309     std::vector<Promise<Unit>> promises(10);
310     std::vector<Future<Unit>> futures;
311
312     for (auto& p : promises) {
313       futures.push_back(p.getFuture());
314     }
315
316     for (auto& f : futures) {
317       EXPECT_FALSE(f.isReady());
318     }
319
320     auto anyf = collectAny(futures);
321
322     EXPECT_FALSE(anyf.isReady());
323
324     promises[3].setException(eggs);
325     EXPECT_TRUE(anyf.isReady());
326     EXPECT_TRUE(anyf.value().second.hasException());
327   }
328
329   // then()
330   {
331     std::vector<Promise<int>> promises(10);
332     std::vector<Future<int>> futures;
333
334     for (auto& p : promises) {
335       futures.push_back(p.getFuture());
336     }
337
338     auto anyf = collectAny(futures)
339       .then([](std::pair<size_t, Try<int>> p) {
340         EXPECT_EQ(42, p.second.value());
341       });
342
343     promises[3].setValue(42);
344     EXPECT_TRUE(anyf.isReady());
345   }
346 }
347
348 TEST(Collect, collectAnyWithoutException) {
349   {
350     std::vector<Promise<int>> promises(10);
351     std::vector<Future<int>> futures;
352
353     for (auto& p : promises) {
354       futures.push_back(p.getFuture());
355     }
356
357     auto onef = collectAnyWithoutException(futures);
358
359     /* futures were moved in, so these are invalid now */
360     EXPECT_FALSE(onef.isReady());
361
362     promises[7].setValue(42);
363     EXPECT_TRUE(onef.isReady());
364     auto& idx_fut = onef.value();
365     EXPECT_EQ(7, idx_fut.first);
366     EXPECT_EQ(42, idx_fut.second);
367   }
368
369   // some exception before ready
370   {
371     std::vector<Promise<int>> promises(10);
372     std::vector<Future<int>> futures;
373
374     for (auto& p : promises) {
375       futures.push_back(p.getFuture());
376     }
377
378     auto onef = collectAnyWithoutException(futures);
379
380     EXPECT_FALSE(onef.isReady());
381
382     promises[3].setException(eggs);
383     EXPECT_FALSE(onef.isReady());
384     promises[4].setException(eggs);
385     EXPECT_FALSE(onef.isReady());
386     promises[0].setValue(99);
387     EXPECT_TRUE(onef.isReady());
388     auto& idx_fut = onef.value();
389     EXPECT_EQ(0, idx_fut.first);
390     EXPECT_EQ(99, idx_fut.second);
391   }
392
393   // all exceptions
394   {
395     std::vector<Promise<int>> promises(10);
396     std::vector<Future<int>> futures;
397
398     for (auto& p : promises) {
399       futures.push_back(p.getFuture());
400     }
401
402     auto onef = collectAnyWithoutException(futures);
403
404     EXPECT_FALSE(onef.isReady());
405     for (int i = 0; i < 9; ++i) {
406       promises[i].setException(eggs);
407     }
408     EXPECT_FALSE(onef.isReady());
409
410     promises[9].setException(eggs);
411     EXPECT_TRUE(onef.isReady());
412     EXPECT_TRUE(onef.hasException());
413   }
414 }
415
416 TEST(Collect, alreadyCompleted) {
417   {
418     std::vector<Future<Unit>> fs;
419     for (int i = 0; i < 10; i++) {
420       fs.push_back(makeFuture());
421     }
422
423     collectAll(fs)
424       .then([&](std::vector<Try<Unit>> ts) {
425         EXPECT_EQ(fs.size(), ts.size());
426       });
427   }
428   {
429     std::vector<Future<int>> fs;
430     for (int i = 0; i < 10; i++) {
431       fs.push_back(makeFuture(i));
432     }
433
434     collectAny(fs)
435       .then([&](std::pair<size_t, Try<int>> p) {
436         EXPECT_EQ(p.first, p.second.value());
437       });
438   }
439 }
440
441 TEST(Collect, parallel) {
442   std::vector<Promise<int>> ps(10);
443   std::vector<Future<int>> fs;
444   for (size_t i = 0; i < ps.size(); i++) {
445     fs.emplace_back(ps[i].getFuture());
446   }
447   auto f = collect(fs);
448
449   std::vector<std::thread> ts;
450   boost::barrier barrier(ps.size() + 1);
451   for (size_t i = 0; i < ps.size(); i++) {
452     ts.emplace_back([&ps, &barrier, i]() {
453       barrier.wait();
454       ps[i].setValue(i);
455     });
456   }
457
458   barrier.wait();
459
460   for (size_t i = 0; i < ps.size(); i++) {
461     ts[i].join();
462   }
463
464   EXPECT_TRUE(f.isReady());
465   for (size_t i = 0; i < ps.size(); i++) {
466     EXPECT_EQ(i, f.value()[i]);
467   }
468 }
469
470 TEST(Collect, parallelWithError) {
471   std::vector<Promise<int>> ps(10);
472   std::vector<Future<int>> fs;
473   for (size_t i = 0; i < ps.size(); i++) {
474     fs.emplace_back(ps[i].getFuture());
475   }
476   auto f = collect(fs);
477
478   std::vector<std::thread> ts;
479   boost::barrier barrier(ps.size() + 1);
480   for (size_t i = 0; i < ps.size(); i++) {
481     ts.emplace_back([&ps, &barrier, i]() {
482       barrier.wait();
483       if (i == (ps.size()/2)) {
484         ps[i].setException(eggs);
485       } else {
486         ps[i].setValue(i);
487       }
488     });
489   }
490
491   barrier.wait();
492
493   for (size_t i = 0; i < ps.size(); i++) {
494     ts[i].join();
495   }
496
497   EXPECT_TRUE(f.isReady());
498   EXPECT_THROW(f.value(), eggs_t);
499 }
500
501 TEST(Collect, allParallel) {
502   std::vector<Promise<int>> ps(10);
503   std::vector<Future<int>> fs;
504   for (size_t i = 0; i < ps.size(); i++) {
505     fs.emplace_back(ps[i].getFuture());
506   }
507   auto f = collectAll(fs);
508
509   std::vector<std::thread> ts;
510   boost::barrier barrier(ps.size() + 1);
511   for (size_t i = 0; i < ps.size(); i++) {
512     ts.emplace_back([&ps, &barrier, i]() {
513       barrier.wait();
514       ps[i].setValue(i);
515     });
516   }
517
518   barrier.wait();
519
520   for (size_t i = 0; i < ps.size(); i++) {
521     ts[i].join();
522   }
523
524   EXPECT_TRUE(f.isReady());
525   for (size_t i = 0; i < ps.size(); i++) {
526     EXPECT_TRUE(f.value()[i].hasValue());
527     EXPECT_EQ(i, f.value()[i].value());
528   }
529 }
530
531 TEST(Collect, allParallelWithError) {
532   std::vector<Promise<int>> ps(10);
533   std::vector<Future<int>> fs;
534   for (size_t i = 0; i < ps.size(); i++) {
535     fs.emplace_back(ps[i].getFuture());
536   }
537   auto f = collectAll(fs);
538
539   std::vector<std::thread> ts;
540   boost::barrier barrier(ps.size() + 1);
541   for (size_t i = 0; i < ps.size(); i++) {
542     ts.emplace_back([&ps, &barrier, i]() {
543       barrier.wait();
544       if (i == (ps.size()/2)) {
545         ps[i].setException(eggs);
546       } else {
547         ps[i].setValue(i);
548       }
549     });
550   }
551
552   barrier.wait();
553
554   for (size_t i = 0; i < ps.size(); i++) {
555     ts[i].join();
556   }
557
558   EXPECT_TRUE(f.isReady());
559   for (size_t i = 0; i < ps.size(); i++) {
560     if (i == (ps.size()/2)) {
561       EXPECT_THROW(f.value()[i].value(), eggs_t);
562     } else {
563       EXPECT_TRUE(f.value()[i].hasValue());
564       EXPECT_EQ(i, f.value()[i].value());
565     }
566   }
567 }
568
569 TEST(Collect, collectN) {
570   std::vector<Promise<Unit>> promises(10);
571   std::vector<Future<Unit>> futures;
572
573   for (auto& p : promises) {
574     futures.push_back(p.getFuture());
575   }
576
577   bool flag = false;
578   size_t n = 3;
579   collectN(futures, n)
580     .then([&](std::vector<std::pair<size_t, Try<Unit>>> v) {
581       flag = true;
582       EXPECT_EQ(n, v.size());
583       for (auto& tt : v) {
584         EXPECT_TRUE(tt.second.hasValue());
585       }
586     });
587
588   promises[0].setValue();
589   EXPECT_FALSE(flag);
590   promises[1].setValue();
591   EXPECT_FALSE(flag);
592   promises[2].setValue();
593   EXPECT_TRUE(flag);
594 }
595
596 /// Ensure that we can compile collectAll/Any with folly::small_vector
597 TEST(Collect, smallVector) {
598   static_assert(!FOLLY_IS_TRIVIALLY_COPYABLE(Future<Unit>),
599                 "Futures should not be trivially copyable");
600   static_assert(!FOLLY_IS_TRIVIALLY_COPYABLE(Future<int>),
601                 "Futures should not be trivially copyable");
602
603   {
604     folly::small_vector<Future<Unit>> futures;
605
606     for (int i = 0; i < 10; i++) {
607       futures.push_back(makeFuture());
608     }
609
610     auto anyf = collectAny(futures);
611   }
612   {
613     folly::small_vector<Future<Unit>> futures;
614
615     for (int i = 0; i < 10; i++) {
616       futures.push_back(makeFuture());
617     }
618
619     auto allf = collectAll(futures);
620   }
621 }
622
623 TEST(Collect, collectAllVariadic) {
624   Promise<bool> pb;
625   Promise<int> pi;
626   Future<bool> fb = pb.getFuture();
627   Future<int> fi = pi.getFuture();
628   bool flag = false;
629   collectAll(std::move(fb), std::move(fi))
630     .then([&](std::tuple<Try<bool>, Try<int>> tup) {
631       flag = true;
632       EXPECT_TRUE(std::get<0>(tup).hasValue());
633       EXPECT_EQ(std::get<0>(tup).value(), true);
634       EXPECT_TRUE(std::get<1>(tup).hasValue());
635       EXPECT_EQ(std::get<1>(tup).value(), 42);
636     });
637   pb.setValue(true);
638   EXPECT_FALSE(flag);
639   pi.setValue(42);
640   EXPECT_TRUE(flag);
641 }
642
643 TEST(Collect, collectAllVariadicReferences) {
644   Promise<bool> pb;
645   Promise<int> pi;
646   Future<bool> fb = pb.getFuture();
647   Future<int> fi = pi.getFuture();
648   bool flag = false;
649   collectAll(fb, fi)
650     .then([&](std::tuple<Try<bool>, Try<int>> tup) {
651       flag = true;
652       EXPECT_TRUE(std::get<0>(tup).hasValue());
653       EXPECT_EQ(std::get<0>(tup).value(), true);
654       EXPECT_TRUE(std::get<1>(tup).hasValue());
655       EXPECT_EQ(std::get<1>(tup).value(), 42);
656     });
657   pb.setValue(true);
658   EXPECT_FALSE(flag);
659   pi.setValue(42);
660   EXPECT_TRUE(flag);
661 }
662
663 TEST(Collect, collectAllVariadicWithException) {
664   Promise<bool> pb;
665   Promise<int> pi;
666   Future<bool> fb = pb.getFuture();
667   Future<int> fi = pi.getFuture();
668   bool flag = false;
669   collectAll(std::move(fb), std::move(fi))
670     .then([&](std::tuple<Try<bool>, Try<int>> tup) {
671       flag = true;
672       EXPECT_TRUE(std::get<0>(tup).hasValue());
673       EXPECT_EQ(std::get<0>(tup).value(), true);
674       EXPECT_TRUE(std::get<1>(tup).hasException());
675       EXPECT_THROW(std::get<1>(tup).value(), eggs_t);
676     });
677   pb.setValue(true);
678   EXPECT_FALSE(flag);
679   pi.setException(eggs);
680   EXPECT_TRUE(flag);
681 }
682
683 TEST(Collect, collectVariadic) {
684   Promise<bool> pb;
685   Promise<int> pi;
686   Future<bool> fb = pb.getFuture();
687   Future<int> fi = pi.getFuture();
688   bool flag = false;
689   collect(std::move(fb), std::move(fi))
690     .then([&](std::tuple<bool, int> tup) {
691       flag = true;
692       EXPECT_EQ(std::get<0>(tup), true);
693       EXPECT_EQ(std::get<1>(tup), 42);
694     });
695   pb.setValue(true);
696   EXPECT_FALSE(flag);
697   pi.setValue(42);
698   EXPECT_TRUE(flag);
699 }
700
701 TEST(Collect, collectVariadicWithException) {
702   Promise<bool> pb;
703   Promise<int> pi;
704   Future<bool> fb = pb.getFuture();
705   Future<int> fi = pi.getFuture();
706   auto f = collect(std::move(fb), std::move(fi));
707   pb.setValue(true);
708   EXPECT_FALSE(f.isReady());
709   pi.setException(eggs);
710   EXPECT_TRUE(f.isReady());
711   EXPECT_TRUE(f.getTry().hasException());
712   EXPECT_THROW(f.get(), eggs_t);
713 }
714
715 TEST(Collect, collectAllNone) {
716   std::vector<Future<int>> fs;
717   auto f = collectAll(fs);
718   EXPECT_TRUE(f.isReady());
719 }
720
721 TEST(Collect, noDefaultConstructor) {
722   struct A {
723     explicit A(size_t /* x */) {}
724   };
725
726   auto f1 = makeFuture(A(1));
727   auto f2 = makeFuture(A(2));
728
729   auto f = collect(std::move(f1), std::move(f2));
730 }