SharedPromise
authorJames Sedgwick <jsedgwick@fb.com>
Tue, 12 May 2015 15:14:47 +0000 (08:14 -0700)
committerViswanath Sivakumar <viswanath@fb.com>
Wed, 20 May 2015 17:56:55 +0000 (10:56 -0700)
Summary: I tried two "smart" ways (deriving from Promise, encapsulating a Promise) and got nothing but trouble. The KISS principle is applied with gusto in this diff.

Test Plan: unit, integrating in 3+ places in separate diffs

Reviewed By: hans@fb.com

Subscribers: craffert, trunkagent, fugalh, folly-diffs@, jsedgwick, yfeldblum, chalfant

FB internal diff: D2035528

Signature: t1:2035528:1431393438:4e554cd30fa531d75b9267dccaade6dc516f2b15

folly/Makefile.am
folly/futures/Promise-inl.h
folly/futures/Promise.h
folly/futures/SharedPromise-inl.h [new file with mode: 0644]
folly/futures/SharedPromise.h [new file with mode: 0644]
folly/futures/Try-inl.h
folly/futures/Try.h
folly/futures/test/SharedPromiseTest.cpp [new file with mode: 0644]
folly/futures/test/Try.cpp
folly/wangle/channel/OutputBufferingHandler.h

index ad215492296154bea5207e0f7fde886884ed0178..92d56b7399f6105a6e995490e9c999e79f0e4272 100644 (file)
@@ -133,6 +133,8 @@ nobase_follyinclude_HEADERS = \
        futures/Promise.h \
        futures/QueuedImmediateExecutor.h \
        futures/ScheduledExecutor.h \
+       futures/SharedPromise.h \
+       futures/SharedPromise-inl.h \
        futures/Timekeeper.h \
        futures/Try-inl.h \
        futures/Try.h \
index 4792b57dfe56044cd39c42aaa15d14a8a1af4330..b855d064e90a9b826b3459989d9029de9d38527c 100644 (file)
@@ -109,7 +109,7 @@ void Promise<T>::setInterruptHandler(
 }
 
 template <class T>
