Copyright 2014->2015
[folly.git] / folly / wangle / channel / AsyncSocketHandler.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/wangle/channel/ChannelHandler.h>
20 #include <folly/io/async/AsyncSocket.h>
21 #include <folly/io/async/EventBase.h>
22 #include <folly/io/async/EventBaseManager.h>
23 #include <folly/io/IOBuf.h>
24 #include <folly/io/IOBufQueue.h>
25
26 namespace folly { namespace wangle {
27
28 class AsyncSocketHandler
29   : public folly::wangle::BytesToBytesHandler,
30     public AsyncSocket::ReadCallback {
31  public:
32   explicit AsyncSocketHandler(
33       std::shared_ptr<AsyncSocket> socket)
34     : socket_(std::move(socket)) {}
35
36   AsyncSocketHandler(AsyncSocketHandler&&) = default;
37
38   ~AsyncSocketHandler() {
39     if (socket_) {
40       detachReadCallback();
41     }
42   }
43
44   void attachReadCallback() {
45     socket_->setReadCB(socket_->good() ? this : nullptr);
46   }
47
48   void detachReadCallback() {
49     if (socket_->getReadCallback() == this) {
50       socket_->setReadCB(nullptr);
51     }
52   }
53
54   void attachEventBase(folly::EventBase* eventBase) {
55     if (eventBase && !socket_->getEventBase()) {
56       socket_->attachEventBase(eventBase);
57     }
58   }
59
60   void detachEventBase() {
61     detachReadCallback();
62     if (socket_->getEventBase()) {
63       socket_->detachEventBase();
64     }
65   }
66
67   void attachPipeline(Context* ctx) override {
68     CHECK(!ctx_);
69     ctx_ = ctx;
70   }
71
72   folly::Future<void> write(
73       Context* ctx,
74       std::unique_ptr<folly::IOBuf> buf) override {
75     if (UNLIKELY(!buf)) {
76       return folly::makeFuture();
77     }
78
79     if (!socket_->good()) {
80       VLOG(5) << "socket is closed in write()";
81       return folly::makeFuture<void>(AsyncSocketException(
82           AsyncSocketException::AsyncSocketExceptionType::NOT_OPEN,
83           "socket is closed in write()"));
84     }
85
86     auto cb = new WriteCallback();
87     auto future = cb->promise_.getFuture();
88     socket_->writeChain(cb, std::move(buf), ctx->getWriteFlags());
89     return future;
90   };
91
92   folly::Future<void> close(Context* ctx) override {
93     if (socket_) {
94       detachReadCallback();
95       socket_->closeNow();
96     }
97     return folly::makeFuture();
98   }
99
100   // Must override to avoid warnings about hidden overloaded virtual due to
101   // AsyncSocket::ReadCallback::readEOF()
102   void readEOF(Context* ctx) override {
103     ctx->fireReadEOF();
104   }
105
106   void getReadBuffer(void** bufReturn, size_t* lenReturn) override {
107     const auto readBufferSettings = ctx_->getReadBufferSettings();
108     const auto ret = bufQueue_.preallocate(
109         readBufferSettings.first,
110         readBufferSettings.second);
111     *bufReturn = ret.first;
112     *lenReturn = ret.second;
113   }
114
115   void readDataAvailable(size_t len) noexcept override {
116     bufQueue_.postallocate(len);
117     ctx_->fireRead(bufQueue_);
118   }
119
120   void readEOF() noexcept override {
121     ctx_->fireReadEOF();
122   }
123
124   void readErr(const AsyncSocketException& ex)
125     noexcept override {
126     ctx_->fireReadException(make_exception_wrapper<AsyncSocketException>(ex));
127   }
128
129  private:
130   class WriteCallback : private AsyncSocket::WriteCallback {
131     void writeSuccess() noexcept override {
132       promise_.setValue();
133       delete this;
134     }
135
136     void writeErr(size_t bytesWritten,
137                     const AsyncSocketException& ex)
138       noexcept override {
139       promise_.setException(ex);
140       delete this;
141     }
142
143    private:
144     friend class AsyncSocketHandler;
145     folly::Promise<void> promise_;
146   };
147
148   Context* ctx_{nullptr};
149   folly::IOBufQueue bufQueue_;
150   std::shared_ptr<AsyncSocket> socket_{nullptr};
151 };
152
153 }}