unrevert "(wangle) express current Core functionality with a state machine""
authorHans Fugal <fugalh@fb.com>
Mon, 27 Oct 2014 15:53:20 +0000 (08:53 -0700)
committerdcsommer <dcsommer@fb.com>
Wed, 29 Oct 2014 23:05:12 +0000 (16:05 -0700)
Summary: Reverts D1633874. Companion to D1636490 which fixes the bug.

Test Plan:
git reverting code that was git reverted and hasn't changed in the interim
Won't be checked in without the companion bugfix diff (D1636490)

Reviewed By: davejwatson@fb.com

Subscribers: trunkagent, net-systems@, fugalh, exa, njormrod, folly-diffs@

FB internal diff: D1636487

Tasks: 5438209

Blame Revision: D1633874

folly/wangle/Future-inl.h
folly/wangle/Future.h
folly/wangle/Promise-inl.h
folly/wangle/Promise.h
folly/wangle/WangleException.h
folly/wangle/detail/Core.h
folly/wangle/test/Interrupts.cpp [new file with mode: 0644]

index e3119cf5534dac204a3d38a3a1bb476847d8259e..f462189cca9a6b0a78a2515571d17bbd1419388c 100644 (file)
@@ -206,6 +206,11 @@ bool Future<T>::isReady() const {
   return core_->ready();
 }
 
+template <class T>
+void Future<T>::raise(std::exception_ptr exception) {
+  core_->raise(exception);
+}
+
 // makeFuture
 
 template <class T>
index bcd4c6949699a9ccbc6af79d83796bc01a28cda4..f3d50ce020aedbf5bcd9b5cc5bc378d6ad718704 100644 (file)
@@ -214,6 +214,25 @@ class Future {
     return core_->isActive();
   }
 
+  template <class E>
+  void raise(E&& exception) {
+    raise(std::make_exception_ptr(std::forward<E>(exception)));
+  }
+
+  /// Raise an interrupt. If the promise holder has an interrupt
+  /// handler it will be called and potentially stop asynchronous work from
+  /// being done. This is advisory only - a promise holder may not set an
+  /// interrupt handler, or may do anything including ignore. But, if you know
+  /// your future supports this the most likely result is stopping or
+  /// preventing the asynchronous operation (if in time), and the promise
+  /// holder setting an exception on the future. (That may happen
+  /// asynchronously, of course.)
+  void raise(std::exception_ptr interrupt);
+
+  void cancel() {
+    raise(FutureCancellation());
+  }
+
  private:
   typedef detail::Core<T>* corePtr;
 
index efb8edfa08190967d361bbe3f66a62ecb0297f51..68d696411f184d37640a0024fabdfcc3a520564c 100644 (file)
@@ -88,6 +88,12 @@ void Promise<T>::setException(std::exception_ptr const& e) {
   core_->setResult(Try<T>(e));
 }
 
+template <class T>
+void Promise<T>::setInterruptHandler(
+  std::function<void(std::exception_ptr const&)> fn) {
+  core_->setInterruptHandler(std::move(fn));
+}
+
 template <class T>
 void Promise<T>::fulfilTry(Try<T>&& t) {
   throwIfFulfilled();
index 404bfb54a31d88d2dfa2ddd7499c4757154d19b2..7442e4515caa86a98b5328d3f86bbfeb1964a066 100644 (file)
@@ -56,6 +56,13 @@ public:
     */
   template <class E> void setException(E const&);
 
+  /// Set an interrupt handler to handle interrupts. See the documentation for
+  /// Future::raise(). Your handler can do whatever it wants, but if you
+  /// bother to set one then you probably will want to fulfil the promise with
+  /// an exception (or special value) indicating how the interrupt was
+  /// handled.
+  void setInterruptHandler(std::function<void(std::exception_ptr const&)>);
+
   /** Fulfil this Promise (only for Promise<void>) */
   void setValue();
 
index aec2297057db362b927edca92853ceeab81be7b6..bddf86c75157e02dd98ac43635c6e85fef950583 100644 (file)
@@ -81,4 +81,9 @@ class UsingUninitializedTry : public WangleException {
       WangleException("Using unitialized try") { }
 };
 
+class FutureCancellation : public WangleException {
+ public:
+  FutureCancellation() : WangleException("Future was cancelled") {}
+};
+
 }}