-void Promise<T>::setTry(Try<T> t) {
+void Promise<T>::setTry(Try<T>&& t) {
   throwIfFulfilled();
   core_->setResult(std::move(t));
 }
index 560f4dc7d418e087e303d2b96404ad55e90f7996..0ee979ea440f9705fbfead76837696611c0201de 100644 (file)
@@ -88,7 +88,7 @@ public:
   template <class M>
   void setValue(M&& value);
 
-  void setTry(Try<T> t);
+  void setTry(Try<T>&& t);
 
   /** Fulfill this Promise with the result of a function that takes no
     arguments and returns something implicitly convertible to T.
diff --git a/folly/futures/SharedPromise-inl.h b/folly/futures/SharedPromise-inl.h
new file mode 100644 (file)
index 0000000..2c09a2a
--- /dev/null
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+namespace folly {
+
+template <class T>
+SharedPromise<T>::SharedPromise(SharedPromise<T>&& other) noexcept {
+  *this = std::move(other);
+}
+
+template <class T>
+SharedPromise<T>& SharedPromise<T>::operator=(
+    SharedPromise<T>&& other) noexcept {
+  if (this == &other) {
+    return *this;
+  }
+
+  // std::lock will perform deadlock avoidance, in case
+  // Thread A: p1 = std::move(p2)
+  // Thread B: p2 = std::move(p1)
+  // race each other
+  std::lock(mutex_, other.mutex_);
+  std::lock_guard<std::mutex> g1(mutex_, std::adopt_lock);
+  std::lock_guard<std::mutex> g2(other.mutex_, std::adopt_lock);
+
+  std::swap(size_, other.size_);
+  std::swap(hasValue_, other.hasValue_);
+  std::swap(try_, other.try_);
+  std::swap(promises_, other.promises_);
+
+  return *this;
+}
+
+template <class T>
+size_t SharedPromise<T>::size() {
+  std::lock_guard<std::mutex> g(mutex_);
+  return size_;
+}
+
+template <class T>
+Future<T> SharedPromise<T>::getFuture() {
+  std::lock_guard<std::mutex> g(mutex_);
+  size_++;
+  if (hasValue_) {
+    return makeFuture<T>(Try<T>(try_));
+  } else {
+    promises_.emplace_back();
+    return promises_.back().getFuture();
+  }
+}
+
+template <class T>
+template <class E>
+typename std::enable_if<std::is_base_of<std::exception, E>::value>::type
+SharedPromise<T>::setException(E const& e) {
+  setTry(Try<T>(e));
+}
+
+template <class T>
+void SharedPromise<T>::setException(std::exception_ptr const& ep) {
+  setTry(Try<T>(ep));
+}
+
+template <class T>
+void SharedPromise<T>::setException(exception_wrapper ew) {
+  setTry(Try<T>(std::move(ew)));
+}
+
+template <class T>
+void SharedPromise<T>::setInterruptHandler(
+    std::function<void(exception_wrapper const&)> fn) {
+  std::lock_guard<std::mutex> g(mutex_);
+  if (hasValue_) {
+    return;
+  }
+  for (auto& p : promises_) {
+    p.setInterruptHandler(fn);
+  }
+}
+
+template <class T>
+template <class M>
+void SharedPromise<T>::setValue(M&& v) {
+  setTry(Try<T>(std::forward<M>(v)));
+}
+
+template <class T>
+template <class F>
+void SharedPromise<T>::setWith(F&& func) {
+  setTry(makeTryFunction(std::forward<F>(func)));
+}
+
+template <class T>
+void SharedPromise<T>::setTry(Try<T>&& t) {
+  std::vector<Promise<T>> promises;
+
+  {
+    std::lock_guard<std::mutex> g(mutex_);
+    if (hasValue_) {
+      throw PromiseAlreadySatisfied();
+    }
+    hasValue_ = true;
+    try_ = std::move(t);
+    promises.swap(promises_);
+  }
+
+  for (auto& p : promises) {
+    p.setTry(Try<T>(try_));
+  }
+}
+
+}
diff --git a/folly/futures/SharedPromise.h b/folly/futures/SharedPromise.h
new file mode 100644 (file)
index 0000000..5041a38
--- /dev/null
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/futures/Promise.h>
+
+namespace folly {
+
+/*
+ * SharedPromise provides the same interface as Promise, but you can extract
+ * multiple Futures from it, i.e. you can call getFuture() as many times as
+ * you'd like. When the SharedPromise is fulfilled, all of the Futures will be
+ * called back. Calls to getFuture() after the SharedPromise is fulfilled return
+ * a completed Future. If you find yourself constructing collections of Promises
+ * and fulfilling them simultaneously with the same value, consider this
+ * utility instead. Likewise, if you find yourself in need of setting multiple
+ * callbacks on the same Future (which is indefinitely unsupported), consider
+ * refactoring to use SharedPromise to "split" the Future.
+ */
+template <class T>
+class SharedPromise {
+public:
+  SharedPromise() = default;
+  ~SharedPromise() = default;
+
+  // not copyable
+  SharedPromise(SharedPromise const&) = delete;
+  SharedPromise& operator=(SharedPromise const&) = delete;
+
+  // movable
+  SharedPromise(SharedPromise<T>&&) noexcept;
+  SharedPromise& operator=(SharedPromise<T>&&) noexcept;
+
+  /** Return a Future tied to the shared core state. This can be called only
+    once, thereafter Future already retrieved exception will be raised. */
+  Future<T> getFuture();
+
+  /** Return the number of Futures associated with this SharedPromise */
+  size_t size();
+
+  /** Fulfill the SharedPromise with an exception_wrapper */
+  void setException(exception_wrapper ew);
+
+  /** Fulfill the SharedPromise with an exception_ptr, e.g.
+    try {
+      ...
+    } catch (...) {
+      p.setException(std::current_exception());
+    }
+    */
+  void setException(std::exception_ptr const&) DEPRECATED;
+
+  /** Fulfill the SharedPromise with an exception type E, which can be passed to
+    std::make_exception_ptr(). Useful for originating exceptions. If you
+    caught an exception the exception_wrapper form is more appropriate.
+    */
+  template <class E>
+  typename std::enable_if<std::is_base_of<std::exception, E>::value>::type
+  setException(E const&);
+
+  /// Set an interrupt handler to handle interrupts. See the documentation for
+  /// Future::raise(). Your handler can do whatever it wants, but if you
+  /// bother to set one then you probably will want to fulfill the SharedPromise with
+  /// an exception (or special value) indicating how the interrupt was
+  /// handled.
+  void setInterruptHandler(std::function<void(exception_wrapper const&)>);
+
+  /// Fulfill this SharedPromise<void>
+  template <class B = T>
+  typename std::enable_if<std::is_void<B>::value, void>::type
+  setValue() {
+    set(Try<T>());
+  }
+
+  /// Sugar to fulfill this SharedPromise<Unit>
+  template <class B = T>
+  typename std::enable_if<std::is_same<Unit, B>::value, void>::type
+  setValue() {
+    set(Try<T>(T()));
+  }
+
+  /** Set the value (use perfect forwarding for both move and copy) */
+  template <class M>
+  void setValue(M&& value);
+
+  void setTry(Try<T>&& t);
+
+  /** Fulfill this SharedPromise with the result of a function that takes no
+    arguments and returns something implicitly convertible to T.
+    Captures exceptions. e.g.
+
+    p.setWith([] { do something that may throw; return a T; });
+  */
+  template <class F>
+  void setWith(F&& func);
+
+private:
+  std::mutex mutex_;
+  size_t size_{0};
+  bool hasValue_{false};
+  Try<T> try_;
+  std::vector<Promise<T>> promises_;
+};
+
+}
+
+#include <folly/futures/Future.h>
+#include <folly/futures/SharedPromise-inl.h>
index 13e403896c568cbd7c453e76441484ebfade1de5..192873116cf53856bada4973451ca9143fe13449 100644 (file)
@@ -23,7 +23,7 @@
 namespace folly {
 
 template <class T>
-Try<T>::Try(Try<T>&& t) : contains_(t.contains_) {
+Try<T>::Try(Try<T>&& t) noexcept : contains_(t.contains_) {
   if (contains_ == Contains::VALUE) {
     new (&value_)T(std::move(t.value_));
   } else if (contains_ == Contains::EXCEPTION) {
@@ -32,7 +32,11 @@ Try<T>::Try(Try<T>&& t) : contains_(t.contains_) {
 }
 
 template <class T>
-Try<T>& Try<T>::operator=(Try<T>&& t) {
+Try<T>& Try<T>::operator=(Try<T>&& t) noexcept {
+  if (this == &t) {
+    return *this;
+  }
+
   this->~Try();
   contains_ = t.contains_;
   if (contains_ == Contains::VALUE) {
@@ -43,6 +47,36 @@ Try<T>& Try<T>::operator=(Try<T>&& t) {
   return *this;
 }
 
+template <class T>
+Try<T>::Try(const Try<T>& t) {
+  static_assert(
+      std::is_copy_constructible<T>::value,
+      "T must be copyable for Try<T> to be copyable");
+  contains_ = t.contains_;
+  if (contains_ == Contains::VALUE) {
+    new (&value_)T(t.value_);
+  } else if (contains_ == Contains::EXCEPTION) {
+    new (&e_)std::unique_ptr<exception_wrapper>();
+    e_ = folly::make_unique<exception_wrapper>(*(t.e_));
+  }
+}
+
+template <class T>
+Try<T>& Try<T>::operator=(const Try<T>& t) {
+  static_assert(
+      std::is_copy_constructible<T>::value,
+      "T must be copyable for Try<T> to be copyable");
+  this->~Try();
+  contains_ = t.contains_;
+  if (contains_ == Contains::VALUE) {
+    new (&value_)T(t.value_);
+  } else if (contains_ == Contains::EXCEPTION) {
+    new (&e_)std::unique_ptr<exception_wrapper>();
+    e_ = folly::make_unique<exception_wrapper>(*(t.e_));
+  }
+  return *this;
+}
+
 template <class T>
 Try<T>::~Try() {
   if (contains_ == Contains::VALUE) {
index 1fd13c025a1baf1725ebd41f3868e67a36412e7f..9e791dc3f6eab6188057b8c73364a0a19c0a60bd 100644 (file)
@@ -101,14 +101,14 @@ class Try {
   }
 
   // Move constructor
-  Try(Try<T>&& t);
+  Try(Try<T>&& t) noexcept;
   // Move assigner
-  Try& operator=(Try<T>&& t);
+  Try& operator=(Try<T>&& t) noexcept;
 
-  // Non-copyable
-  Try(const Try<T>& t) = delete;
-  // Non-copyable
-  Try& operator=(const Try<T>& t) = delete;
+  // Copy constructor
+  Try(const Try& t);
+  // Copy assigner
+  Try& operator=(const Try& t);
 
   ~Try();
 
diff --git a/folly/futures/test/SharedPromiseTest.cpp b/folly/futures/test/SharedPromiseTest.cpp
new file mode 100644 (file)
index 0000000..280bb64
--- /dev/null
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <folly/futures/SharedPromise.h>
+#include <gtest/gtest.h>
+
+using namespace folly;
+
+TEST(SharedPromise, SetGet) {
+  SharedPromise<int> p;
+  p.setValue(1);
+  auto f1 = p.getFuture();
+  auto f2 = p.getFuture();
+  EXPECT_EQ(1, f1.value());
+  EXPECT_EQ(1, f2.value());
+}
+TEST(SharedPromise, GetSet) {
+  SharedPromise<int> p;
+  auto f1 = p.getFuture();
+  auto f2 = p.getFuture();
+  p.setValue(1);
+  EXPECT_EQ(1, f1.value());
+  EXPECT_EQ(1, f2.value());
+}
+
+TEST(SharedPromise, GetSetGet) {
+  SharedPromise<int> p;
+  auto f1 = p.getFuture();
+  p.setValue(1);
+  auto f2 = p.getFuture();
+  EXPECT_EQ(1, f1.value());
+  EXPECT_EQ(1, f2.value());
+}
+
+TEST(SharedPromise, Reset) {
+  SharedPromise<int> p;
+
+  auto f1 = p.getFuture();
+  p.setValue(1);
+  EXPECT_EQ(1, f1.value());
+
+  p = SharedPromise<int>();
+  auto f2 = p.getFuture();
+  EXPECT_FALSE(f2.isReady());
+  p.setValue(2);
+  EXPECT_EQ(2, f2.value());
+}
+
+TEST(SharedPromise, GetMoveSet) {
+  SharedPromise<int> p;
+  auto f = p.getFuture();
+  auto p2 = std::move(p);
+  p2.setValue(1);
+  EXPECT_EQ(1, f.value());
+}
+
+TEST(SharedPromise, SetMoveGet) {
+  SharedPromise<int> p;
+  p.setValue(1);
+  auto p2 = std::move(p);
+  auto f = p2.getFuture();
+  EXPECT_EQ(1, f.value());
+}
+
+TEST(SharedPromise, MoveSetGet) {
+  SharedPromise<int> p;
+  auto p2 = std::move(p);
+  p2.setValue(1);
+  auto f = p2.getFuture();
+  EXPECT_EQ(1, f.value());
+}
+
+TEST(SharedPromise, MoveGetSet) {
+  SharedPromise<int> p;
+  auto p2 = std::move(p);
+  auto f = p2.getFuture();
+  p2.setValue(1);
+  EXPECT_EQ(1, f.value());
+}
+
+TEST(SharedPromise, MoveMove) {
+  SharedPromise<std::shared_ptr<int>> p;
+  auto f1 = p.getFuture();
+  auto f2 = p.getFuture();
+  auto p2 = std::move(p);
+  p = std::move(p2);
+  p.setValue(std::make_shared<int>(1));
+}
index 7782c531201ae846d97124f376288d7b5ffa1caa..69a3e0daea2ac4d15ddaacb39f3e23a16c89472d 100644 (file)
 
 using namespace folly;
 
+// Make sure we can copy Trys for copyable types
+TEST(Try, copy) {
+  Try<int> t;
+  auto t2 = t;
+}
+
+// But don't choke on move-only types
+TEST(Try, moveOnly) {
+  Try<std::unique_ptr<int>> t;
+  std::vector<Try<std::unique_ptr<int>>> v;
+  v.reserve(10);
+}
+
 TEST(Try, makeTryFunction) {
   auto func = []() {
     return folly::make_unique<int>(1);
index 48d8aa51b1794ae6780573bc36298c6ee1d4be5a..31abbdb08496492639a8a34cb29ce0ea68b235a0 100644 (file)
@@ -60,7 +60,7 @@ class OutputBufferingHandler : public OutboundBytesToBytesHandler,
     getContext()->fireWrite(std::move(sends_))
       .then([promises](Try<void> t) mutable {
         for (auto& p : *promises) {
-          p.setTry(t);
+          p.setTry(Try<void>(t));
         }
       });
   }