2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <folly/wangle/channel/Handler.h>
20 #include <folly/io/async/AsyncSocket.h>
21 #include <folly/io/async/EventBase.h>
22 #include <folly/io/async/EventBaseManager.h>
23 #include <folly/io/IOBuf.h>
24 #include <folly/io/IOBufQueue.h>
26 namespace folly { namespace wangle {
28 // This handler may only be used in a single Pipeline
// Pipeline handler that wraps an AsyncSocket.
// It derives publicly from both folly::wangle::BytesToBytesHandler and
// AsyncSocket::ReadCallback, so the same object is a pipeline handler and the
// object passed to the socket as its read callback (see attachReadCallback()).
// NOTE(review): the embedded listing numbers skip after this head, so an
// access specifier may be elided here -- confirm against the full file.
29 class AsyncSocketHandler
30 : public folly::wangle::BytesToBytesHandler,
31 public AsyncSocket::ReadCallback {
// Constructs a handler around `socket`.
// The shared_ptr is taken by value and moved into socket_, so the handler
// holds a shared reference to the socket. `explicit` prevents implicit
// conversion from std::shared_ptr<AsyncSocket>.
33 explicit AsyncSocketHandler(
34 std::shared_ptr<AsyncSocket> socket)
35 : socket_(std::move(socket)) {}
// Defaulted move constructor (member-wise move).
// NOTE(review): attachReadCallback() hands `this` to the socket; if a handler
// were moved after that, the socket would presumably still hold the address of
// the moved-from object -- confirm handlers are only moved before attaching.
37 AsyncSocketHandler(AsyncSocketHandler&&) = default;
// Destructor.
// NOTE(review): the body is not visible in this listing (embedded numbers
// 40-44 are absent). Presumably it detaches this object from the socket's read
// callback so the socket is not left pointing at a destroyed handler --
// confirm against the full file.
39 ~AsyncSocketHandler() {
// Sets the socket's read callback based on the socket's state:
// this handler is installed when socket_->good() is true, otherwise the
// callback is cleared (nullptr is passed).
// socket_ is dereferenced without a null check.
45 void attachReadCallback() {
46 socket_->setReadCB(socket_->good() ? this : nullptr);
// Clears the socket's read callback, but only when the callback currently
// installed on the socket is this handler; any other callback is left alone.
// socket_ is dereferenced without a null check.
49 void detachReadCallback() {
50 if (socket_->getReadCallback() == this) {
51 socket_->setReadCB(nullptr);
// Attaches the socket to `eventBase`.
// Does nothing when `eventBase` is null or when the socket already reports an
// EventBase (getEventBase() is non-null), so an existing attachment is never
// replaced.
55 void attachEventBase(folly::EventBase* eventBase) {
56 if (eventBase && !socket_->getEventBase()) {
57 socket_->attachEventBase(eventBase);
// Detaches the socket from its EventBase when it currently has one; a socket
// with no EventBase is left untouched.
// NOTE(review): embedded number 62 is absent from this listing, so a statement
// that runs before the check may be elided -- confirm against the full file.
61 void detachEventBase() {
63 if (socket_->getEventBase()) {
64 socket_->detachEventBase();
// Override that receives this handler's pipeline context.
// NOTE(review): the body is not visible in this listing (embedded numbers
// 69-71 are absent); presumably it installs the read callback on the socket
// -- confirm against the full file.
68 void attachPipeline(Context* ctx) override {
// Queues `buf` for writing on the socket and exposes completion as a future.
//
// NOTE(review): this listing omits several lines of the function (embedded
// numbers 73, 75, 77-78, 84-85 and 89-91 are absent). Not visible here: the
// first parameter (used below as `ctx`), the condition guarding the first
// early return, and the final return statement -- confirm against the full
// file before changing anything.
72 folly::Future<void> write(
74 std::unique_ptr<folly::IOBuf> buf) override {
// Early exit with the future produced by folly::makeFuture(). The guarding
// condition is elided; presumably it is a null/empty `buf` check -- confirm.
76 return folly::makeFuture();
// A socket that is not good() is not written to: the call logs at VLOG(5) and
// returns a future built from a NOT_OPEN AsyncSocketException.
79 if (!socket_->good()) {
80 VLOG(5) << "socket is closed in write()";
81 return folly::makeFuture<void>(AsyncSocketException(
82 AsyncSocketException::AsyncSocketExceptionType::NOT_OPEN,
83 "socket is closed in write()"));
// The callback is heap-allocated and handed to writeChain(). Nothing visible
// in this function frees it; presumably WriteCallback deletes itself once the
// write completes -- confirm.
86 auto cb = new WriteCallback();
// Obtain the future from the callback's promise before starting the write.
87 auto future = cb->promise_.getFuture();
// Ownership of `buf` is moved into the socket; write flags come from the
// context.
88 socket_->writeChain(cb, std::move(buf), ctx->getWriteFlags());
// Close override. In the visible code it asks the pipeline obtained from `ctx`
// to delete itself and then returns the future from folly::makeFuture().
// NOTE(review): embedded numbers 93-96 are absent from this listing, so any
// socket teardown performed before deletePipeline() is not visible -- confirm.
92 folly::Future<void> close(Context* ctx) override {
// NOTE(review): deletePipeline() presumably destroys the pipeline that holds
// this handler; the visible code touches no members after this call -- keep it
// that way unless the lifetime is confirmed.
97 ctx->getPipeline()->deletePipeline();
98 return folly::makeFuture();
101 // Must override to avoid warnings about hidden overloaded virtual due to
102 // AsyncSocket::ReadCallback::readEOF()
// Context-taking readEOF overload; the class also declares a no-argument
// readEOF() further down.
// NOTE(review): the body is not visible in this listing (embedded numbers
// 104-106 are absent); presumably it forwards EOF through `ctx` -- confirm.
103 void readEOF(Context* ctx) override {
// Supplies a writable region for the next read via two out-parameters.
//   bufReturn - receives the start of the region (ret.first)
//   lenReturn - receives the size of the region (ret.second)
// The region comes from bufQueue_.preallocate(), called with the two values of
// the pair returned by the context's getReadBufferSettings().
// NOTE(review): presumably this is the AsyncSocket::ReadCallback override and
// the settings pair is (minimum available bytes, allocation size), matching
// IOBufQueue::preallocate(min, newAllocationSize) -- confirm.
107 void getReadBuffer(void** bufReturn, size_t* lenReturn) override {
108 const auto readBufferSettings = getContext()->getReadBufferSettings();
109 const auto ret = bufQueue_.preallocate(
110 readBufferSettings.first,
111 readBufferSettings.second);
112 *bufReturn = ret.first;
113 *lenReturn = ret.second;
// Commits `len` bytes to bufQueue_ with postallocate() and then passes the
// queue to the context's fireRead(). Declared noexcept.
// NOTE(review): `len` is presumably the number of bytes the socket wrote into
// the region handed out by getReadBuffer() -- confirm.
116 void readDataAvailable(size_t len) noexcept override {
117 bufQueue_.postallocate(len);
118 getContext()->fireRead(bufQueue_);
// No-argument readEOF overload (noexcept): forwards end-of-file into the
// pipeline by calling fireReadEOF() on this handler's context.
121 void readEOF() noexcept override {
122 getContext()->fireReadEOF();
// Forwards a socket read error into the pipeline: `ex` is wrapped with
// make_exception_wrapper<AsyncSocketException> and passed to the context's
// fireReadException().
// NOTE(review): embedded number 126 is absent from this listing; the line
// carrying the function's qualifiers and opening brace is not visible.
125 void readErr(const AsyncSocketException& ex)
127 getContext()->fireReadException(
128 make_exception_wrapper<AsyncSocketException>(ex));
// Per-write completion callback: AsyncSocketHandler::write() allocates one
// with `new`, takes the future from promise_, and passes the object to
// AsyncSocket::writeChain(). Inherits AsyncSocket::WriteCallback privately.
// NOTE(review): embedded numbers 134-137, 140 and 142-145 are absent from this
// listing, so the body of writeSuccess() and anything following
// setException() are not visible. Because write() never frees the object,
// presumably each completion path ends by deleting it -- confirm.
132 class WriteCallback : private AsyncSocket::WriteCallback {
// Success notification; body elided in this listing (presumably fulfils
// promise_ -- confirm).
133 void writeSuccess() noexcept override {
// Failure notification: fails promise_ with the socket exception.
// bytesWritten is not used in the visible code.
138 void writeErr(size_t bytesWritten,
139 const AsyncSocketException& ex)
141 promise_.setException(ex);
// Friendship lets AsyncSocketHandler reach promise_ (see write()).
146 friend class AsyncSocketHandler;
// Promise whose future write() obtains via getFuture().
147 folly::Promise<void> promise_;
// Read-side buffer: getReadBuffer() preallocates from it, readDataAvailable()
// commits bytes with postallocate() and passes it to fireRead(). Constructed
// with the IOBufQueue::cacheChainLength() options.
150 folly::IOBufQueue bufQueue_{folly::IOBufQueue::cacheChainLength()};
// The wrapped socket, held by shared_ptr. Defaults to nullptr; the constructor
// moves the caller's pointer in.
151 std::shared_ptr<AsyncSocket> socket_{nullptr};