X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=folly%2Ffutures%2Ftest%2FViaTest.cpp;h=9acac1bc86edb62be1af06aa231f11ee01d08bd3;hb=5180b66230ee290a22bdba66f81e9f33e0216dd9;hp=5cb66827879122038f8b007a75c5eb35de37fe2d;hpb=e1c576b407c95030c8d1a063bebd27dd6dc89980;p=folly.git diff --git a/folly/futures/test/ViaTest.cpp b/folly/futures/test/ViaTest.cpp index 5cb66827..9acac1bc 100644 --- a/folly/futures/test/ViaTest.cpp +++ b/folly/futures/test/ViaTest.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2014 Facebook, Inc. + * Copyright 2015 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,19 +15,22 @@ */ #include -#include #include #include #include #include +#include +#include + +#include using namespace folly; struct ManualWaiter : public DrivableExecutor { explicit ManualWaiter(std::shared_ptr ex) : ex(ex) {} - void add(Func f) { + void add(Func f) override { ex->add(f); } @@ -47,9 +50,9 @@ struct ViaFixture : public testing::Test { done(false) { t = std::thread([=] { - ManualWaiter eastWaiter(eastExecutor); - while (!done) - eastWaiter.drive(); + ManualWaiter eastWaiter(eastExecutor); + while (!done) + eastWaiter.drive(); }); } @@ -73,12 +76,12 @@ struct ViaFixture : public testing::Test { std::thread t; }; -TEST(Via, exception_on_launch) { +TEST(Via, exceptionOnLaunch) { auto future = makeFuture(std::runtime_error("E")); EXPECT_THROW(future.value(), std::runtime_error); } -TEST(Via, then_value) { +TEST(Via, thenValue) { auto future = makeFuture(std::move(1)) .then([](Try&& t) { return t.value() == 1; @@ -88,12 +91,11 @@ TEST(Via, then_value) { EXPECT_TRUE(future.value()); } -TEST(Via, then_future) { +TEST(Via, thenFuture) { auto future = makeFuture(1) .then([](Try&& t) { return makeFuture(t.value() == 1); - }) - ; + }); EXPECT_TRUE(future.value()); } @@ -101,7 +103,7 @@ static Future doWorkStatic(Try&& t) { return makeFuture(t.value() + ";static"); } -TEST(Via, then_function) { +TEST(Via, thenFunction) { struct Worker { Future doWork(Try&& t) { return makeFuture(t.value() + ";class"); @@ -114,34 +116,13 @@ TEST(Via, then_function) { auto f = makeFuture(std::string("start")) .then(doWorkStatic) .then(Worker::doWorkStatic) - .then(&w, &Worker::doWork) + .then(&Worker::doWork, &w) ; EXPECT_EQ(f.value(), "start;static;class-static;class"); } -TEST_F(ViaFixture, deactivateChain) { - bool flag = false; - auto f = makeFuture().deactivate(); - EXPECT_FALSE(f.isActive()); - auto f2 = f.then([&](Try){ flag = true; }); - EXPECT_FALSE(flag); -} - -TEST_F(ViaFixture, deactivateActivateChain) { - bool flag = false; - // you can do this all day long with temporaries. - auto f1 = makeFuture().deactivate().activate().deactivate(); - // Chaining on activate/deactivate requires an rvalue, so you have to move - // one of these two ways (if you're not using a temporary). - auto f2 = std::move(f1).activate(); - f2.deactivate(); - auto f3 = std::move(f2.activate()); - f3.then([&](Try){ flag = true; }); - EXPECT_TRUE(flag); -} - -TEST_F(ViaFixture, thread_hops) { +TEST_F(ViaFixture, threadHops) { auto westThreadId = std::this_thread::get_id(); auto f = via(eastExecutor.get()).then([=](Try&& t) { EXPECT_NE(std::this_thread::get_id(), westThreadId); @@ -154,24 +135,30 @@ TEST_F(ViaFixture, thread_hops) { EXPECT_EQ(f.getVia(waiter.get()), 1); } -TEST_F(ViaFixture, chain_vias) { +TEST_F(ViaFixture, chainVias) { auto westThreadId = std::this_thread::get_id(); - auto f = via(eastExecutor.get()).then([=](Try&& t) { + auto f = via(eastExecutor.get()).then([=]() { EXPECT_NE(std::this_thread::get_id(), westThreadId); - return makeFuture(1); - }).then([=](Try&& t) { - int val = t.value(); - return makeFuture(std::move(val)).via(westExecutor.get()) - .then([=](Try&& t) mutable { + return 1; + }).then([=](int val) { + return makeFuture(val).via(westExecutor.get()) + .then([=](int val) mutable { EXPECT_EQ(std::this_thread::get_id(), westThreadId); - return t.value(); + return val + 1; }); - }).then([=](Try&& t) { + }).then([=](int val) { + // even though ultimately the future that triggers this one executed in + // the west thread, this then() inherited the executor from its + // predecessor, ie the eastExecutor. + EXPECT_NE(std::this_thread::get_id(), westThreadId); + return val + 1; + }).via(westExecutor.get()).then([=](int val) { + // go back to west, so we can wait on it EXPECT_EQ(std::this_thread::get_id(), westThreadId); - return t.value(); + return val + 1; }); - EXPECT_EQ(f.getVia(waiter.get()), 1); + EXPECT_EQ(f.getVia(waiter.get()), 4); } TEST_F(ViaFixture, bareViaAssignment) { @@ -187,16 +174,243 @@ TEST_F(ViaFixture, viaAssignment) { TEST(Via, chain1) { EXPECT_EQ(42, makeFuture() - .then(futures::chain([] { return 42; })) + .thenMulti([] { return 42; }) .get()); } TEST(Via, chain3) { int count = 0; - auto f = makeFuture().then(futures::chain( + auto f = makeFuture().thenMulti( [&]{ count++; return 3.14159; }, [&](double) { count++; return std::string("hello"); }, - [&]{ count++; return makeFuture(42); })); + [&]{ count++; return makeFuture(42); }); EXPECT_EQ(42, f.get()); EXPECT_EQ(3, count); } + +struct PriorityExecutor : public Executor { + void add(Func f) override {} + + void addWithPriority(Func, int8_t priority) override { + int mid = getNumPriorities() / 2; + int p = priority < 0 ? + std::max(0, mid + priority) : + std::min(getNumPriorities() - 1, mid + priority); + EXPECT_LT(p, 3); + EXPECT_GE(p, 0); + if (p == 0) { + count0++; + } else if (p == 1) { + count1++; + } else if (p == 2) { + count2++; + } + } + + uint8_t getNumPriorities() const override { + return 3; + } + + int count0{0}; + int count1{0}; + int count2{0}; +}; + +TEST(Via, priority) { + PriorityExecutor exe; + via(&exe, -1).then([]{}); + via(&exe, 0).then([]{}); + via(&exe, 1).then([]{}); + via(&exe, 42).then([]{}); // overflow should go to max priority + via(&exe, -42).then([]{}); // underflow should go to min priority + via(&exe).then([]{}); // default to mid priority + via(&exe, Executor::LO_PRI).then([]{}); + via(&exe, Executor::HI_PRI).then([]{}); + EXPECT_EQ(3, exe.count0); + EXPECT_EQ(2, exe.count1); + EXPECT_EQ(3, exe.count2); +} + +TEST_F(ViaFixture, chainX1) { + EXPECT_EQ(42, + makeFuture() + .thenMultiWithExecutor(eastExecutor.get(),[] { return 42; }) + .get()); +} + +TEST_F(ViaFixture, chainX3) { + auto westThreadId = std::this_thread::get_id(); + int count = 0; + auto f = via(westExecutor.get()).thenMultiWithExecutor( + eastExecutor.get(), + [&]{ + EXPECT_NE(std::this_thread::get_id(), westThreadId); + count++; return 3.14159; + }, + [&](double) { count++; return std::string("hello"); }, + [&]{ count++; }) + .then([&](){ + EXPECT_EQ(std::this_thread::get_id(), westThreadId); + return makeFuture(42); + }); + EXPECT_EQ(42, f.getVia(waiter.get())); + EXPECT_EQ(3, count); +} + +TEST(Via, then2) { + ManualExecutor x1, x2; + bool a = false, b = false, c = false; + via(&x1) + .then([&]{ a = true; }) + .then(&x2, [&]{ b = true; }) + .then([&]{ c = true; }); + + EXPECT_FALSE(a); + EXPECT_FALSE(b); + + x1.run(); + EXPECT_TRUE(a); + EXPECT_FALSE(b); + EXPECT_FALSE(c); + + x2.run(); + EXPECT_TRUE(b); + EXPECT_FALSE(c); + + x1.run(); + EXPECT_TRUE(c); +} + +TEST(Via, then2Variadic) { + struct Foo { bool a = false; void foo(Try) { a = true; } }; + Foo f; + ManualExecutor x; + makeFuture().then(&x, &Foo::foo, &f); + EXPECT_FALSE(f.a); + x.run(); + EXPECT_TRUE(f.a); +} + +/// Simple executor that does work in another thread +class ThreadExecutor : public Executor { + folly::MPMCQueue funcs; + std::atomic done {false}; + std::thread worker; + folly::Baton<> baton; + + void work() { + baton.post(); + Func fn; + while (!done) { + while (!funcs.isEmpty()) { + funcs.blockingRead(fn); + fn(); + } + } + } + + public: + explicit ThreadExecutor(size_t n = 1024) + : funcs(n) { + worker = std::thread(std::bind(&ThreadExecutor::work, this)); + } + + ~ThreadExecutor() { + done = true; + funcs.write([]{}); + worker.join(); + } + + void add(Func fn) override { + funcs.blockingWrite(std::move(fn)); + } + + void waitForStartup() { + baton.wait(); + } +}; + +TEST(Via, viaThenGetWasRacy) { + ThreadExecutor x; + std::unique_ptr val = folly::via(&x) + .then([] { return folly::make_unique(42); }) + .get(); + ASSERT_TRUE(!!val); + EXPECT_EQ(42, *val); +} + +class DummyDrivableExecutor : public DrivableExecutor { + public: + void add(Func f) override {} + void drive() override { ran = true; } + bool ran{false}; +}; + +TEST(Via, getVia) { + { + // non-void + ManualExecutor x; + auto f = via(&x).then([]{ return true; }); + EXPECT_TRUE(f.getVia(&x)); + } + + { + // void + ManualExecutor x; + auto f = via(&x).then(); + f.getVia(&x); + } + + { + DummyDrivableExecutor x; + auto f = makeFuture(true); + EXPECT_TRUE(f.getVia(&x)); + EXPECT_FALSE(x.ran); + } +} + +TEST(Via, waitVia) { + { + ManualExecutor x; + auto f = via(&x).then(); + EXPECT_FALSE(f.isReady()); + f.waitVia(&x); + EXPECT_TRUE(f.isReady()); + } + + { + // try rvalue as well + ManualExecutor x; + auto f = via(&x).then().waitVia(&x); + EXPECT_TRUE(f.isReady()); + } + + { + DummyDrivableExecutor x; + makeFuture(true).waitVia(&x); + EXPECT_FALSE(x.ran); + } +} + +TEST(Via, viaRaces) { + ManualExecutor x; + Promise p; + auto tid = std::this_thread::get_id(); + bool done = false; + + std::thread t1([&] { + p.getFuture() + .via(&x) + .then([&](Try&&) { EXPECT_EQ(tid, std::this_thread::get_id()); }) + .then([&](Try&&) { EXPECT_EQ(tid, std::this_thread::get_id()); }) + .then([&](Try&&) { done = true; }); + }); + + std::thread t2([&] { + p.setValue(); + }); + + while (!done) x.run(); + t1.join(); + t2.join(); +}