2af6ffb6267c4172a389128aeb7f32b7ba7fca82
[folly.git] / folly / io / async / test / AsyncUDPSocketTest.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/async/AsyncUDPSocket.h>
18 #include <folly/io/async/AsyncUDPServerSocket.h>
19 #include <folly/io/async/AsyncTimeout.h>
20 #include <folly/io/async/EventBase.h>
21 #include <folly/SocketAddress.h>
22
23 #include <boost/thread/barrier.hpp>
24
25 #include <folly/io/IOBuf.h>
26
27 #include <thread>
28
29 #include <gtest/gtest.h>
30
31 using folly::AsyncUDPSocket;
32 using folly::AsyncUDPServerSocket;
33 using folly::AsyncTimeout;
34 using folly::EventBase;
35 using folly::SocketAddress;
36 using folly::IOBuf;
37
38 class UDPAcceptor
39     : public AsyncUDPServerSocket::Callback {
40  public:
41   UDPAcceptor(EventBase* evb, int n): evb_(evb), n_(n) {
42   }
43
44   void onListenStarted() noexcept override {}
45
46   void onListenStopped() noexcept override {}
47
48   void onDataAvailable(std::shared_ptr<folly::AsyncUDPSocket> socket,
49                        const folly::SocketAddress& client,
50                        std::unique_ptr<folly::IOBuf> data,
51                        bool truncated) noexcept override {
52
53     lastClient_ = client;
54     lastMsg_ = data->moveToFbString().toStdString();
55
56     auto len = data->computeChainDataLength();
57     VLOG(4) << "Worker " << n_ << " read " << len << " bytes "
58             << "(trun:" << truncated << ") from " << client.describe()
59             << " - " << lastMsg_;
60
61     sendPong();
62   }
63
64   void sendPong() noexcept {
65     try {
66       AsyncUDPSocket socket(evb_);
67       socket.bind(folly::SocketAddress("127.0.0.1", 0));
68       socket.write(lastClient_, folly::IOBuf::copyBuffer(lastMsg_));
69     } catch (const std::exception& ex) {
70       VLOG(4) << "Failed to send PONG " << ex.what();
71     }
72   }
73
74  private:
75   EventBase* const evb_{nullptr};
76   const int n_{-1};
77
78   folly::SocketAddress lastClient_;
79   std::string lastMsg_;
80 };
81
82 class UDPServer {
83  public:
84   UDPServer(EventBase* evb, folly::SocketAddress addr, int n)
85       : evb_(evb), addr_(addr), evbs_(n) {
86   }
87
88   void start() {
89     CHECK(evb_->isInEventBaseThread());
90
91     socket_ = folly::make_unique<AsyncUDPServerSocket>(
92         evb_,
93         1500);
94
95     try {
96       socket_->bind(addr_);
97       VLOG(4) << "Server listening on " << socket_->address().describe();
98     } catch (const std::exception& ex) {
99       LOG(FATAL) << ex.what();
100     }
101
102     acceptors_.reserve(evbs_.size());
103     threads_.reserve(evbs_.size());
104
105     // Add numWorkers thread
106     int i = 0;
107     for (auto& evb: evbs_) {
108       acceptors_.emplace_back(&evb, i);
109
110       std::thread t([&] () {
111         evb.loopForever();
112       });
113
114       auto r = std::make_shared<boost::barrier>(2);
115       evb.runInEventBaseThread([r] () {
116         r->wait();
117       });
118       r->wait();
119
120       socket_->addListener(&evb, &acceptors_[i]);
121       threads_.emplace_back(std::move(t));
122       ++i;
123     }
124
125     socket_->listen();
126   }
127
128   folly::SocketAddress address() const {
129     return socket_->address();
130   }
131
132   void shutdown() {
133     CHECK(evb_->isInEventBaseThread());
134     socket_->close();
135     socket_.reset();
136
137     for (auto& evb: evbs_) {
138       evb.terminateLoopSoon();
139     }
140
141     for (auto& t: threads_) {
142       t.join();
143     }
144   }
145
146  private:
147   EventBase* const evb_{nullptr};
148   const folly::SocketAddress addr_;
149
150   std::unique_ptr<AsyncUDPServerSocket> socket_;
151   std::vector<std::thread> threads_;
152   std::vector<folly::EventBase> evbs_;
153   std::vector<UDPAcceptor> acceptors_;
154 };
155
156 class UDPClient
157     : private AsyncUDPSocket::ReadCallback,
158       private AsyncTimeout {
159  public:
160   explicit UDPClient(EventBase* evb)
161       : AsyncTimeout(evb),
162         evb_(evb) {
163   }
164
165   void start(const folly::SocketAddress& server, int n) {
166     CHECK(evb_->isInEventBaseThread());
167
168     server_ = server;
169     socket_ = folly::make_unique<AsyncUDPSocket>(evb_);
170
171     try {
172       socket_->bind(folly::SocketAddress("127.0.0.1", 0));
173       VLOG(4) << "Client bound to " << socket_->address().describe();
174     } catch (const std::exception& ex) {
175       LOG(FATAL) << ex.what();
176     }
177
178     socket_->resumeRead(this);
179
180     n_ = n;
181
182     // Start playing ping pong
183     sendPing();
184   }
185
186   void shutdown() {
187     CHECK(evb_->isInEventBaseThread());
188     socket_->pauseRead();
189     socket_->close();
190     socket_.reset();
191     evb_->terminateLoopSoon();
192   }
193
194   void sendPing() {
195     if (n_ == 0) {
196       shutdown();
197       return;
198     }
199
200     --n_;
201     scheduleTimeout(5);
202     socket_->write(
203         server_,
204         folly::IOBuf::copyBuffer(folly::to<std::string>("PING ", n_)));
205   }
206
207   void getReadBuffer(void** buf, size_t* len) noexcept override {
208     *buf = buf_;
209     *len = 1024;
210   }
211
212   void onDataAvailable(const folly::SocketAddress& client,
213                        size_t len,
214                        bool truncated) noexcept override {
215     VLOG(4) << "Read " << len << " bytes (trun:" << truncated << ") from "
216               << client.describe() << " - " << std::string(buf_, len);
217     VLOG(4) << n_ << " left";
218
219     ++pongRecvd_;
220
221     sendPing();
222   }
223
224   void onReadError(const folly::AsyncSocketException& ex) noexcept override {
225     VLOG(4) << ex.what();
226
227     // Start listening for next PONG
228     socket_->resumeRead(this);
229   }
230
231   void onReadClosed() noexcept override {
232     CHECK(false) << "We unregister reads before closing";
233   }
234
235   void timeoutExpired() noexcept override {
236     VLOG(4) << "Timeout expired";
237     sendPing();
238   }
239
240   int pongRecvd() const {
241     return pongRecvd_;
242   }
243
244  private:
245   EventBase* const evb_{nullptr};
246
247   folly::SocketAddress server_;
248   std::unique_ptr<AsyncUDPSocket> socket_;
249
250   int pongRecvd_{0};
251
252   int n_{0};
253   char buf_[1024];
254 };
255
256 TEST(AsyncSocketTest, PingPong) {
257   folly::EventBase sevb;
258   UDPServer server(&sevb, folly::SocketAddress("127.0.0.1", 0), 4);
259   boost::barrier barrier(2);
260
261   // Start event loop in a separate thread
262   auto serverThread = std::thread([&sevb] () {
263     sevb.loopForever();
264   });
265
266   // Wait for event loop to start
267   sevb.runInEventBaseThread([&] () { barrier.wait(); });
268   barrier.wait();
269
270   // Start the server
271   sevb.runInEventBaseThread([&] () { server.start(); barrier.wait(); });
272   barrier.wait();
273
274   folly::EventBase cevb;
275   UDPClient client(&cevb);
276
277   // Start event loop in a separate thread
278   auto clientThread = std::thread([&cevb] () {
279     cevb.loopForever();
280   });
281
282   // Wait for event loop to start
283   cevb.runInEventBaseThread([&] () { barrier.wait(); });
284   barrier.wait();
285
286   // Send ping
287   cevb.runInEventBaseThread([&] () { client.start(server.address(), 1000); });
288
289   // Wait for client to finish
290   clientThread.join();
291
292   // Check that some PING/PONGS were exchanged. Out of 1000 transactions
293   // at least 1 should succeed
294   CHECK_GT(client.pongRecvd(), 0);
295
296   // Shutdown server
297   sevb.runInEventBaseThread([&] () {
298     server.shutdown();
299     sevb.terminateLoopSoon();
300   });
301
302   // Wait for server thread to joib
303   serverThread.join();
304 }