asyncudpserversocket reuse port
[folly.git] / folly / io / async / AsyncUDPServerSocket.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/MoveWrapper.h>
20 #include <folly/io/IOBufQueue.h>
21 #include <folly/Memory.h>
22 #include <folly/io/async/AsyncUDPSocket.h>
23 #include <folly/io/async/EventBase.h>
24
25 namespace folly {
26
27 /**
28  * UDP server socket
29  *
30  * It wraps a UDP socket waiting for packets and distributes them among
31  * a set of event loops in round robin fashion.
32  *
33  * NOTE: At the moment it is designed to work with single packet protocols
34  *       in mind. We distribute incoming packets among all the listeners in
35  *       round-robin fashion. So, any protocol that expects to send/recv
36  *       more than 1 packet will not work because they will end up with
37  *       different event base to process.
38  */
39 class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback
40                            , public AsyncSocketBase {
41  public:
42   class Callback {
43    public:
44     /**
45      * Invoked when we start reading data from socket. It is invoked in
46      * each acceptors/listeners event base thread.
47      */
48      virtual void onListenStarted() noexcept = 0;
49
50     /**
51      * Invoked when the server socket is closed. It is invoked in each
52      * acceptors/listeners event base thread.
53      */
54      virtual void onListenStopped() noexcept = 0;
55
56     /**
57      * Invoked when a new packet is received
58      */
59     virtual void onDataAvailable(const folly::SocketAddress& addr,
60                                  std::unique_ptr<folly::IOBuf> buf,
61                                  bool truncated) noexcept = 0;
62
63     virtual ~Callback() {}
64   };
65
66   /**
67    * Create a new UDP server socket
68    *
69    * Note about packet size - We allocate buffer of packetSize_ size to read.
70    * If packet are larger than this value, as per UDP protocol, remaining data
71    * is dropped and you get `truncated = true` in onDataAvailable callback
72    */
73   explicit AsyncUDPServerSocket(EventBase* evb, size_t sz = 1500)
74       : evb_(evb),
75         packetSize_(sz),
76         nextListener_(0) {
77   }
78
79   ~AsyncUDPServerSocket() {
80     if (socket_) {
81       close();
82     }
83   }
84
85   void bind(const folly::SocketAddress& addy) {
86     CHECK(!socket_);
87
88     socket_ = folly::make_unique<AsyncUDPSocket>(evb_);
89     socket_->setReusePort(reusePort_);
90     socket_->bind(addy);
91   }
92
93   void setReusePort(bool reusePort) {
94     reusePort_ = reusePort;
95   }
96
97   folly::SocketAddress address() const {
98     CHECK(socket_);
99     return socket_->address();
100   }
101
102   void getAddress(SocketAddress* a) const {
103     *a = address();
104   }
105
106   /**
107    * Add a listener to the round robin list
108    */
109   void addListener(EventBase* evb, Callback* callback) {
110     listeners_.emplace_back(evb, callback);
111   }
112
113   void listen() {
114     CHECK(socket_) << "Need to bind before listening";
115
116     for (auto& listener: listeners_) {
117       auto callback = listener.second;
118
119       listener.first->runInEventBaseThread([callback] () mutable {
120         callback->onListenStarted();
121       });
122     }
123
124     socket_->resumeRead(this);
125   }
126
127   int getFD() const {
128     CHECK(socket_) << "Need to bind before getting FD";
129     return socket_->getFD();
130   }
131
132   void close() {
133     CHECK(socket_) << "Need to bind before closing";
134     socket_.reset();
135   }
136
137   EventBase* getEventBase() const {
138     return evb_;
139   }
140
141  private:
142   // AsyncUDPSocket::ReadCallback
143   void getReadBuffer(void** buf, size_t* len) noexcept {
144     std::tie(*buf, *len) = buf_.preallocate(packetSize_, packetSize_);
145   }
146
147   void onDataAvailable(const folly::SocketAddress& clientAddress,
148                        size_t len,
149                        bool truncated) noexcept {
150     buf_.postallocate(len);
151     auto data = buf_.split(len);
152
153     if (listeners_.empty()) {
154       LOG(WARNING) << "UDP server socket dropping packet, "
155                    << "no listener registered";
156       return;
157     }
158
159     if (nextListener_ >= listeners_.size()) {
160       nextListener_ = 0;
161     }
162
163     auto client = clientAddress;
164     auto callback = listeners_[nextListener_].second;
165     auto mvp =
166         folly::MoveWrapper<
167             std::unique_ptr<folly::IOBuf>>(std::move(data));
168
169     // Schedule it in the listener's eventbase
170     // XXX: Speed this up
171     std::function<void()> f = [client, callback, mvp, truncated] () mutable {
172       callback->onDataAvailable(client, std::move(*mvp), truncated);
173     };
174
175     listeners_[nextListener_].first->runInEventBaseThread(f);
176     ++nextListener_;
177   }
178
179   void onReadError(const AsyncSocketException& ex) noexcept {
180     LOG(ERROR) << ex.what();
181
182     // Lets register to continue listening for packets
183     socket_->resumeRead(this);
184   }
185
186   void onReadClosed() noexcept {
187     for (auto& listener: listeners_) {
188       auto callback = listener.second;
189
190       listener.first->runInEventBaseThread([callback] () mutable {
191         callback->onListenStopped();
192       });
193     }
194   }
195
196   EventBase* const evb_;
197   const size_t packetSize_;
198
199   std::unique_ptr<AsyncUDPSocket> socket_;
200
201   // List of listener to distribute packets among
202   typedef std::pair<EventBase*, Callback*> Listener;
203   std::vector<Listener> listeners_;
204
205   // Next listener to send packet to
206   uint32_t nextListener_;
207
208   // Temporary buffer for data
209   folly::IOBufQueue buf_;
210
211   bool reusePort_{false};
212 };
213
214 } // Namespace