2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <folly/wangle/channel/Handler.h>
20 #include <folly/io/async/AsyncSocket.h>
21 #include <folly/io/async/EventBase.h>
22 #include <folly/io/async/EventBaseManager.h>
23 #include <folly/io/IOBuf.h>
24 #include <folly/io/IOBufQueue.h>
26 namespace folly { namespace wangle {
28 class AsyncSocketHandler
29 : public folly::wangle::BytesToBytesHandler,
30 public AsyncSocket::ReadCallback {
32 explicit AsyncSocketHandler(
33 std::shared_ptr<AsyncSocket> socket)
34 : socket_(std::move(socket)) {}
36 AsyncSocketHandler(AsyncSocketHandler&&) = default;
38 ~AsyncSocketHandler() {
44 void attachReadCallback() {
45 socket_->setReadCB(socket_->good() ? this : nullptr);
48 void detachReadCallback() {
49 if (socket_->getReadCallback() == this) {
50 socket_->setReadCB(nullptr);
54 void attachEventBase(folly::EventBase* eventBase) {
55 if (eventBase && !socket_->getEventBase()) {
56 socket_->attachEventBase(eventBase);
60 void detachEventBase() {
62 if (socket_->getEventBase()) {
63 socket_->detachEventBase();
67 void attachPipeline(Context* ctx) override {
72 folly::Future<void> write(
74 std::unique_ptr<folly::IOBuf> buf) override {
76 return folly::makeFuture();
79 if (!socket_->good()) {
80 VLOG(5) << "socket is closed in write()";
81 return folly::makeFuture<void>(AsyncSocketException(
82 AsyncSocketException::AsyncSocketExceptionType::NOT_OPEN,
83 "socket is closed in write()"));
86 auto cb = new WriteCallback();
87 auto future = cb->promise_.getFuture();
88 socket_->writeChain(cb, std::move(buf), ctx->getWriteFlags());
92 folly::Future<void> close(Context* ctx) override {
97 return folly::makeFuture();
100 // Must override to avoid warnings about hidden overloaded virtual due to
101 // AsyncSocket::ReadCallback::readEOF()
102 void readEOF(Context* ctx) override {
106 void getReadBuffer(void** bufReturn, size_t* lenReturn) override {
107 const auto readBufferSettings = ctx_->getReadBufferSettings();
108 const auto ret = bufQueue_.preallocate(
109 readBufferSettings.first,
110 readBufferSettings.second);
111 *bufReturn = ret.first;
112 *lenReturn = ret.second;
115 void readDataAvailable(size_t len) noexcept override {
116 bufQueue_.postallocate(len);
117 ctx_->fireRead(bufQueue_);
120 void readEOF() noexcept override {
124 void readErr(const AsyncSocketException& ex)
126 ctx_->fireReadException(make_exception_wrapper<AsyncSocketException>(ex));
130 class WriteCallback : private AsyncSocket::WriteCallback {
131 void writeSuccess() noexcept override {
136 void writeErr(size_t bytesWritten,
137 const AsyncSocketException& ex)
139 promise_.setException(ex);
144 friend class AsyncSocketHandler;
145 folly::Promise<void> promise_;
148 Context* ctx_{nullptr};
149 folly::IOBufQueue bufQueue_{folly::IOBufQueue::cacheChainLength()};
150 std::shared_ptr<AsyncSocket> socket_{nullptr};