return ctx->p.getFuture();
}
-template <typename F>
-typename F::value_type
-waitWithSemaphore(F&& f) {
+template <typename T>
+Future<T>
+waitWithSemaphore(Future<T>&& f) {
LifoSem sem;
- Try<typename F::value_type> done;
- f.then([&](Try<typename F::value_type> &&t) {
- done = std::move(t);
+ auto done = f.then([&](Try<T> &&t) {
sem.post();
+ return std::move(t.value());
});
sem.wait();
- return std::move(done.value());
+ return done;
}
-inline void waitWithSemaphore(Future<void>&& f) {
+template<>
+inline Future<void> waitWithSemaphore<void>(Future<void>&& f) {
LifoSem sem;
- Try<void> done;
- f.then([&](Try<void> &&t) {
- done = std::move(t);
+ auto done = f.then([&](Try<void> &&t) {
sem.post();
+ t.value();
});
sem.wait();
- return done.value();
+ return done;
+}
+
+template <typename T, class Duration>
+Future<T>
+waitWithSemaphore(Future<T>&& f, Duration timeout) {
+ auto sem = std::make_shared<LifoSem>();
+ auto done = f.then([sem](Try<T> &&t) {
+ sem->post();
+ return std::move(t.value());
+ });
+ std::thread t([sem, timeout](){
+ std::this_thread::sleep_for(timeout);
+ sem->shutdown();
+ });
+ t.detach();
+ try {
+ sem->wait();
+ } catch (ShutdownSemError & ign) { }
+ return done;
}
+
+template <class Duration>
+Future<void>
+waitWithSemaphore(Future<void>&& f, Duration timeout) {
+ auto sem = std::make_shared<LifoSem>();
+ auto done = f.then([sem](Try<void> &&t) {
+ sem->post();
+ t.value();
+ });
+ std::thread t([sem, timeout](){
+ std::this_thread::sleep_for(timeout);
+ sem->shutdown();
+ });
+ t.detach();
+ try {
+ sem->wait();
+ } catch (ShutdownSemError & ign) { }
+ return done;
+}
+
}}
// I haven't included a Future<T&> specialization because I don't forsee us
Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>>
whenN(InputIterator first, InputIterator last, size_t n);
-/** Wait for the given future to complete on a semaphore. Returns the result of
- * the given future.
+/** Wait for the given future to complete on a semaphore. Returns a completed
+ * future containing the result.
*
* NB if the promise for the future would be fulfilled in the same thread that
* you call this, it will deadlock.
*/
-template <class F>
-typename F::value_type
-waitWithSemaphore(F&& f);
+template <class T>
+Future<T> waitWithSemaphore(Future<T>&& f);
+
+/** Wait for up to `timeout` for the given future to complete. Returns a future
+ * which may or may not be completed depending whether the given future
+ * completed in time
+ *
+ * Note: each call to this starts a (short-lived) thread and allocates memory.
+ */
+template <typename T, class Duration>
+Future<T> waitWithSemaphore(Future<T>&& f, Duration timeout);
}} // folly::wangle
TEST(Future, waitWithSemaphoreImmediate) {
waitWithSemaphore(makeFuture());
- auto done = waitWithSemaphore(makeFuture(42));
+ auto done = waitWithSemaphore(makeFuture(42)).value();
EXPECT_EQ(42, done);
vector<int> v{1,2,3};
- auto done_v = waitWithSemaphore(makeFuture(v));
+ auto done_v = waitWithSemaphore(makeFuture(v)).value();
EXPECT_EQ(v.size(), done_v.size());
EXPECT_EQ(v, done_v);
vector<Future<void>> v_f;
v_f.push_back(makeFuture());
v_f.push_back(makeFuture());
- auto done_v_f = waitWithSemaphore(whenAll(v_f.begin(), v_f.end()));
+ auto done_v_f = waitWithSemaphore(whenAll(v_f.begin(), v_f.end())).value();
EXPECT_EQ(2, done_v_f.size());
vector<Future<bool>> v_fb;
v_fb.push_back(makeFuture(true));
v_fb.push_back(makeFuture(false));
auto fut = whenAll(v_fb.begin(), v_fb.end());
- auto done_v_fb = waitWithSemaphore(std::move(fut));
+ auto done_v_fb = std::move(waitWithSemaphore(std::move(fut)).value());
EXPECT_EQ(2, done_v_fb.size());
}
return t.value();
});
flag = true;
- result.store(waitWithSemaphore(std::move(n)));
+ result.store(waitWithSemaphore(std::move(n)).value());
LOG(INFO) << result;
},
std::move(f)
EXPECT_EQ(id, std::this_thread::get_id());
EXPECT_EQ(result.load(), 42);
}
+
+TEST(Future, waitWithSemaphoreForTime) {
+ {
+ Promise<int> p;
+ Future<int> f = p.getFuture();
+ auto t = waitWithSemaphore(std::move(f),
+ std::chrono::microseconds(1));
+ EXPECT_FALSE(t.isReady());
+ p.setValue(1);
+ EXPECT_TRUE(t.isReady());
+ }
+ {
+ Promise<int> p;
+ Future<int> f = p.getFuture();
+ p.setValue(1);
+ auto t = waitWithSemaphore(std::move(f),
+ std::chrono::milliseconds(1));
+ EXPECT_TRUE(t.isReady());
+ }
+ {
+ vector<Future<bool>> v_fb;
+ v_fb.push_back(makeFuture(true));
+ v_fb.push_back(makeFuture(false));
+ auto f = whenAll(v_fb.begin(), v_fb.end());
+ auto t = waitWithSemaphore(std::move(f),
+ std::chrono::milliseconds(1));
+ EXPECT_TRUE(t.isReady());
+ EXPECT_EQ(2, t.value().size());
+ }
+ {
+ vector<Future<bool>> v_fb;
+ Promise<bool> p1;
+ Promise<bool> p2;
+ v_fb.push_back(p1.getFuture());
+ v_fb.push_back(p2.getFuture());
+ auto f = whenAll(v_fb.begin(), v_fb.end());
+ auto t = waitWithSemaphore(std::move(f),
+ std::chrono::milliseconds(1));
+ EXPECT_FALSE(t.isReady());
+ p1.setValue(true);
+ EXPECT_FALSE(t.isReady());
+ p2.setValue(true);
+ EXPECT_TRUE(t.isReady());
+ }
+ {
+ Promise<int> p;
+ Future<int> f = p.getFuture();
+ auto begin = std::chrono::system_clock::now();
+ auto t = waitWithSemaphore(std::move(f),
+ std::chrono::milliseconds(1));
+ auto end = std::chrono::system_clock::now();
+ EXPECT_TRUE( end - begin < std::chrono::milliseconds(2));
+ EXPECT_FALSE(t.isReady());
+ }
+ {
+ auto t = waitWithSemaphore(makeFuture(),
+ std::chrono::milliseconds(1));
+ EXPECT_TRUE(t.isReady());
+ }
+}