(wangle) Interrupts (and therefore, cancellation)
authorHans Fugal <fugalh@fb.com>
Tue, 21 Oct 2014 17:24:10 +0000 (10:24 -0700)
committerdcsommer <dcsommer@fb.com>
Wed, 29 Oct 2014 23:03:07 +0000 (16:03 -0700)
Summary:
Modeled very closely after Finagle's interrupts. Compare with https://github.com/twitter/util/blob/master/util-core/src/main/scala/com/twitter/util/Promise.scala if you like.
The basic idea is the promise holder can register an interrupt handler, and then interrupts will call that handler. A typical handler would fulfil the promise with an exception (or special value) indicating that it was interrupted (if it was interrupted in time).
Raising an interrupt does not prevent setting a value or callbacks executing or any of that - it is only advisory to the promise holder.

Test Plan: I wrote some unit tests.

Reviewed By: davejwatson@fb.com

Subscribers: folly-diffs@, net-systems@, fugalh, exa, hannesr, njormrod

FB internal diff: D1620805

Tasks: 4618297

folly/wangle/Future-inl.h
folly/wangle/Future.h
folly/wangle/Promise-inl.h
folly/wangle/Promise.h
folly/wangle/WangleException.h
folly/wangle/detail/Core.h
folly/wangle/test/Interrupts.cpp [new file with mode: 0644]

index 9db9ff5f5ae1803690a6683d743d3907871272e2..5615c7d571076304b0b8c578da69ba9e73c6af2e 100644 (file)
@@ -206,6 +206,11 @@ bool Future<T>::isReady() const {
   return core_->ready();
 }
 
+template <class T>
+void Future<T>::raise(std::exception_ptr exception) {
+  core_->raise(exception);
+}
+
 // makeFuture
 
 template <class T>
index 3f46034bf0a02c59422bf47b339b8d531326e475..4f0348ce3ff6119d7bd8d6bd19e9d5d9879c3027 100644 (file)
@@ -208,6 +208,25 @@ class Future {
     return core_->isActive();
   }
 
+  template <class E>
+  void raise(E&& exception) {
+    raise(std::make_exception_ptr(std::forward<E>(exception)));
+  }
+
+  /// Raise an interrupt. If the promise holder has an interrupt
+  /// handler it will be called and potentially stop asynchronous work from
+  /// being done. This is advisory only - a promise holder may not set an
+  /// interrupt handler, or may do anything including ignore. But, if you know
+  /// your future supports this the most likely result is stopping or
+  /// preventing the asynchronous operation (if in time), and the promise
+  /// holder setting an exception on the future. (That may happen
+  /// asynchronously, of course.)
+  void raise(std::exception_ptr interrupt);
+
+  void cancel() {
+    raise(FutureCancellation());
+  }
+
  private:
   typedef detail::Core<T>* corePtr;
 
index efb8edfa08190967d361bbe3f66a62ecb0297f51..68d696411f184d37640a0024fabdfcc3a520564c 100644 (file)
@@ -88,6 +88,12 @@ void Promise<T>::setException(std::exception_ptr const& e) {
   core_->setResult(Try<T>(e));
 }
 
+template <class T>
+void Promise<T>::setInterruptHandler(
+  std::function<void(std::exception_ptr const&)> fn) {
+  core_->setInterruptHandler(std::move(fn));
+}
+
 template <class T>
 void Promise<T>::fulfilTry(Try<T>&& t) {
   throwIfFulfilled();
index a4eb94b1f51a837694e1af90c8f4b05663a21848..61bc069938327a6e37d4daee82f225391fbf2751 100644 (file)
@@ -57,6 +57,13 @@ public:
     */
   template <class E> void setException(E const&);
 
+  /// Set an interrupt handler to handle interrupts. See the documentation for
+  /// Future::raise(). Your handler can do whatever it wants, but if you
+  /// bother to set one then you probably will want to fulfil the promise with
+  /// an exception (or special value) indicating how the interrupt was
+  /// handled.
+  void setInterruptHandler(std::function<void(std::exception_ptr const&)>);
+
   /** Fulfil this Promise (only for Promise<void>) */
   void setValue();
 
index aec2297057db362b927edca92853ceeab81be7b6..bddf86c75157e02dd98ac43635c6e85fef950583 100644 (file)
@@ -81,4 +81,9 @@ class UsingUninitializedTry : public WangleException {
       WangleException("Using unitialized try") { }
 };
 
+class FutureCancellation : public WangleException {
+ public:
+  FutureCancellation() : WangleException("Future was cancelled") {}
+};
+
 }}
index ee60e65c65d936464583c35740b20bc0dd2011bf..d45b985d4358328243384c7c5a4e47229670bbcd 100644 (file)
@@ -39,6 +39,8 @@ void empty_callback(Try<T>&&) { }
 
 enum class State {
   Waiting,
+  Interruptible,
+  Interrupted,
   Done,
 };
 
