feadc49ca2ddec8655e4ef828ff022241635d407
[folly.git] / folly / io / async / test / AsyncUDPSocketTest.cpp
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/async/AsyncUDPSocket.h>
18 #include <folly/io/async/AsyncUDPServerSocket.h>
19 #include <folly/io/async/AsyncTimeout.h>
20 #include <folly/io/async/EventBase.h>
21 #include <folly/SocketAddress.h>
22
23 #include <boost/thread/barrier.hpp>
24
25 #include <folly/io/IOBuf.h>
26
27 #include <thread>
28
29 #include <gtest/gtest.h>
30
31 using folly::AsyncUDPSocket;
32 using folly::AsyncUDPServerSocket;
33 using folly::AsyncTimeout;
34 using folly::EventBase;
35 using folly::SocketAddress;
36 using folly::IOBuf;
37
38 class UDPAcceptor
39     : public AsyncUDPServerSocket::Callback {
40  public:
41   UDPAcceptor(EventBase* evb, int n): evb_(evb), n_(n) {
42   }
43
44   void onListenStarted() noexcept {
45   }
46
47   void onListenStopped() noexcept {
48   }
49
50   void onDataAvailable(const folly::SocketAddress& client,
51                        std::unique_ptr<folly::IOBuf> data,
52                        bool truncated) noexcept {
53
54     lastClient_ = client;
55     lastMsg_ = data->moveToFbString().toStdString();
56
57     auto len = data->computeChainDataLength();
58     VLOG(4) << "Worker " << n_ << " read " << len << " bytes "
59             << "(trun:" << truncated << ") from " << client.describe()
60             << " - " << lastMsg_;
61
62     sendPong();
63   }
64
65   void sendPong() noexcept {
66     try {
67       AsyncUDPSocket socket(evb_);
68       socket.bind(folly::SocketAddress("127.0.0.1", 0));
69       socket.write(lastClient_, folly::IOBuf::copyBuffer(lastMsg_));
70     } catch (const std::exception& ex) {
71       VLOG(4) << "Failed to send PONG " << ex.what();
72     }
73   }
74
75  private:
76   EventBase* const evb_{nullptr};
77   const int n_{-1};
78
79   folly::SocketAddress lastClient_;
80   std::string lastMsg_;
81 };
82
83 class UDPServer {
84  public:
85   UDPServer(EventBase* evb, folly::SocketAddress addr, int n)
86       : evb_(evb), addr_(addr), evbs_(n) {
87   }
88
89   void start() {
90     CHECK(evb_->isInEventBaseThread());
91
92     socket_ = folly::make_unique<AsyncUDPServerSocket>(
93         evb_,
94         1500);
95
96     try {
97       socket_->bind(addr_);
98       VLOG(4) << "Server listening on " << socket_->address().describe();
99     } catch (const std::exception& ex) {
100       LOG(FATAL) << ex.what();
101     }
102
103     acceptors_.reserve(evbs_.size());
104     threads_.reserve(evbs_.size());
105
106     // Add numWorkers thread
107     int i = 0;
108     for (auto& evb: evbs_) {
109       acceptors_.emplace_back(&evb, i);
110
111       std::thread t([&] () {
112         evb.loopForever();
113       });
114
115       auto r = std::make_shared<boost::barrier>(2);
116       evb.runInEventBaseThread([r] () {
117         r->wait();
118       });
119       r->wait();
120
121       socket_->addListener(&evb, &acceptors_[i]);
122       threads_.emplace_back(std::move(t));
123       ++i;
124     }
125
126     socket_->listen();
127   }
128
129   folly::SocketAddress address() const {
130     return socket_->address();
131   }
132
133   void shutdown() {
134     CHECK(evb_->isInEventBaseThread());
135     socket_->close();
136     socket_.reset();
137
138     for (auto& evb: evbs_) {
139       evb.terminateLoopSoon();
140     }
141
142     for (auto& t: threads_) {
143       t.join();
144     }
145   }
146
147  private:
148   EventBase* const evb_{nullptr};
149   const folly::SocketAddress addr_;
150
151   std::unique_ptr<AsyncUDPServerSocket> socket_;
152   std::vector<std::thread> threads_;
153   std::vector<folly::EventBase> evbs_;
154   std::vector<UDPAcceptor> acceptors_;
155 };
156
157 class UDPClient
158     : private AsyncUDPSocket::ReadCallback,
159       private AsyncTimeout {
160  public:
161   explicit UDPClient(EventBase* evb)
162       : AsyncTimeout(evb),
163         evb_(evb) {
164   }
165
166   void start(const folly::SocketAddress& server, int n) {
167     CHECK(evb_->isInEventBaseThread());
168
169     server_ = server;
170     socket_ = folly::make_unique<AsyncUDPSocket>(evb_);
171
172     try {
173       socket_->bind(folly::SocketAddress("127.0.0.1", 0));
174       VLOG(4) << "Client bound to " << socket_->address().describe();
175     } catch (const std::exception& ex) {
176       LOG(FATAL) << ex.what();
177     }
178
179     socket_->resumeRead(this);
180
181     n_ = n;
182
183     // Start playing ping pong
184     sendPing();
185   }
186
187   void shutdown() {
188     CHECK(evb_->isInEventBaseThread());
189     socket_->pauseRead();
190     socket_->close();
191     socket_.reset();
192     evb_->terminateLoopSoon();
193   }
194
195   void sendPing() {
196     if (n_ == 0) {
197       shutdown();
198       return;
199     }
200
201     --n_;
202     scheduleTimeout(5);
203     socket_->write(
204         server_,
205         folly::IOBuf::copyBuffer(folly::to<std::string>("PING ", n_)));
206   }
207
208   void getReadBuffer(void** buf, size_t* len) noexcept {
209     *buf = buf_;
210     *len = 1024;
211   }
212
213   void onDataAvailable(const folly::SocketAddress& client,
214                        size_t len,
215                        bool truncated) noexcept {
216     VLOG(4) << "Read " << len << " bytes (trun:" << truncated << ") from "
217               << client.describe() << " - " << std::string(buf_, len);
218     VLOG(4) << n_ << " left";
219
220     ++pongRecvd_;
221
222     sendPing();
223   }
224
225   void onReadError(const folly::AsyncSocketException& ex) noexcept {
226     VLOG(4) << ex.what();
227
228     // Start listening for next PONG
229     socket_->resumeRead(this);
230   }
231
232   void onReadClosed() noexcept {
233     CHECK(false) << "We unregister reads before closing";
234   }
235
236   void timeoutExpired() noexcept {
237     VLOG(4) << "Timeout expired";
238     sendPing();
239   }
240
241   int pongRecvd() const {
242     return pongRecvd_;
243   }
244
245  private:
246   EventBase* const evb_{nullptr};
247
248   folly::SocketAddress server_;
249   std::unique_ptr<AsyncUDPSocket> socket_;
250
251   int pongRecvd_{0};
252
253   int n_{0};
254   char buf_[1024];
255 };
256
257 TEST(AsyncSocketTest, PingPong) {
258   folly::EventBase sevb;
259   UDPServer server(&sevb, folly::SocketAddress("127.0.0.1", 0), 4);
260   boost::barrier barrier(2);
261
262   // Start event loop in a separate thread
263   auto serverThread = std::thread([&sevb] () {
264     sevb.loopForever();
265   });
266
267   // Wait for event loop to start
268   sevb.runInEventBaseThread([&] () { barrier.wait(); });
269   barrier.wait();
270
271   // Start the server
272   sevb.runInEventBaseThread([&] () { server.start(); barrier.wait(); });
273   barrier.wait();
274
275   folly::EventBase cevb;
276   UDPClient client(&cevb);
277
278   // Start event loop in a separate thread
279   auto clientThread = std::thread([&cevb] () {
280     cevb.loopForever();
281   });
282
283   // Wait for event loop to start
284   cevb.runInEventBaseThread([&] () { barrier.wait(); });
285   barrier.wait();
286
287   // Send ping
288   cevb.runInEventBaseThread([&] () { client.start(server.address(), 1000); });
289
290   // Wait for client to finish
291   clientThread.join();
292
293   // Check that some PING/PONGS were exchanged. Out of 1000 transactions
294   // at least 1 should succeed
295   CHECK_GT(client.pongRecvd(), 0);
296
297   // Shutdown server
298   sevb.runInEventBaseThread([&] () {
299     server.shutdown();
300     sevb.terminateLoopSoon();
301   });
302
303   // Wait for server thread to joib
304   serverThread.join();
305 }