From a91af7b36690a227816b515f3f32e999dcb23e40 Mon Sep 17 00:00:00 2001 From: Matt Dordal Date: Tue, 3 Jun 2014 10:57:38 -0700 Subject: [PATCH] Timed wait for futures Summary: It might be useful to be able to wait for some time (but not forever) on a future, so this is a shot at doing that. It's a very heavyweight implementation, however. Since the current interface for waitWithSemaphore doesn't really make sense if the timeout fires, change it to return a Future. Test Plan: unit tests Reviewed By: hans@fb.com Subscribers: trunkagent, folly@lists, fugalh FB internal diff: D1358230 --- folly/wangle/Future-inl.h | 62 ++++++++++++++++++++++------ folly/wangle/Future.h | 18 +++++--- folly/wangle/test/FutureTest.cpp | 70 +++++++++++++++++++++++++++++--- 3 files changed, 128 insertions(+), 22 deletions(-) diff --git a/folly/wangle/Future-inl.h b/folly/wangle/Future-inl.h index df5831db..933cbe6e 100644 --- a/folly/wangle/Future-inl.h +++ b/folly/wangle/Future-inl.h @@ -419,29 +419,67 @@ whenN(InputIterator first, InputIterator last, size_t n) { return ctx->p.getFuture(); } -template -typename F::value_type -waitWithSemaphore(F&& f) { +template +Future +waitWithSemaphore(Future&& f) { LifoSem sem; - Try done; - f.then([&](Try &&t) { - done = std::move(t); + auto done = f.then([&](Try &&t) { sem.post(); + return std::move(t.value()); }); sem.wait(); - return std::move(done.value()); + return done; } -inline void waitWithSemaphore(Future&& f) { +template<> +inline Future waitWithSemaphore(Future&& f) { LifoSem sem; - Try done; - f.then([&](Try &&t) { - done = std::move(t); + auto done = f.then([&](Try &&t) { sem.post(); + t.value(); }); sem.wait(); - return done.value(); + return done; +} + +template +Future +waitWithSemaphore(Future&& f, Duration timeout) { + auto sem = std::make_shared(); + auto done = f.then([sem](Try &&t) { + sem->post(); + return std::move(t.value()); + }); + std::thread t([sem, timeout](){ + std::this_thread::sleep_for(timeout); + sem->shutdown(); + }); + t.detach(); + try { + sem->wait(); + } catch (ShutdownSemError & ign) { } + return done; } + +template +Future +waitWithSemaphore(Future&& f, Duration timeout) { + auto sem = std::make_shared(); + auto done = f.then([sem](Try &&t) { + sem->post(); + t.value(); + }); + std::thread t([sem, timeout](){ + std::this_thread::sleep_for(timeout); + sem->shutdown(); + }); + t.detach(); + try { + sem->wait(); + } catch (ShutdownSemError & ign) { } + return done; +} + }} // I haven't included a Future specialization because I don't forsee us diff --git a/folly/wangle/Future.h b/folly/wangle/Future.h index e7d0f415..0c866215 100644 --- a/folly/wangle/Future.h +++ b/folly/wangle/Future.h @@ -313,15 +313,23 @@ Future::value_type::value_type>>>> whenN(InputIterator first, InputIterator last, size_t n); -/** Wait for the given future to complete on a semaphore. Returns the result of - * the given future. +/** Wait for the given future to complete on a semaphore. Returns a completed + * future containing the result. * * NB if the promise for the future would be fulfilled in the same thread that * you call this, it will deadlock. */ -template -typename F::value_type -waitWithSemaphore(F&& f); +template +Future waitWithSemaphore(Future&& f); + +/** Wait for up to `timeout` for the given future to complete. Returns a future + * which may or may not be completed depending whether the given future + * completed in time + * + * Note: each call to this starts a (short-lived) thread and allocates memory. + */ +template +Future waitWithSemaphore(Future&& f, Duration timeout); }} // folly::wangle diff --git a/folly/wangle/test/FutureTest.cpp b/folly/wangle/test/FutureTest.cpp index 84977c74..bc6f209b 100644 --- a/folly/wangle/test/FutureTest.cpp +++ b/folly/wangle/test/FutureTest.cpp @@ -632,25 +632,25 @@ TEST(Future, throwIfFailed) { TEST(Future, waitWithSemaphoreImmediate) { waitWithSemaphore(makeFuture()); - auto done = waitWithSemaphore(makeFuture(42)); + auto done = waitWithSemaphore(makeFuture(42)).value(); EXPECT_EQ(42, done); vector v{1,2,3}; - auto done_v = waitWithSemaphore(makeFuture(v)); + auto done_v = waitWithSemaphore(makeFuture(v)).value(); EXPECT_EQ(v.size(), done_v.size()); EXPECT_EQ(v, done_v); vector> v_f; v_f.push_back(makeFuture()); v_f.push_back(makeFuture()); - auto done_v_f = waitWithSemaphore(whenAll(v_f.begin(), v_f.end())); + auto done_v_f = waitWithSemaphore(whenAll(v_f.begin(), v_f.end())).value(); EXPECT_EQ(2, done_v_f.size()); vector> v_fb; v_fb.push_back(makeFuture(true)); v_fb.push_back(makeFuture(false)); auto fut = whenAll(v_fb.begin(), v_fb.end()); - auto done_v_fb = waitWithSemaphore(std::move(fut)); + auto done_v_fb = std::move(waitWithSemaphore(std::move(fut)).value()); EXPECT_EQ(2, done_v_fb.size()); } @@ -667,7 +667,7 @@ TEST(Future, waitWithSemaphore) { return t.value(); }); flag = true; - result.store(waitWithSemaphore(std::move(n))); + result.store(waitWithSemaphore(std::move(n)).value()); LOG(INFO) << result; }, std::move(f) @@ -681,3 +681,63 @@ TEST(Future, waitWithSemaphore) { EXPECT_EQ(id, std::this_thread::get_id()); EXPECT_EQ(result.load(), 42); } + +TEST(Future, waitWithSemaphoreForTime) { + { + Promise p; + Future f = p.getFuture(); + auto t = waitWithSemaphore(std::move(f), + std::chrono::microseconds(1)); + EXPECT_FALSE(t.isReady()); + p.setValue(1); + EXPECT_TRUE(t.isReady()); + } + { + Promise p; + Future f = p.getFuture(); + p.setValue(1); + auto t = waitWithSemaphore(std::move(f), + std::chrono::milliseconds(1)); + EXPECT_TRUE(t.isReady()); + } + { + vector> v_fb; + v_fb.push_back(makeFuture(true)); + v_fb.push_back(makeFuture(false)); + auto f = whenAll(v_fb.begin(), v_fb.end()); + auto t = waitWithSemaphore(std::move(f), + std::chrono::milliseconds(1)); + EXPECT_TRUE(t.isReady()); + EXPECT_EQ(2, t.value().size()); + } + { + vector> v_fb; + Promise p1; + Promise p2; + v_fb.push_back(p1.getFuture()); + v_fb.push_back(p2.getFuture()); + auto f = whenAll(v_fb.begin(), v_fb.end()); + auto t = waitWithSemaphore(std::move(f), + std::chrono::milliseconds(1)); + EXPECT_FALSE(t.isReady()); + p1.setValue(true); + EXPECT_FALSE(t.isReady()); + p2.setValue(true); + EXPECT_TRUE(t.isReady()); + } + { + Promise p; + Future f = p.getFuture(); + auto begin = std::chrono::system_clock::now(); + auto t = waitWithSemaphore(std::move(f), + std::chrono::milliseconds(1)); + auto end = std::chrono::system_clock::now(); + EXPECT_TRUE( end - begin < std::chrono::milliseconds(2)); + EXPECT_FALSE(t.isReady()); + } + { + auto t = waitWithSemaphore(makeFuture(), + std::chrono::milliseconds(1)); + EXPECT_TRUE(t.isReady()); + } +} -- 2.34.1