724afec93f24a5b693276372bee8284038bec557
[folly.git] / folly / wangle / bootstrap / BootstrapTest.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "folly/wangle/bootstrap/ServerBootstrap.h"
18 #include "folly/wangle/bootstrap/ClientBootstrap.h"
19 #include "folly/wangle/channel/Handler.h"
20
21 #include <glog/logging.h>
22 #include <gtest/gtest.h>
23 #include <boost/thread.hpp>
24
25 using namespace folly::wangle;
26 using namespace folly;
27
28 typedef Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> BytesPipeline;
29
30 typedef ServerBootstrap<BytesPipeline> TestServer;
31 typedef ClientBootstrap<BytesPipeline> TestClient;
32
33 class TestClientPipelineFactory : public PipelineFactory<BytesPipeline> {
34  public:
35   std::unique_ptr<BytesPipeline, folly::DelayedDestruction::Destructor>
36   newPipeline(std::shared_ptr<AsyncSocket> sock) {
37     // We probably aren't connected immedately, check after a small delay
38     EventBaseManager::get()->getEventBase()->tryRunAfterDelay([sock](){
39       CHECK(sock->good());
40       CHECK(sock->readable());
41     }, 100);
42     return nullptr;
43   }
44 };
45
46 class TestPipelineFactory : public PipelineFactory<BytesPipeline> {
47  public:
48   std::unique_ptr<BytesPipeline, folly::DelayedDestruction::Destructor> newPipeline(
49     std::shared_ptr<AsyncSocket> sock) {
50
51     pipelines++;
52     return std::unique_ptr<BytesPipeline, folly::DelayedDestruction::Destructor>(
53       new BytesPipeline());
54   }
55   std::atomic<int> pipelines{0};
56 };
57
58 class TestAcceptor : public Acceptor {
59 EventBase base_;
60  public:
61   TestAcceptor() : Acceptor(ServerSocketConfig()) {
62     Acceptor::init(nullptr, &base_);
63   }
64   void onNewConnection(
65       AsyncSocket::UniquePtr sock,
66       const folly::SocketAddress* address,
67       const std::string& nextProtocolName,
68         const TransportInfo& tinfo) {
69   }
70 };
71
72 class TestAcceptorFactory : public AcceptorFactory {
73  public:
74   std::shared_ptr<Acceptor> newAcceptor(EventBase* base) {
75     return std::make_shared<TestAcceptor>();
76   }
77 };
78
79 TEST(Bootstrap, Basic) {
80   TestServer server;
81   TestClient client;
82 }
83
84 TEST(Bootstrap, ServerWithPipeline) {
85   TestServer server;
86   server.childPipeline(std::make_shared<TestPipelineFactory>());
87   server.bind(0);
88   server.stop();
89 }
90
91 TEST(Bootstrap, ServerWithChildHandler) {
92   TestServer server;
93   server.childHandler(std::make_shared<TestAcceptorFactory>());
94   server.bind(0);
95   server.stop();
96 }
97
98 TEST(Bootstrap, ClientServerTest) {
99   TestServer server;
100   auto factory = std::make_shared<TestPipelineFactory>();
101   server.childPipeline(factory);
102   server.bind(0);
103   auto base = EventBaseManager::get()->getEventBase();
104
105   SocketAddress address;
106   server.getSockets()[0]->getAddress(&address);
107
108   TestClient client;
109   client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
110   client.connect(address);
111   base->loop();
112   server.stop();
113
114   CHECK(factory->pipelines == 1);
115 }
116
117 TEST(Bootstrap, ClientConnectionManagerTest) {
118   // Create a single IO thread, and verify that
119   // client connections are pooled properly
120
121   TestServer server;
122   auto factory = std::make_shared<TestPipelineFactory>();
123   server.childPipeline(factory);
124   server.group(std::make_shared<IOThreadPoolExecutor>(1));
125   server.bind(0);
126   auto base = EventBaseManager::get()->getEventBase();
127
128   SocketAddress address;
129   server.getSockets()[0]->getAddress(&address);
130
131   TestClient client;
132   client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
133
134   client.connect(address);
135
136   TestClient client2;
137   client2.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
138   client2.connect(address);
139
140   base->loop();
141   server.stop();
142
143   CHECK(factory->pipelines == 2);
144 }
145
146 TEST(Bootstrap, ServerAcceptGroupTest) {
147   // Verify that server is using the accept IO group
148
149   TestServer server;
150   auto factory = std::make_shared<TestPipelineFactory>();
151   server.childPipeline(factory);
152   server.group(std::make_shared<IOThreadPoolExecutor>(1), nullptr);
153   server.bind(0);
154
155   SocketAddress address;
156   server.getSockets()[0]->getAddress(&address);
157
158   boost::barrier barrier(2);
159   auto thread = std::thread([&](){
160     TestClient client;
161     client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
162     client.connect(address);
163     EventBaseManager::get()->getEventBase()->loop();
164     barrier.wait();
165   });
166   barrier.wait();
167   server.stop();
168   thread.join();
169
170   CHECK(factory->pipelines == 1);
171 }
172
173 TEST(Bootstrap, ServerAcceptGroup2Test) {
174   // Verify that server is using the accept IO group
175
176   // Check if reuse port is supported, if not, don't run this test
177   try {
178     EventBase base;
179     auto serverSocket = AsyncServerSocket::newSocket(&base);
180     serverSocket->bind(0);
181     serverSocket->listen(0);
182     serverSocket->startAccepting();
183     serverSocket->setReusePortEnabled(true);
184     serverSocket->stopAccepting();
185   } catch(...) {
186     LOG(INFO) << "Reuse port probably not supported";
187     return;
188   }
189
190   TestServer server;
191   auto factory = std::make_shared<TestPipelineFactory>();
192   server.childPipeline(factory);
193   server.group(std::make_shared<IOThreadPoolExecutor>(4), nullptr);
194   server.bind(0);
195
196   SocketAddress address;
197   server.getSockets()[0]->getAddress(&address);
198
199   TestClient client;
200   client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
201
202   client.connect(address);
203   EventBaseManager::get()->getEventBase()->loop();
204
205   server.stop();
206
207   CHECK(factory->pipelines == 1);
208 }
209
210 TEST(Bootstrap, SharedThreadPool) {
211   // Check if reuse port is supported, if not, don't run this test
212   try {
213     EventBase base;
214     auto serverSocket = AsyncServerSocket::newSocket(&base);
215     serverSocket->bind(0);
216     serverSocket->listen(0);
217     serverSocket->startAccepting();
218     serverSocket->setReusePortEnabled(true);
219     serverSocket->stopAccepting();
220   } catch(...) {
221     LOG(INFO) << "Reuse port probably not supported";
222     return;
223   }
224
225   auto pool = std::make_shared<IOThreadPoolExecutor>(2);
226
227   TestServer server;
228   auto factory = std::make_shared<TestPipelineFactory>();
229   server.childPipeline(factory);
230   server.group(pool, pool);
231
232   server.bind(0);
233
234   SocketAddress address;
235   server.getSockets()[0]->getAddress(&address);
236
237   TestClient client;
238   client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
239   client.connect(address);
240
241   TestClient client2;
242   client2.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
243   client2.connect(address);
244
245   TestClient client3;
246   client3.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
247   client3.connect(address);
248
249   TestClient client4;
250   client4.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
251   client4.connect(address);
252
253   TestClient client5;
254   client5.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
255   client5.connect(address);
256
257   EventBaseManager::get()->getEventBase()->loop();
258
259   server.stop();
260   CHECK(factory->pipelines == 5);
261 }
262
263 TEST(Bootstrap, ExistingSocket) {
264   TestServer server;
265   auto factory = std::make_shared<TestPipelineFactory>();
266   server.childPipeline(factory);
267   folly::AsyncServerSocket::UniquePtr socket(new AsyncServerSocket);
268   server.bind(std::move(socket));
269 }
270
271 std::atomic<int> connections{0};
272
273 class TestHandlerPipeline : public InboundHandler<void*> {
274  public:
275   void read(Context* ctx, void* conn) {
276     connections++;
277     return ctx->fireRead(conn);
278   }
279 };
280
281 template <typename HandlerPipeline>
282 class TestHandlerPipelineFactory
283     : public PipelineFactory<ServerBootstrap<BytesPipeline>::AcceptPipeline> {
284  public:
285   std::unique_ptr<ServerBootstrap<BytesPipeline>::AcceptPipeline,
286                   folly::DelayedDestruction::Destructor>
287   newPipeline(std::shared_ptr<AsyncSocket>) {
288
289     std::unique_ptr<ServerBootstrap<BytesPipeline>::AcceptPipeline,
290                     folly::DelayedDestruction::Destructor> pipeline(
291                       new ServerBootstrap<BytesPipeline>::AcceptPipeline);
292     pipeline->addBack(HandlerPipeline());
293     return pipeline;
294   }
295 };
296
297 TEST(Bootstrap, LoadBalanceHandler) {
298   TestServer server;
299   auto factory = std::make_shared<TestPipelineFactory>();
300   server.childPipeline(factory);
301
302   auto pipelinefactory =
303     std::make_shared<TestHandlerPipelineFactory<TestHandlerPipeline>>();
304   server.pipeline(pipelinefactory);
305   server.bind(0);
306   auto base = EventBaseManager::get()->getEventBase();
307
308   SocketAddress address;
309   server.getSockets()[0]->getAddress(&address);
310
311   TestClient client;
312   client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
313   client.connect(address);
314   base->loop();
315   server.stop();
316
317   CHECK(factory->pipelines == 1);
318   CHECK(connections == 1);
319 }
320
321 class TestUDPPipeline : public InboundHandler<void*> {
322  public:
323   void read(Context* ctx, void* conn) {
324     connections++;
325   }
326 };
327
328 TEST(Bootstrap, UDP) {
329   TestServer server;
330   auto factory = std::make_shared<TestPipelineFactory>();
331   auto pipelinefactory =
332     std::make_shared<TestHandlerPipelineFactory<TestUDPPipeline>>();
333   server.pipeline(pipelinefactory);
334   server.channelFactory(std::make_shared<AsyncUDPServerSocketFactory>());
335   server.bind(0);
336 }
337
338 TEST(Bootstrap, UDPClientServerTest) {
339   connections = 0;
340
341   TestServer server;
342   auto factory = std::make_shared<TestPipelineFactory>();
343   auto pipelinefactory =
344     std::make_shared<TestHandlerPipelineFactory<TestUDPPipeline>>();
345   server.pipeline(pipelinefactory);
346   server.channelFactory(std::make_shared<AsyncUDPServerSocketFactory>());
347   server.bind(0);
348
349   auto base = EventBaseManager::get()->getEventBase();
350
351   SocketAddress address;
352   server.getSockets()[0]->getAddress(&address);
353
354   SocketAddress localhost("::1", 0);
355   AsyncUDPSocket client(base);
356   client.bind(localhost);
357   auto data = IOBuf::create(1);
358   data->append(1);
359   *(data->writableData()) = 'a';
360   client.write(address, std::move(data));
361   base->loop();
362   server.stop();
363
364   CHECK(connections == 1);
365 }