Future<Unit> wangle fixup
[folly.git] / folly / wangle / channel / AsyncSocketHandler.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/wangle/channel/Handler.h>
20 #include <folly/io/async/AsyncSocket.h>
21 #include <folly/io/async/EventBase.h>
22 #include <folly/io/async/EventBaseManager.h>
23 #include <folly/io/IOBuf.h>
24 #include <folly/io/IOBufQueue.h>
25
26 namespace folly { namespace wangle {
27
28 // This handler may only be used in a single Pipeline
29 class AsyncSocketHandler
30   : public folly::wangle::BytesToBytesHandler,
31     public AsyncSocket::ReadCallback {
32  public:
33   explicit AsyncSocketHandler(
34       std::shared_ptr<AsyncSocket> socket)
35     : socket_(std::move(socket)) {}
36
37   AsyncSocketHandler(AsyncSocketHandler&&) = default;
38
39   ~AsyncSocketHandler() {
40     detachReadCallback();
41   }
42
43   void attachReadCallback() {
44     socket_->setReadCB(socket_->good() ? this : nullptr);
45   }
46
47   void detachReadCallback() {
48     if (socket_ && socket_->getReadCallback() == this) {
49       socket_->setReadCB(nullptr);
50     }
51     auto ctx = getContext();
52     if (ctx && !firedInactive_) {
53       firedInactive_ = true;
54       ctx->fireTransportInactive();
55     }
56   }
57
58   void attachEventBase(folly::EventBase* eventBase) {
59     if (eventBase && !socket_->getEventBase()) {
60       socket_->attachEventBase(eventBase);
61     }
62   }
63
64   void detachEventBase() {
65     detachReadCallback();
66     if (socket_->getEventBase()) {
67       socket_->detachEventBase();
68     }
69   }
70
71   void transportActive(Context* ctx) override {
72     ctx->getPipeline()->setTransport(socket_);
73     attachReadCallback();
74     ctx->fireTransportActive();
75   }
76
77   void detachPipeline(Context* ctx) override {
78     detachReadCallback();
79   }
80
81   folly::Future<Unit> write(
82       Context* ctx,
83       std::unique_ptr<folly::IOBuf> buf) override {
84     if (UNLIKELY(!buf)) {
85       return folly::makeFuture();
86     }
87
88     if (!socket_->good()) {
89       VLOG(5) << "socket is closed in write()";
90       return folly::makeFuture<Unit>(AsyncSocketException(
91           AsyncSocketException::AsyncSocketExceptionType::NOT_OPEN,
92           "socket is closed in write()"));
93     }
94
95     auto cb = new WriteCallback();
96     auto future = cb->promise_.getFuture();
97     socket_->writeChain(cb, std::move(buf), ctx->getWriteFlags());
98     return future;
99   };
100
101   folly::Future<Unit> close(Context* ctx) override {
102     if (socket_) {
103       detachReadCallback();
104       socket_->closeNow();
105     }
106     ctx->getPipeline()->deletePipeline();
107     return folly::makeFuture();
108   }
109
110   // Must override to avoid warnings about hidden overloaded virtual due to
111   // AsyncSocket::ReadCallback::readEOF()
112   void readEOF(Context* ctx) override {
113     ctx->fireReadEOF();
114   }
115
116   void getReadBuffer(void** bufReturn, size_t* lenReturn) override {
117     const auto readBufferSettings = getContext()->getReadBufferSettings();
118     const auto ret = bufQueue_.preallocate(
119         readBufferSettings.first,
120         readBufferSettings.second);
121     *bufReturn = ret.first;
122     *lenReturn = ret.second;
123   }
124
125   void readDataAvailable(size_t len) noexcept override {
126     bufQueue_.postallocate(len);
127     getContext()->fireRead(bufQueue_);
128   }
129
130   void readEOF() noexcept override {
131     getContext()->fireReadEOF();
132   }
133
134   void readErr(const AsyncSocketException& ex)
135     noexcept override {
136     getContext()->fireReadException(
137         make_exception_wrapper<AsyncSocketException>(ex));
138   }
139
140  private:
141   class WriteCallback : private AsyncSocket::WriteCallback {
142     void writeSuccess() noexcept override {
143       promise_.setValue();
144       delete this;
145     }
146
147     void writeErr(size_t bytesWritten,
148                     const AsyncSocketException& ex)
149       noexcept override {
150       promise_.setException(ex);
151       delete this;
152     }
153
154    private:
155     friend class AsyncSocketHandler;
156     folly::Promise<Unit> promise_;
157   };
158
159   folly::IOBufQueue bufQueue_{folly::IOBufQueue::cacheChainLength()};
160   std::shared_ptr<AsyncSocket> socket_{nullptr};
161   bool firedInactive_{false};
162 };
163
164 }}