removing non-existing file from the build
[folly.git] / folly / wangle / channel / OutputBufferingHandler.h
index e5ca99aee94bbc01fe8fda8af8b970ad7751fde0..d712b8a0a11c1c2c7e561969b56dcbee937e0d4e 100644 (file)
@@ -16,7 +16,8 @@
 
 #pragma once
 
-#include <folly/wangle/channel/ChannelHandler.h>
+#include <folly/futures/SharedPromise.h>
+#include <folly/wangle/channel/Handler.h>
 #include <folly/io/async/EventBase.h>
 #include <folly/io/async/EventBaseManager.h>
 #include <folly/io/IOBuf.h>
@@ -27,8 +28,10 @@ namespace folly { namespace wangle {
 /*
  * OutputBufferingHandler buffers writes in order to minimize syscalls. The
  * transport will be written to once per event loop instead of on every write.
+ *
+ * This handler may only be used in a single Pipeline.
  */
-class OutputBufferingHandler : public BytesToBytesHandler,
+class OutputBufferingHandler : public OutboundBytesToBytesHandler,
                                protected EventBase::LoopCallback {
  public:
   Future<void> write(Context* ctx, std::unique_ptr<IOBuf> buf) override {
@@ -36,7 +39,6 @@ class OutputBufferingHandler : public BytesToBytesHandler,
     if (!queueSends_) {
       return ctx->fireWrite(std::move(buf));
     } else {
-      ctx_ = ctx;
       // Delay sends to optimize for fewer syscalls
       if (!sends_) {
         DCHECK(!isLoopCallbackScheduled());
@@ -47,20 +49,17 @@ class OutputBufferingHandler : public BytesToBytesHandler,
         DCHECK(isLoopCallbackScheduled());
         sends_->prependChain(std::move(buf));
       }
-      Promise<void> p;
-      auto f = p.getFuture();
-      promises_.push_back(std::move(p));
-      return f;
+      return sharedPromise_.getFuture();
     }
   }
 
   void runLoopCallback() noexcept override {
-    MoveWrapper<std::vector<Promise<void>>> promises(std::move(promises_));
-    ctx_->fireWrite(std::move(sends_)).then([promises](Try<void> t) mutable {
-      for (auto& p : *promises) {
-        p.setTry(t);
-      }
-    });
+    MoveWrapper<SharedPromise<void>> sharedPromise;
+    std::swap(*sharedPromise, sharedPromise_);
+    getContext()->fireWrite(std::move(sends_))
+      .then([sharedPromise](Try<void> t) mutable {
+        sharedPromise->setTry(std::move(t));
+      });
   }
 
   Future<void> close(Context* ctx) override {
@@ -69,20 +68,17 @@ class OutputBufferingHandler : public BytesToBytesHandler,
     }
 
     // If there are sends queued, cancel them
-    for (auto& promise : promises_) {
-      promise.setException(
-        folly::make_exception_wrapper<std::runtime_error>(
-          "close() called while sends still pending"));
-    }
+    sharedPromise_.setException(
+      folly::make_exception_wrapper<std::runtime_error>(
+        "close() called while sends still pending"));
     sends_.reset();
-    promises_.clear();
+    sharedPromise_ = SharedPromise<void>();
     return ctx->fireClose();
   }
 
-  std::vector<Promise<void>> promises_;
+  SharedPromise<void> sharedPromise_;
   std::unique_ptr<IOBuf> sends_{nullptr};
   bool queueSends_{true};
-  Context* ctx_;
 };
 
 }}