Make it easy to wrap pre-existing cob-style async apis
authorMarc Celani <marccelani@fb.com>
Mon, 3 Mar 2014 19:36:05 +0000 (11:36 -0800)
committerDave Watson <davejwatson@fb.com>
Mon, 10 Mar 2014 20:50:19 +0000 (13:50 -0700)
Summary:
Tao neesd a way to wrap its c-style async apis with Later. Although my comments suggest that you can do this, it turns out I never got around to implementing it. Well, here is the implementation.

Basically, we supply the callback to the pre-existing api, and that callback will fulfill a promise that is used internally within Later. This is thread safe because the async call is not made until the starter is fired, and we can use the future immediately for chaining then() calls.

Test Plan: unit test

Reviewed By: hannesr@fb.com

FB internal diff: D1197721

folly/wangle/Later-inl.h
folly/wangle/Later.h
folly/wangle/test/LaterTest.cpp

index e17f2d7eedd08e9a8d857d694ac4a6a8d521efc2..e20b6f6fc89e26db201fd17676379d5934a90df6 100644 (file)
@@ -68,6 +68,18 @@ Later<T>::Later(U&& input) {
   });
 }
 
+template <class T>
+template <class U, class Unused, class Unused2>
+Later<T>::Later(std::function<void(std::function<void(U&&)>&&)>&& fn) {
+  folly::MoveWrapper<Promise<U>> promise;
+  future_ = promise->getFuture();
+  starter_.getFuture().then([=](Try<void>&& t) mutable {
+    fn([=](U&& output) mutable {
+      promise->setValue(std::move(output));
+    });
+  });
+}
+
 template <class T>
 template <class F>
 typename std::enable_if<
index 49f719c12863b53a355d78f3f0e93b347c76b6be..0d2088cf93ecb279f2f6fd0dbacc68d6b8a79392 100644 (file)
@@ -32,7 +32,8 @@ template <typename T> struct isLater;
  * threadsafe manner.
  *
  * The interface to add additional work is the same as future: a then() method
- * that can take either a type T, a Future<T>, or a Later<T>
+ * that takes a function that can return either a type T, a Future<T>, or a
+ * Later<T>
  *
  * Thread transitions are done by using executors and calling the via() method.
  *
@@ -62,16 +63,44 @@ class Later {
  public:
   typedef T value_type;
 
+  /*
+   * This default constructor is used to build an asynchronous workflow that
+   * takes no input.
+   */
   template <class U = void,
             class = typename std::enable_if<std::is_void<U>::value>::type,
             class = typename std::enable_if<std::is_same<T, U>::value>::type>
   Later();
 
+  /*
+   * This constructor is used to build an asynchronous workflow that takes a
+   * value as input, and that value is passed in.
+   */
   template <class U,
             class = typename std::enable_if<!std::is_void<U>::value>::type,
             class = typename std::enable_if<std::is_same<T, U>::value>::type>
   explicit Later(U&& input);
 
+  /*
+   * This constructor is used to wrap a pre-existing cob-style asynchronous api
+   * so that it can be used in wangle in a threadsafe manner. wangle provides
+   * the callback to this pre-existing api, and this callback will fulfill a
+   * promise so as to incorporate this api into the workflow.
+   *
+   * Example usage:
+   *
+   * // This adds two ints asynchronously. cob is called in another thread.
+   * void addAsync(int a, int b, std::function<void(int&&)>&& cob);
+   *
+   * Later<int> asyncWrapper([=](std::function<void(int&&)>&& fn) {
+   *   addAsync(1, 2, std::move(fn));
+   * });
+   */
+  template <class U,
+            class = typename std::enable_if<!std::is_void<U>::value>::type,
+            class = typename std::enable_if<std::is_same<T, U>::value>::type>
+  explicit Later(std::function<void(std::function<void(U&&)>&&)>&& fn);
+
   /*
    * then() adds additional work to the end of the workflow. If the lambda
    * provided to then() returns a future, that future must be fulfilled in the
index 37d0a4bfd1cdbdcc40190d6613c3145b4dcbb767..a8243f5d095ed7d232b84c4f413165405925cd5f 100644 (file)
@@ -54,6 +54,12 @@ struct LaterFixture : public testing::Test {
     t.join();
   }
 
+  void addAsync(int a, int b, std::function<void(int&&)>&& cob) {
+    eastExecutor->add([=]() {
+      cob(a + b);
+    });
+  }
+
   Later<void> later;
   std::shared_ptr<ManualExecutor> westExecutor;
   std::shared_ptr<ManualExecutor> eastExecutor;
@@ -112,6 +118,25 @@ TEST_F(LaterFixture, thread_hops) {
   EXPECT_EQ(future.value(), 1);
 }
 
+TEST_F(LaterFixture, wrapping_preexisting_async_modules) {
+  auto westThreadId = std::this_thread::get_id();
+  std::function<void(std::function<void(int&&)>&&)> wrapper =
+    [=](std::function<void(int&&)>&& fn) {
+      addAsync(2, 2, std::move(fn));
+    };
+  auto future = Later<int>(std::move(wrapper))
+  .via(westExecutor.get())
+  .then([=](Try<int>&& t) {
+    EXPECT_EQ(std::this_thread::get_id(), westThreadId);
+    return t.value();
+  })
+  .launch();
+  while (!future.isReady()) {
+    waiter->makeProgress();
+  }
+  EXPECT_EQ(future.value(), 4);
+}
+
 TEST_F(LaterFixture, chain_laters) {
   auto westThreadId = std::this_thread::get_id();
   auto future = later.via(eastExecutor.get()).then([=](Try<void>&& t) {