04491bbd5c51992836ea2921eea4cc7998a114bb
[folly.git] / folly / wangle / channel / AsyncSocketHandler.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/wangle/channel/Handler.h>
20 #include <folly/io/async/AsyncSocket.h>
21 #include <folly/io/async/EventBase.h>
22 #include <folly/io/async/EventBaseManager.h>
23 #include <folly/io/IOBuf.h>
24 #include <folly/io/IOBufQueue.h>
25
26 namespace folly { namespace wangle {
27
28 // This handler may only be used in a single Pipeline
29 class AsyncSocketHandler
30   : public folly::wangle::BytesToBytesHandler,
31     public AsyncSocket::ReadCallback {
32  public:
33   explicit AsyncSocketHandler(
34       std::shared_ptr<AsyncSocket> socket)
35     : socket_(std::move(socket)) {}
36
37   AsyncSocketHandler(AsyncSocketHandler&&) = default;
38
39   ~AsyncSocketHandler() {
40     if (socket_) {
41       detachReadCallback();
42     }
43   }
44
45   void attachReadCallback() {
46     socket_->setReadCB(socket_->good() ? this : nullptr);
47   }
48
49   void detachReadCallback() {
50     if (socket_->getReadCallback() == this) {
51       socket_->setReadCB(nullptr);
52     }
53   }
54
55   void attachEventBase(folly::EventBase* eventBase) {
56     if (eventBase && !socket_->getEventBase()) {
57       socket_->attachEventBase(eventBase);
58     }
59   }
60
61   void detachEventBase() {
62     detachReadCallback();
63     if (socket_->getEventBase()) {
64       socket_->detachEventBase();
65     }
66   }
67
68   void attachPipeline(Context* ctx) override {
69     attachReadCallback();
70   }
71
72   folly::Future<void> write(
73       Context* ctx,
74       std::unique_ptr<folly::IOBuf> buf) override {
75     if (UNLIKELY(!buf)) {
76       return folly::makeFuture();
77     }
78
79     if (!socket_->good()) {
80       VLOG(5) << "socket is closed in write()";
81       return folly::makeFuture<void>(AsyncSocketException(
82           AsyncSocketException::AsyncSocketExceptionType::NOT_OPEN,
83           "socket is closed in write()"));
84     }
85
86     auto cb = new WriteCallback();
87     auto future = cb->promise_.getFuture();
88     socket_->writeChain(cb, std::move(buf), ctx->getWriteFlags());
89     return future;
90   };
91
92   folly::Future<void> close(Context* ctx) override {
93     if (socket_) {
94       detachReadCallback();
95       socket_->closeNow();
96     }
97     ctx->getPipeline()->deletePipeline();
98     return folly::makeFuture();
99   }
100
101   // Must override to avoid warnings about hidden overloaded virtual due to
102   // AsyncSocket::ReadCallback::readEOF()
103   void readEOF(Context* ctx) override {
104     ctx->fireReadEOF();
105   }
106
107   void getReadBuffer(void** bufReturn, size_t* lenReturn) override {
108     const auto readBufferSettings = getContext()->getReadBufferSettings();
109     const auto ret = bufQueue_.preallocate(
110         readBufferSettings.first,
111         readBufferSettings.second);
112     *bufReturn = ret.first;
113     *lenReturn = ret.second;
114   }
115
116   void readDataAvailable(size_t len) noexcept override {
117     bufQueue_.postallocate(len);
118     getContext()->fireRead(bufQueue_);
119   }
120
121   void readEOF() noexcept override {
122     getContext()->fireReadEOF();
123   }
124
125   void readErr(const AsyncSocketException& ex)
126     noexcept override {
127     getContext()->fireReadException(
128         make_exception_wrapper<AsyncSocketException>(ex));
129   }
130
131  private:
132   class WriteCallback : private AsyncSocket::WriteCallback {
133     void writeSuccess() noexcept override {
134       promise_.setValue();
135       delete this;
136     }
137
138     void writeErr(size_t bytesWritten,
139                     const AsyncSocketException& ex)
140       noexcept override {
141       promise_.setException(ex);
142       delete this;
143     }
144
145    private:
146     friend class AsyncSocketHandler;
147     folly::Promise<void> promise_;
148   };
149
150   folly::IOBufQueue bufQueue_{folly::IOBufQueue::cacheChainLength()};
151   std::shared_ptr<AsyncSocket> socket_{nullptr};
152 };
153
154 }}