allow AsyncSignalHandler to attach and detach from an EventBase
[folly.git] / folly / io / async / AsyncUDPServerSocket.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/io/IOBufQueue.h>
20 #include <folly/Memory.h>
21 #include <folly/io/async/AsyncUDPSocket.h>
22 #include <folly/io/async/EventBase.h>
23
24 namespace folly {
25
26 /**
27  * UDP server socket
28  *
29  * It wraps a UDP socket waiting for packets and distributes them among
30  * a set of event loops in round robin fashion.
31  *
32  * NOTE: At the moment it is designed to work with single packet protocols
33  *       in mind. We distribute incoming packets among all the listeners in
34  *       round-robin fashion. So, any protocol that expects to send/recv
35  *       more than 1 packet will not work because they will end up with
36  *       different event base to process.
37  */
38 class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback
39                            , public AsyncSocketBase {
40  public:
41   class Callback {
42    public:
43     /**
44      * Invoked when we start reading data from socket. It is invoked in
45      * each acceptors/listeners event base thread.
46      */
47      virtual void onListenStarted() noexcept = 0;
48
49     /**
50      * Invoked when the server socket is closed. It is invoked in each
51      * acceptors/listeners event base thread.
52      */
53      virtual void onListenStopped() noexcept = 0;
54
55     /**
56      * Invoked when a new packet is received
57      */
58     virtual void onDataAvailable(
59       std::shared_ptr<AsyncUDPSocket> socket,
60       const folly::SocketAddress& addr,
61       std::unique_ptr<folly::IOBuf> buf,
62       bool truncated) noexcept = 0;
63
64     virtual ~Callback() = default;
65   };
66
67   /**
68    * Create a new UDP server socket
69    *
70    * Note about packet size - We allocate buffer of packetSize_ size to read.
71    * If packet are larger than this value, as per UDP protocol, remaining data
72    * is dropped and you get `truncated = true` in onDataAvailable callback
73    */
74   explicit AsyncUDPServerSocket(EventBase* evb, size_t sz = 1500)
75       : evb_(evb),
76         packetSize_(sz),
77         nextListener_(0) {
78   }
79
80   ~AsyncUDPServerSocket() override {
81     if (socket_) {
82       close();
83     }
84   }
85
86   void bind(const folly::SocketAddress& addy) {
87     CHECK(!socket_);
88
89     socket_ = std::make_shared<AsyncUDPSocket>(evb_);
90     socket_->setReusePort(reusePort_);
91     socket_->bind(addy);
92   }
93
94   void setReusePort(bool reusePort) {
95     reusePort_ = reusePort;
96   }
97
98   folly::SocketAddress address() const {
99     CHECK(socket_);
100     return socket_->address();
101   }
102
103   void getAddress(SocketAddress* a) const override {
104     *a = address();
105   }
106
107   /**
108    * Add a listener to the round robin list
109    */
110   void addListener(EventBase* evb, Callback* callback) {
111     listeners_.emplace_back(evb, callback);
112   }
113
114   void listen() {
115     CHECK(socket_) << "Need to bind before listening";
116
117     for (auto& listener: listeners_) {
118       auto callback = listener.second;
119
120       listener.first->runInEventBaseThread([callback] () mutable {
121         callback->onListenStarted();
122       });
123     }
124
125     socket_->resumeRead(this);
126   }
127
128   int getFD() const {
129     CHECK(socket_) << "Need to bind before getting FD";
130     return socket_->getFD();
131   }
132
133   void close() {
134     CHECK(socket_) << "Need to bind before closing";
135     socket_->close();
136     socket_.reset();
137   }
138
139   EventBase* getEventBase() const override {
140     return evb_;
141   }
142
143  private:
144   // AsyncUDPSocket::ReadCallback
145   void getReadBuffer(void** buf, size_t* len) noexcept override {
146     std::tie(*buf, *len) = buf_.preallocate(packetSize_, packetSize_);
147   }
148
149   void onDataAvailable(
150       const folly::SocketAddress& clientAddress,
151       size_t len,
152       bool truncated) noexcept override {
153     buf_.postallocate(len);
154     auto data = buf_.split(len);
155
156     if (listeners_.empty()) {
157       LOG(WARNING) << "UDP server socket dropping packet, "
158                    << "no listener registered";
159       return;
160     }
161
162     if (nextListener_ >= listeners_.size()) {
163       nextListener_ = 0;
164     }
165
166     auto client = clientAddress;
167     auto callback = listeners_[nextListener_].second;
168     auto socket = socket_;
169
170     // Schedule it in the listener's eventbase
171     // XXX: Speed this up
172     auto f = [
173       socket,
174       client,
175       callback,
176       data = std::move(data),
177       truncated
178     ]() mutable {
179       callback->onDataAvailable(socket, client, std::move(data), truncated);
180     };
181
182     listeners_[nextListener_].first->runInEventBaseThread(std::move(f));
183     ++nextListener_;
184   }
185
186   void onReadError(const AsyncSocketException& ex) noexcept override {
187     LOG(ERROR) << ex.what();
188
189     // Lets register to continue listening for packets
190     socket_->resumeRead(this);
191   }
192
193   void onReadClosed() noexcept override {
194     for (auto& listener: listeners_) {
195       auto callback = listener.second;
196
197       listener.first->runInEventBaseThread([callback] () mutable {
198         callback->onListenStopped();
199       });
200     }
201   }
202
203   EventBase* const evb_;
204   const size_t packetSize_;
205
206   std::shared_ptr<AsyncUDPSocket> socket_;
207
208   // List of listener to distribute packets among
209   typedef std::pair<EventBase*, Callback*> Listener;
210   std::vector<Listener> listeners_;
211
212   // Next listener to send packet to
213   uint32_t nextListener_;
214
215   // Temporary buffer for data
216   folly::IOBufQueue buf_;
217
218   bool reusePort_{false};
219 };
220
221 } // Namespace