index 1cfdcc131a7835d862297fe2ab2d72697393cf0d..9b9295d89688b2d2e2ecd4b2c64600f85c432116 100644 (file)
@@ -28,6 +28,7 @@
 #include <folly/wangle/Promise.h>
 #include <folly/wangle/Future.h>
 #include <folly/wangle/Executor.h>
+#include <folly/wangle/detail/FSM.h>
 
 namespace folly { namespace wangle { namespace detail {
 
@@ -36,14 +37,21 @@ namespace folly { namespace wangle { namespace detail {
 template<typename T>
 void empty_callback(Try<T>&&) { }
 
+enum class State {
+  Waiting,
+  Interruptible,
+  Interrupted,
+  Done,
+};
+
 /** The shared state object for Future and Promise. */
 template<typename T>
-class Core {
+class Core : protected FSM<State> {
  public:
   // This must be heap-constructed. There's probably a way to enforce that in
   // code but since this is just internal detail code and I don't know how
   // off-hand, I'm punting.
-  Core() = default;
+  Core() : FSM<State>(State::Waiting) {}
   ~Core() {
     assert(calledBack_);
     assert(detached_ == 2);
@@ -67,36 +75,46 @@ class Core {
 
   template <typename F>
   void setCallback(F func) {
-    {
-      std::lock_guard<decltype(mutex_)> lock(mutex_);
-
+    auto setCallback_ = [&]{
       if (callback_) {
         throw std::logic_error("setCallback called twice");
       }
 
       callback_ = std::move(func);
-    }
-
-    maybeCallback();
+    };
+
+    FSM_START
+      case State::Waiting:
+      case State::Interruptible:
+      case State::Interrupted:
+        FSM_UPDATE(state, setCallback_);
+        break;
+
+      case State::Done:
+        FSM_UPDATE2(State::Done,
+          setCallback_,
+          [&]{ maybeCallback(); });
+        break;
+    FSM_END
   }
 
   void setResult(Try<T>&& t) {
-    {
-      std::lock_guard<decltype(mutex_)> lock(mutex_);
-
-      if (ready()) {
+    FSM_START
+      case State::Waiting:
+      case State::Interruptible:
+      case State::Interrupted:
+        FSM_UPDATE2(State::Done,
+          [&]{ result_ = std::move(t); },
+          [&]{ maybeCallback(); });
+        break;
+
+      case State::Done:
         throw std::logic_error("setResult called twice");
-      }
-
-      result_ = std::move(t);
-      assert(ready());
-    }
-
-    maybeCallback();
+    FSM_END
   }
 
   bool ready() const {
-    return result_.hasValue();
+    return getState() == State::Done;
   }
 
   // Called by a destructing Future
@@ -117,54 +135,79 @@ class Core {
   }
 
   void deactivate() {
-    std::lock_guard<decltype(mutex_)> lock(mutex_);
     active_ = false;
   }
 
   void activate() {
-    {
-      std::lock_guard<decltype(mutex_)> lock(mutex_);
-      active_ = true;
+    active_ = true;
+    if (ready()) {
+      maybeCallback();
     }
-    maybeCallback();
   }
 
   bool isActive() { return active_; }
 
   void setExecutor(Executor* x) {
-    std::lock_guard<decltype(mutex_)> lock(mutex_);
     executor_ = x;
   }
 
+  void raise(std::exception_ptr const& e) {
+    FSM_START
+      case State::Interruptible:
+        FSM_UPDATE2(State::Interrupted,
+          [&]{ interrupt_ = e; },
+          [&]{ interruptHandler_(interrupt_); });
+        break;
+
+      case State::Waiting:
+      case State::Interrupted:
+        FSM_UPDATE(State::Interrupted,
+          [&]{ interrupt_ = e; });
+        break;
+
+      case State::Done:
+        FSM_BREAK
+    FSM_END
+  }
+
+  void setInterruptHandler(std::function<void(std::exception_ptr const&)> fn) {
+    FSM_START
+      case State::Waiting:
+      case State::Interruptible:
+        FSM_UPDATE(State::Interruptible,
+          [&]{ interruptHandler_ = std::move(fn); });
+        break;
+
+      case State::Interrupted:
+        fn(interrupt_);
+        FSM_BREAK
+
+      case State::Done:
+        FSM_BREAK
+    FSM_END
+  }
+
  private:
   void maybeCallback() {
-    std::unique_lock<decltype(mutex_)> lock(mutex_);
-    if (!calledBack_ &&
-        result_ && callback_ && isActive()) {
-      // TODO(5306911) we should probably try/catch here
-      if (executor_) {
-        MoveWrapper<folly::Optional<Try<T>>> val(std::move(result_));
+    assert(ready());
+    if (!calledBack_ && isActive() && callback_) {
+      // TODO(5306911) we should probably try/catch
+      calledBack_ = true;
+      Executor* x = executor_;
+      if (x) {
         MoveWrapper<std::function<void(Try<T>&&)>> cb(std::move(callback_));
-        executor_->add([cb, val]() mutable { (*cb)(std::move(**val)); });
-        calledBack_ = true;
+        MoveWrapper<folly::Optional<Try<T>>> val(std::move(result_));
+        x->add([cb, val]() mutable { (*cb)(std::move(**val)); });
       } else {
-        calledBack_ = true;
-        lock.unlock();
         callback_(std::move(*result_));
       }
     }
   }
 
   void detachOne() {
-    bool shouldDelete;
-    {
-      std::lock_guard<decltype(mutex_)> lock(mutex_);
-      detached_++;
-      assert(detached_ == 1 || detached_ == 2);
-      shouldDelete = (detached_ == 2);
-    }
-
-    if (shouldDelete) {
+    ++detached_;
+    assert(detached_ == 1 || detached_ == 2);
+    if (detached_ == 2) {
       // we should have already executed the callback with the value
       assert(calledBack_);
       delete this;
@@ -173,15 +216,12 @@ class Core {
 
   folly::Optional<Try<T>> result_;
   std::function<void(Try<T>&&)> callback_;
-  bool calledBack_ = false;
-  unsigned char detached_ = 0;
-  bool active_ = true;
-  Executor* executor_ = nullptr;
-
-  // this lock isn't meant to protect all accesses to members, only the ones
-  // that need to be threadsafe: the act of setting result_ and callback_, and
-  // seeing if they are set and whether we should then continue.
-  folly::MicroSpinLock mutex_ {0};
+  std::atomic<bool> calledBack_ {false};
+  std::atomic<unsigned char> detached_ {0};
+  std::atomic<bool> active_ {true};
+  std::atomic<Executor*> executor_ {nullptr};
+  std::exception_ptr interrupt_;
+  std::function<void(std::exception_ptr const&)> interruptHandler_;
 };
 
 template <typename... Ts>
diff --git a/folly/wangle/test/Interrupts.cpp b/folly/wangle/test/Interrupts.cpp
new file mode 100644 (file)
index 0000000..41d5bf7
--- /dev/null
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include <folly/wangle/Future.h>
+#include <folly/wangle/Promise.h>
+
+using namespace folly::wangle;
+
+TEST(Interrupts, raise) {
+  std::runtime_error eggs("eggs");
+  Promise<void> p;
+  p.setInterruptHandler([&](std::exception_ptr e) {
+    EXPECT_THROW(std::rethrow_exception(e), decltype(eggs));
+  });
+  p.getFuture().raise(eggs);
+}
+
+TEST(Interrupts, cancel) {
+  Promise<void> p;
+  p.setInterruptHandler([&](std::exception_ptr e) {
+    EXPECT_THROW(std::rethrow_exception(e), FutureCancellation);
+  });
+  p.getFuture().cancel();
+}
+
+TEST(Interrupts, handleThenInterrupt) {
+  Promise<int> p;
+  bool flag = false;
+  p.setInterruptHandler([&](std::exception_ptr e) { flag = true; });
+  p.getFuture().cancel();
+  EXPECT_TRUE(flag);
+}
+
+TEST(Interrupts, interruptThenHandle) {
+  Promise<int> p;
+  bool flag = false;
+  p.getFuture().cancel();
+  p.setInterruptHandler([&](std::exception_ptr e) { flag = true; });
+  EXPECT_TRUE(flag);
+}
+
+TEST(Interrupts, interruptAfterFulfilNoop) {
+  Promise<void> p;
+  bool flag = false;
+  p.setInterruptHandler([&](std::exception_ptr e) { flag = true; });
+  p.setValue();
+  p.getFuture().cancel();
+  EXPECT_FALSE(flag);
+}
+
+TEST(Interrupts, secondInterruptNoop) {
+  Promise<void> p;
+  int count = 0;
+  p.setInterruptHandler([&](std::exception_ptr e) { count++; });
+  auto f = p.getFuture();
+  f.cancel();
+  f.cancel();
+  EXPECT_EQ(1, count);
+}