b9f99af1f1aa70b05fc712f1d15325945e6aad57
[folly.git] / folly / io / async / test / AsyncUDPSocketTest.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/async/AsyncUDPSocket.h>
18 #include <folly/io/async/AsyncUDPServerSocket.h>
19 #include <folly/io/async/AsyncTimeout.h>
20 #include <folly/io/async/EventBase.h>
21 #include <folly/SocketAddress.h>
22
23 #include <boost/thread/barrier.hpp>
24
25 #include <folly/io/IOBuf.h>
26
27 #include <thread>
28
29 #include <gtest/gtest.h>
30
31 using folly::AsyncUDPSocket;
32 using folly::AsyncUDPServerSocket;
33 using folly::AsyncTimeout;
34 using folly::EventBase;
35 using folly::SocketAddress;
36 using folly::IOBuf;
37
38 class UDPAcceptor
39     : public AsyncUDPServerSocket::Callback {
40  public:
41   UDPAcceptor(EventBase* evb, int n): evb_(evb), n_(n) {
42   }
43
44   void onListenStarted() noexcept {
45   }
46
47   void onListenStopped() noexcept {
48   }
49
50   void onDataAvailable(std::shared_ptr<folly::AsyncUDPSocket> socket,
51                        const folly::SocketAddress& client,
52                        std::unique_ptr<folly::IOBuf> data,
53                        bool truncated) noexcept {
54
55     lastClient_ = client;
56     lastMsg_ = data->moveToFbString().toStdString();
57
58     auto len = data->computeChainDataLength();
59     VLOG(4) << "Worker " << n_ << " read " << len << " bytes "
60             << "(trun:" << truncated << ") from " << client.describe()
61             << " - " << lastMsg_;
62
63     sendPong();
64   }
65
66   void sendPong() noexcept {
67     try {
68       AsyncUDPSocket socket(evb_);
69       socket.bind(folly::SocketAddress("127.0.0.1", 0));
70       socket.write(lastClient_, folly::IOBuf::copyBuffer(lastMsg_));
71     } catch (const std::exception& ex) {
72       VLOG(4) << "Failed to send PONG " << ex.what();
73     }
74   }
75
76  private:
77   EventBase* const evb_{nullptr};
78   const int n_{-1};
79
80   folly::SocketAddress lastClient_;
81   std::string lastMsg_;
82 };
83
84 class UDPServer {
85  public:
86   UDPServer(EventBase* evb, folly::SocketAddress addr, int n)
87       : evb_(evb), addr_(addr), evbs_(n) {
88   }
89
90   void start() {
91     CHECK(evb_->isInEventBaseThread());
92
93     socket_ = folly::make_unique<AsyncUDPServerSocket>(
94         evb_,
95         1500);
96
97     try {
98       socket_->bind(addr_);
99       VLOG(4) << "Server listening on " << socket_->address().describe();
100     } catch (const std::exception& ex) {
101       LOG(FATAL) << ex.what();
102     }
103
104     acceptors_.reserve(evbs_.size());
105     threads_.reserve(evbs_.size());
106
107     // Add numWorkers thread
108     int i = 0;
109     for (auto& evb: evbs_) {
110       acceptors_.emplace_back(&evb, i);
111
112       std::thread t([&] () {
113         evb.loopForever();
114       });
115
116       auto r = std::make_shared<boost::barrier>(2);
117       evb.runInEventBaseThread([r] () {
118         r->wait();
119       });
120       r->wait();
121
122       socket_->addListener(&evb, &acceptors_[i]);
123       threads_.emplace_back(std::move(t));
124       ++i;
125     }
126
127     socket_->listen();
128   }
129
130   folly::SocketAddress address() const {
131     return socket_->address();
132   }
133
134   void shutdown() {
135     CHECK(evb_->isInEventBaseThread());
136     socket_->close();
137     socket_.reset();
138
139     for (auto& evb: evbs_) {
140       evb.terminateLoopSoon();
141     }
142
143     for (auto& t: threads_) {
144       t.join();
145     }
146   }
147
148  private:
149   EventBase* const evb_{nullptr};
150   const folly::SocketAddress addr_;
151
152   std::unique_ptr<AsyncUDPServerSocket> socket_;
153   std::vector<std::thread> threads_;
154   std::vector<folly::EventBase> evbs_;
155   std::vector<UDPAcceptor> acceptors_;
156 };
157
158 class UDPClient
159     : private AsyncUDPSocket::ReadCallback,
160       private AsyncTimeout {
161  public:
162   explicit UDPClient(EventBase* evb)
163       : AsyncTimeout(evb),
164         evb_(evb) {
165   }
166
167   void start(const folly::SocketAddress& server, int n) {
168     CHECK(evb_->isInEventBaseThread());
169
170     server_ = server;
171     socket_ = folly::make_unique<AsyncUDPSocket>(evb_);
172
173     try {
174       socket_->bind(folly::SocketAddress("127.0.0.1", 0));
175       VLOG(4) << "Client bound to " << socket_->address().describe();
176     } catch (const std::exception& ex) {
177       LOG(FATAL) << ex.what();
178     }
179
180     socket_->resumeRead(this);
181
182     n_ = n;
183
184     // Start playing ping pong
185     sendPing();
186   }
187
188   void shutdown() {
189     CHECK(evb_->isInEventBaseThread());
190     socket_->pauseRead();
191     socket_->close();
192     socket_.reset();
193     evb_->terminateLoopSoon();
194   }
195
196   void sendPing() {
197     if (n_ == 0) {
198       shutdown();
199       return;
200     }
201
202     --n_;
203     scheduleTimeout(5);
204     socket_->write(
205         server_,
206         folly::IOBuf::copyBuffer(folly::to<std::string>("PING ", n_)));
207   }
208
209   void getReadBuffer(void** buf, size_t* len) noexcept {
210     *buf = buf_;
211     *len = 1024;
212   }
213
214   void onDataAvailable(const folly::SocketAddress& client,
215                        size_t len,
216                        bool truncated) noexcept {
217     VLOG(4) << "Read " << len << " bytes (trun:" << truncated << ") from "
218               << client.describe() << " - " << std::string(buf_, len);
219     VLOG(4) << n_ << " left";
220
221     ++pongRecvd_;
222
223     sendPing();
224   }
225
226   void onReadError(const folly::AsyncSocketException& ex) noexcept {
227     VLOG(4) << ex.what();
228
229     // Start listening for next PONG
230     socket_->resumeRead(this);
231   }
232
233   void onReadClosed() noexcept {
234     CHECK(false) << "We unregister reads before closing";
235   }
236
237   void timeoutExpired() noexcept {
238     VLOG(4) << "Timeout expired";
239     sendPing();
240   }
241
242   int pongRecvd() const {
243     return pongRecvd_;
244   }
245
246  private:
247   EventBase* const evb_{nullptr};
248
249   folly::SocketAddress server_;
250   std::unique_ptr<AsyncUDPSocket> socket_;
251
252   int pongRecvd_{0};
253
254   int n_{0};
255   char buf_[1024];
256 };
257
258 TEST(AsyncSocketTest, PingPong) {
259   folly::EventBase sevb;
260   UDPServer server(&sevb, folly::SocketAddress("127.0.0.1", 0), 4);
261   boost::barrier barrier(2);
262
263   // Start event loop in a separate thread
264   auto serverThread = std::thread([&sevb] () {
265     sevb.loopForever();
266   });
267
268   // Wait for event loop to start
269   sevb.runInEventBaseThread([&] () { barrier.wait(); });
270   barrier.wait();
271
272   // Start the server
273   sevb.runInEventBaseThread([&] () { server.start(); barrier.wait(); });
274   barrier.wait();
275
276   folly::EventBase cevb;
277   UDPClient client(&cevb);
278
279   // Start event loop in a separate thread
280   auto clientThread = std::thread([&cevb] () {
281     cevb.loopForever();
282   });
283
284   // Wait for event loop to start
285   cevb.runInEventBaseThread([&] () { barrier.wait(); });
286   barrier.wait();
287
288   // Send ping
289   cevb.runInEventBaseThread([&] () { client.start(server.address(), 1000); });
290
291   // Wait for client to finish
292   clientThread.join();
293
294   // Check that some PING/PONGS were exchanged. Out of 1000 transactions
295   // at least 1 should succeed
296   CHECK_GT(client.pongRecvd(), 0);
297
298   // Shutdown server
299   sevb.runInEventBaseThread([&] () {
300     server.shutdown();
301     sevb.terminateLoopSoon();
302   });
303
304   // Wait for server thread to joib
305   serverThread.join();
306 }