2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include "folly/wangle/bootstrap/ServerBootstrap.h"
18 #include "folly/wangle/bootstrap/ClientBootstrap.h"
19 #include "folly/wangle/channel/Handler.h"
21 #include <glog/logging.h>
22 #include <gtest/gtest.h>
23 #include <boost/thread.hpp>
25 using namespace folly::wangle;
26 using namespace folly;
// Pipeline type shared by every test in this file: Pipeline instantiated with
// IOBufQueue& and std::unique_ptr<IOBuf>.
28 typedef Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> BytesPipeline;
// Server and client bootstraps specialised on BytesPipeline.
30 typedef ServerBootstrap<BytesPipeline> TestServer;
31 typedef ClientBootstrap<BytesPipeline> TestClient;
// PipelineFactory that the tests hand to the client bootstrap through
// pipelineFactory(). Its newPipeline() schedules a delayed sanity check on
// the socket it is given.
33 class TestClientPipelineFactory : public PipelineFactory<BytesPipeline> {
35 std::unique_ptr<BytesPipeline, folly::DelayedDestruction::Destructor>
36 newPipeline(std::shared_ptr<AsyncSocket> sock) override {
37 // We probably aren't connected immediately, check after a small delay
// The lambda captures `sock` by value, so the closure holds its own
// shared_ptr reference to the socket. The callback is scheduled on the
// EventBase returned by EventBaseManager for the calling thread.
38 EventBaseManager::get()->getEventBase()->tryRunAfterDelay([sock](){
// glog CHECK: if the socket is not readable the process aborts.
40 CHECK(sock->readable());
// PipelineFactory installed on the server through childPipeline(). The tests
// keep a shared_ptr to it and read `pipelines` afterwards to verify how many
// pipelines were created.
46 class TestPipelineFactory : public PipelineFactory<BytesPipeline> {
48 std::unique_ptr<BytesPipeline, folly::DelayedDestruction::Destructor>
49 newPipeline(std::shared_ptr<AsyncSocket> sock) override {
52 return std::unique_ptr<BytesPipeline, folly::DelayedDestruction::Destructor>(
// Counter compared by the tests against the number of client connections.
// Presumably incremented once per newPipeline() call; the increment is not
// visible in this excerpt -- TODO confirm.
55 std::atomic<int> pipelines{0};
// Acceptor subclass produced by TestAcceptorFactory::newAcceptor().
58 class TestAcceptor : public Acceptor {
// Builds the base Acceptor from a default-constructed ServerSocketConfig,
// then calls Acceptor::init() with a null first argument and &base_.
61 TestAcceptor() : Acceptor(ServerSocketConfig()) {
62 Acceptor::init(nullptr, &base_);
// Intentionally empty override: all four arguments are ignored.
64 void onNewConnection(AsyncSocket::UniquePtr sock,
65 const folly::SocketAddress* address,
66 const std::string& nextProtocolName,
67 const TransportInfo& tinfo) override {}
// AcceptorFactory passed to childHandler() by the ServerWithChildHandler test.
70 class TestAcceptorFactory : public AcceptorFactory {
// Returns a newly allocated TestAcceptor; the `base` argument is not used.
72 std::shared_ptr<Acceptor> newAcceptor(EventBase* base) override {
73 return std::make_shared<TestAcceptor>();
// gtest case Bootstrap.Basic.
77 TEST(Bootstrap, Basic) {
// Configures a server whose child pipelines come from TestPipelineFactory.
82 TEST(Bootstrap, ServerWithPipeline) {
84 server.childPipeline(std::make_shared<TestPipelineFactory>());
// Configures a server with an AcceptorFactory (TestAcceptorFactory) instead
// of a pipeline factory.
89 TEST(Bootstrap, ServerWithChildHandler) {
91 server.childHandler(std::make_shared<TestAcceptorFactory>());
// One client connects to the server; the server-side factory is expected to
// have produced exactly one pipeline.
96 TEST(Bootstrap, ClientServerTest) {
// Keep a handle on the factory so its counter can be inspected at the end.
98 auto factory = std::make_shared<TestPipelineFactory>();
99 server.childPipeline(factory);
101 auto base = EventBaseManager::get()->getEventBase();
// Read back the address of the server's first socket so the client knows
// where to connect.
103 SocketAddress address;
104 server.getSockets()[0]->getAddress(&address);
107 client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
108 client.connect(address);
// NOTE(review): glog CHECK aborts the whole test binary on failure; a gtest
// assertion such as EXPECT_EQ would report it as an ordinary test failure.
112 CHECK(factory->pipelines == 1);
// Two clients connect to a server backed by a one-thread IO pool; the
// server-side factory is expected to have produced two pipelines.
115 TEST(Bootstrap, ClientConnectionManagerTest) {
116 // Create a single IO thread, and verify that
117 // client connections are pooled properly
120 auto factory = std::make_shared<TestPipelineFactory>();
121 server.childPipeline(factory);
// Single-argument group(): one IOThreadPoolExecutor with one thread.
122 server.group(std::make_shared<IOThreadPoolExecutor>(1));
124 auto base = EventBaseManager::get()->getEventBase();
// Address of the server's first socket, used by both clients.
126 SocketAddress address;
127 server.getSockets()[0]->getAddress(&address);
130 client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
132 client.connect(address);
135 client2.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
136 client2.connect(address);
// One pipeline per client connection.
141 CHECK(factory->pipelines == 2);
// A client running on its own thread connects to a server configured with a
// one-thread executor as the first group() argument and nullptr as the second.
144 TEST(Bootstrap, ServerAcceptGroupTest) {
145 // Verify that server is using the accept IO group
148 auto factory = std::make_shared<TestPipelineFactory>();
149 server.childPipeline(factory);
150 server.group(std::make_shared<IOThreadPoolExecutor>(1), nullptr);
153 SocketAddress address;
154 server.getSockets()[0]->getAddress(&address);
// Two-party barrier; presumably shared between this thread and the client
// thread below (the wait() calls are not visible here -- TODO confirm).
156 boost::barrier barrier(2);
// The lambda captures everything by reference, so `address` and `barrier`
// must outlive the thread; the join is not visible in this excerpt.
// NOTE(review): <thread> is not among the includes visible in this file --
// presumably pulled in transitively; verify.
157 auto thread = std::thread([&](){
159 client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
160 client.connect(address);
// Drive the client thread's own EventBase.
161 EventBaseManager::get()->getEventBase()->loop();
168 CHECK(factory->pipelines == 1);
// Same scenario as the accept-group test, but with a four-thread executor as
// the first group() argument, and with the client driven from the test
// thread's EventBase.
171 TEST(Bootstrap, ServerAcceptGroup2Test) {
172 // Verify that server is using the accept IO group
174 // Check if reuse port is supported, if not, don't run this test
// Probe: bind and listen on an ephemeral port (0), start accepting, try to
// enable reuse-port, then stop accepting.
177 auto serverSocket = AsyncServerSocket::newSocket(&base);
178 serverSocket->bind(0);
179 serverSocket->listen(0);
180 serverSocket->startAccepting();
181 serverSocket->setReusePortEnabled(true);
182 serverSocket->stopAccepting();
// Presumably the failure path of the probe (the surrounding try/catch is not
// visible in this excerpt -- TODO confirm).
184 LOG(INFO) << "Reuse port probably not supported";
189 auto factory = std::make_shared<TestPipelineFactory>();
190 server.childPipeline(factory);
191 server.group(std::make_shared<IOThreadPoolExecutor>(4), nullptr);
194 SocketAddress address;
195 server.getSockets()[0]->getAddress(&address);
198 client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
200 client.connect(address);
201 EventBaseManager::get()->getEventBase()->loop();
// A single client connection must yield a single server-side pipeline.
205 CHECK(factory->pipelines == 1);
// Five clients connect to a server that uses one two-thread pool for both
// group() arguments; five server-side pipelines are expected.
208 TEST(Bootstrap, SharedThreadPool) {
209 // Check if reuse port is supported, if not, don't run this test
// Probe: bind and listen on an ephemeral port (0), start accepting, try to
// enable reuse-port, then stop accepting.
212 auto serverSocket = AsyncServerSocket::newSocket(&base);
213 serverSocket->bind(0);
214 serverSocket->listen(0);
215 serverSocket->startAccepting();
216 serverSocket->setReusePortEnabled(true);
217 serverSocket->stopAccepting();
// Presumably the failure path of the probe (the surrounding try/catch is not
// visible in this excerpt -- TODO confirm).
219 LOG(INFO) << "Reuse port probably not supported";
// The same executor instance is passed as both arguments of group().
223 auto pool = std::make_shared<IOThreadPoolExecutor>(2);
226 auto factory = std::make_shared<TestPipelineFactory>();
227 server.childPipeline(factory);
228 server.group(pool, pool);
232 SocketAddress address;
233 server.getSockets()[0]->getAddress(&address);
// Five independent clients, each with its own TestClientPipelineFactory.
236 client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
237 client.connect(address);
240 client2.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
241 client2.connect(address);
244 client3.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
245 client3.connect(address);
248 client4.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
249 client4.connect(address);
252 client5.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
253 client5.connect(address);
// Drive the test thread's EventBase so the client connects can progress.
255 EventBaseManager::get()->getEventBase()->loop();
258 CHECK(factory->pipelines == 5);
// Binds the server to a caller-created AsyncServerSocket rather than to a
// port or address.
261 TEST(Bootstrap, ExistingSocket) {
263 auto factory = std::make_shared<TestPipelineFactory>();
264 server.childPipeline(factory);
// Ownership of the freshly created socket is moved into the server.
265 folly::AsyncServerSocket::UniquePtr socket(new AsyncServerSocket);
266 server.bind(std::move(socket));
// File-level counter shared by the handler tests: TestUDPPipeline::read()
// increments it, and LoadBalanceHandler and UDPClientServerTest compare it
// against 1.
269 std::atomic<int> connections{0};
// Inbound handler installed in the server's accept pipeline by the
// LoadBalanceHandler test.
271 class TestHandlerPipeline : public InboundHandler<void*> {
// Passes `conn` on through ctx->fireRead(). LoadBalanceHandler expects
// `connections` to be 1 afterwards, so an increment presumably sits on a line
// not visible here -- TODO confirm.
273 void read(Context* ctx, void* conn) override {
275 return ctx->fireRead(conn);
// Factory for the server's accept pipeline. HandlerPipeline is the handler
// type appended to each pipeline it builds; it must be default-constructible
// because it is created with HandlerPipeline().
279 template <typename HandlerPipeline>
280 class TestHandlerPipelineFactory
281 : public PipelineFactory<ServerBootstrap<BytesPipeline>::AcceptPipeline> {
// The socket argument is unnamed and therefore unused.
283 std::unique_ptr<ServerBootstrap<BytesPipeline>::AcceptPipeline,
284 folly::DelayedDestruction::Destructor>
285 newPipeline(std::shared_ptr<AsyncSocket>) override {
// Allocate the AcceptPipeline under a DelayedDestruction-aware unique_ptr,
// then append one handler instance to the back of it.
287 std::unique_ptr<ServerBootstrap<BytesPipeline>::AcceptPipeline,
288 folly::DelayedDestruction::Destructor> pipeline(
289 new ServerBootstrap<BytesPipeline>::AcceptPipeline);
290 pipeline->addBack(HandlerPipeline());
// Installs a custom accept pipeline (TestHandlerPipeline) alongside the usual
// child pipeline factory, connects one client, and expects one child pipeline
// and one counted connection.
295 TEST(Bootstrap, LoadBalanceHandler) {
297 auto factory = std::make_shared<TestPipelineFactory>();
298 server.childPipeline(factory);
// server.pipeline() takes the accept-pipeline factory, as opposed to
// childPipeline() above.
300 auto pipelinefactory =
301 std::make_shared<TestHandlerPipelineFactory<TestHandlerPipeline>>();
302 server.pipeline(pipelinefactory);
304 auto base = EventBaseManager::get()->getEventBase();
306 SocketAddress address;
307 server.getSockets()[0]->getAddress(&address);
310 client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
311 client.connect(address);
315 CHECK(factory->pipelines == 1);
// NOTE(review): `connections` is a file-level counter; this check assumes it
// is 0 when the test starts. No reset is visible in this excerpt.
316 CHECK(connections == 1);
// Inbound handler used as the accept pipeline in the UDP tests.
319 class TestUDPPipeline : public InboundHandler<void*> {
// Bumps the file-level `connections` counter once per read; `ctx` and `conn`
// are ignored and nothing is passed further along the pipeline.
321 void read(Context* ctx, void* conn) override { connections++; }
// Configures a server with a UDP channel factory and TestUDPPipeline as the
// accept pipeline handler.
324 TEST(Bootstrap, UDP) {
// NOTE(review): `factory` is not used by any line visible in this test --
// possibly dead; verify against the full body.
326 auto factory = std::make_shared<TestPipelineFactory>();
327 auto pipelinefactory =
328 std::make_shared<TestHandlerPipelineFactory<TestUDPPipeline>>();
329 server.pipeline(pipelinefactory);
// Replace the server's channel factory with the UDP server socket factory.
330 server.channelFactory(std::make_shared<AsyncUDPServerSocketFactory>());
// Sends one datagram from a UDP client socket to a UDP-configured server and
// expects TestUDPPipeline to have counted exactly one read.
334 TEST(Bootstrap, UDPClientServerTest) {
// NOTE(review): `factory` is not used by any line visible in this test --
// possibly dead; verify against the full body.
338 auto factory = std::make_shared<TestPipelineFactory>();
339 auto pipelinefactory =
340 std::make_shared<TestHandlerPipelineFactory<TestUDPPipeline>>();
341 server.pipeline(pipelinefactory);
342 server.channelFactory(std::make_shared<AsyncUDPServerSocketFactory>());
345 auto base = EventBaseManager::get()->getEventBase();
// Address of the server's first socket: the datagram's destination.
347 SocketAddress address;
348 server.getSockets()[0]->getAddress(&address);
// Bind the client to the IPv6 loopback address on an ephemeral port (0).
350 SocketAddress localhost("::1", 0);
351 AsyncUDPSocket client(base);
352 client.bind(localhost);
// Payload: a buffer created with capacity 1 whose first byte is set to 'a'.
353 auto data = IOBuf::create(1);
355 *(data->writableData()) = 'a';
// Ownership of the buffer is moved into the write call.
356 client.write(address, std::move(data));
// NOTE(review): `connections` is a file-level counter that other tests also
// increment; this check assumes it was reset before the send. The reset is
// not visible in this excerpt.
360 CHECK(connections == 1);