4bbd80c22f22a5442be0e84377e1ebe1f76598b8
[folly.git] / folly / wangle / bootstrap / BootstrapTest.cpp
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "folly/wangle/bootstrap/ServerBootstrap.h"
18 #include "folly/wangle/bootstrap/ClientBootstrap.h"
19 #include "folly/wangle/channel/ChannelHandler.h"
20
21 #include <glog/logging.h>
22 #include <gtest/gtest.h>
23 #include <boost/thread.hpp>
24
25 using namespace folly::wangle;
26 using namespace folly;
27
28 typedef ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> Pipeline;
29
30 typedef ServerBootstrap<Pipeline> TestServer;
31 typedef ClientBootstrap<Pipeline> TestClient;
32
33 class TestClientPipelineFactory : public PipelineFactory<Pipeline> {
34  public:
35   Pipeline* newPipeline(std::shared_ptr<AsyncSocket> sock) {
36     CHECK(sock->good());
37
38     // We probably aren't connected immedately, check after a small delay
39     EventBaseManager::get()->getEventBase()->runAfterDelay([sock](){
40       CHECK(sock->readable());
41     }, 100);
42     return nullptr;
43   }
44 };
45
46 class TestPipelineFactory : public PipelineFactory<Pipeline> {
47  public:
48   Pipeline* newPipeline(std::shared_ptr<AsyncSocket> sock) {
49     pipelines++;
50     return new Pipeline();
51   }
52   std::atomic<int> pipelines{0};
53 };
54
55 TEST(Bootstrap, Basic) {
56   TestServer server;
57   TestClient client;
58 }
59
60 TEST(Bootstrap, ServerWithPipeline) {
61   TestServer server;
62   server.childPipeline(std::make_shared<TestPipelineFactory>());
63   server.bind(0);
64   server.stop();
65 }
66
67 TEST(Bootstrap, ClientServerTest) {
68   TestServer server;
69   auto factory = std::make_shared<TestPipelineFactory>();
70   server.childPipeline(factory);
71   server.bind(0);
72   auto base = EventBaseManager::get()->getEventBase();
73
74   SocketAddress address;
75   server.getSockets()[0]->getAddress(&address);
76
77   TestClient client;
78   client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
79   client.connect(address);
80   base->loop();
81   server.stop();
82
83   CHECK(factory->pipelines == 1);
84 }
85
86 TEST(Bootstrap, ClientConnectionManagerTest) {
87   // Create a single IO thread, and verify that
88   // client connections are pooled properly
89
90   TestServer server;
91   auto factory = std::make_shared<TestPipelineFactory>();
92   server.childPipeline(factory);
93   server.group(std::make_shared<IOThreadPoolExecutor>(1));
94   server.bind(0);
95   auto base = EventBaseManager::get()->getEventBase();
96
97   SocketAddress address;
98   server.getSockets()[0]->getAddress(&address);
99
100   TestClient client;
101   client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
102
103   client.connect(address);
104
105   TestClient client2;
106   client2.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
107   client2.connect(address);
108
109   base->loop();
110   server.stop();
111
112   CHECK(factory->pipelines == 2);
113 }
114
115 TEST(Bootstrap, ServerAcceptGroupTest) {
116   // Verify that server is using the accept IO group
117
118   TestServer server;
119   auto factory = std::make_shared<TestPipelineFactory>();
120   server.childPipeline(factory);
121   server.group(std::make_shared<IOThreadPoolExecutor>(1), nullptr);
122   server.bind(0);
123
124   SocketAddress address;
125   server.getSockets()[0]->getAddress(&address);
126
127   boost::barrier barrier(2);
128   auto thread = std::thread([&](){
129     TestClient client;
130     client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
131     client.connect(address);
132     EventBaseManager::get()->getEventBase()->loop();
133     barrier.wait();
134   });
135   barrier.wait();
136   server.stop();
137   thread.join();
138
139   CHECK(factory->pipelines == 1);
140 }
141
142 TEST(Bootstrap, ServerAcceptGroup2Test) {
143   // Verify that server is using the accept IO group
144
145   // Check if reuse port is supported, if not, don't run this test
146   try {
147     EventBase base;
148     auto serverSocket = AsyncServerSocket::newSocket(&base);
149     serverSocket->bind(0);
150     serverSocket->listen(0);
151     serverSocket->startAccepting();
152     serverSocket->setReusePortEnabled(true);
153     serverSocket->stopAccepting();
154   } catch(...) {
155     LOG(INFO) << "Reuse port probably not supported";
156     return;
157   }
158
159   TestServer server;
160   auto factory = std::make_shared<TestPipelineFactory>();
161   server.childPipeline(factory);
162   server.group(std::make_shared<IOThreadPoolExecutor>(4), nullptr);
163   server.bind(0);
164
165   SocketAddress address;
166   server.getSockets()[0]->getAddress(&address);
167
168   TestClient client;
169   client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
170
171   client.connect(address);
172   EventBaseManager::get()->getEventBase()->loop();
173
174   server.stop();
175
176   CHECK(factory->pipelines == 1);
177 }
178
179 TEST(Bootstrap, SharedThreadPool) {
180   // Check if reuse port is supported, if not, don't run this test
181   try {
182     EventBase base;
183     auto serverSocket = AsyncServerSocket::newSocket(&base);
184     serverSocket->bind(0);
185     serverSocket->listen(0);
186     serverSocket->startAccepting();
187     serverSocket->setReusePortEnabled(true);
188     serverSocket->stopAccepting();
189   } catch(...) {
190     LOG(INFO) << "Reuse port probably not supported";
191     return;
192   }
193
194   auto pool = std::make_shared<IOThreadPoolExecutor>(2);
195
196   TestServer server;
197   auto factory = std::make_shared<TestPipelineFactory>();
198   server.childPipeline(factory);
199   server.group(pool, pool);
200
201   server.bind(0);
202
203   SocketAddress address;
204   server.getSockets()[0]->getAddress(&address);
205
206   TestClient client;
207   client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
208   client.connect(address);
209
210   TestClient client2;
211   client2.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
212   client2.connect(address);
213
214   TestClient client3;
215   client3.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
216   client3.connect(address);
217
218   TestClient client4;
219   client4.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
220   client4.connect(address);
221
222   TestClient client5;
223   client5.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
224   client5.connect(address);
225
226   EventBaseManager::get()->getEventBase()->loop();
227
228   server.stop();
229   CHECK(factory->pipelines == 5);
230 }
231
232 TEST(Bootstrap, ExistingSocket) {
233   TestServer server;
234   auto factory = std::make_shared<TestPipelineFactory>();
235   server.childPipeline(factory);
236   folly::AsyncServerSocket::UniquePtr socket(new AsyncServerSocket);
237   server.bind(std::move(socket));
238 }