Finagle interfaces
[folly.git] / folly / wangle / bootstrap / BootstrapTest.cpp
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "folly/wangle/bootstrap/ServerBootstrap.h"
18 #include "folly/wangle/bootstrap/ClientBootstrap.h"
19 #include "folly/wangle/channel/ChannelHandler.h"
20
21 #include <glog/logging.h>
22 #include <gtest/gtest.h>
23
24 using namespace folly::wangle;
25 using namespace folly;
26
27 typedef ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> Pipeline;
28
29 typedef ServerBootstrap<Pipeline> TestServer;
30 typedef ClientBootstrap<Pipeline> TestClient;
31
32 class TestClientPipelineFactory : public PipelineFactory<Pipeline> {
33  public:
34   Pipeline* newPipeline(std::shared_ptr<AsyncSocket> sock) {
35     CHECK(sock->good());
36
37     // We probably aren't connected immedately, check after a small delay
38     EventBaseManager::get()->getEventBase()->runAfterDelay([sock](){
39       CHECK(sock->readable());
40     }, 100);
41     return nullptr;
42   }
43 };
44
45 class TestPipelineFactory : public PipelineFactory<Pipeline> {
46  public:
47   Pipeline* newPipeline(std::shared_ptr<AsyncSocket> sock) {
48     pipelines++;
49     return new Pipeline();
50   }
51   std::atomic<int> pipelines{0};
52 };
53
54 TEST(Bootstrap, Basic) {
55   TestServer server;
56   TestClient client;
57 }
58
59 TEST(Bootstrap, ServerWithPipeline) {
60   TestServer server;
61   server.childPipeline(std::make_shared<TestPipelineFactory>());
62   server.bind(0);
63   server.stop();
64 }
65
66 TEST(Bootstrap, ClientServerTest) {
67   TestServer server;
68   auto factory = std::make_shared<TestPipelineFactory>();
69   server.childPipeline(factory);
70   server.bind(0);
71   auto base = EventBaseManager::get()->getEventBase();
72
73   SocketAddress address;
74   server.getSockets()[0]->getAddress(&address);
75
76   TestClient client;
77   client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
78   client.connect(address);
79   base->loop();
80   server.stop();
81
82   CHECK(factory->pipelines == 1);
83 }
84
85 TEST(Bootstrap, ClientConnectionManagerTest) {
86   // Create a single IO thread, and verify that
87   // client connections are pooled properly
88
89   TestServer server;
90   auto factory = std::make_shared<TestPipelineFactory>();
91   server.childPipeline(factory);
92   server.group(std::make_shared<IOThreadPoolExecutor>(1));
93   server.bind(0);
94   auto base = EventBaseManager::get()->getEventBase();
95
96   SocketAddress address;
97   server.getSockets()[0]->getAddress(&address);
98
99   TestClient client;
100   client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
101
102   client.connect(address);
103
104   TestClient client2;
105   client2.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
106   client2.connect(address);
107
108   base->loop();
109   server.stop();
110
111   CHECK(factory->pipelines == 2);
112 }
113
114 TEST(Bootstrap, ServerAcceptGroupTest) {
115   // Verify that server is using the accept IO group
116
117   TestServer server;
118   auto factory = std::make_shared<TestPipelineFactory>();
119   server.childPipeline(factory);
120   server.group(std::make_shared<IOThreadPoolExecutor>(1), nullptr);
121   server.bind(0);
122
123   SocketAddress address;
124   server.getSockets()[0]->getAddress(&address);
125
126   boost::barrier barrier(2);
127   auto thread = std::thread([&](){
128     TestClient client;
129     client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
130     client.connect(address);
131     EventBaseManager::get()->getEventBase()->loop();
132     barrier.wait();
133   });
134   barrier.wait();
135   server.stop();
136   thread.join();
137
138   CHECK(factory->pipelines == 1);
139 }
140
141 TEST(Bootstrap, ServerAcceptGroup2Test) {
142   // Verify that server is using the accept IO group
143
144   // Check if reuse port is supported, if not, don't run this test
145   try {
146     EventBase base;
147     auto serverSocket = AsyncServerSocket::newSocket(&base);
148     serverSocket->bind(0);
149     serverSocket->listen(0);
150     serverSocket->startAccepting();
151     serverSocket->setReusePortEnabled(true);
152     serverSocket->stopAccepting();
153   } catch(...) {
154     LOG(INFO) << "Reuse port probably not supported";
155     return;
156   }
157
158   TestServer server;
159   auto factory = std::make_shared<TestPipelineFactory>();
160   server.childPipeline(factory);
161   server.group(std::make_shared<IOThreadPoolExecutor>(4), nullptr);
162   server.bind(0);
163
164   SocketAddress address;
165   server.getSockets()[0]->getAddress(&address);
166
167   TestClient client;
168   client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
169
170   client.connect(address);
171   EventBaseManager::get()->getEventBase()->loop();
172
173   server.stop();
174
175   CHECK(factory->pipelines == 1);
176 }
177
178 TEST(Bootstrap, SharedThreadPool) {
179   // Check if reuse port is supported, if not, don't run this test
180   try {
181     EventBase base;
182     auto serverSocket = AsyncServerSocket::newSocket(&base);
183     serverSocket->bind(0);
184     serverSocket->listen(0);
185     serverSocket->startAccepting();
186     serverSocket->setReusePortEnabled(true);
187     serverSocket->stopAccepting();
188   } catch(...) {
189     LOG(INFO) << "Reuse port probably not supported";
190     return;
191   }
192
193   auto pool = std::make_shared<IOThreadPoolExecutor>(2);
194
195   TestServer server;
196   auto factory = std::make_shared<TestPipelineFactory>();
197   server.childPipeline(factory);
198   server.group(pool, pool);
199
200   server.bind(0);
201
202   SocketAddress address;
203   server.getSockets()[0]->getAddress(&address);
204
205   TestClient client;
206   client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
207   client.connect(address);
208
209   TestClient client2;
210   client2.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
211   client2.connect(address);
212
213   TestClient client3;
214   client3.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
215   client3.connect(address);
216
217   TestClient client4;
218   client4.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
219   client4.connect(address);
220
221   TestClient client5;
222   client5.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
223   client5.connect(address);
224
225   EventBaseManager::get()->getEventBase()->loop();
226
227   server.stop();
228   CHECK(factory->pipelines == 5);
229 }