Bump version to 52:0
[folly.git] / folly / wangle / bootstrap / BootstrapTest.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "folly/wangle/bootstrap/ServerBootstrap.h"
18 #include "folly/wangle/bootstrap/ClientBootstrap.h"
19 #include "folly/wangle/channel/Handler.h"
20
21 #include <glog/logging.h>
22 #include <gtest/gtest.h>
23 #include <boost/thread.hpp>
24
25 using namespace folly::wangle;
26 using namespace folly;
27
28 typedef Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> BytesPipeline;
29
30 typedef ServerBootstrap<BytesPipeline> TestServer;
31 typedef ClientBootstrap<BytesPipeline> TestClient;
32
33 class TestClientPipelineFactory : public PipelineFactory<BytesPipeline> {
34  public:
35   std::unique_ptr<BytesPipeline, folly::DelayedDestruction::Destructor>
36   newPipeline(std::shared_ptr<AsyncSocket> sock) override {
37     // We probably aren't connected immedately, check after a small delay
38     EventBaseManager::get()->getEventBase()->tryRunAfterDelay([sock](){
39       CHECK(sock->good());
40       CHECK(sock->readable());
41     }, 100);
42     return nullptr;
43   }
44 };
45
46 class TestPipelineFactory : public PipelineFactory<BytesPipeline> {
47  public:
48   std::unique_ptr<BytesPipeline, folly::DelayedDestruction::Destructor>
49   newPipeline(std::shared_ptr<AsyncSocket> sock) override {
50
51     pipelines++;
52     return std::unique_ptr<BytesPipeline, folly::DelayedDestruction::Destructor>(
53       new BytesPipeline());
54   }
55   std::atomic<int> pipelines{0};
56 };
57
58 class TestAcceptor : public Acceptor {
59 EventBase base_;
60  public:
61   TestAcceptor() : Acceptor(ServerSocketConfig()) {
62     Acceptor::init(nullptr, &base_);
63   }
64   void onNewConnection(AsyncSocket::UniquePtr sock,
65                        const folly::SocketAddress* address,
66                        const std::string& nextProtocolName,
67                        const TransportInfo& tinfo) override {}
68 };
69
70 class TestAcceptorFactory : public AcceptorFactory {
71  public:
72   std::shared_ptr<Acceptor> newAcceptor(EventBase* base) override {
73     return std::make_shared<TestAcceptor>();
74   }
75 };
76
77 TEST(Bootstrap, Basic) {
78   TestServer server;
79   TestClient client;
80 }
81
82 TEST(Bootstrap, ServerWithPipeline) {
83   TestServer server;
84   server.childPipeline(std::make_shared<TestPipelineFactory>());
85   server.bind(0);
86   server.stop();
87 }
88
89 TEST(Bootstrap, ServerWithChildHandler) {
90   TestServer server;
91   server.childHandler(std::make_shared<TestAcceptorFactory>());
92   server.bind(0);
93   server.stop();
94 }
95
96 TEST(Bootstrap, ClientServerTest) {
97   TestServer server;
98   auto factory = std::make_shared<TestPipelineFactory>();
99   server.childPipeline(factory);
100   server.bind(0);
101   auto base = EventBaseManager::get()->getEventBase();
102
103   SocketAddress address;
104   server.getSockets()[0]->getAddress(&address);
105
106   TestClient client;
107   client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
108   client.connect(address);
109   base->loop();
110   server.stop();
111
112   CHECK(factory->pipelines == 1);
113 }
114
115 TEST(Bootstrap, ClientConnectionManagerTest) {
116   // Create a single IO thread, and verify that
117   // client connections are pooled properly
118
119   TestServer server;
120   auto factory = std::make_shared<TestPipelineFactory>();
121   server.childPipeline(factory);
122   server.group(std::make_shared<IOThreadPoolExecutor>(1));
123   server.bind(0);
124   auto base = EventBaseManager::get()->getEventBase();
125
126   SocketAddress address;
127   server.getSockets()[0]->getAddress(&address);
128
129   TestClient client;
130   client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
131
132   client.connect(address);
133
134   TestClient client2;
135   client2.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
136   client2.connect(address);
137
138   base->loop();
139   server.stop();
140
141   CHECK(factory->pipelines == 2);
142 }
143
144 TEST(Bootstrap, ServerAcceptGroupTest) {
145   // Verify that server is using the accept IO group
146
147   TestServer server;
148   auto factory = std::make_shared<TestPipelineFactory>();
149   server.childPipeline(factory);
150   server.group(std::make_shared<IOThreadPoolExecutor>(1), nullptr);
151   server.bind(0);
152
153   SocketAddress address;
154   server.getSockets()[0]->getAddress(&address);
155
156   boost::barrier barrier(2);
157   auto thread = std::thread([&](){
158     TestClient client;
159     client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
160     client.connect(address);
161     EventBaseManager::get()->getEventBase()->loop();
162     barrier.wait();
163   });
164   barrier.wait();
165   server.stop();
166   thread.join();
167
168   CHECK(factory->pipelines == 1);
169 }
170
171 TEST(Bootstrap, ServerAcceptGroup2Test) {
172   // Verify that server is using the accept IO group
173
174   // Check if reuse port is supported, if not, don't run this test
175   try {
176     EventBase base;
177     auto serverSocket = AsyncServerSocket::newSocket(&base);
178     serverSocket->bind(0);
179     serverSocket->listen(0);
180     serverSocket->startAccepting();
181     serverSocket->setReusePortEnabled(true);
182     serverSocket->stopAccepting();
183   } catch(...) {
184     LOG(INFO) << "Reuse port probably not supported";
185     return;
186   }
187
188   TestServer server;
189   auto factory = std::make_shared<TestPipelineFactory>();
190   server.childPipeline(factory);
191   server.group(std::make_shared<IOThreadPoolExecutor>(4), nullptr);
192   server.bind(0);
193
194   SocketAddress address;
195   server.getSockets()[0]->getAddress(&address);
196
197   TestClient client;
198   client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
199
200   client.connect(address);
201   EventBaseManager::get()->getEventBase()->loop();
202
203   server.stop();
204
205   CHECK(factory->pipelines == 1);
206 }
207
208 TEST(Bootstrap, SharedThreadPool) {
209   // Check if reuse port is supported, if not, don't run this test
210   try {
211     EventBase base;
212     auto serverSocket = AsyncServerSocket::newSocket(&base);
213     serverSocket->bind(0);
214     serverSocket->listen(0);
215     serverSocket->startAccepting();
216     serverSocket->setReusePortEnabled(true);
217     serverSocket->stopAccepting();
218   } catch(...) {
219     LOG(INFO) << "Reuse port probably not supported";
220     return;
221   }
222
223   auto pool = std::make_shared<IOThreadPoolExecutor>(2);
224
225   TestServer server;
226   auto factory = std::make_shared<TestPipelineFactory>();
227   server.childPipeline(factory);
228   server.group(pool, pool);
229
230   server.bind(0);
231
232   SocketAddress address;
233   server.getSockets()[0]->getAddress(&address);
234
235   TestClient client;
236   client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
237   client.connect(address);
238
239   TestClient client2;
240   client2.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
241   client2.connect(address);
242
243   TestClient client3;
244   client3.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
245   client3.connect(address);
246
247   TestClient client4;
248   client4.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
249   client4.connect(address);
250
251   TestClient client5;
252   client5.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
253   client5.connect(address);
254
255   EventBaseManager::get()->getEventBase()->loop();
256
257   server.stop();
258   CHECK(factory->pipelines == 5);
259 }
260
261 TEST(Bootstrap, ExistingSocket) {
262   TestServer server;
263   auto factory = std::make_shared<TestPipelineFactory>();
264   server.childPipeline(factory);
265   folly::AsyncServerSocket::UniquePtr socket(new AsyncServerSocket);
266   server.bind(std::move(socket));
267 }
268
// File-level counter incremented by the test handlers' read() methods and
// inspected by the tests that install those handlers.
std::atomic<int> connections(0);
270
271 class TestHandlerPipeline : public InboundHandler<void*> {
272  public:
273   void read(Context* ctx, void* conn) override {
274     connections++;
275     return ctx->fireRead(conn);
276   }
277 };
278
279 template <typename HandlerPipeline>
280 class TestHandlerPipelineFactory
281     : public PipelineFactory<ServerBootstrap<BytesPipeline>::AcceptPipeline> {
282  public:
283   std::unique_ptr<ServerBootstrap<BytesPipeline>::AcceptPipeline,
284                   folly::DelayedDestruction::Destructor>
285       newPipeline(std::shared_ptr<AsyncSocket>) override {
286
287     std::unique_ptr<ServerBootstrap<BytesPipeline>::AcceptPipeline,
288                     folly::DelayedDestruction::Destructor> pipeline(
289                       new ServerBootstrap<BytesPipeline>::AcceptPipeline);
290     pipeline->addBack(HandlerPipeline());
291     return pipeline;
292   }
293 };
294
295 TEST(Bootstrap, LoadBalanceHandler) {
296   TestServer server;
297   auto factory = std::make_shared<TestPipelineFactory>();
298   server.childPipeline(factory);
299
300   auto pipelinefactory =
301     std::make_shared<TestHandlerPipelineFactory<TestHandlerPipeline>>();
302   server.pipeline(pipelinefactory);
303   server.bind(0);
304   auto base = EventBaseManager::get()->getEventBase();
305
306   SocketAddress address;
307   server.getSockets()[0]->getAddress(&address);
308
309   TestClient client;
310   client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
311   client.connect(address);
312   base->loop();
313   server.stop();
314
315   CHECK(factory->pipelines == 1);
316   CHECK(connections == 1);
317 }
318
319 class TestUDPPipeline : public InboundHandler<void*> {
320  public:
321   void read(Context* ctx, void* conn) override { connections++; }
322 };
323
324 TEST(Bootstrap, UDP) {
325   TestServer server;
326   auto factory = std::make_shared<TestPipelineFactory>();
327   auto pipelinefactory =
328     std::make_shared<TestHandlerPipelineFactory<TestUDPPipeline>>();
329   server.pipeline(pipelinefactory);
330   server.channelFactory(std::make_shared<AsyncUDPServerSocketFactory>());
331   server.bind(0);
332 }
333
334 TEST(Bootstrap, UDPClientServerTest) {
335   connections = 0;
336
337   TestServer server;
338   auto factory = std::make_shared<TestPipelineFactory>();
339   auto pipelinefactory =
340     std::make_shared<TestHandlerPipelineFactory<TestUDPPipeline>>();
341   server.pipeline(pipelinefactory);
342   server.channelFactory(std::make_shared<AsyncUDPServerSocketFactory>());
343   server.bind(0);
344
345   auto base = EventBaseManager::get()->getEventBase();
346
347   SocketAddress address;
348   server.getSockets()[0]->getAddress(&address);
349
350   SocketAddress localhost("::1", 0);
351   AsyncUDPSocket client(base);
352   client.bind(localhost);
353   auto data = IOBuf::create(1);
354   data->append(1);
355   *(data->writableData()) = 'a';
356   client.write(address, std::move(data));
357   base->loop();
358   server.stop();
359
360   CHECK(connections == 1);
361 }