Nuke Future<void> (folly/futures)
[folly.git] / folly / futures / test / ViaTest.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <gtest/gtest.h>
18
19 #include <folly/futures/Future.h>
20 #include <folly/futures/InlineExecutor.h>
21 #include <folly/futures/ManualExecutor.h>
22 #include <folly/futures/DrivableExecutor.h>
23 #include <folly/Baton.h>
24 #include <folly/MPMCQueue.h>
25
26 #include <thread>
27
28 using namespace folly;
29
30 struct ManualWaiter : public DrivableExecutor {
31   explicit ManualWaiter(std::shared_ptr<ManualExecutor> ex) : ex(ex) {}
32
33   void add(Func f) override {
34     ex->add(f);
35   }
36
37   void drive() override {
38     ex->wait();
39     ex->run();
40   }
41
42   std::shared_ptr<ManualExecutor> ex;
43 };
44
45 struct ViaFixture : public testing::Test {
46   ViaFixture() :
47     westExecutor(new ManualExecutor),
48     eastExecutor(new ManualExecutor),
49     waiter(new ManualWaiter(westExecutor)),
50     done(false)
51   {
52     t = std::thread([=] {
53         ManualWaiter eastWaiter(eastExecutor);
54         while (!done)
55           eastWaiter.drive();
56       });
57   }
58
59   ~ViaFixture() override {
60     done = true;
61     eastExecutor->add([=]() { });
62     t.join();
63   }
64
65   void addAsync(int a, int b, std::function<void(int&&)>&& cob) {
66     eastExecutor->add([=]() {
67       cob(a + b);
68     });
69   }
70
71   std::shared_ptr<ManualExecutor> westExecutor;
72   std::shared_ptr<ManualExecutor> eastExecutor;
73   std::shared_ptr<ManualWaiter> waiter;
74   InlineExecutor inlineExecutor;
75   std::atomic<bool> done;
76   std::thread t;
77 };
78
79 TEST(Via, exceptionOnLaunch) {
80   auto future = makeFuture<int>(std::runtime_error("E"));
81   EXPECT_THROW(future.value(), std::runtime_error);
82 }
83
84 TEST(Via, thenValue) {
85   auto future = makeFuture(std::move(1))
86     .then([](Try<int>&& t) {
87       return t.value() == 1;
88     })
89     ;
90
91   EXPECT_TRUE(future.value());
92 }
93
94 TEST(Via, thenFuture) {
95   auto future = makeFuture(1)
96     .then([](Try<int>&& t) {
97       return makeFuture(t.value() == 1);
98     });
99   EXPECT_TRUE(future.value());
100 }
101
102 static Future<std::string> doWorkStatic(Try<std::string>&& t) {
103   return makeFuture(t.value() + ";static");
104 }
105
106 TEST(Via, thenFunction) {
107   struct Worker {
108     Future<std::string> doWork(Try<std::string>&& t) {
109       return makeFuture(t.value() + ";class");
110     }
111     static Future<std::string> doWorkStatic(Try<std::string>&& t) {
112       return makeFuture(t.value() + ";class-static");
113     }
114   } w;
115
116   auto f = makeFuture(std::string("start"))
117     .then(doWorkStatic)
118     .then(Worker::doWorkStatic)
119     .then(&Worker::doWork, &w)
120     ;
121
122   EXPECT_EQ(f.value(), "start;static;class-static;class");
123 }
124
125 TEST_F(ViaFixture, threadHops) {
126   auto westThreadId = std::this_thread::get_id();
127   auto f = via(eastExecutor.get()).then([=](Try<Unit>&& t) {
128     EXPECT_NE(std::this_thread::get_id(), westThreadId);
129     return makeFuture<int>(1);
130   }).via(westExecutor.get()
131   ).then([=](Try<int>&& t) {
132     EXPECT_EQ(std::this_thread::get_id(), westThreadId);
133     return t.value();
134   });
135   EXPECT_EQ(f.getVia(waiter.get()), 1);
136 }
137
138 TEST_F(ViaFixture, chainVias) {
139   auto westThreadId = std::this_thread::get_id();
140   auto f = via(eastExecutor.get()).then([=]() {
141     EXPECT_NE(std::this_thread::get_id(), westThreadId);
142     return 1;
143   }).then([=](int val) {
144     return makeFuture(val).via(westExecutor.get())
145       .then([=](int val) mutable {
146         EXPECT_EQ(std::this_thread::get_id(), westThreadId);
147         return val + 1;
148       });
149   }).then([=](int val) {
150     // even though ultimately the future that triggers this one executed in
151     // the west thread, this then() inherited the executor from its
152     // predecessor, ie the eastExecutor.
153     EXPECT_NE(std::this_thread::get_id(), westThreadId);
154     return val + 1;
155   }).via(westExecutor.get()).then([=](int val) {
156     // go back to west, so we can wait on it
157     EXPECT_EQ(std::this_thread::get_id(), westThreadId);
158     return val + 1;
159   });
160
161   EXPECT_EQ(f.getVia(waiter.get()), 4);
162 }
163
164 TEST_F(ViaFixture, bareViaAssignment) {
165   auto f = via(eastExecutor.get());
166 }
167 TEST_F(ViaFixture, viaAssignment) {
168   // via()&&
169   auto f = makeFuture().via(eastExecutor.get());
170   // via()&
171   auto f2 = f.via(eastExecutor.get());
172 }
173
174 TEST(Via, chain1) {
175   EXPECT_EQ(42,
176             makeFuture()
177             .thenMulti([] { return 42; })
178             .get());
179 }
180
181 TEST(Via, chain3) {
182   int count = 0;
183   auto f = makeFuture().thenMulti(
184       [&]{ count++; return 3.14159; },
185       [&](double) { count++; return std::string("hello"); },
186       [&]{ count++; return makeFuture(42); });
187   EXPECT_EQ(42, f.get());
188   EXPECT_EQ(3, count);
189 }
190
191 struct PriorityExecutor : public Executor {
192   void add(Func f) override {}
193
194   void addWithPriority(Func f, int8_t priority) override {
195     int mid = getNumPriorities() / 2;
196     int p = priority < 0 ?
197             std::max(0, mid + priority) :
198             std::min(getNumPriorities() - 1, mid + priority);
199     EXPECT_LT(p, 3);
200     EXPECT_GE(p, 0);
201     if (p == 0) {
202       count0++;
203     } else if (p == 1) {
204       count1++;
205     } else if (p == 2) {
206       count2++;
207     }
208     f();
209   }
210
211   uint8_t getNumPriorities() const override {
212     return 3;
213   }
214
215   int count0{0};
216   int count1{0};
217   int count2{0};
218 };
219
220 TEST(Via, priority) {
221   PriorityExecutor exe;
222   via(&exe, -1).then([]{});
223   via(&exe, 0).then([]{});
224   via(&exe, 1).then([]{});
225   via(&exe, 42).then([]{});  // overflow should go to max priority
226   via(&exe, -42).then([]{}); // underflow should go to min priority
227   via(&exe).then([]{});      // default to mid priority
228   via(&exe, Executor::LO_PRI).then([]{});
229   via(&exe, Executor::HI_PRI).then([]{});
230   EXPECT_EQ(3, exe.count0);
231   EXPECT_EQ(2, exe.count1);
232   EXPECT_EQ(3, exe.count2);
233 }
234
235 TEST_F(ViaFixture, chainX1) {
236   EXPECT_EQ(42,
237             makeFuture()
238             .thenMultiWithExecutor(eastExecutor.get(),[] { return 42; })
239             .get());
240 }
241
242 TEST_F(ViaFixture, chainX3) {
243   auto westThreadId = std::this_thread::get_id();
244   int count = 0;
245   auto f = via(westExecutor.get()).thenMultiWithExecutor(
246       eastExecutor.get(),
247       [&]{
248         EXPECT_NE(std::this_thread::get_id(), westThreadId);
249         count++; return 3.14159;
250       },
251       [&](double) { count++; return std::string("hello"); },
252       [&]{ count++; })
253     .then([&](){
254         EXPECT_EQ(std::this_thread::get_id(), westThreadId);
255         return makeFuture(42);
256     });
257   EXPECT_EQ(42, f.getVia(waiter.get()));
258   EXPECT_EQ(3, count);
259 }
260
261 TEST(Via, then2) {
262   ManualExecutor x1, x2;
263   bool a = false, b = false, c = false;
264   via(&x1)
265     .then([&]{ a = true; })
266     .then(&x2, [&]{ b = true; })
267     .then([&]{ c = true; });
268
269   EXPECT_FALSE(a);
270   EXPECT_FALSE(b);
271
272   x1.run();
273   EXPECT_TRUE(a);
274   EXPECT_FALSE(b);
275   EXPECT_FALSE(c);
276
277   x2.run();
278   EXPECT_TRUE(b);
279   EXPECT_FALSE(c);
280
281   x1.run();
282   EXPECT_TRUE(c);
283 }
284
285 TEST(Via, then2Variadic) {
286   struct Foo { bool a = false; void foo(Try<Unit>) { a = true; } };
287   Foo f;
288   ManualExecutor x;
289   makeFuture().then(&x, &Foo::foo, &f);
290   EXPECT_FALSE(f.a);
291   x.run();
292   EXPECT_TRUE(f.a);
293 }
294
295 #ifndef __APPLE__ // TODO #7372389
296 /// Simple executor that does work in another thread
297 class ThreadExecutor : public Executor {
298   folly::MPMCQueue<Func> funcs;
299   std::atomic<bool> done {false};
300   std::thread worker;
301   folly::Baton<> baton;
302
303   void work() {
304     baton.post();
305     Func fn;
306     while (!done) {
307       while (!funcs.isEmpty()) {
308         funcs.blockingRead(fn);
309         fn();
310       }
311     }
312   }
313
314  public:
315   explicit ThreadExecutor(size_t n = 1024)
316     : funcs(n) {
317     worker = std::thread(std::bind(&ThreadExecutor::work, this));
318   }
319
320   ~ThreadExecutor() override {
321     done = true;
322     funcs.write([]{});
323     worker.join();
324   }
325
326   void add(Func fn) override {
327     funcs.blockingWrite(std::move(fn));
328   }
329
330   void waitForStartup() {
331     baton.wait();
332   }
333 };
334
335 TEST(Via, viaThenGetWasRacy) {
336   ThreadExecutor x;
337   std::unique_ptr<int> val = folly::via(&x)
338     .then([] { return folly::make_unique<int>(42); })
339     .get();
340   ASSERT_TRUE(!!val);
341   EXPECT_EQ(42, *val);
342 }
343
344 TEST(Via, callbackRace) {
345   ThreadExecutor x;
346
347   auto fn = [&x]{
348     auto promises = std::make_shared<std::vector<Promise<Unit>>>(4);
349     std::vector<Future<Unit>> futures;
350
351     for (auto& p : *promises) {
352       futures.emplace_back(
353         p.getFuture()
354         .via(&x)
355         .then([](Try<Unit>&&){}));
356     }
357
358     x.waitForStartup();
359     x.add([promises]{
360       for (auto& p : *promises) {
361         p.setValue();
362       }
363     });
364
365     return collectAll(futures);
366   };
367
368   fn().wait();
369 }
370 #endif
371
372 class DummyDrivableExecutor : public DrivableExecutor {
373  public:
374   void add(Func f) override {}
375   void drive() override { ran = true; }
376   bool ran{false};
377 };
378
379 TEST(Via, getVia) {
380   {
381     // non-void
382     ManualExecutor x;
383     auto f = via(&x).then([]{ return true; });
384     EXPECT_TRUE(f.getVia(&x));
385   }
386
387   {
388     // void
389     ManualExecutor x;
390     auto f = via(&x).then();
391     f.getVia(&x);
392   }
393
394   {
395     DummyDrivableExecutor x;
396     auto f = makeFuture(true);
397     EXPECT_TRUE(f.getVia(&x));
398     EXPECT_FALSE(x.ran);
399   }
400 }
401
402 TEST(Via, waitVia) {
403   {
404     ManualExecutor x;
405     auto f = via(&x).then();
406     EXPECT_FALSE(f.isReady());
407     f.waitVia(&x);
408     EXPECT_TRUE(f.isReady());
409   }
410
411   {
412     // try rvalue as well
413     ManualExecutor x;
414     auto f = via(&x).then().waitVia(&x);
415     EXPECT_TRUE(f.isReady());
416   }
417
418   {
419     DummyDrivableExecutor x;
420     makeFuture(true).waitVia(&x);
421     EXPECT_FALSE(x.ran);
422   }
423 }
424
425 TEST(Via, viaRaces) {
426   ManualExecutor x;
427   Promise<Unit> p;
428   auto tid = std::this_thread::get_id();
429   bool done = false;
430
431   std::thread t1([&] {
432     p.getFuture()
433       .via(&x)
434       .then([&](Try<Unit>&&) { EXPECT_EQ(tid, std::this_thread::get_id()); })
435       .then([&](Try<Unit>&&) { EXPECT_EQ(tid, std::this_thread::get_id()); })
436       .then([&](Try<Unit>&&) { done = true; });
437   });
438
439   std::thread t2([&] {
440     p.setValue();
441   });
442
443   while (!done) x.run();
444   t1.join();
445   t2.join();
446 }
447
448 TEST(ViaFunc, liftsVoid) {
449   ManualExecutor x;
450   int count = 0;
451   Future<Unit> f = via(&x, [&]{ count++; });
452
453   EXPECT_EQ(0, count);
454   x.run();
455   EXPECT_EQ(1, count);
456 }
457
458 TEST(ViaFunc, value) {
459   ManualExecutor x;
460   EXPECT_EQ(42, via(&x, []{ return 42; }).getVia(&x));
461 }
462
463 TEST(ViaFunc, exception) {
464   ManualExecutor x;
465   EXPECT_THROW(
466     via(&x, []() -> int { throw std::runtime_error("expected"); })
467       .getVia(&x),
468     std::runtime_error);
469 }
470
471 TEST(ViaFunc, future) {
472   ManualExecutor x;
473   EXPECT_EQ(42, via(&x, []{ return makeFuture(42); })
474             .getVia(&x));
475 }
476
477 TEST(ViaFunc, voidFuture) {
478   ManualExecutor x;
479   int count = 0;
480   via(&x, [&]{ count++; }).getVia(&x);
481   EXPECT_EQ(1, count);
482 }
483
484 TEST(ViaFunc, isSticky) {
485   ManualExecutor x;
486   int count = 0;
487
488   auto f = via(&x, [&]{ count++; });
489   x.run();
490
491   f.then([&]{ count++; });
492   EXPECT_EQ(1, count);
493   x.run();
494   EXPECT_EQ(2, count);
495 }