Move folly/Baton.h to folly/synchronization/
[folly.git] / folly / futures / test / ViaTest.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <thread>
18
19 #include <folly/MPMCQueue.h>
20 #include <folly/executors/DrivableExecutor.h>
21 #include <folly/executors/InlineExecutor.h>
22 #include <folly/executors/ManualExecutor.h>
23 #include <folly/futures/Future.h>
24 #include <folly/portability/GTest.h>
25 #include <folly/synchronization/Baton.h>
26
27 using namespace folly;
28
29 struct ManualWaiter : public DrivableExecutor {
30   explicit ManualWaiter(std::shared_ptr<ManualExecutor> ex) : ex(ex) {}
31
32   void add(Func f) override {
33     ex->add(std::move(f));
34   }
35
36   void drive() override {
37     ex->wait();
38     ex->run();
39   }
40
41   std::shared_ptr<ManualExecutor> ex;
42 };
43
44 struct ViaFixture : public testing::Test {
45   ViaFixture() :
46     westExecutor(new ManualExecutor),
47     eastExecutor(new ManualExecutor),
48     waiter(new ManualWaiter(westExecutor)),
49     done(false)
50   {
51     t = std::thread([=] {
52         ManualWaiter eastWaiter(eastExecutor);
53         while (!done) {
54           eastWaiter.drive();
55         }
56       });
57   }
58
59   ~ViaFixture() override {
60     done = true;
61     eastExecutor->add([=]() { });
62     t.join();
63   }
64
65   void addAsync(int a, int b, std::function<void(int&&)>&& cob) {
66     eastExecutor->add([=]() {
67       cob(a + b);
68     });
69   }
70
71   std::shared_ptr<ManualExecutor> westExecutor;
72   std::shared_ptr<ManualExecutor> eastExecutor;
73   std::shared_ptr<ManualWaiter> waiter;
74   InlineExecutor inlineExecutor;
75   std::atomic<bool> done;
76   std::thread t;
77 };
78
79 TEST(Via, exceptionOnLaunch) {
80   auto future = makeFuture<int>(std::runtime_error("E"));
81   EXPECT_THROW(future.value(), std::runtime_error);
82 }
83
84 TEST(Via, thenValue) {
85   auto future = makeFuture(std::move(1))
86     .then([](Try<int>&& t) {
87       return t.value() == 1;
88     })
89     ;
90
91   EXPECT_TRUE(future.value());
92 }
93
94 TEST(Via, thenFuture) {
95   auto future = makeFuture(1)
96     .then([](Try<int>&& t) {
97       return makeFuture(t.value() == 1);
98     });
99   EXPECT_TRUE(future.value());
100 }
101
102 static Future<std::string> doWorkStatic(Try<std::string>&& t) {
103   return makeFuture(t.value() + ";static");
104 }
105
106 TEST(Via, thenFunction) {
107   struct Worker {
108     Future<std::string> doWork(Try<std::string>&& t) {
109       return makeFuture(t.value() + ";class");
110     }
111     static Future<std::string> doWorkStatic(Try<std::string>&& t) {
112       return makeFuture(t.value() + ";class-static");
113     }
114   } w;
115
116   auto f = makeFuture(std::string("start"))
117     .then(doWorkStatic)
118     .then(Worker::doWorkStatic)
119     .then(&Worker::doWork, &w)
120     ;
121
122   EXPECT_EQ(f.value(), "start;static;class-static;class");
123 }
124
125 TEST_F(ViaFixture, threadHops) {
126   auto westThreadId = std::this_thread::get_id();
127   auto f = via(eastExecutor.get())
128                .then([=](Try<Unit>&& /* t */) {
129                  EXPECT_NE(std::this_thread::get_id(), westThreadId);
130                  return makeFuture<int>(1);
131                })
132                .via(westExecutor.get())
133                .then([=](Try<int>&& t) {
134                  EXPECT_EQ(std::this_thread::get_id(), westThreadId);
135                  return t.value();
136                });
137   EXPECT_EQ(f.getVia(waiter.get()), 1);
138 }
139
140 TEST_F(ViaFixture, chainVias) {
141   auto westThreadId = std::this_thread::get_id();
142   auto f = via(eastExecutor.get()).then([=]() {
143     EXPECT_NE(std::this_thread::get_id(), westThreadId);
144     return 1;
145   }).then([=](int val) {
146     return makeFuture(val).via(westExecutor.get())
147       .then([=](int v) mutable {
148         EXPECT_EQ(std::this_thread::get_id(), westThreadId);
149         return v + 1;
150       });
151   }).then([=](int val) {
152     // even though ultimately the future that triggers this one executed in
153     // the west thread, this then() inherited the executor from its
154     // predecessor, ie the eastExecutor.
155     EXPECT_NE(std::this_thread::get_id(), westThreadId);
156     return val + 1;
157   }).via(westExecutor.get()).then([=](int val) {
158     // go back to west, so we can wait on it
159     EXPECT_EQ(std::this_thread::get_id(), westThreadId);
160     return val + 1;
161   });
162
163   EXPECT_EQ(f.getVia(waiter.get()), 4);
164 }
165
166 TEST_F(ViaFixture, bareViaAssignment) {
167   auto f = via(eastExecutor.get());
168 }
169 TEST_F(ViaFixture, viaAssignment) {
170   // via()&&
171   auto f = makeFuture().via(eastExecutor.get());
172   // via()&
173   auto f2 = f.via(eastExecutor.get());
174 }
175
176 TEST(Via, chain1) {
177   EXPECT_EQ(42,
178             makeFuture()
179             .thenMulti([] { return 42; })
180             .get());
181 }
182
183 TEST(Via, chain3) {
184   int count = 0;
185   auto f = makeFuture().thenMulti(
186       [&]{ count++; return 3.14159; },
187       [&](double) { count++; return std::string("hello"); },
188       [&]{ count++; return makeFuture(42); });
189   EXPECT_EQ(42, f.get());
190   EXPECT_EQ(3, count);
191 }
192
193 struct PriorityExecutor : public Executor {
194   void add(Func /* f */) override {}
195
196   void addWithPriority(Func f, int8_t priority) override {
197     int mid = getNumPriorities() / 2;
198     int p = priority < 0 ?
199             std::max(0, mid + priority) :
200             std::min(getNumPriorities() - 1, mid + priority);
201     EXPECT_LT(p, 3);
202     EXPECT_GE(p, 0);
203     if (p == 0) {
204       count0++;
205     } else if (p == 1) {
206       count1++;
207     } else if (p == 2) {
208       count2++;
209     }
210     f();
211   }
212
213   uint8_t getNumPriorities() const override {
214     return 3;
215   }
216
217   int count0{0};
218   int count1{0};
219   int count2{0};
220 };
221
222 TEST(Via, priority) {
223   PriorityExecutor exe;
224   via(&exe, -1).then([]{});
225   via(&exe, 0).then([]{});
226   via(&exe, 1).then([]{});
227   via(&exe, 42).then([]{});  // overflow should go to max priority
228   via(&exe, -42).then([]{}); // underflow should go to min priority
229   via(&exe).then([]{});      // default to mid priority
230   via(&exe, Executor::LO_PRI).then([]{});
231   via(&exe, Executor::HI_PRI).then([]{});
232   EXPECT_EQ(3, exe.count0);
233   EXPECT_EQ(2, exe.count1);
234   EXPECT_EQ(3, exe.count2);
235 }
236
237 TEST_F(ViaFixture, chainX1) {
238   EXPECT_EQ(42,
239             makeFuture()
240             .thenMultiWithExecutor(eastExecutor.get(),[] { return 42; })
241             .get());
242 }
243
244 TEST_F(ViaFixture, chainX3) {
245   auto westThreadId = std::this_thread::get_id();
246   int count = 0;
247   auto f = via(westExecutor.get()).thenMultiWithExecutor(
248       eastExecutor.get(),
249       [&]{
250         EXPECT_NE(std::this_thread::get_id(), westThreadId);
251         count++; return 3.14159;
252       },
253       [&](double) { count++; return std::string("hello"); },
254       [&]{ count++; })
255     .then([&](){
256         EXPECT_EQ(std::this_thread::get_id(), westThreadId);
257         return makeFuture(42);
258     });
259   EXPECT_EQ(42, f.getVia(waiter.get()));
260   EXPECT_EQ(3, count);
261 }
262
263 TEST(Via, then2) {
264   ManualExecutor x1, x2;
265   bool a = false, b = false, c = false;
266   via(&x1)
267     .then([&]{ a = true; })
268     .then(&x2, [&]{ b = true; })
269     .then([&]{ c = true; });
270
271   EXPECT_FALSE(a);
272   EXPECT_FALSE(b);
273
274   x1.run();
275   EXPECT_TRUE(a);
276   EXPECT_FALSE(b);
277   EXPECT_FALSE(c);
278
279   x2.run();
280   EXPECT_TRUE(b);
281   EXPECT_FALSE(c);
282
283   x1.run();
284   EXPECT_TRUE(c);
285 }
286
287 TEST(Via, then2Variadic) {
288   struct Foo { bool a = false; void foo(Try<Unit>) { a = true; } };
289   Foo f;
290   ManualExecutor x;
291   makeFuture().then(&x, &Foo::foo, &f);
292   EXPECT_FALSE(f.a);
293   x.run();
294   EXPECT_TRUE(f.a);
295 }
296
297 #ifndef __APPLE__ // TODO #7372389
298 /// Simple executor that does work in another thread
299 class ThreadExecutor : public Executor {
300   folly::MPMCQueue<Func> funcs;
301   std::atomic<bool> done {false};
302   std::thread worker;
303   folly::Baton<> baton;
304
305   void work() {
306     baton.post();
307     Func fn;
308     while (!done) {
309       while (!funcs.isEmpty()) {
310         funcs.blockingRead(fn);
311         fn();
312       }
313     }
314   }
315
316  public:
317   explicit ThreadExecutor(size_t n = 1024)
318     : funcs(n) {
319     worker = std::thread(std::bind(&ThreadExecutor::work, this));
320   }
321
322   ~ThreadExecutor() override {
323     done = true;
324     funcs.write([]{});
325     worker.join();
326   }
327
328   void add(Func fn) override {
329     funcs.blockingWrite(std::move(fn));
330   }
331
332   void waitForStartup() {
333     baton.wait();
334   }
335 };
336
337 TEST(Via, viaThenGetWasRacy) {
338   ThreadExecutor x;
339   std::unique_ptr<int> val =
340       folly::via(&x).then([] { return std::make_unique<int>(42); }).get();
341   ASSERT_TRUE(!!val);
342   EXPECT_EQ(42, *val);
343 }
344
345 TEST(Via, callbackRace) {
346   ThreadExecutor x;
347
348   auto fn = [&x]{
349     auto promises = std::make_shared<std::vector<Promise<Unit>>>(4);
350     std::vector<Future<Unit>> futures;
351
352     for (auto& p : *promises) {
353       futures.emplace_back(
354         p.getFuture()
355         .via(&x)
356         .then([](Try<Unit>&&){}));
357     }
358
359     x.waitForStartup();
360     x.add([promises]{
361       for (auto& p : *promises) {
362         p.setValue();
363       }
364     });
365
366     return collectAll(futures);
367   };
368
369   fn().wait();
370 }
371 #endif
372
373 class DummyDrivableExecutor : public DrivableExecutor {
374  public:
375   void add(Func /* f */) override {}
376   void drive() override { ran = true; }
377   bool ran{false};
378 };
379
380 TEST(Via, getVia) {
381   {
382     // non-void
383     ManualExecutor x;
384     auto f = via(&x).then([]{ return true; });
385     EXPECT_TRUE(f.getVia(&x));
386   }
387
388   {
389     // void
390     ManualExecutor x;
391     auto f = via(&x).then();
392     f.getVia(&x);
393   }
394
395   {
396     DummyDrivableExecutor x;
397     auto f = makeFuture(true);
398     EXPECT_TRUE(f.getVia(&x));
399     EXPECT_FALSE(x.ran);
400   }
401 }
402
403 TEST(Via, getTryVia) {
404   {
405     // non-void
406     ManualExecutor x;
407     auto f = via(&x).then([] { return 23; });
408     EXPECT_FALSE(f.isReady());
409     EXPECT_EQ(23, f.getTryVia(&x).value());
410   }
411
412   {
413     // void
414     ManualExecutor x;
415     auto f = via(&x).then();
416     EXPECT_FALSE(f.isReady());
417     auto t = f.getTryVia(&x);
418     EXPECT_TRUE(t.hasValue());
419   }
420
421   {
422     DummyDrivableExecutor x;
423     auto f = makeFuture(23);
424     EXPECT_EQ(23, f.getTryVia(&x).value());
425     EXPECT_FALSE(x.ran);
426   }
427 }
428
429 TEST(Via, waitVia) {
430   {
431     ManualExecutor x;
432     auto f = via(&x).then();
433     EXPECT_FALSE(f.isReady());
434     f.waitVia(&x);
435     EXPECT_TRUE(f.isReady());
436   }
437
438   {
439     // try rvalue as well
440     ManualExecutor x;
441     auto f = via(&x).then().waitVia(&x);
442     EXPECT_TRUE(f.isReady());
443   }
444
445   {
446     DummyDrivableExecutor x;
447     makeFuture(true).waitVia(&x);
448     EXPECT_FALSE(x.ran);
449   }
450 }
451
452 TEST(Via, viaRaces) {
453   ManualExecutor x;
454   Promise<Unit> p;
455   auto tid = std::this_thread::get_id();
456   bool done = false;
457
458   std::thread t1([&] {
459     p.getFuture()
460       .via(&x)
461       .then([&](Try<Unit>&&) { EXPECT_EQ(tid, std::this_thread::get_id()); })
462       .then([&](Try<Unit>&&) { EXPECT_EQ(tid, std::this_thread::get_id()); })
463       .then([&](Try<Unit>&&) { done = true; });
464   });
465
466   std::thread t2([&] {
467     p.setValue();
468   });
469
470   while (!done) {
471     x.run();
472   }
473   t1.join();
474   t2.join();
475 }
476
477 TEST(Via, viaDummyExecutorFutureSetValueFirst) {
478   // The callback object will get destroyed when passed to the executor.
479
480   // A promise will be captured by the callback lambda so we can observe that
481   // it will be destroyed.
482   Promise<Unit> captured_promise;
483   auto captured_promise_future = captured_promise.getFuture();
484
485   DummyDrivableExecutor x;
486   auto future = makeFuture().via(&x).then(
487       [c = std::move(captured_promise)] { return 42; });
488
489   EXPECT_THROW(future.get(std::chrono::seconds(5)), BrokenPromise);
490   EXPECT_THROW(
491       captured_promise_future.get(std::chrono::seconds(5)), BrokenPromise);
492 }
493
494 TEST(Via, viaDummyExecutorFutureSetCallbackFirst) {
495   // The callback object will get destroyed when passed to the executor.
496
497   // A promise will be captured by the callback lambda so we can observe that
498   // it will be destroyed.
499   Promise<Unit> captured_promise;
500   auto captured_promise_future = captured_promise.getFuture();
501
502   DummyDrivableExecutor x;
503   Promise<Unit> trigger;
504   auto future = trigger.getFuture().via(&x).then(
505       [c = std::move(captured_promise)] { return 42; });
506   trigger.setValue();
507
508   EXPECT_THROW(future.get(std::chrono::seconds(5)), BrokenPromise);
509   EXPECT_THROW(
510       captured_promise_future.get(std::chrono::seconds(5)), BrokenPromise);
511 }
512
513 TEST(Via, viaExecutorDiscardsTaskFutureSetValueFirst) {
514   // The callback object will get destroyed when the ManualExecutor runs out
515   // of scope.
516
517   // A promise will be captured by the callback lambda so we can observe that
518   // it will be destroyed.
519   Promise<Unit> captured_promise;
520   auto captured_promise_future = captured_promise.getFuture();
521
522   Optional<Future<int>> future;
523   {
524     ManualExecutor x;
525     future = makeFuture().via(&x).then(
526         [c = std::move(captured_promise)] { return 42; });
527   }
528
529   EXPECT_THROW(future->get(std::chrono::seconds(5)), BrokenPromise);
530   EXPECT_THROW(
531       captured_promise_future.get(std::chrono::seconds(5)), BrokenPromise);
532 }
533
534 TEST(Via, viaExecutorDiscardsTaskFutureSetCallbackFirst) {
535   // The callback object will get destroyed when the ManualExecutor runs out
536   // of scope.
537
538   // A promise will be captured by the callback lambda so we can observe that
539   // it will be destroyed.
540   Promise<Unit> captured_promise;
541   auto captured_promise_future = captured_promise.getFuture();
542
543   Optional<Future<int>> future;
544   {
545     ManualExecutor x;
546     Promise<Unit> trigger;
547     future = trigger.getFuture().via(&x).then(
548         [c = std::move(captured_promise)] { return 42; });
549     trigger.setValue();
550   }
551
552   EXPECT_THROW(future->get(std::chrono::seconds(5)), BrokenPromise);
553   EXPECT_THROW(
554       captured_promise_future.get(std::chrono::seconds(5)), BrokenPromise);
555 }
556
557 TEST(ViaFunc, liftsVoid) {
558   ManualExecutor x;
559   int count = 0;
560   Future<Unit> f = via(&x, [&]{ count++; });
561
562   EXPECT_EQ(0, count);
563   x.run();
564   EXPECT_EQ(1, count);
565 }
566
567 TEST(ViaFunc, value) {
568   ManualExecutor x;
569   EXPECT_EQ(42, via(&x, []{ return 42; }).getVia(&x));
570 }
571
572 TEST(ViaFunc, exception) {
573   ManualExecutor x;
574   EXPECT_THROW(
575     via(&x, []() -> int { throw std::runtime_error("expected"); })
576       .getVia(&x),
577     std::runtime_error);
578 }
579
580 TEST(ViaFunc, future) {
581   ManualExecutor x;
582   EXPECT_EQ(42, via(&x, []{ return makeFuture(42); })
583             .getVia(&x));
584 }
585
586 TEST(ViaFunc, voidFuture) {
587   ManualExecutor x;
588   int count = 0;
589   via(&x, [&]{ count++; }).getVia(&x);
590   EXPECT_EQ(1, count);
591 }
592
593 TEST(ViaFunc, isSticky) {
594   ManualExecutor x;
595   int count = 0;
596
597   auto f = via(&x, [&]{ count++; });
598   x.run();
599
600   f.then([&]{ count++; });
601   EXPECT_EQ(1, count);
602   x.run();
603   EXPECT_EQ(2, count);
604 }
605
606 TEST(ViaFunc, moveOnly) {
607   ManualExecutor x;
608   auto intp = std::make_unique<int>(42);
609
610   EXPECT_EQ(42, via(&x, [intp = std::move(intp)] { return *intp; }).getVia(&x));
611 }