Timed wait for futures
authorMatt Dordal <mnd@fb.com>
Tue, 3 Jun 2014 17:57:38 +0000 (10:57 -0700)
committerAnton Likhtarov <alikhtarov@fb.com>
Mon, 9 Jun 2014 22:35:49 +0000 (15:35 -0700)
Summary:
It might be useful to be able to wait for some time (but not forever) on a
future, so this is a shot at doing that. It's a very heavyweight implementation, however.

Since the current interface for waitWithSemaphore doesn't really make sense if
the timeout fires, change it to return a Future<T>.

Test Plan: unit tests

Reviewed By: hans@fb.com

Subscribers: trunkagent, folly@lists, fugalh

FB internal diff: D1358230

folly/wangle/Future-inl.h
folly/wangle/Future.h
folly/wangle/test/FutureTest.cpp

index df5831db8905d77e67aa3174b110ec8379f6602b..933cbe6eae1c6876f6de0c531aa2ce45667fa375 100644 (file)
@@ -419,29 +419,67 @@ whenN(InputIterator first, InputIterator last, size_t n) {
   return ctx->p.getFuture();
 }
 
-template <typename F>
-typename F::value_type
-waitWithSemaphore(F&& f) {
+template <typename T>
+Future<T>
+waitWithSemaphore(Future<T>&& f) {
   LifoSem sem;
-  Try<typename F::value_type> done;
-  f.then([&](Try<typename F::value_type> &&t) {
-    done = std::move(t);
+  auto done = f.then([&](Try<T> &&t) {
     sem.post();
+    return std::move(t.value());
   });
   sem.wait();
-  return std::move(done.value());
+  return done;
 }
 
-inline void waitWithSemaphore(Future<void>&& f) {
+template<>
+inline Future<void> waitWithSemaphore<void>(Future<void>&& f) {
   LifoSem sem;
-  Try<void> done;
-  f.then([&](Try<void> &&t) {
-    done = std::move(t);
+  auto done = f.then([&](Try<void> &&t) {
     sem.post();
+    t.value();
   });
   sem.wait();
-  return done.value();
+  return done;
+}
+
+template <typename T, class Duration>
+Future<T>
+waitWithSemaphore(Future<T>&& f, Duration timeout) {
+  auto sem = std::make_shared<LifoSem>();
+  auto done = f.then([sem](Try<T> &&t) {
+    sem->post();
+    return std::move(t.value());
+  });
+  std::thread t([sem, timeout](){
+    std::this_thread::sleep_for(timeout);
+    sem->shutdown();
+    });
+  t.detach();
+  try {
+    sem->wait();
+  } catch (ShutdownSemError & ign) { }
+  return done;
 }
+
+template <class Duration>
+Future<void>
+waitWithSemaphore(Future<void>&& f, Duration timeout) {
+  auto sem = std::make_shared<LifoSem>();
+  auto done = f.then([sem](Try<void> &&t) {
+    sem->post();
+    t.value();
+  });
+  std::thread t([sem, timeout](){
+    std::this_thread::sleep_for(timeout);
+    sem->shutdown();
+    });
+  t.detach();
+  try {
+    sem->wait();
+  } catch (ShutdownSemError & ign) { }
+  return done;
+}
+
 }}
 
 // I haven't included a Future<T&> specialization because I don't forsee us
index e7d0f415832c647a7e99aad9069dc704dd14fb05..0c8662159e2b5b7b9ecb7d691bcc922a7edd1a33 100644 (file)
@@ -313,15 +313,23 @@ Future<std::vector<std::pair<
   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>>
 whenN(InputIterator first, InputIterator last, size_t n);
 
-/** Wait for the given future to complete on a semaphore. Returns the result of
- * the given future.
+/** Wait for the given future to complete on a semaphore. Returns a completed
+ * future containing the result.
  *
  * NB if the promise for the future would be fulfilled in the same thread that
  * you call this, it will deadlock.
  */
-template <class F>
-typename F::value_type
-waitWithSemaphore(F&& f);
+template <class T>
+Future<T> waitWithSemaphore(Future<T>&& f);
+
+/** Wait for up to `timeout` for the given future to complete. Returns a future
+ * which may or may not be completed depending whether the given future
+ * completed in time
+ *
+ * Note: each call to this starts a (short-lived) thread and allocates memory.
+ */
+template <typename T, class Duration>
+Future<T> waitWithSemaphore(Future<T>&& f, Duration timeout);
 
 }} // folly::wangle
 
index 84977c7437285ed9e4322d84392b478ee525545e..bc6f209b3a4844776aee943b228b05f964e8b8c5 100644 (file)
@@ -632,25 +632,25 @@ TEST(Future, throwIfFailed) {
 
 TEST(Future, waitWithSemaphoreImmediate) {
   waitWithSemaphore(makeFuture());
-  auto done = waitWithSemaphore(makeFuture(42));
+  auto done = waitWithSemaphore(makeFuture(42)).value();
   EXPECT_EQ(42, done);
 
   vector<int> v{1,2,3};
-  auto done_v = waitWithSemaphore(makeFuture(v));
+  auto done_v = waitWithSemaphore(makeFuture(v)).value();
   EXPECT_EQ(v.size(), done_v.size());
   EXPECT_EQ(v, done_v);
 
   vector<Future<void>> v_f;
   v_f.push_back(makeFuture());
   v_f.push_back(makeFuture());
-  auto done_v_f = waitWithSemaphore(whenAll(v_f.begin(), v_f.end()));
+  auto done_v_f = waitWithSemaphore(whenAll(v_f.begin(), v_f.end())).value();
   EXPECT_EQ(2, done_v_f.size());
 
   vector<Future<bool>> v_fb;
   v_fb.push_back(makeFuture(true));
   v_fb.push_back(makeFuture(false));
   auto fut = whenAll(v_fb.begin(), v_fb.end());
-  auto done_v_fb = waitWithSemaphore(std::move(fut));
+  auto done_v_fb = std::move(waitWithSemaphore(std::move(fut)).value());
   EXPECT_EQ(2, done_v_fb.size());
 }
 
@@ -667,7 +667,7 @@ TEST(Future, waitWithSemaphore) {
           return t.value();
         });
       flag = true;
-      result.store(waitWithSemaphore(std::move(n)));
+      result.store(waitWithSemaphore(std::move(n)).value());
       LOG(INFO) << result;
     },
     std::move(f)
@@ -681,3 +681,63 @@ TEST(Future, waitWithSemaphore) {
   EXPECT_EQ(id, std::this_thread::get_id());
   EXPECT_EQ(result.load(), 42);
 }
+
+TEST(Future, waitWithSemaphoreForTime) {
+ {
+  Promise<int> p;
+  Future<int> f = p.getFuture();
+  auto t = waitWithSemaphore(std::move(f),
+    std::chrono::microseconds(1));
+  EXPECT_FALSE(t.isReady());
+  p.setValue(1);
+  EXPECT_TRUE(t.isReady());
+ }
+ {
+  Promise<int> p;
+  Future<int> f = p.getFuture();
+  p.setValue(1);
+  auto t = waitWithSemaphore(std::move(f),
+    std::chrono::milliseconds(1));
+  EXPECT_TRUE(t.isReady());
+ }
+ {
+  vector<Future<bool>> v_fb;
+  v_fb.push_back(makeFuture(true));
+  v_fb.push_back(makeFuture(false));
+  auto f = whenAll(v_fb.begin(), v_fb.end());
+  auto t = waitWithSemaphore(std::move(f),
+    std::chrono::milliseconds(1));
+  EXPECT_TRUE(t.isReady());
+  EXPECT_EQ(2, t.value().size());
+ }
+ {
+  vector<Future<bool>> v_fb;
+  Promise<bool> p1;
+  Promise<bool> p2;
+  v_fb.push_back(p1.getFuture());
+  v_fb.push_back(p2.getFuture());
+  auto f = whenAll(v_fb.begin(), v_fb.end());
+  auto t = waitWithSemaphore(std::move(f),
+    std::chrono::milliseconds(1));
+  EXPECT_FALSE(t.isReady());
+  p1.setValue(true);
+  EXPECT_FALSE(t.isReady());
+  p2.setValue(true);
+  EXPECT_TRUE(t.isReady());
+ }
+ {
+  Promise<int> p;
+  Future<int> f = p.getFuture();
+  auto begin = std::chrono::system_clock::now();
+  auto t = waitWithSemaphore(std::move(f),
+    std::chrono::milliseconds(1));
+  auto end = std::chrono::system_clock::now();
+  EXPECT_TRUE( end - begin < std::chrono::milliseconds(2));
+  EXPECT_FALSE(t.isReady());
+ }
+ {
+  auto t = waitWithSemaphore(makeFuture(),
+    std::chrono::milliseconds(1));
+  EXPECT_TRUE(t.isReady());
+ }
+}