Get *=default*ed default constructors
[folly.git] / folly / io / async / AsyncUDPServerSocket.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/MoveWrapper.h>
20 #include <folly/io/IOBufQueue.h>
21 #include <folly/Memory.h>
22 #include <folly/io/async/AsyncUDPSocket.h>
23 #include <folly/io/async/EventBase.h>
24
25 namespace folly {
26
27 /**
28  * UDP server socket
29  *
30  * It wraps a UDP socket waiting for packets and distributes them among
31  * a set of event loops in round robin fashion.
32  *
33  * NOTE: At the moment it is designed to work with single packet protocols
34  *       in mind. We distribute incoming packets among all the listeners in
35  *       round-robin fashion. So, any protocol that expects to send/recv
36  *       more than 1 packet will not work because they will end up with
37  *       different event base to process.
38  */
39 class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback
40                            , public AsyncSocketBase {
41  public:
42   class Callback {
43    public:
44     /**
45      * Invoked when we start reading data from socket. It is invoked in
46      * each acceptors/listeners event base thread.
47      */
48      virtual void onListenStarted() noexcept = 0;
49
50     /**
51      * Invoked when the server socket is closed. It is invoked in each
52      * acceptors/listeners event base thread.
53      */
54      virtual void onListenStopped() noexcept = 0;
55
56     /**
57      * Invoked when a new packet is received
58      */
59     virtual void onDataAvailable(
60       std::shared_ptr<AsyncUDPSocket> socket,
61       const folly::SocketAddress& addr,
62       std::unique_ptr<folly::IOBuf> buf,
63       bool truncated) noexcept = 0;
64
65     virtual ~Callback() = default;
66   };
67
68   /**
69    * Create a new UDP server socket
70    *
71    * Note about packet size - We allocate buffer of packetSize_ size to read.
72    * If packet are larger than this value, as per UDP protocol, remaining data
73    * is dropped and you get `truncated = true` in onDataAvailable callback
74    */
75   explicit AsyncUDPServerSocket(EventBase* evb, size_t sz = 1500)
76       : evb_(evb),
77         packetSize_(sz),
78         nextListener_(0) {
79   }
80
81   ~AsyncUDPServerSocket() {
82     if (socket_) {
83       close();
84     }
85   }
86
87   void bind(const folly::SocketAddress& addy) {
88     CHECK(!socket_);
89
90     socket_ = std::make_shared<AsyncUDPSocket>(evb_);
91     socket_->setReusePort(reusePort_);
92     socket_->bind(addy);
93   }
94
95   void setReusePort(bool reusePort) {
96     reusePort_ = reusePort;
97   }
98
99   folly::SocketAddress address() const {
100     CHECK(socket_);
101     return socket_->address();
102   }
103
104   void getAddress(SocketAddress* a) const {
105     *a = address();
106   }
107
108   /**
109    * Add a listener to the round robin list
110    */
111   void addListener(EventBase* evb, Callback* callback) {
112     listeners_.emplace_back(evb, callback);
113   }
114
115   void listen() {
116     CHECK(socket_) << "Need to bind before listening";
117
118     for (auto& listener: listeners_) {
119       auto callback = listener.second;
120
121       listener.first->runInEventBaseThread([callback] () mutable {
122         callback->onListenStarted();
123       });
124     }
125
126     socket_->resumeRead(this);
127   }
128
129   int getFD() const {
130     CHECK(socket_) << "Need to bind before getting FD";
131     return socket_->getFD();
132   }
133
134   void close() {
135     CHECK(socket_) << "Need to bind before closing";
136     socket_->close();
137     socket_.reset();
138   }
139
140   EventBase* getEventBase() const {
141     return evb_;
142   }
143
144  private:
145   // AsyncUDPSocket::ReadCallback
146   void getReadBuffer(void** buf, size_t* len) noexcept {
147     std::tie(*buf, *len) = buf_.preallocate(packetSize_, packetSize_);
148   }
149
150   void onDataAvailable(const folly::SocketAddress& clientAddress,
151                        size_t len,
152                        bool truncated) noexcept {
153     buf_.postallocate(len);
154     auto data = buf_.split(len);
155
156     if (listeners_.empty()) {
157       LOG(WARNING) << "UDP server socket dropping packet, "
158                    << "no listener registered";
159       return;
160     }
161
162     if (nextListener_ >= listeners_.size()) {
163       nextListener_ = 0;
164     }
165
166     auto client = clientAddress;
167     auto callback = listeners_[nextListener_].second;
168     auto mvp =
169         folly::MoveWrapper<
170             std::unique_ptr<folly::IOBuf>>(std::move(data));
171     auto socket = socket_;
172
173     // Schedule it in the listener's eventbase
174     // XXX: Speed this up
175     std::function<void()> f = [socket, client, callback, mvp, truncated] () mutable {
176       callback->onDataAvailable(socket, client, std::move(*mvp), truncated);
177     };
178
179     listeners_[nextListener_].first->runInEventBaseThread(f);
180     ++nextListener_;
181   }
182
183   void onReadError(const AsyncSocketException& ex) noexcept {
184     LOG(ERROR) << ex.what();
185
186     // Lets register to continue listening for packets
187     socket_->resumeRead(this);
188   }
189
190   void onReadClosed() noexcept {
191     for (auto& listener: listeners_) {
192       auto callback = listener.second;
193
194       listener.first->runInEventBaseThread([callback] () mutable {
195         callback->onListenStopped();
196       });
197     }
198   }
199
200   EventBase* const evb_;
201   const size_t packetSize_;
202
203   std::shared_ptr<AsyncUDPSocket> socket_;
204
205   // List of listener to distribute packets among
206   typedef std::pair<EventBase*, Callback*> Listener;
207   std::vector<Listener> listeners_;
208
209   // Next listener to send packet to
210   uint32_t nextListener_;
211
212   // Temporary buffer for data
213   folly::IOBufQueue buf_;
214
215   bool reusePort_{false};
216 };
217
218 } // Namespace