From: James Sedgwick Date: Mon, 17 Nov 2014 23:23:18 +0000 (-0800) Subject: OutputBufferingHandler X-Git-Tag: v0.22.0~169 X-Git-Url: http://plrg.eecs.uci.edu/git/?a=commitdiff_plain;h=c3ee57ef0e487f62f4461e3073c3c75f17dc250f;p=folly.git OutputBufferingHandler Summary: see D1618704 and D1604575 Really need to make a decision about making a push for EventBaseManager everywhere. Getting/setting EBs when there's only ever one seems so silly, but in (e.g.) tests where one-off EBs are used without using the EBM, stuff like this will break. Test Plan: definitely compiles and sorta works, will write unit tests if the dust settles Reviewed By: davejwatson@fb.com Subscribers: folly-diffs@, trunkagent, fugalh, njormrod FB internal diff: D1618727 --- diff --git a/folly/experimental/wangle/channel/OutputBufferingHandler.h b/folly/experimental/wangle/channel/OutputBufferingHandler.h new file mode 100644 index 00000000..04d12d00 --- /dev/null +++ b/folly/experimental/wangle/channel/OutputBufferingHandler.h @@ -0,0 +1,79 @@ +/* + * Copyright 2014 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +namespace folly { namespace wangle { + +/* + * OutputBufferingHandler buffers writes in order to minimize syscalls. The + * transport will be written to once per event loop instead of on every write. + */ +class OutputBufferingHandler : public BytesToBytesHandler, + protected EventBase::LoopCallback { + public: + Future write(Context* ctx, std::unique_ptr buf) override { + CHECK(buf); + if (!queueSends_) { + return ctx->fireWrite(std::move(buf)); + } else { + ctx_ = ctx; + // Delay sends to optimize for fewer syscalls + if (!sends_) { + DCHECK(!isLoopCallbackScheduled()); + // Buffer all the sends, and call writev once per event loop. + sends_ = std::move(buf); + ctx->getTransport()->getEventBase()->runInLoop(this); + } else { + DCHECK(isLoopCallbackScheduled()); + sends_->prependChain(std::move(buf)); + } + Promise p; + auto f = p.getFuture(); + promises_.push_back(std::move(p)); + return f; + } + } + + void runLoopCallback() noexcept override { + MoveWrapper>> promises(std::move(promises_)); + ctx_->fireWrite(std::move(sends_)).then([promises](Try&& t) mutable { + try { + t.throwIfFailed(); + for (auto& p : *promises) { + p.setValue(); + } + } catch (...) { + for (auto& p : *promises) { + p.setException(std::current_exception()); + } + } + }); + } + + std::vector> promises_; + std::unique_ptr sends_{nullptr}; + bool queueSends_{true}; + Context* ctx_; +}; + +}}