2017
[folly.git] / folly / futures / test / ViaTest.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/futures/Future.h>
18 #include <folly/futures/InlineExecutor.h>
19 #include <folly/futures/ManualExecutor.h>
20 #include <folly/futures/DrivableExecutor.h>
21 #include <folly/Baton.h>
22 #include <folly/MPMCQueue.h>
23 #include <folly/portability/GTest.h>
24
25 #include <thread>
26
27 using namespace folly;
28
29 struct ManualWaiter : public DrivableExecutor {
30   explicit ManualWaiter(std::shared_ptr<ManualExecutor> ex) : ex(ex) {}
31
32   void add(Func f) override {
33     ex->add(std::move(f));
34   }
35
36   void drive() override {
37     ex->wait();
38     ex->run();
39   }
40
41   std::shared_ptr<ManualExecutor> ex;
42 };
43
44 struct ViaFixture : public testing::Test {
45   ViaFixture() :
46     westExecutor(new ManualExecutor),
47     eastExecutor(new ManualExecutor),
48     waiter(new ManualWaiter(westExecutor)),
49     done(false)
50   {
51     t = std::thread([=] {
52         ManualWaiter eastWaiter(eastExecutor);
53         while (!done)
54           eastWaiter.drive();
55       });
56   }
57
58   ~ViaFixture() override {
59     done = true;
60     eastExecutor->add([=]() { });
61     t.join();
62   }
63
64   void addAsync(int a, int b, std::function<void(int&&)>&& cob) {
65     eastExecutor->add([=]() {
66       cob(a + b);
67     });
68   }
69
70   std::shared_ptr<ManualExecutor> westExecutor;
71   std::shared_ptr<ManualExecutor> eastExecutor;
72   std::shared_ptr<ManualWaiter> waiter;
73   InlineExecutor inlineExecutor;
74   std::atomic<bool> done;
75   std::thread t;
76 };
77
78 TEST(Via, exceptionOnLaunch) {
79   auto future = makeFuture<int>(std::runtime_error("E"));
80   EXPECT_THROW(future.value(), std::runtime_error);
81 }
82
83 TEST(Via, thenValue) {
84   auto future = makeFuture(std::move(1))
85     .then([](Try<int>&& t) {
86       return t.value() == 1;
87     })
88     ;
89
90   EXPECT_TRUE(future.value());
91 }
92
93 TEST(Via, thenFuture) {
94   auto future = makeFuture(1)
95     .then([](Try<int>&& t) {
96       return makeFuture(t.value() == 1);
97     });
98   EXPECT_TRUE(future.value());
99 }
100
101 static Future<std::string> doWorkStatic(Try<std::string>&& t) {
102   return makeFuture(t.value() + ";static");
103 }
104
105 TEST(Via, thenFunction) {
106   struct Worker {
107     Future<std::string> doWork(Try<std::string>&& t) {
108       return makeFuture(t.value() + ";class");
109     }
110     static Future<std::string> doWorkStatic(Try<std::string>&& t) {
111       return makeFuture(t.value() + ";class-static");
112     }
113   } w;
114
115   auto f = makeFuture(std::string("start"))
116     .then(doWorkStatic)
117     .then(Worker::doWorkStatic)
118     .then(&Worker::doWork, &w)
119     ;
120
121   EXPECT_EQ(f.value(), "start;static;class-static;class");
122 }
123
124 TEST_F(ViaFixture, threadHops) {
125   auto westThreadId = std::this_thread::get_id();
126   auto f = via(eastExecutor.get())
127                .then([=](Try<Unit>&& /* t */) {
128                  EXPECT_NE(std::this_thread::get_id(), westThreadId);
129                  return makeFuture<int>(1);
130                })
131                .via(westExecutor.get())
132                .then([=](Try<int>&& t) {
133                  EXPECT_EQ(std::this_thread::get_id(), westThreadId);
134                  return t.value();
135                });
136   EXPECT_EQ(f.getVia(waiter.get()), 1);
137 }
138
139 TEST_F(ViaFixture, chainVias) {
140   auto westThreadId = std::this_thread::get_id();
141   auto f = via(eastExecutor.get()).then([=]() {
142     EXPECT_NE(std::this_thread::get_id(), westThreadId);
143     return 1;
144   }).then([=](int val) {
145     return makeFuture(val).via(westExecutor.get())
146       .then([=](int v) mutable {
147         EXPECT_EQ(std::this_thread::get_id(), westThreadId);
148         return v + 1;
149       });
150   }).then([=](int val) {
151     // even though ultimately the future that triggers this one executed in
152     // the west thread, this then() inherited the executor from its
153     // predecessor, ie the eastExecutor.
154     EXPECT_NE(std::this_thread::get_id(), westThreadId);
155     return val + 1;
156   }).via(westExecutor.get()).then([=](int val) {
157     // go back to west, so we can wait on it
158     EXPECT_EQ(std::this_thread::get_id(), westThreadId);
159     return val + 1;
160   });
161
162   EXPECT_EQ(f.getVia(waiter.get()), 4);
163 }
164
165 TEST_F(ViaFixture, bareViaAssignment) {
166   auto f = via(eastExecutor.get());
167 }
168 TEST_F(ViaFixture, viaAssignment) {
169   // via()&&
170   auto f = makeFuture().via(eastExecutor.get());
171   // via()&
172   auto f2 = f.via(eastExecutor.get());
173 }
174
175 TEST(Via, chain1) {
176   EXPECT_EQ(42,
177             makeFuture()
178             .thenMulti([] { return 42; })
179             .get());
180 }
181
182 TEST(Via, chain3) {
183   int count = 0;
184   auto f = makeFuture().thenMulti(
185       [&]{ count++; return 3.14159; },
186       [&](double) { count++; return std::string("hello"); },
187       [&]{ count++; return makeFuture(42); });
188   EXPECT_EQ(42, f.get());
189   EXPECT_EQ(3, count);
190 }
191
192 struct PriorityExecutor : public Executor {
193   void add(Func /* f */) override {}
194
195   void addWithPriority(Func f, int8_t priority) override {
196     int mid = getNumPriorities() / 2;
197     int p = priority < 0 ?
198             std::max(0, mid + priority) :
199             std::min(getNumPriorities() - 1, mid + priority);
200     EXPECT_LT(p, 3);
201     EXPECT_GE(p, 0);
202     if (p == 0) {
203       count0++;
204     } else if (p == 1) {
205       count1++;
206     } else if (p == 2) {
207       count2++;
208     }
209     f();
210   }
211
212   uint8_t getNumPriorities() const override {
213     return 3;
214   }
215
216   int count0{0};
217   int count1{0};
218   int count2{0};
219 };
220
221 TEST(Via, priority) {
222   PriorityExecutor exe;
223   via(&exe, -1).then([]{});
224   via(&exe, 0).then([]{});
225   via(&exe, 1).then([]{});
226   via(&exe, 42).then([]{});  // overflow should go to max priority
227   via(&exe, -42).then([]{}); // underflow should go to min priority
228   via(&exe).then([]{});      // default to mid priority
229   via(&exe, Executor::LO_PRI).then([]{});
230   via(&exe, Executor::HI_PRI).then([]{});
231   EXPECT_EQ(3, exe.count0);
232   EXPECT_EQ(2, exe.count1);
233   EXPECT_EQ(3, exe.count2);
234 }
235
236 TEST_F(ViaFixture, chainX1) {
237   EXPECT_EQ(42,
238             makeFuture()
239             .thenMultiWithExecutor(eastExecutor.get(),[] { return 42; })
240             .get());
241 }
242
243 TEST_F(ViaFixture, chainX3) {
244   auto westThreadId = std::this_thread::get_id();
245   int count = 0;
246   auto f = via(westExecutor.get()).thenMultiWithExecutor(
247       eastExecutor.get(),
248       [&]{
249         EXPECT_NE(std::this_thread::get_id(), westThreadId);
250         count++; return 3.14159;
251       },
252       [&](double) { count++; return std::string("hello"); },
253       [&]{ count++; })
254     .then([&](){
255         EXPECT_EQ(std::this_thread::get_id(), westThreadId);
256         return makeFuture(42);
257     });
258   EXPECT_EQ(42, f.getVia(waiter.get()));
259   EXPECT_EQ(3, count);
260 }
261
262 TEST(Via, then2) {
263   ManualExecutor x1, x2;
264   bool a = false, b = false, c = false;
265   via(&x1)
266     .then([&]{ a = true; })
267     .then(&x2, [&]{ b = true; })
268     .then([&]{ c = true; });
269
270   EXPECT_FALSE(a);
271   EXPECT_FALSE(b);
272
273   x1.run();
274   EXPECT_TRUE(a);
275   EXPECT_FALSE(b);
276   EXPECT_FALSE(c);
277
278   x2.run();
279   EXPECT_TRUE(b);
280   EXPECT_FALSE(c);
281
282   x1.run();
283   EXPECT_TRUE(c);
284 }
285
286 TEST(Via, then2Variadic) {
287   struct Foo { bool a = false; void foo(Try<Unit>) { a = true; } };
288   Foo f;
289   ManualExecutor x;
290   makeFuture().then(&x, &Foo::foo, &f);
291   EXPECT_FALSE(f.a);
292   x.run();
293   EXPECT_TRUE(f.a);
294 }
295
296 #ifndef __APPLE__ // TODO #7372389
297 /// Simple executor that does work in another thread
298 class ThreadExecutor : public Executor {
299   folly::MPMCQueue<Func> funcs;
300   std::atomic<bool> done {false};
301   std::thread worker;
302   folly::Baton<> baton;
303
304   void work() {
305     baton.post();
306     Func fn;
307     while (!done) {
308       while (!funcs.isEmpty()) {
309         funcs.blockingRead(fn);
310         fn();
311       }
312     }
313   }
314
315  public:
316   explicit ThreadExecutor(size_t n = 1024)
317     : funcs(n) {
318     worker = std::thread(std::bind(&ThreadExecutor::work, this));
319   }
320
321   ~ThreadExecutor() override {
322     done = true;
323     funcs.write([]{});
324     worker.join();
325   }
326
327   void add(Func fn) override {
328     funcs.blockingWrite(std::move(fn));
329   }
330
331   void waitForStartup() {
332     baton.wait();
333   }
334 };
335
336 TEST(Via, viaThenGetWasRacy) {
337   ThreadExecutor x;
338   std::unique_ptr<int> val = folly::via(&x)
339     .then([] { return folly::make_unique<int>(42); })
340     .get();
341   ASSERT_TRUE(!!val);
342   EXPECT_EQ(42, *val);
343 }
344
345 TEST(Via, callbackRace) {
346   ThreadExecutor x;
347
348   auto fn = [&x]{
349     auto promises = std::make_shared<std::vector<Promise<Unit>>>(4);
350     std::vector<Future<Unit>> futures;
351
352     for (auto& p : *promises) {
353       futures.emplace_back(
354         p.getFuture()
355         .via(&x)
356         .then([](Try<Unit>&&){}));
357     }
358
359     x.waitForStartup();
360     x.add([promises]{
361       for (auto& p : *promises) {
362         p.setValue();
363       }
364     });
365
366     return collectAll(futures);
367   };
368
369   fn().wait();
370 }
371 #endif
372
373 class DummyDrivableExecutor : public DrivableExecutor {
374  public:
375   void add(Func /* f */) override {}
376   void drive() override { ran = true; }
377   bool ran{false};
378 };
379
380 TEST(Via, getVia) {
381   {
382     // non-void
383     ManualExecutor x;
384     auto f = via(&x).then([]{ return true; });
385     EXPECT_TRUE(f.getVia(&x));
386   }
387
388   {
389     // void
390     ManualExecutor x;
391     auto f = via(&x).then();
392     f.getVia(&x);
393   }
394
395   {
396     DummyDrivableExecutor x;
397     auto f = makeFuture(true);
398     EXPECT_TRUE(f.getVia(&x));
399     EXPECT_FALSE(x.ran);
400   }
401 }
402
403 TEST(Via, getTryVia) {
404   {
405     // non-void
406     ManualExecutor x;
407     auto f = via(&x).then([] { return 23; });
408     EXPECT_FALSE(f.isReady());
409     EXPECT_EQ(23, f.getTryVia(&x).value());
410   }
411
412   {
413     // void
414     ManualExecutor x;
415     auto f = via(&x).then();
416     EXPECT_FALSE(f.isReady());
417     auto t = f.getTryVia(&x);
418     EXPECT_TRUE(t.hasValue());
419   }
420
421   {
422     DummyDrivableExecutor x;
423     auto f = makeFuture(23);
424     EXPECT_EQ(23, f.getTryVia(&x).value());
425     EXPECT_FALSE(x.ran);
426   }
427 }
428
429 TEST(Via, waitVia) {
430   {
431     ManualExecutor x;
432     auto f = via(&x).then();
433     EXPECT_FALSE(f.isReady());
434     f.waitVia(&x);
435     EXPECT_TRUE(f.isReady());
436   }
437
438   {
439     // try rvalue as well
440     ManualExecutor x;
441     auto f = via(&x).then().waitVia(&x);
442     EXPECT_TRUE(f.isReady());
443   }
444
445   {
446     DummyDrivableExecutor x;
447     makeFuture(true).waitVia(&x);
448     EXPECT_FALSE(x.ran);
449   }
450 }
451
452 TEST(Via, viaRaces) {
453   ManualExecutor x;
454   Promise<Unit> p;
455   auto tid = std::this_thread::get_id();
456   bool done = false;
457
458   std::thread t1([&] {
459     p.getFuture()
460       .via(&x)
461       .then([&](Try<Unit>&&) { EXPECT_EQ(tid, std::this_thread::get_id()); })
462       .then([&](Try<Unit>&&) { EXPECT_EQ(tid, std::this_thread::get_id()); })
463       .then([&](Try<Unit>&&) { done = true; });
464   });
465
466   std::thread t2([&] {
467     p.setValue();
468   });
469
470   while (!done) x.run();
471   t1.join();
472   t2.join();
473 }
474
475 TEST(ViaFunc, liftsVoid) {
476   ManualExecutor x;
477   int count = 0;
478   Future<Unit> f = via(&x, [&]{ count++; });
479
480   EXPECT_EQ(0, count);
481   x.run();
482   EXPECT_EQ(1, count);
483 }
484
485 TEST(ViaFunc, value) {
486   ManualExecutor x;
487   EXPECT_EQ(42, via(&x, []{ return 42; }).getVia(&x));
488 }
489
490 TEST(ViaFunc, exception) {
491   ManualExecutor x;
492   EXPECT_THROW(
493     via(&x, []() -> int { throw std::runtime_error("expected"); })
494       .getVia(&x),
495     std::runtime_error);
496 }
497
498 TEST(ViaFunc, future) {
499   ManualExecutor x;
500   EXPECT_EQ(42, via(&x, []{ return makeFuture(42); })
501             .getVia(&x));
502 }
503
504 TEST(ViaFunc, voidFuture) {
505   ManualExecutor x;
506   int count = 0;
507   via(&x, [&]{ count++; }).getVia(&x);
508   EXPECT_EQ(1, count);
509 }
510
511 TEST(ViaFunc, isSticky) {
512   ManualExecutor x;
513   int count = 0;
514
515   auto f = via(&x, [&]{ count++; });
516   x.run();
517
518   f.then([&]{ count++; });
519   EXPECT_EQ(1, count);
520   x.run();
521   EXPECT_EQ(2, count);
522 }
523
524 TEST(ViaFunc, moveOnly) {
525   ManualExecutor x;
526   auto intp = folly::make_unique<int>(42);
527
528   EXPECT_EQ(42, via(&x, [intp = std::move(intp)] { return *intp; }).getVia(&x));
529 }