e443c00670d8d83007d31a4a765ca05af5ee2ac6
[folly.git] / folly / io / async / AsyncUDPServerSocket.h
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/MoveWrapper.h>
20 #include <folly/io/IOBufQueue.h>
21 #include <folly/Memory.h>
22 #include <folly/io/async/AsyncUDPSocket.h>
23 #include <folly/io/async/EventBase.h>
24
25 namespace folly {
26
27 /**
28  * UDP server socket
29  *
30  * It wraps a UDP socket waiting for packets and distributes them among
31  * a set of event loops in round robin fashion.
32  *
33  * NOTE: At the moment it is designed to work with single packet protocols
34  *       in mind. We distribute incoming packets among all the listeners in
35  *       round-robin fashion. So, any protocol that expects to send/recv
36  *       more than 1 packet will not work because they will end up with
37  *       different event base to process.
38  */
39 class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback {
40  public:
41   class Callback {
42    public:
43     /**
44      * Invoked when we start reading data from socket. It is invoked in
45      * each acceptors/listeners event base thread.
46      */
47      virtual void onListenStarted() noexcept = 0;
48
49     /**
50      * Invoked when the server socket is closed. It is invoked in each
51      * acceptors/listeners event base thread.
52      */
53      virtual void onListenStopped() noexcept = 0;
54
55     /**
56      * Invoked when a new packet is received
57      */
58     virtual void onDataAvailable(const folly::SocketAddress& addr,
59                                  std::unique_ptr<folly::IOBuf> buf,
60                                  bool truncated) noexcept = 0;
61
62     virtual ~Callback() {}
63   };
64
65   /**
66    * Create a new UDP server socket
67    *
68    * Note about packet size - We allocate buffer of packetSize_ size to read.
69    * If packet are larger than this value, as per UDP protocol, remaining data
70    * is dropped and you get `truncated = true` in onDataAvailable callback
71    */
72   explicit AsyncUDPServerSocket(EventBase* evb, size_t sz = 1500)
73       : evb_(evb),
74         packetSize_(sz),
75         nextListener_(0) {
76   }
77
78   ~AsyncUDPServerSocket() {
79     if (socket_) {
80       close();
81     }
82   }
83
84   void bind(const folly::SocketAddress& address) {
85     CHECK(!socket_);
86
87     socket_ = folly::make_unique<AsyncUDPSocket>(evb_);
88     socket_->bind(address);
89   }
90
91   folly::SocketAddress address() const {
92     CHECK(socket_);
93     return socket_->address();
94   }
95
96   /**
97    * Add a listener to the round robin list
98    */
99   void addListener(EventBase* evb, Callback* callback) {
100     listeners_.emplace_back(evb, callback);
101   }
102
103   void listen() {
104     CHECK(socket_) << "Need to bind before listening";
105
106     for (auto& listener: listeners_) {
107       auto callback = listener.second;
108
109       listener.first->runInEventBaseThread([callback] () mutable {
110         callback->onListenStarted();
111       });
112     }
113
114     socket_->resumeRead(this);
115   }
116
117   int getFD() const {
118     CHECK(socket_) << "Need to bind before getting FD";
119     return socket_->getFD();
120   }
121
122   void close() {
123     CHECK(socket_) << "Need to bind before closing";
124     socket_.reset();
125   }
126
127  private:
128   // AsyncUDPSocket::ReadCallback
129   void getReadBuffer(void** buf, size_t* len) noexcept {
130     std::tie(*buf, *len) = buf_.preallocate(packetSize_, packetSize_);
131   }
132
133   void onDataAvailable(const folly::SocketAddress& clientAddress,
134                        size_t len,
135                        bool truncated) noexcept {
136     buf_.postallocate(len);
137     auto data = buf_.split(len);
138
139     if (listeners_.empty()) {
140       LOG(WARNING) << "UDP server socket dropping packet, "
141                    << "no listener registered";
142       return;
143     }
144
145     if (nextListener_ >= listeners_.size()) {
146       nextListener_ = 0;
147     }
148
149     auto client = clientAddress;
150     auto callback = listeners_[nextListener_].second;
151     auto mvp =
152         folly::MoveWrapper<
153             std::unique_ptr<folly::IOBuf>>(std::move(data));
154
155     // Schedule it in the listener's eventbase
156     // XXX: Speed this up
157     std::function<void()> f = [client, callback, mvp, truncated] () mutable {
158       callback->onDataAvailable(client, std::move(*mvp), truncated);
159     };
160
161     listeners_[nextListener_].first->runInEventBaseThread(f);
162     ++nextListener_;
163   }
164
165   void onReadError(const AsyncSocketException& ex) noexcept {
166     LOG(ERROR) << ex.what();
167
168     // Lets register to continue listening for packets
169     socket_->resumeRead(this);
170   }
171
172   void onReadClosed() noexcept {
173     for (auto& listener: listeners_) {
174       auto callback = listener.second;
175
176       listener.first->runInEventBaseThread([callback] () mutable {
177         callback->onListenStopped();
178       });
179     }
180   }
181
182   EventBase* const evb_;
183   const size_t packetSize_;
184
185   std::unique_ptr<AsyncUDPSocket> socket_;
186
187   // List of listener to distribute packets among
188   typedef std::pair<EventBase*, Callback*> Listener;
189   std::vector<Listener> listeners_;
190
191   // Next listener to send packet to
192   uint32_t nextListener_;
193
194   // Temporary buffer for data
195   folly::IOBufQueue buf_;
196 };
197
198 } // Namespace