@@ -81,36 +83,34 @@ class Core : protected FSM<State> {
       callback_ = std::move(func);
     };
 
-    bool done = false;
-    while (!done) {
-      switch (getState()) {
+    FSM_START
       case State::Waiting:
-        done = updateState(State::Waiting, State::Waiting, setCallback_);
+      case State::Interruptible:
+      case State::Interrupted:
+        FSM_UPDATE(state, setCallback_);
         break;
 
       case State::Done:
-        done = updateState(State::Done, State::Done,
-                           setCallback_,
-                           [&]{ maybeCallback(); });
+        FSM_UPDATE2(State::Done,
+          setCallback_,
+          [&]{ maybeCallback(); });
         break;
-      }
-    }
+    FSM_END
   }
 
   void setResult(Try<T>&& t) {
-    bool done = false;
-    while (!done) {
-      switch (getState()) {
+    FSM_START
       case State::Waiting:
-        done = updateState(State::Waiting, State::Done,
+      case State::Interruptible:
+      case State::Interrupted:
+        FSM_UPDATE2(State::Done,
           [&]{ result_ = std::move(t); },
           [&]{ maybeCallback(); });
         break;
 
       case State::Done:
         throw std::logic_error("setResult called twice");
-      }
-    }
+    FSM_END
   }
 
   bool ready() const {
@@ -151,6 +151,42 @@ class Core : protected FSM<State> {
     executor_ = x;
   }
 
+  void raise(std::exception_ptr const& e) {
+    FSM_START
+      case State::Interruptible:
+        FSM_UPDATE2(State::Interrupted,
+          [&]{ interrupt_ = e; },
+          [&]{ interruptHandler_(interrupt_); });
+        break;
+
+      case State::Waiting:
+      case State::Interrupted:
+        FSM_UPDATE(State::Interrupted,
+          [&]{ interrupt_ = e; });
+        break;
+
+      case State::Done:
+        FSM_BREAK
+    FSM_END
+  }
+
+  void setInterruptHandler(std::function<void(std::exception_ptr const&)> fn) {
+    FSM_START
+      case State::Waiting:
+      case State::Interruptible:
+        FSM_UPDATE(State::Interruptible,
+          [&]{ interruptHandler_ = std::move(fn); });
+        break;
+
+      case State::Interrupted:
+        fn(interrupt_);
+        FSM_BREAK
+
+      case State::Done:
+        FSM_BREAK
+    FSM_END
+  }
+
  private:
   void maybeCallback() {
     assert(ready());
@@ -183,6 +219,8 @@ class Core : protected FSM<State> {
   std::atomic<unsigned char> detached_ {0};
   std::atomic<bool> active_ {true};
   std::atomic<Executor*> executor_ {nullptr};
+  std::exception_ptr interrupt_;
+  std::function<void(std::exception_ptr const&)> interruptHandler_;
 };
 
 template <typename... Ts>
diff --git a/folly/wangle/test/Interrupts.cpp b/folly/wangle/test/Interrupts.cpp
new file mode 100644 (file)
index 0000000..41d5bf7
--- /dev/null
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include <folly/wangle/Future.h>
+#include <folly/wangle/Promise.h>
+
+using namespace folly::wangle;
+
+TEST(Interrupts, raise) {
+  std::runtime_error eggs("eggs");
+  Promise<void> p;
+  p.setInterruptHandler([&](std::exception_ptr e) {
+    EXPECT_THROW(std::rethrow_exception(e), decltype(eggs));
+  });
+  p.getFuture().raise(eggs);
+}
+
+TEST(Interrupts, cancel) {
+  Promise<void> p;
+  p.setInterruptHandler([&](std::exception_ptr e) {
+    EXPECT_THROW(std::rethrow_exception(e), FutureCancellation);
+  });
+  p.getFuture().cancel();
+}
+
+TEST(Interrupts, handleThenInterrupt) {
+  Promise<int> p;
+  bool flag = false;
+  p.setInterruptHandler([&](std::exception_ptr e) { flag = true; });
+  p.getFuture().cancel();
+  EXPECT_TRUE(flag);
+}
+
+TEST(Interrupts, interruptThenHandle) {
+  Promise<int> p;
+  bool flag = false;
+  p.getFuture().cancel();
+  p.setInterruptHandler([&](std::exception_ptr e) { flag = true; });
+  EXPECT_TRUE(flag);
+}
+
+TEST(Interrupts, interruptAfterFulfilNoop) {
+  Promise<void> p;
+  bool flag = false;
+  p.setInterruptHandler([&](std::exception_ptr e) { flag = true; });
+  p.setValue();
+  p.getFuture().cancel();
+  EXPECT_FALSE(flag);
+}
+
+TEST(Interrupts, secondInterruptNoop) {
+  Promise<void> p;
+  int count = 0;
+  p.setInterruptHandler([&](std::exception_ptr e) { count++; });
+  auto f = p.getFuture();
+  f.cancel();
+  f.cancel();
+  EXPECT_EQ(1, count);
+}