OutputBufferingHandler
authorJames Sedgwick <jsedgwick@fb.com>
Mon, 17 Nov 2014 23:23:18 +0000 (15:23 -0800)
committerDave Watson <davejwatson@fb.com>
Wed, 19 Nov 2014 20:52:37 +0000 (12:52 -0800)
Summary:
see D1618704 and D1604575
Really need to make a decision about making a push for EventBaseManager everywhere. Getting/setting EBs when there's only ever one seems so silly, but in (e.g.) tests where one-off EBs are used without using the EBM, stuff like this will break.

Test Plan: definitely compiles and sorta works, will write unit tests if the dust settles

Reviewed By: davejwatson@fb.com

Subscribers: folly-diffs@, trunkagent, fugalh, njormrod

FB internal diff: D1618727

folly/experimental/wangle/channel/OutputBufferingHandler.h [new file with mode: 0644]

diff --git a/folly/experimental/wangle/channel/OutputBufferingHandler.h b/folly/experimental/wangle/channel/OutputBufferingHandler.h
new file mode 100644 (file)
index 0000000..04d12d0
--- /dev/null
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/experimental/wangle/channel/ChannelHandler.h>
+#include <folly/io/async/EventBase.h>
+#include <folly/io/async/EventBaseManager.h>
+#include <folly/io/IOBuf.h>
+#include <folly/io/IOBufQueue.h>
+
+namespace folly { namespace wangle {
+
+/*
+ * OutputBufferingHandler buffers writes in order to minimize syscalls. The
+ * transport will be written to once per event loop instead of on every write.
+ */
+class OutputBufferingHandler : public BytesToBytesHandler,
+                               protected EventBase::LoopCallback {
+ public:
+  Future<void> write(Context* ctx, std::unique_ptr<IOBuf> buf) override {
+    CHECK(buf);
+    if (!queueSends_) {
+      return ctx->fireWrite(std::move(buf));
+    } else {
+      ctx_ = ctx;
+      // Delay sends to optimize for fewer syscalls
+      if (!sends_) {
+        DCHECK(!isLoopCallbackScheduled());
+        // Buffer all the sends, and call writev once per event loop.
+        sends_ = std::move(buf);
+        ctx->getTransport()->getEventBase()->runInLoop(this);
+      } else {
+        DCHECK(isLoopCallbackScheduled());
+        sends_->prependChain(std::move(buf));
+      }
+      Promise<void> p;
+      auto f = p.getFuture();
+      promises_.push_back(std::move(p));
+      return f;
+    }
+  }
+
+  void runLoopCallback() noexcept override {
+    MoveWrapper<std::vector<Promise<void>>> promises(std::move(promises_));
+    ctx_->fireWrite(std::move(sends_)).then([promises](Try<void>&& t) mutable {
+      try {
+        t.throwIfFailed();
+        for (auto& p : *promises) {
+          p.setValue();
+        }
+      } catch (...) {
+        for (auto& p : *promises) {
+          p.setException(std::current_exception());
+        }
+      }
+    });
+  }
+
+  std::vector<Promise<void>> promises_;
+  std::unique_ptr<IOBuf> sends_{nullptr};
+  bool queueSends_{true};
+  Context* ctx_;
+};
+
+}}