2bdabfc04974cd167dd35af60f03a58704d8d6f5
[folly.git] / folly / futures / test / ViaTest.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/futures/Future.h>
18 #include <folly/futures/InlineExecutor.h>
19 #include <folly/futures/ManualExecutor.h>
20 #include <folly/futures/DrivableExecutor.h>
21 #include <folly/Baton.h>
22 #include <folly/MPMCQueue.h>
23 #include <folly/portability/GTest.h>
24
25 #include <thread>
26
27 using namespace folly;
28
29 struct ManualWaiter : public DrivableExecutor {
30   explicit ManualWaiter(std::shared_ptr<ManualExecutor> ex) : ex(ex) {}
31
32   void add(Func f) override {
33     ex->add(std::move(f));
34   }
35
36   void drive() override {
37     ex->wait();
38     ex->run();
39   }
40
41   std::shared_ptr<ManualExecutor> ex;
42 };
43
44 struct ViaFixture : public testing::Test {
45   ViaFixture() :
46     westExecutor(new ManualExecutor),
47     eastExecutor(new ManualExecutor),
48     waiter(new ManualWaiter(westExecutor)),
49     done(false)
50   {
51     t = std::thread([=] {
52         ManualWaiter eastWaiter(eastExecutor);
53         while (!done)
54           eastWaiter.drive();
55       });
56   }
57
58   ~ViaFixture() override {
59     done = true;
60     eastExecutor->add([=]() { });
61     t.join();
62   }
63
64   void addAsync(int a, int b, std::function<void(int&&)>&& cob) {
65     eastExecutor->add([=]() {
66       cob(a + b);
67     });
68   }
69
70   std::shared_ptr<ManualExecutor> westExecutor;
71   std::shared_ptr<ManualExecutor> eastExecutor;
72   std::shared_ptr<ManualWaiter> waiter;
73   InlineExecutor inlineExecutor;
74   std::atomic<bool> done;
75   std::thread t;
76 };
77
78 TEST(Via, exceptionOnLaunch) {
79   auto future = makeFuture<int>(std::runtime_error("E"));
80   EXPECT_THROW(future.value(), std::runtime_error);
81 }
82
83 TEST(Via, thenValue) {
84   auto future = makeFuture(std::move(1))
85     .then([](Try<int>&& t) {
86       return t.value() == 1;
87     })
88     ;
89
90   EXPECT_TRUE(future.value());
91 }
92
93 TEST(Via, thenFuture) {
94   auto future = makeFuture(1)
95     .then([](Try<int>&& t) {
96       return makeFuture(t.value() == 1);
97     });
98   EXPECT_TRUE(future.value());
99 }
100
101 static Future<std::string> doWorkStatic(Try<std::string>&& t) {
102   return makeFuture(t.value() + ";static");
103 }
104
105 TEST(Via, thenFunction) {
106   struct Worker {
107     Future<std::string> doWork(Try<std::string>&& t) {
108       return makeFuture(t.value() + ";class");
109     }
110     static Future<std::string> doWorkStatic(Try<std::string>&& t) {
111       return makeFuture(t.value() + ";class-static");
112     }
113   } w;
114
115   auto f = makeFuture(std::string("start"))
116     .then(doWorkStatic)
117     .then(Worker::doWorkStatic)
118     .then(&Worker::doWork, &w)
119     ;
120
121   EXPECT_EQ(f.value(), "start;static;class-static;class");
122 }
123
124 TEST_F(ViaFixture, threadHops) {
125   auto westThreadId = std::this_thread::get_id();
126   auto f = via(eastExecutor.get())
127                .then([=](Try<Unit>&& /* t */) {
128                  EXPECT_NE(std::this_thread::get_id(), westThreadId);
129                  return makeFuture<int>(1);
130                })
131                .via(westExecutor.get())
132                .then([=](Try<int>&& t) {
133                  EXPECT_EQ(std::this_thread::get_id(), westThreadId);
134                  return t.value();
135                });
136   EXPECT_EQ(f.getVia(waiter.get()), 1);
137 }
138
139 TEST_F(ViaFixture, chainVias) {
140   auto westThreadId = std::this_thread::get_id();
141   auto f = via(eastExecutor.get()).then([=]() {
142     EXPECT_NE(std::this_thread::get_id(), westThreadId);
143     return 1;
144   }).then([=](int val) {
145     return makeFuture(val).via(westExecutor.get())
146       .then([=](int v) mutable {
147         EXPECT_EQ(std::this_thread::get_id(), westThreadId);
148         return v + 1;
149       });
150   }).then([=](int val) {
151     // even though ultimately the future that triggers this one executed in
152     // the west thread, this then() inherited the executor from its
153     // predecessor, ie the eastExecutor.
154     EXPECT_NE(std::this_thread::get_id(), westThreadId);
155     return val + 1;
156   }).via(westExecutor.get()).then([=](int val) {
157     // go back to west, so we can wait on it
158     EXPECT_EQ(std::this_thread::get_id(), westThreadId);
159     return val + 1;
160   });
161
162   EXPECT_EQ(f.getVia(waiter.get()), 4);
163 }
164
165 TEST_F(ViaFixture, bareViaAssignment) {
166   auto f = via(eastExecutor.get());
167 }
168 TEST_F(ViaFixture, viaAssignment) {
169   // via()&&
170   auto f = makeFuture().via(eastExecutor.get());
171   // via()&
172   auto f2 = f.via(eastExecutor.get());
173 }
174
175 TEST(Via, chain1) {
176   EXPECT_EQ(42,
177             makeFuture()
178             .thenMulti([] { return 42; })
179             .get());
180 }
181
182 TEST(Via, chain3) {
183   int count = 0;
184   auto f = makeFuture().thenMulti(
185       [&]{ count++; return 3.14159; },
186       [&](double) { count++; return std::string("hello"); },
187       [&]{ count++; return makeFuture(42); });
188   EXPECT_EQ(42, f.get());
189   EXPECT_EQ(3, count);
190 }
191
192 struct PriorityExecutor : public Executor {
193   void add(Func /* f */) override {}
194
195   void addWithPriority(Func f, int8_t priority) override {
196     int mid = getNumPriorities() / 2;
197     int p = priority < 0 ?
198             std::max(0, mid + priority) :
199             std::min(getNumPriorities() - 1, mid + priority);
200     EXPECT_LT(p, 3);
201     EXPECT_GE(p, 0);
202     if (p == 0) {
203       count0++;
204     } else if (p == 1) {
205       count1++;
206     } else if (p == 2) {
207       count2++;
208     }
209     f();
210   }
211
212   uint8_t getNumPriorities() const override {
213     return 3;
214   }
215
216   int count0{0};
217   int count1{0};
218   int count2{0};
219 };
220
221 TEST(Via, priority) {
222   PriorityExecutor exe;
223   via(&exe, -1).then([]{});
224   via(&exe, 0).then([]{});
225   via(&exe, 1).then([]{});
226   via(&exe, 42).then([]{});  // overflow should go to max priority
227   via(&exe, -42).then([]{}); // underflow should go to min priority
228   via(&exe).then([]{});      // default to mid priority
229   via(&exe, Executor::LO_PRI).then([]{});
230   via(&exe, Executor::HI_PRI).then([]{});
231   EXPECT_EQ(3, exe.count0);
232   EXPECT_EQ(2, exe.count1);
233   EXPECT_EQ(3, exe.count2);
234 }
235
236 TEST_F(ViaFixture, chainX1) {
237   EXPECT_EQ(42,
238             makeFuture()
239             .thenMultiWithExecutor(eastExecutor.get(),[] { return 42; })
240             .get());
241 }
242
243 TEST_F(ViaFixture, chainX3) {
244   auto westThreadId = std::this_thread::get_id();
245   int count = 0;
246   auto f = via(westExecutor.get()).thenMultiWithExecutor(
247       eastExecutor.get(),
248       [&]{
249         EXPECT_NE(std::this_thread::get_id(), westThreadId);
250         count++; return 3.14159;
251       },
252       [&](double) { count++; return std::string("hello"); },
253       [&]{ count++; })
254     .then([&](){
255         EXPECT_EQ(std::this_thread::get_id(), westThreadId);
256         return makeFuture(42);
257     });
258   EXPECT_EQ(42, f.getVia(waiter.get()));
259   EXPECT_EQ(3, count);
260 }
261
262 TEST(Via, then2) {
263   ManualExecutor x1, x2;
264   bool a = false, b = false, c = false;
265   via(&x1)
266     .then([&]{ a = true; })
267     .then(&x2, [&]{ b = true; })
268     .then([&]{ c = true; });
269
270   EXPECT_FALSE(a);
271   EXPECT_FALSE(b);
272
273   x1.run();
274   EXPECT_TRUE(a);
275   EXPECT_FALSE(b);
276   EXPECT_FALSE(c);
277
278   x2.run();
279   EXPECT_TRUE(b);
280   EXPECT_FALSE(c);
281
282   x1.run();
283   EXPECT_TRUE(c);
284 }
285
286 TEST(Via, then2Variadic) {
287   struct Foo { bool a = false; void foo(Try<Unit>) { a = true; } };
288   Foo f;
289   ManualExecutor x;
290   makeFuture().then(&x, &Foo::foo, &f);
291   EXPECT_FALSE(f.a);
292   x.run();
293   EXPECT_TRUE(f.a);
294 }
295
296 #ifndef __APPLE__ // TODO #7372389
297 /// Simple executor that does work in another thread
298 class ThreadExecutor : public Executor {
299   folly::MPMCQueue<Func> funcs;
300   std::atomic<bool> done {false};
301   std::thread worker;
302   folly::Baton<> baton;
303
304   void work() {
305     baton.post();
306     Func fn;
307     while (!done) {
308       while (!funcs.isEmpty()) {
309         funcs.blockingRead(fn);
310         fn();
311       }
312     }
313   }
314
315  public:
316   explicit ThreadExecutor(size_t n = 1024)
317     : funcs(n) {
318     worker = std::thread(std::bind(&ThreadExecutor::work, this));
319   }
320
321   ~ThreadExecutor() override {
322     done = true;
323     funcs.write([]{});
324     worker.join();
325   }
326
327   void add(Func fn) override {
328     funcs.blockingWrite(std::move(fn));
329   }
330
331   void waitForStartup() {
332     baton.wait();
333   }
334 };
335
336 TEST(Via, viaThenGetWasRacy) {
337   ThreadExecutor x;
338   std::unique_ptr<int> val = folly::via(&x)
339     .then([] { return folly::make_unique<int>(42); })
340     .get();
341   ASSERT_TRUE(!!val);
342   EXPECT_EQ(42, *val);
343 }
344
345 TEST(Via, callbackRace) {
346   ThreadExecutor x;
347
348   auto fn = [&x]{
349     auto promises = std::make_shared<std::vector<Promise<Unit>>>(4);
350     std::vector<Future<Unit>> futures;
351
352     for (auto& p : *promises) {
353       futures.emplace_back(
354         p.getFuture()
355         .via(&x)
356         .then([](Try<Unit>&&){}));
357     }
358
359     x.waitForStartup();
360     x.add([promises]{
361       for (auto& p : *promises) {
362         p.setValue();
363       }
364     });
365
366     return collectAll(futures);
367   };
368
369   fn().wait();
370 }
371 #endif
372
373 class DummyDrivableExecutor : public DrivableExecutor {
374  public:
375   void add(Func /* f */) override {}
376   void drive() override { ran = true; }
377   bool ran{false};
378 };
379
380 TEST(Via, getVia) {
381   {
382     // non-void
383     ManualExecutor x;
384     auto f = via(&x).then([]{ return true; });
385     EXPECT_TRUE(f.getVia(&x));
386   }
387
388   {
389     // void
390     ManualExecutor x;
391     auto f = via(&x).then();
392     f.getVia(&x);
393   }
394
395   {
396     DummyDrivableExecutor x;
397     auto f = makeFuture(true);
398     EXPECT_TRUE(f.getVia(&x));
399     EXPECT_FALSE(x.ran);
400   }
401 }
402
403 TEST(Via, getTryVia) {
404   {
405     // non-void
406     ManualExecutor x;
407     auto f = via(&x).then([] { return 23; });
408     EXPECT_FALSE(f.isReady());
409     EXPECT_EQ(23, f.getTryVia(&x).value());
410   }
411
412   {
413     // void
414     ManualExecutor x;
415     auto f = via(&x).then();
416     EXPECT_FALSE(f.isReady());
417     auto t = f.getTryVia(&x);
418     EXPECT_TRUE(t.hasValue());
419   }
420
421   {
422     DummyDrivableExecutor x;
423     auto f = makeFuture(23);
424     EXPECT_EQ(23, f.getTryVia(&x).value());
425     EXPECT_FALSE(x.ran);
426   }
427 }
428
429 TEST(Via, waitVia) {
430   {
431     ManualExecutor x;
432     auto f = via(&x).then();
433     EXPECT_FALSE(f.isReady());
434     f.waitVia(&x);
435     EXPECT_TRUE(f.isReady());
436   }
437
438   {
439     // try rvalue as well
440     ManualExecutor x;
441     auto f = via(&x).then().waitVia(&x);
442     EXPECT_TRUE(f.isReady());
443   }
444
445   {
446     DummyDrivableExecutor x;
447     makeFuture(true).waitVia(&x);
448     EXPECT_FALSE(x.ran);
449   }
450 }
451
452 TEST(Via, viaRaces) {
453   ManualExecutor x;
454   Promise<Unit> p;
455   auto tid = std::this_thread::get_id();
456   bool done = false;
457
458   std::thread t1([&] {
459     p.getFuture()
460       .via(&x)
461       .then([&](Try<Unit>&&) { EXPECT_EQ(tid, std::this_thread::get_id()); })
462       .then([&](Try<Unit>&&) { EXPECT_EQ(tid, std::this_thread::get_id()); })
463       .then([&](Try<Unit>&&) { done = true; });
464   });
465
466   std::thread t2([&] {
467     p.setValue();
468   });
469
470   while (!done) x.run();
471   t1.join();
472   t2.join();
473 }
474
475 TEST(Via, viaDummyExecutorFutureSetValueFirst) {
476   // The callback object will get destroyed when passed to the executor.
477
478   // A promise will be captured by the callback lambda so we can observe that
479   // it will be destroyed.
480   Promise<Unit> captured_promise;
481   auto captured_promise_future = captured_promise.getFuture();
482
483   DummyDrivableExecutor x;
484   auto future = makeFuture().via(&x).then(
485       [c = std::move(captured_promise)] { return 42; });
486
487   EXPECT_THROW(future.get(std::chrono::seconds(5)), BrokenPromise);
488   EXPECT_THROW(
489       captured_promise_future.get(std::chrono::seconds(5)), BrokenPromise);
490 }
491
492 TEST(Via, viaDummyExecutorFutureSetCallbackFirst) {
493   // The callback object will get destroyed when passed to the executor.
494
495   // A promise will be captured by the callback lambda so we can observe that
496   // it will be destroyed.
497   Promise<Unit> captured_promise;
498   auto captured_promise_future = captured_promise.getFuture();
499
500   DummyDrivableExecutor x;
501   Promise<Unit> trigger;
502   auto future = trigger.getFuture().via(&x).then(
503       [c = std::move(captured_promise)] { return 42; });
504   trigger.setValue();
505
506   EXPECT_THROW(future.get(std::chrono::seconds(5)), BrokenPromise);
507   EXPECT_THROW(
508       captured_promise_future.get(std::chrono::seconds(5)), BrokenPromise);
509 }
510
511 TEST(Via, viaExecutorDiscardsTaskFutureSetValueFirst) {
512   // The callback object will get destroyed when the ManualExecutor runs out
513   // of scope.
514
515   // A promise will be captured by the callback lambda so we can observe that
516   // it will be destroyed.
517   Promise<Unit> captured_promise;
518   auto captured_promise_future = captured_promise.getFuture();
519
520   Optional<Future<int>> future;
521   {
522     ManualExecutor x;
523     future = makeFuture().via(&x).then(
524         [c = std::move(captured_promise)] { return 42; });
525   }
526
527   EXPECT_THROW(future->get(std::chrono::seconds(5)), BrokenPromise);
528   EXPECT_THROW(
529       captured_promise_future.get(std::chrono::seconds(5)), BrokenPromise);
530 }
531
532 TEST(Via, viaExecutorDiscardsTaskFutureSetCallbackFirst) {
533   // The callback object will get destroyed when the ManualExecutor runs out
534   // of scope.
535
536   // A promise will be captured by the callback lambda so we can observe that
537   // it will be destroyed.
538   Promise<Unit> captured_promise;
539   auto captured_promise_future = captured_promise.getFuture();
540
541   Optional<Future<int>> future;
542   {
543     ManualExecutor x;
544     Promise<Unit> trigger;
545     future = trigger.getFuture().via(&x).then(
546         [c = std::move(captured_promise)] { return 42; });
547     trigger.setValue();
548   }
549
550   EXPECT_THROW(future->get(std::chrono::seconds(5)), BrokenPromise);
551   EXPECT_THROW(
552       captured_promise_future.get(std::chrono::seconds(5)), BrokenPromise);
553 }
554
555 TEST(ViaFunc, liftsVoid) {
556   ManualExecutor x;
557   int count = 0;
558   Future<Unit> f = via(&x, [&]{ count++; });
559
560   EXPECT_EQ(0, count);
561   x.run();
562   EXPECT_EQ(1, count);
563 }
564
565 TEST(ViaFunc, value) {
566   ManualExecutor x;
567   EXPECT_EQ(42, via(&x, []{ return 42; }).getVia(&x));
568 }
569
570 TEST(ViaFunc, exception) {
571   ManualExecutor x;
572   EXPECT_THROW(
573     via(&x, []() -> int { throw std::runtime_error("expected"); })
574       .getVia(&x),
575     std::runtime_error);
576 }
577
578 TEST(ViaFunc, future) {
579   ManualExecutor x;
580   EXPECT_EQ(42, via(&x, []{ return makeFuture(42); })
581             .getVia(&x));
582 }
583
584 TEST(ViaFunc, voidFuture) {
585   ManualExecutor x;
586   int count = 0;
587   via(&x, [&]{ count++; }).getVia(&x);
588   EXPECT_EQ(1, count);
589 }
590
591 TEST(ViaFunc, isSticky) {
592   ManualExecutor x;
593   int count = 0;
594
595   auto f = via(&x, [&]{ count++; });
596   x.run();
597
598   f.then([&]{ count++; });
599   EXPECT_EQ(1, count);
600   x.run();
601   EXPECT_EQ(2, count);
602 }
603
604 TEST(ViaFunc, moveOnly) {
605   ManualExecutor x;
606   auto intp = folly::make_unique<int>(42);
607
608   EXPECT_EQ(42, via(&x, [intp = std::move(intp)] { return *intp; }).getVia(&x));
609 }