collectOne
authorYunqiao Zhang <yunqiao@fb.com>
Fri, 26 Aug 2016 19:53:24 +0000 (12:53 -0700)
committerFacebook Github Bot 9 <facebook-github-bot-9-bot@fb.com>
Fri, 26 Aug 2016 20:08:27 +0000 (13:08 -0700)
Summary:
The resultant future of collectOne will be fulfilled when the first one of
the future in the list completes without exception. If all input futures throws
exception, the resultant future will get the last exception that was thrown.

Reviewed By: andriigrynenko

Differential Revision: D3764760

fbshipit-source-id: 76484254e35182eddc8266865853d65c28170f82

folly/futures/Future-inl.h
folly/futures/helpers.h
folly/futures/test/CollectTest.cpp

index 0d1710066efdfb0cebd582f33e5b2e63be883fae..1cd4035e676623f641e14e01e7d5b256ca020d29 100644 (file)
@@ -695,6 +695,37 @@ collectAny(InputIterator first, InputIterator last) {
   return ctx->p.getFuture();
 }
 
+// collectAnyWithoutException (iterator)
+
+template <class InputIterator>
+Future<std::pair<
+    size_t,
+    typename std::iterator_traits<InputIterator>::value_type::value_type>>
+collectAnyWithoutException(InputIterator first, InputIterator last) {
+  typedef
+      typename std::iterator_traits<InputIterator>::value_type::value_type T;
+
+  struct CollectAnyWithoutExceptionContext {
+    CollectAnyWithoutExceptionContext(){};
+    Promise<std::pair<size_t, T>> p;
+    std::atomic<bool> done{false};
+    std::atomic<size_t> nFulfilled{0};
+    size_t nTotal;
+  };
+
+  auto ctx = std::make_shared<CollectAnyWithoutExceptionContext>();
+  ctx->nTotal = std::distance(first, last);
+
+  mapSetCallback<T>(first, last, [ctx](size_t i, Try<T>&& t) {
+    if (!t.hasException() && !ctx->done.exchange(true)) {
+      ctx->p.setValue(std::make_pair(i, std::move(t.value())));
+    } else if (++ctx->nFulfilled == ctx->nTotal) {
+      ctx->p.setException(t.exception());
+    }
+  });
+  return ctx->p.getFuture();
+}
+
 // collectN (iterator)
 
 template <class InputIterator>
index 75f925374bd22521b2a5c8fc22e6113d823f1d48..7f074a06664a5a363db0cb64508d3e7b3ae2d4c3 100644 (file)
@@ -226,6 +226,23 @@ auto collectAny(Collection&& c) -> decltype(collectAny(c.begin(), c.end())) {
   return collectAny(c.begin(), c.end());
 }
 
+/** Similar to collectAny, collectAnyWithoutException return the first Future to
+ * complete without exceptions. If none of the future complete without
+ * excpetions, the last exception will be returned as a result.
+  */
+template <class InputIterator>
+Future<std::pair<
+    size_t,
+    typename std::iterator_traits<InputIterator>::value_type::value_type>>
+collectAnyWithoutException(InputIterator first, InputIterator last);
+
+/// Sugar for the most common case
+template <class Collection>
+auto collectAnyWithoutException(Collection&& c)
+    -> decltype(collectAnyWithoutException(c.begin(), c.end())) {
+  return collectAnyWithoutException(c.begin(), c.end());
+}
+
 /** when n Futures have completed, the Future completes with a vector of
   the index and Try of those n Futures (the indices refer to the original
   order, but the result vector will be in an arbitrary order)
index 5f05a0b6044933a1c6c59e906984aa69f5564286..673161aa5c8a2dcd09812f0524ee0c498ca575b2 100644 (file)
@@ -333,6 +333,73 @@ TEST(Collect, collectAny) {
   }
 }
 
+TEST(Collect, collectAnyWithoutException) {
+  {
+    std::vector<Promise<int>> promises(10);
+    std::vector<Future<int>> futures;
+
+    for (auto& p : promises) {
+      futures.push_back(p.getFuture());
+    }
+
+    auto onef = collectAnyWithoutException(futures);
+
+    /* futures were moved in, so these are invalid now */
+    EXPECT_FALSE(onef.isReady());
+
+    promises[7].setValue(42);
+    EXPECT_TRUE(onef.isReady());
+    auto& idx_fut = onef.value();
+    EXPECT_EQ(7, idx_fut.first);
+    EXPECT_EQ(42, idx_fut.second);
+  }
+
+  // some exception before ready
+  {
+    std::vector<Promise<int>> promises(10);
+    std::vector<Future<int>> futures;
+
+    for (auto& p : promises) {
+      futures.push_back(p.getFuture());
+    }
+
+    auto onef = collectAnyWithoutException(futures);
+
+    EXPECT_FALSE(onef.isReady());
+
+    promises[3].setException(eggs);
+    EXPECT_FALSE(onef.isReady());
+    promises[4].setException(eggs);
+    EXPECT_FALSE(onef.isReady());
+    promises[0].setValue(99);
+    EXPECT_TRUE(onef.isReady());
+    auto& idx_fut = onef.value();
+    EXPECT_EQ(0, idx_fut.first);
+    EXPECT_EQ(99, idx_fut.second);
+  }
+
+  // all exceptions
+  {
+    std::vector<Promise<int>> promises(10);
+    std::vector<Future<int>> futures;
+
+    for (auto& p : promises) {
+      futures.push_back(p.getFuture());
+    }
+
+    auto onef = collectAnyWithoutException(futures);
+
+    EXPECT_FALSE(onef.isReady());
+    for (int i = 0; i < 9; ++i) {
+      promises[i].setException(eggs);
+    }
+    EXPECT_FALSE(onef.isReady());
+
+    promises[9].setException(eggs);
+    EXPECT_TRUE(onef.isReady());
+    EXPECT_TRUE(onef.hasException());
+  }
+}
 
 TEST(Collect, alreadyCompleted) {
   {