Fix RequestContext held too long issue in EventBase
[folly.git] / folly / io / async / test / AsyncUDPSocketTest.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <thread>
18
19 #include <folly/Conv.h>
20 #include <folly/SocketAddress.h>
21 #include <folly/io/IOBuf.h>
22 #include <folly/io/async/AsyncTimeout.h>
23 #include <folly/io/async/AsyncUDPServerSocket.h>
24 #include <folly/io/async/AsyncUDPSocket.h>
25 #include <folly/io/async/EventBase.h>
26 #include <folly/portability/GMock.h>
27 #include <folly/portability/GTest.h>
28
29 using folly::AsyncUDPSocket;
30 using folly::AsyncUDPServerSocket;
31 using folly::AsyncTimeout;
32 using folly::EventBase;
33 using folly::SocketAddress;
34 using folly::IOBuf;
35 using namespace testing;
36
37 class UDPAcceptor
38     : public AsyncUDPServerSocket::Callback {
39  public:
40   UDPAcceptor(EventBase* evb, int n): evb_(evb), n_(n) {
41   }
42
43   void onListenStarted() noexcept override {}
44
45   void onListenStopped() noexcept override {}
46
47   void onDataAvailable(std::shared_ptr<folly::AsyncUDPSocket> /* socket */,
48                        const folly::SocketAddress& client,
49                        std::unique_ptr<folly::IOBuf> data,
50                        bool truncated) noexcept override {
51
52     lastClient_ = client;
53     lastMsg_ = data->moveToFbString().toStdString();
54
55     auto len = data->computeChainDataLength();
56     VLOG(4) << "Worker " << n_ << " read " << len << " bytes "
57             << "(trun:" << truncated << ") from " << client.describe()
58             << " - " << lastMsg_;
59
60     sendPong();
61   }
62
63   void sendPong() noexcept {
64     try {
65       AsyncUDPSocket socket(evb_);
66       socket.bind(folly::SocketAddress("127.0.0.1", 0));
67       socket.write(lastClient_, folly::IOBuf::copyBuffer(lastMsg_));
68     } catch (const std::exception& ex) {
69       VLOG(4) << "Failed to send PONG " << ex.what();
70     }
71   }
72
73  private:
74   EventBase* const evb_{nullptr};
75   const int n_{-1};
76
77   folly::SocketAddress lastClient_;
78   std::string lastMsg_;
79 };
80
81 class UDPServer {
82  public:
83   UDPServer(EventBase* evb, folly::SocketAddress addr, int n)
84       : evb_(evb), addr_(addr), evbs_(n) {
85   }
86
87   void start() {
88     CHECK(evb_->isInEventBaseThread());
89
90     socket_ = std::make_unique<AsyncUDPServerSocket>(evb_, 1500);
91
92     try {
93       socket_->bind(addr_);
94       VLOG(4) << "Server listening on " << socket_->address().describe();
95     } catch (const std::exception& ex) {
96       LOG(FATAL) << ex.what();
97     }
98
99     acceptors_.reserve(evbs_.size());
100     threads_.reserve(evbs_.size());
101
102     // Add numWorkers thread
103     int i = 0;
104     for (auto& evb: evbs_) {
105       acceptors_.emplace_back(&evb, i);
106
107       std::thread t([&] () {
108         evb.loopForever();
109       });
110
111       evb.waitUntilRunning();
112
113       socket_->addListener(&evb, &acceptors_[i]);
114       threads_.emplace_back(std::move(t));
115       ++i;
116     }
117
118     socket_->listen();
119   }
120
121   folly::SocketAddress address() const {
122     return socket_->address();
123   }
124
125   void shutdown() {
126     CHECK(evb_->isInEventBaseThread());
127     socket_->close();
128     socket_.reset();
129
130     for (auto& evb: evbs_) {
131       evb.terminateLoopSoon();
132     }
133
134     for (auto& t: threads_) {
135       t.join();
136     }
137   }
138
139  private:
140   EventBase* const evb_{nullptr};
141   const folly::SocketAddress addr_;
142
143   std::unique_ptr<AsyncUDPServerSocket> socket_;
144   std::vector<std::thread> threads_;
145   std::vector<folly::EventBase> evbs_;
146   std::vector<UDPAcceptor> acceptors_;
147 };
148
149 class UDPClient
150     : private AsyncUDPSocket::ReadCallback,
151       private AsyncTimeout {
152  public:
153   explicit UDPClient(EventBase* evb)
154       : AsyncTimeout(evb),
155         evb_(evb) {
156   }
157
158   void start(const folly::SocketAddress& server, int n) {
159     CHECK(evb_->isInEventBaseThread());
160
161     server_ = server;
162     socket_ = std::make_unique<AsyncUDPSocket>(evb_);
163
164     try {
165       socket_->bind(folly::SocketAddress("127.0.0.1", 0));
166       VLOG(4) << "Client bound to " << socket_->address().describe();
167     } catch (const std::exception& ex) {
168       LOG(FATAL) << ex.what();
169     }
170
171     socket_->resumeRead(this);
172
173     n_ = n;
174
175     // Start playing ping pong
176     sendPing();
177   }
178
179   void shutdown() {
180     CHECK(evb_->isInEventBaseThread());
181     socket_->pauseRead();
182     socket_->close();
183     socket_.reset();
184     evb_->terminateLoopSoon();
185   }
186
187   void sendPing() {
188     if (n_ == 0) {
189       shutdown();
190       return;
191     }
192
193     --n_;
194     scheduleTimeout(5);
195     socket_->write(
196         server_,
197         folly::IOBuf::copyBuffer(folly::to<std::string>("PING ", n_)));
198   }
199
200   void getReadBuffer(void** buf, size_t* len) noexcept override {
201     *buf = buf_;
202     *len = 1024;
203   }
204
205   void onDataAvailable(const folly::SocketAddress& client,
206                        size_t len,
207                        bool truncated) noexcept override {
208     VLOG(4) << "Read " << len << " bytes (trun:" << truncated << ") from "
209               << client.describe() << " - " << std::string(buf_, len);
210     VLOG(4) << n_ << " left";
211
212     ++pongRecvd_;
213
214     sendPing();
215   }
216
217   void onReadError(const folly::AsyncSocketException& ex) noexcept override {
218     VLOG(4) << ex.what();
219
220     // Start listening for next PONG
221     socket_->resumeRead(this);
222   }
223
224   void onReadClosed() noexcept override {
225     CHECK(false) << "We unregister reads before closing";
226   }
227
228   void timeoutExpired() noexcept override {
229     VLOG(4) << "Timeout expired";
230     sendPing();
231   }
232
233   int pongRecvd() const {
234     return pongRecvd_;
235   }
236
237  private:
238   EventBase* const evb_{nullptr};
239
240   folly::SocketAddress server_;
241   std::unique_ptr<AsyncUDPSocket> socket_;
242
243   int pongRecvd_{0};
244
245   int n_{0};
246   char buf_[1024];
247 };
248
249 TEST(AsyncSocketTest, PingPong) {
250   folly::EventBase sevb;
251   UDPServer server(&sevb, folly::SocketAddress("127.0.0.1", 0), 4);
252
253   // Start event loop in a separate thread
254   auto serverThread = std::thread([&sevb] () {
255     sevb.loopForever();
256   });
257
258   // Wait for event loop to start
259   sevb.waitUntilRunning();
260
261   // Start the server
262   sevb.runInEventBaseThreadAndWait([&]() { server.start(); });
263
264   folly::EventBase cevb;
265   UDPClient client(&cevb);
266
267   // Start event loop in a separate thread
268   auto clientThread = std::thread([&cevb] () {
269     cevb.loopForever();
270   });
271
272   // Wait for event loop to start
273   cevb.waitUntilRunning();
274
275   // Send ping
276   cevb.runInEventBaseThread([&] () { client.start(server.address(), 1000); });
277
278   // Wait for client to finish
279   clientThread.join();
280
281   // Check that some PING/PONGS were exchanged. Out of 1000 transactions
282   // at least 1 should succeed
283   CHECK_GT(client.pongRecvd(), 0);
284
285   // Shutdown server
286   sevb.runInEventBaseThread([&] () {
287     server.shutdown();
288     sevb.terminateLoopSoon();
289   });
290
291   // Wait for server thread to joib
292   serverThread.join();
293 }
294
295 class TestAsyncUDPSocket : public AsyncUDPSocket {
296  public:
297   explicit TestAsyncUDPSocket(EventBase* evb) : AsyncUDPSocket(evb) {}
298
299   MOCK_METHOD3(sendmsg, ssize_t(int, const struct msghdr*, int));
300 };