#pragma once
-#include <folly/wangle/channel/ChannelHandler.h>
+#include <folly/futures/SharedPromise.h>
+#include <folly/wangle/channel/Handler.h>
#include <folly/io/async/EventBase.h>
#include <folly/io/async/EventBaseManager.h>
#include <folly/io/IOBuf.h>
/*
* OutputBufferingHandler buffers writes in order to minimize syscalls. The
* transport will be written to once per event loop instead of on every write.
+ *
+ * This handler may only be used in a single Pipeline.
*/
-class OutputBufferingHandler : public BytesToBytesHandler,
+class OutputBufferingHandler : public OutboundBytesToBytesHandler,
protected EventBase::LoopCallback {
public:
Future<void> write(Context* ctx, std::unique_ptr<IOBuf> buf) override {
if (!queueSends_) {
return ctx->fireWrite(std::move(buf));
} else {
- ctx_ = ctx;
// Delay sends to optimize for fewer syscalls
if (!sends_) {
DCHECK(!isLoopCallbackScheduled());
DCHECK(isLoopCallbackScheduled());
sends_->prependChain(std::move(buf));
}
- Promise<void> p;
- auto f = p.getFuture();
- promises_.push_back(std::move(p));
- return f;
+ return sharedPromise_.getFuture();
}
}
void runLoopCallback() noexcept override {
- MoveWrapper<std::vector<Promise<void>>> promises(std::move(promises_));
- ctx_->fireWrite(std::move(sends_)).then([promises](Try<void> t) mutable {
- for (auto& p : *promises) {
- p.setTry(t);
- }
- });
+ MoveWrapper<SharedPromise<void>> sharedPromise;
+ std::swap(*sharedPromise, sharedPromise_);
+ getContext()->fireWrite(std::move(sends_))
+ .then([sharedPromise](Try<void> t) mutable {
+ sharedPromise->setTry(std::move(t));
+ });
}
Future<void> close(Context* ctx) override {
}
// If there are sends queued, cancel them
- for (auto& promise : promises_) {
- promise.setException(
- folly::make_exception_wrapper<std::runtime_error>(
- "close() called while sends still pending"));
- }
+ sharedPromise_.setException(
+ folly::make_exception_wrapper<std::runtime_error>(
+ "close() called while sends still pending"));
sends_.reset();
- promises_.clear();
+ sharedPromise_ = SharedPromise<void>();
return ctx->fireClose();
}
- std::vector<Promise<void>> promises_;
+ SharedPromise<void> sharedPromise_;
std::unique_ptr<IOBuf> sends_{nullptr};
bool queueSends_{true};
- Context* ctx_;
};
}}