2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/io/async/AsyncUDPSocket.h>
18 #include <folly/io/async/AsyncUDPServerSocket.h>
19 #include <folly/io/async/AsyncTimeout.h>
20 #include <folly/io/async/EventBase.h>
21 #include <folly/SocketAddress.h>
23 #include <folly/io/IOBuf.h>
24 #include <folly/portability/GTest.h>
28 using folly::AsyncUDPSocket;
29 using folly::AsyncUDPServerSocket;
30 using folly::AsyncTimeout;
31 using folly::EventBase;
32 using folly::SocketAddress;
36 : public AsyncUDPServerSocket::Callback {
38 UDPAcceptor(EventBase* evb, int n): evb_(evb), n_(n) {
// Listen-started notification: this acceptor takes no action, so the body is intentionally empty.
41 void onListenStarted() noexcept override {}
// Listen-stopped notification: this acceptor takes no action, so the body is intentionally empty.
43 void onListenStopped() noexcept override {}
45 void onDataAvailable(std::shared_ptr<folly::AsyncUDPSocket> /* socket */,
46 const folly::SocketAddress& client,
47 std::unique_ptr<folly::IOBuf> data,
48 bool truncated) noexcept override {
51 lastMsg_ = data->moveToFbString().toStdString();
53 auto len = data->computeChainDataLength();
54 VLOG(4) << "Worker " << n_ << " read " << len << " bytes "
55 << "(trun:" << truncated << ") from " << client.describe()
61 void sendPong() noexcept {
63 AsyncUDPSocket socket(evb_);
64 socket.bind(folly::SocketAddress("127.0.0.1", 0));
65 socket.write(lastClient_, folly::IOBuf::copyBuffer(lastMsg_));
66 } catch (const std::exception& ex) {
67 VLOG(4) << "Failed to send PONG " << ex.what();
72 EventBase* const evb_{nullptr};
75 folly::SocketAddress lastClient_;
81 UDPServer(EventBase* evb, folly::SocketAddress addr, int n)
82 : evb_(evb), addr_(addr), evbs_(n) {
86 CHECK(evb_->isInEventBaseThread());
88 socket_ = std::make_unique<AsyncUDPServerSocket>(evb_, 1500);
92 VLOG(4) << "Server listening on " << socket_->address().describe();
93 } catch (const std::exception& ex) {
94 LOG(FATAL) << ex.what();
97 acceptors_.reserve(evbs_.size());
98 threads_.reserve(evbs_.size());
100 // Add numWorkers threads
102 for (auto& evb: evbs_) {
103 acceptors_.emplace_back(&evb, i);
105 std::thread t([&] () {
109 evb.waitUntilRunning();
111 socket_->addListener(&evb, &acceptors_[i]);
112 threads_.emplace_back(std::move(t));
119 folly::SocketAddress address() const {
120 return socket_->address();
124 CHECK(evb_->isInEventBaseThread());
128 for (auto& evb: evbs_) {
129 evb.terminateLoopSoon();
132 for (auto& t: threads_) {
138 EventBase* const evb_{nullptr};
139 const folly::SocketAddress addr_;
141 std::unique_ptr<AsyncUDPServerSocket> socket_;
142 std::vector<std::thread> threads_;
143 std::vector<folly::EventBase> evbs_;
144 std::vector<UDPAcceptor> acceptors_;
148 : private AsyncUDPSocket::ReadCallback,
149 private AsyncTimeout {
151 explicit UDPClient(EventBase* evb)
156 void start(const folly::SocketAddress& server, int n) {
157 CHECK(evb_->isInEventBaseThread());
160 socket_ = std::make_unique<AsyncUDPSocket>(evb_);
163 socket_->bind(folly::SocketAddress("127.0.0.1", 0));
164 VLOG(4) << "Client bound to " << socket_->address().describe();
165 } catch (const std::exception& ex) {
166 LOG(FATAL) << ex.what();
169 socket_->resumeRead(this);
173 // Start playing ping pong
178 CHECK(evb_->isInEventBaseThread());
179 socket_->pauseRead();
182 evb_->terminateLoopSoon();
195 folly::IOBuf::copyBuffer(folly::to<std::string>("PING ", n_)));
198 void getReadBuffer(void** buf, size_t* len) noexcept override {
203 void onDataAvailable(const folly::SocketAddress& client,
205 bool truncated) noexcept override {
206 VLOG(4) << "Read " << len << " bytes (trun:" << truncated << ") from "
207 << client.describe() << " - " << std::string(buf_, len);
208 VLOG(4) << n_ << " left";
215 void onReadError(const folly::AsyncSocketException& ex) noexcept override {
216 VLOG(4) << ex.what();
218 // Start listening for next PONG
219 socket_->resumeRead(this);
222 void onReadClosed() noexcept override {
223 CHECK(false) << "We unregister reads before closing";
226 void timeoutExpired() noexcept override {
227 VLOG(4) << "Timeout expired";
231 int pongRecvd() const {
236 EventBase* const evb_{nullptr};
238 folly::SocketAddress server_;
239 std::unique_ptr<AsyncUDPSocket> socket_;
247 TEST(AsyncSocketTest, PingPong) {
248 folly::EventBase sevb;
249 UDPServer server(&sevb, folly::SocketAddress("127.0.0.1", 0), 4);
251 // Start event loop in a separate thread
252 auto serverThread = std::thread([&sevb] () {
256 // Wait for event loop to start
257 sevb.waitUntilRunning();
260 sevb.runInEventBaseThreadAndWait([&]() { server.start(); });
262 folly::EventBase cevb;
263 UDPClient client(&cevb);
265 // Start event loop in a separate thread
266 auto clientThread = std::thread([&cevb] () {
270 // Wait for event loop to start
271 cevb.waitUntilRunning();
274 cevb.runInEventBaseThread([&] () { client.start(server.address(), 1000); });
276 // Wait for client to finish
279 // Check that some PING/PONGS were exchanged. Out of 1000 transactions
280 // at least 1 should succeed
281 CHECK_GT(client.pongRecvd(), 0);
284 sevb.runInEventBaseThread([&] () {
286 sevb.terminateLoopSoon();
289 // Wait for server thread to join