Codemod folly::make_unique to std::make_unique
[folly.git] / folly / io / async / test / AsyncUDPSocketTest.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/async/AsyncUDPSocket.h>
18 #include <folly/io/async/AsyncUDPServerSocket.h>
19 #include <folly/io/async/AsyncTimeout.h>
20 #include <folly/io/async/EventBase.h>
21 #include <folly/SocketAddress.h>
22
23 #include <folly/io/IOBuf.h>
24 #include <folly/portability/GTest.h>
25
26 #include <thread>
27
28 using folly::AsyncUDPSocket;
29 using folly::AsyncUDPServerSocket;
30 using folly::AsyncTimeout;
31 using folly::EventBase;
32 using folly::SocketAddress;
33 using folly::IOBuf;
34
35 class UDPAcceptor
36     : public AsyncUDPServerSocket::Callback {
37  public:
38   UDPAcceptor(EventBase* evb, int n): evb_(evb), n_(n) {
39   }
40
41   void onListenStarted() noexcept override {}
42
43   void onListenStopped() noexcept override {}
44
45   void onDataAvailable(std::shared_ptr<folly::AsyncUDPSocket> /* socket */,
46                        const folly::SocketAddress& client,
47                        std::unique_ptr<folly::IOBuf> data,
48                        bool truncated) noexcept override {
49
50     lastClient_ = client;
51     lastMsg_ = data->moveToFbString().toStdString();
52
53     auto len = data->computeChainDataLength();
54     VLOG(4) << "Worker " << n_ << " read " << len << " bytes "
55             << "(trun:" << truncated << ") from " << client.describe()
56             << " - " << lastMsg_;
57
58     sendPong();
59   }
60
61   void sendPong() noexcept {
62     try {
63       AsyncUDPSocket socket(evb_);
64       socket.bind(folly::SocketAddress("127.0.0.1", 0));
65       socket.write(lastClient_, folly::IOBuf::copyBuffer(lastMsg_));
66     } catch (const std::exception& ex) {
67       VLOG(4) << "Failed to send PONG " << ex.what();
68     }
69   }
70
71  private:
72   EventBase* const evb_{nullptr};
73   const int n_{-1};
74
75   folly::SocketAddress lastClient_;
76   std::string lastMsg_;
77 };
78
79 class UDPServer {
80  public:
81   UDPServer(EventBase* evb, folly::SocketAddress addr, int n)
82       : evb_(evb), addr_(addr), evbs_(n) {
83   }
84
85   void start() {
86     CHECK(evb_->isInEventBaseThread());
87
88     socket_ = std::make_unique<AsyncUDPServerSocket>(evb_, 1500);
89
90     try {
91       socket_->bind(addr_);
92       VLOG(4) << "Server listening on " << socket_->address().describe();
93     } catch (const std::exception& ex) {
94       LOG(FATAL) << ex.what();
95     }
96
97     acceptors_.reserve(evbs_.size());
98     threads_.reserve(evbs_.size());
99
100     // Add numWorkers thread
101     int i = 0;
102     for (auto& evb: evbs_) {
103       acceptors_.emplace_back(&evb, i);
104
105       std::thread t([&] () {
106         evb.loopForever();
107       });
108
109       evb.waitUntilRunning();
110
111       socket_->addListener(&evb, &acceptors_[i]);
112       threads_.emplace_back(std::move(t));
113       ++i;
114     }
115
116     socket_->listen();
117   }
118
119   folly::SocketAddress address() const {
120     return socket_->address();
121   }
122
123   void shutdown() {
124     CHECK(evb_->isInEventBaseThread());
125     socket_->close();
126     socket_.reset();
127
128     for (auto& evb: evbs_) {
129       evb.terminateLoopSoon();
130     }
131
132     for (auto& t: threads_) {
133       t.join();
134     }
135   }
136
137  private:
138   EventBase* const evb_{nullptr};
139   const folly::SocketAddress addr_;
140
141   std::unique_ptr<AsyncUDPServerSocket> socket_;
142   std::vector<std::thread> threads_;
143   std::vector<folly::EventBase> evbs_;
144   std::vector<UDPAcceptor> acceptors_;
145 };
146
147 class UDPClient
148     : private AsyncUDPSocket::ReadCallback,
149       private AsyncTimeout {
150  public:
151   explicit UDPClient(EventBase* evb)
152       : AsyncTimeout(evb),
153         evb_(evb) {
154   }
155
156   void start(const folly::SocketAddress& server, int n) {
157     CHECK(evb_->isInEventBaseThread());
158
159     server_ = server;
160     socket_ = std::make_unique<AsyncUDPSocket>(evb_);
161
162     try {
163       socket_->bind(folly::SocketAddress("127.0.0.1", 0));
164       VLOG(4) << "Client bound to " << socket_->address().describe();
165     } catch (const std::exception& ex) {
166       LOG(FATAL) << ex.what();
167     }
168
169     socket_->resumeRead(this);
170
171     n_ = n;
172
173     // Start playing ping pong
174     sendPing();
175   }
176
177   void shutdown() {
178     CHECK(evb_->isInEventBaseThread());
179     socket_->pauseRead();
180     socket_->close();
181     socket_.reset();
182     evb_->terminateLoopSoon();
183   }
184
185   void sendPing() {
186     if (n_ == 0) {
187       shutdown();
188       return;
189     }
190
191     --n_;
192     scheduleTimeout(5);
193     socket_->write(
194         server_,
195         folly::IOBuf::copyBuffer(folly::to<std::string>("PING ", n_)));
196   }
197
198   void getReadBuffer(void** buf, size_t* len) noexcept override {
199     *buf = buf_;
200     *len = 1024;
201   }
202
203   void onDataAvailable(const folly::SocketAddress& client,
204                        size_t len,
205                        bool truncated) noexcept override {
206     VLOG(4) << "Read " << len << " bytes (trun:" << truncated << ") from "
207               << client.describe() << " - " << std::string(buf_, len);
208     VLOG(4) << n_ << " left";
209
210     ++pongRecvd_;
211
212     sendPing();
213   }
214
215   void onReadError(const folly::AsyncSocketException& ex) noexcept override {
216     VLOG(4) << ex.what();
217
218     // Start listening for next PONG
219     socket_->resumeRead(this);
220   }
221
222   void onReadClosed() noexcept override {
223     CHECK(false) << "We unregister reads before closing";
224   }
225
226   void timeoutExpired() noexcept override {
227     VLOG(4) << "Timeout expired";
228     sendPing();
229   }
230
231   int pongRecvd() const {
232     return pongRecvd_;
233   }
234
235  private:
236   EventBase* const evb_{nullptr};
237
238   folly::SocketAddress server_;
239   std::unique_ptr<AsyncUDPSocket> socket_;
240
241   int pongRecvd_{0};
242
243   int n_{0};
244   char buf_[1024];
245 };
246
247 TEST(AsyncSocketTest, PingPong) {
248   folly::EventBase sevb;
249   UDPServer server(&sevb, folly::SocketAddress("127.0.0.1", 0), 4);
250
251   // Start event loop in a separate thread
252   auto serverThread = std::thread([&sevb] () {
253     sevb.loopForever();
254   });
255
256   // Wait for event loop to start
257   sevb.waitUntilRunning();
258
259   // Start the server
260   sevb.runInEventBaseThreadAndWait([&]() { server.start(); });
261
262   folly::EventBase cevb;
263   UDPClient client(&cevb);
264
265   // Start event loop in a separate thread
266   auto clientThread = std::thread([&cevb] () {
267     cevb.loopForever();
268   });
269
270   // Wait for event loop to start
271   cevb.waitUntilRunning();
272
273   // Send ping
274   cevb.runInEventBaseThread([&] () { client.start(server.address(), 1000); });
275
276   // Wait for client to finish
277   clientThread.join();
278
279   // Check that some PING/PONGS were exchanged. Out of 1000 transactions
280   // at least 1 should succeed
281   CHECK_GT(client.pongRecvd(), 0);
282
283   // Shutdown server
284   sevb.runInEventBaseThread([&] () {
285     server.shutdown();
286     sevb.terminateLoopSoon();
287   });
288
289   // Wait for server thread to joib
290   serverThread.join();
291 }