SharedPromise in OutputBufferingHandler
authorJames Sedgwick <jsedgwick@fb.com>
Wed, 13 May 2015 01:42:39 +0000 (18:42 -0700)
committerViswanath Sivakumar <viswanath@fb.com>
Wed, 20 May 2015 17:56:58 +0000 (10:56 -0700)
Summary:
as above. I'm torn on whether to sugar "*this = SharedPromise<T>" as SharedPromise<T>::reset()
If I see another use case I'll probably do it

Test Plan: unit

Reviewed By: hans@fb.com

Subscribers: fugalh, folly-diffs@, jsedgwick, yfeldblum, chalfant

FB internal diff: D2064449

Signature: t1:2064449:1431476780:7113366b11feaf9e8a4ea1dc60fbafb36dd46ac5

folly/futures/SharedPromise.h
folly/wangle/channel/OutputBufferingHandler.h
folly/wangle/channel/test/OutputBufferingHandlerTest.cpp

index 5041a38ab724bab547fd13413fe481a0d0cd3ab8..c681a3f037fc6f504d9652891b3758d8631ed073 100644 (file)
@@ -83,14 +83,14 @@ public:
   template <class B = T>
   typename std::enable_if<std::is_void<B>::value, void>::type
   setValue() {
-    set(Try<T>());
+    setTry(Try<T>());
   }
 
   /// Sugar to fulfill this SharedPromise<Unit>
   template <class B = T>
   typename std::enable_if<std::is_same<Unit, B>::value, void>::type
   setValue() {
-    set(Try<T>(T()));
+    setTry(Try<T>(T()));
   }
 
   /** Set the value (use perfect forwarding for both move and copy) */
index 31abbdb08496492639a8a34cb29ce0ea68b235a0..d712b8a0a11c1c2c7e561969b56dcbee937e0d4e 100644 (file)
@@ -16,6 +16,7 @@
 
 #pragma once
 
+#include <folly/futures/SharedPromise.h>
 #include <folly/wangle/channel/Handler.h>
 #include <folly/io/async/EventBase.h>
 #include <folly/io/async/EventBaseManager.h>
@@ -48,20 +49,16 @@ class OutputBufferingHandler : public OutboundBytesToBytesHandler,
         DCHECK(isLoopCallbackScheduled());
         sends_->prependChain(std::move(buf));
       }
-      Promise<void> p;
-      auto f = p.getFuture();
-      promises_.push_back(std::move(p));
-      return f;
+      return sharedPromise_.getFuture();
     }
   }
 
   void runLoopCallback() noexcept override {
-    MoveWrapper<std::vector<Promise<void>>> promises(std::move(promises_));
+    MoveWrapper<SharedPromise<void>> sharedPromise;
+    std::swap(*sharedPromise, sharedPromise_);
     getContext()->fireWrite(std::move(sends_))
-      .then([promises](Try<void> t) mutable {
-        for (auto& p : *promises) {
-          p.setTry(Try<void>(t));
-        }
+      .then([sharedPromise](Try<void> t) mutable {
+        sharedPromise->setTry(std::move(t));
       });
   }
 
@@ -71,17 +68,15 @@ class OutputBufferingHandler : public OutboundBytesToBytesHandler,
     }
 
     // If there are sends queued, cancel them
-    for (auto& promise : promises_) {
-      promise.setException(
-        folly::make_exception_wrapper<std::runtime_error>(
-          "close() called while sends still pending"));
-    }
+    sharedPromise_.setException(
+      folly::make_exception_wrapper<std::runtime_error>(
+        "close() called while sends still pending"));
     sends_.reset();
-    promises_.clear();
+    sharedPromise_ = SharedPromise<void>();
     return ctx->fireClose();
   }
 
-  std::vector<Promise<void>> promises_;
+  SharedPromise<void> sharedPromise_;
   std::unique_ptr<IOBuf> sends_{nullptr};
   bool queueSends_{true};
 };
index 51f67275eef215c6b5d2199cfddfc20011ad9372..a0279666d543c52089ccd31356fd63af08cf705b 100644 (file)
@@ -56,4 +56,11 @@ TEST(OutputBufferingHandlerTest, Basic) {
   EXPECT_TRUE(f1.isReady());
   EXPECT_TRUE(f2.isReady());
   EXPECT_CALL(mockHandler, detachPipeline(_));
+
+ // Make sure the SharedPromise resets correctly
+  auto f = pipeline.write(IOBuf::copyBuffer("foo"));
+  EXPECT_FALSE(f.isReady());
+  EXPECT_CALL(mockHandler, write_(_, IOBufContains("foo")));
+  eb.loopOnce();
+  EXPECT_TRUE(f.isReady());
 }