Sort #include lines
[folly.git] / folly / futures / test / ViaTest.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <thread>
18
19 #include <folly/Baton.h>
20 #include <folly/MPMCQueue.h>
21 #include <folly/futures/DrivableExecutor.h>
22 #include <folly/futures/Future.h>
23 #include <folly/futures/InlineExecutor.h>
24 #include <folly/futures/ManualExecutor.h>
25 #include <folly/portability/GTest.h>
26
27 using namespace folly;
28
29 struct ManualWaiter : public DrivableExecutor {
30   explicit ManualWaiter(std::shared_ptr<ManualExecutor> ex) : ex(ex) {}
31
32   void add(Func f) override {
33     ex->add(std::move(f));
34   }
35
36   void drive() override {
37     ex->wait();
38     ex->run();
39   }
40
41   std::shared_ptr<ManualExecutor> ex;
42 };
43
44 struct ViaFixture : public testing::Test {
45   ViaFixture() :
46     westExecutor(new ManualExecutor),
47     eastExecutor(new ManualExecutor),
48     waiter(new ManualWaiter(westExecutor)),
49     done(false)
50   {
51     t = std::thread([=] {
52         ManualWaiter eastWaiter(eastExecutor);
53         while (!done)
54           eastWaiter.drive();
55       });
56   }
57
58   ~ViaFixture() override {
59     done = true;
60     eastExecutor->add([=]() { });
61     t.join();
62   }
63
64   void addAsync(int a, int b, std::function<void(int&&)>&& cob) {
65     eastExecutor->add([=]() {
66       cob(a + b);
67     });
68   }
69
70   std::shared_ptr<ManualExecutor> westExecutor;
71   std::shared_ptr<ManualExecutor> eastExecutor;
72   std::shared_ptr<ManualWaiter> waiter;
73   InlineExecutor inlineExecutor;
74   std::atomic<bool> done;
75   std::thread t;
76 };
77
78 TEST(Via, exceptionOnLaunch) {
79   auto future = makeFuture<int>(std::runtime_error("E"));
80   EXPECT_THROW(future.value(), std::runtime_error);
81 }
82
83 TEST(Via, thenValue) {
84   auto future = makeFuture(std::move(1))
85     .then([](Try<int>&& t) {
86       return t.value() == 1;
87     })
88     ;
89
90   EXPECT_TRUE(future.value());
91 }
92
93 TEST(Via, thenFuture) {
94   auto future = makeFuture(1)
95     .then([](Try<int>&& t) {
96       return makeFuture(t.value() == 1);
97     });
98   EXPECT_TRUE(future.value());
99 }
100
101 static Future<std::string> doWorkStatic(Try<std::string>&& t) {
102   return makeFuture(t.value() + ";static");
103 }
104
105 TEST(Via, thenFunction) {
106   struct Worker {
107     Future<std::string> doWork(Try<std::string>&& t) {
108       return makeFuture(t.value() + ";class");
109     }
110     static Future<std::string> doWorkStatic(Try<std::string>&& t) {
111       return makeFuture(t.value() + ";class-static");
112     }
113   } w;
114
115   auto f = makeFuture(std::string("start"))
116     .then(doWorkStatic)
117     .then(Worker::doWorkStatic)
118     .then(&Worker::doWork, &w)
119     ;
120
121   EXPECT_EQ(f.value(), "start;static;class-static;class");
122 }
123
124 TEST_F(ViaFixture, threadHops) {
125   auto westThreadId = std::this_thread::get_id();
126   auto f = via(eastExecutor.get())
127                .then([=](Try<Unit>&& /* t */) {
128                  EXPECT_NE(std::this_thread::get_id(), westThreadId);
129                  return makeFuture<int>(1);
130                })
131                .via(westExecutor.get())
132                .then([=](Try<int>&& t) {
133                  EXPECT_EQ(std::this_thread::get_id(), westThreadId);
134                  return t.value();
135                });
136   EXPECT_EQ(f.getVia(waiter.get()), 1);
137 }
138
139 TEST_F(ViaFixture, chainVias) {
140   auto westThreadId = std::this_thread::get_id();
141   auto f = via(eastExecutor.get()).then([=]() {
142     EXPECT_NE(std::this_thread::get_id(), westThreadId);
143     return 1;
144   }).then([=](int val) {
145     return makeFuture(val).via(westExecutor.get())
146       .then([=](int v) mutable {
147         EXPECT_EQ(std::this_thread::get_id(), westThreadId);
148         return v + 1;
149       });
150   }).then([=](int val) {
151     // even though ultimately the future that triggers this one executed in
152     // the west thread, this then() inherited the executor from its
153     // predecessor, ie the eastExecutor.
154     EXPECT_NE(std::this_thread::get_id(), westThreadId);
155     return val + 1;
156   }).via(westExecutor.get()).then([=](int val) {
157     // go back to west, so we can wait on it
158     EXPECT_EQ(std::this_thread::get_id(), westThreadId);
159     return val + 1;
160   });
161
162   EXPECT_EQ(f.getVia(waiter.get()), 4);
163 }
164
165 TEST_F(ViaFixture, bareViaAssignment) {
166   auto f = via(eastExecutor.get());
167 }
168 TEST_F(ViaFixture, viaAssignment) {
169   // via()&&
170   auto f = makeFuture().via(eastExecutor.get());
171   // via()&
172   auto f2 = f.via(eastExecutor.get());
173 }
174
175 TEST(Via, chain1) {
176   EXPECT_EQ(42,
177             makeFuture()
178             .thenMulti([] { return 42; })
179             .get());
180 }
181
182 TEST(Via, chain3) {
183   int count = 0;
184   auto f = makeFuture().thenMulti(
185       [&]{ count++; return 3.14159; },
186       [&](double) { count++; return std::string("hello"); },
187       [&]{ count++; return makeFuture(42); });
188   EXPECT_EQ(42, f.get());
189   EXPECT_EQ(3, count);
190 }
191
192 struct PriorityExecutor : public Executor {
193   void add(Func /* f */) override {}
194
195   void addWithPriority(Func f, int8_t priority) override {
196     int mid = getNumPriorities() / 2;
197     int p = priority < 0 ?
198             std::max(0, mid + priority) :
199             std::min(getNumPriorities() - 1, mid + priority);
200     EXPECT_LT(p, 3);
201     EXPECT_GE(p, 0);
202     if (p == 0) {
203       count0++;
204     } else if (p == 1) {
205       count1++;
206     } else if (p == 2) {
207       count2++;
208     }
209     f();
210   }
211
212   uint8_t getNumPriorities() const override {
213     return 3;
214   }
215
216   int count0{0};
217   int count1{0};
218   int count2{0};
219 };
220
221 TEST(Via, priority) {
222   PriorityExecutor exe;
223   via(&exe, -1).then([]{});
224   via(&exe, 0).then([]{});
225   via(&exe, 1).then([]{});
226   via(&exe, 42).then([]{});  // overflow should go to max priority
227   via(&exe, -42).then([]{}); // underflow should go to min priority
228   via(&exe).then([]{});      // default to mid priority
229   via(&exe, Executor::LO_PRI).then([]{});
230   via(&exe, Executor::HI_PRI).then([]{});
231   EXPECT_EQ(3, exe.count0);
232   EXPECT_EQ(2, exe.count1);
233   EXPECT_EQ(3, exe.count2);
234 }
235
236 TEST_F(ViaFixture, chainX1) {
237   EXPECT_EQ(42,
238             makeFuture()
239             .thenMultiWithExecutor(eastExecutor.get(),[] { return 42; })
240             .get());
241 }
242
243 TEST_F(ViaFixture, chainX3) {
244   auto westThreadId = std::this_thread::get_id();
245   int count = 0;
246   auto f = via(westExecutor.get()).thenMultiWithExecutor(
247       eastExecutor.get(),
248       [&]{
249         EXPECT_NE(std::this_thread::get_id(), westThreadId);
250         count++; return 3.14159;
251       },
252       [&](double) { count++; return std::string("hello"); },
253       [&]{ count++; })
254     .then([&](){
255         EXPECT_EQ(std::this_thread::get_id(), westThreadId);
256         return makeFuture(42);
257     });
258   EXPECT_EQ(42, f.getVia(waiter.get()));
259   EXPECT_EQ(3, count);
260 }
261
262 TEST(Via, then2) {
263   ManualExecutor x1, x2;
264   bool a = false, b = false, c = false;
265   via(&x1)
266     .then([&]{ a = true; })
267     .then(&x2, [&]{ b = true; })
268     .then([&]{ c = true; });
269
270   EXPECT_FALSE(a);
271   EXPECT_FALSE(b);
272
273   x1.run();
274   EXPECT_TRUE(a);
275   EXPECT_FALSE(b);
276   EXPECT_FALSE(c);
277
278   x2.run();
279   EXPECT_TRUE(b);
280   EXPECT_FALSE(c);
281
282   x1.run();
283   EXPECT_TRUE(c);
284 }
285
286 TEST(Via, then2Variadic) {
287   struct Foo { bool a = false; void foo(Try<Unit>) { a = true; } };
288   Foo f;
289   ManualExecutor x;
290   makeFuture().then(&x, &Foo::foo, &f);
291   EXPECT_FALSE(f.a);
292   x.run();
293   EXPECT_TRUE(f.a);
294 }
295
296 #ifndef __APPLE__ // TODO #7372389
297 /// Simple executor that does work in another thread
298 class ThreadExecutor : public Executor {
299   folly::MPMCQueue<Func> funcs;
300   std::atomic<bool> done {false};
301   std::thread worker;
302   folly::Baton<> baton;
303
304   void work() {
305     baton.post();
306     Func fn;
307     while (!done) {
308       while (!funcs.isEmpty()) {
309         funcs.blockingRead(fn);
310         fn();
311       }
312     }
313   }
314
315  public:
316   explicit ThreadExecutor(size_t n = 1024)
317     : funcs(n) {
318     worker = std::thread(std::bind(&ThreadExecutor::work, this));
319   }
320
321   ~ThreadExecutor() override {
322     done = true;
323     funcs.write([]{});
324     worker.join();
325   }
326
327   void add(Func fn) override {
328     funcs.blockingWrite(std::move(fn));
329   }
330
331   void waitForStartup() {
332     baton.wait();
333   }
334 };
335
336 TEST(Via, viaThenGetWasRacy) {
337   ThreadExecutor x;
338   std::unique_ptr<int> val =
339       folly::via(&x).then([] { return std::make_unique<int>(42); }).get();
340   ASSERT_TRUE(!!val);
341   EXPECT_EQ(42, *val);
342 }
343
344 TEST(Via, callbackRace) {
345   ThreadExecutor x;
346
347   auto fn = [&x]{
348     auto promises = std::make_shared<std::vector<Promise<Unit>>>(4);
349     std::vector<Future<Unit>> futures;
350
351     for (auto& p : *promises) {
352       futures.emplace_back(
353         p.getFuture()
354         .via(&x)
355         .then([](Try<Unit>&&){}));
356     }
357
358     x.waitForStartup();
359     x.add([promises]{
360       for (auto& p : *promises) {
361         p.setValue();
362       }
363     });
364
365     return collectAll(futures);
366   };
367
368   fn().wait();
369 }
370 #endif
371
372 class DummyDrivableExecutor : public DrivableExecutor {
373  public:
374   void add(Func /* f */) override {}
375   void drive() override { ran = true; }
376   bool ran{false};
377 };
378
379 TEST(Via, getVia) {
380   {
381     // non-void
382     ManualExecutor x;
383     auto f = via(&x).then([]{ return true; });
384     EXPECT_TRUE(f.getVia(&x));
385   }
386
387   {
388     // void
389     ManualExecutor x;
390     auto f = via(&x).then();
391     f.getVia(&x);
392   }
393
394   {
395     DummyDrivableExecutor x;
396     auto f = makeFuture(true);
397     EXPECT_TRUE(f.getVia(&x));
398     EXPECT_FALSE(x.ran);
399   }
400 }
401
402 TEST(Via, getTryVia) {
403   {
404     // non-void
405     ManualExecutor x;
406     auto f = via(&x).then([] { return 23; });
407     EXPECT_FALSE(f.isReady());
408     EXPECT_EQ(23, f.getTryVia(&x).value());
409   }
410
411   {
412     // void
413     ManualExecutor x;
414     auto f = via(&x).then();
415     EXPECT_FALSE(f.isReady());
416     auto t = f.getTryVia(&x);
417     EXPECT_TRUE(t.hasValue());
418   }
419
420   {
421     DummyDrivableExecutor x;
422     auto f = makeFuture(23);
423     EXPECT_EQ(23, f.getTryVia(&x).value());
424     EXPECT_FALSE(x.ran);
425   }
426 }
427
428 TEST(Via, waitVia) {
429   {
430     ManualExecutor x;
431     auto f = via(&x).then();
432     EXPECT_FALSE(f.isReady());
433     f.waitVia(&x);
434     EXPECT_TRUE(f.isReady());
435   }
436
437   {
438     // try rvalue as well
439     ManualExecutor x;
440     auto f = via(&x).then().waitVia(&x);
441     EXPECT_TRUE(f.isReady());
442   }
443
444   {
445     DummyDrivableExecutor x;
446     makeFuture(true).waitVia(&x);
447     EXPECT_FALSE(x.ran);
448   }
449 }
450
451 TEST(Via, viaRaces) {
452   ManualExecutor x;
453   Promise<Unit> p;
454   auto tid = std::this_thread::get_id();
455   bool done = false;
456
457   std::thread t1([&] {
458     p.getFuture()
459       .via(&x)
460       .then([&](Try<Unit>&&) { EXPECT_EQ(tid, std::this_thread::get_id()); })
461       .then([&](Try<Unit>&&) { EXPECT_EQ(tid, std::this_thread::get_id()); })
462       .then([&](Try<Unit>&&) { done = true; });
463   });
464
465   std::thread t2([&] {
466     p.setValue();
467   });
468
469   while (!done) x.run();
470   t1.join();
471   t2.join();
472 }
473
474 TEST(Via, viaDummyExecutorFutureSetValueFirst) {
475   // The callback object will get destroyed when passed to the executor.
476
477   // A promise will be captured by the callback lambda so we can observe that
478   // it will be destroyed.
479   Promise<Unit> captured_promise;
480   auto captured_promise_future = captured_promise.getFuture();
481
482   DummyDrivableExecutor x;
483   auto future = makeFuture().via(&x).then(
484       [c = std::move(captured_promise)] { return 42; });
485
486   EXPECT_THROW(future.get(std::chrono::seconds(5)), BrokenPromise);
487   EXPECT_THROW(
488       captured_promise_future.get(std::chrono::seconds(5)), BrokenPromise);
489 }
490
491 TEST(Via, viaDummyExecutorFutureSetCallbackFirst) {
492   // The callback object will get destroyed when passed to the executor.
493
494   // A promise will be captured by the callback lambda so we can observe that
495   // it will be destroyed.
496   Promise<Unit> captured_promise;
497   auto captured_promise_future = captured_promise.getFuture();
498
499   DummyDrivableExecutor x;
500   Promise<Unit> trigger;
501   auto future = trigger.getFuture().via(&x).then(
502       [c = std::move(captured_promise)] { return 42; });
503   trigger.setValue();
504
505   EXPECT_THROW(future.get(std::chrono::seconds(5)), BrokenPromise);
506   EXPECT_THROW(
507       captured_promise_future.get(std::chrono::seconds(5)), BrokenPromise);
508 }
509
510 TEST(Via, viaExecutorDiscardsTaskFutureSetValueFirst) {
511   // The callback object will get destroyed when the ManualExecutor runs out
512   // of scope.
513
514   // A promise will be captured by the callback lambda so we can observe that
515   // it will be destroyed.
516   Promise<Unit> captured_promise;
517   auto captured_promise_future = captured_promise.getFuture();
518
519   Optional<Future<int>> future;
520   {
521     ManualExecutor x;
522     future = makeFuture().via(&x).then(
523         [c = std::move(captured_promise)] { return 42; });
524   }
525
526   EXPECT_THROW(future->get(std::chrono::seconds(5)), BrokenPromise);
527   EXPECT_THROW(
528       captured_promise_future.get(std::chrono::seconds(5)), BrokenPromise);
529 }
530
531 TEST(Via, viaExecutorDiscardsTaskFutureSetCallbackFirst) {
532   // The callback object will get destroyed when the ManualExecutor runs out
533   // of scope.
534
535   // A promise will be captured by the callback lambda so we can observe that
536   // it will be destroyed.
537   Promise<Unit> captured_promise;
538   auto captured_promise_future = captured_promise.getFuture();
539
540   Optional<Future<int>> future;
541   {
542     ManualExecutor x;
543     Promise<Unit> trigger;
544     future = trigger.getFuture().via(&x).then(
545         [c = std::move(captured_promise)] { return 42; });
546     trigger.setValue();
547   }
548
549   EXPECT_THROW(future->get(std::chrono::seconds(5)), BrokenPromise);
550   EXPECT_THROW(
551       captured_promise_future.get(std::chrono::seconds(5)), BrokenPromise);
552 }
553
554 TEST(ViaFunc, liftsVoid) {
555   ManualExecutor x;
556   int count = 0;
557   Future<Unit> f = via(&x, [&]{ count++; });
558
559   EXPECT_EQ(0, count);
560   x.run();
561   EXPECT_EQ(1, count);
562 }
563
564 TEST(ViaFunc, value) {
565   ManualExecutor x;
566   EXPECT_EQ(42, via(&x, []{ return 42; }).getVia(&x));
567 }
568
569 TEST(ViaFunc, exception) {
570   ManualExecutor x;
571   EXPECT_THROW(
572     via(&x, []() -> int { throw std::runtime_error("expected"); })
573       .getVia(&x),
574     std::runtime_error);
575 }
576
577 TEST(ViaFunc, future) {
578   ManualExecutor x;
579   EXPECT_EQ(42, via(&x, []{ return makeFuture(42); })
580             .getVia(&x));
581 }
582
583 TEST(ViaFunc, voidFuture) {
584   ManualExecutor x;
585   int count = 0;
586   via(&x, [&]{ count++; }).getVia(&x);
587   EXPECT_EQ(1, count);
588 }
589
590 TEST(ViaFunc, isSticky) {
591   ManualExecutor x;
592   int count = 0;
593
594   auto f = via(&x, [&]{ count++; });
595   x.run();
596
597   f.then([&]{ count++; });
598   EXPECT_EQ(1, count);
599   x.run();
600   EXPECT_EQ(2, count);
601 }
602
603 TEST(ViaFunc, moveOnly) {
604   ManualExecutor x;
605   auto intp = std::make_unique<int>(42);
606
607   EXPECT_EQ(42, via(&x, [intp = std::move(intp)] { return *intp; }).getVia(&x));
608 }