2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <folly/Baton.h>
20 #include <folly/MPMCQueue.h>
21 #include <folly/executors/DrivableExecutor.h>
22 #include <folly/futures/Future.h>
23 #include <folly/futures/InlineExecutor.h>
24 #include <folly/futures/ManualExecutor.h>
25 #include <folly/portability/GTest.h>
27 using namespace folly;
// DrivableExecutor used by the tests below; add() forwards each task to the
// shared ManualExecutor passed to the constructor.
29 struct ManualWaiter : public DrivableExecutor {
30 explicit ManualWaiter(std::shared_ptr<ManualExecutor> ex) : ex(ex) {}
// Hands the task to the wrapped ManualExecutor.
32 void add(Func f) override {
33 ex->add(std::move(f));
// NOTE(review): the body of drive() is not visible in this excerpt.
36 void drive() override {
// Executor that receives everything passed to add().
41 std::shared_ptr<ManualExecutor> ex;
// Test fixture holding two ManualExecutors ("west" and "east"), a
// ManualWaiter built on the west executor, an InlineExecutor and an atomic
// `done` flag.
44 struct ViaFixture : public testing::Test {
46 westExecutor(new ManualExecutor),
47 eastExecutor(new ManualExecutor),
48 waiter(new ManualWaiter(westExecutor)),
// A second ManualWaiter, this one wrapping the east executor.
52 ManualWaiter eastWaiter(eastExecutor);
59 ~ViaFixture() override {
// Enqueues a no-op on the east executor.
// NOTE(review): presumably this wakes whatever is driving eastExecutor so it
// can observe shutdown - confirm against the lines not shown here.
61 eastExecutor->add([=]() { });
// Schedules work on the east executor; `cob` is the callback parameter.
// NOTE(review): the scheduled lambda's body is not visible in this excerpt.
65 void addAsync(int a, int b, std::function<void(int&&)>&& cob) {
66 eastExecutor->add([=]() {
71 std::shared_ptr<ManualExecutor> westExecutor;
72 std::shared_ptr<ManualExecutor> eastExecutor;
73 std::shared_ptr<ManualWaiter> waiter;
74 InlineExecutor inlineExecutor;
75 std::atomic<bool> done;
// A future built from a std::runtime_error is expected to throw that
// exception type when value() is called.
79 TEST(Via, exceptionOnLaunch) {
80 auto future = makeFuture<int>(std::runtime_error("E"));
81 EXPECT_THROW(future.value(), std::runtime_error);
// then() with a callback that returns a plain bool: the callback compares the
// incoming Try<int> against 1 and the test expects the resulting value to be
// true.
84 TEST(Via, thenValue) {
85 auto future = makeFuture(std::move(1))
86 .then([](Try<int>&& t) {
87 return t.value() == 1;
91 EXPECT_TRUE(future.value());
// Same check as thenValue, except that the callback wraps its bool result
// with makeFuture() instead of returning it directly.
94 TEST(Via, thenFuture) {
95 auto future = makeFuture(1)
96 .then([](Try<int>&& t) {
97 return makeFuture(t.value() == 1);
99 EXPECT_TRUE(future.value());
// File-level continuation: appends ";static" to the incoming string and
// returns the result wrapped with makeFuture().
102 static Future<std::string> doWorkStatic(Try<std::string>&& t) {
103 return makeFuture(t.value() + ";static");
// Chains then() with function and member-function continuations, each of
// which appends its own suffix. The string in the final EXPECT_EQ records the
// order in which the suffixes are expected to be applied.
106 TEST(Via, thenFunction) {
// Member-function continuation: appends ";class".
108 Future<std::string> doWork(Try<std::string>&& t) {
109 return makeFuture(t.value() + ";class");
// Static member continuation: appends ";class-static".
111 static Future<std::string> doWorkStatic(Try<std::string>&& t) {
112 return makeFuture(t.value() + ";class-static");
116 auto f = makeFuture(std::string("start"))
118 .then(Worker::doWorkStatic)
119 .then(&Worker::doWork, &w)
122 EXPECT_EQ(f.value(), "start;static;class-static;class");
// Starts a chain on eastExecutor and expects the first continuation to run on
// a thread other than the test thread; then switches to westExecutor and
// expects the second continuation to run on the test thread. The chain is
// driven through `waiter` and is expected to produce 1.
125 TEST_F(ViaFixture, threadHops) {
// Thread id of the test body, captured for comparison inside the callbacks.
126 auto westThreadId = std::this_thread::get_id();
127 auto f = via(eastExecutor.get())
128 .then([=](Try<Unit>&& /* t */) {
129 EXPECT_NE(std::this_thread::get_id(), westThreadId);
130 return makeFuture<int>(1);
132 .via(westExecutor.get())
133 .then([=](Try<int>&& t) {
134 EXPECT_EQ(std::this_thread::get_id(), westThreadId);
137 EXPECT_EQ(f.getVia(waiter.get()), 1);
// Checks which thread each continuation runs on when via() calls are mixed
// into a then() chain that starts on eastExecutor. The final result is
// retrieved through `waiter` and is expected to be 4.
140 TEST_F(ViaFixture, chainVias) {
// Thread id of the test body, captured for comparison inside the callbacks.
141 auto westThreadId = std::this_thread::get_id();
142 auto f = via(eastExecutor.get()).then([=]() {
143 EXPECT_NE(std::this_thread::get_id(), westThreadId);
145 }).then([=](int val) {
// The nested future is moved to westExecutor, so its continuation is
// expected on the test thread.
146 return makeFuture(val).via(westExecutor.get())
147 .then([=](int v) mutable {
148 EXPECT_EQ(std::this_thread::get_id(), westThreadId);
151 }).then([=](int val) {
152 // Even though the future that ultimately triggers this one executed in
153 // the west thread, this then() inherited the executor from its
154 // predecessor, i.e. the eastExecutor.
155 EXPECT_NE(std::this_thread::get_id(), westThreadId);
157 }).via(westExecutor.get()).then([=](int val) {
158 // Go back to west, so we can wait on it.
159 EXPECT_EQ(std::this_thread::get_id(), westThreadId);
163 EXPECT_EQ(f.getVia(waiter.get()), 4);
// Binds the result of a bare via(executor) call to a local variable; no
// assertion appears in the visible lines.
166 TEST_F(ViaFixture, bareViaAssignment) {
167 auto f = via(eastExecutor.get());
// Calls via() once on the temporary returned by makeFuture() and once on the
// named future `f`, binding each result to a local.
169 TEST_F(ViaFixture, viaAssignment) {
171 auto f = makeFuture().via(eastExecutor.get());
173 auto f2 = f.via(eastExecutor.get());
179 .thenMulti([] { return 42; })
185 auto f = makeFuture().thenMulti(
186 [&]{ count++; return 3.14159; },
187 [&](double) { count++; return std::string("hello"); },
188 [&]{ count++; return makeFuture(42); });
189 EXPECT_EQ(42, f.get());
// Executor used by the priority test. Plain add() discards the task;
// addWithPriority() turns the signed priority into a level index.
// NOTE(review): the lines that consume `p` are not visible here; presumably
// they update the count0/count1/count2 members read by TEST(Via, priority).
193 struct PriorityExecutor : public Executor {
194 void add(Func /* f */) override {}
196 void addWithPriority(Func f, int8_t priority) override {
// `priority` is an offset from the middle level; the result is clamped to
// the range [0, getNumPriorities() - 1].
197 int mid = getNumPriorities() / 2;
198 int p = priority < 0 ?
199 std::max(0, mid + priority) :
200 std::min(getNumPriorities() - 1, mid + priority);
213 uint8_t getNumPriorities() const override {
// Issues via() calls with a range of priorities (including out-of-range and
// default values) against a PriorityExecutor, then checks the executor's
// three counters: 3, 2 and 3.
222 TEST(Via, priority) {
223 PriorityExecutor exe;
224 via(&exe, -1).then([]{});
225 via(&exe, 0).then([]{});
226 via(&exe, 1).then([]{});
227 via(&exe, 42).then([]{}); // overflow should go to max priority
228 via(&exe, -42).then([]{}); // underflow should go to min priority
229 via(&exe).then([]{}); // default to mid priority
230 via(&exe, Executor::LO_PRI).then([]{});
231 via(&exe, Executor::HI_PRI).then([]{});
232 EXPECT_EQ(3, exe.count0);
233 EXPECT_EQ(2, exe.count1);
234 EXPECT_EQ(3, exe.count2);
// Calls thenMultiWithExecutor() with eastExecutor and a single callback that
// returns 42.
// NOTE(review): the start of the chain and any assertion are not visible in
// this excerpt.
237 TEST_F(ViaFixture, chainX1) {
240 .thenMultiWithExecutor(eastExecutor.get(),[] { return 42; })
// Starts on westExecutor and passes several callbacks to
// thenMultiWithExecutor(). The first visible callback expects to run off the
// test thread, the last one expects to run on it, and the result obtained
// through `waiter` is expected to be 42.
// NOTE(review): the executor argument and the lambda headers are on lines not
// shown here.
244 TEST_F(ViaFixture, chainX3) {
245 auto westThreadId = std::this_thread::get_id();
247 auto f = via(westExecutor.get()).thenMultiWithExecutor(
250 EXPECT_NE(std::this_thread::get_id(), westThreadId);
251 count++; return 3.14159;
253 [&](double) { count++; return std::string("hello"); },
256 EXPECT_EQ(std::this_thread::get_id(), westThreadId);
257 return makeFuture(42);
259 EXPECT_EQ(42, f.getVia(waiter.get()));
264 ManualExecutor x1, x2;
265 bool a = false, b = false, c = false;
267 .then([&]{ a = true; })
268 .then(&x2, [&]{ b = true; })
269 .then([&]{ c = true; });
// Calls then() with three arguments: &x, a pointer to Foo::foo and &f.
// Foo::foo sets the flag `a` when invoked.
// NOTE(review): the declarations of `x` and `f` are not visible here;
// presumably `x` is an executor and `f` a Foo - confirm.
287 TEST(Via, then2Variadic) {
288 struct Foo { bool a = false; void foo(Try<Unit>) { a = true; } };
291 makeFuture().then(&x, &Foo::foo, &f);
297 #ifndef __APPLE__ // TODO #7372389
298 /// Simple executor that does work in another thread
/// Tasks passed to add() are written to an MPMCQueue; the constructor starts
/// a std::thread bound to work(), which reads tasks back off the queue.
299 class ThreadExecutor : public Executor {
300 folly::MPMCQueue<Func> funcs;
301 std::atomic<bool> done {false};
303 folly::Baton<> baton;
// Drains the queue: reads the next task into `fn` while the queue is not
// empty.
309 while (!funcs.isEmpty()) {
310 funcs.blockingRead(fn);
// `n` defaults to 1024.
// NOTE(review): presumably `n` is the queue capacity; the initializer that
// uses it is not visible here.
317 explicit ThreadExecutor(size_t n = 1024)
319 worker = std::thread(std::bind(&ThreadExecutor::work, this));
322 ~ThreadExecutor() override {
// Enqueues the task with a blocking write on the queue.
328 void add(Func fn) override {
329 funcs.blockingWrite(std::move(fn));
// NOTE(review): body not visible in this excerpt.
332 void waitForStartup() {
// Runs a continuation via executor `x` that returns a std::unique_ptr<int>
// holding 42, and retrieves the result with get().
// NOTE(review): the declaration of `x` and any assertion on `val` are not
// visible in this excerpt.
337 TEST(Via, viaThenGetWasRacy) {
339 std::unique_ptr<int> val =
340 folly::via(&x).then([] { return std::make_unique<int>(42); }).get();
// Builds four promises in a shared vector, attaches a no-op continuation to a
// future derived from each, iterates over the promises a second time, and
// combines the futures with collectAll().
// NOTE(review): the executor and the body of the second loop are on lines not
// shown here.
345 TEST(Via, callbackRace) {
349 auto promises = std::make_shared<std::vector<Promise<Unit>>>(4);
350 std::vector<Future<Unit>> futures;
352 for (auto& p : *promises) {
353 futures.emplace_back(
356 .then([](Try<Unit>&&){}));
361 for (auto& p : *promises) {
366 return collectAll(futures);
// DrivableExecutor stub: add() discards the task without running it, and
// drive() only records that it was called by setting `ran`.
373 class DummyDrivableExecutor : public DrivableExecutor {
375 void add(Func /* f */) override {}
376 void drive() override { ran = true; }
384 auto f = via(&x).then([]{ return true; });
385 EXPECT_TRUE(f.getVia(&x));
391 auto f = via(&x).then();
396 DummyDrivableExecutor x;
397 auto f = makeFuture(true);
398 EXPECT_TRUE(f.getVia(&x));
// Exercises getTryVia() in three situations: a not-yet-ready future that
// yields 23, a not-yet-ready future from then() with no callback, and an
// already-completed future driven through a DummyDrivableExecutor.
403 TEST(Via, getTryVia) {
407 auto f = via(&x).then([] { return 23; });
408 EXPECT_FALSE(f.isReady());
409 EXPECT_EQ(23, f.getTryVia(&x).value());
415 auto f = via(&x).then();
416 EXPECT_FALSE(f.isReady());
417 auto t = f.getTryVia(&x);
418 EXPECT_TRUE(t.hasValue());
// The future already holds 23, so the value is expected even though the
// dummy executor never runs tasks.
422 DummyDrivableExecutor x;
423 auto f = makeFuture(23);
424 EXPECT_EQ(23, f.getTryVia(&x).value());
432 auto f = via(&x).then();
433 EXPECT_FALSE(f.isReady());
435 EXPECT_TRUE(f.isReady());
439 // try rvalue as well
441 auto f = via(&x).then().waitVia(&x);
442 EXPECT_TRUE(f.isReady());
446 DummyDrivableExecutor x;
447 makeFuture(true).waitVia(&x);
// Captures the test thread's id and expects the first two continuations to
// run on that same thread; the third continuation sets `done`.
// NOTE(review): the executor, the promise and any threads involved are on
// lines not shown here.
452 TEST(Via, viaRaces) {
455 auto tid = std::this_thread::get_id();
461 .then([&](Try<Unit>&&) { EXPECT_EQ(tid, std::this_thread::get_id()); })
462 .then([&](Try<Unit>&&) { EXPECT_EQ(tid, std::this_thread::get_id()); })
463 .then([&](Try<Unit>&&) { done = true; });
// The source future is already complete (makeFuture()) before the
// continuation is attached. Expects future.get() to throw BrokenPromise
// within the 5 second timeout.
// NOTE(review): the final line is the tail of a macro call whose opening line
// is not visible; presumably EXPECT_THROW on captured_promise_future.
477 TEST(Via, viaDummyExecutorFutureSetValueFirst) {
478 // The callback object will get destroyed when passed to the executor.
480 // A promise will be captured by the callback lambda so we can observe that
481 // it will be destroyed.
482 Promise<Unit> captured_promise;
483 auto captured_promise_future = captured_promise.getFuture();
485 DummyDrivableExecutor x;
486 auto future = makeFuture().via(&x).then(
487 [c = std::move(captured_promise)] { return 42; });
489 EXPECT_THROW(future.get(std::chrono::seconds(5)), BrokenPromise);
491 captured_promise_future.get(std::chrono::seconds(5)), BrokenPromise);
// Variant of the SetValueFirst test in which the continuation is attached to
// the future of a separate `trigger` promise. Expects future.get() to throw
// BrokenPromise within the 5 second timeout.
// NOTE(review): the line that fulfils `trigger` is not visible here, and the
// final line is the tail of a macro call whose opening line is not shown.
494 TEST(Via, viaDummyExecutorFutureSetCallbackFirst) {
495 // The callback object will get destroyed when passed to the executor.
497 // A promise will be captured by the callback lambda so we can observe that
498 // it will be destroyed.
499 Promise<Unit> captured_promise;
500 auto captured_promise_future = captured_promise.getFuture();
502 DummyDrivableExecutor x;
503 Promise<Unit> trigger;
504 auto future = trigger.getFuture().via(&x).then(
505 [c = std::move(captured_promise)] { return 42; });
508 EXPECT_THROW(future.get(std::chrono::seconds(5)), BrokenPromise);
510 captured_promise_future.get(std::chrono::seconds(5)), BrokenPromise);
// The resulting future is stored in an Optional declared before the chain is
// built, and is later expected to throw BrokenPromise from get() within the
// 5 second timeout.
// NOTE(review): the declaration of `x` is not visible; presumably it is a
// ManualExecutor in an inner scope that ends before the assertions - confirm.
513 TEST(Via, viaExecutorDiscardsTaskFutureSetValueFirst) {
514 // The callback object will get destroyed when the ManualExecutor runs out
517 // A promise will be captured by the callback lambda so we can observe that
518 // it will be destroyed.
519 Promise<Unit> captured_promise;
520 auto captured_promise_future = captured_promise.getFuture();
522 Optional<Future<int>> future;
525 future = makeFuture().via(&x).then(
526 [c = std::move(captured_promise)] { return 42; });
529 EXPECT_THROW(future->get(std::chrono::seconds(5)), BrokenPromise);
531 captured_promise_future.get(std::chrono::seconds(5)), BrokenPromise);
// Variant of the SetValueFirst test in which the continuation is attached to
// the future of a separate `trigger` promise. The resulting future is kept in
// an Optional and is expected to throw BrokenPromise from get() within the
// 5 second timeout.
// NOTE(review): the declaration of `x` and the line that fulfils `trigger`
// are not visible in this excerpt.
534 TEST(Via, viaExecutorDiscardsTaskFutureSetCallbackFirst) {
535 // The callback object will get destroyed when the ManualExecutor runs out
538 // A promise will be captured by the callback lambda so we can observe that
539 // it will be destroyed.
540 Promise<Unit> captured_promise;
541 auto captured_promise_future = captured_promise.getFuture();
543 Optional<Future<int>> future;
546 Promise<Unit> trigger;
547 future = trigger.getFuture().via(&x).then(
548 [c = std::move(captured_promise)] { return 42; });
552 EXPECT_THROW(future->get(std::chrono::seconds(5)), BrokenPromise);
554 captured_promise_future.get(std::chrono::seconds(5)), BrokenPromise);
// via(executor, func) with a lambda that returns nothing is assigned to a
// Future<Unit>.
557 TEST(ViaFunc, liftsVoid) {
560 Future<Unit> f = via(&x, [&]{ count++; });
// via(executor, func) with a lambda returning 42 is expected to yield 42 when
// retrieved with getVia() on the same executor.
567 TEST(ViaFunc, value) {
569 EXPECT_EQ(42, via(&x, []{ return 42; }).getVia(&x));
// Passes via() a lambda declared to return int that throws
// std::runtime_error("expected").
// NOTE(review): the assertion wrapping this call is on lines not shown here.
572 TEST(ViaFunc, exception) {
575 via(&x, []() -> int { throw std::runtime_error("expected"); })
// Passes via() a lambda that itself returns makeFuture(42); the result is
// compared against 42.
// NOTE(review): the tail of the EXPECT_EQ expression is not visible here.
580 TEST(ViaFunc, future) {
582 EXPECT_EQ(42, via(&x, []{ return makeFuture(42); })
// Runs a counter-incrementing lambda through via() and retrieves the result
// with getVia() on the same executor.
// NOTE(review): the declaration of `count` and any assertion on it are not
// visible in this excerpt.
586 TEST(ViaFunc, voidFuture) {
589 via(&x, [&]{ count++; }).getVia(&x);
// Creates a future with via(executor, func) and then attaches a further
// then() continuation to it; both lambdas increment `count`.
// NOTE(review): the assertions on `count` are on lines not shown here.
593 TEST(ViaFunc, isSticky) {
597 auto f = via(&x, [&]{ count++; });
600 f.then([&]{ count++; });
// Passes via() a lambda that owns a std::unique_ptr<int> through an
// init-capture, making the callable move-only; the pointed-to value 42 is
// expected back from getVia().
606 TEST(ViaFunc, moveOnly) {
608 auto intp = std::make_unique<int>(42);
610 EXPECT_EQ(42, via(&x, [intp = std::move(intp)] { return *intp; }).getVia(&x));