SharedThreadPool unittest
[folly.git] / folly / wangle / bootstrap / BootstrapTest.cpp
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "folly/wangle/bootstrap/ServerBootstrap.h"
18 #include "folly/wangle/bootstrap/ClientBootstrap.h"
19 #include "folly/wangle/channel/ChannelHandler.h"
20
21 #include <glog/logging.h>
22 #include <gtest/gtest.h>
23
24 using namespace folly::wangle;
25 using namespace folly;
26
27 typedef ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> Pipeline;
28
29 class TestServer : public ServerBootstrap<Pipeline> {
30   Pipeline* newPipeline(std::shared_ptr<AsyncSocket>) {
31     return nullptr;
32   }
33 };
34
35 class TestClient : public ClientBootstrap<Pipeline> {
36   Pipeline* newPipeline(std::shared_ptr<AsyncSocket> sock) {
37     CHECK(sock->good());
38
39     // We probably aren't connected immedately, check after a small delay
40     EventBaseManager::get()->getEventBase()->runAfterDelay([sock](){
41       CHECK(sock->readable());
42     }, 100);
43     return nullptr;
44   }
45 };
46
47 class TestPipelineFactory : public PipelineFactory<Pipeline> {
48  public:
49   Pipeline* newPipeline(std::shared_ptr<AsyncSocket> sock) {
50     pipelines++;
51     return new Pipeline();
52   }
53   std::atomic<int> pipelines{0};
54 };
55
56 TEST(Bootstrap, Basic) {
57   TestServer server;
58   TestClient client;
59 }
60
61 TEST(Bootstrap, ServerWithPipeline) {
62   TestServer server;
63   server.childPipeline(std::make_shared<TestPipelineFactory>());
64   server.bind(0);
65   server.stop();
66 }
67
68 TEST(Bootstrap, ClientServerTest) {
69   TestServer server;
70   auto factory = std::make_shared<TestPipelineFactory>();
71   server.childPipeline(factory);
72   server.bind(0);
73   auto base = EventBaseManager::get()->getEventBase();
74
75   SocketAddress address;
76   server.getSockets()[0]->getAddress(&address);
77
78   TestClient client;
79   client.connect(address);
80   base->loop();
81   server.stop();
82
83   CHECK(factory->pipelines == 1);
84 }
85
86 TEST(Bootstrap, ClientConnectionManagerTest) {
87   // Create a single IO thread, and verify that
88   // client connections are pooled properly
89
90   TestServer server;
91   auto factory = std::make_shared<TestPipelineFactory>();
92   server.childPipeline(factory);
93   server.group(std::make_shared<IOThreadPoolExecutor>(1));
94   server.bind(0);
95   auto base = EventBaseManager::get()->getEventBase();
96
97   SocketAddress address;
98   server.getSockets()[0]->getAddress(&address);
99
100   TestClient client;
101   client.connect(address);
102
103   TestClient client2;
104   client2.connect(address);
105
106   base->loop();
107   server.stop();
108
109   CHECK(factory->pipelines == 2);
110 }
111
112 TEST(Bootstrap, ServerAcceptGroupTest) {
113   // Verify that server is using the accept IO group
114
115   TestServer server;
116   auto factory = std::make_shared<TestPipelineFactory>();
117   server.childPipeline(factory);
118   server.group(std::make_shared<IOThreadPoolExecutor>(1), nullptr);
119   server.bind(0);
120
121   SocketAddress address;
122   server.getSockets()[0]->getAddress(&address);
123
124   boost::barrier barrier(2);
125   auto thread = std::thread([&](){
126     TestClient client;
127     client.connect(address);
128     EventBaseManager::get()->getEventBase()->loop();
129     barrier.wait();
130   });
131   barrier.wait();
132   server.stop();
133   thread.join();
134
135   CHECK(factory->pipelines == 1);
136 }
137
138 TEST(Bootstrap, ServerAcceptGroup2Test) {
139   // Verify that server is using the accept IO group
140
141   // Check if reuse port is supported, if not, don't run this test
142   try {
143     EventBase base;
144     auto serverSocket = AsyncServerSocket::newSocket(&base);
145     serverSocket->bind(0);
146     serverSocket->listen(0);
147     serverSocket->startAccepting();
148     serverSocket->setReusePortEnabled(true);
149     serverSocket->stopAccepting();
150   } catch(...) {
151     LOG(INFO) << "Reuse port probably not supported";
152     return;
153   }
154
155   TestServer server;
156   auto factory = std::make_shared<TestPipelineFactory>();
157   server.childPipeline(factory);
158   server.group(std::make_shared<IOThreadPoolExecutor>(4), nullptr);
159   server.bind(0);
160
161   SocketAddress address;
162   server.getSockets()[0]->getAddress(&address);
163
164   TestClient client;
165   client.connect(address);
166   EventBaseManager::get()->getEventBase()->loop();
167
168   server.stop();
169
170   CHECK(factory->pipelines == 1);
171 }
172
173 TEST(Bootstrap, SharedThreadPool) {
174   auto pool = std::make_shared<IOThreadPoolExecutor>(2);
175
176   TestServer server;
177   auto factory = std::make_shared<TestPipelineFactory>();
178   server.childPipeline(factory);
179   server.group(pool, pool);
180
181   server.bind(0);
182
183   SocketAddress address;
184   server.getSockets()[0]->getAddress(&address);
185
186   TestClient client;
187   client.connect(address);
188
189   TestClient client2;
190   client2.connect(address);
191
192   TestClient client3;
193   client3.connect(address);
194
195   TestClient client4;
196   client4.connect(address);
197
198   TestClient client5;
199   client5.connect(address);
200
201   EventBaseManager::get()->getEventBase()->loop();
202
203   server.stop();
204   CHECK(factory->pipelines == 5);
205 }