unidirectional pipelines
[folly.git] / folly / wangle / bootstrap / BootstrapTest.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "folly/wangle/bootstrap/ServerBootstrap.h"
18 #include "folly/wangle/bootstrap/ClientBootstrap.h"
19 #include "folly/wangle/channel/Handler.h"
20
21 #include <glog/logging.h>
22 #include <gtest/gtest.h>
23 #include <boost/thread.hpp>
24
25 using namespace folly::wangle;
26 using namespace folly;
27
28 typedef Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> BytesPipeline;
29
30 typedef ServerBootstrap<BytesPipeline> TestServer;
31 typedef ClientBootstrap<BytesPipeline> TestClient;
32
33 class TestClientPipelineFactory : public PipelineFactory<BytesPipeline> {
34  public:
35   BytesPipeline* newPipeline(std::shared_ptr<AsyncSocket> sock) {
36     CHECK(sock->good());
37
38     // We probably aren't connected immedately, check after a small delay
39     EventBaseManager::get()->getEventBase()->tryRunAfterDelay([sock](){
40       CHECK(sock->readable());
41     }, 100);
42     return nullptr;
43   }
44 };
45
46 class TestPipelineFactory : public PipelineFactory<BytesPipeline> {
47  public:
48   BytesPipeline* newPipeline(std::shared_ptr<AsyncSocket> sock) {
49     pipelines++;
50     return new BytesPipeline();
51   }
52   std::atomic<int> pipelines{0};
53 };
54
55 class TestAcceptor : public Acceptor {
56 EventBase base_;
57  public:
58   TestAcceptor() : Acceptor(ServerSocketConfig()) {
59     Acceptor::init(nullptr, &base_);
60   }
61   void onNewConnection(
62       AsyncSocket::UniquePtr sock,
63       const folly::SocketAddress* address,
64       const std::string& nextProtocolName,
65         const TransportInfo& tinfo) {
66   }
67 };
68
69 class TestAcceptorFactory : public AcceptorFactory {
70  public:
71   std::shared_ptr<Acceptor> newAcceptor(EventBase* base) {
72     return std::make_shared<TestAcceptor>();
73   }
74 };
75
76 TEST(Bootstrap, Basic) {
77   TestServer server;
78   TestClient client;
79 }
80
81 TEST(Bootstrap, ServerWithPipeline) {
82   TestServer server;
83   server.childPipeline(std::make_shared<TestPipelineFactory>());
84   server.bind(0);
85   server.stop();
86 }
87
88 TEST(Bootstrap, ServerWithChildHandler) {
89   TestServer server;
90   server.childHandler(std::make_shared<TestAcceptorFactory>());
91   server.bind(0);
92   server.stop();
93 }
94
95 TEST(Bootstrap, ClientServerTest) {
96   TestServer server;
97   auto factory = std::make_shared<TestPipelineFactory>();
98   server.childPipeline(factory);
99   server.bind(0);
100   auto base = EventBaseManager::get()->getEventBase();
101
102   SocketAddress address;
103   server.getSockets()[0]->getAddress(&address);
104
105   TestClient client;
106   client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
107   client.connect(address);
108   base->loop();
109   server.stop();
110
111   CHECK(factory->pipelines == 1);
112 }
113
114 TEST(Bootstrap, ClientConnectionManagerTest) {
115   // Create a single IO thread, and verify that
116   // client connections are pooled properly
117
118   TestServer server;
119   auto factory = std::make_shared<TestPipelineFactory>();
120   server.childPipeline(factory);
121   server.group(std::make_shared<IOThreadPoolExecutor>(1));
122   server.bind(0);
123   auto base = EventBaseManager::get()->getEventBase();
124
125   SocketAddress address;
126   server.getSockets()[0]->getAddress(&address);
127
128   TestClient client;
129   client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
130
131   client.connect(address);
132
133   TestClient client2;
134   client2.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
135   client2.connect(address);
136
137   base->loop();
138   server.stop();
139
140   CHECK(factory->pipelines == 2);
141 }
142
143 TEST(Bootstrap, ServerAcceptGroupTest) {
144   // Verify that server is using the accept IO group
145
146   TestServer server;
147   auto factory = std::make_shared<TestPipelineFactory>();
148   server.childPipeline(factory);
149   server.group(std::make_shared<IOThreadPoolExecutor>(1), nullptr);
150   server.bind(0);
151
152   SocketAddress address;
153   server.getSockets()[0]->getAddress(&address);
154
155   boost::barrier barrier(2);
156   auto thread = std::thread([&](){
157     TestClient client;
158     client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
159     client.connect(address);
160     EventBaseManager::get()->getEventBase()->loop();
161     barrier.wait();
162   });
163   barrier.wait();
164   server.stop();
165   thread.join();
166
167   CHECK(factory->pipelines == 1);
168 }
169
170 TEST(Bootstrap, ServerAcceptGroup2Test) {
171   // Verify that server is using the accept IO group
172
173   // Check if reuse port is supported, if not, don't run this test
174   try {
175     EventBase base;
176     auto serverSocket = AsyncServerSocket::newSocket(&base);
177     serverSocket->bind(0);
178     serverSocket->listen(0);
179     serverSocket->startAccepting();
180     serverSocket->setReusePortEnabled(true);
181     serverSocket->stopAccepting();
182   } catch(...) {
183     LOG(INFO) << "Reuse port probably not supported";
184     return;
185   }
186
187   TestServer server;
188   auto factory = std::make_shared<TestPipelineFactory>();
189   server.childPipeline(factory);
190   server.group(std::make_shared<IOThreadPoolExecutor>(4), nullptr);
191   server.bind(0);
192
193   SocketAddress address;
194   server.getSockets()[0]->getAddress(&address);
195
196   TestClient client;
197   client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
198
199   client.connect(address);
200   EventBaseManager::get()->getEventBase()->loop();
201
202   server.stop();
203
204   CHECK(factory->pipelines == 1);
205 }
206
207 TEST(Bootstrap, SharedThreadPool) {
208   // Check if reuse port is supported, if not, don't run this test
209   try {
210     EventBase base;
211     auto serverSocket = AsyncServerSocket::newSocket(&base);
212     serverSocket->bind(0);
213     serverSocket->listen(0);
214     serverSocket->startAccepting();
215     serverSocket->setReusePortEnabled(true);
216     serverSocket->stopAccepting();
217   } catch(...) {
218     LOG(INFO) << "Reuse port probably not supported";
219     return;
220   }
221
222   auto pool = std::make_shared<IOThreadPoolExecutor>(2);
223
224   TestServer server;
225   auto factory = std::make_shared<TestPipelineFactory>();
226   server.childPipeline(factory);
227   server.group(pool, pool);
228
229   server.bind(0);
230
231   SocketAddress address;
232   server.getSockets()[0]->getAddress(&address);
233
234   TestClient client;
235   client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
236   client.connect(address);
237
238   TestClient client2;
239   client2.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
240   client2.connect(address);
241
242   TestClient client3;
243   client3.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
244   client3.connect(address);
245
246   TestClient client4;
247   client4.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
248   client4.connect(address);
249
250   TestClient client5;
251   client5.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
252   client5.connect(address);
253
254   EventBaseManager::get()->getEventBase()->loop();
255
256   server.stop();
257   CHECK(factory->pipelines == 5);
258 }
259
260 TEST(Bootstrap, ExistingSocket) {
261   TestServer server;
262   auto factory = std::make_shared<TestPipelineFactory>();
263   server.childPipeline(factory);
264   folly::AsyncServerSocket::UniquePtr socket(new AsyncServerSocket);
265   server.bind(std::move(socket));
266 }
267
268 std::atomic<int> connections{0};
269
270 class TestHandlerPipeline : public InboundHandler<void*> {
271  public:
272   void read(Context* ctx, void* conn) {
273     connections++;
274     return ctx->fireRead(conn);
275   }
276 };
277
278 template <typename HandlerPipeline>
279 class TestHandlerPipelineFactory
280     : public PipelineFactory<ServerBootstrap<BytesPipeline>::AcceptPipeline> {
281  public:
282   ServerBootstrap<BytesPipeline>::AcceptPipeline* newPipeline(std::shared_ptr<AsyncSocket>) {
283     auto pipeline = new ServerBootstrap<BytesPipeline>::AcceptPipeline;
284     pipeline->addBack(HandlerPipeline());
285     return pipeline;
286   }
287 };
288
289 TEST(Bootstrap, LoadBalanceHandler) {
290   TestServer server;
291   auto factory = std::make_shared<TestPipelineFactory>();
292   server.childPipeline(factory);
293
294   auto pipelinefactory =
295     std::make_shared<TestHandlerPipelineFactory<TestHandlerPipeline>>();
296   server.pipeline(pipelinefactory);
297   server.bind(0);
298   auto base = EventBaseManager::get()->getEventBase();
299
300   SocketAddress address;
301   server.getSockets()[0]->getAddress(&address);
302
303   TestClient client;
304   client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
305   client.connect(address);
306   base->loop();
307   server.stop();
308
309   CHECK(factory->pipelines == 1);
310   CHECK(connections == 1);
311 }
312
313 class TestUDPPipeline : public InboundHandler<void*> {
314  public:
315   void read(Context* ctx, void* conn) {
316     connections++;
317   }
318 };
319
320 TEST(Bootstrap, UDP) {
321   TestServer server;
322   auto factory = std::make_shared<TestPipelineFactory>();
323   auto pipelinefactory =
324     std::make_shared<TestHandlerPipelineFactory<TestUDPPipeline>>();
325   server.pipeline(pipelinefactory);
326   server.channelFactory(std::make_shared<AsyncUDPServerSocketFactory>());
327   server.bind(0);
328 }
329
330 TEST(Bootstrap, UDPClientServerTest) {
331   connections = 0;
332
333   TestServer server;
334   auto factory = std::make_shared<TestPipelineFactory>();
335   auto pipelinefactory =
336     std::make_shared<TestHandlerPipelineFactory<TestUDPPipeline>>();
337   server.pipeline(pipelinefactory);
338   server.channelFactory(std::make_shared<AsyncUDPServerSocketFactory>());
339   server.bind(0);
340
341   auto base = EventBaseManager::get()->getEventBase();
342
343   SocketAddress address;
344   server.getSockets()[0]->getAddress(&address);
345
346   SocketAddress localhost("::1", 0);
347   AsyncUDPSocket client(base);
348   client.bind(localhost);
349   auto data = IOBuf::create(1);
350   data->append(1);
351   *(data->writableData()) = 'a';
352   client.write(address, std::move(data));
353   base->loop();
354   server.stop();
355
356   CHECK(connections == 1);
357 }