651eb1cacd446fe6ded1e93c41d47cac839760c5
[folly.git] / folly / io / async / AsyncUDPServerSocket.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/io/IOBufQueue.h>
20 #include <folly/Memory.h>
21 #include <folly/io/async/AsyncUDPSocket.h>
22 #include <folly/io/async/EventBase.h>
23
24 namespace folly {
25
26 /**
27  * UDP server socket
28  *
29  * It wraps a UDP socket waiting for packets and distributes them among
30  * a set of event loops in round robin fashion.
31  *
32  * NOTE: At the moment it is designed to work with single packet protocols
33  *       in mind. We distribute incoming packets among all the listeners in
34  *       round-robin fashion. So, any protocol that expects to send/recv
35  *       more than 1 packet will not work because they will end up with
36  *       different event base to process.
37  */
38 class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback
39                            , public AsyncSocketBase {
40  public:
41   class Callback {
42    public:
43     /**
44      * Invoked when we start reading data from socket. It is invoked in
45      * each acceptors/listeners event base thread.
46      */
47      virtual void onListenStarted() noexcept = 0;
48
49     /**
50      * Invoked when the server socket is closed. It is invoked in each
51      * acceptors/listeners event base thread.
52      */
53      virtual void onListenStopped() noexcept = 0;
54
55     /**
56      * Invoked when a new packet is received
57      */
58     virtual void onDataAvailable(
59       std::shared_ptr<AsyncUDPSocket> socket,
60       const folly::SocketAddress& addr,
61       std::unique_ptr<folly::IOBuf> buf,
62       bool truncated) noexcept = 0;
63
64     virtual ~Callback() = default;
65   };
66
67   /**
68    * Create a new UDP server socket
69    *
70    * Note about packet size - We allocate buffer of packetSize_ size to read.
71    * If packet are larger than this value, as per UDP protocol, remaining data
72    * is dropped and you get `truncated = true` in onDataAvailable callback
73    */
74   explicit AsyncUDPServerSocket(EventBase* evb, size_t sz = 1500)
75       : evb_(evb),
76         packetSize_(sz),
77         nextListener_(0) {
78   }
79
80   ~AsyncUDPServerSocket() {
81     if (socket_) {
82       close();
83     }
84   }
85
86   void bind(const folly::SocketAddress& addy) {
87     CHECK(!socket_);
88
89     socket_ = std::make_shared<AsyncUDPSocket>(evb_);
90     socket_->setReusePort(reusePort_);
91     socket_->bind(addy);
92   }
93
94   void setReusePort(bool reusePort) {
95     reusePort_ = reusePort;
96   }
97
98   folly::SocketAddress address() const {
99     CHECK(socket_);
100     return socket_->address();
101   }
102
103   void getAddress(SocketAddress* a) const {
104     *a = address();
105   }
106
107   /**
108    * Add a listener to the round robin list
109    */
110   void addListener(EventBase* evb, Callback* callback) {
111     listeners_.emplace_back(evb, callback);
112   }
113
114   void listen() {
115     CHECK(socket_) << "Need to bind before listening";
116
117     for (auto& listener: listeners_) {
118       auto callback = listener.second;
119
120       listener.first->runInEventBaseThread([callback] () mutable {
121         callback->onListenStarted();
122       });
123     }
124
125     socket_->resumeRead(this);
126   }
127
128   int getFD() const {
129     CHECK(socket_) << "Need to bind before getting FD";
130     return socket_->getFD();
131   }
132
133   void close() {
134     CHECK(socket_) << "Need to bind before closing";
135     socket_->close();
136     socket_.reset();
137   }
138
139   EventBase* getEventBase() const {
140     return evb_;
141   }
142
143  private:
144   // AsyncUDPSocket::ReadCallback
145   void getReadBuffer(void** buf, size_t* len) noexcept {
146     std::tie(*buf, *len) = buf_.preallocate(packetSize_, packetSize_);
147   }
148
149   void onDataAvailable(const folly::SocketAddress& clientAddress,
150                        size_t len,
151                        bool truncated) noexcept {
152     buf_.postallocate(len);
153     auto data = buf_.split(len);
154
155     if (listeners_.empty()) {
156       LOG(WARNING) << "UDP server socket dropping packet, "
157                    << "no listener registered";
158       return;
159     }
160
161     if (nextListener_ >= listeners_.size()) {
162       nextListener_ = 0;
163     }
164
165     auto client = clientAddress;
166     auto callback = listeners_[nextListener_].second;
167     auto socket = socket_;
168
169     // Schedule it in the listener's eventbase
170     // XXX: Speed this up
171     auto f = [
172       socket,
173       client,
174       callback,
175       data = std::move(data),
176       truncated
177     ]() mutable {
178       callback->onDataAvailable(socket, client, std::move(data), truncated);
179     };
180
181     listeners_[nextListener_].first->runInEventBaseThread(std::move(f));
182     ++nextListener_;
183   }
184
185   void onReadError(const AsyncSocketException& ex) noexcept {
186     LOG(ERROR) << ex.what();
187
188     // Lets register to continue listening for packets
189     socket_->resumeRead(this);
190   }
191
192   void onReadClosed() noexcept {
193     for (auto& listener: listeners_) {
194       auto callback = listener.second;
195
196       listener.first->runInEventBaseThread([callback] () mutable {
197         callback->onListenStopped();
198       });
199     }
200   }
201
202   EventBase* const evb_;
203   const size_t packetSize_;
204
205   std::shared_ptr<AsyncUDPSocket> socket_;
206
207   // List of listener to distribute packets among
208   typedef std::pair<EventBase*, Callback*> Listener;
209   std::vector<Listener> listeners_;
210
211   // Next listener to send packet to
212   uint32_t nextListener_;
213
214   // Temporary buffer for data
215   folly::IOBufQueue buf_;
216
217   bool reusePort_{false};
218 };
219
220 } // Namespace